From ab04a932ddf2fc6c84939ab9e1d9b5cffcb90a0d Mon Sep 17 00:00:00 2001 From: Eray Ates Date: Sun, 7 Jan 2024 21:28:42 +0100 Subject: [PATCH] fix: error wrap Signed-off-by: Eray Ates --- .github/workflows/test.yml | 6 +-- .gitignore | 1 + Makefile | 4 ++ README.md | 2 + config.go | 2 +- config_test.go | 4 +- consumer.go | 18 ++++---- consumer2_test.go | 18 ++++---- consumer_test.go | 4 ++ consumerbatch.go | 18 +++++--- consumersingle.go | 20 +++++--- docs/notes.md | 14 ++++++ docs/scenarios.md | 8 ---- error.go | 40 +++++++++------- error_test.go | 53 +++++++++++++++++++++ example/consumer/batch.go | 2 +- example/consumer/single.go | 18 ++++++-- producerdlq.go | 27 +++++------ producerdlq_test.go | 95 ++++++++++++++++++++++++++++++++++++++ 19 files changed, 273 insertions(+), 81 deletions(-) create mode 100644 docs/notes.md delete mode 100644 docs/scenarios.md create mode 100644 producerdlq_test.go diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 531f887..176a1ed 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -20,11 +20,11 @@ jobs: - name: golangci-lint uses: golangci/golangci-lint-action@v3 with: - args: --timeout 5m --new-from-rev=HEAD~1 --issues-exit-code=0 + args: --timeout 5m --new-from-rev=HEAD~1 - name: Run tests run: | - GOPATH="$(dirname ${PWD})" golangci-lint run --out-format checkstyle ./... > golangci-lint-report.out || true - go test -coverprofile=coverage.out -json ./... > test-report.out + GOPATH="$(dirname ${PWD})" golangci-lint run --out-format --issues-exit-code=0 checkstyle ./... > golangci-lint-report.out || true + go test -short -coverprofile=coverage.out -json ./... > test-report.out - name: SonarCloud Scan uses: sonarsource/sonarcloud-github-action@master with: diff --git a/.gitignore b/.gitignore index cf435ad..71a26ba 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ /bin /.golangci.yml /coverage.* +/*.out \ No newline at end of file diff --git a/Makefile b/Makefile index b88efa8..585a7b9 100644 --- a/Makefile +++ b/Makefile @@ -51,6 +51,10 @@ lint: .golangci.yml bin/golangci-lint-$(GOLANGCI_LINT_VERSION) ## Lint Go files test: ## Run unit tests @go test -v -race ./... +.PHONY: test-short +test-short: ## Run unit tests short + @go test -v -race -short ./... + .PHONY: test-without-cache test-without-cache: ## Run unit tests without cache @go test -count=1 -v -race ./... diff --git a/README.md b/README.md index 84e6244..7efb169 100644 --- a/README.md +++ b/README.md @@ -109,6 +109,8 @@ if err := client.Consume(ctx, wkafka.WithCallback(ProcessSingle)); err != nil { } ``` +Send record to dead letter queue, use __WrapErrDLQ__ function with to wrap the error and it will be send to dead letter queue. + > Check the aditional options for custom decode and precheck. ### Producer diff --git a/config.go b/config.go index 7215e0f..06ccad2 100644 --- a/config.go +++ b/config.go @@ -70,7 +70,7 @@ func configApply(c ConsumerPreConfig, consumerConfig *ConsumerConfig, progName s } if consumerConfig.DLQ.SkipExtra == nil { - consumerConfig.DLQ.SkipExtra = map[string]map[int32]Offsets{ + consumerConfig.DLQ.SkipExtra = map[string]map[int32]OffsetConfig{ consumerConfig.DLQ.Topic: consumerConfig.DLQ.Skip, } } else { diff --git a/config_test.go b/config_test.go index 84f7b34..6718453 100644 --- a/config_test.go +++ b/config_test.go @@ -40,9 +40,9 @@ func TestConsumerPreConfig_Apply(t *testing.T) { }, want: ConsumerConfig{ GroupID: "finops_test", - DLQ: DLQ{ + DLQ: DLQConfig{ Topic: "finops_serviceX_dlq", - SkipExtra: map[string]map[int32]Offsets{ + SkipExtra: map[string]map[int32]OffsetConfig{ "finops_serviceX_dlq": nil, }, RetryInterval: DefaultRetryInterval, diff --git a/consumer.go b/consumer.go index c39696e..040ad3e 100644 --- a/consumer.go +++ b/consumer.go @@ -35,7 +35,7 @@ type ConsumerConfig struct { // - 31 // - 90 // before: 20 // skip all offsets before or equal to this offset - Skip map[string]map[int32]Offsets `cfg:"skip"` + Skip map[string]map[int32]OffsetConfig `cfg:"skip"` // MaxPollRecords is the maximum number of records returned in a single call to poll. // - Default is max.poll.records in the broker configuration, usually 500. // - Fetching messages from broker, this is not related with batch processing! @@ -46,10 +46,10 @@ type ConsumerConfig struct { // - Default is 100. BatchCount int `cfg:"batch_count"` // DLQ is a dead letter queue configuration. - DLQ DLQ `cfg:"dlq"` + DLQ DLQConfig `cfg:"dlq"` } -type DLQ struct { +type DLQConfig struct { // Disable is a flag to disable DLQ. // - Default is false. // - If topic is not set, it will be generated from format_dlq_topic. @@ -73,16 +73,16 @@ type DLQ struct { // - 31 // - 90 // before: 20 // skip all offsets before or equal to this offset - Skip map[int32]Offsets `cfg:"skip"` + Skip map[int32]OffsetConfig `cfg:"skip"` // Topic is a topic name to send messages that failed to process also could be used for DLQ. Topic string `cfg:"topic"` // TopicExtra is extra a list of kafka topics to just consume from DLQ. TopicsExtra []string `cfg:"topics_extra"` // SkipExtra are optional message offsets to be skipped for topicsExtra. - SkipExtra map[string]map[int32]Offsets `cfg:"skip_extra"` + SkipExtra map[string]map[int32]OffsetConfig `cfg:"skip_extra"` } -type Offsets struct { +type OffsetConfig struct { // Offsets is a list of offsets numbers in that partition to skip. Offsets []int64 `cfg:"offsets"` // Before skips all offsets before or equal to this offset. @@ -261,7 +261,7 @@ func WithCallback[T any](fn func(ctx context.Context, msg T) error) CallBackFunc } } -func getDecodeProduceDLQ[T any](o *optionConsumer) (func(raw []byte, r *kgo.Record) (T, error), func(ctx context.Context, err error, records []*kgo.Record) error) { +func getDecodeProduceDLQ[T any](o *optionConsumer) (func(raw []byte, r *kgo.Record) (T, error), func(ctx context.Context, err *DLQError, records []*kgo.Record) error) { var decode func(raw []byte, r *kgo.Record) (T, error) var msg T @@ -272,9 +272,9 @@ func getDecodeProduceDLQ[T any](o *optionConsumer) (func(raw []byte, r *kgo.Reco decode = codecJSON[T]{}.Decode } - var produceDLQ func(ctx context.Context, err error, records []*kgo.Record) error + var produceDLQ func(ctx context.Context, err *DLQError, records []*kgo.Record) error if !o.ConsumerConfig.DLQ.Disable { - produceDLQ = producerDLQ(o.ConsumerConfig.DLQ.Topic, o.Client.ProduceRaw) + produceDLQ = producerDLQ(o.ConsumerConfig.DLQ.Topic, o.Client.clientID, o.Client.ProduceRaw) } return decode, produceDLQ diff --git a/consumer2_test.go b/consumer2_test.go index 4c57633..1573d4e 100644 --- a/consumer2_test.go +++ b/consumer2_test.go @@ -40,7 +40,7 @@ func Test_skip(t *testing.T) { name: "skip topic", args: args{ cfg: &ConsumerConfig{ - Skip: map[string]map[int32]Offsets{ + Skip: map[string]map[int32]OffsetConfig{ "topic": { 0: { Offsets: []int64{ @@ -62,7 +62,7 @@ func Test_skip(t *testing.T) { name: "skip topic before", args: args{ cfg: &ConsumerConfig{ - Skip: map[string]map[int32]Offsets{ + Skip: map[string]map[int32]OffsetConfig{ "topic": { 0: { Before: 5, @@ -82,7 +82,7 @@ func Test_skip(t *testing.T) { name: "topic before", args: args{ cfg: &ConsumerConfig{ - Skip: map[string]map[int32]Offsets{ + Skip: map[string]map[int32]OffsetConfig{ "topic": { 0: { Before: 5, @@ -145,8 +145,8 @@ func Test_skipDLQ(t *testing.T) { name: "skip topic", args: args{ cfg: &ConsumerConfig{ - DLQ: DLQ{ - SkipExtra: map[string]map[int32]Offsets{ + DLQ: DLQConfig{ + SkipExtra: map[string]map[int32]OffsetConfig{ "topic": { 0: { Offsets: []int64{ @@ -169,8 +169,8 @@ func Test_skipDLQ(t *testing.T) { name: "skip topic before", args: args{ cfg: &ConsumerConfig{ - DLQ: DLQ{ - SkipExtra: map[string]map[int32]Offsets{ + DLQ: DLQConfig{ + SkipExtra: map[string]map[int32]OffsetConfig{ "topic": { 0: { Before: 5, @@ -191,8 +191,8 @@ func Test_skipDLQ(t *testing.T) { name: "topic before", args: args{ cfg: &ConsumerConfig{ - DLQ: DLQ{ - SkipExtra: map[string]map[int32]Offsets{ + DLQ: DLQConfig{ + SkipExtra: map[string]map[int32]OffsetConfig{ "topic": { 0: { Before: 5, diff --git a/consumer_test.go b/consumer_test.go index d2ef3d3..10eded0 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -92,6 +92,10 @@ func (c *Counter[T]) Count(ctx context.Context, msg T) error { } func Test_GroupConsuming(t *testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode") + } + client, err := tkafka.TestClient() if err != nil { t.Fatalf("TestClient() error = %v", err) diff --git a/consumerbatch.go b/consumerbatch.go index b3fc5f2..27b1491 100644 --- a/consumerbatch.go +++ b/consumerbatch.go @@ -19,7 +19,7 @@ type consumerBatch[T any] struct { // PreCheck is a function that is called before the callback and decode. PreCheck func(ctx context.Context, r *kgo.Record) error Option optionConsumer - ProduceDLQ func(ctx context.Context, err error, records []*kgo.Record) error + ProduceDLQ func(ctx context.Context, err *DLQError, records []*kgo.Record) error Skip func(cfg *ConsumerConfig, r *kgo.Record) bool Logger logz.Adapter PartitionHandler *partitionHandler @@ -137,12 +137,12 @@ func (c *consumerBatch[T]) batchIteration(ctx context.Context, cl *kgo.Client, f } if c.ProduceDLQ != nil { - if err := c.ProduceDLQ(ctx, err, batchRecords); err != nil { + if err := c.ProduceDLQ(ctx, errOrg, batchRecords); err != nil { return fmt.Errorf("produce to DLQ failed: %w; offsets: %s", err, errorOffsetList(batchRecords)) } } else { // returning a batch error could be confusing - return fmt.Errorf("process batch failed: %w; offsets: %s", errOrg, errorOffsetList(batchRecords)) + return fmt.Errorf("process batch failed: %w; offsets: %s", err, errorOffsetList(batchRecords)) } } @@ -209,9 +209,15 @@ func (c *consumerBatch[T]) iterationDLQ(ctx context.Context, r *kgo.Record) erro } if err := c.iterationRecordDLQ(ctx, r); err != nil { - errOrg, _ := isDQLError(err) - errWrapped := wrapErr(r, errOrg, c.IsDLQ) - c.Logger.Error("process failed", "err", errWrapped, "retry_interval", wait.CurrentInterval().String()) + errOrg, ok := isDQLError(err) + var errWrapped error + if ok { + errWrapped = wrapErr(r, errOrg.Err, c.IsDLQ) + } else { + errWrapped = wrapErr(r, err, c.IsDLQ) + } + + c.Logger.Error("DLQ process failed", "err", errWrapped, "retry_interval", wait.CurrentInterval().String()) if err := wait.Sleep(ctx); err != nil { return err diff --git a/consumersingle.go b/consumersingle.go index 2a24450..c67a2bd 100644 --- a/consumersingle.go +++ b/consumersingle.go @@ -19,7 +19,7 @@ type consumerSingle[T any] struct { // PreCheck is a function that is called before the callback and decode. PreCheck func(ctx context.Context, r *kgo.Record) error Option optionConsumer - ProduceDLQ func(ctx context.Context, err error, records []*kgo.Record) error + ProduceDLQ func(ctx context.Context, err *DLQError, records []*kgo.Record) error Skip func(cfg *ConsumerConfig, r *kgo.Record) bool Logger logz.Adapter PartitionHandler *partitionHandler @@ -124,9 +124,15 @@ func (c *consumerSingle[T]) iterationDLQ(ctx context.Context, r *kgo.Record) err } if err := c.iterationRecord(ctx, r); err != nil { - errOrg, _ := isDQLError(err) - errWrapped := wrapErr(r, errOrg, c.IsDLQ) - c.Logger.Error("process failed", "err", errWrapped, "retry_interval", wait.CurrentInterval().String()) + errOrg, ok := isDQLError(err) + var errWrapped error + if ok { + errWrapped = wrapErr(r, errOrg.Err, c.IsDLQ) + } else { + errWrapped = wrapErr(r, err, c.IsDLQ) + } + + c.Logger.Error("DLQ process failed", "err", errWrapped, "retry_interval", wait.CurrentInterval().String()) if err := wait.Sleep(ctx); err != nil { return err @@ -144,7 +150,7 @@ func (c *consumerSingle[T]) iterationDLQ(ctx context.Context, r *kgo.Record) err // iterationMain is used to listen main topics. func (c *consumerSingle[T]) iterationMain(ctx context.Context, r *kgo.Record) error { if err := c.iterationRecord(ctx, r); err != nil { - errOrg, ok := isDQLError(err) + errDLQ, ok := isDQLError(err) if !ok { // it is not DLQ error, return it return err @@ -152,14 +158,14 @@ func (c *consumerSingle[T]) iterationMain(ctx context.Context, r *kgo.Record) er // send to DLQ if enabled if c.ProduceDLQ != nil { - if err := c.ProduceDLQ(ctx, err, []*kgo.Record{r}); err != nil { + if err := c.ProduceDLQ(ctx, errDLQ, []*kgo.Record{r}); err != nil { return fmt.Errorf("produce to DLQ failed: %w", err) } return nil } - return errOrg + return err } return nil diff --git a/docs/notes.md b/docs/notes.md new file mode 100644 index 0000000..370dd5c --- /dev/null +++ b/docs/notes.md @@ -0,0 +1,14 @@ +# Notes + +- __max.poll.interval.ms__ is not exist in our library so it is ok to stay on process. + +## Joined new consumer to the group + +When a new joiner arrived, it will trigger the partition handler function. +That means we will skip to commit that messages and new joiner will consume the messages again and take care of commit. +Messing up the offset of the group is more dangerous problem than duplicate messages. + +## Send specific messages to DLQ topic + +When using __WrapErrDLQ__ function, it will send the error to dead letter queue topic. +Usable with index to add specific message to dead letter queue also can give different error message. diff --git a/docs/scenarios.md b/docs/scenarios.md deleted file mode 100644 index b9f1e2b..0000000 --- a/docs/scenarios.md +++ /dev/null @@ -1,8 +0,0 @@ -# Scenarios - -Some scenarios to explain what will happen if we got in that situation. - -## Group consuming without duplicate messages - -## Commit messages with different order - diff --git a/error.go b/error.go index 4763650..d147f90 100644 --- a/error.go +++ b/error.go @@ -19,35 +19,46 @@ var ( // ErrSkip is use to skip message in the PreCheck hook or Decode function. ErrSkip = fmt.Errorf("skip message") // ErrDLQ use with callback function to send message to DLQ topic. - ErrDLQ = fmt.Errorf("send to DLQ") + // Prefer to use WrapErrDLQ to wrap error. + ErrDLQ = fmt.Errorf("error DLQ") ) -// DLQIndexedError is use with callback function to send message to DLQ topic with specific index. -type DLQIndexedError struct { +// DLQError is use with callback function to send message to DLQ topic. +type DLQError struct { // Err is default error to add in header. + // If not setted, header will just show "DLQ indexed error" Err error - // Indexes if not empty, use to add error in specific index. + // Indexes to use send specific batch index to DLQ. + // If index's error is nil, default error is used. Indexes map[int]error } -func (e *DLQIndexedError) Error() string { - return "DLQ indexed error" +func WrapErrDLQ(err error) error { + return &DLQError{Err: err} } -// isDQLError check if error is DLQ error and return the original error or error. -func isDQLError(err error) (error, bool) { - if errors.Is(err, ErrDLQ) { - return unwrapErr(err), true +func (e *DLQError) Error() string { + if e.Err != nil { + return e.Err.Error() } - var errDLQIndexed *DLQIndexedError + return "DLQ indexed error" +} + +// isDQLError check if error is DLQ error and return it. +func isDQLError(err error) (*DLQError, bool) { + var errDLQIndexed *DLQError ok := errors.As(err, &errDLQIndexed) if ok { - return errDLQIndexed.Err, true + return errDLQIndexed, true } - return err, false + if errors.Is(err, ErrDLQ) { + return &DLQError{Err: err}, true + } + + return nil, false } func wrapErr(r *kgo.Record, err error, dlq bool) error { @@ -116,9 +127,6 @@ func formatRange(start, end int) string { } return strconv.Itoa(start) + "-" + strconv.Itoa(end) } -func unwrapErr(err error) error { - return errors.Unwrap(err) -} func stringHeader(headers []Header) string { var str strings.Builder diff --git a/error_test.go b/error_test.go index 203affa..2b1b366 100644 --- a/error_test.go +++ b/error_test.go @@ -1,6 +1,7 @@ package wkafka import ( + "fmt" "testing" "github.com/twmb/franz-go/pkg/kgo" @@ -82,3 +83,55 @@ func Test_errorOffsetList(t *testing.T) { }) } } + +func Test_isDQLError(t *testing.T) { + errTest := fmt.Errorf("some error") + type args struct { + err error + } + tests := []struct { + name string + args args + wantErr string + wantOk bool + }{ + { + name: "dlq error", + args: args{ + err: WrapErrDLQ(errTest), + }, + wantErr: "some error", + wantOk: true, + }, + { + name: "not dlq error", + args: args{ + err: errTest, + }, + wantOk: false, + }, + { + name: "dlq error", + args: args{ + err: fmt.Errorf("some error: %w", ErrDLQ), + }, + wantErr: "some error: error DLQ", + wantOk: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err, ok := isDQLError(tt.args.err) + if ok != tt.wantOk { + t.Errorf("isDQLError() ok = %v, wantOk %v", ok, tt.wantOk) + return + } + if !tt.wantOk { + return + } + if err.Error() != tt.wantErr { + t.Errorf("isDQLError() = %v, want %v", err, tt.wantErr) + } + }) + } +} diff --git a/example/consumer/batch.go b/example/consumer/batch.go index 7967b1f..cf0e292 100644 --- a/example/consumer/batch.go +++ b/example/consumer/batch.go @@ -50,7 +50,7 @@ func ProcessBatch(_ context.Context, msg []DataBatch) error { } if anyError { - return fmt.Errorf("test error: %w", wkafka.ErrDLQ) + return wkafka.WrapErrDLQ(fmt.Errorf("test error")) } return nil diff --git a/example/consumer/single.go b/example/consumer/single.go index 6dd7659..54adcae 100644 --- a/example/consumer/single.go +++ b/example/consumer/single.go @@ -5,7 +5,9 @@ import ( "fmt" "log/slog" "sync" + "time" + "github.com/rs/zerolog/log" "github.com/worldline-go/wkafka" ) @@ -23,20 +25,28 @@ var ( ) type DataSingle struct { - Test int `json:"test"` - IsErr bool `json:"is_err"` - IsErrFatal bool `json:"is_err_fatal"` + Test int `json:"test"` + Timeout string `json:"timeout"` + IsErr bool `json:"is_err"` + IsErrFatal bool `json:"is_err_fatal"` } func ProcessSingle(_ context.Context, msg DataSingle) error { slog.Info("callback", slog.Any("test", msg.Test), slog.Bool("is_err", msg.IsErr)) + if duration, err := time.ParseDuration(msg.Timeout); err != nil { + log.Error().Err(err).Msg("parse duration") + } else { + log.Info().Dur("duration", duration).Msg("sleep") + time.Sleep(duration) + } + if msg.IsErrFatal { return fmt.Errorf("test fatal error %d", msg.Test) } if msg.IsErr { - return fmt.Errorf("test error %d: %w", msg.Test, wkafka.ErrDLQ) + return wkafka.WrapErrDLQ(fmt.Errorf("test error %d", msg.Test)) } return nil diff --git a/producerdlq.go b/producerdlq.go index e5ae0b9..a98d2d8 100644 --- a/producerdlq.go +++ b/producerdlq.go @@ -2,7 +2,6 @@ package wkafka import ( "context" - "errors" "strconv" "time" @@ -11,24 +10,21 @@ import ( // producerDLQ to push failed records to dead letter queue. // - err could be ErrDLQIndexed or any other error -func producerDLQ(topic string, fn func(ctx context.Context, records []*kgo.Record) error) func(ctx context.Context, err error, records []*kgo.Record) error { - return func(ctx context.Context, err error, records []*kgo.Record) error { +func producerDLQ(topic string, clientID []byte, fn func(ctx context.Context, records []*kgo.Record) error) func(ctx context.Context, err *DLQError, records []*kgo.Record) error { + return func(ctx context.Context, err *DLQError, records []*kgo.Record) error { recordsSend := make([]*kgo.Record, 0, len(records)) - errDLQIndexed := &DLQIndexedError{} - if !errors.As(err, &errDLQIndexed) { - errDLQIndexed.Err = err - } - for i, r := range records { - err := errDLQIndexed.Err - if len(errDLQIndexed.Indexes) > 0 { - if err := errDLQIndexed.Indexes[i]; err == nil { + errOrg := err.Err + if len(err.Indexes) > 0 { + errIndex, ok := err.Indexes[i] + if !ok { continue } - } else { - // ErrDLQ used, unwrap and show original error - err = unwrapErr(err) + + if errIndex != nil { + errOrg = errIndex + } } recordsSend = append(recordsSend, &kgo.Record{ @@ -37,7 +33,8 @@ func producerDLQ(topic string, fn func(ctx context.Context, records []*kgo.Recor Value: r.Value, Headers: append( r.Headers, - kgo.RecordHeader{Key: "error", Value: []byte(err.Error())}, + kgo.RecordHeader{Key: defaultServiceNameKey, Value: clientID}, + kgo.RecordHeader{Key: "error", Value: []byte(errOrg.Error())}, kgo.RecordHeader{Key: "offset", Value: []byte(strconv.FormatInt(r.Offset, 10))}, kgo.RecordHeader{Key: "partition", Value: []byte(strconv.FormatInt(int64(r.Partition), 10))}, kgo.RecordHeader{Key: "topic", Value: []byte(r.Topic)}, diff --git a/producerdlq_test.go b/producerdlq_test.go new file mode 100644 index 0000000..134358b --- /dev/null +++ b/producerdlq_test.go @@ -0,0 +1,95 @@ +package wkafka + +import ( + "context" + "errors" + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/twmb/franz-go/pkg/kgo" +) + +func Test_producerDLQ(t *testing.T) { + clientID := []byte("test-client") + topic := "test-topic" + + errDLQ := &DLQError{ + Err: errors.New("test error"), + Indexes: map[int]error{ + 0: errors.New("indexed error"), + 2: nil, + }, + } + + orgRecords := []*kgo.Record{ + { + Topic: "original-topic", + Key: []byte("key"), + Value: []byte("value"), + Headers: []kgo.RecordHeader{}, + Offset: 123, + Partition: 1, + Timestamp: time.Now(), + }, + { + // This record should be skipped + Topic: "original-topic", + Key: []byte("key"), + Value: []byte("value-1"), + Headers: []kgo.RecordHeader{}, + Offset: 124, + Partition: 1, + Timestamp: time.Now(), + }, + { + Topic: "original-topic", + Key: []byte("key"), + Value: []byte("value-2"), + Headers: []kgo.RecordHeader{}, + Offset: 125, + Partition: 2, + Timestamp: time.Now(), + }, + } + + // Define a mock function to be wrapped by producerDLQ + mockFn := func(ctx context.Context, records []*kgo.Record) error { + for i := range records { + var orgRecord *kgo.Record + errDLQStr := errDLQ.Err.Error() + switch i { + case 0: + orgRecord = orgRecords[0] + errDLQStr = "indexed error" + case 1: + orgRecord = orgRecords[2] + } + + for _, header := range records[i].Headers { + switch header.Key { + case defaultServiceNameKey: + assert.Equal(t, clientID, header.Value) + case "error": + assert.Equal(t, errDLQStr, string(header.Value)) + case "offset": + assert.Equal(t, strconv.FormatInt(orgRecord.Offset, 10), string(header.Value)) + case "partition": + assert.Equal(t, strconv.FormatInt(int64(orgRecord.Partition), 10), string(header.Value)) + case "topic": + assert.Equal(t, orgRecord.Topic, string(header.Value)) + case "timestamp": + assert.Equal(t, orgRecord.Timestamp.Format(time.RFC3339), string(header.Value)) + } + } + } + + return nil + } + + dlqProducer := producerDLQ(topic, clientID, mockFn) + + err := dlqProducer(context.Background(), errDLQ, orgRecords) + assert.NoError(t, err) +}