Python psycopg2 模块,extensions() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用psycopg2.extensions()。
def test_interval_overflow(self):
cur = self.conn.cursor()
# hack a cursor to receive values too extreme to be represented
# but still I want an error,not a random number
psycopg2.extensions.register_type(
psycopg2.extensions.new_type(
psycopg2.STRING.values, 'WAT', psycopg2.extensions.INTERVAL),
cur)
def f(val):
cur.execute("select '%s'::text" % val)
return cur.fetchone()[0]
self.assertRaises(OverflowError, f, '100000000000000000:00:00')
self.assertRaises(OverflowError, '00:100000000000000000:00:00')
self.assertRaises(OverflowError, '00:00:100000000000000000:00')
self.assertRaises(OverflowError, '00:00:00.100000000000000000')
# Only run the datetime tests if psycopg was compiled with support.
def setUp(self):
ConnectingTestCase.setUp(self)
self.curs = self.conn.cursor()
self.DATE = psycopg2._psycopg.MXDATE
self.TIME = psycopg2._psycopg.MXTIME
self.DATETIME = psycopg2._psycopg.MXDATETIME
self.INTERVAL = psycopg2._psycopg.MXINTERVAL
psycopg2.extensions.register_type(self.DATE, self.conn)
psycopg2.extensions.register_type(self.TIME, self.conn)
psycopg2.extensions.register_type(self.DATETIME, self.conn)
psycopg2.extensions.register_type(self.INTERVAL, self.conn)
psycopg2.extensions.register_type(psycopg2.extensions.MXDATEARRAY, self.conn)
psycopg2.extensions.register_type(psycopg2.extensions.MXTIMEARRAY, self.conn)
psycopg2.extensions.register_type(
psycopg2.extensions.MXDATETIMEARRAY, self.conn)
psycopg2.extensions.register_type(
psycopg2.extensions.MXINTERVALARRAY, self.conn)
def test_flush_on_write(self):
# a very large query requires a flush loop to be sent to the backend
conn = self.conn
stub = self.set_stub_wait_callback(conn)
curs = conn.cursor()
for mb in 1, 5, 10, 20, 50:
size = mb * 1024 * 1024
del stub.polls[:]
curs.execute("select %s;", ('x' * size,))
self.assertEqual(size, len(curs.fetchone()[0]))
if stub.polls.count(psycopg2.extensions.POLL_WRITE) > 1:
return
# This is more a testing glitch than an error: it happens
# on high load on linux: probably because the kernel has more
# buffers ready. A warning may be useful during development,
# but an error is bad during regression testing.
import warnings
warnings.warn("sending a large query didn't trigger block on write.")
def notify(self, name, sec=0, payload=None):
"""Send a notification to the database,eventually after some time."""
if payload is None:
payload = ''
else:
payload = ",%r" % payload
script = ("""\
import time
time.sleep(%(sec)s)
import %(module)s as psycopg2
import %(module)s.extensions as ext
conn = psycopg2.connect(%(dsn)r)
conn.set_isolation_level(ext.ISOLATION_LEVEL_AUTOCOMMIT)
print conn.get_backend_pid()
curs = conn.cursor()
curs.execute("NOTIFY " %(name)r %(payload)r)
curs.close()
conn.close()
""" % {
'module': psycopg2.__name__,
'dsn': dsn, 'sec': sec, 'name': name, 'payload': payload})
return Popen([sys.executable, '-c', script_to_py3(script)], stdout=PIPE)
def test_many_notifies(self):
self.autocommit(self.conn)
for name in ['foo', 'bar', 'baz']:
self.listen(name)
pids = {}
for name in ['foo', 'baz', 'qux']:
pids[name] = int(self.notify(name).communicate()[0])
self.assertEqual(0, len(self.conn.notifies))
for i in range(10):
self.assertEqual(extensions.POLL_OK, self.conn.poll())
self.assertEqual(3, len(self.conn.notifies))
names = dict.fromkeys(['foo', 'baz'])
for (pid, name) in self.conn.notifies:
self.assertEqual(pids[name], pid)
names.pop(name) # raise if name found twice
def test_adapt_8(self):
if self.conn.server_version >= 90000:
return self.skipTest("skipping dict adaptation with PG pre-9 Syntax")
from psycopg2.extras import HstoreAdapter
o = {'a': '1', 'b': "'", 'c': None}
if self.conn.encoding == 'UTF8':
o['d'] = '\xe0'
a = HstoreAdapter(o)
a.prepare(self.conn)
q = a.getquoted()
self.assertTrue(q.startswith(b"(("), q)
ii = q[1:-1].split(b"||")
ii.sort()
self.assertEqual(len(ii), len(o))
self.assertQuotedEqual(ii[0], b"('a' => '1')")
self.assertQuotedEqual(ii[1], b"('b' => '''')")
self.assertQuotedEqual(ii[2], b"('c' => NULL)")
if 'd' in o:
encc = '\xe0'.encode(psycopg2.extensions.encodings[self.conn.encoding])
self.assertQuotedEqual(ii[3], b"('d' => '" + encc + b"')")
def test_register_globally(self):
from psycopg2.extras import register_hstore, HstoreAdapter
oids = HstoreAdapter.get_oids(self.conn)
try:
register_hstore(self.conn, globally=True)
conn2 = self.connect()
try:
cur2 = self.conn.cursor()
cur2.execute("select 'a => b'::hstore")
r = cur2.fetchone()
self.assertTrue(isinstance(r[0], dict))
finally:
conn2.close()
finally:
psycopg2.extensions.string_types.pop(oids[0][0])
# verify the caster is not around anymore
cur = self.conn.cursor()
cur.execute("select 'a => b'::hstore")
r = cur.fetchone()
self.assertTrue(isinstance(r[0], str))
def test_oid(self):
cur = self.conn.cursor()
cur.execute("select 'hstore'::regtype::oid")
oid = cur.fetchone()[0]
# Note: None as conn_or_cursor is just for testing: not public
# interface and it may break in future.
from psycopg2.extras import register_hstore
register_hstore(None, globally=True, oid=oid)
try:
cur.execute("select null::hstore,''::hstore,'a => b'::hstore")
t = cur.fetchone()
self.assertTrue(t[0] is None)
self.assertEqual(t[1], {})
self.assertEqual(t[2], {'a': 'b'})
finally:
psycopg2.extensions.string_types.pop(oid)
def test_array_cast_oid(self):
cur = self.conn.cursor()
cur.execute("select 'hstore'::regtype::oid,'hstore[]'::regtype::oid")
oid, aoid = cur.fetchone()
from psycopg2.extras import register_hstore
register_hstore(None, oid=oid, array_oid=aoid)
try:
cur.execute("""
select null::hstore,
'a => b'::hstore,'{a=>b}'::hstore[]""")
t = cur.fetchone()
self.assertTrue(t[0] is None)
self.assertEqual(t[1], {'a': 'b'})
self.assertEqual(t[3], [{'a': 'b'}])
finally:
psycopg2.extensions.string_types.pop(oid)
psycopg2.extensions.string_types.pop(aoid)
def test_register_globally(self):
self._create_type("type_ii", [("a", "integer"), ("b", "integer")])
conn1 = self.connect()
conn2 = self.connect()
try:
t = psycopg2.extras.register_composite("type_ii", conn1, globally=True)
try:
curs1 = conn1.cursor()
curs2 = conn2.cursor()
curs1.execute("select (1,2)::type_ii")
self.assertEqual(curs1.fetchone()[0], (1, 2))
curs2.execute("select (1,2)::type_ii")
self.assertEqual(curs2.fetchone()[0], 2))
finally:
# drop the registered typecasters to help the refcounting
# script to return precise values.
del psycopg2.extensions.string_types[t.typecaster.values[0]]
if t.array_typecaster:
del psycopg2.extensions.string_types[
t.array_typecaster.values[0]]
finally:
conn1.close()
conn2.close()
def test_no_conn_curs(self):
from psycopg2._json import _get_json_oids
oid, array_oid = _get_json_oids(self.conn)
old = psycopg2.extensions.string_types.get(114)
olda = psycopg2.extensions.string_types.get(199)
def loads(s):
return psycopg2.extras.json.loads(s, parse_float=Decimal)
try:
new, newa = psycopg2.extras.register_json(
loads=loads, array_oid=array_oid)
curs = self.conn.cursor()
curs.execute("""select '{"a": 100.0,"b": null}'::json""")
data = curs.fetchone()[0]
self.assertTrue(isinstance(data['a'], Decimal))
self.assertEqual(data['a'], Decimal('100.0'))
finally:
psycopg2.extensions.string_types.pop(new.values[0])
psycopg2.extensions.string_types.pop(newa.values[0])
if old:
psycopg2.extensions.register_type(old)
if olda:
psycopg2.extensions.register_type(olda)
def test_register_globally(self):
old = psycopg2.extensions.string_types.get(3802)
olda = psycopg2.extensions.string_types.get(3807)
try:
new, newa = psycopg2.extras.register_json(self.conn,
loads=self.myloads, name='jsonb')
curs = self.conn.cursor()
curs.execute("""select '{"a": 100.0,"b": null}'::jsonb""")
self.assertEqual(curs.fetchone()[0], {'a': 100.0, 'b': None, 'test': 1})
finally:
psycopg2.extensions.string_types.pop(new.values[0])
psycopg2.extensions.string_types.pop(newa.values[0])
if old:
psycopg2.extensions.register_type(old)
if olda:
psycopg2.extensions.register_type(olda)
def test_withhold_no_begin(self):
self._create_withhold_table()
curs = self.conn.cursor("w", withhold=True)
curs.execute("select data from withhold order by data")
self.assertEqual(curs.fetchone(), (10,))
self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_BEGIN)
self.assertEqual(self.conn.get_transaction_status(),
psycopg2.extensions.TRANSACTION_STATUS_INTRANS)
self.conn.commit()
self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY)
self.assertEqual(self.conn.get_transaction_status(),
psycopg2.extensions.TRANSACTION_STATUS_IDLE)
self.assertEqual(curs.fetchone(), (20,
psycopg2.extensions.TRANSACTION_STATUS_IDLE)
curs.close()
self.assertEqual(self.conn.status,
psycopg2.extensions.TRANSACTION_STATUS_IDLE)
def test_withhold_autocommit(self):
self._create_withhold_table()
self.conn.commit()
self.conn.autocommit = True
curs = self.conn.cursor("w", withhold=True)
curs.execute("select data from withhold order by data")
self.assertEqual(curs.fetchone(),
psycopg2.extensions.TRANSACTION_STATUS_IDLE)
self.conn.commit()
self.assertEqual(self.conn.status,
psycopg2.extensions.TRANSACTION_STATUS_IDLE)
def test_unicode(self):
curs = self.conn.cursor()
curs.execute("SHOW server_encoding")
server_encoding = curs.fetchone()[0]
if server_encoding != "UTF8":
return self.skipTest(
"Unicode test skipped since server encoding is %s"
% server_encoding)
data = """some data with \t chars
to escape into,'quotes',\u20ac euro sign and \\ a backslash too.
"""
data += "".join(map(chr, [u for u in range(1, 65536)
if not 0xD800 <= u <= 0xDFFF])) # surrogate area
self.conn.set_client_encoding('UNICODE')
psycopg2.extensions.register_type(psycopg2.extensions.UNICODE, self.conn)
curs.execute("SELECT %s::text;", (data,))
res = curs.fetchone()[0]
self.assertEqual(res, data)
self.assertTrue(not self.conn.notices)
def test_koi8(self):
self.conn.set_client_encoding('KOI8')
curs = self.conn.cursor()
if sys.version_info[0] < 3:
data = ''.join(map(chr, list(range(32, 127)) + list(range(128, 256))))
else:
data = bytes(list(range(32, 256))).decode('koi8_r')
# as string
curs.execute("SELECT %s::text;",))
res = curs.fetchone()[0]
self.assertEqual(res, data)
self.assertTrue(not self.conn.notices)
# as unicode
if sys.version_info[0] < 3:
psycopg2.extensions.register_type(psycopg2.extensions.UNICODE, self.conn)
data = data.decode('koi8_r')
curs.execute("SELECT %s::text;",))
res = curs.fetchone()[0]
self.assertEqual(res, data)
self.assertTrue(not self.conn.notices)
def test_interval_overflow(self):
cur = self.conn.cursor()
# hack a cursor to receive values too extreme to be represented
# but still I want an error, '00:00:00.100000000000000000')
# Only run the datetime tests if psycopg was compiled with support.
def test_flush_on_write(self):
# a very large query requires a flush loop to be sent to the backend
conn = self.conn
stub = self.set_stub_wait_callback(conn)
curs = conn.cursor()
for mb in 1,
# but an error is bad during regression testing.
import warnings
warnings.warn("sending a large query didn't trigger block on write.")
def test_many_notifies(self):
self.autocommit(self.conn)
for name in ['foo', pid)
names.pop(name) # raise if name found twice
def test_adapt_8(self):
if self.conn.server_version >= 90000:
return self.skipTest("skipping dict adaptation with PG pre-9 Syntax")
from psycopg2.extras import HstoreAdapter
o = {'a': '1', 'c': None}
if self.conn.encoding == 'UTF8':
o['d'] = u'\xe0'
a = HstoreAdapter(o)
a.prepare(self.conn)
q = a.getquoted()
self.assert_(q.startswith(b"(("), b"('c' => NULL)")
if 'd' in o:
encc = u'\xe0'.encode(psycopg2.extensions.encodings[self.conn.encoding])
self.assertQuotedEqual(ii[3], b"('d' => '" + encc + b"')")
def test_register_globally(self):
from psycopg2.extras import register_hstore, globally=True)
conn2 = self.connect()
try:
cur2 = self.conn.cursor()
cur2.execute("select 'a => b'::hstore")
r = cur2.fetchone()
self.assert_(isinstance(r[0], dict))
finally:
conn2.close()
finally:
psycopg2.extensions.string_types.pop(oids[0][0])
# verify the caster is not around anymore
cur = self.conn.cursor()
cur.execute("select 'a => b'::hstore")
r = cur.fetchone()
self.assert_(isinstance(r[0], str))
def test_oid(self):
cur = self.conn.cursor()
cur.execute("select 'hstore'::regtype::oid")
oid = cur.fetchone()[0]
# Note: None as conn_or_cursor is just for testing: not public
# interface and it may break in future.
from psycopg2.extras import register_hstore
register_hstore(None,'a => b'::hstore")
t = cur.fetchone()
self.assert_(t[0] is None)
self.assertEqual(t[1], {'a': 'b'})
finally:
psycopg2.extensions.string_types.pop(oid)
def test_register_globally(self):
self._create_type("type_ii", 2))
finally:
# drop the registered typecasters to help the refcounting
# script to return precise values.
del psycopg2.extensions.string_types[t.typecaster.values[0]]
if t.array_typecaster:
del psycopg2.extensions.string_types[
t.array_typecaster.values[0]]
finally:
conn1.close()
conn2.close()
def test_no_conn_curs(self):
from psycopg2._json import _get_json_oids
oid,"b": null}'::json""")
data = curs.fetchone()[0]
self.assert_(isinstance(data['a'], Decimal('100.0'))
finally:
psycopg2.extensions.string_types.pop(new.values[0])
psycopg2.extensions.string_types.pop(newa.values[0])
if old:
psycopg2.extensions.register_type(old)
if olda:
psycopg2.extensions.register_type(olda)
def test_register_globally(self):
old = psycopg2.extensions.string_types.get(3802)
olda = psycopg2.extensions.string_types.get(3807)
try:
new, 'test': 1})
finally:
psycopg2.extensions.string_types.pop(new.values[0])
psycopg2.extensions.string_types.pop(newa.values[0])
if old:
psycopg2.extensions.register_type(old)
if olda:
psycopg2.extensions.register_type(olda)
def test_withhold_no_begin(self):
self._create_withhold_table()
curs = self.conn.cursor("w",
psycopg2.extensions.TRANSACTION_STATUS_IDLE)
def test_withhold_autocommit(self):
self._create_withhold_table()
self.conn.commit()
self.conn.autocommit = True
curs = self.conn.cursor("w",
psycopg2.extensions.TRANSACTION_STATUS_IDLE)
def test_unicode(self):
curs = self.conn.cursor()
curs.execute("SHOW server_encoding")
server_encoding = curs.fetchone()[0]
if server_encoding != "UTF8":
return self.skipTest(
"Unicode test skipped since server encoding is %s"
% server_encoding)
data = u"""some data with \t chars
to escape into,\u20ac euro sign and \\ a backslash too.
"""
data += u"".join(map(unichr, data)
self.assert_(not self.conn.notices)
def test_koi8(self):
self.conn.set_client_encoding('KOI8')
curs = self.conn.cursor()
if sys.version_info[0] < 3:
data = ''.join(map(chr, range(32, 127) + range(128, 256)))
else:
data = bytes(range(32, 256)).decode('koi8_r')
# as string
curs.execute("SELECT %s::text;", data)
self.assert_(not self.conn.notices)
# as unicode
if sys.version_info[0] < 3:
psycopg2.extensions.register_type(psycopg2.extensions.UNICODE, data)
self.assert_(not self.conn.notices)
def patch_conn(conn, traced_conn_cls=dbapi.TracedConnection):
""" Wrap will patch the instance so that it's queries are traced."""
# ensure we've patched extensions (this is idempotent) in
# case we're only tracing some connections.
_patch_extensions(_psycopg2_extensions)
c = traced_conn_cls(conn)
# fetch tags from the dsn
dsn = sql.parse_pg_dsn(conn.dsn)
tags = {
net.TARGET_HOST: dsn.get("host"),
net.TARGET_PORT: dsn.get("port"),
db.NAME: dsn.get("dbname"),
db.USER: dsn.get("user"),
"db.application" : dsn.get("application_name"),
}
Pin(
service="postgres",
app="postgres",
app_type="db",
tags=tags).onto(c)
return c
def test_interval_overflow(self):
cur = self.conn.cursor()
# hack a cursor to receive values too extreme to be represented
# but still I want an error, '00:00:00.100000000000000000')
def test_redshift_day(self):
# Redshift is reported returning 1 day interval as microsec (bug #558)
cur = self.conn.cursor()
psycopg2.extensions.register_type(
psycopg2.extensions.new_type(
psycopg2.STRING.values,
cur)
from datetime import timedelta
for s, v in [
('0', timedelta(0)),
('1', timedelta(microseconds=1)),
('-1', timedelta(microseconds=-1)),
('1000000', timedelta(seconds=1)),
('86400000000', timedelta(days=1)),
('-86400000000', timedelta(days=-1)),
]:
cur.execute("select %s::text", (s,))
r = cur.fetchone()[0]
self.assertEqual(r, v, "%s -> %s != %s" % (s, r, v))
# Only run the datetime tests if psycopg was compiled with support.
def test_flush_on_write(self):
# a very large query requires a flush loop to be sent to the backend
conn = self.conn
stub = self.set_stub_wait_callback(conn)
curs = conn.cursor()
for mb in 1,
# but an error is bad during regression testing.
import warnings
warnings.warn("sending a large query didn't trigger block on write.")
def crappy_callback(self, conn):
"""green callback failing after `self.to_error` time it is called"""
import select
from psycopg2.extensions import POLL_OK, POLL_READ, POLL_WRITE
while 1:
if self.to_error is not None:
self.to_error -= 1
if self.to_error <= 0:
raise ZeroDivisionError("I accidentally the connection")
try:
state = conn.poll()
if state == POLL_OK:
break
elif state == POLL_READ:
select.select([conn.fileno()], [], [])
elif state == POLL_WRITE:
select.select([], [conn.fileno()], [])
else:
raise conn.OperationalError("bad state from poll: %s" % state)
except KeyboardInterrupt:
conn.cancel()
# the loop will be broken by a server error
continue
def test_notifies_received_on_poll(self):
self.autocommit(self.conn)
self.listen('foo')
proc = self.notify('foo', 1)
t0 = time.time()
select.select([self.conn], 5)
t1 = time.time()
self.assertTrue(0.99 < t1 - t0 < 4, t1 - t0)
pid = int(proc.communicate()[0])
self.assertEqual(0, len(self.conn.notifies))
self.assertEqual(extensions.POLL_OK, self.conn.poll())
self.assertEqual(1, len(self.conn.notifies))
self.assertEqual(pid, self.conn.notifies[0][0])
self.assertEqual('foo', self.conn.notifies[0][1])
def test_many_notifies(self):
self.autocommit(self.conn)
for name in ['foo', pid)
names.pop(name) # raise if name found twice
def test_register_globally(self):
from psycopg2.extras import register_hstore, str))
def test_register_globally(self):
self._create_type("type_ii", 2))
finally:
# drop the registered typecasters to help the refcounting
# script to return precise values.
del psycopg2.extensions.string_types[t.typecaster.values[0]]
if t.array_typecaster:
del psycopg2.extensions.string_types[
t.array_typecaster.values[0]]
finally:
conn1.close()
conn2.close()
def test_no_conn_curs(self):
from psycopg2._json import _get_json_oids
oid, Decimal('100.0'))
finally:
psycopg2.extensions.string_types.pop(new.values[0])
psycopg2.extensions.string_types.pop(newa.values[0])
if old:
psycopg2.extensions.register_type(old)
if olda:
psycopg2.extensions.register_type(olda)
def test_register_globally(self):
old = psycopg2.extensions.string_types.get(3802)
olda = psycopg2.extensions.string_types.get(3807)
try:
new, 'test': 1})
finally:
psycopg2.extensions.string_types.pop(new.values[0])
psycopg2.extensions.string_types.pop(newa.values[0])
if old:
psycopg2.extensions.register_type(old)
if olda:
psycopg2.extensions.register_type(olda)
def test_withhold_no_begin(self):
self._create_withhold_table()
curs = self.conn.cursor("w",
psycopg2.extensions.TRANSACTION_STATUS_IDLE)
def test_unicode(self):
curs = self.conn.cursor()
curs.execute("SHOW server_encoding")
server_encoding = curs.fetchone()[0]
if server_encoding != "UTF8":
return self.skipTest(
"Unicode test skipped since server encoding is %s"
% server_encoding)
data = """some data with \t chars
to escape into, data)
self.assertTrue(not self.conn.notices)
def test_latin1(self):
self.conn.set_client_encoding('latin1')
curs = self.conn.cursor()
if sys.version_info[0] < 3:
data = ''.join(map(chr, 127)) + list(range(160, 256))).decode('latin1')
# as string
curs.execute("SELECT %s::text;", self.conn)
data = data.decode('latin1')
curs.execute("SELECT %s::text;", data)
self.assertTrue(not self.conn.notices)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。