如何解决zmq 收不到消息
我已经处理这个问题几天了,但我相信解决方案必须相对简单。
我的应用程序的结构基于来自 this link 的“图 17 - 请求-回复代理”。
更详细地说,我的前端,它与 django 一起运行,看起来像这样:
context,client_socket,port = create_client_socket()
client_socket.connect("tcp://localhost:%s" % 5559)
to_send = list() # this is the request that it will be sent to the broker
to_send = ..... # setting what the request is
# sending the request
client_socket.send(json.dumps(to_send).encode('utf-8')) # send message
# waiting for response
results = client_socket.recv(1024).decode() # receive response
client_socket.close()
context.term()
if results:
# working with the results received from the broker
...............
return render(request,'front_app/my.html',{'records': results})
我的经纪人看起来像这样:
context = zmq.Context()
frontend = context.socket(zmq.ROUTER)
frontend.bind("tcp://*:5559") # The port that the clients should be using for communication
backend_sockets = list()
for i in range(number_of_server_nodes): # currently there are 2 server nodes
port = range(5550,5558,2)[i]
backend = context.socket(zmq.DEALER)
backend.bind("tcp://*:{}".format(port))
backend_sockets.append(backend)
# Initialize poll set
poller = zmq.Poller()
poller.register(frontend,zmq.POLLIN)
# poller.register(backend1,zmq.POLLIN)
# poller.register(backend2,zmq.POLLIN)
for back_sock in backend_sockets:
poller.register(back_sock,zmq.POLLIN)
# Switch messages between sockets
while True:
socks = dict(poller.poll())
if socks.get(frontend) == zmq.POLLIN:
raw_message = frontend.recv_multipart()
message = None
try:
message = json.loads(str(raw_message[2])[2:-1])
except JSONDecodeError as e:
try:
message = json.loads(str(raw_message[2])[2:-1].replace('ObjectId(\\\\"','').replace('\\\\")',''))
except Exception as e:
print(e)
action = message[0]['action'] # The first item of the list should always be the action code
# Fetching of records for a client
if action == "DATA_FETCH":
user_email = message[1].get('user_email')
if user_email:
# sending the request to all the backend nodes
for bs in backend_sockets:
bs.send_multipart(raw_message)
# waiting for results from all the backend nodes
results_from_sockets = list()
returned = 0
while returned < len(backend_sockets):
for _backend in backend_sockets:
message = _backend.recv_multipart()
returned += 1
results_from_sockets.append(message)
# formatting the results
results_final = list()
for res in results_from_sockets:
if results_from_sockets[0][2].decode() != 'NONE':
results_final.append(results_from_sockets[0][2].decode())
# THE CODE THAT DOESN'T WORK - THE COMMENTS ARE ALL THE ATTEMPTS THAT DIDN'T WORK
#frontend.send(str(results_final).encode())
#frontend.send_multipart([b'\x00\x00\x00\x00*',b'',str(results_final).encode() ])
frontend.send_multipart([b'\x00\x00\x00\x00*',b'NONE',])
else:
print("|DEBUG| the email is required")
# generally waiting for messages from the backends
for _backend in backend_sockets:
if socks.get(_backend) == zmq.POLLIN:
message = _backend.recv_multipart()
frontend.send_multipart(message)
最后,后端代码(在多个线程中运行,当前为 2 个)如下所示:
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.connect("tcp://localhost:{}".format(port)) # 5550 and 5552
while True:
print("...")
# receive data stream. it won't accept data packet greater than 8096 bytes
raw_data = socket.recv()
print("|DEBUG| SERVER NODE {} RECEIVED: {}".format(node_name,raw_data) )
if len(raw_data) == 0:
continue
try:
data = json.loads(raw_data) # data converted to a list
except Exception as e:
print(e)
action = data.pop(0)['action'] # The first item of the list should always be the action code
if action == "DATA_FETCH":
for request in data:
--code that creates the results that will be sent back
if len(owned_records) != 0:
to_send = str(owned_records).encode()
socket.send(to_send.encode())
else:
socket.send(b'NONE')
长话短说,我的问题是,当代理尝试发送消息时,前端没有收到任何消息。当我在代码的“通常等待来自后端的消息”部分从后端向代理发送消息时,它工作正常。
我猜这与消息的格式有关,但我尝试了很多方法,但没有任何效果。
请帮忙!
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。