Skip to content

Commit e768b70

Browse files
agoscinskikhsrali
andauthored
Sending multiple process_kill actions reschedules the cancelation of scheduler job (#6870)
PR #6793 introduced logic to cancel earlier kill actions when a new one is issued. However, this led to a bug: when two kill actions are sent in succession, the second cancels the first including its triggered cancelation of the scheduler job that is stuck in the EBM. The second kill command does not re-initiate the scheduler job cancelation, leaving it in an inconsistent state. This issue arises because the kill logic is split across two places. PR #6868 addresses this properly with a refactor of the kill action. In contrast, this PR provides a temporary workaround to mitigate the issue. Furthermore, before this PR if the kill command failed through the EBM, the cancelation could not be rescheduled by a subsequent kill action as the process state already transition to the `Expected` state. With the new force-kill option the user can bypasses the EBM as desired, thus the transition to the `Expected` state is not needed anymore and has been removed to allow the user to further gracefully kill a job. --------- Co-authored-by: Ali Khosravi <[email protected]>
1 parent 43176cb commit e768b70

File tree

3 files changed

+126
-15
lines changed

3 files changed

+126
-15
lines changed

src/aiida/engine/processes/calcjobs/tasks.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -582,7 +582,6 @@ async def execute(self) -> plumpy.process_states.State: # type: ignore[override
582582
except TransportTaskException as exception:
583583
raise plumpy.process_states.PauseInterruption(f'Pausing after failed transport task: {exception}')
584584
except plumpy.process_states.KillInterruption as exception:
585-
await self._kill_job(node, transport_queue)
586585
node.set_process_status(str(exception))
587586
return self.retrieve(monitor_result=self._monitor_result)
588587
except (plumpy.futures.CancelledError, asyncio.CancelledError):

src/aiida/engine/processes/process.py

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,9 @@
5151
from aiida.common.lang import classproperty, override
5252
from aiida.common.links import LinkType
5353
from aiida.common.log import LOG_LEVEL_REPORT
54+
from aiida.engine.utils import InterruptableFuture
5455
from aiida.orm.implementation.utils import clean_value
56+
from aiida.orm.nodes.process.calculation.calcjob import CalcJobNode
5557
from aiida.orm.utils import serialize
5658

5759
from .builder import ProcessBuilder
@@ -72,6 +74,7 @@ class Process(PlumpyProcess):
7274
have full provenance saved in the database.
7375
"""
7476

77+
_cancelling_scheduler_job: asyncio.Task | None = None
7578
_node_class = orm.ProcessNode
7679
_spec_class = ProcessSpec
7780

@@ -336,10 +339,37 @@ def kill(self, msg_text: str | None = None, force_kill: bool = False) -> Union[b
336339
"""
337340
self.node.logger.info(f'Request to kill Process<{self.node.pk}>')
338341

339-
had_been_terminated = self.has_terminated()
342+
if self.killed():
343+
# Already killed
344+
return True
345+
346+
if self.has_terminated():
347+
# Can't kill
348+
return False
349+
350+
# Cancel scheduler job
351+
if not force_kill and isinstance(self.node, CalcJobNode):
352+
if self._killing:
353+
self._killing.cancel()
354+
355+
if self._cancelling_scheduler_job:
356+
self._cancelling_scheduler_job.cancel()
357+
self.node.logger.report('Found active scheduler job cancelation that will be rescheduled.')
358+
359+
from .calcjobs.tasks import task_kill_job
360+
361+
coro = self._launch_task(task_kill_job, self.node, self.runner.transport)
362+
self._cancelling_scheduler_job = asyncio.create_task(coro)
363+
try:
364+
self.loop.run_until_complete(self._cancelling_scheduler_job)
365+
except Exception as exc:
366+
self.node.logger.error(f'While cancelling the scheduler job an error was raised: {exc}')
367+
return False
340368

341369
result = super().kill(msg_text, force_kill)
342370

371+
had_been_terminated = self.has_terminated()
372+
343373
# Only kill children if we could be killed ourselves
344374
if result is not False and not had_been_terminated:
345375
killing = []
@@ -374,6 +404,22 @@ def done(done_future: plumpy.futures.Future):
374404

375405
return result
376406

407+
async def _launch_task(self, coro, *args, **kwargs):
408+
"""Launch a coroutine as a task, making sure to make it interruptable."""
409+
import functools
410+
411+
from aiida.engine.utils import interruptable_task
412+
413+
self._task: Union[InterruptableFuture, None]
414+
415+
task_fn = functools.partial(coro, *args, **kwargs)
416+
try:
417+
self._task = interruptable_task(task_fn)
418+
result = await self._task
419+
return result
420+
finally:
421+
self._task = None
422+
377423
@override
378424
def out(self, output_port: str, value: Any = None) -> None:
379425
"""Attach output to output port.

tests/cmdline/commands/test_process.py

Lines changed: 79 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@
2525
from aiida.common.log import LOG_LEVEL_REPORT
2626
from aiida.engine import Process, ProcessState
2727
from aiida.engine.processes import control as process_control
28-
from aiida.orm import CalcJobNode, Group, WorkChainNode, WorkflowNode, WorkFunctionNode
28+
from aiida.engine.utils import exponential_backoff_retry
29+
from aiida.orm import CalcJobNode, Group, Int, WorkChainNode, WorkflowNode, WorkFunctionNode
2930
from tests.utils.processes import WaitProcess
3031

3132
FuncArgs = tuple[t.Any, ...]
@@ -53,6 +54,7 @@ def start_daemon_worker_in_foreground_and_redirect_streams(
5354

5455
try:
5556
pid = os.getpid()
57+
# For easier debugging you can change these to stdout
5658
sys.stdout = open(log_dir / f'worker-{pid}.out', 'w')
5759
sys.stderr = open(log_dir / f'worker-{pid}.err', 'w')
5860
start_daemon_worker(False, aiida_profile_name)
@@ -72,10 +74,22 @@ def mock_open(_):
7274
raise Exception('Mock open exception')
7375

7476
@staticmethod
75-
async def mock_exponential_backoff_retry(*_, **__):
77+
async def exponential_backoff_retry_fail_upload(fct: t.Callable[..., t.Any], *args, **kwargs):
7678
from aiida.common.exceptions import TransportTaskException
7779

78-
raise TransportTaskException
80+
if 'do_upload' in fct.__name__:
81+
raise TransportTaskException
82+
else:
83+
return await exponential_backoff_retry(fct, *args, **kwargs)
84+
85+
@staticmethod
86+
async def exponential_backoff_retry_fail_kill(fct: t.Callable[..., t.Any], *args, **kwargs):
87+
from aiida.common.exceptions import TransportTaskException
88+
89+
if 'do_kill' in fct.__name__:
90+
raise TransportTaskException
91+
else:
92+
return await exponential_backoff_retry(fct, *args, **kwargs)
7993

8094

8195
@pytest.fixture(scope='function')
@@ -138,7 +152,6 @@ def test_process_kill_failing_transport(
138152
A failure in opening a transport connection results in the EBM to be fired blocking a regular kill command.
139153
The force kill command will ignore the EBM and kill the process in any case."""
140154
from aiida.cmdline.utils.common import get_process_function_report
141-
from aiida.orm import Int
142155

143156
code = aiida_code_installed(default_calc_job_plugin='core.arithmetic.add', filepath_executable='/bin/bash')
144157

@@ -179,7 +192,6 @@ def test_process_kill_failing_transport_failed_kill(
179192
"""
180193

181194
from aiida.cmdline.utils.common import get_process_function_report
182-
from aiida.orm import Int
183195

184196
code = aiida_code_installed(default_calc_job_plugin='core.arithmetic.add', filepath_executable='/bin/bash')
185197

@@ -213,14 +225,14 @@ def make_a_builder(sleep_seconds=0):
213225

214226
@pytest.mark.requires_rmq
215227
@pytest.mark.usefixtures('started_daemon_client')
216-
def test_process_kill_failng_ebm(
228+
def test_process_kill_failing_ebm_transport(
217229
fork_worker_context, submit_and_await, aiida_code_installed, run_cli_command, monkeypatch
218230
):
219-
"""9) Kill a process that is paused after EBM (5 times failed). It should be possible to kill it normally.
220-
# (e.g. in scenarios that transport is working again)
221-
"""
222-
from aiida.orm import Int
231+
"""Kill a process that is waiting after failed EBM during a transport task.
223232
233+
It should be possible to kill it normally. A process that failed upload (e.g. in scenarios that transport is working
234+
again) and is then killed
235+
"""
224236
code = aiida_code_installed(default_calc_job_plugin='core.arithmetic.add', filepath_executable='/bin/bash')
225237

226238
def make_a_builder(sleep_seconds=0):
@@ -232,7 +244,10 @@ def make_a_builder(sleep_seconds=0):
232244

233245
kill_timeout = 10
234246

235-
monkeypatch_args = ('aiida.engine.utils.exponential_backoff_retry', MockFunctions.mock_exponential_backoff_retry)
247+
monkeypatch_args = (
248+
'aiida.engine.utils.exponential_backoff_retry',
249+
MockFunctions.exponential_backoff_retry_fail_upload,
250+
)
236251
with fork_worker_context(monkeypatch.setattr, monkeypatch_args):
237252
node = submit_and_await(make_a_builder(), ProcessState.WAITING)
238253
await_condition(
@@ -241,7 +256,60 @@ def make_a_builder(sleep_seconds=0):
241256
timeout=kill_timeout,
242257
)
243258

259+
# kill should start EBM and should successfully kill
260+
run_cli_command(cmd_process.process_kill, [str(node.pk), '--wait'])
261+
await_condition(lambda: node.is_killed, timeout=kill_timeout)
262+
263+
264+
@pytest.mark.requires_rmq
265+
@pytest.mark.usefixtures('started_daemon_client')
266+
def test_process_kill_failing_ebm_kill(
267+
fork_worker_context, submit_and_await, aiida_code_installed, run_cli_command, monkeypatch
268+
):
269+
"""Kill a process that had previously failed with an EBM.
270+
271+
Killing a process tries to gracefully cancel the job on the remote node. If there are connection problems it retries
272+
it in using the EBM. If this fails another kill command can be send to restart the cancelation of the job scheduler.
273+
"""
274+
from aiida.cmdline.utils.common import get_process_function_report
275+
276+
code = aiida_code_installed(default_calc_job_plugin='core.arithmetic.add', filepath_executable='/bin/bash')
277+
278+
def make_a_builder(sleep_seconds=0):
279+
builder = code.get_builder()
280+
builder.x = Int(1)
281+
builder.y = Int(1)
282+
builder.metadata.options.sleep = sleep_seconds
283+
return builder
284+
285+
kill_timeout = 10
286+
287+
monkeypatch_args = (
288+
'aiida.engine.utils.exponential_backoff_retry',
289+
MockFunctions.exponential_backoff_retry_fail_kill,
290+
)
291+
with fork_worker_context(monkeypatch.setattr, monkeypatch_args):
292+
node = submit_and_await(make_a_builder(kill_timeout + 10), ProcessState.WAITING, timeout=kill_timeout)
293+
await_condition(
294+
lambda: node.process_status == 'Monitoring scheduler: job state RUNNING',
295+
timeout=kill_timeout,
296+
)
297+
298+
# kill should start EBM and be not successful in EBM
244299
run_cli_command(cmd_process.process_kill, [str(node.pk), '--wait'])
300+
await_condition(lambda: not node.is_killed, timeout=kill_timeout)
301+
302+
# kill should restart EBM and be not successful in EBM
303+
# this tests if the old task is cancelled and restarted successfully
304+
run_cli_command(cmd_process.process_kill, [str(node.pk), '--wait'])
305+
await_condition(
306+
lambda: 'Found active scheduler job cancelation that will be rescheduled.'
307+
in get_process_function_report(node),
308+
timeout=kill_timeout,
309+
)
310+
311+
# force kill should skip EBM and successfully kill the process
312+
run_cli_command(cmd_process.process_kill, [str(node.pk), '-F', '--wait'])
245313
await_condition(lambda: node.is_killed, timeout=kill_timeout)
246314

247315

@@ -758,8 +826,6 @@ def test_process_kill(submit_and_await, run_cli_command, aiida_code_installed):
758826
assert result.exit_code == ExitCode.USAGE_ERROR
759827
assert len(result.output_lines) > 0
760828

761-
from aiida.orm import Int
762-
763829
code = aiida_code_installed(default_calc_job_plugin='core.arithmetic.add', filepath_executable='/bin/bash')
764830
builder = code.get_builder()
765831
builder.x = Int(2)

0 commit comments

Comments
 (0)