Skip to content

Commit

Permalink
consumer: remove pods for deleted workflows
Browse files Browse the repository at this point in the history
  • Loading branch information
Vladyslav Moisieienkov committed Apr 19, 2022
1 parent f2d2be9 commit 7e8674f
Showing 1 changed file with 46 additions and 6 deletions.
52 changes: 46 additions & 6 deletions reana_workflow_controller/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
build_unique_component_name,
)
from reana_db.database import Session
from reana_db.models import Job, JobCache, Workflow, RunStatus
from reana_db.models import Job, JobCache, Workflow, RunStatus, JobStatus
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm.attributes import flag_modified

Expand Down Expand Up @@ -118,11 +118,21 @@ def on_message(self, body, message):
)
elif workflow and workflow.status not in ALIVE_STATUSES:
logging.warning(
f"Event for not alive workflow {workflow.id_} with DB status {workflow.status} received:\n"
f"{body}\nIgnoring..."
f"Event for not alive workflow {workflow.id_} with DB status {workflow.status} received."
)
try:
_delete_workflow_batch_pod(workflow)
logging.info(
f"Remove batch-pod for not alive {workflow.id_} workflow."
)
except REANAWorkflowControllerError as exception:
logging.error(
f"Could not clean up not alive workflow {workflow.id_} batch pod for workflow."
f" Error: {exception}"
)
_delete_workflow_jobs(workflow)
else:
logging.warning(
logging.error(
f"Event for workflow {workflow_uuid} that doesn't exist in DB received:\n"
f"{body}\nIgnoring..."
)
Expand Down Expand Up @@ -162,7 +172,7 @@ def _update_workflow_status(workflow, status, logs):

if RunStatus.should_cleanup_job(status):
try:
_delete_workflow_job(workflow)
_delete_workflow_batch_pod(workflow)
except REANAWorkflowControllerError as exception:
logging.error(
f"Could not clean up workflow job for workflow {workflow.id_}."
Expand Down Expand Up @@ -281,7 +291,7 @@ def _update_job_cache(msg):
Session.add(cached_job)


def _delete_workflow_job(workflow: Workflow) -> None:
def _delete_workflow_batch_pod(workflow: Workflow) -> None:
job_name = build_unique_component_name("run-batch", workflow.id_)
try:
current_k8s_batchv1_api_client.delete_namespaced_job(
Expand All @@ -295,6 +305,36 @@ def _delete_workflow_job(workflow: Workflow) -> None:
)


def _delete_workflow_jobs(workflow: Workflow) -> None:
"""
Delete jobs that belong to workflow. Ignore Kubernetes API errors.
If job is successfully deleted, update its status to JobStatus.stopped.
"""
jobs = Session.query(Job).filter(
Job.workflow_uuid == workflow.id_,
)
for job in jobs:
job_name = build_unique_component_name("run-job", job.id_)
if job.status in [
JobStatus.running,
JobStatus.queued,
JobStatus.created,
]:
try:
current_k8s_batchv1_api_client.delete_namespaced_job(
name=job_name,
namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
propagation_policy="Background",
)
job.status = JobStatus.stopped
Session.add(job)
except ApiException as e:
logging.error(
f"run-job pod {job_name} for {workflow.id_} could not be deleted. Error: {e}"
)
Session.commit()


def _get_workflow_engine_pod_logs(workflow: Workflow) -> str:
try:
pods = current_k8s_corev1_api_client.list_namespaced_pod(
Expand Down

0 comments on commit 7e8674f

Please sign in to comment.