Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better shutdown cleanup behaviour #657

Merged
merged 1 commit into from
Jan 8, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 32 additions & 15 deletions gcsfs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from urllib.parse import quote as quote_urllib
from urllib.parse import urlsplit

import aiohttp
import fsspec
from fsspec import asyn
from fsspec.callbacks import NoOpCallback
Expand Down Expand Up @@ -349,27 +350,43 @@ def project(self):
# in-thread asynchronous cleanup first, then fallback to synchronous
# cleanup (which can handle cross-thread calls).
@staticmethod
def close_session(loop, session):
if loop is not None and session is not None:
def close_session(loop, session: aiohttp.ClientSession, asynchronous=False):
if session.closed:
return
force_close = False
try:
current_loop = asyncio.get_running_loop()
except RuntimeError:
current_loop = None
if loop:
# an explicit loop was set
if loop.is_running():
try:
current_loop = asyncio.get_running_loop()
current_loop.create_task(session.close())
return
except RuntimeError:
pass

try:
asyn.sync(loop, session.close, timeout=0.1)
except fsspec.FSTimeoutError:
pass
loop.create_task(session.close())
else:
pass
force_close = True
elif current_loop is not None and current_loop.is_running() and asynchronous:
# running in a concurrnet context
current_loop.create_task(session.close())
elif asyn.loop[0] is not None and asyn.loop[0].is_running():
try:
asyn.sync(asyn.loop[0], session.close, timeout=0.1)
except fsspec.FSTimeoutError:
force_close = True
else:
force_close = True
if force_close:
# during shutdown, this is the fallback
connector = getattr(session, "_connector", None)
if connector is not None:
# close after loop is dead
connector._close()

async def _set_session(self):
if self._session is None:
self._session = await get_client(**self.session_kwargs)
weakref.finalize(self, self.close_session, self.loop, self._session)
weakref.finalize(
self, self.close_session, self.loop, self._session, self.asynchronous
)
return self._session

@property
Expand Down
Loading