-
Notifications
You must be signed in to change notification settings - Fork 18.5k
fix: add exception handling to Redis subscription listener for resource cleanup #28542
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -62,46 +62,72 @@ def _listen(self) -> None: | |
| """Main listener loop for processing messages.""" | ||
| pubsub = self._pubsub | ||
| assert pubsub is not None, "PubSub should not be None while starting listening." | ||
| while not self._closed.is_set(): | ||
| raw_message = self._get_message() | ||
| try: | ||
| while not self._closed.is_set(): | ||
| raw_message = self._get_message() | ||
|
|
||
| if raw_message is None: | ||
| continue | ||
| if raw_message is None: | ||
| continue | ||
|
|
||
| if raw_message.get("type") != self._get_message_type(): | ||
| continue | ||
| if raw_message.get("type") != self._get_message_type(): | ||
| continue | ||
|
|
||
| channel_field = raw_message.get("channel") | ||
| if isinstance(channel_field, bytes): | ||
| channel_name = channel_field.decode("utf-8") | ||
| elif isinstance(channel_field, str): | ||
| channel_name = channel_field | ||
| else: | ||
| channel_name = str(channel_field) | ||
| channel_field = raw_message.get("channel") | ||
| if isinstance(channel_field, bytes): | ||
| channel_name = channel_field.decode("utf-8") | ||
| elif isinstance(channel_field, str): | ||
| channel_name = channel_field | ||
| else: | ||
| channel_name = str(channel_field) | ||
|
|
||
| if channel_name != self._topic: | ||
| _logger.warning( | ||
| "Ignoring %s message from unexpected channel %s", self._get_subscription_type(), channel_name | ||
| ) | ||
| continue | ||
|
|
||
| payload_bytes: bytes | None = raw_message.get("data") | ||
| if not isinstance(payload_bytes, bytes): | ||
| _logger.error( | ||
| "Received invalid data from %s channel %s, type=%s", | ||
| self._get_subscription_type(), | ||
| self._topic, | ||
| type(payload_bytes), | ||
| ) | ||
| continue | ||
|
|
||
| if channel_name != self._topic: | ||
| self._enqueue_message(payload_bytes) | ||
| except Exception: | ||
| _logger.exception( | ||
| "Unexpected error in %s listener thread for channel %s", | ||
| self._get_subscription_type(), | ||
| self._topic, | ||
| ) | ||
| finally: | ||
| _logger.debug( | ||
| "%s listener thread stopped for channel %s", self._get_subscription_type().title(), self._topic | ||
| ) | ||
| try: | ||
| self._unsubscribe() | ||
| except Exception as e: | ||
| _logger.warning( | ||
| "Ignoring %s message from unexpected channel %s", self._get_subscription_type(), channel_name | ||
| "Error unsubscribing from %s channel %s: %s", | ||
| self._get_subscription_type(), | ||
| self._topic, | ||
| e, | ||
| ) | ||
| continue | ||
|
|
||
| payload_bytes: bytes | None = raw_message.get("data") | ||
| if not isinstance(payload_bytes, bytes): | ||
| _logger.error( | ||
| "Received invalid data from %s channel %s, type=%s", | ||
| try: | ||
| pubsub.close() | ||
| _logger.debug("%s PubSub closed for topic %s", self._get_subscription_type().title(), self._topic) | ||
| except Exception as e: | ||
| _logger.warning( | ||
| "Error closing PubSub for %s channel %s: %s", | ||
| self._get_subscription_type(), | ||
| self._topic, | ||
| type(payload_bytes), | ||
| e, | ||
| ) | ||
| continue | ||
|
|
||
| self._enqueue_message(payload_bytes) | ||
|
|
||
| _logger.debug("%s listener thread stopped for channel %s", self._get_subscription_type().title(), self._topic) | ||
| self._unsubscribe() | ||
| pubsub.close() | ||
| _logger.debug("%s PubSub closed for topic %s", self._get_subscription_type().title(), self._topic) | ||
| self._pubsub = None | ||
| finally: | ||
| self._pubsub = None | ||
|
Comment on lines
+119
to
+130
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The nested try:
pubsub.close()
_logger.debug("%s PubSub closed for topic %s", self._get_subscription_type().title(), self._topic)
except Exception as e:
_logger.warning(
"Error closing PubSub for %s channel %s: %s",
self._get_subscription_type(),
self._topic,
e,
)
self._pubsub = None |
||
|
|
||
| def _enqueue_message(self, payload: bytes) -> None: | ||
| """Enqueue a message to the internal queue with dropping behavior.""" | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Inconsistent terminology in log message. This message uses "for topic %s" while all other log messages in this file use "channel" (e.g., lines 108, 115, 125: "for channel %s").
Consider changing to:
This would make the terminology consistent throughout the file.