如何解决如何处理传入的数据以便以后通过键合并?
我编写此代码是为了使用 asyncio 制作非阻塞管理器以及管道操作,我主要关注的是捕获接收到的项目生产者,以及接收操作何时完成。如果键匹配,我想一起返回并合并,但是,我怀疑我应该在哪里加入最终生产者或消费者的数据,因为当前工作流程如下
-
抓取所有数据库(多个客户端)(模拟)
-
推送到管理器(代理级服务器),多个客户端将其数据发送到管理器
-
根据传入的数据将多个数据源合并到一个列表中,没有数据库操作,例如
{"ID-2002-0201": {"id":"ID-2002-0201","updated_at":"2018-05-14T22:25:51Z","html_url":"xxxxxxxxxxxx"}}
> 可能是生产者 -
使用 get_or_create (如果没有包含该数据的记录,则检查数据库,否则创建它)> 消费者
-
创建大量数据(当数据源从 2 增加到 100+ 时,可以将数据分成更小的块以进行扩展)> 消费者
server.py
# #!/usr/bin/env python3
import asyncio
import logging
import random
from pipeline_async import Pipeline
class A:
def __init__(self):
pass
def run(self):
return {"ID-2002-0201":{"id":"ID-2002-0201","html_url":"xxxxxxxxxxxx"}}
class B:
def __init__(self):
pass
def run(self):
return {"ID-2002-0202":{"id":"ID-2002-0202","html_url":"xxxxxxxxxxxx"}}
class Manager:
async def producer(self,pipeline,data_sources):
"""Pretend we're getting a number from the network."""
for data_stream in data_sources:
await pipeline.set_message(data_stream.run(),"Producer")
logging.info("Producer got message: %s",data_stream)
async def consumer(self,pipeline):
""" Pretend we're saving a number in the database. """
while True:
# wait for an item from the Producer
message = await pipeline.get_message("Consumer")
# process the msg
logging.info(
"Consumer storing message: %s",message
)
# simulate I/O operation using sleep
await asyncio.sleep(random.random())
pipeline.task_done()
async def start(self):
pipeline = Pipeline()
data_sources = [A(),B()]
# schedule the consumer
consume = asyncio.ensure_future(self.consumer(pipeline))
# run the producer and wait for completion
await self.producer(pipeline,data_sources)
# wait until the consumer has processed all items
await pipeline.join()
# the consumer is still awaiting for an item,cancel it
consume.cancel()
logging.info("Successfully shutdown the service.")
if __name__ == '__main__':
asyncio.run(Manager().start())
管道.py
class Pipeline(asyncio.Queue):
def __init__(self):
super().__init__(maxsize=10)
async def get_message(self,name):
logging.debug("%s:about to get from queue",name)
value = await self.get()
logging.debug("%s:got %s from queue",name,value)
return value
async def set_message(self,value,name):
logging.debug("%s:about to add %s to queue",value)
await self.put(value)
print(name,value)
logging.debug("%s:added %s to queue",value)
如果我错过了一些案例,我将不胜感激
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。