Python psycopg2 模块，InterfaceError() 实例源码
我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 psycopg2.InterfaceError()。
def execute_read_query(self, query: str, args: tuple = ()) -> list:
    """Run a read query and return all of its rows.

    results are returned as a list of dicts

    query -- the query string with optional %s placeholders
    args -- optional tuple of values for %s placeholders
    returns the fetched rows, or an empty list when nothing was fetched
    raises psycopg2.InterfaceError when no connection is set; any error
    raised while executing is printed with tb.print_exc() and re-raised
    """
    if not self.__conn:
        raise psycopg2.InterfaceError("null connection")
    try:
        # RealDictCursor is requested so each fetched row is dict-like.
        with self.__conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as curs:
            curs.execute(query, args)
            rows = curs.fetchall()
    except Exception:
        tb.print_exc()
        # Bare raise re-raises the active exception unchanged.
        raise
    return rows if rows else []
def execute_write_query(self, query: str, args: tuple = ()) -> int:
    """ perform a write query

    query -- the query string with optional %s placeholders
    args -- optional tuple of values for %s placeholders
    returns number of rows modified
    raises psycopg2.InterfaceError when no connection is set; any error
    raised while executing is printed with tb.print_exc() and re-raised
    """
    if not self.__conn:
        raise psycopg2.InterfaceError("null connection")
    try:
        with self.__conn.cursor() as curs:
            curs.execute(query, args)
            # Read the count while the cursor is still open.
            return curs.rowcount
    except Exception:
        tb.print_exc()
        raise
def test_closed(self):
    """set_session() on a closed connection must raise InterfaceError."""
    self.conn.close()
    with self.assertRaises(psycopg2.InterfaceError):
        self.conn.set_session(ext.ISOLATION_LEVEL_SERIALIZABLE)
def test_closed(self):
    """Setting autocommit on a closed connection must raise InterfaceError."""
    self.conn.close()
    with self.assertRaises(psycopg2.InterfaceError):
        self.conn.autocommit = True

    # The getter doesn't have a guard. We may change this in future
    # to make it consistent with other methods; meanwhile let's just check
    # it doesn't explode.
    try:
        current = self.conn.autocommit
        self.assertTrue(current in (True, False))
    except psycopg2.InterfaceError:
        pass
def test_with_closed(self):
    """Entering a ``with`` block on a closed connection raises InterfaceError."""
    self.conn.close()
    with self.assertRaises(psycopg2.InterfaceError):
        with self.conn:
            pass
def test_parse(self):
    """Check HstoreAdapter.parse on well-formed and malformed hstore text."""
    from psycopg2.extras import HstoreAdapter

    def ok(s, d):
        # Parsing `s` must succeed and produce exactly `d`.
        self.assertEqual(HstoreAdapter.parse(s, None), d)

    ok(None, None)
    ok('', {})
    ok('"a"=>"1","b"=>"2"', {'a': '1', 'b': '2'})
    ok('"a" => "1","b" => "2"', {'a': '1', 'b': '2'})
    ok('"a"=>NULL,"b"=>"2"', {'a': None, 'b': '2'})
    ok(r'"a"=>"\"","\""=>"2"', {'a': '"', '"': '2'})
    ok('"a"=>"\'","\'"=>"2"', {'a': "'", "'": '2'})
    ok('"a"=>"1","b"=>NULL', {'a': '1', 'b': None})
    ok(r'"a\\"=>"1"', {'a\\': '1'})
    ok(r'"a\""=>"1"', {'a"': '1'})
    ok(r'"a\\\""=>"1"', {r'a\"': '1'})
    ok(r'"a\\\\\""=>"1"', {r'a\\"': '1'})

    def ko(s):
        # Parsing `s` must be rejected with InterfaceError.
        self.assertRaises(psycopg2.InterfaceError,
                          HstoreAdapter.parse, s, None)

    ko('a')
    ko('"a"')
    ko(r'"a\\""=>"1"')
    ko(r'"a\\\\""=>"1"')
    ko('"a=>"1"')
    ko('"a"=>"1","b"=>NUL')
def test_async_cursor_gone(self):
    """Waiting fails once the executing cursor is collected; the connection survives."""
    import gc

    doomed = self.conn.cursor()
    doomed.execute("select 42;")
    # Drop the only reference and force a collection before waiting.
    del doomed
    gc.collect()
    with self.assertRaises(psycopg2.InterfaceError):
        self.wait(self.conn)

    # The connection is still usable
    fresh = self.conn.cursor()
    fresh.execute("select 42;")
    self.wait(self.conn)
    self.assertEqual(fresh.fetchone(), (42,))
def test_write_after_close(self):
    """Writing to a closed large object raises InterfaceError."""
    closed_lo = self.conn.lobject()
    closed_lo.close()
    with self.assertRaises(psycopg2.InterfaceError):
        closed_lo.write(b"some data")
def test_read_after_close(self):
    """Reading from a closed large object raises InterfaceError."""
    closed_lo = self.conn.lobject()
    closed_lo.close()
    with self.assertRaises(psycopg2.InterfaceError):
        closed_lo.read(5)
def test_seek_after_close(self):
    """Seeking on a closed large object raises InterfaceError."""
    closed_lo = self.conn.lobject()
    closed_lo.close()
    with self.assertRaises(psycopg2.InterfaceError):
        closed_lo.seek(0)
def test_tell_after_close(self):
    """tell() on a closed large object raises InterfaceError."""
    closed_lo = self.conn.lobject()
    closed_lo.close()
    with self.assertRaises(psycopg2.InterfaceError):
        closed_lo.tell()
def parse(self, s, cur, _bsdec=_re.compile(r"\\(.)")):
    """Parse an hstore representation in a Python string.

    The hstore is represented as something like::

        "a"=>"1","b"=>"2"

    with backslash-escaped strings.

    s -- the text to parse; None is passed through as None
    cur -- accepted for the caller's benefit, not used by this function
    _bsdec -- precompiled pattern used to drop backslash escapes
    returns a dict mapping each key to its value (None for a pair whose
    value group did not match)
    raises psycopg2.InterfaceError when a pair does not start where the
    previous one ended, or when text is left over after the last pair
    """
    if s is None:
        return None

    rv = {}
    start = 0
    for m in self._re_hstore.finditer(s):
        # Pairs must be contiguous: a gap means unparseable text in between.
        if m.start() != start:
            raise psycopg2.InterfaceError(
                "error parsing hstore pair at char %d" % start)
        k = _bsdec.sub(r'\1', m.group(1))
        v = m.group(2)
        if v is not None:
            v = _bsdec.sub(r'\1', v)
        rv[k] = v
        start = m.end()

    if start < len(s):
        raise psycopg2.InterfaceError(
            "error parsing hstore: unparsed data after char %d" % start)
    return rv
def tokenize(self, s):
    """Split `s` into tokens using the `_re_tokenize` pattern.

    For each match: group 1 yields a None token, group 2 yields its text
    with `_re_undouble` applied, otherwise group 3 is used as-is.
    Returns the list of tokens in match order.
    """
    tokens = []
    for match in self._re_tokenize.finditer(s):
        if match is None:
            raise psycopg2.InterfaceError("can't parse type: %r" % s)
        if match.group(1) is not None:
            tokens.append(None)
            continue
        quoted = match.group(2)
        if quoted is None:
            tokens.append(match.group(3))
        else:
            tokens.append(self._re_undouble.sub(r"\1", quoted))
    return tokens
def test_isolation_level_closed(self):
    """Changing the isolation level of a closed connection raises InterfaceError."""
    cnn = self.connect()
    cnn.close()
    # NOTE(review): the callable was missing here (assertRaises was handed the
    # bare int 1); cnn.set_isolation_level is inferred from the test name --
    # confirm against the upstream test suite.
    self.assertRaises(psycopg2.InterfaceError,
                      cnn.set_isolation_level, 1)
def test_closed(self):
    """Setting autocommit on a closed connection must raise InterfaceError."""
    self.conn.close()
    # assertRaises needs a callable: the assignment is expressed via setattr.
    self.assertRaises(psycopg2.InterfaceError,
                      setattr, self.conn, 'autocommit', True)

    # The getter doesn't have a guard. We may change this in future
    # to make it consistent with other methods; meanwhile let's just check
    # it doesn't explode.
    try:
        self.assertTrue(self.conn.autocommit in (True, False))
    except psycopg2.InterfaceError:
        pass
def test_with_closed(self):
    """Using a closed connection as a context manager raises InterfaceError."""
    self.conn.close()
    with self.assertRaises(psycopg2.InterfaceError):
        with self.conn:
            pass
def test_parse(self):
    """Check HstoreAdapter.parse on well-formed and malformed hstore text."""
    from psycopg2.extras import HstoreAdapter

    def ok(s, d):
        # Parsing `s` must succeed and produce exactly `d`.
        self.assertEqual(HstoreAdapter.parse(s, None), d)

    ok(None, None)
    ok('', {})
    ok('"a"=>"1","b"=>"2"', {'a': '1', 'b': '2'})
    ok('"a" => "1","b" => "2"', {'a': '1', 'b': '2'})
    ok('"a"=>NULL,"b"=>"2"', {'a': None, 'b': '2'})
    ok(r'"a"=>"\"","\""=>"2"', {'a': '"', '"': '2'})
    ok('"a"=>"\'","\'"=>"2"', {'a': "'", "'": '2'})
    ok('"a"=>"1","b"=>NULL', {'a': '1', 'b': None})
    ok(r'"a\\"=>"1"', {'a\\': '1'})
    ok(r'"a\""=>"1"', {'a"': '1'})
    ok(r'"a\\\""=>"1"', {r'a\"': '1'})
    ok(r'"a\\\\\""=>"1"', {r'a\\"': '1'})

    def ko(s):
        # Parsing `s` must be rejected with InterfaceError.
        self.assertRaises(psycopg2.InterfaceError,
                          HstoreAdapter.parse, s, None)

    ko('a')
    ko('"a"')
    ko(r'"a\\""=>"1"')
    ko(r'"a\\\\""=>"1"')
    ko('"a=>"1"')
    ko('"a"=>"1","b"=>NUL')
def test_bad_subclass(self):
    """A cursor subclass that skips the base __init__ must fail cleanly."""
    # check that we get an error message instead of a segfault
    # for badly written subclasses.
    # see http://stackoverflow.com/questions/22019341/
    class StupidCursor(psycopg2.extensions.cursor):
        def __init__(self, *args, **kwargs):
            # I am stupid so not calling superclass init
            pass

    broken = StupidCursor()
    with self.assertRaises(psycopg2.InterfaceError):
        broken.execute('select 1')
    with self.assertRaises(psycopg2.InterfaceError):
        broken.executemany('select 1', [])
def test_async_cursor_gone(self):
    """Waiting fails once the executing cursor is collected; the connection survives."""
    import gc

    cur = self.conn.cursor()
    cur.execute("select 42;")
    # Drop the only reference and force a collection before waiting.
    del cur
    gc.collect()
    self.assertRaises(psycopg2.InterfaceError, self.wait, self.conn)

    # The connection is still usable
    cur = self.conn.cursor()
    cur.execute("select 42;")
    self.wait(self.conn)
    self.assertEqual(cur.fetchone(), (42,))
def test_read_after_close(self):
    """Reading from a closed large object raises InterfaceError."""
    lo = self.conn.lobject()
    lo.close()
    # assertRaises needs the callable (lo.read) followed by its argument.
    self.assertRaises(psycopg2.InterfaceError, lo.read, 5)
def test_seek_after_close(self):
    """Seeking on a closed large object raises InterfaceError."""
    lo = self.conn.lobject()
    lo.close()
    # assertRaises needs the callable (lo.seek) followed by its argument.
    self.assertRaises(psycopg2.InterfaceError, lo.seek, 0)
def test_tell_after_close(self):
    """tell() on a closed large object raises InterfaceError."""
    closed_lo = self.conn.lobject()
    closed_lo.close()
    with self.assertRaises(psycopg2.InterfaceError):
        closed_lo.tell()
def test_seek_larger_than_2gb(self):
    """Seeking to a 4gb offset must be rejected with one of the listed errors."""
    lo = self.conn.lobject()
    offset = 1 << 32  # 4gb
    # NOTE(review): the original call was truncated after "(OverflowError,";
    # the remaining exception types and the lo.seek arguments are restored
    # from the upstream psycopg2 test -- confirm before relying on them.
    self.assertRaises(
        (OverflowError, psycopg2.InterfaceError, psycopg2.NotSupportedError),
        lo.seek, offset, 0)
def parse(self, s, cur, _bsdec=_re.compile(r"\\(.)")):
    """Parse an hstore representation in a Python string.

    The hstore is represented as something like::

        "a"=>"1","b"=>"2"

    with backslash-escaped strings.

    s -- the text to parse; None is passed through as None
    cur -- accepted for the caller's benefit, not used by this function
    _bsdec -- precompiled pattern used to drop backslash escapes
    returns a dict mapping each key to its value (None for a pair whose
    value group did not match)
    raises psycopg2.InterfaceError when a pair does not start where the
    previous one ended, or when text is left over after the last pair
    """
    if s is None:
        return None

    rv = {}
    start = 0
    for m in self._re_hstore.finditer(s):
        # Pairs must be contiguous: a gap means unparseable text in between.
        if m.start() != start:
            raise psycopg2.InterfaceError(
                "error parsing hstore pair at char %d" % start)
        k = _bsdec.sub(r'\1', m.group(1))
        v = m.group(2)
        if v is not None:
            v = _bsdec.sub(r'\1', v)
        rv[k] = v
        start = m.end()

    if start < len(s):
        raise psycopg2.InterfaceError(
            "error parsing hstore: unparsed data after char %d" % start)
    return rv
def tokenize(self, s):
    """Split `s` into tokens using the `_re_tokenize` pattern.

    For each match: group 1 yields a None token, group 2 yields its text
    with `_re_undouble` applied, otherwise group 3 is used as-is.
    Returns the list of tokens in match order.
    """
    rv = []
    for m in self._re_tokenize.finditer(s):
        if m.group(1) is not None:
            rv.append(None)
        elif m.group(2) is not None:
            rv.append(self._re_undouble.sub(r"\1", m.group(2)))
        else:
            rv.append(m.group(3))
    return rv
def update_exact(self, table_name: str, conditions: dict, newvalues: dict) -> int:
    """ updates rows that match the given equality conditions with new values. both conditions
    and newvalues are dictionaries. kv pairs in conditions are translated into 'column = value'
    conditions and kv pairs in newvalues are used in the SET clause

    An empty newvalues dict updates nothing and returns 0. An empty conditions dict produces
    no WHERE clause. Table and column names are interpolated into the SQL text as-is (only
    the values are passed as %s parameters), so they must come from trusted code.

    returns the number of rows updated
    """
    if not self.__conn:
        raise psycopg2.InterfaceError("null connection")
    if not newvalues:
        # Nothing to put in the SET clause.
        return 0
    try:
        set_clause = ", ".join("{} = %s".format(col) for col in newvalues)
        query = "update {} set {}".format(table_name, set_clause)
        fillers = tuple(newvalues.values())
        if conditions:
            where_clause = " and ".join("{} = %s".format(col) for col in conditions)
            query += " where " + where_clause
            # Placeholder order is SET values first, then WHERE values.
            fillers += tuple(conditions.values())
        return self.execute_write_query(query, fillers)
    except Exception:
        tb.print_exc()
        raise
def insert_one(self, table_name: str, row: dict, retcol: str):
    """ inserts a single row into the specified table and returns the value of the specified
    column.

    table_name - name of the table to insert into
    row - a dict containing all row values to insert
    retcol - name of the column whose value is returned by the insert

    Table and column names are interpolated into the SQL text as-is (only the values are
    passed as %s parameters), so they must come from trusted code.

    returns: new id
    raises psycopg2.InterfaceError when no connection is set or row is empty
    """
    if not self.__conn or not row:
        raise psycopg2.InterfaceError("null connection")
    try:
        columns = "(" + ",".join(row.keys()) + ")"
        holders = "(" + ",".join(["%s"] * len(row)) + ")"
        fillers = tuple(row.values())
        query = "insert into {table_name} {columnnames} values {values} returning {retcol}".format(
            table_name=table_name,
            columnnames=columns,
            values=holders,
            retcol=retcol,
        )
        with self.__conn.cursor() as curs:
            curs.execute(query, fillers)
            # First column of the single returned row is the requested value.
            return curs.fetchone()[0]
    except Exception:
        tb.print_exc()
        raise
def insert(self, table_name: str, rows: list) -> int:
    """ inserts the given data into the specified table.

    An individual insert statement is generated per row. The query is handled as a prepared
    statement so multi-inserts are more efficient.

    table_name -- name of the table to insert into
    rows -- list of dicts. each dict representing a row to be inserted

    Table and column names are interpolated into the SQL text as-is (only the values are
    passed as %s parameters), so they must come from trusted code.

    returns: the number of rows inserted
    raises psycopg2.InterfaceError when no connection is set or rows is empty
    """
    n_inserted = 0
    if not self.__conn or not rows:
        raise psycopg2.InterfaceError("null connection")
    try:
        for row in rows:
            columns = "(" + ",".join(row.keys()) + ")"  # "(col1,col2,col3...)"
            params = ",".join(["%s"] * len(row))  # "%s,%s,%s..."
            query = "insert into {table_name} {columns} select {params}".format(
                table_name=table_name,
                columns=columns,
                params=params,
            )
            n_inserted += self.execute_write_query(query, tuple(row.values()))
        return n_inserted
    except Exception:
        tb.print_exc()
        raise
def test_isolation_level_closed(self):
    """Changing the isolation level of a closed connection raises InterfaceError."""
    cnn = self.connect()
    cnn.close()
    # NOTE(review): the callable was missing here (assertRaises was handed the
    # bare int 1); cnn.set_isolation_level is inferred from the test name --
    # confirm against the upstream test suite.
    self.assertRaises(psycopg2.InterfaceError,
                      cnn.set_isolation_level, 1)
def test_closed(self):
    """set_session() on a closed connection must raise InterfaceError."""
    self.conn.close()
    # assertRaises needs the callable (set_session) followed by its argument.
    self.assertRaises(psycopg2.InterfaceError,
                      self.conn.set_session,
                      ext.ISOLATION_LEVEL_SERIALIZABLE)
def test_with_closed(self):
    """A closed connection cannot be entered as a context manager."""
    self.conn.close()
    with self.assertRaises(psycopg2.InterfaceError):
        with self.conn:
            pass
def test_parse(self):
    """Check HstoreAdapter.parse on well-formed and malformed hstore text."""
    from psycopg2.extras import HstoreAdapter

    def ok(s, d):
        # Parsing `s` must succeed and produce exactly `d`.
        self.assertEqual(HstoreAdapter.parse(s, None), d)

    ok(None, None)
    ok('', {})
    ok('"a"=>"1","b"=>"2"', {'a': '1', 'b': '2'})
    ok('"a" => "1","b" => "2"', {'a': '1', 'b': '2'})
    ok('"a"=>NULL,"b"=>"2"', {'a': None, 'b': '2'})
    ok(r'"a"=>"\"","\""=>"2"', {'a': '"', '"': '2'})
    ok('"a"=>"\'","\'"=>"2"', {'a': "'", "'": '2'})
    ok('"a"=>"1","b"=>NULL', {'a': '1', 'b': None})
    ok(r'"a\\"=>"1"', {'a\\': '1'})
    ok(r'"a\""=>"1"', {'a"': '1'})
    ok(r'"a\\\""=>"1"', {r'a\"': '1'})
    ok(r'"a\\\\\""=>"1"', {r'a\\"': '1'})

    def ko(s):
        # Parsing `s` must be rejected with InterfaceError.
        self.assertRaises(psycopg2.InterfaceError,
                          HstoreAdapter.parse, s, None)

    ko('a')
    ko('"a"')
    ko(r'"a\\""=>"1"')
    ko(r'"a\\\\""=>"1"')
    ko('"a=>"1"')
    ko('"a"=>"1","b"=>NUL')
def test_async_cursor_gone(self):
    """Waiting fails once the executing cursor is collected; the connection survives."""
    import gc

    cur = self.conn.cursor()
    cur.execute("select 42;")
    # Drop the only reference and force a collection before waiting.
    del cur
    gc.collect()
    self.assertRaises(psycopg2.InterfaceError, self.wait, self.conn)

    # The connection is still usable
    cur = self.conn.cursor()
    cur.execute("select 42;")
    self.wait(self.conn)
    self.assertEqual(cur.fetchone(), (42,))
def test_write_after_close(self):
    """Writing to a closed large object raises InterfaceError."""
    lo = self.conn.lobject()
    lo.close()
    # assertRaises needs the callable (lo.write) followed by its argument.
    self.assertRaises(psycopg2.InterfaceError, lo.write, b"some data")
def test_read_after_close(self):
    """Reading from a closed large object raises InterfaceError."""
    lo = self.conn.lobject()
    lo.close()
    # assertRaises needs the callable (lo.read) followed by its argument.
    self.assertRaises(psycopg2.InterfaceError, lo.read, 5)
def test_seek_after_close(self):
    """Seeking on a closed large object raises InterfaceError."""
    lo = self.conn.lobject()
    lo.close()
    # assertRaises needs the callable (lo.seek) followed by its argument.
    self.assertRaises(psycopg2.InterfaceError, lo.seek, 0)
def test_tell_after_close(self):
    """Asking a closed large object for its position raises InterfaceError."""
    closed_lo = self.conn.lobject()
    closed_lo.close()
    with self.assertRaises(psycopg2.InterfaceError):
        closed_lo.tell()
def test_seek_larger_than_2gb(self):
    """Seeking to a 4gb offset must be rejected with one of the listed errors."""
    lo = self.conn.lobject()
    offset = 1 << 32  # 4gb
    # NOTE(review): the original call was truncated after "(OverflowError,";
    # the remaining exception types and the lo.seek arguments are restored
    # from the upstream psycopg2 test -- confirm before relying on them.
    self.assertRaises(
        (OverflowError, psycopg2.InterfaceError, psycopg2.NotSupportedError),
        lo.seek, offset, 0)
def test_truncate_larger_than_2gb(self):
    """Truncating to a 4gb length must be rejected with one of the listed errors."""
    lo = self.conn.lobject()
    length = 1 << 32  # 4gb
    # NOTE(review): the original call was truncated after "(OverflowError,";
    # the remaining exception types and the lo.truncate callable are restored
    # from the upstream psycopg2 test -- confirm before relying on them.
    self.assertRaises(
        (OverflowError, psycopg2.InterfaceError, psycopg2.NotSupportedError),
        lo.truncate, length)
def parse(self, s, cur, _bsdec=_re.compile(r"\\(.)")):
    """Parse an hstore representation in a Python string.

    The hstore is represented as something like::

        "a"=>"1","b"=>"2"

    with backslash-escaped strings.

    s -- the text to parse; None is passed through as None
    cur -- accepted for the caller's benefit, not used by this function
    _bsdec -- precompiled pattern used to drop backslash escapes
    returns a dict mapping each key to its value (None for a pair whose
    value group did not match)
    raises psycopg2.InterfaceError when a pair does not start where the
    previous one ended, or when text is left over after the last pair
    """
    if s is None:
        return None

    rv = {}
    start = 0
    for m in self._re_hstore.finditer(s):
        # Pairs must be contiguous: a gap means unparseable text in between.
        if m.start() != start:
            raise psycopg2.InterfaceError(
                "error parsing hstore pair at char %d" % start)
        k = _bsdec.sub(r'\1', m.group(1))
        v = m.group(2)
        if v is not None:
            v = _bsdec.sub(r'\1', v)
        rv[k] = v
        start = m.end()

    if start < len(s):
        raise psycopg2.InterfaceError(
            "error parsing hstore: unparsed data after char %d" % start)
    return rv
def tokenize(self, s):
    """Split `s` into tokens using the `_re_tokenize` pattern.

    For each match: group 1 yields a None token, group 2 yields its text
    with `_re_undouble` applied, otherwise group 3 is used as-is.
    Returns the list of tokens in match order.
    """
    rv = []
    for m in self._re_tokenize.finditer(s):
        if m.group(1) is not None:
            rv.append(None)
        elif m.group(2) is not None:
            rv.append(self._re_undouble.sub(r"\1", m.group(2)))
        else:
            rv.append(m.group(3))
    return rv
def test_with_closed(self):
    """InterfaceError is raised when a closed connection is used in ``with``."""
    self.conn.close()
    with self.assertRaises(psycopg2.InterfaceError):
        with self.conn:
            pass
版权声明：本文内容由互联网用户自发贡献，该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务，不拥有所有权，不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容，请发送邮件至 dio@foxmail.com 举报，一经查实，本站将立刻删除。