微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

ZeroMQ XPUB recv() 是查找是否有订阅者并解决慢加入者综合症的解决方案吗?

如何解决ZeroMQ XPUB recv() 是查找是否有订阅者并解决慢加入者综合症的解决方案吗?

我的用例:

  1. 订阅者将是一个服务器(绑定到一个端口),它将等待来自多个发布者的消息。
  2. 发布者将在不同线程中作为客户端进行初始化(连接到端口)。
  3. 要在每个线程中发布的数据将是几条消息。
  4. 订阅者连接后,尽快收到每条消息非常重要。
  5. 如果订阅者未连接,那么我不想让发布者线程被阻塞,理想情况下,超时时间大约为 1-2 秒。

慢加入者问题:

仅运行 1000 多个线程(发布者)1 或 2 次,我就在订阅者中获取了所有数据。 添加几毫秒的睡眠可以解决这个问题,所以我 99.9% 确定我是众所周知的慢木工综合症的受害者。然而,就我而言,睡眠解决方案并不是一个好的解决方案,因为发布者的连接时间可能会发生变化,我希望数据尽快发送给订阅者。

解决这个问题的想法和实验代码

我的解决方案是基于使用 XPUB recv 方法。使用 XPUB 初始化发布者并将 RCVTIMEO 设置为 1000 毫秒。在发布者连接之后,我添加一个 recv() 调用来检查是否有订阅者。当我收到订阅消息时,我知道连接已经完成,我可以发送数据而不会丢失任何数据(除非订阅者发生问题但我不在乎)。

如果我没有收到任何订阅消息,则在 1000 毫秒内 recv() 超时并终止线程。

以下是 python(pyzmq) 中的示例代码,用于测试此实现(对于发布者,我不使用线程,而是使用 while 循环并同时运行多个发布者),它按我的意愿工作:

publisher.py:

import zmq

def main():
    """ main method """

    i = 0
    while True:
        # Prepare context and publisher
        context = zmq.Context()
        publisher = context.socket(zmq.XPUB)
        publisher.connect("tcp://0.0.0.0:5650")
        publisher.setsockopt(zmq.RCVTIMEO,1000)

        # Waiting for 1000ms to get a subscription
        i = i + 1
        try:
            publisher.recv()
            # Send the message
            publisher.send_multipart([b'test',bytes(str(i),'utf-8')])
        except Exception as e:
            print(e,flush=True)

        # Terminate socket and context
        publisher.close()
        context.term()
        if i >= 10000:
            break

if __name__ == "__main__":
    main()    

subscriber.py:

import zmq

def main():
    """ main method """

    # Prepare our context and subscriber
    context = zmq.Context()
    subscriber = context.socket(zmq.SUB)
    uri = "tcp://0.0.0.0:5650"
    subscriber.bind(uri)
    subscriber.setsockopt(zmq.SUBSCRIBE,b'')
    print('Subscriber connects to %s' % (uri),flush=True)

    # Receive messages
    i = 0
    while True:
        [topic,data] = subscriber.recv_multipart()
        i = i + 1
        print("%s: %s %s" % (i,topic,data),flush=True)

if __name__ == "__main__":
    main()

我的问题:

解决方案就这么简单吗?如果订阅者处于活动状态,我是否遗漏了会导致数据丢失的任何内容(与慢速加入者相关)?

解决方法

“解决方案有那么简单吗?”

恰恰相反。对于上面发布的内容,解决方案过于复杂 w.r.t.到目前为止发布的用例需求。

a) 鉴于上述要求,当在同一主机上的线程之间进行通信时,可以消除与 ISO-OSI-L3 tcp:// 传输类的设置和维护相关的所有成本属于同一个进程。而是采用超快、无堆栈、内存映射的 inproc:// 传输类以避免所有这些低效率。 ZeroMQ API v4.0+ 还具有设置 inproc://-TransportClass { .bind() | .connect() }-顺序出现的舒适性,因此我们可以享受最大的 MEM 映射超低延迟标记零复制“传输”消息(不移动 RAM 中的一个字节数据) - 很酷,不是吗? (除非您需要注入 MITM 协议嗅探,否则删除 tcp:// 矫枉过正)

b) 鉴于上述要求,在“静态”SUB 端订阅所有消息的情况下,发送几条消息是非常低效的 { {1}} 可扩展的正式通信模式原型。您的代码必须支付所有费用来设置一个新的 PUB/SUB-instance,然后它会爬行以设置一个有效的连接(通过 SUB-TransportClass' 堆栈,希望在 a) 下删除),接下来是设置一个新的 TOPIC 过滤器(无论是在早期版本的 SUB 端运行,还是在较新的 ZeroMQ 版本中的 PUB 端运行——所有这些都付出了巨大的代价,只是为了接收所有消息——即根本没有过滤)。同样的正式服务可以通过更轻量级的多节点-tcp://-on-one-node 方式实现。如果不需要任何反向/双向/更复杂的正式通信,只需一个 PUSH/PULL 即可完成请求的工作。

c) 鉴于上述要求,您的口音似乎已经通过过早地通过连接发送消息而不会丢失消息。在 ZeroMQ 设置中有用于确定这一点的工具,但您不小心使用它们:

  • 使用 PUSH/PULL 可能会在没有现成的连接工作(或永远)的情况下使用 AccessNode 的阻塞状态
  • 使用返回代码和 zmq.IMMEDIATE(或 errno 用于不兼容 POSIX 的操作系统/Win32 等)处理可以帮助您的代码检测和响应发生在“的任何和所有特定情况”自治代理网络”在 的整个生命周期内(无论代理是否确实“物理上”分布或共同定位,就像这里的情况一样)。不失控是这里的核心责任。什么是控制代码,它会在失去控制的状态下自锁,在这种状态下它甚至无法控制自己 ;) ?

d) 切勿使用 zmq.errno() 方法的阻塞形式。教科书的例子是专业信号/消息元平面实现应该是什么样子的反模式。确实从来没有 - 参考。上面的第 5) 项。

e) 更好地重复使用 { .recv() | .send() | .poll() | ... } 实例,而不是像上面所描绘的那样使其成为消耗品/一次性用品。线程可以自由共享一个预先实例化的 Context() 引擎,避免接下来大量重复的附加开销成本,如果每个分叉重新实例化一个消耗品/一次性 Context(),只是一个短暂的,对等客户端线程。

f) 如果有人知道更好的解决方案,请通知我们 :o)

来自评论的问题

a)
订阅者将在另一台机器上,所以我认为 Context() 是解决方案。*

当然,这里是 NP。 tcp://-transports 如果进一步进入更高的性能水平方向可能会很有趣

b)
订阅者将通过 { pgm:// | epgm:// | tipc:// } 套接字将消息转发给其他订阅者。 XPUB 可以工作,但如果我想将这些订阅及其过滤器传递给初始发布者并在源头过滤一些消息,我必须使用 PUSH/PULL 模式。

好吧,O/P 中没有提到。 PUB/SUBs/XPUBs 的任何分层都可以很好地工作,问题出在连接管理级别

c)
澄清一下,只有当有订阅者时,不丢失消息才重要。你能解释一下这部分吗?

当然,在 RTO 连接的链接上没有可用的订阅者,准备好立即“通过网络”传递,任何消息都无法传递(并且可能会被悄悄丢弃,这就是您试图反对的,不要不是吗?)。这就是 XSUB 可以通过调用 zmq.IMMEDIATE 方法来管理的内容。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。