diff --git a/gcsfs/core.py b/gcsfs/core.py index a78ea5f5..6868f9da 100644 --- a/gcsfs/core.py +++ b/gcsfs/core.py @@ -17,6 +17,7 @@ from urllib.parse import quote as quote_urllib from urllib.parse import urlsplit +import aiohttp import fsspec from fsspec import asyn from fsspec.callbacks import NoOpCallback @@ -351,27 +352,43 @@ def project(self): # in-thread asynchronous cleanup first, then fallback to synchronous # cleanup (which can handle cross-thread calls). @staticmethod - def close_session(loop, session): - if loop is not None and session is not None: + def close_session(loop, session: aiohttp.ClientSession, asynchronous=False): + if session.closed: + return + force_close = False + try: + current_loop = asyncio.get_running_loop() + except RuntimeError: + current_loop = None + if loop: + # an explicit loop was set if loop.is_running(): - try: - current_loop = asyncio.get_running_loop() - current_loop.create_task(session.close()) - return - except RuntimeError: - pass - - try: - asyn.sync(loop, session.close, timeout=0.1) - except fsspec.FSTimeoutError: - pass + loop.create_task(session.close()) else: - pass + force_close = True + elif current_loop is not None and current_loop.is_running() and asynchronous: + # running in a concurrnet context + current_loop.create_task(session.close()) + elif asyn.loop[0] is not None and asyn.loop[0].is_running(): + try: + asyn.sync(asyn.loop[0], session.close, timeout=0.1) + except fsspec.FSTimeoutError: + force_close = True + else: + force_close = True + if force_close: + # during shutdown, this is the fallback + connector = getattr(session, "_connector", None) + if connector is not None: + # close after loop is dead + connector._close() async def _set_session(self): if self._session is None: self._session = await get_client(**self.session_kwargs) - weakref.finalize(self, self.close_session, self.loop, self._session) + weakref.finalize( + self, self.close_session, self.loop, self._session, self.asynchronous + ) return self._session @property