diff --git a/Makefile b/Makefile index 60aa966..618b247 100644 --- a/Makefile +++ b/Makefile @@ -20,6 +20,9 @@ env-logs: ## Show env logs env-down: ## Stop env docker compose -p wkafka down +run-example: ## Run example + @go run ./example/main.go + .golangci.yml: @$(MAKE) golangci diff --git a/client.go b/client.go index 07484df..20cff50 100644 --- a/client.go +++ b/client.go @@ -4,8 +4,10 @@ import ( "context" "fmt" + "github.com/rs/zerolog/log" "github.com/twmb/franz-go/pkg/kadm" "github.com/twmb/franz-go/pkg/kgo" + "github.com/worldline-go/logz" "golang.org/x/sync/errgroup" ) @@ -14,7 +16,8 @@ type Client struct { KafkaDLQ *kgo.Client clientID []byte - consumerConfig ConsumerConfig + consumerConfig *ConsumerConfig + logger logz.Adapter } func New(ctx context.Context, cfg Config, opts ...Option) (*Client, error) { @@ -22,47 +25,49 @@ func New(ctx context.Context, cfg Config, opts ...Option) (*Client, error) { ClientID: DefaultClientID, AutoTopicCreation: true, AppName: idProgname, + Logger: logz.AdapterKV{Log: log.Logger}, } o.apply(opts...) // validate client and add defaults to consumer config - consumerConfig, err := cfg.Consumer.Apply(o.ConsumerConfig, o.AppName) - if err != nil { - return nil, fmt.Errorf("validate config: %w", err) + if o.ConsumerConfig != nil { + if err := configApply(cfg.Consumer, o.ConsumerConfig, o.AppName, o.Logger); err != nil { + return nil, fmt.Errorf("validate config: %w", err) + } } - o.ConsumerConfig = consumerConfig + c := &Client{ + consumerConfig: o.ConsumerConfig, + logger: o.Logger, + clientID: []byte(o.ClientID), + } - kgoClient, err := newClient(ctx, cfg, o) + kgoClient, err := newClient(c, cfg, &o, false) if err != nil { return nil, err } var kgoClientDLQ *kgo.Client if o.ConsumerEnabled { - kgoClientDLQ, err = newClient(ctx, cfg, o.WithDLQ()) + kgoClientDLQ, err = newClient(c, cfg, &o, true) if err != nil { return nil, err } } - cl := &Client{ - Kafka: kgoClient, - KafkaDLQ: kgoClientDLQ, - clientID: []byte(o.ClientID), - consumerConfig: o.ConsumerConfig, - } + c.Kafka = kgoClient + c.KafkaDLQ = kgoClientDLQ // main and dlq use same config, ask for validation once - if err := cl.Kafka.Ping(ctx); err != nil { + if err := c.Kafka.Ping(ctx); err != nil { return nil, fmt.Errorf("connection to kafka brokers: %w", err) } - return cl, nil + return c, nil } -func newClient(ctx context.Context, cfg Config, o options) (*kgo.Client, error) { +func newClient(c *Client, cfg Config, o *options, isDLQ bool) (*kgo.Client, error) { compressions, err := compressionOpts(cfg.Compressions) if err != nil { return nil, err @@ -99,13 +104,8 @@ func newClient(ctx context.Context, cfg Config, o options) (*kgo.Client, error) } if o.ConsumerEnabled { - // validate consumer - if err := cfg.Consumer.Validation.Validate(o.ConsumerConfig); err != nil { - return nil, fmt.Errorf("validate consumer config: %w", err) - } - var startOffsetCfg int64 - if o.DLQ { + if isDLQ { startOffsetCfg = o.ConsumerConfig.DLQ.StartOffset } else { startOffsetCfg = o.ConsumerConfig.StartOffset @@ -127,17 +127,24 @@ func newClient(ctx context.Context, cfg Config, o options) (*kgo.Client, error) kgo.RequireStableFetchOffsets(), kgo.ConsumerGroup(o.ConsumerConfig.GroupID), kgo.ConsumeResetOffset(startOffset), + kgo.OnPartitionsLost(partitionLost(c)), + kgo.OnPartitionsRevoked(partitionRevoked(c)), ) - if o.DLQ { - kgoOpt = append(kgoOpt, kgo.ConsumeTopics(o.ConsumerConfig.DLQ.Topic)) + if isDLQ { + topics := []string{o.ConsumerConfig.DLQ.Topic} + if len(o.ConsumerConfig.DLQ.TopicsExtra) > 0 { + topics = append(topics, o.ConsumerConfig.DLQ.TopicsExtra...) + } + + kgoOpt = append(kgoOpt, kgo.ConsumeTopics(topics...)) } else { kgoOpt = append(kgoOpt, kgo.ConsumeTopics(o.ConsumerConfig.Topics...)) } } // Add custom options - if o.DLQ { + if isDLQ { kgoOpt = append(kgoOpt, o.KGOOptionsDLQ...) } else { kgoOpt = append(kgoOpt, o.KGOOptions...) @@ -156,6 +163,9 @@ func (c *Client) Close() { if c.Kafka != nil { c.Kafka.Close() } + if c.KafkaDLQ != nil { + c.KafkaDLQ.Close() + } } // Consume starts consuming messages from kafka. @@ -163,7 +173,7 @@ func (c *Client) Close() { func (c *Client) Consume(ctx context.Context, callback CallBackFunc, opts ...OptionConsumer) error { o := optionConsumer{ Client: c, - ConsumerConfig: c.consumerConfig, + ConsumerConfig: *c.consumerConfig, } opts = append([]OptionConsumer{OptionConsumer(callback)}, opts...) diff --git a/clientoptions.go b/clientoptions.go index 369fce8..9c558ff 100644 --- a/clientoptions.go +++ b/clientoptions.go @@ -2,6 +2,7 @@ package wkafka import ( "github.com/twmb/franz-go/pkg/kgo" + "github.com/worldline-go/logz" ) // DefaultBatchCount is default batch count for batch consumer, if not set. @@ -10,13 +11,13 @@ var DefaultBatchCount = 100 type options struct { AppName string ConsumerEnabled bool - ConsumerConfig ConsumerConfig + ConsumerConfig *ConsumerConfig // Consumer consumer ClientID string KGOOptions []kgo.Opt KGOOptionsDLQ []kgo.Opt AutoTopicCreation bool - DLQ bool + Logger logz.Adapter } func (o *options) apply(opts ...Option) { @@ -25,12 +26,6 @@ func (o *options) apply(opts ...Option) { } } -func (o options) WithDLQ() options { - o.DLQ = true - - return o -} - type Option func(*options) // WithClientID to set client_id in kafka server. @@ -91,7 +86,16 @@ func WithKGOOptionsDLQ(opts ...kgo.Opt) Option { func WithConsumer(cfg ConsumerConfig) Option { return func(o *options) { - o.ConsumerConfig = cfg + o.ConsumerConfig = &cfg o.ConsumerEnabled = true } } + +// WithLogger configures the client to use the provided logger. +// - For zerolog logz.AdapterKV{Log: logger} can usable. +// - Default is using zerolog's global logger. +func WithLogger(logger logz.Adapter) Option { + return func(o *options) { + o.Logger = logger + } +} diff --git a/config.go b/config.go index 304c2e0..e005ea2 100644 --- a/config.go +++ b/config.go @@ -3,8 +3,13 @@ package wkafka import ( "fmt" "regexp" + "time" + + "github.com/worldline-go/logz" ) +var DefaultRetryInterval = 10 * time.Second + type Config struct { // Brokers is a list of kafka brokers to connect to. // Not all brokers need to be specified, the list is so that @@ -28,27 +33,39 @@ type ConsumerPreConfig struct { // PrefixGroupID add prefix to group_id. PrefixGroupID string `cfg:"prefix_group_id"` // FormatDLQTopic is a format string to generate DLQ topic name. - // - %s is a placeholder for program name. - // - Default is "finops_%s_dlq" + // - Example is "finops_{{.AppName}}_dlq" + // - It should be exist if DLQ is enabled and topic is not set. + // + // - Available variables: + // - AppName FormatDLQTopic string `cfg:"format_dlq_topic"` // Validation is a configuration for validation when consumer initialized. Validation Validation `cfg:"validation"` } -// Apply configuration to ConsumerConfig and check validation. -func (c ConsumerPreConfig) Apply(consumerConfig ConsumerConfig, progName string) (ConsumerConfig, error) { +// configApply configuration to ConsumerConfig and check validation. +func configApply(c ConsumerPreConfig, consumerConfig *ConsumerConfig, progName string, logger logz.Adapter) error { if c.PrefixGroupID != "" { consumerConfig.GroupID = c.PrefixGroupID + consumerConfig.GroupID } + if !consumerConfig.DLQ.Disable && consumerConfig.DLQ.Topic == "" && c.FormatDLQTopic == "" { + consumerConfig.DLQ.Disable = true + logger.Warn("dlq is disabled because topic and format_dlq_topic is not set") + } + // add default topic name for DLQ if !consumerConfig.DLQ.Disable { if consumerConfig.DLQ.Topic == "" { if c.FormatDLQTopic == "" { - c.FormatDLQTopic = "finops_%s_dlq" + return fmt.Errorf("format_dlq_topic is required if dlq topic is not set") } - consumerConfig.DLQ.Topic = fmt.Sprintf(c.FormatDLQTopic, progName) + var err error + consumerConfig.DLQ.Topic, err = templateRun(c.FormatDLQTopic, map[string]string{"AppName": progName}) + if err != nil { + return fmt.Errorf("format_dlq_topic: %w", err) + } } if consumerConfig.DLQ.SkipExtra == nil { @@ -58,13 +75,17 @@ func (c ConsumerPreConfig) Apply(consumerConfig ConsumerConfig, progName string) } else { consumerConfig.DLQ.SkipExtra[consumerConfig.DLQ.Topic] = consumerConfig.DLQ.Skip } + + if consumerConfig.DLQ.RetryInterval == 0 { + consumerConfig.DLQ.RetryInterval = DefaultRetryInterval + } } if err := c.Validation.Validate(consumerConfig); err != nil { - return consumerConfig, fmt.Errorf("validate consumer config: %w", err) + return fmt.Errorf("validate consumer config: %w", err) } - return consumerConfig, nil + return nil } // Validation is a configuration for validation when consumer initialized. @@ -108,7 +129,7 @@ func (v GroupIDValidation) Validate(groupID string) error { return nil } -func (v Validation) Validate(consumerConfig ConsumerConfig) error { +func (v Validation) Validate(consumerConfig *ConsumerConfig) error { if err := v.GroupID.Validate(consumerConfig.GroupID); err != nil { return err } diff --git a/config_test.go b/config_test.go index 1d70705..480d412 100644 --- a/config_test.go +++ b/config_test.go @@ -3,6 +3,8 @@ package wkafka import ( "reflect" "testing" + + "github.com/worldline-go/logz" ) func TestConsumerPreConfig_Apply(t *testing.T) { @@ -50,10 +52,12 @@ func TestConsumerPreConfig_Apply(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { c := ConsumerPreConfig{ - PrefixGroupID: tt.fields.PrefixGroupID, - Validation: tt.fields.Validation, + PrefixGroupID: tt.fields.PrefixGroupID, + Validation: tt.fields.Validation, + FormatDLQTopic: "finops_{{.AppName}}_dlq", } - got, err := c.Apply(tt.args.consumerConfig, "serviceX") + got := tt.args.consumerConfig + err := configApply(c, &tt.args.consumerConfig, "serviceX", logz.AdapterNoop{}) if (err != nil) != tt.wantErr { t.Errorf("ConsumerPreConfig.Apply() error = %v, wantErr %v", err, tt.wantErr) return diff --git a/consumer.go b/consumer.go index a146cbd..2a48064 100644 --- a/consumer.go +++ b/consumer.go @@ -2,7 +2,7 @@ package wkafka import ( "context" - "fmt" + "time" "github.com/twmb/franz-go/pkg/kgo" ) @@ -50,8 +50,14 @@ type ConsumerConfig struct { } type DLQ struct { - // Disable is a flag to Disable DLQ. - Disable bool `cfg:"enable"` + // Disable is a flag to disable DLQ. + // - Default is false. + // - If topic is not set, it will be generated from format_dlq_topic. + // - If topic and format_dlq_topic is not set, dlq will be disabled! + Disable bool `cfg:"disable"` + // RetryInterval is a time interval to retry again of DLQ messages. + // - Default is 10 seconds. + RetryInterval time.Duration `cfg:"retry_interval"` // StartOffset is used when there is no committed offset for GroupID. // // Available options: @@ -104,7 +110,7 @@ type consumer interface { } func skip(cfg *ConsumerConfig, r *kgo.Record) bool { - if cfg.Skip == nil { + if len(cfg.Skip) == 0 { return false } @@ -169,10 +175,6 @@ type optionConsumer struct { Consumer consumer ConsumerDLQ consumer ConsumerConfig ConsumerConfig - // Concurrent to run the consumer in concurrent mode for each partition and topic. - // - Default is false. - // - Each topic could have different type of value so use with processor map. - Concurrent bool `cfg:"concurrent"` } type ( @@ -203,6 +205,7 @@ func WithCallbackBatch[T any](fn func(ctx context.Context, msg []T) error) CallB ProduceDLQ: produceDLQ, Cfg: o.ConsumerConfig, Skip: skip, + Logger: o.Client.logger, } o.ConsumerDLQ = &consumerBatch[T]{ @@ -211,6 +214,7 @@ func WithCallbackBatch[T any](fn func(ctx context.Context, msg []T) error) CallB Cfg: o.ConsumerConfig, Skip: skipDLQ, IsDLQ: true, + Logger: o.Client.logger, } return nil @@ -230,6 +234,7 @@ func WithCallback[T any](fn func(ctx context.Context, msg T) error) CallBackFunc ProduceDLQ: produceDLQ, Cfg: o.ConsumerConfig, Skip: skip, + Logger: o.Client.logger, } o.ConsumerDLQ = &consumerSingle[T]{ @@ -238,6 +243,7 @@ func WithCallback[T any](fn func(ctx context.Context, msg T) error) CallBackFunc Cfg: o.ConsumerConfig, Skip: skipDLQ, IsDLQ: true, + Logger: o.Client.logger, } return nil @@ -268,10 +274,6 @@ func getDecodeProduceDLQ[T any](o *optionConsumer) (func(raw []byte, r *kgo.Reco // - Use this option after the WithCallback option. func WithDecode[T any](fn func(raw []byte, r *kgo.Record) (T, error)) OptionConsumer { return func(o *optionConsumer) error { - if o.Consumer == nil { - return fmt.Errorf("consumer is nil, use WithCallback[Batch] option first") - } - switch v := o.Consumer.(type) { case *consumerBatch[T]: v.Decode = fn @@ -283,23 +285,12 @@ func WithDecode[T any](fn func(raw []byte, r *kgo.Record) (T, error)) OptionCons } } +// WithPreCheck to set wkafka consumer's pre check function. +// - Return ErrSkip will skip the message. func WithPreCheck(fn func(ctx context.Context, r *kgo.Record) error) OptionConsumer { return func(o *optionConsumer) error { - if o.Consumer == nil { - return fmt.Errorf("consumer is nil, use WithCallback[Batch] option first") - } - o.Consumer.setPreCheck(fn) return nil } } - -// TODO implement concurrent mode -// func WithConcurrent() OptionConsumer { -// return func(o *optionConsumer) error { -// o.Concurrent = true - -// return nil -// } -// } diff --git a/consumerbatch.go b/consumerbatch.go index 07985c6..e9f2b06 100644 --- a/consumerbatch.go +++ b/consumerbatch.go @@ -6,6 +6,7 @@ import ( "fmt" "github.com/twmb/franz-go/pkg/kgo" + "github.com/worldline-go/logz" ) type consumerBatch[T any] struct { @@ -17,6 +18,7 @@ type consumerBatch[T any] struct { Option optionConsumer ProduceDLQ func(ctx context.Context, err error, records []*kgo.Record) error Skip func(cfg *ConsumerConfig, r *kgo.Record) bool + Logger logz.Adapter IsDLQ bool } @@ -40,28 +42,12 @@ func (c *consumerBatch[T]) Consume(ctx context.Context, cl *kgo.Client) error { continue } - if !c.Option.Concurrent { - if err := c.batchIteration(ctx, cl, fetch); err != nil { - return err - } - - continue - } - - if err := c.concurrentIteration(ctx, cl, fetch); err != nil { + if err := c.batchIteration(ctx, cl, fetch); err != nil { return err } } } -///////////////////////////////// -// BATCH - CONCURRENT ITERATION -///////////////////////////////// - -func (c *consumerBatch[T]) concurrentIteration(ctx context.Context, cl *kgo.Client, fetch kgo.Fetches) error { - return nil -} - ///////////////////////////////// // BATCH - ITERATION ///////////////////////////////// @@ -112,13 +98,13 @@ func (c *consumerBatch[T]) batchIteration(ctx context.Context, cl *kgo.Client, f ctxCallback := context.WithValue(ctx, KeyRecord, batchRecords) if err := c.Process(ctxCallback, batch); err != nil { - if c.ProduceDLQ != nil && isDQLError(err) { - if err := c.ProduceDLQ(ctx, err, []*kgo.Record{r}); err != nil { - return wrapErr(r, fmt.Errorf("produce to DLQ failed: %w", err), c.IsDLQ) - } - } else { - return wrapErr(r, err, c.IsDLQ) - } + // if c.ProduceDLQ != nil && isDQLError(err) { + // if err := c.ProduceDLQ(ctx, err, []*kgo.Record{r}); err != nil { + // return wrapErr(r, fmt.Errorf("produce to DLQ failed: %w", err), c.IsDLQ) + // } + // } else { + // return wrapErr(r, err, c.IsDLQ) + // } } if err := cl.CommitRecords(ctx, records...); err != nil { diff --git a/consumersingle.go b/consumersingle.go index 686f7a5..472c5db 100644 --- a/consumersingle.go +++ b/consumersingle.go @@ -6,6 +6,7 @@ import ( "fmt" "github.com/twmb/franz-go/pkg/kgo" + "github.com/worldline-go/logz" ) type consumerSingle[T any] struct { @@ -17,6 +18,7 @@ type consumerSingle[T any] struct { Option optionConsumer ProduceDLQ func(ctx context.Context, err error, records []*kgo.Record) error Skip func(cfg *ConsumerConfig, r *kgo.Record) bool + Logger logz.Adapter IsDLQ bool } @@ -40,28 +42,12 @@ func (c *consumerSingle[T]) Consume(ctx context.Context, cl *kgo.Client) error { continue } - if !c.Option.Concurrent { - if err := c.iteration(ctx, cl, fetch); err != nil { - return err - } - - continue - } - - if err := c.concurrentIteration(ctx, cl, fetch); err != nil { + if err := c.iteration(ctx, cl, fetch); err != nil { return err } } } -///////////////////////////////// -// SINGLE - CONCURRENT ITERATION -///////////////////////////////// - -func (c *consumerSingle[T]) concurrentIteration(ctx context.Context, cl *kgo.Client, fetch kgo.Fetches) error { - return nil -} - //////////////////// // SINGLE - ITERATION //////////////////// @@ -70,13 +56,14 @@ func (c *consumerSingle[T]) iteration(ctx context.Context, cl *kgo.Client, fetch for iter := fetch.RecordIter(); !iter.Done(); { r := iter.Next() - if err := c.iterationRecord(ctx, r); err != nil { - // send to DLQ if enabled - if c.ProduceDLQ != nil && isDQLError(err) { - if err := c.ProduceDLQ(ctx, err, []*kgo.Record{r}); err != nil { - return wrapErr(r, fmt.Errorf("produce to DLQ failed: %w", err), c.IsDLQ) - } - } else { + // listening DLQ topics + if c.IsDLQ { + if err := c.iterationDLQ(ctx, r); err != nil { + return wrapErr(r, err, c.IsDLQ) + } + } else { + // listening main topics + if err := c.iterationMain(ctx, r); err != nil { return wrapErr(r, err, c.IsDLQ) } } @@ -89,6 +76,60 @@ func (c *consumerSingle[T]) iteration(ctx context.Context, cl *kgo.Client, fetch return nil } +// iterationDLQ is used to listen DLQ topics, error usually comes from context cancellation. +func (c *consumerSingle[T]) iterationDLQ(ctx context.Context, r *kgo.Record) error { + wait := waitRetry{ + Interval: c.Cfg.DLQ.RetryInterval, + } + + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + if err := c.iterationRecord(ctx, r); err != nil { + errOrg, _ := isDQLError(err) + errWrapped := wrapErr(r, errOrg, c.IsDLQ) + c.Logger.Error("process failed", "err", errWrapped, "retry_interval", wait.CurrentInterval().String()) + + if err := wait.Sleep(ctx); err != nil { + return err + } + + continue + } + + break + } + + return nil +} + +// iterationMain is used to listen main topics. +func (c *consumerSingle[T]) iterationMain(ctx context.Context, r *kgo.Record) error { + if err := c.iterationRecord(ctx, r); err != nil { + errOrg, ok := isDQLError(err) + if !ok { + return err + } + + // send to DLQ if enabled + if c.ProduceDLQ != nil { + if err := c.ProduceDLQ(ctx, err, []*kgo.Record{r}); err != nil { + return fmt.Errorf("produce to DLQ failed: %w", err) + } + + return nil + } + + return errOrg + } + + return nil +} + func (c *consumerSingle[T]) iterationRecord(ctx context.Context, r *kgo.Record) error { if c.Skip(&c.Cfg, r) { return nil diff --git a/error.go b/error.go index 2c08b8d..4722880 100644 --- a/error.go +++ b/error.go @@ -28,17 +28,23 @@ type DLQIndexedError struct { } func (e *DLQIndexedError) Error() string { - return ErrDLQ.Error() + return "DLQ indexed error" } -func isDQLError(err error) bool { +// isDQLError check if error is DLQ error and return the original error or error. +func isDQLError(err error) (error, bool) { if errors.Is(err, ErrDLQ) { - return true + return unwrapErr(err), true } var errDLQIndexed *DLQIndexedError - return errors.As(err, &errDLQIndexed) + ok := errors.As(err, &errDLQIndexed) + if ok { + return errDLQIndexed.Err, true + } + + return err, false } func wrapErr(r *kgo.Record, err error, dlq bool) error { diff --git a/example/admin/main.go b/example/admin/main.go index 5c1b659..2a8ab2c 100644 --- a/example/admin/main.go +++ b/example/admin/main.go @@ -1,11 +1,9 @@ -package main +package admin import ( "context" - "log/slog" "sync" - "github.com/worldline-go/initializer" "github.com/worldline-go/wkafka" ) @@ -15,11 +13,7 @@ var ( } ) -func main() { - initializer.Init(run) -} - -func run(ctx context.Context, _ *sync.WaitGroup) error { +func RunExampleTopic(ctx context.Context, _ *sync.WaitGroup) error { client, err := wkafka.New(ctx, kafkaConfig) if err != nil { return err @@ -28,24 +22,26 @@ func run(ctx context.Context, _ *sync.WaitGroup) error { admClient := client.Admin() - resp, err := admClient.CreateTopic(ctx, -1, -1, nil, "test-1234") - if err != nil { - return err - } + // resp, err := admClient.CreateTopic(ctx, -1, -1, nil, "test-1234") + // if err != nil { + // return err + // } - slog.Info("topic created", - slog.String("topic", resp.Topic), - slog.Int64("partitions", int64(resp.NumPartitions)), - slog.Int64("replicas", int64(resp.ReplicationFactor)), - ) + // slog.Info("topic created", + // slog.String("topic", resp.Topic), + // slog.Int64("partitions", int64(resp.NumPartitions)), + // slog.Int64("replicas", int64(resp.ReplicationFactor)), + // ) - // list topics - topics, err := admClient.ListTopics(ctx) - if err != nil { - return err - } + // // list topics + // topics, err := admClient.ListTopics(ctx) + // if err != nil { + // return err + // } + + admClient.CreatePartitions(ctx, 2, "finops_testapp_dlq") - slog.Info("all topics", slog.Any("topics", topics.Names())) + // slog.Info("all topics", slog.Any("topics", topics.Names())) return nil } diff --git a/example/consume/main.go b/example/consume/main.go deleted file mode 100644 index 53b9d52..0000000 --- a/example/consume/main.go +++ /dev/null @@ -1,90 +0,0 @@ -package main - -import ( - "context" - "encoding/json" - "fmt" - "log/slog" - "sync" - - "github.com/twmb/franz-go/pkg/kgo" - "github.com/worldline-go/initializer" - "github.com/worldline-go/wkafka" -) - -var ( - kafkaConfig = wkafka.Config{ - Brokers: []string{"localhost:9092"}, - } - consumeConfig = wkafka.ConsumerConfig{ - Topics: []string{"test"}, - GroupID: "test1", - BatchCount: 2, - } -) - -type Data struct { - Test int `json:"test"` - - Metadata `json:"-"` -} - -type Metadata struct { - Topic string - Key []byte -} - -type Processor struct{} - -func (Processor) Process(_ context.Context, msg []Data) error { - slog.Info("batch process", slog.Int("count", len(msg))) - for _, m := range msg { - slog.Info("callback", slog.Any("test", m.Test), slog.String("topic", m.Metadata.Topic), slog.String("key", string(m.Metadata.Key))) - } - - return nil -} - -func (Processor) Decode(data []byte, r *kgo.Record) (Data, error) { - if !json.Valid(data) { - return Data{}, wkafka.ErrSkip - } - - var msg Data - if err := json.Unmarshal(data, &msg); err != nil { - return Data{}, err - } - - msg.Metadata.Topic = r.Topic - msg.Metadata.Key = r.Key - - return msg, nil -} - -func main() { - initializer.Init(run) -} - -func run(ctx context.Context, _ *sync.WaitGroup) error { - p := Processor{} - - client, err := wkafka.New( - ctx, kafkaConfig, - wkafka.WithConsumer(consumeConfig), - wkafka.WithClientInfo("testapp", "v0.1.0"), - ) - if err != nil { - return err - } - - defer client.Close() - - if err := client.Consume(ctx, - wkafka.WithCallbackBatch(p.Process), - wkafka.WithDecode(p.Decode), - ); err != nil { - return fmt.Errorf("consume: %w", err) - } - - return nil -} diff --git a/example/consumer/batch.go b/example/consumer/batch.go new file mode 100644 index 0000000..025029d --- /dev/null +++ b/example/consumer/batch.go @@ -0,0 +1,81 @@ +package consumer + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "sync" + + "github.com/twmb/franz-go/pkg/kgo" + "github.com/worldline-go/wkafka" +) + +var ( + kafkaConfigBatch = wkafka.Config{ + Brokers: []string{"localhost:9092"}, + } + consumeConfigBatch = wkafka.ConsumerConfig{ + Topics: []string{"test"}, + GroupID: "test_batch", + BatchCount: 5, + } +) + +type DataBatch struct { + Test int `json:"test"` + + MetadataBatch `json:"-"` +} + +type MetadataBatch struct { + Topic string + Key []byte +} + +func ProcessBatch(_ context.Context, msg []DataBatch) error { + slog.Info("batch process", slog.Int("count", len(msg))) + for _, m := range msg { + slog.Info("callback", slog.Any("test", m.Test), slog.String("topic", m.MetadataBatch.Topic), slog.String("key", string(m.MetadataBatch.Key))) + } + + return nil +} + +func Decode(data []byte, r *kgo.Record) (DataBatch, error) { + if !json.Valid(data) { + return DataBatch{}, wkafka.ErrSkip + } + + var msg DataBatch + if err := json.Unmarshal(data, &msg); err != nil { + return DataBatch{}, err + } + + msg.MetadataBatch.Topic = r.Topic + msg.MetadataBatch.Key = r.Key + + return msg, nil +} + +func RunExampleBatch(ctx context.Context, _ *sync.WaitGroup) error { + client, err := wkafka.New( + ctx, kafkaConfigBatch, + wkafka.WithConsumer(consumeConfigBatch), + wkafka.WithClientInfo("testapp", "v0.1.0"), + ) + if err != nil { + return err + } + + defer client.Close() + + if err := client.Consume(ctx, + wkafka.WithCallbackBatch(ProcessBatch), + wkafka.WithDecode(Decode), + ); err != nil { + return fmt.Errorf("consume: %w", err) + } + + return nil +} diff --git a/example/consumer/single.go b/example/consumer/single.go new file mode 100644 index 0000000..6dd7659 --- /dev/null +++ b/example/consumer/single.go @@ -0,0 +1,62 @@ +package consumer + +import ( + "context" + "fmt" + "log/slog" + "sync" + + "github.com/worldline-go/wkafka" +) + +var ( + kafkaConfigSingle = wkafka.Config{ + Brokers: []string{"localhost:9092"}, + Consumer: wkafka.ConsumerPreConfig{ + FormatDLQTopic: "finops_{{.AppName}}_dlq", + }, + } + consumeConfigSingle = wkafka.ConsumerConfig{ + Topics: []string{"test"}, + GroupID: "test_single", + } +) + +type DataSingle struct { + Test int `json:"test"` + IsErr bool `json:"is_err"` + IsErrFatal bool `json:"is_err_fatal"` +} + +func ProcessSingle(_ context.Context, msg DataSingle) error { + slog.Info("callback", slog.Any("test", msg.Test), slog.Bool("is_err", msg.IsErr)) + + if msg.IsErrFatal { + return fmt.Errorf("test fatal error %d", msg.Test) + } + + if msg.IsErr { + return fmt.Errorf("test error %d: %w", msg.Test, wkafka.ErrDLQ) + } + + return nil +} + +func RunExampleSingle(ctx context.Context, _ *sync.WaitGroup) error { + client, err := wkafka.New( + ctx, kafkaConfigSingle, + wkafka.WithConsumer(consumeConfigSingle), + wkafka.WithClientInfo("testapp", "v0.1.0"), + ) + if err != nil { + return err + } + + defer client.Close() + + if err := client.Consume(ctx, wkafka.WithCallback(ProcessSingle)); err != nil { + return fmt.Errorf("consume: %w", err) + } + + return nil +} diff --git a/example/main.go b/example/main.go new file mode 100644 index 0000000..32b6be9 --- /dev/null +++ b/example/main.go @@ -0,0 +1,47 @@ +package main + +import ( + "context" + "log/slog" + "os" + "sync" + + "github.com/worldline-go/initializer" + "github.com/worldline-go/logz" + "github.com/worldline-go/wkafka/example/admin" + "github.com/worldline-go/wkafka/example/consumer" + "github.com/worldline-go/wkafka/example/producer" +) + +var examples = map[string]func(context.Context, *sync.WaitGroup) error{ + "admin_topic": admin.RunExampleTopic, + "consumer_batch": consumer.RunExampleBatch, + "consumer_single": consumer.RunExampleSingle, + "producer_hook": producer.RunExampleHook, +} + +func getExampleList() []string { + exampleNames := make([]string, 0, len(examples)) + for k := range examples { + exampleNames = append(exampleNames, k) + } + + return exampleNames +} + +func main() { + exampleName := os.Getenv("EXAMPLE") + + if exampleName == "" { + slog.Error("EXAMPLE env variable is not set", slog.Any("examples", getExampleList())) + + return + } + + run := examples[exampleName] + if run == nil { + slog.Error("unknown example", slog.String("example", exampleName)) + } + + initializer.Init(run, initializer.WithOptionsLogz(logz.WithCaller(false))) +} diff --git a/example/produce/main.go b/example/producer/hook.go similarity index 84% rename from example/produce/main.go rename to example/producer/hook.go index f462ae2..00e36e8 100644 --- a/example/produce/main.go +++ b/example/producer/hook.go @@ -1,10 +1,9 @@ -package main +package producer import ( "context" "sync" - "github.com/worldline-go/initializer" "github.com/worldline-go/wkafka" ) @@ -32,11 +31,7 @@ func ProduceHook(d *Data, r *wkafka.Record) error { return nil } -func main() { - initializer.Init(run) -} - -func run(ctx context.Context, _ *sync.WaitGroup) error { +func RunExampleHook(ctx context.Context, _ *sync.WaitGroup) error { client, err := wkafka.New(ctx, kafkaConfig) if err != nil { return err diff --git a/go.mod b/go.mod index d3245fd..1c106f5 100644 --- a/go.mod +++ b/go.mod @@ -3,11 +3,13 @@ module github.com/worldline-go/wkafka go 1.21 require ( + github.com/rs/zerolog v1.30.0 github.com/stretchr/testify v1.8.4 - github.com/twmb/franz-go v1.15.3 + github.com/twmb/franz-go v1.15.4 github.com/twmb/franz-go/pkg/kadm v1.10.0 github.com/twmb/tlscfg v1.2.1 github.com/worldline-go/initializer v0.2.4 + github.com/worldline-go/logz v0.5.0 golang.org/x/sync v0.6.0 ) @@ -16,12 +18,10 @@ require ( github.com/klauspost/compress v1.17.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.19 // indirect - github.com/pierrec/lz4/v4 v4.1.18 // indirect + github.com/pierrec/lz4/v4 v4.1.19 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/rs/zerolog v1.30.0 // indirect github.com/twmb/franz-go/pkg/kmsg v1.7.0 // indirect - github.com/worldline-go/logz v0.5.0 // indirect - golang.org/x/crypto v0.13.0 // indirect - golang.org/x/sys v0.12.0 // indirect + golang.org/x/crypto v0.17.0 // indirect + golang.org/x/sys v0.15.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 0f18102..8e80d2b 100644 --- a/go.sum +++ b/go.sum @@ -11,8 +11,8 @@ github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27k github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= -github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ= -github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.19 h1:tYLzDnjDXh9qIxSTKHwXwOYmm9d887Y7Y1ZkyXYHAN4= +github.com/pierrec/lz4/v4 v4.1.19/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -21,8 +21,8 @@ github.com/rs/zerolog v1.30.0 h1:SymVODrcRsaRaSInD9yQtKbtWqwsfoPcRff/oRXLj4c= github.com/rs/zerolog v1.30.0/go.mod h1:/tk+P47gFdPXq4QYjvCmT5/Gsug2nagsFWBWhAiSi1w= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= -github.com/twmb/franz-go v1.15.3 h1:96nCgxz4DvGPSCumz6giquYy8GGDNsYCwWcloBdjJ4w= -github.com/twmb/franz-go v1.15.3/go.mod h1:aos+d/UBuigWkOs+6WoqEPto47EvC2jipLAO5qrAu48= +github.com/twmb/franz-go v1.15.4 h1:qBCkHaiutetnrXjAUWA99D9FEcZVMt2AYwkH3vWEQTw= +github.com/twmb/franz-go v1.15.4/go.mod h1:rC18hqNmfo8TMc1kz7CQmHL74PLNF8KVvhflxiiJZCU= github.com/twmb/franz-go/pkg/kadm v1.10.0 h1:3oYKNP+e3HGo4GYadrDeRxOaAIsOXmX6LBVMz9PxpCU= github.com/twmb/franz-go/pkg/kadm v1.10.0/go.mod h1:hUMoV4SRho+2ij/S9cL39JaLsr+XINjn0ZkCdBY2DXc= github.com/twmb/franz-go/pkg/kmsg v1.7.0 h1:a457IbvezYfA5UkiBvyV3zj0Is3y1i8EJgqjJYoij2E= @@ -33,16 +33,16 @@ github.com/worldline-go/initializer v0.2.4 h1:niJKtvdE9T3fupzhc4d1+FfHin4QHn0Go+ github.com/worldline-go/initializer v0.2.4/go.mod h1:UkLyW92jTTU3faHv/95dHisu36SszZRMt98Es22ye0Q= github.com/worldline-go/logz v0.5.0 h1:o/xVrxo51Lt1F1Otu3In6wHSZlmCjzLZA8f9pEeGnE0= github.com/worldline-go/logz v0.5.0/go.mod h1:CWLYHvL+YkzCRfZmQ86zbMyWR/9mmZ2dIQEeDugaIXo= -golang.org/x/crypto v0.13.0 h1:mvySKfSWJ+UKUii46M40LOvyWfN0s2U+46/jDd0e6Ck= -golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc= +golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= +golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= -golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/partition.go b/partition.go new file mode 100644 index 0000000..a399e07 --- /dev/null +++ b/partition.go @@ -0,0 +1,27 @@ +package wkafka + +import ( + "context" + + "github.com/twmb/franz-go/pkg/kgo" +) + +func partitionLost(c *Client) func(context.Context, *kgo.Client, map[string][]int32) { + return func(ctx context.Context, cl *kgo.Client, partitions map[string][]int32) { + if len(partitions) == 0 { + return + } + + c.logger.Info("partition lost", "partitions", partitions) + } +} + +func partitionRevoked(c *Client) func(context.Context, *kgo.Client, map[string][]int32) { + return func(ctx context.Context, cl *kgo.Client, partitions map[string][]int32) { + if len(partitions) == 0 { + return + } + + c.logger.Info("partition revoked", "partitions", partitions) + } +} diff --git a/producerdlq.go b/producerdlq.go index 0ce31bb..f77103f 100644 --- a/producerdlq.go +++ b/producerdlq.go @@ -13,7 +13,7 @@ import ( // - err could be ErrDLQIndexed or any other error func producerDLQ(topic string, fn func(ctx context.Context, records []*kgo.Record) error) func(ctx context.Context, err error, records []*kgo.Record) error { return func(ctx context.Context, err error, records []*kgo.Record) error { - recordsModified := make([]*kgo.Record, 0, len(records)) + recordsSend := make([]*kgo.Record, len(records)) errDLQIndexed := &DLQIndexedError{} if !errors.As(err, &errDLQIndexed) { @@ -31,7 +31,7 @@ func producerDLQ(topic string, fn func(ctx context.Context, records []*kgo.Recor err = unwrapErr(err) } - recordsModified[i] = &kgo.Record{ + recordsSend[i] = &kgo.Record{ Topic: topic, Key: r.Key, Value: r.Value, @@ -46,6 +46,6 @@ func producerDLQ(topic string, fn func(ctx context.Context, records []*kgo.Recor } } - return fn(ctx, records) + return fn(ctx, recordsSend) } } diff --git a/template.go b/template.go new file mode 100644 index 0000000..2ff1a1f --- /dev/null +++ b/template.go @@ -0,0 +1,22 @@ +package wkafka + +import ( + "bytes" + "text/template" +) + +var goTemplate = template.New("wkafka") + +func templateRun(txt string, data interface{}) (string, error) { + tmpl, err := goTemplate.Parse(txt) + if err != nil { + return "", err + } + + var buf bytes.Buffer + if err := tmpl.Execute(&buf, data); err != nil { + return "", err + } + + return buf.String(), nil +} diff --git a/template_test.go b/template_test.go new file mode 100644 index 0000000..b64b25f --- /dev/null +++ b/template_test.go @@ -0,0 +1,53 @@ +package wkafka + +import "testing" + +func Test_templateRun(t *testing.T) { + type args struct { + txt string + data interface{} + } + tests := []struct { + name string + args args + want string + wantErr bool + }{ + { + name: "test", + args: args{ + txt: "finops_{{.AppName}}_dlq", + data: map[string]string{"AppName": "test"}, + }, + want: "finops_test_dlq", + }, + { + name: "test-2", + args: args{ + txt: "finops_dlq", + data: map[string]string{"AppName": "test"}, + }, + want: "finops_dlq", + }, + { + name: "test error", + args: args{ + txt: "finops_{{.AppName}_dlq", + data: nil, + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := templateRun(tt.args.txt, tt.args.data) + if (err != nil) != tt.wantErr { + t.Errorf("templateRun() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("templateRun() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/wait.go b/wait.go new file mode 100644 index 0000000..6671ee9 --- /dev/null +++ b/wait.go @@ -0,0 +1,23 @@ +package wkafka + +import ( + "context" + "time" +) + +type waitRetry struct { + Interval time.Duration +} + +func (w *waitRetry) CurrentInterval() time.Duration { + return w.Interval +} + +func (w *waitRetry) Sleep(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(w.Interval): + return nil + } +}