
Python sqlalchemy 模块-types() 实例源码

项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda    作者:SignalMedia
def _get_dtype(self, sqltype):
        from sqlalchemy.types import (Integer, Float, Boolean, DateTime,
                                      Date, TIMESTAMP)

        if isinstance(sqltype, Float):
            return float
        elif isinstance(sqltype, Integer):
            # Todo: Refine integer size.
            return np.dtype('int64')
        elif isinstance(sqltype, TIMESTAMP):
            # we have a timezone capable type
            if not sqltype.timezone:
                return datetime
            return DatetimeTZDtype
        elif isinstance(sqltype, DateTime):
            # Caution: np.datetime64 is also a subclass of np.number.
            return datetime
        elif isinstance(sqltype, Date):
            return date
        elif isinstance(sqltype, Boolean):
            return bool
        return object
项目:sqlalchemy-drill    作者:JohnOmernik
def get_columns(self, connection, table_name, schema=None, **kw):
        q = "SELECT * FROM `%(table_id)s` LIMIT 0" % ({"table_id": table_name})
        columns = connection.execute(q)
        result = []
        for column_name in columns.keys():
            # Todo Handle types better            
            column = {
                "name": column_name,
                "type": VARCHAR,
                "default": None,
                "autoincrement": None,
                "nullable": False,


        return result
项目:sqlacodegen    作者:agronholm
def __init__(self, table):
        super(Model, self).__init__()
        self.table = table
        self.schema = table.schema

        # Adapt column types to the most reasonable generic types (ie. VARCHAR -> String)
        for column in table.columns:
            cls = column.type.__class__
            for supercls in cls.__mro__:
                if hasattr(supercls, '__visit_name__'):
                    cls = supercls
                if supercls.__name__ != supercls.__name__.upper() and not supercls.__name__.startswith('_'):

            column.type = column.type.adapt(cls)
项目:sqlacodegen    作者:agronholm
def render_column_type(coltype):
        args = []
        if isinstance(coltype, Enum):
            args.extend(repr(arg) for arg in coltype.enums)
            if coltype.name is not None:
            # All other types
            argspec = _getargspec_init(coltype.__class__.__init__)
            defaults = dict(zip(argspec.args[-len(argspec.defaults or ()):], argspec.defaults or ()))
            missing = object()
            use_kwargs = False
            for attr in argspec.args[1:]:
                # Remove annoyances like _warn_on_bytestring
                if attr.startswith('_'):

                value = getattr(coltype, attr, missing)
                default = defaults.get(attr, missing)
                if value is missing or value == default:
                    use_kwargs = True
                elif use_kwargs:
                    args.append('{0}={1}'.format(attr, repr(value)))

        rendered = coltype.__class__.__name__
        if args:
            rendered += '({0})'.format(','.join(args))

        return rendered
项目:sadrill    作者:JohnOmernik
def LIMIT1_get_columns(self, **kw):
        # This method stinks at getting columns from a speed perspective,because it's slow (it has to process a query)
        # However,it's accurate,because it takes a look at the data,and provides a good representation of the types it saw. (for 1 record,not idea...)

        q = "SELECT * FROM %(table_id)s LIMIT 1" % ({"table_id": table_name})#

#        q = "DESCRIBE %(table_id)s" % ({"table_id": table_name})
        cursor = connection.execute(q)

        desc = cursor.cursor.getdesc()
        result = []
        for col in desc:
            cname = col[0]
            bisnull = True
            ctype = _type_map[col[1]]
            column = {
                "name": cname,
                "type": ctype,
                "nullable": bisnull,
项目:sqlalchemy-drill    作者:JohnOmernik
def LIMIT1_get_columns(self,
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda    作者:SignalMedia
def _harmonize_columns(self, parse_dates=None):
        Make the DataFrame's column types align with the sql table
        column types.
        Need to work around limited NA value support. Floats are always
        fine,ints must always be floats if there are Null values.
        Booleans are hard because converting bool column with None replaces
        all Nones with false. Therefore only convert bool if there are no
        NA values.
        Datetimes should already be converted to np.datetime64 if supported,
        but here we also force conversion if required
        # handle non-list entries for parse_dates gracefully
        if parse_dates is True or parse_dates is None or parse_dates is False:
            parse_dates = []

        if not hasattr(parse_dates, '__iter__'):
            parse_dates = [parse_dates]

        for sql_col in self.table.columns:
            col_name = sql_col.name
                df_col = self.frame[col_name]
                # the type the dataframe column should have
                col_type = self._get_dtype(sql_col.type)

                if (col_type is datetime or col_type is date or
                        col_type is DatetimeTZDtype):
                    self.frame[col_name] = _handle_date_column(df_col)

                elif col_type is float:
                    # floats support NA,can always convert!
                    self.frame[col_name] = df_col.astype(col_type, copy=False)

                elif len(df_col) == df_col.count():
                    # No NA values,can convert ints and bools
                    if col_type is np.dtype('int64') or col_type is bool:
                        self.frame[col_name] = df_col.astype(
                            col_type, copy=False)

                # Handle date parsing
                if col_name in parse_dates:
                        fmt = parse_dates[col_name]
                    except TypeError:
                        fmt = None
                    self.frame[col_name] = _handle_date_column(
                        df_col, format=fmt)

            except KeyError:
                pass  # this column not in results
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda    作者:SignalMedia
def _sqlalchemy_type(self, col):

        dtype = self.dtype or {}
        if col.name in dtype:
            return self.dtype[col.name]

        col_type = self._get_notnull_col_dtype(col)

        from sqlalchemy.types import (BigInteger, Integer,
                                      DateTime, Date, Time)

        if col_type == 'datetime64' or col_type == 'datetime':
                tz = col.tzinfo  # noqa
                return DateTime(timezone=True)
                return DateTime
        if col_type == 'timedelta64':
            warnings.warn("the 'timedelta' type is not supported,and will be "
                          "written as integer values (ns frequency) to the "
                          "database.", UserWarning, stacklevel=8)
            return BigInteger
        elif col_type == 'floating':
            if col.dtype == 'float32':
                return Float(precision=23)
                return Float(precision=53)
        elif col_type == 'integer':
            if col.dtype == 'int32':
                return Integer
                return BigInteger
        elif col_type == 'boolean':
            return Boolean
        elif col_type == 'date':
            return Date
        elif col_type == 'time':
            return Time
        elif col_type == 'complex':
            raise ValueError('Complex datatypes not supported')

        return Text
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda    作者:SignalMedia
def to_sql(self, frame, name, if_exists='fail', index=True,
               index_label=None, chunksize=None, dtype=None):
        Write records stored in a DataFrame to a sql database.

        frame : DataFrame
        name : string
            Name of sql table
        if_exists : {'fail','replace','append'},default 'fail'
            - fail: If table exists,do nothing.
            - replace: If table exists,drop it,recreate it,and insert data.
            - append: If table exists,insert data. Create if does not exist.
        index : boolean,default True
            Write DataFrame index as a column
        index_label : string or sequence,default None
            Column label for index column(s). If None is given (default) and
            `index` is True,then the index names are used.
            A sequence should be given if the DataFrame uses MultiIndex.
        schema : string,default None
            Name of sql schema in database to write to (if database flavor
            supports this). If specified,this overwrites the default
            schema of the sqlDatabase object.
        chunksize : int,default None
            If not None,then rows will be written in batches of this size at a
            time.  If None,all rows will be written at once.
        dtype : dict of column name to sql type,default None
            Optional specifying the datatype for columns. The sql type should
            be a sqlAlchemy type.

        if dtype is not None:
            from sqlalchemy.types import to_instance, TypeEngine
            for col, my_type in dtype.items():
                if not isinstance(to_instance(my_type), TypeEngine):
                    raise ValueError('The type of %s is not a sqlAlchemy '
                                     'type ' % col)

        table = sqlTable(name, self, frame=frame, index=index,
                         if_exists=if_exists, index_label=index_label,
                         schema=schema, dtype=dtype)
        # check for potentially case sensitivity issues (GH7815)
        engine = self.connectable.engine
        with self.connectable.connect() as conn:
            table_names = engine.table_names(
                schema=schema or self.Meta.schema,
        if name not in table_names:
            warnings.warn("The provided table name '{0}' is not found exactly "
                          "as such in the database after writing the table,"
                          "possibly due to case sensitivity issues. Consider "
                          "using lower case table names.".format(name),

