Adding consumer spi, varadhi consumer, consumer manager interfaces #100

Merged

gauravAshok
Collaborator

Closes #72
Closes #73

#73 talks about node registration, but that is already taken care of in another open PR.

  • What is the identifier of a subscription? We need one to refer to a subscription when we want to stop it or ask for info about it. Check ConsumersManager.
  • I have kept ConsumersManager in the server module only for now. If it remains largely independent of the server's workings, we can move it to the consumer module in a future refactoring.
  • The SubscriptionShard info looks like a separate entity from the core Subscription entity. For the startSubscription operation, the params are the core sub entity and one of its shards. But since the shard currently lives inside the Subscription entity, startSubscription takes only the Subscription entity and adds the constraint that it must contain exactly one shard info, which is weird.

*/
void close();

// TODO: evaluate pause and resume methods.
Collaborator Author

most likely not going to be there.

codecov-commenter commented Feb 8, 2024

Codecov Report

Attention: Patch coverage is 14.28571%, with 6 lines in your changes missing coverage. Please review.

Project coverage is 68.95%. Comparing base (4785457) to head (4334aaa).

| Files | Patch % | Lines |
|---|---|---|
| ...ipkart/varadhi/entities/UnitSubscriptionShard.java | 0.00% | 5 Missing ⚠️ |
| ...flipkart/varadhi/entities/VaradhiSubscription.java | 0.00% | 1 Missing ⚠️ |
Additional details and impacted files
@@             Coverage Diff              @@
##             master     #100      +/-   ##
============================================
- Coverage     69.09%   68.95%   -0.14%     
  Complexity      527      527              
============================================
  Files           111      112       +1     
  Lines          2466     2471       +5     
  Branches        154      154              
============================================
  Hits           1704     1704              
- Misses          707      712       +5     
  Partials         55       55              


*/
THROTTLED,

PAUSED
Collaborator

When will PAUSED be used? Primarily, how does it differ from CONSUMING, i.e. what would the state be when the subscription is running, but not consuming as there is nothing to consume?

Collaborator Author

I actually think of "pause", meaning stop message delivery immediately, as a nice feature. It has nothing to do with allocation.
Basically an API that you can use to "pause" message delivery, and which takes effect immediately.

We can also think of downgrading an extended "pause" to the "stopped" state.

Collaborator Author

"subscription is running, but not consuming as there is nothing to consume": that will be the CONSUMING state.
CONSUMING is a "state" that the consumer is in. While in CONSUMING, the varadhi consumer attempts message delivery for undelivered messages.
If there are no undelivered messages, then obviously the destination won't get any. The consume rate can be fetched orthogonally from the stats exposed by the varadhi consumer.

PAUSED means that the consumer is intentionally not delivering any messages.
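
To illustrate the distinction above, here is a minimal sketch, not the code from this PR. Only the constant names CONSUMING, THROTTLED and PAUSED come from the thread; `SketchConsumerState`, `SketchDeliveryLoop` and all of their methods are hypothetical names made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch: only the constant names come from the thread.
enum SketchConsumerState {
    CONSUMING, // attempting delivery of undelivered messages, even when none are pending
    THROTTLED, // listed in the PR; its semantics are not discussed in this thread
    PAUSED     // intentionally not delivering any messages
}

final class SketchDeliveryLoop {
    private final Queue<String> undelivered = new ArrayDeque<>();
    private SketchConsumerState state = SketchConsumerState.CONSUMING;
    private int deliveredCount;

    void enqueue(String message) { undelivered.add(message); }
    void pause() { state = SketchConsumerState.PAUSED; }
    void resume() { state = SketchConsumerState.CONSUMING; }
    SketchConsumerState state() { return state; }
    int deliveredCount() { return deliveredCount; }

    /** Runs one delivery attempt and reports whether a message was delivered. */
    boolean tick() {
        if (state == SketchConsumerState.PAUSED) {
            return false; // delivery is intentionally suppressed
        }
        String next = undelivered.poll();
        if (next == null) {
            return false; // nothing to deliver, yet the state stays CONSUMING
        }
        deliveredCount++;
        return true;
    }
}
```

An idle loop and a paused loop both deliver nothing, but only the paused one reports PAUSED. The delivered count stands in for the stats that would be read separately.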

* Close the consumer. It will stop message delivery and release all resources. After calling this calling other
* methods on this consumer will result in an exception.
*/
void close();
Collaborator

start/stop or close ?

Collaborator Author

Chose to stick to the terminology from Java's Closeable. "Stop" has a connotation that it can be started again.
Closed means closed. It cannot be started again.
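
A small sketch of the "closed means closed" contract, assuming the consumer signals misuse with `IllegalStateException`. The Javadoc in this PR only says "an exception", so the exception type is an assumption, and `SketchClosableConsumer` with its methods is a hypothetical stand-in for the real interface.

```java
// Hypothetical sketch of close() semantics; not the VaradhiConsumer from this PR.
final class SketchClosableConsumer implements AutoCloseable {
    private boolean closed;

    void pause() { ensureOpen(); }
    void resume() { ensureOpen(); }
    boolean isClosed() { return closed; }

    /** Closing is terminal and idempotent; there is no way to reopen. */
    @Override
    public void close() {
        closed = true;
    }

    // Assumption: any call after close() fails with IllegalStateException.
    private void ensureOpen() {
        if (closed) {
            throw new IllegalStateException("consumer is closed");
        }
    }
}
```

Because the class implements `AutoCloseable`, it also works in a try-with-resources block, which is one practical reason to prefer `close()` over `stop()`.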

*/
public interface VaradhiConsumer {

VaradhiSubscription getSubscription();
Collaborator

Shard or Subscription? Any specific reason why it needs to provide a getter or a utility method for this?

Collaborator Author

Yes, this will be reworked to point to a shard, but the getter will stay. It is a consumer for a subscription, so a pure getter to fetch the details of the subscription for which this consumer was created seems useful. The same getter can probably also be used to check whether the consumer was created for an old version of the subscription.

If it turns out that this method is not useful, we can remove it.

/**
* Pause the consumer. It just means that message delivery will stop happening immediately.
*/
void pause();
Collaborator

any specific use case for pause()/resume() ?

Collaborator Author

Mentioned in the previous comment.

"Stopping" a consumer means releasing all the resources that come with it: closing consumer objects, releasing all the buffers, and so on. So stopping and then starting takes time.
"Pause" is a middle ground. It doesn't release anything, it just pauses message delivery, so the effect is immediate.
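
A rough sketch of the resource argument, under the assumption that a byte buffer stands in for the consumer objects and buffers mentioned above. `SketchShardRunner` and its methods are hypothetical and do not appear in the PR.

```java
// Hypothetical sketch: a byte[] stands in for consumer objects and buffers.
final class SketchShardRunner {
    private byte[] buffer;   // allocated on start, released on stop
    private boolean paused;
    private int allocations; // counts how often resources had to be acquired

    void start() {
        if (buffer == null) {
            buffer = new byte[1024]; // the "takes time" part of a restart
            allocations++;
        }
        paused = false;
    }

    void pause() { paused = true; }   // nothing is released
    void resume() { paused = false; } // nothing is re-acquired

    void stop() {
        buffer = null; // release everything
        paused = false;
    }

    boolean holdsResources() { return buffer != null; }
    boolean isDelivering() { return buffer != null && !paused; }
    int allocations() { return allocations; }
}
```

A pause and resume cycle leaves the allocation count unchanged, while a stop and start cycle acquires the resources a second time.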


ConsumerStatus getConsumerStatus(String subscription, int shardId);

// TODO likely need status on the starting / stopping as well; as the above status is for a running consumer..
Collaborator

Why is ConsumerStatus specific to an entity in a given state? Shouldn't it be the status of an entity, with the state included in it?

Collaborator Author

didn't follow exactly.

*
* @return
*/
Future<Void> startSubscription(VaradhiSubscription subscription);
Collaborator

Why not define these operations on the shard entity instead of the subscription?

Collaborator Author

It is :) As the description mentions, subscription is not the correct object to work with, as it contains the shards internally.
Here, the subscription object is only supposed to contain a single shard's info, so this object is not the same as the object being persisted.
Anyway, this will be fixed with the VaradhiSubscription refactor, where we move the shard info out.

*/
Future<Void> startSubscription(VaradhiSubscription subscription);

Future<Void> stopSubscription(String subscription, int shardId);
Collaborator

shardId or shardNum? (Should shardId be given a notion of global uniqueness, if needed?)

Collaborator Author

No, it does not need to be identified globally.
It needs an id that is unique within the context of a subscription. If the subscription object is global, then the shard id together with the sub id automatically becomes globally unique.

But as mentioned in the description, we probably need a subscription identifier to use in inter-node communication. This identifier does not have to be the one used in the API layer.
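
One way to picture the composite identity described above: a key made of the subscription identifier plus a shard id that is only unique within that subscription. This is a sketch under assumptions; `SketchShardKey` and `SketchConsumerRegistry` are hypothetical names, and the PR does not say how ConsumersManager tracks running consumers.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical composite key: shardId is only unique within one subscription.
final class SketchShardKey {
    private final String subscription;
    private final int shardId;

    SketchShardKey(String subscription, int shardId) {
        this.subscription = Objects.requireNonNull(subscription);
        this.shardId = shardId;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof SketchShardKey)) return false;
        SketchShardKey other = (SketchShardKey) o;
        return shardId == other.shardId && subscription.equals(other.subscription);
    }

    @Override
    public int hashCode() {
        return Objects.hash(subscription, shardId);
    }
}

// Hypothetical registry of running consumers, keyed by (subscription, shardId).
final class SketchConsumerRegistry {
    private final Map<SketchShardKey, String> running = new HashMap<>();

    /** Returns false if a consumer is already registered for this shard. */
    boolean register(String subscription, int shardId, String consumerName) {
        return running.putIfAbsent(new SketchShardKey(subscription, shardId), consumerName) == null;
    }

    String lookup(String subscription, int shardId) {
        return running.get(new SketchShardKey(subscription, shardId));
    }
}
```

Shard 0 of two different subscriptions can coexist, while registering the same pair twice is rejected.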

* Represent a batch of messages polled from a particular partition.
*/
@AllArgsConstructor
public class PartitionMessages<O extends Offset> {
Collaborator

  1. Will the below be placed in spi or entities?
  2. Should partition be abstracted as well? Otherwise, how will it be converted into a tech-stack-specific identifier, e.g. in Pulsar a partitioned topic has its own identifier?

Collaborator Author

"partitioned topic has its own identifier": that is similar to Kafka. I think you mean that the partitions within the partitioned topic have their own identifiers.
And a non-partitioned topic does not even have a partition id.

Collaborator Author

will below placed in spi or entities ? - will evaluate.

Collaborator Author

@gauravAshok gauravAshok Feb 29, 2024

Can we keep this in spi itself? I am thinking that this class exists because of the spi interfaces. If the interfaces were not there, there would be no need for this entity at all. This entity pertains to the spi interfaces.

As for the partition abstraction, I think this particular one works for Kafka and Pulsar at least. In the non-partitioned scenario, the partition id can be -1, and the topic can point to the relevant reference.

I am also thinking of platforms that don't even have a partition concept. To support them in the future, instead of coming up with one general interface that somehow fits all use cases, I think we can broadly define 2 feature capabilities: partitioned vs non-partitioned.
A partitioned stack (like Kafka, Pulsar, Google Pub/Sub Lite) can fit the model present in this PR,
whereas a non-partitioned stack (which will also require a different sharding model) can use a different set of interfaces.

This feature capability value will obviously come from the messagingStackProvider.
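
The capability idea could look roughly like the sketch below. Everything in it is hypothetical: `SketchStackCapability`, `SketchMessagingStackProvider` and `SketchPartitionRef` are illustrative names, not the project's actual provider interface. The only detail taken from the comment is that -1 marks the absence of a partition.

```java
// Hypothetical capability flag reported by a messaging stack provider.
enum SketchStackCapability { PARTITIONED, NON_PARTITIONED }

// Hypothetical stand-in for the provider that reports the capability.
interface SketchMessagingStackProvider {
    SketchStackCapability capability();
}

// Hypothetical topic-plus-partition reference; -1 means "no partition".
final class SketchPartitionRef {
    static final int NO_PARTITION = -1;

    private final String topic;
    private final int partitionId;

    private SketchPartitionRef(String topic, int partitionId) {
        this.topic = topic;
        this.partitionId = partitionId;
    }

    static SketchPartitionRef of(SketchMessagingStackProvider provider, String topic, int partitionId) {
        if (provider.capability() == SketchStackCapability.NON_PARTITIONED) {
            // Stacks without a partition concept always map to NO_PARTITION.
            return new SketchPartitionRef(topic, NO_PARTITION);
        }
        if (partitionId < NO_PARTITION) {
            throw new IllegalArgumentException("partition id must be -1 or non-negative");
        }
        // On a partitioned stack, -1 still denotes a non-partitioned topic.
        return new SketchPartitionRef(topic, partitionId);
    }

    String topic() { return topic; }
    int partitionId() { return partitionId; }
    boolean hasPartition() { return partitionId != NO_PARTITION; }
}
```

Callers can branch on `hasPartition()` instead of on the stack type, which keeps the Kafka and Pulsar style model intact while leaving room for a separate interface set later.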

Collaborator Author

Or we don't rely on partitions at all. That will work in all cases.

/**
* @return the payload of this message.
*/
byte[] payload();
Collaborator

What about headers? Should it be a message object instead, with those semantics attached?

Collaborator Author

I don't remember, but did we have some discussion around this before?

Collaborator Author

@gauravAshok gauravAshok Feb 29, 2024

Should I just refer to entities.Message here instead of this payload array?
Actually, I have mentioned in the TODO my intention of keeping this plain for now.

…s not take explicit partition mapping. Its single shard, so it only has one meaning.
…ger::start to not rely on Subscription entity. It instead now takes explicit params to be able to start consumer.
…pics. These classes are subject for iterations as consumer's & controller's implementation makes progress
…d is more metadata centric concept, shard name is to identify the running shard. added project to startSub method as well.
@gauravAshok gauravAshok merged commit 897e190 into flipkart-incubator:master Mar 4, 2024
1 check passed
@gauravAshok gauravAshok deleted the consumer-interface branch March 4, 2024 08:17