forked from sysofwan/ha-triones
-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathbeurer.py
326 lines (284 loc) · 12.4 KB
/
beurer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
from typing import Tuple, Callable
from bleak import BleakClient, BleakScanner, BLEDevice, BleakGATTCharacteristic, BleakError
import traceback
import asyncio
from homeassistant.components.light import (COLOR_MODE_RGB, COLOR_MODE_WHITE)
from .const import LOGGER
# GATT characteristic UUIDs that BeurerInstance.connect() accepts as the
# characteristic to write command packets to.
WRITE_CHARACTERISTIC_UUIDS = ["8b00ace7-eb0b-49b0-bbe9-9aee0a26e1a3"]
# GATT characteristic UUIDs that BeurerInstance.connect() accepts as the
# characteristic to subscribe to for status notifications.
READ_CHARACTERISTIC_UUIDS = ["0734594a-a8e7-4b1a-a6b1-cd5243059a57"]
async def discover():
    """Scan for Bluetooth LE devices and return those named like "tl100*".

    The name comparison is case-insensitive; devices without a name are
    skipped. Every scanned device is logged at debug level.
    """
    scanned = await BleakScanner.discover()
    summary = [{"address": device.address, "name": device.name} for device in scanned]
    LOGGER.debug("Discovered devices: %s", summary)

    matches = []
    for candidate in scanned:
        # Unnamed devices cannot match the name prefix.
        if candidate.name and candidate.name.lower().startswith("tl100"):
            matches.append(candidate)
    return matches
def create_status_callback(future: asyncio.Future):
    """Build a notification callback that resolves *future* with the first payload.

    The returned callable takes ``(sender, data)``; *sender* is ignored and
    payloads arriving after the future is done are dropped.
    """
    def resolve_once(sender: int, data: bytearray):
        if future.done():
            # Already resolved (or cancelled): keep the first result.
            return
        future.set_result(data)

    return resolve_once
async def get_device(mac: str) -> BLEDevice:
    """Scan for BLE devices and return the one whose address matches *mac*.

    Addresses are compared case-insensitively. Returns ``None`` when no
    scanned device has that address.
    """
    found = await BleakScanner.discover()
    LOGGER.debug(f"Discovered devices: {found}")
    for candidate in found:
        if candidate.address.lower() == mac.lower():
            return candidate
    return None
class BeurerInstance:
    """Bluetooth LE client for a single Beurer lamp.

    Wraps a ``BleakClient``, frames and sends command packets, and mirrors the
    state the lamp reports through GATT notifications (white light on/off and
    brightness, colour light on/off, brightness, RGB colour and effect).
    """

    def __init__(self, device: "BLEDevice") -> None:
        """Create the client for *device* and schedule the first connection.

        Must be called while an asyncio event loop is running, because the
        initial connection attempt is scheduled with ``asyncio.create_task``.

        Raises:
            ValueError: if *device* is ``None``.
        """
        # Check before touching device.address so a missing device produces a
        # clear error instead of an AttributeError on None.
        if device is None:
            LOGGER.error("Was not able to find device")
            raise ValueError("device must not be None")
        self._mac = device.address
        self._device = BleakClient(device, disconnected_callback=self.disconnected_callback)
        self._trigger_update = None
        self._is_on = False
        self._light_on = None
        self._color_on = None
        self._rgb_color = (0, 0, 0)
        self._brightness = None
        self._color_brightness = None
        self._effect = None
        self._write_uuid = None
        self._read_uuid = None
        self._mode = None
        # The list position of an effect is the value sent to / reported by the lamp.
        self._supported_effects = ["Off", "Random", "Rainbow", "Rainbow Slow", "Fusion", "Pulse", "Wave", "Chill", "Action", "Forest", "Summer"]
        # Keep a reference: the event loop only holds weak references to tasks.
        self._connect_task = asyncio.create_task(self.connect())

    def disconnected_callback(self, client):
        """Reset cached state when the BLE client reports a disconnect."""
        LOGGER.debug("Disconnected callback called!")
        self._is_on = False
        self._light_on = False
        self._color_on = False
        # Characteristics have to be rediscovered on the next connect().
        self._write_uuid = None
        self._read_uuid = None
        # Keep a reference so the task is not garbage-collected before it runs.
        self._update_task = asyncio.create_task(self.trigger_entity_update())

    def set_update_callback(self, trigger_update: Callable):
        """Register the callable invoked whenever the cached state changes."""
        LOGGER.debug(f"Setting update callback to {trigger_update}")
        self._trigger_update = trigger_update

    async def _write(self, data: bytearray):
        """Write *data* to the write characteristic, reconnecting if needed.

        A ``BleakError`` during the write is logged and followed by a
        disconnect; it is not re-raised.
        """
        LOGGER.debug("Sending in write: " + ''.join(format(x, ' 03x') for x in data)+f" to characteristic {self._write_uuid}, device is {self._device.is_connected}")
        try:
            if (not self._device.is_connected) or (self._write_uuid is None):
                # Use connect() rather than the bare client connect: it also
                # rediscovers the characteristic UUIDs, without which the
                # write below would target None.
                if not await self.connect():
                    LOGGER.warning("Not connected to device, dropping write")
                    return
            await self._device.write_gatt_char(self._write_uuid, data)
        except BleakError as error:
            LOGGER.debug(traceback.format_exc())
            LOGGER.warning(f"Error while trying to write to device: {error}")
            await self.disconnect()

    @property
    def mac(self):
        """Address of the device this instance was created for."""
        return self._mac

    @property
    def is_on(self):
        """Whether the white or the colour light was last reported on."""
        return self._is_on

    @property
    def rgb_color(self):
        """Last known ``(r, g, b)`` colour."""
        return self._rgb_color

    @property
    def color_brightness(self):
        """Last reported colour brightness scaled to 0-255, or ``None``."""
        return self._color_brightness

    @property
    def white_brightness(self):
        """Last reported white brightness scaled to 0-255, or ``None``."""
        return self._brightness

    @property
    def effect(self):
        """Name of the last reported effect, or ``None``."""
        return self._effect

    @property
    def color_mode(self):
        """Current colour mode constant (white or RGB), or ``None``."""
        return self._mode

    @property
    def supported_effects(self):
        """List of effect names, ordered by their on-wire index."""
        return self._supported_effects

    def find_effect_position(self, effect) -> int:
        """Return the index of *effect* in the supported list, or 0 ("Off") if unknown."""
        try:
            return self._supported_effects.index(effect)
        except ValueError:
            # Unknown (or None) effects fall back to "Off".
            return 0

    def makeChecksum(self, b: int, bArr: list[int]) -> int:
        """Return *b* XOR-ed with every element of *bArr*."""
        checksum = b
        for value in bArr:
            checksum ^= value
        return checksum

    def _build_packet(self, message: list[int]) -> bytearray:
        """Frame *message* with the fixed header, length bytes, checksum and trailer."""
        length = len(message)
        # The checksum is seeded with the payload length plus two bytes.
        checksum = self.makeChecksum(length + 2, message)
        return bytearray([0xFE, 0xEF, 0x0A, length + 7, 0xAB, 0xAA, length + 2, *message, checksum, 0x55, 0x0D, 0x0A])

    async def sendPacket(self, message: list[int]):
        """Frame *message* and write it to the device, connecting first if needed."""
        if not self._device.is_connected:
            await self.connect()
        packet = self._build_packet(message)
        LOGGER.debug("Sending message" + ''.join(format(x, ' 03x') for x in packet))
        await self._write(packet)

    async def set_color(self, rgb: Tuple[int, int, int]):
        """Switch to RGB mode and set the colour light to *rgb*."""
        r, g, b = rgb
        LOGGER.debug("Setting to color: %s, %s, %s", r, g, b)
        self._mode = COLOR_MODE_RGB
        self._rgb_color = (r, g, b)
        if not self._color_on:
            await self.turn_on()
        # Send color
        await self.sendPacket([0x32, r, g, b])
        await asyncio.sleep(0.1)
        await self.triggerStatus()

    async def set_color_brightness(self, brightness: int):
        """Switch to RGB mode and set the colour brightness (*brightness* is 0-255)."""
        LOGGER.debug(f"Setting to brightness {brightness}")
        self._mode = COLOR_MODE_RGB
        if not self._color_on:
            await self.turn_on()
        # Send brightness, converted from 0-255 to a percentage
        await self.sendPacket([0x31, 0x02, int(brightness/255*100)])
        await asyncio.sleep(0.1)
        await self.triggerStatus()

    async def set_white(self, intensity: int):
        """Switch to white mode and set the white brightness (*intensity* is 0-255)."""
        LOGGER.debug("Setting white to intensity: %s", intensity)
        self._mode = COLOR_MODE_WHITE
        if not self._light_on:
            await self.turn_on()
        await self.sendPacket([0x31, 0x01, int(intensity/255*100)])
        await asyncio.sleep(0.2)
        # Deliberately no set_effect("Off") here: set_effect() switches the
        # mode back to RGB and turns the colour light on when it is off.
        await self.triggerStatus()

    async def set_effect(self, effect: str):
        """Switch to RGB mode and select *effect* (unknown names select "Off")."""
        LOGGER.debug(f"Setting effect {effect}")
        self._mode = COLOR_MODE_RGB
        if not self._color_on:
            await self.turn_on()
        await self.sendPacket([0x34, self.find_effect_position(effect)])
        await self.triggerStatus()

    async def turn_on(self):
        """Turn on the white or the colour light, depending on the current mode."""
        LOGGER.debug("Turning on")
        if not self._device.is_connected:
            await self.connect()
        # WHITE mode
        if self._mode == COLOR_MODE_WHITE:
            await self.sendPacket([0x37, 0x01])
        # COLOR mode
        else:
            await self.sendPacket([0x37, 0x02])
            LOGGER.debug(f"Current color state: {self._color_on}, {self._rgb_color}, {self._color_brightness}, {self._effect}")
            # Lamp wants to turn on in rainbow mode when enabling mood light, so send last status
            if not self._color_on:
                LOGGER.debug("Restoring last known color state")
                # Set first so the setters below do not call turn_on() again.
                self._color_on = True
                await asyncio.sleep(0.2)
                await self.set_effect(self._effect)
                await asyncio.sleep(0.2)
                await self.set_color(self._rgb_color)
                # No brightness has been reported yet: nothing to restore.
                if self._color_brightness is not None:
                    await self.set_color_brightness(self._color_brightness)
        await asyncio.sleep(0.2)
        await self.triggerStatus()

    async def turn_off(self):
        """Turn off both the white and the colour light."""
        LOGGER.debug("Turning off")
        # turn off white
        await self.sendPacket([0x35, 0x01])
        # turn off color
        await self.sendPacket([0x35, 0x02])
        await asyncio.sleep(0.1)
        await self.triggerStatus()

    async def triggerStatus(self):
        """Ask the device to send notifications with its current values."""
        await self.sendPacket([0x30, 0x01])
        await asyncio.sleep(0.2)
        await self.sendPacket([0x30, 0x02])
        LOGGER.info("Triggered update")

    async def trigger_entity_update(self):
        """Invoke the registered update callback, if one was set."""
        if self._trigger_update:
            LOGGER.debug("Triggering async update")
            self._trigger_update()
        else:
            LOGGER.warning(f"No async update function provided: {self._trigger_update}")

    async def notification_handler(self, characteristic: "BleakGATTCharacteristic", res: bytearray):
        """Update the cached state from a status notification.

        Byte 8 of *res* selects the reply version: 1 is the short (white
        light) status, 2 the long (colour light) status, 255 means the device
        is off and 0 announces a shutdown. Status version 1 is received before
        version 2, so the combined on/off state and the entity update are only
        handled for version 2. Truncated notifications are ignored.
        """
        LOGGER.debug("Received notification: " + ''.join(format(x, ' 03x') for x in res))
        if len(res) < 9:
            return
        reply_version = res[8]
        LOGGER.debug(f"Reply version is {reply_version}")
        # Short version with only _brightness
        if reply_version == 1:
            # Bytes 9 (on flag) and 10 (brightness percent) are read below.
            if len(res) < 11:
                LOGGER.debug("Short status notification is truncated, ignoring")
                return
            self._light_on = res[9] == 1
            if self._light_on:
                self._brightness = int(res[10]*255/100) if res[10] > 0 else None
                self._mode = COLOR_MODE_WHITE
            LOGGER.debug(f"Short version, on: {self._is_on}, brightness: {self._brightness}")
        # Long version with color information
        elif reply_version == 2:
            # Bytes up to index 16 (effect) are read below.
            if len(res) < 17:
                LOGGER.debug("Long status notification is truncated, ignoring")
                return
            self._color_on = res[9] == 1
            if self._color_on:
                self._mode = COLOR_MODE_RGB
                # effect will be turned off if light off, update only if light on
                effect_index = res[16]
                if effect_index < len(self._supported_effects):
                    self._effect = self._supported_effects[effect_index]
                else:
                    LOGGER.debug(f"Ignoring unknown effect index {effect_index}")
                self._color_brightness = int(res[10]*255/100) if res[10] > 0 else None
                self._rgb_color = (res[13], res[14], res[15])
            self._is_on = self._light_on or self._color_on
            LOGGER.debug(f"Long version, on: {self._is_on}, brightness: {self._color_brightness}, rgb color: {self._rgb_color}, effect: {self._effect}")
            LOGGER.debug(f"res: {res[9]}, light_on {self._light_on}, color_on {self._color_on}")
            await self.trigger_entity_update()
        # Device turned off
        elif reply_version == 255:
            self._is_on = False
            self._light_on = False
            self._color_on = False
            LOGGER.debug("Device off")
            await self.trigger_entity_update()
        # Device is going to shutdown
        elif reply_version == 0:
            LOGGER.debug("Device is going to shut down")
            await self.disconnect()
        else:
            LOGGER.debug("Received unknown notification")

    async def connect(self) -> bool:
        """Connect, discover the read/write characteristics and subscribe.

        Returns ``True`` on success. Returns ``False`` when no supported
        characteristic is found or when any step raises; in the latter case
        the device is disconnected.
        """
        LOGGER.debug("Going to connect to device")
        try:
            if not self._device.is_connected:
                await self._device.connect(timeout=20)
                await asyncio.sleep(0.1)
            for char in self._device.services.characteristics.values():
                if char.uuid in WRITE_CHARACTERISTIC_UUIDS:
                    self._write_uuid = char.uuid
                if char.uuid in READ_CHARACTERISTIC_UUIDS:
                    self._read_uuid = char.uuid
            if not self._read_uuid or not self._write_uuid:
                LOGGER.error("No supported read/write UUIDs found")
                return False
            LOGGER.info(f"Read UUID: {self._read_uuid}, Write UUID: {self._write_uuid}")
            await asyncio.sleep(0.1)
            LOGGER.info("Starting notifications")
            await self._device.start_notify(self._read_uuid, self.notification_handler)
            await self.triggerStatus()
            await asyncio.sleep(0.1)
        except Exception as error:
            LOGGER.debug(traceback.format_exc())
            LOGGER.error(f"Error connecting: {error}")
            await self.disconnect()
            return False
        await asyncio.sleep(0.1)
        return True

    async def update(self):
        """Request a fresh status report, connecting first if necessary.

        Errors are logged and followed by a disconnect; they are not re-raised.
        """
        try:
            if not self._device.is_connected:
                if not await self.connect():
                    LOGGER.info("Was not able to connect to device for updates")
                    await self.disconnect()
                    return
            # connect() already subscribes to notifications, so they are not
            # started a second time here.
            LOGGER.info("Triggering update")
            await self.triggerStatus()
        except Exception as error:
            LOGGER.debug(traceback.format_exc())
            LOGGER.error(f"Error getting status: {error}")
            await self.disconnect()

    async def disconnect(self):
        """Disconnect if connected, mark everything as off and notify the entity."""
        LOGGER.debug("Disconnecting")
        if self._device.is_connected:
            try:
                await self._device.disconnect()
            except Exception as error:
                # Best-effort cleanup: this runs from error handlers, so a
                # failing disconnect is logged instead of raised.
                LOGGER.debug(traceback.format_exc())
                LOGGER.warning(f"Error while disconnecting: {error}")
        self._is_on = False
        self._light_on = False
        self._color_on = False
        await self.trigger_entity_update()