error using fsspec and flask-socketio #1701

Open
rob-ashdown-monolith opened this issue Oct 2, 2024 · 2 comments

rob-ashdown-monolith commented Oct 2, 2024

I am trying to incorporate calls to fsspec-based packages (adlfs, s3fs) into a (sync) Flask view.
Flask is running under flask-socketio, which in turn uses gevent.
Whenever I try to call a filesystem method (e.g. ls), it raises the following error:
NotImplementedError: Calling sync() from within a running loop

I can bypass the issue by telling flask-socketio to use threading instead of gevent (and removing the monkey patching). However, that mode uses the werkzeug server, which is dev only, so it can't be used in production.

minimal working example:

server.py

from gevent import monkey

monkey.patch_all()

import os

from adlfs import AzureBlobFileSystem
from azure.identity.aio import DefaultAzureCredential
from flask import Flask
from flask_socketio import SocketIO


def create_app():
    app = Flask(__name__)

    @app.route("/")
    def hello_world():
        return "<p>Hello, World!</p>"

    @app.route("/ls")
    def listing():
        credentials = DefaultAzureCredential()
        account_name = os.environ.get("AZURE_ACCOUNT_NAME")
        azure_path = os.environ.get("AZURE_PATH")
        new_fs = AzureBlobFileSystem(account_name=account_name, credential=credentials)
        res = new_fs.ls(azure_path)
        list_str = [f"<p>{l}</p>" for l in res]
        return f"<p>Hello, Listing!</p>{''.join(list_str)}"

    return app

Run with:

python server.py
curl 127.0.0.1:5000/ls

results in

[2024-10-02 16:57:35,406] ERROR in app: Exception on /ls [GET]
Traceback (most recent call last):
  File "<path>/site-packages/flask/app.py", line 2073, in wsgi_app
    response = self.full_dispatch_request()
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<path>/site-packages/flask/app.py", line 1519, in full_dispatch_request
    rv = self.handle_user_exception(e)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<path>/site-packages/flask/app.py", line 1517, in full_dispatch_request
    rv = self.dispatch_request()
         ^^^^^^^^^^^^^^^^^^^^^^^
  File "<path>/site-packages/flask/app.py", line 1503, in dispatch_request
    return self.ensure_sync(self.view_functions[rule.endpoint])(**req.view_args)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "server.py", line 26, in listing
    res = new_fs.ls(azure_path)
          ^^^^^^^^^^^^^^^^^^^^^
  File "<path>/site-packages/fsspec/asyn.py", line 118, in wrapper
    return sync(self.loop, func, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<path>/site-packages/fsspec/asyn.py", line 80, in sync
    raise NotImplementedError("Calling sync() from within a running loop")
NotImplementedError: Calling sync() from within a running loop

requirements.txt

adlfs==2024.7.0
Flask==2.1.3
Flask-SocketIO==5.3.6
fsspec==2024.6.1
gevent==24.2.1
azure-identity==1.17.1
werkzeug==2.2.2

Is there a known workaround for this? I have tried various alternatives, to no avail.
e.g.

  • passing a loop to the filesystem init, then calling _ls
  • calling asyncio.run(fs.ls)

martindurant (Member) commented

When you do patch_all, a number of the low-level things within asyncio change for the whole session. In addition, it appears that although your code is itself sync, the flask runtime is still running an event loop to handle requests.

I'm not sure how these things interact. The specific error should only happen if fsspec is sharing the same loop as the calling code, so you might want to check what threads are active at the time (threading.enumerate()). In the end, you might have no choice but to make your handlers async and run fsspec calls in async mode.

    @app.route("/ls")
    async def listing():
        ...
        res = await new_fs._ls(azure_path)
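
To illustrate the condition described above (the error appearing only when fsspec shares its loop with the calling code), here is a simplified, stdlib-only sketch. The names `sync_sketch`, `fake_ls`, `start_background_loop`, `call_from_plain_thread`, and `call_from_inside_loop` are hypothetical and exist only for illustration; this paraphrases the guard visible in the traceback and is not fsspec's actual source.

```python
import asyncio
import threading


def sync_sketch(loop, func, *args, **kwargs):
    """Run coroutine function `func` on `loop` from synchronous code.

    Hypothetical, simplified stand-in for the guard seen in the traceback;
    not fsspec's actual implementation.
    """
    try:
        running = asyncio.get_running_loop()
    except RuntimeError:
        running = None  # no loop is running in the calling thread
    if running is loop:
        # Blocking here would stall the very loop that has to run `func`.
        raise NotImplementedError("Calling sync() from within a running loop")
    future = asyncio.run_coroutine_threadsafe(func(*args, **kwargs), loop)
    return future.result()


async def fake_ls(path):
    """Stand-in for an async filesystem call such as `_ls`."""
    await asyncio.sleep(0)
    return [f"{path}/a", f"{path}/b"]


def start_background_loop():
    """Start a dedicated event loop in a daemon thread and return it."""
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    return loop


def call_from_plain_thread():
    """The caller has no running loop, so the wrapper can block safely."""
    loop = start_background_loop()
    try:
        return sync_sketch(loop, fake_ls, "container")
    finally:
        loop.call_soon_threadsafe(loop.stop)


def call_from_inside_loop():
    """The caller is a coroutine on the same loop, so the guard raises."""

    async def handler():
        return sync_sketch(asyncio.get_running_loop(), fake_ls, "container")

    return asyncio.run(handler())
```

Called from a plain thread, the wrapper hands the coroutine to the background loop and blocks until the result arrives. Called from a coroutine running on that same loop, it raises, because blocking there would stall the loop that has to do the work.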

rob-ashdown-monolith (Author) commented

Thanks Martin

I'll see what I can find out via threading.enumerate().
Unfortunately I don't think I'll be able to make all the views async, since that will have far-reaching repercussions on the codebase, which I was hoping to avoid at the moment.
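
For the `threading.enumerate()` check, a small diagnostic along these lines could be called at the top of the failing view. This is only a sketch: `describe_runtime` and `shares_loop` are hypothetical helper names, and the only fsspec detail assumed is the `loop` attribute that the traceback shows being passed to `sync()`.

```python
import asyncio
import threading


def _running_loop_or_none():
    """Return the asyncio loop running in this thread, or None."""
    try:
        return asyncio.get_running_loop()
    except RuntimeError:
        return None


def describe_runtime():
    """Collect the active thread names and the loop running in this thread."""
    loop = _running_loop_or_none()
    return {
        "current_thread": threading.current_thread().name,
        "threads": [t.name for t in threading.enumerate()],
        "running_loop": repr(loop) if loop is not None else None,
    }


def shares_loop(fs_loop):
    """True if `fs_loop` is the loop already running in the calling thread.

    `fs_loop` is assumed to be the filesystem's loop, e.g. `new_fs.loop`.
    """
    return _running_loop_or_none() is fs_loop
```

Inside the view, logging `describe_runtime()` and `shares_loop(new_fs.loop)` just before `new_fs.ls(azure_path)` should show whether the request is being served from a thread whose running loop is the same one fsspec intends to use.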
