Python psycopg2 模块，TimestampFromTicks() 实例源码
我们从 Python 开源项目中提取了以下 35 个代码示例，用于说明如何使用 psycopg2.TimestampFromTicks()。
def test_date_time_allocation_bug(self):
    """Build and release date/time objects from fields and from ticks.

    There are no assertions: the test passes when every constructor call
    and the final ``del`` complete without raising (presumably a
    regression check for an allocation bug, going by the test name).
    """
    d1 = psycopg2.Date(2002, 12, 25)
    # time.mktime() requires a full 9-field time tuple.
    d2 = psycopg2.DateFromTicks(
        time.mktime((2002, 12, 25, 0, 0, 0, 0, 0, 0)))
    t1 = psycopg2.Time(13, 45, 30)
    t2 = psycopg2.TimeFromTicks(
        time.mktime((2001, 1, 1, 13, 45, 30, 0, 0, 0)))
    # Rebinding t1/t2 drops the Time objects created above.
    t1 = psycopg2.Timestamp(2002, 12, 25, 13, 45, 30)
    t2 = psycopg2.TimestampFromTicks(
        time.mktime((2002, 12, 25, 13, 45, 30, 0, 0, 0)))
    del d1, d2, t1, t2
def test_timestamp_value_error_sec_59_99(self):
    """Check TimestampFromTicks() on a tick value ending in 59.99992 s.

    The adapted value must equal 2010-05-06 14:11:59.999920 with a
    ``FixedOffsetTimezone(-5 * 60)`` tzinfo.
    """
    from datetime import datetime

    adapter = psycopg2.TimestampFromTicks(1273173119.99992)
    actual = adapter.adapted
    expected = datetime(
        2010, 5, 6, 14, 11, 59, 999920,
        tzinfo=FixedOffsetTimezone(-5 * 60),
    )
    self.assertEqual(actual, expected)
def test_date_time_allocation_bug(self):
    """Build and release date/time objects from fields and from ticks.

    There are no assertions: the test passes when every constructor call
    and the final ``del`` complete without raising (presumably a
    regression check for an allocation bug, going by the test name).
    """
    # NOTE(review): the original text was truncated mid-call; the statement
    # list is reconstructed from the psycopg2 test suite -- confirm.
    d1 = psycopg2.Date(2002, 12, 25)
    # time.mktime() requires a full 9-field time tuple.
    d2 = psycopg2.DateFromTicks(
        time.mktime((2002, 12, 25, 0, 0, 0, 0, 0, 0)))
    t1 = psycopg2.Time(13, 45, 30)
    t2 = psycopg2.TimeFromTicks(
        time.mktime((2001, 1, 1, 13, 45, 30, 0, 0, 0)))
    # Rebinding t1/t2 drops the Time objects created above.
    t1 = psycopg2.Timestamp(2002, 12, 25, 13, 45, 30)
    t2 = psycopg2.TimestampFromTicks(
        time.mktime((2002, 12, 25, 13, 45, 30, 0, 0, 0)))
    del d1, d2, t1, t2
def test_timestamp_value_error_sec_59_99(self):
    """Check TimestampFromTicks() on a tick value ending in 59.99992 s.

    The adapted value must equal 2010-05-06 14:11:59.999920 with a
    ``FixedOffsetTimezone(-5 * 60)`` tzinfo.
    """
    from datetime import datetime
    s = psycopg2.TimestampFromTicks(1273173119.99992)
    # The expected datetime is the second positional argument of
    # assertEqual; tzinfo belongs to datetime(), not to assertEqual().
    self.assertEqual(s.adapted,
                     datetime(2010, 5, 6, 14, 11, 59, 999920,
                              tzinfo=FixedOffsetTimezone(-5 * 60)))
def get_records(self, start_timestamp=0, end_timestamp=None):
    """Gets records from the table between the specified timestamps.

    Timestamps are given in seconds since epoch.

    Args:
      start_timestamp: inclusive lower bound on record_timestamp
      end_timestamp: inclusive upper bound on record_timestamp; any falsy
                     value (None or 0) is replaced by the current time

    Returns a list of dicts, each of the form: {
      'id': 3,
      'record_timestamp': 1341556432,
      'imsi': 'imsI901550000000084',
      'ipaddr': '192.168.99.3',
      'uploaded_bytes': 5567,
      'downloaded_bytes': 9987,
      'uploaded_bytes_delta': 74,
      'downloaded_bytes_delta': 139
    }
    """
    start = psycopg2.TimestampFromTicks(start_timestamp)
    if not end_timestamp:
        end_timestamp = time.time()
    end = psycopg2.TimestampFromTicks(end_timestamp)
    # Only the table name is spliced into the SQL text (identifiers cannot
    # be bound); the timestamp values are passed as query parameters so the
    # driver does the quoting.
    query = ('select * from ' + self.table_name +
             ' where record_timestamp >= %s'
             ' and record_timestamp <= %s')
    with self.connection.cursor(
            cursor_factory=psycopg2.extras.RealDictCursor) as cursor:
        cursor.execute(query, (start, end))
        self.connection.commit()
        return cursor.fetchall()
def delete_records(self, timestamp):
    """Deletes records older than the given epoch timestamp.

    Args:
      timestamp: seconds since epoch; rows whose record_timestamp is
                 strictly earlier are deleted
    """
    cutoff = psycopg2.TimestampFromTicks(timestamp)
    # The table name is spliced into the SQL text (identifiers cannot be
    # bound); the cutoff is passed as a query parameter.
    query = ('delete from ' + self.table_name +
             ' where record_timestamp < %s')
    with self.connection.cursor() as cursor:
        cursor.execute(query, (cutoff,))
        self.connection.commit()
def setUp(self):
    """Create a GPRSDB handle and insert sample rows into gprs_records.

    The rows are written over a separate psycopg2 connection to the
    'endaga' database on localhost, using PG_USER / PG_PASSWORD.
    """
    # Connect to the GPRSDB.
    self.gprs_db = gprs_database.GPRSDB()
    # Add some records to the GPRSDB.
    self.Now = time.time()
    # NOTE(review): the tuples below have 6, 4, 5 and 5 elements, while the
    # schema names seven columns and the format string further down has
    # four placeholders. This looks like extraction damage -- confirm the
    # record values against the original project before relying on them.
    records = [
        (psycopg2.TimestampFromTicks(self.Now - 60), 'imsI901550000000084',
         '192.168.99.1', 100, 200, 200),
        (psycopg2.TimestampFromTicks(self.Now - 30), 300, 500, 300),
        (psycopg2.TimestampFromTicks(self.Now - 10), 700, 600, 400, 100),
        (psycopg2.TimestampFromTicks(self.Now - 5), 750, 625, 50, 25),
    ]
    schema = ('record_timestamp,imsi,ipaddr,uploaded_bytes,'
              ' downloaded_bytes,uploaded_bytes_delta,'
              ' downloaded_bytes_delta')
    connection = psycopg2.connect(host='localhost', database='endaga',
                                  user=PG_USER, password=PG_PASSWORD)
    with connection.cursor() as cursor:
        for record in records:
            # Each record is rendered straight into the SQL text; str() of
            # the TimestampFromTicks object supplies the timestamp literal.
            values = "%s,'%s',%s,%s" % record
            command = 'insert into gprs_records (%s) values(%s)' % (
                schema, values)
            cursor.execute(command)
        connection.commit()
def test_date_time_allocation_bug(self):
    """Build and release date/time objects from fields and from ticks.

    There are no assertions: the test passes when every constructor call
    and the final ``del`` complete without raising (presumably a
    regression check for an allocation bug, going by the test name).
    """
    # NOTE(review): the original text was truncated mid-call; the statement
    # list is reconstructed from the psycopg2 test suite -- confirm.
    d1 = psycopg2.Date(2002, 12, 25)
    # time.mktime() requires a full 9-field time tuple.
    d2 = psycopg2.DateFromTicks(
        time.mktime((2002, 12, 25, 0, 0, 0, 0, 0, 0)))
    t1 = psycopg2.Time(13, 45, 30)
    t2 = psycopg2.TimeFromTicks(
        time.mktime((2001, 1, 1, 13, 45, 30, 0, 0, 0)))
    # Rebinding t1/t2 drops the Time objects created above.
    t1 = psycopg2.Timestamp(2002, 12, 25, 13, 45, 30)
    t2 = psycopg2.TimestampFromTicks(
        time.mktime((2002, 12, 25, 13, 45, 30, 0, 0, 0)))
    del d1, d2, t1, t2
def test_timestamp_value_error_sec_59_99(self):
    """Check TimestampFromTicks() on a tick value ending in 59.99992 s.

    The adapted value must equal 2010-05-06 14:11:59.999920 with a
    ``FixedOffsetTimezone(-5 * 60)`` tzinfo.
    """
    from datetime import datetime
    s = psycopg2.TimestampFromTicks(1273173119.99992)
    # The expected datetime is the second positional argument of
    # assertEqual; tzinfo belongs to datetime(), not to assertEqual().
    self.assertEqual(s.adapted,
                     datetime(2010, 5, 6, 14, 11, 59, 999920,
                              tzinfo=FixedOffsetTimezone(-5 * 60)))
def test_date_time_allocation_bug(self):
    """Build date/time objects from fields and from ticks.

    There are no assertions: the test passes when every constructor call
    completes without raising (presumably a regression check for an
    allocation bug, going by the test name). The objects are released
    when the method returns.
    """
    d1 = psycopg2.Date(2002, 12, 25)
    # time.mktime() requires a full 9-field time tuple.
    d2 = psycopg2.DateFromTicks(
        time.mktime((2002, 12, 25, 0, 0, 0, 0, 0, 0)))
    t1 = psycopg2.Time(13, 45, 30)
    t2 = psycopg2.TimeFromTicks(
        time.mktime((2001, 1, 1, 13, 45, 30, 0, 0, 0)))
    # Rebinding t1/t2 drops the Time objects created above.
    t1 = psycopg2.Timestamp(2002, 12, 25, 13, 45, 30)
    t2 = psycopg2.TimestampFromTicks(
        time.mktime((2002, 12, 25, 13, 45, 30, 0, 0, 0)))
def test_timestamp_value_error_sec_59_99(self):
    """Check TimestampFromTicks() on a tick value ending in 59.99992 s.

    The adapted value must equal 2010-05-06 14:11:59.999920 with a
    ``FixedOffsetTimezone(-5 * 60)`` tzinfo.
    """
    from datetime import datetime
    s = psycopg2.TimestampFromTicks(1273173119.99992)
    # The expected datetime is the second positional argument of
    # assertEqual; tzinfo belongs to datetime(), not to assertEqual().
    self.assertEqual(s.adapted,
                     datetime(2010, 5, 6, 14, 11, 59, 999920,
                              tzinfo=FixedOffsetTimezone(-5 * 60)))
def test_date_time_allocation_bug(self):
    """Build date/time objects from fields and from ticks.

    There are no assertions: the test passes when every constructor call
    completes without raising (presumably a regression check for an
    allocation bug, going by the test name). The objects are released
    when the method returns.
    """
    # NOTE(review): the original text was truncated mid-call; the statement
    # list is reconstructed from the psycopg2 test suite -- confirm.
    d1 = psycopg2.Date(2002, 12, 25)
    # time.mktime() requires a full 9-field time tuple.
    d2 = psycopg2.DateFromTicks(
        time.mktime((2002, 12, 25, 0, 0, 0, 0, 0, 0)))
    t1 = psycopg2.Time(13, 45, 30)
    t2 = psycopg2.TimeFromTicks(
        time.mktime((2001, 1, 1, 13, 45, 30, 0, 0, 0)))
    # Rebinding t1/t2 drops the Time objects created above.
    t1 = psycopg2.Timestamp(2002, 12, 25, 13, 45, 30)
    t2 = psycopg2.TimestampFromTicks(
        time.mktime((2002, 12, 25, 13, 45, 30, 0, 0, 0)))
def test_timestamp_value_error_sec_59_99(self):
    """Check TimestampFromTicks() on a tick value ending in 59.99992 s.

    The adapted value must equal 2010-05-06 14:11:59.999920 with a
    ``FixedOffsetTimezone(-5 * 60)`` tzinfo.
    """
    from datetime import datetime
    s = psycopg2.TimestampFromTicks(1273173119.99992)
    # The expected datetime is the second positional argument of
    # assertEqual; tzinfo belongs to datetime(), not to assertEqual().
    self.assertEqual(s.adapted,
                     datetime(2010, 5, 6, 14, 11, 59, 999920,
                              tzinfo=FixedOffsetTimezone(-5 * 60)))
def test_date_time_allocation_bug(self):
    """Build date/time objects from fields and from ticks.

    There are no assertions: the test passes when every constructor call
    completes without raising (presumably a regression check for an
    allocation bug, going by the test name). The objects are released
    when the method returns.
    """
    # NOTE(review): the original text was truncated mid-call; the statement
    # list is reconstructed from the psycopg2 test suite -- confirm.
    d1 = psycopg2.Date(2002, 12, 25)
    # time.mktime() requires a full 9-field time tuple.
    d2 = psycopg2.DateFromTicks(
        time.mktime((2002, 12, 25, 0, 0, 0, 0, 0, 0)))
    t1 = psycopg2.Time(13, 45, 30)
    t2 = psycopg2.TimeFromTicks(
        time.mktime((2001, 1, 1, 13, 45, 30, 0, 0, 0)))
    # Rebinding t1/t2 drops the Time objects created above.
    t1 = psycopg2.Timestamp(2002, 12, 25, 13, 45, 30)
    t2 = psycopg2.TimestampFromTicks(
        time.mktime((2002, 12, 25, 13, 45, 30, 0, 0, 0)))
def test_date_time_allocation_bug(self):
    """Build date/time objects from fields and from ticks.

    There are no assertions: the test passes when every constructor call
    completes without raising (presumably a regression check for an
    allocation bug, going by the test name). The objects are released
    when the method returns.
    """
    # NOTE(review): the original text was truncated mid-call; the statement
    # list is reconstructed from the psycopg2 test suite -- confirm.
    d1 = psycopg2.Date(2002, 12, 25)
    # time.mktime() requires a full 9-field time tuple.
    d2 = psycopg2.DateFromTicks(
        time.mktime((2002, 12, 25, 0, 0, 0, 0, 0, 0)))
    t1 = psycopg2.Time(13, 45, 30)
    t2 = psycopg2.TimeFromTicks(
        time.mktime((2001, 1, 1, 13, 45, 30, 0, 0, 0)))
    # Rebinding t1/t2 drops the Time objects created above.
    t1 = psycopg2.Timestamp(2002, 12, 25, 13, 45, 30)
    t2 = psycopg2.TimestampFromTicks(
        time.mktime((2002, 12, 25, 13, 45, 30, 0, 0, 0)))
def test_timestamp_value_error_sec_59_99(self):
    """Check TimestampFromTicks() on a tick value ending in 59.99992 s.

    The adapted value must equal 2010-05-06 14:11:59.999920 with a
    ``FixedOffsetTimezone(-5 * 60)`` tzinfo.
    """
    from datetime import datetime
    s = psycopg2.TimestampFromTicks(1273173119.99992)
    # The expected datetime is the second positional argument of
    # assertEqual; tzinfo belongs to datetime(), not to assertEqual().
    self.assertEqual(s.adapted,
                     datetime(2010, 5, 6, 14, 11, 59, 999920,
                              tzinfo=FixedOffsetTimezone(-5 * 60)))
def test_date_time_allocation_bug(self):
    """Build date/time objects from fields and from ticks.

    There are no assertions: the test passes when every constructor call
    completes without raising (presumably a regression check for an
    allocation bug, going by the test name). The objects are released
    when the method returns.
    """
    # NOTE(review): the original text was truncated mid-call; the statement
    # list is reconstructed from the psycopg2 test suite -- confirm.
    d1 = psycopg2.Date(2002, 12, 25)
    # time.mktime() requires a full 9-field time tuple.
    d2 = psycopg2.DateFromTicks(
        time.mktime((2002, 12, 25, 0, 0, 0, 0, 0, 0)))
    t1 = psycopg2.Time(13, 45, 30)
    t2 = psycopg2.TimeFromTicks(
        time.mktime((2001, 1, 1, 13, 45, 30, 0, 0, 0)))
    # Rebinding t1/t2 drops the Time objects created above.
    t1 = psycopg2.Timestamp(2002, 12, 25, 13, 45, 30)
    t2 = psycopg2.TimestampFromTicks(
        time.mktime((2002, 12, 25, 13, 45, 30, 0, 0, 0)))
def test_timestamp_value_error_sec_59_99(self):
    """Check TimestampFromTicks() on a tick value ending in 59.99992 s.

    The adapted value must equal 2010-05-06 14:11:59.999920 with a
    ``FixedOffsetTimezone(-5 * 60)`` tzinfo.
    """
    from datetime import datetime
    s = psycopg2.TimestampFromTicks(1273173119.99992)
    # The expected datetime is the second positional argument of
    # assertEqual; tzinfo belongs to datetime(), not to assertEqual().
    self.assertEqual(s.adapted,
                     datetime(2010, 5, 6, 14, 11, 59, 999920,
                              tzinfo=FixedOffsetTimezone(-5 * 60)))
def test_date_time_allocation_bug(self):
    """Build date/time objects from fields and from ticks.

    There are no assertions: the test passes when every constructor call
    completes without raising (presumably a regression check for an
    allocation bug, going by the test name). The objects are released
    when the method returns.
    """
    # NOTE(review): the original text was truncated mid-call; the statement
    # list is reconstructed from the psycopg2 test suite -- confirm.
    d1 = psycopg2.Date(2002, 12, 25)
    # time.mktime() requires a full 9-field time tuple.
    d2 = psycopg2.DateFromTicks(
        time.mktime((2002, 12, 25, 0, 0, 0, 0, 0, 0)))
    t1 = psycopg2.Time(13, 45, 30)
    t2 = psycopg2.TimeFromTicks(
        time.mktime((2001, 1, 1, 13, 45, 30, 0, 0, 0)))
    # Rebinding t1/t2 drops the Time objects created above.
    t1 = psycopg2.Timestamp(2002, 12, 25, 13, 45, 30)
    t2 = psycopg2.TimestampFromTicks(
        time.mktime((2002, 12, 25, 13, 45, 30, 0, 0, 0)))
def test_date_time_allocation_bug(self):
    """Build date/time objects from fields and from ticks.

    There are no assertions: the test passes when every constructor call
    completes without raising (presumably a regression check for an
    allocation bug, going by the test name). The objects are released
    when the method returns.
    """
    # NOTE(review): the original text was truncated mid-call; the statement
    # list is reconstructed from the psycopg2 test suite -- confirm.
    d1 = psycopg2.Date(2002, 12, 25)
    # time.mktime() requires a full 9-field time tuple.
    d2 = psycopg2.DateFromTicks(
        time.mktime((2002, 12, 25, 0, 0, 0, 0, 0, 0)))
    t1 = psycopg2.Time(13, 45, 30)
    t2 = psycopg2.TimeFromTicks(
        time.mktime((2001, 1, 1, 13, 45, 30, 0, 0, 0)))
    # Rebinding t1/t2 drops the Time objects created above.
    t1 = psycopg2.Timestamp(2002, 12, 25, 13, 45, 30)
    t2 = psycopg2.TimestampFromTicks(
        time.mktime((2002, 12, 25, 13, 45, 30, 0, 0, 0)))
def test_timestamp_value_error_sec_59_99(self):
    """Check TimestampFromTicks() on a tick value ending in 59.99992 s.

    The adapted value must equal 2010-05-06 14:11:59.999920 with a
    ``FixedOffsetTimezone(-5 * 60)`` tzinfo.
    """
    from datetime import datetime
    s = psycopg2.TimestampFromTicks(1273173119.99992)
    # The expected datetime is the second positional argument of
    # assertEqual; tzinfo belongs to datetime(), not to assertEqual().
    self.assertEqual(s.adapted,
                     datetime(2010, 5, 6, 14, 11, 59, 999920,
                              tzinfo=FixedOffsetTimezone(-5 * 60)))
def test_date_time_allocation_bug(self):
    """Build date/time objects from fields and from ticks.

    There are no assertions: the test passes when every constructor call
    completes without raising (presumably a regression check for an
    allocation bug, going by the test name). The objects are released
    when the method returns.
    """
    # NOTE(review): the original text was truncated mid-call; the statement
    # list is reconstructed from the psycopg2 test suite -- confirm.
    d1 = psycopg2.Date(2002, 12, 25)
    # time.mktime() requires a full 9-field time tuple.
    d2 = psycopg2.DateFromTicks(
        time.mktime((2002, 12, 25, 0, 0, 0, 0, 0, 0)))
    t1 = psycopg2.Time(13, 45, 30)
    t2 = psycopg2.TimeFromTicks(
        time.mktime((2001, 1, 1, 13, 45, 30, 0, 0, 0)))
    # Rebinding t1/t2 drops the Time objects created above.
    t1 = psycopg2.Timestamp(2002, 12, 25, 13, 45, 30)
    t2 = psycopg2.TimestampFromTicks(
        time.mktime((2002, 12, 25, 13, 45, 30, 0, 0, 0)))
def test_timestamp_value_error_sec_59_99(self):
    """Check TimestampFromTicks() on a tick value ending in 59.99992 s.

    The adapted value must equal 2010-05-06 14:11:59.999920 with a
    ``FixedOffsetTimezone(-5 * 60)`` tzinfo.
    """
    from datetime import datetime
    s = psycopg2.TimestampFromTicks(1273173119.99992)
    # The expected datetime is the second positional argument of
    # assertEqual; tzinfo belongs to datetime(), not to assertEqual().
    self.assertEqual(s.adapted,
                     datetime(2010, 5, 6, 14, 11, 59, 999920,
                              tzinfo=FixedOffsetTimezone(-5 * 60)))
def test_date_time_allocation_bug(self):
    """Build date/time objects from fields and from ticks.

    There are no assertions: the test passes when every constructor call
    completes without raising (presumably a regression check for an
    allocation bug, going by the test name). The objects are released
    when the method returns.
    """
    # NOTE(review): the original text was truncated mid-call; the statement
    # list is reconstructed from the psycopg2 test suite -- confirm.
    d1 = psycopg2.Date(2002, 12, 25)
    # time.mktime() requires a full 9-field time tuple.
    d2 = psycopg2.DateFromTicks(
        time.mktime((2002, 12, 25, 0, 0, 0, 0, 0, 0)))
    t1 = psycopg2.Time(13, 45, 30)
    t2 = psycopg2.TimeFromTicks(
        time.mktime((2001, 1, 1, 13, 45, 30, 0, 0, 0)))
    # Rebinding t1/t2 drops the Time objects created above.
    t1 = psycopg2.Timestamp(2002, 12, 25, 13, 45, 30)
    t2 = psycopg2.TimestampFromTicks(
        time.mktime((2002, 12, 25, 13, 45, 30, 0, 0, 0)))
def test_date_time_allocation_bug(self):
    """Build date/time objects from fields and from ticks.

    There are no assertions: the test passes when every constructor call
    completes without raising (presumably a regression check for an
    allocation bug, going by the test name). The objects are released
    when the method returns.
    """
    # NOTE(review): the original text was truncated mid-call; the statement
    # list is reconstructed from the psycopg2 test suite -- confirm.
    d1 = psycopg2.Date(2002, 12, 25)
    # time.mktime() requires a full 9-field time tuple.
    d2 = psycopg2.DateFromTicks(
        time.mktime((2002, 12, 25, 0, 0, 0, 0, 0, 0)))
    t1 = psycopg2.Time(13, 45, 30)
    t2 = psycopg2.TimeFromTicks(
        time.mktime((2001, 1, 1, 13, 45, 30, 0, 0, 0)))
    # Rebinding t1/t2 drops the Time objects created above.
    t1 = psycopg2.Timestamp(2002, 12, 25, 13, 45, 30)
    t2 = psycopg2.TimestampFromTicks(
        time.mktime((2002, 12, 25, 13, 45, 30, 0, 0, 0)))
def test_timestamp_value_error_sec_59_99(self):
    """Check TimestampFromTicks() on a tick value ending in 59.99992 s.

    The adapted value must equal 2010-05-06 14:11:59.999920 with a
    ``FixedOffsetTimezone(-5 * 60)`` tzinfo.
    """
    from datetime import datetime
    s = psycopg2.TimestampFromTicks(1273173119.99992)
    # The expected datetime is the second positional argument of
    # assertEqual; tzinfo belongs to datetime(), not to assertEqual().
    self.assertEqual(s.adapted,
                     datetime(2010, 5, 6, 14, 11, 59, 999920,
                              tzinfo=FixedOffsetTimezone(-5 * 60)))
def test_date_time_allocation_bug(self):
    """Build and release date/time objects from fields and from ticks.

    There are no assertions: the test passes when every constructor call
    and the final ``del`` complete without raising (presumably a
    regression check for an allocation bug, going by the test name).
    """
    # NOTE(review): the original text was truncated mid-call; the statement
    # list is reconstructed from the psycopg2 test suite -- confirm.
    d1 = psycopg2.Date(2002, 12, 25)
    # time.mktime() requires a full 9-field time tuple.
    d2 = psycopg2.DateFromTicks(
        time.mktime((2002, 12, 25, 0, 0, 0, 0, 0, 0)))
    t1 = psycopg2.Time(13, 45, 30)
    t2 = psycopg2.TimeFromTicks(
        time.mktime((2001, 1, 1, 13, 45, 30, 0, 0, 0)))
    # Rebinding t1/t2 drops the Time objects created above.
    t1 = psycopg2.Timestamp(2002, 12, 25, 13, 45, 30)
    t2 = psycopg2.TimestampFromTicks(
        time.mktime((2002, 12, 25, 13, 45, 30, 0, 0, 0)))
    del d1, d2, t1, t2
def test_timestamp_value_error_sec_59_99(self):
    """Check TimestampFromTicks() on a tick value ending in 59.99992 s.

    The adapted value must equal 2010-05-06 14:11:59.999920 with a
    ``FixedOffsetTimezone(-5 * 60)`` tzinfo.
    """
    from datetime import datetime
    s = psycopg2.TimestampFromTicks(1273173119.99992)
    # The expected datetime is the second positional argument of
    # assertEqual; tzinfo belongs to datetime(), not to assertEqual().
    self.assertEqual(s.adapted,
                     datetime(2010, 5, 6, 14, 11, 59, 999920,
                              tzinfo=FixedOffsetTimezone(-5 * 60)))
def test_date_time_allocation_bug(self):
    """Build date/time objects from fields and from ticks.

    There are no assertions: the test passes when every constructor call
    completes without raising (presumably a regression check for an
    allocation bug, going by the test name). The objects are released
    when the method returns.
    """
    # NOTE(review): the original text was truncated mid-call; the statement
    # list is reconstructed from the psycopg2 test suite -- confirm.
    d1 = psycopg2.Date(2002, 12, 25)
    # time.mktime() requires a full 9-field time tuple.
    d2 = psycopg2.DateFromTicks(
        time.mktime((2002, 12, 25, 0, 0, 0, 0, 0, 0)))
    t1 = psycopg2.Time(13, 45, 30)
    t2 = psycopg2.TimeFromTicks(
        time.mktime((2001, 1, 1, 13, 45, 30, 0, 0, 0)))
    # Rebinding t1/t2 drops the Time objects created above.
    t1 = psycopg2.Timestamp(2002, 12, 25, 13, 45, 30)
    t2 = psycopg2.TimestampFromTicks(
        time.mktime((2002, 12, 25, 13, 45, 30, 0, 0, 0)))
def test_timestamp_value_error_sec_59_99(self):
    """Check TimestampFromTicks() on a tick value ending in 59.99992 s.

    The adapted value must equal 2010-05-06 14:11:59.999920 with a
    ``FixedOffsetTimezone(-5 * 60)`` tzinfo.
    """
    from datetime import datetime
    s = psycopg2.TimestampFromTicks(1273173119.99992)
    # The expected datetime is the second positional argument of
    # assertEqual; tzinfo belongs to datetime(), not to assertEqual().
    self.assertEqual(s.adapted,
                     datetime(2010, 5, 6, 14, 11, 59, 999920,
                              tzinfo=FixedOffsetTimezone(-5 * 60)))
def test_date_time_allocation_bug(self):
    """Build date/time objects from fields and from ticks.

    There are no assertions: the test passes when every constructor call
    completes without raising (presumably a regression check for an
    allocation bug, going by the test name). The objects are released
    when the method returns.
    """
    # NOTE(review): the original text was truncated mid-call; the statement
    # list is reconstructed from the psycopg2 test suite -- confirm.
    d1 = psycopg2.Date(2002, 12, 25)
    # time.mktime() requires a full 9-field time tuple.
    d2 = psycopg2.DateFromTicks(
        time.mktime((2002, 12, 25, 0, 0, 0, 0, 0, 0)))
    t1 = psycopg2.Time(13, 45, 30)
    t2 = psycopg2.TimeFromTicks(
        time.mktime((2001, 1, 1, 13, 45, 30, 0, 0, 0)))
    # Rebinding t1/t2 drops the Time objects created above.
    t1 = psycopg2.Timestamp(2002, 12, 25, 13, 45, 30)
    t2 = psycopg2.TimestampFromTicks(
        time.mktime((2002, 12, 25, 13, 45, 30, 0, 0, 0)))
def test_timestamp_value_error_sec_59_99(self):
    """Check TimestampFromTicks() on a tick value ending in 59.99992 s.

    The adapted value must equal 2010-05-06 14:11:59.999920 with a
    ``FixedOffsetTimezone(-5 * 60)`` tzinfo.
    """
    from datetime import datetime
    s = psycopg2.TimestampFromTicks(1273173119.99992)
    # The expected datetime is the second positional argument of
    # assertEqual; tzinfo belongs to datetime(), not to assertEqual().
    self.assertEqual(s.adapted,
                     datetime(2010, 5, 6, 14, 11, 59, 999920,
                              tzinfo=FixedOffsetTimezone(-5 * 60)))
def test_date_time_allocation_bug(self):
    """Build and release date/time objects from fields and from ticks.

    There are no assertions: the test passes when every constructor call
    and the final ``del`` complete without raising (presumably a
    regression check for an allocation bug, going by the test name).
    """
    # NOTE(review): the original text was truncated mid-call; the statement
    # list is reconstructed from the psycopg2 test suite -- confirm.
    d1 = psycopg2.Date(2002, 12, 25)
    # time.mktime() requires a full 9-field time tuple.
    d2 = psycopg2.DateFromTicks(
        time.mktime((2002, 12, 25, 0, 0, 0, 0, 0, 0)))
    t1 = psycopg2.Time(13, 45, 30)
    t2 = psycopg2.TimeFromTicks(
        time.mktime((2001, 1, 1, 13, 45, 30, 0, 0, 0)))
    # Rebinding t1/t2 drops the Time objects created above.
    t1 = psycopg2.Timestamp(2002, 12, 25, 13, 45, 30)
    t2 = psycopg2.TimestampFromTicks(
        time.mktime((2002, 12, 25, 13, 45, 30, 0, 0, 0)))
    del d1, d2, t1, t2
def test_timestamp_value_error_sec_59_99(self):
    """Check TimestampFromTicks() on a tick value ending in 59.99992 s.

    The adapted value must equal 2010-05-06 14:11:59.999920 with a
    ``FixedOffsetTimezone(-5 * 60)`` tzinfo.
    """
    from datetime import datetime
    s = psycopg2.TimestampFromTicks(1273173119.99992)
    # The expected datetime is the second positional argument of
    # assertEqual; tzinfo belongs to datetime(), not to assertEqual().
    self.assertEqual(s.adapted,
                     datetime(2010, 5, 6, 14, 11, 59, 999920,
                              tzinfo=FixedOffsetTimezone(-5 * 60)))
def setUpClass(cls):
    # Class-level fixture: swaps utilities.subscriber for a mock, creates two
    # subscribers, opens the GPRSDB and EventStore, and starts building GPRS
    # records relative to cls.Now.
    # NOTE(review): this block is incomplete as shown -- the `records` list
    # is never closed, two tuples are left open, and `values`, `cursor`,
    # `command` and `connection` are not defined anywhere in view. It looks
    # like extraction damage; restore from the original project before use.
    # NOTE(review): no @classmethod decorator is visible above this def --
    # confirm whether one was lost.
    # Monkeypatch Subscriber so sub balance lookups succeed.
    cls.original_subscriber = utilities.subscriber
    cls.mock_subscriber = mocks.MockSubscriber()
    utilities.subscriber = cls.mock_subscriber
    subscriber.create_subscriber('imsI901550000000084', '5551234')
    subscriber.create_subscriber('imsI901550000000082', '5551235')
    # Connect to the GPRSDB and EventStore.
    cls.gprs_db = gprs_database.GPRSDB()
    cls.event_store = events.EventStore()
    # Add some records to the GPRSDB. The method we're testing should
    # extract these records and create events in the EventStore.
    cls.Now = time.time()
    records = [
        (psycopg2.TimestampFromTicks(cls.Now - 120), 80, 80),
        (psycopg2.TimestampFromTicks(cls.Now - 60), 0),
        (psycopg2.TimestampFromTicks(cls.Now - 30), 250, 420),
        (psycopg2.TimestampFromTicks(cls.Now - 10),
        (psycopg2.TimestampFromTicks(cls.Now - 5),
        # Create events for a different imsI.
        (psycopg2.TimestampFromTicks(cls.Now - 60), 'imsI901550000000082',
         '192.168.99.2', 350, 220), 450, 325, values)
    cursor.execute(command)
    connection.commit()
版权声明：本文内容由互联网用户自发贡献，该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务，不拥有所有权，不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容，请发送邮件至 dio@foxmail.com 举报，一经查实，本站将立刻删除。