From d5d779a4a5d03a69d8eddc4907f1d4b180ca1b60 Mon Sep 17 00:00:00 2001 From: Eray Ates Date: Sat, 6 Jan 2024 14:49:41 +0100 Subject: [PATCH] fix: partition revoked on single Signed-off-by: Eray Ates --- Makefile | 3 +- client.go | 33 ++++++++++++++++------ clientoptions.go | 9 ++++++ codec.go | 2 +- consumer.go | 52 ++++++++++++++++++---------------- consumerbatch.go | 15 +++++----- consumersingle.go | 39 +++++++++++++++++++------ env/docker-compose.yml | 23 +++++++-------- error.go | 8 +++--- example/main.go | 5 ++++ partition.go | 64 +++++++++++++++++++++++++++++++++++++++--- producer.go | 4 ++- producerdlq.go | 8 +++--- 13 files changed, 190 insertions(+), 75 deletions(-) diff --git a/Makefile b/Makefile index 618b247..61d0b6a 100644 --- a/Makefile +++ b/Makefile @@ -20,8 +20,9 @@ env-logs: ## Show env logs env-down: ## Stop env docker compose -p wkafka down +run-example: LOG_LEVEL ?= debug run-example: ## Run example - @go run ./example/main.go + LOG_LEVEL=$(LOG_LEVEL) go run ./example/main.go .golangci.yml: @$(MAKE) golangci diff --git a/client.go b/client.go index 20cff50..0ef6c54 100644 --- a/client.go +++ b/client.go @@ -12,8 +12,10 @@ import ( ) type Client struct { - Kafka *kgo.Client - KafkaDLQ *kgo.Client + Kafka *kgo.Client + KafkaDLQ *kgo.Client + partitionHandler *partitionHandler + partitionHandlerDLQ *partitionHandler clientID []byte consumerConfig *ConsumerConfig @@ -122,13 +124,27 @@ func newClient(c *Client, cfg Config, o *options, isDLQ bool) (*kgo.Client, erro startOffset = startOffset.At(startOffsetCfg) } + // create partition handler + var partitionH *partitionHandler + if isDLQ { + c.partitionHandlerDLQ = &partitionHandler{ + logger: c.logger, + } + partitionH = c.partitionHandlerDLQ + } else { + c.partitionHandler = &partitionHandler{ + logger: c.logger, + } + partitionH = c.partitionHandler + } + kgoOpt = append(kgoOpt, kgo.DisableAutoCommit(), kgo.RequireStableFetchOffsets(), kgo.ConsumerGroup(o.ConsumerConfig.GroupID), 
kgo.ConsumeResetOffset(startOffset), - kgo.OnPartitionsLost(partitionLost(c)), - kgo.OnPartitionsRevoked(partitionRevoked(c)), + kgo.OnPartitionsLost(partitionLost(partitionH)), + kgo.OnPartitionsRevoked(partitionRevoked(partitionH)), ) if isDLQ { @@ -143,14 +159,14 @@ func newClient(c *Client, cfg Config, o *options, isDLQ bool) (*kgo.Client, erro } } - // Add custom options + // add custom options if isDLQ { kgoOpt = append(kgoOpt, o.KGOOptionsDLQ...) } else { kgoOpt = append(kgoOpt, o.KGOOptions...) } - // Create kafka client + // create kafka client kgoClient, err := kgo.NewClient(kgoOpt...) if err != nil { return nil, fmt.Errorf("create kafka client: %w", err) @@ -168,8 +184,9 @@ func (c *Client) Close() { } } -// Consume starts consuming messages from kafka. +// Consume starts consuming messages from kafka and blocks until context is done or an error occurs. // - Only works if client is created with consumer config. +// - Call it only once. func (c *Client) Consume(ctx context.Context, callback CallBackFunc, opts ...OptionConsumer) error { o := optionConsumer{ Client: c, @@ -183,7 +200,7 @@ func (c *Client) Consume(ctx context.Context, callback CallBackFunc, opts ...Opt } if o.Consumer == nil { - return fmt.Errorf("consumer is nil: %w", ErrNotImplemented) + return fmt.Errorf("consumer is nil: %w", errNotImplemented) } if o.ConsumerDLQ == nil { diff --git a/clientoptions.go b/clientoptions.go index 9c558ff..4de8edc 100644 --- a/clientoptions.go +++ b/clientoptions.go @@ -99,3 +99,12 @@ func WithLogger(logger logz.Adapter) Option { o.Logger = logger } } + +// WithNoLogger disables logging when v is true. 
+func WithNoLogger(v bool) Option { + return func(o *options) { + if v { + o.Logger = logz.AdapterNoop{} + } + } +} diff --git a/codec.go b/codec.go index 6e9d25a..59f74d1 100644 --- a/codec.go +++ b/codec.go @@ -37,7 +37,7 @@ func compressionVerify(c []string) error { switch v { case "gzip", "snappy", "lz4", "zstd": default: - return fmt.Errorf("%w: %q", ErrInvalidCompression, v) + return fmt.Errorf("invalid compression: %q", v) } } diff --git a/consumer.go b/consumer.go index 2a48064..c6e03b4 100644 --- a/consumer.go +++ b/consumer.go @@ -200,21 +200,23 @@ func WithCallbackBatch[T any](fn func(ctx context.Context, msg []T) error) CallB decode, produceDLQ := getDecodeProduceDLQ[T](o) o.Consumer = &consumerBatch[T]{ - Process: fn, - Decode: decode, - ProduceDLQ: produceDLQ, - Cfg: o.ConsumerConfig, - Skip: skip, - Logger: o.Client.logger, + Process: fn, + Decode: decode, + ProduceDLQ: produceDLQ, + Cfg: o.ConsumerConfig, + Skip: skip, + Logger: o.Client.logger, + PartitionHandler: o.Client.partitionHandler, } o.ConsumerDLQ = &consumerBatch[T]{ - Process: fn, - Decode: decode, - Cfg: o.ConsumerConfig, - Skip: skipDLQ, - IsDLQ: true, - Logger: o.Client.logger, + Process: fn, + Decode: decode, + Cfg: o.ConsumerConfig, + Skip: skipDLQ, + IsDLQ: true, + Logger: o.Client.logger, + PartitionHandler: o.Client.partitionHandlerDLQ, } return nil @@ -229,21 +231,23 @@ func WithCallback[T any](fn func(ctx context.Context, msg T) error) CallBackFunc decode, produceDLQ := getDecodeProduceDLQ[T](o) o.Consumer = &consumerSingle[T]{ - Process: fn, - Decode: decode, - ProduceDLQ: produceDLQ, - Cfg: o.ConsumerConfig, - Skip: skip, - Logger: o.Client.logger, + Process: fn, + Decode: decode, + ProduceDLQ: produceDLQ, + Cfg: o.ConsumerConfig, + Skip: skip, + Logger: o.Client.logger, + PartitionHandler: o.Client.partitionHandler, } o.ConsumerDLQ = &consumerSingle[T]{ - Process: fn, - Decode: decode, - Cfg: o.ConsumerConfig, - Skip: skipDLQ, - IsDLQ: true, - Logger: o.Client.logger, + 
Process: fn, + Decode: decode, + Cfg: o.ConsumerConfig, + Skip: skipDLQ, + IsDLQ: true, + Logger: o.Client.logger, + PartitionHandler: o.Client.partitionHandlerDLQ, } return nil diff --git a/consumerbatch.go b/consumerbatch.go index e9f2b06..295e9da 100644 --- a/consumerbatch.go +++ b/consumerbatch.go @@ -14,12 +14,13 @@ type consumerBatch[T any] struct { Cfg ConsumerConfig Decode func(raw []byte, r *kgo.Record) (T, error) // PreCheck is a function that is called before the callback and decode. - PreCheck func(ctx context.Context, r *kgo.Record) error - Option optionConsumer - ProduceDLQ func(ctx context.Context, err error, records []*kgo.Record) error - Skip func(cfg *ConsumerConfig, r *kgo.Record) bool - Logger logz.Adapter - IsDLQ bool + PreCheck func(ctx context.Context, r *kgo.Record) error + Option optionConsumer + ProduceDLQ func(ctx context.Context, err error, records []*kgo.Record) error + Skip func(cfg *ConsumerConfig, r *kgo.Record) bool + Logger logz.Adapter + PartitionHandler *partitionHandler + IsDLQ bool } func (c *consumerBatch[T]) setPreCheck(fn func(ctx context.Context, r *kgo.Record) error) { @@ -30,7 +31,7 @@ func (c *consumerBatch[T]) Consume(ctx context.Context, cl *kgo.Client) error { for { fetch := cl.PollRecords(ctx, c.Cfg.MaxPollRecords) if fetch.IsClientClosed() { - return ErrClientClosed + return errClientClosed } if err := fetch.Err(); err != nil { diff --git a/consumersingle.go b/consumersingle.go index 472c5db..3c59efd 100644 --- a/consumersingle.go +++ b/consumersingle.go @@ -14,12 +14,13 @@ type consumerSingle[T any] struct { Cfg ConsumerConfig Decode func(raw []byte, r *kgo.Record) (T, error) // PreCheck is a function that is called before the callback and decode. 
- PreCheck func(ctx context.Context, r *kgo.Record) error - Option optionConsumer - ProduceDLQ func(ctx context.Context, err error, records []*kgo.Record) error - Skip func(cfg *ConsumerConfig, r *kgo.Record) bool - Logger logz.Adapter - IsDLQ bool + PreCheck func(ctx context.Context, r *kgo.Record) error + Option optionConsumer + ProduceDLQ func(ctx context.Context, err error, records []*kgo.Record) error + Skip func(cfg *ConsumerConfig, r *kgo.Record) bool + Logger logz.Adapter + PartitionHandler *partitionHandler + IsDLQ bool } func (c *consumerSingle[T]) setPreCheck(fn func(ctx context.Context, r *kgo.Record) error) { @@ -28,9 +29,12 @@ func (c *consumerSingle[T]) setPreCheck(fn func(ctx context.Context, r *kgo.Reco func (c *consumerSingle[T]) Consume(ctx context.Context, cl *kgo.Client) error { for { + // flush the partition handler, it will be ready next poll + c.PartitionHandler.Flush() + fetch := cl.PollRecords(ctx, c.Cfg.MaxPollRecords) if fetch.IsClientClosed() { - return ErrClientClosed + return errClientClosed } if err := fetch.Err(); err != nil { @@ -56,18 +60,32 @@ func (c *consumerSingle[T]) iteration(ctx context.Context, cl *kgo.Client, fetch for iter := fetch.RecordIter(); !iter.Done(); { r := iter.Next() - // listening DLQ topics + // check partition is revoked + if c.PartitionHandler.IsRevokedRecord(r) { + continue + } + if c.IsDLQ { + // listening DLQ topics + // check partition is revoked and not commit it! 
+ // when an error is returned, the record will not be committed if err := c.iterationDLQ(ctx, r); err != nil { + if errors.Is(err, errPartitionRevoked) { + // don't commit revoked record + continue + } + return wrapErr(r, err, c.IsDLQ) } } else { // listening main topics + // revoked partition is already checked above, no need to check again if err := c.iterationMain(ctx, r); err != nil { return wrapErr(r, err, c.IsDLQ) } } + // commit only if no error occurred if err := cl.CommitRecords(ctx, r); err != nil { return wrapErr(r, fmt.Errorf("commit records failed: %w", err), c.IsDLQ) } @@ -77,6 +95,7 @@ func (c *consumerSingle[T]) iteration(ctx context.Context, cl *kgo.Client, fetch } // iterationDLQ is used to listen DLQ topics, error usually comes from context cancellation. +// any kind of error will be retried after the configured interval. func (c *consumerSingle[T]) iterationDLQ(ctx context.Context, r *kgo.Record) error { wait := waitRetry{ Interval: c.Cfg.DLQ.RetryInterval, @@ -89,6 +108,10 @@ func (c *consumerSingle[T]) iterationDLQ(ctx context.Context, r *kgo.Record) err default: } + if c.PartitionHandler.IsRevokedRecord(r) { + return errPartitionRevoked + } + if err := c.iterationRecord(ctx, r); err != nil { errOrg, _ := isDQLError(err) errWrapped := wrapErr(r, errOrg, c.IsDLQ) diff --git a/env/docker-compose.yml b/env/docker-compose.yml index 0be1a30..813b208 100644 --- a/env/docker-compose.yml +++ b/env/docker-compose.yml @@ -3,26 +3,23 @@ version: '3.8' services: kafka: image: docker.io/bitnami/kafka:3.5.1 - # ports: - # - "9092:9092" - network_mode: host + ports: + - "9092:9092" environment: - ALLOW_PLAINTEXT_LISTENER=yes # KRaft settings - KAFKA_CFG_NODE_ID=0 - KAFKA_CFG_PROCESS_ROLES=controller,broker - - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@localhost:9093 + - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@:9093 # Listeners - - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 - - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092 - - 
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT + - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,INTERNAL://:9094 + - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092,INTERNAL://kafka:9094 + - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,INTERNAL:PLAINTEXT - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER - - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT redpanda: - image: docker.io/redpandadata/console:v2.3.5 - # ports: - # - "7071:7071" - network_mode: host + image: docker.io/redpandadata/console:v2.3.8 + ports: + - "7071:7071" environment: - - KAFKA_BROKERS=localhost:9092 + - KAFKA_BROKERS=kafka:9094 - SERVER_LISTENPORT=7071 diff --git a/error.go b/error.go index 4722880..0539010 100644 --- a/error.go +++ b/error.go @@ -9,12 +9,12 @@ import ( ) var ( - ErrNotImplemented = fmt.Errorf("not implemented") - ErrClientClosed = fmt.Errorf("client closed") + errNotImplemented = fmt.Errorf("not implemented") + errClientClosed = fmt.Errorf("client closed") + errPartitionRevoked = fmt.Errorf("partition revoked") + // ErrSkip is use to skip message in the PreCheck hook or Decode function. ErrSkip = fmt.Errorf("skip message") - // ErrInvalidCompression for producer setting check. - ErrInvalidCompression = fmt.Errorf("invalid compression") // ErrDLQ use with callback function to send message to DLQ topic. 
ErrDLQ = fmt.Errorf("send to DLQ") ) diff --git a/example/main.go b/example/main.go index 32b6be9..8b65c8e 100644 --- a/example/main.go +++ b/example/main.go @@ -4,6 +4,7 @@ import ( "context" "log/slog" "os" + "sort" "sync" "github.com/worldline-go/initializer" @@ -26,6 +27,8 @@ func getExampleList() []string { exampleNames = append(exampleNames, k) } + sort.Strings(exampleNames) + return exampleNames } @@ -41,6 +44,8 @@ func main() { run := examples[exampleName] if run == nil { slog.Error("unknown example", slog.String("example", exampleName)) + + return } initializer.Init(run, initializer.WithOptionsLogz(logz.WithCaller(false))) diff --git a/partition.go b/partition.go index a399e07..68fcd91 100644 --- a/partition.go +++ b/partition.go @@ -4,24 +4,80 @@ import ( "context" "github.com/twmb/franz-go/pkg/kgo" + "github.com/worldline-go/logz" ) -func partitionLost(c *Client) func(context.Context, *kgo.Client, map[string][]int32) { +type partitionHandler struct { + logger logz.Adapter + + mapPartitionsRevoked map[string][]int32 + mapPartitionsLost map[string][]int32 +} + +// Flush is used to flush the partition handler and it will be ready next poll. 
+func (h *partitionHandler) Flush() { + h.mapPartitionsRevoked = nil + h.mapPartitionsLost = nil +} + +func (h *partitionHandler) AddPartitionsRevoked(mapPartitions map[string][]int32) { + if h.mapPartitionsRevoked == nil { + h.mapPartitionsRevoked = make(map[string][]int32, len(mapPartitions)) + } + + for topic, partition := range mapPartitions { + h.mapPartitionsRevoked[topic] = partition + } +} + +func (h *partitionHandler) AddPartitionsLost(mapPartitions map[string][]int32) { + if h.mapPartitionsLost == nil { + h.mapPartitionsLost = make(map[string][]int32, len(mapPartitions)) + } + + for topic, partition := range mapPartitions { + h.mapPartitionsLost[topic] = partition + } +} + +func (h *partitionHandler) IsRevokedRecord(r *Record) bool { + if len(h.mapPartitionsRevoked) == 0 { + return false + } + + if _, ok := h.mapPartitionsRevoked[r.Topic]; !ok { + return false + } + + for _, partition := range h.mapPartitionsRevoked[r.Topic] { + if partition == r.Partition { + return true + } + } + + return false +} + +func partitionLost(h *partitionHandler) func(context.Context, *kgo.Client, map[string][]int32) { return func(ctx context.Context, cl *kgo.Client, partitions map[string][]int32) { if len(partitions) == 0 { return } - c.logger.Info("partition lost", "partitions", partitions) + h.logger.Debug("partition lost", "partitions", partitions) + + h.AddPartitionsLost(partitions) } } -func partitionRevoked(c *Client) func(context.Context, *kgo.Client, map[string][]int32) { +func partitionRevoked(h *partitionHandler) func(context.Context, *kgo.Client, map[string][]int32) { return func(ctx context.Context, cl *kgo.Client, partitions map[string][]int32) { if len(partitions) == 0 { return } - c.logger.Info("partition revoked", "partitions", partitions) + h.logger.Debug("partition revoked", "partitions", partitions) + + h.AddPartitionsRevoked(partitions) } } diff --git a/producer.go b/producer.go index 08e07f7..77b5903 100644 --- a/producer.go +++ b/producer.go @@ -8,6 +8,8 
@@ import ( "github.com/twmb/franz-go/pkg/kgo" ) +var defaultServiceNameKey = "service" + type ( Header = kgo.RecordHeader Record = kgo.Record @@ -92,7 +94,7 @@ func NewProducer[T any](client *Client, topic string, opts ...OptionProducer) (* Topic: topic, Headers: []Header{ { - Key: "server", + Key: defaultServiceNameKey, Value: client.clientID, }, }, diff --git a/producerdlq.go b/producerdlq.go index f77103f..e5ae0b9 100644 --- a/producerdlq.go +++ b/producerdlq.go @@ -13,7 +13,7 @@ import ( // - err could be ErrDLQIndexed or any other error func producerDLQ(topic string, fn func(ctx context.Context, records []*kgo.Record) error) func(ctx context.Context, err error, records []*kgo.Record) error { return func(ctx context.Context, err error, records []*kgo.Record) error { - recordsSend := make([]*kgo.Record, len(records)) + recordsSend := make([]*kgo.Record, 0, len(records)) errDLQIndexed := &DLQIndexedError{} if !errors.As(err, &errDLQIndexed) { @@ -27,11 +27,11 @@ func producerDLQ(topic string, fn func(ctx context.Context, records []*kgo.Recor continue } } else { - // ErrDLQ used, unwrap and show original error. + // ErrDLQ used, unwrap and show original error err = unwrapErr(err) } - recordsSend[i] = &kgo.Record{ + recordsSend = append(recordsSend, &kgo.Record{ Topic: topic, Key: r.Key, Value: r.Value, @@ -43,7 +43,7 @@ func producerDLQ(topic string, fn func(ctx context.Context, records []*kgo.Recor kgo.RecordHeader{Key: "topic", Value: []byte(r.Topic)}, kgo.RecordHeader{Key: "timestamp", Value: []byte(r.Timestamp.Format(time.RFC3339))}, ), - } + }) } return fn(ctx, recordsSend)