微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

使用 Bleak 和 PyQtGraph 包实时绘制 BLE 数据

如何解决使用 Bleak 和 PyQtGraph 包实时绘制 BLE 数据

我正在尝试使用基于 ESP32 的传感器和 BLE 实时绘制传感器数据。我尝试使用 Bleak 并结合 PyQtGraph 包中的简单 scrolling example。我知道从 ESP32 中正确读取了传感器数据,但没有出现绘图,因此显然是错误地集成了代码

我当前的代码

import asyncio
from bleak import BleakClient
import time
import pyqtgraph as pg
from pyqtgraph.Qt import QtCore,QtGui
import numpy as np

# BLE peripheral ID
address = "6C9F597F-7085-4AAB-806B-D2558588D50D"
UUID = "beb5483e-36e1-4688-b7f5-ea07361b26a8"

# Plot details
win = pg.GraphicslayoutWidget(show=True)
win.setwindowTitle('pyqtgraph example: Scrolling Plots')
p1 = win.addplot()
data1 = np.zeros(5)
curve1 = p1.plot(data1)


async def run(address,loop):
    global data1

    while True:
        time.sleep(5)

        async with BleakClient(address,loop=loop) as client:
            data = await client.read_gatt_char(UUID)

            #Parse sensor data from BLE characteristic
            sensor_data = data.decode('utf_8')
            print(sensor_data)
            sensor_data = sensor_data.split(",")
            temperature = sensor_data[4]

            #Update the array with newest data so plot will appear scroll
            def update():
                print(temperature)
                data1[:-1] = data1[1:]
                data1[-1] = temperature
                curve1.setData(data1)

        timer = pg.QtCore.QTimer()
        timer.timeout.connect(update)
        timer.start(1000)


loop = asyncio.get_event_loop()
loop.run_until_complete(run(address,loop))

if __name__ == '__main__':
    QtGui.QApplication.instance().exec_()

解决方法

你永远不应该在 pyqtgraph 中使用 time.sleep。

pyqtgraph 默认不支持 asyncio,但您可以使用 asyncqt 模块(python -m pip install asyncqt)或 qasync 模块(python -m pip install qasync)创建一个支持Qt。

import asyncio

import pyqtgraph as pg
from pyqtgraph.Qt import QtCore,QtGui
import numpy as np

from bleak import BleakClient

from asyncqt import QEventLoop,asyncSlot

# BLE peripheral ID
address = "6C9F597F-7085-4AAB-806B-D2558588D50D"
UUID = "beb5483e-36e1-4688-b7f5-ea07361b26a8"


class Window(pg.GraphicsLayoutWidget):
    def __init__(self,loop=None,parent=None):
        super().__init__(parent)
        self._loop = loop

        self.setWindowTitle("pyqtgraph example: Scrolling Plots")
        plot = self.addPlot()
        self._data = np.zeros(5)
        self._curve = plot.plot(self.data)
        self._client = BleakClient(address,loop=self._loop)

    @property
    def client(self):
        return self._client

    async def start(self):
        await self.client.connect()
        self.start_read()

    async def stop(self):
        await self.client.disconnect()

    @property
    def data(self):
        return self._data

    @property
    def curve(self):
        return self._curve

    async def read(self):
        data = await self.client.read_gatt_char(UUID)
        sensor_data = data.split(",")
        if len(sensor_data) >= 5:
            temperature_str = sensor_data[4]
            try:
                temperature = float(temperature_str)
            except ValueError as e:
                print(f"{temperature_str} not is float")
            else:
                self.update_plot(temperature)
        QtCore.QTimer.singleShot(5000,self.start_read)

    def start_read(self):
        asyncio.ensure_future(self.read(),loop=self._loop)

    def update_plot(self,temperature):
        self.data[:-1] = self.data[1:]
        self.data[-1] = temperature
        self.curve.setData(self.data)

    def closeEvent(self,event):
        super().closeEvent(event)
        asyncio.ensure_future(self.client.stop(),loop=self._loop)


def main(args):
    app = QtGui.QApplication(args)
    loop = QEventLoop(app)
    asyncio.set_event_loop(loop)

    window = Window()
    window.show()

    with loop:
        asyncio.ensure_future(window.start(),loop=loop)
        loop.run_forever()


if __name__ == "__main__":
    import sys

    main(sys.argv)

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。