Skip to content

Commit 80ce2dc

Browse files
committed
Incorporated Jacky's feedback
1 parent 248cdf1 commit 80ce2dc

File tree

4 files changed

+17
-40
lines changed

4 files changed

+17
-40
lines changed

src/replit_river/client_session.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ async def serve(self) -> None:
100100
await self._handle_messages_from_ws()
101101
except ConnectionClosed:
102102
if self._should_abort_streams_after_transport_failure():
103-
self._abort_all_streams()
103+
await self.close()
104104
if self._retry_connection_callback:
105105
self._task_manager.create_task(self._retry_connection_callback())
106106

@@ -109,12 +109,12 @@ async def serve(self) -> None:
109109
except FailedSendingMessageException:
110110
# Expected error if the connection is closed.
111111
if self._should_abort_streams_after_transport_failure():
112-
self._abort_all_streams()
112+
await self.close()
113113
logger.debug(
114114
"FailedSendingMessageException while serving", exc_info=True
115115
)
116116
except Exception:
117-
self._abort_all_streams()
117+
await self.close()
118118
logger.exception("caught exception at message iterator")
119119
except ExceptionGroup as eg:
120120
_, unhandled = eg.split(lambda e: isinstance(e, ConnectionClosed))

src/replit_river/server_session.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,6 @@ async def serve(self) -> None:
8787
try:
8888
await self._handle_messages_from_ws(tg)
8989
except ConnectionClosed:
90-
if self._should_abort_streams_after_transport_failure():
91-
self._abort_all_streams()
9290
if self._retry_connection_callback:
9391
self._task_manager.create_task(
9492
self._retry_connection_callback()
@@ -98,13 +96,10 @@ async def serve(self) -> None:
9896
logger.debug("ConnectionClosed while serving", exc_info=True)
9997
except FailedSendingMessageException:
10098
# Expected error if the connection is closed.
101-
if self._should_abort_streams_after_transport_failure():
102-
self._abort_all_streams()
10399
logger.debug(
104100
"FailedSendingMessageException while serving", exc_info=True
105101
)
106102
except Exception:
107-
self._abort_all_streams()
108103
logger.exception("caught exception at message iterator")
109104
except ExceptionGroup as eg:
110105
_, unhandled = eg.split(lambda e: isinstance(e, ConnectionClosed))

src/replit_river/server_transport.py

Lines changed: 13 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -153,35 +153,18 @@ async def _get_or_create_session(
153153
close_session_callback=self._delete_session,
154154
)
155155
else:
156-
if not await old_session.is_session_open():
157-
logger.info(
158-
'Session "%s" is not active, creating replacement '
159-
"session %s instead of reusing",
160-
to_id,
161-
session_id,
162-
)
163-
new_session = ServerSession(
164-
transport_id,
165-
to_id,
166-
session_id,
167-
websocket,
168-
self._transport_options,
169-
self._handlers,
170-
close_session_callback=self._delete_session,
171-
)
172-
else:
173-
# If the instance id is the same, we reuse the session and assign
174-
# a new websocket to it.
175-
logger.debug(
176-
'Reuse old session with "%s" using new ws: %s',
177-
to_id,
178-
websocket.id,
179-
)
180-
try:
181-
await old_session.replace_with_new_websocket(websocket)
182-
new_session = old_session
183-
except FailedSendingMessageException as e:
184-
raise e
156+
# If the instance id is the same, we reuse the session and assign
157+
# a new websocket to it.
158+
logger.debug(
159+
'Reuse old session with "%s" using new ws: %s',
160+
to_id,
161+
websocket.id,
162+
)
163+
try:
164+
await old_session.replace_with_new_websocket(websocket)
165+
new_session = old_session
166+
except FailedSendingMessageException as e:
167+
raise e
185168

186169
self._sessions[new_session._to_id] = new_session
187170

@@ -328,6 +311,5 @@ async def _establish_handshake(
328311

329312
async def _delete_session(self, session: Session) -> None:
330313
async with self._session_lock:
331-
existing_session = self._sessions.get(session._to_id)
332-
if existing_session is session:
314+
if session._to_id in self._sessions:
333315
del self._sessions[session._to_id]

src/replit_river/session.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,7 @@ async def close_websocket(
287287
self._task_manager.create_task(self._retry_connection_callback())
288288

289289
def _should_abort_streams_after_transport_failure(self) -> bool:
290-
return self._retry_connection_callback is None
290+
return not self._transport_options.transparent_reconnect
291291

292292
def _abort_all_streams(self) -> None:
293293
"""Close all active stream channels, notifying any waiting consumers."""

0 commit comments

Comments
 (0)