Skip to content

Tips for RabbitMQ

Vladyslav Moisieienkov edited this page May 11, 2022 · 2 revisions

Contents

  1. Inspect queues
  2. Purge queues
  3. Acknowledge messages

Inspect queues

You can use rabbitmqctl to inspect queues and their statuses:

$ kubectl exec -i -t reana-message-broker-0 -- rabbitmqctl list_queues name messages messages_unacknowledged
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name    messages        messages_unacknowledged
workflow-submission     12       0
jobs-status     0       0

Purge queues

You can purge queue content by doing:

$ kubectl exec -i -t reana-message-broker-0 -- rabbitmqctl purge_queue workflow-submission

This should be only done on development machines, because discarding workflow status messages will mean that the workflow status will not be properly handled in database. Be careful and inspect Kubernetes pods and jobs and their statuses as well as the database statuses for running/queued/pending workflows and fix the database statuses manually by UPDATE SQL commands should you purge the queue for a reason or another.

Acknowledgement of messages

If the queue is in a situation that one workflow is not starting and blocks the queue:

$ kubectl logs reana-server scheduler
2022-02-07 10:03:47,745 | root | MainThread | INFO | Received workflow: {'user': 'uuuu', 'workflow_id_or_name': 'wwww', 'parameters': {'input_parameters': {}, 'operational_options': {}}, 'priority': 98, 'min_job_memory': 12884901888.0}
2022-02-07 10:03:47,943 | root | MainThread | INFO | REANA not ready to run workflow WWWW.
2022-02-07 10:03:52,950 | root | MainThread | INFO | Received workflow: {'user': 'uuuu', 'workflow_id_or_name': 'wwww', 'parameters': {'input_parameters': {}, 'operational_options': {}}, 'priority': 98, 'min_job_memory': 12884901888.0}
2022-02-07 10:03:53,289 | root | MainThread | INFO | REANA not ready to run workflow WWWW.
2022-02-07 10:03:58,291 | root | MainThread | INFO | Received workflow: {'user': 'uuuu', 'workflow_id_or_name': 'wwww', 'parameters': {'input_parameters': {}, 'operational_options': {}}, 'priority': 98, 'min_job_memory': 12884901888.0}
2022-02-07 10:03:58,563 | root | MainThread | INFO | REANA not ready to run workflow WWWW.

$ kubectl exec -i -t reana-message-broker-0 -- rabbitmqctl list_queues name messages messages_unacknowledged
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name    messages        messages_unacknowledged
workflow-submission     12       1
jobs-status     0       0

The workflow WWWW is constantly requeued and not acknowledged and blocks the queue. The other workflows may not be starting due to lower priorities. In this case, it may be necessary to start another consumer and acknowledge the message about the WWWW workflow manually. You have two options.

Using queue-consume admin command (recommended)

Refer to reana-admin tool and remove a message that keeping the queue busy using queue-consume command:

$ kubectl exec -it deployment/reana-server -- bash
> flask reana-admin queue-consume --help  # for more details
> flask reana-admin queue-consume -q workflow-submission -i  # -i, interactive mode; you can choose which message to delete

Create ad-hoc consumer

Start a Python REPL in the scheduler container:

$ kubectl exec -i -t deployment/reana-server -c scheduler -- /bin/bash
pod> ipython
ipython>

In the Python REPL, create a new consumer similar to class WorkflowExecutionScheduler defined in reana_server.scheduler.py slightly modified to acknowledge problematic workflow:

$ vim reana_server/scheduler.py
@@ -127,7 +127,12 @@

     def on_message(self, workflow_submission, message):
         """On new workflow_submission event handler."""
+        print(repr(message))
         workflow_submission = json.loads(workflow_submission)
+        if workflow_submission["workflow_id_or_name"].startswith("WBoson.4"):
+            print("ACK-ing %s" % workflow_submission["workflow_id_or_name"])
+            message.ack()
+            return
         logging.info("Received workflow: {}".format(workflow_submission))
         workflow_min_job_memory = workflow_submission.pop("min_job_memory", 0)
         if reana_ready(workflow_min_job_memory):

Then copy/paste such a modified WorkflowExecutionScheduler class definition into Python REPL and start the consumer:

ipython> # copy paste the myscheduler.py
ipython> scheduler = WorkflowExecutionScheduler()
ipython> scheduler.run()

See the message being acknowledged. You can then interrupt the process and exit, as the other messages will be handled by the other regular consumer.

The final step of the intervention would be to act on the PostgreSQL database and update the status of workflow WWWW to failed.