Skip to content

Commit

Permalink
feat: DLQ skip modify
Browse files Browse the repository at this point in the history
Signed-off-by: Eray Ates <[email protected]>
  • Loading branch information
rytsh committed Jun 25, 2024
1 parent 4b005dd commit baf159a
Show file tree
Hide file tree
Showing 21 changed files with 404 additions and 92 deletions.
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,21 @@ mux.Handle(grpcreflect.NewHandlerV1(reflector))
mux.Handle(grpcreflect.NewHandlerV1Alpha(reflector))
```

<details><summary>Handler Example</summary>

```sh
make env

# run the example
EXAMPLE=consumer_single_handler make example
```

Add messages in here to skip the message http://localhost:7071

Go to this side for grpcUI http://localhost:8082/#/grpc/wkafka

</details>

### Producer

Use consumer client or create without consumer settings, `New` also try to connect to brokers.
Expand Down
50 changes: 45 additions & 5 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,21 @@ package wkafka

import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/rs/zerolog/log"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/worldline-go/logz"
"golang.org/x/sync/errgroup"
)

var ErrConnection = errors.New("connect to kafka brokers failed")

type Client struct {
Kafka *kgo.Client
KafkaDLQ *kgo.Client
Expand All @@ -21,10 +26,11 @@ type Client struct {
clientID []byte
consumerConfig *ConsumerConfig
consumerMutex sync.RWMutex
logger logz.Adapter
logger Logger

// log purpose

Brokers []string
dlqTopics []string
topics []string
Meter Meter
Expand All @@ -36,6 +42,8 @@ func New(ctx context.Context, cfg Config, opts ...Option) (*Client, error) {
AutoTopicCreation: true,
AppName: idProgname,
Logger: logz.AdapterKV{Log: log.Logger},
Ping: true,
PingRetry: false,
}

o.apply(opts...)
Expand Down Expand Up @@ -78,11 +86,35 @@ func New(ctx context.Context, cfg Config, opts ...Option) (*Client, error) {
c.Kafka = kgoClient
c.KafkaDLQ = kgoClientDLQ

// main and dlq use same config, ask for validation once
if err := c.Kafka.Ping(ctx); err != nil {
return nil, fmt.Errorf("connection to kafka brokers: %w", err)
if o.Ping {

Check failure on line 89 in client.go

View workflow job for this annotation

GitHub Actions / sonarcloud

`if o.Ping` has complex nested blocks (complexity: 8) (nestif)
if o.PingRetry {
if o.PingBackoff == nil {
o.PingBackoff = defaultBackoff()
}

b := backoff.WithContext(o.PingBackoff, ctx)

if err := backoff.RetryNotify(func() error {
if err := c.Kafka.Ping(ctx); err != nil {
return fmt.Errorf("%w: %w", ErrConnection, err)
}

return nil
}, b, func(err error, d time.Duration) {
c.logger.Warn("wkafka ping failed", "error", err.Error(), "retry_in", d.String())
}); err != nil {
return nil, err

Check failure on line 106 in client.go

View workflow job for this annotation

GitHub Actions / sonarcloud

error returned from external package is unwrapped: sig: func github.com/cenkalti/backoff/v4.RetryNotify(operation github.com/cenkalti/backoff/v4.Operation, b github.com/cenkalti/backoff/v4.BackOff, notify github.com/cenkalti/backoff/v4.Notify) error (wrapcheck)
}
} else {
// main and dlq use same config, ask for validation once
if err := c.Kafka.Ping(ctx); err != nil {
return nil, fmt.Errorf("%w: %w", ErrConnection, err)
}
}
}

c.Brokers = cfg.Brokers

return c, nil
}

Expand Down Expand Up @@ -203,6 +235,10 @@ func (c *Client) Close() {
}
}

func (c *Client) DLQTopics() []string {
return c.dlqTopics
}

// Consume starts consuming messages from kafka and blocks until context is done or an error occurs.
// - Only works if client is created with consumer config.
// - Just run one time.
Expand Down Expand Up @@ -267,7 +303,11 @@ func (c *Client) Consume(ctx context.Context, callback CallBackFunc, opts ...Opt
func (c *Client) ProduceRaw(ctx context.Context, records []*kgo.Record) error {
result := c.Kafka.ProduceSync(ctx, records...)

return result.FirstErr()
if err := result.FirstErr(); err != nil {
return errors.Join(ctx.Err(), err)
}

return nil
}

// Admin returns an admin client to manage kafka.
Expand Down
44 changes: 40 additions & 4 deletions clientoptions.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,23 @@
package wkafka

import (
"time"

"github.com/cenkalti/backoff/v4"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/worldline-go/logz"
)

// DefaultBatchCount is default batch count for batch consumer, if not set.
var DefaultBatchCount = 100

func defaultBackoff() backoff.BackOff {

Check failure on line 13 in clientoptions.go

View workflow job for this annotation

GitHub Actions / sonarcloud

defaultBackoff returns interface (github.com/cenkalti/backoff/v4.BackOff) (ireturn)
return backoff.NewExponentialBackOff(
backoff.WithInitialInterval(2*time.Second),

Check failure on line 15 in clientoptions.go

View workflow job for this annotation

GitHub Actions / sonarcloud

Magic number: 2, in <argument> detected (gomnd)
backoff.WithMaxInterval(7*time.Second),

Check failure on line 16 in clientoptions.go

View workflow job for this annotation

GitHub Actions / sonarcloud

Magic number: 7, in <argument> detected (gomnd)
backoff.WithMaxElapsedTime(30*time.Second),

Check failure on line 17 in clientoptions.go

View workflow job for this annotation

GitHub Actions / sonarcloud

Magic number: 30, in <argument> detected (gomnd)
)
}

type options struct {
AppName string
ConsumerEnabled bool
Expand All @@ -18,8 +28,12 @@ type options struct {
KGOOptions []kgo.Opt
KGOOptionsDLQ []kgo.Opt
AutoTopicCreation bool
Logger logz.Adapter
Logger Logger
Meter Meter

Ping bool
PingRetry bool
PingBackoff backoff.BackOff
}

func (o *options) apply(opts ...Option) {
Expand Down Expand Up @@ -62,6 +76,7 @@ func WithClientInfo(appName, version string) Option {
// Use WithClientInfo instead if you want to set version and appname.
func WithAppName(appName string) Option {
return func(o *options) {
o.ClientID = appName + "@" + idHostname
o.AppName = appName
}
}
Expand Down Expand Up @@ -104,7 +119,7 @@ func WithConsumer(cfg ConsumerConfig) Option {
// WithLogger configures the client to use the provided logger.
// - For zerolog logz.AdapterKV{Log: logger} can usable.
// - Default is using zerolog's global logger.
func WithLogger(logger logz.Adapter) Option {
func WithLogger(logger Logger) Option {
return func(o *options) {
o.Logger = logger
}
Expand All @@ -114,7 +129,28 @@ func WithLogger(logger logz.Adapter) Option {
func WithNoLogger(v bool) Option {
return func(o *options) {
if v {
o.Logger = logz.AdapterNoop{}
o.Logger = LogNoop{}
}
}
}

// WithPing to ping kafka brokers on client creation.
// - Default is enabled.
func WithPing(v bool) Option {
return func(o *options) {
o.Ping = v
}
}

// WithPingRetry to retry ping kafka brokers on client creation.
func WithPingRetry(v bool) Option {
return func(o *options) {
o.PingRetry = v
}
}

func WithPingBackoff(b backoff.BackOff) Option {
return func(o *options) {
o.PingBackoff = b
}
}
4 changes: 1 addition & 3 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"fmt"
"regexp"
"time"

"github.com/worldline-go/logz"
)

var DefaultRetryInterval = 10 * time.Second
Expand Down Expand Up @@ -45,7 +43,7 @@ type ConsumerPreConfig struct {
}

// configApply configuration to ConsumerConfig and check validation.
func configApply(c ConsumerPreConfig, consumerConfig *ConsumerConfig, progName string, logger logz.Adapter) error {
func configApply(c ConsumerPreConfig, consumerConfig *ConsumerConfig, progName string, logger Logger) error {
if c.PrefixGroupID != "" {
consumerConfig.GroupID = c.PrefixGroupID + consumerConfig.GroupID
}
Expand Down
4 changes: 1 addition & 3 deletions config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package wkafka
import (
"reflect"
"testing"

"github.com/worldline-go/logz"
)

func TestConsumerPreConfig_Apply(t *testing.T) {
Expand Down Expand Up @@ -57,7 +55,7 @@ func TestConsumerPreConfig_Apply(t *testing.T) {
Validation: tt.fields.Validation,
FormatDLQTopic: "finops_{{.AppName}}_dlq",
}
err := configApply(c, &tt.args.consumerConfig, "serviceX", logz.AdapterNoop{})
err := configApply(c, &tt.args.consumerConfig, "serviceX", LogNoop{})
if (err != nil) != tt.wantErr {
t.Errorf("ConsumerPreConfig.Apply() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down
7 changes: 5 additions & 2 deletions consumerbatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"time"

"github.com/twmb/franz-go/pkg/kgo"
"github.com/worldline-go/logz"
)

type consumerBatch[T any] struct {
Expand All @@ -22,7 +21,7 @@ type consumerBatch[T any] struct {
Option optionConsumer
ProduceDLQ func(ctx context.Context, err *DLQError, records []*kgo.Record) error
Skip func(cfg *ConsumerConfig, r *kgo.Record) bool
Logger logz.Adapter
Logger Logger
PartitionHandler *partitionHandler
IsDLQ bool
Meter Meter
Expand Down Expand Up @@ -96,6 +95,8 @@ func (c *consumerBatch[T]) batchIteration(ctx context.Context, cl *kgo.Client, f
// skip precheck and record section
/////////////////////////////////
if c.Skip(c.Cfg, r) {
c.Logger.Info("record skipped", "topic", r.Topic, "partition", r.Partition, "offset", r.Offset)

continue
}

Expand Down Expand Up @@ -245,6 +246,8 @@ func (c *consumerBatch[T]) iterationDLQ(ctx context.Context, r *kgo.Record) erro

func (c *consumerBatch[T]) iterationRecordDLQ(ctx context.Context, r *kgo.Record) error {
if c.Skip(c.Cfg, r) {
c.Logger.Info("record skipped", "topic", r.Topic, "partition", r.Partition, "offset", r.Offset)

return nil
}

Expand Down
5 changes: 3 additions & 2 deletions consumersingle.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"time"

"github.com/twmb/franz-go/pkg/kgo"
"github.com/worldline-go/logz"
)

type consumerSingle[T any] struct {
Expand All @@ -22,7 +21,7 @@ type consumerSingle[T any] struct {
Option optionConsumer
ProduceDLQ func(ctx context.Context, err *DLQError, records []*kgo.Record) error
Skip func(cfg *ConsumerConfig, r *kgo.Record) bool
Logger logz.Adapter
Logger Logger
PartitionHandler *partitionHandler
IsDLQ bool
Meter Meter
Expand Down Expand Up @@ -180,6 +179,8 @@ func (c *consumerSingle[T]) iterationMain(ctx context.Context, r *kgo.Record) er

func (c *consumerSingle[T]) iterationRecord(ctx context.Context, r *kgo.Record) error {
if c.Skip(c.Cfg, r) {
c.Logger.Info("record skipped", "topic", r.Topic, "partition", r.Partition, "offset", r.Offset)

return nil
}

Expand Down
18 changes: 18 additions & 0 deletions env/config/turna.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
server:
entrypoints:
web:
address: ":8082"
http:
middlewares:
view:
view:
prefix_path: /
info:
grpc:
- name: "wkafka"
addr: "dns:///localhost:8080"
routers:
view:
path: /
middlewares:
- view
7 changes: 6 additions & 1 deletion env/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,14 @@ services:
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,INTERNAL:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
redpanda:
image: docker.io/redpandadata/console:v2.6.0
image: docker.io/redpandadata/console:v2.3.10
ports:
- "7071:7071"
environment:
- KAFKA_BROKERS=kafka:9094
- SERVER_LISTENPORT=7071
turna:
image: ghcr.io/rakunlabs/turna:v0.7.0-alpine3.20.0
network_mode: host
volumes:
- ./config/turna.yaml:/turna.yaml
2 changes: 1 addition & 1 deletion example/admin/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ var (
)

func RunExampleList(ctx context.Context, _ *sync.WaitGroup) error {
client, err := wkafka.New(ctx, kafkaConfigList)
client, err := wkafka.New(ctx, kafkaConfigList, wkafka.WithPingRetry(true))
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion example/consumer/single.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func RunExampleSingleWithHandler(ctx context.Context, _ *sync.WaitGroup) error {
defer client.Close()

mux := http.NewServeMux()
mux.Handle(handler.New(client))
mux.Handle(handler.New(client, handler.WithLogger(slog.Default())))

reflector := grpcreflect.NewStaticReflector(wkafkaconnect.WkafkaServiceName)

Expand Down
11 changes: 6 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ go 1.22
require (
connectrpc.com/connect v1.16.2
connectrpc.com/grpcreflect v1.2.0
github.com/cenkalti/backoff/v4 v4.3.0
github.com/rs/zerolog v1.32.0
github.com/stretchr/testify v1.8.4
github.com/twmb/franz-go v1.15.4
github.com/twmb/franz-go/pkg/kadm v1.10.0
github.com/twmb/franz-go v1.17.0
github.com/twmb/franz-go/pkg/kadm v1.12.0
github.com/twmb/tlscfg v1.2.1
github.com/worldline-go/initializer v0.3.2
github.com/worldline-go/logz v0.5.0
Expand All @@ -20,14 +21,14 @@ require (
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/klauspost/compress v1.17.0 // indirect
github.com/klauspost/compress v1.17.8 // indirect
github.com/lmittmann/tint v1.0.4 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/pierrec/lz4/v4 v4.1.19 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rakunlabs/logi v0.2.1 // indirect
github.com/twmb/franz-go/pkg/kmsg v1.7.0 // indirect
github.com/twmb/franz-go/pkg/kmsg v1.8.0 // indirect
golang.org/x/crypto v0.23.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/text v0.15.0 // indirect
Expand Down
Loading

0 comments on commit baf159a

Please sign in to comment.