diff --git a/src/nrtk_explorer/library/multiprocess_predictor.py b/src/nrtk_explorer/library/multiprocess_predictor.py index b94d54b..0389ff2 100644 --- a/src/nrtk_explorer/library/multiprocess_predictor.py +++ b/src/nrtk_explorer/library/multiprocess_predictor.py @@ -75,12 +75,16 @@ def __init__(self, model_name="facebook/detr-resnet-50", force_cpu=False): self._request_queue = None self._result_queue = None self._pending_futures = {} + self._result_thread = None self.loop = asyncio.get_event_loop() self._start_process() - asyncio.ensure_future(self._poll_responses()) + # Start a dedicated thread for responses instead of scheduling an async task. + self._result_thread = threading.Thread(target=self._result_listener, daemon=True) + self._result_thread.start() + self.loop.add_signal_handler(signal.SIGINT, self.shutdown) def _start_process(self): @@ -102,16 +106,19 @@ def _start_process(self): ) self._proc.start() - async def _poll_responses(self): + def _result_listener(self): while True: try: - r_id, payload = await self.loop.run_in_executor(None, self._result_queue.get) + result = self._result_queue.get() except (EOFError, KeyboardInterrupt): break + if result is None: + break + r_id, payload = result with self._lock: future = self._pending_futures.pop(r_id, None) if future and not future.done(): - future.set_result(payload) + self.loop.call_soon_threadsafe(future.set_result, payload) async def _submit_request(self, command, payload): future = self.loop.create_future() @@ -187,9 +194,12 @@ async def _async_shutdown(): with self._lock: try: self._request_queue.put(None) + self._result_queue.put(None) # Signal the listener thread to exit. except Exception: logging.warning("Could not send exit message to worker.") if self._proc: self._proc.join() + if self._result_thread: + self._result_thread.join() return self._run_coro(_async_shutdown())