如何解决从austpg代理向worker租用asyncpg连接池
我正在使用kafka + faust
我有示例任务应用程序
:from faust import Record
import faust
import asyncio
import asyncpg
from os import environ
async def init_app(): #async method to create a pool
pool = await asyncpg.create_pool(f"postgresql://{environ.get('POSTGRES_USER')}:{environ.get('POSTGRES_PASSWORD')}@postgres:5432/{environ.get('POSTGRES_DB')}")
return pool
class Task(Record,serializer='json'):
task: str
p1: str
class kTask(Record,serializer='json'):
task_id: str
app = faust.App('task',broker='kafka://broker:29092',stream_wait_empty=False)
topic = app.topic('task',value_type=Task,key_type=kTask)
loop = asyncio.get_event_loop() # here is where I try to get on the same event loop as the agent
pool = loop.run_until_complete(init_app())
@app.agent(topic)
async def hello(tasks):
async with pool.acquire() as connection: #for every worker that has to do a task I want them to use the connection pool from the agent
async for key,task in tasks.items():
print(f'request: {key.task_id*6} task:{task.task} with {task.p1}')
async with connection.transaction():
result = await connection.fetchval('select 2 ^ 4') #testing to make sure this is being run on the DB
print(result)
if __name__ == '__main__':
app.main()
启动我的浮士德代理后,我可以看到postgres创建了一个包含10个连接的池,并且代码按预期执行。 5分钟后,所有10个连接均断开。我在想的是,我可能正在为一个工作人员创建一个池-当该工作人员被自动杀死后,该池就会被丢弃。
是否有任何简便的方法可以从代理创建连接池并将其传递给工作人员?
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。