如何解决使用NATS.io PUB-SUB架构的Python asyncio
Python:3.6.9
我正在通过PUB-SUB
体系结构在云中实现机器学习(ML)模型。 Python程序会侦听许多 topic 。
两个主要的主题是:train
,predict
train
主题训练一个需要一些时间的ML模型。发生这种情况时,我想通过较早的训练,对已经存在于应用程序内存中的另一个模型发出predict
请求。我正在使用asyncio
为我侦听的每个主题创建任务,然后在侦听该主题后立即执行callback
函数。
我面临的问题是进行predict
操作时无法立即train
。它总是顺序发生。
我认为这是一个multithreading
问题,甚至可能与Python的GIL
有关。
我已经使用NATS.io
(Pub-Sub架构)模拟了这种情况。这段代码是我实际代码的精简版。
import asyncio
from nats.aio.client import Client as Nats
import time
import json
nc = Nats()
is_connected = False
async def connect():
"""
Initial connection to the NATS server will make a ping every 5min(3 times) and will mark the connection stale if
it does not receive any reply(PONG) at all
"""
await nc.connect(servers=["http://0.0.0.0:4222"],ping_interval=300,max_outstanding_pings=3,token='development')
async def listen():
if is_connected is False:
await connect()
return
async def train(msg):
"""
The bot train requests come here
:return:
"""
subject = msg.subject
reply = msg.reply
data_train = json.loads(msg.data.decode())
print("Received a message on '{subject} {reply}': {data}".format(
subject=subject,reply=reply,data=data_train))
print('Performing train . . ')
time.sleep(20)
print('Train completed. . ')
async def predict(msg):
"""
The bot predict requests come here
:return:
"""
subject = msg.subject
reply = msg.reply
data_train = json.loads(msg.data.decode())
print("Received a message on '{subject} {reply}': {data}".format(
subject=subject,data=data_train))
time.sleep(1)
print('I am predict. I need to respond immediately')
async def main():
await listen()
loop.create_task(nc.subscribe('train',cb=train))
loop.create_task(nc.subscribe('predict',cb=predict))
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.run_forever()
同时在终端中:
实际输出:
Received a message on 'train ': {'data': 123}
Performing train . .
Train completed. .
Received a message on 'predict ': {'data': 'text'}
I am predict. I need to respond immediately
预期输出:
Received a message on 'train ': {'data': 123}
Performing train . .
Received a message on 'predict ': {'data': 'text'}
I am predict. I need to respond immediately
Train completed. .
我什至尝试在导入main()
模块后对multiprocessing
函数进行更改来使用多处理。但是没有运气!
async def main():
await listen()
p1 = multiprocessing.Process(target=loop.create_task,args=[await nc.subscribe('train',cb=train)])
p2 = multiprocessing.Process(target=loop.create_task,args=[await nc.subscribe('predict',cb=predict)])
p1.start()
p2.start()
print('Started processes')
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。