Skip to content

Commit

Permalink
fix: error wrap
Browse files Browse the repository at this point in the history
Signed-off-by: Eray Ates <[email protected]>
  • Loading branch information
rytsh committed Jan 7, 2024
1 parent b46a100 commit ab04a93
Show file tree
Hide file tree
Showing 19 changed files with 273 additions and 81 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ jobs:
- name: golangci-lint
uses: golangci/golangci-lint-action@v3
with:
args: --timeout 5m --new-from-rev=HEAD~1 --issues-exit-code=0
args: --timeout 5m --new-from-rev=HEAD~1
- name: Run tests
run: |
GOPATH="$(dirname ${PWD})" golangci-lint run --out-format checkstyle ./... > golangci-lint-report.out || true
go test -coverprofile=coverage.out -json ./... > test-report.out
GOPATH="$(dirname ${PWD})" golangci-lint run --out-format --issues-exit-code=0 checkstyle ./... > golangci-lint-report.out || true
go test -short -coverprofile=coverage.out -json ./... > test-report.out
- name: SonarCloud Scan
uses: sonarsource/sonarcloud-github-action@master
with:
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/bin
/.golangci.yml
/coverage.*
/*.out
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ lint: .golangci.yml bin/golangci-lint-$(GOLANGCI_LINT_VERSION) ## Lint Go files
test: ## Run unit tests
@go test -v -race ./...

.PHONY: test-short
test-short: ## Run unit tests short
@go test -v -race -short ./...

.PHONY: test-without-cache
test-without-cache: ## Run unit tests without cache
@go test -count=1 -v -race ./...
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ if err := client.Consume(ctx, wkafka.WithCallback(ProcessSingle)); err != nil {
}
```

Send record to dead letter queue, use __WrapErrDLQ__ function with to wrap the error and it will be send to dead letter queue.

> Check the aditional options for custom decode and precheck.
### Producer
Expand Down
2 changes: 1 addition & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func configApply(c ConsumerPreConfig, consumerConfig *ConsumerConfig, progName s
}

if consumerConfig.DLQ.SkipExtra == nil {
consumerConfig.DLQ.SkipExtra = map[string]map[int32]Offsets{
consumerConfig.DLQ.SkipExtra = map[string]map[int32]OffsetConfig{
consumerConfig.DLQ.Topic: consumerConfig.DLQ.Skip,
}
} else {
Expand Down
4 changes: 2 additions & 2 deletions config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ func TestConsumerPreConfig_Apply(t *testing.T) {
},
want: ConsumerConfig{
GroupID: "finops_test",
DLQ: DLQ{
DLQ: DLQConfig{
Topic: "finops_serviceX_dlq",
SkipExtra: map[string]map[int32]Offsets{
SkipExtra: map[string]map[int32]OffsetConfig{
"finops_serviceX_dlq": nil,
},
RetryInterval: DefaultRetryInterval,
Expand Down
18 changes: 9 additions & 9 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type ConsumerConfig struct {
// - 31
// - 90
// before: 20 // skip all offsets before or equal to this offset
Skip map[string]map[int32]Offsets `cfg:"skip"`
Skip map[string]map[int32]OffsetConfig `cfg:"skip"`
// MaxPollRecords is the maximum number of records returned in a single call to poll.
// - Default is max.poll.records in the broker configuration, usually 500.
// - Fetching messages from broker, this is not related with batch processing!
Expand All @@ -46,10 +46,10 @@ type ConsumerConfig struct {
// - Default is 100.
BatchCount int `cfg:"batch_count"`
// DLQ is a dead letter queue configuration.
DLQ DLQ `cfg:"dlq"`
DLQ DLQConfig `cfg:"dlq"`
}

type DLQ struct {
type DLQConfig struct {
// Disable is a flag to disable DLQ.
// - Default is false.
// - If topic is not set, it will be generated from format_dlq_topic.
Expand All @@ -73,16 +73,16 @@ type DLQ struct {
// - 31
// - 90
// before: 20 // skip all offsets before or equal to this offset
Skip map[int32]Offsets `cfg:"skip"`
Skip map[int32]OffsetConfig `cfg:"skip"`
// Topic is a topic name to send messages that failed to process also could be used for DLQ.
Topic string `cfg:"topic"`
// TopicExtra is extra a list of kafka topics to just consume from DLQ.
TopicsExtra []string `cfg:"topics_extra"`
// SkipExtra are optional message offsets to be skipped for topicsExtra.
SkipExtra map[string]map[int32]Offsets `cfg:"skip_extra"`
SkipExtra map[string]map[int32]OffsetConfig `cfg:"skip_extra"`
}

type Offsets struct {
type OffsetConfig struct {
// Offsets is a list of offsets numbers in that partition to skip.
Offsets []int64 `cfg:"offsets"`
// Before skips all offsets before or equal to this offset.
Expand Down Expand Up @@ -261,7 +261,7 @@ func WithCallback[T any](fn func(ctx context.Context, msg T) error) CallBackFunc
}
}

func getDecodeProduceDLQ[T any](o *optionConsumer) (func(raw []byte, r *kgo.Record) (T, error), func(ctx context.Context, err error, records []*kgo.Record) error) {
func getDecodeProduceDLQ[T any](o *optionConsumer) (func(raw []byte, r *kgo.Record) (T, error), func(ctx context.Context, err *DLQError, records []*kgo.Record) error) {

Check failure on line 264 in consumer.go

View workflow job for this annotation

GitHub Actions / sonarcloud

line is 168 characters (lll)
var decode func(raw []byte, r *kgo.Record) (T, error)

var msg T
Expand All @@ -272,9 +272,9 @@ func getDecodeProduceDLQ[T any](o *optionConsumer) (func(raw []byte, r *kgo.Reco
decode = codecJSON[T]{}.Decode
}

var produceDLQ func(ctx context.Context, err error, records []*kgo.Record) error
var produceDLQ func(ctx context.Context, err *DLQError, records []*kgo.Record) error
if !o.ConsumerConfig.DLQ.Disable {
produceDLQ = producerDLQ(o.ConsumerConfig.DLQ.Topic, o.Client.ProduceRaw)
produceDLQ = producerDLQ(o.ConsumerConfig.DLQ.Topic, o.Client.clientID, o.Client.ProduceRaw)
}

return decode, produceDLQ
Expand Down
18 changes: 9 additions & 9 deletions consumer2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func Test_skip(t *testing.T) {
name: "skip topic",
args: args{
cfg: &ConsumerConfig{
Skip: map[string]map[int32]Offsets{
Skip: map[string]map[int32]OffsetConfig{
"topic": {
0: {
Offsets: []int64{
Expand All @@ -62,7 +62,7 @@ func Test_skip(t *testing.T) {
name: "skip topic before",
args: args{
cfg: &ConsumerConfig{
Skip: map[string]map[int32]Offsets{
Skip: map[string]map[int32]OffsetConfig{
"topic": {
0: {
Before: 5,
Expand All @@ -82,7 +82,7 @@ func Test_skip(t *testing.T) {
name: "topic before",
args: args{
cfg: &ConsumerConfig{
Skip: map[string]map[int32]Offsets{
Skip: map[string]map[int32]OffsetConfig{
"topic": {
0: {
Before: 5,
Expand Down Expand Up @@ -145,8 +145,8 @@ func Test_skipDLQ(t *testing.T) {
name: "skip topic",
args: args{
cfg: &ConsumerConfig{
DLQ: DLQ{
SkipExtra: map[string]map[int32]Offsets{
DLQ: DLQConfig{
SkipExtra: map[string]map[int32]OffsetConfig{
"topic": {
0: {
Offsets: []int64{
Expand All @@ -169,8 +169,8 @@ func Test_skipDLQ(t *testing.T) {
name: "skip topic before",
args: args{
cfg: &ConsumerConfig{
DLQ: DLQ{
SkipExtra: map[string]map[int32]Offsets{
DLQ: DLQConfig{
SkipExtra: map[string]map[int32]OffsetConfig{
"topic": {
0: {
Before: 5,
Expand All @@ -191,8 +191,8 @@ func Test_skipDLQ(t *testing.T) {
name: "topic before",
args: args{
cfg: &ConsumerConfig{
DLQ: DLQ{
SkipExtra: map[string]map[int32]Offsets{
DLQ: DLQConfig{
SkipExtra: map[string]map[int32]OffsetConfig{
"topic": {
0: {
Before: 5,
Expand Down
4 changes: 4 additions & 0 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ func (c *Counter[T]) Count(ctx context.Context, msg T) error {
}

func Test_GroupConsuming(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode")
}

client, err := tkafka.TestClient()
if err != nil {
t.Fatalf("TestClient() error = %v", err)
Expand Down
18 changes: 12 additions & 6 deletions consumerbatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type consumerBatch[T any] struct {
// PreCheck is a function that is called before the callback and decode.
PreCheck func(ctx context.Context, r *kgo.Record) error
Option optionConsumer
ProduceDLQ func(ctx context.Context, err error, records []*kgo.Record) error
ProduceDLQ func(ctx context.Context, err *DLQError, records []*kgo.Record) error
Skip func(cfg *ConsumerConfig, r *kgo.Record) bool
Logger logz.Adapter
PartitionHandler *partitionHandler
Expand Down Expand Up @@ -137,12 +137,12 @@ func (c *consumerBatch[T]) batchIteration(ctx context.Context, cl *kgo.Client, f
}

if c.ProduceDLQ != nil {
if err := c.ProduceDLQ(ctx, err, batchRecords); err != nil {
if err := c.ProduceDLQ(ctx, errOrg, batchRecords); err != nil {
return fmt.Errorf("produce to DLQ failed: %w; offsets: %s", err, errorOffsetList(batchRecords))
}
} else {
// returning a batch error could be confusing
return fmt.Errorf("process batch failed: %w; offsets: %s", errOrg, errorOffsetList(batchRecords))
return fmt.Errorf("process batch failed: %w; offsets: %s", err, errorOffsetList(batchRecords))
}
}

Expand Down Expand Up @@ -209,9 +209,15 @@ func (c *consumerBatch[T]) iterationDLQ(ctx context.Context, r *kgo.Record) erro
}

if err := c.iterationRecordDLQ(ctx, r); err != nil {
errOrg, _ := isDQLError(err)
errWrapped := wrapErr(r, errOrg, c.IsDLQ)
c.Logger.Error("process failed", "err", errWrapped, "retry_interval", wait.CurrentInterval().String())
errOrg, ok := isDQLError(err)
var errWrapped error
if ok {
errWrapped = wrapErr(r, errOrg.Err, c.IsDLQ)
} else {
errWrapped = wrapErr(r, err, c.IsDLQ)
}

c.Logger.Error("DLQ process failed", "err", errWrapped, "retry_interval", wait.CurrentInterval().String())

if err := wait.Sleep(ctx); err != nil {
return err
Expand Down
20 changes: 13 additions & 7 deletions consumersingle.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type consumerSingle[T any] struct {
// PreCheck is a function that is called before the callback and decode.
PreCheck func(ctx context.Context, r *kgo.Record) error
Option optionConsumer
ProduceDLQ func(ctx context.Context, err error, records []*kgo.Record) error
ProduceDLQ func(ctx context.Context, err *DLQError, records []*kgo.Record) error
Skip func(cfg *ConsumerConfig, r *kgo.Record) bool
Logger logz.Adapter
PartitionHandler *partitionHandler
Expand Down Expand Up @@ -124,9 +124,15 @@ func (c *consumerSingle[T]) iterationDLQ(ctx context.Context, r *kgo.Record) err
}

if err := c.iterationRecord(ctx, r); err != nil {
errOrg, _ := isDQLError(err)
errWrapped := wrapErr(r, errOrg, c.IsDLQ)
c.Logger.Error("process failed", "err", errWrapped, "retry_interval", wait.CurrentInterval().String())
errOrg, ok := isDQLError(err)
var errWrapped error
if ok {
errWrapped = wrapErr(r, errOrg.Err, c.IsDLQ)
} else {
errWrapped = wrapErr(r, err, c.IsDLQ)
}

c.Logger.Error("DLQ process failed", "err", errWrapped, "retry_interval", wait.CurrentInterval().String())

if err := wait.Sleep(ctx); err != nil {
return err
Expand All @@ -144,22 +150,22 @@ func (c *consumerSingle[T]) iterationDLQ(ctx context.Context, r *kgo.Record) err
// iterationMain is used to listen main topics.
func (c *consumerSingle[T]) iterationMain(ctx context.Context, r *kgo.Record) error {
if err := c.iterationRecord(ctx, r); err != nil {
errOrg, ok := isDQLError(err)
errDLQ, ok := isDQLError(err)
if !ok {
// it is not DLQ error, return it
return err
}

// send to DLQ if enabled
if c.ProduceDLQ != nil {
if err := c.ProduceDLQ(ctx, err, []*kgo.Record{r}); err != nil {
if err := c.ProduceDLQ(ctx, errDLQ, []*kgo.Record{r}); err != nil {
return fmt.Errorf("produce to DLQ failed: %w", err)
}

return nil
}

return errOrg
return err
}

return nil
Expand Down
14 changes: 14 additions & 0 deletions docs/notes.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Notes

- __max.poll.interval.ms__ is not exist in our library so it is ok to stay on process.

## Joined new consumer to the group

When a new joiner arrived, it will trigger the partition handler function.
That means we will skip to commit that messages and new joiner will consume the messages again and take care of commit.
Messing up the offset of the group is more dangerous problem than duplicate messages.

## Send specific messages to DLQ topic

When using __WrapErrDLQ__ function, it will send the error to dead letter queue topic.
Usable with index to add specific message to dead letter queue also can give different error message.
8 changes: 0 additions & 8 deletions docs/scenarios.md

This file was deleted.

40 changes: 24 additions & 16 deletions error.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,35 +19,46 @@ var (
// ErrSkip is use to skip message in the PreCheck hook or Decode function.
ErrSkip = fmt.Errorf("skip message")
// ErrDLQ use with callback function to send message to DLQ topic.
ErrDLQ = fmt.Errorf("send to DLQ")
// Prefer to use WrapErrDLQ to wrap error.
ErrDLQ = fmt.Errorf("error DLQ")
)

// DLQIndexedError is use with callback function to send message to DLQ topic with specific index.
type DLQIndexedError struct {
// DLQError is use with callback function to send message to DLQ topic.
type DLQError struct {
// Err is default error to add in header.
// If not setted, header will just show "DLQ indexed error"
Err error
// Indexes if not empty, use to add error in specific index.
// Indexes to use send specific batch index to DLQ.
// If index's error is nil, default error is used.
Indexes map[int]error
}

func (e *DLQIndexedError) Error() string {
return "DLQ indexed error"
func WrapErrDLQ(err error) error {
return &DLQError{Err: err}
}

// isDQLError check if error is DLQ error and return the original error or error.
func isDQLError(err error) (error, bool) {
if errors.Is(err, ErrDLQ) {
return unwrapErr(err), true
func (e *DLQError) Error() string {
if e.Err != nil {
return e.Err.Error()
}

var errDLQIndexed *DLQIndexedError
return "DLQ indexed error"
}

// isDQLError check if error is DLQ error and return it.
func isDQLError(err error) (*DLQError, bool) {
var errDLQIndexed *DLQError

ok := errors.As(err, &errDLQIndexed)
if ok {
return errDLQIndexed.Err, true
return errDLQIndexed, true
}

return err, false
if errors.Is(err, ErrDLQ) {
return &DLQError{Err: err}, true
}

return nil, false
}

func wrapErr(r *kgo.Record, err error, dlq bool) error {
Expand Down Expand Up @@ -116,9 +127,6 @@ func formatRange(start, end int) string {
}
return strconv.Itoa(start) + "-" + strconv.Itoa(end)
}
func unwrapErr(err error) error {
return errors.Unwrap(err)
}

func stringHeader(headers []Header) string {
var str strings.Builder
Expand Down
Loading

0 comments on commit ab04a93

Please sign in to comment.