1212
1313import json
1414import os
15+ import queue
1516import sys
1617import threading
1718import time
@@ -39,12 +40,13 @@ def on_connect(client, userdata, flags, reason_code, properties=None):
3940 client .subscribe ("#" , qos = config .qos )
4041
4142
42- # Callback for MQTT message received event.
43- def on_message (client , userdata , message , properties = None ):
44- topic = message .topic
45- payload = message .payload .decode ()
46- state = userdata
47- if topic .endswith ("/cmd" ):
43+ # Command handler thread
44+ def command_handler ():
45+ while True :
46+ try :
47+ topic , payload , state = command_handler_queue .get ()
48+ except queue .ShutDown :
49+ break
4850 print (f"Received command on { topic } : { payload } " )
4951 # Command handling logic goes here.
5052 # For this example, only the shutdown command is handled, which will
@@ -63,22 +65,36 @@ def on_message(client, userdata, message, properties=None):
6365 ack_msg = json .dumps (
6466 {"status" : "acknowledged" , "time" : current_epoch_microseconds ()}
6567 )
66- state ["ack_msg_info" ].append (
67- client .publish (topic = rsp_topic , payload = ack_msg , qos = config .qos )
68- )
68+ rc_pub = client .publish (topic = rsp_topic , payload = ack_msg , qos = config .qos )
69+ rc_pub .wait_for_publish ()
70+ print (f"Finished command handling for: { topic } " )
71+ command_handler_queue .task_done ()
72+
73+
74+ # Callback for MQTT message received event.
75+ def on_message (client , userdata , message , properties = None ):
76+ topic = message .topic
77+ payload = message .payload .decode ()
78+ state = userdata
79+ if topic .endswith ("/cmd" ):
80+ command_handler_queue .put ((topic , payload , state ))
6981
7082
7183if config .auth_type not in ("basic" , "cert" ):
7284 raise ValueError ("auth_type must be 'basic' or 'cert'" )
7385
86+ # Initialize queue and start command handler thread
87+ command_handler_queue = queue .Queue ()
88+ command_handler_thread = threading .Thread (target = command_handler )
89+ command_handler_thread .start ()
90+
7491client = mqtt .Client (
7592 client_id = config .client_id , # Ensure client_id is set for persistent sessions.
7693 clean_session = False , # Enable persistent session.
7794 protocol = mqtt .MQTTv311 , # Use v311 unless v5 features are needed.
7895 callback_api_version = mqtt .CallbackAPIVersion .VERSION2 , # type: ignore
7996)
8097state = {
81- "ack_msg_info" : [],
8298 "shutdown_event" : threading .Event (),
8399}
84100client .user_data_set (state )
@@ -121,19 +137,14 @@ def on_message(client, userdata, message, properties=None):
121137 rc_pub .wait_for_publish ()
122138 count += 1
123139 time .sleep (config .message_delay )
124- # Drain ack_msg_info
125- while state ["ack_msg_info" ]:
126- state ["ack_msg_info" ].pop (0 ).wait_for_publish ()
127140
128141except KeyboardInterrupt :
129142 print ("\n Interrupted by user. Exiting..." )
130143
131- # Wait to process any potential /cmd messages before exit.
132- print ("Waiting 2 seconds to process possible /cmd messages..." )
133- time .sleep (2 )
134- # Drain ack_msg_info
135- while state ["ack_msg_info" ]:
136- state ["ack_msg_info" ].pop (0 ).wait_for_publish ()
144+ # Wait for the queue to drain
145+ print ("Waiting for commands to be processed..." )
146+ command_handler_queue .shutdown ()
147+ command_handler_thread .join ()
137148
138149# Tear down the client and exit.
139150client .loop_stop ()
0 commit comments