diff --git a/controller/build.gradle b/controller/build.gradle new file mode 100644 index 00000000..ebc0a99a --- /dev/null +++ b/controller/build.gradle @@ -0,0 +1,11 @@ +plugins { + id 'com.flipkart.varadhi.java-library-conventions' +} + +dependencies { + api(project(":entities")) + api(project(":spi")) + api(project(":core")) + + implementation("io.vertx:vertx-micrometer-metrics") +} diff --git a/controller/src/main/java/com/flipkart/varadhi/controller/ControllerMgr.java b/controller/src/main/java/com/flipkart/varadhi/controller/ControllerMgr.java new file mode 100644 index 00000000..3d109a6d --- /dev/null +++ b/controller/src/main/java/com/flipkart/varadhi/controller/ControllerMgr.java @@ -0,0 +1,20 @@ +package com.flipkart.varadhi.controller; + +import com.flipkart.varadhi.core.cluster.messages.SubscriptionOperation; +import com.flipkart.varadhi.core.ophandlers.ControllerApi; +import com.flipkart.varadhi.core.ophandlers.WebServerApi; + +import java.util.concurrent.CompletableFuture; + +public class ControllerMgr implements ControllerApi { + private final WebServerApi webServerApiProxy; + + public ControllerMgr(WebServerApi webServerApiProxy) { + this.webServerApiProxy = webServerApiProxy; + } + @Override + public CompletableFuture StartSubscription(SubscriptionOperation.StartData operation) { + operation.markInProgress(); + return webServerApiProxy.update(operation); + } +} diff --git a/core/src/main/java/com/flipkart/varadhi/core/cluster/MessageChannel.java b/core/src/main/java/com/flipkart/varadhi/core/cluster/MessageChannel.java new file mode 100644 index 00000000..adc55de5 --- /dev/null +++ b/core/src/main/java/com/flipkart/varadhi/core/cluster/MessageChannel.java @@ -0,0 +1,45 @@ +package com.flipkart.varadhi.core.cluster; + + +import com.flipkart.varadhi.core.cluster.messages.*; + +import java.util.concurrent.CompletableFuture; + +/** + * This is subset of methods from vertx.EventBus which are node specific. Instead of using address concept, we will + * rely on path concept from http. address is not needed as this class is meant to represent a message channel to a specific + * node. + * publish methods are fire & forget. + * send methods don't have any response, but their delivery can be tracked using the future. + * request methods are for traditional request-response pattern. + * + * TODO: whether to use the ClusterMessage class. It is unlikely we will use reply-to-reply feature. + */ +public interface MessageChannel { + + void publish(String apiPath, ClusterMessage msg); + + CompletableFuture send(String apiPath, ClusterMessage msg); + + CompletableFuture request(String apiPath, ClusterMessage msg); + + void register(String basePath, Class messageClazz, SendHandler handler); + + void removeMessageHandler(String apiPath); + + enum Method { + SEND("send"), + PUBLISH("publish"), + REQUEST("request"); + + private final String name; + + Method(String name) { + this.name = name; + } + @Override + public String toString() { + return name; + } + } +} diff --git a/core/src/main/java/com/flipkart/varadhi/core/cluster/messages/ClusterMessage.java b/core/src/main/java/com/flipkart/varadhi/core/cluster/messages/ClusterMessage.java new file mode 100644 index 00000000..64afa689 --- /dev/null +++ b/core/src/main/java/com/flipkart/varadhi/core/cluster/messages/ClusterMessage.java @@ -0,0 +1,24 @@ +package com.flipkart.varadhi.core.cluster.messages; + + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import lombok.Getter; + +@Getter +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "@messageType") +@JsonSubTypes({@JsonSubTypes.Type(value = SubscriptionMessage.class, name = "subscription_message"),}) +public class ClusterMessage { + String id; + long timeStamp; + + public ClusterMessage() { + this.id = java.util.UUID.randomUUID().toString(); + this.timeStamp = System.currentTimeMillis(); + } + + public ClusterMessage(String id, long timeStamp) { + this.id = id; + this.timeStamp = timeStamp; + } +} diff --git a/core/src/main/java/com/flipkart/varadhi/core/cluster/messages/ResponseMessage.java b/core/src/main/java/com/flipkart/varadhi/core/cluster/messages/ResponseMessage.java new file mode 100644 index 00000000..304c85dc --- /dev/null +++ b/core/src/main/java/com/flipkart/varadhi/core/cluster/messages/ResponseMessage.java @@ -0,0 +1,6 @@ +package com.flipkart.varadhi.core.cluster.messages; + +public class ResponseMessage extends ClusterMessage { + // indicates a response to a request message. + // should contain the reference to original request (e.g. request id). +} diff --git a/core/src/main/java/com/flipkart/varadhi/core/cluster/messages/SendHandler.java b/core/src/main/java/com/flipkart/varadhi/core/cluster/messages/SendHandler.java new file mode 100644 index 00000000..0f30bb79 --- /dev/null +++ b/core/src/main/java/com/flipkart/varadhi/core/cluster/messages/SendHandler.java @@ -0,0 +1,6 @@ +package com.flipkart.varadhi.core.cluster.messages; + +@FunctionalInterface +public interface SendHandler { + void handle(E message); +} diff --git a/core/src/main/java/com/flipkart/varadhi/core/cluster/messages/SubscriptionMessage.java b/core/src/main/java/com/flipkart/varadhi/core/cluster/messages/SubscriptionMessage.java new file mode 100644 index 00000000..4858ee8a --- /dev/null +++ b/core/src/main/java/com/flipkart/varadhi/core/cluster/messages/SubscriptionMessage.java @@ -0,0 +1,20 @@ +package com.flipkart.varadhi.core.cluster.messages; + +import lombok.EqualsAndHashCode; +import lombok.Getter; + +@Getter +@EqualsAndHashCode(callSuper = true) +public class SubscriptionMessage extends ClusterMessage { + private final SubscriptionOperation.OpData operation; + + public SubscriptionMessage(SubscriptionOperation.OpData operation) { + super(); + this.operation = operation; + } + + public SubscriptionMessage(String id, long timestamp, SubscriptionOperation.OpData operation) { + super(id, timestamp); + this.operation = operation; + } +} diff --git a/core/src/main/java/com/flipkart/varadhi/core/cluster/messages/SubscriptionOperation.java b/core/src/main/java/com/flipkart/varadhi/core/cluster/messages/SubscriptionOperation.java new file mode 100644 index 00000000..8972410f --- /dev/null +++ b/core/src/main/java/com/flipkart/varadhi/core/cluster/messages/SubscriptionOperation.java @@ -0,0 +1,63 @@ +package com.flipkart.varadhi.core.cluster.messages; + + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import lombok.Data; +import lombok.EqualsAndHashCode; + +import java.util.UUID; + +@Data +public class SubscriptionOperation { + public enum Kind { + START, STOP, UPDATE + } + public enum State { + SCHEDULED, ERRORED, COMPLETED, IN_PROGRESS + } + + private Kind kind; + private State state; + private String requestedBy; + private long startTime; + private long endTime; + private OpData data; + + public static SubscriptionOperation startOp(String subscriptionId, String requestedBy) { + SubscriptionOperation op = new SubscriptionOperation(); + op.setKind(Kind.START); + op.setRequestedBy(requestedBy); + op.setStartTime(System.currentTimeMillis()); + op.data = new StartData(subscriptionId, UUID.randomUUID().toString()); + return op; + } + + @Data + @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "@opDataType") + @JsonSubTypes({@JsonSubTypes.Type(value = StartData.class, name = "startData"),}) + public static class OpData { + private String subscriptionId; + private String operationId; + private State state; + private String errorMsg; + + public void markFail(String reason) { + state = State.ERRORED; + errorMsg = reason; + } + public void markInProgress() { + state = State.IN_PROGRESS; + } + } + + @Data + @EqualsAndHashCode(callSuper = true) + public static class StartData extends OpData { + public StartData(String subscriptionId, String operationId) { + this.setOperationId(operationId); + this.setSubscriptionId(subscriptionId); + this.setState(State.SCHEDULED); + } + } +} diff --git a/core/src/main/java/com/flipkart/varadhi/core/ophandlers/ControllerApi.java b/core/src/main/java/com/flipkart/varadhi/core/ophandlers/ControllerApi.java new file mode 100644 index 00000000..948db056 --- /dev/null +++ b/core/src/main/java/com/flipkart/varadhi/core/ophandlers/ControllerApi.java @@ -0,0 +1,9 @@ +package com.flipkart.varadhi.core.ophandlers; + +import com.flipkart.varadhi.core.cluster.messages.SubscriptionOperation; + +import java.util.concurrent.CompletableFuture; + +public interface ControllerApi { + CompletableFuture StartSubscription(SubscriptionOperation.StartData operation); +} diff --git a/core/src/main/java/com/flipkart/varadhi/core/ophandlers/WebServerApi.java b/core/src/main/java/com/flipkart/varadhi/core/ophandlers/WebServerApi.java new file mode 100644 index 00000000..4f2bf7a7 --- /dev/null +++ b/core/src/main/java/com/flipkart/varadhi/core/ophandlers/WebServerApi.java @@ -0,0 +1,9 @@ +package com.flipkart.varadhi.core.ophandlers; + +import com.flipkart.varadhi.core.cluster.messages.SubscriptionOperation; + +import java.util.concurrent.CompletableFuture; + +public interface WebServerApi { + CompletableFuture update(SubscriptionOperation.OpData operation); +} diff --git a/core/src/main/java/com/flipkart/varadhi/core/proxies/ControllerApiProxy.java b/core/src/main/java/com/flipkart/varadhi/core/proxies/ControllerApiProxy.java new file mode 100644 index 00000000..537d7b4d --- /dev/null +++ b/core/src/main/java/com/flipkart/varadhi/core/proxies/ControllerApiProxy.java @@ -0,0 +1,27 @@ +package com.flipkart.varadhi.core.proxies; + +import com.flipkart.varadhi.core.cluster.MessageChannel; +import com.flipkart.varadhi.core.cluster.messages.SubscriptionMessage; +import com.flipkart.varadhi.core.cluster.messages.SubscriptionOperation; +import com.flipkart.varadhi.core.ophandlers.ControllerApi; + +import java.util.concurrent.CompletableFuture; + +public class ControllerApiProxy implements ControllerApi { + private final String CONTROLLER_PATH = "controller"; + private final MessageChannel channel; + + public ControllerApiProxy(MessageChannel channel) { + this.channel = channel; + } + + @Override + public CompletableFuture StartSubscription(SubscriptionOperation.StartData operation) { + SubscriptionMessage message = new SubscriptionMessage(operation); + return channel.send(getApiPath(MessageChannel.Method.SEND, message), message); + } + + private String getApiPath(MessageChannel.Method method, SubscriptionMessage message) { + return CONTROLLER_PATH + "." + message.getClass().getSimpleName() + "." + method; + } +} diff --git a/core/src/main/java/com/flipkart/varadhi/core/proxies/WebServerApiProxy.java b/core/src/main/java/com/flipkart/varadhi/core/proxies/WebServerApiProxy.java new file mode 100644 index 00000000..11c82c0f --- /dev/null +++ b/core/src/main/java/com/flipkart/varadhi/core/proxies/WebServerApiProxy.java @@ -0,0 +1,25 @@ +package com.flipkart.varadhi.core.proxies; + +import com.flipkart.varadhi.core.cluster.MessageChannel; +import com.flipkart.varadhi.core.cluster.messages.SubscriptionMessage; +import com.flipkart.varadhi.core.cluster.messages.SubscriptionOperation; +import com.flipkart.varadhi.core.ophandlers.WebServerApi; + +import java.util.concurrent.CompletableFuture; + +public class WebServerApiProxy implements WebServerApi { + private final String SERVER_PATH = "webserver"; + private final MessageChannel channel; + + public WebServerApiProxy(MessageChannel channel) { + this.channel = channel; + } + @Override + public CompletableFuture update(SubscriptionOperation.OpData operation) { + SubscriptionMessage message = new SubscriptionMessage(operation); + return channel.send(getApiPath(MessageChannel.Method.SEND, message), message); + } + private String getApiPath(MessageChannel.Method method, SubscriptionMessage message) { + return SERVER_PATH + "." + message.getClass().getSimpleName() + "." + method; + } +} diff --git a/server/build.gradle b/server/build.gradle index 69e2c299..b7f5f7f0 100644 --- a/server/build.gradle +++ b/server/build.gradle @@ -14,6 +14,7 @@ dependencies { implementation(project(":messaging")) implementation(project(":entities")) implementation(project(":authz")) + implementation(project(":controller")) implementation(project(":consumer")) implementation("org.apache.logging.log4j:log4j-slf4j2-impl") @@ -52,6 +53,8 @@ dependencies { implementation("org.apache.curator:curator-framework") runtimeOnly(project(":pulsar")) + testImplementation("io.vertx:vertx-zookeeper:4.5.1") + testImplementation(project(":pulsar")) testImplementation(testFixtures(project(":spi"))) testImplementation(testFixtures(project(":entities"))) diff --git a/server/src/main/java/com/flipkart/varadhi/CoreServices.java b/server/src/main/java/com/flipkart/varadhi/CoreServices.java index fce3888e..242f9109 100644 --- a/server/src/main/java/com/flipkart/varadhi/CoreServices.java +++ b/server/src/main/java/com/flipkart/varadhi/CoreServices.java @@ -1,7 +1,7 @@ package com.flipkart.varadhi; -import com.flipkart.varadhi.config.ServerConfig; +import com.flipkart.varadhi.config.AppConfiguration; import com.flipkart.varadhi.spi.db.MetaStoreOptions; import com.flipkart.varadhi.spi.db.MetaStoreProvider; import com.flipkart.varadhi.spi.services.MessagingStackOptions; @@ -41,7 +41,7 @@ public class CoreServices { private final MessagingStackProvider messagingStackProvider; private final MetaStoreProvider metaStoreProvider; - public CoreServices(ServerConfig configuration) { + public CoreServices(AppConfiguration configuration) { this.observabilityStack = setupObservabilityStack(configuration); this.messagingStackProvider = setupMessagingStackProvider(configuration.getMessagingStackOptions()); this.metaStoreProvider = setupMetaStoreProvider(configuration.getMetaStoreOptions()); @@ -52,7 +52,7 @@ public Tracer getTracer(String instrumentationScope) { return this.observabilityStack.getOpenTelemetry().getTracer(instrumentationScope); } - public MeterRegistry getMetricsRegistry() { + public MeterRegistry getMeterRegistry() { return this.observabilityStack.getMeterRegistry(); } @@ -78,7 +78,8 @@ private MessagingStackProvider setupMessagingStackProvider(MessagingStackOptions return provider; } - private ObservabilityStack setupObservabilityStack(ServerConfig configuration) { + private ObservabilityStack setupObservabilityStack(AppConfiguration configuration) { + Resource resource = Resource.getDefault() .merge(Resource.create(Attributes.of(ResourceAttributes.SERVICE_NAME, "com.flipkart.varadhi"))); diff --git a/server/src/main/java/com/flipkart/varadhi/RestVerticle.java b/server/src/main/java/com/flipkart/varadhi/RestVerticle.java index 2dcb420f..0a1cf338 100644 --- a/server/src/main/java/com/flipkart/varadhi/RestVerticle.java +++ b/server/src/main/java/com/flipkart/varadhi/RestVerticle.java @@ -102,8 +102,8 @@ public void start(Promise startPromise) { log.info("HttpServer Started."); startPromise.complete(); } else { - log.warn("HttpServer Start Failed."); - startPromise.fail("HttpServer Start Failed."); + log.warn("HttpServer Start Failed." + h.cause()); + startPromise.fail("HttpServer Start Failed." + h.cause()); } }); } diff --git a/server/src/main/java/com/flipkart/varadhi/Server.java b/server/src/main/java/com/flipkart/varadhi/VaradhiApplication.java similarity index 50% rename from server/src/main/java/com/flipkart/varadhi/Server.java rename to server/src/main/java/com/flipkart/varadhi/VaradhiApplication.java index b0a12d19..3027a723 100644 --- a/server/src/main/java/com/flipkart/varadhi/Server.java +++ b/server/src/main/java/com/flipkart/varadhi/VaradhiApplication.java @@ -1,15 +1,19 @@ package com.flipkart.varadhi; +import com.flipkart.varadhi.cluster.ClusterManager; import com.flipkart.varadhi.cluster.custom.ZookeeperClusterManager; -import com.flipkart.varadhi.config.ServerConfig; -import com.flipkart.varadhi.deployment.FullDeploymentVerticleDeployer; -import com.flipkart.varadhi.deployment.LeanDeploymentVerticleDeployer; +import com.flipkart.varadhi.cluster.impl.ClusterManagerImpl; +import com.flipkart.varadhi.components.Component; +import com.flipkart.varadhi.components.ComponentKind; +import com.flipkart.varadhi.components.controller.Controller; +import com.flipkart.varadhi.components.webserver.WebServer; +import com.flipkart.varadhi.config.AppConfiguration; import com.flipkart.varadhi.exceptions.InvalidConfigException; import com.flipkart.varadhi.utils.HostUtils; -import io.opentelemetry.api.trace.Tracer; import io.vertx.config.ConfigRetriever; import io.vertx.config.ConfigRetrieverOptions; import io.vertx.config.ConfigStoreOptions; +import io.vertx.core.Future; import io.vertx.core.Vertx; import io.vertx.core.VertxOptions; import io.vertx.core.json.JsonObject; @@ -19,29 +23,50 @@ import io.vertx.tracing.opentelemetry.OpenTelemetryOptions; import lombok.extern.slf4j.Slf4j; +import java.util.Arrays; +import java.util.Map; import java.util.concurrent.ExecutionException; +import java.util.function.Function; +import java.util.stream.Collectors; @Slf4j -public class Server { +public class VaradhiApplication { public static void main(String[] args) { try { String hostName = HostUtils.getHostName(); - log.info("Server Starting on {}.", hostName); - ServerConfig configuration = readConfiguration(args); + log.info("VaradhiApplication Starting on {}.", hostName); + AppConfiguration configuration = readConfiguration(args); CoreServices services = new CoreServices(configuration); Vertx vertx = createVertx(configuration, services); - deployVerticle(hostName, configuration, services, vertx); - log.info("Server Started on {}.", hostName); + ClusterManager clusterManager = createClusterManager(vertx); + Map components = getComponents(configuration, services); + + Future.all(components.entrySet().stream().map(e -> e.getValue().start(vertx, clusterManager).onComplete(ar -> { + if (ar.succeeded()) { + log.info("Component({}) started.", e.getKey()); + } else { + log.error("Component({}) failed to start.", e.getKey(), ar.cause()); + } + })).collect(Collectors.toList())).onComplete(ar -> { + if (ar.succeeded()) { + log.info("VaradhiApplication Started on {}.", hostName); + } else { + log.error("VaradhiApplication failed to start.", ar.cause()); + } + }); } catch (Exception e) { - log.error("Failed to initialise the server.", e); + log.error("Failed to initialise the VaradhiApplication.", e); System.exit(-1); } + // TODO: check need for shutdown hook +// Runtime.getRuntime().addShutdownHook(); } - private static Vertx createVertx(ServerConfig configuration, CoreServices services) + + private static Vertx createVertx(AppConfiguration configuration, CoreServices services) throws ExecutionException, InterruptedException { log.debug("Creating Vertex"); @@ -50,7 +75,7 @@ private static Vertx createVertx(ServerConfig configuration, CoreServices servic VertxOptions vertxOptions = configuration.getVertxOptions() .setTracingOptions(new OpenTelemetryOptions(services.getOpenTelemetry())) .setMetricsOptions(new MicrometerMetricsOptions() - .setMicrometerRegistry(services.getMetricsRegistry()) + .setMicrometerRegistry(services.getMeterRegistry()) .setMetricsNaming(MetricsNaming.v4Names()) .setRegistryName("default") .addDisabledMetricsCategory(MetricsDomain.HTTP_SERVER) @@ -72,52 +97,21 @@ private static Vertx createVertx(ServerConfig configuration, CoreServices servic return vertx; } - private static void deployVerticle( - String hostName, ServerConfig configuration, CoreServices services, Vertx vertx - ) { - log.debug("Verticle deployment started."); - - // TODO: what is the correct scope name for tracer? - Tracer tracer = services.getTracer("varadhi"); - - VerticleDeployer verticleDeployer; - if (configuration.getFeatureFlags().isLeanDeployment()) { - verticleDeployer = new LeanDeploymentVerticleDeployer( - hostName, - vertx, - configuration, - services.getMessagingStackProvider(), - services.getMetaStoreProvider(), - services.getMetricsRegistry(), - tracer - - ); - } else { - verticleDeployer = new FullDeploymentVerticleDeployer( - hostName, - vertx, - configuration, - services.getMessagingStackProvider(), - services.getMetaStoreProvider(), - services.getMetricsRegistry(), - tracer - ); - } - - verticleDeployer.deployVerticle(vertx, configuration); - log.debug("Verticle deployment completed."); + private static ClusterManager createClusterManager(Vertx vertx) { + // TODO:: Placeholder for now. This node joining the cluster needs to be closed + // along with ClusterManager related changes. + return new ClusterManagerImpl(vertx); } - - public static ServerConfig readConfiguration(String[] args) { + public static AppConfiguration readConfiguration(String[] args) { if (args.length < 1) { - log.error("Usage: java com.flipkart.varadhi.Server configuration.yml"); + log.error("Usage: java com.flipkart.varadhi.VaradhiApplication configuration.yml"); System.exit(-1); } return readConfigFromFile(args[0]); } - public static ServerConfig readConfigFromFile(String filePath) throws InvalidConfigException { + public static AppConfiguration readConfigFromFile(String filePath) throws InvalidConfigException { log.info("Loading Configuration."); Vertx vertx = Vertx.vertx(); @@ -132,7 +126,7 @@ public static ServerConfig readConfigFromFile(String filePath) throws InvalidCon try { JsonObject content = retriever.getConfig().toCompletionStage().toCompletableFuture().join(); - return content.mapTo(ServerConfig.class); + return content.mapTo(AppConfiguration.class); } catch (Exception e) { throw new InvalidConfigException("Failed to load Application Configuration", e); } finally { @@ -140,4 +134,20 @@ public static ServerConfig readConfigFromFile(String filePath) throws InvalidCon vertx.close(); } } + + private static Map getComponents( + AppConfiguration configuration, CoreServices coreServices + ) { + //TODO:: check if there is need for ordered sequence of component. + return Arrays.stream(ComponentKind.values()) + .filter(kind -> !kind.equals(ComponentKind.All) && ( + configuration.getComponents().contains(ComponentKind.All) || + configuration.getComponents().contains(kind) + )) + .collect(Collectors.toMap(Function.identity(), kind -> switch (kind) { + case Server -> new WebServer(configuration, coreServices); + case Controller -> new Controller(configuration, coreServices); + default -> throw new IllegalArgumentException("Unknown Component Kind: " + kind); + })); + } } diff --git a/server/src/main/java/com/flipkart/varadhi/VerticleDeployer.java b/server/src/main/java/com/flipkart/varadhi/VerticleDeployer.java index 95da70d9..2a08f7da 100644 --- a/server/src/main/java/com/flipkart/varadhi/VerticleDeployer.java +++ b/server/src/main/java/com/flipkart/varadhi/VerticleDeployer.java @@ -2,9 +2,12 @@ import com.flipkart.varadhi.auth.DefaultAuthorizationProvider; import com.flipkart.varadhi.config.RestOptions; -import com.flipkart.varadhi.config.ServerConfig; +import com.flipkart.varadhi.config.AppConfiguration; import com.flipkart.varadhi.core.VaradhiTopicFactory; import com.flipkart.varadhi.core.VaradhiTopicService; +import com.flipkart.varadhi.core.cluster.MessageChannel; +import com.flipkart.varadhi.core.ophandlers.ControllerApi; +import com.flipkart.varadhi.core.proxies.ControllerApiProxy; import com.flipkart.varadhi.exceptions.VaradhiException; import com.flipkart.varadhi.produce.otel.ProducerMetricHandler; import com.flipkart.varadhi.produce.services.ProducerService; @@ -25,6 +28,7 @@ import com.flipkart.varadhi.web.v1.produce.ProduceHandlers; import io.micrometer.core.instrument.MeterRegistry; import io.opentelemetry.api.trace.Tracer; +import io.vertx.core.Future; import io.vertx.core.Vertx; import lombok.extern.slf4j.Slf4j; @@ -50,11 +54,11 @@ public abstract class VerticleDeployer { private final Map behaviorConfigurators = new HashMap<>(); public VerticleDeployer( - String hostName, Vertx vertx, - ServerConfig configuration, + AppConfiguration configuration, MessagingStackProvider messagingStackProvider, MetaStoreProvider metaStoreProvider, + MessageChannel messageChannel, MeterRegistry meterRegistry, Tracer tracer ) { @@ -86,7 +90,9 @@ public VerticleDeployer( producerMetricsHandler ); this.authZHandlersSupplier = getIamPolicyHandlersSupplier(projectService, metaStore); - this.subscriptionHandlers = new SubscriptionHandlers(new SubscriptionService(metaStore), projectService); + ControllerApi controllerApi = new ControllerApiProxy(messageChannel); + SubscriptionService subscriptionService = new SubscriptionService(metaStore, controllerApi); + this.subscriptionHandlers = new SubscriptionHandlers(subscriptionService, projectService); this.healthCheckHandler = new HealthCheckHandler(); AuthnHandler authnHandler = new AuthnHandler(vertx, configuration); @@ -130,28 +136,26 @@ public List getRouteDefinitions() { .collect(Collectors.toList()); } - public void deployVerticle(Vertx vertx, ServerConfig configuration) { + public Future deployVerticle(Vertx vertx, AppConfiguration configuration) { List handlerDefinitions = getRouteDefinitions(); if (shouldEnableAuthZHandlers(configuration)) { handlerDefinitions.addAll(authZHandlersSupplier.get().get()); } - vertx.deployVerticle( - () -> new RestVerticle( - handlerDefinitions, - behaviorConfigurators, - new FailureHandler(), - configuration.getHttpServerOptions() - ), - configuration.getVerticleDeploymentOptions() - ) - .onFailure(t -> { - log.error("Could not start HttpServer Verticle.", t); - throw new VaradhiException("Failed to Deploy Rest API.", t); - }) - .onSuccess(name -> log.debug("Successfully deployed the Verticle id({}).", name)); + return vertx.deployVerticle( + () -> new RestVerticle( + handlerDefinitions, + behaviorConfigurators, + new FailureHandler(), + configuration.getHttpServerOptions() + ), + configuration.getVerticleDeploymentOptions() + ).onFailure(t -> { + log.error("Could not start HttpServer Verticle.", t); + throw new VaradhiException("Failed to Deploy Rest API.", t); + }).onSuccess(name -> log.debug("Successfully deployed the Verticle id({}).", name)); } - private boolean shouldEnableAuthZHandlers(ServerConfig configuration) { + private boolean shouldEnableAuthZHandlers(AppConfiguration configuration) { String defaultProviderClass = DefaultAuthorizationProvider.class.getName(); return configuration.isAuthorizationEnabled() && defaultProviderClass.equals(configuration.getAuthorization().getProviderClassName()); diff --git a/server/src/main/java/com/flipkart/varadhi/WebServerOpManager.java b/server/src/main/java/com/flipkart/varadhi/WebServerOpManager.java new file mode 100644 index 00000000..f0a1294f --- /dev/null +++ b/server/src/main/java/com/flipkart/varadhi/WebServerOpManager.java @@ -0,0 +1,14 @@ +package com.flipkart.varadhi; + +import com.flipkart.varadhi.core.cluster.messages.SubscriptionOperation; +import com.flipkart.varadhi.core.ophandlers.WebServerApi; + +import java.util.concurrent.CompletableFuture; + +public class WebServerOpManager implements WebServerApi { + @Override + public CompletableFuture update(SubscriptionOperation.OpData operation) { + // TODO:: persist the operation change. + return CompletableFuture.completedFuture(null); + } +} diff --git a/server/src/main/java/com/flipkart/varadhi/cluster/ClusterManager.java b/server/src/main/java/com/flipkart/varadhi/cluster/ClusterManager.java index ba003368..85fab206 100644 --- a/server/src/main/java/com/flipkart/varadhi/cluster/ClusterManager.java +++ b/server/src/main/java/com/flipkart/varadhi/cluster/ClusterManager.java @@ -1,5 +1,7 @@ package com.flipkart.varadhi.cluster; +import com.flipkart.varadhi.core.cluster.MessageChannel; + import java.util.List; /** @@ -13,7 +15,7 @@ public interface ClusterManager { void addMembershipListener(MembershipListener listener); - NodeConnection connect(String nodeId); + MessageChannel connect(String nodeId); // TODO: Any publish to all methods? diff --git a/server/src/main/java/com/flipkart/varadhi/cluster/MessageChannelImpl.java b/server/src/main/java/com/flipkart/varadhi/cluster/MessageChannelImpl.java new file mode 100644 index 00000000..cd74745f --- /dev/null +++ b/server/src/main/java/com/flipkart/varadhi/cluster/MessageChannelImpl.java @@ -0,0 +1,75 @@ +package com.flipkart.varadhi.cluster; + +import com.flipkart.varadhi.core.cluster.MessageChannel; +import com.flipkart.varadhi.core.cluster.messages.*; +import com.flipkart.varadhi.exceptions.NotImplementedException; +import com.flipkart.varadhi.utils.JsonMapper; +import io.vertx.core.eventbus.DeliveryOptions; +import io.vertx.core.eventbus.EventBus; +import io.vertx.core.eventbus.MessageConsumer; +import lombok.extern.slf4j.Slf4j; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +@Slf4j +public class MessageChannelImpl implements MessageChannel { + private final EventBus vertxEventBus; + private final DeliveryOptions deliveryOptions; + private final Map> registeredConsumers; + + //TODO:: Add config details to DeliveryOptions e.g. timeouts, tracing etc as part of cluster manager changes. + public MessageChannelImpl(EventBus vertxEventBus) { + this.vertxEventBus = vertxEventBus; + this.deliveryOptions = new DeliveryOptions(); + this.registeredConsumers = new HashMap<>(); + } + + @Override + public void publish(String apiPath, ClusterMessage msg) { + throw new NotImplementedException("publish not implemented"); + } + + @Override + public CompletableFuture send(String apiPath, ClusterMessage msg) { + CompletableFuture future = new CompletableFuture<>(); + vertxEventBus.request(apiPath, JsonMapper.jsonSerialize(msg), deliveryOptions, ar -> { + if (ar.succeeded()) { + log.info("received reply: " + ar.result().body()); + future.complete(null); + } else { + log.info("send failure: " + ar.cause().getMessage()); + future.completeExceptionally(ar.cause()); + } + }); + return future; + } + + @Override + public CompletableFuture request(String apiPath, ClusterMessage msg) { + throw new NotImplementedException("request not implemented"); + } + + @Override + public void removeMessageHandler(String apiPath) { + MessageConsumer consumer = registeredConsumers.get(apiPath); + if (null != consumer) { + consumer.unregister(); + registeredConsumers.remove(apiPath); + } + } + + @Override + public void register(String basePath, Class messageClazz, SendHandler handler) { + String apiPath = basePath + "." + messageClazz.getSimpleName() + "." + Method.SEND ; + MessageConsumer consumer = vertxEventBus.consumer(apiPath, message -> { + E cm = JsonMapper.jsonDeserialize(message.body(), messageClazz); + message.reply("received"); + handler.handle(cm); + log.info("send({}) message processed.", cm.getId()); + }); + // not expected to be registered multiple times. + registeredConsumers.put(apiPath, consumer); + } +} diff --git a/server/src/main/java/com/flipkart/varadhi/cluster/NodeConnection.java b/server/src/main/java/com/flipkart/varadhi/cluster/NodeConnection.java deleted file mode 100644 index 6a86a005..00000000 --- a/server/src/main/java/com/flipkart/varadhi/cluster/NodeConnection.java +++ /dev/null @@ -1,32 +0,0 @@ -package com.flipkart.varadhi.cluster; - -import io.vertx.core.Future; -import io.vertx.core.eventbus.DeliveryOptions; -import io.vertx.core.eventbus.Message; - -/** - * This is subset of methods from vertx.EventBus which are node specific. Instead of using address concept, we will - * rely on path concept from http. address is not needed as this class is meant to represent a connection to a specific - * node. - * publish methods are fire & forget. - * send methods don't have any response, but their delivery can be tracked using the future. - * request methods are for traditional request-response pattern. - * - * TODO: whether to use the Message class. It is unlikely we will use reply-to-reply feature. - */ -public interface NodeConnection { - - NodeInfo getNodeInfo(); - - void publish(String path, Object msg); - - void publish(String path, Object msg, DeliveryOptions options); - - Future send(String path, Object msg); - - Future send(String path, Object msg, DeliveryOptions options); - - Future> request(String path, Object msg); - - Future> request(String path, Object msg, DeliveryOptions options); -} diff --git a/server/src/main/java/com/flipkart/varadhi/cluster/impl/ClusterManagerImpl.java b/server/src/main/java/com/flipkart/varadhi/cluster/impl/ClusterManagerImpl.java index 5f1594a6..2e3ddc59 100644 --- a/server/src/main/java/com/flipkart/varadhi/cluster/impl/ClusterManagerImpl.java +++ b/server/src/main/java/com/flipkart/varadhi/cluster/impl/ClusterManagerImpl.java @@ -1,18 +1,21 @@ package com.flipkart.varadhi.cluster.impl; + import com.flipkart.varadhi.cluster.ClusterManager; import com.flipkart.varadhi.cluster.MembershipListener; -import com.flipkart.varadhi.cluster.NodeConnection; +import com.flipkart.varadhi.cluster.MessageChannelImpl; import com.flipkart.varadhi.cluster.NodeInfo; +import com.flipkart.varadhi.core.cluster.MessageChannel; +import io.vertx.core.Vertx; import lombok.RequiredArgsConstructor; import java.util.List; + @RequiredArgsConstructor public class ClusterManagerImpl implements ClusterManager { - // TODO: add instance of zkClusterManager & clusteredVertx and use it to implement the methods of this class - + private final Vertx vertx; @Override public List getAllMembers() { return null; @@ -24,7 +27,7 @@ public void addMembershipListener(MembershipListener listener) { } @Override - public NodeConnection connect(String nodeId) { - return null; + public MessageChannel connect(String nodeId) { + return new MessageChannelImpl(vertx.eventBus()); } } diff --git a/server/src/main/java/com/flipkart/varadhi/components/Component.java b/server/src/main/java/com/flipkart/varadhi/components/Component.java new file mode 100644 index 00000000..72c39f3c --- /dev/null +++ b/server/src/main/java/com/flipkart/varadhi/components/Component.java @@ -0,0 +1,10 @@ +package com.flipkart.varadhi.components; + +import com.flipkart.varadhi.cluster.ClusterManager; +import io.vertx.core.Future; +import io.vertx.core.Vertx; + +public interface Component { + Future start(Vertx vertx, ClusterManager clusterManager); + Future shutdown(Vertx vertx, ClusterManager clusterManager); +} diff --git a/server/src/main/java/com/flipkart/varadhi/components/ComponentKind.java b/server/src/main/java/com/flipkart/varadhi/components/ComponentKind.java new file mode 100644 index 00000000..bd3b5a02 --- /dev/null +++ b/server/src/main/java/com/flipkart/varadhi/components/ComponentKind.java @@ -0,0 +1,17 @@ +package com.flipkart.varadhi.components; + +public enum ComponentKind { + Server("Server"), + Controller("Controller"), + All("All"); + + private final String name; + ComponentKind(String name) { + this.name = name; + } + + @Override + public String toString() { + return name; + } +} diff --git a/server/src/main/java/com/flipkart/varadhi/components/controller/Controller.java b/server/src/main/java/com/flipkart/varadhi/components/controller/Controller.java new file mode 100644 index 00000000..333a4084 --- /dev/null +++ b/server/src/main/java/com/flipkart/varadhi/components/controller/Controller.java @@ -0,0 +1,38 @@ +package com.flipkart.varadhi.components.controller; + +import com.flipkart.varadhi.CoreServices; +import com.flipkart.varadhi.cluster.ClusterManager; +import com.flipkart.varadhi.components.Component; +import com.flipkart.varadhi.config.AppConfiguration; +import com.flipkart.varadhi.controller.ControllerMgr; +import com.flipkart.varadhi.core.cluster.MessageChannel; +import com.flipkart.varadhi.core.cluster.messages.SubscriptionMessage; +import com.flipkart.varadhi.core.ophandlers.WebServerApi; +import com.flipkart.varadhi.core.proxies.WebServerApiProxy; +import io.vertx.core.Future; +import io.vertx.core.Vertx; + +public class Controller implements Component { + + public Controller(AppConfiguration configuration, CoreServices coreServices) { + } + + @Override + public Future start(Vertx vertx, ClusterManager clusterManager) { + MessageChannel messageChannel = clusterManager.connect(null); + setupApiHandlers(messageChannel); + return Future.succeededFuture(); + } + + @Override + public Future shutdown(Vertx vertx, ClusterManager clusterManager) { + return Future.succeededFuture(); + } + + private void setupApiHandlers(MessageChannel channel) { + WebServerApi serverApi = new WebServerApiProxy(channel); + ControllerMgr controllerMgr = new ControllerMgr(serverApi); + ControllerApiHandler handler = new ControllerApiHandler(controllerMgr,serverApi); + channel.register("controller", SubscriptionMessage.class, handler::start); + } +} diff --git a/server/src/main/java/com/flipkart/varadhi/components/controller/ControllerApiHandler.java b/server/src/main/java/com/flipkart/varadhi/components/controller/ControllerApiHandler.java new file mode 100644 index 00000000..4d29ac87 --- /dev/null +++ b/server/src/main/java/com/flipkart/varadhi/components/controller/ControllerApiHandler.java @@ -0,0 +1,27 @@ +package com.flipkart.varadhi.components.controller; + +import com.flipkart.varadhi.core.cluster.messages.SubscriptionMessage; +import com.flipkart.varadhi.core.cluster.messages.SubscriptionOperation; +import com.flipkart.varadhi.core.ophandlers.ControllerApi; +import com.flipkart.varadhi.core.ophandlers.WebServerApi; + +import java.util.concurrent.CompletableFuture; + +public class ControllerApiHandler { + private final ControllerApi opHandler; + private final WebServerApi webServerApiProxy; + + public ControllerApiHandler(ControllerApi opHandler, WebServerApi webServerApiProxy) { + this.opHandler = opHandler; + this.webServerApiProxy = webServerApiProxy; + } + + public CompletableFuture start(SubscriptionMessage message) { + SubscriptionOperation.StartData operation = (SubscriptionOperation.StartData) message.getOperation(); + return opHandler.StartSubscription(operation).exceptionally(throwable -> { + operation.markFail(throwable.getMessage()); + webServerApiProxy.update(operation); + return null; + }); + } +} diff --git a/server/src/main/java/com/flipkart/varadhi/components/webserver/WebServer.java b/server/src/main/java/com/flipkart/varadhi/components/webserver/WebServer.java new file mode 100644 index 00000000..c97e2385 --- /dev/null +++ b/server/src/main/java/com/flipkart/varadhi/components/webserver/WebServer.java @@ -0,0 +1,103 @@ +package com.flipkart.varadhi.components.webserver; + +import com.flipkart.varadhi.CoreServices; +import com.flipkart.varadhi.WebServerOpManager; +import com.flipkart.varadhi.VerticleDeployer; +import com.flipkart.varadhi.cluster.ClusterManager; +import com.flipkart.varadhi.components.Component; +import com.flipkart.varadhi.config.AppConfiguration; +import com.flipkart.varadhi.core.cluster.MessageChannel; +import com.flipkart.varadhi.core.cluster.messages.SubscriptionMessage; +import com.flipkart.varadhi.deployment.FullDeploymentVerticleDeployer; +import com.flipkart.varadhi.deployment.LeanDeploymentVerticleDeployer; +import io.opentelemetry.api.trace.Tracer; +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class WebServer implements Component { + private final AppConfiguration configuration; + private final CoreServices coreServices; + private String verticleId; + private final WebServerApiHandler handler; + + public WebServer(AppConfiguration configuration, CoreServices coreServices) { + this.configuration = configuration; + this.coreServices = coreServices; + this.handler = new WebServerApiHandler(new WebServerOpManager()); + } + + @Override + public Future start(Vertx vertx, ClusterManager clusterManager) { + MessageChannel messageChannel = clusterManager.connect(null); + setupApiHandlers(messageChannel); + return deployVerticle(vertx, messageChannel); + } + + @Override + public Future shutdown(Vertx vertx, ClusterManager clusterManager) { + //TODO:: + // - fail health check. + // - reject any new request for retry (may be via a custom handler ?). + + // Not taking care of concurrent execution, in general not expected for startup/shutdown. + if (null != verticleId) { + log.info("Undeploy verticle {}.", verticleId); + return vertx.undeploy(verticleId).onComplete(ar -> { + if (ar.succeeded()) { + verticleId = null; + log.info("Undeploy completed"); + } else { + log.error("Undeploy failed.", ar.cause()); + } + }); + } else { + return Future.succeededFuture(); + } + } + + private Future deployVerticle(Vertx vertx, MessageChannel messageChannel) { + log.info("Verticle deployment started."); + VerticleDeployer verticleDeployer = createVerticleDeployer(vertx, messageChannel); + return verticleDeployer.deployVerticle(vertx, configuration).compose(r -> { + log.info("Verticle() deployment completed {}.", r); + verticleId = r; + return Future.succeededFuture(); + }, t -> { + log.error("Verticle() deployment failed.", t); + return Future.failedFuture(t); + }); + } + + private VerticleDeployer createVerticleDeployer(Vertx vertx, MessageChannel messageChannel) { + VerticleDeployer verticleDeployer; + Tracer tracer = coreServices.getTracer("varadhi"); + if (configuration.getFeatureFlags().isLeanDeployment()) { + verticleDeployer = new LeanDeploymentVerticleDeployer( + vertx, + configuration, + coreServices.getMessagingStackProvider(), + coreServices.getMetaStoreProvider(), + messageChannel, + coreServices.getMeterRegistry(), + tracer + ); + } else { + verticleDeployer = new FullDeploymentVerticleDeployer( + vertx, + configuration, + coreServices.getMessagingStackProvider(), + coreServices.getMetaStoreProvider(), + messageChannel, + coreServices.getMeterRegistry(), + tracer + ); + } + return verticleDeployer; + } + public void setupApiHandlers(MessageChannel messageChannel) { + messageChannel.register("webserver", SubscriptionMessage.class, handler::update); + } + +} diff --git a/server/src/main/java/com/flipkart/varadhi/components/webserver/WebServerApiHandler.java b/server/src/main/java/com/flipkart/varadhi/components/webserver/WebServerApiHandler.java new file mode 100644 index 00000000..6a9b7840 --- /dev/null +++ b/server/src/main/java/com/flipkart/varadhi/components/webserver/WebServerApiHandler.java @@ -0,0 +1,18 @@ +package com.flipkart.varadhi.components.webserver; + +import com.flipkart.varadhi.WebServerOpManager; +import com.flipkart.varadhi.core.cluster.messages.SubscriptionMessage; + +import java.util.concurrent.CompletableFuture; + +public class WebServerApiHandler { + private final WebServerOpManager serverOpMgr; + + public WebServerApiHandler(WebServerOpManager serverOpMgr) { + this.serverOpMgr = serverOpMgr; + } + + public CompletableFuture update(SubscriptionMessage message) { + return serverOpMgr.update(message.getOperation()); + } +} diff --git a/server/src/main/java/com/flipkart/varadhi/config/ServerConfig.java b/server/src/main/java/com/flipkart/varadhi/config/AppConfiguration.java similarity index 89% rename from server/src/main/java/com/flipkart/varadhi/config/ServerConfig.java rename to server/src/main/java/com/flipkart/varadhi/config/AppConfiguration.java index b7bc035c..19dddf6b 100644 --- a/server/src/main/java/com/flipkart/varadhi/config/ServerConfig.java +++ b/server/src/main/java/com/flipkart/varadhi/config/AppConfiguration.java @@ -3,17 +3,24 @@ import com.flipkart.varadhi.auth.AuthenticationOptions; import com.flipkart.varadhi.authz.AuthorizationOptions; import com.flipkart.varadhi.cluster.NodeResources; +import com.flipkart.varadhi.components.ComponentKind; import com.flipkart.varadhi.produce.config.ProducerOptions; import com.flipkart.varadhi.spi.db.MetaStoreOptions; import com.flipkart.varadhi.spi.services.MessagingStackOptions; import io.vertx.core.DeploymentOptions; import io.vertx.core.VertxOptions; import io.vertx.core.http.HttpServerOptions; +import jakarta.validation.constraints.NotEmpty; import jakarta.validation.constraints.NotNull; import lombok.Getter; +import java.util.List; + @Getter -public class ServerConfig { +public class AppConfiguration { + @NotEmpty + List components; + @NotNull private VertxOptions vertxOptions; diff --git a/server/src/main/java/com/flipkart/varadhi/deployment/FullDeploymentVerticleDeployer.java b/server/src/main/java/com/flipkart/varadhi/deployment/FullDeploymentVerticleDeployer.java index 8f93e5d6..d5ed4d17 100644 --- a/server/src/main/java/com/flipkart/varadhi/deployment/FullDeploymentVerticleDeployer.java +++ b/server/src/main/java/com/flipkart/varadhi/deployment/FullDeploymentVerticleDeployer.java @@ -1,7 +1,8 @@ package com.flipkart.varadhi.deployment; import com.flipkart.varadhi.VerticleDeployer; -import com.flipkart.varadhi.config.ServerConfig; +import com.flipkart.varadhi.config.AppConfiguration; +import com.flipkart.varadhi.core.cluster.MessageChannel; import com.flipkart.varadhi.spi.db.MetaStoreProvider; import com.flipkart.varadhi.spi.services.MessagingStackProvider; import com.flipkart.varadhi.web.routes.RouteDefinition; @@ -24,12 +25,11 @@ public class FullDeploymentVerticleDeployer extends VerticleDeployer { private final ProjectHandlers projectHandlers; public FullDeploymentVerticleDeployer( - String hostName, Vertx vertx, ServerConfig configuration, + Vertx vertx, AppConfiguration configuration, MessagingStackProvider messagingStackProvider, MetaStoreProvider metaStoreProvider, - MeterRegistry meterRegistry, - Tracer tracer + MessageChannel messageChannel, MeterRegistry meterRegistry, Tracer tracer ) { - super(hostName, vertx, configuration, messagingStackProvider, metaStoreProvider, meterRegistry, tracer); + super(vertx, configuration, messagingStackProvider, metaStoreProvider, messageChannel, meterRegistry, tracer); this.orgHandlers = new OrgHandlers(this.orgService); this.teamHandlers = new TeamHandlers(this.teamService); this.projectHandlers = new ProjectHandlers(this.projectService); diff --git a/server/src/main/java/com/flipkart/varadhi/deployment/LeanDeploymentVerticleDeployer.java b/server/src/main/java/com/flipkart/varadhi/deployment/LeanDeploymentVerticleDeployer.java index 57766881..4fe320c7 100644 --- a/server/src/main/java/com/flipkart/varadhi/deployment/LeanDeploymentVerticleDeployer.java +++ b/server/src/main/java/com/flipkart/varadhi/deployment/LeanDeploymentVerticleDeployer.java @@ -3,7 +3,8 @@ import com.flipkart.varadhi.VerticleDeployer; import com.flipkart.varadhi.config.RestOptions; -import com.flipkart.varadhi.config.ServerConfig; +import com.flipkart.varadhi.config.AppConfiguration; +import com.flipkart.varadhi.core.cluster.MessageChannel; import com.flipkart.varadhi.entities.Org; import com.flipkart.varadhi.entities.Project; import com.flipkart.varadhi.entities.Team; @@ -12,6 +13,8 @@ import com.flipkart.varadhi.spi.services.MessagingStackProvider; import io.micrometer.core.instrument.MeterRegistry; import io.opentelemetry.api.trace.Tracer; +import io.vertx.core.Future; +import io.vertx.core.Promise; import io.vertx.core.Vertx; import lombok.extern.slf4j.Slf4j; @@ -20,21 +23,24 @@ @Slf4j public class LeanDeploymentVerticleDeployer extends VerticleDeployer { public LeanDeploymentVerticleDeployer( - String hostName, Vertx vertx, ServerConfig configuration, + Vertx vertx, AppConfiguration configuration, MessagingStackProvider messagingStackProvider, MetaStoreProvider metaStoreProvider, - MeterRegistry meterRegistry, - Tracer tracer + MessageChannel messageChannel, MeterRegistry meterRegistry, Tracer tracer ) { - super(hostName, vertx, configuration, messagingStackProvider, metaStoreProvider, meterRegistry, tracer); + super(vertx, configuration, messagingStackProvider, metaStoreProvider, messageChannel, meterRegistry, tracer); } @Override - public void deployVerticle( + public Future deployVerticle( Vertx vertx, - ServerConfig configuration + AppConfiguration configuration ) { - ensureLeanDeploymentConstraints(configuration.getRestOptions()); - super.deployVerticle(vertx, configuration); + Promise promise = Promise.promise(); + vertx.executeBlocking(future -> { + ensureLeanDeploymentConstraints(configuration.getRestOptions()); + future.complete(); + }, promise); + return promise.future().compose(v -> super.deployVerticle(vertx, configuration)); } private void ensureLeanDeploymentConstraints(RestOptions restOptions) { diff --git a/server/src/main/java/com/flipkart/varadhi/services/SubscriptionService.java b/server/src/main/java/com/flipkart/varadhi/services/SubscriptionService.java index 5ff7db21..e23bd4e9 100644 --- a/server/src/main/java/com/flipkart/varadhi/services/SubscriptionService.java +++ b/server/src/main/java/com/flipkart/varadhi/services/SubscriptionService.java @@ -1,5 +1,8 @@ package com.flipkart.varadhi.services; + +import com.flipkart.varadhi.core.cluster.messages.SubscriptionOperation; +import com.flipkart.varadhi.core.ophandlers.ControllerApi; import com.flipkart.varadhi.entities.VaradhiSubscription; import com.flipkart.varadhi.entities.VaradhiTopic; import com.flipkart.varadhi.exceptions.InvalidOperationForResourceException; @@ -11,11 +14,12 @@ import static com.flipkart.varadhi.entities.VersionedEntity.INITIAL_VERSION; public class SubscriptionService { - private final MetaStore metaStore; + private final ControllerApi controllerApi; - public SubscriptionService(MetaStore metaStore) { + public SubscriptionService(MetaStore metaStore, ControllerApi controllerApi) { this.metaStore = metaStore; + this.controllerApi = controllerApi; } public List getSubscriptionList(String projectName) { @@ -33,6 +37,11 @@ public VaradhiSubscription createSubscription(VaradhiSubscription subscription) return subscription; } + public void start(String subscriptionName, String requestedBy){ + SubscriptionOperation op = SubscriptionOperation.startOp(subscriptionName, requestedBy); + controllerApi.StartSubscription((SubscriptionOperation.StartData) op.getData()); + } + private void validateCreation(VaradhiSubscription subscription) { metaStore.getProject(subscription.getProject()); VaradhiTopic topic = metaStore.getTopic(subscription.getTopic()); diff --git a/server/src/main/java/com/flipkart/varadhi/utils/SubscriptionHelper.java b/server/src/main/java/com/flipkart/varadhi/utils/SubscriptionHelper.java index 6785f70a..32377f09 100644 --- a/server/src/main/java/com/flipkart/varadhi/utils/SubscriptionHelper.java +++ b/server/src/main/java/com/flipkart/varadhi/utils/SubscriptionHelper.java @@ -4,7 +4,10 @@ import com.flipkart.varadhi.entities.SubscriptionResource; import com.flipkart.varadhi.entities.SubscriptionShards; import com.flipkart.varadhi.entities.VaradhiSubscription; +import io.vertx.ext.web.RoutingContext; +import static com.flipkart.varadhi.Constants.PathParams.PATH_PARAM_PROJECT; +import static com.flipkart.varadhi.Constants.PathParams.PATH_PARAM_SUBSCRIPTION; import static com.flipkart.varadhi.entities.VersionedEntity.NAME_SEPARATOR; import static com.flipkart.varadhi.entities.VersionedEntity.NAME_SEPARATOR_REGEX; @@ -56,6 +59,12 @@ public static String buildSubscriptionName(String projectName, String subscripti return String.join(NAME_SEPARATOR, projectName, subscriptionName); } + public static String buildSubscriptionName(RoutingContext ctx) { + String projectName = ctx.pathParam(PATH_PARAM_PROJECT); + String subscriptionName = ctx.pathParam(PATH_PARAM_SUBSCRIPTION); + return SubscriptionHelper.buildSubscriptionName(projectName, subscriptionName); + } + public static String buildTopicName(String projectName, String topicName) { return String.join(NAME_SEPARATOR, projectName, topicName); } diff --git a/server/src/main/java/com/flipkart/varadhi/web/AuthnHandler.java b/server/src/main/java/com/flipkart/varadhi/web/AuthnHandler.java index a9de59fe..d0cd26cd 100644 --- a/server/src/main/java/com/flipkart/varadhi/web/AuthnHandler.java +++ b/server/src/main/java/com/flipkart/varadhi/web/AuthnHandler.java @@ -1,7 +1,7 @@ package com.flipkart.varadhi.web; import com.flipkart.varadhi.auth.AuthenticationOptions; -import com.flipkart.varadhi.config.ServerConfig; +import com.flipkart.varadhi.config.AppConfiguration; import com.flipkart.varadhi.exceptions.InvalidConfigException; import com.flipkart.varadhi.exceptions.VaradhiException; import com.flipkart.varadhi.web.routes.RouteConfigurator; @@ -27,7 +27,7 @@ public class AuthnHandler implements RouteConfigurator { private final Handler authenticationHandler; - public AuthnHandler(Vertx vertx, ServerConfig configuration) throws InvalidConfigException { + public AuthnHandler(Vertx vertx, AppConfiguration configuration) throws InvalidConfigException { if (configuration.isAuthenticationEnabled()) { authenticationHandler = switch (configuration.getAuthentication().getMechanism()) { diff --git a/server/src/main/java/com/flipkart/varadhi/web/AuthzHandler.java b/server/src/main/java/com/flipkart/varadhi/web/AuthzHandler.java index 1fc1e753..60dde49f 100644 --- a/server/src/main/java/com/flipkart/varadhi/web/AuthzHandler.java +++ b/server/src/main/java/com/flipkart/varadhi/web/AuthzHandler.java @@ -2,7 +2,7 @@ import com.flipkart.varadhi.authz.AuthorizationProvider; import com.flipkart.varadhi.authz.AuthorizationOptions; -import com.flipkart.varadhi.config.ServerConfig; +import com.flipkart.varadhi.config.AppConfiguration; import com.flipkart.varadhi.exceptions.InvalidConfigException; import com.flipkart.varadhi.web.routes.RouteConfigurator; import com.flipkart.varadhi.web.routes.RouteDefinition; @@ -12,7 +12,7 @@ public class AuthzHandler implements RouteConfigurator { private final AuthorizationHandlerBuilder authorizationHandlerBuilder; - public AuthzHandler(ServerConfig configuration) throws InvalidConfigException { + public AuthzHandler(AppConfiguration configuration) throws InvalidConfigException { if (configuration.isAuthenticationEnabled() && configuration.isAuthorizationEnabled()) { authorizationHandlerBuilder = createAuthorizationHandler(configuration); } else { @@ -26,7 +26,7 @@ public void configure(Route route, RouteDefinition routeDef) { } } - AuthorizationHandlerBuilder createAuthorizationHandler(ServerConfig configuration) { + AuthorizationHandlerBuilder createAuthorizationHandler(AppConfiguration configuration) { if (configuration.isAuthorizationEnabled()) { AuthorizationProvider authorizationProvider = getAuthorizationProvider(configuration); return new AuthorizationHandlerBuilder(configuration.getAuthorization() @@ -37,7 +37,7 @@ AuthorizationHandlerBuilder createAuthorizationHandler(ServerConfig configuratio } @SuppressWarnings("unchecked") - private AuthorizationProvider getAuthorizationProvider(ServerConfig configuration) { + private AuthorizationProvider getAuthorizationProvider(AppConfiguration configuration) { String providerClassName = configuration.getAuthorization().getProviderClassName(); if (StringUtils.isNotBlank(providerClassName)) { try { diff --git a/server/src/main/java/com/flipkart/varadhi/web/Extensions.java b/server/src/main/java/com/flipkart/varadhi/web/Extensions.java index 65d6dbce..cf28c0f6 100644 --- a/server/src/main/java/com/flipkart/varadhi/web/Extensions.java +++ b/server/src/main/java/com/flipkart/varadhi/web/Extensions.java @@ -6,10 +6,13 @@ import io.netty.handler.codec.http.HttpHeaderValues; import io.vertx.core.http.HttpHeaders; import io.vertx.core.http.HttpServerRequest; +import io.vertx.ext.auth.User; import io.vertx.ext.web.RequestBody; import io.vertx.ext.web.RoutingContext; import lombok.extern.slf4j.Slf4j; +import static com.flipkart.varadhi.MessageConstants.ANONYMOUS_IDENTITY; + public class Extensions { @@ -99,5 +102,12 @@ public static T getApiResponse(RoutingContext ctx) { public static void todo(RoutingContext context) { throw new NotImplementedException("Not Implemented."); } + + public static String getIdentityOrDefault(RoutingContext ctx) { + User user = ctx.user(); + return null != user ? user.subject() : ANONYMOUS_IDENTITY; + } + + } } diff --git a/server/src/main/java/com/flipkart/varadhi/web/v1/admin/SubscriptionHandlers.java b/server/src/main/java/com/flipkart/varadhi/web/v1/admin/SubscriptionHandlers.java index 6e2e21ba..df107f96 100644 --- a/server/src/main/java/com/flipkart/varadhi/web/v1/admin/SubscriptionHandlers.java +++ b/server/src/main/java/com/flipkart/varadhi/web/v1/admin/SubscriptionHandlers.java @@ -99,9 +99,7 @@ public void list(RoutingContext ctx) { } public void get(RoutingContext ctx) { - String projectName = ctx.pathParam(PATH_PARAM_PROJECT); - String subscriptionName = ctx.pathParam(PATH_PARAM_SUBSCRIPTION); - String internalSubscriptionName = SubscriptionHelper.buildSubscriptionName(projectName, subscriptionName); + String internalSubscriptionName = SubscriptionHelper.buildSubscriptionName(ctx); SubscriptionResource subscription = SubscriptionHelper.toResource(subscriptionService.getSubscription(internalSubscriptionName)); ctx.endApiWithResponse(subscription); @@ -123,15 +121,12 @@ public void update(RoutingContext ctx) { } public void delete(RoutingContext ctx) { - String projectName = ctx.pathParam(PATH_PARAM_PROJECT); - String subscriptionName = ctx.pathParam(PATH_PARAM_SUBSCRIPTION); - String internalSubscriptionName = SubscriptionHelper.buildSubscriptionName(projectName, subscriptionName); - subscriptionService.deleteSubscription(internalSubscriptionName); + subscriptionService.deleteSubscription(SubscriptionHelper.buildSubscriptionName(ctx)); ctx.endApi(); } public void start(RoutingContext ctx) { - ctx.todo(); + subscriptionService.start(SubscriptionHelper.buildSubscriptionName(ctx), ctx.getIdentityOrDefault()); } public void stop(RoutingContext ctx) { @@ -140,7 +135,7 @@ public void stop(RoutingContext ctx) { private SubscriptionResource getValidSubscriptionResource(RoutingContext ctx) { String projectName = ctx.pathParam(PATH_PARAM_PROJECT); - SubscriptionResource subscription = ctx.get(CONTEXT_KEY_BODY);; + SubscriptionResource subscription = ctx.get(CONTEXT_KEY_BODY); // ensure project name consistent if (!projectName.equals(subscription.getProject())) { diff --git a/server/src/main/resources/configuration.yml b/server/src/main/resources/configuration.yml index 872fc7a5..b988de1b 100755 --- a/server/src/main/resources/configuration.yml +++ b/server/src/main/resources/configuration.yml @@ -1,3 +1,6 @@ + +components : ["Server", "Controller", "All"] + restOptions: deployedRegion: "default" defaultOrg: "default" diff --git a/server/src/test/java/com/flipkart/varadhi/cluster/MessageChannelImplTest.java b/server/src/test/java/com/flipkart/varadhi/cluster/MessageChannelImplTest.java new file mode 100644 index 00000000..3f4944e2 --- /dev/null +++ b/server/src/test/java/com/flipkart/varadhi/cluster/MessageChannelImplTest.java @@ -0,0 +1,93 @@ +package com.flipkart.varadhi.cluster; + +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.flipkart.varadhi.core.cluster.MessageChannel; +import com.flipkart.varadhi.core.cluster.messages.ClusterMessage; +import com.flipkart.varadhi.core.cluster.messages.SendHandler; +import com.flipkart.varadhi.utils.JsonMapper; +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.core.spi.cluster.ClusterManager; +import io.vertx.junit5.Checkpoint; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; +import io.vertx.spi.cluster.zookeeper.ZookeeperClusterManager; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.test.TestingServer; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.util.concurrent.CompletableFuture; + +import static org.mockito.Mockito.spy; + + +@ExtendWith(VertxExtension.class) +public class MessageChannelImplTest { + + CuratorFramework zkCuratorFramework; + + // TODO:: Tests needs to be added, so this will go under refactor + @BeforeEach + public void setup() { + JsonMapper.getMapper().registerSubtypes(new NamedType(TestClusterMessage.class, "TestClusterMessage")); + JsonMapper.getMapper() + .registerSubtypes(new NamedType(ExtendedTestClusterMessage.class, "ExtendedTestClusterMessage")); + } + + private Vertx createClusteredVertx() throws Exception { + TestingServer zkCuratorTestingServer = new TestingServer(); + zkCuratorFramework = spy( + CuratorFrameworkFactory.newClient( + zkCuratorTestingServer.getConnectString(), new ExponentialBackoffRetry(1000, 1))); + zkCuratorFramework.start(); + ClusterManager cm = new ZookeeperClusterManager(zkCuratorFramework, "foo"); + return Vertx.builder().withClusterManager(cm).buildClustered().toCompletionStage().toCompletableFuture().get(); + } + + @Test + public void sendMessageNoConsumer(VertxTestContext testContext) throws Exception { + Checkpoint checkpoint = testContext.checkpoint(1); + Vertx vertx = createClusteredVertx(); + MessageChannelImpl c = new MessageChannelImpl(vertx.eventBus()); + ClusterMessage cm = getClusterMessage("foo"); + Future.fromCompletionStage(c.send("foo", cm)).onComplete(testContext.failing(v -> checkpoint.flag())); + } + + @Test + public void testSendMessageConsumerCollocated(VertxTestContext testContext) throws Exception { + Checkpoint checkpoint = testContext.checkpoint(2); + Vertx vertx = createClusteredVertx(); + MessageChannelImpl c = new MessageChannelImpl(vertx.eventBus()); + c.register("testAddress", ExtendedTestClusterMessage.class, new SendHandler<>() { + @Override + public void handle(ExtendedTestClusterMessage message) { + checkpoint.flag(); + } + }); + + ClusterMessage cm = getClusterMessage("foo"); + String address = "testAddress" + "." + ExtendedTestClusterMessage.class.getSimpleName() + "." + MessageChannel.Method.SEND; + Future.fromCompletionStage(c.send(address, cm)).onComplete(testContext.succeeding(v -> checkpoint.flag())); + } + + ClusterMessage getClusterMessage(String data) { + ExtendedTestClusterMessage dm = new ExtendedTestClusterMessage(); + dm.data1 = data; + dm.data2 = data; + return dm; + } + + public static class TestClusterMessage extends ClusterMessage { + String data1; + } + + public static class ExtendedTestClusterMessage extends TestClusterMessage { + String data2; + } + + +} diff --git a/server/src/test/java/com/flipkart/varadhi/deployment/LeanDeploymentVerticleDeployerTest.java b/server/src/test/java/com/flipkart/varadhi/deployment/LeanDeploymentVerticleDeployerTest.java index 4ff67175..5e106bd0 100644 --- a/server/src/test/java/com/flipkart/varadhi/deployment/LeanDeploymentVerticleDeployerTest.java +++ b/server/src/test/java/com/flipkart/varadhi/deployment/LeanDeploymentVerticleDeployerTest.java @@ -1,11 +1,11 @@ package com.flipkart.varadhi.deployment; -import com.flipkart.varadhi.config.ServerConfig; +import com.flipkart.varadhi.config.AppConfiguration; +import com.flipkart.varadhi.core.cluster.MessageChannel; import com.flipkart.varadhi.db.VaradhiMetaStore; import com.flipkart.varadhi.entities.Org; import com.flipkart.varadhi.entities.Project; import com.flipkart.varadhi.entities.Team; -import com.flipkart.varadhi.exceptions.InvalidConfigException; import com.flipkart.varadhi.services.OrgService; import com.flipkart.varadhi.services.ProjectService; import com.flipkart.varadhi.services.TeamService; @@ -17,18 +17,25 @@ import com.flipkart.varadhi.web.routes.RouteDefinition; import io.micrometer.core.instrument.MeterRegistry; import io.vertx.core.Vertx; +import io.vertx.junit5.Checkpoint; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.test.TestingServer; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + import java.util.List; import static org.junit.jupiter.api.Assertions.*; import static org.mockito.Mockito.*; +@ExtendWith(VertxExtension.class) public class LeanDeploymentVerticleDeployerTest { TestingServer zkCuratorTestingServer; @@ -41,16 +48,16 @@ public class LeanDeploymentVerticleDeployerTest { MessagingStackProvider messagingStackProvider; MetaStoreProvider metaStoreProvider; - - ServerConfig serverConfiguration; + AppConfiguration appConfiguration; MeterRegistry meterRegistry; - Vertx vertx = Vertx.vertx(); + Vertx vertx; private OrgService orgService; private TeamService teamService; private ProjectService projectService; + private MessageChannel messageChannel; private static final String TEST_ORG = "testOrg"; @@ -65,6 +72,7 @@ public class LeanDeploymentVerticleDeployerTest { @BeforeEach public void setup() throws Exception { + vertx = Vertx.vertx(); zkCuratorTestingServer = new TestingServer(); zkCurator = spy(CuratorFrameworkFactory.newClient( zkCuratorTestingServer.getConnectString(), new ExponentialBackoffRetry(1000, 1))); @@ -74,62 +82,79 @@ public void setup() throws Exception { messagingStackProvider = mock(MessagingStackProvider.class); metaStoreProvider = mock(MetaStoreProvider.class); meterRegistry = mock(MeterRegistry.class); + messageChannel = mock(MessageChannel.class); + when(metaStoreProvider.getMetaStore()).thenReturn(varadhiMetaStore); when(messagingStackProvider.getProducerFactory()).thenReturn(mock(ProducerFactory.class)); - serverConfiguration = YamlLoader.loadConfig( + appConfiguration = YamlLoader.loadConfig( "src/test/resources/testConfiguration.yml", - ServerConfig.class - ); + AppConfiguration.class); + orgService = new OrgService(varadhiMetaStore); teamService = new TeamService(varadhiMetaStore); projectService = new ProjectService( varadhiMetaStore, - serverConfiguration.getRestOptions().getProjectCacheBuilderSpec(), - meterRegistry - ); + appConfiguration.getRestOptions().getProjectCacheBuilderSpec(), + meterRegistry); + leanDeploymentVerticleDeployer = new LeanDeploymentVerticleDeployer( - "testHostName", vertx, - serverConfiguration, + appConfiguration, messagingStackProvider, metaStoreProvider, + messageChannel, meterRegistry, null ); } + @AfterEach + public void tearDown(VertxTestContext testContext) throws Exception { + Checkpoint cp = testContext.checkpoint(1); + vertx.close().onComplete(testContext.succeeding(v -> { + cp.flag(); + })); + } + @Test - public void testNoEntitiesPresent_Success() { - leanDeploymentVerticleDeployer.deployVerticle(vertx, serverConfiguration); - - Org org = orgService.getOrg( - serverConfiguration.getRestOptions().getDefaultOrg()); - assertNotNull(org); - assertEquals(serverConfiguration.getRestOptions().getDefaultOrg(), org.getName()); - - Team team = teamService.getTeam( - org.getName(), serverConfiguration.getRestOptions().getDefaultTeam()); - assertNotNull(team); - assertEquals(serverConfiguration.getRestOptions().getDefaultTeam(), team.getName()); - - Project project = projectService.getProject( - serverConfiguration.getRestOptions().getDefaultProject()); - assertNotNull(project); - assertEquals(serverConfiguration.getRestOptions().getDefaultProject(), project.getName()); + public void testNoEntitiesPresent_Success(VertxTestContext testContext) { + Checkpoint cp = testContext.checkpoint(1); + leanDeploymentVerticleDeployer + .deployVerticle(vertx, appConfiguration) + .onComplete(testContext.succeeding( id -> { + Org org = orgService.getOrg( + appConfiguration.getRestOptions().getDefaultOrg()); + assertNotNull(org); + assertEquals(appConfiguration.getRestOptions().getDefaultOrg(), org.getName()); + + Team team = teamService.getTeam( + org.getName(), appConfiguration.getRestOptions().getDefaultTeam()); + assertNotNull(team); + assertEquals(appConfiguration.getRestOptions().getDefaultTeam(), team.getName()); + + Project project = projectService.getProject( + appConfiguration.getRestOptions().getDefaultProject()); + assertNotNull(project); + assertEquals(appConfiguration.getRestOptions().getDefaultProject(), project.getName()); + cp.flag(); + } + )); } @Test - public void testEntitiesPresentWithDefaultName_Success() { - Org org = new Org(serverConfiguration.getRestOptions().getDefaultOrg(), 0); + public void testEntitiesPresentWithDefaultName_Success(VertxTestContext testContext) { + Checkpoint cp = testContext.checkpoint(1); + Org org = new Org(appConfiguration.getRestOptions().getDefaultOrg(), 0); orgService.createOrg(org); - Team team = new Team(serverConfiguration.getRestOptions().getDefaultTeam(), 0, org.getName()); + Team team = new Team(appConfiguration.getRestOptions().getDefaultTeam(), 0, org.getName()); teamService.createTeam(team); Project project = new Project( - serverConfiguration.getRestOptions().getDefaultProject(), + appConfiguration.getRestOptions().getDefaultProject(), + 0, "", team.getName(), @@ -137,135 +162,121 @@ public void testEntitiesPresentWithDefaultName_Success() { ); projectService.createProject(project); - leanDeploymentVerticleDeployer.deployVerticle(vertx, serverConfiguration); - - org = orgService.getOrg(serverConfiguration.getRestOptions().getDefaultOrg()); - assertNotNull(org); - assertEquals(serverConfiguration.getRestOptions().getDefaultOrg(), org.getName()); - - team = teamService.getTeam( - org.getName(), serverConfiguration.getRestOptions().getDefaultTeam()); - assertNotNull(team); - assertEquals(serverConfiguration.getRestOptions().getDefaultTeam(), team.getName()); - - project = projectService.getProject( - serverConfiguration.getRestOptions().getDefaultProject()); - assertNotNull(project); - assertEquals(serverConfiguration.getRestOptions().getDefaultProject(), project.getName()); + leanDeploymentVerticleDeployer + .deployVerticle(vertx, appConfiguration) + .onComplete(testContext.succeeding( id -> { + Org orgObtained = orgService.getOrg(appConfiguration.getRestOptions().getDefaultOrg()); + assertNotNull(orgObtained); + assertEquals(appConfiguration.getRestOptions().getDefaultOrg(), orgObtained.getName()); + + Team teamObtained = teamService.getTeam( + org.getName(), appConfiguration.getRestOptions().getDefaultTeam()); + assertNotNull(teamObtained); + assertEquals(appConfiguration.getRestOptions().getDefaultTeam(), teamObtained.getName()); + + Project pObtained = projectService.getProject( + appConfiguration.getRestOptions().getDefaultProject()); + assertNotNull(pObtained); + assertEquals(appConfiguration.getRestOptions().getDefaultProject(), pObtained.getName()); + cp.flag(); + })); } @Test - public void testDifferentOrgPresent_Failure() { + public void testDifferentOrgPresent_Failure(VertxTestContext testContext) { + Checkpoint cp = testContext.checkpoint(1); Org org = new Org(TEST_ORG, 0); orgService.createOrg(org); - - InvalidConfigException exception = assertThrows( - InvalidConfigException.class, - () -> leanDeploymentVerticleDeployer.deployVerticle(vertx, serverConfiguration) - ); - - assertEquals(String.format( - "Lean deployment can not be enabled as org with %s name is present.", - TEST_ORG - ), exception.getMessage()); + leanDeploymentVerticleDeployer + .deployVerticle(vertx, appConfiguration) + .onComplete(testContext.failing(t -> { + assertEquals(String.format( + "Lean deployment can not be enabled as org with %s name is present.", + TEST_ORG), t.getMessage()); + cp.flag(); + })); } @Test - public void testMultipleOrgsPresent_Failure() { + public void testMultipleOrgsPresent_Failure(VertxTestContext testContext) { + Checkpoint cp = testContext.checkpoint(1); Org org1 = new Org(TEST_ORG, 0); Org org2 = new Org(TEST_ORG + "2", 0); orgService.createOrg(org1); orgService.createOrg(org2); + leanDeploymentVerticleDeployer.deployVerticle(vertx, appConfiguration).onComplete(testContext.failing( t -> + { + assertEquals("Lean deployment can not be enabled as there are more than one orgs.", + t.getMessage()); + cp.flag(); + })); - InvalidConfigException exception = assertThrows( - InvalidConfigException.class, - () -> leanDeploymentVerticleDeployer.deployVerticle(vertx, serverConfiguration) - ); - - assertEquals( - "Lean deployment can not be enabled as there are more than one orgs.", - exception.getMessage() - ); } @Test - public void testDifferentTeamPresent_Failure() { - Org org = new Org(serverConfiguration.getRestOptions().getDefaultOrg(), 0); + public void testDifferentTeamPresent_Failure(VertxTestContext testContext) { + Checkpoint cp = testContext.checkpoint(1); + Org org = new Org(appConfiguration.getRestOptions().getDefaultOrg(), 0); orgService.createOrg(org); Team team = new Team(TEST_TEAM, 0, org.getName()); teamService.createTeam(team); - - InvalidConfigException exception = assertThrows( - InvalidConfigException.class, - () -> leanDeploymentVerticleDeployer.deployVerticle(vertx, serverConfiguration) - ); - - assertEquals(String.format( - "Lean deployment can not be enabled as team with %s name is present.", - TEST_TEAM - ), exception.getMessage()); + leanDeploymentVerticleDeployer.deployVerticle(vertx, appConfiguration).onComplete(testContext.failing(t -> { + assertEquals(String.format( + "Lean deployment can not be enabled as team with %s name is present.", + TEST_TEAM), t.getMessage()); + cp.flag(); + })); } @Test - public void testMultipleTeamsPresent_Failure() { - Org org = new Org(serverConfiguration.getRestOptions().getDefaultOrg(), 0); + public void testMultipleTeamsPresent_Failure(VertxTestContext testContext) { + Checkpoint cp = testContext.checkpoint(1); + Org org = new Org(appConfiguration.getRestOptions().getDefaultOrg(), 0); orgService.createOrg(org); Team team1 = new Team(TEST_TEAM, 0, org.getName()); Team team2 = new Team(TEST_TEAM + "2", 0, org.getName()); teamService.createTeam(team1); teamService.createTeam(team2); - - InvalidConfigException exception = assertThrows( - InvalidConfigException.class, - () -> leanDeploymentVerticleDeployer.deployVerticle(vertx, serverConfiguration) - ); - - assertEquals( - "Lean deployment can not be enabled as there are more than one teams.", - exception.getMessage() - ); + leanDeploymentVerticleDeployer.deployVerticle(vertx, appConfiguration).onComplete(testContext.failing( t -> { + assertEquals("Lean deployment can not be enabled as there are more than one teams.", + t.getMessage()); + cp.flag(); + } )); } @Test - public void testDifferentProjectPresent_Failure() { - Org org = new Org(serverConfiguration.getRestOptions().getDefaultOrg(), 0); + public void testDifferentProjectPresent_Failure(VertxTestContext testContext) { + Checkpoint cp = testContext.checkpoint(1); + Org org = new Org(appConfiguration.getRestOptions().getDefaultOrg(), 0); orgService.createOrg(org); - Team team = new Team(serverConfiguration.getRestOptions().getDefaultTeam(), 0, org.getName()); + Team team = new Team(appConfiguration.getRestOptions().getDefaultTeam(), 0, org.getName()); teamService.createTeam(team); Project project = new Project(TEST_PROJECT, 0, "", team.getName(), org.getName()); projectService.createProject(project); - - InvalidConfigException exception = assertThrows( - InvalidConfigException.class, - () -> leanDeploymentVerticleDeployer.deployVerticle(vertx, serverConfiguration) - ); - - assertEquals(String.format( - "Lean deployment can not be enabled as project with %s name is present.", - TEST_PROJECT - ), exception.getMessage()); + leanDeploymentVerticleDeployer.deployVerticle(vertx, appConfiguration).onComplete(testContext.failing( t -> { + assertEquals(String.format( + "Lean deployment can not be enabled as project with %s name is present.", + TEST_PROJECT), t.getMessage()); + cp.flag(); + })); } @Test - public void testMultipleProjectsPresent_Failure() { - Org org = new Org(serverConfiguration.getRestOptions().getDefaultOrg(), 0); + public void testMultipleProjectsPresent_Failure(VertxTestContext testContext) { + Checkpoint cp = testContext.checkpoint(1); + Org org = new Org(appConfiguration.getRestOptions().getDefaultOrg(), 0); orgService.createOrg(org); - Team team = new Team(serverConfiguration.getRestOptions().getDefaultTeam(), 0, org.getName()); + Team team = new Team(appConfiguration.getRestOptions().getDefaultTeam(), 0, org.getName()); teamService.createTeam(team); Project project1 = new Project(TEST_PROJECT, 0, "", team.getName(), org.getName()); Project project2 = new Project(TEST_PROJECT + "2", 0, "", team.getName(), org.getName()); projectService.createProject(project1); projectService.createProject(project2); - - InvalidConfigException exception = assertThrows( - InvalidConfigException.class, - () -> leanDeploymentVerticleDeployer.deployVerticle(vertx, serverConfiguration) - ); - - assertEquals( - "Lean deployment can not be enabled as there are more than one projects.", - exception.getMessage() - ); + leanDeploymentVerticleDeployer.deployVerticle(vertx, appConfiguration).onComplete(testContext.failing( t -> { + assertEquals("Lean deployment can not be enabled as there are more than one projects.", + t.getMessage()); + cp.flag(); + })); } @Test diff --git a/server/src/test/java/com/flipkart/varadhi/services/SubscriptionServiceTest.java b/server/src/test/java/com/flipkart/varadhi/services/SubscriptionServiceTest.java index 6d32417e..c2bf1db7 100644 --- a/server/src/test/java/com/flipkart/varadhi/services/SubscriptionServiceTest.java +++ b/server/src/test/java/com/flipkart/varadhi/services/SubscriptionServiceTest.java @@ -46,6 +46,7 @@ void setUp() throws Exception { zkCuratorTestingServer.getConnectString(), new ExponentialBackoffRetry(1000, 1))); zkCurator.start(); varadhiMetaStore = spy(new VaradhiMetaStore(zkCurator)); + orgService = new OrgService(varadhiMetaStore); teamService = new TeamService(varadhiMetaStore); meterRegistry = new JmxMeterRegistry(JmxConfig.DEFAULT, Clock.SYSTEM); @@ -66,7 +67,7 @@ void setUp() throws Exception { projectService.createProject(o1t1p1); projectService.createProject(o1t1p2); - subscriptionService = new SubscriptionService(varadhiMetaStore); + subscriptionService = new SubscriptionService(varadhiMetaStore, null); } @Test diff --git a/server/src/test/resources/testConfiguration.yml b/server/src/test/resources/testConfiguration.yml index e4ec5054..bd1e7643 100644 --- a/server/src/test/resources/testConfiguration.yml +++ b/server/src/test/resources/testConfiguration.yml @@ -1,3 +1,5 @@ +components : ["Server"] + restOptions: deployedRegion: "test" defaultOrg: "test" @@ -44,7 +46,7 @@ metaStoreOptions: configFile: "src/main/resources/zkConfig.yml" httpServerOptions: - port: 8080 + port: 8091 alpnVersions: [ "HTTP_1_1", "HTTP_2" ] decompressionSupported: false useAlpn: true diff --git a/settings.gradle b/settings.gradle index f4bb1b0b..0be2edd1 100644 --- a/settings.gradle +++ b/settings.gradle @@ -8,4 +8,5 @@ */ rootProject.name = 'varadhi' -include('entities', 'spi', 'common', 'core', 'messaging', 'pulsar', 'consumer', 'authz', 'server') +include('entities', 'spi', 'common', 'core', 'messaging', 'pulsar', 'consumer', 'controller', 'authz', 'server') + diff --git a/setup/docker/Dockerfile b/setup/docker/Dockerfile index 7c0899ab..d9e18ec3 100644 --- a/setup/docker/Dockerfile +++ b/setup/docker/Dockerfile @@ -33,4 +33,4 @@ USER varadhi WORKDIR /usr/share/varadhi # Start Varadhi server -CMD java -cp ./*:dependencies/* -Dcom.sun.management.jmxremote.port=9990 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false com.flipkart.varadhi.Server /etc/varadhi/configuration.yml +CMD java -cp ./*:dependencies/* -Dcom.sun.management.jmxremote.port=9990 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false com.flipkart.varadhi.VaradhiApplication /etc/varadhi/configuration.yml