-
Notifications
You must be signed in to change notification settings - Fork 2
Open
Description
Currently the logic just has one client session when initializing the transport. It should tie the client session to the event loop.
The current logic causes issues for us on CI/CD because we create new event loops per test
You need to add handling that looks like this
def _get_valid_client_session(self) -> ClientSession:
"""
Helper to get a valid ClientSession for the current event loop.
This handles the case where the session was created in a different
event loop that may have been closed (common in CI/CD environments).
"""
from aiohttp.client import ClientSession
# If we don't have a client or it's not a ClientSession, create one
if not isinstance(self.client, ClientSession):
if hasattr(self, "_client_factory") and callable(self._client_factory):
self.client = self._client_factory()
else:
self.client = ClientSession()
return self.client
# Check if the existing session is still valid for the current event loop
try:
session_loop = getattr(self.client, "_loop", None)
current_loop = asyncio.get_running_loop()
# If session is from a different or closed loop, recreate it
if (
session_loop is None
or session_loop != current_loop
or session_loop.is_closed()
):
# Clean up the old session
try:
# Note: not awaiting close() here as it might be from a different loop
# The session will be garbage collected
pass
except Exception as e:
verbose_logger.debug(f"Error closing old session: {e}")
pass
# Create a new session in the current event loop
if hasattr(self, "_client_factory") and callable(self._client_factory):
self.client = self._client_factory()
else:
self.client = ClientSession()
except (RuntimeError, AttributeError):
# If we can't check the loop or session is invalid, recreate it
if hasattr(self, "_client_factory") and callable(self._client_factory):
self.client = self._client_factory()
else:
self.client = ClientSession()
return self.clientsunhailin-Leo
Metadata
Metadata
Assignees
Labels
No labels