diff --git a/reana_workflow_controller/consumer.py b/reana_workflow_controller/consumer.py index 3f37027e..0d55e8ad 100644 --- a/reana_workflow_controller/consumer.py +++ b/reana_workflow_controller/consumer.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- # # This file is part of REANA. -# Copyright (C) 2018, 2019, 2020, 2021 CERN. +# Copyright (C) 2018, 2019, 2020, 2021, 2022 CERN. # # REANA is free software; you can redistribute it and/or modify it # under the terms of the MIT License; see LICENSE file for more details. @@ -87,22 +87,19 @@ def on_message(self, body, message): Session.query(Workflow) .filter( Workflow.id_ == workflow_uuid, - Workflow.status.in_(ALIVE_STATUSES), ) .one_or_none() ) - if workflow: + if workflow and workflow.status in ALIVE_STATUSES: next_status = body_dict.get("status") if next_status: next_status = RunStatus(next_status) logging.info( - " [x] Received workflow_uuid: {0} status: {1}".format( - workflow_uuid, next_status - ) + f" [x] Received workflow_uuid: {workflow_uuid} status: {next_status}" ) - logs = body_dict.get("logs") or "" if workflow.can_transition_to(next_status): + logs = body_dict.get("logs") or "" _update_workflow_status(workflow, next_status, logs) if "message" in body_dict and body_dict.get("message"): msg = body_dict["message"] @@ -119,17 +116,21 @@ def on_message(self, body, message): f" from status {workflow.status} to" f" {next_status}." ) - elif workflow_uuid: + elif workflow and workflow.status not in ALIVE_STATUSES: logging.warning( - "Event for not alive workflow {workflow_uuid} received:\n" - "{body}\n" - "Ignoring ...".format(workflow_uuid=workflow_uuid, body=body) + f"Event for not alive workflow {workflow.id_} with DB status {workflow.status} received:\n" + f"{body}\nIgnoring..." + ) + else: + logging.warning( + f"Event for workflow {workflow_uuid} that doesn't exist in DB received:\n" + f"{body}\nIgnoring..." ) except REANAWorkflowControllerError as rwce: logging.error(rwce, exc_info=True) except SQLAlchemyError as sae: logging.error( - f"Something went wrong while querying the database for workflow: {workflow.id_}" + f"Something went wrong while querying the database for workflow: {workflow_uuid}" ) logging.error(sae, exc_info=True) except Exception as e: