IPC EventBus provides a simple EventBus API for intra-JVM and extra-JVM communication with process launching capabilities. The library is intended for integration tests: it isolates components in different processes and provides a way to communicate between these processes.

IPC EventBus

IPC EventBus provides a simple EventBus API for intra-JVM and extra-JVM communication. You can use it within a JVM as a simple EventBus, or between JVMs for inter-process communication. The project also comes with several builders to easily start processes and bind them to an event bus.

The goal of this project is to be:

  • self-contained (no external dependency)

  • simple

  • efficient

  • targeted at small use cases, i.e. integration tests and system tests, which require synchronization of several processes and classpath isolation

  • Java 8 compatible, so that you can use it in any TC project

Licensed under the Apache License, Version 2.0 © Terracotta, Inc., a Software AG company. Copyright Super iPaaS Integration LLC, an IBM Company 2024

Build and Maven dependency

Fork, then:

git clone git@github.com:Terracotta-OSS/ipc-eventbus.git
mvn clean install

Status of the build: Terracotta-OSS@Cloudbees

<dependency>
    <groupId>org.terracotta</groupId>
    <artifactId>ipc-eventbus</artifactId>
    <version>X.Y</version>
</dependency>

What’s Available

This project contains several modules:

  • I/O Stuff

An EventBus API with a local and remote implementation:

  • Local EventBus

  • Remote EventBus

Process launching mechanisms:

  • Process Launching

  • Java Process Launching

And a way to start a Java process which can interact with its launcher through an event bus:

  • Inter Java Process Communication

I/O Stuff

Pipe

Starts a thread that will copy input data into an output stream.

Pipe pipe = new Pipe("collect stdout", inputStream, outputStream);
pipe.waitFor();     // if you need, you can wait for the pipe to finish
pipe.close();       // close the pipe (does not close the stream!)
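
To make the idea concrete, here is a minimal stdlib-only sketch of a thread that copies one stream into another. It is not the library's Pipe class: the name CopyThreadSketch, the buffer size and the roundTrip helper are assumptions made for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the idea behind Pipe; not the library's class.
final class CopyThreadSketch {
    private final Thread worker;

    CopyThreadSketch(String name, final InputStream in, final OutputStream out) {
        worker = new Thread(new Runnable() {
            @Override
            public void run() {
                byte[] buffer = new byte[8192]; // arbitrary buffer size
                try {
                    int read;
                    while ((read = in.read(buffer)) != -1) {
                        out.write(buffer, 0, read);
                        out.flush();
                    }
                } catch (IOException e) {
                    // In this sketch a closed or broken stream simply ends the copy.
                }
            }
        }, name);
        worker.setDaemon(true);
        worker.start();
    }

    // Blocks until the input is exhausted and everything has been copied.
    void waitFor() throws InterruptedException {
        worker.join();
    }

    // Illustration helper: sends a string through the copy thread and returns what arrived.
    static String roundTrip(String text) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        CopyThreadSketch pipe = new CopyThreadSketch("copy",
                new ByteArrayInputStream(text.getBytes(StandardCharsets.UTF_8)), sink);
        try {
            pipe.waitFor();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new String(sink.toByteArray(), StandardCharsets.UTF_8);
    }
}
```

As in the Pipe description above, the sketch leaves both streams open; closing them stays the caller's responsibility.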

MultiplexOutputStream

Create an OutputStream which will write into several output streams sequentially.

MultiplexOutputStream plex = new MultiplexOutputStream()
        .addOutputStream(outputStream)
        .addOutputStream(outputStream2)
        .addOutputStream(System.out);

Usage:

plex.getOutputStreams();    // lists the streams
plex.isEmpty();             // true if no streams
plex.streamCount();         // number of streams
plex.write(data);           // MultiplexOutputStream is an OutputStream
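
The fan-out behaviour can be pictured with a small stdlib-only sketch. This is not the library's MultiplexOutputStream: the class FanOutStreamSketch and its add and demo methods are hypothetical names, and the sketch only assumes that every write is forwarded to each target in registration order.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of writing to several streams sequentially.
final class FanOutStreamSketch extends OutputStream {
    private final List<OutputStream> targets = new ArrayList<OutputStream>();

    FanOutStreamSketch add(OutputStream target) {
        targets.add(target);
        return this;
    }

    @Override
    public void write(int b) throws IOException {
        for (OutputStream target : targets) {
            target.write(b);
        }
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        // Forward whole chunks instead of falling back to byte-by-byte writes.
        for (OutputStream target : targets) {
            target.write(b, off, len);
        }
    }

    @Override
    public void flush() throws IOException {
        for (OutputStream target : targets) {
            target.flush();
        }
    }

    // Illustration helper: writes the text once and returns what each target received.
    static String[] demo(String text) {
        ByteArrayOutputStream first = new ByteArrayOutputStream();
        ByteArrayOutputStream second = new ByteArrayOutputStream();
        FanOutStreamSketch plex = new FanOutStreamSketch().add(first).add(second);
        try {
            plex.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return new String[] {
            new String(first.toByteArray(), StandardCharsets.UTF_8),
            new String(second.toByteArray(), StandardCharsets.UTF_8)
        };
    }
}
```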

Local EventBus

Create a local EventBus by using a builder:

EventBus eventBus = new EventBus.Builder()
        .onError(new PrintingErrorListener(System.err)) // OPTIONAL: print listener exceptions
        .onError(new RethrowingErrorListener())         // OPTIONAL: rethrow listener exceptions immediately (by default)
        .id("bus-id")                                   // OPTIONAL: bus id (otherwise a UUID is generated)
        .build();
eventBus.getId(); // returns the eventbus id

Implement the EventListener interface to listen to events

EventListener listener = new EventListener() {
    @Override
    public void onEvent(Event e) {
    }
};

Bind your listener to events

eventBus.on(listener);              // register a listener for all events
eventBus.on("my.event", listener);  // register a listener for a specific event

You can dump (debug) what is going on

eventBus.on(new EventListenerSniffer()); // listener which dumps to the console all events (for debug)

Unbind events and listeners

eventBus.unbind(listener);              // removes a listener from all events
eventBus.unbind("my.event");            // removes all listeners for this event
eventBus.unbind("my.event", listener);  // removes a specific listener from an event

And, of course, trigger events!

eventBus.trigger("my.event");               // can trigger an event
eventBus.trigger("my.event", "some data");  // event with data (must be serializable)

Here is what you can do with the Event object received by EventListener implementations:

event.getData();                // the data
event.getData(String.class);    // the data, cast to the requested type
event.getName();                // the name of the event triggered
event.getSource();              // the ID of the eventbus
event.getTimestamp();           // time in ms of the event
event.isUserEvent();            // check if it is a user event. You can also listen to system events such as eventbus.server.close, eventbus.client.connect, eventbus.client.disconnect
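
The registration rules above (listeners for all events, listeners for one event name, and the three forms of unbind) can be modelled with a toy in-memory bus. This is only a sketch of the described semantics, not the library's implementation: TinyBusSketch and its nested Listener interface are hypothetical, and it assumes that unbinding an event name leaves catch-all listeners registered.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of on / unbind / trigger; all names here are illustrative assumptions.
final class TinyBusSketch {
    interface Listener {
        void onEvent(String name, Object data);
    }

    private final List<Listener> catchAll = new ArrayList<Listener>();
    private final Map<String, List<Listener>> byName = new LinkedHashMap<String, List<Listener>>();

    void on(Listener listener) {
        catchAll.add(listener);
    }

    void on(String name, Listener listener) {
        List<Listener> listeners = byName.get(name);
        if (listeners == null) {
            listeners = new ArrayList<Listener>();
            byName.put(name, listeners);
        }
        listeners.add(listener);
    }

    void unbind(Listener listener) {
        catchAll.remove(listener);
        for (List<Listener> listeners : byName.values()) {
            listeners.remove(listener);
        }
    }

    void unbind(String name) {
        byName.remove(name); // assumption: catch-all listeners are unaffected
    }

    void trigger(String name, Object data) {
        // Copy first so listeners may (un)register while being notified.
        List<Listener> targets = new ArrayList<Listener>(catchAll);
        List<Listener> named = byName.get(name);
        if (named != null) {
            targets.addAll(named);
        }
        for (Listener listener : targets) {
            listener.onEvent(name, data);
        }
    }

    // Illustration helper: returns the order in which listeners were notified.
    static List<String> demo() {
        final List<String> log = new ArrayList<String>();
        TinyBusSketch bus = new TinyBusSketch();
        bus.on(new Listener() {
            @Override
            public void onEvent(String name, Object data) {
                log.add("all:" + name);
            }
        });
        bus.on("my.event", new Listener() {
            @Override
            public void onEvent(String name, Object data) {
                log.add("one:" + name + "=" + data);
            }
        });
        bus.trigger("my.event", "x");
        bus.trigger("other", null);
        bus.unbind("my.event");
        bus.trigger("my.event", "y");
        return log;
    }
}
```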

Remote EventBus

RemoteEventBus has the same builder options as a local EventBus, but it provides inter-process communication through a socket. One EventBus acts as a server, and several clients can connect to it.

Clients cannot talk to each other. Communication is strictly client-server: any event triggered from a client arrives on the server, and any event triggered from the server is propagated to all clients.

Server creation:

EventBusServer server = new EventBusServer.Builder()
        .id("peer1")     // OPTIONAL: bus id
        .bind("0.0.0.0") // OPTIONAL: bind address
        .listen(56789)   // OPTIONAL: port to listen to. Defaults to 56789
        .listenRandom()  // OPTIONAL: choose a random port for listening
        .build();

Client creation

EventBusClient client = new EventBusClient.Builder()
        .id("peer2")
        .connect(56789)              // OPTIONAL: port to connect to
        .connect("localhost", 56789) // OPTIONAL: port and host to connect to. Default is localhost:56789
        .build();

If nothing is given in the builders, EventBus will try to use the system property ipc.bus.host for the host to connect to and ipc.bus.port for the port to connect to (or listen on).

If no system property is found, localhost is used for the host and 56789 is used for the port.
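
The fallback order described above (explicit builder value, then system property, then default) can be sketched with plain JDK calls. The class BusEndpointSketch and its method names are hypothetical; only the property names ipc.bus.host and ipc.bus.port and the defaults localhost and 56789 come from this README.

```java
// Hypothetical helper showing the lookup order; not part of the library.
final class BusEndpointSketch {
    static final String HOST_PROPERTY = "ipc.bus.host";
    static final String PORT_PROPERTY = "ipc.bus.port";

    // An explicit value wins, then the system property, then "localhost".
    static String resolveHost(String explicitHost) {
        if (explicitHost != null) {
            return explicitHost;
        }
        return System.getProperty(HOST_PROPERTY, "localhost");
    }

    // An explicit value wins, then the system property, then 56789.
    static int resolvePort(Integer explicitPort) {
        if (explicitPort != null) {
            return explicitPort;
        }
        // Integer.getInteger falls back to the default when the property is missing or not a number.
        return Integer.getInteger(PORT_PROPERTY, 56789);
    }
}
```

With this ordering, passing -Dipc.bus.port=6000 on the command line would change the port without touching the code, as long as no explicit port is given.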

Process Launching

Creates a process, similar to ProcessBuilder, with several improvements: you can access the stdout, stderr and stdin of the process, cache them, forward them, access the process PID, etc.

AnyProcess process = AnyProcess.newBuilder()
        .command("bash", "-c", "sleep 3; echo $VAR")
        .recordStdout()                     // OPTIONAL: save stdout from process for getStdout() (disabled by default). Disables getInputStream().
        .recordStderr()                     // OPTIONAL: save stderr from process for getStderr() (disabled by default). Disables getErrorStream().
        .env("key", "value")                // OPTIONAL: add an env. variable
        .env(new HashMap<String, String>()) // OPTIONAL: set a new env
        .pipeStderr()                       // OPTIONAL: send stderr to the console
        .pipeStderr(outputStream)           // OPTIONAL: send stderr to a stream. You can both collect and pipe.
        .pipeStdout()                       // OPTIONAL: send stdout to the console
        .pipeStdout(outputStream)           // OPTIONAL: send stdout to a stream. You can both collect and pipe.
        .pipeStdin()                        // OPTIONAL: will bind process stdin to this process stdin
        .pipeStdin(inputStream)             // OPTIONAL: will bind process stdin to this input stream
        .redirectStderr()                   // OPTIONAL: treat stderr like stdout (both merged into stdout)
        .workingDir(new File("."))          // OPTIONAL: change the working directory. Same as current process by default
        .build();

Accessible methods:

process.destroy();                          // destroy (kill with SIGTERM) the process
process.exitValue();                        // the process exit value, when available
process.getCommand();                       // the process command
process.getErrorStream();
process.getInputStream();
process.getOutputStream();
process.getPid();                           // get the process PID
process.getStderr();                        // if collected, get the stderr of the process
process.getStderrText();                    // if collected, get the stderr of the process as a String
process.getStdout();                        // if collected, get the stdout of the process
process.getStdoutText();                    // if collected, get the stdout of the process as a String
process.getWorkingDirectory();
process.isDestroyed();                      // check if process is destroyed
process.isRunning();                        // check if process is still running
process.waitFor();                          // block until the process has finished
process.waitForTime(1, TimeUnit.MINUTES);   // wait for the process to finish or timeout

You can also use a Java Future:

Future future = process.getFuture();        // get a future representing the process execution. You can cancel (=destroy) the process or wait for its completion

Java Process Launching

Another builder allows you to quickly start a Java main class with specific env and system properties. You can access the same builder methods as above.

JavaProcess javaProcess = JavaProcess.newBuilder()
        .mainClass("my.corp.Echo")
        .addClasspath(Echo.class)           // add a classpath entry from a class (find the enclosing jar or folder)
        .arguments("one", "two")            // add some program arguments
        .env("VAR", "Hello")                // add some env. variable
        .addJvmProp("my.prop", "world")     // add some jvm props
        .addJvmArg("-Xmx512m")              // add some jvm flags
        .pipeStdout()                       // you can access all process builder methods seen above
        .pipeStderr()
        .recordStdout()
        .recordStderr()
        .build();

Java home and Java executable can be automatically discovered, but you can override them in the builder.

javaProcess.getJavaExecutable();            // automatically resolved from java home, but you can override it in the builder
javaProcess.getJavaHome();                  // automatically discovered, but you can override it in the builder
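
How the library performs this discovery is not shown in this README. A common approach, sketched below under that assumption, is to read the java.home system property of the current JVM and append bin/java (or bin\java.exe on Windows). JavaExecutableSketch and its methods are hypothetical names.

```java
import java.io.File;
import java.util.Locale;

// Hypothetical sketch of resolving the java executable from a Java home directory.
final class JavaExecutableSketch {
    // Builds <javaHome>/bin/java, or <javaHome>/bin/java.exe when osName looks like Windows.
    static File resolve(String javaHome, String osName) {
        boolean windows = osName.toLowerCase(Locale.ROOT).startsWith("windows");
        return new File(new File(javaHome, "bin"), windows ? "java.exe" : "java");
    }

    // Uses the JVM that is currently running as the Java home.
    static File resolveCurrent() {
        return resolve(System.getProperty("java.home"), System.getProperty("os.name"));
    }
}
```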

Inter Java Process Communication

This builder allows you to start any main class linked to a remote EventBus to be able to communicate with some other processes.

Special events

Each child process will listen to the event process.exit so that you can force a child process to exit like this:

myProcess.trigger("process.exit");
// equivalent to
myProcess.trigger("process.exit", 0);
// or with a code:
myProcess.trigger("process.exit", 1);

The event process.exiting is fired by the child process when exiting.

When the process has fully exited, an event process.exited is fired.

If the parent process instead calls process.destroy() to kill the child process, the event process.destroyed is fired after the process has been destroyed by the SIGTERM signal.

Full Example

Create your main class. From there, you can access the EventBus statically. The event bus is connected to the parent process, so each event you send is propagated to it, and you can also listen to events sent by the parent process.

public class EchoEvent2 {
  public static void main(String[] args) throws Exception {

    Bus.get().on("ping", new EventListener() {
      @Override
      public void onEvent(Event e) {
        Bus.get().trigger("pong", e.getData());
      }
    });

    Thread.sleep(2000);
  }
}

Then, just launch this main class by using the EventJavaProcess builder. It extends the JavaProcess and AnyProcess classes, so you can also configure everything described in the sections above.

EventJavaProcess process = EventJavaProcess.newBuilder()
        .mainClass(EchoEvent2.class) // set main class to start and add it to classpath
        .pipeStdout() // echo stdout
        .pipeStderr() // echo stderr
        .debug() // activate debug mode for ipc eventbus
        .build();

assertTrue(process.isEventBusConnected());

And communicate with the child process like this:

process.on("process.exiting", new EventListener() {
    @Override
    public void onEvent(Event e) throws Throwable {
        System.out.println("Exiting...");
    }
});

process.on("process.exited", new EventListener() {
    @Override
    public void onEvent(Event e) throws Throwable {
        System.out.println("Exited.");
    }
});

process.on("pong", new EventListener() {
    @Override
    public void onEvent(Event e) throws Throwable {
        System.out.println(e.getData());
    }
});

process.trigger("ping", "hello");
process.trigger("process.exit");

process.waitFor();

You should see some output like this:

1440379569484 [11842] [main] ping@11842 at 1440379569484 - hello
1440379569485 [11844] [client-acceptor] eventbus.client.connect@11844 at 1440379569484 - localhost:62978
EchoEvent: Event{name='eventbus.client.connect', source=11844, data=localhost:62978}
1440379569489 [11842] [main] exit@11842 at 1440379569489
1440379569496 [11844] [reader@localhost:62978] pong@11844 at 1440379569496 - hello
EchoEvent: Event{name='pong', source=11844, data=hello}
1440379569499 [11844] [reader@localhost:62978] ping@11842 at 1440379569484 - hello
1440379569500 [11842] [reader@11842] pong@11844 at 1440379569496 - hello
EchoEvent: Event{name='ping', source=11842, data=hello}
1440379569842 [11842] [reader@11842] eventbus.client.disconnect@11842 at 1440379569842
