如何解决如何处理来自binance websocket的多个流数据?
我正在使用 unicorn_binance_websocket_api 来传输 100 种加密货币和来自 2 个不同时间范围的价格数据, 我想处理这些数据以存储不同加密货币在其时间范围内的收盘价,然后执行我的策略以查看我需要交易哪个加密货币和时间范围
from unicorn_binance_websocket_api.unicorn_binance_websocket_api_manager import
BinanceWebSocketapimanager
import json,numpy,talib
binance_websocket_api_manager = BinanceWebSocketapimanager(exchange="binance.com-futures")
binance_websocket_api_manager.create_stream('kline_1m','btcusdt')
closes =[]
RSI_PERIOD = 14
RSI_OVERBOUGHT = 70
RSI_OVERSOLD = 30
while True:
received_stream_data_json = binance_websocket_api_manager.pop_stream_data_from_stream_buffer()
if received_stream_data_json:
json_data = json.loads(received_stream_data_json)
candle_data = json_data.get('data',{})
candle = candle_data.get('k',{})
symboll = candle.get('s',{})
timeframe = candle.get('i',{})
close_prices = candle.get('c',{})
open_prices = candle.get('o',{})
is_candle_closed = candle.get('x',{})
if is_candle_closed:
closes.append(float(close_prices))
if len(closes) > RSI_PERIOD:
np_closes = numpy.array(closes)
rsi = talib.RSI(np_closes,RSI_PERIOD)
if (rsi[-1] > RSI_OVERBOUGHT):
print("SELL")
elif (rsi[-1] < RSI_OVERSOLD):
print('BUY')
解决方法
您只需使用 subscribe_to_stream
函数并附加您想要观看的其他频道和市场。我试图通过 python-binance 库手动编写它,但它看起来很粗糙、笨拙且效率低下。所以我找到了你的问题并决定改用这个独角兽库,我得说,它非常棒。这是我的解决方案,您不需要使用 asyncio btw
class BinanceWs:
def __init__(self,channels,markets):
market = 'btcusdt'
tf = 'kline_1w'
self.binance_websocket_api_manager = BinanceWebSocketApiManager(exchange="binance.com-futures")
stream = self.binance_websocket_api_manager.create_stream(tf,market)
self.binance_websocket_api_manager.subscribe_to_stream(stream,markets)
async def run(self):
while True:
received_stream_data_json = self.binance_websocket_api_manager.pop_stream_data_from_stream_buffer()
if received_stream_data_json:
json_data = json.loads(received_stream_data_json)
candle_data = json_data.get('data',{})
candle = candle_data.get('k',{})
symbol = candle.get('s',{})
timeframe = candle.get('i',{})
close_prices = candle.get('c',{})
open_prices = candle.get('o',{})
is_candle_closed = candle.get('x',{})
print(candle_data)
# do stuff with data ...
async def main():
tasks = []
channels = ['kline_1m','kline_5m','kline_15m','kline_30m','kline_1h','kline_12h','miniTicker']
markets = {'btcusdt','ethusdt','ltcusdt'}
print(f'Main starting streams ... ')
kl_socket = BinanceWs(channels=channels,markets=markets)
task = await kl_socket.run()
tasks.append(task)
print(tasks)
await asyncio.gather(*tasks)
if __name__ == "__main__":
asyncio.run(main())
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。