Skip to content

Developer_3.x Active MQ

Markus Weigelt edited this page Jul 4, 2023 · 17 revisions

Active MQ Web Services for Kitodo

JMS

Active Message Queue is an open source Java Messaging (JMS) implementation provided by the Apache Software Foundation. It is intended to be used to connect software components in a flexible way. The core is the Active MQ server which can be pictured like a post office. The mail boxes are named “queue” or “topic”. Queues work as expected: A producer sends a message where a consumer can pick it up. Topics can be pictured as black boards: The main difference is: A message read from a queue is removed from the queue. A message read from a topic is still available to others. Consumer clients can actively check the server or may register listeners with the server to be notified of new messages.

API Implementation

This behaviour has already been implemented to Kitodo: The org.goobi.mq. ActiveMQDirector is a ServletContextListener which is registered in web.xml. On application startup, it registers all consumers from its “services” variable to the server configured in “activeMQ.hostURL”.

The elements of this variable are classes extending the abstract class ActiveMQProcessor. This class implements the MessageListener and provides facilities to handle exceptions and to store the consumer which is required on shutdown to disconnect.

To implement another web service processor, you have to implement a class which extends ActiveMQProcessor and implements its abstract void process(MapMessage). Here is the right place to do whatever your processor is intended to do. There is a class MapMessageObjectReader which shall be used to type safe retrieve complex objects from MapMessages. You must add your new class to the “services” variable of ActiveMQDirector then.

The Kitodo server administrator shall be in control which processors are being started, and which queue names they listen on. Implementation of this configurability is designed this way: The implementing class must pass its queue name to the constructor of the parent class. This is done by implementing the constructor like in the following skeleton. If the queue name is not configured, it will return null which will prevent the ActiveMQDirector from registering it to the server. Inside the class, the queue name is available in the global variable “queueName” which is set by the parent class. The implementation may use arbitrary “activeMQ.myService.*” entries in goobi_config.properties for configuration.

Service processor skeleton sample

package org.goobi.mq.processores;

import org.goobi.mq.*;
import de.sub.goobi.config.ConfigCore;
import de.sub.goobi.helper.enums.ReportLevel;

public class MyServiceProcessor extends ActiveMQProcessor {

    public MyServiceProcessor() {
        super(ConfigMain.getParameter("activeMQ.myService.queue", null));
    }

    @Override
    protected void process(MapMessageObjectReader args) throws Exception {
        // TODO Auto-generated method stub
    }
}

Processor Response

Responses from processors are designed to be handled as WebServiceResult objects. Those objects are MapMessages which send themselves to a topic configured in “activeMQ.results.topic”. They consist of the Strings “queue” (the name of the queue the job ticket was sent to), “id” (a String “id” in the MapMessage which is mandatory), “level” and an optional “message”. When designing the MapMessage layout to parameterise your web service processor, please keep in mind that a String element “id” is mandatory.

If process() terminates without error, it is meant to have done its job successfully and a WebServiceResult with level “success” will be sent. If process() returns an exception, a WebServiceResult with level “fatal” will be sent. The exception will be returned as the “message” String. You may also use the WebServiceResult class to send messages with the levels “error”, “warn”, “info”, “debug”, “verbose” and “ludicrous” which are meant to be informative only: new WebServiceResult(queueName, args.getMandatoryString("id"), ReportLevel.INFO, "Remote host is down, trying again later.") .send();

Process Creation Service

Kitodo.Production is equipped with a web service interface to automatically create new processes based on a given template. This allows the digitization process to be initiated from outside the application, for example by assigning a new digital ID to a record in a library catalogue (or—at choice of the library—by duplicating a record and assigning a new digital ID to the duplicate) and then running a script.

The web service infrastructure is providet by an Active MQ server (see http://activemq.apache.org/ for details) which needs to be downloaded and started. Without further configuration, it provides everything necessary on port 61616 of the machine in question.

The “activeMQ.hostURL” must be set in goobi_config.properties to point to this server. The “activeMQ.createNewProcess.queue” must be set to point to a queue of your choice where Kitodo.Production shall pick up orders to create new processes.

Orders must be javax.jms.MapMessage objects with the following key-value-pairs provided:

String template
    name of the process template to use
String opac
    Cataloge to use for lookup
String field
    Field to look into, usually 12 (PPN)
String value
    Value to look for, id of physical medium
String id
    Ticket ID (used in log responses)
List<String> collections
    Collections to be selected
Map<String, String> userFields (optional)
    May be used to populates AdditionalField entries

Here is a sample java client to do the job. It expects to be passed from the command line the Active MQ host (e.g. tcp://localhost:61616), the queue name and the parameters as listed above.

To run this application, the following JARs from the ActiveMQ server’s /lib folder are required on the classpath:

  • activemq-core
  • geronimo-j2ee-management_1.1_spec
  • genonimo-jms_1.1_spec
  • log4j2
  • slf4j-api
  • slf4j-log4j12

Main.java

import java.util.*;
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Main {
    public static int main(String[] args) { try {

        // Check arguments
        if (args.length < 8 || (args.length % 2) != 0) {
            System.out.println("Parameters: Active MQ host, queue name, "
                    + "template name, opac name,");
            System.out.println("            no. of search field, search "
                    + "string, digital id, collection name,");
            System.out.println("            [additional details field, "
                    + "value, [add. details field, value, [...");
            return 1;
        }

        // Connect to server
        Connection connection = new ActiveMQConnectionFactory(args[0])
                .createConnection();
        connection.start();
        Session session = connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue(args[1]);
        MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

        // Create job ticket
        MapMessage message = session.createMapMessage();
        message.setString("template", args[2]);
        message.setString("opac", args[3]);
        message.setString("field", args[4]);
        message.setString("value", args[5]);
        message.setString("id", args[6]);
        List<String> collections = new ArrayList<String>();
        collections.add(args[7]);
        message.setObject("collections", collections);
        Map<String, String> userFields = new HashMap<String, String>();
        for (int i = 8; i < args.length; i += 2)
            userFields.put(args[i], args[i + 1]);
        if (userFields.size() != 0)
            message.setObject("userFields", userFields);

        // Send job ticket
        producer.send(message);

        // Shutdown
        session.close();
        connection.close();
    } catch (Exception e) { e.printStackTrace(); return 2; }
    return 0;
}   }

Processors

Kitodo.Production is equipped with a web service interface to for example automatically finalize steps. This allows external software contributing to a workflow to report their success from outside the application. Additionally, properties can be populated and a message can be added to the processes’ log (in former versions of Kitodo known as “wiki field”).

The activeMQ.hostURL must be set in kitodo_config.properties to point to this server. In addition, the corresponding queue must be commented out or added.

The web service infrastructure is provided by an Active MQ server (see http://activemq.apache.org/ for details) which needs to be downloaded and started. Without further configuration, it provides everything necessary on port 61616 of the machine in question.

For simplified use of ActiveMQ, a Docker image and a Java client are available here. (see https://github.com/slub/kitodo-production-activemq)

FinalizeStep Queue

The activeMQ.finaliseStep.queue parameter must be commented out in kitodo_config.properties or added with value KitodoProduction.FinalizeStep.Queue.

Message must be javax.jms.MapMessage objects with the following key-value-pairs provided:

String id
    ID of the step to close (do not mix up with the process ID)
Map<String, String> properties (optional)
    May be used to populates properties
String message
    Message to be added to the processes’ log.

TaskAction Queue

The activeMQ.taskAction.queue parameter must be commented out in kitodo_config.properties or added with value KitodoProduction.TaskAction.Queue.

Message must be javax.jms.MapMessage objects with the following key-value-pairs provided:

Integer id
   Identifier of the task
String message
    Message to be added as comment
String action
    The task action to process
Integer correctionTaskId
    Identifier of the correction task

Correction Task is a task to jump back when correction is requested. The processing state of tasks between current task and correction task are set to LOCKED and state of correction task is set to OPEN. When correction is made, the correction task is set to DONE and the current task to OPEN.

Please consider the next point Task Action cause some of the keys are required for some actions and for some not.

Task Action

There are followings action with different behavior of the parameters.

COMMENT

This action adds a comment to a task.

Parameter Required Description
id yes the task identifier as integer
message yes the message for comment as string
action yes the task action as string "COMMENT"
PROCESS

This action set the processing status of a task to INWORK. The task must be in the processing state OPEN to run this action.

Parameter Required Description
id yes the task identifier as integer
message no the message for comment as string
action yes the task action as string "PROCESS"
ERROR_OPEN

This action is used to report an error. The task must be in the processing state INWORK to run this action. The behaviour of this action depends on the correctionTaskId.

If the correctionTaskId is available, the processing state of the task is set to LOCKED, an error comment will be created and the processing state of correction task is set to OPEN.

If no correctionTaskId is set only an error comment is created and the task remains in processing status INWORK.

Parameter Required Description
id yes the task identifier as integer
message yes the message for comment as string
action yes the task action as string "ERROR_OPEN"
correctionTaskId no the identifier of correction task as integer
ERROR_CLOSE

This action is used to solve an error. The execution and behaviour of this action depends on the correctionTaskId.

If correctionTaskId is available the task must be in the processing state LOCKED and the processing state of the correction task will be set to DONE.

If no correctionTaskId is set the task must be in the processing state INWORK

In both behaviours, the processing status of the task is set to OPEN.

Parameter Required Description
id yes the task identifier as integer
message no the message for comment as string
action yes the task action as string "ERROR_CLOSE"
correctionTaskId no the identifier of correction task as integer
CLOSE

This action set the processing status of a task to DONE.

Parameter Required Description
id yes the task identifier as integer
message no the message for comment as string
action yes the task action as string "CLOSE"
Clone this wiki locally