Skip to content

Commit

Permalink
Merge pull request #36 from jlusiardi/issue_35_read_descriptors
Browse files Browse the repository at this point in the history
#35  Proposes an extension to access GATT descriptors
  • Loading branch information
kelada authored Apr 1, 2019
2 parents d215ede + c1e422c commit e1b147d
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 0 deletions.
45 changes: 45 additions & 0 deletions examples/read_descriptor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import gatt

from argparse import ArgumentParser


class AnyDevice(gatt.Device):
def connect_succeeded(self):
super().connect_succeeded()
print("[%s] Connected" % (self.mac_address))

def connect_failed(self, error):
super().connect_failed(error)
print("[%s] Connection failed: %s" % (self.mac_address, str(error)))

def disconnect_succeeded(self):
super().disconnect_succeeded()
print("[%s] Disconnected" % (self.mac_address))

def services_resolved(self):
super().services_resolved()

print("[%s] Resolved services" % (self.mac_address))
for service in self.services:
print("[%s]\tService [%s]" % (self.mac_address, service.uuid))
for characteristic in service.characteristics:
print("[%s]\t\tCharacteristic [%s]" % (self.mac_address, characteristic.uuid))
for descriptor in characteristic.descriptors:
print("[%s]\t\t\tDescriptor [%s] (%s)" % (self.mac_address, descriptor.uuid, descriptor.read_value()))

def descriptor_read_value_failed(self, descriptor, error):
print('descriptor_value_failed')


arg_parser = ArgumentParser(description="GATT Connect Demo")
arg_parser.add_argument('mac_address', help="MAC address of device to connect")
args = arg_parser.parse_args()

print("Connecting...")

manager = gatt.DeviceManager(adapter_name='hci0')

device = AnyDevice(manager=manager, mac_address=args.mac_address)
device.connect()

manager.run()
43 changes: 43 additions & 0 deletions gatt/gatt_linux.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,13 @@ def characteristic_enable_notifications_failed(self, characteristic, error):
# To be implemented by subclass
pass

def descriptor_read_value_failed(self, descriptor, error):
"""
Called when a descriptor read command failed.
"""
# To be implemented by subclass
pass


class Service:
"""
Expand Down Expand Up @@ -505,6 +512,35 @@ def characteristics_resolved(self):
self._connect_characteristic_signals()


class Descriptor:
"""
Represents a GATT Descriptor which can contain metadata or configuration of its characteristic.
"""

def __init__(self, characteristic, path, uuid):
self.characteristic = characteristic
self.uuid = uuid
self._bus = characteristic._bus
self._path = path
self._object = self._bus.get_object('org.bluez', self._path)

def read_value(self, offset=0):
"""
Reads the value of this descriptor.
When successful, the value will be returned, otherwise `descriptor_read_value_failed()` of the related
device is invoked.
"""
try:
val = self._object.ReadValue(
{'offset': dbus.UInt16(offset, variant_level=1)},
dbus_interface='org.bluez.GattDescriptor1')
return val
except dbus.exceptions.DBusException as e:
error = _error_from_dbus_error(e)
self.service.device.descriptor_read_value_failed(self, error=error)


class Characteristic:
"""
Represents a GATT characteristic.
Expand All @@ -521,6 +557,13 @@ def __init__(self, service, path, uuid):
self._properties = dbus.Interface(self._object, "org.freedesktop.DBus.Properties")
self._properties_signal = None

descriptor_regex = re.compile(self._path + '/desc[0-9abcdef]{4}$')
self.descriptors = [
Descriptor(self, desc[0], desc[1]['org.bluez.GattDescriptor1']['UUID'])
for desc in self._object_manager.GetManagedObjects().items()
if descriptor_regex.match(desc[0])
]

def _connect_signals(self):
if self._properties_signal is None:
self._properties_signal = self._properties.connect_to_signal('PropertiesChanged', self.properties_changed)
Expand Down

0 comments on commit e1b147d

Please sign in to comment.