Source code for able.peripheral

#    Copyright (c)  2021  Allthenticate
"""
This module contains the implementation of ABlePeripheralServer (A BLE Peripheral Server) which serves as a wrapper
around the platform specific implementations of BLE applications with services, characteristics and advertisements
within this library and acts as an abstraction layer following the Berkeley Socket API for handling BLE comms.
"""
import asyncio
import logging

# Native modules
import sys
from typing import Callable, List, Optional, Union

# Able modules
from able import ABleApplication
from able.advertisement import ABleAdvertisement
from able.central import ABleCentral
from able.characteristic import ABleCharacteristic
from able.service import ABleService

# Setup logging
logger = logging.getLogger(name=__name__)


class ABlePeripheralServer:
    """
    A platform agnostic BLE Peripheral Server class.

    This class serves as the high level client for the ABle library and serves as the main interface into the
    low level platform specific backends. The instance attribute `_app` will be the `ABleApplication` for the
    platform this server is running on (eg. BluezDbus App). Note that this class doesn't track the
    advertisements, services, nor characteristics that the application tracks under the hood.

    :param name: The name you wish to bind to this application on the DBus, defaults to "ABle"
    :param auto_recover: Set to `True` if you want the underlying application to attempt to recover on driver or
        other incidents, defaults to `True`
    :param adapter_path: Forwarded unchanged to `ABleApplication`; presumably selects the BLE adapter to use
        (TODO confirm against the platform backends), defaults to `None`
    :param auto_configure: Set to `True` if you want ABle to do a best effort attempt to configure permissions
        on your system to allow ABle to work, defaults to `False`
    :param loop: The event loop to track on this server; when `None` (the default) the loop returned by
        `asyncio.get_event_loop()` at construction time is used
    """

    def __init__(
        self,
        name: str = "ABle",
        *,
        auto_recover: bool = True,
        adapter_path: Optional[str] = None,
        auto_configure: bool = False,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ):
        """Constructor method for the ABlePeripheralServer"""
        self.name = name

        # Create an application for your platform with the name provided
        self._app = ABleApplication(
            name=self.name,
            adapter_path=adapter_path,
            auto_recover=auto_recover,
            auto_configure=auto_configure,
        )

        # Track an event loop. The lookup is deliberately done here rather than in the signature: a default
        # of `asyncio.get_event_loop()` would be evaluated once at import time and shared by every instance.
        self.loop = loop if loop is not None else asyncio.get_event_loop()

    async def setup_peripheral_server(self) -> None:
        """
        Handle all the setup that the underlying application needs to do within the running event loop.

        This needs to be a separate asynchronous function from the initializer for the class because the
        `setup()` function for the app may be async as well.

        TODO(Bernie): Is there a scenario where we have the `__init__()` call this directly?

        :return: None
        """
        # Do any setup the underlying application needs to do
        await self._app.setup()

    async def add_advertisement(self, advertisement: ABleAdvertisement) -> None:
        """
        Add an advertisement to the peripheral server.

        Note that the underlying application manages checking whether or not you are over the advertisement
        limit, if the advertisement has already been added and more on a platform specific basis.

        TODO(Ori): As we develop the OSX backend there may be duplicated code in the
        `_app.add_advertisement()` implementation that we should extract to here?

        :param advertisement: An instance of `ABleAdvertisement` to add to the application this server is running.
        :return: None
        """
        # Add the advertisement to the app
        logger.debug(f"Adding advertisement {advertisement} to the peripheral server.")
        await self._app.add_advertisement(advertisement=advertisement)

    async def remove_advertisement(self, advertisement: ABleAdvertisement) -> None:
        """
        Removes an advertisement from the underlying application, note that the application will handle
        removal with a bunch of validation.

        TODO(Bernie): Ensure that this works at runtime, we should be able to have the APS remove an advert
        and refresh the application to make it vanish.

        :param advertisement: The advertisement you want to have removed from the server.
        :return: None
        """
        logger.debug(f"Removing advertisement {advertisement} from the peripheral server.")
        await self._app.remove_advertisement(advertisement=advertisement)  # type: ignore

    def add_service(self, service: ABleService) -> None:
        """
        Adds a BLEService to the peripheral server using the underlying application.

        :param service: the service to add to the application
        :return: None
        """
        logger.debug(f"Adding {service} to the peripheral server.")
        self._app.add_service(service)  # type: ignore

    def add_characteristic(
        self,
        service: ABleService,
        characteristic: Union[List[ABleCharacteristic], ABleCharacteristic],
        is_comms_char: bool = False,
    ) -> None:
        """
        Adds a BLECharacteristic to the peripheral server using the underlying application.

        If the application has no marked characteristic yet, the characteristic being added is registered as
        the communications characteristic regardless of `is_comms_char`.

        :param is_comms_char: `True` will set the characteristic as the communications characteristic,
            defaults to `False`
        :param service: the service to add the characteristic to
        :param characteristic: the characteristic to add, this can also be a list of ABleCharacteristics and
            each will be added for batch adding.
        :return: None
        """
        # Normalize a single characteristic into a one element list so both forms share the loop below
        if isinstance(characteristic, ABleCharacteristic):
            characteristic = [characteristic]

        for chrc in characteristic:
            logger.debug(f"Adding {chrc} to {service}...")

            # If there are no marked characteristics, set this one as the comms; otherwise honour the
            # caller's flag. This is re-evaluated per characteristic because adding one may mark it.
            mark_as_comms = self._app.marked_characteristic is None or is_comms_char
            self._app.add_characteristic(
                service=service, characteristic=chrc, is_comms_char=mark_as_comms  # type: ignore
            )

    def add_connect_callback(self, callback: Callable) -> None:
        """
        Adds a user-defined callback function to the application to be called after connecting to a central
        device.

        :param callback: the function to be called after a successful connection
        :raises TypeError: if `callback` is not callable
        :return: None
        """
        if not callable(callback):
            raise TypeError(
                f"The function {callback} that is trying to be added as a connection callback is not callable"
            )
        logger.debug(f"Adding user-defined callback {callback} to be called after connection")
        self._app.add_connect_callback(callback)

    def add_disconnect_callback(self, callback: Callable) -> None:
        """
        Adds a user-defined callback function to the application to be called after disconnecting from a
        central device.

        :param callback: the function to be called after a successful disconnect
        :raises TypeError: if `callback` is not callable
        :return: None
        """
        if not callable(callback):
            raise TypeError(
                f"The function {callback} that is trying to be added as a disconnect callback is not callable"
            )
        logger.debug(f"Adding user-defined callback {callback} to be called after disconnect")
        self._app.add_disconnect_callback(callback)

    async def listen(self, remove_connected: bool = False) -> None:
        """
        Listen is the equivalent of its Berkeley Socket API counterpart, it will handle any binding that is
        required and sets the application to be in a "listening state".

        :param remove_connected: if `True` remove all currently connected centrals, (default is `False`)
        :raises NotImplementedError: if `remove_connected` is `True`, as that feature does not exist yet
        :return: None
        """
        if remove_connected:
            raise NotImplementedError("This feature is not yet implemented")
        await self._app.start_interface()

    async def accept(self) -> ABleCentral:
        """
        Blocking call which polls the new connection queue (NCQ) of the underlying application to see if
        there are any new connections. Will only return a ABleCentral once it is present on the NCQ and the
        app's connected central dict.

        :return: A newly connected ABleCentral object ready to be interacted with
        """
        # Wait until the app signals it has a new connection via an event, re checking that there is a
        # client in the queue before proceeding
        # NOTE(review): if the event is ever left set while the queue is empty, `wait()` returns immediately
        # and this loop spins; whether that can happen depends on the platform backend -- confirm.
        while len(self._app.new_connection_queue) <= 0:
            await self._app.has_new_central.wait()

        # Pop the new central off of the 'queue'
        # NOTE(review): `pop()` without an index takes the most recently appended item if this is a list,
        # i.e. LIFO rather than FIFO order -- confirm that is intended when several centrals are pending.
        new_central = self._app.new_connection_queue.pop()
        logger.debug(new_central)

        # If there are no more centrals in the queue, clear the event
        if len(self._app.new_connection_queue) == 0:
            self._app.has_new_central.clear()

        # Return the new central
        return new_central

    async def broadcast(self, data: Union[bytes, str], characteristic: Optional[ABleCharacteristic] = None) -> None:
        """
        This function will notify all connected centrals on the provided characteristic of new data, for
        example notifying all centrals of the battery level, a state of this peripheral, etc.

        TODO(Bernie): How should we handle this with possibly different MTU's for different centrals

        :param characteristic: the characteristic to notify all connected centrals on, defaults to the comms
            characteristic
        :param data: the data to notify the centrals on; a `str` is encoded as UTF-8 before being sent
        :return: None
        """
        # If the characteristic is none, default to the communication characteristic
        if characteristic is None:
            characteristic = self._app.communication_characteristic

        # If a string is passed in, encode it
        if isinstance(data, str):
            data = data.encode("utf-8")

        logger.debug(f"Broadcasting {data!r} on {characteristic}.")
        await characteristic.broadcast(data)  # type: ignore

    async def close(self) -> None:
        """
        Equivalent to the socket API's close, this will cleanup all existing connections that the app is
        managing and trigger all cleanup within the app (eg. removing match rules).

        On macOS and Windows the application is simply stopped. On Linux every connected central is closed
        first and the application is then cleaned up. Any other platform is left untouched and a warning is
        logged.

        :return: None
        """
        if sys.platform in ("darwin", "win32"):
            # These backends expose a synchronous `stop()` rather than an awaitable cleanup
            self._app.stop()
        elif sys.platform == "linux":
            logger.info("Closing ABle peripheral server")

            # Iterate over a snapshot of the values so the dict may change while each close is awaited
            for connected_central in list(self._app.connected_centrals.values()):
                await connected_central.close()

            logger.info("Disconnected remaining devices, cleaning up application")
            await self._app.cleanup()
        else:
            # Previously a silent no-op; make it visible that nothing was torn down
            logger.warning(f"Closing is not supported on platform {sys.platform!r}, nothing was cleaned up")