如何解决zmq 多进程间通信
我有 n 个进程,它们拥有自己的本地数据和操作,我希望每个进程将其本地数据的“快照”发送到其余运行的节点进程。
到目前为止,我的代码如下所示:
def node1():
Process(target=sync_1).start()
sleep(4)
data = {'node': 1,'data': 'node 1 data'}
context_b = zmq.Context()
socket_b = context_b.socket(zmq.PUB)
connnected = False
try:
socket_b.bind("tcp://*:%s" % 5560)
connnected = True
except Exception as e:
print(e)
if connnected:
topic = "101"
try:
socket_b.send_string(topic + ' ' + json.dumps(data))
except Exception as e:
print(e)
socket_b.close()
context_b.term()
def node2():
Process(target=sync_2).start()
def sync_1():
context_c = zmq.Context()
socket_c = context_c.socket(zmq.SUB)
_port = 5560
try:
socket_c.connect("tcp://localhost:%s" % _port)
except Exception as e:
print(e)
topicfilter = "101"
socket_c.setsockopt_string(zmq.SUBSCRIBE,topicfilter,encoding='utf-8')
try:
raw = socket_c.recv().decode("utf-8")
json0 = raw.find('{')
topic = raw[0:json0].strip()
msg = json.loads(raw[json0:])
print("[SYNC 1] received {}-{}]".format(topic,msg))
except Exception as e:
print(e)
def sync_2():
context_c = zmq.Context()
socket_c = context_c.socket(zmq.SUB)
_port = 5560
try:
socket_c.connect("tcp://localhost:%s" % _port)
except Exception as e:
print(e)
topicfilter = "101"
socket_c.setsockopt_string(zmq.SUBSCRIBE,encoding='utf-8')
try:
raw = socket_c.recv().decode("utf-8")
json0 = raw.find('{')
topic = raw[0:json0].strip()
msg = json.loads(raw[json0:])
print("[SYNC 2] received {}-{}]".format(topic,msg))
except Exception as e:
print(e)
if __name__ == '__main__':
Process(target=node1).start()
Process(target=node2).start()
每个节点都有一个在后台运行的“侦听器”进程(同步功能),以便接收每个节点的数据并相应地使用它,并且当所有子套接字都连接到一个节点(节点1在那种情况)但我希望每个节点都向所有侦听器发送数据,因此我不确定如何实现这一点,因为侦听器进程可以连接到一个端口。
此外,每次有更新时,节点都必须发送本地数据快照,因此这不能是一次性通信,因此我想到了让侦听器进程一直积极等待更新。
可以有更简单的方法来解决此问题,因此我们将不胜感激!
解决方法
更新:
解决方案是使用 XPUB-XSUB 模式。
通过使用这种模式,我创建了一个代理线程,允许我做我想做的事。
我能找到的最有用的 Python 示例是 this 。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。