Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ dependencies:
- docstring_parser
- get-annotations~=0.1
- python-graphviz~=0.19
- plumpy~=0.25.0
- ipython>=7
- jedi<0.19
- jinja2~=3.0
- kiwipy[rmq]~=0.8.4
- importlib-metadata~=6.0
- numpy~=1.21
- paramiko~=3.0
- plumpy~=0.24.0
- pgsu~=0.3.0
- psutil~=5.6
- psycopg[binary]~=3.0
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ dependencies = [
'docstring-parser',
'get-annotations~=0.1;python_version<"3.10"',
'graphviz~=0.19',
'plumpy~=0.25.0',
'ipython>=7',
'jedi<0.19',
'jinja2~=3.0',
'kiwipy[rmq]~=0.8.4',
'importlib-metadata~=6.0',
'numpy~=1.21',
'paramiko~=3.0',
'plumpy~=0.24.0',
'pgsu~=0.3.0',
'psutil~=5.6',
'psycopg[binary]~=3.0',
Expand Down
28 changes: 25 additions & 3 deletions src/aiida/cmdline/commands/cmd_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

from aiida.cmdline.commands.cmd_verdi import verdi
from aiida.cmdline.params import arguments, options, types
from aiida.cmdline.params.options.overridable import OverridableOption
from aiida.cmdline.utils import decorators, echo
from aiida.common.log import LOG_LEVELS, capture_logging

Expand Down Expand Up @@ -318,10 +319,25 @@ def process_status(call_link_label, most_recent_node, max_depth, processes):
@verdi_process.command('kill')
@arguments.PROCESSES()
@options.ALL(help='Kill all processes if no specific processes are specified.')
@options.TIMEOUT()
@OverridableOption(
'-t',
'--timeout',
type=click.FLOAT,
default=5.0,
show_default=True,
help='Time in seconds to wait for a response of the kill task before timing out.',
)()
@options.WAIT()
@OverridableOption(
'-F',
'--force-kill',
is_flag=True,
default=False,
help='Kills the process without waiting for a confirmation if the job has been killed.\n'
'Note: This may lead to orphaned jobs on your HPC and should be used with caution.',
)()
@decorators.with_dbenv()
def process_kill(processes, all_entries, timeout, wait):
def process_kill(processes, all_entries, timeout, wait, force_kill):
"""Kill running processes.

Kill one or multiple running processes."""
Expand All @@ -338,11 +354,17 @@ def process_kill(processes, all_entries, timeout, wait):
if all_entries:
click.confirm('Are you sure you want to kill all processes?', abort=True)

if force_kill:
echo.echo_warning('Force kill is enabled. This may lead to orphaned jobs on your HPC.')
msg_text = 'Force killed through `verdi process kill`'
else:
msg_text = 'Killed through `verdi process kill`'
with capture_logging() as stream:
try:
control.kill_processes(
processes,
msg_text='Killed through `verdi process kill`',
msg_text=msg_text,
force_kill=force_kill,
all_entries=all_entries,
timeout=timeout,
wait=wait,
Expand Down
7 changes: 6 additions & 1 deletion src/aiida/engine/daemon/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,10 @@ def get_worker_info(self, timeout: int | None = None) -> dict[str, t.Any]:
command = {'command': 'stats', 'properties': {'name': self.daemon_name}}
return self.call_client(command, timeout=timeout)

def get_number_of_workers(self, timeout: int | None = None) -> int:
"""Get number of workers."""
return len(self.get_worker_info(timeout).get('info', []))

def get_daemon_info(self, timeout: int | None = None) -> dict[str, t.Any]:
"""Get statistics about this daemon itself.

Expand Down Expand Up @@ -531,7 +535,8 @@ def start_daemon(
try:
subprocess.check_output(command, env=env, stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as exception:
raise DaemonException('The daemon failed to start.') from exception
# CalledProcessError does not include the subprocess stderr in its message, so we add it to the DaemonException
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice 👍

raise DaemonException(f'The daemon failed to start with error:\n{exception.stdout.decode()}') from exception

if not wait:
return
Expand Down
7 changes: 5 additions & 2 deletions src/aiida/engine/daemon/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import logging
import signal
import sys
from typing import Union

from aiida.common.log import configure_logging
from aiida.engine.daemon.client import get_daemon_client
Expand All @@ -32,18 +33,20 @@ async def shutdown_worker(runner: Runner) -> None:
task.cancel()

await asyncio.gather(*tasks, return_exceptions=True)

runner.close()

LOGGER.info('Daemon worker stopped')


def start_daemon_worker(foreground: bool = False) -> None:
def start_daemon_worker(foreground: bool = False, profile_name: Union[str, None] = None) -> None:
"""Start a daemon worker for the currently configured profile.

:param foreground: If true, the logging will be configured to write to stdout, otherwise it will be configured to
write to the daemon log file.
"""
daemon_client = get_daemon_client()

daemon_client = get_daemon_client(profile_name)
configure_logging(with_orm=True, daemon=not foreground, daemon_log_file=daemon_client.daemon_log_file)

LOGGER.debug(f'sys.executable: {sys.executable}')
Expand Down
17 changes: 9 additions & 8 deletions src/aiida/engine/processes/calcjobs/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@
from aiida.common.datastructures import CalcJobState
from aiida.common.exceptions import FeatureNotAvailable, TransportTaskException
from aiida.common.folders import SandboxFolder
from aiida.engine import utils
from aiida.engine.daemon import execmanager
from aiida.engine.processes.exit_code import ExitCode
from aiida.engine.transports import TransportQueue
from aiida.engine.utils import InterruptableFuture, exponential_backoff_retry, interruptable_task
from aiida.engine.utils import InterruptableFuture, interruptable_task
from aiida.manage.configuration import get_config_option
from aiida.orm.nodes.process.calculation.calcjob import CalcJobNode
from aiida.schedulers.datastructures import JobState
Expand Down Expand Up @@ -102,7 +103,7 @@
try:
logger.info(f'scheduled request to upload CalcJob<{node.pk}>')
ignore_exceptions = (plumpy.futures.CancelledError, PreSubmitException, plumpy.process_states.Interruption)
skip_submit = await exponential_backoff_retry(
skip_submit = await utils.exponential_backoff_retry(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these changes are important to be able to monkeypatch, otherwise one imports a local copy of the function that cannot be monkeypatched

do_upload, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions
)
except PreSubmitException:
Expand Down Expand Up @@ -150,7 +151,7 @@
try:
logger.info(f'scheduled request to submit CalcJob<{node.pk}>')
ignore_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption)
result = await exponential_backoff_retry(
result = await utils.exponential_backoff_retry(
do_submit, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions
)
except (plumpy.futures.CancelledError, plumpy.process_states.Interruption):
Expand Down Expand Up @@ -208,7 +209,7 @@
try:
logger.info(f'scheduled request to update CalcJob<{node.pk}>')
ignore_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption)
job_done = await exponential_backoff_retry(
job_done = await utils.exponential_backoff_retry(
do_update, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions
)
except (plumpy.futures.CancelledError, plumpy.process_states.Interruption):
Expand Down Expand Up @@ -258,7 +259,7 @@
try:
logger.info(f'scheduled request to monitor CalcJob<{node.pk}>')
ignore_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption)
monitor_result = await exponential_backoff_retry(
monitor_result = await utils.exponential_backoff_retry(
do_monitor, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions
)
except (plumpy.futures.CancelledError, plumpy.process_states.Interruption):
Expand Down Expand Up @@ -326,7 +327,7 @@
try:
logger.info(f'scheduled request to retrieve CalcJob<{node.pk}>')
ignore_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption)
result = await exponential_backoff_retry(
result = await utils.exponential_backoff_retry(
do_retrieve, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions
)
except (plumpy.futures.CancelledError, plumpy.process_states.Interruption):
Expand Down Expand Up @@ -371,7 +372,7 @@
return await execmanager.stash_calculation(node, transport)

try:
await exponential_backoff_retry(
await utils.exponential_backoff_retry(

Check warning on line 375 in src/aiida/engine/processes/calcjobs/tasks.py

View check run for this annotation

Codecov / codecov/patch

src/aiida/engine/processes/calcjobs/tasks.py#L375

Added line #L375 was not covered by tests
do_stash,
initial_interval,
max_attempts,
Expand Down Expand Up @@ -419,7 +420,7 @@

try:
logger.info(f'scheduled request to kill CalcJob<{node.pk}>')
result = await exponential_backoff_retry(do_kill, initial_interval, max_attempts, logger=node.logger)
result = await utils.exponential_backoff_retry(do_kill, initial_interval, max_attempts, logger=node.logger)
except plumpy.process_states.Interruption:
raise
except Exception as exception:
Expand Down
19 changes: 11 additions & 8 deletions src/aiida/engine/processes/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@
processes: list[ProcessNode] | None = None,
*,
msg_text: str = 'Killed through `aiida.engine.processes.control.kill_processes`',
force_kill: bool = False,
all_entries: bool = False,
timeout: float = 5.0,
wait: bool = False,
Expand Down Expand Up @@ -201,7 +202,7 @@
return

controller = get_manager().get_process_controller()
action = functools.partial(controller.kill_process, msg_text=msg_text)
action = functools.partial(controller.kill_process, msg_text=msg_text, force_kill=force_kill)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my opinion, this is one of the changes that needs to be tested. The CLI tests are calling this and should not test against using daemon, which crosses too many boundaries.

In essence, if I understand correctly to make verdi process kill --force work, it need:

  • --force can propagate to the kill_processes to make the force_kill set to True
  • kill_processes function should send the message through RMQ with the message resolved with force as an attribute of the function.
  • Then it is plumpy part (override by aiida-core/engine/processes/process.py::Process::kill, this I said the evil of inheritance) need to make sure it calls kill properly, so it is out the scope of the tests covered in aiida-core.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be now tested.

_perform_actions(processes, action, 'kill', 'killing', timeout, wait)


Expand Down Expand Up @@ -276,15 +277,17 @@
LOGGER.error(f'got unexpected response when {present} Process<{process.pk}>: {result}')

try:
for future in concurrent.futures.as_completed(futures.keys(), timeout=timeout):
process = futures[future]

for future, process in futures.items():
# unwrap is needed here since LoopCommunicator will also wrap a future
unwrapped = unwrap_kiwi_future(future)
try:
# unwrap is needed here since LoopCommunicator will also wrap a future
unwrapped = unwrap_kiwi_future(future)
result = unwrapped.result()
result = unwrapped.result(timeout=timeout)
except communications.TimeoutError:
LOGGER.error(f'call to {infinitive} Process<{process.pk}> timed out')
cancelled = unwrapped.cancel()
if cancelled:
LOGGER.error(f'call to {infinitive} Process<{process.pk}> timed out and was cancelled.')
else:
LOGGER.error(f'call to {infinitive} Process<{process.pk}> timed out but could not be cancelled.')

Check warning on line 290 in src/aiida/engine/processes/control.py

View check run for this annotation

Codecov / codecov/patch

src/aiida/engine/processes/control.py#L290

Added line #L290 was not covered by tests
except Exception as exception:
LOGGER.error(f'failed to {infinitive} Process<{process.pk}>: {exception}')
else:
Expand Down
4 changes: 2 additions & 2 deletions src/aiida/engine/processes/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ def load_instance_state(

self.node.logger.info(f'Loaded process<{self.node.pk}> from saved state')

def kill(self, msg_text: str | None = None) -> Union[bool, plumpy.futures.Future]:
def kill(self, msg_text: str | None = None, force_kill: bool = False) -> Union[bool, plumpy.futures.Future]:
"""Kill the process and all the children calculations it called

:param msg: message
Expand All @@ -338,7 +338,7 @@ def kill(self, msg_text: str | None = None) -> Union[bool, plumpy.futures.Future

had_been_terminated = self.has_terminated()

result = super().kill(msg_text)
result = super().kill(msg_text, force_kill)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the other thing I think require a test, but from the change you made, it should be the plumpy's responsibility to make sure it kill properly.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, you already did the change aiidateam/plumpy#320, I think there is the place require more test to make sure the behavior of kill works as expected.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have now several tests. Do you think something is still missing?


# Only kill children if we could be killed ourselves
if result is not False and not had_been_terminated:
Expand Down
1 change: 1 addition & 0 deletions src/aiida/manage/tests/pytest_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,7 @@
f'Daemon <{started_daemon_client.profile.name}|{daemon_status}> log file content: \n'
f'{daemon_log_file}'
)
time.sleep(0.1)

Check warning on line 763 in src/aiida/manage/tests/pytest_fixtures.py

View check run for this annotation

Codecov / codecov/patch

src/aiida/manage/tests/pytest_fixtures.py#L763

Added line #L763 was not covered by tests
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to not keep cpu spinning


return node

Expand Down
1 change: 1 addition & 0 deletions src/aiida/tools/pytest_fixtures/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ def factory(
f'Daemon <{started_daemon_client.profile.name}|{daemon_status}> log file content: \n'
f'{daemon_log_file}'
)
time.sleep(0.1)

return node

Expand Down
Loading
Loading