如何使用 Bleak 和 PyQtGraph 包实时绘制 BLE 数据
我正在尝试使用基于 ESP32 的传感器,通过 BLE 实时绘制传感器数据。我尝试把 Bleak 与 PyQtGraph 包中的简单 scrolling example(滚动绘图示例)结合起来。我确认传感器数据已经从 ESP32 正确读取,但绘图始终没有出现,因此显然是我整合这两部分代码的方式有误。
我当前的代码:
import asyncio
from bleak import BleakClient
import time
import pyqtgraph as pg
from pyqtgraph.Qt import QtCore,QtGui
import numpy as np
# BLE peripheral ID
# Identifier handed to BleakClient to select the peripheral to connect to.
address = "6C9F597F-7085-4AAB-806B-D2558588D50D"
# Identifier of the GATT characteristic passed to client.read_gatt_char().
UUID = "beb5483e-36e1-4688-b7f5-ea07361b26a8"
# Plot details
# Top-level layout widget hosting the plot; show=True displays it right away.
win = pg.GraphicsLayoutWidget(show=True)
win.setWindowTitle('pyqtgraph example: Scrolling Plots')
p1 = win.addPlot()
# Five-sample buffer, initially all zeros, that backs the plotted curve.
data1 = np.zeros(5)
curve1 = p1.plot(data1)
# Coroutine that loops forever: it reads the BLE characteristic, extracts the
# temperature field and sets up a Qt timer whose callback pushes that value
# into the scrolling plot buffer.
# NOTE(review): the indentation of this listing was lost, so the exact nesting
# of the statements below (what sits inside `async with`, what sits inside the
# loop) cannot be confirmed from the text alone.
async def run(address,loop):
# Marks the module-level sample buffer as global inside this coroutine.
global data1
while True:
# NOTE(review): time.sleep() blocks the calling thread; it is not awaited, so
# nothing else can run on this thread during the five-second pause.
time.sleep(5)
# A BleakClient for `address` is opened as an async context manager.
async with BleakClient(address,loop=loop) as client:
data = await client.read_gatt_char(UUID)
#Parse sensor data from BLE characteristic
sensor_data = data.decode('utf_8')
print(sensor_data)
sensor_data = sensor_data.split(",")
# Keep the fifth comma-separated field (index 4); it is still a string here.
temperature = sensor_data[4]
#Update the array with newest data so plot will appear scroll
def update():
print(temperature)
# Shift the buffer one slot to the left, then store the new value last.
data1[:-1] = data1[1:]
data1[-1] = temperature
curve1.setData(data1)
# A QTimer is created, wired to update() and started with a 1000 ms interval.
timer = pg.QtCore.QTimer()
timer.timeout.connect(update)
timer.start(1000)
# Run the BLE polling coroutine on asyncio's default event loop.
# NOTE(review): run() contains a `while True` loop with no visible exit, so
# run_until_complete() presumably never returns and the Qt exec_() call below
# would not be reached -- confirm.
loop = asyncio.get_event_loop()
polling = run(address, loop)
loop.run_until_complete(polling)

# Enter the Qt event loop only when the file is executed as a script.
if __name__ == '__main__':
    qt_app = QtGui.QApplication.instance()
    qt_app.exec_()
解决方法
在 pyqtgraph(Qt)程序中永远不应该使用 time.sleep:它会阻塞事件循环,导致界面无法刷新。
pyqtgraph 默认不支持 asyncio,但您可以使用 asyncqt 模块(python -m pip install asyncqt)或 qasync 模块(python -m pip install qasync),它们会创建一个支持 Qt 的新事件循环。
import asyncio
import pyqtgraph as pg
from pyqtgraph.Qt import QtCore,QtGui
import numpy as np
from bleak import BleakClient
from asyncqt import QEventLoop,asyncSlot
# BLE peripheral ID
# Identifier handed to BleakClient to select the peripheral to connect to.
address = "6C9F597F-7085-4AAB-806B-D2558588D50D"
# Identifier of the GATT characteristic passed to client.read_gatt_char().
UUID = "beb5483e-36e1-4688-b7f5-ea07361b26a8"
class Window(pg.GraphicsLayoutWidget):
    """Scrolling plot window fed by periodic reads of a BLE characteristic.

    The widget owns a ``BleakClient`` for the module-level ``address`` and a
    five-sample NumPy buffer. Once ``start()`` has connected, ``read()`` is
    re-armed every five seconds through a single-shot Qt timer.
    """

    def __init__(self, loop=None, parent=None):
        """Build the plot, the sample buffer and the (unconnected) client.

        Args:
            loop: asyncio event loop used when scheduling coroutines; it is
                forwarded unchanged to ``asyncio.ensure_future`` and to
                ``BleakClient``.
            parent: optional parent widget passed to the base class.
        """
        super().__init__(parent)
        self._loop = loop

        self.setWindowTitle("pyqtgraph example: Scrolling Plots")
        plot = self.addPlot()
        self._data = np.zeros(5)
        self._curve = plot.plot(self.data)
        self._client = BleakClient(address, loop=self._loop)

    @property
    def client(self):
        """The ``BleakClient`` created in ``__init__``."""
        return self._client

    @property
    def data(self):
        """The NumPy buffer holding the samples currently plotted."""
        return self._data

    @property
    def curve(self):
        """The plot item returned by ``plot.plot`` that displays ``data``."""
        return self._curve

    async def start(self):
        """Connect to the peripheral, then schedule the first read."""
        await self.client.connect()
        self.start_read()

    async def stop(self):
        """Disconnect from the peripheral."""
        await self.client.disconnect()

    async def read(self):
        """Read the characteristic once, plot the value and re-arm the timer.

        The payload is decoded as UTF-8 and split on commas; the fifth field
        (index 4) is parsed as the temperature. Payloads with fewer than five
        fields are skipped, and a field that is not a valid float is reported
        on stdout instead of being plotted.
        """
        data = await self.client.read_gatt_char(UUID)
        # The characteristic value arrives as raw bytes, so it has to be
        # decoded before it can be split on a text separator.
        # NOTE(review): assumes the peripheral sends comma-separated UTF-8
        # text, as the decode/split in the question's listing suggests.
        sensor_data = data.decode("utf-8").split(",")
        if len(sensor_data) >= 5:
            temperature_str = sensor_data[4]
            try:
                temperature = float(temperature_str)
            except ValueError:
                print(f"{temperature_str} not is float")
            else:
                self.update_plot(temperature)
        # Schedule the next read in 5000 ms without blocking the event loop.
        QtCore.QTimer.singleShot(5000, self.start_read)

    def start_read(self):
        """Schedule ``read()`` as a task on the configured event loop."""
        asyncio.ensure_future(self.read(), loop=self._loop)

    def update_plot(self, temperature):
        """Append ``temperature`` to the buffer and redraw the curve.

        Args:
            temperature: new sample; the oldest sample is discarded.
        """
        # Shift every sample one slot to the left, then store the new one last.
        self.data[:-1] = self.data[1:]
        self.data[-1] = temperature
        self.curve.setData(self.data)

    def closeEvent(self, event):
        """Let Qt handle the close, then schedule the BLE disconnect.

        Args:
            event: the close event delivered by Qt.
        """
        super().closeEvent(event)
        # stop() is defined on this window (it awaits client.disconnect());
        # the BLE client object itself is not what provides stop().
        asyncio.ensure_future(self.stop(), loop=self._loop)
def main(args):
    """Create the Qt application, install its asyncio loop and run the window.

    Args:
        args: argument list handed to ``QApplication``.
    """
    qt_app = QtGui.QApplication(args)

    # Build the event loop from the application and make it asyncio's
    # current loop.
    event_loop = QEventLoop(qt_app)
    asyncio.set_event_loop(event_loop)

    main_window = Window()
    main_window.show()

    with event_loop:
        # Queue the BLE start-up coroutine, then run until the loop stops.
        startup = main_window.start()
        asyncio.ensure_future(startup, loop=event_loop)
        event_loop.run_forever()
# Script entry point: forward the raw command-line arguments to main().
if __name__ == "__main__":
    import sys

    main(args=sys.argv)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。