From 4ca6a36f40df4500da8f3b6a078c5e6f335ea47d Mon Sep 17 00:00:00 2001 From: taylanisikdemir Date: Tue, 5 Mar 2024 09:02:39 -0800 Subject: [PATCH] Proper shutdown of kafka consumer impl and fix test (#5712) --- common/messaging/kafka/consumer_impl.go | 15 ++++++++++----- common/messaging/kafka/consumer_impl_test.go | 2 ++ 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/common/messaging/kafka/consumer_impl.go b/common/messaging/kafka/consumer_impl.go index 39df8ddeb79..7fb9427debf 100644 --- a/common/messaging/kafka/consumer_impl.go +++ b/common/messaging/kafka/consumer_impl.go @@ -45,6 +45,7 @@ type ( consumerHandler *consumerHandlerImpl consumerGroup sarama.ConsumerGroup msgChan <-chan messaging.Message + wg sync.WaitGroup cancelFunc context.CancelFunc logger log.Logger @@ -95,23 +96,22 @@ func NewKafkaConsumer( consumerHandler := newConsumerHandlerImpl(dlqProducer, topic, msgChan, metricsClient, logger) return &consumerImpl{ - topic: topic, - + topic: topic, consumerHandler: consumerHandler, consumerGroup: consumerGroup, msgChan: msgChan, - - logger: logger, + logger: logger, }, nil } func (c *consumerImpl) Start() error { - ctx, cancel := context.WithCancel(context.Background()) c.cancelFunc = cancel + c.wg.Add(1) // consumer loop go func() { + defer c.wg.Done() for { // `Consume` should be called inside an infinite loop, when a // server-side rebalance happens, the consumer session will need to be @@ -133,7 +133,12 @@ func (c *consumerImpl) Start() error { func (c *consumerImpl) Stop() { c.logger.Info("Stopping consumer") c.cancelFunc() + c.logger.Info("Waiting consumer goroutines to complete") + c.wg.Wait() + c.logger.Info("Stopping consumer handler and group") c.consumerHandler.stop() + c.consumerGroup.Close() + c.logger.Info("Stopped consumer") } // Messages return the message channel for this consumer diff --git a/common/messaging/kafka/consumer_impl_test.go b/common/messaging/kafka/consumer_impl_test.go index e4e9e61aaaa..7120e7fa90a 100644 --- a/common/messaging/kafka/consumer_impl_test.go +++ b/common/messaging/kafka/consumer_impl_test.go @@ -213,6 +213,8 @@ func initMockBroker(t *testing.T, group string) *sarama.MockBroker { SetBroker(mockBroker.Addr(), mockBroker.BrokerID()). SetLeader(topics[0], 0, mockBroker.BrokerID()). SetController(mockBroker.BrokerID()), + "FindCoordinatorRequest": sarama.NewMockFindCoordinatorResponse(t). + SetCoordinator(sarama.CoordinatorGroup, group, mockBroker), }) return mockBroker }