From baf159a0a7b41febfe4244424202e354cb45df46 Mon Sep 17 00:00:00 2001 From: Eray Ates Date: Tue, 25 Jun 2024 16:25:56 +0200 Subject: [PATCH] feat: DLQ skip modify Signed-off-by: Eray Ates --- README.md | 15 +++++ client.go | 50 +++++++++++++++-- clientoptions.go | 44 +++++++++++++-- config.go | 4 +- config_test.go | 4 +- consumerbatch.go | 7 ++- consumersingle.go | 5 +- env/config/turna.yaml | 18 ++++++ env/docker-compose.yml | 7 ++- example/admin/list.go | 2 +- example/consumer/single.go | 2 +- go.mod | 11 ++-- go.sum | 22 ++++---- handler/gen/wkafka/wkafka.pb.go | 93 +++++++++++++++++-------------- handler/handler.go | 49 ++++++++++++++-- handler/proto/wkafka/wkafka.proto | 1 + logger.go | 15 +++++ partition.go | 3 +- tkafka/connect.go | 53 +++++++++++++++++- tkafka/generate.go | 32 +++++++++-- tkafka/read.go | 59 ++++++++++++++++++++ 21 files changed, 404 insertions(+), 92 deletions(-) create mode 100644 env/config/turna.yaml create mode 100644 logger.go create mode 100644 tkafka/read.go diff --git a/README.md b/README.md index e90350f..0a06fa4 100644 --- a/README.md +++ b/README.md @@ -130,6 +130,21 @@ mux.Handle(grpcreflect.NewHandlerV1(reflector)) mux.Handle(grpcreflect.NewHandlerV1Alpha(reflector)) ``` +
Handler Example + +```sh +make env + +# run the example +EXAMPLE=consumer_single_handler make example +``` + +Add messages in here to skip the message http://localhost:7071 + +Go to this side for grpcUI http://localhost:8082/#/grpc/wkafka + +
+ ### Producer Use consumer client or create without consumer settings, `New` also try to connect to brokers. diff --git a/client.go b/client.go index d676548..d3590e0 100644 --- a/client.go +++ b/client.go @@ -2,9 +2,12 @@ package wkafka import ( "context" + "errors" "fmt" "sync" + "time" + "github.com/cenkalti/backoff/v4" "github.com/rs/zerolog/log" "github.com/twmb/franz-go/pkg/kadm" "github.com/twmb/franz-go/pkg/kgo" @@ -12,6 +15,8 @@ import ( "golang.org/x/sync/errgroup" ) +var ErrConnection = errors.New("connect to kafka brokers failed") + type Client struct { Kafka *kgo.Client KafkaDLQ *kgo.Client @@ -21,10 +26,11 @@ type Client struct { clientID []byte consumerConfig *ConsumerConfig consumerMutex sync.RWMutex - logger logz.Adapter + logger Logger // log purpose + Brokers []string dlqTopics []string topics []string Meter Meter @@ -36,6 +42,8 @@ func New(ctx context.Context, cfg Config, opts ...Option) (*Client, error) { AutoTopicCreation: true, AppName: idProgname, Logger: logz.AdapterKV{Log: log.Logger}, + Ping: true, + PingRetry: false, } o.apply(opts...) @@ -78,11 +86,35 @@ func New(ctx context.Context, cfg Config, opts ...Option) (*Client, error) { c.Kafka = kgoClient c.KafkaDLQ = kgoClientDLQ - // main and dlq use same config, ask for validation once - if err := c.Kafka.Ping(ctx); err != nil { - return nil, fmt.Errorf("connection to kafka brokers: %w", err) + if o.Ping { + if o.PingRetry { + if o.PingBackoff == nil { + o.PingBackoff = defaultBackoff() + } + + b := backoff.WithContext(o.PingBackoff, ctx) + + if err := backoff.RetryNotify(func() error { + if err := c.Kafka.Ping(ctx); err != nil { + return fmt.Errorf("%w: %w", ErrConnection, err) + } + + return nil + }, b, func(err error, d time.Duration) { + c.logger.Warn("wkafka ping failed", "error", err.Error(), "retry_in", d.String()) + }); err != nil { + return nil, err + } + } else { + // main and dlq use same config, ask for validation once + if err := c.Kafka.Ping(ctx); err != nil { + return nil, fmt.Errorf("%w: %w", ErrConnection, err) + } + } } + c.Brokers = cfg.Brokers + return c, nil } @@ -203,6 +235,10 @@ func (c *Client) Close() { } } +func (c *Client) DLQTopics() []string { + return c.dlqTopics +} + // Consume starts consuming messages from kafka and blocks until context is done or an error occurs. // - Only works if client is created with consumer config. // - Just run one time. @@ -267,7 +303,11 @@ func (c *Client) Consume(ctx context.Context, callback CallBackFunc, opts ...Opt func (c *Client) ProduceRaw(ctx context.Context, records []*kgo.Record) error { result := c.Kafka.ProduceSync(ctx, records...) - return result.FirstErr() + if err := result.FirstErr(); err != nil { + return errors.Join(ctx.Err(), err) + } + + return nil } // Admin returns an admin client to manage kafka. diff --git a/clientoptions.go b/clientoptions.go index aa8e901..b542829 100644 --- a/clientoptions.go +++ b/clientoptions.go @@ -1,13 +1,23 @@ package wkafka import ( + "time" + + "github.com/cenkalti/backoff/v4" "github.com/twmb/franz-go/pkg/kgo" - "github.com/worldline-go/logz" ) // DefaultBatchCount is default batch count for batch consumer, if not set. var DefaultBatchCount = 100 +func defaultBackoff() backoff.BackOff { + return backoff.NewExponentialBackOff( + backoff.WithInitialInterval(2*time.Second), + backoff.WithMaxInterval(7*time.Second), + backoff.WithMaxElapsedTime(30*time.Second), + ) +} + type options struct { AppName string ConsumerEnabled bool @@ -18,8 +28,12 @@ type options struct { KGOOptions []kgo.Opt KGOOptionsDLQ []kgo.Opt AutoTopicCreation bool - Logger logz.Adapter + Logger Logger Meter Meter + + Ping bool + PingRetry bool + PingBackoff backoff.BackOff } func (o *options) apply(opts ...Option) { @@ -62,6 +76,7 @@ func WithClientInfo(appName, version string) Option { // Use WithClientInfo instead if you want to set version and appname. func WithAppName(appName string) Option { return func(o *options) { + o.ClientID = appName + "@" + idHostname o.AppName = appName } } @@ -104,7 +119,7 @@ func WithConsumer(cfg ConsumerConfig) Option { // WithLogger configures the client to use the provided logger. // - For zerolog logz.AdapterKV{Log: logger} can usable. // - Default is using zerolog's global logger. -func WithLogger(logger logz.Adapter) Option { +func WithLogger(logger Logger) Option { return func(o *options) { o.Logger = logger } @@ -114,7 +129,28 @@ func WithLogger(logger logz.Adapter) Option { func WithNoLogger(v bool) Option { return func(o *options) { if v { - o.Logger = logz.AdapterNoop{} + o.Logger = LogNoop{} } } } + +// WithPing to ping kafka brokers on client creation. +// - Default is enabled. +func WithPing(v bool) Option { + return func(o *options) { + o.Ping = v + } +} + +// WithPingRetry to retry ping kafka brokers on client creation. +func WithPingRetry(v bool) Option { + return func(o *options) { + o.PingRetry = v + } +} + +func WithPingBackoff(b backoff.BackOff) Option { + return func(o *options) { + o.PingBackoff = b + } +} diff --git a/config.go b/config.go index 9cedf85..015077f 100644 --- a/config.go +++ b/config.go @@ -4,8 +4,6 @@ import ( "fmt" "regexp" "time" - - "github.com/worldline-go/logz" ) var DefaultRetryInterval = 10 * time.Second @@ -45,7 +43,7 @@ type ConsumerPreConfig struct { } // configApply configuration to ConsumerConfig and check validation. -func configApply(c ConsumerPreConfig, consumerConfig *ConsumerConfig, progName string, logger logz.Adapter) error { +func configApply(c ConsumerPreConfig, consumerConfig *ConsumerConfig, progName string, logger Logger) error { if c.PrefixGroupID != "" { consumerConfig.GroupID = c.PrefixGroupID + consumerConfig.GroupID } diff --git a/config_test.go b/config_test.go index 9450fb4..6a476b0 100644 --- a/config_test.go +++ b/config_test.go @@ -3,8 +3,6 @@ package wkafka import ( "reflect" "testing" - - "github.com/worldline-go/logz" ) func TestConsumerPreConfig_Apply(t *testing.T) { @@ -57,7 +55,7 @@ func TestConsumerPreConfig_Apply(t *testing.T) { Validation: tt.fields.Validation, FormatDLQTopic: "finops_{{.AppName}}_dlq", } - err := configApply(c, &tt.args.consumerConfig, "serviceX", logz.AdapterNoop{}) + err := configApply(c, &tt.args.consumerConfig, "serviceX", LogNoop{}) if (err != nil) != tt.wantErr { t.Errorf("ConsumerPreConfig.Apply() error = %v, wantErr %v", err, tt.wantErr) return diff --git a/consumerbatch.go b/consumerbatch.go index 18cacf3..4416d33 100644 --- a/consumerbatch.go +++ b/consumerbatch.go @@ -7,7 +7,6 @@ import ( "time" "github.com/twmb/franz-go/pkg/kgo" - "github.com/worldline-go/logz" ) type consumerBatch[T any] struct { @@ -22,7 +21,7 @@ type consumerBatch[T any] struct { Option optionConsumer ProduceDLQ func(ctx context.Context, err *DLQError, records []*kgo.Record) error Skip func(cfg *ConsumerConfig, r *kgo.Record) bool - Logger logz.Adapter + Logger Logger PartitionHandler *partitionHandler IsDLQ bool Meter Meter @@ -96,6 +95,8 @@ func (c *consumerBatch[T]) batchIteration(ctx context.Context, cl *kgo.Client, f // skip precheck and record section ///////////////////////////////// if c.Skip(c.Cfg, r) { + c.Logger.Info("record skipped", "topic", r.Topic, "partition", r.Partition, "offset", r.Offset) + continue } @@ -245,6 +246,8 @@ func (c *consumerBatch[T]) iterationDLQ(ctx context.Context, r *kgo.Record) erro func (c *consumerBatch[T]) iterationRecordDLQ(ctx context.Context, r *kgo.Record) error { if c.Skip(c.Cfg, r) { + c.Logger.Info("record skipped", "topic", r.Topic, "partition", r.Partition, "offset", r.Offset) + return nil } diff --git a/consumersingle.go b/consumersingle.go index f176bb1..09a27f0 100644 --- a/consumersingle.go +++ b/consumersingle.go @@ -7,7 +7,6 @@ import ( "time" "github.com/twmb/franz-go/pkg/kgo" - "github.com/worldline-go/logz" ) type consumerSingle[T any] struct { @@ -22,7 +21,7 @@ type consumerSingle[T any] struct { Option optionConsumer ProduceDLQ func(ctx context.Context, err *DLQError, records []*kgo.Record) error Skip func(cfg *ConsumerConfig, r *kgo.Record) bool - Logger logz.Adapter + Logger Logger PartitionHandler *partitionHandler IsDLQ bool Meter Meter @@ -180,6 +179,8 @@ func (c *consumerSingle[T]) iterationMain(ctx context.Context, r *kgo.Record) er func (c *consumerSingle[T]) iterationRecord(ctx context.Context, r *kgo.Record) error { if c.Skip(c.Cfg, r) { + c.Logger.Info("record skipped", "topic", r.Topic, "partition", r.Partition, "offset", r.Offset) + return nil } diff --git a/env/config/turna.yaml b/env/config/turna.yaml new file mode 100644 index 0000000..7587ba2 --- /dev/null +++ b/env/config/turna.yaml @@ -0,0 +1,18 @@ +server: + entrypoints: + web: + address: ":8082" + http: + middlewares: + view: + view: + prefix_path: / + info: + grpc: + - name: "wkafka" + addr: "dns:///localhost:8080" + routers: + view: + path: / + middlewares: + - view diff --git a/env/docker-compose.yml b/env/docker-compose.yml index fe24f0e..5d25269 100644 --- a/env/docker-compose.yml +++ b/env/docker-compose.yml @@ -17,9 +17,14 @@ services: - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,INTERNAL:PLAINTEXT - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER redpanda: - image: docker.io/redpandadata/console:v2.6.0 + image: docker.io/redpandadata/console:v2.3.10 ports: - "7071:7071" environment: - KAFKA_BROKERS=kafka:9094 - SERVER_LISTENPORT=7071 + turna: + image: ghcr.io/rakunlabs/turna:v0.7.0-alpine3.20.0 + network_mode: host + volumes: + - ./config/turna.yaml:/turna.yaml diff --git a/example/admin/list.go b/example/admin/list.go index a07e200..a79bf5d 100644 --- a/example/admin/list.go +++ b/example/admin/list.go @@ -15,7 +15,7 @@ var ( ) func RunExampleList(ctx context.Context, _ *sync.WaitGroup) error { - client, err := wkafka.New(ctx, kafkaConfigList) + client, err := wkafka.New(ctx, kafkaConfigList, wkafka.WithPingRetry(true)) if err != nil { return err } diff --git a/example/consumer/single.go b/example/consumer/single.go index 4d3f05e..313f9c2 100644 --- a/example/consumer/single.go +++ b/example/consumer/single.go @@ -93,7 +93,7 @@ func RunExampleSingleWithHandler(ctx context.Context, _ *sync.WaitGroup) error { defer client.Close() mux := http.NewServeMux() - mux.Handle(handler.New(client)) + mux.Handle(handler.New(client, handler.WithLogger(slog.Default()))) reflector := grpcreflect.NewStaticReflector(wkafkaconnect.WkafkaServiceName) diff --git a/go.mod b/go.mod index c1cfd22..3e2236f 100644 --- a/go.mod +++ b/go.mod @@ -5,10 +5,11 @@ go 1.22 require ( connectrpc.com/connect v1.16.2 connectrpc.com/grpcreflect v1.2.0 + github.com/cenkalti/backoff/v4 v4.3.0 github.com/rs/zerolog v1.32.0 github.com/stretchr/testify v1.8.4 - github.com/twmb/franz-go v1.15.4 - github.com/twmb/franz-go/pkg/kadm v1.10.0 + github.com/twmb/franz-go v1.17.0 + github.com/twmb/franz-go/pkg/kadm v1.12.0 github.com/twmb/tlscfg v1.2.1 github.com/worldline-go/initializer v0.3.2 github.com/worldline-go/logz v0.5.0 @@ -20,14 +21,14 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/google/go-cmp v0.6.0 // indirect - github.com/klauspost/compress v1.17.0 // indirect + github.com/klauspost/compress v1.17.8 // indirect github.com/lmittmann/tint v1.0.4 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect - github.com/pierrec/lz4/v4 v4.1.19 // indirect + github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rakunlabs/logi v0.2.1 // indirect - github.com/twmb/franz-go/pkg/kmsg v1.7.0 // indirect + github.com/twmb/franz-go/pkg/kmsg v1.8.0 // indirect golang.org/x/crypto v0.23.0 // indirect golang.org/x/sys v0.20.0 // indirect golang.org/x/text v0.15.0 // indirect diff --git a/go.sum b/go.sum index df43a1e..de8903b 100644 --- a/go.sum +++ b/go.sum @@ -2,14 +2,16 @@ connectrpc.com/connect v1.16.2 h1:ybd6y+ls7GOlb7Bh5C8+ghA6SvCBajHwxssO2CGFjqE= connectrpc.com/connect v1.16.2/go.mod h1:n2kgwskMHXC+lVqb18wngEpF95ldBHXjZYJussz5FRc= connectrpc.com/grpcreflect v1.2.0 h1:Q6og1S7HinmtbEuBvARLNwYmTbhEGRpHDhqrPNlmK+U= connectrpc.com/grpcreflect v1.2.0/go.mod h1:nwSOKmE8nU5u/CidgHtPYk1PFI3U9ignz7iDMxOYkSY= +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM= -github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= +github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/lmittmann/tint v1.0.4 h1:LeYihpJ9hyGvE0w+K2okPTGUdVLfng1+nDNVR4vWISc= github.com/lmittmann/tint v1.0.4/go.mod h1:HIS3gSy7qNwGCj+5oRjAutErFBl4BzdQP6cJZ0NfMwE= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= @@ -18,8 +20,8 @@ github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/ github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= -github.com/pierrec/lz4/v4 v4.1.19 h1:tYLzDnjDXh9qIxSTKHwXwOYmm9d887Y7Y1ZkyXYHAN4= -github.com/pierrec/lz4/v4 v4.1.19/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= +github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -30,12 +32,12 @@ github.com/rs/zerolog v1.32.0 h1:keLypqrlIjaFsbmJOBdB/qvyF8KEtCWHwobLp5l/mQ0= github.com/rs/zerolog v1.32.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= -github.com/twmb/franz-go v1.15.4 h1:qBCkHaiutetnrXjAUWA99D9FEcZVMt2AYwkH3vWEQTw= -github.com/twmb/franz-go v1.15.4/go.mod h1:rC18hqNmfo8TMc1kz7CQmHL74PLNF8KVvhflxiiJZCU= -github.com/twmb/franz-go/pkg/kadm v1.10.0 h1:3oYKNP+e3HGo4GYadrDeRxOaAIsOXmX6LBVMz9PxpCU= -github.com/twmb/franz-go/pkg/kadm v1.10.0/go.mod h1:hUMoV4SRho+2ij/S9cL39JaLsr+XINjn0ZkCdBY2DXc= -github.com/twmb/franz-go/pkg/kmsg v1.7.0 h1:a457IbvezYfA5UkiBvyV3zj0Is3y1i8EJgqjJYoij2E= -github.com/twmb/franz-go/pkg/kmsg v1.7.0/go.mod h1:se9Mjdt0Nwzc9lnjJ0HyDtLyBnaBDAd7pCje47OhSyw= +github.com/twmb/franz-go v1.17.0 h1:hawgCx5ejDHkLe6IwAtFWwxi3OU4OztSTl7ZV5rwkYk= +github.com/twmb/franz-go v1.17.0/go.mod h1:NreRdJ2F7dziDY/m6VyspWd6sNxHKXdMZI42UfQ3GXM= +github.com/twmb/franz-go/pkg/kadm v1.12.0 h1:I8P/gpXFzhl73QcAYmJu+1fOXvrynyH/MAotr2udEg4= +github.com/twmb/franz-go/pkg/kadm v1.12.0/go.mod h1:VMvpfjz/szpH9WB+vGM+rteTzVv0djyHFimci9qm2C0= +github.com/twmb/franz-go/pkg/kmsg v1.8.0 h1:lAQB9Z3aMrIP9qF9288XcFf/ccaSxEitNA1CDTEIeTA= +github.com/twmb/franz-go/pkg/kmsg v1.8.0/go.mod h1:HzYEb8G3uu5XevZbtU0dVbkphaKTHk0X68N5ka4q6mU= github.com/twmb/tlscfg v1.2.1 h1:IU2efmP9utQEIV2fufpZjPq7xgcZK4qu25viD51BB44= github.com/twmb/tlscfg v1.2.1/go.mod h1:GameEQddljI+8Es373JfQEBvtI4dCTLKWGJbqT2kErs= github.com/worldline-go/initializer v0.3.2 h1:HPH7AIgNdcSdj/KqCWIEMsTwTlFgqoPIUUqI9yErkCM= diff --git a/handler/gen/wkafka/wkafka.pb.go b/handler/gen/wkafka/wkafka.pb.go index 162995b..4c495f5 100644 --- a/handler/gen/wkafka/wkafka.pb.go +++ b/handler/gen/wkafka/wkafka.pb.go @@ -71,8 +71,9 @@ type CreateSkipRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Topics map[string]*Topic `protobuf:"bytes,1,rep,name=topics,proto3" json:"topics,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` - Option SkipOption `protobuf:"varint,2,opt,name=option,proto3,enum=wkafka.SkipOption" json:"option,omitempty"` + Topics map[string]*Topic `protobuf:"bytes,1,rep,name=topics,proto3" json:"topics,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + Option SkipOption `protobuf:"varint,2,opt,name=option,proto3,enum=wkafka.SkipOption" json:"option,omitempty"` + EnableMainTopics bool `protobuf:"varint,3,opt,name=enable_main_topics,json=enableMainTopics,proto3" json:"enable_main_topics,omitempty"` } func (x *CreateSkipRequest) Reset() { @@ -121,6 +122,13 @@ func (x *CreateSkipRequest) GetOption() SkipOption { return SkipOption_APPEND } +func (x *CreateSkipRequest) GetEnableMainTopics() bool { + if x != nil { + return x.EnableMainTopics + } + return false +} + type Topic struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -274,7 +282,7 @@ var File_wkafka_wkafka_proto protoreflect.FileDescriptor var file_wkafka_wkafka_proto_rawDesc = []byte{ 0x0a, 0x13, 0x77, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2f, 0x77, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x77, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x22, 0xc8, 0x01, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x77, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x22, 0xf6, 0x01, 0x0a, 0x11, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x53, 0x6b, 0x69, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x3d, 0x0a, 0x06, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x25, 0x2e, 0x77, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2e, 0x43, 0x72, 0x65, @@ -282,44 +290,47 @@ var file_wkafka_wkafka_proto_rawDesc = []byte{ 0x6f, 0x70, 0x69, 0x63, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x06, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x12, 0x2a, 0x0a, 0x06, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x12, 0x2e, 0x77, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2e, 0x53, 0x6b, 0x69, 0x70, - 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x06, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x1a, 0x48, - 0x0a, 0x0b, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, - 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, - 0x23, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0d, - 0x2e, 0x77, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x52, 0x05, 0x76, - 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x98, 0x01, 0x0a, 0x05, 0x54, 0x6f, 0x70, - 0x69, 0x63, 0x12, 0x3d, 0x0a, 0x0a, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, - 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x77, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2e, - 0x54, 0x6f, 0x70, 0x69, 0x63, 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, - 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x0a, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, - 0x73, 0x1a, 0x50, 0x0a, 0x0f, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x45, - 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x05, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x27, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x77, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2e, 0x50, - 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, - 0x02, 0x38, 0x01, 0x22, 0x3d, 0x0a, 0x09, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, - 0x12, 0x18, 0x0a, 0x07, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, - 0x03, 0x52, 0x07, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x62, 0x65, - 0x66, 0x6f, 0x72, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, 0x62, 0x65, 0x66, 0x6f, - 0x72, 0x65, 0x22, 0x24, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, - 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2a, 0x25, 0x0a, 0x0a, 0x53, 0x6b, 0x69, 0x70, - 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x0a, 0x0a, 0x06, 0x41, 0x50, 0x50, 0x45, 0x4e, 0x44, - 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x52, 0x45, 0x50, 0x4c, 0x41, 0x43, 0x45, 0x10, 0x01, 0x32, - 0x44, 0x0a, 0x0d, 0x57, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, - 0x12, 0x33, 0x0a, 0x04, 0x53, 0x6b, 0x69, 0x70, 0x12, 0x19, 0x2e, 0x77, 0x6b, 0x61, 0x66, 0x6b, - 0x61, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x53, 0x6b, 0x69, 0x70, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x1a, 0x10, 0x2e, 0x77, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2e, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x84, 0x01, 0x0a, 0x0a, 0x63, 0x6f, 0x6d, 0x2e, 0x77, 0x6b, - 0x61, 0x66, 0x6b, 0x61, 0x42, 0x0b, 0x57, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x50, 0x72, 0x6f, 0x74, - 0x6f, 0x50, 0x01, 0x5a, 0x31, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, - 0x77, 0x6f, 0x72, 0x6c, 0x64, 0x6c, 0x69, 0x6e, 0x65, 0x2d, 0x67, 0x6f, 0x2f, 0x77, 0x6b, 0x61, - 0x66, 0x6b, 0x61, 0x2f, 0x68, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x2f, 0x67, 0x65, 0x6e, 0x2f, - 0x77, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0xa2, 0x02, 0x03, 0x57, 0x58, 0x58, 0xaa, 0x02, 0x06, 0x57, - 0x6b, 0x61, 0x66, 0x6b, 0x61, 0xca, 0x02, 0x06, 0x57, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0xe2, 0x02, - 0x12, 0x57, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, - 0x61, 0x74, 0x61, 0xea, 0x02, 0x06, 0x57, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x62, 0x06, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x33, + 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x06, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x2c, + 0x0a, 0x12, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x6d, 0x61, 0x69, 0x6e, 0x5f, 0x74, 0x6f, + 0x70, 0x69, 0x63, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x10, 0x65, 0x6e, 0x61, 0x62, + 0x6c, 0x65, 0x4d, 0x61, 0x69, 0x6e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x1a, 0x48, 0x0a, 0x0b, + 0x54, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, + 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x23, 0x0a, + 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0d, 0x2e, 0x77, + 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x52, 0x05, 0x76, 0x61, 0x6c, + 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x98, 0x01, 0x0a, 0x05, 0x54, 0x6f, 0x70, 0x69, 0x63, + 0x12, 0x3d, 0x0a, 0x0a, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x01, + 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x77, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2e, 0x54, 0x6f, + 0x70, 0x69, 0x63, 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x45, 0x6e, + 0x74, 0x72, 0x79, 0x52, 0x0a, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x1a, + 0x50, 0x0a, 0x0f, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x45, 0x6e, 0x74, + 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, + 0x03, 0x6b, 0x65, 0x79, 0x12, 0x27, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x77, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2e, 0x50, 0x61, 0x72, + 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, + 0x01, 0x22, 0x3d, 0x0a, 0x09, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x18, + 0x0a, 0x07, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x03, 0x52, + 0x07, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x62, 0x65, 0x66, 0x6f, + 0x72, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, 0x62, 0x65, 0x66, 0x6f, 0x72, 0x65, + 0x22, 0x24, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, + 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2a, 0x25, 0x0a, 0x0a, 0x53, 0x6b, 0x69, 0x70, 0x4f, 0x70, + 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x0a, 0x0a, 0x06, 0x41, 0x50, 0x50, 0x45, 0x4e, 0x44, 0x10, 0x00, + 0x12, 0x0b, 0x0a, 0x07, 0x52, 0x45, 0x50, 0x4c, 0x41, 0x43, 0x45, 0x10, 0x01, 0x32, 0x44, 0x0a, + 0x0d, 0x57, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x33, + 0x0a, 0x04, 0x53, 0x6b, 0x69, 0x70, 0x12, 0x19, 0x2e, 0x77, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2e, + 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x53, 0x6b, 0x69, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x10, 0x2e, 0x77, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x42, 0x84, 0x01, 0x0a, 0x0a, 0x63, 0x6f, 0x6d, 0x2e, 0x77, 0x6b, 0x61, 0x66, + 0x6b, 0x61, 0x42, 0x0b, 0x57, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, + 0x01, 0x5a, 0x31, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x77, 0x6f, + 0x72, 0x6c, 0x64, 0x6c, 0x69, 0x6e, 0x65, 0x2d, 0x67, 0x6f, 0x2f, 0x77, 0x6b, 0x61, 0x66, 0x6b, + 0x61, 0x2f, 0x68, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x77, 0x6b, + 0x61, 0x66, 0x6b, 0x61, 0xa2, 0x02, 0x03, 0x57, 0x58, 0x58, 0xaa, 0x02, 0x06, 0x57, 0x6b, 0x61, + 0x66, 0x6b, 0x61, 0xca, 0x02, 0x06, 0x57, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0xe2, 0x02, 0x12, 0x57, + 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, + 0x61, 0xea, 0x02, 0x06, 0x57, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, } var ( diff --git a/handler/handler.go b/handler/handler.go index 2da956f..46d5768 100644 --- a/handler/handler.go +++ b/handler/handler.go @@ -3,8 +3,8 @@ package handler import ( "context" "fmt" - "log/slog" "net/http" + "slices" "connectrpc.com/connect" @@ -15,12 +15,40 @@ import ( type Handler struct { Client *wkafka.Client + + Logger wkafka.Logger +} + +type option struct { + Logger wkafka.Logger +} + +func (o *option) apply(opts ...Option) { + for _, opt := range opts { + opt(o) + } + + if o.Logger == nil { + o.Logger = wkafka.LogNoop{} + } +} + +type Option func(*option) + +func WithLogger(logger wkafka.Logger) Option { + return func(o *option) { + o.Logger = logger + } } // NewHandler returns a http.Handler implementation. -func New(client *wkafka.Client) (string, http.Handler) { +func New(client *wkafka.Client, opts ...Option) (string, http.Handler) { + o := option{} + o.apply(opts...) + return wkafkaconnect.NewWkafkaServiceHandler(&Handler{ Client: client, + Logger: o.Logger, }) } @@ -47,10 +75,21 @@ func convertSkipMap(skip map[string]*wkafkahandler.Topic) wkafka.SkipMap { func (h *Handler) Skip(ctx context.Context, req *connect.Request[wkafkahandler.CreateSkipRequest]) (*connect.Response[wkafkahandler.Response], error) { topics := req.Msg.GetTopics() - slog.Debug("topics", slog.Any("topics", topics)) + h.Logger.Debug("skip topics", "topics", topics) + + if !req.Msg.GetEnableMainTopics() { + dlqTopics := h.Client.DLQTopics() - for k, v := range topics { - slog.Debug("skip", slog.String("topic", k), slog.Any("partitions", v.GetPartitions())) + deleteTopicsInList := make([]string, 0) + for k := range topics { + if !slices.Contains(dlqTopics, k) { + deleteTopicsInList = append(deleteTopicsInList, k) + } + } + + for _, k := range deleteTopicsInList { + delete(topics, k) + } } switch req.Msg.GetOption() { diff --git a/handler/proto/wkafka/wkafka.proto b/handler/proto/wkafka/wkafka.proto index 3bb3059..03cebb8 100644 --- a/handler/proto/wkafka/wkafka.proto +++ b/handler/proto/wkafka/wkafka.proto @@ -10,6 +10,7 @@ enum SkipOption { message CreateSkipRequest { map topics = 1; SkipOption option = 2; + bool enable_main_topics = 3; } message Topic { diff --git a/logger.go b/logger.go new file mode 100644 index 0000000..2c7be18 --- /dev/null +++ b/logger.go @@ -0,0 +1,15 @@ +package wkafka + +type Logger interface { + Error(msg string, keysAndValues ...interface{}) + Info(msg string, keysAndValues ...interface{}) + Debug(msg string, keysAndValues ...interface{}) + Warn(msg string, keysAndValues ...interface{}) +} + +type LogNoop struct{} + +func (LogNoop) Error(_ string, _ ...interface{}) {} +func (LogNoop) Info(_ string, _ ...interface{}) {} +func (LogNoop) Debug(_ string, _ ...interface{}) {} +func (LogNoop) Warn(_ string, _ ...interface{}) {} diff --git a/partition.go b/partition.go index 1f267c5..95f0743 100644 --- a/partition.go +++ b/partition.go @@ -5,11 +5,10 @@ import ( "sync" "github.com/twmb/franz-go/pkg/kgo" - "github.com/worldline-go/logz" ) type partitionHandler struct { - logger logz.Adapter + logger Logger mapPartitionsRevoked map[string][]int32 mapPartitionsLost map[string][]int32 diff --git a/tkafka/connect.go b/tkafka/connect.go index 26548ad..836ceac 100644 --- a/tkafka/connect.go +++ b/tkafka/connect.go @@ -8,7 +8,9 @@ import ( "github.com/worldline-go/wkafka" ) -var DefaultBrokerAddress = []string{"127.0.0.1:9092"} +var ( + DefaultBrokerAddress = []string{"127.0.0.1:9092"} +) func BrokerAddress() []string { brokers := DefaultBrokerAddress @@ -28,6 +30,51 @@ func Config() wkafka.Config { } } -func TestClient() (*wkafka.Client, error) { - return wkafka.New(context.Background(), Config()) +func TestClient(opts ...OptionTestClient) (*wkafka.Client, error) { + o := optionTestClient{ + Config: Config(), + Ctx: context.Background(), + WkafkaOpts: []wkafka.Option{wkafka.WithPingRetry(true)}, + } + + o.apply(opts...) + + if o.Ctx == nil { + o.Ctx = context.Background() + } + + return wkafka.New(o.Ctx, o.Config, o.WkafkaOpts...) +} + +type optionTestClient struct { + Config wkafka.Config + Ctx context.Context + + WkafkaOpts []wkafka.Option +} + +func (o *optionTestClient) apply(opts ...OptionTestClient) { + for _, opt := range opts { + opt(o) + } +} + +type OptionTestClient func(*optionTestClient) + +func WithContext(ctx context.Context) OptionTestClient { + return func(o *optionTestClient) { + o.Ctx = ctx + } +} + +func WithWafkaOptions(opts ...wkafka.Option) OptionTestClient { + return func(o *optionTestClient) { + o.WkafkaOpts = append(o.WkafkaOpts, opts...) + } +} + +func WithConfig(cfg wkafka.Config) OptionTestClient { + return func(o *optionTestClient) { + o.Config = cfg + } } diff --git a/tkafka/generate.go b/tkafka/generate.go index c8aeb6c..c556726 100644 --- a/tkafka/generate.go +++ b/tkafka/generate.go @@ -10,7 +10,7 @@ import ( type Generate struct { kadm *kadm.Client - Topics []string + Topics map[string]struct{} } type Topic struct { @@ -24,12 +24,35 @@ type Topic struct { func NewGenerate(c *wkafka.Client) *Generate { return &Generate{ - kadm: c.Admin(), + kadm: c.Admin(), + Topics: make(map[string]struct{}), } } func (g *Generate) Cleanup() (kadm.DeleteTopicResponses, error) { - return g.kadm.DeleteTopics(context.Background(), g.Topics...) + return g.kadm.DeleteTopics(context.Background(), g.getTopics()...) +} + +func (g *Generate) getTopics() []string { + topics := make([]string, 0, len(g.Topics)) + for t := range g.Topics { + topics = append(topics, t) + } + + return topics +} + +func (g *Generate) DeleteTopics(ctx context.Context, topics ...string) (kadm.DeleteTopicResponses, error) { + // remove the topics from the list of topics + for _, t := range topics { + delete(g.Topics, t) + } + + return g.kadm.DeleteTopics(ctx, topics...) +} + +func (g *Generate) DeleteGroups(ctx context.Context, groups ...string) (kadm.DeleteGroupResponses, error) { + return g.kadm.DeleteGroups(ctx, groups...) } func (g *Generate) CreateTopics(ctx context.Context, topics ...Topic) ([]kadm.CreateTopicResponse, error) { @@ -46,11 +69,12 @@ func (g *Generate) CreateTopics(ctx context.Context, topics ...Topic) ([]kadm.Cr } response, err := g.kadm.CreateTopic(ctx, partitions, replicationFactor, nil, t.Name) - g.Topics = append(g.Topics, t.Name) if err != nil { return nil, err } + g.Topics[t.Name] = struct{}{} + responses = append(responses, response) } diff --git a/tkafka/read.go b/tkafka/read.go new file mode 100644 index 0000000..bbe02d5 --- /dev/null +++ b/tkafka/read.go @@ -0,0 +1,59 @@ +package tkafka + +import ( + "context" + "errors" + + "github.com/worldline-go/wkafka" +) + +type ReadConfig struct { + Name string + + KafkaConfig wkafka.Config + KafkaConsumer wkafka.ConsumerConfig +} + +func (c *ReadConfig) sanitize() { + if c.Name == "" { + c.Name = "tkafka" + } + + if c.KafkaConfig.Brokers == nil { + c.KafkaConfig.Brokers = BrokerAddress() + } +} + +// Consume for testing creating client and start consuming messages to your function. +// - Cancel the context to stop this function. +func Consume[T any](ctx context.Context, cfg ReadConfig, fn func(ctx context.Context, data T) error) error { + if cfg.Name == "" { + cfg.Name = "tkafka" + } + + cfg.sanitize() + + // create a new client with read config + kafkaClient, err := wkafka.New(ctx, + cfg.KafkaConfig, + wkafka.WithConsumer(cfg.KafkaConsumer), + wkafka.WithAppName(cfg.Name), + ) + if err != nil { + return err + } + + defer kafkaClient.Close() + + if err := kafkaClient.Consume(ctx, wkafka.WithCallback(fn)); err != nil { + if errors.Is(err, context.Canceled) { + return nil + } + + return err + } + + // consume messages from topic + + return nil +}