如何解决ibapi nextValidId 并不总是被调用
我构建了一个小型 ibapi python 应用程序来运行一些策略,同时注意保持与 tws 的连接。
如果 tws 处于非活动状态,python 应用程序将启动并等待,并在 tws 启动时连接到 tws,但这是我的问题:
nextValidId 不会被调用
这是我的代码。我再说一遍,仅当此应用程序连接到我的应用程序连接之前处于活动状态的 tws 会话时,才会调用 nextValidId。
如您所见,在此脚本启动之前未处于活动状态的 tws 会话中,根本不会调用 nextValidId,甚至不会由 reqIds 手动调用。
from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.common import MarketDataTypeEnum
from ibapi.errors import *
connection_errors = (
CONNECT_FAIL,UPDATE_TWS,NOT_CONNECTED,UNKNOWN_ID,UNSUPPORTED_VERSION,#BAD_LENGTH,BAD_MESSAGE,SOCKET_EXCEPTION,FAIL_CREATE_SOCK,SSL_FAIL,)
connection_error_codes = [error.code() for error in connection_errors]
import threading
import time
import logging
class IBapi(EWrapper,EClient):
# connection_lost = True
clientid = None
hostname = None
portno = None
onConnected = None
MarketDataType = None
def __init__(
self,clientid,hostname='127.0.0.1',portno=7497,MarketDataType=None,onConnected=None,ka_interval=3):
self.clientid = clientid
self.hostname = hostname
self.portno = portno
self.ka_interval = ka_interval
EClient.__init__(self,self)
self.onConnected = onConnected
self.MarketDataType = MarketDataType or MarketDataTypeEnum.DELAYED
self.host_connect()
# Initialise the threads for various components
thread = threading.Thread(target=self.run)
thread.start()
setattr(self,"_thread",thread)
thread = threading.Thread(target=self.keepAlive)
thread.start()
setattr(self,"_thread_ka",thread)
def host_connect(self):
"""Connects to TWS with the appropriate connection parameters"""
if not self.hostname or not self.portno:
logging.error(f'hostname {self.hostname} or portno {self.portno} not [yet] defined')
return
super().connect(self.hostname,self.portno,self.clientid)
def error(self,reqId,errorCode,errorString):
"""disconnect to handle communications errors"""
# clean the connection status
if errorCode in connection_error_codes and \
self.connState not in (EClient.CONNECTING,):
logging.error(
f'reset on connection_error {errorCode} "{errorString}"???')
# self.connection_lost = True
self.disconnect()
return super().error(reqId,errorString)
def keepAlive(self):
data_lock = threading.Lock()
while self.ka_interval:
time.sleep(self.ka_interval)
# isConnected = self.isConnected()
connState = None
with data_lock:
connState = self.connState
# connection_lost = self.connection_lost
isConnected = connState == EClient.CONNECTED
logging.error(f'is connected: {isConnected}')
if not isConnected:
isConnecting = connState == EClient.CONNECTING
if not isConnecting:
logging.error(f"let's connect")
self.host_connect()
else:
logging.error(f'already connecting')
else:
logging.error(f'requesting CurrentTime for keepAlive')
# if connection_lost:
# logging.error('reconnecting. should auto invoke nextValidId')
# self.reqIds(1)
# self.host_connected()
self.reqCurrentTime()
self.reqIds(1)
def host_connected(self):
# if self.connection_lost:
# self.connection_lost = False
self.reqMarketDataType(self.MarketDataType)
self.reqPositions()
def nextValidId(self,orderId):
print('====================================================')
logging.error(f'The next valid order id is: {orderId}')
print('====================================================')
super().nextValidId(orderId)
self.nextorderId = orderId
self.host_connected()
port_TWS_Live = 7496
port_IBGateway_Live = 4001
port_TWS_Simulated = 7497
port_IBGateway_Simulated = 4002
def main():
logging.basicConfig(
format='%(levelname)s:%(asctime)s:%(message)s',level=logging.WARN)
logging.info('Started')
logging.debug('This message should appear on the console')
app = IBapi(
1234,#portno=port_TWS_Live,portno=port_TWS_Simulated,)
logging.info('Finished')
if __name__ == '__main__':
main()
我愿意接受任何有趣的建议。
提前致谢, 亚历克斯
解决方法
有趣。我打赌你得到了 id,检查 api 日志。对于类似
13:05:15:387 -> --- 9-1-1007-
表示 9(NextValidId 的代码),1007 是到目前为止该 Client# 的 ID。
如果由于 reqIds 而使用您的重新连接逻辑,我会得到同样的结果,但它永远不会到达 nextValidId 回调。因此读取器线程(在 disconnect() 时停止)不起作用。
我尝试将 self.run 线程移动到 host_connect() 但它仍然不起作用。
我将 EClient__init__ 放在 host_connect() 中,现在它可以工作了。不知道为什么,代码太多了。
def host_connect(self):
"""Connects to TWS with the appropriate connection parameters"""
if not self.hostname or not self.portno:
logging.error(f'hostname {self.hostname} or portno {self.portno} not [yet] defined')
return
# don't know exactly why this works,all the reader,connection,decoder,etc. need a callback
EClient.__init__(self,self)
super().connect(self.hostname,self.portno,self.clientid)
#moved from init
thread = threading.Thread(target=self.run)
thread.start()
setattr(self,"_thread",thread)
,
根据@brian 的研究,我将工作代码放在这里。 此版本正在执行 disconnect() 并在连接错误时重新启动 run() 线程。 但不会像 __init__ 那样重置队列和丢失数据。
from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.common import MarketDataTypeEnum
from ibapi.errors import *
connection_errors = (
CONNECT_FAIL,UPDATE_TWS,NOT_CONNECTED,UNKNOWN_ID,UNSUPPORTED_VERSION,#BAD_LENGTH,BAD_MESSAGE,SOCKET_EXCEPTION,FAIL_CREATE_SOCK,SSL_FAIL,)
connection_error_codes = [error.code() for error in connection_errors]
import threading
import time
import logging
class IBapi(EWrapper,EClient):
nextorderId = None
ka_interval = None
clientid = None
hostname = None
portno = None
MarketDataType = None
onConnected = None
def __init__(
self,clientid,hostname='127.0.0.1',portno=7497,MarketDataType=None,onConnected=None,ka_interval=3):
self.clientid = clientid
self.hostname = hostname
self.portno = portno
self.ka_interval = ka_interval
EClient.__init__(self,self)
self.onConnected = onConnected
self.MarketDataType = MarketDataType or MarketDataTypeEnum.DELAYED
self.host_connect()
# Initialise the threads for various components
thread = threading.Thread(target=self.keepAlive)
thread.start()
setattr(self,"_thread_ka",thread)
def host_connect(self):
"""Connects to TWS with the appropriate connection parameters"""
if not self.hostname or not self.portno:
logging.error(f'hostname {self.hostname} or portno {self.portno} not [yet] defined')
return
super().connect(self.hostname,self.clientid)
thread = threading.Thread(target=self.run)
thread.start()
setattr(self,thread)
def error(self,reqId,errorCode,errorString):
"""disconnect to handle communications errors"""
# clean the connection status
if errorCode in connection_error_codes and \
self.connState not in (EClient.CONNECTING,):
logging.error(
f'disconnect on connection_error {errorCode} "{errorString}"')
self.disconnect()
if hasattr(self,"_thread"):
self._thread.join(5)
time.sleep(5)
return super().error(reqId,errorString)
def keepAlive(self):
data_lock = threading.Lock()
while self.ka_interval:
time.sleep(self.ka_interval)
# isConnected = self.isConnected()
connState = None
with data_lock:
connState = self.connState
isConnected = connState == EClient.CONNECTED
logging.error(f'is connected: {isConnected}')
if not isConnected:
isConnecting = connState == EClient.CONNECTING
if not isConnecting:
logging.error(f"let's connect")
self.host_connect()
else:
logging.error(f'already connecting')
else:
logging.error(f'requesting CurrentTime for keepAlive')
self.reqCurrentTime()
self.reqIds(1)
def host_connected(self):
self.reqMarketDataType(self.MarketDataType)
self.reqPositions()
def nextValidId(self,orderId):
print('====================================================')
logging.error(f'The next valid order id is: {orderId}')
print('====================================================')
super().nextValidId(orderId)
self.nextorderId = orderId
self.host_connected()
port_TWS_Live = 7496
port_IBGateway_Live = 4001
port_TWS_Simulated = 7497
port_IBGateway_Simulated = 4002
def main():
logging.basicConfig(
format='%(levelname)s:%(asctime)s:%(message)s',level=logging.WARN)
logging.info('Started')
app = IBapi(
1234,#portno=port_TWS_Live,portno=port_TWS_Simulated,)
logging.info('Finished')
if __name__ == '__main__':
main()
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。