11from abc import abstractmethod
2- from ctypes import LittleEndianStructure , c_ubyte , c_byte , c_short , c_int
2+ from ctypes import (BigEndianStructure ,
3+ LittleEndianStructure ,
4+ c_ubyte ,
5+ c_byte ,
6+ c_short ,
7+ c_int ,
8+ c_char ,
9+ c_long )
310from datetime import datetime
411from typing import TypeAlias
512
@@ -14,7 +21,7 @@ class InvalidByteData(Exception):
1421# LittleEndianStructure so be mindful that this class is intended to be
1522# abstract. It is only exposed to the user to use as a type hint for
1623# functions that accept any Atmotube packet.
17- class AtmotubePacket (LittleEndianStructure ):
24+ class AtmotubeGATTPacket (LittleEndianStructure ):
1825 """
1926 Abstract base class for Atmotube data packets.
2027 """
@@ -49,7 +56,7 @@ def _process_bytes(self) -> None:
4956 ...
5057
5158
52- class AtmotubeProStatus (AtmotubePacket ):
59+ class AtmotubeProStatus (AtmotubeGATTPacket ):
5360 """
5461 Represents the status packet from an Atmotube device.
5562 """
@@ -77,7 +84,7 @@ def _process_bytes(self) -> None:
7784 self .battery_level = self ._battery
7885
7986 def __str__ (self ) -> str :
80- return (f"StatusPacket (date_time={ str (self .date_time )} , "
87+ return (f"AtmotubeProStatus (date_time={ str (self .date_time )} , "
8188 f"pm_sensor_status={ self .pm_sensor_status } , "
8289 f"error_flag={ self .error_flag } , "
8390 f"bonding_flag={ self .bonding_flag } , "
@@ -100,7 +107,7 @@ def __eq__(self, other: object) -> bool:
100107 return False
101108
102109
103- class AtmotubeProSPS30 (AtmotubePacket ):
110+ class AtmotubeProSPS30 (AtmotubeGATTPacket ):
104111 """
105112 Represents the SPS30 particulate matter sensor data packet from an
106113 Atmotube device.
@@ -112,7 +119,7 @@ class AtmotubeProSPS30(AtmotubePacket):
112119 ('_pm4' , c_byte * 3 ),
113120 ]
114121
115- _pack_ : int = 1
122+ _pack_ : bool = True
116123 _layout_ : str = "ms"
117124 _byte_size_ : int = 12
118125
@@ -127,7 +134,7 @@ def _process_bytes(self) -> None:
127134 self .pm4 = self .pm_from_bytes (self ._pm4 )
128135
129136 def __str__ (self ) -> str :
130- return (f"SPS30Packet (date_time={ str (self .date_time )} , "
137+ return (f"AtmotubeProSPS30 (date_time={ str (self .date_time )} , "
131138 f"pm1={ self .pm1 } µg/m³, pm2_5={ self .pm2_5 } µg/m³, "
132139 f"pm10={ self .pm10 } µg/m³, pm4={ self .pm4 } µg/m³)" )
133140
@@ -142,7 +149,7 @@ def __eq__(self, other: object) -> bool:
142149 return False
143150
144151
145- class AtmotubeProBME280 (AtmotubePacket ):
152+ class AtmotubeProBME280 (AtmotubeGATTPacket ):
146153 """
147154 Represents the BME280 environmental sensor data packet from an
148155 Atmotube device.
@@ -154,7 +161,7 @@ class AtmotubeProBME280(AtmotubePacket):
154161 ('_T_dec' , c_short ),
155162 ]
156163
157- _pack_ : int = 1
164+ _pack_ : bool = True
158165 _layout_ : str = "ms"
159166 _byte_size_ : int = 8
160167
@@ -164,7 +171,7 @@ def _process_bytes(self) -> None:
164171 self .pressure = self ._P / 100.0 if self ._P > 0 else None
165172
166173 def __str__ (self ) -> str :
167- return (f"BME280Packet (date_time={ str (self .date_time )} , "
174+ return (f"AtmotubeProBME280 (date_time={ str (self .date_time )} , "
168175 f"humidity={ self .humidity } %, "
169176 f"temperature={ self .temperature } °C, "
170177 f"pressure={ self .pressure } mbar)" )
@@ -179,7 +186,7 @@ def __eq__(self, other: object) -> bool:
179186 return False
180187
181188
182- class AtmotubeProSGPC3 (AtmotubePacket ):
189+ class AtmotubeProSGPC3 (AtmotubeGATTPacket ):
183190 """
184191 Represents the SGPC3 air quality sensor data packet from an
185192 Atmotube device.
@@ -188,15 +195,15 @@ class AtmotubeProSGPC3(AtmotubePacket):
188195 ('_tvoc' , c_short )
189196 ]
190197
191- _pack_ : int = 1
198+ _pack_ : bool = True
192199 _layout_ : str = "ms"
193200 _byte_size_ : int = 4
194201
195202 def _process_bytes (self ) -> None :
196203 self .tvoc = self ._tvoc / 1000.0 if self ._tvoc > 0 else None
197204
198205 def __str__ (self ) -> str :
199- return (f"SGPC3Packet (date_time={ str (self .date_time )} , "
206+ return (f"AtmotubeProSGPC3 (date_time={ str (self .date_time )} , "
200207 f"tvoc={ self .tvoc } ppb)" )
201208
202209 def __eq__ (self , other : object ) -> bool :
@@ -205,3 +212,153 @@ def __eq__(self, other: object) -> bool:
205212 self .tvoc == other .tvoc ))
206213 else :
207214 return False
215+
216+
217+ class AtmotubeBLEPacket (BigEndianStructure ):
218+ """
219+ Abstract base class for Atmotube data packets.
220+ """
221+ _byte_size_ : int = 0 # To be defined in subclasses
222+
223+ def __new__ (cls , data : bytearray , date_time : datetime | None = None ):
224+ if len (data ) != cls ._byte_size_ :
225+ raise InvalidByteData (f"Expected { cls ._byte_size_ } bytes, "
226+ f"got { len (data )} bytes" )
227+ return cls .from_buffer_copy (data )
228+
229+ def __init__ (self , data : bytearray , date_time : datetime | None = None ):
230+
231+ if date_time is None :
232+ date_time = datetime .now ()
233+ self .date_time = date_time
234+ self ._process_bytes ()
235+
236+ def __repr__ (self ) -> str :
237+ return str (self )
238+
239+ @abstractmethod
240+ def __eq__ (self , other : object ) -> bool :
241+ ...
242+
243+ @abstractmethod
244+ def __str__ (self ) -> str :
245+ ...
246+
247+ @abstractmethod
248+ def _process_bytes (self ) -> None :
249+ ...
250+
251+
252+ class AtmotubeProBLEAdvertising (AtmotubeBLEPacket ):
253+ """
254+ Represents the BLE advertising packet from an Atmotube PRO device.
255+ """
256+ #>hhbblbb
257+ _fields_ : FieldList = [
258+ ('_tvoc' , c_short , 16 ),
259+ ('_devid' , c_short , 16 ),
260+ ('_rh' , c_byte , 8 ),
261+ ('_T' , c_byte , 8 ),
262+ ('_P' , c_int , 32 ),
263+ ("_bit_8" , c_ubyte , 1 ),
264+ ("_pre_heating" , c_ubyte , 1 ),
265+ ("_bit_6" , c_ubyte , 1 ),
266+ ("_charging_timer" , c_ubyte , 1 ),
267+ ("_charging" , c_ubyte , 1 ),
268+ ("_bonding" , c_ubyte , 1 ),
269+ ("_error" , c_ubyte , 1 ),
270+ ("_pm_sensor" , c_ubyte , 1 ),
271+ ("_battery" , c_ubyte , 8 ),
272+ ]
273+
274+ _pack_ : bool = True
275+ _layout_ : str = "ms"
276+ _byte_size_ : int = 12
277+
278+ def _process_bytes (self ) -> None :
279+ self .tvoc = self ._tvoc / 1000.0 if self ._tvoc > 0 else None
280+ self .device_id = self ._devid
281+ self .humidity = self ._rh if self ._rh > 0 else None
282+ self .temperature = self ._T
283+ self .pressure = self ._P / 100.0 if self ._P > 0 else None
284+ self .pm_sensor_status = bool (self ._pm_sensor )
285+ self .error_flag = bool (self ._error )
286+ self .bonding_flag = bool (self ._bonding )
287+ self .charging = bool (self ._charging )
288+ self .charging_timer = bool (self ._charging_timer )
289+ self .pre_heating = bool (self ._pre_heating )
290+ self .battery_level = self ._battery
291+
292+ def __str__ (self ) -> str :
293+ return (f"AtmotubeProBLEAdvertising(date_time={ str (self .date_time )} , "
294+ f"device_id={ self .device_id } , "
295+ f"tvoc={ self .tvoc } ppb, "
296+ f"humidity={ self .humidity } %, "
297+ f"temperature={ self .temperature } °C, "
298+ f"pressure={ self .pressure } mbar, "
299+ f"pm_sensor_status={ self .pm_sensor_status } , "
300+ f"error_flag={ self .error_flag } , "
301+ f"bonding_flag={ self .bonding_flag } , "
302+ f"charging={ self .charging } , "
303+ f"charging_timer={ self .charging_timer } , "
304+ f"pre_heating={ self .pre_heating } , "
305+ f"battery_level={ self .battery_level } %)" )
306+
307+ def __eq__ (self , other : object ) -> bool :
308+ if isinstance (other , AtmotubeProBLEAdvertising ):
309+ return all ((self .date_time == other .date_time ,
310+ self .device_id == other .device_id ,
311+ self .tvoc == other .tvoc ,
312+ self .humidity == other .humidity ,
313+ self .temperature == other .temperature ,
314+ self .pressure == other .pressure ,
315+ self .pm_sensor_status == other .pm_sensor_status ,
316+ self .error_flag == other .error_flag ,
317+ self .bonding_flag == other .bonding_flag ,
318+ self .charging == other .charging ,
319+ self .charging_timer == other .charging_timer ,
320+ self .pre_heating == other .pre_heating ,
321+ self .battery_level == other .battery_level ))
322+ else :
323+ return False
324+
325+
326+ class AtmotubeProBLEScanResponse (AtmotubeBLEPacket ):
327+ """
328+ Represents the BLE scan response packet from an Atmotube PRO device.
329+ """
330+ _fields_ : FieldList = [
331+ ('_pm1' , c_short ),
332+ ('_pm2_5' , c_short ),
333+ ('_pm10' , c_short ),
334+ ('_fw_maj' , c_ubyte ),
335+ ('_fw_min' , c_ubyte ),
336+ ('_fw_bld' , c_ubyte ),
337+ ]
338+
339+ _pack_ : bool = True
340+ _layout_ : str = "ms"
341+ _byte_size_ : int = 9
342+
343+ def _process_bytes (self ) -> None :
344+ self .pm1 = self ._pm1 if self ._pm1 > 0 else None
345+ self .pm2_5 = self ._pm2_5 if self ._pm2_5 > 0 else None
346+ self .pm10 = self ._pm10 if self ._pm10 > 0 else None
347+ self .firmware_version = f"{ self ._fw_maj } .{ self ._fw_min } .{ self ._fw_bld } "
348+
349+ def __str__ (self ) -> str :
350+ return (f"AtmotubeProBLEScanResponse(date_time={ str (self .date_time )} , "
351+ f"pm1={ self .pm1 } µg/m³, "
352+ f"pm2_5={ self .pm2_5 } µg/m³, "
353+ f"pm10={ self .pm10 } µg/m³, "
354+ f"firmware_version={ self .firmware_version } )" )
355+
356+ def __eq__ (self , other : object ) -> bool :
357+ if isinstance (other , AtmotubeProBLEScanResponse ):
358+ return all ((self .date_time == other .date_time ,
359+ self .pm1 == other .pm1 ,
360+ self .pm2_5 == other .pm2_5 ,
361+ self .pm10 == other .pm10 ,
362+ self .firmware_version == other .firmware_version ))
363+ else :
364+ return False
0 commit comments