Description
Describe the bug
I have a Flask server that uses Gunicorn as its WSGI server. It does some HTTP request gateway routing, using the multiprocessing library under the hood. Inside one of the HTTP request handlers, I call Pypeln for a multiprocessing workload.
Everything works if Pypeln finishes successfully. However, if the Pypeln pipeline raises an exception, the exception is raised, but instead of propagating out of `main_queue` the call hangs and never gets past `pl.thread.run(stage)`.
Note: it's the combination of Gunicorn and Pypeln that triggers this bug. When I swapped Gunicorn for another WSGI server, Waitress, Pypeln reported the error just fine. Likewise, when I removed the WSGI server altogether and simply used vanilla Flask, there was no issue.
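Since the hang only appears under Gunicorn, one way to narrow it down is to log a few process- and thread-level facts from inside the request handler under Gunicorn, Waitress, and plain Flask, and compare them. A minimal sketch using only the standard library; `describe_runtime` is a hypothetical helper, not part of Pypeln or Gunicorn:

```python
import multiprocessing
import os
import signal
import threading


def describe_runtime():
    """Hypothetical helper: collect facts that may differ between hosting setups."""
    handlers = {}
    for name in ("SIGINT", "SIGTERM", "SIGCHLD"):
        sig = getattr(signal, name, None)  # SIGCHLD does not exist on Windows
        if sig is not None:
            handlers[name] = repr(signal.getsignal(sig))
    current = threading.current_thread()
    return {
        "pid": os.getpid(),
        "ppid": os.getppid(),
        # None means no start method has been fixed yet in this process
        "start_method": multiprocessing.get_start_method(allow_none=True),
        "thread": current.name,
        "is_main_thread": current is threading.main_thread(),
        "signal_handlers": handlers,
    }
```

Printing `describe_runtime()` at the top of the handler under each server makes differences in parent process, calling thread, or installed signal handlers visible; which difference, if any, actually causes the hang is not established by this report.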
Minimal code to reproduce
Pypeln code:

```python
import pypeln as pl


def input_gen():
    yield 1
    raise Exception("Error")
    yield 2  # unreachable code


def f(x):
    print(f"f: {x}")
    return x


print("Running...")

stage = pl.process.map(
    f,
    input_gen(),
    workers=7,
)

try:
    pl.thread.run(stage)  # under Gunicorn, execution never gets past this call
    data = list(stage)
    print(f"data: {data}")
except Exception as e:
    print(f"\n\n!!!!! Exception caught: {e}")
```

For Flask/Gunicorn setup:
```python
import multiprocessing

import connexion
from gunicorn.app.wsgiapp import WSGIApplication


class StandaloneApplication(WSGIApplication):
    def __init__(self, app, options=None):
        self.options = options or {}
        self.application = app
        super().__init__()

    def load_config(self):
        # Copy only the options Gunicorn recognizes (and that are set) into its config.
        config = {
            key: value
            for key, value in self.options.items()
            if key in self.cfg.settings and value is not None
        }
        for key, value in config.items():
            self.cfg.set(key.lower(), value)

    def load(self):
        return self.application


app = connexion.App(__name__, specification_dir="./openapi/")

options = {
    "bind": "0.0.0.0:5001",
    "workers": (multiprocessing.cpu_count() * 2) + 1,
    "threads": (multiprocessing.cpu_count() * 2) + 1,
}

StandaloneApplication(app.app, options).run()
```

Actual behavior
The above code hangs at `pl.thread.run(stage)`, and the following is printed:

```
$ python test_pypeln.py
Running...
Exception in thread Thread-4:
Traceback (most recent call last):
  File "/home/jim/.venv/lib/python3.8/site-packages/pypeln/process/worker.py", line 97, in __call__
    self.process_fn(
  File "/home/jim/.venv/lib/python3.8/site-packages/pypeln/process/api/from_iterable.py", line 24, in __call__
    for i, x in enumerate(iterable):
  File "/home/jim/opaque/test_pypeln.py", line 14, in input_gen
    raise Exception("Error")
Exception: Error

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/jim/.pyenv/versions/3.8.13/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/home/jim/.pyenv/versions/3.8.13/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/home/jim/.venv/lib/python3.8/site-packages/pypeln/process/worker.py", line 127, in __call__
    time.sleep(0.01)
pypeln.utils.StopThreadException
```

At this point the Flask server hangs, and I have to send SIGINT to recover. Setting `timeout` in the `map` call does not help either. My guess is that the Pypeln worker process is killed outright by Gunicorn.
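Since `timeout` on `map` does not unblock the caller, a caller-side deadline can at least keep the request handler from hanging indefinitely. A minimal sketch, assuming the pipeline call can be wrapped in a zero-argument function; `run_with_deadline` is a hypothetical helper, not a Pypeln API:

```python
import queue
import threading


def run_with_deadline(fn, timeout):
    """Run fn() in a daemon thread; raise TimeoutError if it has not finished in time."""
    result_q = queue.Queue(maxsize=1)

    def target():
        try:
            result_q.put(("ok", fn()))
        except Exception as exc:  # forward the error to the waiting caller
            result_q.put(("err", exc))

    # daemon=True so a stuck call cannot block interpreter shutdown
    threading.Thread(target=target, daemon=True).start()
    try:
        kind, value = result_q.get(timeout=timeout)
    except queue.Empty:
        raise TimeoutError(f"call did not finish within {timeout}s") from None
    if kind == "err":
        raise value
    return value
```

In the handler this could look like `run_with_deadline(lambda: pl.thread.run(stage), timeout=30)`, with the timeout value chosen for the workload. This only frees the request thread: the stuck Pypeln workers and the daemon thread waiting on them are not cleaned up, so it is a mitigation rather than a fix.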
Expected behavior
If I don't run this script with Gunicorn, the exception is successfully caught and returned.
```
$ python test_pypeln.py
Running...
Exception in thread Thread-4:
Traceback (most recent call last):
  File "/home/jim/.venv/lib/python3.8/site-packages/pypeln/process/worker.py", line 97, in __call__
    self.process_fn(
  File "/home/jim/.venv/lib/python3.8/site-packages/pypeln/process/api/from_iterable.py", line 24, in __call__
    for i, x in enumerate(iterable):
  File "/home/jim/opaque/test_pypeln.py", line 14, in input_gen
    raise Exception("Error")
Exception: Error

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/jim/.pyenv/versions/3.8.13/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/home/jim/.pyenv/versions/3.8.13/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/home/jim/.venv/lib/python3.8/site-packages/pypeln/process/worker.py", line 127, in __call__
    time.sleep(0.01)
pypeln.utils.StopThreadException


!!!!! Exception caught: ('\n\n(\'Error\',)\n\nTraceback (most recent call last):\n File "/home/jim/.venv/lib/python3.8/site-packages/pypeln/process/worker.py", line 97, in __call__\n self.process_fn(\n File "/home/jim/.venv/lib/python3.8/site-packages/pypeln/process/api/from_iterable.py", line 24, in __call__\n for i, x in enumerate(iterable):\n File "/home/jim/opaque/test_pypeln.py", line 14, in input_gen\n raise Exception("Error")\nException: Error\n',)
Traceback (most recent call last):
  File "/home/jim/.venv/lib/python3.8/site-packages/pypeln/thread/worker.py", line 91, in __call__
    self.process_fn(
  File "/home/jim/.venv/lib/python3.8/site-packages/pypeln/thread/api/from_iterable.py", line 18, in __call__
    for x in self.iterable.to_iterable(maxsize=self.maxsize, return_index=True):
  File "/home/jim/.venv/lib/python3.8/site-packages/pypeln/process/stage.py", line 84, in to_iterable
    for elem in main_queue:
  File "/home/jim/.venv/lib/python3.8/site-packages/pypeln/process/queue.py", line 65, in __iter__
    x = self.get(timeout=pypeln_utils.TIMEOUT)
  File "/home/jim/.venv/lib/python3.8/site-packages/pypeln/process/queue.py", line 41, in get
    raise exception
Exception:

('Error',)

Traceback (most recent call last):
  File "/home/jim/.venv/lib/python3.8/site-packages/pypeln/process/worker.py", line 97, in __call__
    self.process_fn(
  File "/home/jim/.venv/lib/python3.8/site-packages/pypeln/process/api/from_iterable.py", line 24, in __call__
    for i, x in enumerate(iterable):
  File "/home/jim/opaque/test_pypeln.py", line 14, in input_gen
    raise Exception("Error")
Exception: Error
$
```
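The output above shows the error being forwarded from the worker that iterates `input_gen` to the caller: `get` in `pypeln/process/queue.py` executes `raise exception`, and the error surfaces from `main_queue` in `to_iterable`. In the Gunicorn run only the worker-side traceback is printed, which suggests this hand-off to the caller is the step that does not complete there. As a rough, hypothetical model of that hand-off (plain threads and `queue.Queue`, not Pypeln's actual implementation):

```python
import queue
import threading

_DONE = object()  # sentinel: the producer finished normally


class _Failure:
    """Box for an exception, so a real Exception *item* is never mistaken for an error."""

    def __init__(self, exc):
        self.exc = exc


def produce(iterable, q):
    """Worker side: push every item, then either _DONE or the boxed exception."""
    try:
        for x in iterable:
            q.put(x)
    except Exception as exc:
        q.put(_Failure(exc))  # forward the error instead of letting the thread die
        return
    q.put(_DONE)


def consume(q):
    """Caller side: yield items and re-raise a forwarded exception in this thread."""
    while True:
        item = q.get()
        if item is _DONE:
            return
        if isinstance(item, _Failure):
            raise item.exc
        yield item


def input_gen():
    yield 1
    raise Exception("Error")


q = queue.Queue()
worker = threading.Thread(target=produce, args=(input_gen(), q), daemon=True)
worker.start()

received, caught = [], None
try:
    for x in consume(q):
        received.append(x)
except Exception as e:
    caught = e
worker.join()

print(f"received={received}, caught={caught!r}")  # received=[1], caught=Exception('Error')
```

If the worker side never gets to put the boxed failure (or the sentinel) on the queue, the caller's `q.get()` blocks forever, which matches the symptom seen under Gunicorn; whether that is what actually happens inside Pypeln there is not confirmed by this report.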
Library Info
Please provide OS info and pypeln version.
pypeln.__version__ = 0.4.9