如何解决访问 SQLite DB /w python 并获取格式错误的 DB
我有一些 python 代码可以跨 sftp 复制 SQLite 数据库。然而,它是一个高度活跃的数据库,所以很多时候我都遇到了格式错误的数据库。我正在考虑这些可能的选项,但我不知道如何实现它们,因为我是 Python 的新手。
- 复制 sqlite 数据库的替代方法?
- 也许有办法从设备查询 sqlite 文件?不确定这是否可行,因为 sqlite 更像是一个本地数据库,不知道如何查询它,就像我可以使用 mysql 等...
- 创建一个循环?我可以在异常中再次调用该函数,但不确定如何重试其余代码。
- 另外,格式错误的数据库问题可能会发生在我想的其他部分吗?也许我需要运行 pragma quick_check?
这就是我经常看到的......另一个问题是为什么我经常看到它?因为如果我从我的主机加载 sqlite 文件,它运行查询文件?
(venv) dulanic@mediaserver:/opt/python_scripts/rpi$ cd /opt/python_scripts/rpi ; /usr/bin/env /opt/python_scripts/rpi/venv/bin/python /home/dulanic/.vscode-server/extensions/ms-python.python-2021.2.636928669/pythonFiles/lib/python/debugpy/launcher 37599 -- /opt/python_scripts/rpi/rpdb.py
An error occurred: database disk image is malformed
这是我当前的代码:
#!/usr/bin/env python3
import psycopg2,sqlite3,sys,paramiko,os,socket,time
scpuser=os.getenv('scpuser')
scppw = os.getenv('scppw')
sqdb = os.getenv('sqdb')
sqlike = os.getenv('sqlike')
pgdb = os.getenv('pgdb')
pguser = os.getenv('pguser')
pgpswd = os.getenv('pgpswd')
pghost = os.getenv('pghost')
pgport = os.getenv('pgport')
pgschema = os.getenv('pgschema')
database = r"./pihole.db"
pihole = socket.gethostbyname('pi.hole')
tabnames=[]
tabgrab = ''
def pullsqlite():
sftp.get('/etc/pihole/pihole-FTL.db','pihole.db')
sftp.close()
# SFTP pull config
ssh_client=paramiko.SSHClient()
ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh_client.connect(hostname=pihole,username=scpuser,password=scppw)
sftp=ssh_client.open_sftp()
# Pull SQlite
pullsqlite()
# Load sqlite tables to list
consq=sqlite3.connect(sqdb)
cursq=consq.cursor()
cursq.execute(f"SELECT name FROM sqlite_master WHERE type='table' AND name in ({sqlike})" )
tabgrab = cursq.fetchall()
# postgres connection
conpg = psycopg2.connect(database=pgdb,user=pguser,password=pgpswd,host=pghost,port=pgport)
#Load data to postgres from sqlite
for item in tabgrab:
tabnames.append(item[0])
start = time.perf_counter()
for table in tabnames:
curpg = conpg.cursor()
if table=='queries':
curpg.execute(f"SELECT max(id) FROM {table};")
max_id = curpg.fetchone()[0]
cursq.execute(f"SELECT * FROM {table} where id > {max_id};")
else:
cursq.execute(f"SELECT * FROM {table};")
try:
rows=cursq.fetchall()
except sqlite3.Error as e:
print("An error occurred:",e.args[0])
colcount=len(rows[0])
pholder=('%s,'*colcount)[:-1]
try:
curpg.execute(f"SET search_path TO {pgschema};" )
curpg.executemany(f"INSERT INTO {table} VALUES ({pholder}) ON CONFLICT DO NOTHING;",rows)
conpg.commit()
print(f'Inserted {len(rows)} rows into {table}')
except psycopg2.DatabaseError as e:
print (f'Error {e}')
sys.exit(1)
if 'start' in locals():
elapsed = time.perf_counter() - start
print(f'Time {elapsed:0.4}')
consq.close()
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。