
Conversation

@gwbischof (Contributor) commented Nov 7, 2025

Fixes #1198
CI failures don't look related to this PR.

@danielballan (Member) left a comment

Good coverage!

@gwbischof gwbischof marked this pull request as ready for review November 12, 2025 18:42
if data is None:
    self.stream_closed.process(self)
    self._disconnect()
for attempt in self._websocket_retry_context():
@danielballan (Member) commented Nov 12, 2025

I see another structural issue here. We're using a single "retry context" for the entire lifecycle of the subscription. Imagine that we run for days and get disconnected once a day. We want a fresh "retry context" each time we get disconnected.

There are three loops:

  1. Loop forever (until self._disconnect_event.is_set()) where "disconnect" here refers to the Subscription, not to a particular websocket.
  2. Loop over retries when disconnected, and give up if retries are exhausted.
  3. Loop over polling recv() calls.

Here is a sketch that introduces one new method, _run, for the outer loop.

def _run(self):
    "This runs once for the lifecycle of the Subscription."
    while not self._disconnect_event.is_set():
        self._connect()
        try:
            self._receive()
        except (websockets.exceptions.ConnectionClosedError, OSError):
            logger.debug("Disconnected! Will attempt to reconnect")
            continue  # reconnect

def _receive(self):
    "Receive and process websocket messages."
    while not self._disconnect_event.is_set():
        try:
            data = self._websocket.recv(timeout=RECEIVE_TIMEOUT)
        except (TimeoutError, anyio.EndOfStream):
            continue
        ...

def _connect(self):
    try:
        for attempt in stamina.retry_context(
            on=(
                websockets.exceptions.ConnectionClosedError,
                OSError,
            ),
            ...
        ):
            with attempt:
                ...
    except (websockets.exceptions.ConnectionClosedError, OSError):
        # stamina re-raises the last exception once attempts are exhausted.
        logger.warning("exhausted attempts...")
        # This will break the _run loop.
        self._disconnect()

@gwbischof (Contributor, Author) commented

Isn't this the same as setting the _connect retries to unlimited?

@danielballan (Member) commented

No. The difference is, once you get a successful connection, the retry counter restarts fresh the next time the connection drops. But if you use up your N retries with no success, it (correctly) gives up.

@gwbischof (Contributor, Author) commented

Ohh, so for something like the end-of-run consumer we set the retries to unlimited? But for a typical client it retries N times and gives up if it doesn't connect? (And if it's able to connect, the number of retries gets reset?)

@danielballan (Member) commented

Based on my reading, I am coming to appreciate that unlimited retries is basically never the right solution. If something is persistently down, eventually the client should give up, and manual recovery becomes a feature rather than a bug. For a passive service like the end-of-run consumer, the retries might be more generous than for a client running in an interactive session like a Jupyter notebook, but still finite.

> And if it's able to connect, the number of retries gets reset?

Yes. The feature of the two loops (well, three loops if you count the recv loop) is riding out an unlimited number of dropped-connection events, but a limited number of re-connection attempts per dropped-connection event.

A service running for a very long time may see an unlimited number of disconnection events, and that's fine. But it should not hammer a distressed service indefinitely before giving up.
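This policy can be sketched in plain Python (a hypothetical simulation, not tiled code; `reconnect_with_budget`, `run`, and `MAX_RETRIES` are illustrative names): each disconnection event gets a fresh, finite attempt budget, and the client gives up only when a single event exhausts its budget.

```python
# Hypothetical sketch of the retry policy discussed above: unlimited
# disconnection events, but a fresh, finite retry budget per event.
MAX_RETRIES = 3  # illustrative; not tiled's actual default


def reconnect_with_budget(connect, max_retries=MAX_RETRIES):
    """Call `connect` up to `max_retries` times for ONE disconnection event.

    Returns True as soon as a call succeeds, False if the budget is used up.
    """
    for _ in range(max_retries):
        try:
            connect()
            return True
        except ConnectionError:
            continue  # retry within this event's budget
    return False


def run(events):
    """Handle a sequence of disconnection events, one budget each.

    A successful reconnection carries no debt into the next event: the
    counter restarts fresh. Exhausting one event's budget ends the run,
    analogous to calling self._disconnect() in the sketch above.
    """
    results = []
    for connect in events:
        ok = reconnect_with_budget(connect)
        results.append(ok)
        if not ok:
            break  # give up for good; manual recovery from here
    return results
```

An event whose `connect` fails twice still succeeds on the third attempt; an event that fails three times exhausts the budget and stops the run, no matter how many earlier events succeeded.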

@pytest.fixture(autouse=True)
def fast_retries(monkeypatch):
    """Set retry attempts to 2 for faster tests (down from default 10)."""
    monkeypatch.setattr("tiled.client.stream.TILED_RETRY_ATTEMPTS", 2)
@gwbischof (Contributor, Author) commented

oh nice :)
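For context, the autouse fixture quoted above relies on pytest's monkeypatch, which sets a module attribute for the duration of each test and restores the original afterward. Conceptually it is equivalent to this hand-rolled context manager (a sketch; the `stream` module and the default of 10 here are stand-ins, not tiled's actual values):

```python
import contextlib
import types

# Stand-in module mimicking tiled.client.stream with a retry constant.
stream = types.ModuleType("stream")
stream.TILED_RETRY_ATTEMPTS = 10  # placeholder default, for illustration


@contextlib.contextmanager
def patched_attr(obj, name, value):
    """Roughly what monkeypatch.setattr does: swap the attribute in,
    then restore the original value on exit, even if the test fails."""
    original = getattr(obj, name)
    setattr(obj, name, value)
    try:
        yield
    finally:
        setattr(obj, name, original)
```

Inside `with patched_attr(stream, "TILED_RETRY_ATTEMPTS", 2):` the tests see 2 attempts and run quickly; outside it, the default is untouched. pytest's `autouse=True` simply applies this to every test in the module without an explicit fixture argument.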

self._disconnect_event = threading.Event()
self._thread = None
self._last_received_sequence = None  # Track last sequence for reconnection
self._connected = False  # Track connection state
@danielballan (Member) commented
Under the general rule of "state is the root of all evil" I wonder if we can avoid adding this state. I think that the control flow ensures that self._connect(...) only gets called (1) when self._run(...) starts and (2) after a connection error is raised. So the if self._connected: return cut-out is protecting a codepath that, in fact, you can never hit.



Successfully merging this pull request may close: Implement automatic reconnection for WS.

2 participants