如何解决paho-mqtt的python unittest无法正常工作-简单的语法问题
我正在尝试使用用于python的unittest包编写一个简单的测试,该测试仅检测是否存在代理连接。尽管成功建立了代理连接,但它似乎还是失败了,我90%的肯定是语法方面的问题-特别是has_connected布尔变量的定义。
import paho.mqtt.client as mqtt
import time
class TestbrokerConnection(unittest.TestCase):
def setUp(self):
self.client = mqtt.Client("Test Client")
self.client.on_connect = self.on_connect
self.broker = "10.0.2.4"
self.port = 1883
self.has_connected = False
def on_connect(client,userdata,flags,rc): #connect function
if rc==0:
self.has_connected = True
def test_connection(self): #test to check connection to broker
self.client.connect(self.broker,self.port)
self.client.loop_start()
time.sleep(2)
self.client.loop_stop()
self.assertTrue(self.has_connected)
if __name__ == '__main__':
unittest.main()
任何帮助将不胜感激:)
解决方法
我复制了您的代码示例,并使用了paho.mqtt客户端提供的示例进行连接
client.connect("mqtt.eclipse.org",1883,60)
我认为您的问题可能在您的on_connect函数上,您正在引用self.has_connected,但是您没有将self传递给该函数。
这对我来说很有效,请让我知道将self添加到on_connect中是否可以解决您遇到的问题!
class TestBrokerConnection(unittest.TestCase):
def setUp(self):
self.client = mqtt.Client("Test Client")
self.client.on_connect = self.on_connect
self.broker = "mqtt.eclipse.org"
self.port = 1883
self.has_connected = False
def on_connect(self,client,userdata,flags,rc): # connect function
if rc == 0:
self.has_connected = True
def test_connection(self): # test to check connection to broker
self.client.connect(self.broker,self.port)
self.client.loop_start()
time.sleep(2)
self.client.loop_stop()
self.assertTrue(self.has_connected)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。