From ab9f6047b2716a5bd7d2aa0b1701e89b9a44efb6 Mon Sep 17 00:00:00 2001 From: Eray Ates Date: Wed, 18 Oct 2023 03:46:19 +0200 Subject: [PATCH] fix: consumer update Signed-off-by: Eray Ates --- .gitignore | 1 + client.go | 5 -- clientoptions.go | 8 ++- codec.go | 2 +- consumer.go | 103 +++++++++++++++++++++++++----- context.go | 22 +++++++ error.go | 33 +++++++++- example/consume/main.go | 23 ++++++- example/produce/main.go | 36 ++++------- producer.go | 138 ++++++++++++++++------------------------ producer_test.go | 115 +++++++++++++++++---------------- 11 files changed, 296 insertions(+), 190 deletions(-) create mode 100644 context.go diff --git a/.gitignore b/.gitignore index 05e8c69..cf435ad 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /bin /.golangci.yml +/coverage.* diff --git a/client.go b/client.go index 073d3ac..7c72431 100644 --- a/client.go +++ b/client.go @@ -115,8 +115,3 @@ func (c *Client) produceRaw(ctx context.Context, records []*kgo.Record) error { return result.FirstErr() } - -// Producer to create a producer to send message to kafka. -func (c *Client) Producer(config ProducerConfig) (Producer, error) { - return newProducer(c, config) -} diff --git a/clientoptions.go b/clientoptions.go index c836f24..09889e5 100644 --- a/clientoptions.go +++ b/clientoptions.go @@ -44,7 +44,12 @@ func WithKGOOptions(opts ...kgo.Opt) Option { } // WithConsumer sets the listener to use. -func WithConsumer[T any](cfg ConsumeConfig, callback func(ctx context.Context, msg T) error, decode func([]byte) (T, error)) Option { +func WithConsumer[T any]( + cfg ConsumeConfig, + callback func(ctx context.Context, msg T) error, + decode func([]byte) (T, error), + preCheck func(context.Context, *Record) error, +) Option { return func(o *options) { decode := decode if decode == nil { @@ -56,6 +61,7 @@ func WithConsumer[T any](cfg ConsumeConfig, callback func(ctx context.Context, m Callback: callback, Cfg: cfg, Decode: decode, + PreCheck: preCheck, } } } diff --git a/codec.go b/codec.go index bc4ef72..98e0d08 100644 --- a/codec.go +++ b/codec.go @@ -4,7 +4,7 @@ import "encoding/json" type codecJSON[T any] struct{} -func (codecJSON[T]) Encode(data any) ([]byte, error) { +func (codecJSON[T]) Encode(data T) ([]byte, error) { return json.Marshal(data) } diff --git a/consumer.go b/consumer.go index 49bf890..62254f4 100644 --- a/consumer.go +++ b/consumer.go @@ -2,6 +2,7 @@ package wkafka import ( "context" + "errors" "fmt" "github.com/twmb/franz-go/pkg/kgo" @@ -52,6 +53,8 @@ type consume[T any] struct { Callback func(ctx context.Context, msg T) error Cfg ConsumeConfig Decode func(raw []byte) (T, error) + // PreCheck is a function that is called before the callback and decode. + PreCheck func(ctx context.Context, r *kgo.Record) error } type consumer interface { @@ -63,6 +66,34 @@ func (c consume[T]) config() ConsumeConfig { return c.Cfg } +func (c consume[T]) skip(r *kgo.Record) bool { + if c.Cfg.Skip == nil { + return false + } + + if _, ok := c.Cfg.Skip[r.Topic]; !ok { + return false + } + + if _, ok := c.Cfg.Skip[r.Topic][r.Partition]; !ok { + return false + } + + offsets := c.Cfg.Skip[r.Topic][r.Partition] + + if offsets.Before > 0 && r.Offset <= offsets.Before { + return true + } + + for _, offset := range offsets.Offsets { + if r.Offset == offset { + return true + } + } + + return false +} + func (c consume[T]) Consume(ctx context.Context, cl *kgo.Client) error { for { fetches := cl.PollFetches(ctx) @@ -72,35 +103,73 @@ func (c consume[T]) Consume(ctx context.Context, cl *kgo.Client) error { var errP error fetches.EachError(func(t string, p int32, err error) { + if errors.Is(err, context.Canceled) { + errP = err + + return + } + errP = fmt.Errorf("fetch err topic %s partition %d: %w", t, p, err) }) if errP != nil { return errP } - // var rs []*kgo.Record - fetches.EachRecord(func(r *kgo.Record) { - data, err := c.Decode(r.Value) - if err != nil { - errP = fmt.Errorf("decode record failed: %w", err) - - return + for iter := fetches.RecordIter(); !iter.Done(); { + if err := c.iteration(ctx, cl, iter); err != nil { + return err } - if err := c.Callback(ctx, data); err != nil { - errP = fmt.Errorf("callback failed: %w", err) + } + } +} - return - } +func (c consume[T]) iteration(ctx context.Context, cl *kgo.Client, iter *kgo.FetchesRecordIter) (err error) { + r := iter.Next() + if r == nil { + return nil + } - if err := cl.CommitRecords(ctx, r); err != nil { - errP = fmt.Errorf("commit records failed: %w", err) + defer func() { + if err != nil { + err = wrapErr(r, err) - return + return + } + + if errCommit := cl.CommitRecords(ctx, r); errCommit != nil { + err = wrapErr(r, fmt.Errorf("commit records failed: %w", errCommit)) + + return + } + }() + + if c.skip(r) { + return nil + } + + if c.PreCheck != nil { + if err := c.PreCheck(ctx, r); err != nil { + if errors.Is(err, ErrSkip) { + return nil } - }) - if errP != nil { - return errP + return fmt.Errorf("pre check failed: %w", err) } } + + data, err := c.Decode(r.Value) + if err != nil { + if errors.Is(err, ErrSkip) { + return nil + } + + return fmt.Errorf("decode record failed: %w", err) + } + + ctxCallback := context.WithValue(ctx, KeyRecord, r) + if err := c.Callback(ctxCallback, data); err != nil { + return fmt.Errorf("callback failed: %w", err) + } + + return nil } diff --git a/context.go b/context.go new file mode 100644 index 0000000..e99a2cf --- /dev/null +++ b/context.go @@ -0,0 +1,22 @@ +package wkafka + +import "context" + +type ctxKey string + +const ( + // KeyRecord is the context key for *Record. + KeyRecord ctxKey = "kafka_record" +) + +// ContextRecord returns the Record from the context in callback function. +// - If the context is nil, or the Record is not set, nil is returned. +func ContextRecord(ctx context.Context) *Record { + if ctx == nil { + return nil + } + + record, _ := ctx.Value(KeyRecord).(*Record) + + return record +} diff --git a/error.go b/error.go index c759a0f..8e3492e 100644 --- a/error.go +++ b/error.go @@ -1,9 +1,40 @@ package wkafka -import "fmt" +import ( + "fmt" + "strings" + + "github.com/twmb/franz-go/pkg/kgo" +) var ( ErrNotImplemented = fmt.Errorf("not implemented") ErrClientClosed = fmt.Errorf("client closed") ErrNilData = fmt.Errorf("nil data") + // ErrSkip is use to skip message in the PreCheck hook. + ErrSkip = fmt.Errorf("skip message") ) + +func wrapErr(r *kgo.Record, err error) error { + return fmt.Errorf("message error - topic: %q, partition: %d, offset: %d, key: `%s`, headers: `%s` value: `%s`: %w", + r.Topic, r.Partition, r.Offset, r.Key, stringHeader(r.Headers), r.Value, err, + ) +} + +func stringHeader(headers []Header) string { + var str strings.Builder + str.WriteString("{") + for i, header := range headers { + str.WriteString(fmt.Sprintf("%q: %q", header.Key, header.Value)) + + if i == len(headers)-1 { + continue + } + + str.WriteString(",") + } + + str.WriteString("}") + + return str.String() +} diff --git a/example/consume/main.go b/example/consume/main.go index 1afbea9..75538f1 100644 --- a/example/consume/main.go +++ b/example/consume/main.go @@ -2,6 +2,7 @@ package main import ( "context" + "encoding/json" "fmt" "log/slog" "sync" @@ -20,21 +21,37 @@ var ( } ) -func callBack(_ context.Context, msg map[string]interface{}) error { - slog.Info("callback", slog.Any("msg", msg)) +func callBack(ctx context.Context, msg map[string]interface{}) error { + record := wkafka.ContextRecord(ctx) + + slog.Info("callback", slog.Any("msg", msg), slog.String("topic", record.Topic), slog.String("key", string(record.Key))) return nil } +func decode(data []byte) (map[string]interface{}, error) { + if !json.Valid(data) { + return nil, wkafka.ErrSkip + } + + var msg map[string]interface{} + if err := json.Unmarshal(data, &msg); err != nil { + return nil, err + } + + return msg, nil +} + func main() { initializer.Init(run) } func run(ctx context.Context, _ *sync.WaitGroup) error { - client, err := wkafka.NewClient(kafkaConfig, wkafka.WithConsumer(consumeConfig, callBack, nil)) + client, err := wkafka.NewClient(kafkaConfig, wkafka.WithConsumer(consumeConfig, callBack, decode, nil)) if err != nil { return err } + defer client.Close() if err := client.Consume(ctx); err != nil { diff --git a/example/produce/main.go b/example/produce/main.go index 66df7fc..0a0a744 100644 --- a/example/produce/main.go +++ b/example/produce/main.go @@ -20,31 +20,21 @@ type Data struct { Details map[string]interface{} } -func (d *Data) ProducerEncode() ([]byte, error) { - return []byte(d.Name), nil -} - -func (d *Data) ProducerKey() []byte { - return []byte(d.Name) -} - -func (d *Data) ProducerHeaders() []wkafka.Header { - return []wkafka.Header{ - { - Key: "name", - Value: []byte(d.Name), - }, +func (d *Data) ProducerHook(r *wkafka.Record) *wkafka.Record { + if d == nil { + return r } -} -func (d *Data) ProducerTopic() string { - return d.Topic -} + r.Value = []byte(d.Name) + r.Headers = append(r.Headers, wkafka.Header{ + Key: "name", + Value: []byte(d.Name), + }) + r.Key = []byte(d.Name) + r.Topic = d.Topic -var _ wkafka.ProducerEncode = (*Data)(nil) -var _ wkafka.ProducerKey = (*Data)(nil) -var _ wkafka.ProducerHeaders = (*Data)(nil) -var _ wkafka.ProducerTopic = (*Data)(nil) + return r +} func main() { initializer.Init(run) @@ -65,7 +55,7 @@ func run(ctx context.Context, _ *sync.WaitGroup) error { }, } - producer, err := client.Producer(wkafka.ProducerConfig{}) + producer, err := wkafka.NewProducer(client, wkafka.ProducerConfig[*Data]{}) if err != nil { return err } diff --git a/producer.go b/producer.go index 29f717e..5c041cd 100644 --- a/producer.go +++ b/producer.go @@ -7,39 +7,51 @@ import ( "github.com/twmb/franz-go/pkg/kgo" ) -type Header = kgo.RecordHeader -type Record = kgo.Record +type ( + Header = kgo.RecordHeader + Record = kgo.Record +) + +type Producer[T any] interface { + Produce(ctx context.Context, data ...T) error +} -type ProducerConfig struct { +type ProducerHook[T any] interface { + ProducerHook(r *Record) *Record +} + +type ProducerConfig[T any] struct { // Topic is the default topic to produce to. Topic string // Headers is the default headers to produce with it. Headers []Header // Encode is use to marshal data to bytes. Default is json.Marshal. - Encode func(any) ([]byte, error) + // - If data is []byte, Encode will be ignored. + // - This works after Hook and record.Value is nil. + Encode func(T) ([]byte, error) + // Hook is use to modify record before produce. + // Hook func(T, *Record) *Record } -type produce struct { - Config ProducerConfig - ProduceRaw func(ctx context.Context, records []*kgo.Record) error -} +func NewProducer[T any](client *Client, cfg ProducerConfig[T]) (Producer[T], error) { + var encode func(data T) ([]byte, error) -type Producer interface { - Produce(ctx context.Context, data ...any) error -} + var value T + switch any(value).(type) { + case []byte: + encode = nil + default: + encode = codecJSON[T]{}.Encode + } -// newProducer creates a new producer based on the given client. -// -// Default codec is json.Marshal. -func newProducer(client *Client, cfg ProducerConfig) (*produce, error) { - setCfg := ProducerConfig{ + setCfg := ProducerConfig[T]{ Headers: []Header{ { Key: "server", Value: client.clientID, }, }, - Encode: codecJSON[any]{}.Encode, + Encode: encode, } if cfg.Topic != "" { @@ -54,14 +66,19 @@ func newProducer(client *Client, cfg ProducerConfig) (*produce, error) { setCfg.Encode = cfg.Encode } - return &produce{ - Config: setCfg, - ProduceRaw: client.produceRaw, + return &produce[T]{ + ProducerConfig: setCfg, + produceRaw: client.produceRaw, }, nil } -func (p *produce) Produce(ctx context.Context, data ...any) error { - records := make([]*kgo.Record, len(data)) +type produce[T any] struct { + ProducerConfig[T] + produceRaw func(ctx context.Context, records []*Record) error +} + +func (p *produce[T]) Produce(ctx context.Context, data ...T) error { + records := make([]*Record, len(data)) for i, d := range data { record, err := p.prepare(d) @@ -72,76 +89,31 @@ func (p *produce) Produce(ctx context.Context, data ...any) error { records[i] = record } - return p.ProduceRaw(ctx, records) + return p.produceRaw(ctx, records) } -func (p *produce) prepare(data any) (*kgo.Record, error) { - // set data - var rawData []byte - if data == nil { - rawData = nil - } else { - if dataByte, ok := data.([]byte); ok { - rawData = dataByte - } else { - if codec, ok := data.(ProducerEncode); ok { - var err error - rawData, err = codec.ProducerEncode() - if err != nil { - return nil, fmt.Errorf("codec failed: %w", err) - } - } else { - var err error - rawData, err = p.Config.Encode(data) - if err != nil { - return nil, fmt.Errorf("codec failed: %w", err) - } - } - } +func (p *produce[T]) prepare(data T) (*Record, error) { + record := &Record{ + Headers: p.Headers, + Topic: p.Topic, } - // set key - var key []byte - if dataKey, ok := data.(ProducerKey); ok { - key = dataKey.ProducerKey() + // check data has Hook interface + if data, ok := any(data).(ProducerHook[T]); ok { + record = data.ProducerHook(record) } - // set topic - topic := p.Config.Topic - if dataTopic, ok := data.(ProducerTopic); ok { - topic = dataTopic.ProducerTopic() + if record.Value != nil { + return record, nil } - // set headers - headers := p.Config.Headers - if dataHeaders, ok := data.(ProducerHeaders); ok { - headersGot := dataHeaders.ProducerHeaders() - if len(headersGot) > 0 { - headers = append(headers, headersGot...) + if p.Encode != nil { + var err error + record.Value, err = p.Encode(data) + if err != nil { + return nil, fmt.Errorf("encode data: %w", err) } } - // create record - return &kgo.Record{ - Key: key, - Value: rawData, - Headers: headers, - Topic: topic, - }, nil -} - -type ProducerKey interface { - ProducerKey() []byte -} - -type ProducerEncode interface { - ProducerEncode() ([]byte, error) -} - -type ProducerTopic interface { - ProducerTopic() string -} - -type ProducerHeaders interface { - ProducerHeaders() []Header + return record, nil } diff --git a/producer_test.go b/producer_test.go index b48bede..4ac063d 100644 --- a/producer_test.go +++ b/producer_test.go @@ -14,46 +14,43 @@ type testData struct { Details map[string]interface{} } -func (d *testData) ProducerCodec() ([]byte, error) { - return []byte(d.Name), nil -} - -func (d *testData) ProducerKey() []byte { - return []byte(d.Name) -} - -func (d *testData) ProducerHeaders() []Header { - return []Header{ - { - Key: "name", - Value: []byte(d.Name), - }, +func (d *testData) ProducerHook(r *Record) *Record { + if d == nil { + return r } -} -func (d *testData) ProducerTopic() string { - return d.Topic + r.Value = []byte(d.Name) + r.Headers = append(r.Headers, Header{ + Key: "name", + Value: []byte(d.Name), + }) + r.Key = []byte(d.Name) + r.Topic = d.Topic + + return r } func Test_produce_Produce(t *testing.T) { - type fields struct { - Config ProducerConfig + type fields[T any] struct { + Config ProducerConfig[T] ProduceRaw func(t *testing.T) func(ctx context.Context, records []*kgo.Record) error } type args struct { ctx context.Context - data []any + data []*testData } - tests := []struct { + + type testCase[T any] struct { name string - fields fields + fields fields[T] args args wantErr bool - }{ + } + tests := []testCase[*testData]{ { name: "test", - fields: fields{ - Config: ProducerConfig{ + fields: fields[*testData]{ + Config: ProducerConfig[*testData]{ Topic: "test", Headers: []Header{ { @@ -61,36 +58,37 @@ func Test_produce_Produce(t *testing.T) { Value: []byte("test"), }, }, - Encode: codecJSON[any]{}.Encode, + Encode: codecJSON[*testData]{}.Encode, }, ProduceRaw: func(t *testing.T) func(ctx context.Context, records []*kgo.Record) error { return func(ctx context.Context, records []*kgo.Record) error { t.Helper() - if len(records) != 3 { + if len(records) != 1 { t.Errorf("produce.Produce() len(records) = %v, want %v", len(records), 3) } for i := range records { switch i { + // case 0: + // assert.Equal(t, "test", records[i].Topic) + // assert.Nil(t, records[i].Key) + // assert.Nil(t, records[i].Value) + // assert.Equal(t, 1, len(records[i].Headers)) + // assert.Equal(t, "server", records[i].Headers[0].Key) + // assert.Equal(t, "test", string(records[i].Headers[0].Value)) + // case 1: + // assert.Equal(t, "test", records[i].Topic) + // assert.Nil(t, records[i].Key) + // assert.Equal(t, "test", string(records[i].Value)) + // assert.Equal(t, 1, len(records[i].Headers)) + // assert.Equal(t, "server", records[i].Headers[0].Key) + // assert.Equal(t, "test", string(records[i].Headers[0].Value)) case 0: - assert.Equal(t, "test", records[i].Topic) - assert.Nil(t, records[i].Key) - assert.Nil(t, records[i].Value) - assert.Equal(t, 1, len(records[i].Headers)) - assert.Equal(t, "server", records[i].Headers[0].Key) - assert.Equal(t, "test", string(records[i].Headers[0].Value)) - case 1: - assert.Equal(t, "test", records[i].Topic) - assert.Nil(t, records[i].Key) - assert.Equal(t, "test", string(records[i].Value)) - assert.Equal(t, 1, len(records[i].Headers)) - assert.Equal(t, "server", records[i].Headers[0].Key) - assert.Equal(t, "test", string(records[i].Headers[0].Value)) - case 2: assert.Equal(t, "test", records[i].Topic) assert.Equal(t, "test", string(records[i].Key)) - assert.Equal(t, `{"Name":"test","Topic":"test","Details":{"key":1234}}`, string(records[i].Value)) + // assert.Equal(t, `{"Name":"test","Topic":"test","Details":{"key":1234}}`, string(records[i].Value)) + assert.Equal(t, `test`, string(records[i].Value)) assert.Equal(t, 2, len(records[i].Headers)) assert.Equal(t, "server", records[i].Headers[0].Key) assert.Equal(t, "test", string(records[i].Headers[0].Value)) @@ -104,10 +102,8 @@ func Test_produce_Produce(t *testing.T) { }, args: args{ ctx: context.Background(), - data: []any{ - nil, - []byte("test"), - &testData{ + data: []*testData{ + { Name: "test", Topic: "test", Details: map[string]interface{}{ @@ -120,10 +116,11 @@ func Test_produce_Produce(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - p := &produce{ - Config: tt.fields.Config, - ProduceRaw: tt.fields.ProduceRaw(t), + p := &produce[*testData]{ + ProducerConfig: tt.fields.Config, + produceRaw: tt.fields.ProduceRaw(t), } + if err := p.Produce(tt.args.ctx, tt.args.data...); (err != nil) != tt.wantErr { t.Errorf("produce.Produce() error = %v, wantErr %v", err, tt.wantErr) } @@ -132,8 +129,8 @@ func Test_produce_Produce(t *testing.T) { } func BenchmarkProduce(b *testing.B) { - p := &produce{ - Config: ProducerConfig{ + p := &produce[*testData]{ + ProducerConfig: ProducerConfig[*testData]{ Topic: "test", Headers: []Header{ { @@ -141,17 +138,23 @@ func BenchmarkProduce(b *testing.B) { Value: []byte("test"), }, }, - Encode: codecJSON[any]{}.Encode, }, - ProduceRaw: func(ctx context.Context, records []*kgo.Record) error { + produceRaw: func(ctx context.Context, records []*kgo.Record) error { return nil }, } - datas := []any{ + datas := []*testData{ nil, - []byte("test"), - &testData{ + // []byte("test"), + { + Name: "test", + Topic: "test", + Details: map[string]interface{}{ + "key": 1234, + }, + }, + { Name: "test", Topic: "test", Details: map[string]interface{}{