1+ from abc import abstractmethod
12from ctypes import LittleEndianStructure , c_ubyte , c_byte , c_short , c_int
23from datetime import datetime
4+ from typing import TypeAlias
5+
6+ FieldList : TypeAlias = list [tuple ]
37
48
59class InvalidByteData (Exception ):
610 pass
711
812
13+ # I have no idea how to make this actually an abstract sub-class of
14+ # LittleEndianStructure so be mindful that this class is intended to be
15+ # abstract. It is only exposed to the user to use as a type hint for
16+ # functions that accept any Atmotube packet.
917class AtmoTubePacket (LittleEndianStructure ):
10- _byte_size_ = 0 # To be defined in subclasses
18+ """
19+ Abstract base class for Atmotube data packets.
20+ """
21+ _byte_size_ : int = 0 # To be defined in subclasses
1122
12- def __new__ (cls , data , date_time = None ):
23+ def __new__ (cls , data : bytearray , date_time : datetime | None = None ):
1324 if len (data ) != cls ._byte_size_ :
1425 raise InvalidByteData (f"Expected { cls ._byte_size_ } bytes, "
1526 f"got { len (data )} bytes" )
1627 return cls .from_buffer_copy (data )
1728
18- def __init__ (self , data , date_time = None ):
29+ def __init__ (self , data : bytearray , date_time : datetime | None = None ):
30+
1931 if date_time is None :
2032 date_time = datetime .now ()
2133 self .date_time = date_time
2234 self ._process_bytes ()
2335
24- def __repr__ (self ):
36+ def __repr__ (self ) -> str :
2537 return str (self )
2638
27- def _process_bytes (self ):
39+ @abstractmethod
40+ def __str__ (self ) -> str :
41+ ...
42+
43+ @abstractmethod
44+ def _process_bytes (self ) -> None :
2845 ...
2946
3047
3148class StatusPacket (AtmoTubePacket ):
32- _fields_ = [
33- ("_pm_sensor" , c_ubyte , 1 ),
34- ("_error" , c_ubyte , 1 ),
35- ("_bonding" , c_ubyte , 1 ),
36- ("_charging" , c_ubyte , 1 ),
37- ("_charging_timer" , c_ubyte , 1 ),
38- ("_bit_6" , c_ubyte , 1 ),
39- ("_pre_heating" , c_ubyte , 1 ),
40- ("_bit_8" , c_ubyte , 1 ),
41- ("_battery" , c_ubyte , 8 ),
49+ """
50+ Represents the status packet from an Atmotube device.
51+ """
52+ _fields_ : FieldList = [
53+ ("_pm_sensor" , c_ubyte , 1 ),
54+ ("_error" , c_ubyte , 1 ),
55+ ("_bonding" , c_ubyte , 1 ),
56+ ("_charging" , c_ubyte , 1 ),
57+ ("_charging_timer" , c_ubyte , 1 ),
58+ ("_bit_6" , c_ubyte , 1 ),
59+ ("_pre_heating" , c_ubyte , 1 ),
60+ ("_bit_8" , c_ubyte , 1 ),
61+ ("_battery" , c_ubyte , 8 ),
4262 ]
4363
44- _byte_size_ = 2
64+ _byte_size_ : int = 2
4565
46- def _process_bytes (self ):
66+ def _process_bytes (self ) -> None :
4767 self .pm_sensor_status = bool (self ._pm_sensor )
4868 self .error_flag = bool (self ._error )
4969 self .bonding_flag = bool (self ._bonding )
@@ -52,7 +72,7 @@ def _process_bytes(self):
5272 self .pre_heating = bool (self ._pre_heating )
5373 self .battery_level = self ._battery
5474
55- def __str__ (self ):
75+ def __str__ (self ) -> str :
5676 return (f"StatusPacket(date_time={ str (self .date_time )} , "
5777 f"pm_sensor_status={ self .pm_sensor_status } , "
5878 f"error_flag={ self .error_flag } , "
@@ -64,69 +84,81 @@ def __str__(self):
6484
6585
6686class SPS30Packet (AtmoTubePacket ):
67- _fields_ = [
68- ('_pm1' , c_byte * 3 ),
87+ """
88+ Represents the SPS30 particulate matter sensor data packet from an
89+ Atmotube device.
90+ """
91+ _fields_ : FieldList = [
92+ ('_pm1' , c_byte * 3 ),
6993 ('_pm2_5' , c_byte * 3 ),
70- ('_pm10' , c_byte * 3 ),
71- ('_pm4' , c_byte * 3 ),
94+ ('_pm10' , c_byte * 3 ),
95+ ('_pm4' , c_byte * 3 ),
7296 ]
7397
74- _pack_ = 1
75- _layout_ = "ms"
76- _byte_size_ = 12
98+ _pack_ : int = 1
99+ _layout_ : str = "ms"
100+ _byte_size_ : int = 12
77101
78- def pm_from_bytes (self , byte_array ) :
102+ def pm_from_bytes (self , byte_array : bytearray ) -> float | None :
79103 res = int .from_bytes (byte_array , byteorder = 'little' , signed = True )
80104 return res / 100.0 if res > 0 else None
81105
82- def _process_bytes (self ):
106+ def _process_bytes (self ) -> None :
83107 self .pm1 = self .pm_from_bytes (self ._pm1 )
84108 self .pm2_5 = self .pm_from_bytes (self ._pm2_5 )
85109 self .pm10 = self .pm_from_bytes (self ._pm10 )
86110 self .pm4 = self .pm_from_bytes (self ._pm4 )
87111
88- def __str__ (self ):
112+ def __str__ (self ) -> str :
89113 return (f"SPS30Packet(date_time={ str (self .date_time )} , "
90114 f"pm1={ self .pm1 } µg/m³, pm2_5={ self .pm2_5 } µg/m³, "
91115 f"pm10={ self .pm10 } µg/m³, pm4={ self .pm4 } µg/m³)" )
92116
93117
94118class BME280Packet (AtmoTubePacket ):
95- _fields_ = [
96- ('_rh' , c_byte ),
97- ('_T' , c_byte ),
98- ('_P' , c_int ),
119+ """
120+ Represents the BME280 environmental sensor data packet from an
121+ Atmotube device.
122+ """
123+ _fields_ : FieldList = [
124+ ('_rh' , c_byte ),
125+ ('_T' , c_byte ),
126+ ('_P' , c_int ),
99127 ('_T_dec' , c_short ),
100128 ]
101129
102- _pack_ = 1
103- _layout_ = "ms"
104- _byte_size_ = 8
130+ _pack_ : int = 1
131+ _layout_ : str = "ms"
132+ _byte_size_ : int = 8
105133
106- def _process_bytes (self ):
134+ def _process_bytes (self ) -> None :
107135 self .humidity = self ._rh if self ._rh > 0 else None
108136 self .temperature = self ._T_dec / 100.0
109137 self .pressure = self ._P / 100.0 if self ._P > 0 else None
110138
111- def __str__ (self ):
139+ def __str__ (self ) -> str :
112140 return (f"BME280Packet(date_time={ str (self .date_time )} , "
113141 f"humidity={ self .humidity } %, "
114142 f"temperature={ self .temperature } °C, "
115143 f"pressure={ self .pressure } mbar)" )
116144
117145
118146class SGPC3Packet (AtmoTubePacket ):
119- _fields_ = [
147+ """
148+ Represents the SGPC3 air quality sensor data packet from an
149+ Atmotube device.
150+ """
151+ _fields_ : FieldList = [
120152 ('_tvoc' , c_short )
121153 ]
122154
123- _pack_ = 1
124- _layout_ = "ms"
125- _byte_size_ = 4
155+ _pack_ : int = 1
156+ _layout_ : str = "ms"
157+ _byte_size_ : int = 4
126158
127- def _process_bytes (self ):
159+ def _process_bytes (self ) -> None :
128160 self .tvoc = self ._tvoc / 1000.0 if self ._tvoc > 0 else None
129161
130- def __str__ (self ):
162+ def __str__ (self ) -> str :
131163 return (f"SGPC3Packet(date_time={ str (self .date_time )} , "
132164 f"tvoc={ self .tvoc } ppb)" )
0 commit comments