Skip to content

Commit

Permalink
repeat subscribe when reconnected to MQTT broker; added comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Johannes.Hennecke committed Nov 16, 2023
1 parent 2bb2f94 commit f62bbfc
Show file tree
Hide file tree
Showing 3 changed files with 318 additions and 46 deletions.
12 changes: 12 additions & 0 deletions mqtt_io/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,18 @@ class StreamDataSentEvent(Event):
stream_name: str
data: bytes

@dataclass
class StreamDataSubscribeEvent(Event):
"""
Trigger MQTT subscribe
"""

@dataclass
class DigitalSubscribeEvent(Event):
"""
Trigger MQTT subscribe
"""


class EventBus:
"""
Expand Down
86 changes: 85 additions & 1 deletion mqtt_io/mqtt/asyncio_mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,25 @@


def _map_exception(func: Func) -> Func:
"""
Creates a decorator that wraps a function and maps any raised `MqttError` exception to a `MQTTException`.
:param func: The function to be wrapped.
:type func: Func
:return: The wrapped function.
:rtype: Func
"""
@wraps(func)
async def inner(*args: Any, **kwargs: Any) -> Any:
"""
Decorator for asynchronous functions that catches `MqttError` exceptions and raises `MQTTException` instead.
Parameters:
func (Callable): The function to be decorated.
Returns:
Callable: The decorated function.
"""
try:
await func(*args, **kwargs)
except MqttError as exc:
Expand All @@ -42,6 +59,15 @@ class MQTTClient(AbstractMQTTClient):
"""

def __init__(self, options: MQTTClientOptions):
"""
Initializes a new instance of the MQTTClient class.
Args:
options (MQTTClientOptions): The options for the MQTT client.
Returns:
None
"""
super().__init__(options)
protocol_map = {
MQTTProtocol.V31: paho.MQTTv31,
Expand All @@ -66,7 +92,7 @@ def __init__(self, options: MQTTClientOptions):
username=options.username,
password=options.password,
client_id=options.client_id,
# keepalive=options.keepalive, # This isn't implemented yet on 0.8.1
keepalive=options.keepalive,
tls_context=tls_context,
protocol=protocol_map[options.protocol],
will=will,
Expand All @@ -76,28 +102,80 @@ def __init__(self, options: MQTTClientOptions):

@_map_exception
async def connect(self, timeout: int = 10) -> None:
"""
Connects to the client asynchronously.
Args:
timeout (int): The timeout value in seconds (default: 10).
Returns:
None: This function does not return anything.
"""
await self._client.connect(timeout=timeout)

@_map_exception
async def disconnect(self) -> None:
"""
This function is an asynchronous method that handles the disconnection of the client.
Parameters:
self: The current instance of the class.
Returns:
None
"""
try:
await self._client.disconnect()
except TimeoutError:
await self._client.force_disconnect()

@_map_exception
async def subscribe(self, topics: List[Tuple[str, int]]) -> None:
"""
Subscribe to the given list of topics.
Args:
topics (List[Tuple[str, int]]): A list of tuples representing the topics to subscribe to.
Each tuple should contain a string representing the topic name and an integer representing the QoS level.
Returns:
None: This function does not return anything.
Raises:
Exception: If there is an error while subscribing to the topics.
"""
await self._client.subscribe(topics)

@_map_exception
async def publish(self, msg: MQTTMessageSend) -> None:
"""
Publishes an MQTT message to the specified topic.
Args:
msg (MQTTMessageSend): The MQTT message to be published.
Returns:
None: This function does not return anything.
"""
await self._client.publish(
topic=msg.topic, payload=msg.payload, qos=msg.qos, retain=msg.retain
)

def _on_message(
self, client: paho.Client, userdata: Any, msg: paho.MQTTMessage
) -> None:
"""
Callback function that is called when a message is received through MQTT.
Args:
client (paho.Client): The MQTT client instance.
userdata (Any): The user data associated with the client.
msg (paho.MQTTMessage): The received MQTT message.
Returns:
None: This function does not return anything.
"""
if self._message_queue is None:
_LOG.warning("Discarding MQTT message because queue is not initialised")
return
Expand All @@ -111,6 +189,12 @@ def _on_message(

@property
def message_queue(self) -> "asyncio.Queue[MQTTMessage]":
"""
Returns the message queue for receiving MQTT messages.
:return: The message queue for receiving MQTT messages.
:rtype: asyncio.Queue[MQTTMessage]
"""
if self._message_queue is None:
self._message_queue = asyncio.Queue(self._options.message_queue_size)
# pylint: disable=protected-access
Expand Down
Loading

0 comments on commit f62bbfc

Please sign in to comment.