Skip to content

Commit

Permalink
feat(cli): add --follow flag to logs command (#731)
Browse files Browse the repository at this point in the history
  • Loading branch information
jlemesh committed Oct 14, 2024
1 parent 79d0483 commit 7993bbf
Show file tree
Hide file tree
Showing 5 changed files with 307 additions and 81 deletions.
1 change: 1 addition & 0 deletions AUTHORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ The list of contributors in alphabetical order:
- [Giuseppe Steduto](https://orcid.org/0009-0002-1258-8553)
- [Harri Hirvonsalo](https://orcid.org/0000-0002-5503-510X)
- [Jan Okraska](https://orcid.org/0000-0002-1416-3244)
- [Jelizaveta Lemeševa](https://orcid.org/0009-0003-6606-9270)
- [Leticia Wanderley](https://orcid.org/0000-0003-4649-6630)
- [Marco Donadoni](https://orcid.org/0000-0003-2922-5505)
- [Marco Vidal](https://orcid.org/0000-0002-9363-4971)
Expand Down
102 changes: 102 additions & 0 deletions reana_client/cli/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import os
import shlex
import sys
import time
from typing import Callable, NoReturn, Optional, List, Tuple, Union

import click
Expand All @@ -24,6 +25,8 @@
RUN_STATUSES,
JOB_STATUS_TO_MSG_COLOR,
JSON,
CLI_LOGS_FOLLOW_MIN_INTERVAL,
CLI_LOGS_FOLLOW_DEFAULT_INTERVAL,
)
from reana_client.printer import display_message
from reana_client.utils import workflow_uuid_or_name
Expand Down Expand Up @@ -409,3 +412,102 @@ def output_user_friendly_logs(workflow_logs, steps):
f"Step {job_name_or_id} emitted no logs.",
msg_type="info",
)


def retrieve_workflow_logs(
    workflow,
    access_token,
    json_format,
    filters,
    steps,
    chosen_filters,
    available_filters,
    page=None,
    size=None,
):  # noqa: D301
    """Retrieve workflow logs once and display them.

    :param workflow: Name or UUID of the workflow whose logs are fetched.
    :param access_token: REANA access token used to authorise the API call.
    :param json_format: If ``True``, print the logs as indented JSON and
        terminate the process with exit status 0.
    :param filters: Raw ``name=value`` filter strings from the CLI; used only
        as a truthiness flag to decide whether job-log filtering applies.
    :param steps: List of step names to fetch job logs for; duplicates are
        removed before the API call.
    :param chosen_filters: Mapping of validated filter names to user-chosen
        values.
    :param available_filters: Mapping of CLI filter names to the matching
        job-log attribute names.
    :param page: Results page number forwarded to the API (optional).
    :param size: Page size forwarded to the API (optional).
    """
    from reana_client.api.client import get_workflow_logs

    response = get_workflow_logs(
        workflow,
        access_token,
        steps=None if not steps else list(set(steps)),
        page=page,
        size=size,
    )
    workflow_logs = json.loads(response["logs"])
    if filters:
        for key, value in chosen_filters.items():
            # Drop jobs whose attribute does not match the requested filter.
            unwanted_steps = [
                k
                for k, v in workflow_logs["job_logs"].items()
                if v[available_filters[key]] != value
            ]
            for job_id in unwanted_steps:
                del workflow_logs["job_logs"][job_id]

    if json_format:
        display_message(json.dumps(workflow_logs, indent=2))
        sys.exit(0)
    else:
        # ``output_user_friendly_logs`` is defined in this same module, so the
        # previous local self-import was redundant and has been removed.
        output_user_friendly_logs(
            workflow_logs, None if not steps else list(set(steps))
        )


def follow_workflow_logs(
    workflow,
    access_token,
    interval,
    steps,
):  # noqa: D301
    """Continuously poll for workflow or job logs (similar to ``tail -f``).

    Polls the logs endpoint every ``interval`` seconds, printing only lines
    not seen in the previous poll, until the workflow (or followed job)
    reaches a terminal status.

    :param workflow: Name or UUID of the workflow to follow.
    :param access_token: REANA access token used to authorise the API calls.
    :param interval: Sleep time in seconds between polls; values below
        ``CLI_LOGS_FOLLOW_MIN_INTERVAL`` are reset to the default interval
        with a warning.
    :param steps: List of step names; only the first entry is followed, any
        additional ones are ignored with a warning.
    :raises Exception: If the requested step is not found in the workflow's
        job logs.
    """
    from reana_client.api.client import get_workflow_logs, get_workflow_status

    if len(steps) > 1:
        display_message(
            "Only one step can be followed at a time, ignoring additional steps.",
            "warning",
        )
    if interval < CLI_LOGS_FOLLOW_MIN_INTERVAL:
        interval = CLI_LOGS_FOLLOW_DEFAULT_INTERVAL
        # Interpolate the configured default instead of hard-coding "10 s" so
        # the warning stays correct if the constant ever changes.
        display_message(
            "Interval should be an integer greater than 0, "
            f"resetting to default ({CLI_LOGS_FOLLOW_DEFAULT_INTERVAL} s).",
            "warning",
        )
    step = steps[0] if steps else None

    previous_logs = ""

    while True:
        response = get_workflow_logs(
            workflow,
            access_token,
            steps=None if not step else [step],
        ).get("logs")
        json_response = json.loads(response)

        if step:
            jobs = json_response["job_logs"]

            if not jobs:
                raise Exception(f"Step data not found: {step}")

            job = next(iter(jobs.values()))  # get values of the first job
            logs = job["logs"]
            status = job["status"]
        else:
            logs = json_response["workflow_logs"]
            status = get_workflow_status(workflow, access_token).get("status")

        previous_lines = previous_logs.splitlines()
        new_lines = logs.splitlines()

        # NOTE(review): this membership-based diff suppresses lines that are
        # exact duplicates of earlier output, so it is a best-effort follow
        # rather than an exact tail.
        diff = "\n".join([x for x in new_lines if x not in previous_lines])
        if diff != "" and diff != "\n":
            display_message(diff)

        # Stop following once the workflow/job has reached a terminal state.
        if status in ["finished", "failed", "stopped", "deleted"]:
            return
        previous_logs = logs
        time.sleep(interval)
171 changes: 90 additions & 81 deletions reana_client/cli/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@
key_value_to_dict,
parse_filter_parameters,
requires_environments,
retrieve_workflow_logs,
follow_workflow_logs,
)
from reana_client.config import ERROR_MESSAGES, RUN_STATUSES, TIMECHECK
from reana_client.config import ERROR_MESSAGES, RUN_STATUSES, TIMECHECK, CLI_LOGS_FOLLOW_DEFAULT_INTERVAL
from reana_client.printer import display_message
from reana_client.utils import (
get_reana_yaml_file_path,
Expand Down Expand Up @@ -886,6 +888,20 @@ def add_verbose_data_from_response(response, verbose_headers, headers, data):
multiple=True,
help="Filter job logs to include only those steps that match certain filtering criteria. Use --filter name=value pairs. Available filters are compute_backend, docker_img, status and step.",
)
@click.option(
"--follow",
"follow",
is_flag=True,
default=False,
help="Follow the logs of a running workflow or job (similar to tail -f).",
)
@click.option(
"-i",
"--interval",
"interval",
default=CLI_LOGS_FOLLOW_DEFAULT_INTERVAL,
help=f"Sleep time in seconds between log polling if log following is enabled. [default={CLI_LOGS_FOLLOW_DEFAULT_INTERVAL}]",
)
@add_pagination_options
@check_connection
@click.pass_context
Expand All @@ -894,22 +910,32 @@ def workflow_logs(
workflow,
access_token,
json_format,
steps=None,
follow,
interval,
filters=None,
page=None,
size=None,
): # noqa: D301
"""Get workflow logs.
The ``logs`` command allows to retrieve logs of running workflow. Note that
only finished steps of the workflow are returned, the logs of the currently
processed step is not returned until it is finished.
The ``logs`` command allows to retrieve logs of a running workflow.
Either retrieve the logs and print the result, or follow the logs of a running workflow/job.
Examples:\n
\t $ reana-client logs -w myanalysis.42
\t $ reana-client logs -w myanalysis.42 -s 1st_step
\t $ reana-client logs -w myanalysis.42\n
\t $ reana-client logs -w myanalysis.42 --json\n
\t $ reana-client logs -w myanalysis.42 --filter status=running\n
\t $ reana-client logs -w myanalysis.42 --filter step=myfit --follow\n
"""
from reana_client.api.client import get_workflow_logs
logging.debug("command: {}".format(ctx.command_path.replace(" ", ".")))
for p in ctx.params:
logging.debug("{param}: {value}".format(param=p, value=ctx.params[p]))

if json_format and follow:
display_message(
"Ignoring --json as it cannot be used together with --follow.",
msg_type="warning",
)

available_filters = {
"step": "job_name",
Expand All @@ -920,90 +946,73 @@ def workflow_logs(
steps = []
chosen_filters = dict()

logging.debug("command: {}".format(ctx.command_path.replace(" ", ".")))
for p in ctx.params:
logging.debug("{param}: {value}".format(param=p, value=ctx.params[p]))
if workflow:
if filters:
try:
for f in filters:
key, value = f.split("=")
if key not in available_filters:
if filters:
try:
for f in filters:
key, value = f.split("=")
if key not in available_filters:
display_message(
"Filter '{}' is not valid.\n"
"Available filters are '{}'.".format(
key,
"' '".join(sorted(available_filters.keys())),
),
msg_type="error",
)
sys.exit(1)
elif key == "step":
steps.append(value)
else:
# Case insensitive for compute backends
if (
key == "compute_backend"
and value.lower() in REANA_COMPUTE_BACKENDS
):
value = REANA_COMPUTE_BACKENDS[value.lower()]
elif key == "status" and value not in RUN_STATUSES:
display_message(
"Filter '{}' is not valid.\n"
"Available filters are '{}'.".format(
key,
"' '".join(sorted(available_filters.keys())),
),
"Input status value {} is not valid. ".format(value),
msg_type="error",
)
),
sys.exit(1)
elif key == "step":
steps.append(value)
else:
# Case insensitive for compute backends
if (
key == "compute_backend"
and value.lower() in REANA_COMPUTE_BACKENDS
):
value = REANA_COMPUTE_BACKENDS[value.lower()]
elif key == "status" and value not in RUN_STATUSES:
display_message(
"Input status value {} is not valid. ".format(value),
msg_type="error",
),
sys.exit(1)
chosen_filters[key] = value
except Exception as e:
logging.debug(traceback.format_exc())
logging.debug(str(e))
display_message(
"Please provide complete --filter name=value pairs, "
"for example --filter status=running.\n"
"Available filters are '{}'.".format(
"' '".join(sorted(available_filters.keys()))
),
msg_type="error",
)
sys.exit(1)
try:
response = get_workflow_logs(
workflow,
access_token,
steps=None if not steps else list(set(steps)),
page=page,
size=size,
)
workflow_logs = json.loads(response["logs"])
if filters:
for key, value in chosen_filters.items():
unwanted_steps = [
k
for k, v in workflow_logs["job_logs"].items()
if v[available_filters[key]] != value
]
for job_id in unwanted_steps:
del workflow_logs["job_logs"][job_id]

if json_format:
display_message(json.dumps(workflow_logs, indent=2))
sys.exit(0)
else:
from reana_client.cli.utils import output_user_friendly_logs

output_user_friendly_logs(
workflow_logs, None if not steps else list(set(steps))
)
chosen_filters[key] = value
except Exception as e:
logging.debug(traceback.format_exc())
logging.debug(str(e))
display_message(
"Cannot retrieve the logs of a workflow {}: \n"
"{}".format(workflow, str(e)),
"Please provide complete --filter name=value pairs, "
"for example --filter status=running.\n"
"Available filters are '{}'.".format(
"' '".join(sorted(available_filters.keys()))
),
msg_type="error",
)
sys.exit(1)

try:
if follow:
follow_workflow_logs(workflow, access_token, interval, steps)
else:
retrieve_workflow_logs(
workflow,
access_token,
json_format,
filters,
steps,
chosen_filters,
available_filters,
page,
size,
)
except Exception as e:
logging.debug(traceback.format_exc())
logging.debug(str(e))
display_message(
"Cannot retrieve logs for workflow {}: \n{}".format(workflow, str(e)),
msg_type="error",
)
sys.exit(1)


@workflow_execution_group.command("validate")
@click.option(
Expand Down
6 changes: 6 additions & 0 deletions reana_client/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,9 @@

STD_OUTPUT_CHAR = "-"
"""Character used to refer to the standard output."""

CLI_LOGS_FOLLOW_MIN_INTERVAL = 1
"""Minimum interval between log requests in seconds."""

CLI_LOGS_FOLLOW_DEFAULT_INTERVAL = 10
"""Default interval between log requests in seconds."""
Loading

0 comments on commit 7993bbf

Please sign in to comment.