-
Notifications
You must be signed in to change notification settings - Fork 0
/
producerdlq.go
48 lines (41 loc) · 1.35 KB
/
producerdlq.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package wkafka
import (
"context"
"strconv"
"time"
"github.com/twmb/franz-go/pkg/kgo"
)
// producerDLQ returns a function that pushes failed records to the dead
// letter queue.
//
// Parameters:
//   - topic is set as the Topic of every record handed to fn.
//   - clientID is stored as the value of the "process" header.
//   - fn receives the prepared batch; its error is returned unchanged.
//
// Behaviour of the returned function:
//   - err.Err could be ErrDLQIndexed or any other error; it is the default
//     error written to the "error" header.
//   - If err.Indexes is empty, every record is forwarded with err.Err.
//   - If err.Indexes is non-empty, only records whose position in records is
//     a key of err.Indexes are forwarded; a nil entry falls back to err.Err.
//
// Each forwarded record keeps the original key, value and headers and gains
// the headers "process", "error", "offset", "partition", "topic" and
// "timestamp" (RFC 3339), describing where the record came from.
func producerDLQ(topic string, clientID []byte, fn func(ctx context.Context, records []*kgo.Record) error) func(ctx context.Context, err *DLQError, records []*kgo.Record) error {
	// Number of headers added to every DLQ copy; used to pre-size the slice.
	const dlqHeaderCount = 6

	return func(ctx context.Context, err *DLQError, records []*kgo.Record) error {
		recordsSend := make([]*kgo.Record, 0, len(records))

		for i, r := range records {
			errOrg := err.Err
			if len(err.Indexes) > 0 {
				errIndex, ok := err.Indexes[i]
				if !ok {
					// Not listed in the index set: this record is not sent.
					continue
				}
				if errIndex != nil {
					errOrg = errIndex
				}
			}

			// Neither a default nor an indexed error may be set; calling
			// Error() on a nil error would panic, so emit an empty value.
			errText := []byte{}
			if errOrg != nil {
				errText = []byte(errOrg.Error())
			}

			// Build the headers in a fresh slice. Appending directly to
			// r.Headers could write into its backing array when it has spare
			// capacity, making the DLQ copy share storage with the source
			// record (and with any earlier copy of the same record).
			headers := make([]kgo.RecordHeader, 0, len(r.Headers)+dlqHeaderCount)
			headers = append(headers, r.Headers...)
			headers = append(headers,
				kgo.RecordHeader{Key: "process", Value: clientID},
				kgo.RecordHeader{Key: "error", Value: errText},
				kgo.RecordHeader{Key: "offset", Value: []byte(strconv.FormatInt(r.Offset, 10))},
				kgo.RecordHeader{Key: "partition", Value: []byte(strconv.FormatInt(int64(r.Partition), 10))},
				kgo.RecordHeader{Key: "topic", Value: []byte(r.Topic)},
				kgo.RecordHeader{Key: "timestamp", Value: []byte(r.Timestamp.Format(time.RFC3339))},
			)

			recordsSend = append(recordsSend, &kgo.Record{
				Topic:   topic,
				Key:     r.Key,
				Value:   r.Value,
				Headers: headers,
			})
		}

		return fn(ctx, recordsSend)
	}
}