如何解决如何使用 Muiltiprocessing 订阅多个 Websocket 流
我是在 python 中处理多处理、多线程等的新手。
我正在尝试使用 multiprocessing
从我的加密交换 (API Docs Here) 订阅多个 Websocket 流。
但是,当我运行下面的代码时,我只收到 ticker information
,而不是 order book updates
。
如何修复代码以获取这两个信息?
在 multiprocessing
上运行时,似乎只有一个 websocket 正常工作的原因是什么?
(当我分别运行 ws_orderBookUpdates()
和 ws_tickerInfo()
函数时,不使用 multiprocessing
,它单独运行良好,所以这不是交换的问题。)
import websocket
import json
import pprint
from datetime import datetime
import time
# Function to subscribe to ticker information.
def ws_tickerInfo():
def on_open(self):
print("opened")
subscribe_message = {
"method": "subscribe","params": {'channel': "lightning_ticker_BTC_JPY"}
}
ws.send(json.dumps(subscribe_message))
def on_message(self,message,prev=None):
print(f"Ticker Info,Received : {datetime.now()}")
###### full json payloads ######
# pprint.pprint(json.loads(message))
def on_close(self):
print("closed connection")
endpoint = 'wss://ws.lightstream.bitflyer.com/json-rpc'
ws = websocket.WebSocketApp(endpoint,on_open=on_open,on_message=on_message,on_close=on_close)
ws.run_forever()
# Function to subscribe to order book updates.
def ws_orderBookUpdates():
def on_open(self):
print("opened")
subscribe_message = {
"method": "subscribe","params": {'channel': "lightning_board_BTC_JPY"}
}
ws.send(json.dumps(subscribe_message))
def on_message(self,message):
print(f"Order Book,on_close=on_close)
ws.run_forever()
# Multiprocessing two functions
if __name__ == '__main__':
import multiprocessing as mp
mp.Process(target=ws_tickerInfo(),daemon=True).start()
mp.Process(target=ws_orderBookUpdates(),daemon=True).start()
解决方法
更新
您已经创建了两个 daemon 进程。当所有非守护进程终止时,它们将终止,在这种情况下是主进程,它在创建守护进程后立即终止。您很幸运,即使其中一个流程也有机会产生输出,但为什么要冒险呢? 不要使用 dameon 进程。相反:
if __name__ == '__main__':
import multiprocessing as mp
p1 = mp.Process(target=ws_tickerInfo)
p2 = mp.Process(target=ws_orderBookUpdates)
p1.start()
p2.start()
p1.join() # wait for completion
p2.join() # wait for completion
但真正的问题是盯着我们的脸,我们都错过了!你有:
p1 = mp.Process(target=ws_tickerInfo(),daemon=True)
p2 = mp.Process(target=ws_orderBookUpdates(),daemon=True)
应该是什么时候:
p1 = mp.Process(target=ws_tickerInfo)
p2 = mp.Process(target=ws_orderBookUpdates)
看到区别了吗?您实际上不是将函数 Process
传递给 ws_tickerInfo
,而是调用 ws_tickerInfo
并尝试传递返回值,这将是无意义的 {{1}如果函数曾经返回(它没有返回)。因此,您甚至从未执行过第二个流程创建语句。
您可能也为此使用了多线程而不是多处理,尽管 Ctrl-C 中断处理程序可能不起作用(见下文)。还应该有一个机制来终止程序。我添加了一些代码来检测 Ctrl-C 并在输入时终止。此外,您已将 None
用作函数参数,就好像该函数实际上是一个类方法一样,但事实并非如此。这不是好的编程风格。这是更新的来源:
self
使用多线程
import websocket
import json
import pprint
from datetime import datetime
import time
import sys
import signal
# Function to subscribe to ticker information.
def ws_tickerInfo():
def on_open(wsapp):
print("opened")
subscribe_message = {
"method": "subscribe","params": {'channel': "lightning_ticker_BTC_JPY"}
}
wsapp.send(json.dumps(subscribe_message))
def on_message(wsapp,message,prev=None):
print(f"Ticker Info,Received : {datetime.now()}")
###### full json payloads ######
# pprint.pprint(json.loads(message))
def on_close(wsapp):
print("closed connection")
endpoint = 'wss://ws.lightstream.bitflyer.com/json-rpc'
ws = websocket.WebSocketApp(endpoint,on_open=on_open,on_message=on_message,on_close=on_close)
ws.run_forever()
# Function to subscribe to order book updates.
def ws_orderBookUpdates():
def on_open(wsapp):
print("opened")
subscribe_message = {
"method": "subscribe","params": {'channel': "lightning_board_BTC_JPY"}
}
wsapp.send(json.dumps(subscribe_message))
def on_message(wsapp,message):
print(f"Order Book,on_close=on_close)
ws.run_forever()
def handle_ctrl_c(signum,stack_frame):
sys.exit(0)
if __name__ == '__main__':
import multiprocessing as mp
signal.signal(signal.SIGINT,handle_ctrl_c) # terminate on ctrl-c
print('Enter Ctrl-C to terminate.')
p1 = mp.Process(target=ws_tickerInfo)
p2 = mp.Process(target=ws_orderBookUpdates)
p1.start()
p2.start()
p1.join() # wait for completion (will never happen)
p2.join() # wait for completion (will never happen)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。