Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

messagechannel.send() implementation #102

Merged
merged 12 commits into from
Mar 19, 2024
11 changes: 11 additions & 0 deletions controller/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
plugins {
id 'com.flipkart.varadhi.java-library-conventions'
}

dependencies {
api(project(":entities"))
api(project(":spi"))
api(project(":core"))

implementation("io.vertx:vertx-micrometer-metrics")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.flipkart.varadhi.controller;

import com.flipkart.varadhi.core.cluster.messages.SubscriptionOperation;
import com.flipkart.varadhi.core.ophandlers.ControllerOpHandler;
import com.flipkart.varadhi.core.ophandlers.ServerOpHandler;
import com.flipkart.varadhi.core.proxies.ServerOpMgrProxy;
import com.flipkart.varadhi.entities.VaradhiSubscription;
import com.flipkart.varadhi.spi.db.MetaStore;
import io.micrometer.core.instrument.MeterRegistry;

import java.util.concurrent.CompletableFuture;

public class ControllerMgr implements ControllerOpHandler {
private ServerOpHandler serverOpHandler;

public ControllerMgr(ServerOpHandler serverOpHandler) {
this.serverOpHandler = serverOpHandler;
}
@Override
public CompletableFuture<Void> StartSubscription(SubscriptionOperation operation) {
operation.markInProgress();
serverOpHandler.update(operation);
return CompletableFuture.completedFuture(null);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package com.flipkart.varadhi.core.cluster;


import com.flipkart.varadhi.core.cluster.messages.*;

import java.util.concurrent.CompletableFuture;

/**
* This is subset of methods from vertx.EventBus which are node specific. Instead of using address concept, we will
* rely on path concept from http. address is not needed as this class is meant to represent a message channel to a specific
* node.
* publish methods are fire & forget.
* send methods don't have any response, but their delivery can be tracked using the future.
* request methods are for traditional request-response pattern.
*
* TODO: whether to use the ClusterMessage class. It is unlikely we will use reply-to-reply feature.
*/
public interface MessageChannel {

void publish(String path, ClusterMessage msg);

CompletableFuture<Void> send(String path, ClusterMessage msg);

CompletableFuture<ResponseMessage> request(String path, ClusterMessage msg);

// void addMessageHandler(String path, MessageHandler messageHandler);
<E extends ClusterMessage> void register(String address, Class<E> messageClazz, SendHandler<E> handler);
kmrdhruv marked this conversation as resolved.
Show resolved Hide resolved

<E extends ClusterMessage> void register(String address, Class<E> messageClazz, RequestHandler<E> handler);

<E extends ClusterMessage> void register(String address, Class<E> messageClazz, PublishHandler<E> handler);

void removeMessageHandler(String path);

public static enum Method {
SEND("send"),
PUBLISH("publish"),
REQUEST("request");

private String name;

Method(String name) {
this.name = name;
}
@Override
public String toString() {
return name;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.flipkart.varadhi.core.cluster.messages;


import com.fasterxml.jackson.annotation.JsonTypeInfo;
import lombok.Data;
import lombok.Getter;

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "@messageType")
@Getter
public class ClusterMessage {
String id;
long timeStamp;

public ClusterMessage() {
this.id = java.util.UUID.randomUUID().toString();
this.timeStamp = System.currentTimeMillis();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.flipkart.varadhi.core.cluster.messages;

public interface PublishHandler<E extends ClusterMessage> {
void handle(E message);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.flipkart.varadhi.core.cluster.messages;

import java.util.concurrent.CompletableFuture;

public interface RequestHandler <E extends ClusterMessage> {
CompletableFuture<ResponseMessage> handle(E message);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.flipkart.varadhi.core.cluster.messages;

public class ResponseMessage extends ClusterMessage {
// indicates a response to a request message.
// should contain the reference to original request (e.g. request id).
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.flipkart.varadhi.core.cluster.messages;

import java.util.concurrent.CompletableFuture;

@FunctionalInterface
public interface SendHandler<E extends ClusterMessage> {
CompletableFuture<Void> handle(E message);
kmrdhruv marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.flipkart.varadhi.core.cluster.messages;

import lombok.EqualsAndHashCode;
import lombok.Getter;

@Getter
@EqualsAndHashCode(callSuper = true)
public class SubscriptionMessage extends ClusterMessage {
private final String subscriptionId;
private final SubscriptionOperation operation;

public SubscriptionMessage(SubscriptionOperation operation) {
super();
this.subscriptionId = operation.getSubscriptionId();
this.operation = operation;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.flipkart.varadhi.core.cluster.messages;


import com.flipkart.varadhi.entities.VaradhiSubscription;
import lombok.Data;

import java.util.UUID;

@Data
public class SubscriptionOperation {
public static enum Kind {
CREATE, START, STOP, DELETE, UPDATE
kmrdhruv marked this conversation as resolved.
Show resolved Hide resolved
}
public static enum State {
SCHEDULED, ERRORED, COMPLETED, IN_PROGRESS
}

private String operationId;
private Kind kind;
private State state;
private String subscriptionId;
private String requestedBy;
private String errorMessage;
private long startTime;
private long endTime;

public void markInProgress() {
state = State.IN_PROGRESS;
}

public SubscriptionMessage toMessage() {
return new SubscriptionMessage(this);
}

public static SubscriptionOperation getSubscriptionOp(Kind opKind, String subscriptionId, String requestedBy) {
SubscriptionOperation op = new SubscriptionOperation();
op.setOperationId(UUID.randomUUID().toString());
op.setKind(opKind);
op.setState(State.SCHEDULED);
op.setSubscriptionId(subscriptionId);
op.setRequestedBy(requestedBy);
op.setStartTime(System.currentTimeMillis());
return op;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.flipkart.varadhi.core.ophandlers;

import com.flipkart.varadhi.core.cluster.messages.SubscriptionOperation;

import java.util.concurrent.CompletableFuture;

public interface ControllerOpHandler {
CompletableFuture<Void> StartSubscription(SubscriptionOperation operation);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.flipkart.varadhi.core.ophandlers;

import com.flipkart.varadhi.core.cluster.messages.SubscriptionOperation;
import com.flipkart.varadhi.entities.VaradhiSubscription;

import java.util.concurrent.CompletableFuture;

public interface ServerOpHandler {
CompletableFuture<Void> update(SubscriptionOperation operation);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.flipkart.varadhi.core.proxies;

import com.flipkart.varadhi.core.cluster.MessageChannel;
import com.flipkart.varadhi.core.cluster.messages.SubscriptionMessage;
import com.flipkart.varadhi.core.cluster.messages.SubscriptionOperation;
import com.flipkart.varadhi.core.ophandlers.ControllerOpHandler;

import java.util.concurrent.CompletableFuture;

public class ControllerMgrProxy implements ControllerOpHandler {
private final String CONTROLLER_ADDR = "Controller";
private final MessageChannel channel;

public ControllerMgrProxy(MessageChannel channel) {
this.channel = channel;
}

@Override
public CompletableFuture<Void> StartSubscription(SubscriptionOperation operation) {
SubscriptionMessage message = operation.toMessage();
return channel.send(getAddress(MessageChannel.Method.SEND, message), message);
}

private String getAddress(MessageChannel.Method method, SubscriptionMessage message) {
return CONTROLLER_ADDR + "." + message.getClass().getSimpleName() + "." + method;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.flipkart.varadhi.core.proxies;

import com.flipkart.varadhi.core.cluster.MessageChannel;
import com.flipkart.varadhi.core.cluster.messages.SubscriptionMessage;
import com.flipkart.varadhi.core.cluster.messages.SubscriptionOperation;
import com.flipkart.varadhi.core.ophandlers.ServerOpHandler;
import com.flipkart.varadhi.entities.VaradhiSubscription;

import java.util.concurrent.CompletableFuture;

public class ServerOpMgrProxy implements ServerOpHandler {
private final String SERVER_ADDR = "Server";
private final MessageChannel channel;

public ServerOpMgrProxy(MessageChannel channel) {
this.channel = channel;
}
@Override
public CompletableFuture<Void> update(SubscriptionOperation operation) {
SubscriptionMessage message = operation.toMessage();
return channel.send(getAddress(MessageChannel.Method.SEND, message), message);
}

private String getAddress(MessageChannel.Method method, SubscriptionMessage message) {
return SERVER_ADDR + "." + message.getClass().getSimpleName() + "." + method;
}
}
3 changes: 3 additions & 0 deletions server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ dependencies {
implementation(project(":messaging"))
implementation(project(":entities"))
implementation(project(":authz"))
implementation(project(":controller"))
implementation(project(":consumer"))

implementation("org.apache.logging.log4j:log4j-slf4j2-impl")
Expand Down Expand Up @@ -50,6 +51,8 @@ dependencies {
implementation("org.apache.curator:curator-framework")
runtimeOnly(project(":pulsar"))

testImplementation("io.vertx:vertx-zookeeper:4.5.1")

testImplementation(project(":pulsar"))
testImplementation(testFixtures(project(":spi")))
testImplementation(testFixtures(project(":entities")))
Expand Down
9 changes: 5 additions & 4 deletions server/src/main/java/com/flipkart/varadhi/CoreServices.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.flipkart.varadhi;


import com.flipkart.varadhi.config.ServerConfig;
import com.flipkart.varadhi.config.AppConfiguration;
import com.flipkart.varadhi.spi.db.MetaStoreOptions;
import com.flipkart.varadhi.spi.db.MetaStoreProvider;
import com.flipkart.varadhi.spi.services.MessagingStackOptions;
Expand Down Expand Up @@ -39,7 +39,7 @@ public class CoreServices {
private final MessagingStackProvider messagingStackProvider;
private final MetaStoreProvider metaStoreProvider;

public CoreServices(ServerConfig configuration) {
public CoreServices(AppConfiguration configuration) {
this.observabilityStack = setupObservabilityStack(configuration);
this.messagingStackProvider = setupMessagingStackProvider(configuration.getMessagingStackOptions());
this.metaStoreProvider = setupMetaStoreProvider(configuration.getMetaStoreOptions());
Expand All @@ -50,7 +50,7 @@ public Tracer getTracer(String instrumentationScope) {
return this.observabilityStack.getOpenTelemetry().getTracer(instrumentationScope);
}

public MeterRegistry getMetricsRegistry() {
public MeterRegistry getMeterRegistry() {
return this.observabilityStack.getMeterRegistry();
}

Expand All @@ -76,7 +76,8 @@ private MessagingStackProvider setupMessagingStackProvider(MessagingStackOptions
return provider;
}

private ObservabilityStack setupObservabilityStack(ServerConfig configuration) {
private ObservabilityStack setupObservabilityStack(AppConfiguration configuration) {

Resource resource = Resource.getDefault()
.merge(Resource.create(Attributes.of(ResourceAttributes.SERVICE_NAME, "com.flipkart.varadhi")));

Expand Down
4 changes: 2 additions & 2 deletions server/src/main/java/com/flipkart/varadhi/RestVerticle.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ public void start(Promise<Void> startPromise) {
log.info("HttpServer Started.");
startPromise.complete();
} else {
log.warn("HttpServer Start Failed.");
startPromise.fail("HttpServer Start Failed.");
log.warn("HttpServer Start Failed." + h.cause());
startPromise.fail("HttpServer Start Failed." + h.cause());
}
});
}
Expand Down
Loading
Loading