Skip to content

Commit

Permalink
fix(multiprocess_predictor): avoid hanging process on ctrl-c
Browse files Browse the repository at this point in the history
  • Loading branch information
PaulHax committed Feb 4, 2025
1 parent 430a8e4 commit 4aca58a
Showing 1 changed file with 14 additions and 4 deletions.
18 changes: 14 additions & 4 deletions src/nrtk_explorer/library/multiprocess_predictor.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,16 @@ def __init__(self, model_name="facebook/detr-resnet-50", force_cpu=False):
self._request_queue = None
self._result_queue = None
self._pending_futures = {}
self._result_thread = None

self.loop = asyncio.get_event_loop()

self._start_process()

asyncio.ensure_future(self._poll_responses())
# Start a dedicated thread for responses instead of scheduling an async task.
self._result_thread = threading.Thread(target=self._result_listener, daemon=True)
self._result_thread.start()

self.loop.add_signal_handler(signal.SIGINT, self.shutdown)

def _start_process(self):
Expand All @@ -102,16 +106,19 @@ def _start_process(self):
)
self._proc.start()

async def _poll_responses(self):
def _result_listener(self):
while True:
try:
r_id, payload = await self.loop.run_in_executor(None, self._result_queue.get)
result = self._result_queue.get()
except (EOFError, KeyboardInterrupt):
break
if result is None:
break
r_id, payload = result
with self._lock:
future = self._pending_futures.pop(r_id, None)
if future and not future.done():
future.set_result(payload)
self.loop.call_soon_threadsafe(future.set_result, payload)

async def _submit_request(self, command, payload):
future = self.loop.create_future()
Expand Down Expand Up @@ -187,9 +194,12 @@ async def _async_shutdown():
with self._lock:
try:
self._request_queue.put(None)
self._result_queue.put(None) # Signal the listener thread to exit.
except Exception:
logging.warning("Could not send exit message to worker.")
if self._proc:
self._proc.join()
if self._result_thread:
self._result_thread.join()

return self._run_coro(_async_shutdown())

0 comments on commit 4aca58a

Please sign in to comment.