-
Notifications
You must be signed in to change notification settings - Fork 198
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allow concurrent broker restarts within same broker rack #1001
Allow concurrent broker restarts within same broker rack #1001
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thx for the contribution! Took a quick look at the implementation and it looks good. Will come back for the tests when I get some more time tmr
Hello @ctrlaltluc ! What if we dont do restart based on racks rather we impelement a check which maps broker-topic-partitions on the kafka cluster and checks that the actual broker contains a partition which is not appeared on another healthy broker (also check is the partition in sync) . If there is no such partition then we can restart the broker. It would be more advanced if before the broker restart there would be a new leader election (for that partition which is on the actual broker what we want to restart) so the consuming producing would be totally seamless (no need to wait for the new leader election by controller). We can introduce a new field which would specify how many in sync replicas should be remain when rollingUpgrade happens. (In this way if another broker goes down unintentionally under rolling upgrade then cluster can be also helathy). After the rolling upgrade we should checks that everything is in sync and when yes then we should execute the kafka-preferred-replica-election command to achieve that state where every leader is the prefered one. The https://github.com/linkedin/cruise-control/wiki/REST-APIs#demote-a-list-of-brokers-from-the-kafka-cluster can also be used before we restart a broker. (Actually it also can be used before remove_broker in this case the remove_broker operation could be finished sooner because there were be no leader on the removable broker which is problematic under heavy load). Execute a demotebroker operation in rolling upgrade. When the broker is demoted do the restart and go to the next broker. If we do this it is possible that scennario when the rest of the healthy broker getting too much load. |
pkg/resources/kafka/kafka.go
Outdated
func getBrokerAzMap(cluster *v1beta1.KafkaCluster) map[int32]string { | ||
brokerAzMap := make(map[int32]string) | ||
for _, broker := range cluster.Spec.Brokers { | ||
brokerRack := getBrokerRack(broker.ReadOnlyConfig) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should take care of that case when the broker config is coming from the brokerConfigGroup.
Here we should use this function to extend:
koperator/api/v1beta1/kafkacluster_types.go
Line 1099 in 25c4e97
func (b *Broker) GetBrokerConfig(kafkaClusterSpec KafkaClusterSpec) (*BrokerConfig, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can the broker rack be specified via broker config?
I assumed the source of truth for broker rack is the read-only config, as in broker groups you can define it more freely, not necessarily with rack semantics, plus that the broker.rack
needs to be set in read-only configs for it to work.
Please lmk if I am mistaken.
Hi Marton @bartam1! I would like to reach a conclusion here before addressing the other PR comments. Regarding the suggestion for keeping a map of topic-partitions to brokers, I believe this would not be easy to implement as to ensure consistency. Topic-partitions might change between queries and we must keep that map consistent with actual cluster metadata. I would keep this option as a last resort. Regarding the suggestion for using demote, I agree that would be safe, but that would also make the restart much slower and might not be necessary. I first want to understand if it is necessary. Our foremost motivation for this feature was to increase the speed of cluster restarts during rolling upgrade, as currently 60 broker clusters take a long time to restart (3h+). Regarding the problem you raised, I'll try to summarize to make sure I understand that what you mention is not covered and needs addressing. Please correct me if I am wrong. Problematic use case:
Is this what you mean is not covered? If this is accurate, then I would argue that:
Wdyt? |
Hello @ctrlaltluc
This is an edge case. If we want to be sure we can check two things before multiple broker restart:
Regarding the demoting: It happens instantly when the target broker is in sync. I tested this with a continuously producer-consumer. There were no any communication interruption under demoting. Otherwise when there is an active producer to the leader and there is a rolling upgrade (without demotion) there will be interruption (until the new leader is elected). I'm not sure about that is it worth or not the effort to implement what I wrote regarding mapping and demotion. I don't have much experience with production Kafka clusters. If it is worth it can be done. I saw that as a possible improvement and I was curious about your opinion. |
Thanks @bartam1! Your point makes perfect sense. I will add these additional checks before the decision to restart more than 1 broker:
I will have a look into demoting too, as it would make it smoother for clients (no interruptions), makes sense. Will come back on this with either code changes or a comment. |
3398dc9
to
43a982a
Compare
Thank you for doing! |
43a982a
to
69eee7b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@bartam1 @panyuenlau I have addressed all comments on the PR, including the check for RackAwareDistributionGoal
. Please have another look, thanks in advance for your time!
@bartam1 I have not implemented demote brokers, as it is not necessary. By default, Kafka brokers have controlled.shutdown.enable default true. This means that any graceful Kafka shutdown (i.e. anything excluding SIGKILL) will ensure that leaders are moved.
LE: I also tested demote myself, and the behavior is the same as normal broker restarts (leaders are moved). However, demote has some downsides: 1/ it takes a long time (~10 min per broker for brokers with ~4500 replicas) as demote moves the demoted broker as last in any set of in-sync replicas and this takes time, even though there is no data movement, 2/ we later need to add the demoted broker back, as it would otherwise remain in recently demoted brokers, which complicates the code without added benefit.
Please note that the tests will still be failing, until #1002 is merged and a new API module release is done.
pkg/resources/kafka/kafka.go
Outdated
func getBrokerAzMap(cluster *v1beta1.KafkaCluster) map[int32]string { | ||
brokerAzMap := make(map[int32]string) | ||
for _, broker := range cluster.Spec.Brokers { | ||
brokerRack := getBrokerRack(broker.ReadOnlyConfig) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can the broker rack be specified via broker config?
I assumed the source of truth for broker rack is the read-only config, as in broker groups you can define it more freely, not necessarily with rack semantics, plus that the broker.rack
needs to be set in read-only configs for it to work.
Please lmk if I am mistaken.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Apologies - was zoned out for doing my own implementation. Thx for the great work! Only left a couple of quick comments (more like nits)
pkg/resources/kafka/kafka.go
Outdated
return brokerAzMap | ||
} | ||
|
||
func getBrokerRack(readOnlyConfig string) string { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question: It looks like the latest implementation doesn't require this func, can we remove it since it's not being used?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, good catch! Forgot about it. Will address in another commit, thanks for paying attention!
} | ||
} | ||
|
||
func getBrokerAzMap(cluster *v1beta1.KafkaCluster) map[int32]string { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion: It'd be appreciated if we can add some quick unit tests for func like this one, but since the caller handleRollingUpgrade
is tested, I am not going to ask for any more changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's do this properly. I added unit tests for it in cae78f5, please have a look. Thanks!
P.S. I saw you were doing heavy lifting with kraft, so no worries and thanks for the review. :)
69eee7b
to
cae78f5
Compare
FYI - The new API tag https://github.com/banzaicloud/koperator/tree/api/v0.28.7 should be available soon |
cae78f5
to
997f46e
Compare
@panyuenlau @bartam1 the PR is now final and ready for review. CI fails because of the flaky nodeports test (tried once to retrigger it with an empty commit, gave up), otherwise all tests are passing. Thanks for your patience! |
I think this can have (#1023) effect for this PR |
@bartam1 had a look on #1023 and indeed, if KRaft is enabled and the controllers are not spread across AZs (racks), then there are edge cases where multiple restarts within the same rack/AZ might be an issue. I see 2 options to move forward:
@bartam1 @panyuenlau lmk your thoughts. |
I agree. We should merge this PR. We can add modifications later if needed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
Thank you for implementing it with consideration of the edge cases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thx for the consideration for the potentional impact on KRaft support @bartam1 @ctrlaltluc,
will keep this new feature in mind when we get there during the KRaft implementation.
Thanks for your thorough reviews as well! Looking forward to getting this merged. |
Description
Cluster restarts (during rolling upgrades) are time consuming for big clusters.
Given that clients might be using Kafka
broker.rack
config to spread replicas across N AZs, we can speed up the cluster restart by allowing concurrent broker restarts within the same AZ. Allowing only same AZ concurrent restarts makes sure that we always have at most 1/Nth of the replicas of any topic-partition offline, not more, while the other (N-1)/N replicas are fine.For example, using a common 3AZ setup, this would ensure that at most 1/3rd of the replicas are offline, while the other 2/3rds are in-sync.
Key changes
RollingUpgradeConfig.ConcurrentBrokerRestartsPerRackCount
which is by default 1RollingUpgradeConfig.ConcurrentBrokerRestartsPerRackCount
andRollingUpgradeConfig.FailureThreshold
RollingUpgradeConfig.FailureThreshold
by allowing (same as before) multiple failures even if not in the same AZbroker.rack
is not properly configured for all brokers, consider each broker as being in a separate AZ, meaning we err on the side of caution and restart at most 1 broker at onceRackAwareDistributionGoal
ready and not recently violated (either as fixable or unfixable)Notes
RollingUpgradeConfig.ConcurrentBrokerRestartsPerRackCount
andRollingUpgradeConfig.FailureThreshold
to a value above 1Type of Change
Checklist