File tree Expand file tree Collapse file tree 1 file changed +21
-0
lines changed Expand file tree Collapse file tree 1 file changed +21
-0
lines changed Original file line number Diff line number Diff line change @@ -158,7 +158,28 @@ async def _record_latency(self, websocket):
158158 self .timestamp = datetime .now ()
159159 self .subscription_ping_latency = websocket .latency
160160
161+ async def monitor_heads_received (self , websocket ):
162+ """Monitors the heads received (messages) from the websocket.
163+ If no heads have been received in while the websocket closed
164+ so a new connection can be created"""
165+ while True :
166+ idle_timeout = 60
167+ prev_heads_received_count = self .heads_received
168+ await asyncio .sleep (idle_timeout )
169+ if websocket .closed :
170+ break
171+ if prev_heads_received_count == self .heads_received :
172+ self ._logger .error (
173+ "Websocket has not received new message within timeout, closing connection..." ,
174+ timeout = idle_timeout ,
175+ ** self ._logger_metadata )
176+ await websocket .close (code = 4000 ,
177+ reason = f'No new messages within { idle_timeout } seconds' )
178+ break
179+
161180 async def _process_message (self , websocket ):
181+ asyncio .create_task (
182+ self .monitor_heads_received (websocket ))
162183 async for msg in websocket :
163184 await self ._record_latency (websocket )
164185 if msg is not None :
You can’t perform that action at this time.
0 commit comments