Skip to content

Commit 9cbd19c

Browse files
committed
Merge wait and timeout to one parameter since we only have scheduled actions
The original idea of `wait` and `timeout` was to distinguish between actions that return immediately and actions that are scheduled. This mechanism was, however, never used and resulted in a misinterpretation in the force-kill PR. We now only deal with scheduled actions to simplify the logic. Related commits: aiida-core 8388018 and cd0d15c, and plumpy 1b6ecb8.
1 parent 133d884 commit 9cbd19c

File tree

4 files changed

+45
-96
lines changed

4 files changed

+45
-96
lines changed

src/aiida/cmdline/commands/cmd_process.py

Lines changed: 7 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -319,15 +319,7 @@ def process_status(call_link_label, most_recent_node, max_depth, processes):
319319
@verdi_process.command('kill')
320320
@arguments.PROCESSES()
321321
@options.ALL(help='Kill all processes if no specific processes are specified.')
322-
@OverridableOption(
323-
'-t',
324-
'--timeout',
325-
type=click.FLOAT,
326-
default=5.0,
327-
show_default=True,
328-
help='Time in seconds to wait for a response of the kill task before timing out.',
329-
)()
330-
@options.WAIT()
322+
@options.TIMEOUT(default=float('inf'))
331323
@OverridableOption(
332324
'-F',
333325
'--force-kill',
@@ -337,7 +329,7 @@ def process_status(call_link_label, most_recent_node, max_depth, processes):
337329
'Note: This may lead to orphaned jobs on your HPC and should be used with caution.',
338330
)()
339331
@decorators.with_dbenv()
340-
def process_kill(processes, all_entries, timeout, wait, force_kill):
332+
def process_kill(processes, all_entries, timeout, force_kill):
341333
"""Kill running processes.
342334
343335
Kill one or multiple running processes."""
@@ -367,7 +359,6 @@ def process_kill(processes, all_entries, timeout, wait, force_kill):
367359
force_kill=force_kill,
368360
all_entries=all_entries,
369361
timeout=timeout,
370-
wait=wait,
371362
)
372363
except control.ProcessTimeoutException as exception:
373364
echo.echo_critical(f'{exception}\n{REPAIR_INSTRUCTIONS}')
@@ -379,10 +370,9 @@ def process_kill(processes, all_entries, timeout, wait, force_kill):
379370
@verdi_process.command('pause')
380371
@arguments.PROCESSES()
381372
@options.ALL(help='Pause all active processes if no specific processes are specified.')
382-
@options.TIMEOUT()
383-
@options.WAIT()
373+
@options.TIMEOUT(default=float('inf'))
384374
@decorators.with_dbenv()
385-
def process_pause(processes, all_entries, timeout, wait):
375+
def process_pause(processes, all_entries, timeout):
386376
"""Pause running processes.
387377
388378
Pause one or multiple running processes."""
@@ -403,7 +393,6 @@ def process_pause(processes, all_entries, timeout, wait):
403393
msg_text='Paused through `verdi process pause`',
404394
all_entries=all_entries,
405395
timeout=timeout,
406-
wait=wait,
407396
)
408397
except control.ProcessTimeoutException as exception:
409398
echo.echo_critical(f'{exception}\n{REPAIR_INSTRUCTIONS}')
@@ -415,10 +404,9 @@ def process_pause(processes, all_entries, timeout, wait):
415404
@verdi_process.command('play')
416405
@arguments.PROCESSES()
417406
@options.ALL(help='Play all paused processes if no specific processes are specified.')
418-
@options.TIMEOUT()
419-
@options.WAIT()
407+
@options.TIMEOUT(default=float('inf'))
420408
@decorators.with_dbenv()
421-
def process_play(processes, all_entries, timeout, wait):
409+
def process_play(processes, all_entries, timeout):
422410
"""Play (unpause) paused processes.
423411
424412
Play (unpause) one or multiple paused processes."""
@@ -434,7 +422,7 @@ def process_play(processes, all_entries, timeout, wait):
434422

435423
with capture_logging() as stream:
436424
try:
437-
control.play_processes(processes, all_entries=all_entries, timeout=timeout, wait=wait)
425+
control.play_processes(processes, all_entries=all_entries, timeout=timeout)
438426
except control.ProcessTimeoutException as exception:
439427
echo.echo_critical(f'{exception}\n{REPAIR_INSTRUCTIONS}')
440428

src/aiida/cmdline/params/options/main.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,6 @@
110110
'USER_LAST_NAME',
111111
'VERBOSITY',
112112
'VISUALIZATION_FORMAT',
113-
'WAIT',
114113
'WITH_ELEMENTS',
115114
'WITH_ELEMENTS_EXCLUSIVE',
116115
'active_process_states',
@@ -675,12 +674,6 @@ def set_log_level(ctx, _param, value):
675674
help='Time in seconds to wait for a response before timing out.',
676675
)
677676

678-
WAIT = OverridableOption(
679-
'--wait/--no-wait',
680-
default=False,
681-
help='Wait for the action to be completed otherwise return as soon as it is scheduled.',
682-
)
683-
684677
FORMULA_MODE = OverridableOption(
685678
'-f',
686679
'--formula-mode',

src/aiida/engine/processes/control.py

Lines changed: 17 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ def revive_processes(processes: list[ProcessNode], *, wait: bool = False) -> Non
104104

105105

106106
def play_processes(
107-
processes: list[ProcessNode] | None = None, *, all_entries: bool = False, timeout: float = 5.0, wait: bool = False
107+
processes: list[ProcessNode] | None = None, *, all_entries: bool = False, timeout: float = 5.0
108108
) -> None:
109109
"""Play (unpause) paused processes.
110110
@@ -113,7 +113,6 @@ def play_processes(
113113
:param processes: List of processes to play.
114114
:param all_entries: Play all paused processes.
115115
:param timeout: Raise a ``ProcessTimeoutException`` if the process does not respond within this amount of seconds.
116-
:param wait: Set to ``True`` to wait for process response, for ``False`` the action is fire-and-forget.
117116
:raises ``ProcessTimeoutException``: If the processes do not respond within the timeout.
118117
"""
119118
if not get_daemon_client().is_daemon_running:
@@ -130,7 +129,7 @@ def play_processes(
130129
return
131130

132131
controller = get_manager().get_process_controller()
133-
_perform_actions(processes, controller.play_process, 'play', 'playing', timeout, wait)
132+
_perform_actions(processes, controller.play_process, 'play', 'playing', timeout)
134133

135134

136135
def pause_processes(
@@ -139,7 +138,6 @@ def pause_processes(
139138
msg_text: str = 'Paused through `aiida.engine.processes.control.pause_processes`',
140139
all_entries: bool = False,
141140
timeout: float = 5.0,
142-
wait: bool = False,
143141
) -> None:
144142
"""Pause running processes.
145143
@@ -148,7 +146,6 @@ def pause_processes(
148146
:param processes: List of processes to play.
149147
:param all_entries: Pause all playing processes.
150148
:param timeout: Raise a ``ProcessTimeoutException`` if the process does not respond within this amount of seconds.
151-
:param wait: Set to ``True`` to wait for process response, for ``False`` the action is fire-and-forget.
152149
:raises ``ProcessTimeoutException``: If the processes do not respond within the timeout.
153150
"""
154151
if not get_daemon_client().is_daemon_running:
@@ -166,7 +163,7 @@ def pause_processes(
166163

167164
controller = get_manager().get_process_controller()
168165
action = functools.partial(controller.pause_process, msg_text=msg_text)
169-
_perform_actions(processes, action, 'pause', 'pausing', timeout, wait)
166+
_perform_actions(processes, action, 'pause', 'pausing', timeout)
170167

171168

172169
def kill_processes(
@@ -176,7 +173,6 @@ def kill_processes(
176173
force_kill: bool = False,
177174
all_entries: bool = False,
178175
timeout: float = 5.0,
179-
wait: bool = False,
180176
) -> None:
181177
"""Kill running processes.
182178
@@ -185,7 +181,6 @@ def kill_processes(
185181
:param processes: List of processes to play.
186182
:param all_entries: Kill all active processes.
187183
:param timeout: Raise a ``ProcessTimeoutException`` if the process does not respond within this amount of seconds.
188-
:param wait: Set to ``True`` to wait for process response, for ``False`` the action is fire-and-forget.
189184
:raises ``ProcessTimeoutException``: If the processes do not respond within the timeout.
190185
"""
191186
if not get_daemon_client().is_daemon_running:
@@ -203,7 +198,7 @@ def kill_processes(
203198

204199
controller = get_manager().get_process_controller()
205200
action = functools.partial(controller.kill_process, msg_text=msg_text, force_kill=force_kill)
206-
_perform_actions(processes, action, 'kill', 'killing', timeout, wait)
201+
_perform_actions(processes, action, 'kill', 'killing', timeout)
207202

208203

209204
def _perform_actions(
@@ -212,7 +207,6 @@ def _perform_actions(
212207
infinitive: str,
213208
present: str,
214209
timeout: t.Optional[float] = None,
215-
wait: bool = False,
216210
**kwargs: t.Any,
217211
) -> None:
218212
"""Perform an action on a list of processes.
@@ -223,7 +217,6 @@ def _perform_actions(
223217
:param present: The present tense of the verb that represents the action.
224218
:param past: The past tense of the verb that represents the action.
225219
:param timeout: Raise a ``ProcessTimeoutException`` if the process does not respond within this amount of seconds.
226-
:param wait: Set to ``True`` to wait for process response, for ``False`` the action is fire-and-forget.
227220
:param kwargs: Keyword arguments that will be passed to the method ``action``.
228221
:raises ``ProcessTimeoutException``: If the processes do not respond within the timeout.
229222
"""
@@ -241,77 +234,52 @@ def _perform_actions(
241234
else:
242235
futures[future] = process
243236

244-
_resolve_futures(futures, infinitive, present, wait, timeout)
237+
_resolve_futures(futures, infinitive, present, timeout)
245238

246239

247240
def _resolve_futures(
248241
futures: dict[concurrent.futures.Future, ProcessNode],
249242
infinitive: str,
250243
present: str,
251-
wait: bool = False,
252244
timeout: t.Optional[float] = None,
253245
) -> None:
254246
"""Process a mapping of futures representing an action on an active process.
255247
256248
This function will echo the correct information strings based on the outcomes of the futures and the given verb
257249
conjugations. You can optionally wait for any pending actions to be completed before the functions returns and use a
258-
timeout to put a maximum wait time on the actions.
250+
timeout to put a maximum wait time on the actions. TODO fix docstring
259251
260252
:param futures: The map of action futures and the corresponding processes.
261253
:param infinitive: The infinitive form of the action verb.
262254
:param present: The present tense form of the action verb.
263-
:param wait: Set to ``True`` to wait for process response, for ``False`` the action is fire-and-forget.
264255
:param timeout: Raise a ``ProcessTimeoutException`` if the process does not respond within this amount of seconds.
265256
"""
266-
scheduled = {}
267-
268-
def handle_result(result):
269-
if result is True:
270-
LOGGER.report(f'request to {infinitive} Process<{process.pk}> sent')
271-
elif result is False:
272-
LOGGER.error(f'problem {present} Process<{process.pk}>')
273-
elif isinstance(result, kiwipy.Future):
274-
LOGGER.report(f'scheduled {infinitive} Process<{process.pk}>')
275-
scheduled[result] = process
276-
else:
277-
LOGGER.error(f'got unexpected response when {present} Process<{process.pk}>: {result}')
257+
if not timeout:
258+
return
259+
260+
LOGGER.report(f"waiting for process(es) {','.join([str(proc.pk) for proc in futures.values()])}")
278261

279262
try:
280263
for future, process in futures.items():
281-
# unwrap is need here since LoopCommunicator will also wrap a future
264+
# we unwrap to the end
282265
unwrapped = unwrap_kiwi_future(future)
283266
try:
284267
result = unwrapped.result(timeout=timeout)
285268
except communications.TimeoutError:
286-
cancelled = unwrapped.cancel()
269+
cancelled = future.cancel()
287270
if cancelled:
288271
LOGGER.error(f'call to {infinitive} Process<{process.pk}> timed out and was cancelled.')
289272
else:
290273
LOGGER.error(f'call to {infinitive} Process<{process.pk}> timed out but could not be cancelled.')
291274
except Exception as exception:
292275
LOGGER.error(f'failed to {infinitive} Process<{process.pk}>: {exception}')
293276
else:
294-
if isinstance(result, kiwipy.Future):
295-
LOGGER.report(f'scheduled {infinitive} Process<{process.pk}>')
296-
scheduled[result] = process
277+
if result is True:
278+
LOGGER.report(f'request to {infinitive} Process<{process.pk}> sent')
279+
elif result is False:
280+
LOGGER.error(f'problem {present} Process<{process.pk}>')
297281
else:
298-
handle_result(result)
299-
300-
if not wait or not scheduled:
301-
return
302-
303-
LOGGER.report(f"waiting for process(es) {','.join([str(proc.pk) for proc in scheduled.values()])}")
304-
305-
for future in concurrent.futures.as_completed(scheduled.keys(), timeout=timeout):
306-
process = scheduled[future]
307-
308-
try:
309-
result = future.result()
310-
except Exception as exception:
311-
LOGGER.error(f'failed to {infinitive} Process<{process.pk}>: {exception}')
312-
else:
313-
handle_result(result)
314-
282+
LOGGER.error(f'got unexpected response when {present} Process<{process.pk}>: {result}')
315283
except concurrent.futures.TimeoutError:
316284
raise ProcessTimeoutException(
317285
f'timed out trying to {infinitive} processes {futures.values()}\n'

0 commit comments

Comments
 (0)