diff --git a/consumer/src/main/java/com/flipkart/varadhi/consumer/ConsumerApiMgr.java b/consumer/src/main/java/com/flipkart/varadhi/consumer/ConsumerApiMgr.java index 4e7c59ca..9da16c6c 100644 --- a/consumer/src/main/java/com/flipkart/varadhi/consumer/ConsumerApiMgr.java +++ b/consumer/src/main/java/com/flipkart/varadhi/consumer/ConsumerApiMgr.java @@ -1,6 +1,7 @@ package com.flipkart.varadhi.consumer; import com.flipkart.varadhi.core.cluster.ConsumerApi; +import com.flipkart.varadhi.entities.cluster.ConsumerInfo; import com.flipkart.varadhi.entities.cluster.ShardOperation; import com.flipkart.varadhi.entities.VaradhiSubscription; import com.flipkart.varadhi.entities.cluster.ShardState; @@ -33,7 +34,22 @@ public CompletableFuture start(ShardOperation.StartData operation) { } @Override - public CompletableFuture getStatus(String subscriptionId, int shardId) { + public CompletableFuture stop(ShardOperation.StopData operation) { + VaradhiSubscription subscription = operation.getSubscription(); + return consumersManager.stopSubscription( + subscription.getName(), + "" + ); + } + + @Override + public CompletableFuture getShardStatus(String subscriptionId, int shardId) { return CompletableFuture.completedFuture(new ShardStatus(ShardState.UNKNOWN, "Not a owner of shard")); } + + @Override + public CompletableFuture getConsumerInfo() { + //TODO::Return assignments as well. + return CompletableFuture.completedFuture(consumersManager.getInfo()); + } } diff --git a/consumer/src/main/java/com/flipkart/varadhi/consumer/ConsumersManager.java b/consumer/src/main/java/com/flipkart/varadhi/consumer/ConsumersManager.java index 8e666582..9818a6a3 100644 --- a/consumer/src/main/java/com/flipkart/varadhi/consumer/ConsumersManager.java +++ b/consumer/src/main/java/com/flipkart/varadhi/consumer/ConsumersManager.java @@ -4,6 +4,7 @@ import com.flipkart.varadhi.entities.Endpoint; import com.flipkart.varadhi.entities.Project; import com.flipkart.varadhi.entities.StorageTopic; +import com.flipkart.varadhi.entities.cluster.ConsumerInfo; import com.flipkart.varadhi.spi.services.TopicPartitions; import java.util.concurrent.CompletableFuture; @@ -40,4 +41,6 @@ CompletableFuture startSubscription( ConsumerState getConsumerState(String subscription, String shardName); // TODO likely need status on the starting / stopping as well; as the above status is for a running consumer.. + + ConsumerInfo getInfo(); } diff --git a/consumer/src/main/java/com/flipkart/varadhi/consumer/impl/ConsumersManagerImpl.java b/consumer/src/main/java/com/flipkart/varadhi/consumer/impl/ConsumersManagerImpl.java index e5fcda15..2411349b 100644 --- a/consumer/src/main/java/com/flipkart/varadhi/consumer/impl/ConsumersManagerImpl.java +++ b/consumer/src/main/java/com/flipkart/varadhi/consumer/impl/ConsumersManagerImpl.java @@ -3,18 +3,17 @@ import com.flipkart.varadhi.consumer.ConsumerState; import com.flipkart.varadhi.consumer.ConsumersManager; import com.flipkart.varadhi.consumer.ConsumptionFailurePolicy; -import com.flipkart.varadhi.entities.ConsumptionPolicy; -import com.flipkart.varadhi.entities.Endpoint; -import com.flipkart.varadhi.entities.Project; -import com.flipkart.varadhi.entities.StorageTopic; +import com.flipkart.varadhi.entities.*; +import com.flipkart.varadhi.entities.cluster.ConsumerInfo; import com.flipkart.varadhi.spi.services.TopicPartitions; import java.util.concurrent.CompletableFuture; public class ConsumersManagerImpl implements ConsumersManager { + private final ConsumerInfo consumerInfo; - - public ConsumersManagerImpl() { + public ConsumersManagerImpl(ConsumerInfo consumerInfo) { + this.consumerInfo = consumerInfo; } @Override @@ -28,7 +27,7 @@ public CompletableFuture startSubscription( @Override public CompletableFuture stopSubscription(String subscription, String shardName) { - return null; + return CompletableFuture.completedFuture(null); } @Override @@ -45,4 +44,9 @@ public void resumeSubscription(String subscription, String shardName) { public ConsumerState getConsumerState(String subscription, String shardName) { return null; } + + @Override + public ConsumerInfo getInfo() { + return consumerInfo; + } } diff --git a/controller/src/main/java/com/flipkart/varadhi/controller/ControllerApiMgr.java b/controller/src/main/java/com/flipkart/varadhi/controller/ControllerApiMgr.java index 12ba7d0a..ddd8736f 100644 --- a/controller/src/main/java/com/flipkart/varadhi/controller/ControllerApiMgr.java +++ b/controller/src/main/java/com/flipkart/varadhi/controller/ControllerApiMgr.java @@ -1,43 +1,65 @@ package com.flipkart.varadhi.controller; - import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; -import com.flipkart.varadhi.entities.cluster.Assignment; -import com.flipkart.varadhi.entities.cluster.ConsumerNode; +import com.flipkart.varadhi.entities.cluster.*; import com.flipkart.varadhi.core.cluster.*; import com.flipkart.varadhi.entities.SubscriptionShards; import com.flipkart.varadhi.entities.SubscriptionUnitShard; import com.flipkart.varadhi.entities.VaradhiSubscription; -import com.flipkart.varadhi.entities.cluster.ShardOperation; -import com.flipkart.varadhi.entities.cluster.SubscriptionOperation; +import com.flipkart.varadhi.exceptions.InvalidOperationForResourceException; import com.flipkart.varadhi.spi.db.MetaStore; import com.flipkart.varadhi.spi.db.MetaStoreProvider; +import io.micrometer.core.instrument.MeterRegistry; import lombok.extern.slf4j.Slf4j; - @Slf4j public class ControllerApiMgr implements ControllerApi { - private final WebServerApi webServerApiProxy; private final ShardAssigner shardAssigner; private final ConsumerClientFactory consumerClientFactory; private final MetaStore metaStore; private final OperationMgr operationMgr; public ControllerApiMgr( - WebServerApi webServerApiProxy, ConsumerClientFactory consumerClientFactory, MetaStoreProvider metaStoreProvider + ConsumerClientFactory consumerClientFactory, MetaStoreProvider metaStoreProvider, + MeterRegistry meterRegistry ) { - this.webServerApiProxy = webServerApiProxy; this.consumerClientFactory = consumerClientFactory; - this.shardAssigner = new ShardAssigner(metaStoreProvider.getAssignmentStore()); + this.shardAssigner = new ShardAssigner(metaStoreProvider.getAssignmentStore(), meterRegistry); this.metaStore = metaStoreProvider.getMetaStore(); this.operationMgr = new OperationMgr(metaStoreProvider.getOpStore()); } - public void addConsumerNodes(List clusterConsumers) { - shardAssigner.addConsumerNodes(clusterConsumers); + public CompletableFuture addConsumerNodes(List clusterConsumers) { + return CompletableFuture.allOf(clusterConsumers.stream() + .map(cc -> getConsumerInfo(cc.getConsumerId()).thenAccept( + cc::updateWithConsumerInfo)).toArray(CompletableFuture[]::new)) + .thenAccept(v -> shardAssigner.addConsumerNodes(clusterConsumers)); + } + + private CompletableFuture getSubscriptionStatus(VaradhiSubscription subscription) { + String subId = subscription.getName(); + List assignments = shardAssigner.getSubscriptionAssignment(subId); + + List> shardFutures = assignments.stream().map(a -> { + ConsumerApi consumer = getAssignedConsumer(a); + return consumer.getShardStatus(subId, a.getShardId()); + }).toList(); + + return CompletableFuture.allOf(shardFutures.toArray(CompletableFuture[]::new)).thenApply(v -> { + List shardStatuses = new ArrayList<>(); + shardFutures.forEach(sf -> shardStatuses.add(sf.join())); + return getSubscriptionStatusFromShardStatus(subscription, assignments, shardStatuses); + }); + } + + private SubscriptionStatus getSubscriptionStatusFromShardStatus( + VaradhiSubscription subscription, List assignments, List shardStatuses + ) { + SubscriptionState state = SubscriptionState.getFromShardStates(assignments, shardStatuses); + return new SubscriptionStatus(subscription.getName(), state); } /** @@ -47,76 +69,152 @@ public void addConsumerNodes(List clusterConsumers) { */ @Override - public CompletableFuture startSubscription(SubscriptionOperation.StartData opData) { - log.info("Starting the Subscription: {}", opData); + public CompletableFuture startSubscription( + String subscriptionId, String requestedBy + ) { + VaradhiSubscription subscription = metaStore.getSubscription(subscriptionId); + return getSubscriptionStatus(subscription).exceptionally(t -> { + // If not temporary, then alternate needs to be provided to allow recovery from this. + throw new IllegalStateException("Unable to determine subscription status, try again after sometime."); + }).thenApply(ss -> { + if (ss.getState() == SubscriptionState.RUNNING || ss.getState() == SubscriptionState.STARTING) { + throw new InvalidOperationForResourceException("Subscription is already running or starting."); + } + log.info("Starting the Subscription: {}", subscriptionId); + // operationMgr is not expected to create a subOp and throw, so failure is not handled here. + // TODO:: fix this w.r.to failure in getOrCreateShardAssignment or its chain + return operationMgr.requestSubStart( + subscriptionId, requestedBy, subOp -> getOrCreateShardAssignment(subscription).thenCompose( + assignments -> startShards(subOp, subscription, assignments))); + }); + } - if (opData.completed()) { - log.warn("Ignoring already Subscription Operation: {}", opData); - return CompletableFuture.completedFuture(null); + private CompletableFuture> getOrCreateShardAssignment(VaradhiSubscription subscription) { + List assignedShards = shardAssigner.getSubscriptionAssignment(subscription.getName()); + if (assignedShards.isEmpty()) { + List unAssigned = getSubscriptionShards(subscription.getShards()); + return shardAssigner.assignShard(unAssigned, subscription); + } else { + log.info( + "{} Shards for Subscription {} are already assigned", assignedShards.size(), + subscription.getName() + ); + return CompletableFuture.completedFuture(assignedShards); } + } - opData.markInProgress(); - return webServerApiProxy.update(opData).thenAccept(v -> { - String subId = opData.getSubscriptionId(); - VaradhiSubscription subscription = metaStore.getSubscription(subId); - List assignedShards = shardAssigner.getSubscriptionAssignment(subscription.getName()); - if (assignedShards.isEmpty()) { - List unAssigned = getSubscriptionShards(subscription.getShards()); - assignedShards.addAll(shardAssigner.assignShard(unAssigned, subscription)); - } else { - log.info( - "{} Shards for Subscription {} are already assigned", assignedShards.size(), - subscription.getName() - ); - } - - SubscriptionShards shards = subscription.getShards(); - CompletableFuture.allOf(assignedShards.stream() - .map(assignment -> startShard(assignment, shards.getShard(assignment.getShardId()), subscription)) - .toArray(CompletableFuture[]::new)).exceptionally(t -> { - markSubOpFailed(opData, t); - return null; - }); - log.info("Scheduled Subscription start({}).", opData); - }).exceptionally(t -> { - markSubOpFailed(opData, t); + private CompletableFuture startShards( + SubscriptionOperation subOp, VaradhiSubscription subscription, List assignments + ) { + SubscriptionShards shards = subscription.getShards(); + String subOpId = subOp.getData().getOperationId(); + CompletableFuture future = CompletableFuture.allOf(assignments.stream() + .map(assignment -> startShard(subOpId, assignment, shards.getShard(assignment.getShardId()), + subscription + )).toArray(CompletableFuture[]::new)).exceptionally(t -> { + markSubOpFailed(subOp, t); return null; }); + log.info("Scheduled Start on {} shards for SubOp({}).", shards.getShardCount(), subOp.getData()); + return future; } + private CompletableFuture startShard( + String subOpId, Assignment assignment, SubscriptionUnitShard shard, VaradhiSubscription subscription + ) { + String subId = subscription.getName(); + int shardId = shard.getShardId(); + ConsumerApi consumer = getAssignedConsumer(assignment); + return consumer.getShardStatus(subId, shardId).thenCompose(shardStatus -> { + // IsAssigned is started|starting|errored. + // Stopping isn't considered as assigned, as start/stop shouldn't be running in parallel. + if (!shardStatus.isAssigned()) { + ShardOperation shardOp = operationMgr.requestShardStart(subOpId, shard, subscription); + return consumer.start((ShardOperation.StartData) shardOp.getOpData()).whenComplete((v, t) -> { + if (t != null) { + markShardOpFailed(shardOp, t); + } else { + log.info("Scheduled shard start({}).", shardOp); + } + }); + } else { + log.info("Subscription:{} Shard:{} is already started. Skipping.", subId, shardId); + return CompletableFuture.completedFuture(null); + } + }); + } - private void markSubOpFailed(SubscriptionOperation.OpData opData, Throwable t) { - log.error("Failed to schedule Subscription start({}): {}.", opData, t); - opData.markFail(t.getMessage()); - webServerApiProxy.update(opData); + @Override + public CompletableFuture stopSubscription( + String subscriptionId, String requestedBy + ) { + VaradhiSubscription subscription = metaStore.getSubscription(subscriptionId); + return getSubscriptionStatus(subscription).exceptionally(t -> { + throw new IllegalStateException("Unable to determine subscription status, try again after sometime."); + }).thenApply(ss -> { + if (ss.getState() == SubscriptionState.STOPPED) { + throw new InvalidOperationForResourceException("Subscription is already stopped."); + } + log.info("Stopping the Subscription: {}", subscriptionId); + // operationMgr is not expected to create a subOp and throw, so failure is not handled here. + // TODO:: fix this w.r.to failure in stopShards + return operationMgr.requestSubStop(subscriptionId, requestedBy, subOp -> stopShards(subOp, subscription)); + }); } + private CompletableFuture stopShards(SubscriptionOperation subOp, VaradhiSubscription subscription) { + String subId = subscription.getName(); + String subOpId = subOp.getData().getOperationId(); + SubscriptionShards shards = subscription.getShards(); + List assignments = shardAssigner.getSubscriptionAssignment(subId); + log.info( + "Found {} assigned Shards for the Subscription:{} with Shards:{}.", assignments.size(), + subId, shards.getShardCount() + ); + CompletableFuture future = CompletableFuture.allOf(assignments.stream() + .map(assignment -> stopShard(subOpId, assignment, shards.getShard(assignment.getShardId()), + subscription + )).toArray(CompletableFuture[]::new)).exceptionally(t -> { + markSubOpFailed(subOp, t); + return null; + }).thenCompose(v -> shardAssigner.unAssignShard(assignments, subscription)); + log.info("Scheduled Stop on {} shards for SubOp({}).", shards.getShardCount(), subOp.getData()); + return future; + } - /** - * Start the shard if it is not already started. - * TODO:: Implement queuing/sequencing of the operations for specific shard (may be in operations mgr) - */ - private CompletableFuture startShard( - Assignment assignment, SubscriptionUnitShard shard, VaradhiSubscription subscription + private CompletableFuture stopShard( + String subOpId, Assignment assignment, SubscriptionUnitShard shard, VaradhiSubscription subscription ) { + String subId = subscription.getName(); + int shardId = shard.getShardId(); ConsumerApi consumer = getAssignedConsumer(assignment); - return consumer.getStatus(subscription.getName(), shard.getShardId()).thenAccept(shardStatus -> { + return consumer.getShardStatus(subId, shardId).thenCompose(shardStatus -> { if (!shardStatus.isAssigned()) { - ShardOperation.StartData opData = operationMgr.requestShardStart(shard, subscription); - consumer.start(opData).whenComplete((v, t) -> { + ShardOperation shardOp = operationMgr.requestShardStop(subOpId, shard, subscription); + return consumer.stop((ShardOperation.StopData) shardOp.getOpData()).whenComplete((v, t) -> { if (t != null) { - markShardOpFailed(opData, t); - }else{ - log.info("Scheduled shard start({}).", opData); + markShardOpFailed(shardOp, t); + } else { + log.info("Scheduled shard stop({}).", shardOp); } }); + } else { + log.info("Subscription:{} Shard:{} is already Stopped. Skipping.", subId, shardId); + return CompletableFuture.completedFuture(null); } }); } - private void markShardOpFailed(ShardOperation.OpData opData, Throwable t) { - log.error("Failed to schedule shard start({}): {}.", opData, t.getMessage()); - opData.markFail(t.getMessage()); - update(opData); + + private void markSubOpFailed(SubscriptionOperation subscriptionOp, Throwable t) { + log.error("Subscription operation ({}) failed: {}.", subscriptionOp, t); + subscriptionOp.markFail(t.getMessage()); + operationMgr.updateSubOp(subscriptionOp); + } + + private void markShardOpFailed(ShardOperation shardOp, Throwable t) { + log.error("shard operation ({}) failed: {}.", shardOp, t.getMessage()); + shardOp.markFail(t.getMessage()); + operationMgr.updateShardOp(shardOp.getOpData()); } private ConsumerApi getAssignedConsumer(Assignment assignment) { @@ -132,24 +230,32 @@ private List getSubscriptionShards(SubscriptionShards sha return unitShards; } - @Override - public CompletableFuture stopSubscription(SubscriptionOperation.StopData operation) { - operation.markInProgress(); - return webServerApiProxy.update(operation); - } - @Override public CompletableFuture update(ShardOperation.OpData opData) { log.debug("Received update on shard operation: {}", opData); - operationMgr.updateShardOp(opData); - return CompletableFuture.completedFuture(null); + try { + // Update is getting executed inline on dispatcher thread. + operationMgr.updateShardOp(opData); + return CompletableFuture.completedFuture(null); + } catch (Exception e) { + return CompletableFuture.failedFuture(e); + } } - public void consumerNodeLeft(String consumerNodeId) { + public CompletableFuture consumerNodeLeft(String consumerNodeId) { shardAssigner.consumerNodeLeft(consumerNodeId); + return CompletableFuture.completedFuture(null); + } + + public CompletableFuture consumerNodeJoined(ConsumerNode consumerNode) { + return getConsumerInfo(consumerNode.getConsumerId()).thenAccept(ci -> { + consumerNode.updateWithConsumerInfo(ci); + shardAssigner.consumerNodeJoined(consumerNode); + }); } - public void consumerNodeJoined(ConsumerNode consumerNode) { - shardAssigner.consumerNodeJoined(consumerNode); + private CompletableFuture getConsumerInfo(String consumerId) { + ConsumerApi consumer = consumerClientFactory.getInstance(consumerId); + return consumer.getConsumerInfo(); } } diff --git a/controller/src/main/java/com/flipkart/varadhi/controller/ShardAssigner.java b/controller/src/main/java/com/flipkart/varadhi/controller/ShardAssigner.java index cc06142f..4f8f3f43 100644 --- a/controller/src/main/java/com/flipkart/varadhi/controller/ShardAssigner.java +++ b/controller/src/main/java/com/flipkart/varadhi/controller/ShardAssigner.java @@ -1,29 +1,38 @@ package com.flipkart.varadhi.controller; +import com.flipkart.varadhi.entities.SubscriptionShards; import com.flipkart.varadhi.entities.cluster.Assignment; import com.flipkart.varadhi.entities.cluster.ConsumerNode; import com.flipkart.varadhi.controller.impl.LeastAssignedStrategy; import com.flipkart.varadhi.entities.SubscriptionUnitShard; import com.flipkart.varadhi.entities.VaradhiSubscription; import com.flipkart.varadhi.spi.db.AssignmentStore; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.mutable.MutableBoolean; +import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.*; import java.util.stream.Collectors; @Slf4j public class ShardAssigner { + private final String metricPrefix = "controller.assigner"; private final AssignmentStrategy strategy; private final Map consumerNodes; private final AssignmentStore assignmentStore; + private final ExecutorService executor; - public ShardAssigner(AssignmentStore assignmentStore) { + public ShardAssigner(AssignmentStore assignmentStore, MeterRegistry meterRegistry) { this.strategy = new LeastAssignedStrategy(); this.consumerNodes = new ConcurrentHashMap<>(); this.assignmentStore = assignmentStore; + //TODO::ExecutorService should emit the metrics. + this.executor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("assigner-%d").build()); } public void addConsumerNodes(List clusterConsumers) { @@ -33,16 +42,60 @@ public void addConsumerNodes(List clusterConsumers) { }); } - public List assignShard(List shards, VaradhiSubscription subscription) { - //TODO:: It need to ensure, assignment is not using stale values, specifically if they are running in parallel. - List activeConsumers = - consumerNodes.values().stream().filter(c -> !c.isMarkedForDeletion()).collect(Collectors.toList()); - log.info("AssignShards consumer nodes active:{} of total:{}", activeConsumers.size(), consumerNodes.size()); - List assignments = strategy.assign(shards, subscription, activeConsumers); - assignmentStore.createAssignments(assignments); - return assignments; + public CompletableFuture> assignShard( + List shards, VaradhiSubscription subscription + ) { + return CompletableFuture.supplyAsync(() -> { + List activeConsumers = + consumerNodes.values().stream().filter(c -> !c.isMarkedForDeletion()).collect(Collectors.toList()); + log.info( + "AssignShards found consumer nodes active:{} of total:{}", activeConsumers.size(), + consumerNodes.size() + ); + + List assignments = new ArrayList<>(); + try { + assignments.addAll(strategy.assign(shards, subscription, activeConsumers)); + assignmentStore.createAssignments(assignments); + return assignments; + } catch (Exception e) { + log.error("Failed while creating assignment, freeing up any allocation done. {}.", e.getMessage()); + assignments.forEach(assignment -> freeAssignedCapacity(assignment, subscription)); + throw e; + } + }, executor); } + + public CompletableFuture unAssignShard(List assignments, VaradhiSubscription subscription) { + return CompletableFuture.supplyAsync(() -> { + try { + assignmentStore.deleteAssignments(assignments); + assignments.forEach(a -> { + String consumerId = a.getConsumerId(); + ConsumerNode consumerNode = consumerNodes.getOrDefault(consumerId, null); + if (null == consumerNode) { + log.error("Consumer node not found, for assignment {}. Ignoring unAssignShard", a); + } else { + SubscriptionShards shards = subscription.getShards(); + consumerNode.free(a, shards.getShard(a.getShardId()).getCapacityRequest()); + } + }); + return null; + } catch (Exception e) { + log.error("Failed while unAssigning Shards. {}.", e.getMessage()); + throw e; + } + }, executor); + } + + private void freeAssignedCapacity(Assignment assignment, VaradhiSubscription subscription) { + SubscriptionUnitShard shard = subscription.getShards().getShard(assignment.getShardId()); + ConsumerNode consumerNode = consumerNodes.get(assignment.getConsumerId()); + consumerNode.free(assignment, shard.getCapacityRequest()); + } + + public List getSubscriptionAssignment(String subscriptionName) { return assignmentStore.getSubscriptionAssignments(subscriptionName); } @@ -54,19 +107,6 @@ public void consumerNodeJoined(ConsumerNode consumerNode) { } } - private boolean addConsumerNode(ConsumerNode consumerNode) { - String consumerNodeId = consumerNode.getMemberInfo().hostname(); - MutableBoolean added = new MutableBoolean(false); - consumerNodes.computeIfAbsent(consumerNodeId, k -> { - added.setTrue(); - return consumerNode; - }); - if (!added.booleanValue()) { - log.warn("ConsumerNode {} already exists. Not adding again.", consumerNodes.get(consumerNodeId)); - } - return added.booleanValue(); - } - public void consumerNodeLeft(String consumerNodeId) { //TODO:: re-assign the shards (should this be trigger from here or from the controller) ? MutableBoolean marked = new MutableBoolean(false); @@ -81,4 +121,18 @@ public void consumerNodeLeft(String consumerNodeId) { log.warn("ConsumerNode {} not found.", consumerNodeId); } } + + private boolean addConsumerNode(ConsumerNode consumerNode) { + String consumerNodeId = consumerNode.getMemberInfo().hostname(); + MutableBoolean added = new MutableBoolean(false); + consumerNodes.computeIfAbsent(consumerNodeId, k -> { + added.setTrue(); + return consumerNode; + }); + if (!added.booleanValue()) { + log.warn("ConsumerNode {} already exists. Not adding again.", consumerNodes.get(consumerNodeId)); + } + return added.booleanValue(); + } + } diff --git a/controller/src/main/java/com/flipkart/varadhi/controller/impl/LeastAssignedStrategy.java b/controller/src/main/java/com/flipkart/varadhi/controller/impl/LeastAssignedStrategy.java index a17bb028..26500651 100644 --- a/controller/src/main/java/com/flipkart/varadhi/controller/impl/LeastAssignedStrategy.java +++ b/controller/src/main/java/com/flipkart/varadhi/controller/impl/LeastAssignedStrategy.java @@ -22,7 +22,6 @@ public List assign( List shards, VaradhiSubscription subscription, List consumerNodes ) { if (consumerNodes.isEmpty()) { - log.error("Shard Assignment Failure: No active consumer nodes."); throw new CapacityException("No active consumer node for Subscription assignment."); } List assignments = new ArrayList<>(); @@ -44,7 +43,7 @@ public List assign( } Assignment assignment = new Assignment(subscription.getName(), shard.getShardId(), consumerNode.getMemberInfo().hostname()); - consumerNode.allocate(shard.getCapacityRequest()); + consumerNode.allocate(assignment, shard.getCapacityRequest()); assignments.add(assignment); consumers.add(consumerNode); } diff --git a/core/build.gradle b/core/build.gradle index 63682cfc..26b9162f 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -8,6 +8,7 @@ dependencies { api(project(':spi')) implementation("io.vertx:vertx-core:${vertx_version}") implementation('com.fasterxml.jackson.core:jackson-databind') + implementation("com.google.guava:guava") testFixturesImplementation(project(":common")) testImplementation(project(':pulsar')) diff --git a/core/src/main/java/com/flipkart/varadhi/core/cluster/ConsumerApi.java b/core/src/main/java/com/flipkart/varadhi/core/cluster/ConsumerApi.java index 79f1836b..82722538 100644 --- a/core/src/main/java/com/flipkart/varadhi/core/cluster/ConsumerApi.java +++ b/core/src/main/java/com/flipkart/varadhi/core/cluster/ConsumerApi.java @@ -1,5 +1,6 @@ package com.flipkart.varadhi.core.cluster; +import com.flipkart.varadhi.entities.cluster.ConsumerInfo; import com.flipkart.varadhi.entities.cluster.ShardOperation; import com.flipkart.varadhi.entities.cluster.ShardStatus; @@ -8,5 +9,9 @@ public interface ConsumerApi { CompletableFuture start(ShardOperation.StartData operation); - CompletableFuture getStatus(String subscriptionId, int shardId); + CompletableFuture stop(ShardOperation.StopData operation); + + CompletableFuture getShardStatus(String subscriptionId, int shardId); + + CompletableFuture getConsumerInfo(); } diff --git a/core/src/main/java/com/flipkart/varadhi/core/cluster/ControllerApi.java b/core/src/main/java/com/flipkart/varadhi/core/cluster/ControllerApi.java index 78a2a8d6..5afb8b59 100644 --- a/core/src/main/java/com/flipkart/varadhi/core/cluster/ControllerApi.java +++ b/core/src/main/java/com/flipkart/varadhi/core/cluster/ControllerApi.java @@ -14,9 +14,9 @@ public interface ControllerApi { String ROUTE_CONTROLLER = "controller"; - CompletableFuture startSubscription(SubscriptionOperation.StartData operation); + CompletableFuture startSubscription(String subscriptionId, String requestedBy); - CompletableFuture stopSubscription(SubscriptionOperation.StopData operation); + CompletableFuture stopSubscription(String subscriptionId, String requestedBy); CompletableFuture update(ShardOperation.OpData operation); } diff --git a/core/src/main/java/com/flipkart/varadhi/core/cluster/OperationMgr.java b/core/src/main/java/com/flipkart/varadhi/core/cluster/OperationMgr.java index 8f160dea..51a52641 100644 --- a/core/src/main/java/com/flipkart/varadhi/core/cluster/OperationMgr.java +++ b/core/src/main/java/com/flipkart/varadhi/core/cluster/OperationMgr.java @@ -5,41 +5,208 @@ import com.flipkart.varadhi.entities.cluster.ShardOperation; import com.flipkart.varadhi.entities.cluster.SubscriptionOperation; import com.flipkart.varadhi.spi.db.OpStore; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import java.util.*; +import java.util.concurrent.*; +import java.util.function.Function; + + +@Slf4j public class OperationMgr { private final OpStore opStore; + private final ExecutorService executor; + private final Map> subOps; public OperationMgr(OpStore opStore) { this.opStore = opStore; + this.subOps = new ConcurrentHashMap<>(); + //TODO::Add config for number of threads. + //TODO::ExecutorService should emit the metrics. + this.executor = Executors.newFixedThreadPool(2, new ThreadFactoryBuilder().setNameFormat("OpMgr-%d").build()); + } + + /** + * Task Execution + * - Task embeds a Subscription operation to be executed. + * - Task are keyed by subscription. A subscription can have multiple tasks waiting be executed. However only one + * will be executed. + * - Tasks for different subscriptions can be executed in parallel. Parallelism is controlled via number of threads + * in the executor service. + * - Task at the head of the subscription's scheduleTasks queue will be in progress and rest will be waiting. + * queueSubOp -- adds the operation to the queue. If it is only task, it will be immediately scheduled for execution. + * handleSubOpUpdate -- Removes the tasks from the queue, if finished (Completed or Errored). If current task + * finished, schedule next task for execution when available. + * TODO:: Implementation can be enhanced for below + * - Remove redundant operations from the queue. + * - Implement retry logic for failed operation to support auto recovery from temporary failure. + */ + + private void queueSubOp( + SubscriptionOperation operation, Function> opHandler + ) { + SubscriptionOperation.OpData pendingOp = operation.getData(); + subOps.compute(pendingOp.getSubscriptionId(), (subId, scheduledTasks) -> { + OpTask pendingTask = OpTask.of(operation, opHandler); + if (null == scheduledTasks) { + Deque taskQueue = new ArrayDeque<>(); + taskQueue.addLast(pendingTask); + executor.submit(pendingTask); + log.info("Scheduling the SubOp({}) for execution.", pendingOp); + return taskQueue; + } else { + // it means already some operations are scheduled, add this to queue. + int pending = scheduledTasks.size(); + log.info("Subscription has {} pending operations, queued SubOp({}).", pending, pendingOp); + // duplicate shouldn't happen unless it is called multiple times e.g. as part of retry. + boolean alreadyScheduled = false; + for (OpTask task : scheduledTasks) { + if (task.operation.getData().getOperationId().equals(pendingOp.getOperationId())) { + log.warn("SubOp({}) is already scheduled. Ignoring duplicate.", pendingOp); + alreadyScheduled = true; + } + } + if (!alreadyScheduled) { + scheduledTasks.addLast(pendingTask); + } + return scheduledTasks; + } + }); + } + + // This will execute the update on a subscription in a sequential order. Sequential execution is needed, to ensure + // parallel updates to Subscription operation from different shards, do not override each other. + private void handleSubOpUpdate( + SubscriptionOperation operation, Function> updateHandler + ) { + SubscriptionOperation.OpData updated = operation.getData(); + subOps.compute(operation.getData().getSubscriptionId(), (subId, scheduledTasks) -> { + if (null != scheduledTasks && !scheduledTasks.isEmpty()) { + + // process the update using provided handler. + // Update processing can take time, this will affect a subscription. + if (null != updateHandler) { + //TODO::apply failure. + updateHandler.apply(operation); + } + + SubscriptionOperation.OpData inProgress = scheduledTasks.peekFirst().operation.getData(); + if (!updated.getOperationId().equals(inProgress.getOperationId())) { + // This shouldn't happen as only task at the head is scheduled for execution. + log.error("Obtained update for waiting SubOp, Updated({}), InProgress({}).", updated, inProgress); + return scheduledTasks; + } + + // Remove completed operation from the pending list and schedule next operation if available. + if (operation.completed()) { + scheduledTasks.removeFirst(); + log.info("Completed SubOp({}) removed from the queue.", updated); + if (scheduledTasks.isEmpty()) { + return null; + } else { + OpTask waiting = scheduledTasks.peekFirst(); + log.info("Pending SubOp({}) scheduled for execution.", waiting.operation.getData()); + executor.submit(waiting); + } + } + return scheduledTasks; + } else { + log.error("SubOp {} not found for update.", updated); + return null; + } + }); } - public SubscriptionOperation requestSubStart(String subscriptionId, String requestedBy) { + public SubscriptionOperation requestSubStart( + String subscriptionId, String requestedBy, + Function> provider + ) { SubscriptionOperation operation = SubscriptionOperation.startOp(subscriptionId, requestedBy); opStore.createSubOp(operation); + queueSubOp(operation, provider); return operation; } - public SubscriptionOperation requestSubStop(String subscriptionId, String requestedBy) { + public SubscriptionOperation requestSubStop( + String subscriptionId, String requestedBy, + Function> provider + ) { SubscriptionOperation operation = SubscriptionOperation.stopOp(subscriptionId, requestedBy); opStore.createSubOp(operation); + queueSubOp(operation, provider); return operation; } - public void updateSubOp(SubscriptionOperation.OpData opData) { - SubscriptionOperation subOp = opStore.getSubOp(opData.getOperationId()); - subOp.update(opData); - opStore.updateSubOp(subOp); + public void updateSubOp(SubscriptionOperation subscriptionOp) { + handleSubOpUpdate(subscriptionOp, subOp -> { + // updating DB status in handler, to avoid version conflict. + SubscriptionOperation subOpLatest = opStore.getSubOp(subscriptionOp.getData().getOperationId()); + subOpLatest.update(subscriptionOp); + opStore.updateSubOp(subOpLatest); + return CompletableFuture.completedFuture(null); + }); + } + + public ShardOperation requestShardStart( + String subOpId, SubscriptionUnitShard shard, VaradhiSubscription subscription + ) { + ShardOperation startOp = ShardOperation.startOp(subOpId, shard, subscription); + opStore.createShardOp(startOp); + return startOp; + } + + public ShardOperation requestShardStop( + String subOpId, SubscriptionUnitShard shard, VaradhiSubscription subscription + ) { + ShardOperation stopOp = ShardOperation.stopOp(subOpId, shard, subscription); + opStore.createShardOp(stopOp); + return stopOp; } public void updateShardOp(ShardOperation.OpData opData) { - ShardOperation shardOp = opStore.getShardOp(opData.getOperationId()); - shardOp.update(opData); - opStore.updateShardOp(shardOp); + // updating DB status in handler for both Shard and Subscription op, to avoid version conflict. + SubscriptionOperation subscriptionOp = opStore.getSubOp(opData.getParentOpId()); + handleSubOpUpdate(subscriptionOp, subOp -> doShardOpAndSubscriptionOpUpdate(subOp, opData)); } - public ShardOperation.StartData requestShardStart(SubscriptionUnitShard shard, VaradhiSubscription subscription) { - ShardOperation startOp = ShardOperation.startOp(shard, subscription); - opStore.createShardOp(startOp); - return (ShardOperation.StartData) startOp.getOpData(); + private CompletableFuture doShardOpAndSubscriptionOpUpdate( + SubscriptionOperation subOp, ShardOperation.OpData opData + ) { + + ShardOperation shardOpLatest = opStore.getShardOp(opData.getOperationId()); + shardOpLatest.update(opData); + opStore.updateShardOp(shardOpLatest); + + List shardOps = new ArrayList<>(); + // db fetch can be avoided if it is a single sharded subscription i.e. SubscriptionUnitShard strategy. + if (1 == shardOpLatest.getOpData().getSubscription().getShards().getShardCount()) { + shardOps.add(shardOpLatest); + } else { + shardOps.addAll(opStore.getShardOps(shardOpLatest.getOpData().getParentOpId())); + } + subOp.update(shardOps); + opStore.updateSubOp(subOp); + return CompletableFuture.completedFuture(null); + } + + @AllArgsConstructor + static class OpTask implements Callable { + Function> opHandler; + private SubscriptionOperation operation; + + public static OpTask of( + SubscriptionOperation operation, Function> handler + ) { + return new OpTask(handler, operation); + } + + @Override + public Void call() { + //TODO::Fix what happens when opExecutor.apply fails. + opHandler.apply(operation); + return null; + } } } diff --git a/core/src/main/java/com/flipkart/varadhi/core/cluster/WebServerApi.java b/core/src/main/java/com/flipkart/varadhi/core/cluster/WebServerApi.java deleted file mode 100644 index cc6650ce..00000000 --- a/core/src/main/java/com/flipkart/varadhi/core/cluster/WebServerApi.java +++ /dev/null @@ -1,19 +0,0 @@ -package com.flipkart.varadhi.core.cluster; - -import com.flipkart.varadhi.entities.cluster.SubscriptionOperation; - -import java.util.concurrent.CompletableFuture; - -/** - * Cluster Internal APIs. - * Web server APIs for handling the events during entity (Subscription as of now) lifecycle. - * This will be generally invoked when a respective events from the controller is received. - */ -public interface WebServerApi { - String ROUTE_WEBSERVER = "webserver"; - - /** - * Updates the status of an already scheduled operation and associated entity. - */ - CompletableFuture update(SubscriptionOperation.OpData operation); -} diff --git a/core/src/main/java/module-info.java b/core/src/main/java/module-info.java index 34ab886f..e3599e44 100644 --- a/core/src/main/java/module-info.java +++ b/core/src/main/java/module-info.java @@ -2,6 +2,7 @@ requires static lombok; requires org.slf4j; requires com.fasterxml.jackson.annotation; + requires com.google.common; requires com.flipkart.varadhi.common; requires com.flipkart.varadhi.spi; diff --git a/entities/src/main/java/com/flipkart/varadhi/entities/cluster/Assignment.java b/entities/src/main/java/com/flipkart/varadhi/entities/cluster/Assignment.java index 5d6d27c5..c78cf8e3 100644 --- a/entities/src/main/java/com/flipkart/varadhi/entities/cluster/Assignment.java +++ b/entities/src/main/java/com/flipkart/varadhi/entities/cluster/Assignment.java @@ -31,4 +31,9 @@ public Assignment(String subscriptionId, int shardId, String consumerId) { private static String getShardName(String subscriptionId, int shardId) { return String.format("%s%s%d", subscriptionId, NAME_SEPARATOR, shardId); } + + @Override + public String toString() { + return String.format("{ConsumerId:%s Subscription:%s Shard:%d}", consumerId, subscriptionId, shardId); + } } diff --git a/entities/src/main/java/com/flipkart/varadhi/entities/cluster/ConsumerInfo.java b/entities/src/main/java/com/flipkart/varadhi/entities/cluster/ConsumerInfo.java new file mode 100644 index 00000000..aacf17be --- /dev/null +++ b/entities/src/main/java/com/flipkart/varadhi/entities/cluster/ConsumerInfo.java @@ -0,0 +1,18 @@ +package com.flipkart.varadhi.entities.cluster; + + +import com.flipkart.varadhi.entities.CapacityPolicy; +import lombok.AllArgsConstructor; +import lombok.Getter; + +@Getter +@AllArgsConstructor +public class ConsumerInfo { + private String consumerId; + private CapacityPolicy available; + + public static ConsumerInfo from(MemberInfo memberInfo) { + return new ConsumerInfo( + memberInfo.hostname(), new CapacityPolicy(1000, memberInfo.capacity().getNetworkMBps())); + } +} diff --git a/entities/src/main/java/com/flipkart/varadhi/entities/cluster/ConsumerNode.java b/entities/src/main/java/com/flipkart/varadhi/entities/cluster/ConsumerNode.java index b516e212..98937742 100644 --- a/entities/src/main/java/com/flipkart/varadhi/entities/cluster/ConsumerNode.java +++ b/entities/src/main/java/com/flipkart/varadhi/entities/cluster/ConsumerNode.java @@ -4,28 +4,46 @@ import lombok.Getter; import java.util.Comparator; +import java.util.HashMap; +import java.util.Map; import static java.util.Comparator.comparing; - @Getter public class ConsumerNode { public static Comparator NodeComparator = comparing(o -> o.available); private final MemberInfo memberInfo; private final CapacityPolicy available; private boolean markedForDeletion; + private final Map assignments; public ConsumerNode(MemberInfo memberInfo) { this.memberInfo = memberInfo; this.markedForDeletion = false; this.available = new CapacityPolicy(1000, memberInfo.capacity().getNetworkMBps() * 1000); + this.assignments = new HashMap<>(); } public void markForDeletion() { this.markedForDeletion = true; } - public void allocate(CapacityPolicy requests) { - available.setMaxThroughputKBps(available.getMaxThroughputKBps() - requests.getMaxThroughputKBps()); + public void updateWithConsumerInfo(ConsumerInfo consumerInfo) { + available.setMaxThroughputKBps(consumerInfo.getAvailable().getMaxThroughputKBps()); + } + + public String getConsumerId() { + return memberInfo.hostname(); + } + + public synchronized void allocate(Assignment a, CapacityPolicy requests) { + if (null == assignments.putIfAbsent(a.getName(), a)) { + available.setMaxThroughputKBps(available.getMaxThroughputKBps() - requests.getMaxThroughputKBps()); + } + } + public synchronized void free(Assignment a, CapacityPolicy requests) { + if (null != assignments.remove(a.getName())){ + available.setMaxThroughputKBps(available.getMaxThroughputKBps() + requests.getMaxThroughputKBps()); + } } } diff --git a/entities/src/main/java/com/flipkart/varadhi/entities/cluster/ShardOperation.java b/entities/src/main/java/com/flipkart/varadhi/entities/cluster/ShardOperation.java index 0d620ac3..91713488 100644 --- a/entities/src/main/java/com/flipkart/varadhi/entities/cluster/ShardOperation.java +++ b/entities/src/main/java/com/flipkart/varadhi/entities/cluster/ShardOperation.java @@ -10,12 +10,12 @@ import java.util.UUID; -@Value +@Getter @EqualsAndHashCode(callSuper = true) public class ShardOperation extends MetaStoreEntity { - long startTime; - long endTime; - OpData opData; + private final long startTime; + private long endTime; + private final OpData opData; @JsonCreator ShardOperation(String operationId, long startTime, long endTime, ShardOperation.OpData opData, int version) { @@ -32,13 +32,17 @@ public class ShardOperation extends MetaStoreEntity { this.opData = opData; } - public static ShardOperation startOp(SubscriptionUnitShard shard, VaradhiSubscription subscription) { - ShardOperation.OpData data = new ShardOperation.StartData(shard, subscription); + public static ShardOperation startOp(String subOpId, SubscriptionUnitShard shard, VaradhiSubscription subscription) { + ShardOperation.OpData data = new ShardOperation.StartData(subOpId, shard, subscription); + return new ShardOperation(data); + } + + public static ShardOperation stopOp(String subOpId, SubscriptionUnitShard shard, VaradhiSubscription subscription) { + ShardOperation.OpData data = new ShardOperation.StopData(subOpId, shard, subscription); return new ShardOperation(data); } public void update(ShardOperation.OpData updated) { - //TODO::check & fix setters if (!opData.operationId.equals(updated.operationId)) { throw new IllegalArgumentException("Update failed. Operation Id mismatch."); } @@ -46,9 +50,22 @@ public void update(ShardOperation.OpData updated) { opData.state = updated.state; } + public void markFail(String reason) { + opData.markFail(reason); + endTime = System.currentTimeMillis(); + } + + public boolean hasFailed() { + return opData.state == State.ERRORED; + } + + public boolean hasCompleted() { + return opData.state == State.ERRORED || opData.state == State.COMPLETED; + } + @Override public String toString() { - return String.format("ShardOperation{data=%s, startTime=%d, endTime=%d}", opData, startTime, endTime); + return String.format("{data=%s, startTime=%d, endTime=%d}", opData, startTime, endTime); } public enum State { @@ -60,8 +77,12 @@ public enum State { @AllArgsConstructor @NoArgsConstructor @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "@opDataType") - @JsonSubTypes({@JsonSubTypes.Type(value = ShardOperation.StartData.class, name = "startShardData"),}) + @JsonSubTypes({ + @JsonSubTypes.Type(value = ShardOperation.StartData.class, name = "startShardData"), + @JsonSubTypes.Type(value = ShardOperation.StopData.class, name = "stopShardData"), + }) public static class OpData { + private String parentOpId; private String operationId; private int shardId; private SubscriptionUnitShard shard; @@ -74,15 +95,11 @@ public void markFail(String reason) { errorMsg = reason; } - public void markInProgress() { - state = ShardOperation.State.IN_PROGRESS; - } - @Override public String toString() { return String.format( - "OpData{Id='%s', shardId=%d, subscriptionId='%s', state=%s, errorMsg='%s'}", - operationId, shard.getShardId(), subscription.getName(), state, errorMsg + "OpData{ParentOpId=%s Id='%s', subscriptionId='%s', shardId=%d, state=%s, errorMsg='%s'}", + parentOpId, operationId, subscription.getName(), shard.getShardId(), state, errorMsg ); } } @@ -91,13 +108,26 @@ public String toString() { @AllArgsConstructor @EqualsAndHashCode(callSuper = true) public static class StartData extends ShardOperation.OpData { - StartData(SubscriptionUnitShard shard, VaradhiSubscription subscription) { - super(UUID.randomUUID().toString(), shard.getShardId(), shard, subscription, State.SCHEDULED, null); + StartData(String subOpId, SubscriptionUnitShard shard, VaradhiSubscription subscription) { + super(subOpId, UUID.randomUUID().toString(), shard.getShardId(), shard, subscription, State.SCHEDULED, null); + } + + @Override + public String toString() { + return String.format("Start.%s", super.toString()); + } + } + + @AllArgsConstructor + @EqualsAndHashCode(callSuper = true) + public static class StopData extends ShardOperation.OpData { + StopData(String subOpId, SubscriptionUnitShard shard, VaradhiSubscription subscription) { + super(subOpId, UUID.randomUUID().toString(), shard.getShardId(), shard, subscription, State.SCHEDULED, null); } @Override public String toString() { - return String.format("Start:%s", super.toString()); + return String.format("Stop.%s", super.toString()); } } } diff --git a/entities/src/main/java/com/flipkart/varadhi/entities/cluster/ShardStatus.java b/entities/src/main/java/com/flipkart/varadhi/entities/cluster/ShardStatus.java index 444229a2..95e55b21 100644 --- a/entities/src/main/java/com/flipkart/varadhi/entities/cluster/ShardStatus.java +++ b/entities/src/main/java/com/flipkart/varadhi/entities/cluster/ShardStatus.java @@ -13,6 +13,6 @@ public class ShardStatus { @JsonIgnore public boolean isAssigned() { - return state != ShardState.STOPPED && state != ShardState.UNKNOWN; + return state == ShardState.STARTED || state == ShardState.STARTING || state == ShardState.ERRORED; } } diff --git a/entities/src/main/java/com/flipkart/varadhi/entities/cluster/SubscriptionOpRequest.java b/entities/src/main/java/com/flipkart/varadhi/entities/cluster/SubscriptionOpRequest.java new file mode 100644 index 00000000..68bd1eb8 --- /dev/null +++ b/entities/src/main/java/com/flipkart/varadhi/entities/cluster/SubscriptionOpRequest.java @@ -0,0 +1,11 @@ +package com.flipkart.varadhi.entities.cluster; + +import lombok.AllArgsConstructor; +import lombok.Data; + +@Data +@AllArgsConstructor +public class SubscriptionOpRequest { + String SubscriptionId; + String requestedBy; +} diff --git a/entities/src/main/java/com/flipkart/varadhi/entities/cluster/SubscriptionOperation.java b/entities/src/main/java/com/flipkart/varadhi/entities/cluster/SubscriptionOperation.java index 8891f892..fb7905fc 100644 --- a/entities/src/main/java/com/flipkart/varadhi/entities/cluster/SubscriptionOperation.java +++ b/entities/src/main/java/com/flipkart/varadhi/entities/cluster/SubscriptionOperation.java @@ -6,6 +6,7 @@ import com.flipkart.varadhi.entities.MetaStoreEntity; import lombok.*; +import java.util.List; import java.util.UUID; @Getter @@ -13,7 +14,7 @@ public class SubscriptionOperation extends MetaStoreEntity { private final String requestedBy; private final long startTime; - private final long endTime; + private long endTime; private final OpData data; @JsonCreator @@ -45,19 +46,56 @@ public static SubscriptionOperation stopOp(String subscriptionId, String request return new SubscriptionOperation(data, requestedBy); } - public void update(OpData updated) { - if (!data.operationId.equals(updated.operationId)) { + public void markFail(String reason) { + data.state = State.ERRORED; + data.errorMsg = reason; + endTime = System.currentTimeMillis(); + } + + public void markCompleted() { + data.state = State.COMPLETED; + endTime = System.currentTimeMillis(); + } + + public void update(SubscriptionOperation updated) { + if (!data.operationId.equals(updated.data.operationId)) { throw new IllegalArgumentException("Update failed. Operation Id mismatch."); } - data.errorMsg = updated.errorMsg; - data.state = updated.state; + data.errorMsg = updated.data.errorMsg; + data.state = updated.data.state; + endTime = updated.endTime; + } + + public void update(List shardOps) { + StringBuilder sb = new StringBuilder(); + int completedCount = 0; + for(ShardOperation shardOp : shardOps) { + ShardOperation.OpData opData = shardOp.getOpData(); + if (shardOp.hasFailed()) { + sb.append(String.format("Shard:%d failed:%s", opData.getShardId(), opData.getErrorMsg())); + } + if (shardOp.hasCompleted()) { + completedCount++; + } + } + if (completedCount == shardOps.size()) { + if (sb.isEmpty()) { + markCompleted(); + } else { + markFail(sb.toString()); + } + } + } + + public boolean completed() { + return data.state == State.COMPLETED || data.state == State.ERRORED; } @Override public String toString() { return String.format( - "SubscriptionOperation{data=%s requestedBy='%s', startTime=%d, endTime=%d}", data, requestedBy, + "{data=%s requestedBy='%s', startTime=%d, endTime=%d}", data, requestedBy, startTime, endTime ); } @@ -79,24 +117,6 @@ public static class OpData { private String subscriptionId; private State state; private String errorMsg; - - public void markFail(String reason) { - state = State.ERRORED; - errorMsg = reason; - } - - public void markInProgress() { - state = State.IN_PROGRESS; - } - - public void markSuccess() { - state = State.COMPLETED; - } - - public boolean completed() { - return state == State.COMPLETED || state == State.ERRORED; - } - @Override public String toString() { return String.format( @@ -111,12 +131,12 @@ public String toString() { @EqualsAndHashCode(callSuper = true) public static class StartData extends OpData { StartData(String subscriptionId) { - super(UUID.randomUUID().toString(), subscriptionId, State.SCHEDULED, null); + super(UUID.randomUUID().toString(), subscriptionId, State.IN_PROGRESS, null); } @Override public String toString() { - return String.format("Start:%s", super.toString()); + return String.format("Start.%s", super.toString()); } } @@ -130,7 +150,7 @@ public static class StopData extends OpData { @Override public String toString() { - return String.format("Stop:%s", super.toString()); + return String.format("Stop.%s", super.toString()); } } } diff --git a/entities/src/main/java/com/flipkart/varadhi/entities/cluster/SubscriptionState.java b/entities/src/main/java/com/flipkart/varadhi/entities/cluster/SubscriptionState.java new file mode 100644 index 00000000..6863e34d --- /dev/null +++ b/entities/src/main/java/com/flipkart/varadhi/entities/cluster/SubscriptionState.java @@ -0,0 +1,58 @@ +package com.flipkart.varadhi.entities.cluster; + +import com.flipkart.varadhi.entities.VaradhiSubscription; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public enum SubscriptionState { + STARTING, + RUNNING, + STOPPING, + STOPPED, + ERRORED; + + public static SubscriptionState getFromShardStates(List assignments, List shardStatuses) { + // len(ShardAssignment) == 0, State can be either of Stopped/Errored (Create/Delete failed). + + // len(ShardAssignment) > 0, State cn be Running, Stopping, Starting, Errored. + // ShardCount == Running Shard --> Running + // Starting shard > 0 --> Starting + // Stopping shard > 0 --> Stopping + + // Below would need additional details like last operation executed. + // For now all the below are being classified as Error'ed. + + // Starting > 0 & Stopping > 0 --> Starting/Stopping (when sub operation are allowed in parallel) + // ShardCount == Unknown shard --> Starting/Stopping + // ShardCount == Unknown Shard + Errored Shard --> Errored (Could be from Start or Stop) + // Errored shard > 0 --> Errored (Could be from Start or Stop) + + if (assignments.isEmpty()) { + //TODO:: error conditions are being ignored for now. + return STOPPED; + } + + Map stateCounts = new HashMap<>(); + shardStatuses.forEach(ss -> stateCounts.compute(ss.getState(), (k,v) -> v == null ? 1 : v+1)); + SubscriptionState subState; + int totalShards = shardStatuses.size(); + int runningShards = stateCounts.getOrDefault(ShardState.STARTED, 0); + int startingShards = stateCounts.getOrDefault(ShardState.STARTED, 0); + int stoppingShards = stateCounts.getOrDefault(ShardState.STARTED, 0); + if (totalShards == runningShards) { + subState = RUNNING; + }else if (startingShards > 0 && stoppingShards > 0) { + subState = ERRORED; + }else if (startingShards > 0) { + subState = STARTING; + }else if (stoppingShards > 0) { + subState = STOPPING; + }else{ + //TODO:: Other conditions are ignored for now and being folded into ERRORED. + subState = ERRORED; + } + return subState; + } +} diff --git a/entities/src/main/java/com/flipkart/varadhi/entities/cluster/SubscriptionStatus.java b/entities/src/main/java/com/flipkart/varadhi/entities/cluster/SubscriptionStatus.java new file mode 100644 index 00000000..76c78080 --- /dev/null +++ b/entities/src/main/java/com/flipkart/varadhi/entities/cluster/SubscriptionStatus.java @@ -0,0 +1,12 @@ +package com.flipkart.varadhi.entities.cluster; + + +import lombok.AllArgsConstructor; +import lombok.Data; + +@Data +@AllArgsConstructor +public class SubscriptionStatus { + private String subscriptionId; + private SubscriptionState state; +} diff --git a/server/src/main/java/com/flipkart/varadhi/WebServerApiManager.java b/server/src/main/java/com/flipkart/varadhi/WebServerApiManager.java deleted file mode 100644 index 70a2c1a7..00000000 --- a/server/src/main/java/com/flipkart/varadhi/WebServerApiManager.java +++ /dev/null @@ -1,26 +0,0 @@ -package com.flipkart.varadhi; - - -import com.flipkart.varadhi.core.cluster.OperationMgr; -import com.flipkart.varadhi.entities.cluster.SubscriptionOperation; -import com.flipkart.varadhi.core.cluster.WebServerApi; -import lombok.extern.slf4j.Slf4j; - -import java.util.concurrent.CompletableFuture; - -@Slf4j -public class WebServerApiManager implements WebServerApi { - private final OperationMgr operationMgr; - - public WebServerApiManager(OperationMgr operationMgr) { - this.operationMgr = operationMgr; - } - - @Override - public CompletableFuture update(SubscriptionOperation.OpData operation) { - log.debug("Received update on subscription: {}", operation); - operationMgr.updateSubOp(operation); - //TODO::Fix this. - return CompletableFuture.completedFuture(null); - } -} diff --git a/server/src/main/java/com/flipkart/varadhi/cluster/MembershipListener.java b/server/src/main/java/com/flipkart/varadhi/cluster/MembershipListener.java index dbec6887..31530d61 100644 --- a/server/src/main/java/com/flipkart/varadhi/cluster/MembershipListener.java +++ b/server/src/main/java/com/flipkart/varadhi/cluster/MembershipListener.java @@ -2,8 +2,10 @@ import com.flipkart.varadhi.entities.cluster.MemberInfo; +import java.util.concurrent.CompletableFuture; + public interface MembershipListener { - void joined(MemberInfo memberInfo); + CompletableFuture joined(MemberInfo memberInfo); - void left(String id); + CompletableFuture left(String id); } diff --git a/server/src/main/java/com/flipkart/varadhi/cluster/MessageExchange.java b/server/src/main/java/com/flipkart/varadhi/cluster/MessageExchange.java index dfacc069..e555a435 100644 --- a/server/src/main/java/com/flipkart/varadhi/cluster/MessageExchange.java +++ b/server/src/main/java/com/flipkart/varadhi/cluster/MessageExchange.java @@ -52,7 +52,12 @@ public CompletableFuture request(String routeName, String apiNa vertxEventBus.request(apiPath, JsonMapper.jsonSerialize(msg), deliveryOptions, ar -> { if (ar.succeeded()) { log.debug("request({}, {}) delivered. {}.", apiPath, msg.getId(), ar.result().body()); - future.complete(JsonMapper.jsonDeserialize((String) ar.result().body(), ResponseMessage.class)); + ResponseMessage response = JsonMapper.jsonDeserialize((String) ar.result().body(), ResponseMessage.class); + if (response.getException() != null) { + future.completeExceptionally(response.getException()); + }else { + future.complete(response); + } } else { log.error("request({}, {}) failure: {}.", apiPath, msg.getId(), ar.cause().getMessage()); future.completeExceptionally(ar.cause()); diff --git a/server/src/main/java/com/flipkart/varadhi/cluster/MessageRouter.java b/server/src/main/java/com/flipkart/varadhi/cluster/MessageRouter.java index f55ffbab..19b6bf1c 100644 --- a/server/src/main/java/com/flipkart/varadhi/cluster/MessageRouter.java +++ b/server/src/main/java/com/flipkart/varadhi/cluster/MessageRouter.java @@ -15,8 +15,7 @@ * routeName -- name for a message route (e.g. controller, webserver, consumer-node-1). * apiName -- name of the method e.g. start/stop/update etc. * method -- either send, publish or request - * Address at which api handler is registered would be .. e.g. controller.start.send - * + * Address at which api handler is registered would be .. e.g. controller.start.send * publish methods are fire & forget. * send methods don't have any response, but their delivery can be tracked using the future. * request methods are for traditional request-response pattern. @@ -39,8 +38,17 @@ public void sendHandler(String routeName, String apiName, MsgHandler handler) { vertxEventBus.consumer(apiPath, message -> { ClusterMessage msg = JsonMapper.jsonDeserialize((String) message.body(), ClusterMessage.class); log.debug("Received msg via - send({}, {})", apiPath, msg.getId()); - handler.handle(msg); // this is async invocation. - message.reply("received ok", deliveryOptions); + try { + // this is async invocation. + handler.handle(msg); + } catch (Exception e) { + log.error("send handler.handle({}) Unhandled exception: {}", message.body(), e.getMessage()); + } finally { + // Send ensures only delivery and not execution. + // Client will not be aware of any kind of failure either in invocation or in execution of the + // message's send handler + message.reply("received ok", deliveryOptions); + } }); } @@ -51,9 +59,21 @@ public void requestHandler( vertxEventBus.consumer(apiPath, message -> { ClusterMessage msg = JsonMapper.jsonDeserialize((String) message.body(), ClusterMessage.class); log.debug("Received msg via - request({}, {})", apiPath, msg.getId()); - handler.handle(msg).thenAccept(response -> message.reply(JsonMapper.jsonSerialize(response), - deliveryOptions - )); // this is async invocation. + try { + handler.handle(msg).thenAccept(response -> message.reply( + JsonMapper.jsonSerialize(response), + deliveryOptions + )).exceptionally(t -> { + log.error("request handler completed exceptionally: {}", t.getMessage()); + ResponseMessage response = msg.getResponseMessage(t); + message.reply(JsonMapper.jsonSerialize(response), deliveryOptions); + return null; + }); + } catch (Exception e) { + log.error("request handler Unhandled exception: {}", e.getMessage()); + ResponseMessage response = msg.getResponseMessage(e); + message.reply(JsonMapper.jsonSerialize(response), deliveryOptions); + } }); } diff --git a/server/src/main/java/com/flipkart/varadhi/cluster/custom/VaradhiZkClusterManager.java b/server/src/main/java/com/flipkart/varadhi/cluster/custom/VaradhiZkClusterManager.java index 8c666f75..30f43f1c 100644 --- a/server/src/main/java/com/flipkart/varadhi/cluster/custom/VaradhiZkClusterManager.java +++ b/server/src/main/java/com/flipkart/varadhi/cluster/custom/VaradhiZkClusterManager.java @@ -56,7 +56,8 @@ public Future> getAllMembers() { getNodes().forEach( nodeId -> allFutures.add(Failsafe.with(NodeInfoRetryPolicy) .getStageAsync(() -> fetchNodeInfo(nodeId).toCompletionStage()) - .thenApply(nodeInfo -> JsonMapper.jsonDeserialize(nodeInfo.metadata().toString(), + .thenApply(nodeInfo -> JsonMapper.jsonDeserialize( + nodeInfo.metadata().toString(), MemberInfo.class )) .whenComplete((nodeInfo, throwable) -> { @@ -65,7 +66,6 @@ public Future> getAllMembers() { } }) )); - return Future.fromCompletionStage(CompletableFuture.allOf(allFutures.toArray(new CompletableFuture[0])) .thenApply(v -> allFutures.stream().map(CompletableFuture::join).collect(Collectors.toList()))); } @@ -75,15 +75,25 @@ public void addMembershipListener(MembershipListener listener) { nodeListener(new NodeListener() { @Override public void nodeAdded(String nodeId) { + log.debug("Node {} joined.", nodeId); Failsafe.with(NodeInfoRetryPolicy).getStageAsync(() -> fetchNodeInfo(nodeId).toCompletionStage()) .whenComplete((nodeInfo, throwable) -> { if (throwable != null) { // ignore the failure for now. Listener will not be notified of the change. log.error("Failed to get nodeInfo for member: {}.", nodeId, throwable); } else { - log.debug("Member {} joined from {}:{}.", nodeId, nodeInfo.host(), nodeInfo.port()); - MemberInfo memberInfo = nodeInfo.metadata().mapTo(MemberInfo.class); - listener.joined(memberInfo); + try { + log.debug("Member {} joined from {}:{}.", nodeId, nodeInfo.host(), nodeInfo.port()); + MemberInfo memberInfo = nodeInfo.metadata().mapTo(MemberInfo.class); + listener.joined(memberInfo) + .exceptionally(t -> { + log.error("MembershipListener.joined({}) failed, {}.", nodeId, t.getMessage()); + return null; + }); + } catch (Exception e) { + log.error("MembershipListener.joined({}) failed, {}.", nodeId, e.getMessage()); + throw e; + } } }); } @@ -91,7 +101,15 @@ public void nodeAdded(String nodeId) { @Override public void nodeLeft(String nodeId) { log.debug("Node {} left.", nodeId); - listener.left(nodeId); + try { + listener.left(nodeId).exceptionally(t -> { + log.error("MembershipListener.left({}) failed, {}.", nodeId, t.getMessage()); + return null; + }); + }catch (Exception e) { + log.error("MembershipListener.left({}) failed, {}.", nodeId, e.getMessage()); + throw e; + } } }); } diff --git a/server/src/main/java/com/flipkart/varadhi/cluster/messages/ClusterMessage.java b/server/src/main/java/com/flipkart/varadhi/cluster/messages/ClusterMessage.java index ed0c4ce2..eb0ca6bf 100644 --- a/server/src/main/java/com/flipkart/varadhi/cluster/messages/ClusterMessage.java +++ b/server/src/main/java/com/flipkart/varadhi/cluster/messages/ClusterMessage.java @@ -12,7 +12,7 @@ public class ClusterMessage { private final long timeStamp; private final String payload; - public ClusterMessage(String payload) { + ClusterMessage(String payload) { this.id = java.util.UUID.randomUUID().toString(); this.timeStamp = System.currentTimeMillis(); this.payload = payload; @@ -39,6 +39,14 @@ public static ClusterMessage of(ShardRequest request) { return new ClusterMessage(JsonMapper.jsonSerialize(request)); } + public static ClusterMessage of (T payload) { + return new ClusterMessage(JsonMapper.jsonSerialize(payload)); + } + + public static ClusterMessage of() { + return new ClusterMessage(null); + } + public T getData(Class clazz) { return JsonMapper.jsonDeserialize(payload, clazz); } @@ -49,6 +57,10 @@ public T getRequest(Class clazz) { } public ResponseMessage getResponseMessage(Object payload) { - return ResponseMessage.of(payload, id); + return ResponseMessage.fromPayload(payload, id); + } + + public ResponseMessage getResponseMessage(Exception exception) { + return ResponseMessage.fromException(exception, id); } } diff --git a/server/src/main/java/com/flipkart/varadhi/cluster/messages/MsgHandler.java b/server/src/main/java/com/flipkart/varadhi/cluster/messages/MsgHandler.java index 1b6a40e1..860e7455 100644 --- a/server/src/main/java/com/flipkart/varadhi/cluster/messages/MsgHandler.java +++ b/server/src/main/java/com/flipkart/varadhi/cluster/messages/MsgHandler.java @@ -1,6 +1,5 @@ package com.flipkart.varadhi.cluster.messages; -@FunctionalInterface public interface MsgHandler { void handle(ClusterMessage message); } diff --git a/server/src/main/java/com/flipkart/varadhi/cluster/messages/ResponseMessage.java b/server/src/main/java/com/flipkart/varadhi/cluster/messages/ResponseMessage.java index be8d0005..9ebb2d03 100644 --- a/server/src/main/java/com/flipkart/varadhi/cluster/messages/ResponseMessage.java +++ b/server/src/main/java/com/flipkart/varadhi/cluster/messages/ResponseMessage.java @@ -2,29 +2,39 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.flipkart.varadhi.utils.JsonMapper; +import lombok.Getter; +@Getter public class ResponseMessage extends ClusterMessage { private final String requestId; + private final Exception exception; - ResponseMessage(String payload, String requestId) { + ResponseMessage(String payload, Exception exception, String requestId) { super(payload); this.requestId = requestId; + this.exception = exception; } @JsonCreator - ResponseMessage(String id, long timeStamp, String payload, String requestId) { + ResponseMessage(String id, long timeStamp, String payload, String requestId, Exception exception) { super(id, timeStamp, payload); this.requestId = requestId; + this.exception = exception; } - public static ResponseMessage of(Object payload, String requestId) { + public static ResponseMessage fromPayload(Object payload, String requestId) { // This will result in double serialization of the operation object, below and during eventbus call. - return new ResponseMessage(JsonMapper.jsonSerialize(payload), requestId); + return new ResponseMessage(JsonMapper.jsonSerialize(payload), null, requestId); + } + + public static ResponseMessage fromException(Exception exception, String requestId) { + // This will result in double serialization of the operation object, below and during eventbus call. + return new ResponseMessage(null, exception, requestId); } public T getResponse(Class clazz) { - //TODO:: there is no enfrocement on payload, i.e. payload can be deserialized as request/data/response. + //TODO:: there is no enforcement on payload, i.e. payload can be deserialized as request/data/response. return JsonMapper.jsonDeserialize(getPayload(), clazz); } } diff --git a/server/src/main/java/com/flipkart/varadhi/db/AssignmentStoreImpl.java b/server/src/main/java/com/flipkart/varadhi/db/AssignmentStoreImpl.java index f89fd4d4..f7792162 100644 --- a/server/src/main/java/com/flipkart/varadhi/db/AssignmentStoreImpl.java +++ b/server/src/main/java/com/flipkart/varadhi/db/AssignmentStoreImpl.java @@ -26,9 +26,16 @@ private void ensureEntityTypePathExists() { @Override public void createAssignments(List assignments) { - List zNodes = new ArrayList<>(); - assignments.forEach(a -> zNodes.add(ZNode.OfAssignment(getAssignmentMapping(a)))); - zkMetaStore.executeInTransaction(zNodes, new ArrayList<>()); + List nodesToCreate = new ArrayList<>(); + assignments.forEach(a -> nodesToCreate.add(ZNode.OfAssignment(getAssignmentMapping(a)))); + zkMetaStore.executeInTransaction(nodesToCreate, new ArrayList<>()); + } + + @Override + public void deleteAssignments(List assignments) { + List nodesToDelete = new ArrayList<>(); + assignments.forEach(a -> nodesToDelete.add(ZNode.OfAssignment(getAssignmentMapping(a)))); + zkMetaStore.executeInTransaction( new ArrayList<>(), nodesToDelete); } @Override @@ -43,6 +50,7 @@ public List getConsumerNodeAssignments(String consumerNodeId) { private List getAssignments(Pattern pattern) { + List as = zkMetaStore.listChildren(ZNode.OfEntityType(ZNode.ASSIGNMENT)); return zkMetaStore.listChildren(ZNode.OfEntityType(ZNode.ASSIGNMENT)).stream() .filter(m -> pattern.matcher(m).matches()).map(this::getAssignment).collect(Collectors.toList()); } diff --git a/server/src/main/java/com/flipkart/varadhi/db/OpStoreImpl.java b/server/src/main/java/com/flipkart/varadhi/db/OpStoreImpl.java index 49bd812f..aba4d92c 100644 --- a/server/src/main/java/com/flipkart/varadhi/db/OpStoreImpl.java +++ b/server/src/main/java/com/flipkart/varadhi/db/OpStoreImpl.java @@ -5,6 +5,9 @@ import com.flipkart.varadhi.entities.cluster.SubscriptionOperation; import org.apache.curator.framework.CuratorFramework; +import java.util.ArrayList; +import java.util.List; + import static com.flipkart.varadhi.db.ZNode.*; public class OpStoreImpl implements OpStore { @@ -27,6 +30,19 @@ public void createSubOp(SubscriptionOperation operation) { zkMetaStore.createZNodeWithData(znode, operation); } + @Override + public SubscriptionOperation getSubOp(String operationId) { + ZNode znode = ZNode.OfSubOperation(operationId); + return zkMetaStore.getZNodeDataAsPojo(znode, SubscriptionOperation.class); + } + + @Override + public void updateSubOp(SubscriptionOperation operation) { + ZNode znode = ZNode.OfSubOperation(operation.getName()); + zkMetaStore.updateZNodeWithData(znode, operation); + } + + @Override public void createShardOp(ShardOperation operation) { ZNode znode = ZNode.OfShardOperation(operation.getName()); @@ -34,23 +50,25 @@ public void createShardOp(ShardOperation operation) { } @Override - public SubscriptionOperation getSubOp(String operationId) { - ZNode znode = ZNode.OfSubOperation(operationId); - return zkMetaStore.getZNodeDataAsPojo(znode, SubscriptionOperation.class); + public List getShardOps(String operationId) { + //TODO::implement this. + //TODO::This needs to improvise i.e shouldn't need to deserialize all. + List shardOpIds = zkMetaStore.listChildren(ZNode.OfEntityType(SHARD_OP)); + List shardOps = new ArrayList<>(); + shardOpIds.forEach(id -> { + ShardOperation shardOp = zkMetaStore.getZNodeDataAsPojo(ZNode.OfShardOperation(id), ShardOperation.class); + if (operationId.equals(shardOp.getOpData().getParentOpId())) { + shardOps.add(shardOp); + } + }); + return shardOps; } - @Override public ShardOperation getShardOp(String operationId) { ZNode znode = ZNode.OfShardOperation(operationId); return zkMetaStore.getZNodeDataAsPojo(znode, ShardOperation.class); } - @Override - public void updateSubOp(SubscriptionOperation operation) { - ZNode znode = ZNode.OfSubOperation(operation.getName()); - zkMetaStore.updateZNodeWithData(znode, operation); - } - @Override public void updateShardOp(ShardOperation operation) { ZNode znode = ZNode.OfShardOperation(operation.getName()); diff --git a/server/src/main/java/com/flipkart/varadhi/db/ZKMetaStore.java b/server/src/main/java/com/flipkart/varadhi/db/ZKMetaStore.java index 9d3c2ad2..f957aed2 100644 --- a/server/src/main/java/com/flipkart/varadhi/db/ZKMetaStore.java +++ b/server/src/main/java/com/flipkart/varadhi/db/ZKMetaStore.java @@ -179,7 +179,7 @@ public void executeInTransaction(List toAdd, List toDelete) { }); throw new MetaStoreException(String.format("Failed to execute a batch operation %s.", e.getMessage()), e); } catch (Exception e) { - throw new MetaStoreException("Failed to execute batch operation.", e); + throw new MetaStoreException(String.format("Failed to execute a batch operation %s.", e.getMessage()), e); } } diff --git a/server/src/main/java/com/flipkart/varadhi/services/SubscriptionService.java b/server/src/main/java/com/flipkart/varadhi/services/SubscriptionService.java index 9d5976c1..22b55e74 100644 --- a/server/src/main/java/com/flipkart/varadhi/services/SubscriptionService.java +++ b/server/src/main/java/com/flipkart/varadhi/services/SubscriptionService.java @@ -2,27 +2,27 @@ import com.flipkart.varadhi.core.cluster.ControllerApi; -import com.flipkart.varadhi.core.cluster.OperationMgr; -import com.flipkart.varadhi.entities.cluster.SubscriptionOperation; import com.flipkart.varadhi.entities.VaradhiSubscription; import com.flipkart.varadhi.entities.VaradhiTopic; +import com.flipkart.varadhi.entities.cluster.SubscriptionOperation; import com.flipkart.varadhi.exceptions.InvalidOperationForResourceException; import com.flipkart.varadhi.spi.db.MetaStore; +import lombok.extern.slf4j.Slf4j; import java.util.List; import java.util.Objects; +import java.util.concurrent.CompletableFuture; import static com.flipkart.varadhi.entities.VersionedEntity.INITIAL_VERSION; +@Slf4j public class SubscriptionService { private final MetaStore metaStore; private final ControllerApi controllerApi; - private final OperationMgr operationMgr; - public SubscriptionService(ControllerApi controllerApi, OperationMgr operationMgr, MetaStore metaStore) { + public SubscriptionService(ControllerApi controllerApi, MetaStore metaStore) { this.metaStore = metaStore; this.controllerApi = controllerApi; - this.operationMgr = operationMgr; } public List getSubscriptionList(String projectName) { @@ -40,14 +40,12 @@ public VaradhiSubscription createSubscription(VaradhiSubscription subscription) return subscription; } - public void start(String subscriptionName, String requestedBy) { - SubscriptionOperation op = operationMgr.requestSubStart(subscriptionName, requestedBy); - controllerApi.startSubscription((SubscriptionOperation.StartData) op.getData()); + public CompletableFuture start(String subscriptionName, String requestedBy) { + return controllerApi.startSubscription(subscriptionName, requestedBy); } - public void stop(String subscriptionName, String requestedBy) { - SubscriptionOperation op = operationMgr.requestSubStop(subscriptionName, requestedBy); - controllerApi.stopSubscription((SubscriptionOperation.StopData) op.getData()); + public CompletableFuture stop(String subscriptionName, String requestedBy) { + return controllerApi.stopSubscription(subscriptionName, requestedBy); } private void validateCreation(VaradhiSubscription subscription) { diff --git a/server/src/main/java/com/flipkart/varadhi/verticles/consumer/ConsumerApiHandler.java b/server/src/main/java/com/flipkart/varadhi/verticles/consumer/ConsumerApiHandler.java index e2f000c0..9d033db0 100644 --- a/server/src/main/java/com/flipkart/varadhi/verticles/consumer/ConsumerApiHandler.java +++ b/server/src/main/java/com/flipkart/varadhi/verticles/consumer/ConsumerApiHandler.java @@ -6,10 +6,12 @@ import com.flipkart.varadhi.consumer.ConsumerApiMgr; import com.flipkart.varadhi.entities.cluster.ShardOperation; import com.flipkart.varadhi.verticles.controller.ControllerClient; +import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CompletableFuture; +@Slf4j public class ConsumerApiHandler { private final ControllerClient controllerClient; private final ConsumerApiMgr consumerApiMgr; @@ -26,10 +28,22 @@ public void start(ClusterMessage message) { controllerClient.update(startOp); } + public void stop(ClusterMessage message) { + ShardOperation.StopData stopData = message.getData(ShardOperation.StopData.class); + consumerApiMgr.stop(stopData); + stopData.markFail("Failed to stop subscription"); + controllerClient.update(stopData); + } + public CompletableFuture status(ClusterMessage message) { ShardRequest request = message.getRequest(ShardRequest.class); - return consumerApiMgr.getStatus(request.getSubscriptionId(), request.getShardId()) + return consumerApiMgr.getShardStatus(request.getSubscriptionId(), request.getShardId()) .thenApply(message::getResponseMessage); } + public CompletableFuture info(ClusterMessage message) { + log.info("Consumer info called"); + return consumerApiMgr.getConsumerInfo().thenApply(message::getResponseMessage); + } + } diff --git a/server/src/main/java/com/flipkart/varadhi/verticles/consumer/ConsumerClient.java b/server/src/main/java/com/flipkart/varadhi/verticles/consumer/ConsumerClient.java index 74b0370e..425a1891 100644 --- a/server/src/main/java/com/flipkart/varadhi/verticles/consumer/ConsumerClient.java +++ b/server/src/main/java/com/flipkart/varadhi/verticles/consumer/ConsumerClient.java @@ -2,6 +2,7 @@ import com.flipkart.varadhi.cluster.MessageExchange; import com.flipkart.varadhi.cluster.messages.ClusterMessage; +import com.flipkart.varadhi.entities.cluster.ConsumerInfo; import com.flipkart.varadhi.entities.cluster.ShardRequest; import com.flipkart.varadhi.core.cluster.ConsumerApi; import com.flipkart.varadhi.entities.cluster.ShardOperation; @@ -28,8 +29,23 @@ public CompletableFuture start(ShardOperation.StartData operation) { } @Override - public CompletableFuture getStatus(String subscriptionId, int shardId) { + public CompletableFuture stop(ShardOperation.StopData operation) { + ClusterMessage message = ClusterMessage.of(operation); + log.debug("Sending message {}", message); + return exchange.send(consumerId, "stop", message); + } + + @Override + public CompletableFuture getShardStatus(String subscriptionId, int shardId) { return exchange.request(consumerId, "status", ClusterMessage.of(new ShardRequest(subscriptionId, shardId))) - .thenApply(responseMessage -> responseMessage.getResponse(ShardStatus.class)); + .thenApply(rm -> rm.getResponse(ShardStatus.class)); + } + + @Override + public CompletableFuture getConsumerInfo() { + ClusterMessage message = ClusterMessage.of(); + log.debug("Sending info request:{} {}", consumerId, message); + return exchange.request(consumerId, "info", message) + .thenApply(rm -> rm.getResponse(ConsumerInfo.class)); } } diff --git a/server/src/main/java/com/flipkart/varadhi/verticles/consumer/ConsumerVerticle.java b/server/src/main/java/com/flipkart/varadhi/verticles/consumer/ConsumerVerticle.java index f5ee1890..56adbc84 100644 --- a/server/src/main/java/com/flipkart/varadhi/verticles/consumer/ConsumerVerticle.java +++ b/server/src/main/java/com/flipkart/varadhi/verticles/consumer/ConsumerVerticle.java @@ -6,6 +6,7 @@ import com.flipkart.varadhi.consumer.ConsumerApiMgr; import com.flipkart.varadhi.consumer.ConsumersManager; import com.flipkart.varadhi.consumer.impl.ConsumersManagerImpl; +import com.flipkart.varadhi.entities.cluster.ConsumerInfo; import com.flipkart.varadhi.entities.cluster.MemberInfo; import com.flipkart.varadhi.verticles.controller.ControllerClient; import io.vertx.core.AbstractVerticle; @@ -14,18 +15,18 @@ public class ConsumerVerticle extends AbstractVerticle { private final VaradhiClusterManager clusterManager; - private final String consumerId; + private final MemberInfo memberInfo; public ConsumerVerticle(MemberInfo memberInfo, VaradhiClusterManager clusterManager) { this.clusterManager = clusterManager; - consumerId = memberInfo.hostname(); + this.memberInfo = memberInfo; } @Override public void start(Promise startPromise) { MessageRouter messageRouter = clusterManager.getRouter(vertx); MessageExchange messageExchange = clusterManager.getExchange(vertx); - ConsumersManager consumersManager = new ConsumersManagerImpl(); + ConsumersManager consumersManager = new ConsumersManagerImpl(ConsumerInfo.from(memberInfo)); ControllerClient controllerClient = new ControllerClient(messageExchange); ConsumerApiMgr consumerApiManager = new ConsumerApiMgr(consumersManager); @@ -40,7 +41,10 @@ public void stop(Promise stopPromise) { } private void setupApiHandlers(MessageRouter messageRouter, ConsumerApiHandler handler) { + String consumerId = memberInfo.hostname(); messageRouter.sendHandler(consumerId, "start", handler::start); + messageRouter.sendHandler(consumerId, "stop", handler::stop); messageRouter.requestHandler(consumerId, "status", handler::status); + messageRouter.requestHandler(consumerId, "info", handler::info); } } diff --git a/server/src/main/java/com/flipkart/varadhi/verticles/controller/ControllerApiHandler.java b/server/src/main/java/com/flipkart/varadhi/verticles/controller/ControllerApiHandler.java index 3a262cca..bea5456a 100644 --- a/server/src/main/java/com/flipkart/varadhi/verticles/controller/ControllerApiHandler.java +++ b/server/src/main/java/com/flipkart/varadhi/verticles/controller/ControllerApiHandler.java @@ -1,45 +1,38 @@ package com.flipkart.varadhi.verticles.controller; import com.flipkart.varadhi.cluster.messages.ClusterMessage; +import com.flipkart.varadhi.cluster.messages.ResponseMessage; import com.flipkart.varadhi.core.cluster.ControllerApi; import com.flipkart.varadhi.entities.cluster.ShardOperation; -import com.flipkart.varadhi.entities.cluster.SubscriptionOperation; -import com.flipkart.varadhi.core.cluster.WebServerApi; +import com.flipkart.varadhi.entities.cluster.SubscriptionOpRequest; +import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CompletableFuture; +@Slf4j public class ControllerApiHandler { private final ControllerApi controllerMgr; - private final WebServerApi webServerApiProxy; - public ControllerApiHandler(ControllerApi controllerMgr, WebServerApi webServerApiProxy) { + public ControllerApiHandler(ControllerApi controllerMgr) { this.controllerMgr = controllerMgr; - this.webServerApiProxy = webServerApiProxy; } - public CompletableFuture start(ClusterMessage message) { - SubscriptionOperation.StartData operation = message.getData(SubscriptionOperation.StartData.class); - return controllerMgr.startSubscription(operation).exceptionally(throwable -> { - //TODO:: is this exceptionally block correct, or should it be try/catch ? - operation.markFail(throwable.getMessage()); - webServerApiProxy.update(operation); - return null; - }); + public CompletableFuture start(ClusterMessage message) { + SubscriptionOpRequest request = message.getRequest(SubscriptionOpRequest.class); + return controllerMgr.startSubscription(request.getSubscriptionId(), request.getRequestedBy()) + .thenApply(message::getResponseMessage); } - public CompletableFuture stop(ClusterMessage message) { - SubscriptionOperation.StopData operation = message.getData(SubscriptionOperation.StopData.class); - return controllerMgr.stopSubscription(operation).exceptionally(throwable -> { - operation.markFail(throwable.getMessage()); - webServerApiProxy.update(operation); - return null; - }); + public CompletableFuture stop(ClusterMessage message) { + SubscriptionOpRequest request = message.getRequest(SubscriptionOpRequest.class); + return controllerMgr.stopSubscription(request.getSubscriptionId(), request.getRequestedBy()) + .thenApply(message::getResponseMessage); } - public CompletableFuture update(ClusterMessage message) { + public void update(ClusterMessage message) { ShardOperation.OpData operation = message.getData(ShardOperation.OpData.class); - return controllerMgr.update(operation).exceptionally(throwable -> { - //TODO::handle failure to update. + controllerMgr.update(operation).exceptionally(throwable -> { + log.error("Shard update ({}) failed.", operation); return null; }); } diff --git a/server/src/main/java/com/flipkart/varadhi/verticles/controller/ControllerClient.java b/server/src/main/java/com/flipkart/varadhi/verticles/controller/ControllerClient.java index 63e52be2..8ed7d473 100644 --- a/server/src/main/java/com/flipkart/varadhi/verticles/controller/ControllerClient.java +++ b/server/src/main/java/com/flipkart/varadhi/verticles/controller/ControllerClient.java @@ -4,6 +4,7 @@ import com.flipkart.varadhi.cluster.messages.ClusterMessage; import com.flipkart.varadhi.core.cluster.ControllerApi; import com.flipkart.varadhi.entities.cluster.ShardOperation; +import com.flipkart.varadhi.entities.cluster.SubscriptionOpRequest; import com.flipkart.varadhi.entities.cluster.SubscriptionOperation; import java.util.concurrent.CompletableFuture; @@ -16,15 +17,17 @@ public ControllerClient(MessageExchange exchange) { } @Override - public CompletableFuture startSubscription(SubscriptionOperation.StartData operation) { - ClusterMessage message = ClusterMessage.of(operation); - return exchange.send(ROUTE_CONTROLLER, "start", message); + public CompletableFuture startSubscription(String subscriptionId, String requestedBy) { + SubscriptionOpRequest opRequest = new SubscriptionOpRequest(subscriptionId, requestedBy); + ClusterMessage message = ClusterMessage.of(opRequest); + return exchange.request(ROUTE_CONTROLLER, "start", message).thenApply(rm -> rm.getResponse(SubscriptionOperation.class)); } @Override - public CompletableFuture stopSubscription(SubscriptionOperation.StopData operation) { - ClusterMessage message = ClusterMessage.of(operation); - return exchange.send(ROUTE_CONTROLLER, "stop", message); + public CompletableFuture stopSubscription(String subscriptionId, String requestedBy) { + SubscriptionOpRequest opRequest = new SubscriptionOpRequest(subscriptionId, requestedBy); + ClusterMessage message = ClusterMessage.of(opRequest); + return exchange.request(ROUTE_CONTROLLER, "stop", message).thenApply(rm -> rm.getResponse(SubscriptionOperation.class)); } @Override diff --git a/server/src/main/java/com/flipkart/varadhi/verticles/controller/ControllerVerticle.java b/server/src/main/java/com/flipkart/varadhi/verticles/controller/ControllerVerticle.java index 8c18df1e..4e06f32f 100644 --- a/server/src/main/java/com/flipkart/varadhi/verticles/controller/ControllerVerticle.java +++ b/server/src/main/java/com/flipkart/varadhi/verticles/controller/ControllerVerticle.java @@ -5,18 +5,19 @@ import com.flipkart.varadhi.entities.cluster.ConsumerNode; import com.flipkart.varadhi.entities.cluster.ComponentKind; import com.flipkart.varadhi.core.cluster.ConsumerClientFactory; +import com.flipkart.varadhi.exceptions.NotImplementedException; import com.flipkart.varadhi.spi.db.MetaStoreProvider; import com.flipkart.varadhi.verticles.consumer.ConsumerClientFactoryImpl; -import com.flipkart.varadhi.verticles.webserver.WebServerClient; import com.flipkart.varadhi.controller.ControllerApiMgr; import com.flipkart.varadhi.entities.cluster.MemberInfo; -import com.flipkart.varadhi.core.cluster.WebServerApi; +import io.micrometer.core.instrument.MeterRegistry; import io.vertx.core.AbstractVerticle; import io.vertx.core.Future; import io.vertx.core.Promise; import lombok.extern.slf4j.Slf4j; import java.util.List; +import java.util.concurrent.CompletableFuture; import static com.flipkart.varadhi.core.cluster.ControllerApi.ROUTE_CONTROLLER; @@ -24,22 +25,23 @@ public class ControllerVerticle extends AbstractVerticle { private final VaradhiClusterManager clusterManager; private final MetaStoreProvider metaStoreProvider; + private final MeterRegistry meterRegistry; public ControllerVerticle(CoreServices coreServices, VaradhiClusterManager clusterManager) { this.clusterManager = clusterManager; this.metaStoreProvider = coreServices.getMetaStoreProvider(); + this.meterRegistry = coreServices.getMeterRegistry(); } @Override public void start(Promise startPromise) { MessageRouter messageRouter = clusterManager.getRouter(vertx); MessageExchange messageExchange = clusterManager.getExchange(vertx); - WebServerApi serverApiProxy = new WebServerClient(messageExchange); ConsumerClientFactory consumerClientFactory = new ConsumerClientFactoryImpl(messageExchange); ControllerApiMgr controllerApiMgr = - new ControllerApiMgr(serverApiProxy, consumerClientFactory, metaStoreProvider); - ControllerApiHandler handler = new ControllerApiHandler(controllerApiMgr, serverApiProxy); + new ControllerApiMgr(consumerClientFactory, metaStoreProvider, meterRegistry); + ControllerApiHandler handler = new ControllerApiHandler(controllerApiMgr); //TODO::Assuming one controller node for time being. Leader election needs to be added. onLeaderElected(controllerApiMgr, handler, messageRouter).onComplete(ar -> { @@ -57,18 +59,25 @@ private Future onLeaderElected( // any failures should give up the leadership. setupMembershipListener(controllerApiMgr); - return clusterManager.getAllMembers().onComplete(ar -> { - if (ar.succeeded()) { - List consumerNodes = - ar.result().stream().filter(memberInfo -> memberInfo.hasRole(ComponentKind.Consumer)) - .map(ConsumerNode::new).toList(); - - controllerApiMgr.addConsumerNodes(consumerNodes); - setupApiHandlers(messageRouter, handler); - } else { + return clusterManager.getAllMembers().compose(allMembers -> { + List consumerNodes = + allMembers.stream().filter(memberInfo -> memberInfo.hasRole(ComponentKind.Consumer)) + .map(ConsumerNode::new).toList(); + log.info("Available Consumer Nodes {}", consumerNodes.size()); + return Future.fromCompletionStage(controllerApiMgr.addConsumerNodes(consumerNodes) + .thenAccept(v -> setupApiHandlers(messageRouter, handler))); + }).onComplete(ar -> { + if (ar.failed()) { log.error("Failed to get all members. Giving up leadership.", ar.cause()); + abortLeaderShip(); + } else { + log.info("Leadership obtained successfully"); } - }).map(a -> null); + }).mapEmpty(); + } + + private void abortLeaderShip() { + throw new NotImplementedException("abortLeaderShip to be implemented."); } @Override @@ -77,29 +86,30 @@ public void stop(Promise stopPromise) { } private void setupApiHandlers(MessageRouter messageRouter, ControllerApiHandler handler) { - messageRouter.sendHandler(ROUTE_CONTROLLER, "start", handler::start); - messageRouter.sendHandler(ROUTE_CONTROLLER, "stop", handler::stop); + messageRouter.requestHandler(ROUTE_CONTROLLER, "start", handler::start); + messageRouter.requestHandler(ROUTE_CONTROLLER, "stop", handler::stop); messageRouter.sendHandler(ROUTE_CONTROLLER, "update", handler::update); } private void setupMembershipListener(ControllerApiMgr controllerApiMgr) { clusterManager.addMembershipListener(new MembershipListener() { @Override - public void joined(MemberInfo memberInfo) { + public CompletableFuture joined(MemberInfo memberInfo) { log.info("Member joined: {}", memberInfo); if (memberInfo.hasRole(ComponentKind.Consumer)) { ConsumerNode consumerNode = new ConsumerNode(memberInfo); - controllerApiMgr.consumerNodeJoined(consumerNode); + return controllerApiMgr.consumerNodeJoined(consumerNode); } + return CompletableFuture.completedFuture(null); } @Override - public void left(String memberId) { + public CompletableFuture left(String memberId) { // Vertx cluster manager provides only the path (i.e.) id and not data of the node. // It will be good, to have a data, so filtering can be done for non-consumer nodes. // Currently, this filtering is taken care implicitly. log.info("Member left: {}", memberId); - controllerApiMgr.consumerNodeLeft(memberId); + return controllerApiMgr.consumerNodeLeft(memberId); } }); } diff --git a/server/src/main/java/com/flipkart/varadhi/verticles/webserver/WebServerApiHandler.java b/server/src/main/java/com/flipkart/varadhi/verticles/webserver/WebServerApiHandler.java deleted file mode 100644 index fbda0c41..00000000 --- a/server/src/main/java/com/flipkart/varadhi/verticles/webserver/WebServerApiHandler.java +++ /dev/null @@ -1,22 +0,0 @@ -package com.flipkart.varadhi.verticles.webserver; - -import com.flipkart.varadhi.WebServerApiManager; -import com.flipkart.varadhi.cluster.messages.ClusterMessage; -import com.flipkart.varadhi.entities.cluster.SubscriptionOperation; -import lombok.extern.slf4j.Slf4j; - -import java.util.concurrent.CompletableFuture; - -@Slf4j -public class WebServerApiHandler { - private final WebServerApiManager serverOpMgr; - - public WebServerApiHandler(WebServerApiManager serverOpMgr) { - this.serverOpMgr = serverOpMgr; - } - - public CompletableFuture update(ClusterMessage message) { - SubscriptionOperation.OpData operation = message.getData(SubscriptionOperation.OpData.class); - return serverOpMgr.update(operation); - } -} diff --git a/server/src/main/java/com/flipkart/varadhi/verticles/webserver/WebServerClient.java b/server/src/main/java/com/flipkart/varadhi/verticles/webserver/WebServerClient.java deleted file mode 100644 index 14120a90..00000000 --- a/server/src/main/java/com/flipkart/varadhi/verticles/webserver/WebServerClient.java +++ /dev/null @@ -1,22 +0,0 @@ -package com.flipkart.varadhi.verticles.webserver; - -import com.flipkart.varadhi.cluster.MessageExchange; -import com.flipkart.varadhi.cluster.messages.ClusterMessage; -import com.flipkart.varadhi.entities.cluster.SubscriptionOperation; -import com.flipkart.varadhi.core.cluster.WebServerApi; - -import java.util.concurrent.CompletableFuture; - -public class WebServerClient implements WebServerApi { - private final MessageExchange exchange; - - public WebServerClient(MessageExchange exchange) { - this.exchange = exchange; - } - - @Override - public CompletableFuture update(SubscriptionOperation.OpData operation) { - ClusterMessage message = ClusterMessage.of(operation); - return exchange.send(ROUTE_WEBSERVER, "update", message); - } -} diff --git a/server/src/main/java/com/flipkart/varadhi/verticles/webserver/WebServerVerticle.java b/server/src/main/java/com/flipkart/varadhi/verticles/webserver/WebServerVerticle.java index 52fdf5c4..7a716bc9 100644 --- a/server/src/main/java/com/flipkart/varadhi/verticles/webserver/WebServerVerticle.java +++ b/server/src/main/java/com/flipkart/varadhi/verticles/webserver/WebServerVerticle.java @@ -1,11 +1,8 @@ package com.flipkart.varadhi.verticles.webserver; import com.flipkart.varadhi.CoreServices; -import com.flipkart.varadhi.WebServerApiManager; import com.flipkart.varadhi.auth.DefaultAuthorizationProvider; -import com.flipkart.varadhi.cluster.MessageRouter; import com.flipkart.varadhi.cluster.VaradhiClusterManager; -import com.flipkart.varadhi.core.cluster.OperationMgr; import com.flipkart.varadhi.verticles.controller.ControllerClient; import com.flipkart.varadhi.config.AppConfiguration; import com.flipkart.varadhi.core.VaradhiTopicFactory; @@ -38,7 +35,6 @@ import java.util.*; -import static com.flipkart.varadhi.core.cluster.WebServerApi.ROUTE_WEBSERVER; @Slf4j @ExtensionMethod({Extensions.RoutingContextExtension.class}) @@ -50,7 +46,6 @@ public class WebServerVerticle extends AbstractVerticle { private final VaradhiClusterManager clusterManager; private final MessagingStackProvider messagingStackProvider; private final MetaStore metaStore; - private final OperationMgr operationMgr; private final MeterRegistry meterRegistry; private final Tracer tracer; private OrgService orgService; @@ -68,7 +63,6 @@ public WebServerVerticle( this.clusterManager = clusterManager; this.messagingStackProvider = services.getMessagingStackProvider(); this.metaStore = services.getMetaStoreProvider().getMetaStore(); - this.operationMgr = new OperationMgr(services.getMetaStoreProvider().getOpStore()); this.meterRegistry = services.getMeterRegistry(); this.tracer = services.getTracer("varadhi"); } @@ -97,7 +91,6 @@ public static Handler wrapBlockingExecution(Vertx vertx, Handler @Override public void start(Promise startPromise) { setupEntityServices(); - setupClusterApiRoutes(); performValidations(); startHttpServer(startPromise); } @@ -118,7 +111,7 @@ private void setupEntityServices() { projectService = new ProjectService(metaStore, projectCacheSpec, meterRegistry); varadhiTopicService = new VaradhiTopicService(messagingStackProvider.getStorageTopicService(), metaStore); ControllerApi controllerApiProxy = new ControllerClient(clusterManager.getExchange(vertx)); - subscriptionService = new SubscriptionService(controllerApiProxy, operationMgr, metaStore); + subscriptionService = new SubscriptionService(controllerApiProxy, metaStore); } private void performValidations() { @@ -129,12 +122,6 @@ private void performValidations() { } } - private void setupClusterApiRoutes() { - MessageRouter messageRouter = clusterManager.getRouter(vertx); - WebServerApiHandler handler = new WebServerApiHandler(new WebServerApiManager(operationMgr)); - messageRouter.sendHandler(ROUTE_WEBSERVER, "update", handler::update); - } - private void startHttpServer(Promise startPromise) { Router router = createApiRouter(); httpServer = vertx.createHttpServer(configuration.getHttpServerOptions()).requestHandler(router).listen(h -> { diff --git a/server/src/main/java/com/flipkart/varadhi/web/v1/admin/SubscriptionHandlers.java b/server/src/main/java/com/flipkart/varadhi/web/v1/admin/SubscriptionHandlers.java index 2fb16fe4..46643032 100644 --- a/server/src/main/java/com/flipkart/varadhi/web/v1/admin/SubscriptionHandlers.java +++ b/server/src/main/java/com/flipkart/varadhi/web/v1/admin/SubscriptionHandlers.java @@ -136,11 +136,29 @@ public void delete(RoutingContext ctx) { } public void start(RoutingContext ctx) { - subscriptionService.start(SubscriptionHelper.buildSubscriptionName(ctx), ctx.getIdentityOrDefault()); + subscriptionService.start(SubscriptionHelper.buildSubscriptionName(ctx), ctx.getIdentityOrDefault()) + .whenComplete( + (sd, error) -> { + if (error != null) { + ctx.endRequestWithException(error); + } else { + ctx.endApiWithResponse(sd); + } + } + ); } public void stop(RoutingContext ctx) { - subscriptionService.stop(SubscriptionHelper.buildSubscriptionName(ctx), ctx.getIdentityOrDefault()); + subscriptionService.stop(SubscriptionHelper.buildSubscriptionName(ctx), ctx.getIdentityOrDefault()) + .whenComplete( + (sd, error) -> { + if (error != null) { + ctx.endRequestWithException(error); + } else { + ctx.endApiWithResponse(sd); + } + } + ); } private SubscriptionResource getValidSubscriptionResource(RoutingContext ctx) { diff --git a/server/src/main/resources/log4j2.xml b/server/src/main/resources/log4j2.xml index 26021efa..92b996d8 100644 --- a/server/src/main/resources/log4j2.xml +++ b/server/src/main/resources/log4j2.xml @@ -15,7 +15,7 @@ TODO:: This is not being auto picked. Needs to be fixed. - + diff --git a/server/src/test/java/com/flipkart/varadhi/cluster/MessageRouterImplTest.java b/server/src/test/java/com/flipkart/varadhi/cluster/MessageRouterImplTest.java index e68faed0..ce67fe80 100644 --- a/server/src/test/java/com/flipkart/varadhi/cluster/MessageRouterImplTest.java +++ b/server/src/test/java/com/flipkart/varadhi/cluster/MessageRouterImplTest.java @@ -63,6 +63,6 @@ public void testSendMessageConsumerCollocated(VertxTestContext testContext) thro } ClusterMessage getClusterMessage(String data) { - return new ClusterMessage(data); + return ClusterMessage.of(data); } } diff --git a/server/src/test/java/com/flipkart/varadhi/services/SubscriptionServiceTest.java b/server/src/test/java/com/flipkart/varadhi/services/SubscriptionServiceTest.java index 6d71a4e7..61737cf8 100644 --- a/server/src/test/java/com/flipkart/varadhi/services/SubscriptionServiceTest.java +++ b/server/src/test/java/com/flipkart/varadhi/services/SubscriptionServiceTest.java @@ -48,7 +48,6 @@ void setUp() throws Exception { zkCuratorTestingServer.getConnectString(), new ExponentialBackoffRetry(1000, 1))); zkCurator.start(); varadhiMetaStore = spy(new VaradhiMetaStore(zkCurator)); - OperationMgr operationMgr = new OperationMgr(new OpStoreImpl(zkCurator)); orgService = new OrgService(varadhiMetaStore); teamService = new TeamService(varadhiMetaStore); @@ -70,7 +69,7 @@ void setUp() throws Exception { projectService.createProject(o1t1p1); projectService.createProject(o1t1p2); - subscriptionService = new SubscriptionService(null, operationMgr, varadhiMetaStore); + subscriptionService = new SubscriptionService(null, varadhiMetaStore); } @Test diff --git a/spi/src/main/java/com/flipkart/varadhi/spi/db/AssignmentStore.java b/spi/src/main/java/com/flipkart/varadhi/spi/db/AssignmentStore.java index 920d90d5..2b74ac7c 100644 --- a/spi/src/main/java/com/flipkart/varadhi/spi/db/AssignmentStore.java +++ b/spi/src/main/java/com/flipkart/varadhi/spi/db/AssignmentStore.java @@ -7,6 +7,8 @@ public interface AssignmentStore { void createAssignments(List assignments); + void deleteAssignments(List assignments); + List getSubscriptionAssignments(String subscriptionName); List getConsumerNodeAssignments(String consumerNodeId); diff --git a/spi/src/main/java/com/flipkart/varadhi/spi/db/OpStore.java b/spi/src/main/java/com/flipkart/varadhi/spi/db/OpStore.java index cb74c4e4..cc9789bb 100644 --- a/spi/src/main/java/com/flipkart/varadhi/spi/db/OpStore.java +++ b/spi/src/main/java/com/flipkart/varadhi/spi/db/OpStore.java @@ -4,16 +4,22 @@ import com.flipkart.varadhi.entities.cluster.ShardOperation; import com.flipkart.varadhi.entities.cluster.SubscriptionOperation; +import java.util.List; + public interface OpStore { void createSubOp(SubscriptionOperation operation); - void createShardOp(ShardOperation operation); - SubscriptionOperation getSubOp(String operationId); - ShardOperation getShardOp(String operationId); void updateSubOp(SubscriptionOperation operation); + + void createShardOp(ShardOperation operation); + + List getShardOps(String operationId); + + ShardOperation getShardOp(String operationId); + void updateShardOp(ShardOperation operation); }