Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Controller - Operation handling -- WIP #146

Merged
merged 31 commits into from
Jun 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
f571f2c
messagechannel.send() implementation
kmrdhruv Feb 13, 2024
fc704e9
Merge branch 'master' into controller
kmrdhruv Feb 27, 2024
b021412
moving cluster related details to server module
kmrdhruv Feb 27, 2024
edcd600
merge
kmrdhruv Feb 28, 2024
59cc57a
Merge branch 'master' into controller
kmrdhruv Mar 1, 2024
f5d484c
- refactoring needed for controller bootstrapping
kmrdhruv Mar 7, 2024
f243139
Merge branch 'master' into controller
kmrdhruv Mar 7, 2024
f498775
add string override to channel method
kmrdhruv Mar 7, 2024
c0d9a5e
fix commented test and remove obselete message handler
kmrdhruv Mar 7, 2024
0c7b269
Merge branch 'master' into controller
kmrdhruv Mar 18, 2024
1fd93f8
review comments
kmrdhruv Mar 19, 2024
1f43d2e
avoid http port conflict in for server deployment in UTs
kmrdhruv Mar 19, 2024
597df03
Merge branch 'master' into controller
kmrdhruv Mar 19, 2024
ccfed68
cluster manager and refactoring
kmrdhruv Mar 21, 2024
249d957
fixing TODOs, Router tests and implementation of getAllMembers() in c…
kmrdhruv Mar 23, 2024
8d849a3
Merge branch 'master' into controller
kmrdhruv Apr 3, 2024
48d28b2
Merge branch 'master' into controller
kmrdhruv Apr 3, 2024
4caa516
replacing Component with Verticle
kmrdhruv Apr 3, 2024
08ad83f
removing hard coded 127.0.0.1
kmrdhruv Apr 4, 2024
a97b531
class name fix in gradle file
kmrdhruv Apr 18, 2024
61b1175
Merge branch 'master' into controller
kmrdhruv Apr 20, 2024
615c422
StartSubscription operation and Consumer verticle bootsrap code flow.
kmrdhruv May 9, 2024
f978900
Merge branch 'master' into controller
kmrdhruv May 9, 2024
9fa7a9f
adding persistence to shard assignment and operations.
kmrdhruv May 17, 2024
50c96b1
Merge branch 'master' into controller
kmrdhruv May 17, 2024
19a1363
pr fixes and improvements
kmrdhruv May 18, 2024
68bc1b3
fixing subscription failing test
kmrdhruv May 18, 2024
caf9082
Merge branch 'master' into controller
kmrdhruv May 27, 2024
094a279
Merge branch 'master' into controller
kmrdhruv May 27, 2024
16211de
serializing the shard assignment for all, subscription operation exe…
kmrdhruv May 30, 2024
c210b37
Merge branch 'master' into controller
kmrdhruv May 31, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.flipkart.varadhi.consumer;

import com.flipkart.varadhi.core.cluster.ConsumerApi;
import com.flipkart.varadhi.entities.cluster.ConsumerInfo;
import com.flipkart.varadhi.entities.cluster.ShardOperation;
import com.flipkart.varadhi.entities.VaradhiSubscription;
import com.flipkart.varadhi.entities.cluster.ShardState;
Expand Down Expand Up @@ -33,7 +34,22 @@ public CompletableFuture<Void> start(ShardOperation.StartData operation) {
}

@Override
public CompletableFuture<ShardStatus> getStatus(String subscriptionId, int shardId) {
public CompletableFuture<Void> stop(ShardOperation.StopData operation) {
VaradhiSubscription subscription = operation.getSubscription();
return consumersManager.stopSubscription(
subscription.getName(),
""
);
}

@Override
public CompletableFuture<ShardStatus> getShardStatus(String subscriptionId, int shardId) {
return CompletableFuture.completedFuture(new ShardStatus(ShardState.UNKNOWN, "Not a owner of shard"));
}

@Override
public CompletableFuture<ConsumerInfo> getConsumerInfo() {
//TODO::Return assignments as well.
return CompletableFuture.completedFuture(consumersManager.getInfo());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.flipkart.varadhi.entities.Endpoint;
import com.flipkart.varadhi.entities.Project;
import com.flipkart.varadhi.entities.StorageTopic;
import com.flipkart.varadhi.entities.cluster.ConsumerInfo;
import com.flipkart.varadhi.spi.services.TopicPartitions;

import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -40,4 +41,6 @@ CompletableFuture<Void> startSubscription(
ConsumerState getConsumerState(String subscription, String shardName);

// TODO likely need status on the starting / stopping as well; as the above status is for a running consumer..

ConsumerInfo getInfo();
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,17 @@
import com.flipkart.varadhi.consumer.ConsumerState;
import com.flipkart.varadhi.consumer.ConsumersManager;
import com.flipkart.varadhi.consumer.ConsumptionFailurePolicy;
import com.flipkart.varadhi.entities.ConsumptionPolicy;
import com.flipkart.varadhi.entities.Endpoint;
import com.flipkart.varadhi.entities.Project;
import com.flipkart.varadhi.entities.StorageTopic;
import com.flipkart.varadhi.entities.*;
import com.flipkart.varadhi.entities.cluster.ConsumerInfo;
import com.flipkart.varadhi.spi.services.TopicPartitions;

import java.util.concurrent.CompletableFuture;

public class ConsumersManagerImpl implements ConsumersManager {
private final ConsumerInfo consumerInfo;


public ConsumersManagerImpl() {
public ConsumersManagerImpl(ConsumerInfo consumerInfo) {
this.consumerInfo = consumerInfo;
}

@Override
Expand All @@ -28,7 +27,7 @@ public CompletableFuture<Void> startSubscription(

@Override
public CompletableFuture<Void> stopSubscription(String subscription, String shardName) {
return null;
return CompletableFuture.completedFuture(null);
}

@Override
Expand All @@ -45,4 +44,9 @@ public void resumeSubscription(String subscription, String shardName) {
public ConsumerState getConsumerState(String subscription, String shardName) {
return null;
}

@Override
public ConsumerInfo getInfo() {
return consumerInfo;
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,29 +1,38 @@
package com.flipkart.varadhi.controller;

import com.flipkart.varadhi.entities.SubscriptionShards;
import com.flipkart.varadhi.entities.cluster.Assignment;
import com.flipkart.varadhi.entities.cluster.ConsumerNode;
import com.flipkart.varadhi.controller.impl.LeastAssignedStrategy;
import com.flipkart.varadhi.entities.SubscriptionUnitShard;
import com.flipkart.varadhi.entities.VaradhiSubscription;
import com.flipkart.varadhi.spi.db.AssignmentStore;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.mutable.MutableBoolean;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.*;
import java.util.stream.Collectors;

@Slf4j
public class ShardAssigner {
private final String metricPrefix = "controller.assigner";
private final AssignmentStrategy strategy;
private final Map<String, ConsumerNode> consumerNodes;
private final AssignmentStore assignmentStore;
private final ExecutorService executor;

public ShardAssigner(AssignmentStore assignmentStore) {
public ShardAssigner(AssignmentStore assignmentStore, MeterRegistry meterRegistry) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

meter registry is not being used, is that expected?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes. Metric addition is pending task and will be taken subsequently. Here we need to emit metrics around threadpool and associated tasks.

this.strategy = new LeastAssignedStrategy();
this.consumerNodes = new ConcurrentHashMap<>();
this.assignmentStore = assignmentStore;
//TODO::ExecutorService should emit the metrics.
this.executor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("assigner-%d").build());
}

public void addConsumerNodes(List<ConsumerNode> clusterConsumers) {
Expand All @@ -33,16 +42,60 @@ public void addConsumerNodes(List<ConsumerNode> clusterConsumers) {
});
}

public List<Assignment> assignShard(List<SubscriptionUnitShard> shards, VaradhiSubscription subscription) {
//TODO:: It need to ensure, assignment is not using stale values, specifically if they are running in parallel.
List<ConsumerNode> activeConsumers =
consumerNodes.values().stream().filter(c -> !c.isMarkedForDeletion()).collect(Collectors.toList());
log.info("AssignShards consumer nodes active:{} of total:{}", activeConsumers.size(), consumerNodes.size());
List<Assignment> assignments = strategy.assign(shards, subscription, activeConsumers);
assignmentStore.createAssignments(assignments);
return assignments;
public CompletableFuture<List<Assignment>> assignShard(
List<SubscriptionUnitShard> shards, VaradhiSubscription subscription
) {
return CompletableFuture.supplyAsync(() -> {
List<ConsumerNode> activeConsumers =
consumerNodes.values().stream().filter(c -> !c.isMarkedForDeletion()).collect(Collectors.toList());
log.info(
"AssignShards found consumer nodes active:{} of total:{}", activeConsumers.size(),
consumerNodes.size()
);

List<Assignment> assignments = new ArrayList<>();
try {
assignments.addAll(strategy.assign(shards, subscription, activeConsumers));
assignmentStore.createAssignments(assignments);
return assignments;
} catch (Exception e) {
log.error("Failed while creating assignment, freeing up any allocation done. {}.", e.getMessage());
assignments.forEach(assignment -> freeAssignedCapacity(assignment, subscription));
throw e;
}
}, executor);
}


public CompletableFuture<Void> unAssignShard(List<Assignment> assignments, VaradhiSubscription subscription) {
return CompletableFuture.supplyAsync(() -> {
try {
assignmentStore.deleteAssignments(assignments);
assignments.forEach(a -> {
String consumerId = a.getConsumerId();
ConsumerNode consumerNode = consumerNodes.getOrDefault(consumerId, null);
if (null == consumerNode) {
log.error("Consumer node not found, for assignment {}. Ignoring unAssignShard", a);
} else {
SubscriptionShards shards = subscription.getShards();
consumerNode.free(a, shards.getShard(a.getShardId()).getCapacityRequest());
}
});
return null;
} catch (Exception e) {
log.error("Failed while unAssigning Shards. {}.", e.getMessage());
throw e;
}
}, executor);
}

private void freeAssignedCapacity(Assignment assignment, VaradhiSubscription subscription) {
SubscriptionUnitShard shard = subscription.getShards().getShard(assignment.getShardId());
ConsumerNode consumerNode = consumerNodes.get(assignment.getConsumerId());
consumerNode.free(assignment, shard.getCapacityRequest());
}


public List<Assignment> getSubscriptionAssignment(String subscriptionName) {
return assignmentStore.getSubscriptionAssignments(subscriptionName);
}
Expand All @@ -54,19 +107,6 @@ public void consumerNodeJoined(ConsumerNode consumerNode) {
}
}

private boolean addConsumerNode(ConsumerNode consumerNode) {
String consumerNodeId = consumerNode.getMemberInfo().hostname();
MutableBoolean added = new MutableBoolean(false);
consumerNodes.computeIfAbsent(consumerNodeId, k -> {
added.setTrue();
return consumerNode;
});
if (!added.booleanValue()) {
log.warn("ConsumerNode {} already exists. Not adding again.", consumerNodes.get(consumerNodeId));
}
return added.booleanValue();
}

public void consumerNodeLeft(String consumerNodeId) {
//TODO:: re-assign the shards (should this be trigger from here or from the controller) ?
MutableBoolean marked = new MutableBoolean(false);
Expand All @@ -81,4 +121,18 @@ public void consumerNodeLeft(String consumerNodeId) {
log.warn("ConsumerNode {} not found.", consumerNodeId);
}
}

private boolean addConsumerNode(ConsumerNode consumerNode) {
String consumerNodeId = consumerNode.getMemberInfo().hostname();
MutableBoolean added = new MutableBoolean(false);
consumerNodes.computeIfAbsent(consumerNodeId, k -> {
added.setTrue();
return consumerNode;
});
if (!added.booleanValue()) {
log.warn("ConsumerNode {} already exists. Not adding again.", consumerNodes.get(consumerNodeId));
}
return added.booleanValue();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ public List<Assignment> assign(
List<SubscriptionUnitShard> shards, VaradhiSubscription subscription, List<ConsumerNode> consumerNodes
) {
if (consumerNodes.isEmpty()) {
log.error("Shard Assignment Failure: No active consumer nodes.");
throw new CapacityException("No active consumer node for Subscription assignment.");
}
List<Assignment> assignments = new ArrayList<>();
Expand All @@ -44,7 +43,7 @@ public List<Assignment> assign(
}
Assignment assignment =
new Assignment(subscription.getName(), shard.getShardId(), consumerNode.getMemberInfo().hostname());
consumerNode.allocate(shard.getCapacityRequest());
consumerNode.allocate(assignment, shard.getCapacityRequest());
assignments.add(assignment);
consumers.add(consumerNode);
}
Expand Down
1 change: 1 addition & 0 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ dependencies {
api(project(':spi'))
implementation("io.vertx:vertx-core:${vertx_version}")
implementation('com.fasterxml.jackson.core:jackson-databind')
implementation("com.google.guava:guava")

testFixturesImplementation(project(":common"))
testImplementation(project(':pulsar'))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.flipkart.varadhi.core.cluster;

import com.flipkart.varadhi.entities.cluster.ConsumerInfo;
import com.flipkart.varadhi.entities.cluster.ShardOperation;
import com.flipkart.varadhi.entities.cluster.ShardStatus;

Expand All @@ -8,5 +9,9 @@
public interface ConsumerApi {
CompletableFuture<Void> start(ShardOperation.StartData operation);

CompletableFuture<ShardStatus> getStatus(String subscriptionId, int shardId);
CompletableFuture<Void> stop(ShardOperation.StopData operation);

CompletableFuture<ShardStatus> getShardStatus(String subscriptionId, int shardId);

CompletableFuture<ConsumerInfo> getConsumerInfo();
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
public interface ControllerApi {
String ROUTE_CONTROLLER = "controller";

CompletableFuture<Void> startSubscription(SubscriptionOperation.StartData operation);
CompletableFuture<SubscriptionOperation> startSubscription(String subscriptionId, String requestedBy);

CompletableFuture<Void> stopSubscription(SubscriptionOperation.StopData operation);
CompletableFuture<SubscriptionOperation> stopSubscription(String subscriptionId, String requestedBy);

CompletableFuture<Void> update(ShardOperation.OpData operation);
}
Loading
Loading