微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何使用 pydbus 访问 bluez-obexd 属性状态?

如何解决如何使用 pydbus 访问 bluez-obexd 属性状态?

我正在使用 BlueZ-obexd 和 pydbus 创建一个用于 opp 文件传输的 python obex 客户端。到目前为止,我已经能够使文件传输正常工作,并且我正在尝试从 API 实现 RemoveSession 以仅在传输完成或失败后触发。我知道 API 中也有一个 status 属性,但我完全不知道如何使用它。我对 dbus 或 pydbus 没有很多经验,所以任何帮助都会很棒。我的代码如下。

import pydbus
import time
import subprocess

# Setup of device specific values
address = subprocess.check_output("bluetoothctl paired-devices",shell=True)
dev_id = str(address)[9:26]
TRANSFER_IFACE = 'org.bluez.obex.Transfer1'
ses = pydbus.SessionBus()
obex = ses.get('org.bluez.obex','/org/bluez/obex')
ses1 = obex.CreateSession(dev_id,{'Target': pydbus.Variant('s','OPP')})
ses1_dbus = ses.get('org.bluez.obex',ses1)
props = ses1_dbus.SendFile('/home/pi/Desktop/image.jpg')

obex.RemoveSession(ses1)

解决方法

SendFile 的文档说返回一个 DBus 对象路径和一个属性字典。返回对象的接口是org.bluez.obex.Transfer1,允许监听Status

监控此状态的理想方法是使用来自 GLib.MainLoop 的事件循环并观察 PropertiesChanged 返回的 DBus 对象路径上的 SendFile 信号。

此外,通过获取配对设备信息,有一个 BlueZ DBus API 可以获取该信息,而无需对 subprocess 进行 bluetoothctl 调用。

我试过为两者做一个例子:

import pydbus
from gi.repository import GLib
from time import sleep

OBEX_TRANSFER = 'org.bluez.obex.Transfer1'
BLUEZ_DEVICE = 'org.bluez.Device1'

ses_bus = pydbus.SessionBus()
sys_bus = pydbus.SystemBus()
mngr = sys_bus.get('org.bluez','/')

mainloop = GLib.MainLoop()

def paired_devices():
    mngd_objs = mngr.GetManagedObjects()
    for path,iface in mngd_objs.items():
        if BLUEZ_DEVICE in iface:
            addr = iface.get(BLUEZ_DEVICE,{}).get('Address')
            name = iface.get(BLUEZ_DEVICE,{}).get('Name',addr)
            paired = iface.get(BLUEZ_DEVICE,{}).get('Paired')
            if paired:
                return addr
            

def exit_session():
    print('Exit session')
    mainloop.quit()
    obex.RemoveSession(ses1)

def transfer_status_handler(iface,props_changed,props_removed):
    if iface == OBEX_TRANSFER:
        status = props_changed.get('Status')
        if status == 'complete':
            print('Transfer complete')
            exit_session()
        elif status == 'queued':
            print('Still queued')
        elif status == 'active':
            print('transferring')
        elif status == 'suspended':
            print('Suspended')
        elif status == 'error':
            print('error')
            exit_session()

obex = ses_bus.get('org.bluez.obex','/org/bluez/obex')
my_addr = paired_devices()
ses1 = obex.CreateSession(my_addr,{'Target': pydbus.Variant('s','OPP')})
ses1_dbus = ses_bus.get('org.bluez.obex',ses1)
path,props = ses1_dbus.SendFile('/home/pi/.bashrc')
transfer = ses_bus.get('org.bluez.obex',path)
transfer.onPropertiesChanged = transfer_status_handler
try:
    mainloop.run()
except KeyboardInterrupt:
    exit_session()

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。