Subscriber hangs after a Redis server disconnection #144
Description
When I subscribe to events using the Redis backend, any errors raised by the Redis library seem to be swallowed by the subscriber side of broadcaster.
Steps to reproduce
Launch a Redis server (for example using Docker):
docker run -d --name broadcaster_redis_test -p 6379:6379 redis:7.2-alpine
Launch a message consumer:
import asyncio

import broadcaster


async def main() -> None:
    async with broadcaster.Broadcast("redis://:@localhost:6379") as broadcast:
        print("Subscribing")
        async with broadcast.subscribe(channel="channel") as subscriber:
            print("Iterating")
            # Once the Redis server stops, this loop keeps waiting silently.
            async for event in subscriber:  # type: ignore[union-attr]
                print("Received:", event)


if __name__ == "__main__":
    asyncio.run(main())
Launch a message publisher:
import asyncio
import itertools

import broadcaster


async def main() -> None:
    async with broadcaster.Broadcast("redis://:@localhost:6379") as broadcast:
        for i in itertools.count():
            print(f"Publishing {i}")
            await broadcast.publish(channel="channel", message=f"message {i}")
            await asyncio.sleep(1)
            print(f"Published {i}")


if __name__ == "__main__":
    asyncio.run(main())
Stop the Redis server:
docker stop broadcaster_redis_test
Observed behaviour
The publisher stops with a ConnectionError:
redis.exceptions.ConnectionError: Error Multiple exceptions: [Errno 111] Connect call failed ('::1', 6379, 0, 0), [Errno 111] Connect call failed ('127.0.0.1', 6379) connecting to localhost:6379.
The consumer just hangs as if everything were fine, except that no new messages are received. No exception reaches the caller code on the consumer side.
If the server comes back online:
docker start broadcaster_redis_test
The subscriber code stays in the same state regardless.
Even if the publisher is restarted, the subscriber never recovers: it receives no new messages and still raises no exception.
Expected behaviour
I'd expect both sides of the communication to receive an exception when a connection error happens, or when the Redis library raises any other error that the backend does not explicitly handle.
That way, if the library can't handle the network issue itself (by trying to reconnect, etc.), the caller code could at least try to do so, or handle it in some other way depending on its business logic.
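If the subscriber did raise on a dropped connection, the caller could wrap its consumption loop in a retry with backoff. The sketch below assumes that expected behaviour (it is not what broadcaster 0.3.1 does, per this issue). `consume_with_retry`, `open_subscription` and `handle` are hypothetical names, and it retries on the builtin ConnectionError by default; redis-py's redis.exceptions.ConnectionError is likely a separate class and would need to be passed in `retry_on` explicitly.

```python
from __future__ import annotations

import asyncio
from collections.abc import AsyncIterator, Callable


async def consume_with_retry(
    open_subscription: Callable[[], AsyncIterator[str]],
    handle: Callable[[str], None],
    retry_on: tuple[type[BaseException], ...] = (ConnectionError,),
    max_attempts: int = 5,
    base_delay: float = 0.5,
) -> None:
    """Re-subscribe with exponential backoff when the stream raises one of *retry_on*."""
    failures = 0
    while True:
        try:
            # Each call opens a fresh subscription (hypothetical factory).
            async for event in open_subscription():
                failures = 0  # an event arrived, so the connection works again
                handle(event)
            return  # the stream ended without an error
        except retry_on:
            failures += 1
            if failures >= max_attempts:
                raise  # give up and let the caller's business logic decide
            await asyncio.sleep(base_delay * 2 ** (failures - 1))
```

Here `open_subscription` could be an async generator that enters `broadcast.subscribe(channel="channel")` and yields each event, as in the consumer above; the retry only helps once that iterator actually raises instead of hanging.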
Comments
I think the issue is caused by two similar failing points:
broadcaster/broadcaster/_base.py, line 71 in 69cf29a:
    self._listener_task = asyncio.create_task(self._listener())
and the second one:
    self._listener = asyncio.create_task(self._pubsub_listener())
The tasks created with asyncio.create_task(...) are never observed, so nothing notices when they stop or fail.
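A minimal, self-contained sketch of why an unobserved task produces a silent hang, using only asyncio (no broadcaster, no Redis). `listener` and `consume` are hypothetical stand-ins: the background task fails with ConnectionError, and the consumer waiting on the queue is never told. The timeout exists only so the sketch terminates; without it the consumer would block forever.

```python
from __future__ import annotations

import asyncio


async def listener(queue: asyncio.Queue) -> None:
    # Hypothetical stand-in for a backend listener: deliver one message,
    # then fail the way a dropped Redis connection might.
    await queue.put("message 0")
    raise ConnectionError("connection lost")


async def consume(timeout: float) -> tuple[list[str], BaseException | None]:
    queue: asyncio.Queue = asyncio.Queue()
    received: list[str] = []
    # Fire-and-forget, like the create_task calls quoted above: nothing
    # awaits this task or checks how it finished.
    task = asyncio.create_task(listener(queue))
    try:
        while True:
            # The dead listener never puts anything else on the queue,
            # so after the first message this only ends via the timeout.
            received.append(await asyncio.wait_for(queue.get(), timeout))
    except asyncio.TimeoutError:
        pass
    # The ConnectionError was stored on the task all along; it only
    # surfaces if someone explicitly asks for it.
    return received, task.exception()


if __name__ == "__main__":
    print(asyncio.run(consume(timeout=0.5)))
```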
Using add_done_callback in both places, I was able to pass the exception from the backend to the Broadcaster class, and then re-raise it from there.
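A rough sketch of the add_done_callback approach, under a simplified design that is not broadcaster's actual code: `Hub`, `_ListenerFailed`, `deliver` and `subscribe` are hypothetical names standing in for Broadcast plus its backend. When the listener task finishes, the callback records the outcome and pushes it into every subscriber queue, and the subscriber's iterator re-raises it instead of waiting forever.

```python
from __future__ import annotations

import asyncio
from collections.abc import AsyncIterator, Awaitable, Callable


class _ListenerFailed:
    """Queue item that carries the listener's exception to subscribers."""

    def __init__(self, exc: BaseException) -> None:
        self.exc = exc


class Hub:
    """Hypothetical stand-in for Broadcast + backend; create it inside a running loop."""

    def __init__(self, listener: Callable[[Hub], Awaitable[None]]) -> None:
        self._queues: set[asyncio.Queue] = set()
        self._failure: BaseException | None = None
        self._listener_task = asyncio.create_task(listener(self))
        # Observe the task: this runs as soon as the listener stops or fails.
        self._listener_task.add_done_callback(self._on_listener_done)

    def deliver(self, message: str) -> None:
        for queue in self._queues:
            queue.put_nowait(message)

    def _on_listener_done(self, task: asyncio.Task) -> None:
        if task.cancelled():
            self._failure = RuntimeError("listener was cancelled")
        else:
            self._failure = task.exception() or RuntimeError("listener stopped")
        for queue in self._queues:
            queue.put_nowait(_ListenerFailed(self._failure))

    async def subscribe(self) -> AsyncIterator[str]:
        if self._failure is not None:
            raise self._failure  # late subscribers fail fast instead of hanging
        queue: asyncio.Queue = asyncio.Queue()
        self._queues.add(queue)
        try:
            while True:
                item = await queue.get()
                if isinstance(item, _ListenerFailed):
                    raise item.exc  # re-raised inside the consumer's own task
                yield item
        finally:
            self._queues.discard(queue)


async def demo() -> tuple[list[str], BaseException]:
    gate = asyncio.Event()

    async def flaky_listener(hub: Hub) -> None:
        await gate.wait()
        hub.deliver("message 0")
        raise ConnectionError("connection lost")

    hub = Hub(flaky_listener)
    received: list[str] = []

    async def consume() -> None:
        async for message in hub.subscribe():
            received.append(message)

    consumer = asyncio.create_task(consume())
    await asyncio.sleep(0)  # let the consumer register its queue first
    gate.set()
    try:
        await asyncio.wait_for(consumer, timeout=1)
    except ConnectionError as exc:
        return received, exc
    raise AssertionError("consumer finished without an error")


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Routing the failure through each subscriber's queue means the exception is raised exactly where the consumer is blocked in its async for loop, rather than only somewhere the consumer never looks.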
I've got half a fix ready, but it's tricky to test it.
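One possible way to make this testable without a real Redis server, sketched under assumptions: replace the backend with a fake listener that fails on demand, and bound every wait with a timeout so a regression shows up as a failed assertion instead of a hung test run. `wait_or_fail`, `fake_listener`, `error_reaches_consumer` and `hang_is_detected` are hypothetical helpers, not part of broadcaster, and the done callback here only stands in for the real fix.

```python
from __future__ import annotations

import asyncio
from typing import Any, Awaitable


async def wait_or_fail(awaitable: Awaitable[Any], timeout: float = 1.0) -> Any:
    """Await *awaitable*, turning an indefinite hang into a test failure."""
    try:
        return await asyncio.wait_for(awaitable, timeout)
    except asyncio.TimeoutError:
        raise AssertionError(f"no result after {timeout}s; subscriber hung?") from None


async def fake_listener() -> None:
    # Hypothetical fake backend listener: simulates a dropped connection
    # without needing a Redis server or Docker.
    await asyncio.sleep(0)
    raise ConnectionError("simulated disconnect")


async def error_reaches_consumer() -> BaseException | None:
    errors: asyncio.Queue = asyncio.Queue()
    task = asyncio.create_task(fake_listener())
    # Stand-in for the proposed fix: forward the task's outcome to the consumer.
    task.add_done_callback(lambda t: errors.put_nowait(t.exception()))
    return await wait_or_fail(errors.get())


async def hang_is_detected() -> bool:
    never_filled: asyncio.Queue = asyncio.Queue()
    try:
        await wait_or_fail(never_filled.get(), timeout=0.1)
    except AssertionError:
        return True
    return False


if __name__ == "__main__":
    print(repr(asyncio.run(error_reaches_consumer())))
    print(asyncio.run(hang_is_detected()))
```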
If everything above makes sense, I can try to get it ready for a PR.
Versions used
> pip freeze | grep -i broadcaster
broadcaster==0.3.1
> python --version
Python 3.12.3