Skip to content

Commit

Permalink
fix: consumer update
Browse files Browse the repository at this point in the history
Signed-off-by: Eray Ates <[email protected]>
  • Loading branch information
rytsh committed Oct 18, 2023
1 parent d3c25d4 commit ab9f604
Show file tree
Hide file tree
Showing 11 changed files with 296 additions and 190 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
/bin
/.golangci.yml
/coverage.*
5 changes: 0 additions & 5 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,3 @@ func (c *Client) produceRaw(ctx context.Context, records []*kgo.Record) error {

return result.FirstErr()

Check failure on line 116 in client.go

View workflow job for this annotation

GitHub Actions / sonarcloud

error returned from external package is unwrapped: sig: func (github.com/twmb/franz-go/pkg/kgo.ProduceResults).FirstErr() error (wrapcheck)

Check failure on line 116 in client.go

View workflow job for this annotation

GitHub Actions / sonarcloud

error returned from external package is unwrapped: sig: func (github.com/twmb/franz-go/pkg/kgo.ProduceResults).FirstErr() error (wrapcheck)
}

// Producer to create a producer to send message to kafka.
func (c *Client) Producer(config ProducerConfig) (Producer, error) {
return newProducer(c, config)
}
8 changes: 7 additions & 1 deletion clientoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,12 @@ func WithKGOOptions(opts ...kgo.Opt) Option {
}

// WithConsumer sets the listener to use.
func WithConsumer[T any](cfg ConsumeConfig, callback func(ctx context.Context, msg T) error, decode func([]byte) (T, error)) Option {
func WithConsumer[T any](
cfg ConsumeConfig,
callback func(ctx context.Context, msg T) error,
decode func([]byte) (T, error),
preCheck func(context.Context, *Record) error,
) Option {
return func(o *options) {
decode := decode
if decode == nil {
Expand All @@ -56,6 +61,7 @@ func WithConsumer[T any](cfg ConsumeConfig, callback func(ctx context.Context, m
Callback: callback,
Cfg: cfg,
Decode: decode,
PreCheck: preCheck,
}
}
}
2 changes: 1 addition & 1 deletion codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import "encoding/json"

type codecJSON[T any] struct{}

func (codecJSON[T]) Encode(data any) ([]byte, error) {
func (codecJSON[T]) Encode(data T) ([]byte, error) {
return json.Marshal(data)

Check failure on line 8 in codec.go

View workflow job for this annotation

GitHub Actions / sonarcloud

error returned from external package is unwrapped: sig: func encoding/json.Marshal(v any) ([]byte, error) (wrapcheck)

Check failure on line 8 in codec.go

View workflow job for this annotation

GitHub Actions / sonarcloud

error returned from external package is unwrapped: sig: func encoding/json.Marshal(v any) ([]byte, error) (wrapcheck)
}

Expand Down
103 changes: 86 additions & 17 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package wkafka

import (
"context"
"errors"
"fmt"

"github.com/twmb/franz-go/pkg/kgo"
Expand Down Expand Up @@ -52,6 +53,8 @@ type consume[T any] struct {
Callback func(ctx context.Context, msg T) error
Cfg ConsumeConfig
Decode func(raw []byte) (T, error)
// PreCheck is a function that is called before the callback and decode.
PreCheck func(ctx context.Context, r *kgo.Record) error
}

type consumer interface {
Expand All @@ -63,6 +66,34 @@ func (c consume[T]) config() ConsumeConfig {
return c.Cfg
}

func (c consume[T]) skip(r *kgo.Record) bool {
if c.Cfg.Skip == nil {
return false
}

if _, ok := c.Cfg.Skip[r.Topic]; !ok {
return false
}

if _, ok := c.Cfg.Skip[r.Topic][r.Partition]; !ok {
return false
}

offsets := c.Cfg.Skip[r.Topic][r.Partition]

if offsets.Before > 0 && r.Offset <= offsets.Before {
return true
}

for _, offset := range offsets.Offsets {
if r.Offset == offset {
return true
}
}

return false
}

func (c consume[T]) Consume(ctx context.Context, cl *kgo.Client) error {
for {
fetches := cl.PollFetches(ctx)
Expand All @@ -72,35 +103,73 @@ func (c consume[T]) Consume(ctx context.Context, cl *kgo.Client) error {

var errP error
fetches.EachError(func(t string, p int32, err error) {
if errors.Is(err, context.Canceled) {
errP = err

return
}

errP = fmt.Errorf("fetch err topic %s partition %d: %w", t, p, err)
})
if errP != nil {
return errP
}

// var rs []*kgo.Record
fetches.EachRecord(func(r *kgo.Record) {
data, err := c.Decode(r.Value)
if err != nil {
errP = fmt.Errorf("decode record failed: %w", err)

return
for iter := fetches.RecordIter(); !iter.Done(); {
if err := c.iteration(ctx, cl, iter); err != nil {
return err
}
if err := c.Callback(ctx, data); err != nil {
errP = fmt.Errorf("callback failed: %w", err)
}
}
}

return
}
func (c consume[T]) iteration(ctx context.Context, cl *kgo.Client, iter *kgo.FetchesRecordIter) (err error) {
r := iter.Next()
if r == nil {
return nil
}

if err := cl.CommitRecords(ctx, r); err != nil {
errP = fmt.Errorf("commit records failed: %w", err)
defer func() {
if err != nil {
err = wrapErr(r, err)

return
return
}

if errCommit := cl.CommitRecords(ctx, r); errCommit != nil {
err = wrapErr(r, fmt.Errorf("commit records failed: %w", errCommit))

return
}
}()

if c.skip(r) {
return nil
}

if c.PreCheck != nil {
if err := c.PreCheck(ctx, r); err != nil {
if errors.Is(err, ErrSkip) {
return nil
}
})

if errP != nil {
return errP
return fmt.Errorf("pre check failed: %w", err)
}
}

data, err := c.Decode(r.Value)
if err != nil {
if errors.Is(err, ErrSkip) {
return nil
}

return fmt.Errorf("decode record failed: %w", err)
}

ctxCallback := context.WithValue(ctx, KeyRecord, r)
if err := c.Callback(ctxCallback, data); err != nil {
return fmt.Errorf("callback failed: %w", err)
}

return nil
}
22 changes: 22 additions & 0 deletions context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package wkafka

import "context"

type ctxKey string

const (
// KeyRecord is the context key for *Record.
KeyRecord ctxKey = "kafka_record"
)

// ContextRecord returns the Record from the context in callback function.
// - If the context is nil, or the Record is not set, nil is returned.
func ContextRecord(ctx context.Context) *Record {
if ctx == nil {
return nil
}

record, _ := ctx.Value(KeyRecord).(*Record)

return record
}
33 changes: 32 additions & 1 deletion error.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,40 @@
package wkafka

import "fmt"
import (
"fmt"
"strings"

"github.com/twmb/franz-go/pkg/kgo"
)

var (
ErrNotImplemented = fmt.Errorf("not implemented")
ErrClientClosed = fmt.Errorf("client closed")
ErrNilData = fmt.Errorf("nil data")
// ErrSkip is use to skip message in the PreCheck hook.
ErrSkip = fmt.Errorf("skip message")
)

func wrapErr(r *kgo.Record, err error) error {
return fmt.Errorf("message error - topic: %q, partition: %d, offset: %d, key: `%s`, headers: `%s` value: `%s`: %w",
r.Topic, r.Partition, r.Offset, r.Key, stringHeader(r.Headers), r.Value, err,
)
}

func stringHeader(headers []Header) string {
var str strings.Builder
str.WriteString("{")
for i, header := range headers {
str.WriteString(fmt.Sprintf("%q: %q", header.Key, header.Value))

if i == len(headers)-1 {
continue
}

str.WriteString(",")
}

str.WriteString("}")

return str.String()
}
23 changes: 20 additions & 3 deletions example/consume/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"encoding/json"
"fmt"
"log/slog"
"sync"
Expand All @@ -20,21 +21,37 @@ var (
}
)

func callBack(_ context.Context, msg map[string]interface{}) error {
slog.Info("callback", slog.Any("msg", msg))
func callBack(ctx context.Context, msg map[string]interface{}) error {
record := wkafka.ContextRecord(ctx)

slog.Info("callback", slog.Any("msg", msg), slog.String("topic", record.Topic), slog.String("key", string(record.Key)))

return nil
}

func decode(data []byte) (map[string]interface{}, error) {
if !json.Valid(data) {
return nil, wkafka.ErrSkip
}

var msg map[string]interface{}
if err := json.Unmarshal(data, &msg); err != nil {
return nil, err

Check failure on line 39 in example/consume/main.go

View workflow job for this annotation

GitHub Actions / sonarcloud

error returned from external package is unwrapped: sig: func encoding/json.Unmarshal(data []byte, v any) error (wrapcheck)

Check failure on line 39 in example/consume/main.go

View workflow job for this annotation

GitHub Actions / sonarcloud

error returned from external package is unwrapped: sig: func encoding/json.Unmarshal(data []byte, v any) error (wrapcheck)
}

return msg, nil
}

func main() {
initializer.Init(run)
}

func run(ctx context.Context, _ *sync.WaitGroup) error {
client, err := wkafka.NewClient(kafkaConfig, wkafka.WithConsumer(consumeConfig, callBack, nil))
client, err := wkafka.NewClient(kafkaConfig, wkafka.WithConsumer(consumeConfig, callBack, decode, nil))
if err != nil {
return err

Check failure on line 52 in example/consume/main.go

View workflow job for this annotation

GitHub Actions / sonarcloud

error returned from external package is unwrapped: sig: func github.com/worldline-go/wkafka.NewClient(cfg github.com/worldline-go/wkafka.Config, opts ...github.com/worldline-go/wkafka.Option) (*github.com/worldline-go/wkafka.Client, error) (wrapcheck)

Check failure on line 52 in example/consume/main.go

View workflow job for this annotation

GitHub Actions / sonarcloud

error returned from external package is unwrapped: sig: func github.com/worldline-go/wkafka.NewClient(cfg github.com/worldline-go/wkafka.Config, opts ...github.com/worldline-go/wkafka.Option) (*github.com/worldline-go/wkafka.Client, error) (wrapcheck)
}

defer client.Close()

if err := client.Consume(ctx); err != nil {
Expand Down
36 changes: 13 additions & 23 deletions example/produce/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,31 +20,21 @@ type Data struct {
Details map[string]interface{}
}

func (d *Data) ProducerEncode() ([]byte, error) {
return []byte(d.Name), nil
}

func (d *Data) ProducerKey() []byte {
return []byte(d.Name)
}

func (d *Data) ProducerHeaders() []wkafka.Header {
return []wkafka.Header{
{
Key: "name",
Value: []byte(d.Name),
},
func (d *Data) ProducerHook(r *wkafka.Record) *wkafka.Record {
if d == nil {
return r
}
}

func (d *Data) ProducerTopic() string {
return d.Topic
}
r.Value = []byte(d.Name)
r.Headers = append(r.Headers, wkafka.Header{
Key: "name",
Value: []byte(d.Name),
})
r.Key = []byte(d.Name)
r.Topic = d.Topic

var _ wkafka.ProducerEncode = (*Data)(nil)
var _ wkafka.ProducerKey = (*Data)(nil)
var _ wkafka.ProducerHeaders = (*Data)(nil)
var _ wkafka.ProducerTopic = (*Data)(nil)
return r
}

func main() {
initializer.Init(run)
Expand All @@ -65,7 +55,7 @@ func run(ctx context.Context, _ *sync.WaitGroup) error {
},
}

producer, err := client.Producer(wkafka.ProducerConfig{})
producer, err := wkafka.NewProducer(client, wkafka.ProducerConfig[*Data]{})
if err != nil {
return err

Check failure on line 60 in example/produce/main.go

View workflow job for this annotation

GitHub Actions / sonarcloud

error returned from external package is unwrapped: sig: func github.com/worldline-go/wkafka.NewProducer[T any](client *github.com/worldline-go/wkafka.Client, cfg github.com/worldline-go/wkafka.ProducerConfig[T]) (github.com/worldline-go/wkafka.Producer[T], error) (wrapcheck)

Check failure on line 60 in example/produce/main.go

View workflow job for this annotation

GitHub Actions / sonarcloud

error returned from external package is unwrapped: sig: func github.com/worldline-go/wkafka.NewProducer[T any](client *github.com/worldline-go/wkafka.Client, cfg github.com/worldline-go/wkafka.ProducerConfig[T]) (github.com/worldline-go/wkafka.Producer[T], error) (wrapcheck)
}
Expand Down
Loading

0 comments on commit ab9f604

Please sign in to comment.