How should I correctly implement async tasks?

I have two workers (spiders). The base spider (worker-1) finds base data and sends it to worker-2, which scrapes further data based on it. For example, worker-1 produces IDs, and worker-2 depends on that data: it needs to look those IDs up on Google, Bing, and GitHub. Worker-1 runs only once a day, asynchronously from worker-2, which performs its task every hour based on worker-1's data. I have looked at Celery + celerybeat with a Redis/RabbitMQ message broker to build this, but my real question is: should I convert def start into a Celery task, or keep the producer-consumer functions?

Note: def start is the handler that runs every hour, while def read_nvd_data runs once a day to perform the work it depends on.
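Before committing to Celery, the two cadences can be sketched with plain asyncio: one loop per schedule, sharing a queue. This is only a minimal sketch, not the question's actual implementation; the function names `read_nvd_data` and `start` follow the question, while the `run_every` helper, the intervals (shrunk from 24 h / 1 h so the sketch finishes quickly), and the hard-coded IDs are assumptions for illustration.

```python
import asyncio

async def run_every(interval, coro_fn, *args, repeats):
    """Hypothetical stand-in for a scheduler: run coro_fn every
    `interval` seconds, `repeats` times, collecting results."""
    results = []
    for _ in range(repeats):
        results.append(await coro_fn(*args))
        await asyncio.sleep(interval)
    return results

async def read_nvd_data(queue):
    # worker-1: fetch base IDs once per "day" and hand them to worker-2
    for item_id in ("ID-2002-0201", "ID-2002-0202"):
        await queue.put(item_id)
    return "fetched"

async def start(queue):
    # worker-2: drain whatever IDs are pending and "look them up"
    found = []
    while not queue.empty():
        found.append(await queue.get())
    return found

async def main():
    queue = asyncio.Queue()
    # in production these intervals would be 24 h and 1 h; here they
    # are milliseconds so the demonstration completes immediately
    daily, hourly = await asyncio.gather(
        run_every(0.01, read_nvd_data, queue, repeats=1),
        run_every(0.02, start, queue, repeats=2),
    )
    return daily, hourly

daily, hourly = asyncio.run(main())
```

With Celery the two loops would instead become two tasks registered in a beat schedule, but the data handoff through a shared buffer is the same idea.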
Architecture overview
.
├── backend
│   ├── services
│   │   ├── newsservice
│   └── subscription
│       ├── newsletter
│       └── subscription
├── patches
├── spiders
│   └── blog_crawler
│       ├── common
├── third_party
│   ├── pipeline
│   │   ├── common
│   │   └── tests
│   │       ├── async_sample.py
│   │       └── pipeline_async.py
Data sources (mock responses)
class A:
    def run(self):
        # mock response from the first data source
        return {"ID-2002-0201": {"id": "ID-2002-0201",
                                 "updated_at": "2018-05-14T22:25:51Z",
                                 "html_url": "xxxxxxxxxxxx"}}

class B:
    def run(self):
        # mock response from the second data source
        return {"ID-2002-0202": {"id": "ID-2002-0202",
                                 "html_url": "xxxxxxxxxxxx"}}
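For illustration, worker-2 would first pull the IDs out of those payloads before searching them on Google, Bing, and GitHub. A small self-contained sketch using the two mock sources above (the `extract_ids` helper is hypothetical, not part of the question's code):

```python
class A:
    def run(self):
        # mock response from the first data source
        return {"ID-2002-0201": {"id": "ID-2002-0201",
                                 "updated_at": "2018-05-14T22:25:51Z",
                                 "html_url": "xxxxxxxxxxxx"}}

class B:
    def run(self):
        # mock response from the second data source
        return {"ID-2002-0202": {"id": "ID-2002-0202",
                                 "html_url": "xxxxxxxxxxxx"}}

def extract_ids(data_sources):
    """Hypothetical helper: flatten each source's payload
    into a flat list of IDs for worker-2 to look up."""
    ids = []
    for source in data_sources:
        ids.extend(source.run().keys())
    return ids

ids = extract_ids([A(), B()])
```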
Manager
import asyncio
import logging
import random

class Manager:
    async def producer(self, pipeline, data_sources):
        """Pretend we're fetching records from the network."""
        for data_stream in data_sources:
            message = data_stream.run()
            await pipeline.set_message(message, "Producer")
            logging.info("Producer got message: %s", message)

    async def consumer(self, pipeline):
        """Pretend we're saving a record in the database."""
        while True:
            # wait for an item from the producer
            message = await pipeline.get_message("Consumer")
            # process the message
            logging.info("Consumer storing message: %s", message)
            # simulate an I/O operation using sleep
            await asyncio.sleep(random.random())
            pipeline.task_done()

    async def start(self):
        pipeline = Pipeline()
        data_sources = [A(), B()]
        # schedule the consumer
        consume = asyncio.ensure_future(self.consumer(pipeline))
        # run the producer and wait for completion
        await self.producer(pipeline, data_sources)
        # wait until the consumer has processed all items
        await pipeline.join()
        # the consumer is still awaiting an item; cancel it
        consume.cancel()

if __name__ == '__main__':
    asyncio.run(Manager().start())
Pipeline
class Pipeline(asyncio.Queue):
    def __init__(self):
        super().__init__(maxsize=10)

    async def get_message(self, name):
        logging.debug("%s: about to get from queue", name)
        value = await self.get()
        logging.debug("%s: got %s from queue", name, value)
        return value

    async def set_message(self, value, name):
        logging.debug("%s: about to add %s to queue", name, value)
        await self.put(value)
        logging.debug("%s: added %s to queue", name, value)
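One design note on the shutdown in start above: cancelling the consumer with consume.cancel() works, but an alternative is to push a sentinel through the queue so the consumer exits cleanly on its own. This is a minimal self-contained sketch of that pattern under the same producer-consumer assumptions, not the question's actual code; the SENTINEL name and the hard-coded IDs are illustrative.

```python
import asyncio

SENTINEL = object()  # hypothetical end-of-stream marker; any unique object works

async def producer(queue, items):
    for item in items:
        await queue.put(item)
    # tell the consumer there is nothing more to process
    await queue.put(SENTINEL)

async def consumer(queue, stored):
    while True:
        item = await queue.get()
        if item is SENTINEL:
            break  # clean exit instead of task cancellation
        stored.append(item)  # stand-in for "save to database"

async def main():
    queue = asyncio.Queue(maxsize=10)
    stored = []
    await asyncio.gather(
        producer(queue, ["ID-2002-0201", "ID-2002-0202"]),
        consumer(queue, stored),
    )
    return stored

stored = asyncio.run(main())
```

With a sentinel there is no CancelledError to swallow, which matters if the consumer's "save" step must never be interrupted halfway.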