diff --git a/environment.yml b/environment.yml
index 86eee6f90b..79d16cd0ba 100644
--- a/environment.yml
+++ b/environment.yml
@@ -22,7 +22,6 @@ dependencies:
 - importlib-metadata~=6.0
 - numpy~=1.21
 - paramiko~=3.0
-- plumpy~=0.22.3
 - pgsu~=0.3.0
 - psutil~=5.6
 - psycopg[binary]~=3.0
@@ -35,3 +34,5 @@ dependencies:
 - tqdm~=4.45
 - upf_to_json~=0.9.2
 - wrapt~=1.11
+- pip:
+  - plumpy@git+https://github.com/aiidateam/plumpy.git@force-kill#egg=plumpy
diff --git a/pyproject.toml b/pyproject.toml
index 52b2536deb..c36768da47 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -34,7 +34,7 @@ dependencies = [
     'importlib-metadata~=6.0',
     'numpy~=1.21',
     'paramiko~=3.0',
-    'plumpy~=0.22.3',
+    'plumpy@git+https://github.com/aiidateam/plumpy.git@force-kill#egg=plumpy',
     'pgsu~=0.3.0',
     'psutil~=5.6',
     'psycopg[binary]~=3.0',
diff --git a/src/aiida/cmdline/commands/cmd_process.py b/src/aiida/cmdline/commands/cmd_process.py
index c9c492ae14..9673b3e95a 100644
--- a/src/aiida/cmdline/commands/cmd_process.py
+++ b/src/aiida/cmdline/commands/cmd_process.py
@@ -320,8 +320,12 @@ def process_status(call_link_label, most_recent_node, max_depth, processes):
 @options.ALL(help='Kill all processes if no specific processes are specified.')
 @options.TIMEOUT()
 @options.WAIT()
+@options.FORCE_KILL(
+    help='Force kill the process if it does not respond to the initial kill signal.\n'
+    ' Note: This may lead to orphaned jobs on your HPC and should be used with caution.'
+)
 @decorators.with_dbenv()
-def process_kill(processes, all_entries, timeout, wait):
+def process_kill(processes, all_entries, timeout, wait, force_kill):
     """Kill running processes.
 
     Kill one or multiple running processes."""
@@ -340,7 +344,12 @@ def process_kill(processes, all_entries, timeout, wait):
 
     with capture_logging() as stream:
         try:
-            message = 'Killed through `verdi process kill`'
+            if force_kill:
+                echo.echo_warning('Force kill is enabled. This may lead to orphaned jobs on your HPC.')
+                # note: It's important to include -F in the message, as this is used to identify force-killed processes.
+                message = 'Force killed through `verdi process kill -F`'
+            else:
+                message = 'Killed through `verdi process kill`'
             control.kill_processes(processes, all_entries=all_entries, timeout=timeout, wait=wait, message=message)
         except control.ProcessTimeoutException as exception:
             echo.echo_critical(f'{exception}\n{REPAIR_INSTRUCTIONS}')
diff --git a/src/aiida/cmdline/params/options/main.py b/src/aiida/cmdline/params/options/main.py
index 381199d199..c8bd87f533 100644
--- a/src/aiida/cmdline/params/options/main.py
+++ b/src/aiida/cmdline/params/options/main.py
@@ -61,6 +61,7 @@
     'EXPORT_FORMAT',
     'FAILED',
     'FORCE',
+    'FORCE_KILL',
     'FORMULA_MODE',
     'FREQUENCY',
     'GROUP',
@@ -328,6 +329,14 @@ def set_log_level(ctx, _param, value):
 
 FORCE = OverridableOption('-f', '--force', is_flag=True, default=False, help='Do not ask for confirmation.')
 
+FORCE_KILL = OverridableOption(
+    '-F',
+    '--force-kill',
+    is_flag=True,
+    default=False,
+    help='Kill the process without waiting for confirmation that the job has been killed on the remote machine.',
+)
+
 SILENT = OverridableOption('-s', '--silent', is_flag=True, default=False, help='Suppress any output printed to stdout.')
 
 VISUALIZATION_FORMAT = OverridableOption(
diff --git a/src/aiida/engine/processes/calcjobs/tasks.py b/src/aiida/engine/processes/calcjobs/tasks.py
index 8b8231634f..2708e633d7 100644
--- a/src/aiida/engine/processes/calcjobs/tasks.py
+++ b/src/aiida/engine/processes/calcjobs/tasks.py
@@ -101,9 +101,9 @@ async def do_upload():
 
     try:
         logger.info(f'scheduled request to upload CalcJob<{node.pk}>')
-        ignore_exceptions = (plumpy.futures.CancelledError, PreSubmitException, plumpy.process_states.Interruption)
+        breaking_exceptions = (plumpy.futures.CancelledError, PreSubmitException, plumpy.process_states.Interruption)
         skip_submit = await exponential_backoff_retry(
-            do_upload, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions
+            do_upload, initial_interval, max_attempts, logger=node.logger, breaking_exceptions=breaking_exceptions
         )
     except PreSubmitException:
         raise
@@ -149,9 +149,9 @@ async def do_submit():
 
     try:
         logger.info(f'scheduled request to submit CalcJob<{node.pk}>')
-        ignore_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption)
+        breaking_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption)
         result = await exponential_backoff_retry(
-            do_submit, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions
+            do_submit, initial_interval, max_attempts, logger=node.logger, breaking_exceptions=breaking_exceptions
         )
     except (plumpy.futures.CancelledError, plumpy.process_states.Interruption):
         raise
@@ -207,9 +207,9 @@ async def do_update():
 
     try:
         logger.info(f'scheduled request to update CalcJob<{node.pk}>')
-        ignore_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption)
+        breaking_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption)
         job_done = await exponential_backoff_retry(
-            do_update, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions
+            do_update, initial_interval, max_attempts, logger=node.logger, breaking_exceptions=breaking_exceptions
         )
     except (plumpy.futures.CancelledError, plumpy.process_states.Interruption):
         raise
@@ -258,9 +258,9 @@ async def do_monitor():
 
     try:
         logger.info(f'scheduled request to monitor CalcJob<{node.pk}>')
-        ignore_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption)
+        breaking_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption)
         monitor_result = await exponential_backoff_retry(
-            do_monitor, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions
+            do_monitor, initial_interval, max_attempts, logger=node.logger, breaking_exceptions=breaking_exceptions
         )
     except (plumpy.futures.CancelledError, plumpy.process_states.Interruption):
         raise
@@ -334,9 +334,9 @@ async def do_retrieve():
 
     try:
         logger.info(f'scheduled request to retrieve CalcJob<{node.pk}>')
-        ignore_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption)
+        breaking_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption)
         result = await exponential_backoff_retry(
-            do_retrieve, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions
+            do_retrieve, initial_interval, max_attempts, logger=node.logger, breaking_exceptions=breaking_exceptions
         )
     except (plumpy.futures.CancelledError, plumpy.process_states.Interruption):
         raise
@@ -385,7 +385,7 @@ async def do_stash():
             initial_interval,
             max_attempts,
             logger=node.logger,
-            ignore_exceptions=plumpy.process_states.Interruption,
+            breaking_exceptions=plumpy.process_states.Interruption,
         )
     except plumpy.process_states.Interruption:
         raise
@@ -429,10 +429,13 @@ async def do_kill():
 
     try:
         logger.info(f'scheduled request to kill CalcJob<{node.pk}>')
         result = await exponential_backoff_retry(do_kill, initial_interval, max_attempts, logger=node.logger)
+    # Note: any exception raised here will result in the process being excepted, not killed!
+    # Therefore it can result in orphaned jobs!
     except plumpy.process_states.Interruption:
+        logger.warning(f'killing CalcJob<{node.pk}> excepted, the job might be orphaned.')
         raise
     except Exception as exception:
-        logger.warning(f'killing CalcJob<{node.pk}> failed')
+        logger.warning(f'killing CalcJob<{node.pk}> excepted, the job might be orphaned.')
         raise TransportTaskException(f'kill_calculation failed {max_attempts} times consecutively') from exception
     else:
         logger.info(f'killing CalcJob<{node.pk}> successful')
@@ -568,6 +571,22 @@ async def execute(self) -> plumpy.process_states.State:  # type: ignore[override]
            raise plumpy.process_states.PauseInterruption(f'Pausing after failed transport task: {exception}')
        except plumpy.process_states.KillInterruption as exception:
            await self._kill_job(node, transport_queue)
+            node.set_process_status(str(exception))
+            return self.retrieve(monitor_result=self._monitor_result)
+        except plumpy.process_states.ForceKillInterruption as exception:
+            if node.get_state() in [CalcJobState.UPLOADING, CalcJobState.SUBMITTING]:
+                logger.warning(f'CalcJob<{node.pk}> force killed, it was in the {node.get_state()} state')
+            else:
+                logger.warning(
+                    f'CalcJob<{node.pk}> is being force killed. The associated job on the computer'
+                    ' is not being killed through the scheduler and may continue running.'
+                )
+
+            if self._killing is not None:
+                self._killing.set_result(True)
+            else:
+                logger.info(f'Force killed CalcJob<{node.pk}> but async future was None')
+
            node.set_process_status(str(exception))
            return self.retrieve(monitor_result=self._monitor_result)
        except (plumpy.futures.CancelledError, asyncio.CancelledError):
diff --git a/src/aiida/engine/processes/control.py b/src/aiida/engine/processes/control.py
index 7cc214c76c..6d12760b57 100644
--- a/src/aiida/engine/processes/control.py
+++ b/src/aiida/engine/processes/control.py
@@ -144,7 +144,7 @@ def pause_processes(
     .. note:: Requires the daemon to be running, or processes will be unresponsive.
 
-    :param processes: List of processes to play.
+    :param processes: List of processes to pause.
     :param all_entries: Pause all playing processes.
     :param timeout: Raise a ``ProcessTimeoutException`` if the process does not respond within this amount of seconds.
     :param wait: Set to ``True`` to wait for process response, for ``False`` the action is fire-and-forget.
@@ -279,9 +279,12 @@ def handle_result(result):
         try:
             # unwrap is need here since LoopCommunicator will also wrap a future
             unwrapped = unwrap_kiwi_future(future)
-            result = unwrapped.result()
+            result = unwrapped.result(timeout=timeout)
         except communications.TimeoutError:
-            LOGGER.error(f'call to {infinitive} Process<{process.pk}> timed out')
+            if process.is_terminated:
+                LOGGER.report(f'request to {infinitive} Process<{process.pk}> sent')
+            else:
+                LOGGER.error(f'call to {infinitive} Process<{process.pk}> timed out')
         except Exception as exception:
             LOGGER.error(f'failed to {infinitive} Process<{process.pk}>: {exception}')
         else:
diff --git a/src/aiida/engine/utils.py b/src/aiida/engine/utils.py
index 4053156a97..6228e08c5e 100644
--- a/src/aiida/engine/utils.py
+++ b/src/aiida/engine/utils.py
@@ -101,7 +101,7 @@ async def with_interrupt(self, coro: Awaitable[Any]) -> Any:
             import asyncio
             loop = asyncio.get_event_loop()
 
-            interruptable = InterutableFuture()
+            interruptable = InterruptableFuture()
             loop.call_soon(interruptable.interrupt, RuntimeError("STOP"))
             loop.run_until_complete(interruptable.with_interrupt(asyncio.sleep(2.)))
             >>> RuntimeError: STOP
@@ -124,7 +124,7 @@ def interruptable_task(
 ) -> InterruptableFuture:
     """Turn the given coroutine into an interruptable task by turning it into an InterruptableFuture and returning it.
 
-    :param coro: the coroutine that should be made interruptable with object of InterutableFuture as last paramenter
+    :param coro: the coroutine that should be made interruptable with object of InterruptableFuture as last parameter
     :param loop: the event loop in which to run the coroutine, by default uses asyncio.get_event_loop()
     :return: an InterruptableFuture
     """
@@ -178,7 +178,7 @@ async def exponential_backoff_retry(
     initial_interval: Union[int, float] = 10.0,
     max_attempts: int = 5,
     logger: Optional[logging.Logger] = None,
-    ignore_exceptions: Union[None, Type[Exception], Tuple[Type[Exception], ...]] = None,
+    breaking_exceptions: Union[None, Type[Exception], Tuple[Type[Exception], ...]] = None,
 ) -> Any:
     """Coroutine to call a function, recalling it with an exponential backoff in the case of an exception
 
@@ -190,7 +190,8 @@ async def exponential_backoff_retry(
     :param fct: the function to call, which will be turned into a coroutine first if it is not already
     :param initial_interval: the time to wait after the first caught exception before calling the coroutine again
     :param max_attempts: the maximum number of times to call the coroutine before re-raising the exception
-    :param ignore_exceptions: exceptions to ignore, i.e. when caught do nothing and simply re-raise
+    :param breaking_exceptions: exceptions that break the EBM loop; when caught, they are re-raised immediately.
+        If None, exceptions are raised only once ``max_attempts`` is reached.
     :return: result if the ``coro`` call completes within ``max_attempts`` retries without raising
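+
+    A minimal illustrative sketch (``do_fetch`` is a hypothetical coroutine, not part of this module)::
+
+        async def do_fetch():
+            ...  # may raise transient transport exceptions worth retrying
+
+        result = await exponential_backoff_retry(
+            do_fetch, max_attempts=3, breaking_exceptions=(plumpy.process_states.Interruption,)
+        )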
     """
     if logger is None:
         logger = LOGGER
@@ -205,8 +206,7 @@ async def exponential_backoff_retry(
             result = await coro()
             break  # Finished successfully
         except Exception as exception:
-            # Re-raise exceptions that should be ignored
-            if ignore_exceptions is not None and isinstance(exception, ignore_exceptions):
+            if breaking_exceptions is not None and isinstance(exception, breaking_exceptions):
                 raise
 
             count = iteration + 1
diff --git a/src/aiida/tools/pytest_fixtures/__init__.py b/src/aiida/tools/pytest_fixtures/__init__.py
index 1b2c38e285..8b91adf35a 100644
--- a/src/aiida/tools/pytest_fixtures/__init__.py
+++ b/src/aiida/tools/pytest_fixtures/__init__.py
@@ -16,6 +16,7 @@
 from .daemon import daemon_client, started_daemon_client, stopped_daemon_client, submit_and_await
 from .entry_points import entry_points
 from .globals import aiida_manager
+from .hardpatch import inject_patch
 from .orm import (
     aiida_code,
     aiida_code_installed,
@@ -52,6 +53,7 @@
     'started_daemon_client',
     'stopped_daemon_client',
     'submit_and_await',
+    'inject_patch',
 )
diff --git a/src/aiida/tools/pytest_fixtures/hardpatch.py b/src/aiida/tools/pytest_fixtures/hardpatch.py
new file mode 100644
index 0000000000..7517281700
--- /dev/null
+++ b/src/aiida/tools/pytest_fixtures/hardpatch.py
@@ -0,0 +1,196 @@
+"""Fixtures that allow you to "hardpatch" the source code.
+
+These are useful only if an external process is instantiating the code from source.
+Otherwise, pytest monkeypatching is the way to go.
+
+When should I use these fixtures?
+When patching the source code itself is your goal.
+For instance, you might wish to patch a particular function so that the change
+is reflected in the behavior of the daemon when submitting a job to it.
+Because the daemon runs as a distinct process and is not spawned from your test interpreter,
+pytest monkeypatching will not work in these situations.
+
+This is where these fixtures come in handy: they allow you to inject a patch directly into the source code.
+Everything is reverted after the test is done.
+
+Note: there might be other approaches that are more efficient:
+- One could use importlib to set hooks in sys.meta_path.
+- Or perhaps modify sitecustomize.py in PYTHONPATH.
+
+I have not tested these approaches, but they might be worth exploring.
+
+In the meantime, this is a simple solution that works,
+and it should remain backward compatible as the interface is designed to be simple.
+"""
+
+from __future__ import annotations
+
+import atexit
+import importlib
+import inspect
+import os
+import typing as t
+
+import pytest
+
+from aiida.engine.daemon.client import DaemonNotRunningException
+
+if t.TYPE_CHECKING:
+    pass
+
+
+@pytest.fixture(scope='function')
+def inject_patch(started_daemon_client):
+    """Fixture to inject a patch into the source code.
+
+    It can be used this way::
+
+        def test_my_code(inject_patch):
+
+            def mock_open(self):
+                raise Exception('Mock open exception')
+
+            inject_patch.patch('aiida.transports.plugins.local.LocalTransport.open', mock_open)
+
+            # Your test code here
+
+            # to restore the original source code and remove the patch (only if you need to!)
+            # inject_patch.restore()
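+
+            # async functions can be patched the same way; a hypothetical sketch that
+            # makes the exponential backoff mechanism fail fast:
+            #
+            #     async def mock_ebm(*args, **kwargs):
+            #         raise Exception('Exponential backoff retry failed')
+            #
+            #     inject_patch.patch('aiida.engine.utils.exponential_backoff_retry', mock_ebm)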
+
+            # More tests here
+    """
+
+    inj = InjectTool(started_daemon_client)
+    yield inj
+    inj.restore()
+
+
+class InjectTool:
+    def __init__(self, daemon_client):
+        """This class injects patches into the source code and restores them afterwards."""
+
+        self._BACKUP_NAME = '__backup__'
+        self.patched_files = []
+        self.daemon_client = daemon_client
+        atexit.register(self.restore, notfound_ok=True)
+
+    def patch(self, original_func: str, mock_func, restart_daemon=True):
+        """Patch a function with a mock function.
+
+        :param original_func: (str) the dotted path of the original function to be patched
+        :param mock_func: (function) the mock function
+        :param restart_daemon: (bool) if True, the daemon will be restarted after the patch is applied,
+            default is True
+        """
+
+        # Import original function
+        module_name, function_name = original_func.rsplit('.', 1)
+        try:
+            module = importlib.import_module(module_name)
+            func = getattr(module, function_name)
+            source_file = inspect.getfile(func)
+        except ModuleNotFoundError as e:
+            # if the module is not found, it could be a class; try to import its parent module
+            module_name, parent_module = module_name.rsplit('.', 1)
+            module = importlib.import_module(module_name)
+            # verify that the class is found
+            if not hasattr(module, parent_module):
+                raise e
+            source_file = inspect.getfile(module)
+
+        # Prepare the mock function: rename it to the target name and strip its indentation
+        mock_def = inspect.getsource(mock_func)
+        def_style = 'async def' if 'async def' in mock_def else 'def'
+        mock_def = mock_def.replace(f'{def_style} {mock_func.__name__}', f'{def_style} {function_name}')
+        mock_indent = len(mock_def.split(f'{def_style}')[0])
+        mock_def = mock_def.replace('\n' + ' ' * mock_indent, '\n')[mock_indent:]
+
+        with open(source_file, 'r') as f:
+            source_code = f.read()
+
+            # Make a backup of the original source code
+            backup_file = source_file + self._BACKUP_NAME
+            if not os.path.exists(backup_file):
+                with open(backup_file, 'w') as f_backup:
+                    f_backup.write(source_code)
+
+            f.seek(0)
+
+            new_source_code = ''
+            inserted = False
+            def_completed = False
+            while True:
+                line = f.readline()
+                if not line:
+                    break
+
+                if f'{def_style} {function_name}' in line:
+                    # found the original definition: emit the re-indented mock in its place
+                    indent_on_def = len(line.split(f'{def_style}')[0])
+                    insert_string = ' ' * indent_on_def + mock_def.replace('\n', '\n' + ' ' * indent_on_def) + '\n'
+                    new_source_code += insert_string
+                    inserted = True
+                    if line.strip().endswith(':'):
+                        def_completed = True
+                    continue
+
+                elif inserted:
+                    # skip the remainder of the original signature and body
+                    if not def_completed:
+                        if line.strip().endswith(':'):
+                            def_completed = True
+                        continue
+
+                    indent_ = len(line) - len(line.lstrip())
+                    if indent_ > indent_on_def or line == '\n':
+                        continue
+                    else:
+                        # first dedented line: the original body is over, keep the rest verbatim
+                        new_source_code += line + f.read()
+                        break
+
+                new_source_code += line
+
+        if not inserted:
+            raise ValueError(f'Function {function_name} not found in {source_file}')
+
+        with open(source_file, 'w') as f:
+            f.write(new_source_code)
+        self.patched_files.append(source_file)
+
+        if restart_daemon:
+            self.daemon_client.restart_daemon()
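+
+    # Illustration (not executed): patching ``LocalTransport.open`` with ``mock_open`` from the
+    # fixture docstring rewrites the target source file roughly as follows, re-indented to match
+    # the original definition:
+    #
+    #     class LocalTransport(Transport):
+    #         def open(self):
+    #             raise Exception('Mock open exception')
+    #
+    # The rest of the file is kept verbatim; ``restore`` undoes this from the backup file.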
+
+    def restore(self, notfound_ok=False, restart_daemon=True):
+        """Restore the original source code and remove all patches.
+
+        :param notfound_ok: (bool) if True, do not raise an error if the backup file is not found,
+            default is False
+        :param restart_daemon: (bool) if True, the daemon will be restarted after the patch is removed,
+            default is True
+        """
+
+        for source_file in self.patched_files:
+            backup_file = source_file + self._BACKUP_NAME
+
+            try:
+                with open(backup_file, 'r') as f:
+                    source_code = f.read()
+            except FileNotFoundError as e:
+                if notfound_ok:
+                    continue
+                else:
+                    raise e
+
+            with open(source_file, 'w') as f:
+                f.write(source_code)
+
+            os.remove(backup_file)
+
+        self.patched_files = []
+
+        if restart_daemon:
+            try:
+                self.daemon_client.restart_daemon()
+            except DaemonNotRunningException:
+                pass
diff --git a/tests/cmdline/commands/test_process.py b/tests/cmdline/commands/test_process.py
index fae9957f80..9ed53411be 100644
--- a/tests/cmdline/commands/test_process.py
+++ b/tests/cmdline/commands/test_process.py
@@ -22,7 +22,7 @@
 from aiida.common.log import LOG_LEVEL_REPORT
 from aiida.engine import Process, ProcessState
 from aiida.engine.processes import control as process_control
-from aiida.orm import CalcJobNode, Group, WorkChainNode, WorkflowNode, WorkFunctionNode
+from aiida.orm import CalcJobNode, Group, Int, WorkChainNode, WorkflowNode, WorkFunctionNode
 from tests.utils.processes import WaitProcess
 
@@ -35,6 +35,8 @@ def await_condition(condition: t.Callable, timeout: int = 1):
         if time.time() - start_time > timeout:
             raise RuntimeError(f'waiting for {condition} to evaluate to `True` timed out after {timeout} seconds.')
 
+    return condition()
+
 
 class TestVerdiProcess:
     """Tests for `verdi process`."""
@@ -537,8 +539,33 @@ def test_process_play_all(submit_and_await, run_cli_command):
 
 @pytest.mark.requires_rmq
 @pytest.mark.usefixtures('started_daemon_client')
-def test_process_kill(submit_and_await, run_cli_command):
-    """Test the ``verdi process kill`` command."""
+def test_process_kill(submit_and_await, aiida_code_installed, run_cli_command, inject_patch):
+    """Test the ``verdi process kill`` command.
+
+    It tries to cover all the possible scenarios of killing a process.
+
+    The test is divided into multiple parts. For parts that are tied to daemon functionality,
+    we use `inject_patch` to "hardpatch" certain functions in the source code.
+    """
+
+    from aiida.cmdline.utils.common import get_process_function_report
+
+    def make_a_builder(sleep_seconds=0):
+        builder = code.get_builder()
+        builder.x = Int(1)
+        builder.y = Int(1)
+        builder.metadata.options.sleep = sleep_seconds
+        return builder
+
+    code = aiida_code_installed(default_calc_job_plugin='core.arithmetic.add', filepath_executable='/bin/bash')
+
+    kill_timeout = 5
+
+    # 0) Running without identifiers should except and print something
+    result = run_cli_command(cmd_process.process_kill, raises=True)
+    assert result.exit_code == ExitCode.USAGE_ERROR
+    assert len(result.output_lines) > 0
+
+    # 1) Kill a process that is not yet running
     node = submit_and_await(WaitProcess, ProcessState.WAITING)
 
     run_cli_command(cmd_process.process_pause, [str(node.pk), '--wait'])
@@ -549,23 +576,157 @@ def test_process_kill(submit_and_await, aiida_code_installed, run_cli_command, inject_patch):
     await_condition(lambda: node.paused)
     assert node.process_status == 'Paused through `verdi process pause`'
 
     run_cli_command(cmd_process.process_kill, [str(node.pk), '--wait'])
     await_condition(lambda: node.is_killed)
     assert node.process_status == 'Killed through `verdi process kill`'
 
-    # Running without identifiers should except and print something
-    options = []
-    result = run_cli_command(cmd_process.process_kill, options, raises=True)
-    assert result.exit_code == ExitCode.USAGE_ERROR
-    assert len(result.output_lines) > 0
-
-
-@pytest.mark.requires_rmq
-@pytest.mark.usefixtures('started_daemon_client')
-def test_process_kill_all(submit_and_await, run_cli_command):
-    """Test the ``verdi process kill --all`` command."""
+    # 2) *Force* kill a process that is not yet running
     node = submit_and_await(WaitProcess, ProcessState.WAITING)
-    run_cli_command(cmd_process.process_kill, ['--all', '--wait'], user_input='y')
+    run_cli_command(cmd_process.process_pause, [str(node.pk), '--wait'])
+    await_condition(lambda: node.paused)
+    assert node.process_status == 'Paused through `verdi process pause`'
+
+    run_cli_command(cmd_process.process_kill, [str(node.pk), '-F', '--wait'])
     await_condition(lambda: node.is_killed)
+    assert node.process_status == 'Force killed through `verdi process kill -F`'
+
+    # 3) Kill a running process
+    node = submit_and_await(make_a_builder(sleep_seconds=100), ProcessState.RUNNING)
+    run_cli_command(cmd_process.process_kill, [str(node.pk), '--wait'])
+
+    # note: timeout should be less than sleep_seconds for this test to be valid;
+    # this makes sure that the process is still running when we kill it
+    await_condition(lambda: node.is_killed, timeout=kill_timeout)
     assert node.process_status == 'Killed through `verdi process kill`'
 
+    # 4) *Force* kill a running process
+    node = submit_and_await(make_a_builder(sleep_seconds=100), ProcessState.RUNNING)
+    result = run_cli_command(cmd_process.process_kill, [str(node.pk), '-F', '--wait'])
+
+    await_condition(lambda: node.is_killed, timeout=kill_timeout)
+    assert node.process_status == 'Force killed through `verdi process kill -F`'
+
+    # 5) `verdi process kill --all` should kill all processes (running / not running)
+    node_1 = submit_and_await(WaitProcess, ProcessState.WAITING)
+    run_cli_command(cmd_process.process_pause, [str(node_1.pk), '--wait'])
+    await_condition(lambda: node_1.paused)
+    node_2 = submit_and_await(make_a_builder(sleep_seconds=100), ProcessState.RUNNING)
+
+    run_cli_command(cmd_process.process_kill, ['--all', '--wait'], user_input='y')
+    await_condition(lambda: node_1.is_killed, timeout=kill_timeout)
+    await_condition(lambda: node_2.is_killed, timeout=kill_timeout)
+    assert node_1.process_status == 'Killed through `verdi process kill`'
+    assert node_2.process_status == 'Killed through `verdi process kill`'
+
+    # 6) `verdi process kill --all -F` should force kill all processes (running / not running)
+    node_1 = submit_and_await(WaitProcess, ProcessState.WAITING)
+    run_cli_command(cmd_process.process_pause, [str(node_1.pk), '--wait'])
+    await_condition(lambda: node_1.paused)
+    node_2 = submit_and_await(make_a_builder(sleep_seconds=100), ProcessState.RUNNING)
+
+    run_cli_command(cmd_process.process_kill, ['--all', '--wait', '-F'], user_input='y')
+    await_condition(lambda: node_1.is_killed, timeout=kill_timeout)
+    await_condition(lambda: node_2.is_killed, timeout=kill_timeout)
+    assert node_1.process_status == 'Force killed through `verdi process kill -F`'
+    assert node_2.process_status == 'Force killed through `verdi process kill -F`'
+
+    # 7) *Force* kill a process that is stuck in the EBM, something a normal *kill* cannot do.
+    # `verdi process kill -F` --as the first attempt--
+    def mock_open(self):
+        raise Exception('Mock open exception')
+
+    # patch in a faulty transport open, to make the EBM fail repeatedly
+    inject_patch.patch('aiida.transports.plugins.local.LocalTransport.open', mock_open)
+
+    node = submit_and_await(make_a_builder(100), ProcessState.WAITING)
+
+    # assert the process is stuck in the EBM
+    result = await_condition(lambda: get_process_function_report(node), timeout=kill_timeout)
+    assert 'Mock open exception' in result
+    assert 'exponential_backoff_retry' in result
+
+    # force kill the process
+    result = run_cli_command(cmd_process.process_kill, [str(node.pk), '-F', '--wait'])
+    await_condition(lambda: node.is_killed, timeout=kill_timeout)
+    assert node.process_status == 'Force killed through `verdi process kill -F`'
+
+    # 8) A process stuck in the EBM cannot be killed directly by `verdi process kill`.
+    # Such a process, with a history of failed kill attempts, should still be force-killable.
+    # `verdi process kill -F` --as the second attempt--
+    node = submit_and_await(make_a_builder(100), ProcessState.WAITING)
+
+    # assert the process is stuck in the EBM
+    result = await_condition(lambda: get_process_function_report(node), timeout=kill_timeout)
+    assert 'Mock open exception' in result
+    assert 'exponential_backoff_retry' in result
+
+    # attempt a normal kill, which should time out
+    result = run_cli_command(cmd_process.process_kill, [str(node.pk), '--wait'])
+    assert f'Error: call to kill Process<{node.pk}> timed out' in result.stdout
+
+    # force kill the process
+    result = run_cli_command(cmd_process.process_kill, [str(node.pk), '-F', '--wait'])
+    await_condition(lambda: node.is_killed, timeout=kill_timeout)
+    assert node.process_status == 'Force killed through `verdi process kill -F`'
+
+    # 9) Kill a process that is paused after the EBM failed (5 times). It should be possible to kill it normally,
+    # e.g. in scenarios where the transport is working again.
+    async def mock_exponential_backoff_retry(*args, **kwargs):
+        raise Exception('Exponential backoff retry failed')
+
+    # patch the EBM, to make it fail quickly
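+    # note: restore() first writes the previous patch's backup back, so earlier mocks do not linger;
+    # restart_daemon=False defers the daemon restart to the patch() call below.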
+    inject_patch.restore(restart_daemon=False)
+    inject_patch.patch('aiida.engine.utils.exponential_backoff_retry', mock_exponential_backoff_retry)
+
+    node = submit_and_await(make_a_builder(), ProcessState.WAITING)
+
+    await_condition(
+        lambda: node.process_status
+        == 'Pausing after failed transport task: upload_calculation failed 5 times consecutively',
+        timeout=kill_timeout,
+    )
+
+    result = run_cli_command(cmd_process.process_kill, [str(node.pk), '--wait'])
+    await_condition(lambda: node.is_killed, timeout=kill_timeout)
+
+    # 10) In the current implementation, a failed normal kill attempt will eventually cause a process to be EXCEPTED,
+    # which means orphaned jobs may be left behind on the server.
+    # The following test documents the current behavior rather than prescribing it.
+    async def mock_exponential_backoff_retry(*args, **kwargs):
+        await asyncio.sleep(10)  # noqa: F821
+        raise Exception('Exponential backoff retry failed')
+
+    inject_patch.restore(restart_daemon=False)
+    inject_patch.patch('aiida.engine.utils.exponential_backoff_retry', mock_exponential_backoff_retry)
+
+    node = submit_and_await(make_a_builder(), ProcessState.WAITING)
+
+    # assert the process is stuck in the EBM
+    await_condition(lambda: node.process_status == 'Waiting for transport task: upload', timeout=kill_timeout)
+
+    # attempt a normal kill, which should time out
+    result = run_cli_command(cmd_process.process_kill, [str(node.pk), '--wait'])
+    assert f'Error: call to kill Process<{node.pk}> timed out' in result.stdout
+
+    # by now the EBM should have failed, and the failed normal kill attempt should have left the process EXCEPTED.
+    # exceptions propagate to the process state slowly, so allow a generous timeout.
+    await_condition(lambda: node.process_state == ProcessState.EXCEPTED, timeout=kill_timeout * 2)
+
+    # 11) The same as #10, only this time the task has passed the EBM successfully and is running,
+    # but the kill command will fail because it cannot open a transport.
+    async def mock_kill(*args, **kwargs):
+        raise TransportTaskException('Mock kill exception')  # noqa: F821
+
+    inject_patch.restore(restart_daemon=False)
+    inject_patch.patch('aiida.engine.processes.calcjobs.tasks.task_kill_job', mock_kill)
+
+    node = submit_and_await(make_a_builder(100), ProcessState.WAITING)
+    await_condition(lambda: node.process_status == 'Monitoring scheduler: job state RUNNING', timeout=kill_timeout)
+
+    result = run_cli_command(cmd_process.process_kill, [str(node.pk), '--wait'])
+    assert f'Report: request to kill Process<{node.pk}> sent' in result.stdout
+
+    await_condition(lambda: node.process_state == ProcessState.EXCEPTED, timeout=kill_timeout * 2)
+
 
 @pytest.mark.usefixtures('started_daemon_client')
 def test_process_repair_running_daemon(run_cli_command):