Skip to content

Commit

Permalink
job-status-consumer, log DB status of "not alive" workflows
Browse files Browse the repository at this point in the history
  • Loading branch information
Vladyslav Moisieienkov committed Apr 4, 2022
1 parent 08410ca commit b1a73dc
Showing 1 changed file with 12 additions and 11 deletions.
23 changes: 12 additions & 11 deletions reana_workflow_controller/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,22 +84,19 @@ def on_message(self, body, message):
Session.query(Workflow)
.filter(
Workflow.id_ == workflow_uuid,
Workflow.status.in_(ALIVE_STATUSES),
)
.one_or_none()
)
if workflow:
if workflow and workflow.status in ALIVE_STATUSES:
next_status = body_dict.get("status")
if next_status:
next_status = RunStatus(next_status)
logging.info(
" [x] Received workflow_uuid: {0} status: {1}".format(
workflow_uuid, next_status
)
f" [x] Received workflow_uuid: {workflow_uuid} status: {next_status}"
)

logs = body_dict.get("logs") or ""
if workflow.can_transition_to(next_status):
logs = body_dict.get("logs") or ""
_update_workflow_status(workflow, next_status, logs)
if "message" in body_dict and body_dict.get("message"):
msg = body_dict["message"]
Expand All @@ -113,17 +110,21 @@ def on_message(self, body, message):
f" from status {workflow.status} to"
f" {next_status}."
)
elif workflow_uuid:
elif workflow and workflow.status not in ALIVE_STATUSES:
logging.warning(
"Event for not alive workflow {workflow_uuid} received:\n"
"{body}\n"
"Ignoring ...".format(workflow_uuid=workflow_uuid, body=body)
f"Event for not alive workflow {workflow.id_} with DB status {workflow.status} received:\n"
f"{body}\nIgnoring..."
)
else:
logging.warning(
f"Event for workflow {workflow_uuid} that doesn't exist received:\n"
f"{body}\nIgnoring..."
)
except REANAWorkflowControllerError as rwce:
logging.error(rwce, exc_info=True)
except SQLAlchemyError as sae:
logging.error(
f"Something went wrong while querying the database for workflow: {workflow.id_}"
f"Something went wrong while querying the database for workflow: {workflow_uuid}"
)
logging.error(sae, exc_info=True)
except Exception as e:
Expand Down

0 comments on commit b1a73dc

Please sign in to comment.