Active BLE Scanning (BlueZ): Issue with DBus

Question

I've started a project where I need to scan for BLE devices actively, all the time. I'm on Linux with BlueZ 5.49, and I use Python to talk to dbus 1.10.20. I can start and stop scanning with bluetoothctl and get the BLE advertisement data through DBus (GetManagedObjects() of the BlueZ interface). The problem is that when I leave the scan running for many hours, dbus-daemon takes up more and more RAM, and I can't find a way to "flush" what dbus has gathered from BlueZ. Eventually the RAM fills up and Linux isn't happy.

So I tried not scanning the entire time, in the hope that this would let the garbage collector do its cleanup. It didn't work.

I've edited /etc/dbus-1/system.d/bluetooth.conf to remove any interface that I didn't need:

<policy user="root">
    <allow own="org.bluez"/>
    <allow send_destination="org.bluez"/>
</policy>

That slowed down the RAM build-up but didn't solve the issue.

I've found a way to inspect which connection has bytes waiting, and confirmed that they come from BlueZ:

Connection :1.74 with pid 3622 '/usr/libexec/bluetooth/bluetoothd --experimental ' (org.bluez):
        IncomingBytes=1253544
        PeakIncomingBytes=1313072
        OutgoingBytes=0
        PeakOutgoingBytes=210
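
To watch this over time rather than by eye, a dump like the one above can be parsed and checked against a limit. This is only a minimal sketch: it assumes the text keeps exactly the shape shown here, and `parse_stats`, `busy_connections` and the threshold value are hypothetical names and numbers chosen for illustration.

```python
import re

# Sample text in the same shape as the dump shown above (an assumed format)
SAMPLE = """\
Connection :1.74 with pid 3622 '/usr/libexec/bluetooth/bluetoothd --experimental ' (org.bluez):
        IncomingBytes=1253544
        PeakIncomingBytes=1313072
        OutgoingBytes=0
        PeakOutgoingBytes=210
"""

HEADER = re.compile(r"^Connection (\S+) with pid (\d+)")
FIELD = re.compile(r"^\s+(\w+)=(\d+)$")


def parse_stats(text):
    """Return {connection_name: {"pid": int, counter_name: int, ...}}."""
    stats = {}
    current = None
    for line in text.splitlines():
        header = HEADER.match(line)
        if header:
            current = stats.setdefault(header.group(1), {"pid": int(header.group(2))})
            continue
        field = FIELD.match(line)
        if field and current is not None:
            current[field.group(1)] = int(field.group(2))
    return stats


def busy_connections(stats, threshold):
    """Names of connections whose IncomingBytes exceeds the threshold."""
    return [name for name, counters in stats.items()
            if counters.get("IncomingBytes", 0) > threshold]


stats = parse_stats(SAMPLE)
print(busy_connections(stats, threshold=1_000_000))
```

Running the check periodically and logging the result would show whether the queue for the bluetoothd connection keeps growing or levels off.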

Lastly, I found that someone needs to read what is waiting in DBus in order to free the memory. So I found this: https://stackoverflow.com/a/60665430/15325057

I do receive the data that BlueZ is sending, but the memory still builds up.

The only way I know to free up dbus is to reboot Linux, which is not ideal.

I've reached the end of what I understand about DBus, and that's why I'm here today. If you have any insight that could help me free dbus from BlueZ messages, it would be highly appreciated.

Thanks in advance.

EDIT: Adding the DBus code I use to read the discovered devices:

#!/usr/bin/python3

import dbus

BLUEZ_SERVICE_NAME = "org.bluez"
DBUS_OM_IFACE = "org.freedesktop.DBus.ObjectManager"
DEVICES_IFACE = "org.bluez.Device1"

def main_loop(subproc):
    devinfo = None
    objects = None

    dbussys = dbus.SystemBus()
    dbusconnection = dbussys.get_object(BLUEZ_SERVICE_NAME, "/")
    bluezInterface = dbus.Interface(dbusconnection, DBUS_OM_IFACE)


    while True:
        try:
            objects = bluezInterface.GetManagedObjects()
        except dbus.DBusException as err:
            print("dbus Error : " + str(err))
            continue  # nothing to process on this pass

        # A list rather than a generator: it is iterated once per adapter below
        all_devices = [str(path) for path, interfaces in objects.items() if DEVICES_IFACE in interfaces]

        for path, interfaces in objects.items():
            if "org.bluez.Adapter1" not in interfaces:
                continue

            device_list = [d for d in all_devices if d.startswith(path + "/")]

            for dev_path in device_list:
                properties = objects[dev_path][DEVICES_IFACE]

                if "ServiceData" in properties and "Name" in properties and "RSSI" in properties:
                    pass  # [... Do something ...]

Recommended answer

I can't reproduce your error exactly, but my system is not happy running that fast while loop, which repeatedly gets the data from GetManagedObjects. Below is the code I ran, based on your code with a little refactoring:

import dbus

BLUEZ_SERVICE_NAME = "org.bluez"
DBUS_OM_IFACE = "org.freedesktop.DBus.ObjectManager"
ADAPTER_IFACE = "org.bluez.Adapter1"
DEVICES_IFACE = "org.bluez.Device1"

def main_loop():
    devinfo = None
    objects = None

    dbussys = dbus.SystemBus()
    dbusconnection = dbussys.get_object(BLUEZ_SERVICE_NAME, "/")
    bluezInterface = dbus.Interface(dbusconnection, DBUS_OM_IFACE)

    while True:
        objects = bluezInterface.GetManagedObjects()
        for path in objects:
            name = objects[path].get(DEVICES_IFACE, {}).get('Name')
            rssi = objects[path].get(DEVICES_IFACE, {}).get('RSSI')
            service_data = objects[path].get(DEVICES_IFACE, {}).get('ServiceData')
            if all((name, rssi, service_data)):
                print(f'{name} @ {rssi} = {service_data}')
                # [... Do something ...]

if __name__ == '__main__':
    main_loop()
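
One detail of the filtering step is worth isolating. The sketch below runs the same selection on a hand-written dictionary shaped like a GetManagedObjects() reply, so it needs no Bluetooth hardware. The helper name `devices_with_service_data` and all sample values are hypothetical. It tests for key membership rather than truthiness, on the assumption that a device reporting an RSSI of 0 or an empty name should still be kept.

```python
DEVICES_IFACE = "org.bluez.Device1"
WANTED = ("Name", "RSSI", "ServiceData")


def devices_with_service_data(objects):
    """Yield (path, props) for devices that expose every property in WANTED."""
    for path, interfaces in objects.items():
        props = interfaces.get(DEVICES_IFACE)
        if props is not None and all(key in props for key in WANTED):
            yield path, props


# Hand-written stand-in for a GetManagedObjects() reply (hypothetical values)
fake_objects = {
    "/org/bluez/hci0": {"org.bluez.Adapter1": {"Address": "00:00:00:00:00:01"}},
    "/org/bluez/hci0/dev_AA": {
        DEVICES_IFACE: {
            "Name": "beacon",
            "RSSI": -60,
            "ServiceData": {"0000feaa-0000-1000-8000-00805f9b34fb": [0x10, 0xEB]},
        }
    },
    "/org/bluez/hci0/dev_BB": {DEVICES_IFACE: {"Name": "no-rssi"}},
}

for path, props in devices_with_service_data(fake_objects):
    print(path, props["Name"], props["RSSI"])
```

Keeping the selection in its own function also makes it easy to call from a slower, timed loop instead of a tight `while True`.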

I'm not sure what you are trying to do in the broader project, but if I can make some recommendations...

A more typical way of scanning for service/manufacturer data is to subscribe to D-Bus signals that trigger callbacks when something of interest happens.
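
For readers staying with the dbus-python bindings used in the question, the same idea could look roughly like the sketch below. It is an illustration under assumptions, not a tested fix for the memory growth: the adapter path `/org/bluez/hci0` is assumed, and `on_interfaces_added` and `main` are hypothetical names. The callback is kept free of D-Bus types so it can be tried with plain dictionaries.

```python
DEVICE_IFACE = "org.bluez.Device1"


def on_interfaces_added(path, interfaces):
    """Callback for ObjectManager.InterfacesAdded; returns a summary or None."""
    props = interfaces.get(DEVICE_IFACE)
    if props is None:
        return None  # not a Bluetooth device object
    summary = (str(path), props.get("Address"), props.get("RSSI"))
    print("device added:", summary)
    return summary


def main():
    # Imported here so the callback above can be exercised without D-Bus
    import dbus
    from dbus.mainloop.glib import DBusGMainLoop
    from gi.repository import GLib

    DBusGMainLoop(set_as_default=True)  # set before connecting to the bus
    bus = dbus.SystemBus()
    bus.add_signal_receiver(
        on_interfaces_added,
        dbus_interface="org.freedesktop.DBus.ObjectManager",
        signal_name="InterfacesAdded",
        bus_name="org.bluez",
    )
    adapter = dbus.Interface(
        bus.get_object("org.bluez", "/org/bluez/hci0"),  # assumed adapter path
        "org.bluez.Adapter1",
    )
    adapter.StartDiscovery()
    try:
        GLib.MainLoop().run()
    except KeyboardInterrupt:
        adapter.StopDiscovery()


# Dry run of the callback with made-up data; call main() on a machine with BlueZ
on_interfaces_added(
    "/org/bluez/hci0/dev_AA_BB",
    {DEVICE_IFACE: {"Address": "AA:BB", "RSSI": -70}},
)
```

The event loop sleeps until BlueZ emits a signal, so nothing polls GetManagedObjects() in a tight loop.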

Below is some code I use to look for iBeacons and Eddystone beacons. It runs using the GLib event loop, which is maybe something you have ruled out, but it is more efficient on resources.

It uses different Python D-Bus bindings, as I find pydbus more "pythonic".

I have left in the code that processes the beacons, as it might be a useful reference.

import argparse
from gi.repository import GLib
from pydbus import SystemBus
import uuid

DEVICE_INTERFACE = 'org.bluez.Device1'

remove_list = set()


def stop_scan():
    """Stop device discovery and quit event loop"""
    adapter.StopDiscovery()
    mainloop.quit()


def clean_beacons():
    """
    BlueZ D-Bus API does not show duplicates. This is a
    workaround that removes devices that have been found
    during discovery
    """
    not_found = set()
    for rm_dev in remove_list:
        try:
            adapter.RemoveDevice(rm_dev)
        except GLib.Error as err:
            not_found.add(rm_dev)
    for lost in not_found:
        remove_list.remove(lost)


def process_eddystone(data):
    """Print Eddystone data in human readable format"""
    _url_prefix_scheme = ['http://www.', 'https://www.',
                          'http://', 'https://', ]
    _url_encoding = ['.com/', '.org/', '.edu/', '.net/', '.info/',
                     '.biz/', '.gov/', '.com', '.org', '.edu',
                     '.net', '.info', '.biz', '.gov']
    tx_pwr = int.from_bytes([data[1]], 'big', signed=True)
    # Eddystone UID Beacon format
    if data[0] == 0x00:
        namespace_id = int.from_bytes(data[2:12], 'big')
        instance_id = int.from_bytes(data[12:18], 'big')
        print(f'\t\tEddystone UID: {namespace_id} - {instance_id} \u2197 {tx_pwr}')
    # Eddystone URL beacon format
    elif data[0] == 0x10:
        prefix = data[2]
        encoded_url = data[3:]
        full_url = _url_prefix_scheme[prefix]
        for letter in encoded_url:
            if letter < len(_url_encoding):
                full_url += _url_encoding[letter]
            else:
                full_url += chr(letter)
        print(f'\t\tEddystone URL: {full_url} \u2197 {tx_pwr}')


def process_ibeacon(data, beacon_type='iBeacon'):
    """Print iBeacon data in human readable format"""
    print('DATA:', data)
    beacon_uuid = uuid.UUID(bytes=bytes(data[2:18]))
    major = int.from_bytes(bytearray(data[18:20]), 'big', signed=False)
    minor = int.from_bytes(bytearray(data[20:22]), 'big', signed=False)
    tx_pwr = int.from_bytes([data[22]], 'big', signed=True)
    print(f'\t\t{beacon_type}: {beacon_uuid} - {major} - {minor} \u2197 {tx_pwr}')


def ble_16bit_match(uuid_16, srv_data):
    """Expand 16 bit UUID to full 128 bit UUID"""
    uuid_128 = f'0000{uuid_16}-0000-1000-8000-00805f9b34fb'
    return uuid_128 == list(srv_data.keys())[0]


def on_iface_added(owner, path, iface, signal, interfaces_and_properties):
    """
    Event handler for D-Bus interface added.
    Test to see if it is a new Bluetooth device
    """
    iface_path, iface_props = interfaces_and_properties
    if DEVICE_INTERFACE in iface_props:
        on_device_found(iface_path, iface_props[DEVICE_INTERFACE])


def on_device_found(device_path, device_props):
    """
    Handle a new Bluetooth device being discovered.
    If it is a beacon of type iBeacon, Eddystone, AltBeacon
    then process it
    """
    address = device_props.get('Address')
    address_type = device_props.get('AddressType')
    name = device_props.get('Name')
    alias = device_props.get('Alias')
    paired = device_props.get('Paired')
    trusted = device_props.get('Trusted')
    rssi = device_props.get('RSSI')
    service_data = device_props.get('ServiceData')
    manufacturer_data = device_props.get('ManufacturerData')
    if address.casefold() == '00:c3:f4:f1:58:69':
        print('Found mac address of interest')
    if service_data and ble_16bit_match('feaa', service_data):
        process_eddystone(service_data['0000feaa-0000-1000-8000-00805f9b34fb'])
        remove_list.add(device_path)
    elif manufacturer_data:
        for mfg_id in manufacturer_data:
            # iBeacon 0x004c
            if mfg_id == 0x004c and manufacturer_data[mfg_id][0] == 0x02:
                process_ibeacon(manufacturer_data[mfg_id])
                remove_list.add(device_path)
            # AltBeacon 0xacbe
            elif mfg_id == 0xffff and manufacturer_data[mfg_id][0:2] == [0xbe, 0xac]:
                process_ibeacon(manufacturer_data[mfg_id], beacon_type='AltBeacon')
                remove_list.add(device_path)
    clean_beacons()


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-d', '--duration', type=int, default=0,
                        help='Duration of scan [0 for continuous]')
    args = parser.parse_args()
    bus = SystemBus()
    adapter = bus.get('org.bluez', '/org/bluez/hci0')

    bus.subscribe(iface='org.freedesktop.DBus.ObjectManager',
                  signal='InterfacesAdded',
                  signal_fired=on_iface_added)

    mainloop = GLib.MainLoop()


    if args.duration > 0:
        GLib.timeout_add_seconds(args.duration, stop_scan)
    adapter.SetDiscoveryFilter({'DuplicateData': GLib.Variant.new_boolean(False)})
    adapter.StartDiscovery()

    try:
        print('\n\tUse CTRL-C to stop discovery\n')
        mainloop.run()
    except KeyboardInterrupt:
        stop_scan()
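
The payload decoding in the script can be tried without any Bluetooth hardware. The sketch below restates the Eddystone-URL and iBeacon parsing as functions that return values instead of printing, and feeds them hand-built byte lists. The function names and the sample payloads are hypothetical; the byte offsets follow the script above.

```python
import uuid

URL_SCHEMES = ['http://www.', 'https://www.', 'http://', 'https://']
URL_EXPANSIONS = ['.com/', '.org/', '.edu/', '.net/', '.info/',
                  '.biz/', '.gov/', '.com', '.org', '.edu',
                  '.net', '.info', '.biz', '.gov']


def signed_byte(value):
    """Interpret one byte (0..255) as a signed 8-bit integer."""
    return int.from_bytes([value], 'big', signed=True)


def decode_eddystone_url(frame):
    """Return (url, tx_power) for an Eddystone-URL frame (frame type 0x10)."""
    if frame[0] != 0x10:
        raise ValueError('not an Eddystone-URL frame')
    url = URL_SCHEMES[frame[2]]
    for code in frame[3:]:
        # Small codes expand to a common suffix; anything else is a literal character
        url += URL_EXPANSIONS[code] if code < len(URL_EXPANSIONS) else chr(code)
    return url, signed_byte(frame[1])


def decode_ibeacon(data):
    """Return (uuid, major, minor, tx_power) from iBeacon manufacturer data."""
    beacon_uuid = uuid.UUID(bytes=bytes(data[2:18]))
    major = int.from_bytes(bytes(data[18:20]), 'big')
    minor = int.from_bytes(bytes(data[20:22]), 'big')
    return beacon_uuid, major, minor, signed_byte(data[22])


# Hand-built sample payloads (hypothetical values, not captured from a device)
url_frame = [0x10, 0xEB, 0x03] + list(b'example') + [0x00]
print(decode_eddystone_url(url_frame))

ibeacon = [0x02, 0x15] + list(range(16)) + [0x00, 0x01, 0x00, 0x02, 0xC5]
print(decode_ibeacon(ibeacon))
```

Returning values rather than printing makes it straightforward to check the decoders against known payloads before running a long scan.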
