如何解决带有非阻塞 RabbitMQ Twisted 协议的 Scrapy
我试图绕过 Scrapy 的要求,即必须使用来自 start_urls 数组或 start_requests 生成器的 URL 来喂养蜘蛛。相反,我想在 RabbitMQ 的 Twisted 协议触发的回调中启动下载器(以及随后的下载器中间件),每当它在特定交换机上接收到消息时就触发一次。
接下来,我有一个简单的消费者和生产者应用示例。生产者应用程序是一个 Python 程序,它将 URL 发送到 RabbitMQ 交换。消费者是一个非阻塞的 Twisted 协议,它只是将它们打印到标准输出。
这是示例消费者 Twisted 协议。它侦听 quotes 交换机上路由键为 urls 的消息:
import pika
from pika import spec
from pika.adapters import twisted_connection
from pika.exchange_type import ExchangeType
from twisted.internet import reactor,protocol,defer
from twisted.internet.defer import inlineCallbacks
from twisted.python import log
# Passed as prefetch_count to basic_qos when the channel is set up.
PREFETCH_COUNT = 2
# Exchange name that main() asks the factory to read from.
EXCHANGE = "quotes"
# Routing key that main() passes alongside EXCHANGE.
ROUTING_KEY = "urls"
class PikaProtocol(twisted_connection.TwistedProtocolConnection):
    """AMQP protocol that reads from and publishes to exchanges for a factory.

    Pending reads and queued outgoing messages are kept on the factory
    (``read_list`` and ``queued_messages``) and are processed once the
    channel has been set up in ``connectionReady``.
    """

    connected = False
    name = "AMQP:Protocol"

    def __init__(self, factory, parameters):
        """Store the owning factory and initialise the pika connection."""
        super().__init__(parameters)
        self.factory = factory

    @inlineCallbacks
    def connectionReady(self):
        """Open a channel, start every registered read and flush queued sends.

        NOTE(review): presumably invoked by the pika Twisted adapter once the
        AMQP connection is usable -- confirm against the adapter docs.
        """
        self._channel = yield self.channel()
        yield self._channel.basic_qos(prefetch_count=PREFETCH_COUNT)
        self.connected = True
        yield self._channel.confirm_delivery()
        for exchange, routing_key, callback in self.factory.read_list:
            yield self.setup_read(exchange, routing_key, callback)
        self.send()

    @inlineCallbacks
    def read(self, exchange, routing_key, callback):
        """Add an exchange to the list of exchanges to read from.

        Does nothing while disconnected; the factory keeps the request in
        ``read_list`` and ``connectionReady`` sets it up later.
        """
        if self.connected:
            yield self.setup_read(exchange, routing_key, callback)

    @inlineCallbacks
    def setup_read(self, exchange, routing_key, callback):
        """Declare exchange and queue, bind them and start consuming.

        Args:
            exchange: Exchange name; when falsy no exchange is declared or
                bound.
            routing_key: Used both as the queue name and as the binding key.
            callback: Called with each delivered item.
        """
        if exchange:
            yield self._channel.exchange_declare(
                exchange=exchange,
                exchange_type=ExchangeType.topic,
                durable=True,
                auto_delete=False,
            )
        yield self._channel.queue_declare(queue=routing_key, durable=True)
        if exchange:
            # NOTE(review): the first bind omits routing_key; it looks
            # redundant with the second one -- confirm before removing.
            yield self._channel.queue_bind(queue=routing_key, exchange=exchange)
            yield self._channel.queue_bind(
                queue=routing_key, exchange=exchange, routing_key=routing_key
            )
        queue, _consumer_tag = yield self._channel.basic_consume(
            queue=routing_key, auto_ack=False
        )
        d = queue.get()
        d.addCallback(self._read_item, queue, callback)
        d.addErrback(self._read_item_err)

    def _read_item(self, item, queue, callback):
        """Callback function which is called when an item is read.

        Re-arms the queue for the next item first, then hands ``item`` to
        ``callback`` and acks on success or nacks on failure.
        """
        # Request the next item before processing this one so that reading
        # continues regardless of how the callback behaves.
        d = queue.get()
        d.addCallback(self._read_item, queue, callback)
        d.addErrback(self._read_item_err)
        channel, deliver, _props, msg = item
        log.msg(
            "%s (%s): %s" % (deliver.exchange, deliver.routing_key, repr(msg)),
            system="Pika:<=",
        )
        d = defer.maybeDeferred(callback, item)
        d.addCallbacks(
            lambda _: channel.basic_ack(deliver.delivery_tag),
            lambda _: channel.basic_nack(deliver.delivery_tag),
        )

    @staticmethod
    def _read_item_err(error):
        """Print a failure raised while waiting for or dispatching an item."""
        print(error)

    def send(self):
        """If connected, send all waiting messages."""
        if self.connected:
            while self.factory.queued_messages:
                exchange, r_key, message = self.factory.queued_messages.pop(0)
                self.send_message(exchange, r_key, message)

    @inlineCallbacks
    def send_message(self, exchange, routing_key, msg):
        """Send a single message.

        Publish errors are logged rather than raised.
        """
        log.msg(
            "%s (%s): %s" % (exchange, routing_key, repr(msg)),
            system="Pika:=>",
        )
        # Declare with the same settings as setup_read so both code paths
        # agree on the exchange definition.
        yield self._channel.exchange_declare(
            exchange=exchange,
            exchange_type=ExchangeType.topic,
            durable=True,
            auto_delete=False,
        )
        prop = spec.BasicProperties(delivery_mode=2)
        try:
            yield self._channel.basic_publish(
                exchange=exchange, routing_key=routing_key, body=msg, properties=prop
            )
        except Exception as error:  # pylint: disable=W0703
            log.msg("Error while sending message: %s" % error, system=self.name)
class PikaFactory(protocol.ReconnectingClientFactory):
    """Reconnecting factory that builds PikaProtocol clients.

    Outgoing messages and read requests are recorded on the factory and
    forwarded to the current client when one exists.
    """

    name = "AMQP:Factory"

    def __init__(self, parameters):
        """Remember the connection parameters and start with empty queues."""
        self.parameters = parameters
        self.client = None
        # Each entry is (exchange, routing_key, message).
        self.queued_messages = []
        # Each entry is (exchange, routing_key, callback).
        self.read_list = []

    def startedConnecting(self, connector):
        """Log that a connection attempt has started."""
        log.msg("Started to connect.", system=self.name)

    def buildProtocol(self, addr):
        """Reset the reconnect delay and create the protocol instance."""
        self.resetDelay()
        log.msg("Connected", system=self.name)
        self.client = PikaProtocol(self, self.parameters)
        return self.client

    def clientConnectionLost(self, connector, reason):  # pylint: disable=W0221
        """Log the loss and let the base class handle it."""
        log.msg("Lost connection. Reason: %s" % reason.value, system=self.name)
        protocol.ReconnectingClientFactory.clientConnectionLost(
            self, connector, reason
        )

    def clientConnectionFailed(self, connector, reason):
        """Log the failure and let the base class handle it."""
        log.msg("Connection failed. Reason: %s" % reason.value, system=self.name)
        protocol.ReconnectingClientFactory.clientConnectionFailed(
            self, connector, reason
        )

    def send_message(self, exchange=None, routing_key=None, message=None):
        """Queue a message and ask the current client, if any, to send."""
        self.queued_messages.append((exchange, routing_key, message))
        if self.client is not None:
            self.client.send()

    def read_messages(self, exchange, routing_key, callback):
        """Configure an exchange to be read from."""
        self.read_list.append((exchange, routing_key, callback))
        if self.client is not None:
            self.client.read(exchange, routing_key, callback)
def main():
    """Connect to the broker on localhost and print each received URL."""
    parameters = pika.ConnectionParameters(
        host="localhost",
        virtual_host="/",
        credentials=pika.PlainCredentials("guest", "guest"),
    )

    def callback(tup):
        # The last element of the delivered tuple is decoded as the URL.
        url = tup[-1].decode('utf-8')
        print(f'Received {url}')

    f = PikaFactory(parameters)
    f.read_messages(EXCHANGE, ROUTING_KEY, callback)
    # Read the public properties rather than the private _host/_port.
    reactor.connectTCP(parameters.host, parameters.port, f)
    reactor.run()


if __name__ == "__main__":
    main()
以下是您将 URL 提供给它的方法(并查看它们打印在标准输出上):
#!/usr/bin/env python
import pika
# Publish one URL to the 'quotes' exchange with routing key 'urls'.
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
try:
    channel = connection.channel()
    channel.basic_publish(
        exchange='quotes',
        routing_key='urls',
        body='http://quotes.toscrape.com/page/1/',
    )
    print(" [x] Sent url to quotes exchange")
finally:
    # Close the connection even when publishing fails.
    connection.close()
如何让收到的 URL 触发 quotes 爬虫的下载器(而不是简单地将 URL 打印到标准输出)?
谢谢!
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。