ibapi nextValidId 并不总是被调用

如何解决ibapi nextValidId 并不总是被调用

我构建了一个小型 ibapi python 应用程序来运行一些策略,同时注意保持与 tws 的连接。

如果 tws 处于非活动状态,python 应用程序将启动并等待,并在 tws 启动时连接到 tws,但这是我的问题:

nextValidId 不会被调用

这是我的代码。我再说一遍,仅当此应用程序连接到我的应用程序连接之前处于活动状态的 tws 会话时,才会调用 nextValidId。

如您所见,在此脚本启动之前未处于活动状态的 tws 会话中,根本不会调用 nextValidId,甚至不会由 reqIds 手动调用。

from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.common import MarketDataTypeEnum
from ibapi.errors import *

connection_errors = (
    CONNECT_FAIL,UPDATE_TWS,NOT_CONNECTED,UNKNOWN_ID,UNSUPPORTED_VERSION,#BAD_LENGTH,BAD_MESSAGE,SOCKET_EXCEPTION,FAIL_CREATE_SOCK,SSL_FAIL,)
connection_error_codes = [error.code() for error in connection_errors]


import threading
import time
import logging


class IBapi(EWrapper,EClient):

#   connection_lost = True
    
    clientid = None
    hostname = None
    portno = None
    onConnected = None
    
    MarketDataType = None
    
    def __init__(
            self,clientid,hostname='127.0.0.1',portno=7497,MarketDataType=None,onConnected=None,ka_interval=3):
        
        self.clientid = clientid
        self.hostname = hostname
        self.portno = portno
        self.ka_interval = ka_interval
        
        EClient.__init__(self,self)
        
        self.onConnected = onConnected
        self.MarketDataType = MarketDataType or MarketDataTypeEnum.DELAYED
        
        self.host_connect()
        
        # Initialise the threads for various components
        thread = threading.Thread(target=self.run)
        thread.start()
        setattr(self,"_thread",thread)
        
        thread = threading.Thread(target=self.keepAlive)
        thread.start()
        setattr(self,"_thread_ka",thread)

    def host_connect(self):
        """Connects to TWS with the appropriate connection parameters"""
        if not self.hostname or not self.portno:
            logging.error(f'hostname {self.hostname} or portno {self.portno} not [yet] defined')
            return
        super().connect(self.hostname,self.portno,self.clientid)

    def error(self,reqId,errorCode,errorString):
        """disconnect to handle communications errors"""
        # clean the connection status
        if errorCode in connection_error_codes and \
                self.connState not in (EClient.CONNECTING,):
            logging.error(
                f'reset on connection_error {errorCode} "{errorString}"???')
#           self.connection_lost = True
            self.disconnect()
        return super().error(reqId,errorString)

    def keepAlive(self):
        data_lock = threading.Lock()
        while self.ka_interval:
            time.sleep(self.ka_interval)
#           isConnected = self.isConnected()
            connState = None
            with data_lock:
                connState = self.connState
#               connection_lost = self.connection_lost
            isConnected = connState == EClient.CONNECTED
            logging.error(f'is connected: {isConnected}')
            if not isConnected:
                isConnecting = connState == EClient.CONNECTING
                if not isConnecting:
                    logging.error(f"let's connect")
                    self.host_connect()
                else:
                    logging.error(f'already connecting')
            else:
                logging.error(f'requesting CurrentTime for keepAlive')
#               if connection_lost:
#                   logging.error('reconnecting. should auto invoke nextValidId')
#                   self.reqIds(1)
#                   self.host_connected()
                self.reqCurrentTime()
                self.reqIds(1)

    def host_connected(self):
#       if self.connection_lost:
#           self.connection_lost = False
            self.reqMarketDataType(self.MarketDataType)
            self.reqPositions()

    def nextValidId(self,orderId):
        print('====================================================')
        logging.error(f'The next valid order id is: {orderId}')
        print('====================================================')
        super().nextValidId(orderId)
        self.nextorderId = orderId
        self.host_connected()

port_TWS_Live = 7496
port_IBGateway_Live = 4001
port_TWS_Simulated = 7497
port_IBGateway_Simulated = 4002

def main():
    logging.basicConfig(
        format='%(levelname)s:%(asctime)s:%(message)s',level=logging.WARN)
    logging.info('Started')
    
    logging.debug('This message should appear on the console')
    
    app = IBapi(
        1234,#portno=port_TWS_Live,portno=port_TWS_Simulated,)
    
    logging.info('Finished')

if __name__ == '__main__':
    main()

我愿意接受任何有趣的建议。

提前致谢, 亚历克斯

解决方法

有趣。我打赌你得到了 id,检查 api 日志。对于类似

13:05:15:387 -> --- 9-1-1007-

表示 9(NextValidId 的代码),1007 是到目前为止该 Client# 的 ID。

如果由于 reqIds 而使用您的重新连接逻辑,我会得到同样的结果,但它永远不会到达 nextValidId 回调。因此读取器线程(在 disconnect() 时停止)不起作用。

我尝试将 self.run 线程移动到 host_connect() 但它仍然不起作用。

我将 EClient__init__ 放在 host_connect() 中,现在它可以工作了。不知道为什么,代码太多了。

def host_connect(self):
    """Connects to TWS with the appropriate connection parameters"""
    if not self.hostname or not self.portno:
        logging.error(f'hostname {self.hostname} or portno {self.portno} not [yet] defined')
        return
    
    # don't know exactly why this works,all the reader,connection,decoder,etc. need a callback
    EClient.__init__(self,self) 
    super().connect(self.hostname,self.portno,self.clientid)

    #moved from init
    thread = threading.Thread(target=self.run)
    thread.start()
    setattr(self,"_thread",thread)
,

根据@brian 的研究,我将工作代码放在这里。 此版本正在执行 disconnect() 并在连接错误时重新启动 run() 线程。 但不会像 __init__ 那样重置队列和丢失数据。

from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.common import MarketDataTypeEnum
from ibapi.errors import *

connection_errors = (
    CONNECT_FAIL,UPDATE_TWS,NOT_CONNECTED,UNKNOWN_ID,UNSUPPORTED_VERSION,#BAD_LENGTH,BAD_MESSAGE,SOCKET_EXCEPTION,FAIL_CREATE_SOCK,SSL_FAIL,)
connection_error_codes = [error.code() for error in connection_errors]


import threading
import time
import logging


class IBapi(EWrapper,EClient):

    nextorderId = None
    ka_interval = None
    
    clientid = None
    hostname = None
    portno = None
    
    MarketDataType = None
    onConnected = None
    
    def __init__(
            self,clientid,hostname='127.0.0.1',portno=7497,MarketDataType=None,onConnected=None,ka_interval=3):
        
        self.clientid = clientid
        self.hostname = hostname
        self.portno = portno
        self.ka_interval = ka_interval
        
        EClient.__init__(self,self)
        
        self.onConnected = onConnected
        self.MarketDataType = MarketDataType or MarketDataTypeEnum.DELAYED
        
        self.host_connect()
        
        # Initialise the threads for various components
        thread = threading.Thread(target=self.keepAlive)
        thread.start()
        setattr(self,"_thread_ka",thread)

    def host_connect(self):
        """Connects to TWS with the appropriate connection parameters"""
        if not self.hostname or not self.portno:
            logging.error(f'hostname {self.hostname} or portno {self.portno} not [yet] defined')
            return
        super().connect(self.hostname,self.clientid)
        thread = threading.Thread(target=self.run)
        thread.start()
        setattr(self,thread)

    def error(self,reqId,errorCode,errorString):
        """disconnect to handle communications errors"""
        # clean the connection status
        if errorCode in connection_error_codes and \
                self.connState not in (EClient.CONNECTING,):
            logging.error(
                f'disconnect on connection_error {errorCode} "{errorString}"')
            self.disconnect()
            if hasattr(self,"_thread"):
                self._thread.join(5)
                time.sleep(5)
        return super().error(reqId,errorString)

    def keepAlive(self):
        data_lock = threading.Lock()
        while self.ka_interval:
            time.sleep(self.ka_interval)
#           isConnected = self.isConnected()
            connState = None
            with data_lock:
                connState = self.connState
            isConnected = connState == EClient.CONNECTED
            logging.error(f'is connected: {isConnected}')
            if not isConnected:
                isConnecting = connState == EClient.CONNECTING
                if not isConnecting:
                    logging.error(f"let's connect")
                    self.host_connect()
                else:
                    logging.error(f'already connecting')
            else:
                logging.error(f'requesting CurrentTime for keepAlive')
                self.reqCurrentTime()
                self.reqIds(1)

    def host_connected(self):
        self.reqMarketDataType(self.MarketDataType)
        self.reqPositions()

    def nextValidId(self,orderId):
        print('====================================================')
        logging.error(f'The next valid order id is: {orderId}')
        print('====================================================')
        super().nextValidId(orderId)
        self.nextorderId = orderId
        self.host_connected()

port_TWS_Live = 7496
port_IBGateway_Live = 4001
port_TWS_Simulated = 7497
port_IBGateway_Simulated = 4002

def main():
    
    logging.basicConfig(
        format='%(levelname)s:%(asctime)s:%(message)s',level=logging.WARN)
    
    logging.info('Started')
    
    app = IBapi(
        1234,#portno=port_TWS_Live,portno=port_TWS_Simulated,)
    
    logging.info('Finished')

if __name__ == '__main__':
    main()

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams['font.sans-serif'] = ['SimHei'] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -> systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping("/hires") public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate<String
使用vite构建项目报错 C:\Users\ychen\work>npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-
参考1 参考2 解决方案 # 点击安装源 协议选择 http:// 路径填写 mirrors.aliyun.com/centos/8.3.2011/BaseOS/x86_64/os URL类型 软件库URL 其他路径 # 版本 7 mirrors.aliyun.com/centos/7/os/x86
报错1 [root@slave1 data_mocker]# kafka-console-consumer.sh --bootstrap-server slave1:9092 --topic topic_db [2023-12-19 18:31:12,770] WARN [Consumer clie
错误1 # 重写数据 hive (edu)> insert overwrite table dwd_trade_cart_add_inc > select data.id, > data.user_id, > data.course_id, > date_format(
错误1 hive (edu)> insert into huanhuan values(1,'haoge'); Query ID = root_20240110071417_fe1517ad-3607-41f4-bdcf-d00b98ac443e Total jobs = 1
报错1:执行到如下就不执行了,没有显示Successfully registered new MBean. [root@slave1 bin]# /usr/local/software/flume-1.9.0/bin/flume-ng agent -n a1 -c /usr/local/softwa
虚拟及没有启动任何服务器查看jps会显示jps,如果没有显示任何东西 [root@slave2 ~]# jps 9647 Jps 解决方案 # 进入/tmp查看 [root@slave1 dfs]# cd /tmp [root@slave1 tmp]# ll 总用量 48 drwxr-xr-x. 2
报错1 hive> show databases; OK Failed with exception java.io.IOException:java.lang.RuntimeException: Error in configuring object Time taken: 0.474 se
报错1 [root@localhost ~]# vim -bash: vim: 未找到命令 安装vim yum -y install vim* # 查看是否安装成功 [root@hadoop01 hadoop]# rpm -qa |grep vim vim-X11-7.4.629-8.el7_9.x
修改hadoop配置 vi /usr/local/software/hadoop-2.9.2/etc/hadoop/yarn-site.xml # 添加如下 <configuration> <property> <name>yarn.nodemanager.res