Skip to content

Commit 9e9ccff

Browse files
committed
Regular killing reschedules a cancel of scheduler job
PR aiidateam#6793 introduced the cancelation of earlier kill actions. This had the problem if two kill commands are set in a sequence, the second kill action will cancel the first one which triggered the cancelation of the scheduler job within an EBM. The second kill command however did not retrigger the cancelation of the scheduler job. This bug appeared because we have two places where the killing logic is placed. More information about this can be found in PR aiidateam#6868 that fixes this properly refactoring the kill action. This PR only serves as a fast temporary fix with workarounds. Before this PR, when the killing command failed through the EBM, the scheduler job could not be cancelled through a kill anymore. Since we have now force-kill option to bypass the EBM, we can reschedule the cancelation of the scheduler job to gracefully kill a process.
1 parent 2fd4b89 commit 9e9ccff

File tree

4 files changed

+61
-2
lines changed

4 files changed

+61
-2
lines changed

src/aiida/engine/processes/calcjobs/tasks.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -582,7 +582,6 @@ async def execute(self) -> plumpy.process_states.State: # type: ignore[override
582582
except TransportTaskException as exception:
583583
raise plumpy.process_states.PauseInterruption(f'Pausing after failed transport task: {exception}')
584584
except plumpy.process_states.KillInterruption as exception:
585-
await self._kill_job(node, transport_queue)
586585
node.set_process_status(str(exception))
587586
return self.retrieve(monitor_result=self._monitor_result)
588587
except (plumpy.futures.CancelledError, asyncio.CancelledError):

src/aiida/engine/processes/process.py

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,9 @@
5151
from aiida.common.lang import classproperty, override
5252
from aiida.common.links import LinkType
5353
from aiida.common.log import LOG_LEVEL_REPORT
54+
from aiida.engine.utils import InterruptableFuture
5455
from aiida.orm.implementation.utils import clean_value
56+
from aiida.orm.nodes.process.calculation.calcjob import CalcJobNode
5557
from aiida.orm.utils import serialize
5658

5759
from .builder import ProcessBuilder
@@ -72,6 +74,7 @@ class Process(PlumpyProcess):
7274
have full provenance saved in the database.
7375
"""
7476

77+
_cancelling_scheduler_job: asyncio.Task | None = None
7578
_node_class = orm.ProcessNode
7679
_spec_class = ProcessSpec
7780

@@ -336,10 +339,43 @@ def kill(self, msg_text: str | None = None, force_kill: bool = False) -> Union[b
336339
"""
337340
self.node.logger.info(f'Request to kill Process<{self.node.pk}>')
338341

339-
had_been_terminated = self.has_terminated()
342+
# PR_COMMENT Because we need to overwrite the logic of the cancelation of the self._killing task of the
343+
# scheduler job, we need to copy this logic of the parent class in plumpy, we need to adapt the
344+
# cancelation of the last sent killing action to also resend the kill/cancelation of the scheduler
345+
# job as we stop this canelation by canceling the last killing action
346+
if self.killed():
347+
# Already killed
348+
return True
349+
350+
if self.has_terminated():
351+
# Can't kill
352+
return False
353+
354+
# Cancel scheduler job
355+
if not force_kill and isinstance(self.node, CalcJobNode):
356+
if self._killing:
357+
self._killing.cancel()
358+
359+
# PR_COMMENT: We cannot reuse _killing because of type issues, it is a CancellableAction.
360+
# We can wrap a task around a CancellableAction but the CancellableAction catches silently any
361+
# error whilel here we need to know if the cancelation of the scheduler job failed.
362+
if self._cancelling_scheduler_job:
363+
self._cancelling_scheduler_job.cancel()
364+
365+
from .calcjobs.tasks import task_kill_job
366+
367+
coro = self._launch_task(task_kill_job, self.node, self.runner.transport)
368+
self._cancelling_scheduler_job = asyncio.create_task(coro)
369+
try:
370+
self.loop.run_until_complete(self._cancelling_scheduler_job)
371+
except Exception as exc:
372+
self.node.logger.error(f'While cancelling job error was raised: {exc!s}')
373+
return False
340374

341375
result = super().kill(msg_text, force_kill)
342376

377+
had_been_terminated = self.has_terminated()
378+
343379
# Only kill children if we could be killed ourselves
344380
if result is not False and not had_been_terminated:
345381
killing = []
@@ -374,6 +410,24 @@ def done(done_future: plumpy.futures.Future):
374410

375411
return result
376412

413+
# PR_COMMENT This is a copy of the function in engine/processes/calcjobs/tasks.py
414+
# and will merged to one place in PR #6868
415+
async def _launch_task(self, coro, *args, **kwargs):
416+
"""Launch a coroutine as a task, making sure to make it interruptable."""
417+
import functools
418+
419+
from aiida.engine.utils import interruptable_task
420+
421+
self._task: Union[InterruptableFuture, None]
422+
423+
task_fn = functools.partial(coro, *args, **kwargs)
424+
try:
425+
self._task = interruptable_task(task_fn)
426+
result = await self._task
427+
return result
428+
finally:
429+
self._task = None
430+
377431
@override
378432
def out(self, output_port: str, value: Any = None) -> None:
379433
"""Attach output to output port.

src/aiida/engine/utils.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@ async def exponential_backoff_retry(
193193
:param ignore_exceptions: exceptions to ignore, i.e. when caught do nothing and simply re-raise
194194
:return: result if the ``coro`` call completes within ``max_attempts`` retries without raising
195195
"""
196+
196197
if logger is None:
197198
logger = LOGGER
198199

tests/cmdline/commands/test_process.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,12 @@ def make_a_builder(sleep_seconds=0):
241241
timeout=kill_timeout,
242242
)
243243

244+
# should restart EBM and be again not successful
244245
run_cli_command(cmd_process.process_kill, [str(node.pk), '--wait'])
246+
await_condition(lambda: not node.is_killed, timeout=kill_timeout)
247+
248+
# should skip EBM and successfully kill the process
249+
run_cli_command(cmd_process.process_kill, [str(node.pk), '-F', '--wait'])
245250
await_condition(lambda: node.is_killed, timeout=kill_timeout)
246251

247252

0 commit comments

Comments
 (0)