From 4abcedecc94409a197145d079e6ba92ac4365831 Mon Sep 17 00:00:00 2001 From: jerryfan01234 <44346807+jerryfan01234@users.noreply.github.com> Date: Tue, 6 Aug 2024 11:02:06 -0400 Subject: [PATCH] Change kafka producer client config (#2030) (cherry picked from commit 44054d4cdf70bcc68f833ec0d1151cff4dd6d40f) --- protocol/indexer/flags.go | 5 +++-- protocol/indexer/msgsender/msgsender_kafka.go | 3 +++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/protocol/indexer/flags.go b/protocol/indexer/flags.go index e99bf5c07a..ffb7cb6cb8 100644 --- a/protocol/indexer/flags.go +++ b/protocol/indexer/flags.go @@ -16,12 +16,13 @@ type IndexerFlags struct { // List of default values const ( - DefaultMaxRetries = 3 + DefaultMaxRetries = 20 ) // List of CLI flags const ( - FlagKafkaConnStr = "indexer-kafka-conn-str" + FlagKafkaConnStr = "indexer-kafka-conn-str" + // max retry should be set so that max retry * retry backoff > Zookeeper session.timeout + some buffer FlagKafkaMaxRetry = "indexer-kafka-max-retry" FlagSendOffchainData = "indexer-send-offchain-data" MsgSenderInstanceForTest = "msgsender-instance-for-test" diff --git a/protocol/indexer/msgsender/msgsender_kafka.go b/protocol/indexer/msgsender/msgsender_kafka.go index 2a6acca1de..750801e168 100644 --- a/protocol/indexer/msgsender/msgsender_kafka.go +++ b/protocol/indexer/msgsender/msgsender_kafka.go @@ -43,7 +43,10 @@ func NewIndexerMessageSenderKafka( config.Producer.Return.Errors = true config.Producer.Return.Successes = true config.Producer.Retry.Max = indexerFlags.MaxRetries + // max retry should be set so that max retry * retry backoff > Zookeeper session.timeout + some buffer + config.Producer.Retry.Backoff = 1000 * time.Millisecond config.Producer.MaxMessageBytes = 4194304 // 4MB + config.Producer.RequiredAcks = sarama.WaitForAll // Use the JVM compatible parititoner to match `kafkajs` which is used in the indexer services. config.Producer.Partitioner = kafkautil.NewJVMCompatiblePartitioner producer, err := sarama.NewAsyncProducer(indexerFlags.KafkaAddrs, config)