Skip to content

Commit

Permalink
feat: skip handler
Browse files Browse the repository at this point in the history
Signed-off-by: Eray Ates <[email protected]>
  • Loading branch information
rytsh committed Jun 12, 2024
1 parent 2bd0eb7 commit 8bd7767
Show file tree
Hide file tree
Showing 24 changed files with 1,111 additions and 200 deletions.
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,21 @@ Send record to dead letter queue, use __WrapErrDLQ__ function with to wrap the e

> Check the aditional options for custom decode and precheck.

#### Skip Handler

Editing the skip map and use our handler to initialize server mux.

```go
mux := http.NewServeMux()
mux.Handle(handler.New(client))

reflector := grpcreflect.NewStaticReflector(wkafkaconnect.WkafkaServiceName)

mux.Handle(grpcreflect.NewHandlerV1(reflector))
mux.Handle(grpcreflect.NewHandlerV1Alpha(reflector))
```

### Producer

Use consumer client or create without consumer settings, `New` also try to connect to brokers.
Expand Down
22 changes: 20 additions & 2 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package wkafka
import (
"context"
"fmt"
"sync"

"github.com/rs/zerolog/log"
"github.com/twmb/franz-go/pkg/kadm"
Expand All @@ -19,6 +20,7 @@ type Client struct {

clientID []byte
consumerConfig *ConsumerConfig
consumerMutex sync.RWMutex
logger logz.Adapter

// log purpose
Expand All @@ -34,7 +36,6 @@ func New(ctx context.Context, cfg Config, opts ...Option) (*Client, error) {
AutoTopicCreation: true,
AppName: idProgname,
Logger: logz.AdapterKV{Log: log.Logger},
Meter: EmptyMeter(),
}

o.apply(opts...)
Expand All @@ -50,6 +51,10 @@ func New(ctx context.Context, cfg Config, opts ...Option) (*Client, error) {
}
}

if o.Meter == nil {
o.Meter = noopMeter()
}

c := &Client{
consumerConfig: o.ConsumerConfig,
logger: o.Logger,
Expand Down Expand Up @@ -204,7 +209,7 @@ func (c *Client) Close() {
func (c *Client) Consume(ctx context.Context, callback CallBackFunc, opts ...OptionConsumer) error {
o := optionConsumer{
Client: c,
ConsumerConfig: *c.consumerConfig,
ConsumerConfig: c.consumerConfig,
Meter: c.Meter,
}

Expand Down Expand Up @@ -269,3 +274,16 @@ func (c *Client) ProduceRaw(ctx context.Context, records []*kgo.Record) error {
func (c *Client) Admin() *kadm.Client {
return kadm.NewClient(c.Kafka)
}

func (c *Client) Skip(modify func(SkipMap) SkipMap) {
c.consumerMutex.Lock()
defer c.consumerMutex.Unlock()

if modify == nil {
return
}

c.consumerConfig.Skip = modify(c.consumerConfig.Skip)

c.logger.Debug("wkafka skip modified", "skip", c.consumerConfig.Skip)
}
2 changes: 2 additions & 0 deletions clientoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ func WithKGOOptionsDLQ(opts ...kgo.Opt) Option {
}
}

// WithConsumer configures the client to use the provided consumer config.
// - It is shallow copied and to make safe use skip function to modify skip map.
func WithConsumer(cfg ConsumerConfig) Option {
return func(o *options) {
o.ConsumerConfig = &cfg
Expand Down
6 changes: 3 additions & 3 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,12 @@ func configApply(c ConsumerPreConfig, consumerConfig *ConsumerConfig, progName s
}
}

if consumerConfig.DLQ.SkipExtra == nil {
consumerConfig.DLQ.SkipExtra = map[string]map[int32]OffsetConfig{
if consumerConfig.Skip == nil {
consumerConfig.Skip = map[string]map[int32]OffsetConfig{
consumerConfig.DLQ.Topic: consumerConfig.DLQ.Skip,
}
} else {
consumerConfig.DLQ.SkipExtra[consumerConfig.DLQ.Topic] = consumerConfig.DLQ.Skip
consumerConfig.Skip[consumerConfig.DLQ.Topic] = consumerConfig.DLQ.Skip
}

if consumerConfig.DLQ.RetryInterval == 0 {
Expand Down
8 changes: 4 additions & 4 deletions config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ func TestConsumerPreConfig_Apply(t *testing.T) {
},
want: ConsumerConfig{
GroupID: "finops_test",
Skip: map[string]map[int32]OffsetConfig{
"finops_serviceX_dlq": nil,
},
DLQ: DLQConfig{
Topic: "finops_serviceX_dlq",
SkipExtra: map[string]map[int32]OffsetConfig{
"finops_serviceX_dlq": nil,
},
Topic: "finops_serviceX_dlq",
RetryInterval: DefaultRetryInterval,
},
},
Expand Down
85 changes: 17 additions & 68 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/twmb/franz-go/pkg/kgo"
)

type SkipMap = map[string]map[int32]OffsetConfig

type ConsumerConfig struct {
// Topics is a list of kafka topics to consume.
// Required at least one topic, topic name if not exist will be created or consumer waits for topic creation.
Expand Down Expand Up @@ -79,8 +81,6 @@ type DLQConfig struct {
Topic string `cfg:"topic"`
// TopicExtra is extra a list of kafka topics to just consume from DLQ.
TopicsExtra []string `cfg:"topics_extra"`
// SkipExtra are optional message offsets to be skipped for topicsExtra.
SkipExtra map[string]map[int32]OffsetConfig `cfg:"skip_extra"`
}

type OffsetConfig struct {
Expand Down Expand Up @@ -110,72 +110,11 @@ type consumer interface {
setPreCheck(fn func(ctx context.Context, r *kgo.Record) error)
}

func skip(cfg *ConsumerConfig, r *kgo.Record) bool {
if len(cfg.Skip) == 0 {
return false
}

if _, ok := cfg.Skip[r.Topic]; !ok {
return false
}

if _, ok := cfg.Skip[r.Topic][r.Partition]; !ok {
return false
}

offsets := cfg.Skip[r.Topic][r.Partition]

if offsets.Before > 0 && r.Offset <= offsets.Before {
return true
}

for _, offset := range offsets.Offsets {
if r.Offset == offset {
return true
}
}

return false
}

func skipDLQ(cfg *ConsumerConfig, r *kgo.Record) bool {
dlqCfg := cfg.DLQ
if dlqCfg.Disable {
return false
}

if len(dlqCfg.SkipExtra) == 0 {
return false
}

if _, ok := dlqCfg.SkipExtra[r.Topic]; !ok {
return false
}

if _, ok := dlqCfg.SkipExtra[r.Topic][r.Partition]; !ok {
return false
}

offsets := dlqCfg.SkipExtra[r.Topic][r.Partition]

if offsets.Before > 0 && r.Offset <= offsets.Before {
return true
}

for _, offset := range offsets.Offsets {
if r.Offset == offset {
return true
}
}

return false
}

type optionConsumer struct {
Client *Client
Consumer consumer
ConsumerDLQ consumer
ConsumerConfig ConsumerConfig
ConsumerConfig *ConsumerConfig
Meter Meter
}

Expand Down Expand Up @@ -213,20 +152,25 @@ func WithCallbackBatch[T any](fn func(ctx context.Context, msg []T) error) CallB
Decode: decode,
ProduceDLQ: produceDLQ,
Cfg: o.ConsumerConfig,
Skip: skip,
Skip: newSkipper(&o.Client.consumerMutex, false),
Logger: o.Client.logger,
PartitionHandler: o.Client.partitionHandler,
Meter: o.Meter,
}

if o.ConsumerConfig.DLQ.Disable {
return nil
}

o.ConsumerDLQ = &consumerBatch[T]{
Decode: decode,
ProcessDLQ: dlqProcessBatch(fn),
Cfg: o.ConsumerConfig,
Skip: skipDLQ,
Skip: newSkipper(&o.Client.consumerMutex, o.ConsumerConfig.DLQ.Disable),
IsDLQ: true,
Logger: o.Client.logger,
PartitionHandler: o.Client.partitionHandlerDLQ,
Meter: o.Meter,
}

return nil
Expand All @@ -245,20 +189,25 @@ func WithCallback[T any](fn func(ctx context.Context, msg T) error) CallBackFunc
Decode: decode,
ProduceDLQ: produceDLQ,
Cfg: o.ConsumerConfig,
Skip: skip,
Skip: newSkipper(&o.Client.consumerMutex, false),
Logger: o.Client.logger,
PartitionHandler: o.Client.partitionHandler,
Meter: o.Meter,
}

if o.ConsumerConfig.DLQ.Disable {
return nil
}

o.ConsumerDLQ = &consumerSingle[T]{
ProcessDLQ: fn,
Decode: decode,
Cfg: o.ConsumerConfig,
Skip: skipDLQ,
Skip: newSkipper(&o.Client.consumerMutex, o.ConsumerConfig.DLQ.Disable),
IsDLQ: true,
Logger: o.Client.logger,
PartitionHandler: o.Client.partitionHandlerDLQ,
Meter: o.Meter,
}

return nil
Expand Down
6 changes: 3 additions & 3 deletions consumerbatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type consumerBatch[T any] struct {
Process func(ctx context.Context, msg []T) error
// ProcessDLQ is nil for main consumer.
ProcessDLQ func(ctx context.Context, msg T) error
Cfg ConsumerConfig
Cfg *ConsumerConfig
Decode func(raw []byte, r *kgo.Record) (T, error)
// PreCheck is a function that is called before the callback and decode.
PreCheck func(ctx context.Context, r *kgo.Record) error
Expand Down Expand Up @@ -95,7 +95,7 @@ func (c *consumerBatch[T]) batchIteration(ctx context.Context, cl *kgo.Client, f

// skip precheck and record section
/////////////////////////////////
if c.Skip(&c.Cfg, r) {
if c.Skip(c.Cfg, r) {
continue
}

Expand Down Expand Up @@ -244,7 +244,7 @@ func (c *consumerBatch[T]) iterationDLQ(ctx context.Context, r *kgo.Record) erro
}

func (c *consumerBatch[T]) iterationRecordDLQ(ctx context.Context, r *kgo.Record) error {
if c.Skip(&c.Cfg, r) {
if c.Skip(c.Cfg, r) {
return nil
}

Expand Down
6 changes: 2 additions & 4 deletions consumersingle.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type consumerSingle[T any] struct {
Process func(ctx context.Context, msg T) error
// ProcessDLQ is nil for main consumer.
ProcessDLQ func(ctx context.Context, msg T) error
Cfg ConsumerConfig
Cfg *ConsumerConfig
Decode func(raw []byte, r *kgo.Record) (T, error)
// PreCheck is a function that is called before the callback and decode.
PreCheck func(ctx context.Context, r *kgo.Record) error
Expand Down Expand Up @@ -64,7 +64,6 @@ func (c *consumerSingle[T]) Consume(ctx context.Context, cl *kgo.Client) error {
func (c *consumerSingle[T]) iteration(ctx context.Context, cl *kgo.Client, fetch kgo.Fetches) error {
for iter := fetch.RecordIter(); !iter.Done(); {
r := iter.Next()

// check partition is revoked
if c.PartitionHandler.IsRevokedRecord(r) {
continue
Expand All @@ -88,7 +87,6 @@ func (c *consumerSingle[T]) iteration(ctx context.Context, cl *kgo.Client, fetch
}
} else {
// listening main topics

if err := c.iterationMain(ctx, r); err != nil {
c.Meter.Meter(start, 1, r.Topic, err, false)
return wrapErr(r, err, c.IsDLQ)
Expand Down Expand Up @@ -181,7 +179,7 @@ func (c *consumerSingle[T]) iterationMain(ctx context.Context, r *kgo.Record) er
}

func (c *consumerSingle[T]) iterationRecord(ctx context.Context, r *kgo.Record) error {
if c.Skip(&c.Cfg, r) {
if c.Skip(c.Cfg, r) {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion env/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ services:
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,INTERNAL:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
redpanda:
image: docker.io/redpandadata/console:v2.3.8
image: docker.io/redpandadata/console:v2.6.0
ports:
- "7071:7071"
environment:
Expand Down
10 changes: 5 additions & 5 deletions error.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ import (
)

var (
errNotImplemented = fmt.Errorf("not implemented")
errClientClosed = fmt.Errorf("client closed")
errPartitionRevoked = fmt.Errorf("partition revoked")
errNotImplemented = errors.New("not implemented")
errClientClosed = errors.New("client closed")
errPartitionRevoked = errors.New("partition revoked")

// ErrSkip is use to skip message in the PreCheck hook or Decode function.
ErrSkip = fmt.Errorf("skip message")
ErrSkip = errors.New("skip message")
// ErrDLQ use with callback function to send message to DLQ topic.
// Prefer to use WrapErrDLQ to wrap error.
ErrDLQ = fmt.Errorf("error DLQ")
ErrDLQ = errors.New("error DLQ")
)

// DLQError is use with callback function to send message to DLQ topic.
Expand Down
Loading

0 comments on commit 8bd7767

Please sign in to comment.