Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding consumer spi, varadhi consumer, consumer manager interfaces #100

Merged
6 changes: 5 additions & 1 deletion consumer/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,9 @@ plugins {
}

dependencies {

api(project(':entities'))
api(project(':spi'))

// async interface are using vertx.Future
api("io.vertx:vertx-core")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.flipkart.varadhi.consumer;

/**
* Status for a consumer that has been started.
*/
public enum ConsumerState {

/**
* varadhi consumer will try to make delivery based on the subscription details.
*/
CONSUMING,

/**
* varadhi consumer has trapped an uncaught exception and is haulted.
*/
ERRORED,

/**
* varadhi consumer is delivering at throttled rate, lost likely due to destination errors and subscription's
* consumption policy.
* Throttled is a wide bucket. TODO: evaluate splitting based on the requirements later on.
*/
THROTTLED,

/**
* varadhi consumer is paused and not making message delivery. But it <i>may</i> be processing failed groups.
*/
PAUSED
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.flipkart.varadhi.consumer;

import com.flipkart.varadhi.entities.ConsumptionPolicy;
import com.flipkart.varadhi.entities.Endpoint;
import com.flipkart.varadhi.entities.Project;
import com.flipkart.varadhi.spi.services.TopicPartitions;
import io.vertx.core.Future;

/**
* Management layer that will manage consumers for multiple subscriptions.
*/
public interface ConsumersManager {

/**
* Start consumption for a shard of a subscription. If subscription failed, the future will contain the exception
* details.
* `shardName` identifies the different shards within the subscription.
*
* @return
*/
Future<Void> startSubscription(
Project project,
String subscription,
String shardName,
TopicPartitions<?> topic,
boolean grouped,
Endpoint endpoint,
ConsumptionPolicy consumptionPolicy,
ConsumptionFailurePolicy failurePolicy
);

Future<Void> stopSubscription(String subscription, String shardName);

Future<Void> pauseSubscription(String subscription, String shardName);

Future<Void> resumeSubscription(String subscription, String shardName);

ConsumerState getConsumerState(String subscription, String shardName);

// TODO likely need status on the starting / stopping as well; as the above status is for a running consumer..
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.flipkart.varadhi.consumer;

import com.flipkart.varadhi.entities.RetryPolicy;
import com.flipkart.varadhi.entities.StorageTopic;
import lombok.Getter;
import lombok.RequiredArgsConstructor;

@Getter
@RequiredArgsConstructor
public class ConsumptionFailurePolicy {

private final RetryPolicy retryPolicy;

private final StorageRetryTopic retryTopic;

private final StorageTopic deadLetterTopic;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.flipkart.varadhi.consumer;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.flipkart.varadhi.entities.StorageTopic;

public class StorageRetryTopic {

private final StorageTopic[] retryTopics;

public StorageRetryTopic(StorageTopic[] retryTopics) {
this.retryTopics = retryTopics;
}

/**
* @param retryCount 1-based retry count
*
* @return the storage topic for the given retry count
*/
@JsonIgnore
public StorageTopic getTopicForRetry(int retryCount) {
return retryTopics[retryCount - 1];
}

@JsonIgnore
public int getMaxRetryCount() {
return retryTopics.length;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.flipkart.varadhi.consumer;

import com.flipkart.varadhi.entities.VaradhiSubscription;

/**
* Base varadhi consumer to consume messages from a varadhi topic. It corresponds to a single shard of some
* subscription.
*/
public interface VaradhiConsumer {

String getSubscriptionName();

int getShardId();

ConsumerState getState();

/**
* Start the consumer. It will start message delivery from the last committed offset. If no offset is committed, it
* can start from the earliest or latest offset based on the implementation.
*
* @throws Exception bad configuration can result in exception in which case the consumer will not start.
*/
void start() throws Exception;

/**
* Pause the consumer. It just means that message delivery will stop happening immediately.
*/
void pause();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any specific use case for pause()/resume() ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mentioned in previous comment.

"stopping" a consumer means releasing all the resources along with it. closing consumer objects. rleeasing all the buffers. and so on. So stopping and then starting "takes" time.
"pause" is a middle ground. it doesn't release anything. it just pauses message delivery. so the effect is immediate.


/**
* Resume from paused state. It means that message delivery will start happening immediately.
*/
void resume();

/**
* Close the consumer. It will stop message delivery and release all resources. After calling this calling other
* methods on this consumer will result in an exception.
*/
void close();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

start/stop or close ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Chose to stick to terminology from java's closeable. Stop has a connotation that it can be started again.
closed means closed. it cannot be started again.

}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.flipkart.varadhi.entities;


import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import lombok.Getter;
import lombok.RequiredArgsConstructor;

@Getter
@RequiredArgsConstructor
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@shardsStrategy")
@JsonSubTypes({
@JsonSubTypes.Type(value = UnitSubscriptionShard.class, name = "unit"),
})
public abstract class SubscriptionShards {
private final int shardCount;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.flipkart.varadhi.entities;

import lombok.Getter;

@Getter
public class UnitSubscriptionShard extends SubscriptionShards {

private final int shardId;

private final RetryTopic retryTopic;

private final InternalCompositeTopic deadLetterTopic;

public UnitSubscriptionShard(int shardId, RetryTopic retryTopic, InternalCompositeTopic deadLetterTopic) {
super(1);
this.shardId = shardId;
this.retryTopic = retryTopic;
this.deadLetterTopic = deadLetterTopic;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class VaradhiSubscription extends MetaStoreEntity {
Endpoint endpoint;
RetryPolicy retryPolicy;
ConsumptionPolicy consumptionPolicy;
SubscriptionShard[] shards;
SubscriptionShards shards;

public VaradhiSubscription(
String name,
Expand All @@ -25,7 +25,7 @@ public VaradhiSubscription(
Endpoint endpoint,
RetryPolicy retryPolicy,
ConsumptionPolicy consumptionPolicy,
SubscriptionShard[] shards
SubscriptionShards shards
) {
super(name, version);
this.project = project;
Expand All @@ -35,7 +35,7 @@ public VaradhiSubscription(
this.endpoint = endpoint;
this.retryPolicy = retryPolicy;
this.consumptionPolicy = consumptionPolicy;
if (shards == null || shards.length == 0) {
if (shards == null || shards.getShardCount() <= 0) {
throw new IllegalArgumentException("shards cannot be null or empty");
}
this.shards = shards;
Expand Down
1 change: 1 addition & 0 deletions server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ dependencies {
implementation(project(":messaging"))
implementation(project(":entities"))
implementation(project(":authz"))
implementation(project(":consumer"))

implementation("org.apache.logging.log4j:log4j-slf4j2-impl")
implementation("org.apache.logging.log4j:log4j-core")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package com.flipkart.varadhi.utils;

import com.flipkart.varadhi.entities.UnitSubscriptionShard;
import com.flipkart.varadhi.entities.SubscriptionResource;
import com.flipkart.varadhi.entities.SubscriptionShard;
import com.flipkart.varadhi.entities.SubscriptionShards;
import com.flipkart.varadhi.entities.VaradhiSubscription;

import static com.flipkart.varadhi.entities.VersionedEntity.NAME_SEPARATOR;
Expand All @@ -11,8 +12,7 @@ public final class SubscriptionHelper {
private SubscriptionHelper() {
}

private static final SubscriptionShard[] dummyShards =
new SubscriptionShard[]{new SubscriptionShard(0, null, null, null)};
private static final SubscriptionShards dummyShards = new UnitSubscriptionShard(0, null, null);

public static VaradhiSubscription fromResource(SubscriptionResource subscriptionResource, int version) {
return new VaradhiSubscription(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ public class SubscriptionHandlersTest extends WebTestBase {
1, 1, 1, 1
);
private static final ConsumptionPolicy consumptionPolicy = new ConsumptionPolicy(1, 1, false, 1, null);
private static final SubscriptionShard[] shards =
new SubscriptionShard[]{new SubscriptionShard(0, null, null, null)};
private static final SubscriptionShards shards = new UnitSubscriptionShard(0, null, null);

static {
try {
Expand Down
2 changes: 1 addition & 1 deletion settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@
*/

rootProject.name = 'varadhi'
include('entities', 'spi', 'common', 'core', 'messaging', 'pulsar', 'server', 'authz', 'consumer')
include('entities', 'spi', 'common', 'core', 'messaging', 'pulsar', 'consumer', 'authz', 'server')
34 changes: 34 additions & 0 deletions spi/src/main/java/com/flipkart/varadhi/spi/services/Consumer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.flipkart.varadhi.spi.services;

import com.flipkart.varadhi.entities.Offset;

import java.util.concurrent.CompletableFuture;

/**
* Consumer interface to receive messages from a topic.
*/
public interface Consumer<O extends Offset> {

/**
* Receive a batch of messages from the subscribed topics.
*
* @return
*/
CompletableFuture<PolledMessages<O>> receiveAsync();

/**
* Commit upto the offset, signifying that all messages upto (& including) the offset have been processed.
*
* @param topic
* @param partition
* @param offset
*
* @return
*/
CompletableFuture<Void> commitCumulative(String topic, int partition, O offset);

/**
* Unsubscribes the topics and closes the consumer object and any associated resources.
*/
void close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.flipkart.varadhi.spi.services;

import com.flipkart.varadhi.entities.Offset;
import com.flipkart.varadhi.entities.StorageTopic;

import java.util.Collection;

public interface ConsumerFactory<T extends StorageTopic, O extends Offset> {

// TODO: need to see if we need property bag param or other params

/**
* @param topics to be consumed
* @param name name of the consumer. synonymous to consumer group / subscription name
*
* @return Consumer instance
*
* @throws MessagingException
*/
Consumer<O> getConsumer(Collection<TopicPartitions<T>> topics, String name) throws MessagingException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.flipkart.varadhi.spi.services;

import com.flipkart.varadhi.entities.Offset;
import lombok.AllArgsConstructor;

/**
* Represent a batch of messages polled from a particular partition.
*/
@AllArgsConstructor
public class PartitionMessages<O extends Offset> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. will below placed in spi or entities ?
  2. should paritition be abstracted as well or how it will be converted into tech stack specific identifier e.g. in pulsar partitioned topic has its own identifier ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

partitioned topic has its own identifier - that is similar to kafka. I think you mean the partitions withing the partitioned topic has its own identifier.
And non-partitioned topic does not even have partition id.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will below placed in spi or entities ? - will evaluate.

Copy link
Collaborator Author

@gauravAshok gauravAshok Feb 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we keep this in spi itself? I am thinking that this class exists due to spi interfaces. If the interfaces are not there, then there is no need for this entity itself. This entity pertains to spi interfaces.

And about the partition abstraction, I think this particular works for kafka & pulsar at least. In case of non-partitioned scenario, the partition id can be -1. and the topic can point to the relevant reference.

I am also thinking of platforms that dont even have partitioned concept. To support them in future, I think instead of coming up with 1 general interface that somehow fits all usecases, we can broadly define 2 feature capabalities. Partitioned vs non-partitioned.
partitioned stack like (kafka, pulsar google pub-sub lite) can fit in the model present in this PR.
whereas a non-partitioned stack (whcih also will require a different sharding model) can use a different set of interface.

This feature capability value will obviously come from the messagingStackProvider.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or we dont rely on partitions at all. that will work in all cases.


private final String topic;

private final int partition;

private final PolledMessage<O> messages;
}
Loading
Loading