Source code for able.bluezdbus.application

#    Copyright (c)  2021  Allthenticate
"""
For references of this implementation see of a bluezdbus application:

Advertising: https://gitlab.com/allthenticate/dependencies/bluez/-/blob/master/doc/advertising-api.txt
GATT: https://gitlab.com/allthenticate/dependencies/bluez/-/blob/master/doc/gatt-api.txt
DBUS: https://dbus.freedesktop.org/doc/dbus-specification.html
dbus-next: https://python-dbus-next.readthedocs.io/_/downloads/en/latest/pdf/
"""

# Native libraries
import asyncio
import logging
import re
import subprocess
from typing import TYPE_CHECKING, Callable, Dict, List, Optional, Set

# Dependencies
from dbus_next import DBusError, Message, Variant  # type: ignore
from dbus_next.aio import MessageBus, ProxyInterface, ProxyObject  # type: ignore
from dbus_next.constants import BusType, MessageFlag  # type: ignore
from dbus_next.service import ServiceInterface  # type: ignore

from able.bluezdbus import (
    BluezServiceInterfaces,
    DBusInterface,
    DbusServiceInterfaces,
    get_advertisement_object_path,
    get_characteristic_path,
    get_service_path,
)
from able.bluezdbus.advertisement import BluezLEAdvertisement
from able.bluezdbus.characteristic import BluezCharacteristic
from able.bluezdbus.configurator import configure_bluez_conf_file
from able.bluezdbus.service import BluezService
from able.bluezdbus.utils import add_match, remove_match

# Able Dependencies
from able.utils import DeviceCallbackData, asyncio_create_task_wrapper

if TYPE_CHECKING:
    from able.advertisement import ABleAdvertisement
    from able.central import ABleCentral
    from able.characteristic import ABleCharacteristic
    from able.service import ABleService

# Setup logging
logger = logging.getLogger(name=__name__)

BLUETOOTH_FIVE_HCI_VERSION = 0x9


def _check_support_for_always_advertising(adapter_path: str) -> bool:
    """
    Advertising while connected is only supported on adapters with HCI version ^5.0, this function uses
    hciconfig to check the version the adapter specified.

    :param adapter_path: the adapter path to check (eg. /org/bluez/hci0)
    :return: `True` if multiple
    """
    try:
        # Get the HCI version from hciconfig
        output = subprocess.run(["hciconfig", "-a", adapter_path], stdout=subprocess.PIPE, check=True)

        # Find the version number using a re
        version_match = re.search(r"HCI Version:[^|\n]*\((.*)\)", output.stdout.decode())

        if version_match is None:
            logger.warning("Unable to parse HCI version from hciconfig")
            return False

        version = int(version_match.group(1), 16)
        logger.debug(
            f"Detected HCI version is {version}, needs to be {BLUETOOTH_FIVE_HCI_VERSION} for "
            f"advertising while connected"
        )

        # Return if it is greater than or equal to bluetooth 5
        return version >= BLUETOOTH_FIVE_HCI_VERSION
    except Exception:
        logger.exception("Unable to determine if always advertising is supported, defaulting to False")
        return False


def _check_support_for_directed_notifications(gatt_manager: ProxyInterface) -> bool:
    """
    Uses the proxy interface to determine whether or not a custom patch enabling device specific notifications
    is supported. The method name we are looking for is `call_notify_characteristic_changed`

    :param gatt_manager: The proxy interface of the gatt manager
    :return: `True` if the patched method is present and callable, `False` otherwise
    """
    return "call_notify_characteristic_changed" in gatt_manager.__dict__


[docs]class BluezApplication(ServiceInterface): """ The high level application that abstracts Dbus Calls :param name: the name that this application should request on the dbus :param auto_recover: if `True` will try to recover if bluez restarts, defaults to `False` :param adapter_path: what is the path to the hardware adapter, will be the first one discovered if not provided, eg. /org/bluez/hci0 :param auto_configure: if `True` will try to set bluez settings at startup, defaults to `False` """ def __init__( self, name: str, adapter_path: Optional[str] = None, auto_recover: bool = False, auto_configure: bool = False, ): """Initializer for the bluezdbus application""" super().__init__(BluezServiceInterfaces.GATT_MANAGER_INTERFACE.value) # Dbus related self.name = name self.path = "/" self._bus: Optional[MessageBus] = None self.gatt_manager_interface = None self.adapter_interface: Optional[ProxyInterface] = None self.proxy_objects: Dict[str, Dict[str, ProxyObject]] = {} self.match_rules: Set[str] = set() self.auto_recover = auto_recover self.error_state = False self.dbus_lock: Optional[asyncio.Lock] = None self.supports_directed_notifications: bool = False # Advertising related self.exported_advertisements: Set["ABleAdvertisement"] = set() self.registered_advertisements: Set["ABleAdvertisement"] = set() self.current_advertisement_index: int = 0 self.advertising_manager: Optional[ProxyInterface] = None self.adapter_path = adapter_path self.adapter_properties: Optional[ProxyInterface] = None self.supported_includes = None # Services related self.services: List[BluezService] = [] # Params self.max_advertisements = 5 self.advertising_capabilities: Dict[str, int] = {} self.always_advertising_supported = False # Connection related self.has_new_central = asyncio.Event() self.new_connection_queue: Set["ABleCentral"] = set() self.connected_centrals: Dict[str, "ABleCentral"] = {} self.user_connect_callback: Optional[Callable] = None self.user_disconnect_callback: Optional[Callable] = None # Communication related self.marked_characteristic: Optional[BluezCharacteristic] = None # If set to autoconfigure, try to do so for root if auto_configure: logger.warning("Configuring bluez permissions automatically...") if configure_bluez_conf_file(): # We need to restart the service in this instance logger.warning( "BlueZ permissions were modified, you must restart the bluetooth service for changes " "to take affect. (Trying to restart now)" ) subprocess.call(["service", "bluetooth", "restart"], shell=True) def __setattr__(self, key, value): self.__dict__[key] = value # If something is setting the error state, call recovery functions if key == "error_state" and value and self.auto_recover: logger.warning("Error state declared by a module with auto recover set, attempting to recover") try: for advertisement in self.exported_advertisements: asyncio_create_task_wrapper(self.refresh_advertisement(advertisement)) self.error_state = False logger.info("Recovered from error state!") except Exception: logger.exception("Recovery failed...")
[docs] async def cleanup(self) -> None: """ This function should be called when the program exits and will handle all cleanup similar to a destructor. TODO(Bernie): Debug handling here :return: None """ self.bus.remove_message_handler(self._dbus_message_parser) if self.bus is not None: self.bus.disconnect() logger.info("BluezApplication cleanup completed. ")
async def _get_proxy_object(self, bus_name: str, path: str) -> ProxyObject: """Gets a proxy object, the dbus equivalent is bus.get_object(bus_name, path)""" # Check if we already have this proxy stored if bus_name not in self.proxy_objects: self.proxy_objects[bus_name] = {} if path not in self.proxy_objects[bus_name]: self.proxy_objects[bus_name][path] = self.bus.get_proxy_object( bus_name, path, await self.bus.introspect(bus_name, path) ) # Return the proxy object return self.proxy_objects[bus_name][path]
[docs] async def get_proxy_interface( self, bus_name: DBusInterface, path: str, interface_name: DBusInterface ) -> ProxyInterface: """Gets a proxy interface using the _get_proxy_object method""" proxy_object = await self._get_proxy_object(bus_name.value, path) return proxy_object.get_interface(interface_name.value)
[docs] async def setup(self) -> None: """ Setup the application and get it on the dbus, this is a necessary function call to initialize the object. :return: None """ # Get on the Dbus self._bus = await MessageBus(bus_type=BusType.SYSTEM).connect() self.bus.add_message_handler(self._dbus_message_parser) await self.bus.request_name(f"{BluezServiceInterfaces.BLUEZ_ORG.value}.{self.name}") # Connect to bluez remote_object_manager = await self.get_proxy_interface( bus_name=BluezServiceInterfaces.BLUEZ_ORG, path="/", interface_name=DbusServiceInterfaces.OBJECT_MANAGER_INTERFACE, ) # Find the path of an adapter with advertising capability if self.adapter_path is None: objects = await remote_object_manager.call_get_managed_objects() # type: ignore for adapter, props in objects.items(): if BluezServiceInterfaces.ADVERTISING_MANAGER_INTERFACE.value in props: self.adapter_path = adapter break # If no adapter is found, error out if self.adapter_path is None: raise RuntimeError("No adapter found, is a bluetooth adapter plugged in?") # Power the adapter logger.debug(self.adapter_path) self.adapter_properties = await self.get_proxy_interface( bus_name=BluezServiceInterfaces.BLUEZ_ORG, path=self.adapter_path, interface_name=DbusServiceInterfaces.PROPERTIES_INTERFACE, ) await self.adapter_properties.call_set( # type: ignore BluezServiceInterfaces.ADAPTER_INTERFACE.value, "Powered", Variant("b", True), ) logger.debug(f"{self.adapter_path} is powered!") # Check if the adapter supports advertising while connected (ble v^5) self.always_advertising_supported = _check_support_for_always_advertising(self.adapter_path) if not self.always_advertising_supported: logger.warning("HCI version is sub 5.0 which doesn't support advertising while connected always.") # Get the adapter interface, we need this for removing devices self.adapter_interface = await self.get_proxy_interface( bus_name=BluezServiceInterfaces.BLUEZ_ORG, path=self.adapter_path, interface_name=BluezServiceInterfaces.ADAPTER_INTERFACE, ) # Find the advertisement manager and store it for later self.advertising_manager = await self.get_proxy_interface( bus_name=BluezServiceInterfaces.BLUEZ_ORG, path=self.adapter_path, interface_name=BluezServiceInterfaces.ADVERTISING_MANAGER_INTERFACE, ) # Try to get the maximum number of advertisements we can use try: self.max_advertisements = await self.advertising_manager.get_supported_instances() # type: ignore logger.info(f"Hardware supports {self.max_advertisements} advertisements.") except Exception: logger.warning("Unable to get the number of supported instances from the adapter.") # Try to get the supported capabilities of advertising manager try: self.advertising_capabilities = await self.advertising_manager.get_supported_capabilities() # type: ignore # Try to pull out variant values for key, value in self.advertising_capabilities.items(): if isinstance(value, Variant): self.advertising_capabilities[key] = value.value logger.info(f"Advertising manager supports: {self.advertising_capabilities}") except Exception: logger.warning("Unable to get supported capabilities of advertising manager.") # Try to get the params we can utilize try: self.supported_includes = await self.advertising_manager.get_supported_includes() # type: ignore logger.info(f"Hardware supports {self.supported_includes} which can be controlled in software") except Exception: logger.warning("Unable to get the supported includes from the adapter interface.") logger.info(f"BluezApplication started with adapter: {self.adapter_path}") self.match_rules.add( "type=signal,interface=org.freedesktop.DBus.Properties,member=PropertiesChanged," f"path_namespace={self.adapter_path}" ) self.match_rules.add( "type=signal,interface=org.freedesktop.DBus.ObjectManager,member=InterfacesRemoved," f"arg0path={self.adapter_path}/" ) self.match_rules.add( "type=signal,interface=org.freedesktop.DBus.ObjectManager,member=InterfacesAdded," f"arg0path={self.adapter_path}/" ) # Clean up any previous setup await self.reset_bus() for rule in self.match_rules: await add_match(self.bus, match_rule=rule) self.dbus_lock = asyncio.Lock() logger.debug("Setup complete!")
[docs] async def add_advertisement(self, advertisement: "ABleAdvertisement") -> None: """Adds an le_advertisement object to the application""" # This needs to be under a lock on the case that multiple tasks are changing resources # (ex. A task is switching ibeacons on the side) async with self.dbus_lock: # Fetch the bluez advert from the wrapper le_advertisement = advertisement.le_advertisement # Input validation if not isinstance(le_advertisement, BluezLEAdvertisement): raise TypeError( f"Advertisement error is the wrong type for this platform (BlueZ): {type(le_advertisement)}" ) if advertisement in self.registered_advertisements: raise ValueError("Attempting to add le_advertisement already added!") if len(self.exported_advertisements) >= self.max_advertisements: raise RuntimeError("Unable to add any more advertisements!") if le_advertisement.path is None: index = self.current_advertisement_index self.current_advertisement_index += 1 le_advertisement.path = get_advertisement_object_path(self.name, index) advertisement_path = le_advertisement.path # Export the object onto the system bus if advertisement not in self.exported_advertisements: self.bus.export(advertisement_path, le_advertisement) self.exported_advertisements.add(advertisement) # Add it to bluez try: # Add a timeout so that we don't hold the lock potentially indefinitely await asyncio.wait_for( self.advertising_manager.call_register_advertisement(advertisement_path, {}), 20 # type: ignore ) self.registered_advertisements.add(advertisement) except asyncio.TimeoutError: self.bus.unexport(advertisement_path, le_advertisement) self.exported_advertisements.remove(advertisement) raise except DBusError as e: self.bus.unexport(advertisement_path, le_advertisement) self.exported_advertisements.remove(advertisement) raise RuntimeError("Attempted to add an invalid le_advertisement") from e # Add this application to the le_advertisement le_advertisement.application = self logger.debug(f"Advertisement added! ({advertisement_path})")
[docs] async def remove_advertisement(self, advertisement: "ABleAdvertisement") -> None: """Removes an le_advertisement object from the application""" # See note in add advertisement for why this lock exists async with self.dbus_lock: # Input validation if not isinstance(advertisement.le_advertisement, BluezLEAdvertisement): raise TypeError( f"Advertisement error is the wrong type for this platform: {type(advertisement.le_advertisement)}" ) if advertisement not in self.exported_advertisements: raise ValueError("Attempting remove le_advertisement not being advertised!") advertisement_path = advertisement.le_advertisement.path try: if advertisement in self.registered_advertisements: await self.advertising_manager.call_unregister_advertisement(advertisement_path) # type: ignore self.registered_advertisements.remove(advertisement) except DBusError as e: if e.text == "Does Not Exist": logger.warning("Tried to unregister advertisement not registered with bluez") else: raise # Unexport the object from the system bus self.bus.unexport(advertisement_path, advertisement.le_advertisement) self.exported_advertisements.remove(advertisement) logger.debug(f"Advertisement removed ({advertisement_path})")
[docs] def add_service(self, service: "ABleService") -> None: """ Adds a bluez service to the application, this must be done before running start_application() :raises TypeError: if service is not of type BluezService :raises RuntimeError: if the service has already been added to the application :param service: the service to add to the bluez application :return: None """ if not isinstance(service, BluezService): raise TypeError( f"The service that is trying to be added is the wrong type for this platform (BlueZ): {type(service)}" ) # Double check it has not already been added if service in self.services: raise RuntimeError(f"Already added {service} to the application.") # Determine the path of the service and add it to the services list # pylint: disable=protected-access service._path = get_service_path(self.name, len(self.services) + 1) self.services.append(service) # Export this service as an object on the dbus self.bus.export(service.path, service)
[docs] def add_characteristic( self, service: "ABleService", characteristic: "ABleCharacteristic", is_comms_char: bool = False, ) -> None: """ Adds a characteristic to the application, this must done before running start_application() TODO(Bernie): What kind of validation should we do here? :param is_comms_char: if `True` the characteristic will become the comms characteristic, defaults to `False` :param service: the service which the characteristic will be added under :param characteristic: the characteristic you wish to add to the service :return: None """ if not isinstance(service, BluezService): raise TypeError("The service must be the of type BluezService to add to a Bluez application") if not isinstance(characteristic, BluezCharacteristic): raise TypeError("The characteristic must be of the type BluezCharacteristic to add to a Bluez application") # Make sure there wasn't already a characteristic with that UUID if characteristic.uuid in service.characteristics.keys(): raise ValueError("Adding a characteristic with a UUID that matches another characteristic in the service!") # Calculate the path of the characteristic characteristic._path = get_characteristic_path( # pylint: disable=protected-access service.path, len(service.characteristics) + 1 ) # Add the characteristic to the service service.characteristics[characteristic.uuid] = characteristic # Link the characteristic to the service characteristic.service = service characteristic.application = self if is_comms_char: # Now handle marking updating by removing the old one and adding the new one if self.marked_characteristic is not None: logger.debug(f"{self.marked_characteristic} is no longer marked for comms.") self.marked_characteristic.is_marked = False characteristic.is_marked = True self.marked_characteristic = characteristic logger.debug(f"{self.marked_characteristic} is now marked for comms") # Export it on the bus self.bus.export(characteristic.path, characteristic)
[docs] def add_connect_callback(self, callback: Callable) -> None: """ Adds a user-defined callback function to the application to be called after connecting to a central device. :param callback: the function to be called after a successful connection :return: None """ self.user_connect_callback = callback
[docs] def add_disconnect_callback(self, callback: Callable) -> None: """ Adds a user-defined callback function to the application to be called after disconnecting from a central device. :param callback: the function to be called after a successful disconnect :return: None """ self.user_disconnect_callback = callback
[docs] async def start_interface(self): """ Starts the GATT manager interface by registering it with the bluez service manager. Note that any services and their respective characteristics must be added before this is called. :return: None """ # Check to see that at least one service was added, bluez will not allow willy nilly service-less advertising # https://github.com/luetzel/bluez/blob/aae6a2c4ce9963db110535647aa723b96561f6ac/src/gatt-database.c#L2452 if len(self.services) == 0: raise RuntimeError("You cannot start the interface on bluez without adding at least one service") if self.gatt_manager_interface is None: self.gatt_manager_interface: ProxyInterface = await self.get_proxy_interface( BluezServiceInterfaces.BLUEZ_ORG, self.adapter_path, BluezServiceInterfaces.GATT_MANAGER_INTERFACE, ) # Determine if the bluez installed supported directed notifications self.supports_directed_notifications = _check_support_for_directed_notifications( self.gatt_manager_interface ) # Try to unexport anything on the message bus self.bus.unexport(self.path) self.bus.export(self.path, self) await self.gatt_manager_interface.call_register_application(self.path, {}) logger.debug("Registered application with the bluez service manager.")
[docs] async def refresh_advertisement(self, advertisement: "ABleAdvertisement"): """Hack to try to update params by stopping and restarting the interface""" if not isinstance(self.dbus_lock, asyncio.Lock): raise RuntimeError("Unable to refresh advertisement, has the setup method been ran?") if not self.always_advertising_supported and len(self.connected_centrals) > 0: # We don't support updating advertisements with a client connected, do nothing return async with self.dbus_lock: if advertisement not in self.exported_advertisements: raise RuntimeError("Tried to refresh unexported advertisement") advertisement_path = advertisement.le_advertisement.path try: if advertisement in self.registered_advertisements: await self.advertising_manager.call_unregister_advertisement(advertisement_path) # type: ignore self.registered_advertisements.remove(advertisement) except DBusError as e: if e.text == "Does Not Exist": logger.warning("Tried to unregister nonexistent advertisement when refreshing advertisement") else: raise # Add a timeout so that we don't hold the lock potentially indefinitely await asyncio.wait_for( self.advertising_manager.call_register_advertisement(advertisement_path, {}), 20 # type: ignore ) self.registered_advertisements.add(advertisement)
def _dbus_message_parser(self, message: Message) -> None: """ Handler for all dbus messages that we have match rules for. We have two main routes and check them through either properties changed or interfaces added. TODO(Ori): Change the mac address parsing to use the parse_identifier_from_path function in utils.py :param message: The dbus message we caught. :return: None """ try: if message.member == "PropertiesChanged": # Handle validation, we are expecting a list of length 3 if not isinstance(message.body, list) or len(message.body) != 3: return # The first element in the 3 long list needs to be org.bluez.Device1 if message.body[0] != "org.bluez.Device1": return # Make sure the message has a path if message.path is None: return data: dict = message.body[1] object_path: str = message.path device_mac = object_path[object_path.find("_") + 1 : object_path.rfind("_") + 3] # Make sure that this is the connected properties changed signal if "Connected" not in data: return if data["Connected"].value: logger.info("Calling connected callback off of props changed...") self.connect_callback(DeviceCallbackData(devices_identifier=device_mac)) else: logger.info("Calling disconnect callback off of props changed...") self.disconnect_callback(DeviceCallbackData(devices_identifier=device_mac)) elif message.member == "InterfacesAdded": # Handle validation, we expect a list of length 2 if not isinstance(message.body, list) or len(message.body) != 2: return # Make sure that a device path is the first element in the path if self.adapter_path not in message.body[0]: return # Make sure this is the right interfaces added if "org.bluez.Device1" not in message.body[1]: return # Make sure that this is a device with an address! if "Address" not in message.body[1]["org.bluez.Device1"]: return # Make sure that it is connected and not a signal from a disconnected but cached device if ( "Connected" in message.body[1]["org.bluez.Device1"] and not message.body[1]["org.bluez.Device1"]["Connected"].value ): return # Parse out the path of the central central_mac = message.body[1]["org.bluez.Device1"]["Address"].value # Call the connect callback logger.info("Calling connected callback off of interfaces added signal...") self.connect_callback(DeviceCallbackData(devices_identifier=central_mac)) except Exception: logger.exception("Unhandled exception when parsing dbus-next signal message...")
[docs] def connect_callback(self, callback_data: DeviceCallbackData) -> None: """ Handler for the new connections being signalled from bluez. Verifies that the central was not already connected and is being stored in the connected_centrals dictionary (this would mean we lost state). Adds an entry to the new connection queue so that a peripheral server can accept the new connection. Adds a new ABleCentral to the connected centrals dictionary. Calls an optional user defined callback last. :param callback_data: data including the identifier required to create ABleCentral :return: None """ from able.central import ABleCentral # pylint: disable=import-outside-toplevel logger.info(f"New connection from {callback_data}") if callback_data.identifier in self.connected_centrals: raise RuntimeWarning( f"Received a double connection from {callback_data.identifier}, we missed a disconnect " "callback" ) # Add to the NCQ # Adding an MTU value of 512 for now, this will be updated when the central will write to the characteristic new_central: "ABleCentral" = ABleCentral( application=self, identifier=callback_data.identifier, mtu=512, adapter_path=self.adapter_path, ) self.new_connection_queue.add(new_central) self.connected_centrals[callback_data.identifier] = new_central # Add a receive queue for each characteristic for service in self.services: for char in service.characteristics.values(): new_central.char_queues[hash(char)] = asyncio.Queue() logger.debug(f"Queue was created for characteristic {char.uuid} on central {new_central}") # Set the new connection event if it is not already set if not self.has_new_central.is_set(): self.has_new_central.set() logger.debug(f"{self.new_connection_queue} | {self.connected_centrals}") if self.always_advertising_supported: for advert in self.exported_advertisements: asyncio_create_task_wrapper(self.refresh_advertisement(advert)) if self.user_connect_callback: try: self.user_connect_callback(callback_data) except TypeError: logger.exception("User-defined connection callback is not able to be called with the argument given")
[docs] def disconnect_callback(self, callback_data: DeviceCallbackData) -> None: """ Handler for disconnections being signalled by bluez. Updates the disconnected ABleCentral's state to be disconnected. Removes the central from the connected centrals dictionary (if it was there, it will not be present if the connection was established before the application was running) and the new connection queue if it is present. Calls an optional user defined callback last. :param callback_data: data regarding the central which disconnected :return: None """ from able.central import ABleCentral # pylint: disable=import-outside-toplevel logger.info(f"Disconnect from {callback_data}") # Make sure the central was already connected, if not it is okay if callback_data.identifier not in self.connected_centrals: logger.warning("A central disconnected which we did not know was connected") return # Get the central which disconnected the centrals state to disconnected disconnecting_central: ABleCentral = self.connected_centrals[callback_data.identifier] disconnecting_central.disconnected_event.set() # Remove from the connected centrals dict and the NCQ self.new_connection_queue.discard(disconnecting_central) self.connected_centrals.pop(callback_data.identifier) # If we no longer have any connections, unset the event if len(self.new_connection_queue) == 0: logger.debug("There are no more clients in the NCQ, clearing has_new_central event") self.has_new_central.clear() logger.info(f"{self.new_connection_queue} | {self.connected_centrals}") for advert in self.exported_advertisements: asyncio_create_task_wrapper(self.refresh_advertisement(advert)) if self.user_disconnect_callback: try: self.user_disconnect_callback(callback_data) except TypeError: logger.exception("User-defined disconnect callback is not able to be called with the argument given")
[docs] async def disconnect(self, central: "ABleCentral") -> None: """ Call bluez to disconnect a connected central, this will not update the connected state of the central because the disconnect callback will handle that. # TODO(Bernie, Ori): Test the reliability of using remove_device vs disconnect :param central: The central object to force a disconnect. :return: None """ logger.info(f"Disconnecting from central: {central.identifier}") try: async with self.dbus_lock: # await self.adapter_interface.call_remove_device(central.dbus_path) device_interface = await self.get_proxy_interface( BluezServiceInterfaces.BLUEZ_ORG, central.dbus_path, BluezServiceInterfaces.DEVICE_INTERFACE, ) await device_interface.call_disconnect(flags=MessageFlag.NO_REPLY_EXPECTED) # type: ignore logger.info(f"Disconnected from {central}...") except Exception: logger.exception(f"Unable to disconnect from {central.dbus_path}")
[docs] async def reset_bus(self) -> None: """ Reset the bus. This will remove any current handlers and match rules """ logger.info("Resetting bus...") for rule in self.match_rules: await remove_match(self.bus, rule) logger.info("Bus reset...")
@property def communication_characteristic(self) -> BluezCharacteristic: """Property which returns the marked comms characteristic or raises an exception""" if self.marked_characteristic is None: raise RuntimeError("Attempted to fetch comms characteristic without defining one") return self.marked_characteristic @property def bus(self) -> MessageBus: """Property which returns the bus of this application if it is defined""" if self._bus is None: raise RuntimeError( "Message bus of the application was set to None, application cannot operate without the bus" ) return self._bus