Python sqlalchemy 模块,insert() 实例源码
我们从Python开源项目中,提取了以下10个代码示例,用于说明如何使用sqlalchemy.insert()。
def _versioned_update(row, session, user_id=None):
if not utils.is_modified(row, ignore={'va_id'}):
return
# Check if composite key has been changed
for col in row.va_version_columns:
hist = getattr(sa.inspect(row).attrs, col).history
if hist.has_changes():
# delete the original row from the archive table
session.execute(
sa.insert(row.ArchiveTable),
row.ArchiveTable.build_row_dict(
row, user_id=user_id, deleted=True, use_dirty=False
)
)
result = session.execute(
sa.insert(row.ArchiveTable),
row.ArchiveTable.build_row_dict(row, user_id=user_id)
)
return result.inserted_primary_key[0]
def load_session(db_path):
"""Load and return a new sqlalchemy session and engine.
Parameters
----------
db_path : str
Path to desired database location,can be relative or use tilde to specify the user $HOME.
Returns
-------
session : sqlalchemy.orm.session.Session
Session instance.
engine : sqlalchemy.engine.Engine
Engine instance.
"""
db_path = "sqlite:///" + path.abspath(path.expanduser(db_path))
engine = create_engine(db_path, echo=False)
#it is very important that `autoflush == False`,otherwise if "treatments" or "measurements" entried precede "external_ids" the latter will insert a null on the animal_id column
Session = sessionmaker(bind=engine, autoflush=False)
session = Session()
Base.Metadata.create_all(engine)
return session, engine
def write_version_info(version_table, version_value):
"""
Inserts the version value in to the version table.
Parameters
----------
version_table : sa.Table
The version table of the asset database
version_value : int
The version to write in to the database
"""
sa.insert(version_table, values={'version': version_value}).execute()
def _generate_output_dataframe(data_subset, defaults):
"""
Generates an output dataframe from the given subset of user-provided
data,the given column names,and the given default values.
Parameters
----------
data_subset : DataFrame
A DataFrame,usually from an AssetData object,
that contains the user's input Metadata for the asset type being
processed
defaults : dict
A dict where the keys are the names of the columns of the desired
output DataFrame and the values are the default values to insert in the
DataFrame if no user data is provided
Returns
-------
DataFrame
A DataFrame containing all user-provided Metadata,and default values
wherever user-provided Metadata was missing
"""
# The columns provided.
cols = set(data_subset.columns)
desired_cols = set(defaults)
# Drop columns with unrecognised headers.
data_subset.drop(cols - desired_cols,
axis=1,
inplace=True)
# Get those columns which we need but
# for which no data has been supplied.
for col in desired_cols - cols:
# write the default value for any missing columns
data_subset[col] = defaults[col]
return data_subset
def write_version_info(conn, version_table, version_value):
"""
Inserts the version value in to the version table.
Parameters
----------
conn : sa.Connection
The connection to use to execute the insert.
version_table : sa.Table
The version table of the asset database
version_value : int
The version to write in to the database
"""
conn.execute(sa.insert(version_table, values={'version': version_value}))
def test_json_encode_read_failure(self):
self.session.execute(
sa.insert(sa.table('test', sa.column('json_list')), values={'json_list': '{"a": "b"}'})
)
try:
self.session.query(TestModel).first()
except ValueError as e:
self.assertEqual(e.message, "value of type <type 'dict'> is not <type 'list'>")
return
self.assertTrue(False, 'Test should have raised ValueError')
def _versioned_delete(row, user_id=None):
result = session.execute(
sa.insert(row.ArchiveTable), user_id=user_id)
)
# This should not matter since the row is being deleted anyways from the user
# table but is here if we ever decide to implement soft deletes on the user table
return result.inserted_primary_key[0]
def _versioned_insert(row, user_id=user_id)
)
return result.inserted_primary_key[0]
def get_or_create(session, table, thread, board):
instance = session.query(table).filter_by(
thread=thread, archived=False, board=board).first()
if instance:
return instance
else:
try:
i = insert(table)
i = i.values({"thread": thread, "archived": False, "board": board})
session.execute(i)
session.commit()
except IntegrityError as e:
logger.info(e)
def _generate_output_dataframe(data_subset,
inplace=True)
# Get those columns which we need but
# for which no data has been supplied.
need = desired_cols - cols
# Combine the users supplied data with our required columns.
output = pd.concat(
(data_subset, pd.DataFrame(
{k: defaults[k] for k in need},
data_subset.index,
)),
axis=1,
copy=False
)
return output
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。