如何解决在python中使用带有bluepy的BLE加速等待通知
我使用 bluepy 从配备低功耗蓝牙设备的 Python 传感器中获取数据。传感器是我自己设计的设备。因此,我可以访问传感器板上的处理。
现在需要 2 秒才能从传感器获取数据。在传感器上,数据采集过程仅需 200 毫秒。此延迟发生在“等待通知”(1.7 秒)。我在下面粘贴了我的整个蓝牙通信类。但是,简要代码如下:
- 通过蓝牙向传感器写入一个字符串(传感器解析它以查看它应该返回什么样的数据)
- 使用
while self.peripheral.waitForNotifications
等待通知。 - 读取数据(这很快
因此,这第 2 步需要时间(尽管传感器在大约 200 毫秒后开始发回)。
有谁知道是否可以加快第 2 步的速度?
import struct
import bluepy.btle as btle
import numpy
import time
class ReadDelegate(btle.DefaultDelegate):
def __init__(self):
self.data = b''
def reset(self):
self.data = b''
def handleNotification(self,cHandle,data):
self.data = self.data + data
@property
def data_length(self):
return len(self.data)
class myHC08:
def __init__(self,verbose=False):
self.verbose = verbose
self.mac = '34:14:B5:50:22:ED'
self.write_service_id = 4
self.write_service = None
self.delegate = ReadDelegate()
self.peripheral = None
def print_output(self,message):
print('HC08: %s' % message)
def connect(self):
self.peripheral = btle.Peripheral(self.mac)
self.peripheral.withDelegate(self.delegate)
s = self.write_service_id
services = self.peripheral.getServices()
self.write_service = self.peripheral.getServiceByUUID(list(services)[s].uuid)
def wait_for_notification(self,time_out=5):
start = time.time()
while self.peripheral.waitForNotifications(1):
waiting = time.time() - start
if waiting > time_out: return False,False
waiting = time.time() - start
return True,waiting
def wait_for_data(self,min_bytes,time_out=5):
start = time.time()
while self.delegate.data_length < min_bytes:
waiting = time.time() - start
if waiting > time_out: return False,waiting
def write(self,message,unpack_string=False):
if self.verbose: self.print_output('Writing %s' % message)
self.delegate.reset()
c = self.write_service.getcharacteristics()[0]
c.write(bytes(message,"utf-8"))
self.verbose: print("HC08 Receiving %i bytes" % min_bytes)
if self.verbose: self.print_output('Waiting for notification')
success,duration = self.wait_for_notification()
if not success: return False
if self.verbose: self.print_output('Duration: %.10f' % duration)
if self.verbose: self.print_output('Waiting for data')
success,duration = self.wait_for_data(min_bytes=min_bytes)
if not success: return False
if self.verbose: self.print_output('Duration: %.10f' % duration)
received = self.delegate.data
if unpack_string:
received = struct.unpack(unpack_string,received)
received = numpy.array(received)
return received
def write_robust(self,unpack_string=False,max_retry=3):
for attempt in range(max_retry):
if self.verbose: self.print_output('Attempt %i out of %i' % (attempt,max_retry))
result = self.write(message,unpack_string)
if result is not False: return result
if not result: self.reconnect()
if not result: raise ('Error Could not get data in %i attempts.' % max_retry)
def disconnect(self):
self.peripheral.disconnect()
def reconnect(self):
self.print_output('Reconnecting')
for x in range(5): self.disconnect()
self.connect()
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。