Commit 8cdadad

Regular kill reschedules the cancelation of the scheduler job
PR aiidateam#6793 introduced the cancelation of earlier kill actions. This caused a problem when two kill commands are sent in sequence: the second kill action cancels the first one, which had triggered the cancelation of the scheduler job within the EBM, but the second kill does not retrigger that cancelation. The bug appeared because the killing logic lives in two places; PR aiidateam#6868 fixes this properly by refactoring the kill action, while this PR only serves as a fast temporary fix with workarounds. Before this PR, once the kill command had failed through the EBM, the scheduler job could no longer be cancelled by a kill. Since we now have a force-kill option to bypass the EBM, a regular kill can reschedule the cancelation of the scheduler job and still terminate the process gracefully.
1 parent 8ad7953 commit 8cdadad
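
To make the race concrete, below is a minimal, self-contained asyncio sketch of the rescheduling pattern this commit adopts. All names are hypothetical stand-ins, not AiiDA code: each new kill request cancels the in-flight cancelation task and schedules a fresh one, so a second kill retriggers the scheduler-job cancelation instead of silently dropping it.

import asyncio


class FakeProcess:
    """Hypothetical stand-in for a process holding a cancelation task."""

    def __init__(self):
        self._cancelling_scheduler_job: asyncio.Task | None = None

    async def _cancel_scheduler_job(self):
        # Stand-in for the EBM-wrapped scheduler-job cancelation; may hang
        # indefinitely when the transport is unreachable.
        await asyncio.sleep(3600)

    def kill(self):
        if self._cancelling_scheduler_job is not None:
            # Cancel the previous attempt ...
            self._cancelling_scheduler_job.cancel()
        # ... and immediately reschedule it: the behaviour this commit restores.
        self._cancelling_scheduler_job = asyncio.create_task(self._cancel_scheduler_job())


async def main():
    process = FakeProcess()
    process.kill()  # first kill schedules the cancelation
    process.kill()  # second kill cancels it but also reschedules it
    await asyncio.sleep(0)  # let the rescheduled task start
    assert not process._cancelling_scheduler_job.cancelled()
    process._cancelling_scheduler_job.cancel()  # clean up before the loop closes


asyncio.run(main())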

File tree

src/aiida/engine/processes/calcjobs/tasks.py
src/aiida/engine/processes/process.py
src/aiida/engine/utils.py
tests/cmdline/commands/test_process.py

4 files changed: +130 −8 lines


src/aiida/engine/processes/calcjobs/tasks.py

Lines changed: 0 additions & 1 deletion

@@ -582,7 +582,6 @@ async def execute(self) -> plumpy.process_states.State:  # type: ignore[override]
         except TransportTaskException as exception:
             raise plumpy.process_states.PauseInterruption(f'Pausing after failed transport task: {exception}')
         except plumpy.process_states.KillInterruption as exception:
-            await self._kill_job(node, transport_queue)
             node.set_process_status(str(exception))
             return self.retrieve(monitor_result=self._monitor_result)
         except (plumpy.futures.CancelledError, asyncio.CancelledError):

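With this one-line deletion, the `KillInterruption` handler no longer cancels the scheduler job from inside the transport task itself; that responsibility moves into `Process.kill` (next file), which is what lets a repeated kill reschedule the cancelation.
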
src/aiida/engine/processes/process.py

Lines changed: 55 additions & 1 deletion

@@ -51,7 +51,9 @@
 from aiida.common.lang import classproperty, override
 from aiida.common.links import LinkType
 from aiida.common.log import LOG_LEVEL_REPORT
+from aiida.engine.utils import InterruptableFuture
 from aiida.orm.implementation.utils import clean_value
+from aiida.orm.nodes.process.calculation.calcjob import CalcJobNode
 from aiida.orm.utils import serialize

 from .builder import ProcessBuilder
@@ -72,6 +74,7 @@ class Process(PlumpyProcess):
     have full provenance saved in the database.
     """

+    _cancelling_scheduler_job: asyncio.Task | None = None
     _node_class = orm.ProcessNode
     _spec_class = ProcessSpec

@@ -336,10 +339,43 @@ def kill(self, msg_text: str | None = None, force_kill: bool = False) -> Union[bool, plumpy.futures.Future]:
         """
         self.node.logger.info(f'Request to kill Process<{self.node.pk}>')

-        had_been_terminated = self.has_terminated()
+        # PR_COMMENT Because we need to overwrite the logic of the cancelation of the self._killing task of the
+        #            scheduler job, we have to copy this logic of the parent class in plumpy and adapt the
+        #            cancelation of the last sent kill action to also resend the kill/cancelation of the
+        #            scheduler job, since we stop that cancelation by canceling the last kill action.
+        if self.killed():
+            # Already killed
+            return True
+
+        if self.has_terminated():
+            # Can't kill
+            return False
+
+        # Cancel scheduler job
+        if not force_kill and isinstance(self.node, CalcJobNode):
+            if self._killing:
+                self._killing.cancel()
+
+            # PR_COMMENT: We cannot reuse _killing because of type issues, it is a CancellableAction.
+            #             We can wrap a task around a CancellableAction, but the CancellableAction silently catches
+            #             any error, while here we need to know if the cancelation of the scheduler job failed.
+            if self._cancelling_scheduler_job:
+                self._cancelling_scheduler_job.cancel()
+
+            from .calcjobs.tasks import task_kill_job
+
+            coro = self._launch_task(task_kill_job, self.node, self.runner.transport)
+            self._cancelling_scheduler_job = asyncio.create_task(coro)
+            try:
+                self.loop.run_until_complete(self._cancelling_scheduler_job)
+            except Exception as exc:
+                self.node.logger.error(f'While cancelling job error was raised: {exc!s}')
+                return False

         result = super().kill(msg_text, force_kill)

+        had_been_terminated = self.has_terminated()
+
         # Only kill children if we could be killed ourselves
         if result is not False and not had_been_terminated:
             killing = []
@@ -374,6 +410,24 @@ def done(done_future: plumpy.futures.Future):

         return result

+    # PR_COMMENT This is a copy of the function in engine/processes/calcjobs/tasks.py
+    #            and will be merged into one place in PR #6868
+    async def _launch_task(self, coro, *args, **kwargs):
+        """Launch a coroutine as a task, making sure to make it interruptable."""
+        import functools
+
+        from aiida.engine.utils import interruptable_task
+
+        self._task: Union[InterruptableFuture, None]
+
+        task_fn = functools.partial(coro, *args, **kwargs)
+        try:
+            self._task = interruptable_task(task_fn)
+            result = await self._task
+            return result
+        finally:
+            self._task = None
+
     @override
     def out(self, output_port: str, value: Any = None) -> None:
         """Attach output to output port.

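The PR_COMMENT above notes that a `CancellableAction` silently catches errors, while the kill logic needs to know whether the scheduler-job cancelation failed. The sketch below illustrates that distinction with hypothetical stand-ins (this is not plumpy's actual `CancellableAction`): an error-swallowing wrapper hides the failure, whereas awaiting a bare `asyncio.Task` re-raises it so `kill` can return `False`.

import asyncio


class SwallowingAction:
    """Hypothetical wrapper that, like a CancellableAction, catches errors silently."""

    async def run(self, coro):
        try:
            return await coro
        except Exception:
            return None  # the failure is invisible to the caller


async def cancel_scheduler_job():
    raise RuntimeError('transport failed')


async def main():
    # With the swallowing wrapper the caller cannot react to the failure:
    assert await SwallowingAction().run(cancel_scheduler_job()) is None

    # With a bare task the failure propagates, so the caller can return False:
    task = asyncio.create_task(cancel_scheduler_job())
    try:
        await task
    except RuntimeError as exc:
        print(f'While cancelling job error was raised: {exc}')


asyncio.run(main())
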
src/aiida/engine/utils.py

Lines changed: 1 addition & 0 deletions

@@ -193,6 +193,7 @@ async def exponential_backoff_retry(
     :param ignore_exceptions: exceptions to ignore, i.e. when caught do nothing and simply re-raise
     :return: result if the ``coro`` call completes within ``max_attempts`` retries without raising
     """
+
     if logger is None:
         logger = LOGGER

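For context, the EBM referenced throughout this commit is the exponential backoff mechanism implemented by `exponential_backoff_retry` in this file. The sketch below shows the retry semantics in rough form; the function name, defaults, and log messages are illustrative assumptions, not the actual implementation.

import asyncio
import logging


async def backoff_retry(fn, initial_interval=0.1, max_attempts=5, logger=None):
    """Await ``fn`` until it succeeds, doubling the wait after each failure."""
    logger = logger or logging.getLogger(__name__)
    interval = initial_interval
    for attempt in range(max_attempts):
        try:
            return await fn()
        except Exception:
            if attempt == max_attempts - 1:
                logger.error('%s failed %d times consecutively', fn.__name__, max_attempts)
                raise
            logger.warning('attempt %d failed, retrying in %.1f seconds', attempt + 1, interval)
            await asyncio.sleep(interval)
            interval *= 2


async def flaky():
    flaky.calls = getattr(flaky, 'calls', 0) + 1
    if flaky.calls < 3:
        raise IOError('transport down')
    return 'ok'


print(asyncio.run(backoff_retry(flaky)))  # succeeds on the third attempt
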
tests/cmdline/commands/test_process.py

Lines changed: 74 additions & 6 deletions

@@ -25,6 +25,7 @@
 from aiida.common.log import LOG_LEVEL_REPORT
 from aiida.engine import Process, ProcessState
 from aiida.engine.processes import control as process_control
+from aiida.engine.utils import exponential_backoff_retry
 from aiida.orm import CalcJobNode, Group, WorkChainNode, WorkflowNode, WorkFunctionNode
 from tests.utils.processes import WaitProcess

@@ -53,6 +54,7 @@ def start_daemon_worker_in_foreground_and_redirect_streams(
     try:
         pid = os.getpid()
+        # For easier debugging you can change these to stdout
         sys.stdout = open(log_dir / f'worker-{pid}.out', 'w')
         sys.stderr = open(log_dir / f'worker-{pid}.err', 'w')
         start_daemon_worker(False, aiida_profile_name)
@@ -72,10 +74,22 @@ def mock_open(_):
         raise Exception('Mock open exception')

     @staticmethod
-    async def mock_exponential_backoff_retry(*_, **__):
+    async def exponential_backoff_retry_fail_upload(fct: t.Callable[..., t.Any], *args, **kwargs):
         from aiida.common.exceptions import TransportTaskException

-        raise TransportTaskException
+        if 'do_upload' in fct.__name__:
+            raise TransportTaskException
+        else:
+            return await exponential_backoff_retry(fct, *args, **kwargs)
+
+    @staticmethod
+    async def exponential_backoff_retry_fail_kill(fct: t.Callable[..., t.Any], *args, **kwargs):
+        from aiida.common.exceptions import TransportTaskException
+
+        if 'do_kill' in fct.__name__:
+            raise TransportTaskException
+        else:
+            return await exponential_backoff_retry(fct, *args, **kwargs)


 @pytest.fixture(scope='function')
@@ -213,11 +227,12 @@ def make_a_builder(sleep_seconds=0):

 @pytest.mark.requires_rmq
 @pytest.mark.usefixtures('started_daemon_client')
-def test_process_kill_failng_ebm(
+def test_process_kill_failing_ebm_upload(
     fork_worker_context, submit_and_await, aiida_code_installed, run_cli_command, monkeypatch
 ):
-    """9) Kill a process that is paused after EBM (5 times failed). It should be possible to kill it normally.
-    # (e.g. in scenarios that transport is working again)
+    """Kill a process that is waiting after a failed EBM during upload. It should be possible to kill it normally.
+
+    This covers a process whose upload failed and that is then killed once the transport is working again.
     """
     from aiida.orm import Int

@@ -232,7 +247,10 @@ def make_a_builder(sleep_seconds=0):

     kill_timeout = 10

-    monkeypatch_args = ('aiida.engine.utils.exponential_backoff_retry', MockFunctions.mock_exponential_backoff_retry)
+    monkeypatch_args = (
+        'aiida.engine.utils.exponential_backoff_retry',
+        MockFunctions.exponential_backoff_retry_fail_upload,
+    )
     with fork_worker_context(monkeypatch.setattr, monkeypatch_args):
         node = submit_and_await(make_a_builder(), ProcessState.WAITING)
         await_condition(
@@ -241,10 +259,60 @@ def make_a_builder(sleep_seconds=0):
             timeout=kill_timeout,
         )

+        # kill should start the EBM and then succeed in killing the process
         run_cli_command(cmd_process.process_kill, [str(node.pk), '--wait'])
         await_condition(lambda: node.is_killed, timeout=kill_timeout)


+@pytest.mark.requires_rmq
+@pytest.mark.usefixtures('started_daemon_client')
+def test_process_kill_failing_ebm_kill(
+    fork_worker_context, submit_and_await, aiida_code_installed, run_cli_command, monkeypatch
+):
+    """Kill a process with a failing EBM during the kill.
+
+    Killing a process tries to gracefully cancel the job on the remote node. If there are connection problems, it
+    retries using the EBM. If this fails, another kill command can be sent to restart the cancelation of the
+    scheduler job.
+    """
+    from aiida.orm import Int
+
+    code = aiida_code_installed(default_calc_job_plugin='core.arithmetic.add', filepath_executable='/bin/bash')
+
+    def make_a_builder(sleep_seconds=0):
+        builder = code.get_builder()
+        builder.x = Int(1)
+        builder.y = Int(1)
+        builder.metadata.options.sleep = sleep_seconds
+        return builder
+
+    kill_timeout = 10
+
+    monkeypatch_args = (
+        'aiida.engine.utils.exponential_backoff_retry',
+        MockFunctions.exponential_backoff_retry_fail_kill,
+    )
+    # from aiida.engine.utils import exponential_backoff_retry
+    # monkeypatch_args = ('aiida.engine.utils.exponential_backoff_retry', exponential_backoff_retry)
+    with fork_worker_context(monkeypatch.setattr, monkeypatch_args):
+        node = submit_and_await(make_a_builder(kill_timeout + 10), ProcessState.WAITING, timeout=kill_timeout)
+        await_condition(
+            lambda: node.process_status == 'Monitoring scheduler: job state RUNNING',
+            timeout=kill_timeout,
+        )
+
+        # kill should start the EBM and not succeed within it
+        run_cli_command(cmd_process.process_kill, [str(node.pk), '--wait'])
+        await_condition(lambda: not node.is_killed, timeout=kill_timeout)
+
+        # a second kill should restart the EBM and again not succeed within it
+        run_cli_command(cmd_process.process_kill, [str(node.pk), '--wait'])
+        await_condition(lambda: not node.is_killed, timeout=kill_timeout)
+
+        # force kill should skip the EBM and successfully kill the process
+        run_cli_command(cmd_process.process_kill, [str(node.pk), '-F', '--wait'])
+        await_condition(lambda: node.is_killed, timeout=kill_timeout)
+
+
 class TestVerdiProcess:
     """Tests for `verdi process`."""

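The tests above poll with `await_condition`; as a rough, hypothetical stand-in for that utility (not the actual implementation), the pattern is simply:

import time


def await_condition(condition, timeout=10, interval=0.1):
    """Poll ``condition`` until it returns truthy or ``timeout`` seconds pass."""
    start = time.monotonic()
    while not condition():
        if time.monotonic() - start > timeout:
            raise TimeoutError('condition not reached within timeout')
        time.sleep(interval)
    return True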