diff --git a/AUTHORS.md b/AUTHORS.md index 29c3f245..1b43d72e 100644 --- a/AUTHORS.md +++ b/AUTHORS.md @@ -15,6 +15,7 @@ The list of contributors in alphabetical order: - [Giuseppe Steduto](https://orcid.org/0009-0002-1258-8553) - [Harri Hirvonsalo](https://orcid.org/0000-0002-5503-510X) - [Jan Okraska](https://orcid.org/0000-0002-1416-3244) +- [Jelizaveta Lemeševa](https://orcid.org/0009-0003-6606-9270) - [Leticia Wanderley](https://orcid.org/0000-0003-4649-6630) - [Marco Donadoni](https://orcid.org/0000-0003-2922-5505) - [Marco Vidal](https://orcid.org/0000-0002-9363-4971) diff --git a/reana_client/cli/utils.py b/reana_client/cli/utils.py index 0acdbcaa..7a9c13f8 100644 --- a/reana_client/cli/utils.py +++ b/reana_client/cli/utils.py @@ -12,6 +12,7 @@ import os import shlex import sys +import time from typing import Callable, NoReturn, Optional, List, Tuple, Union import click @@ -24,6 +25,8 @@ RUN_STATUSES, JOB_STATUS_TO_MSG_COLOR, JSON, + CLI_LOGS_FOLLOW_MIN_INTERVAL, + CLI_LOGS_FOLLOW_DEFAULT_INTERVAL, ) from reana_client.printer import display_message from reana_client.utils import workflow_uuid_or_name @@ -409,3 +412,114 @@ def output_user_friendly_logs(workflow_logs, steps): f"Step {job_name_or_id} emitted no logs.", msg_type="info", ) + + +def retrieve_workflow_logs( + workflow, + access_token, + json_format, + filters, + steps, + chosen_filters, + available_filters, + page=None, + size=None, +): # noqa: D301 + """Retrieve workflow logs.""" + from reana_client.api.client import get_workflow_logs + + response = get_workflow_logs( + workflow, + access_token, + steps=None if not steps else list(set(steps)), + page=page, + size=size, + ) + workflow_logs = json.loads(response["logs"]) + if filters: + for key, value in chosen_filters.items(): + unwanted_steps = [ + k + for k, v in workflow_logs["job_logs"].items() + if v[available_filters[key]] != value + ] + for job_id in unwanted_steps: + del workflow_logs["job_logs"][job_id] + + if json_format: + display_message(json.dumps(workflow_logs, indent=2)) + sys.exit(0) + else: + from reana_client.cli.utils import output_user_friendly_logs + + output_user_friendly_logs(workflow_logs, None if not steps else list(set(steps))) + + +def follow_workflow_logs( + workflow, + access_token, + interval, + steps, +): # noqa: D301 + """Continuously poll for workflow or job logs.""" + from reana_client.api.client import get_workflow_logs, get_workflow_status + + if len(steps) > 1: + display_message( + "Only one step can be followed at a time, ignoring additional steps.", + "warning", + ) + if interval < CLI_LOGS_FOLLOW_MIN_INTERVAL: + interval = CLI_LOGS_FOLLOW_DEFAULT_INTERVAL + display_message( + f"Interval should be an integer greater than or equal to {CLI_LOGS_FOLLOW_MIN_INTERVAL}, resetting to default ({CLI_LOGS_FOLLOW_DEFAULT_INTERVAL} s).", + "warning", + ) + step = steps[0] if steps else None + + previous_logs = "" + + while True: + response = get_workflow_logs( + workflow, + access_token, + steps=None if not step else [step], + ) + if response.get("live_logs_enabled", False) is False: + display_message( + "Live logs are not enabled, please rerun the command without the --follow flag.", + "error", + ) + return + + json_response = json.loads(response.get("logs")) + + if step: + jobs = json_response["job_logs"] + + if not jobs: + raise Exception(f"Step data not found: {step}") + + job = next(iter(jobs.values())) # get values of the first job + logs = job["logs"] + status = job["status"] + else: + logs = json_response["workflow_logs"] + status = get_workflow_status(workflow, access_token).get("status") + + previous_lines = previous_logs.splitlines() + new_lines = logs.splitlines() + + diff = "\n".join([x for x in new_lines if x not in previous_lines]) + if diff != "" and diff != "\n": + display_message(diff) + + if status in ["finished", "failed", "stopped", "deleted"]: + subject = "Workflow" if not step else "Job" + display_message( + f"{subject} has completed, you might want to rerun the command without the --follow flag.", + "info", + ) + return + previous_logs = logs + time.sleep(interval) diff --git a/reana_client/cli/workflow.py b/reana_client/cli/workflow.py index 1706a4a8..97f73318 100644 --- a/reana_client/cli/workflow.py +++ b/reana_client/cli/workflow.py @@ -31,8 +31,15 @@ key_value_to_dict, parse_filter_parameters, requires_environments, + retrieve_workflow_logs, + follow_workflow_logs, +) +from reana_client.config import ( + ERROR_MESSAGES, + RUN_STATUSES, + TIMECHECK, + CLI_LOGS_FOLLOW_DEFAULT_INTERVAL, ) -from reana_client.config import ERROR_MESSAGES, RUN_STATUSES, TIMECHECK from reana_client.printer import display_message from reana_client.utils import ( get_reana_yaml_file_path, @@ -886,6 +893,20 @@ def add_verbose_data_from_response(response, verbose_headers, headers, data): multiple=True, help="Filter job logs to include only those steps that match certain filtering criteria. Use --filter name=value pairs. Available filters are compute_backend, docker_img, status and step.", ) +@click.option( + "--follow", + "follow", + is_flag=True, + default=False, + help="Follow the logs of a running workflow or job (similar to tail -f).", +) +@click.option( + "-i", + "--interval", + "interval", + default=CLI_LOGS_FOLLOW_DEFAULT_INTERVAL, + help=f"Sleep time in seconds between log polling if log following is enabled. [default={CLI_LOGS_FOLLOW_DEFAULT_INTERVAL}]", +) @add_pagination_options @check_connection @click.pass_context @@ -894,22 +915,32 @@ def workflow_logs( workflow, access_token, json_format, - steps=None, + follow, + interval, filters=None, page=None, size=None, ): # noqa: D301 """Get workflow logs. - The ``logs`` command allows to retrieve logs of running workflow. Note that - only finished steps of the workflow are returned, the logs of the currently - processed step is not returned until it is finished. + The ``logs`` command allows to retrieve logs of a running workflow. + Either retrive logs and print the result or follow the logs of a running workflow/job. Examples:\n - \t $ reana-client logs -w myanalysis.42 - \t $ reana-client logs -w myanalysis.42 -s 1st_step + \t $ reana-client logs -w myanalysis.42\n + \t $ reana-client logs -w myanalysis.42 --json\n + \t $ reana-client logs -w myanalysis.42 --filter status=running\n + \t $ reana-client logs -w myanalysis.42 --filter step=myfit --follow\n """ - from reana_client.api.client import get_workflow_logs + logging.debug("command: {}".format(ctx.command_path.replace(" ", "."))) + for p in ctx.params: + logging.debug("{param}: {value}".format(param=p, value=ctx.params[p])) + + if json_format and follow: + display_message( + "Ignoring --json as it cannot be used together with --follow.", + msg_type="warning", + ) available_filters = { "step": "job_name", @@ -920,90 +951,73 @@ def workflow_logs( steps = [] chosen_filters = dict() - logging.debug("command: {}".format(ctx.command_path.replace(" ", "."))) - for p in ctx.params: - logging.debug("{param}: {value}".format(param=p, value=ctx.params[p])) - if workflow: - if filters: - try: - for f in filters: - key, value = f.split("=") - if key not in available_filters: + if filters: + try: + for f in filters: + key, value = f.split("=") + if key not in available_filters: + display_message( + "Filter '{}' is not valid.\n" + "Available filters are '{}'.".format( + key, + "' '".join(sorted(available_filters.keys())), + ), + msg_type="error", + ) + sys.exit(1) + elif key == "step": + steps.append(value) + else: + # Case insensitive for compute backends + if ( + key == "compute_backend" + and value.lower() in REANA_COMPUTE_BACKENDS + ): + value = REANA_COMPUTE_BACKENDS[value.lower()] + elif key == "status" and value not in RUN_STATUSES: display_message( - "Filter '{}' is not valid.\n" - "Available filters are '{}'.".format( - key, - "' '".join(sorted(available_filters.keys())), - ), + "Input status value {} is not valid. ".format(value), msg_type="error", - ) + ), sys.exit(1) - elif key == "step": - steps.append(value) - else: - # Case insensitive for compute backends - if ( - key == "compute_backend" - and value.lower() in REANA_COMPUTE_BACKENDS - ): - value = REANA_COMPUTE_BACKENDS[value.lower()] - elif key == "status" and value not in RUN_STATUSES: - display_message( - "Input status value {} is not valid. ".format(value), - msg_type="error", - ), - sys.exit(1) - chosen_filters[key] = value - except Exception as e: - logging.debug(traceback.format_exc()) - logging.debug(str(e)) - display_message( - "Please provide complete --filter name=value pairs, " - "for example --filter status=running.\n" - "Available filters are '{}'.".format( - "' '".join(sorted(available_filters.keys())) - ), - msg_type="error", - ) - sys.exit(1) - try: - response = get_workflow_logs( - workflow, - access_token, - steps=None if not steps else list(set(steps)), - page=page, - size=size, - ) - workflow_logs = json.loads(response["logs"]) - if filters: - for key, value in chosen_filters.items(): - unwanted_steps = [ - k - for k, v in workflow_logs["job_logs"].items() - if v[available_filters[key]] != value - ] - for job_id in unwanted_steps: - del workflow_logs["job_logs"][job_id] - - if json_format: - display_message(json.dumps(workflow_logs, indent=2)) - sys.exit(0) - else: - from reana_client.cli.utils import output_user_friendly_logs - - output_user_friendly_logs( - workflow_logs, None if not steps else list(set(steps)) - ) + chosen_filters[key] = value except Exception as e: logging.debug(traceback.format_exc()) logging.debug(str(e)) display_message( - "Cannot retrieve the logs of a workflow {}: \n" - "{}".format(workflow, str(e)), + "Please provide complete --filter name=value pairs, " + "for example --filter status=running.\n" + "Available filters are '{}'.".format( + "' '".join(sorted(available_filters.keys())) + ), msg_type="error", ) sys.exit(1) + try: + if follow: + follow_workflow_logs(workflow, access_token, interval, steps) + else: + retrieve_workflow_logs( + workflow, + access_token, + json_format, + filters, + steps, + chosen_filters, + available_filters, + page, + size, + ) + except Exception as e: + logging.debug(traceback.format_exc()) + logging.debug(str(e)) + display_message( + "Cannot retrieve logs for workflow {}: \n{}".format(workflow, str(e)), + msg_type="error", + ) + sys.exit(1) + @workflow_execution_group.command("validate") @click.option( diff --git a/reana_client/config.py b/reana_client/config.py index 3be4a487..c81a952c 100644 --- a/reana_client/config.py +++ b/reana_client/config.py @@ -77,3 +77,9 @@ STD_OUTPUT_CHAR = "-" """Character used to refer to the standard output.""" + +CLI_LOGS_FOLLOW_MIN_INTERVAL = 1 +"""Minimum interval between log requests in seconds.""" + +CLI_LOGS_FOLLOW_DEFAULT_INTERVAL = 10 +"""Default interval between log requests in seconds.""" diff --git a/tests/test_cli_workflows.py b/tests/test_cli_workflows.py index 2b3d652d..e5971fc7 100644 --- a/tests/test_cli_workflows.py +++ b/tests/test_cli_workflows.py @@ -8,6 +8,7 @@ """REANA client workflow tests.""" +import copy import json import sys from typing import List @@ -21,6 +22,7 @@ from reana_client.cli import cli from reana_client.config import RUN_STATUSES from reana_client.utils import get_workflow_status_change_msg +from reana_commons.api_client import BaseAPIClient from reana_commons.config import INTERACTIVE_SESSION_TYPES @@ -940,6 +942,179 @@ def test_get_workflow_status_ok(): assert json_response[0]["name"] in response["name"] +def test_get_workflow_logs(): + """Test workflow logs.""" + status_code = 200 + response = { + "logs": '{"workflow_logs": "workflow logs test"}', + "user": "00000000-0000-0000-0000-000000000000", + "workflow_id": "26a55924-83c9-493b-841b-8fd7629e25c9", + "workflow_name": "helloworld-serial-kubernetes0.3", + } + env = {"REANA_SERVER_URL": "localhost"} + mock_http_response, mock_response = Mock(), Mock() + mock_http_response.status_code = status_code + mock_response = response + reana_token = "000000" + runner = CliRunner(env=env) + with runner.isolation(): + with patch( + "reana_client.api.client.current_rs_api_client", + make_mock_api_client("reana-server")(mock_response, mock_http_response), + ): + result = runner.invoke( + cli, + ["logs", "-t", reana_token, "--json", "-w", response["workflow_name"]], + ) + json_response = json.loads(result.output) + assert result.exit_code == 0 + assert isinstance(json_response, dict) + assert json_response["workflow_logs"] in "workflow logs test" + + +def test_follow_job_logs(): + """Test follow job logs.""" + logs = { + "workflow_logs": "workflow logs test", + "job_logs": { + "job_id": { + "workflow_uuid": "26a55924-83c9-493b-841b-8fd7629e25c9", + "job_name": "hello1", + "compute_backend": "Kubernetes", + "backend_job_id": "reana-run-job-42532a36-4a41-4acf-a3b0-d61655030f43", + "docker_img": "docker.io/library/python:3.8-slim", + "cmd": "python", + "status": "running", + "logs": "job test logs\n", + "started_at": "2024-09-26T09:02:36", + "finished_at": None, + } + }, + } + logs_next = copy.deepcopy(logs) + logs_next["job_logs"]["job_id"]["status"] = "stopped" + logs_next["job_logs"]["job_id"]["logs"] = "job test logs\nmore job logs\n" + + response = { + "logs": json.dumps(logs), + "user": "00000000-0000-0000-0000-000000000000", + "workflow_id": "26a55924-83c9-493b-841b-8fd7629e25c9", + "workflow_name": "helloworld-serial-kubernetes0.3", + "live_logs_enabled": True, + } + response_next = copy.deepcopy(response) + response_next["logs"] = json.dumps(logs_next) + + env = {"REANA_SERVER_URL": "localhost"} + mock_http_response = Mock() + mock_http_response.status_code = 200 + reana_token = "000000" + runner = CliRunner(env=env) + + mock_http_client, mock_result = Mock(), Mock() + mock_result.result.side_effect = [ + (response, mock_http_response), + (response_next, mock_http_response), + ] + mock_http_client.request.return_value = mock_result + + with runner.isolation(): + with patch( + "reana_client.api.client.current_rs_api_client", + BaseAPIClient("reana-server", http_client=mock_http_client)._client, + ): + result = runner.invoke( + cli, + [ + "logs", + "-t", + reana_token, + "--follow", + "-i", + 1, + "-w", + "helloworld-serial-kubernetes0.3", + "--filter", + "step=hello1", + ], + ) + assert result.exit_code == 0 + assert ( + result.output + == """job test logs +more job logs +==> Job has completed, you might want to rerun the command without the --follow flag. +""" + ) + + +def test_follow_live_logs_disabled(): + """Test follow job logs when live logs are disabled.""" + logs = { + "workflow_logs": "", + "job_logs": { + "job_id": { + "workflow_uuid": "26a55924-83c9-493b-841b-8fd7629e25c9", + "job_name": "hello1", + "compute_backend": "Kubernetes", + "backend_job_id": "reana-run-job-42532a36-4a41-4acf-a3b0-d61655030f43", + "docker_img": "docker.io/library/python:3.8-slim", + "cmd": "python", + "status": "running", + "logs": "", + "started_at": "2024-09-26T09:02:36", + "finished_at": None, + } + }, + } + + response = { + "logs": json.dumps(logs), + "user": "00000000-0000-0000-0000-000000000000", + "workflow_id": "26a55924-83c9-493b-841b-8fd7629e25c9", + "workflow_name": "helloworld-serial-kubernetes0.3", + } + + env = {"REANA_SERVER_URL": "localhost"} + mock_http_response = Mock() + mock_http_response.status_code = 200 + reana_token = "000000" + runner = CliRunner(env=env) + + mock_http_client, mock_result = Mock(), Mock() + mock_result.result.side_effect = [ + (response, mock_http_response), + ] + mock_http_client.request.return_value = mock_result + + with runner.isolation(): + with patch( + "reana_client.api.client.current_rs_api_client", + BaseAPIClient("reana-server", http_client=mock_http_client)._client, + ): + result = runner.invoke( + cli, + [ + "logs", + "-t", + reana_token, + "--follow", + "-i", + 1, + "-w", + "helloworld-serial-kubernetes0.3", + "--filter", + "step=hello1", + ], + ) + assert result.exit_code == 0 + assert ( + result.output + == """==> ERROR: Live logs are not enabled, please rerun the command without the --follow flag. +""" + ) + + @patch("reana_client.cli.workflow.workflow_create") @patch("reana_client.cli.workflow.upload_files") @patch("reana_client.cli.workflow.workflow_start")