在币安twisted.internet websocket 和asyncio 线程之间共享数据

如何解决在币安twisted.internet websocket 和asyncio 线程之间共享数据

我正在使用以下库:

from binance.websockets import BinanceSocketManager
from twisted.internet import reactor    

创建一个 websocket 以从 API(Binance API)获取数据并以 1 秒为间隔打印比特币的价格:

conn_key = bsm.start_symbol_ticker_socket('BTCUSDT',asset_price_stream)

每次有新的更新时,都会执行函数asset_price_stream,以新数据为参数。这有效(例如,我可以简单地将数据打印到 asset_price_stream 中的控制台)。

现在,我想用 asyncio 函数共享这些数据。目前,我正在为此使用 janus 队列。

在资产价格流中:

price_update_queue_object.queue.sync_q.put_Nowait({msg['s']: msg['a']})

在异步线程中:

async def price_updater(update_queue):
    while True:
        priceupdate = await update_queue.async_q.get()
        print(priceupdate)
        update_queue.async_q.task_done()

我在 asset_price_stream 中使用 sync_q 接口,在 asyncio 函数中使用 async_q 接口。如果我也在 asset_price_stream 中使用 async_q 接口,则会出现错误

Unhandled Error
Traceback (most recent call last):
  File "/home/elias/.local/lib/python3.7/site-packages/twisted/python/log.py",line 101,in callWithLogger
    return callWithContext({"system": lp},func,*args,**kw)
  File "/home/elias/.local/lib/python3.7/site-packages/twisted/python/log.py",line 85,in callWithContext
    return context.call({ILogContext: newCtx},**kw)
  File "/home/elias/.local/lib/python3.7/site-packages/twisted/python/context.py",line 118,in callWithContext
    return self.currentContext().callWithContext(ctx,line 83,in callWithContext
    return func(*args,**kw)
--- <exception caught here> ---
  File "/home/elias/.local/lib/python3.7/site-packages/twisted/internet/posixbase.py",line 687,in _doReadOrWrite
    why = selectable.doRead()
  File "/home/elias/.local/lib/python3.7/site-packages/twisted/internet/tcp.py",line 246,in doRead
    return self._dataReceived(data)
  File "/home/elias/.local/lib/python3.7/site-packages/twisted/internet/tcp.py",line 251,in _dataReceived
    rval = self.protocol.dataReceived(data)
  File "/home/elias/.local/lib/python3.7/site-packages/twisted/protocols/tls.py",line 324,in dataReceived
    self._flushReceiveBIO()
  File "/home/elias/.local/lib/python3.7/site-packages/twisted/protocols/tls.py",line 290,in _flushReceiveBIO
    ProtocolWrapper.dataReceived(self,bytes)
  File "/home/elias/.local/lib/python3.7/site-packages/twisted/protocols/policies.py",line 107,in dataReceived
    self.wrappedProtocol.dataReceived(data)
  File "/home/elias/.local/lib/python3.7/site-packages/autobahn/twisted/websocket.py",in dataReceived
    self._dataReceived(data)
  File "/home/elias/.local/lib/python3.7/site-packages/autobahn/websocket/protocol.py",line 1207,in _dataReceived
    self.consumeData()
  File "/home/elias/.local/lib/python3.7/site-packages/autobahn/websocket/protocol.py",line 1219,in consumeData
    while self.processData() and self.state != WebSocketProtocol.STATE_CLOSED:
  File "/home/elias/.local/lib/python3.7/site-packages/autobahn/websocket/protocol.py",line 1579,in processData
    fr = self.onFrameEnd()
  File "/home/elias/.local/lib/python3.7/site-packages/autobahn/websocket/protocol.py",line 1704,in onFrameEnd
    self._onMessageEnd()
  File "/home/elias/.local/lib/python3.7/site-packages/autobahn/twisted/websocket.py",line 318,in _onMessageEnd
    self.onMessageEnd()
  File "/home/elias/.local/lib/python3.7/site-packages/autobahn/websocket/protocol.py",line 628,in onMessageEnd
    self._onMessage(payload,self.message_is_binary)
  File "/home/elias/.local/lib/python3.7/site-packages/autobahn/twisted/websocket.py",line 321,in _onMessage
    self.onMessage(payload,isBinary)
  File "/home/elias/.local/lib/python3.7/site-packages/binance/websockets.py",line 31,in onMessage
    self.factory.callback(payload_obj)
  File "dollarbot.py",line 425,in asset_price_stream
    price_update_queue_object.queue.async_q.put_Nowait({msg['s']: msg['a']})
  File "/home/elias/.local/lib/python3.7/site-packages/janus/__init__.py",line 438,in put_Nowait
    self._parent._notify_async_not_empty(threadsafe=False)
  File "/home/elias/.local/lib/python3.7/site-packages/janus/__init__.py",line 158,in _notify_async_not_empty
    self._call_soon(task_maker)
  File "/home/elias/.local/lib/python3.7/site-packages/janus/__init__.py",line 60,in checked_call_soon
    self._loop.call_soon(callback,*args)
  File "/usr/lib/python3.7/asyncio/base_events.py",line 690,in call_soon
    self._check_thread()
  File "/usr/lib/python3.7/asyncio/base_events.py",line 728,in _check_thread
    "Non-thread-safe operation invoked on an event loop other "
builtins.RuntimeError: Non-thread-safe operation invoked on an event loop other than the current one

当使用sync_q 时,它可以正常工作。异步线程可以打印价格更新。但有时,它只是卡住了,我不知道为什么。 API 仍在提供数据(因为我已经仔细检查过),但它停止通过队列到达异步线程。我不知道为什么会发生这种情况,而且它并不总是可重现的(我可以连续 5 次运行相同的代码而无需修改,四次有效,一次无效)。有趣的是:只要我按下 CTRL-C 中止程序,数据就会立即到达队列,而之前没有! (在程序等待关闭所有 asyncio 线程的几秒钟内)所以我觉得同步异步队列通信的某些内容错误的/与我按下 CTRL-C 时中止的其他内容同时发生。

所以我的问题是:我该如何改进该程序?有没有比 janus.Queue() 更好的方法将数据从twisted.internet websocket 发送到异步线程?我尝试了一些不同的事情(例如访问全局对象,但我无法从 asset_price_stream 访问 asyncio.lock() 因为它不是异步函数......所以它不会是线程安全的)。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?