From 8bd7767568fb756638b35221a74e9595ba100adc Mon Sep 17 00:00:00 2001 From: Eray Ates Date: Tue, 11 Jun 2024 01:27:51 +0200 Subject: [PATCH] feat: skip handler Signed-off-by: Eray Ates --- README.md | 15 + client.go | 22 +- clientoptions.go | 2 + config.go | 6 +- config_test.go | 8 +- consumer.go | 85 +--- consumerbatch.go | 6 +- consumersingle.go | 6 +- env/docker-compose.yml | 2 +- error.go | 10 +- example/consumer/single.go | 62 ++- example/main.go | 19 +- go.mod | 23 +- go.sum | 44 +- handler/gen/wkafka/wkafka.pb.go | 437 ++++++++++++++++++ handler/gen/wkafka/wkafka_grpc.pb.go | 110 +++++ .../wkafka/wkafkaconnect/wkafka.connect.go | 112 +++++ handler/handler.go | 72 +++ handler/proto/buf.gen.yaml | 15 + handler/proto/buf.yaml | 7 + handler/proto/wkafka/wkafka.proto | 30 ++ meter.go | 2 +- skip.go | 83 ++++ consumer2_test.go => skip_test.go | 133 +++--- 24 files changed, 1111 insertions(+), 200 deletions(-) create mode 100644 handler/gen/wkafka/wkafka.pb.go create mode 100644 handler/gen/wkafka/wkafka_grpc.pb.go create mode 100644 handler/gen/wkafka/wkafkaconnect/wkafka.connect.go create mode 100644 handler/handler.go create mode 100644 handler/proto/buf.gen.yaml create mode 100644 handler/proto/buf.yaml create mode 100644 handler/proto/wkafka/wkafka.proto create mode 100644 skip.go rename consumer2_test.go => skip_test.go (56%) diff --git a/README.md b/README.md index e7aad35..e90350f 100644 --- a/README.md +++ b/README.md @@ -115,6 +115,21 @@ Send record to dead letter queue, use __WrapErrDLQ__ function with to wrap the e > Check the aditional options for custom decode and precheck. + +#### Skip Handler + +Editing the skip map and use our handler to initialize server mux. + +```go +mux := http.NewServeMux() +mux.Handle(handler.New(client)) + +reflector := grpcreflect.NewStaticReflector(wkafkaconnect.WkafkaServiceName) + +mux.Handle(grpcreflect.NewHandlerV1(reflector)) +mux.Handle(grpcreflect.NewHandlerV1Alpha(reflector)) +``` + ### Producer Use consumer client or create without consumer settings, `New` also try to connect to brokers. diff --git a/client.go b/client.go index 0e6da50..d676548 100644 --- a/client.go +++ b/client.go @@ -3,6 +3,7 @@ package wkafka import ( "context" "fmt" + "sync" "github.com/rs/zerolog/log" "github.com/twmb/franz-go/pkg/kadm" @@ -19,6 +20,7 @@ type Client struct { clientID []byte consumerConfig *ConsumerConfig + consumerMutex sync.RWMutex logger logz.Adapter // log purpose @@ -34,7 +36,6 @@ func New(ctx context.Context, cfg Config, opts ...Option) (*Client, error) { AutoTopicCreation: true, AppName: idProgname, Logger: logz.AdapterKV{Log: log.Logger}, - Meter: EmptyMeter(), } o.apply(opts...) @@ -50,6 +51,10 @@ func New(ctx context.Context, cfg Config, opts ...Option) (*Client, error) { } } + if o.Meter == nil { + o.Meter = noopMeter() + } + c := &Client{ consumerConfig: o.ConsumerConfig, logger: o.Logger, @@ -204,7 +209,7 @@ func (c *Client) Close() { func (c *Client) Consume(ctx context.Context, callback CallBackFunc, opts ...OptionConsumer) error { o := optionConsumer{ Client: c, - ConsumerConfig: *c.consumerConfig, + ConsumerConfig: c.consumerConfig, Meter: c.Meter, } @@ -269,3 +274,16 @@ func (c *Client) ProduceRaw(ctx context.Context, records []*kgo.Record) error { func (c *Client) Admin() *kadm.Client { return kadm.NewClient(c.Kafka) } + +func (c *Client) Skip(modify func(SkipMap) SkipMap) { + c.consumerMutex.Lock() + defer c.consumerMutex.Unlock() + + if modify == nil { + return + } + + c.consumerConfig.Skip = modify(c.consumerConfig.Skip) + + c.logger.Debug("wkafka skip modified", "skip", c.consumerConfig.Skip) +} diff --git a/clientoptions.go b/clientoptions.go index c455e18..aa8e901 100644 --- a/clientoptions.go +++ b/clientoptions.go @@ -92,6 +92,8 @@ func WithKGOOptionsDLQ(opts ...kgo.Opt) Option { } } +// WithConsumer configures the client to use the provided consumer config. +// - It is shallow copied and to make safe use skip function to modify skip map. func WithConsumer(cfg ConsumerConfig) Option { return func(o *options) { o.ConsumerConfig = &cfg diff --git a/config.go b/config.go index 06ccad2..9cedf85 100644 --- a/config.go +++ b/config.go @@ -69,12 +69,12 @@ func configApply(c ConsumerPreConfig, consumerConfig *ConsumerConfig, progName s } } - if consumerConfig.DLQ.SkipExtra == nil { - consumerConfig.DLQ.SkipExtra = map[string]map[int32]OffsetConfig{ + if consumerConfig.Skip == nil { + consumerConfig.Skip = map[string]map[int32]OffsetConfig{ consumerConfig.DLQ.Topic: consumerConfig.DLQ.Skip, } } else { - consumerConfig.DLQ.SkipExtra[consumerConfig.DLQ.Topic] = consumerConfig.DLQ.Skip + consumerConfig.Skip[consumerConfig.DLQ.Topic] = consumerConfig.DLQ.Skip } if consumerConfig.DLQ.RetryInterval == 0 { diff --git a/config_test.go b/config_test.go index 6718453..9450fb4 100644 --- a/config_test.go +++ b/config_test.go @@ -40,11 +40,11 @@ func TestConsumerPreConfig_Apply(t *testing.T) { }, want: ConsumerConfig{ GroupID: "finops_test", + Skip: map[string]map[int32]OffsetConfig{ + "finops_serviceX_dlq": nil, + }, DLQ: DLQConfig{ - Topic: "finops_serviceX_dlq", - SkipExtra: map[string]map[int32]OffsetConfig{ - "finops_serviceX_dlq": nil, - }, + Topic: "finops_serviceX_dlq", RetryInterval: DefaultRetryInterval, }, }, diff --git a/consumer.go b/consumer.go index a8c2557..111e58d 100644 --- a/consumer.go +++ b/consumer.go @@ -8,6 +8,8 @@ import ( "github.com/twmb/franz-go/pkg/kgo" ) +type SkipMap = map[string]map[int32]OffsetConfig + type ConsumerConfig struct { // Topics is a list of kafka topics to consume. // Required at least one topic, topic name if not exist will be created or consumer waits for topic creation. @@ -79,8 +81,6 @@ type DLQConfig struct { Topic string `cfg:"topic"` // TopicExtra is extra a list of kafka topics to just consume from DLQ. TopicsExtra []string `cfg:"topics_extra"` - // SkipExtra are optional message offsets to be skipped for topicsExtra. - SkipExtra map[string]map[int32]OffsetConfig `cfg:"skip_extra"` } type OffsetConfig struct { @@ -110,72 +110,11 @@ type consumer interface { setPreCheck(fn func(ctx context.Context, r *kgo.Record) error) } -func skip(cfg *ConsumerConfig, r *kgo.Record) bool { - if len(cfg.Skip) == 0 { - return false - } - - if _, ok := cfg.Skip[r.Topic]; !ok { - return false - } - - if _, ok := cfg.Skip[r.Topic][r.Partition]; !ok { - return false - } - - offsets := cfg.Skip[r.Topic][r.Partition] - - if offsets.Before > 0 && r.Offset <= offsets.Before { - return true - } - - for _, offset := range offsets.Offsets { - if r.Offset == offset { - return true - } - } - - return false -} - -func skipDLQ(cfg *ConsumerConfig, r *kgo.Record) bool { - dlqCfg := cfg.DLQ - if dlqCfg.Disable { - return false - } - - if len(dlqCfg.SkipExtra) == 0 { - return false - } - - if _, ok := dlqCfg.SkipExtra[r.Topic]; !ok { - return false - } - - if _, ok := dlqCfg.SkipExtra[r.Topic][r.Partition]; !ok { - return false - } - - offsets := dlqCfg.SkipExtra[r.Topic][r.Partition] - - if offsets.Before > 0 && r.Offset <= offsets.Before { - return true - } - - for _, offset := range offsets.Offsets { - if r.Offset == offset { - return true - } - } - - return false -} - type optionConsumer struct { Client *Client Consumer consumer ConsumerDLQ consumer - ConsumerConfig ConsumerConfig + ConsumerConfig *ConsumerConfig Meter Meter } @@ -213,20 +152,25 @@ func WithCallbackBatch[T any](fn func(ctx context.Context, msg []T) error) CallB Decode: decode, ProduceDLQ: produceDLQ, Cfg: o.ConsumerConfig, - Skip: skip, + Skip: newSkipper(&o.Client.consumerMutex, false), Logger: o.Client.logger, PartitionHandler: o.Client.partitionHandler, Meter: o.Meter, } + if o.ConsumerConfig.DLQ.Disable { + return nil + } + o.ConsumerDLQ = &consumerBatch[T]{ Decode: decode, ProcessDLQ: dlqProcessBatch(fn), Cfg: o.ConsumerConfig, - Skip: skipDLQ, + Skip: newSkipper(&o.Client.consumerMutex, o.ConsumerConfig.DLQ.Disable), IsDLQ: true, Logger: o.Client.logger, PartitionHandler: o.Client.partitionHandlerDLQ, + Meter: o.Meter, } return nil @@ -245,20 +189,25 @@ func WithCallback[T any](fn func(ctx context.Context, msg T) error) CallBackFunc Decode: decode, ProduceDLQ: produceDLQ, Cfg: o.ConsumerConfig, - Skip: skip, + Skip: newSkipper(&o.Client.consumerMutex, false), Logger: o.Client.logger, PartitionHandler: o.Client.partitionHandler, Meter: o.Meter, } + if o.ConsumerConfig.DLQ.Disable { + return nil + } + o.ConsumerDLQ = &consumerSingle[T]{ ProcessDLQ: fn, Decode: decode, Cfg: o.ConsumerConfig, - Skip: skipDLQ, + Skip: newSkipper(&o.Client.consumerMutex, o.ConsumerConfig.DLQ.Disable), IsDLQ: true, Logger: o.Client.logger, PartitionHandler: o.Client.partitionHandlerDLQ, + Meter: o.Meter, } return nil diff --git a/consumerbatch.go b/consumerbatch.go index 657c509..18cacf3 100644 --- a/consumerbatch.go +++ b/consumerbatch.go @@ -15,7 +15,7 @@ type consumerBatch[T any] struct { Process func(ctx context.Context, msg []T) error // ProcessDLQ is nil for main consumer. ProcessDLQ func(ctx context.Context, msg T) error - Cfg ConsumerConfig + Cfg *ConsumerConfig Decode func(raw []byte, r *kgo.Record) (T, error) // PreCheck is a function that is called before the callback and decode. PreCheck func(ctx context.Context, r *kgo.Record) error @@ -95,7 +95,7 @@ func (c *consumerBatch[T]) batchIteration(ctx context.Context, cl *kgo.Client, f // skip precheck and record section ///////////////////////////////// - if c.Skip(&c.Cfg, r) { + if c.Skip(c.Cfg, r) { continue } @@ -244,7 +244,7 @@ func (c *consumerBatch[T]) iterationDLQ(ctx context.Context, r *kgo.Record) erro } func (c *consumerBatch[T]) iterationRecordDLQ(ctx context.Context, r *kgo.Record) error { - if c.Skip(&c.Cfg, r) { + if c.Skip(c.Cfg, r) { return nil } diff --git a/consumersingle.go b/consumersingle.go index 494f2b7..f176bb1 100644 --- a/consumersingle.go +++ b/consumersingle.go @@ -15,7 +15,7 @@ type consumerSingle[T any] struct { Process func(ctx context.Context, msg T) error // ProcessDLQ is nil for main consumer. ProcessDLQ func(ctx context.Context, msg T) error - Cfg ConsumerConfig + Cfg *ConsumerConfig Decode func(raw []byte, r *kgo.Record) (T, error) // PreCheck is a function that is called before the callback and decode. PreCheck func(ctx context.Context, r *kgo.Record) error @@ -64,7 +64,6 @@ func (c *consumerSingle[T]) Consume(ctx context.Context, cl *kgo.Client) error { func (c *consumerSingle[T]) iteration(ctx context.Context, cl *kgo.Client, fetch kgo.Fetches) error { for iter := fetch.RecordIter(); !iter.Done(); { r := iter.Next() - // check partition is revoked if c.PartitionHandler.IsRevokedRecord(r) { continue @@ -88,7 +87,6 @@ func (c *consumerSingle[T]) iteration(ctx context.Context, cl *kgo.Client, fetch } } else { // listening main topics - if err := c.iterationMain(ctx, r); err != nil { c.Meter.Meter(start, 1, r.Topic, err, false) return wrapErr(r, err, c.IsDLQ) @@ -181,7 +179,7 @@ func (c *consumerSingle[T]) iterationMain(ctx context.Context, r *kgo.Record) er } func (c *consumerSingle[T]) iterationRecord(ctx context.Context, r *kgo.Record) error { - if c.Skip(&c.Cfg, r) { + if c.Skip(c.Cfg, r) { return nil } diff --git a/env/docker-compose.yml b/env/docker-compose.yml index 813b208..fe24f0e 100644 --- a/env/docker-compose.yml +++ b/env/docker-compose.yml @@ -17,7 +17,7 @@ services: - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,INTERNAL:PLAINTEXT - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER redpanda: - image: docker.io/redpandadata/console:v2.3.8 + image: docker.io/redpandadata/console:v2.6.0 ports: - "7071:7071" environment: diff --git a/error.go b/error.go index 5bb5007..4eadcb7 100644 --- a/error.go +++ b/error.go @@ -12,15 +12,15 @@ import ( ) var ( - errNotImplemented = fmt.Errorf("not implemented") - errClientClosed = fmt.Errorf("client closed") - errPartitionRevoked = fmt.Errorf("partition revoked") + errNotImplemented = errors.New("not implemented") + errClientClosed = errors.New("client closed") + errPartitionRevoked = errors.New("partition revoked") // ErrSkip is use to skip message in the PreCheck hook or Decode function. - ErrSkip = fmt.Errorf("skip message") + ErrSkip = errors.New("skip message") // ErrDLQ use with callback function to send message to DLQ topic. // Prefer to use WrapErrDLQ to wrap error. - ErrDLQ = fmt.Errorf("error DLQ") + ErrDLQ = errors.New("error DLQ") ) // DLQError is use with callback function to send message to DLQ topic. diff --git a/example/consumer/single.go b/example/consumer/single.go index 54adcae..4d3f05e 100644 --- a/example/consumer/single.go +++ b/example/consumer/single.go @@ -4,11 +4,19 @@ import ( "context" "fmt" "log/slog" + "net/http" "sync" "time" - "github.com/rs/zerolog/log" + "connectrpc.com/grpcreflect" + "github.com/worldline-go/initializer" + "golang.org/x/net/http2" + "golang.org/x/net/http2/h2c" + "golang.org/x/sync/errgroup" + "github.com/worldline-go/wkafka" + "github.com/worldline-go/wkafka/handler" + "github.com/worldline-go/wkafka/handler/gen/wkafka/wkafkaconnect" ) var ( @@ -35,9 +43,9 @@ func ProcessSingle(_ context.Context, msg DataSingle) error { slog.Info("callback", slog.Any("test", msg.Test), slog.Bool("is_err", msg.IsErr)) if duration, err := time.ParseDuration(msg.Timeout); err != nil { - log.Error().Err(err).Msg("parse duration") + slog.Error("parse duration", "error", err.Error()) } else { - log.Info().Dur("duration", duration).Msg("sleep") + slog.Info("sleep", slog.Duration("duration", duration)) time.Sleep(duration) } @@ -70,3 +78,51 @@ func RunExampleSingle(ctx context.Context, _ *sync.WaitGroup) error { return nil } + +func RunExampleSingleWithHandler(ctx context.Context, _ *sync.WaitGroup) error { + client, err := wkafka.New( + ctx, kafkaConfigSingle, + wkafka.WithConsumer(consumeConfigSingle), + wkafka.WithClientInfo("testapp", "v0.1.0"), + wkafka.WithLogger(slog.Default()), + ) + if err != nil { + return err + } + + defer client.Close() + + mux := http.NewServeMux() + mux.Handle(handler.New(client)) + + reflector := grpcreflect.NewStaticReflector(wkafkaconnect.WkafkaServiceName) + + mux.Handle(grpcreflect.NewHandlerV1(reflector)) + mux.Handle(grpcreflect.NewHandlerV1Alpha(reflector)) + + s := http.Server{ + Addr: ":8080", + Handler: h2c.NewHandler(mux, &http2.Server{}), + } + + initializer.Shutdown.Add(s.Close) + + g, ctx := errgroup.WithContext(ctx) + + g.Go(func() error { + slog.Info("started listening on :8080") + + go func() { + <-ctx.Done() + s.Close() + }() + + return s.ListenAndServe() + }) + + g.Go(func() error { + return client.Consume(ctx, wkafka.WithCallback(ProcessSingle)) + }) + + return g.Wait() +} diff --git a/example/main.go b/example/main.go index 2e2b190..bba0c07 100644 --- a/example/main.go +++ b/example/main.go @@ -15,14 +15,15 @@ import ( ) var examples = map[string]func(context.Context, *sync.WaitGroup) error{ - "admin_topic": admin.RunExampleTopic, - "admin_partition": admin.RunExamplePartition, - "admin_list": admin.RunExampleList, - "consumer_batch": consumer.RunExampleBatch, - "consumer_batch_err": consumer.RunExampleBatchErr, - "consumer_single": consumer.RunExampleSingle, - "consumer_single_byte": consumer.RunExampleSingleByte, - "producer_hook": producer.RunExampleHook, + "admin_topic": admin.RunExampleTopic, + "admin_partition": admin.RunExamplePartition, + "admin_list": admin.RunExampleList, + "consumer_batch": consumer.RunExampleBatch, + "consumer_batch_err": consumer.RunExampleBatchErr, + "consumer_single": consumer.RunExampleSingle, + "consumer_single_handler": consumer.RunExampleSingleWithHandler, + "consumer_single_byte": consumer.RunExampleSingleByte, + "producer_hook": producer.RunExampleHook, } func getExampleList() []string { @@ -52,5 +53,5 @@ func main() { return } - initializer.Init(run, initializer.WithOptionsLogz(logz.WithCaller(false))) + initializer.Init(run, initializer.WithLogger(initializer.Slog), initializer.WithOptionsLogz(logz.WithCaller(false))) } diff --git a/go.mod b/go.mod index 1c106f5..2723ef1 100644 --- a/go.mod +++ b/go.mod @@ -1,27 +1,38 @@ module github.com/worldline-go/wkafka -go 1.21 +go 1.22 + +toolchain go1.22.3 require ( - github.com/rs/zerolog v1.30.0 + connectrpc.com/connect v1.16.2 + connectrpc.com/grpcreflect v1.2.0 + github.com/rs/zerolog v1.32.0 github.com/stretchr/testify v1.8.4 github.com/twmb/franz-go v1.15.4 github.com/twmb/franz-go/pkg/kadm v1.10.0 github.com/twmb/tlscfg v1.2.1 - github.com/worldline-go/initializer v0.2.4 + github.com/worldline-go/initializer v0.3.2 github.com/worldline-go/logz v0.5.0 + golang.org/x/net v0.23.0 golang.org/x/sync v0.6.0 + google.golang.org/grpc v1.64.0 + google.golang.org/protobuf v1.34.1 ) require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/klauspost/compress v1.17.0 // indirect + github.com/lmittmann/tint v1.0.4 // indirect github.com/mattn/go-colorable v0.1.13 // indirect - github.com/mattn/go-isatty v0.0.19 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect github.com/pierrec/lz4/v4 v4.1.19 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/rakunlabs/logi v0.2.1 // indirect github.com/twmb/franz-go/pkg/kmsg v1.7.0 // indirect - golang.org/x/crypto v0.17.0 // indirect - golang.org/x/sys v0.15.0 // indirect + golang.org/x/crypto v0.21.0 // indirect + golang.org/x/sys v0.18.0 // indirect + golang.org/x/text v0.14.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 8e80d2b..fae05e6 100644 --- a/go.sum +++ b/go.sum @@ -1,24 +1,33 @@ +connectrpc.com/connect v1.16.2 h1:ybd6y+ls7GOlb7Bh5C8+ghA6SvCBajHwxssO2CGFjqE= +connectrpc.com/connect v1.16.2/go.mod h1:n2kgwskMHXC+lVqb18wngEpF95ldBHXjZYJussz5FRc= +connectrpc.com/grpcreflect v1.2.0 h1:Q6og1S7HinmtbEuBvARLNwYmTbhEGRpHDhqrPNlmK+U= +connectrpc.com/grpcreflect v1.2.0/go.mod h1:nwSOKmE8nU5u/CidgHtPYk1PFI3U9ignz7iDMxOYkSY= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM= github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= -github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4= +github.com/lmittmann/tint v1.0.4 h1:LeYihpJ9hyGvE0w+K2okPTGUdVLfng1+nDNVR4vWISc= +github.com/lmittmann/tint v1.0.4/go.mod h1:HIS3gSy7qNwGCj+5oRjAutErFBl4BzdQP6cJZ0NfMwE= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= -github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= -github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/pierrec/lz4/v4 v4.1.19 h1:tYLzDnjDXh9qIxSTKHwXwOYmm9d887Y7Y1ZkyXYHAN4= github.com/pierrec/lz4/v4 v4.1.19/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rakunlabs/logi v0.2.1 h1:3QOSEiB6jhvOyVTZICR+X1/JMsO4xOEZtleLBvC64FA= +github.com/rakunlabs/logi v0.2.1/go.mod h1:LLnCp/aiBxqJYrArP8j8dotuwwPNHnj6lIY/WsgRauo= github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= -github.com/rs/zerolog v1.30.0 h1:SymVODrcRsaRaSInD9yQtKbtWqwsfoPcRff/oRXLj4c= -github.com/rs/zerolog v1.30.0/go.mod h1:/tk+P47gFdPXq4QYjvCmT5/Gsug2nagsFWBWhAiSi1w= +github.com/rs/zerolog v1.32.0 h1:keLypqrlIjaFsbmJOBdB/qvyF8KEtCWHwobLp5l/mQ0= +github.com/rs/zerolog v1.32.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/twmb/franz-go v1.15.4 h1:qBCkHaiutetnrXjAUWA99D9FEcZVMt2AYwkH3vWEQTw= @@ -29,20 +38,29 @@ github.com/twmb/franz-go/pkg/kmsg v1.7.0 h1:a457IbvezYfA5UkiBvyV3zj0Is3y1i8EJgqj github.com/twmb/franz-go/pkg/kmsg v1.7.0/go.mod h1:se9Mjdt0Nwzc9lnjJ0HyDtLyBnaBDAd7pCje47OhSyw= github.com/twmb/tlscfg v1.2.1 h1:IU2efmP9utQEIV2fufpZjPq7xgcZK4qu25viD51BB44= github.com/twmb/tlscfg v1.2.1/go.mod h1:GameEQddljI+8Es373JfQEBvtI4dCTLKWGJbqT2kErs= -github.com/worldline-go/initializer v0.2.4 h1:niJKtvdE9T3fupzhc4d1+FfHin4QHn0Go+qxLjwpsD4= -github.com/worldline-go/initializer v0.2.4/go.mod h1:UkLyW92jTTU3faHv/95dHisu36SszZRMt98Es22ye0Q= +github.com/worldline-go/initializer v0.3.2 h1:HPH7AIgNdcSdj/KqCWIEMsTwTlFgqoPIUUqI9yErkCM= +github.com/worldline-go/initializer v0.3.2/go.mod h1:gGgLm/kFyicEelNMNnW2+e7BnE5AoGhKOX7oF+P2XJg= github.com/worldline-go/logz v0.5.0 h1:o/xVrxo51Lt1F1Otu3In6wHSZlmCjzLZA8f9pEeGnE0= github.com/worldline-go/logz v0.5.0/go.mod h1:CWLYHvL+YkzCRfZmQ86zbMyWR/9mmZ2dIQEeDugaIXo= -golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= -golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= +golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= +golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= +golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= +golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= -golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= -golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= +golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 h1:NnYq6UN9ReLM9/Y01KWNOWyI5xQ9kbIms5GGJVwS/Yc= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY= +google.golang.org/grpc v1.64.0 h1:KH3VH9y/MgNQg1dE7b3XfVK0GsPSIzJwdF617gUSbvY= +google.golang.org/grpc v1.64.0/go.mod h1:oxjF8E3FBnjp+/gVFYdWacaLDx9na1aqy9oovLpxQYg= +google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= +google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/handler/gen/wkafka/wkafka.pb.go b/handler/gen/wkafka/wkafka.pb.go new file mode 100644 index 0000000..3fd628a --- /dev/null +++ b/handler/gen/wkafka/wkafka.pb.go @@ -0,0 +1,437 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.34.1 +// protoc (unknown) +// source: wkafka/wkafka.proto + +package wkafka + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type SkipOption int32 + +const ( + SkipOption_APPEND SkipOption = 0 + SkipOption_REPLACE SkipOption = 1 +) + +// Enum value maps for SkipOption. +var ( + SkipOption_name = map[int32]string{ + 0: "APPEND", + 1: "REPLACE", + } + SkipOption_value = map[string]int32{ + "APPEND": 0, + "REPLACE": 1, + } +) + +func (x SkipOption) Enum() *SkipOption { + p := new(SkipOption) + *p = x + return p +} + +func (x SkipOption) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (SkipOption) Descriptor() protoreflect.EnumDescriptor { + return file_wkafka_wkafka_proto_enumTypes[0].Descriptor() +} + +func (SkipOption) Type() protoreflect.EnumType { + return &file_wkafka_wkafka_proto_enumTypes[0] +} + +func (x SkipOption) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use SkipOption.Descriptor instead. +func (SkipOption) EnumDescriptor() ([]byte, []int) { + return file_wkafka_wkafka_proto_rawDescGZIP(), []int{0} +} + +type CreateSkipRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Topics map[string]*Topic `protobuf:"bytes,1,rep,name=topics,proto3" json:"topics,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + Option SkipOption `protobuf:"varint,2,opt,name=option,proto3,enum=wkafka.SkipOption" json:"option,omitempty"` +} + +func (x *CreateSkipRequest) Reset() { + *x = CreateSkipRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_wkafka_wkafka_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CreateSkipRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CreateSkipRequest) ProtoMessage() {} + +func (x *CreateSkipRequest) ProtoReflect() protoreflect.Message { + mi := &file_wkafka_wkafka_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CreateSkipRequest.ProtoReflect.Descriptor instead. +func (*CreateSkipRequest) Descriptor() ([]byte, []int) { + return file_wkafka_wkafka_proto_rawDescGZIP(), []int{0} +} + +func (x *CreateSkipRequest) GetTopics() map[string]*Topic { + if x != nil { + return x.Topics + } + return nil +} + +func (x *CreateSkipRequest) GetOption() SkipOption { + if x != nil { + return x.Option + } + return SkipOption_APPEND +} + +type Topic struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Partitions map[int32]*Partition `protobuf:"bytes,1,rep,name=partitions,proto3" json:"partitions,omitempty" protobuf_key:"varint,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` +} + +func (x *Topic) Reset() { + *x = Topic{} + if protoimpl.UnsafeEnabled { + mi := &file_wkafka_wkafka_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Topic) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Topic) ProtoMessage() {} + +func (x *Topic) ProtoReflect() protoreflect.Message { + mi := &file_wkafka_wkafka_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Topic.ProtoReflect.Descriptor instead. +func (*Topic) Descriptor() ([]byte, []int) { + return file_wkafka_wkafka_proto_rawDescGZIP(), []int{1} +} + +func (x *Topic) GetPartitions() map[int32]*Partition { + if x != nil { + return x.Partitions + } + return nil +} + +type Partition struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Offsets []int64 `protobuf:"varint,1,rep,packed,name=offsets,proto3" json:"offsets,omitempty"` + Before int64 `protobuf:"varint,2,opt,name=before,proto3" json:"before,omitempty"` +} + +func (x *Partition) Reset() { + *x = Partition{} + if protoimpl.UnsafeEnabled { + mi := &file_wkafka_wkafka_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Partition) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Partition) ProtoMessage() {} + +func (x *Partition) ProtoReflect() protoreflect.Message { + mi := &file_wkafka_wkafka_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Partition.ProtoReflect.Descriptor instead. +func (*Partition) Descriptor() ([]byte, []int) { + return file_wkafka_wkafka_proto_rawDescGZIP(), []int{2} +} + +func (x *Partition) GetOffsets() []int64 { + if x != nil { + return x.Offsets + } + return nil +} + +func (x *Partition) GetBefore() int64 { + if x != nil { + return x.Before + } + return 0 +} + +type Response struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Message string `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"` +} + +func (x *Response) Reset() { + *x = Response{} + if protoimpl.UnsafeEnabled { + mi := &file_wkafka_wkafka_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Response) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Response) ProtoMessage() {} + +func (x *Response) ProtoReflect() protoreflect.Message { + mi := &file_wkafka_wkafka_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Response.ProtoReflect.Descriptor instead. +func (*Response) Descriptor() ([]byte, []int) { + return file_wkafka_wkafka_proto_rawDescGZIP(), []int{3} +} + +func (x *Response) GetMessage() string { + if x != nil { + return x.Message + } + return "" +} + +var File_wkafka_wkafka_proto protoreflect.FileDescriptor + +var file_wkafka_wkafka_proto_rawDesc = []byte{ + 0x0a, 0x13, 0x77, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2f, 0x77, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x77, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x22, 0xc8, 0x01, + 0x0a, 0x11, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x53, 0x6b, 0x69, 0x70, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x12, 0x3d, 0x0a, 0x06, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x18, 0x01, 0x20, + 0x03, 0x28, 0x0b, 0x32, 0x25, 0x2e, 0x77, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2e, 0x43, 0x72, 0x65, + 0x61, 0x74, 0x65, 0x53, 0x6b, 0x69, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x54, + 0x6f, 0x70, 0x69, 0x63, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x06, 0x74, 0x6f, 0x70, 0x69, + 0x63, 0x73, 0x12, 0x2a, 0x0a, 0x06, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x0e, 0x32, 0x12, 0x2e, 0x77, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2e, 0x53, 0x6b, 0x69, 0x70, + 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x06, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x1a, 0x48, + 0x0a, 0x0b, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, + 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, + 0x23, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0d, + 0x2e, 0x77, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x52, 0x05, 0x76, + 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x98, 0x01, 0x0a, 0x05, 0x54, 0x6f, 0x70, + 0x69, 0x63, 0x12, 0x3d, 0x0a, 0x0a, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, + 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x77, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2e, + 0x54, 0x6f, 0x70, 0x69, 0x63, 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, + 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x0a, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, + 0x73, 0x1a, 0x50, 0x0a, 0x0f, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x45, + 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x05, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x27, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x77, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2e, 0x50, + 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, + 0x02, 0x38, 0x01, 0x22, 0x3d, 0x0a, 0x09, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, + 0x12, 0x18, 0x0a, 0x07, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, + 0x03, 0x52, 0x07, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x62, 0x65, + 0x66, 0x6f, 0x72, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, 0x62, 0x65, 0x66, 0x6f, + 0x72, 0x65, 0x22, 0x24, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, + 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2a, 0x25, 0x0a, 0x0a, 0x53, 0x6b, 0x69, 0x70, + 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x0a, 0x0a, 0x06, 0x41, 0x50, 0x50, 0x45, 0x4e, 0x44, + 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x52, 0x45, 0x50, 0x4c, 0x41, 0x43, 0x45, 0x10, 0x01, 0x32, + 0x44, 0x0a, 0x0d, 0x57, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, + 0x12, 0x33, 0x0a, 0x04, 0x53, 0x6b, 0x69, 0x70, 0x12, 0x19, 0x2e, 0x77, 0x6b, 0x61, 0x66, 0x6b, + 0x61, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x53, 0x6b, 0x69, 0x70, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x10, 0x2e, 0x77, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2e, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x84, 0x01, 0x0a, 0x0a, 0x63, 0x6f, 0x6d, 0x2e, 0x77, 0x6b, + 0x61, 0x66, 0x6b, 0x61, 0x42, 0x0b, 0x57, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x50, 0x72, 0x6f, 0x74, + 0x6f, 0x50, 0x01, 0x5a, 0x31, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, + 0x77, 0x6f, 0x72, 0x6c, 0x64, 0x6c, 0x69, 0x6e, 0x65, 0x2d, 0x67, 0x6f, 0x2f, 0x77, 0x6b, 0x61, + 0x66, 0x6b, 0x61, 0x2f, 0x68, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x2f, 0x67, 0x65, 0x6e, 0x2f, + 0x77, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0xa2, 0x02, 0x03, 0x57, 0x58, 0x58, 0xaa, 0x02, 0x06, 0x57, + 0x6b, 0x61, 0x66, 0x6b, 0x61, 0xca, 0x02, 0x06, 0x57, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0xe2, 0x02, + 0x12, 0x57, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, + 0x61, 0x74, 0x61, 0xea, 0x02, 0x06, 0x57, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_wkafka_wkafka_proto_rawDescOnce sync.Once + file_wkafka_wkafka_proto_rawDescData = file_wkafka_wkafka_proto_rawDesc +) + +func file_wkafka_wkafka_proto_rawDescGZIP() []byte { + file_wkafka_wkafka_proto_rawDescOnce.Do(func() { + file_wkafka_wkafka_proto_rawDescData = protoimpl.X.CompressGZIP(file_wkafka_wkafka_proto_rawDescData) + }) + return file_wkafka_wkafka_proto_rawDescData +} + +var file_wkafka_wkafka_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_wkafka_wkafka_proto_msgTypes = make([]protoimpl.MessageInfo, 6) +var file_wkafka_wkafka_proto_goTypes = []interface{}{ + (SkipOption)(0), // 0: wkafka.SkipOption + (*CreateSkipRequest)(nil), // 1: wkafka.CreateSkipRequest + (*Topic)(nil), // 2: wkafka.Topic + (*Partition)(nil), // 3: wkafka.Partition + (*Response)(nil), // 4: wkafka.Response + nil, // 5: wkafka.CreateSkipRequest.TopicsEntry + nil, // 6: wkafka.Topic.PartitionsEntry +} +var file_wkafka_wkafka_proto_depIdxs = []int32{ + 5, // 0: wkafka.CreateSkipRequest.topics:type_name -> wkafka.CreateSkipRequest.TopicsEntry + 0, // 1: wkafka.CreateSkipRequest.option:type_name -> wkafka.SkipOption + 6, // 2: wkafka.Topic.partitions:type_name -> wkafka.Topic.PartitionsEntry + 2, // 3: wkafka.CreateSkipRequest.TopicsEntry.value:type_name -> wkafka.Topic + 3, // 4: wkafka.Topic.PartitionsEntry.value:type_name -> wkafka.Partition + 1, // 5: wkafka.WkafkaService.Skip:input_type -> wkafka.CreateSkipRequest + 4, // 6: wkafka.WkafkaService.Skip:output_type -> wkafka.Response + 6, // [6:7] is the sub-list for method output_type + 5, // [5:6] is the sub-list for method input_type + 5, // [5:5] is the sub-list for extension type_name + 5, // [5:5] is the sub-list for extension extendee + 0, // [0:5] is the sub-list for field type_name +} + +func init() { file_wkafka_wkafka_proto_init() } +func file_wkafka_wkafka_proto_init() { + if File_wkafka_wkafka_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_wkafka_wkafka_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CreateSkipRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_wkafka_wkafka_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Topic); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_wkafka_wkafka_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Partition); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_wkafka_wkafka_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Response); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_wkafka_wkafka_proto_rawDesc, + NumEnums: 1, + NumMessages: 6, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_wkafka_wkafka_proto_goTypes, + DependencyIndexes: file_wkafka_wkafka_proto_depIdxs, + EnumInfos: file_wkafka_wkafka_proto_enumTypes, + MessageInfos: file_wkafka_wkafka_proto_msgTypes, + }.Build() + File_wkafka_wkafka_proto = out.File + file_wkafka_wkafka_proto_rawDesc = nil + file_wkafka_wkafka_proto_goTypes = nil + file_wkafka_wkafka_proto_depIdxs = nil +} diff --git a/handler/gen/wkafka/wkafka_grpc.pb.go b/handler/gen/wkafka/wkafka_grpc.pb.go new file mode 100644 index 0000000..ea676fe --- /dev/null +++ b/handler/gen/wkafka/wkafka_grpc.pb.go @@ -0,0 +1,110 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.4.0 +// - protoc (unknown) +// source: wkafka/wkafka.proto + +package wkafka + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.62.0 or later. +const _ = grpc.SupportPackageIsVersion8 + +const ( + WkafkaService_Skip_FullMethodName = "/wkafka.WkafkaService/Skip" +) + +// WkafkaServiceClient is the client API for WkafkaService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type WkafkaServiceClient interface { + Skip(ctx context.Context, in *CreateSkipRequest, opts ...grpc.CallOption) (*Response, error) +} + +type wkafkaServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewWkafkaServiceClient(cc grpc.ClientConnInterface) WkafkaServiceClient { + return &wkafkaServiceClient{cc} +} + +func (c *wkafkaServiceClient) Skip(ctx context.Context, in *CreateSkipRequest, opts ...grpc.CallOption) (*Response, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(Response) + err := c.cc.Invoke(ctx, WkafkaService_Skip_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +// WkafkaServiceServer is the server API for WkafkaService service. +// All implementations must embed UnimplementedWkafkaServiceServer +// for forward compatibility +type WkafkaServiceServer interface { + Skip(context.Context, *CreateSkipRequest) (*Response, error) + mustEmbedUnimplementedWkafkaServiceServer() +} + +// UnimplementedWkafkaServiceServer must be embedded to have forward compatible implementations. +type UnimplementedWkafkaServiceServer struct { +} + +func (UnimplementedWkafkaServiceServer) Skip(context.Context, *CreateSkipRequest) (*Response, error) { + return nil, status.Errorf(codes.Unimplemented, "method Skip not implemented") +} +func (UnimplementedWkafkaServiceServer) mustEmbedUnimplementedWkafkaServiceServer() {} + +// UnsafeWkafkaServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to WkafkaServiceServer will +// result in compilation errors. +type UnsafeWkafkaServiceServer interface { + mustEmbedUnimplementedWkafkaServiceServer() +} + +func RegisterWkafkaServiceServer(s grpc.ServiceRegistrar, srv WkafkaServiceServer) { + s.RegisterService(&WkafkaService_ServiceDesc, srv) +} + +func _WkafkaService_Skip_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(CreateSkipRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(WkafkaServiceServer).Skip(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: WkafkaService_Skip_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(WkafkaServiceServer).Skip(ctx, req.(*CreateSkipRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// WkafkaService_ServiceDesc is the grpc.ServiceDesc for WkafkaService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var WkafkaService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "wkafka.WkafkaService", + HandlerType: (*WkafkaServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Skip", + Handler: _WkafkaService_Skip_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "wkafka/wkafka.proto", +} diff --git a/handler/gen/wkafka/wkafkaconnect/wkafka.connect.go b/handler/gen/wkafka/wkafkaconnect/wkafka.connect.go new file mode 100644 index 0000000..883f5b5 --- /dev/null +++ b/handler/gen/wkafka/wkafkaconnect/wkafka.connect.go @@ -0,0 +1,112 @@ +// Code generated by protoc-gen-connect-go. DO NOT EDIT. +// +// Source: wkafka/wkafka.proto + +package wkafkaconnect + +import ( + connect "connectrpc.com/connect" + context "context" + errors "errors" + wkafka "github.com/worldline-go/wkafka/handler/gen/wkafka" + http "net/http" + strings "strings" +) + +// This is a compile-time assertion to ensure that this generated file and the connect package are +// compatible. If you get a compiler error that this constant is not defined, this code was +// generated with a version of connect newer than the one compiled into your binary. You can fix the +// problem by either regenerating this code with an older version of connect or updating the connect +// version compiled into your binary. +const _ = connect.IsAtLeastVersion1_13_0 + +const ( + // WkafkaServiceName is the fully-qualified name of the WkafkaService service. + WkafkaServiceName = "wkafka.WkafkaService" +) + +// These constants are the fully-qualified names of the RPCs defined in this package. They're +// exposed at runtime as Spec.Procedure and as the final two segments of the HTTP route. +// +// Note that these are different from the fully-qualified method names used by +// google.golang.org/protobuf/reflect/protoreflect. To convert from these constants to +// reflection-formatted method names, remove the leading slash and convert the remaining slash to a +// period. +const ( + // WkafkaServiceSkipProcedure is the fully-qualified name of the WkafkaService's Skip RPC. + WkafkaServiceSkipProcedure = "/wkafka.WkafkaService/Skip" +) + +// These variables are the protoreflect.Descriptor objects for the RPCs defined in this package. +var ( + wkafkaServiceServiceDescriptor = wkafka.File_wkafka_wkafka_proto.Services().ByName("WkafkaService") + wkafkaServiceSkipMethodDescriptor = wkafkaServiceServiceDescriptor.Methods().ByName("Skip") +) + +// WkafkaServiceClient is a client for the wkafka.WkafkaService service. +type WkafkaServiceClient interface { + Skip(context.Context, *connect.Request[wkafka.CreateSkipRequest]) (*connect.Response[wkafka.Response], error) +} + +// NewWkafkaServiceClient constructs a client for the wkafka.WkafkaService service. By default, it +// uses the Connect protocol with the binary Protobuf Codec, asks for gzipped responses, and sends +// uncompressed requests. To use the gRPC or gRPC-Web protocols, supply the connect.WithGRPC() or +// connect.WithGRPCWeb() options. +// +// The URL supplied here should be the base URL for the Connect or gRPC server (for example, +// http://api.acme.com or https://acme.com/grpc). +func NewWkafkaServiceClient(httpClient connect.HTTPClient, baseURL string, opts ...connect.ClientOption) WkafkaServiceClient { + baseURL = strings.TrimRight(baseURL, "/") + return &wkafkaServiceClient{ + skip: connect.NewClient[wkafka.CreateSkipRequest, wkafka.Response]( + httpClient, + baseURL+WkafkaServiceSkipProcedure, + connect.WithSchema(wkafkaServiceSkipMethodDescriptor), + connect.WithClientOptions(opts...), + ), + } +} + +// wkafkaServiceClient implements WkafkaServiceClient. +type wkafkaServiceClient struct { + skip *connect.Client[wkafka.CreateSkipRequest, wkafka.Response] +} + +// Skip calls wkafka.WkafkaService.Skip. +func (c *wkafkaServiceClient) Skip(ctx context.Context, req *connect.Request[wkafka.CreateSkipRequest]) (*connect.Response[wkafka.Response], error) { + return c.skip.CallUnary(ctx, req) +} + +// WkafkaServiceHandler is an implementation of the wkafka.WkafkaService service. +type WkafkaServiceHandler interface { + Skip(context.Context, *connect.Request[wkafka.CreateSkipRequest]) (*connect.Response[wkafka.Response], error) +} + +// NewWkafkaServiceHandler builds an HTTP handler from the service implementation. It returns the +// path on which to mount the handler and the handler itself. +// +// By default, handlers support the Connect, gRPC, and gRPC-Web protocols with the binary Protobuf +// and JSON codecs. They also support gzip compression. +func NewWkafkaServiceHandler(svc WkafkaServiceHandler, opts ...connect.HandlerOption) (string, http.Handler) { + wkafkaServiceSkipHandler := connect.NewUnaryHandler( + WkafkaServiceSkipProcedure, + svc.Skip, + connect.WithSchema(wkafkaServiceSkipMethodDescriptor), + connect.WithHandlerOptions(opts...), + ) + return "/wkafka.WkafkaService/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case WkafkaServiceSkipProcedure: + wkafkaServiceSkipHandler.ServeHTTP(w, r) + default: + http.NotFound(w, r) + } + }) +} + +// UnimplementedWkafkaServiceHandler returns CodeUnimplemented from all methods. +type UnimplementedWkafkaServiceHandler struct{} + +func (UnimplementedWkafkaServiceHandler) Skip(context.Context, *connect.Request[wkafka.CreateSkipRequest]) (*connect.Response[wkafka.Response], error) { + return nil, connect.NewError(connect.CodeUnimplemented, errors.New("wkafka.WkafkaService.Skip is not implemented")) +} diff --git a/handler/handler.go b/handler/handler.go new file mode 100644 index 0000000..2da956f --- /dev/null +++ b/handler/handler.go @@ -0,0 +1,72 @@ +package handler + +import ( + "context" + "fmt" + "log/slog" + "net/http" + + "connectrpc.com/connect" + + "github.com/worldline-go/wkafka" + wkafkahandler "github.com/worldline-go/wkafka/handler/gen/wkafka" + "github.com/worldline-go/wkafka/handler/gen/wkafka/wkafkaconnect" +) + +type Handler struct { + Client *wkafka.Client +} + +// NewHandler returns a http.Handler implementation. +func New(client *wkafka.Client) (string, http.Handler) { + return wkafkaconnect.NewWkafkaServiceHandler(&Handler{ + Client: client, + }) +} + +func convertSkipMap(skip map[string]*wkafkahandler.Topic) wkafka.SkipMap { + if len(skip) == 0 { + return nil + } + + m := make(wkafka.SkipMap, len(skip)) + for k, v := range skip { + p := make(map[int32]wkafka.OffsetConfig, len(v.GetPartitions())) + for k, v := range v.GetPartitions() { + p[k] = wkafka.OffsetConfig{ + Before: v.GetBefore(), + Offsets: v.GetOffsets(), + } + } + + m[k] = p + } + + return m +} + +func (h *Handler) Skip(ctx context.Context, req *connect.Request[wkafkahandler.CreateSkipRequest]) (*connect.Response[wkafkahandler.Response], error) { + topics := req.Msg.GetTopics() + slog.Debug("topics", slog.Any("topics", topics)) + + for k, v := range topics { + slog.Debug("skip", slog.String("topic", k), slog.Any("partitions", v.GetPartitions())) + } + + switch req.Msg.GetOption() { + case wkafkahandler.SkipOption_APPEND: + h.Client.Skip(wkafka.SkipAppend(convertSkipMap(req.Msg.GetTopics()))) + + return connect.NewResponse(&wkafkahandler.Response{ + Message: "skip appended", + }), nil + case wkafkahandler.SkipOption_REPLACE: + h.Client.Skip(wkafka.SkipReplace(convertSkipMap(req.Msg.GetTopics()))) + + return connect.NewResponse(&wkafkahandler.Response{ + Message: "skip replaced", + }), nil + } + + return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("invalid option: %v", req.Msg.GetOption())) +} diff --git a/handler/proto/buf.gen.yaml b/handler/proto/buf.gen.yaml new file mode 100644 index 0000000..4721048 --- /dev/null +++ b/handler/proto/buf.gen.yaml @@ -0,0 +1,15 @@ +version: v1 +managed: + enabled: true + go_package_prefix: + default: github.com/worldline-go/wkafka/handler/gen +plugins: + - plugin: go + out: ../gen + opt: paths=source_relative + - plugin: connect-go + out: ../gen + opt: paths=source_relative + - plugin: go-grpc + out: ../gen + opt: paths=source_relative diff --git a/handler/proto/buf.yaml b/handler/proto/buf.yaml new file mode 100644 index 0000000..1a51945 --- /dev/null +++ b/handler/proto/buf.yaml @@ -0,0 +1,7 @@ +version: v1 +breaking: + use: + - FILE +lint: + use: + - DEFAULT diff --git a/handler/proto/wkafka/wkafka.proto b/handler/proto/wkafka/wkafka.proto new file mode 100644 index 0000000..3bb3059 --- /dev/null +++ b/handler/proto/wkafka/wkafka.proto @@ -0,0 +1,30 @@ +syntax = "proto3"; + +package wkafka; + +enum SkipOption { + APPEND = 0; + REPLACE = 1; +} + +message CreateSkipRequest { + map topics = 1; + SkipOption option = 2; +} + +message Topic { + map partitions = 1; +} + +message Partition { + repeated int64 offsets = 1; + int64 before = 2; +} + +message Response { + string message = 1; +} + +service WkafkaService { + rpc Skip (CreateSkipRequest) returns (Response); +} diff --git a/meter.go b/meter.go index 32deddd..e9e400e 100644 --- a/meter.go +++ b/meter.go @@ -18,6 +18,6 @@ func NewMeter(meterFunc func(time.Time, int64, string, error, bool)) Meter { return &meterFuncImpl{meterFunc: meterFunc} } -func EmptyMeter() Meter { +func noopMeter() Meter { return &meterFuncImpl{meterFunc: func(time.Time, int64, string, error, bool) {}} } diff --git a/skip.go b/skip.go new file mode 100644 index 0000000..e932b61 --- /dev/null +++ b/skip.go @@ -0,0 +1,83 @@ +package wkafka + +import ( + "sync" + + "github.com/twmb/franz-go/pkg/kgo" +) + +func newSkipper(c *sync.RWMutex, disable bool) func(cfg *ConsumerConfig, r *kgo.Record) bool { + if disable { + return func(cfg *ConsumerConfig, r *kgo.Record) bool { + return false + } + } + + return func(cfg *ConsumerConfig, r *kgo.Record) bool { + c.RLock() + defer c.RUnlock() + + return skipCheck(cfg.Skip, r) + } +} + +func skipCheck(skip map[string]map[int32]OffsetConfig, r *kgo.Record) bool { + if len(skip) == 0 { + return false + } + + topic, ok := skip[r.Topic] + if !ok { + return false + } + + offsets, ok := topic[r.Partition] + if !ok { + return false + } + + if offsets.Before > 0 && r.Offset <= offsets.Before { + return true + } + + for _, offset := range offsets.Offsets { + if r.Offset == offset { + return true + } + } + + return false +} + +func SkipReplace(skip SkipMap) func(SkipMap) SkipMap { + return func(m SkipMap) SkipMap { + return skip + } +} + +func SkipAppend(skip SkipMap) func(SkipMap) SkipMap { + return func(m SkipMap) SkipMap { + if m == nil { + return skip + } + + for topic, partitions := range skip { + if v, ok := m[topic]; !ok || v == nil { + m[topic] = make(map[int32]OffsetConfig) + } + + for partition, offset := range partitions { + if _, ok := m[topic][partition]; !ok { + m[topic][partition] = offset + } else { + m[topic][partition] = OffsetConfig{ + Before: offset.Before, + Offsets: append(m[topic][partition].Offsets, offset.Offsets...), + } + } + } + } + + return m + } +} diff --git a/consumer2_test.go b/skip_test.go similarity index 56% rename from consumer2_test.go rename to skip_test.go index 1573d4e..20a9c69 100644 --- a/consumer2_test.go +++ b/skip_test.go @@ -1,6 +1,8 @@ package wkafka import ( + "reflect" + "sync" "testing" "github.com/twmb/franz-go/pkg/kgo" @@ -102,121 +104,96 @@ func Test_skip(t *testing.T) { }, }, } + + var cfgMutex sync.RWMutex for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := skip(tt.args.cfg, tt.args.r); got != tt.want { + if got := newSkipper(&cfgMutex, false)(tt.args.cfg, tt.args.r); got != tt.want { t.Errorf("skip() = %v, want %v", got, tt.want) } }) } } -func Test_skipDLQ(t *testing.T) { +func TestSkipAppend(t *testing.T) { type args struct { - cfg *ConsumerConfig - r *kgo.Record + skip SkipMap + base SkipMap } tests := []struct { name string args args - want bool + want SkipMap }{ { name: "empty", args: args{ - cfg: &ConsumerConfig{}, - r: &kgo.Record{}, + skip: nil, }, - want: false, + want: nil, }, { - name: "empty config", + name: "mixed", args: args{ - cfg: &ConsumerConfig{}, - r: &kgo.Record{ - Topic: "topic", - Partition: 0, - Offset: 5, - }, - }, - want: false, - }, - { - name: "skip topic", - args: args{ - cfg: &ConsumerConfig{ - DLQ: DLQConfig{ - SkipExtra: map[string]map[int32]OffsetConfig{ - "topic": { - 0: { - Offsets: []int64{ - 5, - }, - }, - }, + base: map[string]map[int32]OffsetConfig{ + "topic2": { + 0: { + Before: 10, + Offsets: []int64{1, 2, 3}, + }, + }, + "topic": { + 0: { + Offsets: []int64{11}, }, }, }, - r: &kgo.Record{ - Topic: "topic", - Partition: 0, - Offset: 5, - }, - }, - want: true, - }, - { - name: "skip topic before", - args: args{ - cfg: &ConsumerConfig{ - DLQ: DLQConfig{ - SkipExtra: map[string]map[int32]OffsetConfig{ - "topic": { - 0: { - Before: 5, - }, + skip: map[string]map[int32]OffsetConfig{ + "topic2": { + 1: { + Before: 5, + Offsets: []int64{9, 10}, + }, + }, + "topic": { + 0: { + Before: 5, + Offsets: []int64{ + 9, + 10, }, }, }, }, - r: &kgo.Record{ - Topic: "topic", - Partition: 0, - Offset: 5, - }, }, - want: true, - }, - { - name: "topic before", - args: args{ - cfg: &ConsumerConfig{ - DLQ: DLQConfig{ - SkipExtra: map[string]map[int32]OffsetConfig{ - "topic": { - 0: { - Before: 5, - Offsets: []int64{ - 9, - 10, - }, - }, - }, - }, + want: map[string]map[int32]OffsetConfig{ + "topic2": { + 0: { + Before: 10, + Offsets: []int64{1, 2, 3}, + }, + 1: { + Before: 5, + Offsets: []int64{9, 10}, }, }, - r: &kgo.Record{ - Topic: "topic", - Partition: 0, - Offset: 6, + "topic": { + 0: { + Before: 5, + Offsets: []int64{ + 11, + 9, + 10, + }, + }, }, }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := skipDLQ(tt.args.cfg, tt.args.r); got != tt.want { - t.Errorf("skipDLQ() = %v, want %v", got, tt.want) + if got := SkipAppend(tt.args.skip)(tt.args.base); !reflect.DeepEqual(got, tt.want) { + t.Errorf("SkipAppend() = %v, want %v", got, tt.want) } }) }