Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gh-128041: Add terminate_workers and kill_workers methods to ProcessPoolExecutor #130849

Merged
merged 6 commits into from
Mar 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions Doc/library/concurrent.futures.rst
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,30 @@ to a :class:`ProcessPoolExecutor` will result in deadlock.
require the *fork* start method for :class:`ProcessPoolExecutor` you must
explicitly pass ``mp_context=multiprocessing.get_context("fork")``.

.. method:: terminate_workers()

Attempt to terminate all living worker processes immediately by calling
:meth:`Process.terminate <multiprocessing.Process.terminate>` on each of them.
Internally, it will also call :meth:`Executor.shutdown` (without waiting for
running tasks, and cancelling any pending futures) to ensure that all
other resources associated with the executor are freed.

After calling this method the caller should no longer submit tasks to the
executor.

.. versionadded:: next

.. method:: kill_workers()

Attempt to kill all living worker processes immediately by calling
:meth:`Process.kill <multiprocessing.Process.kill>` on each of them.
Internally, it will also call :meth:`Executor.shutdown` (without waiting for
running tasks, and cancelling any pending futures) to ensure that all
other resources associated with the executor are freed.

After calling this method the caller should no longer submit tasks to the
executor.

.. versionadded:: next

.. _processpoolexecutor-example:

ProcessPoolExecutor Example
Expand Down
5 changes: 5 additions & 0 deletions Doc/whatsnew/3.14.rst
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,11 @@ contextvars
* Support context manager protocol by :class:`contextvars.Token`.
(Contributed by Andrew Svetlov in :gh:`129889`.)

* Add :meth:`concurrent.futures.ProcessPoolExecutor.terminate_workers` and
:meth:`concurrent.futures.ProcessPoolExecutor.kill_workers` as
ways to terminate or kill all living worker processes in the given pool.
(Contributed by Charles Machalow in :gh:`130849`.)


ctypes
------
Expand Down
71 changes: 71 additions & 0 deletions Lib/concurrent/futures/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,14 @@ class BrokenProcessPool(_base.BrokenExecutor):
while a future was in the running state.
"""

# Operation tags understood by ProcessPoolExecutor._force_shutdown().
_TERMINATE = "terminate"
_KILL = "kill"

# Every operation _force_shutdown() accepts; any other value is rejected
# there with a ValueError.
_SHUTDOWN_CALLBACK_OPERATION = {_TERMINATE, _KILL}


class ProcessPoolExecutor(_base.Executor):
def __init__(self, max_workers=None, mp_context=None,
Expand Down Expand Up @@ -855,3 +863,66 @@ def shutdown(self, wait=True, *, cancel_futures=False):
self._executor_manager_thread_wakeup = None

shutdown.__doc__ = _base.Executor.shutdown.__doc__

def _force_shutdown(self, operation):
    """Terminate or kill the executor's worker processes.

    *operation* must be a member of _SHUTDOWN_CALLBACK_OPERATION
    (_TERMINATE or _KILL); any other value raises ValueError.

    The executor is shut down first (without waiting, cancelling pending
    futures), then the requested operation is applied to every worker
    process that is still alive. Workers that have already exited are
    skipped.

    After terminating workers, the pool will be in a broken state
    and no longer usable (for instance, new tasks should not be
    submitted).
    """
    if operation not in _SHUTDOWN_CALLBACK_OPERATION:
        raise ValueError(f"Unsupported operation: {operation!r}")

    # Snapshot the workers right before shutting down, because shutdown()
    # invalidates ._processes.
    workers = self._processes.copy() if self._processes else {}

    # Never wait here: a worker that decides not to exit would deadlock us.
    self.shutdown(wait=False, cancel_futures=True)

    for worker in workers.values():
        try:
            still_running = worker.is_alive()
        except ValueError:
            # The process is already exited/closed out.
            continue
        if not still_running:
            continue

        try:
            if operation == _TERMINATE:
                worker.terminate()
            elif operation == _KILL:
                worker.kill()
        except ProcessLookupError:
            # The process ended just before our signal could be delivered.
            pass

def terminate_workers(self):
    """Attempt to terminate every worker process that is still alive.

    Delegates to _force_shutdown() with the _TERMINATE operation, which
    also shuts the executor down.

    Once the workers have been terminated the pool is in a broken state
    and no longer usable (for instance, new tasks should not be
    submitted).
    """
    return self._force_shutdown(operation=_TERMINATE)

def kill_workers(self):
    """Attempt to kill every worker process that is still alive.

    Delegates to _force_shutdown() with the _KILL operation, which also
    shuts the executor down.

    Once the workers have been killed the pool is in a broken state
    and no longer usable (for instance, new tasks should not be
    submitted).
    """
    return self._force_shutdown(operation=_KILL)
120 changes: 120 additions & 0 deletions Lib/test/test_concurrent_futures/test_process_pool.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
import os
import queue
import signal
import sys
import threading
import time
import unittest
import unittest.mock
from concurrent import futures
from concurrent.futures.process import BrokenProcessPool

from test import support
from test.support import hashlib_helper
from test.test_importlib.metadata.fixtures import parameterize

from .executor import ExecutorTest, mul
from .util import (
Expand All @@ -22,6 +26,21 @@ def __init__(self, mgr):
def __del__(self):
self.event.set()

# Method names are read off the methods themselves, so a rename of either
# method is picked up here automatically.
TERMINATE_WORKERS = futures.ProcessPoolExecutor.terminate_workers.__name__
KILL_WORKERS = futures.ProcessPoolExecutor.kill_workers.__name__

# One keyword-argument set per force-shutdown method, consumed by the
# @parameterize decorator on the tests below.
FORCE_SHUTDOWN_PARAMS = [
    {"function_name": TERMINATE_WORKERS},
    {"function_name": KILL_WORKERS},
]

def _put_wait_put(queue, event):
    """Worker task for test_force_shutdown_workers.

    Puts 'started' on *queue*, blocks on *event*, then puts 'finished'.
    """
    queue.put('started')

    # The test never sets the event, so the worker is expected to be
    # terminated/killed while blocked here ...
    event.wait()

    # ... which means this line should never be reached.
    queue.put('finished')


class ProcessPoolExecutorTest(ExecutorTest):

Expand Down Expand Up @@ -218,6 +237,107 @@ def mock_start_new_thread(func, *args, **kwargs):
list(executor.map(mul, [(2, 3)] * 10))
executor.shutdown()

def test_terminate_workers(self):
    """terminate_workers() forwards to _force_shutdown() with _TERMINATE."""
    force_shutdown_spy = unittest.mock.Mock()

    with self.executor_type(max_workers=1) as executor:
        # Replace the real implementation so no worker is actually touched.
        executor._force_shutdown = force_shutdown_spy
        executor.terminate_workers()

    expected_operation = futures.process._TERMINATE
    force_shutdown_spy.assert_called_once_with(operation=expected_operation)

def test_kill_workers(self):
    """kill_workers() forwards to _force_shutdown() with _KILL."""
    force_shutdown_spy = unittest.mock.Mock()

    with self.executor_type(max_workers=1) as executor:
        # Replace the real implementation so no worker is actually touched.
        executor._force_shutdown = force_shutdown_spy
        executor.kill_workers()

    expected_operation = futures.process._KILL
    force_shutdown_spy.assert_called_once_with(operation=expected_operation)

def test_force_shutdown_workers_invalid_op(self):
    """_force_shutdown() rejects an unknown operation with ValueError."""
    with self.executor_type(max_workers=1) as executor:
        # Context-manager form keeps the call under test readable and
        # avoids wrapping the assertion in a throwaway expression.
        with self.assertRaises(ValueError):
            executor._force_shutdown(operation='invalid operation')

@parameterize(*FORCE_SHUTDOWN_PARAMS)
def test_force_shutdown_workers(self, function_name):
    """The force-shutdown method stops a worker that is blocked mid-task.

    *function_name* is the executor method under test
    (TERMINATE_WORKERS or KILL_WORKERS).
    """
    manager = self.get_context().Manager()
    q = manager.Queue()
    e = manager.Event()

    with self.executor_type(max_workers=1) as executor:
        executor.submit(_put_wait_put, q, e)

        # The task reports 'started' and then blocks on the event, which
        # this test never sets.
        self.assertEqual(q.get(timeout=support.SHORT_TIMEOUT), 'started')

        worker_process = list(executor._processes.values())[0]

        # Wrap both methods so the calls are recorded while the real
        # implementation still runs.
        worker_process.terminate = unittest.mock.Mock(
            wraps=worker_process.terminate)
        worker_process.kill = unittest.mock.Mock(wraps=worker_process.kill)

        force_shutdown = getattr(executor, function_name)
        force_shutdown()
        worker_process.join()

        expected_call = {
            TERMINATE_WORKERS: worker_process.terminate,
            KILL_WORKERS: worker_process.kill,
        }
        if function_name not in expected_call:
            self.fail(f"Unknown operation: {function_name}")
        expected_call[function_name].assert_called()

        # The worker was stopped before it could report 'finished'.
        with self.assertRaises(queue.Empty):
            q.get(timeout=0.01)

@parameterize(*FORCE_SHUTDOWN_PARAMS)
def test_force_shutdown_workers_dead_workers(self, function_name):
    """Force-shutdown on an already broken pool must not raise."""
    with self.executor_type(max_workers=1) as executor:
        # Have the only worker exit abruptly; the future then reports
        # the pool as broken.
        doomed = executor.submit(os._exit, 1)
        with self.assertRaises(BrokenProcessPool):
            doomed.result()

        # Even though the pool is broken, this shouldn't raise.
        force_shutdown = getattr(executor, function_name)
        force_shutdown()

@parameterize(*FORCE_SHUTDOWN_PARAMS)
def test_force_shutdown_workers_not_started_yet(self, function_name):
    """Force-shutdown before any task was submitted touches no process."""
    mp_context = self.get_context()
    with unittest.mock.patch.object(mp_context, 'Process') as process_cls:
        with self.executor_type(max_workers=1,
                                mp_context=mp_context) as executor:
            # No worker has been started yet, so terminate/kill_workers
            # should basically be a no-op.
            getattr(executor, function_name)()

        would_be_worker = process_cls.return_value
        would_be_worker.kill.assert_not_called()
        would_be_worker.terminate.assert_not_called()

@parameterize(*FORCE_SHUTDOWN_PARAMS)
def test_force_shutdown_workers_stops_pool(self, function_name):
    """After a force-shutdown the executor refuses new submissions."""
    with self.executor_type(max_workers=1) as executor:
        # Run one trivial task to completion first.
        warmup = executor.submit(time.sleep, 0)
        self.assertIsNone(warmup.result())

        worker_process = list(executor._processes.values())[0]
        force_shutdown = getattr(executor, function_name)
        force_shutdown()

        with self.assertRaises(RuntimeError):
            executor.submit(time.sleep, 0)

        # Sending a signal is not the same as the process having reacted
        # to it, so give the worker a moment to die. Otherwise the test
        # run may occasionally report an ENV CHANGE error, because the
        # process is still alive right after the test and only dies a
        # moment later.
        worker_process.join(support.SHORT_TIMEOUT)

        # Even after join() completes it can take a moment before the
        # process is actually marked as dead (that seems a bit buggy).
        # It has to be dead before the test ends, or it is flagged as an
        # ENV CHANGE due to a living child process.
        for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
            if not worker_process.is_alive():
                break


create_executor_tests(globals(), ProcessPoolExecutorTest,
executor_mixins=(ProcessPoolForkMixin,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Add :meth:`concurrent.futures.ProcessPoolExecutor.terminate_workers` and
:meth:`concurrent.futures.ProcessPoolExecutor.kill_workers` as
ways to terminate or kill all living worker processes in the given pool.
(Contributed by Charles Machalow in :gh:`130849`.)
Loading