diff --git a/broadlinkmanager/VERSION b/broadlinkmanager/VERSION index ef8d756..28cbf7c 100644 --- a/broadlinkmanager/VERSION +++ b/broadlinkmanager/VERSION @@ -1 +1 @@ -4.2.0 \ No newline at end of file +5.0.0 \ No newline at end of file diff --git a/broadlinkmanager/broadlink/__init__.py b/broadlinkmanager/broadlink/__init__.py index e4e19f6..fc64e84 100644 --- a/broadlinkmanager/broadlink/__init__.py +++ b/broadlinkmanager/broadlink/__init__.py @@ -1,1112 +1,247 @@ -#!/usr/bin/python - -import codecs -import json -import random +#!/usr/bin/env python3 +"""The python-broadlink library.""" import socket -import struct -import threading -import time -from datetime import datetime - -from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes - -from .exceptions import check_error, exception -from .helpers import get_local_ip - - -def get_devices(): - return { - 0x0000: (sp1, "SP1", "Broadlink"), - - 0x2711: (sp2, "SP2", "Broadlink"), - 0x2719: (sp2, "SP2-compatible", "Honeywell"), - 0x271a: (sp2, "SP2-compatible", "Honeywell"), - 0x2720: (sp2, "SP mini", "Broadlink"), - 0x2728: (sp2, "SP2-compatible", "URANT"), - 0x2733: (sp2, "SP3", "Broadlink"), - 0x2736: (sp2, "SP mini+", "Broadlink"), - 0x273e: (sp2, "SP mini", "Broadlink"), - 0x7530: (sp2, "SP2", "Broadlink (OEM)"), - 0x753e: (sp2, "SP mini 3", "Broadlink"), - 0X7544: (sp2, "SP2-CL", "Broadlink"), - 0x7546: (sp2, "SP2-UK/BR/IN", "Broadlink (OEM)"), - 0x7547: (sp2, "SC1", "Broadlink"), - 0x7918: (sp2, "SP2", "Broadlink (OEM)"), - 0x7919: (sp2, "SP2-compatible", "Honeywell"), - 0x791a: (sp2, "SP2-compatible", "Honeywell"), - 0x7d00: (sp2, "SP3-EU", "Broadlink (OEM)"), - 0x7d0d: (sp2, "SP mini 3", "Broadlink (OEM)"), - 0x9479: (sp2, "SP3S-US", "Broadlink"), - 0x947a: (sp2, "SP3S-EU", "Broadlink"), - - 0x2712: (rm, "RM pro/pro+", "Broadlink"), - 0x272a: (rm, "RM pro", "Broadlink"), - 0x2737: (rm, "RM mini 3", "Broadlink"), - 0x273d: (rm, "RM pro", "Broadlink"), - 0x277c: (rm, "RM home", "Broadlink"), - 0x2783: (rm, "RM home", "Broadlink"), - 0x2787: (rm, "RM pro", "Broadlink"), - 0x278b: (rm, "RM plus", "Broadlink"), - 0x278f: (rm, "RM mini", "Broadlink"), - 0x2797: (rm, "RM pro+", "Broadlink"), - 0x279d: (rm, "RM pro+", "Broadlink"), - 0x27a1: (rm, "RM plus", "Broadlink"), - 0x27a6: (rm, "RM plus", "Broadlink"), - 0x27a9: (rm, "RM pro+", "Broadlink"), - 0x27c2: (rm, "RM mini 3", "Broadlink"), - 0x27d1: (rm, "RM mini 3", "Broadlink"), - 0x27de: (rm, "RM mini 3", "Broadlink"), - 0x27c3: (rm, "RM pro+", "Broadlink"), - 0x27c7: (rm, "RM mini 3", "Broadlink"), - 0x27cc: (rm, "RM mini 3", "Broadlink"), - 0x27cd: (rm, "RM mini 3", "Broadlink"), - 0x27D0: (rm, "RM mini 3", "Broadlink"), - 0x27d3: (rm, "RM mini 3", "Broadlink"), - 0x27dc: (rm, "RM mini 3", "Broadlink"), - 0x6507: (rm, "RM mini 3", "Broadlink"), - 0x6508: (rm, "RM mini 3", "Broadlink"), - - 0x6539: (rm4, "RM4C mini", "Broadlink"), - 0x51da: (rm4, "RM4 mini", "Broadlink"), - 0x5f36: (rm4, "RM mini 3", "Broadlink"), - 0x6026: (rm4, "RM4 pro", "Broadlink"), - 0x6070: (rm4, "RM4C mini", "Broadlink"), - 0x610e: (rm4, "RM4 mini", "Broadlink"), - 0x610f: (rm4, "RM4C mini", "Broadlink"), - 0x61a2: (rm4, "RM4 pro", "Broadlink"), - 0x62bc: (rm4, "RM4 mini", "Broadlink"), - 0x62be: (rm4, "RM4C mini", "Broadlink"), - 0x648d: (rm4, "RM4 mini", "Broadlink"), - 0x2714: (a1, "e-Sensor", "Broadlink"), - - 0x4eb5: (mp1, "MP1-1K4S", "Broadlink"), - 0x4ef7: (mp1, "MP1-1K4S", "Broadlink (OEM)"), - 0x4f65: (mp1, "MP1-1K3S2U", "Broadlink"), - - 0x5043: (lb1, "SB800TD", "Broadlink (OEM)"), - 0x504e: (lb1, "LB1", "Broadlink"), - 0x60c7: (lb1, "LB1", "Broadlink"), - 0x60c8: (lb1, "LB1", "Broadlink"), - 0x6112: (lb1, "LB1", "Broadlink"), - - 0x2722: (S1C, "S2KIT", "Broadlink"), - - 0x4ead: (hysen, "HY02B05H", "Hysen"), - - 0x4e4d: (dooya, "DT360E-45/20", "Dooya"), - - 0x51e3: (bg1, "BG800/BG900", "BG Electrical"), - 0x653c: (rm4, "RM4 pro", "Broadlink"), - 0x649b: (rm4, "RM4 pro", "Broadlink"), - 0x653a: (rm4, "RM4 mini", "Broadlink"), - 0x6184: (rm4, "RM4C mini", "Broadlink"), - 0x5209: (rm4, "RM4 TV Mate", "Broadlink") - } +import typing as t + +from . import exceptions as e +from .const import DEFAULT_BCAST_ADDR, DEFAULT_PORT, DEFAULT_TIMEOUT +from .alarm import S1C +from .climate import hysen +from .cover import dooya +from .device import Device, ping, scan +from .light import lb1, lb2 +from .remote import rm, rm4, rm4mini, rm4pro, rmmini, rmminib, rmpro +from .sensor import a1 +from .switch import bg1, mp1, sp1, sp2, sp2s, sp3, sp3s, sp4, sp4b + +SUPPORTED_TYPES = { + sp1: { + 0x0000: ("SP1", "Broadlink"), + }, + sp2: { + 0x2717: ("NEO", "Ankuoo"), + 0x2719: ("SP2-compatible", "Honeywell"), + 0x271A: ("SP2-compatible", "Honeywell"), + 0x2720: ("SP mini", "Broadlink"), + 0x2728: ("SP2-compatible", "URANT"), + 0x273E: ("SP mini", "Broadlink"), + 0x7530: ("SP2", "Broadlink (OEM)"), + 0x7539: ("SP2-IL", "Broadlink (OEM)"), + 0x753E: ("SP mini 3", "Broadlink"), + 0x7540: ("MP2", "Broadlink"), + 0x7544: ("SP2-CL", "Broadlink"), + 0x7546: ("SP2-UK/BR/IN", "Broadlink (OEM)"), + 0x7547: ("SC1", "Broadlink"), + 0x7918: ("SP2", "Broadlink (OEM)"), + 0x7919: ("SP2-compatible", "Honeywell"), + 0x791A: ("SP2-compatible", "Honeywell"), + 0x7D0D: ("SP mini 3", "Broadlink (OEM)"), + }, + sp2s: { + 0x2711: ("SP2", "Broadlink"), + 0x2716: ("NEO PRO", "Ankuoo"), + 0x271D: ("Ego", "Efergy"), + 0x2736: ("SP mini+", "Broadlink"), + }, + sp3: { + 0x2733: ("SP3", "Broadlink"), + 0x7D00: ("SP3-EU", "Broadlink (OEM)"), + }, + sp3s: { + 0x9479: ("SP3S-US", "Broadlink"), + 0x947A: ("SP3S-EU", "Broadlink"), + }, + sp4: { + 0x7568: ("SP4L-CN", "Broadlink"), + 0x756C: ("SP4M", "Broadlink"), + 0x756F: ("MCB1", "Broadlink"), + 0x7579: ("SP4L-EU", "Broadlink"), + 0x757B: ("SP4L-AU", "Broadlink"), + 0x7583: ("SP mini 3", "Broadlink"), + 0x7587: ("SP4L-UK", "Broadlink"), + 0x7D11: ("SP mini 3", "Broadlink"), + 0xA56A: ("MCB1", "Broadlink"), + 0xA56B: ("SCB1E", "Broadlink"), + 0xA56C: ("SP4L-EU", "Broadlink"), + 0xA589: ("SP4L-UK", "Broadlink"), + 0xA5D3: ("SP4L-EU", "Broadlink"), + }, + sp4b: { + 0x5115: ("SCB1E", "Broadlink"), + 0x51E2: ("AHC/U-01", "BG Electrical"), + 0x6111: ("MCB1", "Broadlink"), + 0x6113: ("SCB1E", "Broadlink"), + 0x618B: ("SP4L-EU", "Broadlink"), + 0x6489: ("SP4L-AU", "Broadlink"), + 0x648B: ("SP4M-US", "Broadlink"), + 0x6494: ("SCB2", "Broadlink"), + }, + rmmini: { + 0x2737: ("RM mini 3", "Broadlink"), + 0x278F: ("RM mini", "Broadlink"), + 0x27C2: ("RM mini 3", "Broadlink"), + 0x27C7: ("RM mini 3", "Broadlink"), + 0x27CC: ("RM mini 3", "Broadlink"), + 0x27CD: ("RM mini 3", "Broadlink"), + 0x27D0: ("RM mini 3", "Broadlink"), + 0x27D1: ("RM mini 3", "Broadlink"), + 0x27D3: ("RM mini 3", "Broadlink"), + 0x27DC: ("RM mini 3", "Broadlink"), + 0x27DE: ("RM mini 3", "Broadlink"), + }, + rmpro: { + 0x2712: ("RM pro/pro+", "Broadlink"), + 0x272A: ("RM pro", "Broadlink"), + 0x273D: ("RM pro", "Broadlink"), + 0x277C: ("RM home", "Broadlink"), + 0x2783: ("RM home", "Broadlink"), + 0x2787: ("RM pro", "Broadlink"), + 0x278B: ("RM plus", "Broadlink"), + 0x2797: ("RM pro+", "Broadlink"), + 0x279D: ("RM pro+", "Broadlink"), + 0x27A1: ("RM plus", "Broadlink"), + 0x27A6: ("RM plus", "Broadlink"), + 0x27A9: ("RM pro+", "Broadlink"), + 0x27C3: ("RM pro+", "Broadlink"), + }, + rmminib: { + 0x5F36: ("RM mini 3", "Broadlink"), + 0x6507: ("RM mini 3", "Broadlink"), + 0x6508: ("RM mini 3", "Broadlink"), + }, + rm4mini: { + 0x51DA: ("RM4 mini", "Broadlink"), + 0x6070: ("RM4C mini", "Broadlink"), + 0x610E: ("RM4 mini", "Broadlink"), + 0x610F: ("RM4C mini", "Broadlink"), + 0x62BC: ("RM4 mini", "Broadlink"), + 0x62BE: ("RM4C mini", "Broadlink"), + 0x6364: ("RM4S", "Broadlink"), + 0x648D: ("RM4 mini", "Broadlink"), + 0x6539: ("RM4C mini", "Broadlink"), + 0x653A: ("RM4 mini", "Broadlink"), + }, + rm4pro: { + 0x6026: ("RM4 pro", "Broadlink"), + 0x6184: ("RM4C pro", "Broadlink"), + 0x61A2: ("RM4 pro", "Broadlink"), + 0x649B: ("RM4 pro", "Broadlink"), + 0x653C: ("RM4 pro", "Broadlink"), + }, + a1: { + 0x2714: ("e-Sensor", "Broadlink"), + }, + mp1: { + 0x4EB5: ("MP1-1K4S", "Broadlink"), + 0x4EF7: ("MP1-1K4S", "Broadlink (OEM)"), + 0x4F1B: ("MP1-1K3S2U", "Broadlink (OEM)"), + 0x4F65: ("MP1-1K3S2U", "Broadlink"), + }, + lb1: { + 0x5043: ("SB800TD", "Broadlink (OEM)"), + 0x504E: ("LB1", "Broadlink"), + 0x606E: ("SB500TD", "Broadlink (OEM)"), + 0x60C7: ("LB1", "Broadlink"), + 0x60C8: ("LB1", "Broadlink"), + 0x6112: ("LB1", "Broadlink"), + }, + lb2: { + 0xA4F4: ("LB27 R1", "Broadlink"), + }, + S1C: { + 0x2722: ("S2KIT", "Broadlink"), + }, + hysen: { + 0x4EAD: ("HY02/HY03", "Hysen"), + }, + dooya: { + 0x4E4D: ("DT360E-45/20", "Dooya"), + }, + bg1: { + 0x51E3: ("BG800/BG900", "BG Electrical"), + }, +} -def gendevice(dev_type, host, mac, name=None, is_locked=None): +def gendevice( + dev_type: int, + host: t.Tuple[str, int], + mac: t.Union[bytes, str], + name: str = "", + is_locked: bool = False, +) -> Device: """Generate a device.""" - try: - dev_class, model, manufacturer = get_devices()[dev_type] - - except KeyError: - return device(host, mac, dev_type, name=name, is_locked=is_locked) - - return dev_class( - host, - mac, - dev_type, - name=name, - model=model, - manufacturer=manufacturer, - is_locked=is_locked, - ) - - -def discover( - timeout=None, - local_ip_address=None, - discover_ip_address='255.255.255.255', - discover_ip_port=80 -): - local_ip_address = local_ip_address or get_local_ip() - address = local_ip_address.split('.') - cs = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - cs.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - cs.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) - cs.bind((local_ip_address, 0)) - port = cs.getsockname()[1] - starttime = time.time() - - devices = [] - - timezone = int(time.timezone / -3600) - packet = bytearray(0x30) - - year = datetime.now().year - - if timezone < 0: - packet[0x08] = 0xff + timezone - 1 - packet[0x09] = 0xff - packet[0x0a] = 0xff - packet[0x0b] = 0xff - else: - packet[0x08] = timezone - packet[0x09] = 0 - packet[0x0a] = 0 - packet[0x0b] = 0 - packet[0x0c] = year & 0xff - packet[0x0d] = year >> 8 - packet[0x0e] = datetime.now().minute - packet[0x0f] = datetime.now().hour - subyear = str(year)[2:] - packet[0x10] = int(subyear) - packet[0x11] = datetime.now().isoweekday() - packet[0x12] = datetime.now().day - packet[0x13] = datetime.now().month - packet[0x18] = int(address[0]) - packet[0x19] = int(address[1]) - packet[0x1a] = int(address[2]) - packet[0x1b] = int(address[3]) - packet[0x1c] = port & 0xff - packet[0x1d] = port >> 8 - packet[0x26] = 6 - - checksum = sum(packet, 0xbeaf) & 0xffff - packet[0x20] = checksum & 0xff - packet[0x21] = checksum >> 8 - - cs.sendto(packet, (discover_ip_address, discover_ip_port)) - if timeout is None: - response = cs.recvfrom(1024) - responsepacket = bytearray(response[0]) - host = response[1] - devtype = responsepacket[0x34] | responsepacket[0x35] << 8 - mac = responsepacket[0x3f:0x39:-1] - name = responsepacket[0x40:].split(b'\x00')[0].decode('utf-8') - is_locked = bool(responsepacket[-1]) - device = gendevice(devtype, host, mac, name=name, is_locked=is_locked) - cs.close() - return device - - while (time.time() - starttime) < timeout: - cs.settimeout(timeout - (time.time() - starttime)) + for dev_cls, products in SUPPORTED_TYPES.items(): try: - response = cs.recvfrom(1024) - except socket.timeout: - cs.close() - return devices - responsepacket = bytearray(response[0]) - host = response[1] - devtype = responsepacket[0x34] | responsepacket[0x35] << 8 - mac = responsepacket[0x3f:0x39:-1] - name = responsepacket[0x40:].split(b'\x00')[0].decode('utf-8') - is_locked = bool(responsepacket[-1]) - device = gendevice(devtype, host, mac, name=name, is_locked=is_locked) - devices.append(device) - cs.close() - return devices - - -class device: - def __init__( - self, - host, - mac, - devtype, - timeout=10, - name=None, - model=None, - manufacturer=None, - is_locked=None - ): - self.host = host - self.mac = mac.encode() if isinstance(mac, str) else mac - self.devtype = devtype if devtype is not None else 0x272a - self.timeout = timeout - self.name = name - self.model = model - self.manufacturer = manufacturer - self.is_locked = is_locked - self.count = random.randrange(0xffff) - self.iv = bytearray( - [0x56, 0x2e, 0x17, 0x99, 0x6d, 0x09, 0x3d, 0x28, 0xdd, 0xb3, 0xba, 0x69, 0x5a, 0x2e, 0x6f, 0x58]) - self.id = bytearray([0, 0, 0, 0]) - self.type = "Unknown" - self.lock = threading.Lock() - - self.aes = None - key = bytearray( - [0x09, 0x76, 0x28, 0x34, 0x3f, 0xe9, 0x9e, 0x23, 0x76, 0x5c, 0x15, 0x13, 0xac, 0xcf, 0x8b, 0x02]) - self.update_aes(key) - - def update_aes(self, key): - self.aes = Cipher(algorithms.AES(key), modes.CBC(self.iv), - backend=default_backend()) - - def encrypt(self, payload): - encryptor = self.aes.encryptor() - return encryptor.update(payload) + encryptor.finalize() - - def decrypt(self, payload): - decryptor = self.aes.decryptor() - return decryptor.update(payload) + decryptor.finalize() - - def auth(self): - payload = bytearray(0x50) - payload[0x04] = 0x31 - payload[0x05] = 0x31 - payload[0x06] = 0x31 - payload[0x07] = 0x31 - payload[0x08] = 0x31 - payload[0x09] = 0x31 - payload[0x0a] = 0x31 - payload[0x0b] = 0x31 - payload[0x0c] = 0x31 - payload[0x0d] = 0x31 - payload[0x0e] = 0x31 - payload[0x0f] = 0x31 - payload[0x10] = 0x31 - payload[0x11] = 0x31 - payload[0x12] = 0x31 - payload[0x1e] = 0x01 - payload[0x2d] = 0x01 - payload[0x30] = ord('T') - payload[0x31] = ord('e') - payload[0x32] = ord('s') - payload[0x33] = ord('t') - payload[0x34] = ord(' ') - payload[0x35] = ord(' ') - payload[0x36] = ord('1') - - response = self.send_packet(0x65, payload) - check_error(response[0x22:0x24]) - payload = self.decrypt(response[0x38:]) - - key = payload[0x04:0x14] - if len(key) % 16 != 0: - return False - - self.id = payload[0x03::-1] - self.update_aes(key) - - return True - - def get_fwversion(self): - packet = bytearray([0x68]) - response = self.send_packet(0x6a, packet) - check_error(response[0x22:0x24]) - payload = self.decrypt(response[0x38:]) - return payload[0x4] | payload[0x5] << 8 - - def set_name(self, name): - packet = bytearray(4) - packet += name.encode('utf-8') - packet += bytearray(0x50 - len(packet)) - packet[0x43] = bool(self.is_locked) - response = self.send_packet(0x6a, packet) - check_error(response[0x22:0x24]) - self.name = name - - def set_lock(self, state): - packet = bytearray(4) - packet += self.name.encode('utf-8') - packet += bytearray(0x50 - len(packet)) - packet[0x43] = bool(state) - response = self.send_packet(0x6a, packet) - check_error(response[0x22:0x24]) - self.is_locked = bool(state) - - def get_type(self): - return self.type - - def send_packet(self, command, payload): - self.count = (self.count + 1) & 0xffff - packet = bytearray(0x38) - packet[0x00] = 0x5a - packet[0x01] = 0xa5 - packet[0x02] = 0xaa - packet[0x03] = 0x55 - packet[0x04] = 0x5a - packet[0x05] = 0xa5 - packet[0x06] = 0xaa - packet[0x07] = 0x55 - packet[0x24] = self.devtype & 0xff - packet[0x25] = self.devtype >> 8 - packet[0x26] = command - packet[0x28] = self.count & 0xff - packet[0x29] = self.count >> 8 - packet[0x2a] = self.mac[5] - packet[0x2b] = self.mac[4] - packet[0x2c] = self.mac[3] - packet[0x2d] = self.mac[2] - packet[0x2e] = self.mac[1] - packet[0x2f] = self.mac[0] - packet[0x30] = self.id[3] - packet[0x31] = self.id[2] - packet[0x32] = self.id[1] - packet[0x33] = self.id[0] - - # pad the payload for AES encryption - if payload: - payload += bytearray((16 - len(payload)) % 16) - - checksum = sum(payload, 0xbeaf) & 0xffff - packet[0x34] = checksum & 0xff - packet[0x35] = checksum >> 8 - - payload = self.encrypt(payload) - for i in range(len(payload)): - packet.append(payload[i]) - - checksum = sum(packet, 0xbeaf) & 0xffff - packet[0x20] = checksum & 0xff - packet[0x21] = checksum >> 8 - - start_time = time.time() - with self.lock: - cs = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - cs.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - - while True: - try: - cs.sendto(packet, self.host) - cs.settimeout(1) - resp, _ = cs.recvfrom(2048) - resp = bytearray(resp) - break - except socket.timeout: - if (time.time() - start_time) > self.timeout: - cs.close() - raise exception(-4000) # Network timeout. - cs.close() - - if len(resp) < 0x30: - raise exception(-4007) # Length error. - - checksum = resp[0x20] | (resp[0x21] << 8) - if sum(resp, 0xbeaf) - sum(resp[0x20:0x22]) & 0xffff != checksum: - raise exception(-4008) # Checksum error. - - return resp - - -class mp1(device): - def __init__(self, *args, **kwargs): - device.__init__(self, *args, **kwargs) - self.type = "MP1" - - def set_power_mask(self, sid_mask, state): - """Sets the power state of the smart power strip.""" - - packet = bytearray(16) - packet[0x00] = 0x0d - packet[0x02] = 0xa5 - packet[0x03] = 0xa5 - packet[0x04] = 0x5a - packet[0x05] = 0x5a - packet[0x06] = 0xb2 + ((sid_mask << 1) if state else sid_mask) - packet[0x07] = 0xc0 - packet[0x08] = 0x02 - packet[0x0a] = 0x03 - packet[0x0d] = sid_mask - packet[0x0e] = sid_mask if state else 0 - - response = self.send_packet(0x6a, packet) - check_error(response[0x22:0x24]) - - def set_power(self, sid, state): - """Sets the power state of the smart power strip.""" - sid_mask = 0x01 << (sid - 1) - return self.set_power_mask(sid_mask, state) - - def check_power_raw(self): - """Returns the power state of the smart power strip in raw format.""" - packet = bytearray(16) - packet[0x00] = 0x0a - packet[0x02] = 0xa5 - packet[0x03] = 0xa5 - packet[0x04] = 0x5a - packet[0x05] = 0x5a - packet[0x06] = 0xae - packet[0x07] = 0xc0 - packet[0x08] = 0x01 - - response = self.send_packet(0x6a, packet) - check_error(response[0x22:0x24]) - payload = self.decrypt(bytes(response[0x38:])) - if isinstance(payload[0x4], int): - state = payload[0x0e] - else: - state = ord(payload[0x0e]) - return state - - def check_power(self): - """Returns the power state of the smart power strip.""" - state = self.check_power_raw() - if state is None: - return {'s1': None, 's2': None, 's3': None, 's4': None} - data = {} - data['s1'] = bool(state & 0x01) - data['s2'] = bool(state & 0x02) - data['s3'] = bool(state & 0x04) - data['s4'] = bool(state & 0x08) - return data - - -class bg1(device): - def __init__(self, *args, **kwargs): - device.__init__(self, *args, **kwargs) - self.type = "BG1" - - def get_state(self): - """Get state of device. - Returns: - dict: Dictionary of current state - eg. `{"pwr":1,"pwr1":1,"pwr2":0,"maxworktime":60,"maxworktime1":60,"maxworktime2":0,"idcbrightness":50}`""" - packet = self._encode(1, b'{}') - response = self.send_packet(0x6a, packet) - check_error(response[0x22:0x24]) - return self._decode(response) - - def set_state(self, pwr=None, pwr1=None, pwr2=None, maxworktime=None, maxworktime1=None, maxworktime2=None, idcbrightness=None): - data = {} - if pwr is not None: - data['pwr'] = int(bool(pwr)) - if pwr1 is not None: - data['pwr1'] = int(bool(pwr1)) - if pwr2 is not None: - data['pwr2'] = int(bool(pwr2)) - if maxworktime is not None: - data['maxworktime'] = maxworktime - if maxworktime1 is not None: - data['maxworktime1'] = maxworktime1 - if maxworktime2 is not None: - data['maxworktime2'] = maxworktime2 - if idcbrightness is not None: - data['idcbrightness'] = idcbrightness - js = json.dumps(data).encode('utf8') - packet = self._encode(2, js) - response = self.send_packet(0x6a, packet) - check_error(response[0x22:0x24]) - return self._decode(response) - - def _encode(self, flag, js): - # packet format is: - # 0x00-0x01 length - # 0x02-0x05 header - # 0x06-0x07 00 - # 0x08 flag (1 for read or 2 write?) - # 0x09 unknown (0xb) - # 0x0a-0x0d length of json - # 0x0e- json data - packet = bytearray(14) - length = 4 + 2 + 2 + 4 + len(js) - struct.pack_into('> 8 - - return packet - - def _decode(self, response): - payload = self.decrypt(bytes(response[0x38:])) - js_len = struct.unpack_from(' Device: + """Direct device discovery. - def send_data(self, data): - packet = bytearray(self._code_sending_header) - packet += bytearray([0x02, 0x00, 0x00, 0x00]) - packet += data - response = self.send_packet(0x6a, packet) - check_error(response[0x22:0x24]) - - def enter_learning(self): - packet = bytearray(self._request_header) - packet.append(0x03) - response = self.send_packet(0x6a, packet) - check_error(response[0x22:0x24]) - - def sweep_frequency(self): - packet = bytearray(self._request_header) - packet.append(0x19) - response = self.send_packet(0x6a, packet) - check_error(response[0x22:0x24]) - - def cancel_sweep_frequency(self): - packet = bytearray(self._request_header) - packet.append(0x1e) - response = self.send_packet(0x6a, packet) - check_error(response[0x22:0x24]) - - def check_frequency(self): - packet = bytearray(self._request_header) - packet.append(0x1a) - response = self.send_packet(0x6a, packet) - check_error(response[0x22:0x24]) - payload = self.decrypt(bytes(response[0x38:])) - if payload[len(self._request_header) + 4] == 1: - return True - return False - - def find_rf_packet(self): - packet = bytearray(self._request_header) - packet.append(0x1b) - response = self.send_packet(0x6a, packet) - check_error(response[0x22:0x24]) - payload = self.decrypt(bytes(response[0x38:])) - if payload[len(self._request_header) + 4] == 1: - return True - return False - - def _check_sensors(self, command): - packet = bytearray(self._request_header) - packet.append(command) - response = self.send_packet(0x6a, packet) - check_error(response[0x22:0x24]) - payload = self.decrypt(bytes(response[0x38:])) - return bytearray(payload[len(self._request_header) + 4:]) - - def check_temperature(self): - data = self._check_sensors(0x1) - return data[0x0] + data[0x1] / 10.0 - - def check_sensors(self): - data = self._check_sensors(0x1) - return {'temperature': data[0x0] + data[0x1] / 10.0} - - -class rm4(rm): - def __init__(self, *args, **kwargs): - device.__init__(self, *args, **kwargs) - self.type = "RM4" - self._request_header = b'\x04\x00' - self._code_sending_header = b'\xda\x00' - - def check_temperature(self): - data = self._check_sensors(0x24) - return data[0x0] + data[0x1] / 100.0 - - def check_humidity(self): - data = self._check_sensors(0x24) - return data[0x2] + data[0x3] / 100.0 - - def check_sensors(self): - data = self._check_sensors(0x24) - return { - 'temperature': data[0x0] + data[0x1] / 100.0, - 'humidity': data[0x2] + data[0x3] / 100.0 - } - - -# For legacy compatibility - don't use this -class rm2(rm): - def __init__(self): - device.__init__(self, None, None, None) - - def discover(self): - dev = discover() - self.host = dev.host - self.mac = dev.mac - - -class hysen(device): - def __init__(self, *args, **kwargs): - device.__init__(self, *args, **kwargs) - self.type = "Hysen heating controller" - - # Send a request - # input_payload should be a bytearray, usually 6 bytes, e.g. bytearray([0x01,0x06,0x00,0x02,0x10,0x00]) - # Returns decrypted payload - # New behaviour: raises a ValueError if the device response indicates an error or CRC check fails - # The function prepends length (2 bytes) and appends CRC - - def calculate_crc16(self, input_data): - from ctypes import c_ushort - crc16_tab = [] - crc16_constant = 0xA001 - - for i in range(0, 256): - crc = c_ushort(i).value - for j in range(0, 8): - if (crc & 0x0001): - crc = c_ushort(crc >> 1).value ^ crc16_constant - else: - crc = c_ushort(crc >> 1).value - crc16_tab.append(hex(crc)) - - try: - is_string = isinstance(input_data, str) - is_bytes = isinstance(input_data, bytes) - - if not is_string and not is_bytes: - raise Exception("Please provide a string or a byte sequence " - "as argument for calculation.") - - crcValue = 0xffff - - for c in input_data: - d = ord(c) if is_string else c - tmp = crcValue ^ d - rotated = c_ushort(crcValue >> 8).value - crcValue = rotated ^ int(crc16_tab[(tmp & 0x00ff)], 0) - - return crcValue - except Exception as e: - print("EXCEPTION(calculate): {}".format(e)) - - def send_request(self, input_payload): - - crc = self.calculate_crc16(bytes(input_payload)) - - # first byte is length, +2 for CRC16 - request_payload = bytearray([len(input_payload) + 2, 0x00]) - request_payload.extend(input_payload) - - # append CRC - request_payload.append(crc & 0xFF) - request_payload.append((crc >> 8) & 0xFF) - - # send to device - response = self.send_packet(0x6a, request_payload) - check_error(response[0x22:0x24]) - response_payload = bytearray(self.decrypt(bytes(response[0x38:]))) - - # experimental check on CRC in response (first 2 bytes are len, and trailing bytes are crc) - response_payload_len = response_payload[0] - if response_payload_len + 2 > len(response_payload): - raise ValueError('hysen_response_error', 'first byte of response is not length') - crc = self.calculate_crc16(bytes(response_payload[2:response_payload_len])) - if (response_payload[response_payload_len] == crc & 0xFF) and ( - response_payload[response_payload_len + 1] == (crc >> 8) & 0xFF): - return response_payload[2:response_payload_len] - raise ValueError('hysen_response_error', 'CRC check on response failed') - - # Get current room temperature in degrees celsius - def get_temp(self): - payload = self.send_request(bytearray([0x01, 0x03, 0x00, 0x00, 0x00, 0x08])) - return payload[0x05] / 2.0 - - # Get current external temperature in degrees celsius - def get_external_temp(self): - payload = self.send_request(bytearray([0x01, 0x03, 0x00, 0x00, 0x00, 0x08])) - return payload[18] / 2.0 - - # Get full status (including timer schedule) - def get_full_status(self): - payload = self.send_request(bytearray([0x01, 0x03, 0x00, 0x00, 0x00, 0x16])) - data = {} - data['remote_lock'] = payload[3] & 1 - data['power'] = payload[4] & 1 - data['active'] = (payload[4] >> 4) & 1 - data['temp_manual'] = (payload[4] >> 6) & 1 - data['room_temp'] = (payload[5] & 255) / 2.0 - data['thermostat_temp'] = (payload[6] & 255) / 2.0 - data['auto_mode'] = payload[7] & 15 - data['loop_mode'] = (payload[7] >> 4) & 15 - data['sensor'] = payload[8] - data['osv'] = payload[9] - data['dif'] = payload[10] - data['svh'] = payload[11] - data['svl'] = payload[12] - data['room_temp_adj'] = ((payload[13] << 8) + payload[14]) / 2.0 - if data['room_temp_adj'] > 32767: - data['room_temp_adj'] = 32767 - data['room_temp_adj'] - data['fre'] = payload[15] - data['poweron'] = payload[16] - data['unknown'] = payload[17] - data['external_temp'] = (payload[18] & 255) / 2.0 - data['hour'] = payload[19] - data['min'] = payload[20] - data['sec'] = payload[21] - data['dayofweek'] = payload[22] - - weekday = [] - for i in range(0, 6): - weekday.append( - {'start_hour': payload[2 * i + 23], 'start_minute': payload[2 * i + 24], 'temp': payload[i + 39] / 2.0}) - - data['weekday'] = weekday - weekend = [] - for i in range(6, 8): - weekend.append( - {'start_hour': payload[2 * i + 23], 'start_minute': payload[2 * i + 24], 'temp': payload[i + 39] / 2.0}) - - data['weekend'] = weekend - return data - - # Change controller mode - # auto_mode = 1 for auto (scheduled/timed) mode, 0 for manual mode. - # Manual mode will activate last used temperature. - # In typical usage call set_temp to activate manual control and set temp. - # loop_mode refers to index in [ "12345,67", "123456,7", "1234567" ] - # E.g. loop_mode = 0 ("12345,67") means Saturday and Sunday follow the "weekend" schedule - # loop_mode = 2 ("1234567") means every day (including Saturday and Sunday) follows the "weekday" schedule - # The sensor command is currently experimental - def set_mode(self, auto_mode, loop_mode, sensor=0): - mode_byte = ((loop_mode + 1) << 4) + auto_mode - self.send_request(bytearray([0x01, 0x06, 0x00, 0x02, mode_byte, sensor])) - - # Advanced settings - # Sensor mode (SEN) sensor = 0 for internal sensor, 1 for external sensor, - # 2 for internal control temperature, external limit temperature. Factory default: 0. - # Set temperature range for external sensor (OSV) osv = 5..99. Factory default: 42C - # Deadzone for floor temprature (dIF) dif = 1..9. Factory default: 2C - # Upper temperature limit for internal sensor (SVH) svh = 5..99. Factory default: 35C - # Lower temperature limit for internal sensor (SVL) svl = 5..99. Factory default: 5C - # Actual temperature calibration (AdJ) adj = -0.5. Prescision 0.1C - # Anti-freezing function (FrE) fre = 0 for anti-freezing function shut down, - # 1 for anti-freezing function open. Factory default: 0 - # Power on memory (POn) poweron = 0 for power on memory off, 1 for power on memory on. Factory default: 0 - def set_advanced(self, loop_mode, sensor, osv, dif, svh, svl, adj, fre, poweron): - input_payload = bytearray([0x01, 0x10, 0x00, 0x02, 0x00, 0x05, 0x0a, loop_mode, sensor, osv, dif, svh, svl, - (int(adj * 2) >> 8 & 0xff), (int(adj * 2) & 0xff), fre, poweron]) - self.send_request(input_payload) - - # For backwards compatibility only. Prefer calling set_mode directly. - # Note this function invokes loop_mode=0 and sensor=0. - def switch_to_auto(self): - self.set_mode(auto_mode=1, loop_mode=0) - - def switch_to_manual(self): - self.set_mode(auto_mode=0, loop_mode=0) - - # Set temperature for manual mode (also activates manual mode if currently in automatic) - def set_temp(self, temp): - self.send_request(bytearray([0x01, 0x06, 0x00, 0x01, 0x00, int(temp * 2)])) - - # Set device on(1) or off(0), does not deactivate Wifi connectivity. - # Remote lock disables control by buttons on thermostat. - def set_power(self, power=1, remote_lock=0): - self.send_request(bytearray([0x01, 0x06, 0x00, 0x00, remote_lock, power])) - - # set time on device - # n.b. day=1 is Monday, ..., day=7 is Sunday - def set_time(self, hour, minute, second, day): - self.send_request(bytearray([0x01, 0x10, 0x00, 0x08, 0x00, 0x02, 0x04, hour, minute, second, day])) - - # Set timer schedule - # Format is the same as you get from get_full_status. - # weekday is a list (ordered) of 6 dicts like: - # {'start_hour':17, 'start_minute':30, 'temp': 22 } - # Each one specifies the thermostat temp that will become effective at start_hour:start_minute - # weekend is similar but only has 2 (e.g. switch on in morning and off in afternoon) - def set_schedule(self, weekday, weekend): - # Begin with some magic values ... - input_payload = bytearray([0x01, 0x10, 0x00, 0x0a, 0x00, 0x0c, 0x18]) - - # Now simply append times/temps - # weekday times - for i in range(0, 6): - input_payload.append(weekday[i]['start_hour']) - input_payload.append(weekday[i]['start_minute']) - - # weekend times - for i in range(0, 2): - input_payload.append(weekend[i]['start_hour']) - input_payload.append(weekend[i]['start_minute']) - - # weekday temperatures - for i in range(0, 6): - input_payload.append(int(weekday[i]['temp'] * 2)) - - # weekend temperatures - for i in range(0, 2): - input_payload.append(int(weekend[i]['temp'] * 2)) - - self.send_request(input_payload) - - -S1C_SENSORS_TYPES = { - 0x31: 'Door Sensor', # 49 as hex - 0x91: 'Key Fob', # 145 as hex, as serial on fob corpse - 0x21: 'Motion Sensor' # 33 as hex -} - - -class S1C(device): + Useful if the device is locked. """ - Its VERY VERY VERY DIRTY IMPLEMENTATION of S1C - """ - - def __init__(self, *args, **kwargs): - device.__init__(self, *args, **kwargs) - self.type = 'S1C' - - def get_sensors_status(self): - packet = bytearray(16) - packet[0] = 0x06 # 0x06 - get sensors info, 0x07 - probably add sensors - response = self.send_packet(0x6a, packet) - check_error(response[0x22:0x24]) - payload = self.decrypt(bytes(response[0x38:])) - if not payload: - return None - count = payload[0x4] - sensors = payload[0x6:] - sensors_a = [bytearray(sensors[i * 83:(i + 1) * 83]) for i in range(len(sensors) // 83)] - - sens_res = [] - for sens in sensors_a: - status = ord(chr(sens[0])) - _name = str(bytes(sens[4:26]).decode()) - _order = ord(chr(sens[1])) - _type = ord(chr(sens[3])) - _serial = bytes(codecs.encode(sens[26:30], "hex")).decode() - - type_str = S1C_SENSORS_TYPES.get(_type, 'Unknown') - - r = { - 'status': status, - 'name': _name.strip('\x00'), - 'type': type_str, - 'order': _order, - 'serial': _serial, - } - if r['serial'] != '00000000': - sens_res.append(r) - result = { - 'count': count, - 'sensors': sens_res - } - return result - - -class dooya(device): - def __init__(self, *args, **kwargs): - device.__init__(self, *args, **kwargs) - self.type = "Dooya DT360E" - - def _send(self, magic1, magic2): - packet = bytearray(16) - packet[0] = 0x09 - packet[2] = 0xbb - packet[3] = magic1 - packet[4] = magic2 - packet[9] = 0xfa - packet[10] = 0x44 - response = self.send_packet(0x6a, packet) - check_error(response[0x22:0x24]) - payload = self.decrypt(bytes(response[0x38:])) - return ord(payload[4]) - - def open(self): - return self._send(0x01, 0x00) - - def close(self): - return self._send(0x02, 0x00) - - def stop(self): - return self._send(0x03, 0x00) - - def get_percentage(self): - return self._send(0x06, 0x5d) - - def set_percentage_and_wait(self, new_percentage): - current = self.get_percentage() - if current > new_percentage: - self.close() - while current is not None and current > new_percentage: - time.sleep(0.2) - current = self.get_percentage() - - elif current < new_percentage: - self.open() - while current is not None and current < new_percentage: - time.sleep(0.2) - current = self.get_percentage() - self.stop() - -class lb1(device): - state_dict = [] - effect_map_dict = { 'lovely color' : 0, - 'flashlight' : 1, - 'lightning' : 2, - 'color fading' : 3, - 'color breathing' : 4, - 'multicolor breathing' : 5, - 'color jumping' : 6, - 'multicolor jumping' : 7 } - - def __init__(self, *args, **kwargs): - device.__init__(self, *args, **kwargs) - self.type = "SmartBulb" - - def send_command(self, command, type='set'): - packet = bytearray(16+(int(len(command)/16) + 1)*16) - packet[0x00] = 0x0c + len(command) & 0xff - packet[0x02] = 0xa5 - packet[0x03] = 0xa5 - packet[0x04] = 0x5a - packet[0x05] = 0x5a - packet[0x08] = 0x02 if type == "set" else 0x01 # 0x01 => query, # 0x02 => set - packet[0x09] = 0x0b - packet[0x0a] = len(command) - packet[0x0e:] = map(ord, command) - - checksum = sum(packet, 0xbeaf) & 0xffff - packet[0x06] = checksum & 0xff # Checksum 1 position - packet[0x07] = checksum >> 8 # Checksum 2 position - - response = self.send_packet(0x6a, packet) - check_error(response[0x36:0x38]) - payload = self.decrypt(bytes(response[0x38:])) - - responseLength = int(payload[0x0a]) | (int(payload[0x0b]) << 8) - if responseLength > 0: - self.state_dict = json.loads(payload[0x0e:0x0e+responseLength]) - - def set_json(self, jsonstr): - reconvert = json.loads(jsonstr) - if 'bulb_sceneidx' in reconvert.keys(): - reconvert['bulb_sceneidx'] = self.effect_map_dict.get(reconvert['bulb_sceneidx'], 255) + try: + return next( + xdiscover(timeout=timeout, discover_ip_address=host, discover_ip_port=port) + ) + except StopIteration as err: + raise e.NetworkTimeoutError( + -4000, + "Network timeout", + f"No response received within {timeout}s", + ) from err - self.send_command(json.dumps(reconvert)) - return json.dumps(self.state_dict) - def set_state(self, state): - cmd = '{"pwr":%d}' % (1 if state == "ON" or state == 1 else 0) - self.send_command(cmd) +def discover( + timeout: int = DEFAULT_TIMEOUT, + local_ip_address: str = None, + discover_ip_address: str = DEFAULT_BCAST_ADDR, + discover_ip_port: int = DEFAULT_PORT, +) -> t.List[Device]: + """Discover devices connected to the local network.""" + responses = scan(timeout, local_ip_address, discover_ip_address, discover_ip_port) + return [gendevice(*resp) for resp in responses] + + +def xdiscover( + timeout: int = DEFAULT_TIMEOUT, + local_ip_address: str = None, + discover_ip_address: str = DEFAULT_BCAST_ADDR, + discover_ip_port: int = DEFAULT_PORT, +) -> t.Generator[Device, None, None]: + """Discover devices connected to the local network. + + This function returns a generator that yields devices instantly. + """ + responses = scan(timeout, local_ip_address, discover_ip_address, discover_ip_port) + for resp in responses: + yield gendevice(*resp) - def get_state(self): - cmd = "{}" - self.send_command(cmd) - return self.state_dict # Setup a new Broadlink device via AP Mode. Review the README to see how to enter AP Mode. # Only tested with Broadlink RM3 Mini (Blackbean) -def setup(ssid, password, security_mode): +def setup(ssid: str, password: str, security_mode: int) -> None: + """Set up a new Broadlink device via AP mode.""" # Security mode options are (0 - none, 1 = WEP, 2 = WPA1, 3 = WPA2, 4 = WPA1/2) payload = bytearray(0x88) payload[0x26] = 0x14 # This seems to always be set to 14 @@ -1125,15 +260,14 @@ def setup(ssid, password, security_mode): payload[0x84] = ssid_length # Character length of SSID payload[0x85] = pass_length # Character length of password - payload[0x86] = security_mode # Type of encryption (00 - none, 01 = WEP, 02 = WPA1, 03 = WPA2, 04 = WPA1/2) + payload[0x86] = security_mode # Type of encryption - checksum = sum(payload, 0xbeaf) & 0xffff - payload[0x20] = checksum & 0xff # Checksum 1 position + checksum = sum(payload, 0xBEAF) & 0xFFFF + payload[0x20] = checksum & 0xFF # Checksum 1 position payload[0x21] = checksum >> 8 # Checksum 2 position - sock = socket.socket(socket.AF_INET, # Internet - socket.SOCK_DGRAM) # UDP + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Internet # UDP sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) - sock.sendto(payload, ('255.255.255.255', 80)) + sock.sendto(payload, (DEFAULT_BCAST_ADDR, DEFAULT_PORT)) sock.close() diff --git a/broadlinkmanager/broadlink/alarm.py b/broadlinkmanager/broadlink/alarm.py new file mode 100644 index 0000000..a9b5e87 --- /dev/null +++ b/broadlinkmanager/broadlink/alarm.py @@ -0,0 +1,43 @@ +"""Support for alarm kits.""" +from . import exceptions as e +from .device import Device + + +class S1C(Device): + """Controls a Broadlink S1C.""" + + TYPE = "S1C" + + _SENSORS_TYPES = { + 0x31: "Door Sensor", + 0x91: "Key Fob", + 0x21: "Motion Sensor", + } + + def get_sensors_status(self) -> dict: + """Return the state of the sensors.""" + packet = bytearray(16) + packet[0] = 0x06 + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + count = payload[0x4] + sensor_data = payload[0x6:] + sensors = [ + bytearray(sensor_data[i * 83 : (i + 1) * 83]) + for i in range(len(sensor_data) // 83) + ] + return { + "count": count, + "sensors": [ + { + "status": sensor[0], + "name": sensor[4:26].decode().strip("\x00"), + "type": self._SENSORS_TYPES.get(sensor[3], "Unknown"), + "order": sensor[1], + "serial": sensor[26:30].hex(), + } + for sensor in sensors + if any(sensor[26:30]) + ], + } diff --git a/broadlinkmanager/broadlink/climate.py b/broadlinkmanager/broadlink/climate.py new file mode 100644 index 0000000..eee5f11 --- /dev/null +++ b/broadlinkmanager/broadlink/climate.py @@ -0,0 +1,228 @@ +"""Support for HVAC units.""" +import typing as t + +from . import exceptions as e +from .device import Device +from .helpers import CRC16 + + +class hysen(Device): + """Controls a Hysen heating thermostat. + + This device is manufactured by Hysen and sold under different + brands, including Floureon, Beca Energy, Beok and Decdeal. + + Supported models: + - HY02B05H + - HY03WE + """ + + TYPE = "HYS" + + def send_request(self, request: t.Sequence[int]) -> bytes: + """Send a request to the device.""" + packet = bytearray() + packet.extend((len(request) + 2).to_bytes(2, "little")) + packet.extend(request) + packet.extend(CRC16.calculate(request).to_bytes(2, "little")) + + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + + p_len = int.from_bytes(payload[:0x02], "little") + if p_len + 2 > len(payload): + raise ValueError( + "hysen_response_error", "first byte of response is not length" + ) + + nom_crc = int.from_bytes(payload[p_len : p_len + 2], "little") + real_crc = CRC16.calculate(payload[0x02:p_len]) + if nom_crc != real_crc: + raise ValueError("hysen_response_error", "CRC check on response failed") + + return payload[0x02:p_len] + + def get_temp(self) -> float: + """Return the room temperature in degrees celsius.""" + payload = self.send_request([0x01, 0x03, 0x00, 0x00, 0x00, 0x08]) + return payload[0x05] / 2.0 + + def get_external_temp(self) -> float: + """Return the external temperature in degrees celsius.""" + payload = self.send_request([0x01, 0x03, 0x00, 0x00, 0x00, 0x08]) + return payload[18] / 2.0 + + def get_full_status(self) -> dict: + """Return the state of the device. + + Timer schedule included. + """ + payload = self.send_request([0x01, 0x03, 0x00, 0x00, 0x00, 0x16]) + data = {} + data["remote_lock"] = payload[3] & 1 + data["power"] = payload[4] & 1 + data["active"] = (payload[4] >> 4) & 1 + data["temp_manual"] = (payload[4] >> 6) & 1 + data["room_temp"] = payload[5] / 2.0 + data["thermostat_temp"] = payload[6] / 2.0 + data["auto_mode"] = payload[7] & 0xF + data["loop_mode"] = payload[7] >> 4 + data["sensor"] = payload[8] + data["osv"] = payload[9] + data["dif"] = payload[10] + data["svh"] = payload[11] + data["svl"] = payload[12] + data["room_temp_adj"] = ( + int.from_bytes(payload[13:15], "big", signed=True) / 10.0 + ) + data["fre"] = payload[15] + data["poweron"] = payload[16] + data["unknown"] = payload[17] + data["external_temp"] = payload[18] / 2.0 + data["hour"] = payload[19] + data["min"] = payload[20] + data["sec"] = payload[21] + data["dayofweek"] = payload[22] + + weekday = [] + for i in range(0, 6): + weekday.append( + { + "start_hour": payload[2 * i + 23], + "start_minute": payload[2 * i + 24], + "temp": payload[i + 39] / 2.0, + } + ) + + data["weekday"] = weekday + weekend = [] + for i in range(6, 8): + weekend.append( + { + "start_hour": payload[2 * i + 23], + "start_minute": payload[2 * i + 24], + "temp": payload[i + 39] / 2.0, + } + ) + + data["weekend"] = weekend + return data + + # Change controller mode + # auto_mode = 1 for auto (scheduled/timed) mode, 0 for manual mode. + # Manual mode will activate last used temperature. + # In typical usage call set_temp to activate manual control and set temp. + # loop_mode refers to index in [ "12345,67", "123456,7", "1234567" ] + # E.g. loop_mode = 0 ("12345,67") means Saturday and Sunday (weekend schedule) + # loop_mode = 2 ("1234567") means every day, including Saturday and Sunday (weekday schedule) + # The sensor command is currently experimental + def set_mode(self, auto_mode: int, loop_mode: int, sensor: int = 0) -> None: + """Set the mode of the device.""" + mode_byte = ((loop_mode + 1) << 4) + auto_mode + self.send_request([0x01, 0x06, 0x00, 0x02, mode_byte, sensor]) + + # Advanced settings + # Sensor mode (SEN) sensor = 0 for internal sensor, 1 for external sensor, + # 2 for internal control temperature, external limit temperature. Factory default: 0. + # Set temperature range for external sensor (OSV) osv = 5..99. Factory default: 42C + # Deadzone for floor temprature (dIF) dif = 1..9. Factory default: 2C + # Upper temperature limit for internal sensor (SVH) svh = 5..99. Factory default: 35C + # Lower temperature limit for internal sensor (SVL) svl = 5..99. Factory default: 5C + # Actual temperature calibration (AdJ) adj = -0.5. Precision 0.1C + # Anti-freezing function (FrE) fre = 0 for anti-freezing function shut down, + # 1 for anti-freezing function open. Factory default: 0 + # Power on memory (POn) poweron = 0 for off, 1 for on. Default: 0 + def set_advanced( + self, + loop_mode: int, + sensor: int, + osv: int, + dif: int, + svh: int, + svl: int, + adj: float, + fre: int, + poweron: int, + ) -> None: + """Set advanced options.""" + self.send_request( + [ + 0x01, + 0x10, + 0x00, + 0x02, + 0x00, + 0x05, + 0x0A, + loop_mode, + sensor, + osv, + dif, + svh, + svl, + int(adj * 10) >> 8 & 0xFF, + int(adj * 10) & 0xFF, + fre, + poweron, + ] + ) + + # For backwards compatibility only. Prefer calling set_mode directly. + # Note this function invokes loop_mode=0 and sensor=0. + def switch_to_auto(self) -> None: + """Switch mode to auto.""" + self.set_mode(auto_mode=1, loop_mode=0) + + def switch_to_manual(self) -> None: + """Switch mode to manual.""" + self.set_mode(auto_mode=0, loop_mode=0) + + # Set temperature for manual mode (also activates manual mode if currently in automatic) + def set_temp(self, temp: float) -> None: + """Set the target temperature.""" + self.send_request([0x01, 0x06, 0x00, 0x01, 0x00, int(temp * 2)]) + + # Set device on(1) or off(0), does not deactivate Wifi connectivity. + # Remote lock disables control by buttons on thermostat. + def set_power(self, power: int = 1, remote_lock: int = 0) -> None: + """Set the power state of the device.""" + self.send_request([0x01, 0x06, 0x00, 0x00, remote_lock, power]) + + # set time on device + # n.b. day=1 is Monday, ..., day=7 is Sunday + def set_time(self, hour: int, minute: int, second: int, day: int) -> None: + """Set the time.""" + self.send_request( + [0x01, 0x10, 0x00, 0x08, 0x00, 0x02, 0x04, hour, minute, second, day] + ) + + # Set timer schedule + # Format is the same as you get from get_full_status. + # weekday is a list (ordered) of 6 dicts like: + # {'start_hour':17, 'start_minute':30, 'temp': 22 } + # Each one specifies the thermostat temp that will become effective at start_hour:start_minute + # weekend is similar but only has 2 (e.g. switch on in morning and off in afternoon) + def set_schedule(self, weekday: t.List[dict], weekend: t.List[dict]) -> None: + """Set timer schedule.""" + request = [0x01, 0x10, 0x00, 0x0A, 0x00, 0x0C, 0x18] + + # weekday times + for i in range(0, 6): + request.append(weekday[i]["start_hour"]) + request.append(weekday[i]["start_minute"]) + + # weekend times + for i in range(0, 2): + request.append(weekend[i]["start_hour"]) + request.append(weekend[i]["start_minute"]) + + # weekday temperatures + for i in range(0, 6): + request.append(int(weekday[i]["temp"] * 2)) + + # weekend temperatures + for i in range(0, 2): + request.append(int(weekend[i]["temp"] * 2)) + + self.send_request(request) diff --git a/broadlinkmanager/broadlink/const.py b/broadlinkmanager/broadlink/const.py new file mode 100644 index 0000000..19c37f5 --- /dev/null +++ b/broadlinkmanager/broadlink/const.py @@ -0,0 +1,5 @@ +"""Constants.""" +DEFAULT_BCAST_ADDR = "255.255.255.255" +DEFAULT_PORT = 80 +DEFAULT_RETRY_INTVL = 1 +DEFAULT_TIMEOUT = 10 diff --git a/broadlinkmanager/broadlink/cover.py b/broadlinkmanager/broadlink/cover.py new file mode 100644 index 0000000..c0f08ab --- /dev/null +++ b/broadlinkmanager/broadlink/cover.py @@ -0,0 +1,57 @@ +"""Support for covers.""" +import time + +from . import exceptions as e +from .device import Device + + +class dooya(Device): + """Controls a Dooya curtain motor.""" + + TYPE = "Dooya DT360E" + + def _send(self, magic1: int, magic2: int) -> int: + """Send a packet to the device.""" + packet = bytearray(16) + packet[0] = 0x09 + packet[2] = 0xBB + packet[3] = magic1 + packet[4] = magic2 + packet[9] = 0xFA + packet[10] = 0x44 + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + return payload[4] + + def open(self) -> int: + """Open the curtain.""" + return self._send(0x01, 0x00) + + def close(self) -> int: + """Close the curtain.""" + return self._send(0x02, 0x00) + + def stop(self) -> int: + """Stop the curtain.""" + return self._send(0x03, 0x00) + + def get_percentage(self) -> int: + """Return the position of the curtain.""" + return self._send(0x06, 0x5D) + + def set_percentage_and_wait(self, new_percentage: int) -> None: + """Set the position of the curtain.""" + current = self.get_percentage() + if current > new_percentage: + self.close() + while current is not None and current > new_percentage: + time.sleep(0.2) + current = self.get_percentage() + + elif current < new_percentage: + self.open() + while current is not None and current < new_percentage: + time.sleep(0.2) + current = self.get_percentage() + self.stop() diff --git a/broadlinkmanager/broadlink/device.py b/broadlinkmanager/broadlink/device.py new file mode 100644 index 0000000..74e916f --- /dev/null +++ b/broadlinkmanager/broadlink/device.py @@ -0,0 +1,332 @@ +"""Support for Broadlink devices.""" +import socket +import threading +import random +import time +import typing as t + +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes + +from . import exceptions as e +from .const import ( + DEFAULT_BCAST_ADDR, + DEFAULT_PORT, + DEFAULT_RETRY_INTVL, + DEFAULT_TIMEOUT, +) +from .protocol import Datetime + +HelloResponse = t.Tuple[int, t.Tuple[str, int], str, str, bool] + + +def scan( + timeout: int = DEFAULT_TIMEOUT, + local_ip_address: str = None, + discover_ip_address: str = DEFAULT_BCAST_ADDR, + discover_ip_port: int = DEFAULT_PORT, +) -> t.Generator[HelloResponse, None, None]: + """Broadcast a hello message and yield responses.""" + conn = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + conn.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + conn.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + + if local_ip_address: + conn.bind((local_ip_address, 0)) + port = conn.getsockname()[1] + else: + local_ip_address = "0.0.0.0" + port = 0 + + packet = bytearray(0x30) + packet[0x08:0x14] = Datetime.pack(Datetime.now()) + packet[0x18:0x1C] = socket.inet_aton(local_ip_address)[::-1] + packet[0x1C:0x1E] = port.to_bytes(2, "little") + packet[0x26] = 6 + + checksum = sum(packet, 0xBEAF) & 0xFFFF + packet[0x20:0x22] = checksum.to_bytes(2, "little") + + start_time = time.time() + discovered = [] + + try: + while (time.time() - start_time) < timeout: + time_left = timeout - (time.time() - start_time) + conn.settimeout(min(DEFAULT_RETRY_INTVL, time_left)) + conn.sendto(packet, (discover_ip_address, discover_ip_port)) + + while True: + try: + resp, host = conn.recvfrom(1024) + except socket.timeout: + break + + devtype = resp[0x34] | resp[0x35] << 8 + mac = resp[0x3A:0x40][::-1] + + if (host, mac, devtype) in discovered: + continue + discovered.append((host, mac, devtype)) + + name = resp[0x40:].split(b"\x00")[0].decode() + is_locked = bool(resp[0x7F]) + yield devtype, host, mac, name, is_locked + finally: + conn.close() + + +def ping(address: str, port: int = DEFAULT_PORT) -> None: + """Send a ping packet to an address. + + This packet feeds the watchdog timer of firmwares >= v53. + Useful to prevent reboots when the cloud cannot be reached. + It must be sent every 2 minutes in such cases. + """ + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as conn: + conn.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + packet = bytearray(0x30) + packet[0x26] = 1 + conn.sendto(packet, (address, port)) + + +class Device: + """Controls a Broadlink device.""" + + TYPE = "Unknown" + + __INIT_KEY = "097628343fe99e23765c1513accf8b02" + __INIT_VECT = "562e17996d093d28ddb3ba695a2e6f58" + + def __init__( + self, + host: t.Tuple[str, int], + mac: t.Union[bytes, str], + devtype: int, + timeout: int = DEFAULT_TIMEOUT, + name: str = "", + model: str = "", + manufacturer: str = "", + is_locked: bool = False, + ) -> None: + """Initialize the controller.""" + self.host = host + self.mac = bytes.fromhex(mac) if isinstance(mac, str) else mac + self.devtype = devtype + self.timeout = timeout + self.name = name + self.model = model + self.manufacturer = manufacturer + self.is_locked = is_locked + self.count = random.randint(0x8000, 0xFFFF) + self.iv = bytes.fromhex(self.__INIT_VECT) + self.id = 0 + self.type = self.TYPE # For backwards compatibility. + self.lock = threading.Lock() + + self.aes = None + self.update_aes(bytes.fromhex(self.__INIT_KEY)) + + def __repr__(self) -> str: + """Return a formal representation of the device.""" + return ( + "%s.%s(%s, mac=%r, devtype=%r, timeout=%r, name=%r, " + "model=%r, manufacturer=%r, is_locked=%r)" + ) % ( + self.__class__.__module__, + self.__class__.__qualname__, + self.host, + self.mac, + self.devtype, + self.timeout, + self.name, + self.model, + self.manufacturer, + self.is_locked, + ) + + def __str__(self) -> str: + """Return a readable representation of the device.""" + return "%s (%s / %s:%s / %s)" % ( + self.name or "Unknown", + " ".join(filter(None, [self.manufacturer, self.model, hex(self.devtype)])), + *self.host, + ":".join(format(x, "02X") for x in self.mac), + ) + + def update_aes(self, key: bytes) -> None: + """Update AES.""" + self.aes = Cipher( + algorithms.AES(bytes(key)), modes.CBC(self.iv), backend=default_backend() + ) + + def encrypt(self, payload: bytes) -> bytes: + """Encrypt the payload.""" + encryptor = self.aes.encryptor() + return encryptor.update(bytes(payload)) + encryptor.finalize() + + def decrypt(self, payload: bytes) -> bytes: + """Decrypt the payload.""" + decryptor = self.aes.decryptor() + return decryptor.update(bytes(payload)) + decryptor.finalize() + + def auth(self) -> bool: + """Authenticate to the device.""" + self.id = 0 + self.update_aes(bytes.fromhex(self.__INIT_KEY)) + + packet = bytearray(0x50) + packet[0x04:0x14] = [0x31] * 16 + packet[0x1E] = 0x01 + packet[0x2D] = 0x01 + packet[0x30:0x36] = "Test 1".encode() + + response = self.send_packet(0x65, packet) + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + + self.id = int.from_bytes(payload[:0x4], "little") + self.update_aes(payload[0x04:0x14]) + return True + + def hello(self, local_ip_address=None) -> bool: + """Send a hello message to the device. + + Device information is checked before updating name and lock status. + """ + responses = scan( + timeout=self.timeout, + local_ip_address=local_ip_address, + discover_ip_address=self.host[0], + discover_ip_port=self.host[1], + ) + try: + devtype, _, mac, name, is_locked = next(responses) + + except StopIteration as err: + raise e.NetworkTimeoutError( + -4000, + "Network timeout", + f"No response received within {self.timeout}s", + ) from err + + if mac != self.mac: + raise e.DataValidationError( + -2040, + "Device information is not intact", + "The MAC address is different", + f"Expected {self.mac} and received {mac}", + ) + + if devtype != self.devtype: + raise e.DataValidationError( + -2040, + "Device information is not intact", + "The product ID is different", + f"Expected {self.devtype} and received {devtype}", + ) + + self.name = name + self.is_locked = is_locked + return True + + def ping(self) -> None: + """Ping the device. + + This packet feeds the watchdog timer of firmwares >= v53. + Useful to prevent reboots when the cloud cannot be reached. + It must be sent every 2 minutes in such cases. + """ + ping(self.host[0], port=self.host[1]) + + def get_fwversion(self) -> int: + """Get firmware version.""" + packet = bytearray([0x68]) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + return payload[0x4] | payload[0x5] << 8 + + def set_name(self, name: str) -> None: + """Set device name.""" + packet = bytearray(4) + packet += name.encode("utf-8") + packet += bytearray(0x50 - len(packet)) + packet[0x43] = self.is_locked + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + self.name = name + + def set_lock(self, state: bool) -> None: + """Lock/unlock the device.""" + packet = bytearray(4) + packet += self.name.encode("utf-8") + packet += bytearray(0x50 - len(packet)) + packet[0x43] = bool(state) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + self.is_locked = bool(state) + + def get_type(self) -> str: + """Return device type.""" + return self.type + + def send_packet(self, packet_type: int, payload: bytes) -> bytes: + """Send a packet to the device.""" + self.count = ((self.count + 1) | 0x8000) & 0xFFFF + packet = bytearray(0x38) + packet[0x00:0x08] = bytes.fromhex("5aa5aa555aa5aa55") + packet[0x24:0x26] = self.devtype.to_bytes(2, "little") + packet[0x26:0x28] = packet_type.to_bytes(2, "little") + packet[0x28:0x2A] = self.count.to_bytes(2, "little") + packet[0x2A:0x30] = self.mac[::-1] + packet[0x30:0x34] = self.id.to_bytes(4, "little") + + p_checksum = sum(payload, 0xBEAF) & 0xFFFF + packet[0x34:0x36] = p_checksum.to_bytes(2, "little") + + padding = (16 - len(payload)) % 16 + payload = self.encrypt(payload + bytes(padding)) + packet.extend(payload) + + checksum = sum(packet, 0xBEAF) & 0xFFFF + packet[0x20:0x22] = checksum.to_bytes(2, "little") + + with self.lock and socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as conn: + timeout = self.timeout + start_time = time.time() + + while True: + time_left = timeout - (time.time() - start_time) + conn.settimeout(min(DEFAULT_RETRY_INTVL, time_left)) + conn.sendto(packet, self.host) + + try: + resp = conn.recvfrom(2048)[0] + break + except socket.timeout as err: + if (time.time() - start_time) > timeout: + raise e.NetworkTimeoutError( + -4000, + "Network timeout", + f"No response received within {timeout}s", + ) from err + + if len(resp) < 0x30: + raise e.DataValidationError( + -4007, + "Received data packet length error", + f"Expected at least 48 bytes and received {len(resp)}", + ) + + nom_checksum = int.from_bytes(resp[0x20:0x22], "little") + real_checksum = sum(resp, 0xBEAF) - sum(resp[0x20:0x22]) & 0xFFFF + + if nom_checksum != real_checksum: + raise e.DataValidationError( + -4008, + "Received data packet check error", + f"Expected a checksum of {nom_checksum} and received {real_checksum}", + ) + + return resp diff --git a/broadlinkmanager/broadlink/exceptions.py b/broadlinkmanager/broadlink/exceptions.py index d3f8b4a..2343ad6 100644 --- a/broadlinkmanager/broadlink/exceptions.py +++ b/broadlinkmanager/broadlink/exceptions.py @@ -1,19 +1,17 @@ """Exceptions for Broadlink devices.""" +import collections import struct class BroadlinkException(Exception): - """Common base class for all Broadlink exceptions.""" + """Base class common to all Broadlink exceptions.""" def __init__(self, *args, **kwargs): """Initialize the exception.""" super().__init__(*args, **kwargs) - if len(args) >= 3: + if len(args) >= 2: self.errno = args[0] - self.strerror = "%s: %s" % (args[1], args[2]) - elif len(args) == 2: - self.errno = args[0] - self.strerror = str(args[1]) + self.strerror = ": ".join(str(arg) for arg in args[1:]) elif len(args) == 1: self.errno = None self.strerror = str(args[0]) @@ -22,119 +20,96 @@ def __init__(self, *args, **kwargs): self.strerror = "" def __str__(self): - """Return the error message.""" + """Return str(self).""" if self.errno is not None: return "[Errno %s] %s" % (self.errno, self.strerror) return self.strerror + def __eq__(self, other): + """Return self==value.""" + # pylint: disable=unidiomatic-typecheck + return type(self) == type(other) and self.args == other.args -class FirmwareException(BroadlinkException): - """Common base class for all firmware exceptions.""" - - pass + def __hash__(self): + """Return hash(self).""" + return hash((type(self), self.args)) -class AuthenticationError(FirmwareException): - """Authentication error.""" +class MultipleErrors(BroadlinkException): + """Multiple errors.""" - pass + def __init__(self, *args, **kwargs): + """Initialize the exception.""" + errors = args[0][:] if args else [] + counter = collections.Counter(errors) + strerror = "Multiple errors occurred: %s" % counter + super().__init__(strerror, **kwargs) + self.errors = errors + def __repr__(self): + """Return repr(self).""" + return "MultipleErrors(%r)" % self.errors -class AuthorizationError(FirmwareException): - """Authorization error.""" + def __str__(self): + """Return str(self).""" + return self.strerror - pass +class AuthenticationError(BroadlinkException): + """Authentication error.""" -class CommandNotSupportedError(FirmwareException): - """Command not supported error.""" - pass +class AuthorizationError(BroadlinkException): + """Authorization error.""" -class ConnectionClosedError(FirmwareException): - """Connection closed error.""" +class CommandNotSupportedError(BroadlinkException): + """Command not supported error.""" - pass +class ConnectionClosedError(BroadlinkException): + """Connection closed error.""" -class DataValidationError(FirmwareException): - """Data validation error.""" - pass +class StructureAbnormalError(BroadlinkException): + """Structure abnormal error.""" -class DeviceOfflineError(FirmwareException): +class DeviceOfflineError(BroadlinkException): """Device offline error.""" - pass - -class ReadError(FirmwareException): +class ReadError(BroadlinkException): """Read error.""" - pass - -class SendError(FirmwareException): +class SendError(BroadlinkException): """Send error.""" - pass - -class SSIDNotFoundError(FirmwareException): +class SSIDNotFoundError(BroadlinkException): """SSID not found error.""" - pass - -class StorageError(FirmwareException): +class StorageError(BroadlinkException): """Storage error.""" - pass - -class WriteError(FirmwareException): +class WriteError(BroadlinkException): """Write error.""" - pass - - -class SDKException(BroadlinkException): - """Common base class for all SDK exceptions.""" - - pass - - -class ChecksumError(SDKException): - """Received data packet check error.""" - - pass - -class LengthError(SDKException): - """Received data packet length error.""" - - pass - - -class DNSLookupError(SDKException): - """Failed to obtain local IP address.""" - - pass - - -class NetworkTimeoutError(SDKException): +class NetworkTimeoutError(BroadlinkException): """Network timeout error.""" - pass + +class DataValidationError(BroadlinkException): + """Data validation error.""" class UnknownError(BroadlinkException): """Unknown error.""" - pass - BROADLINK_EXCEPTIONS = { # Firmware-related errors are generated by the device. @@ -143,31 +118,35 @@ class UnknownError(BroadlinkException): -3: (DeviceOfflineError, "The device is offline"), -4: (CommandNotSupportedError, "Command not supported"), -5: (StorageError, "The device storage is full"), - -6: (DataValidationError, "Structure is abnormal"), + -6: (StructureAbnormalError, "Structure is abnormal"), -7: (AuthorizationError, "Control key is expired"), -8: (SendError, "Send error"), -9: (WriteError, "Write error"), -10: (ReadError, "Read error"), -11: (SSIDNotFoundError, "SSID could not be found in AP configuration"), - # DNASDK related errors are generated by this module. + # SDK related errors are generated by this module. + -2040: (DataValidationError, "Device information is not intact"), -4000: (NetworkTimeoutError, "Network timeout"), - -4007: (LengthError, "Received data packet length error"), - -4008: (ChecksumError, "Received data packet check error"), - -4013: (DNSLookupError, "Failed to obtain local IP address"), + -4007: (DataValidationError, "Received data packet length error"), + -4008: (DataValidationError, "Received data packet check error"), + -4009: (DataValidationError, "Received data packet information type error"), + -4010: (DataValidationError, "Received encrypted data packet length error"), + -4011: (DataValidationError, "Received encrypted data packet check error"), + -4012: (AuthorizationError, "Device control ID error"), } -def exception(error_code): +def exception(err_code: int) -> BroadlinkException: """Return exception corresponding to an error code.""" try: - exc, msg = BROADLINK_EXCEPTIONS[error_code] - return exc(error_code, msg) + exc, msg = BROADLINK_EXCEPTIONS[err_code] + return exc(err_code, msg) except KeyError: - return UnknownError(error_code, "Unknown error") + return UnknownError(err_code, "Unknown error") -def check_error(error): +def check_error(error: bytes) -> None: """Raise exception if an error occurred.""" error_code = struct.unpack("h", error)[0] if error_code: - raise exception(error_code) \ No newline at end of file + raise exception(error_code) diff --git a/broadlinkmanager/broadlink/helpers.py b/broadlinkmanager/broadlink/helpers.py index 1ff112a..6ee5499 100644 --- a/broadlinkmanager/broadlink/helpers.py +++ b/broadlinkmanager/broadlink/helpers.py @@ -1,20 +1,43 @@ -"""Helper functions.""" -import socket +"""Helper functions and classes.""" +import typing as t -from .exceptions import exception +class CRC16: + """Helps with CRC-16 calculation. -def get_local_ip() -> str: - """Try to determine the local IP address of the machine.""" - # Useful for VPNs. - try: - local_ip_address = socket.gethostbyname(socket.gethostname()) - if not local_ip_address.startswith('127.'): - return local_ip_address - except socket.gaierror: - raise exception(-4013) # DNS Error + CRC tables are cached for performance. + """ - # Connecting to UDP address does not send packets. - with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: - s.connect(('8.8.8.8', 53)) - return s.getsockname()[0] \ No newline at end of file + _cache: t.Dict[int, t.List[int]] = {} + + @classmethod + def get_table(cls, polynomial: int) -> t.List[int]: + """Return the CRC-16 table for a polynomial.""" + try: + crc_table = cls._cache[polynomial] + except KeyError: + crc_table = [] + for dividend in range(0, 256): + remainder = dividend + for _ in range(0, 8): + if remainder & 1: + remainder = remainder >> 1 ^ polynomial + else: + remainder = remainder >> 1 + crc_table.append(remainder) + cls._cache[polynomial] = crc_table + return crc_table + + @classmethod + def calculate( + cls, + sequence: t.Sequence[int], + polynomial: int = 0xA001, # CRC-16-ANSI. + init_value: int = 0xFFFF, + ) -> int: + """Calculate the CRC-16 of a sequence of integers.""" + crc_table = cls.get_table(polynomial) + crc = init_value + for item in sequence: + crc = crc >> 8 ^ crc_table[(crc ^ item) & 0xFF] + return crc diff --git a/broadlinkmanager/broadlink/light.py b/broadlinkmanager/broadlink/light.py new file mode 100644 index 0000000..b3e20a1 --- /dev/null +++ b/broadlinkmanager/broadlink/light.py @@ -0,0 +1,197 @@ +"""Support for lights.""" +import enum +import json +import struct + +from . import exceptions as e +from .device import Device + + +class lb1(Device): + """Controls a Broadlink LB1.""" + + TYPE = "LB1" + + @enum.unique + class ColorMode(enum.IntEnum): + """Enumerates color modes.""" + + RGB = 0 + WHITE = 1 + SCENE = 2 + + def get_state(self) -> dict: + """Return the power state of the device. + + Example: `{'red': 128, 'blue': 255, 'green': 128, 'pwr': 1, 'brightness': 75, 'colortemp': 2700, 'hue': 240, 'saturation': 50, 'transitionduration': 1500, 'maxworktime': 0, 'bulb_colormode': 1, 'bulb_scenes': '["@01686464,0,0,0", "#ffffff,10,0,#000000,190,0,0", "2700+100,0,0,0", "#ff0000,500,2500,#00FF00,500,2500,#0000FF,500,2500,0", "@01686464,100,2400,@01686401,100,2400,0", "@01686464,100,2400,@01686401,100,2400,@005a6464,100,2400,@005a6401,100,2400,0", "@01686464,10,0,@00000000,190,0,0", "@01686464,200,0,@005a6464,200,0,0"]', 'bulb_scene': '', 'bulb_sceneidx': 255}` + """ + packet = self._encode(1, {}) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + return self._decode(response) + + def set_state( + self, + pwr: bool = None, + red: int = None, + blue: int = None, + green: int = None, + brightness: int = None, + colortemp: int = None, + hue: int = None, + saturation: int = None, + transitionduration: int = None, + maxworktime: int = None, + bulb_colormode: int = None, + bulb_scenes: str = None, + bulb_scene: str = None, + bulb_sceneidx: int = None, + ) -> dict: + """Set the power state of the device.""" + state = {} + if pwr is not None: + state["pwr"] = int(bool(pwr)) + if red is not None: + state["red"] = int(red) + if blue is not None: + state["blue"] = int(blue) + if green is not None: + state["green"] = int(green) + if brightness is not None: + state["brightness"] = int(brightness) + if colortemp is not None: + state["colortemp"] = int(colortemp) + if hue is not None: + state["hue"] = int(hue) + if saturation is not None: + state["saturation"] = int(saturation) + if transitionduration is not None: + state["transitionduration"] = int(transitionduration) + if maxworktime is not None: + state["maxworktime"] = int(maxworktime) + if bulb_colormode is not None: + state["bulb_colormode"] = int(bulb_colormode) + if bulb_scenes is not None: + state["bulb_scenes"] = str(bulb_scenes) + if bulb_scene is not None: + state["bulb_scene"] = str(bulb_scene) + if bulb_sceneidx is not None: + state["bulb_sceneidx"] = int(bulb_sceneidx) + + packet = self._encode(2, state) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + return self._decode(response) + + def _encode(self, flag: int, state: dict) -> bytes: + """Encode a JSON packet.""" + # flag: 1 for reading, 2 for writing. + packet = bytearray(14) + data = json.dumps(state, separators=(",", ":")).encode() + p_len = 12 + len(data) + struct.pack_into( + " dict: + """Decode a JSON packet.""" + payload = self.decrypt(response[0x38:]) + js_len = struct.unpack_from(" dict: + """Return the power state of the device. + + Example: `{'red': 128, 'blue': 255, 'green': 128, 'pwr': 1, 'brightness': 75, 'colortemp': 2700, 'hue': 240, 'saturation': 50, 'transitionduration': 1500, 'maxworktime': 0, 'bulb_colormode': 1, 'bulb_scenes': '["@01686464,0,0,0", "#ffffff,10,0,#000000,190,0,0", "2700+100,0,0,0", "#ff0000,500,2500,#00FF00,500,2500,#0000FF,500,2500,0", "@01686464,100,2400,@01686401,100,2400,0", "@01686464,100,2400,@01686401,100,2400,@005a6464,100,2400,@005a6401,100,2400,0", "@01686464,10,0,@00000000,190,0,0", "@01686464,200,0,@005a6464,200,0,0"]', 'bulb_scene': ''}` + """ + packet = self._encode(1, {}) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + return self._decode(response) + + def set_state( + self, + pwr: bool = None, + red: int = None, + blue: int = None, + green: int = None, + brightness: int = None, + colortemp: int = None, + hue: int = None, + saturation: int = None, + transitionduration: int = None, + maxworktime: int = None, + bulb_colormode: int = None, + bulb_scenes: str = None, + bulb_scene: str = None, + ) -> dict: + """Set the power state of the device.""" + state = {} + if pwr is not None: + state["pwr"] = int(bool(pwr)) + if red is not None: + state["red"] = int(red) + if blue is not None: + state["blue"] = int(blue) + if green is not None: + state["green"] = int(green) + if brightness is not None: + state["brightness"] = int(brightness) + if colortemp is not None: + state["colortemp"] = int(colortemp) + if hue is not None: + state["hue"] = int(hue) + if saturation is not None: + state["saturation"] = int(saturation) + if transitionduration is not None: + state["transitionduration"] = int(transitionduration) + if maxworktime is not None: + state["maxworktime"] = int(maxworktime) + if bulb_colormode is not None: + state["bulb_colormode"] = int(bulb_colormode) + if bulb_scenes is not None: + state["bulb_scenes"] = str(bulb_scenes) + if bulb_scene is not None: + state["bulb_scene"] = str(bulb_scene) + + packet = self._encode(2, state) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + return self._decode(response) + + def _encode(self, flag: int, state: dict) -> bytes: + """Encode a JSON packet.""" + # flag: 1 for reading, 2 for writing. + packet = bytearray(12) + data = json.dumps(state, separators=(",", ":")).encode() + struct.pack_into(" dict: + """Decode a JSON packet.""" + payload = self.decrypt(response[0x38:]) + js_len = struct.unpack_from(" bytes: + """Pack the timestamp to be sent over the Broadlink protocol.""" + data = bytearray(12) + utcoffset = int(datetime.utcoffset().total_seconds() / 3600) + data[:0x04] = utcoffset.to_bytes(4, "little", signed=True) + data[0x04:0x06] = datetime.year.to_bytes(2, "little") + data[0x06] = datetime.minute + data[0x07] = datetime.hour + data[0x08] = int(datetime.strftime("%y")) + data[0x09] = datetime.isoweekday() + data[0x0A] = datetime.day + data[0x0B] = datetime.month + return data + + @staticmethod + def unpack(data: bytes) -> dt.datetime: + """Unpack a timestamp received over the Broadlink protocol.""" + utcoffset = int.from_bytes(data[0x00:0x04], "little", signed=True) + year = int.from_bytes(data[0x04:0x06], "little") + minute = data[0x06] + hour = data[0x07] + subyear = data[0x08] + isoweekday = data[0x09] + day = data[0x0A] + month = data[0x0B] + + tz_info = dt.timezone(dt.timedelta(hours=utcoffset)) + datetime = dt.datetime(year, month, day, hour, minute, 0, 0, tz_info) + + if datetime.isoweekday() != isoweekday: + raise ValueError("isoweekday does not match") + if int(datetime.strftime("%y")) != subyear: + raise ValueError("subyear does not match") + + return datetime + + @staticmethod + def now() -> dt.datetime: + """Return the current date and time with timezone info.""" + tz_info = dt.timezone(dt.timedelta(seconds=-time.timezone)) + return dt.datetime.now(tz_info) diff --git a/broadlinkmanager/broadlink/remote.py b/broadlinkmanager/broadlink/remote.py new file mode 100644 index 0000000..017dac4 --- /dev/null +++ b/broadlinkmanager/broadlink/remote.py @@ -0,0 +1,126 @@ +"""Support for universal remotes.""" +import struct + +from . import exceptions as e +from .device import Device + + +class rmmini(Device): + """Controls a Broadlink RM mini 3.""" + + TYPE = "RMMINI" + + def _send(self, command: int, data: bytes = b"") -> bytes: + """Send a packet to the device.""" + packet = struct.pack(" None: + """Update device name and lock status.""" + resp = self._send(0x1) + self.name = resp[0x48:].split(b"\x00")[0].decode() + self.is_locked = bool(resp[0x87]) + + def send_data(self, data: bytes) -> None: + """Send a code to the device.""" + self._send(0x2, data) + + def enter_learning(self) -> None: + """Enter infrared learning mode.""" + self._send(0x3) + + def check_data(self) -> bytes: + """Return the last captured code.""" + return self._send(0x4) + + +class rmpro(rmmini): + """Controls a Broadlink RM pro.""" + + TYPE = "RMPRO" + + def sweep_frequency(self) -> None: + """Sweep frequency.""" + self._send(0x19) + + def check_frequency(self) -> bool: + """Return True if the frequency was identified successfully.""" + resp = self._send(0x1A) + return resp[0] == 1 + + def find_rf_packet(self) -> None: + """Enter radiofrequency learning mode.""" + self._send(0x1B) + + def cancel_sweep_frequency(self) -> None: + """Cancel sweep frequency.""" + self._send(0x1E) + + def check_sensors(self) -> dict: + """Return the state of the sensors.""" + resp = self._send(0x1) + temp = struct.unpack(" float: + """Return the temperature.""" + return self.check_sensors()["temperature"] + + +class rmminib(rmmini): + """Controls a Broadlink RM mini 3 (new firmware).""" + + TYPE = "RMMINIB" + + def _send(self, command: int, data: bytes = b"") -> bytes: + """Send a packet to the device.""" + packet = struct.pack(" dict: + """Return the state of the sensors.""" + resp = self._send(0x24) + temp = struct.unpack(" float: + """Return the temperature.""" + return self.check_sensors()["temperature"] + + def check_humidity(self) -> float: + """Return the humidity.""" + return self.check_sensors()["humidity"] + + +class rm4pro(rm4mini, rmpro): + """Controls a Broadlink RM4 pro.""" + + TYPE = "RM4PRO" + + +class rm(rmpro): + """For backwards compatibility.""" + + TYPE = "RM2" + + +class rm4(rm4pro): + """For backwards compatibility.""" + + TYPE = "RM4" diff --git a/broadlinkmanager/broadlink/sensor.py b/broadlinkmanager/broadlink/sensor.py new file mode 100644 index 0000000..33e7587 --- /dev/null +++ b/broadlinkmanager/broadlink/sensor.py @@ -0,0 +1,47 @@ +"""Support for sensors.""" +import struct + +from . import exceptions as e +from .device import Device + + +class a1(Device): + """Controls a Broadlink A1.""" + + TYPE = "A1" + + _SENSORS_AND_LEVELS = ( + ("light", ("dark", "dim", "normal", "bright")), + ("air_quality", ("excellent", "good", "normal", "bad")), + ("noise", ("quiet", "normal", "noisy")), + ) + + def check_sensors(self) -> dict: + """Return the state of the sensors.""" + data = self.check_sensors_raw() + for sensor, levels in self._SENSORS_AND_LEVELS: + try: + data[sensor] = levels[data[sensor]] + except IndexError: + data[sensor] = "unknown" + return data + + def check_sensors_raw(self) -> dict: + """Return the state of the sensors in raw format.""" + packet = bytearray([0x1]) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + data = payload[0x4:] + + temperature = struct.unpack(" None: + """Set the power state of the device.""" + packet = bytearray(4) + packet[0] = bool(pwr) + response = self.send_packet(0x66, packet) + e.check_error(response[0x22:0x24]) + + +class sp2(Device): + """Controls a Broadlink SP2.""" + + TYPE = "SP2" + + def set_power(self, pwr: bool) -> None: + """Set the power state of the device.""" + packet = bytearray(16) + packet[0] = 2 + packet[4] = bool(pwr) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + + def check_power(self) -> bool: + """Return the power state of the device.""" + packet = bytearray(16) + packet[0] = 1 + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + return bool(payload[0x4]) + + +class sp2s(sp2): + """Controls a Broadlink SP2S.""" + + TYPE = "SP2S" + + def get_energy(self) -> float: + """Return the power consumption in W.""" + packet = bytearray(16) + packet[0] = 4 + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + return int.from_bytes(payload[0x4:0x7], "little") / 1000 + + +class sp3(Device): + """Controls a Broadlink SP3.""" + + TYPE = "SP3" + + def set_power(self, pwr: bool) -> None: + """Set the power state of the device.""" + packet = bytearray(16) + packet[0] = 2 + packet[4] = self.check_nightlight() << 1 | bool(pwr) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + + def set_nightlight(self, ntlight: bool) -> None: + """Set the night light state of the device.""" + packet = bytearray(16) + packet[0] = 2 + packet[4] = bool(ntlight) << 1 | self.check_power() + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + + def check_power(self) -> bool: + """Return the power state of the device.""" + packet = bytearray(16) + packet[0] = 1 + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + return bool(payload[0x4] & 1) + + def check_nightlight(self) -> bool: + """Return the state of the night light.""" + packet = bytearray(16) + packet[0] = 1 + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + return bool(payload[0x4] & 2) + + +class sp3s(sp2): + """Controls a Broadlink SP3S.""" + + TYPE = "SP3S" + + def get_energy(self) -> float: + """Return the power consumption in W.""" + packet = bytearray([8, 0, 254, 1, 5, 1, 0, 0, 0, 45]) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + energy = payload[0x7:0x4:-1].hex() + return int(energy) / 100 + + +class sp4(Device): + """Controls a Broadlink SP4.""" + + TYPE = "SP4" + + def set_power(self, pwr: bool) -> None: + """Set the power state of the device.""" + self.set_state(pwr=pwr) + + def set_nightlight(self, ntlight: bool) -> None: + """Set the night light state of the device.""" + self.set_state(ntlight=ntlight) + + def set_state( + self, + pwr: bool = None, + ntlight: bool = None, + indicator: bool = None, + ntlbrightness: int = None, + maxworktime: int = None, + childlock: bool = None, + ) -> dict: + """Set state of device.""" + state = {} + if pwr is not None: + state["pwr"] = int(bool(pwr)) + if ntlight is not None: + state["ntlight"] = int(bool(ntlight)) + if indicator is not None: + state["indicator"] = int(bool(indicator)) + if ntlbrightness is not None: + state["ntlbrightness"] = ntlbrightness + if maxworktime is not None: + state["maxworktime"] = maxworktime + if childlock is not None: + state["childlock"] = int(bool(childlock)) + + packet = self._encode(2, state) + response = self.send_packet(0x6A, packet) + return self._decode(response) + + def check_power(self) -> bool: + """Return the power state of the device.""" + state = self.get_state() + return bool(state["pwr"]) + + def check_nightlight(self) -> bool: + """Return the state of the night light.""" + state = self.get_state() + return bool(state["ntlight"]) + + def get_state(self) -> dict: + """Get full state of device.""" + packet = self._encode(1, {}) + response = self.send_packet(0x6A, packet) + return self._decode(response) + + def _encode(self, flag: int, state: dict) -> bytes: + """Encode a message.""" + packet = bytearray(12) + data = json.dumps(state, separators=(",", ":")).encode() + struct.pack_into( + " dict: + """Decode a message.""" + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + js_len = struct.unpack_from(" dict: + """Get full state of device.""" + state = super().get_state() + + # Convert sensor data to float. Remove keys if sensors are not supported. + sensor_attrs = ["current", "volt", "power", "totalconsum", "overload"] + for attr in sensor_attrs: + value = state.pop(attr, -1) + if value != -1: + state[attr] = value / 1000 + return state + + def _encode(self, flag: int, state: dict) -> bytes: + """Encode a message.""" + packet = bytearray(14) + data = json.dumps(state, separators=(",", ":")).encode() + length = 12 + len(data) + struct.pack_into( + " dict: + """Decode a message.""" + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + js_len = struct.unpack_from(" dict: + """Return the power state of the device. + + Example: `{"pwr":1,"pwr1":1,"pwr2":0,"maxworktime":60,"maxworktime1":60,"maxworktime2":0,"idcbrightness":50}` + """ + packet = self._encode(1, {}) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + return self._decode(response) + + def set_state( + self, + pwr: bool = None, + pwr1: bool = None, + pwr2: bool = None, + maxworktime: int = None, + maxworktime1: int = None, + maxworktime2: int = None, + idcbrightness: int = None, + ) -> dict: + """Set the power state of the device.""" + state = {} + if pwr is not None: + state["pwr"] = int(bool(pwr)) + if pwr1 is not None: + state["pwr1"] = int(bool(pwr1)) + if pwr2 is not None: + state["pwr2"] = int(bool(pwr2)) + if maxworktime is not None: + state["maxworktime"] = maxworktime + if maxworktime1 is not None: + state["maxworktime1"] = maxworktime1 + if maxworktime2 is not None: + state["maxworktime2"] = maxworktime2 + if idcbrightness is not None: + state["idcbrightness"] = idcbrightness + + packet = self._encode(2, state) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + return self._decode(response) + + def _encode(self, flag: int, state: dict) -> bytes: + """Encode a message.""" + packet = bytearray(14) + data = json.dumps(state).encode() + length = 12 + len(data) + struct.pack_into( + " dict: + """Decode a message.""" + payload = self.decrypt(response[0x38:]) + js_len = struct.unpack_from(" None: + """Set the power state of the device.""" + packet = bytearray(16) + packet[0x00] = 0x0D + packet[0x02] = 0xA5 + packet[0x03] = 0xA5 + packet[0x04] = 0x5A + packet[0x05] = 0x5A + packet[0x06] = 0xB2 + ((sid_mask << 1) if pwr else sid_mask) + packet[0x07] = 0xC0 + packet[0x08] = 0x02 + packet[0x0A] = 0x03 + packet[0x0D] = sid_mask + packet[0x0E] = sid_mask if pwr else 0 + + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + + def set_power(self, sid: int, pwr: bool) -> None: + """Set the power state of the device.""" + sid_mask = 0x01 << (sid - 1) + self.set_power_mask(sid_mask, pwr) + + def check_power_raw(self) -> int: + """Return the power state of the device in raw format.""" + packet = bytearray(16) + packet[0x00] = 0x0A + packet[0x02] = 0xA5 + packet[0x03] = 0xA5 + packet[0x04] = 0x5A + packet[0x05] = 0x5A + packet[0x06] = 0xAE + packet[0x07] = 0xC0 + packet[0x08] = 0x01 + + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + return payload[0x0E] + + def check_power(self) -> dict: + """Return the power state of the device.""" + data = self.check_power_raw() + return { + "s1": bool(data & 1), + "s2": bool(data & 2), + "s3": bool(data & 4), + "s4": bool(data & 8), + } diff --git a/broadlinkmanager/broadlinkmanager.py b/broadlinkmanager/broadlinkmanager.py index c9b3ed4..4ce1378 100644 --- a/broadlinkmanager/broadlinkmanager.py +++ b/broadlinkmanager/broadlinkmanager.py @@ -1,9 +1,30 @@ # region Importing -import os, json, subprocess, time, broadlink, argparse, datetime, re, shutil, uvicorn, socket, aiofiles +import os +import json +import subprocess +import time +import broadlink +import argparse +import datetime +import re +import shutil +import uvicorn +import socket +import aiofiles from os import environ, path from json import dumps from broadlink.exceptions import ReadError, StorageError +from broadlink import exceptions as e +from broadlink.const import DEFAULT_BCAST_ADDR, DEFAULT_PORT, DEFAULT_TIMEOUT +from broadlink.alarm import S1C +from broadlink.climate import hysen +from broadlink.cover import dooya +from broadlink.device import Device, ping, scan +from broadlink.light import lb1, lb2 +from broadlink.remote import rm, rm4, rm4mini, rm4pro, rmmini, rmminib, rmpro +from broadlink.sensor import a1 +from broadlink.switch import bg1, mp1, sp1, sp2, sp2s, sp3, sp3s, sp4, sp4b from subprocess import call from loguru import logger from fastapi import FastAPI, Request, File, Form, UploadFile @@ -21,40 +42,45 @@ ENABLE_GOOGLE_ANALYTICS = os.getenv("ENABLE_GOOGLE_ANALYTICS") # endregion -#Get Lan IP +# Get Lan IP + + def GetLocalIP(): - p = subprocess.Popen("hostname -I | awk '{print $1}'", stdout=subprocess.PIPE, shell=True) + p = subprocess.Popen( + "hostname -I | awk '{print $1}'", stdout=subprocess.PIPE, shell=True) (output, err) = p.communicate() p_status = p.wait() ip = re.findall(r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b", str(output))[0] logger.debug(ip) return str(ip) + local_ip_address = GetLocalIP() # Get version from version file for dynamic change + + def GetVersionFromFle(): - with open("VERSION","r") as version: + with open("VERSION", "r") as version: v = version.read() return v # Tags metadata for swagger docs tags_metadata = [ - + { "name": "Commands", "description": "Learn / Send RF or IR commands", - - }, - { + + }, + { "name": "Devices", "description": "Scan for devices on the network or load/save from/to file", - - }, - -] + }, + +] # region Parsing Default arguments for descovery @@ -72,7 +98,8 @@ def GetVersionFromFle(): # region Declaring Flask app -app = FastAPI(title="Apprise API", description="Send multi channel notification using single endpoint", version=GetVersionFromFle(), openapi_tags=tags_metadata,contact={"name":"Tomer Klein","email":"tomer.klein@gmail.com","url":"https://github.com/t0mer/broadlinkmanager-docker"}) +app = FastAPI(title="Apprise API", description="Send multi channel notification using single endpoint", version=GetVersionFromFle( +), openapi_tags=tags_metadata, contact={"name": "Tomer Klein", "email": "tomer.klein@gmail.com", "url": "https://github.com/t0mer/broadlinkmanager-docker"}) logger.info("Configuring app") app.mount("/dist", StaticFiles(directory="dist"), name="dist") app.mount("/js", StaticFiles(directory="dist/js"), name="js") @@ -111,8 +138,9 @@ def GetVersionFromFle(): def get_analytics_code(): try: - if ENABLE_GOOGLE_ANALYTICS=="True": - analytics_file_path = os.path.join(app.root_path, 'templates', 'analytics_code.html') + if ENABLE_GOOGLE_ANALYTICS == "True": + analytics_file_path = os.path.join( + app.root_path, 'templates', 'analytics_code.html') f = open(analytics_file_path, "r") content = f.read() f.close() @@ -127,73 +155,110 @@ def get_analytics_code(): analytics_code = get_analytics_code() + def getDeviceName(deviceType): name = { - 0x2711: "SP2", - 0x2719: "Honeywell SP2", - 0x7919: "Honeywell SP2", - 0x271a: "Honeywell SP2", - 0x791a: "Honeywell SP2", - 0x2720: "SPMini", - 0x753e: "SP3", - 0x7D00: "OEM branded SP3", - 0x947a: "SP3S", - 0x9479: "SP3S", - 0x2728: "SPMini2", - 0x2733: "OEM branded SPMini", - 0x273e: "OEM branded SPMini", - 0x7530: "OEM branded SPMini2", - 0x7546: "OEM branded SPMini2", - 0x7918: "OEM branded SPMini2", - 0x7D0D: "TMall OEM SPMini3", - 0x2736: "SPMiniPlus", - 0x2712: "RM2", - 0x2737: "RM Mini", - 0x273d: "RM Pro Phicomm", - 0x2783: "RM2 Home Plus", - 0x277c: "RM2 Home Plus GDT", - 0x272a: "RM2 Pro Plus", - 0x2787: "RM2 Pro Plus2", - 0x279d: "RM2 Pro Plus3", - 0x27a9: "RM2 Pro Plus_300", - 0x278b: "RM2 Pro Plus BL", - 0x2797: "RM2 Pro Plus HYC", - 0x27a1: "RM2 Pro Plus R1", - 0x27a6: "RM2 Pro PP", - 0x278f: "RM Mini Shate", - 0x27c2: "RM Mini 3", - 0x2714: "A1", - 0x4EB5: "MP1", - 0x4EF7: "Honyar oem mp1", - 0x4EAD: "Hysen controller", - 0x2722: "S1 (SmartOne Alarm Kit)", - 0x4E4D: "Dooya DT360E (DOOYA_CURTAIN_V2)", - 0x51da: "RM4 Mini", - 0x5f36: "RM Mini 3", - 0x6026: "RM4 Pro", - 0x6070: "RM4c Mini", - 0x61a2: "RM4 Pro", - 0x610e: "RM4 Mini", - 0x610f: "RM4c", - 0x62bc: "RM4 Mini", - 0x62be: "RM4c Mini", - 0x51E3: "BG Electrical Smart Power Socket", - 0x60c8: "RGB Smart Bulb", - 0x6539: "RM4c Mini", - 0x653a: "RM4 Mini", - 0x653c: "RM4 Pro", - 0x649b: "RM4 Pro", - 0x6184: "RM4C mini", - 0x648d: "RM4 Mini", - 0x5209: "RM4 TV Mate", - 0x27C3: "RM pro+", - 0x27C7: "RM mini 3", - 0x27CC: "RM mini 3", - 0x27D0: "RM mini 3", - 0x27D3: "RM mini 3", - 0x27DC: "RM mini 3", - 0x6507: "RM mini 3", - 0x6508: "RM mini 3", + 0x0000: "SP1 ( Broadlink)", + 0x2717: "NEO ( Ankuoo)", + 0x2719: "SP2-compatible ( Honeywell)", + 0x271A: "SP2-compatible ( Honeywell)", + 0x2720: "SPmini ( Broadlink)", + 0x2728: "SP2-compatible ( URANT)", + 0x273E: "SPmini ( Broadlink)", + 0x7530: "SP2 ( Broadlink(OEM)", + 0x7539: "SP2-IL ( Broadlink(OEM)", + 0x753E: "SPmini3 ( Broadlink)", + 0x7540: "MP2 ( Broadlink)", + 0x7544: "SP2-CL ( Broadlink)", + 0x7546: "SP2-UK/BR/IN ( Broadlink(OEM)", + 0x7547: "SC1 ( Broadlink)", + 0x7918: "SP2 ( Broadlink(OEM)", + 0x7919: "SP2-compatible ( Honeywell)", + 0x791A: "SP2-compatible ( Honeywell)", + 0x7D0D: "SPmini3 ( Broadlink(OEM)", + 0x2711: "SP2 ( Broadlink)", + 0x2716: "NEOPRO ( Ankuoo)", + 0x271D: "Ego ( Efergy)", + 0x2736: "SPmini+ ( Broadlink)", + 0x2733: "SP3 ( Broadlink)", + 0x7D00: "SP3-EU ( Broadlink(OEM)", + 0x9479: "SP3S-US ( Broadlink)", + 0x947A: "SP3S-EU ( Broadlink)", + 0x7568: "SP4L-CN ( Broadlink)", + 0x756C: "SP4M ( Broadlink)", + 0x756F: "MCB1 ( Broadlink)", + 0x7579: "SP4L-EU ( Broadlink)", + 0x757B: "SP4L-AU ( Broadlink)", + 0x7583: "SPmini3 ( Broadlink)", + 0x7587: "SP4L-UK ( Broadlink)", + 0x7D11: "SPmini3 ( Broadlink)", + 0xA56A: "MCB1 ( Broadlink)", + 0xA56B: "SCB1E ( Broadlink)", + 0xA56C: "SP4L-EU ( Broadlink)", + 0xA589: "SP4L-UK ( Broadlink)", + 0xA5D3: "SP4L-EU ( Broadlink)", + 0x5115: "SCB1E ( Broadlink)", + 0x51E2: "AHC/U-01 ( BGElectrical)", + 0x6111: "MCB1 ( Broadlink)", + 0x6113: "SCB1E ( Broadlink)", + 0x618B: "SP4L-EU ( Broadlink)", + 0x6489: "SP4L-AU ( Broadlink)", + 0x648B: "SP4M-US ( Broadlink)", + 0x6494: "SCB2 ( Broadlink)", + 0x2737: "RMmini3 ( Broadlink)", + 0x278F: "RMmini ( Broadlink)", + 0x27C2: "RMmini3 ( Broadlink)", + 0x27C7: "RMmini3 ( Broadlink)", + 0x27CC: "RMmini3 ( Broadlink)", + 0x27CD: "RMmini3 ( Broadlink)", + 0x27D0: "RMmini3 ( Broadlink)", + 0x27D1: "RMmini3 ( Broadlink)", + 0x27D3: "RMmini3 ( Broadlink)", + 0x27DC: "RMmini3 ( Broadlink)", + 0x27DE: "RMmini3 ( Broadlink)", + 0x2712: "RMpro/pro+ ( Broadlink)", + 0x272A: "RMpro ( Broadlink)", + 0x273D: "RMpro ( Broadlink)", + 0x277C: "RMhome ( Broadlink)", + 0x2783: "RMhome ( Broadlink)", + 0x2787: "RMpro ( Broadlink)", + 0x278B: "RMplus ( Broadlink)", + 0x2797: "RMpro+ ( Broadlink)", + 0x279D: "RMpro+ ( Broadlink)", + 0x27A1: "RMplus ( Broadlink)", + 0x27A6: "RMplus ( Broadlink)", + 0x27A9: "RMpro+ ( Broadlink)", + 0x27C3: "RMpro+ ( Broadlink)", + 0x5F36: "RMmini3 ( Broadlink)", + 0x6507: "RMmini3 ( Broadlink)", + 0x6508: "RMmini3 ( Broadlink)", + 0x51DA: "RM4mini ( Broadlink)", + 0x6070: "RM4Cmini ( Broadlink)", + 0x610E: "RM4mini ( Broadlink)", + 0x610F: "RM4Cmini ( Broadlink)", + 0x62BC: "RM4mini ( Broadlink)", + 0x62BE: "RM4Cmini ( Broadlink)", + 0x6364: "RM4S ( Broadlink)", + 0x648D: "RM4mini ( Broadlink)", + 0x6539: "RM4Cmini ( Broadlink)", + 0x653A: "RM4mini ( Broadlink)", + 0x6026: "RM4pro ( Broadlink)", + 0x6184: "RM4Cpro ( Broadlink)", + 0x61A2: "RM4pro ( Broadlink)", + 0x649B: "RM4pro ( Broadlink)", + 0x653C: "RM4pro ( Broadlink)", + 0x2714: "e-Sensor ( Broadlink)", + 0x5043: "SB800TD ( Broadlink(OEM)", + 0x504E: "LB1 ( Broadlink)", + 0x606E: "SB500TD ( Broadlink(OEM)", + 0x60C7: "LB1 ( Broadlink)", + 0x60C8: "LB1 ( Broadlink)", + 0x6112: "LB1 ( Broadlink)", + 0xA4F4: "LB27R1 ( Broadlink)", + 0x2722: "S2KIT ( Broadlink)", + 0x4EAD: "HY02/HY03 ( Hysen)", + 0x4E4D: "DT360E-45/20 ( Dooya)", + 0x51E3: "BG800/BG900 ( BGElectrical)", } return name.get(deviceType, "Not Supported") @@ -202,6 +267,7 @@ def getDeviceName(deviceType): def auto_int(x): return int(x, 0) + def to_microseconds(bytes): result = [] # print bytes[0] # 0x26 = 38for IR @@ -218,6 +284,7 @@ def to_microseconds(bytes): break return result + def durations_to_broadlink(durations): result = bytearray() result.append(IR_TOKEN) @@ -232,6 +299,7 @@ def durations_to_broadlink(durations): result.append(num % 256) return result + def format_durations(data): result = '' for i in range(0, len(data)): @@ -240,24 +308,28 @@ def format_durations(data): result += ('+' if i % 2 == 0 else '-') + str(data[i]) return result + def parse_durations(str): result = [] for s in str.split(): result.append(abs(int(s))) return result + def initDevice(dtype, host, mac): dtypeTmp = dtype if dtypeTmp == '0x6539': - dtypeTmp = '0x610F' + dtypeTmp = '0x610F' _dtype = int(dtypeTmp, 0) _host = host _mac = bytearray.fromhex(mac) return broadlink.gendevice(_dtype, (_host, 80), _mac) + def GetDevicesFilePath(): return os.path.join(app.root_path, 'data', 'devices.json') + def writeXml(_file): root = ET.Element("root") doc = ET.SubElement(root, "doc") @@ -269,37 +341,37 @@ def writeXml(_file): @app.get('/', include_in_schema=False) def devices(request: Request): - return templates.TemplateResponse('index.html', context={'request': request,'analytics':analytics_code, 'version': GetVersionFromFle()}) + return templates.TemplateResponse('index.html', context={'request': request, 'analytics': analytics_code, 'version': GetVersionFromFle()}) @app.get('/generator', include_in_schema=False) def generator(request: Request): - return templates.TemplateResponse('generator.html', context={'request': request,'analytics':analytics_code, 'version': GetVersionFromFle()}) + return templates.TemplateResponse('generator.html', context={'request': request, 'analytics': analytics_code, 'version': GetVersionFromFle()}) @app.get('/livolo', include_in_schema=False) def livolo(request: Request): - return templates.TemplateResponse('livolo.html', context={'request': request,'analytics':analytics_code, 'version': GetVersionFromFle()}) + return templates.TemplateResponse('livolo.html', context={'request': request, 'analytics': analytics_code, 'version': GetVersionFromFle()}) @app.get('/energenie', include_in_schema=False) def energenie(request: Request): - return templates.TemplateResponse('energenie.html', context={'request': request,'analytics':analytics_code, 'version': GetVersionFromFle()}) + return templates.TemplateResponse('energenie.html', context={'request': request, 'analytics': analytics_code, 'version': GetVersionFromFle()}) @app.get('/repeats', include_in_schema=False) def repeats(request: Request): - return templates.TemplateResponse('repeats.html', context={'request': request,'analytics':analytics_code, 'version': GetVersionFromFle()}) + return templates.TemplateResponse('repeats.html', context={'request': request, 'analytics': analytics_code, 'version': GetVersionFromFle()}) @app.get('/convert', include_in_schema=False) def convert(request: Request): - return templates.TemplateResponse('convert.html', context={'request': request,'analytics':analytics_code, 'version': GetVersionFromFle()}) + return templates.TemplateResponse('convert.html', context={'request': request, 'analytics': analytics_code, 'version': GetVersionFromFle()}) @app.get('/about', include_in_schema=False) def about(request: Request): - return templates.TemplateResponse('about.html', context={'request': request,'analytics':analytics_code, 'version': GetVersionFromFle()}) + return templates.TemplateResponse('about.html', context={'request': request, 'analytics': analytics_code, 'version': GetVersionFromFle()}) @app.get('/temperature', tags=["Commands"], summary="Read Temperature") @@ -315,7 +387,7 @@ def temperature(request: Request, mac: str = "", host: str = "", type: str = "") @app.get('/ir/learn', tags=["Commands"], summary="Learn IR code") -def learnir(request: Request, mac: str = "", host: str = "", type: str = "", command: str =""): +def learnir(request: Request, mac: str = "", host: str = "", type: str = "", command: str = ""): logger.info("Learning IR Code for device: " + host) dev = initDevice(type, host, mac) dev.auth() @@ -338,8 +410,10 @@ def learnir(request: Request, mac: str = "", host: str = "", type: str = "", com return JSONResponse('{"data":"' + learned + '","success":1,"message":"IR Data Received"}') # Send IR/RF + + @app.get('/command/send', tags=["Commands"], summary="Send IR/RF Command") -def command(request: Request, mac: str = "", host: str = "", type: str = "", command: str =""): +def command(request: Request, mac: str = "", host: str = "", type: str = "", command: str = ""): logger.info("Sending Command (IR/RF) using device: " + host) dev = initDevice(type, host, mac) logger.info("Sending command: " + command) @@ -355,15 +429,15 @@ def command(request: Request, mac: str = "", host: str = "", type: str = "", com # Learn RF @app.get('/rf/learn', include_in_schema=False) -def sweep(request: Request, mac: str = "", host: str = "", type: str = "", command: str =""): +def sweep(request: Request, mac: str = "", host: str = "", type: str = "", command: str = ""): global _continu_to_sweep global _rf_sweep_message global _rf_sweep_status _continu_to_sweep = False _rf_sweep_message = '' _rf_sweep_status = False - logger.info("Device:" + host + " entering RF learning mode" ) - dev = initDevice(type, host,mac) + logger.info("Device:" + host + " entering RF learning mode") + dev = initDevice(type, host, mac) dev.auth() logger.info("Device:" + host + " is sweeping for frequency") dev.sweep_frequency() @@ -389,9 +463,10 @@ def sweep(request: Request, mac: str = "", host: str = "", type: str = "", comma _rf_sweep_message = "Click The Continue button" _rf_sweep_message = "To complete learning, single press the button you want to learn" - logger.info("To complete learning, single press the button you want to learn") + logger.info( + "To complete learning, single press the button you want to learn") _rf_sweep_status = False - logger.error("Device:" +host + " is searching for RF packets!") + logger.error("Device:" + host + " is searching for RF packets!") dev.find_rf_packet() start = time.time() while time.time() - start < TIMEOUT: @@ -417,6 +492,7 @@ def sweep(request: Request, mac: str = "", host: str = "", type: str = "", comma # Get RF Learning state + @app.get('/rf/status', include_in_schema=False) def rfstatus(request: Request): global _continu_to_sweep @@ -425,6 +501,8 @@ def rfstatus(request: Request): return JSONResponse('{"_continu_to_sweep":"' + str(_continu_to_sweep) + '","_rf_sweep_message":"' + _rf_sweep_message + '","_rf_sweep_status":"' + str(_rf_sweep_status) + '" }') # Continue with RF Scan + + @app.get('/rf/continue', include_in_schema=False) def rfcontinue(request: Request): global _continu_to_sweep @@ -469,17 +547,19 @@ def load_devices_from_file(request: Request): @app.get('/autodiscover', tags=["Devices"]) -def search_for_devices(request: Request,freshscan: str = "1"): +def search_for_devices(request: Request, freshscan: str = "1"): _devices = '' if path.exists(GetDevicesFilePath()) and freshscan != "1": return load_devices_from_file(request) else: logger.info("Searcing for devices...") _devices = '[' - devices = broadlink.discover(timeout=5, local_ip_address=local_ip_address, discover_ip_address="255.255.255.255") + devices = broadlink.discover( + timeout=5, local_ip_address=local_ip_address, discover_ip_address="255.255.255.255") for device in devices: if device.auth(): - logger.info("New device detected: " + getDeviceName(device.devtype) + " (ip: " + device.host[0] + ", mac: " + ''.join(format(x, '02x') for x in device.mac) + ")") + logger.info("New device detected: " + getDeviceName(device.devtype) + " (ip: " + + device.host[0] + ", mac: " + ''.join(format(x, '02x') for x in device.mac) + ")") _devices = _devices + '{"name":"' + \ getDeviceName(device.devtype) + '",' _devices = _devices + '"type":"' + \ @@ -488,7 +568,7 @@ def search_for_devices(request: Request,freshscan: str = "1"): _devices = _devices + '"mac":"' + \ ''.join(format(x, '02x') for x in device.mac) + '"},' - if len(_devices)==1: + if len(_devices) == 1: _devices = _devices + ']' logger.debug("No Devices Found " + str(_devices)) else: @@ -497,32 +577,30 @@ def search_for_devices(request: Request,freshscan: str = "1"): return JSONResponse(_devices) - - @app.get('/device/ping', tags=["Devices"]) -def get_device_status(request: Request, host: str=""): +def get_device_status(request: Request, host: str = ""): try: - if host =="": + if host == "": logger.error("Host must be a valid ip or hostname") return JSONResponse('{"status":"Host must be a valid ip or hostname","success":"0"}') - p = subprocess.Popen("fping -C1 -q "+ host +" 2>&1 | grep -v '-' | wc -l", stdout=subprocess.PIPE, shell=True) + p = subprocess.Popen( + "fping -C1 -q " + host + " 2>&1 | grep -v '-' | wc -l", stdout=subprocess.PIPE, shell=True) logger.debug(host) (output, err) = p.communicate() p_status = p.wait() logger.debug(str(output)) status = re.findall('\d+', str(output))[0] - if status=="1": + if status == "1": return JSONResponse('{"status":"online","success":"1"}') else: return JSONResponse('{"status":"offline","success":"1"}') except Exception as e: - logger.error("Error pinging "+ host + " Error: " + str(e)) + logger.error("Error pinging " + host + " Error: " + str(e)) return JSONResponse('{"status":"Error pinging ' + host + '" ,"success":"0"}') # endregion API Methods - # Start Application if __name__ == '__main__': logger.info("Broadllink Manager is up and running")