微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

python ThreadPoolExecutor在任务完成之前关闭连接

如何解决python ThreadPoolExecutor在任务完成之前关闭连接

我正在尝试从 Binance API 检索加密货币历史数据并将其存储到我的 postgresql 数据库中。 这是我的脚本的样子:

import config
from binance.client import Client
import psycopg2
import psycopg2.extras
from tqdm import tqdm
import concurrent.futures

client = Client(config.BINANCE_API_KEY,config.BINANCE_API_SECRET)
connection = psycopg2.connect(
    host=config.DB_HOST,database=config.DB_NAME,user=config.DB_USER,password=config.DB_PASSWORD)
cursor = connection.cursor(cursor_factory=psycopg2.extras.DictCursor)


cursor.execute(
    "SELECT concat(symbol,quote_asset) as ticker FROM currency")
tickers = cursor.fetchall()

coin_data = []
for ticker in tickers:
    coin_data.append(ticker)


def unixtimetodatetime(unixtime):
    return unixtime.format("'YYYY-MM-DD HH:mm:ss'")


def retrieve_klines(ticker_symbol):
    minute_prices = []
    for ticker in tqdm(coin_data):
        klines = client.get_historical_klines(
            f"{coin_data.get(ticker)}",Client.KLINE_INTERVAL_1MINUTE,"4 May 2018","21 May,2021")
        for coin in klines:
            coin[0] = unixtimetodatetime(coin[0])
            coin[6] = ticker[1]
            minute_prices.append(coin[0:7])
        for row in minute_prices:
            
            sql_insert = f"""INSERT INTO currency_price (dt,open,high,low,close,volume,currency_id) VALUES ({row[0]},{row[1]},{row[2]},{row[3]},{row[4]},{row[5]},(SELECT id FROM currency WHERE symbol = '{row[6]}')) ON CONFLICT (dt,currency_id) DO nothing"""
            cursor.execute(sql_insert)

            connection.commit()


with concurrent.futures.ThreadPoolExecutor() as executor:
    executor.map(retrieve_klines,coin_data)

现在的问题是,当使用 ThreadPoolExecutor 时,脚本会在所有线程启动后立即完成,而没有实际检索任何数据。 我已经阅读了 concurrent.futures 文档,它指出,如果我将上下文管理器与执行程序一起使用,则不需要指明“等待”参数。 在数据被检索并存储到我的数据库之前,保持程序运行的解决方案是什么?

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?