Python psycopg2 模块,Error() 实例源码
我们从 Python 开源项目中,提取了以下 50 个代码示例,用于说明如何使用 psycopg2.Error()。
def main():
    """Drop and recreate the database named by ``PGDATABASE``.

    Connects to the ``postgres`` database with ``PGHOST``, ``PGUSER`` and
    ``PGPASSWORD`` taken from the environment, then runs
    ``DROP DATABASE IF EXISTS`` followed by ``CREATE DATABASE``.

    Raises:
        KeyError: if a required environment variable is not set.
        SystemExit: if either statement fails with ``psycopg2.Error``.
    """
    db_name = os.environ['PGDATABASE']
    connection_parameters = {
        'host': os.environ['PGHOST'],
        'database': 'postgres',
        'user': os.environ['PGUSER'],
        'password': os.environ['PGPASSWORD']
    }
    # NOTE(review): the name is interpolated verbatim, so it must be a
    # trusted, valid identifier.
    drop_statement = 'DROP DATABASE IF EXISTS {};'.format(db_name)
    ddl_statement = 'CREATE DATABASE {};'.format(db_name)

    conn = connect(**connection_parameters)
    try:
        # Database-level DDL has to run outside a transaction block.
        conn.autocommit = True
        try:
            with conn.cursor() as cursor:
                cursor.execute(drop_statement)
                cursor.execute(ddl_statement)
        except psycopg2.Error:
            raise SystemExit(
                'Failed to setup Postgres environment.\n{0}'.format(sys.exc_info())
            )
    finally:
        # Close on every path; previously a failure leaked the connection.
        conn.close()
    sys.stdout.write('Created database environment successfully.\n')
def test_diagnostics_attributes(self):
    """Check the type of ``diag`` and of each of its attributes.

    A query against the table ``nonexist`` is executed; the caught error's
    ``diag`` must be a ``psycopg2.extensions.Diagnostics`` whose listed
    attributes are each either ``None`` or a ``str``.
    """
    cursor = self.conn.cursor()
    try:
        cursor.execute("select * from nonexist")
    except psycopg2.Error as exc:
        # Rebind: the ``as`` name is unset once the handler exits.
        e = exc

    diagnostics = e.diag
    self.assertTrue(
        isinstance(diagnostics, psycopg2.extensions.Diagnostics))

    attribute_names = (
        'column_name', 'constraint_name', 'context', 'datatype_name',
        'internal_position', 'internal_query', 'message_detail',
        'message_hint', 'message_primary', 'schema_name', 'severity',
        'source_file', 'source_function', 'source_line', 'sqlstate',
        'statement_position', 'table_name',
    )
    for name in attribute_names:
        value = getattr(diagnostics, name)
        if value is None:
            continue
        self.assertTrue(isinstance(value, str))
def test_diagnostics_independent(self):
    """Check that ``diag`` objects from two errors keep separate state.

    The first statement is not valid SQL, the second selects from the
    table ``water``; after both have failed, each saved ``diag`` must
    still report its own ``sqlstate``.
    """
    cursor = self.conn.cursor()

    try:
        cursor.execute("l'acqua e' poca e 'a papera nun galleggia")
    except Exception as exc:
        first_diag = exc.diag

    self.conn.rollback()

    try:
        cursor.execute("select level from water where ducks > 1")
    except psycopg2.Error as exc:
        second_diag = exc.diag

    expected = ((first_diag, '42601'), (second_diag, '42P01'))
    for diag, sqlstate in expected:
        self.assertEqual(diag.sqlstate, sqlstate)
def test_9_3_diagnostics(self):
    """Check the diagnostic fields reported for a check-constraint failure.

    A temp table with the ``chk_eq1`` check constraint is created, a
    violating row is inserted, and the caught error's ``pgcode`` and
    ``diag`` fields are compared with the expected values.
    """
    cursor = self.conn.cursor()
    create_sql = (
        "\n"
        "create temp table test_exc (\n"
        "data int constraint chk_eq1 check (data = 1)\n"
        ")"
    )
    cursor.execute(create_sql)

    try:
        cursor.execute("insert into test_exc values(2)")
    except psycopg2.Error as exc:
        error = exc

    self.assertEqual(error.pgcode, '23514')
    self.assertEqual(error.diag.schema_name[:7], "pg_temp")
    self.assertEqual(error.diag.table_name, "test_exc")
    self.assertEqual(error.diag.column_name, None)
    self.assertEqual(error.diag.constraint_name, "chk_eq1")
    self.assertEqual(error.diag.datatype_name, None)
def test_diagnostics_attributes(self):
    """Verify the ``diag`` object attached to a failed query.

    Runs a query against the table ``nonexist``, then checks that the
    caught error's ``diag`` is a ``psycopg2.extensions.Diagnostics`` and
    that each listed attribute is either ``None`` or a ``str``.
    """
    cur = self.conn.cursor()
    try:
        cur.execute("select * from nonexist")
    except psycopg2.Error as exc:
        # ``as`` form works on Python 2.6+ and 3; keep a second reference
        # because Python 3 unsets ``exc`` after the handler.
        e = exc

    diag = e.diag
    self.assertTrue(isinstance(diag, psycopg2.extensions.Diagnostics))
    for attr in [
            'column_name', 'constraint_name', 'context', 'datatype_name',
            'internal_position', 'internal_query', 'message_detail',
            'message_hint', 'message_primary', 'schema_name', 'severity',
            'source_file', 'source_function', 'source_line', 'sqlstate',
            'statement_position', 'table_name', ]:
        v = getattr(diag, attr)
        if v is not None:
            self.assertTrue(isinstance(v, str))
def test_diagnostics_independent(self):
    """Check that ``diag`` objects from two errors keep separate state.

    The first statement is not valid SQL (expected sqlstate ``42601``);
    the second selects from the table ``water`` (expected ``42P01``).
    Both saved ``diag`` objects are checked after the second failure.
    """
    cur = self.conn.cursor()
    try:
        cur.execute("l'acqua e' poca e 'a papera nun galleggia")
    except Exception as exc:
        diag1 = exc.diag

    self.conn.rollback()

    try:
        cur.execute("select level from water where ducks > 1")
    except psycopg2.Error as exc:
        diag2 = exc.diag

    # Each diag must report the code of its own error, not the latest one.
    self.assertEqual(diag1.sqlstate, '42601')
    self.assertEqual(diag2.sqlstate, '42P01')
def test_9_3_diagnostics(self):
    """Check the diagnostic fields reported for a check-constraint failure.

    Creates a temp table whose ``data`` column carries the ``chk_eq1``
    check constraint, inserts a violating value and inspects the caught
    error's ``pgcode`` and ``diag`` fields.
    """
    cur = self.conn.cursor()
    cur.execute("""
        create temp table test_exc (
            data int constraint chk_eq1 check (data = 1)
        )""")
    try:
        cur.execute("insert into test_exc values(2)")
    except psycopg2.Error as exc:
        e = exc

    self.assertEqual(e.pgcode, '23514')
    self.assertEqual(e.diag.schema_name[:7], "pg_temp")
    self.assertEqual(e.diag.table_name, "test_exc")
    self.assertEqual(e.diag.column_name, None)
    self.assertEqual(e.diag.constraint_name, "chk_eq1")
    self.assertEqual(e.diag.datatype_name, None)
def _connect(self) -> None:
    """Open ``self.conn`` from the stored connection settings.

    Logs host, port and dbname, connects with a 5 second connect timeout
    and switches the session to autocommit.

    Raises:
        psycopg2.Error: logged and re-raised when connecting or configuring
            the session fails.
    """
    logger.info("initializing database connection:")
    logger.info("host: %s", self.host)
    logger.info("port: %s", self.port)
    logger.info("dbname: %s", self.dbname)

    connect_kwargs = {
        "host": self.host,
        "port": self.port,
        "user": self.user,
        "password": self.password,
        "dbname": self.dbname,
        "connect_timeout": 5,
    }
    try:
        self.conn = psycopg2.connect(**connect_kwargs)
        self.conn.set_session(autocommit=True)
        logger.info("successfully initialized database connection")
    except psycopg2.Error:
        logger.exception("unable to connect to database")
        raise
def from_environment(cls) -> Optional['PostgresDemoDatabase']:
    """Build a ``PostgresDemoDatabase`` from ``DEMO_POSTGRES_*`` variables.

    Reads ``DEMO_POSTGRES_HOST``, ``DEMO_POSTGRES_PORT`` (default
    ``"5432"``), ``DEMO_POSTGRES_DBNAME``, ``DEMO_POSTGRES_USER`` and
    ``DEMO_POSTGRES_PASSWORD``.

    Returns:
        A ``PostgresDemoDatabase`` when every setting is present and the
        constructor succeeds; ``None`` when a setting is missing or the
        constructor raises ``psycopg2.Error``.
    """
    host = os.environ.get("DEMO_POSTGRES_HOST")
    port = os.environ.get("DEMO_POSTGRES_PORT") or "5432"
    # Environment names are case-sensitive; use the same upper-case
    # spelling as the sibling variables.
    dbname = os.environ.get("DEMO_POSTGRES_DBNAME")
    user = os.environ.get("DEMO_POSTGRES_USER")
    password = os.environ.get("DEMO_POSTGRES_PASSWORD")

    if all([host, port, dbname, user, password]):
        try:
            logger.info("Initializing demo database connection using environment variables")
            return PostgresDemoDatabase(dbname=dbname, host=host, port=port, user=user, password=password)
        except psycopg2.Error:
            logger.exception("unable to connect to database,permalinks not enabled")
            return None
    else:
        logger.info("Relevant environment variables not found,so no demo database")
        return None
def add_result(self,
               headers: JsonDict,
               model_name: str,
               inputs: JsonDict,
               outputs: JsonDict) -> Optional[int]:
    """Insert one request/response record and return its id.

    Args:
        headers: request headers, stored as a JSON string.
        model_name: stored as-is under ``model_name``.
        inputs: request payload, stored as a JSON string.
        outputs: response payload, stored as a JSON string.

    Returns:
        The first column of the row returned by the insert statement, or
        ``None`` when ``psycopg2.Error`` or ``AttributeError`` is raised.
    """
    try:
        self._health_check()
        with self.conn.cursor() as curs:
            logger.info("inserting into the database")
            curs.execute(INSERT_sql,
                         {'model_name': model_name,
                          'headers': json.dumps(headers),
                          'request_data': json.dumps(inputs),
                          'response_data': json.dumps(outputs),
                          # ``now`` is lower-case; ``Now`` raised an
                          # AttributeError that the handler below hid.
                          'timestamp': datetime.datetime.now()})
            perma_id = curs.fetchone()[0]
            logger.info("received perma_id %s", perma_id)
        return perma_id
    except (psycopg2.Error, AttributeError):
        logger.exception("Unable to insert permadata")
        return None
def get_result(self, perma_id: int) -> Optional[Permadata]:
    """Fetch the stored record for ``perma_id``.

    Returns:
        A ``Permadata`` built from the fetched row (the two data columns
        are decoded with ``json.loads``), or ``None`` when there is no row
        or when ``psycopg2.Error`` / ``AttributeError`` is raised.
    """
    try:
        self._health_check()
        with self.conn.cursor() as cursor:
            logger.info("retrieving perma_id %s from database", perma_id)
            cursor.execute(RETRIEVE_sql, (perma_id,))
            record = cursor.fetchone()

            if record is not None:
                # A row was found: unpack it into a ``Permadata``.
                model_name, request_data, response_data = record
                return Permadata(model_name, json.loads(request_data), json.loads(response_data))

            # No row for this id.
            return None
    except (psycopg2.Error, AttributeError):
        logger.exception("Unable to retrieve result")
        return None
def Execute(self, p_sql):
    """Run ``p_sql`` on ``self.v_cur``, opening a connection on demand.

    When ``self.v_con`` is ``None`` the connection is opened with
    ``self.Open()`` and closed with ``self.Close()`` afterwards, whether or
    not the statement succeeded. An existing connection is left open.

    Raises:
        Spartacus.Database.Exception: re-raised unchanged, or created from
            the text of any other exception.
    """
    v_reuse = None
    try:
        v_reuse = self.v_con is not None
        if not v_reuse:
            self.Open()
        self.v_cur.execute(p_sql)
    except Spartacus.Database.Exception:
        raise
    except (sqlite3.Error, Exception) as v_error:
        raise Spartacus.Database.Exception(str(v_error))
    finally:
        if not v_reuse:
            self.Close()
def ExecuteScalar(self, p_sql):
    """Run ``p_sql`` and return the first column of the first row.

    When ``self.v_con`` is ``None`` the connection is opened with
    ``self.Open()`` and closed with ``self.Close()`` before returning.

    Returns:
        ``r[0]`` of the first fetched row, or ``None`` when ``fetchone()``
        returns no row.

    Raises:
        Spartacus.Database.Exception: re-raised unchanged, or created from
            the text of any other exception.
    """
    v_keep = None
    try:
        v_keep = self.v_con is not None
        if not v_keep:
            self.Open()
        self.v_cur.execute(p_sql)
        r = self.v_cur.fetchone()
        # Identity comparison is the correct test for "no row".
        return r[0] if r is not None else None
    except Spartacus.Database.Exception:
        raise
    except sqlite3.Error as exc:
        raise Spartacus.Database.Exception(str(exc))
    except Exception as exc:
        raise Spartacus.Database.Exception(str(exc))
    finally:
        if not v_keep:
            self.Close()
def InsertBlock(self, p_block, p_tablename, p_fields=None):
    """Insert every row of ``p_block`` into ``p_tablename`` in one script.

    Args:
        p_block: object exposing ``Columns`` and ``Rows``.
        p_tablename: target table name, concatenated into the SQL as-is.
        p_fields: optional field descriptors exposing ``v_name``; when
            omitted, one ``datafield`` is built per entry of
            ``p_block.Columns``.

    The script has the form ``begin; insert ...; insert ...; commit;`` and
    is passed to ``self.Execute``.

    Raises:
        Spartacus.Database.Exception: re-raised unchanged, or created from
            the text of any other exception.
    """
    try:
        if p_fields is None:
            v_columnames = list(p_block.Columns)
            v_fields = [datafield(c) for c in v_columnames]
        else:
            v_fields = p_fields
            v_columnames = [p.v_name for p in v_fields]

        v_prefix = 'insert into ' + p_tablename + '(' + ','.join(v_columnames) + ') values '
        # Join once instead of growing the string row by row.
        v_statements = [v_prefix + self.mogrify(r, v_fields) + '; ' for r in p_block.Rows]
        self.Execute('begin; ' + ''.join(v_statements) + 'commit;')
    except Spartacus.Database.Exception:
        raise
    except sqlite3.Error as exc:
        raise Spartacus.Database.Exception(str(exc))
    except Exception as exc:
        raise Spartacus.Database.Exception(str(exc))
def ExecuteScalar(self, p_sql):
    """Run ``p_sql`` on the already-open connection and return one value.

    Unlike the on-demand variant, this one refuses to run when
    ``self.v_con`` is ``None``.

    Returns:
        ``r[0]`` of the first fetched row, or ``None`` when ``fetchone()``
        returns no row.

    Raises:
        Spartacus.Database.Exception: when no connection is open,
            re-raised unchanged, or created from the text of any other
            exception.
    """
    try:
        if self.v_con is None:
            raise Spartacus.Database.Exception('This method should be called in the middle of open() and Close() calls.')
        self.v_cur.execute(p_sql)
        r = self.v_cur.fetchone()
        # Identity comparison is the correct test for "no row".
        return r[0] if r is not None else None
    except Spartacus.Database.Exception:
        raise
    except sqlite3.Error as exc:
        raise Spartacus.Database.Exception(str(exc))
    except Exception as exc:
        raise Spartacus.Database.Exception(str(exc))
def GetFields(self, p_sql):
    """Describe the columns produced by ``p_sql``.

    The query is wrapped as ``select * from ( ... ) t limit 1`` and one row
    is fetched. One ``datafield`` is built per entry of the cursor
    description, named after ``c[0]``; its ``p_type`` and ``p_dbtype`` are
    the Python type of the matching value in the fetched row, or
    ``type(None)`` when no row came back.

    Raises:
        Spartacus.Database.Exception: when no connection is open,
            re-raised unchanged, or created from the text of any other
            exception.
    """
    try:
        if self.v_con is None:
            raise Spartacus.Database.Exception('This method should be called in the middle of open() and Close() calls.')
        self.v_cur.execute('select * from ( ' + p_sql + ' ) t limit 1')
        r = self.v_cur.fetchone()
        v_fields = []
        if r is not None:
            for k, c in enumerate(self.v_cur.description):
                v_fields.append(datafield(c[0], p_type=type(r[k]), p_dbtype=type(r[k])))
        else:
            for c in self.v_cur.description:
                v_fields.append(datafield(c[0], p_type=type(None), p_dbtype=type(None)))
        return v_fields
    except Spartacus.Database.Exception:
        raise
    except sqlite3.Error as exc:
        raise Spartacus.Database.Exception(str(exc))
    except Exception as exc:
        raise Spartacus.Database.Exception(str(exc))
def Open(self, p_autocommit=True):
    """Connect with psycopg2 and prepare the cursor and type map.

    Sets ``self.v_con`` (``DictCursor`` factory, autocommit as requested),
    ``self.v_cur``, ``self.v_start`` and ``self.v_types`` (``oid`` to
    ``typname`` from ``pg_type``). When autocommit is off, the catalog
    query is committed. ``notices`` on the connection is replaced with a
    new ``DataList``.

    Raises:
        Spartacus.Database.Exception: re-raised unchanged, or created from
            the text of any other exception.
    """
    try:
        v_connection = psycopg2.connect(
            self.GetConnectionString(),
            cursor_factory=psycopg2.extras.DictCursor
        )
        self.v_con = v_connection
        self.v_con.autocommit = p_autocommit
        self.v_cur = self.v_con.cursor()
        self.v_start = True

        # PostgreSQL types
        self.v_cur.execute('select oid,typname from pg_type')
        self.v_types = {v_row['oid']: v_row['typname'] for v_row in self.v_cur.fetchall()}

        if not p_autocommit:
            self.v_con.commit()
        self.v_con.notices = DataList()
    except Spartacus.Database.Exception:
        raise
    except (psycopg2.Error, Exception) as v_error:
        raise Spartacus.Database.Exception(str(v_error))
def Execute(self, p_sql):
    """Run ``p_sql`` on ``self.v_cur``, opening a connection on demand.

    When ``self.v_con`` is ``None`` the connection is opened with
    ``self.Open()`` and closed with ``self.Close()`` afterwards, whether or
    not the statement succeeded. An existing connection is left open.

    Raises:
        Spartacus.Database.Exception: re-raised unchanged, or created from
            the text of any other exception.
    """
    v_reuse = None
    try:
        v_reuse = self.v_con is not None
        if not v_reuse:
            self.Open()
        self.v_cur.execute(p_sql)
    except Spartacus.Database.Exception:
        raise
    except (psycopg2.Error, Exception) as v_error:
        raise Spartacus.Database.Exception(str(v_error))
    finally:
        if not v_reuse:
            self.Close()
def ExecuteScalar(self, p_sql):
    """Run ``p_sql`` and return the first column of the first row.

    When ``self.v_con`` is ``None`` the connection is opened with
    ``self.Open()`` and closed with ``self.Close()`` before returning.

    Returns:
        ``r[0]`` of the first fetched row, or ``None`` when ``fetchone()``
        returns no row.

    Raises:
        Spartacus.Database.Exception: re-raised unchanged, or created from
            the text of any other exception.
    """
    v_keep = None
    try:
        v_keep = self.v_con is not None
        if not v_keep:
            self.Open()
        self.v_cur.execute(p_sql)
        r = self.v_cur.fetchone()
        # Identity comparison is the correct test for "no row".
        return r[0] if r is not None else None
    except Spartacus.Database.Exception:
        raise
    except psycopg2.Error as exc:
        raise Spartacus.Database.Exception(str(exc))
    except Exception as exc:
        raise Spartacus.Database.Exception(str(exc))
    finally:
        if not v_keep:
            self.Close()
def InsertBlock(self, p_block, p_tablename, p_fields=None):
    """Insert every row of ``p_block`` with one multi-row ``insert``.

    The body reads ``p_block`` and ``p_tablename``, so both are restored
    as parameters; without them every call failed on an undefined name.

    Args:
        p_block: object exposing ``Columns`` and ``Rows``.
        p_tablename: target table name, concatenated into the SQL as-is.
        p_fields: optional field descriptors exposing ``v_name``; when
            omitted, one ``datafield`` is built per entry of
            ``p_block.Columns``.

    Raises:
        Spartacus.Database.Exception: re-raised unchanged, or created from
            the text of any other exception.
    """
    try:
        if p_fields is None:
            v_columnames = list(p_block.Columns)
            v_fields = [datafield(c) for c in v_columnames]
        else:
            v_fields = p_fields
            v_columnames = [p.v_name for p in v_fields]
        v_values = [self.mogrify(r, v_fields) for r in p_block.Rows]
        self.Execute('insert into ' + p_tablename + '(' + ','.join(v_columnames) + ') values ' + ','.join(v_values) + '')
    except Spartacus.Database.Exception:
        raise
    except psycopg2.Error as exc:
        raise Spartacus.Database.Exception(str(exc))
    except Exception as exc:
        raise Spartacus.Database.Exception(str(exc))
def Execute(self, p_sql):
    """Run ``p_sql`` on ``self.v_cur``, opening a connection on demand.

    When ``self.v_con`` is ``None`` the connection is opened with
    ``self.Open()`` and closed with ``self.Close()`` afterwards, whether or
    not the statement succeeded. An existing connection is left open.

    Raises:
        Spartacus.Database.Exception: re-raised unchanged, or created from
            the text of any other exception.
    """
    v_keep = None
    try:
        v_keep = self.v_con is not None
        if not v_keep:
            self.Open()
        self.v_cur.execute(p_sql)
    except Spartacus.Database.Exception:
        raise
    # The driver module is spelled ``pymysql``; the mixed-case spelling
    # would fail with NameError while handling a driver error.
    except pymysql.Error as exc:
        raise Spartacus.Database.Exception(str(exc))
    except Exception as exc:
        raise Spartacus.Database.Exception(str(exc))
    finally:
        if not v_keep:
            self.Close()
def InsertBlock(self, p_block, p_tablename, p_fields=None):
    """Insert every row of ``p_block`` with one multi-row ``insert``.

    NOTE(review): the original definition line was fused with the final
    statement and did not parse; signature and body are rebuilt to match
    the surviving ``','.join(v_values)`` tail. Confirm against upstream.

    Args:
        p_block: object exposing ``Columns`` and ``Rows``.
        p_tablename: target table name, concatenated into the SQL as-is.
        p_fields: optional field descriptors exposing ``v_name``; when
            omitted, one ``datafield`` is built per entry of
            ``p_block.Columns``.

    Raises:
        Spartacus.Database.Exception: re-raised unchanged, or created from
            the text of any other exception.
    """
    try:
        if p_fields is None:
            v_columnames = list(p_block.Columns)
            v_fields = [datafield(c) for c in v_columnames]
        else:
            v_fields = p_fields
            v_columnames = [p.v_name for p in v_fields]
        v_values = [self.mogrify(r, v_fields) for r in p_block.Rows]
        self.Execute('insert into ' + p_tablename + '(' + ','.join(v_columnames) + ') values ' + ','.join(v_values) + '')
    except Spartacus.Database.Exception:
        raise
    # The driver module is spelled ``pymysql``.
    except pymysql.Error as exc:
        raise Spartacus.Database.Exception(str(exc))
    except Exception as exc:
        raise Spartacus.Database.Exception(str(exc))
def Execute(self, p_sql):
    """Run ``p_sql`` on ``self.v_cur``, opening a connection on demand.

    When ``self.v_con`` is ``None`` the connection is opened with
    ``self.Open()`` and closed with ``self.Close()`` afterwards, whether or
    not the statement succeeded. An existing connection is left open.

    Raises:
        Spartacus.Database.Exception: re-raised unchanged, or created from
            the text of any other exception.
    """
    v_keep = None
    try:
        v_keep = self.v_con is not None
        if not v_keep:
            self.Open()
        self.v_cur.execute(p_sql)
    except Spartacus.Database.Exception:
        raise
    # The driver module is spelled ``pymysql``; the mixed-case spelling
    # would fail with NameError while handling a driver error.
    except pymysql.Error as exc:
        raise Spartacus.Database.Exception(str(exc))
    except Exception as exc:
        raise Spartacus.Database.Exception(str(exc))
    finally:
        if not v_keep:
            self.Close()
def ExecuteScalar(self, p_sql):
    """Run ``p_sql`` and return the first column of the first row.

    When ``self.v_con`` is ``None`` the connection is opened with
    ``self.Open()`` and closed with ``self.Close()`` before returning.

    Returns:
        ``r[0]`` of the first fetched row, or ``None`` when ``fetchone()``
        returns no row.

    Raises:
        Spartacus.Database.Exception: re-raised unchanged, or created from
            the text of any other exception.
    """
    v_keep = None
    try:
        v_keep = self.v_con is not None
        if not v_keep:
            self.Open()
        self.v_cur.execute(p_sql)
        r = self.v_cur.fetchone()
        # Identity comparison is the correct test for "no row".
        return r[0] if r is not None else None
    except Spartacus.Database.Exception:
        raise
    # The driver module is spelled ``pymysql``.
    except pymysql.Error as exc:
        raise Spartacus.Database.Exception(str(exc))
    except Exception as exc:
        raise Spartacus.Database.Exception(str(exc))
    finally:
        if not v_keep:
            self.Close()
def InsertBlock(self, p_sql):
    """Run ``p_sql`` on ``self.v_cur``, opening a connection on demand.

    NOTE(review): despite its name, this body only executes the statement
    it is given; it looks like an ``Execute`` body under an
    ``InsertBlock`` header. Confirm against upstream before relying on
    the name.

    When ``self.v_con`` is ``None`` the connection is opened with
    ``self.Open()`` and closed with ``self.Close()`` afterwards.

    Raises:
        Spartacus.Database.Exception: re-raised unchanged, or created from
            the text of any other exception.
    """
    v_reuse = None
    try:
        v_reuse = self.v_con is not None
        if not v_reuse:
            self.Open()
        self.v_cur.execute(p_sql)
    except Spartacus.Database.Exception:
        raise
    except (fdb.Error, Exception) as v_error:
        raise Spartacus.Database.Exception(str(v_error))
    finally:
        if not v_reuse:
            self.Close()
def InsertBlock(self, p_block, p_tablename, p_fields=None):
    """Insert every row of ``p_block`` with one multi-row ``insert``.

    NOTE(review): the original definition line was fused with the final
    statement and did not parse; signature and body are rebuilt to match
    the surviving ``','.join(v_values)`` tail. Confirm against upstream.

    Args:
        p_block: object exposing ``Columns`` and ``Rows``.
        p_tablename: target table name, concatenated into the SQL as-is.
        p_fields: optional field descriptors exposing ``v_name``; when
            omitted, one ``datafield`` is built per entry of
            ``p_block.Columns``.

    Raises:
        Spartacus.Database.Exception: re-raised unchanged, or created from
            the text of any other exception.
    """
    try:
        if p_fields is None:
            v_columnames = list(p_block.Columns)
            v_fields = [datafield(c) for c in v_columnames]
        else:
            v_fields = p_fields
            v_columnames = [p.v_name for p in v_fields]
        v_values = [self.mogrify(r, v_fields) for r in p_block.Rows]
        self.Execute('insert into ' + p_tablename + '(' + ','.join(v_columnames) + ') values ' + ','.join(v_values) + '')
    except Spartacus.Database.Exception:
        raise
    except fdb.Error as exc:
        raise Spartacus.Database.Exception(str(exc))
    except Exception as exc:
        raise Spartacus.Database.Exception(str(exc))
def Execute(self, p_sql):
    """Run ``p_sql`` on ``self.v_cur``, opening a connection on demand.

    When ``self.v_con`` is ``None`` the connection is opened with
    ``self.Open()`` and closed with ``self.Close()`` afterwards, whether or
    not the statement succeeded. An existing connection is left open.

    Raises:
        Spartacus.Database.Exception: re-raised unchanged, or created from
            the text of any other exception.
    """
    v_reuse = None
    try:
        v_reuse = self.v_con is not None
        if not v_reuse:
            self.Open()
        self.v_cur.execute(p_sql)
    except Spartacus.Database.Exception:
        raise
    except (cx_Oracle.Error, Exception) as v_error:
        raise Spartacus.Database.Exception(str(v_error))
    finally:
        if not v_reuse:
            self.Close()
def ExecuteScalar(self, p_sql):
    """Run ``p_sql`` and return the first column of the first row.

    When ``self.v_con`` is ``None`` the connection is opened with
    ``self.Open()`` and closed with ``self.Close()`` before returning.

    Returns:
        ``r[0]`` of the first fetched row, or ``None`` when ``fetchone()``
        returns no row.

    Raises:
        Spartacus.Database.Exception: re-raised unchanged, or created from
            the text of any other exception.
    """
    v_keep = None
    try:
        v_keep = self.v_con is not None
        if not v_keep:
            self.Open()
        self.v_cur.execute(p_sql)
        r = self.v_cur.fetchone()
        # Identity comparison is the correct test for "no row".
        return r[0] if r is not None else None
    except Spartacus.Database.Exception:
        raise
    except cx_Oracle.Error as exc:
        raise Spartacus.Database.Exception(str(exc))
    except Exception as exc:
        raise Spartacus.Database.Exception(str(exc))
    finally:
        if not v_keep:
            self.Close()
def InsertBlock(self, p_block, p_tablename, p_fields=None):
    """Insert every row of ``p_block`` with one multi-row ``insert``.

    NOTE(review): the original definition line was fused with the final
    statement and did not parse; signature and body are rebuilt to match
    the surviving ``','.join(v_values)`` tail. Confirm against upstream.

    Args:
        p_block: object exposing ``Columns`` and ``Rows``.
        p_tablename: target table name, concatenated into the SQL as-is.
        p_fields: optional field descriptors exposing ``v_name``; when
            omitted, one ``datafield`` is built per entry of
            ``p_block.Columns``.

    Raises:
        Spartacus.Database.Exception: re-raised unchanged, or created from
            the text of any other exception.
    """
    try:
        if p_fields is None:
            v_columnames = list(p_block.Columns)
            v_fields = [datafield(c) for c in v_columnames]
        else:
            v_fields = p_fields
            v_columnames = [p.v_name for p in v_fields]
        v_values = [self.mogrify(r, v_fields) for r in p_block.Rows]
        self.Execute('insert into ' + p_tablename + '(' + ','.join(v_columnames) + ') values ' + ','.join(v_values) + '')
    except Spartacus.Database.Exception:
        raise
    except cx_Oracle.Error as exc:
        raise Spartacus.Database.Exception(str(exc))
    except Exception as exc:
        raise Spartacus.Database.Exception(str(exc))
def Execute(self, p_sql):
    """Run ``p_sql`` on ``self.v_cur``, opening a connection on demand.

    When ``self.v_con`` is ``None`` the connection is opened with
    ``self.Open()`` and closed with ``self.Close()`` afterwards, whether or
    not the statement succeeded. An existing connection is left open.

    Raises:
        Spartacus.Database.Exception: re-raised unchanged, or created from
            the text of any other exception.
    """
    v_reuse = None
    try:
        v_reuse = self.v_con is not None
        if not v_reuse:
            self.Open()
        self.v_cur.execute(p_sql)
    except Spartacus.Database.Exception:
        raise
    except (pymssql.Error, Exception) as v_error:
        raise Spartacus.Database.Exception(str(v_error))
    finally:
        if not v_reuse:
            self.Close()
def InsertBlock(self, p_block, p_tablename, p_fields=None):
    """Insert every row of ``p_block`` with one multi-row ``insert``.

    NOTE(review): the original definition line was fused with the final
    statement and did not parse; signature and body are rebuilt to match
    the surviving ``','.join(v_values)`` tail. Confirm against upstream.

    Args:
        p_block: object exposing ``Columns`` and ``Rows``.
        p_tablename: target table name, concatenated into the SQL as-is.
        p_fields: optional field descriptors exposing ``v_name``; when
            omitted, one ``datafield`` is built per entry of
            ``p_block.Columns``.

    Raises:
        Spartacus.Database.Exception: re-raised unchanged, or created from
            the text of any other exception.
    """
    try:
        if p_fields is None:
            v_columnames = list(p_block.Columns)
            v_fields = [datafield(c) for c in v_columnames]
        else:
            v_fields = p_fields
            v_columnames = [p.v_name for p in v_fields]
        v_values = [self.mogrify(r, v_fields) for r in p_block.Rows]
        self.Execute('insert into ' + p_tablename + '(' + ','.join(v_columnames) + ') values ' + ','.join(v_values) + '')
    except Spartacus.Database.Exception:
        raise
    except pymssql.Error as exc:
        raise Spartacus.Database.Exception(str(exc))
    except Exception as exc:
        raise Spartacus.Database.Exception(str(exc))
def Execute(self, p_sql):
    """Run ``p_sql`` on ``self.v_cur``, opening a connection on demand.

    When ``self.v_con`` is ``None`` the connection is opened with
    ``self.Open()`` and closed with ``self.Close()`` afterwards, whether or
    not the statement succeeded. An existing connection is left open.

    Raises:
        Spartacus.Database.Exception: re-raised unchanged, or created from
            the text of any other exception.
    """
    v_reuse = None
    try:
        v_reuse = self.v_con is not None
        if not v_reuse:
            self.Open()
        self.v_cur.execute(p_sql)
    except Spartacus.Database.Exception:
        raise
    except (ibm_db_dbi.Error, Exception) as v_error:
        raise Spartacus.Database.Exception(str(v_error))
    finally:
        if not v_reuse:
            self.Close()
def ExecuteScalar(self, p_sql):
    """Run ``p_sql`` and return the first column of the first row.

    When ``self.v_con`` is ``None`` the connection is opened with
    ``self.Open()`` and closed with ``self.Close()`` before returning.

    Returns:
        ``r[0]`` of the first fetched row, or ``None`` when ``fetchone()``
        returns no row.

    Raises:
        Spartacus.Database.Exception: re-raised unchanged, or created from
            the text of any other exception.
    """
    v_keep = None
    try:
        v_keep = self.v_con is not None
        if not v_keep:
            self.Open()
        self.v_cur.execute(p_sql)
        r = self.v_cur.fetchone()
        # Identity comparison is the correct test for "no row".
        return r[0] if r is not None else None
    except Spartacus.Database.Exception:
        raise
    except ibm_db_dbi.Error as exc:
        raise Spartacus.Database.Exception(str(exc))
    except Exception as exc:
        raise Spartacus.Database.Exception(str(exc))
    finally:
        if not v_keep:
            self.Close()
def InsertBlock(self, p_block, p_tablename, p_fields=None):
    """Insert every row of ``p_block`` with one multi-row ``insert``.

    NOTE(review): the original definition line was fused with the final
    statement and did not parse; signature and body are rebuilt to match
    the surviving ``','.join(v_values)`` tail. Confirm against upstream.

    Args:
        p_block: object exposing ``Columns`` and ``Rows``.
        p_tablename: target table name, concatenated into the SQL as-is.
        p_fields: optional field descriptors exposing ``v_name``; when
            omitted, one ``datafield`` is built per entry of
            ``p_block.Columns``.

    Raises:
        Spartacus.Database.Exception: re-raised unchanged, or created from
            the text of any other exception.
    """
    try:
        if p_fields is None:
            v_columnames = list(p_block.Columns)
            v_fields = [datafield(c) for c in v_columnames]
        else:
            v_fields = p_fields
            v_columnames = [p.v_name for p in v_fields]
        v_values = [self.mogrify(r, v_fields) for r in p_block.Rows]
        self.Execute('insert into ' + p_tablename + '(' + ','.join(v_columnames) + ') values ' + ','.join(v_values) + '')
    except Spartacus.Database.Exception:
        raise
    except ibm_db_dbi.Error as exc:
        raise Spartacus.Database.Exception(str(exc))
    except Exception as exc:
        raise Spartacus.Database.Exception(str(exc))
def get_all_sequences_in_schema(self, schema):
    """Return the ``relname`` of every ``relkind = 'S'`` entry in ``schema``.

    Raises:
        Error: when ``self.schema_exists(schema)`` is false.
    """
    if self.schema_exists(schema):
        query = (
            "SELECT relname\n"
            "FROM pg_catalog.pg_class c\n"
            "JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace\n"
            "WHERE nspname = %s AND relkind = 'S'"
        )
        self.cursor.execute(query, (schema,))
        rows = self.cursor.fetchall()
        return [row[0] for row in rows]
    raise Error('Schema "%s" does not exist.' % schema)
### Methods for getting access control lists and group membership info
# To determine whether anything has changed after granting/revoking
# privileges, we compare the access control lists of the specified database
# objects before and afterwards. Python's list/string comparison should
# suffice for change detection; we should not actually have to parse ACLs.
# The same should apply to group membership information.
def psql(query:str, var=None):
    """Run one statement against the local ``firmware`` database.

    Args:
        query: SQL text; a statement starting with ``SELECT`` (exact case)
            is treated as a read, anything else is committed.
        var: optional parameters passed to ``cursor.execute``.

    Returns:
        The list from ``fetchall()`` for ``SELECT`` statements, otherwise
        ``None``.

    Raises:
        psycopg2.Error: printed to stderr and re-raised after a rollback.
    """
    import sys

    import psycopg2

    # Bound before ``try`` so the handlers below can tell whether the
    # connection was ever established.
    db2 = None
    try:
        # NOTE(review): credentials are hard-coded here.
        db2 = psycopg2.connect(database='firmware', user='firmadyne',
                               password='firmadyne', host='127.0.0.1')
        cur = db2.cursor()
        cur.execute(query, var)
        if not query.startswith('SELECT'):
            db2.commit()
            return
        return cur.fetchall()
    except psycopg2.Error as ex:
        print(ex, file=sys.stderr)
        if db2 is not None:
            db2.rollback()
        raise
    finally:
        if db2 is not None:
            db2.close()
def test_diagnostics_attributes(self):
    """Verify the ``diag`` object attached to a failed query.

    Runs a query against the table ``nonexist``, then checks that the
    caught error's ``diag`` is a ``psycopg2.extensions.Diagnostics`` (it
    was being compared against ``str``) and that each listed attribute is
    either ``None`` or a ``str``.
    """
    cur = self.conn.cursor()
    try:
        cur.execute("select * from nonexist")
    except psycopg2.Error as exc:
        # Keep a second reference: ``exc`` is unset after the handler.
        e = exc

    diag = e.diag
    self.assertTrue(isinstance(diag, psycopg2.extensions.Diagnostics))
    for attr in [
            'column_name', 'constraint_name', 'context', 'datatype_name',
            'internal_position', 'internal_query', 'message_detail',
            'message_hint', 'message_primary', 'schema_name', 'severity',
            'source_file', 'source_function', 'source_line', 'sqlstate',
            'statement_position', 'table_name', ]:
        v = getattr(diag, attr)
        if v is not None:
            self.assertTrue(isinstance(v, str))
def test_diagnostics_independent(self):
    """Check that ``diag`` objects from two errors keep separate state.

    The first statement is not valid SQL (expected sqlstate ``42601``);
    the second selects from the table ``water`` (expected ``42P01``).
    Both saved ``diag`` objects are checked after the second failure.
    """
    cur = self.conn.cursor()
    try:
        cur.execute("l'acqua e' poca e 'a papera nun galleggia")
    except Exception as exc:
        diag1 = exc.diag

    self.conn.rollback()

    try:
        cur.execute("select level from water where ducks > 1")
    except psycopg2.Error as exc:
        diag2 = exc.diag

    # Each diag must report the code of its own error, not the latest one.
    self.assertEqual(diag1.sqlstate, '42601')
    self.assertEqual(diag2.sqlstate, '42P01')
def test_9_3_diagnostics(self):
    """Check the diagnostic fields reported for a check-constraint failure.

    Creates a temp table whose ``data`` column carries the ``chk_eq1``
    check constraint, inserts a violating value and inspects the caught
    error's ``pgcode`` and ``diag`` fields.
    """
    cur = self.conn.cursor()
    cur.execute("""
        create temp table test_exc (
            data int constraint chk_eq1 check (data = 1)
        )""")
    try:
        cur.execute("insert into test_exc values(2)")
    except psycopg2.Error as exc:
        e = exc

    self.assertEqual(e.pgcode, '23514')
    self.assertEqual(e.diag.schema_name[:7], "pg_temp")
    self.assertEqual(e.diag.table_name, "test_exc")
    self.assertEqual(e.diag.column_name, None)
    self.assertEqual(e.diag.constraint_name, "chk_eq1")
    self.assertEqual(e.diag.datatype_name, None)
def create_tables(self, drop_tables=False, store_tables=True):
    """Create every table listed in ``self.table_ddl``.

    Args:
        drop_tables: when true, first drop the table (``CASCADE``) from
            both ``self.pg_conn.dest_schema`` and ``self.obf_schema``.
        store_tables: when true, call ``self.store_table(table)`` after
            the create statement has been attempted.

    Any statements under ``self.type_ddl[table]`` are run before the
    create statement on a best-effort basis. A ``psycopg2.Error`` from the
    create statement is logged and does not stop the loop.
    """
    for table in self.table_ddl:
        if drop_tables:
            sql_drop_clear = 'DROP TABLE IF EXISTS "%s"."%s" CASCADE ;' % (self.pg_conn.dest_schema, table,)
            # Both placeholders need a value: schema first, then table.
            sql_drop_obf = 'DROP TABLE IF EXISTS "%s"."%s" CASCADE ;' % (self.obf_schema, table,)
            self.pg_conn.pgsql_cur.execute(sql_drop_clear)
            self.pg_conn.pgsql_cur.execute(sql_drop_obf)
        try:
            for sql_type in self.type_ddl[table]:
                self.pg_conn.pgsql_cur.execute(sql_type)
        except KeyError:
            # No type statements registered for this table.
            pass
        except Exception as e:
            # Still best effort, but leave a trace and let
            # KeyboardInterrupt/SystemExit through.
            self.logger.debug("type DDL for table %s skipped: %s" % (table, e))
        sql_create = self.table_ddl[table]
        try:
            self.pg_conn.pgsql_cur.execute(sql_create)
        except psycopg2.Error as e:
            self.logger.error("sqlCODE: %s sqlERROR: %s" % (e.pgcode, e.pgerror))
            self.logger.error(sql_create)
        self.logger.debug('Storing table %s in t_replica_tables' % (table, ))
        if store_tables:
            self.store_table(table)
def refresh_indices(self):
    """Run every index statement in ``self.idx_ddl`` on both schemas.

    Each statement is prefixed with ``SET search_path=<schema>;`` and
    executed once for ``self.pg_conn.dest_schema`` and once for
    ``self.pg_conn.schema_obf``. A ``psycopg2.Error`` is logged and the
    loop continues.
    """
    self.logger.info("trying to create the indices on schemas %s and %s" % (self.pg_conn.dest_schema, self.pg_conn.schema_obf))
    for index in self.idx_ddl:
        idx_ddl = self.idx_ddl[index]
        for sql_idx in idx_ddl:
            path_clear = "SET search_path=%s;" % self.pg_conn.dest_schema
            path_obf = "SET search_path=%s;" % self.pg_conn.schema_obf
            sql_clear = "%s %s" % (path_clear, sql_idx)
            # Pair the second search_path with the index statement itself;
            # it used to be paired with the other SET, so nothing was built.
            sql_obf = "%s %s" % (path_obf, sql_idx)
            try:
                self.logger.info("Executing %s on %s" % (sql_idx, self.pg_conn.dest_schema))
                self.pg_conn.pgsql_cur.execute(sql_clear)
            except psycopg2.Error as e:
                self.logger.error("sqlCODE: %s sqlERROR: %s" % (e.pgcode, e.pgerror))
            try:
                self.logger.info("Executing %s on %s" % (sql_idx, self.pg_conn.schema_obf))
                self.pg_conn.pgsql_cur.execute(sql_obf)
            except psycopg2.Error as e:
                self.logger.error("sqlCODE: %s sqlERROR: %s" % (e.pgcode, e.pgerror))
def reset_sequences(self, destination_schema):
    """Reset the sequences to the max value available in the table.

    A generator query builds one ``SELECT setval(...)`` statement per
    column of ``destination_schema`` whose default starts with
    ``nextval``; every generated statement is then executed. The generated
    statements use ``max(id)``, so they assume the column is named ``id``.

    A ``psycopg2.Error`` while executing a generated statement is logged
    together with that statement and stops the remaining resets.
    """
    self.logger.info("resetting the sequences in schema %s" % destination_schema)
    # The inner replace() calls strip the ``nextval('`` prefix and the
    # ``'::regclass)`` suffix from the column default.
    sql_gen_reset = """ SELECT
            format('SELECT setval(%%L::regclass,(select max(id) FROM %%I.%%I));',
                replace(replace(column_default,'nextval(''',''),'''::regclass)',''),
                table_schema,
                table_name
            )
        FROM
            information_schema.columns
        WHERE
                table_schema=%s
            AND column_default like 'nextval%%'
    ;"""
    self.pg_conn.pgsql_cur.execute(sql_gen_reset, (destination_schema, ))
    results = self.pg_conn.pgsql_cur.fetchall()
    try:
        # Walk every row; only the first row used to be executed. An empty
        # result is simply a no-op.
        for row in results:
            for statement in row:
                self.pg_conn.pgsql_cur.execute(statement)
    except psycopg2.Error as e:
        self.logger.error("sqlCODE: %s sqlERROR: %s" % (e.pgcode, e.pgerror))
        self.logger.error(statement)
def test_diagnostics_attributes(self):
    """Verify the ``diag`` object attached to a failed query.

    NOTE(review): the original was truncated inside the ``except`` clause
    and did not parse; the body is rebuilt from the intact copy of this
    test.

    Runs a query against the table ``nonexist``, then checks that the
    caught error's ``diag`` is a ``psycopg2.extensions.Diagnostics`` and
    that each listed attribute is either ``None`` or a ``str``.
    """
    cur = self.conn.cursor()
    try:
        cur.execute("select * from nonexist")
    except psycopg2.Error as exc:
        # Keep a second reference: ``exc`` is unset after the handler.
        e = exc

    diag = e.diag
    self.assertTrue(isinstance(diag, psycopg2.extensions.Diagnostics))
    for attr in [
            'column_name', 'constraint_name', 'context', 'datatype_name',
            'internal_position', 'internal_query', 'message_detail',
            'message_hint', 'message_primary', 'schema_name', 'severity',
            'source_file', 'source_function', 'source_line', 'sqlstate',
            'statement_position', 'table_name', ]:
        v = getattr(diag, attr)
        if v is not None:
            self.assertTrue(isinstance(v, str))
def test_diagnostics_life(self):
    """Check that a ``diag`` object keeps its cursor alive.

    After the error and the cursor names are deleted, the weak reference
    to the cursor must still resolve while ``diag`` exists, and must be
    dead once ``diag`` is deleted and garbage has been collected.
    """
    import gc
    from weakref import ref

    def tmp():
        cur = self.conn.cursor()
        try:
            cur.execute("select * from nonexist")
        except psycopg2.Error as exc:
            return cur, exc

    cur, e = tmp()
    diag = e.diag
    w = ref(cur)

    del e, cur
    gc.collect()
    # unittest assertions are not stripped under ``python -O``.
    self.assertTrue(w() is not None)

    self.assertEqual(diag.sqlstate, '42P01')

    del diag
    gc.collect()
    gc.collect()
    self.assertTrue(w() is None)
def test_diagnostics_independent(self):
    """Check that ``diag`` objects from two errors keep separate state.

    NOTE(review): the original was truncated inside the first ``except``
    clause and did not parse; the body is rebuilt from the intact copy of
    this test.

    The first statement is not valid SQL (expected sqlstate ``42601``);
    the second selects from the table ``water`` (expected ``42P01``).
    """
    cur = self.conn.cursor()
    try:
        cur.execute("l'acqua e' poca e 'a papera nun galleggia")
    except Exception as exc:
        diag1 = exc.diag

    self.conn.rollback()

    try:
        cur.execute("select level from water where ducks > 1")
    except psycopg2.Error as exc:
        diag2 = exc.diag

    self.assertEqual(diag1.sqlstate, '42601')
    self.assertEqual(diag2.sqlstate, '42P01')
def test_9_3_diagnostics(self):
    """Check the diagnostic fields reported for a check-constraint failure.

    NOTE(review): the original was truncated inside the ``except`` clause
    and did not parse; the assertions are rebuilt from the intact copy of
    this test.
    """
    cur = self.conn.cursor()
    cur.execute("""
        create temp table test_exc (
            data int constraint chk_eq1 check (data = 1)
        )""")
    try:
        cur.execute("insert into test_exc values(2)")
    except psycopg2.Error as exc:
        e = exc

    self.assertEqual(e.pgcode, '23514')
    self.assertEqual(e.diag.schema_name[:7], "pg_temp")
    self.assertEqual(e.diag.table_name, "test_exc")
    self.assertEqual(e.diag.column_name, None)
    self.assertEqual(e.diag.constraint_name, "chk_eq1")
    self.assertEqual(e.diag.datatype_name, None)
def test_diagnostics_attributes(self):
    """Verify the ``diag`` object attached to a failed query.

    NOTE(review): the original was truncated inside the ``except`` clause
    and did not parse; the body is rebuilt from the intact copy of this
    test.

    Runs a query against the table ``nonexist``, then checks that the
    caught error's ``diag`` is a ``psycopg2.extensions.Diagnostics`` and
    that each listed attribute is either ``None`` or a ``str``.
    """
    cur = self.conn.cursor()
    try:
        cur.execute("select * from nonexist")
    except psycopg2.Error as exc:
        # Keep a second reference: ``exc`` is unset after the handler.
        e = exc

    diag = e.diag
    self.assertTrue(isinstance(diag, psycopg2.extensions.Diagnostics))
    for attr in [
            'column_name', 'constraint_name', 'context', 'datatype_name',
            'internal_position', 'internal_query', 'message_detail',
            'message_hint', 'message_primary', 'schema_name', 'severity',
            'source_file', 'source_function', 'source_line', 'sqlstate',
            'statement_position', 'table_name', ]:
        v = getattr(diag, attr)
        if v is not None:
            self.assertTrue(isinstance(v, str))
def test_diagnostics_life(self):
    """Check that a ``diag`` object keeps its cursor alive.

    NOTE(review): the original lost everything between the ``except``
    clause and the ``'42P01'`` comparison and did not parse; the missing
    steps are rebuilt from the intact copy of this test.

    After the error and the cursor names are deleted, the weak reference
    to the cursor must still resolve while ``diag`` exists, and must be
    dead once ``diag`` is deleted and garbage has been collected.
    """
    import gc
    from weakref import ref

    def tmp():
        cur = self.conn.cursor()
        try:
            cur.execute("select * from nonexist")
        except psycopg2.Error as exc:
            return cur, exc

    cur, e = tmp()
    diag = e.diag
    w = ref(cur)

    del e, cur
    gc.collect()
    # unittest assertions are not stripped under ``python -O``.
    self.assertTrue(w() is not None)

    self.assertEqual(diag.sqlstate, '42P01')

    del diag
    gc.collect()
    gc.collect()
    self.assertTrue(w() is None)
def test_9_3_diagnostics(self):
    """Check the diagnostic fields reported for a check-constraint failure.

    NOTE(review): the original was truncated inside the ``except`` clause
    and did not parse; the assertions are rebuilt from the intact copy of
    this test.
    """
    cur = self.conn.cursor()
    cur.execute("""
        create temp table test_exc (
            data int constraint chk_eq1 check (data = 1)
        )""")
    try:
        cur.execute("insert into test_exc values(2)")
    except psycopg2.Error as exc:
        e = exc

    self.assertEqual(e.pgcode, '23514')
    self.assertEqual(e.diag.schema_name[:7], "pg_temp")
    self.assertEqual(e.diag.table_name, "test_exc")
    self.assertEqual(e.diag.column_name, None)
    self.assertEqual(e.diag.constraint_name, "chk_eq1")
    self.assertEqual(e.diag.datatype_name, None)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。