微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何使用 PUB/SUB 防止缓冲/延迟? 简化的服务器代码简化的客户端代码绑定服务端socket之前

如何解决如何使用 PUB/SUB 防止缓冲/延迟? 简化的服务器代码简化的客户端代码绑定服务端socket之前

我将视频作为一系列图像(等于 zmq 消息)发送,但有时,也许当网络速度较慢时,它们的接收速度比发送速度慢,并且出现越来越长的延迟,似乎高达约一分钟的视频或数百张图片或兆字节的数据。它通常最终会自行清除,订阅者接收消息的速度比发布者发送的速度快。

相反,如果订阅recv处理它们的速度太慢,我希望它以应该的方式丢弃错过的消息。我希望 zmq.CONFLATE=1 会这样做,但事实并非如此。那么如何?我怀疑它们在发布者处被缓冲,它不应该有任何 zmq 缓冲区,或者以某种方式在网络堆栈中。

简化的服务器代码

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:12345")
camera = PiCamera()
stream = io.BytesIO()
for _ in camera.capture_continuous(stream,'jpeg',use_video_port=True):
  stream.truncate()
  stream.seek(0)
  socket.send(stream.read())
  stream.seek(0)

简化的客户端代码

# Initialization
self.context = zmq.Context()
self.video_socket = self.context.socket(zmq.SUB)
self.video_socket.setsockopt(zmq.CONFLATE,1)
self.video_socket.setsockopt(zmq.SUBSCRIBE,b"")
self.video_socket.connect("tcp://" + ip_address + ":12345")

def get_image(self):
  # Receive the latest image
  poll_result = self.video_socket.poll(timeout=0)
  if poll_result == zmq.POLLIN:
    return self.video_socket.recv()
  else:
    return None

发布者使用 RaspBerry Pi,订阅者使用 Windows。

解决方法

我不确定您使用的是哪个版本的 python zmq,但基于您需要的底层 c++ libzmq:

  • 在服务器套接字上设置 ZMQ_SNDHWM 套接字选项
  • 在客户端套接字上设置 ZMQ_RCVHWM 套接字选项。

在发布/订阅的情况下,这些选项限制每个已完成连接排队的消息数量。如果队列增长大于 HWM(高水位线),消息将被丢弃。

同时关闭合并,因为这会干扰这些选项。

,

还要在服务器上设置 zmq.CONFLATE=1 以仅保留发送队列中的最新消息。

绑定服务端socket之前

socket.setsockopt(zmq.CONFLATE,1)

出于某种原因,我错误地认为 PUB 套接字没有发送队列,但它有。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。