从austpg代理向worker租用asyncpg连接池

如何解决从austpg代理向worker租用asyncpg连接池

我正在使用kafka + faust

我有示例任务应用程序

from faust import Record
import faust
import asyncio
import asyncpg
from os import environ

async def init_app(): #async method to create a pool
    pool = await asyncpg.create_pool(f"postgresql://{environ.get('POSTGRES_USER')}:{environ.get('POSTGRES_PASSWORD')}@postgres:5432/{environ.get('POSTGRES_DB')}")
    return pool

class Task(Record,serializer='json'):
    task: str
    p1: str
class kTask(Record,serializer='json'):
    task_id: str

app = faust.App('task',broker='kafka://broker:29092',stream_wait_empty=False)
topic = app.topic('task',value_type=Task,key_type=kTask)

loop = asyncio.get_event_loop() # here is where I try to get on the same event loop as the agent
pool = loop.run_until_complete(init_app())

@app.agent(topic)
async def hello(tasks):
    async with pool.acquire() as connection: #for every worker that has to do a task I want them to use the connection pool from the agent
        async for key,task in tasks.items():
            print(f'request: {key.task_id*6} task:{task.task} with {task.p1}')
            async with connection.transaction():

                result = await connection.fetchval('select 2 ^ 4') #testing to make sure this is being run on the DB
                print(result)

if __name__ == '__main__':
    app.main()

启动我的浮士德代理后,我可以看到postgres创建了一个包含10个连接的池,并且代码按预期执行。 5分钟后,所有10个连接均断开。我在想的是,我可能正在为一个工作人员创建一个池-当该工作人员被自动杀死后,该池就会被丢弃。

enter image description here

是否有任何简便的方法可以从代理创建连接池并将其传递给工作人员?

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?