-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Background tasks for MQTT #120
base: main
Are you sure you want to change the base?
Conversation
…hed messages to MQTT
@coderabbitai review |
✅ Actions performedReview triggered.
|
WalkthroughThe changes introduced in this pull request primarily enhance the handling of MQTT messages and implement asynchronous operations across the Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant Server
participant MQTTQueue
participant MongoDB
Client->>Server: Request to create aggregate
Server->>MQTTQueue: Put new aggregate message
alt Queue full
MQTTQueue-->>Server: Raise QueueFull exception
Server->>MongoDB: Attempt to save metadata
alt Save successful
Server-->>Client: Success response
else Save failed
MongoDB-->>Server: Raise exception
Server->>S3: Delete object
Server-->>Client: HTTP 500 error
end
else Queue not full
Server-->>Client: Success response
end
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
🧹 Outside diff range and nitpick comments (2)
aggrec/settings.py (1)
28-28
: LGTM! Consider adding field documentation.The addition of
queue_size
aligns well with the PR's objective of implementing background MQTT message processing.Consider adding documentation to explain the field's purpose and implications:
- queue_size: int = Field(default=1024) + queue_size: int = Field( + default=1024, + description="Maximum number of MQTT messages that can be queued for processing. When queue is full, new messages will be dropped." + )aggrec/server.py (1)
88-111
: Enhance exception handling inmqtt_publisher
While
MqttError
andCancelledError
are handled, consider catching other potential exceptions to prevent the task from terminating unexpectedly. Including a general exception handler will improve the robustness of the MQTT publisher task.Apply this diff to add a general exception handler:
except MqttError as exc: _logger.error("MQTT error: %s", str(exc)) +except Exception as exc: + _logger.exception("Unhandled exception in MQTT publisher: %s", str(exc)) except asyncio.exceptions.CancelledError: _logger.debug("MQTT publish task cancelled") return
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (3)
aggrec/aggregates.py
(2 hunks)aggrec/server.py
(5 hunks)aggrec/settings.py
(1 hunks)
🔇 Additional comments (4)
aggrec/server.py (3)
35-35
: Confirm the use of lifespan
parameter in super().__init__
The inclusion of lifespan=self.lifespan
in the super().__init__
call modifies the application lifecycle management. Ensure that the lifespan
method is correctly defined and that this change is compatible with FastAPI's expected usage to prevent unexpected behavior.
51-51
: Verify MQTT queue size configuration
The mqtt_new_aggregate_messages
queue is initialized with a maximum size defined by self.settings.mqtt.queue_size
. Confirm that this queue size is adequately configured to handle the expected message load without causing QueueFull
exceptions, and adjust if necessary.
152-153
: Initialize application with updated settings handling
The AggrecServer
is now instantiated directly with Settings()
without using a factory method. Verify that any previous configurations reliant on the factory method are appropriately adapted to this new initialization approach.
aggrec/aggregates.py (1)
1-1
: Addition of import asyncio
module is appropriate
The asyncio
module is imported to support the asynchronous operations introduced in this file. This is necessary and correctly implemented.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (1)
aggrec/server.py (1)
53-63
: Consider adding connection timeout and error handlingWhile the MongoDB connection logic is sound, it lacks timeout configuration and error handling for connection failures.
def connect_mongodb(self): if mongodb_host := str(self.settings.mongodb.server): params = {"host": mongodb_host} + params["serverSelectionTimeoutMS"] = 5000 # 5 second timeout if "host" in params and params["host"].startswith("mongomock://"): import mongomock params["host"] = params["host"].replace("mongomock://", "mongodb://") params["mongo_client_class"] = mongomock.MongoClient logger.info("Connecting to MongoDB %s", params) - mongoengine.connect(**params, tz_aware=True) + try: + mongoengine.connect(**params, tz_aware=True) + logger.info("MongoDB connected") + except mongoengine.connection.MongoEngineConnectionError as e: + logger.error("Failed to connect to MongoDB: %s", str(e)) + raise - logger.info("MongoDB connected")
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (2)
aggrec/aggregates.py
(3 hunks)aggrec/server.py
(5 hunks)
🔇 Additional comments (6)
aggrec/server.py (4)
2-4
: LGTM: Import changes align with new async functionality
The new imports support the async MQTT handling and error management. The tracer setup is appropriately placed at the module level.
Also applies to: 11-11, 28-29
112-125
: LGTM: Well-implemented lifecycle management
The lifespan implementation properly manages application lifecycle, including graceful startup and shutdown of background tasks.
153-154
: LGTM: Simplified server initialization
The direct initialization with settings is cleaner than the previous factory method approach.
51-51
: Verify queue size configuration in settings
The queue size is obtained from settings.mqtt.queue_size
. Ensure this configuration exists and has appropriate bounds.
✅ Verification successful
Queue size configuration is properly defined with default value
The queue_size
configuration is correctly defined in MqttSettings
class with a default value of 1024 using Pydantic's Field
. The configuration is properly typed as int
and uses Pydantic's validation system.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for queue_size definition in settings
rg -A 5 "queue_size" --type python
# Search for any validation on queue_size
ast-grep --pattern 'class MqttSettings:
$$$
queue_size: $_
$$$'
Length of output: 137
Script:
#!/bin/bash
# Search for queue_size definition in settings using ripgrep
rg -l "queue_size"
# Search for MqttSettings class definition
rg -A 10 "class MqttSettings"
# Try finding settings files
fd "settings.py|config.py"
Length of output: 736
aggrec/aggregates.py (2)
46-49
: LGTM! Well-structured metric for monitoring queue health
The new OpenTelemetry counter follows best practices with a clear name and description, providing valuable insights into queue overflow incidents.
346-351
:
Critical: Implement robust message handling to prevent data loss
The current implementation drops messages when the queue is full, which could lead to data inconsistency. While tracking dropped messages is good, preventing message loss should be prioritized given the PR's objective of improving robustness.
Consider these approaches in order of preference:
- Implement backpressure mechanism:
- request.app.mqtt_new_aggregate_messages.put_nowait(
- json.dumps(get_new_aggregate_event_message(metadata, request.app.settings))
- )
-except asyncio.QueueFull:
- aggregates_mqtt_queue_drops.add(1)
- logger.warning("MQTT queue full, message dropped")
+ message = json.dumps(get_new_aggregate_event_message(metadata, request.app.settings))
+ try:
+ # Try non-blocking put first
+ request.app.mqtt_new_aggregate_messages.put_nowait(message)
+ except asyncio.QueueFull:
+ # If queue is full, wait with timeout
+ try:
+ await asyncio.wait_for(
+ request.app.mqtt_new_aggregate_messages.put(message),
+ timeout=2.0 # Adjust timeout as needed
+ )
+ except asyncio.TimeoutError:
+ aggregates_mqtt_queue_drops.add(1)
+ logger.error("MQTT queue full, message dropped after timeout")
+ # Consider raising HTTP 503 Service Unavailable here
+ raise HTTPException(
+ status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
+ detail="Service temporarily unable to handle request"
+ )
- Use blocking put with timeout (simpler alternative):
- request.app.mqtt_new_aggregate_messages.put_nowait(
- json.dumps(get_new_aggregate_event_message(metadata, request.app.settings))
- )
-except asyncio.QueueFull:
- aggregates_mqtt_queue_drops.add(1)
- logger.warning("MQTT queue full, message dropped")
+ try:
+ await asyncio.wait_for(
+ request.app.mqtt_new_aggregate_messages.put(
+ json.dumps(get_new_aggregate_event_message(metadata, request.app.settings))
+ ),
+ timeout=2.0
+ )
+ except asyncio.TimeoutError:
+ aggregates_mqtt_queue_drops.add(1)
+ logger.error("MQTT queue full, message dropped after timeout")
+ raise HTTPException(
+ status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
+ detail="Service temporarily unable to handle request"
+ )
Both approaches:
- Prevent immediate message drops
- Provide time for queue to process messages
- Signal upstream systems when service is overwhelmed
- Allow for monitoring via the existing counter
Enqueue all MQTT message to queue and process in the background (log error in case the queue is full). Also start up the application as part of the lifespan handler.
This creates a single MQTT client for the application, no more reconnected at every request.
Summary by CodeRabbit
New Features
queue_size
for MQTT settings, enhancing message processing capabilities.Bug Fixes
Refactor