diff --git a/README.md b/README.md
index e90350f..0a06fa4 100644
--- a/README.md
+++ b/README.md
@@ -130,6 +130,21 @@ mux.Handle(grpcreflect.NewHandlerV1(reflector))
mux.Handle(grpcreflect.NewHandlerV1Alpha(reflector))
```
+Handler Example
+
+```sh
+make env
+
+# run the example
+EXAMPLE=consumer_single_handler make example
+```
+
+Add messages in here to skip the message http://localhost:7071
+
+Go to this side for grpcUI http://localhost:8082/#/grpc/wkafka
+
+
+
### Producer
Use consumer client or create without consumer settings, `New` also try to connect to brokers.
diff --git a/client.go b/client.go
index d676548..d3590e0 100644
--- a/client.go
+++ b/client.go
@@ -2,9 +2,12 @@ package wkafka
import (
"context"
+ "errors"
"fmt"
"sync"
+ "time"
+ "github.com/cenkalti/backoff/v4"
"github.com/rs/zerolog/log"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"
@@ -12,6 +15,8 @@ import (
"golang.org/x/sync/errgroup"
)
+var ErrConnection = errors.New("connect to kafka brokers failed")
+
type Client struct {
Kafka *kgo.Client
KafkaDLQ *kgo.Client
@@ -21,10 +26,11 @@ type Client struct {
clientID []byte
consumerConfig *ConsumerConfig
consumerMutex sync.RWMutex
- logger logz.Adapter
+ logger Logger
// log purpose
+ Brokers []string
dlqTopics []string
topics []string
Meter Meter
@@ -36,6 +42,8 @@ func New(ctx context.Context, cfg Config, opts ...Option) (*Client, error) {
AutoTopicCreation: true,
AppName: idProgname,
Logger: logz.AdapterKV{Log: log.Logger},
+ Ping: true,
+ PingRetry: false,
}
o.apply(opts...)
@@ -78,11 +86,35 @@ func New(ctx context.Context, cfg Config, opts ...Option) (*Client, error) {
c.Kafka = kgoClient
c.KafkaDLQ = kgoClientDLQ
- // main and dlq use same config, ask for validation once
- if err := c.Kafka.Ping(ctx); err != nil {
- return nil, fmt.Errorf("connection to kafka brokers: %w", err)
+ if o.Ping {
+ if o.PingRetry {
+ if o.PingBackoff == nil {
+ o.PingBackoff = defaultBackoff()
+ }
+
+ b := backoff.WithContext(o.PingBackoff, ctx)
+
+ if err := backoff.RetryNotify(func() error {
+ if err := c.Kafka.Ping(ctx); err != nil {
+ return fmt.Errorf("%w: %w", ErrConnection, err)
+ }
+
+ return nil
+ }, b, func(err error, d time.Duration) {
+ c.logger.Warn("wkafka ping failed", "error", err.Error(), "retry_in", d.String())
+ }); err != nil {
+ return nil, err
+ }
+ } else {
+ // main and dlq use same config, ask for validation once
+ if err := c.Kafka.Ping(ctx); err != nil {
+ return nil, fmt.Errorf("%w: %w", ErrConnection, err)
+ }
+ }
}
+ c.Brokers = cfg.Brokers
+
return c, nil
}
@@ -203,6 +235,10 @@ func (c *Client) Close() {
}
}
+func (c *Client) DLQTopics() []string {
+ return c.dlqTopics
+}
+
// Consume starts consuming messages from kafka and blocks until context is done or an error occurs.
// - Only works if client is created with consumer config.
// - Just run one time.
@@ -267,7 +303,11 @@ func (c *Client) Consume(ctx context.Context, callback CallBackFunc, opts ...Opt
func (c *Client) ProduceRaw(ctx context.Context, records []*kgo.Record) error {
result := c.Kafka.ProduceSync(ctx, records...)
- return result.FirstErr()
+ if err := result.FirstErr(); err != nil {
+ return errors.Join(ctx.Err(), err)
+ }
+
+ return nil
}
// Admin returns an admin client to manage kafka.
diff --git a/clientoptions.go b/clientoptions.go
index aa8e901..b542829 100644
--- a/clientoptions.go
+++ b/clientoptions.go
@@ -1,13 +1,23 @@
package wkafka
import (
+ "time"
+
+ "github.com/cenkalti/backoff/v4"
"github.com/twmb/franz-go/pkg/kgo"
- "github.com/worldline-go/logz"
)
// DefaultBatchCount is default batch count for batch consumer, if not set.
var DefaultBatchCount = 100
+func defaultBackoff() backoff.BackOff {
+ return backoff.NewExponentialBackOff(
+ backoff.WithInitialInterval(2*time.Second),
+ backoff.WithMaxInterval(7*time.Second),
+ backoff.WithMaxElapsedTime(30*time.Second),
+ )
+}
+
type options struct {
AppName string
ConsumerEnabled bool
@@ -18,8 +28,12 @@ type options struct {
KGOOptions []kgo.Opt
KGOOptionsDLQ []kgo.Opt
AutoTopicCreation bool
- Logger logz.Adapter
+ Logger Logger
Meter Meter
+
+ Ping bool
+ PingRetry bool
+ PingBackoff backoff.BackOff
}
func (o *options) apply(opts ...Option) {
@@ -62,6 +76,7 @@ func WithClientInfo(appName, version string) Option {
// Use WithClientInfo instead if you want to set version and appname.
func WithAppName(appName string) Option {
return func(o *options) {
+ o.ClientID = appName + "@" + idHostname
o.AppName = appName
}
}
@@ -104,7 +119,7 @@ func WithConsumer(cfg ConsumerConfig) Option {
// WithLogger configures the client to use the provided logger.
// - For zerolog logz.AdapterKV{Log: logger} can usable.
// - Default is using zerolog's global logger.
-func WithLogger(logger logz.Adapter) Option {
+func WithLogger(logger Logger) Option {
return func(o *options) {
o.Logger = logger
}
@@ -114,7 +129,28 @@ func WithLogger(logger logz.Adapter) Option {
func WithNoLogger(v bool) Option {
return func(o *options) {
if v {
- o.Logger = logz.AdapterNoop{}
+ o.Logger = LogNoop{}
}
}
}
+
+// WithPing to ping kafka brokers on client creation.
+// - Default is enabled.
+func WithPing(v bool) Option {
+ return func(o *options) {
+ o.Ping = v
+ }
+}
+
+// WithPingRetry to retry ping kafka brokers on client creation.
+func WithPingRetry(v bool) Option {
+ return func(o *options) {
+ o.PingRetry = v
+ }
+}
+
+func WithPingBackoff(b backoff.BackOff) Option {
+ return func(o *options) {
+ o.PingBackoff = b
+ }
+}
diff --git a/config.go b/config.go
index 9cedf85..015077f 100644
--- a/config.go
+++ b/config.go
@@ -4,8 +4,6 @@ import (
"fmt"
"regexp"
"time"
-
- "github.com/worldline-go/logz"
)
var DefaultRetryInterval = 10 * time.Second
@@ -45,7 +43,7 @@ type ConsumerPreConfig struct {
}
// configApply configuration to ConsumerConfig and check validation.
-func configApply(c ConsumerPreConfig, consumerConfig *ConsumerConfig, progName string, logger logz.Adapter) error {
+func configApply(c ConsumerPreConfig, consumerConfig *ConsumerConfig, progName string, logger Logger) error {
if c.PrefixGroupID != "" {
consumerConfig.GroupID = c.PrefixGroupID + consumerConfig.GroupID
}
diff --git a/config_test.go b/config_test.go
index 9450fb4..6a476b0 100644
--- a/config_test.go
+++ b/config_test.go
@@ -3,8 +3,6 @@ package wkafka
import (
"reflect"
"testing"
-
- "github.com/worldline-go/logz"
)
func TestConsumerPreConfig_Apply(t *testing.T) {
@@ -57,7 +55,7 @@ func TestConsumerPreConfig_Apply(t *testing.T) {
Validation: tt.fields.Validation,
FormatDLQTopic: "finops_{{.AppName}}_dlq",
}
- err := configApply(c, &tt.args.consumerConfig, "serviceX", logz.AdapterNoop{})
+ err := configApply(c, &tt.args.consumerConfig, "serviceX", LogNoop{})
if (err != nil) != tt.wantErr {
t.Errorf("ConsumerPreConfig.Apply() error = %v, wantErr %v", err, tt.wantErr)
return
diff --git a/consumerbatch.go b/consumerbatch.go
index 18cacf3..4416d33 100644
--- a/consumerbatch.go
+++ b/consumerbatch.go
@@ -7,7 +7,6 @@ import (
"time"
"github.com/twmb/franz-go/pkg/kgo"
- "github.com/worldline-go/logz"
)
type consumerBatch[T any] struct {
@@ -22,7 +21,7 @@ type consumerBatch[T any] struct {
Option optionConsumer
ProduceDLQ func(ctx context.Context, err *DLQError, records []*kgo.Record) error
Skip func(cfg *ConsumerConfig, r *kgo.Record) bool
- Logger logz.Adapter
+ Logger Logger
PartitionHandler *partitionHandler
IsDLQ bool
Meter Meter
@@ -96,6 +95,8 @@ func (c *consumerBatch[T]) batchIteration(ctx context.Context, cl *kgo.Client, f
// skip precheck and record section
/////////////////////////////////
if c.Skip(c.Cfg, r) {
+ c.Logger.Info("record skipped", "topic", r.Topic, "partition", r.Partition, "offset", r.Offset)
+
continue
}
@@ -245,6 +246,8 @@ func (c *consumerBatch[T]) iterationDLQ(ctx context.Context, r *kgo.Record) erro
func (c *consumerBatch[T]) iterationRecordDLQ(ctx context.Context, r *kgo.Record) error {
if c.Skip(c.Cfg, r) {
+ c.Logger.Info("record skipped", "topic", r.Topic, "partition", r.Partition, "offset", r.Offset)
+
return nil
}
diff --git a/consumersingle.go b/consumersingle.go
index f176bb1..09a27f0 100644
--- a/consumersingle.go
+++ b/consumersingle.go
@@ -7,7 +7,6 @@ import (
"time"
"github.com/twmb/franz-go/pkg/kgo"
- "github.com/worldline-go/logz"
)
type consumerSingle[T any] struct {
@@ -22,7 +21,7 @@ type consumerSingle[T any] struct {
Option optionConsumer
ProduceDLQ func(ctx context.Context, err *DLQError, records []*kgo.Record) error
Skip func(cfg *ConsumerConfig, r *kgo.Record) bool
- Logger logz.Adapter
+ Logger Logger
PartitionHandler *partitionHandler
IsDLQ bool
Meter Meter
@@ -180,6 +179,8 @@ func (c *consumerSingle[T]) iterationMain(ctx context.Context, r *kgo.Record) er
func (c *consumerSingle[T]) iterationRecord(ctx context.Context, r *kgo.Record) error {
if c.Skip(c.Cfg, r) {
+ c.Logger.Info("record skipped", "topic", r.Topic, "partition", r.Partition, "offset", r.Offset)
+
return nil
}
diff --git a/env/config/turna.yaml b/env/config/turna.yaml
new file mode 100644
index 0000000..7587ba2
--- /dev/null
+++ b/env/config/turna.yaml
@@ -0,0 +1,18 @@
+server:
+ entrypoints:
+ web:
+ address: ":8082"
+ http:
+ middlewares:
+ view:
+ view:
+ prefix_path: /
+ info:
+ grpc:
+ - name: "wkafka"
+ addr: "dns:///localhost:8080"
+ routers:
+ view:
+ path: /
+ middlewares:
+ - view
diff --git a/env/docker-compose.yml b/env/docker-compose.yml
index fe24f0e..5d25269 100644
--- a/env/docker-compose.yml
+++ b/env/docker-compose.yml
@@ -17,9 +17,14 @@ services:
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,INTERNAL:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
redpanda:
- image: docker.io/redpandadata/console:v2.6.0
+ image: docker.io/redpandadata/console:v2.3.10
ports:
- "7071:7071"
environment:
- KAFKA_BROKERS=kafka:9094
- SERVER_LISTENPORT=7071
+ turna:
+ image: ghcr.io/rakunlabs/turna:v0.7.0-alpine3.20.0
+ network_mode: host
+ volumes:
+ - ./config/turna.yaml:/turna.yaml
diff --git a/example/admin/list.go b/example/admin/list.go
index a07e200..a79bf5d 100644
--- a/example/admin/list.go
+++ b/example/admin/list.go
@@ -15,7 +15,7 @@ var (
)
func RunExampleList(ctx context.Context, _ *sync.WaitGroup) error {
- client, err := wkafka.New(ctx, kafkaConfigList)
+ client, err := wkafka.New(ctx, kafkaConfigList, wkafka.WithPingRetry(true))
if err != nil {
return err
}
diff --git a/example/consumer/single.go b/example/consumer/single.go
index 4d3f05e..313f9c2 100644
--- a/example/consumer/single.go
+++ b/example/consumer/single.go
@@ -93,7 +93,7 @@ func RunExampleSingleWithHandler(ctx context.Context, _ *sync.WaitGroup) error {
defer client.Close()
mux := http.NewServeMux()
- mux.Handle(handler.New(client))
+ mux.Handle(handler.New(client, handler.WithLogger(slog.Default())))
reflector := grpcreflect.NewStaticReflector(wkafkaconnect.WkafkaServiceName)
diff --git a/go.mod b/go.mod
index c1cfd22..3e2236f 100644
--- a/go.mod
+++ b/go.mod
@@ -5,10 +5,11 @@ go 1.22
require (
connectrpc.com/connect v1.16.2
connectrpc.com/grpcreflect v1.2.0
+ github.com/cenkalti/backoff/v4 v4.3.0
github.com/rs/zerolog v1.32.0
github.com/stretchr/testify v1.8.4
- github.com/twmb/franz-go v1.15.4
- github.com/twmb/franz-go/pkg/kadm v1.10.0
+ github.com/twmb/franz-go v1.17.0
+ github.com/twmb/franz-go/pkg/kadm v1.12.0
github.com/twmb/tlscfg v1.2.1
github.com/worldline-go/initializer v0.3.2
github.com/worldline-go/logz v0.5.0
@@ -20,14 +21,14 @@ require (
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/google/go-cmp v0.6.0 // indirect
- github.com/klauspost/compress v1.17.0 // indirect
+ github.com/klauspost/compress v1.17.8 // indirect
github.com/lmittmann/tint v1.0.4 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
- github.com/pierrec/lz4/v4 v4.1.19 // indirect
+ github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rakunlabs/logi v0.2.1 // indirect
- github.com/twmb/franz-go/pkg/kmsg v1.7.0 // indirect
+ github.com/twmb/franz-go/pkg/kmsg v1.8.0 // indirect
golang.org/x/crypto v0.23.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/text v0.15.0 // indirect
diff --git a/go.sum b/go.sum
index df43a1e..de8903b 100644
--- a/go.sum
+++ b/go.sum
@@ -2,14 +2,16 @@ connectrpc.com/connect v1.16.2 h1:ybd6y+ls7GOlb7Bh5C8+ghA6SvCBajHwxssO2CGFjqE=
connectrpc.com/connect v1.16.2/go.mod h1:n2kgwskMHXC+lVqb18wngEpF95ldBHXjZYJussz5FRc=
connectrpc.com/grpcreflect v1.2.0 h1:Q6og1S7HinmtbEuBvARLNwYmTbhEGRpHDhqrPNlmK+U=
connectrpc.com/grpcreflect v1.2.0/go.mod h1:nwSOKmE8nU5u/CidgHtPYk1PFI3U9ignz7iDMxOYkSY=
+github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
+github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
-github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM=
-github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
+github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
+github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/lmittmann/tint v1.0.4 h1:LeYihpJ9hyGvE0w+K2okPTGUdVLfng1+nDNVR4vWISc=
github.com/lmittmann/tint v1.0.4/go.mod h1:HIS3gSy7qNwGCj+5oRjAutErFBl4BzdQP6cJZ0NfMwE=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
@@ -18,8 +20,8 @@ github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
-github.com/pierrec/lz4/v4 v4.1.19 h1:tYLzDnjDXh9qIxSTKHwXwOYmm9d887Y7Y1ZkyXYHAN4=
-github.com/pierrec/lz4/v4 v4.1.19/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
+github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
+github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
@@ -30,12 +32,12 @@ github.com/rs/zerolog v1.32.0 h1:keLypqrlIjaFsbmJOBdB/qvyF8KEtCWHwobLp5l/mQ0=
github.com/rs/zerolog v1.32.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
-github.com/twmb/franz-go v1.15.4 h1:qBCkHaiutetnrXjAUWA99D9FEcZVMt2AYwkH3vWEQTw=
-github.com/twmb/franz-go v1.15.4/go.mod h1:rC18hqNmfo8TMc1kz7CQmHL74PLNF8KVvhflxiiJZCU=
-github.com/twmb/franz-go/pkg/kadm v1.10.0 h1:3oYKNP+e3HGo4GYadrDeRxOaAIsOXmX6LBVMz9PxpCU=
-github.com/twmb/franz-go/pkg/kadm v1.10.0/go.mod h1:hUMoV4SRho+2ij/S9cL39JaLsr+XINjn0ZkCdBY2DXc=
-github.com/twmb/franz-go/pkg/kmsg v1.7.0 h1:a457IbvezYfA5UkiBvyV3zj0Is3y1i8EJgqjJYoij2E=
-github.com/twmb/franz-go/pkg/kmsg v1.7.0/go.mod h1:se9Mjdt0Nwzc9lnjJ0HyDtLyBnaBDAd7pCje47OhSyw=
+github.com/twmb/franz-go v1.17.0 h1:hawgCx5ejDHkLe6IwAtFWwxi3OU4OztSTl7ZV5rwkYk=
+github.com/twmb/franz-go v1.17.0/go.mod h1:NreRdJ2F7dziDY/m6VyspWd6sNxHKXdMZI42UfQ3GXM=
+github.com/twmb/franz-go/pkg/kadm v1.12.0 h1:I8P/gpXFzhl73QcAYmJu+1fOXvrynyH/MAotr2udEg4=
+github.com/twmb/franz-go/pkg/kadm v1.12.0/go.mod h1:VMvpfjz/szpH9WB+vGM+rteTzVv0djyHFimci9qm2C0=
+github.com/twmb/franz-go/pkg/kmsg v1.8.0 h1:lAQB9Z3aMrIP9qF9288XcFf/ccaSxEitNA1CDTEIeTA=
+github.com/twmb/franz-go/pkg/kmsg v1.8.0/go.mod h1:HzYEb8G3uu5XevZbtU0dVbkphaKTHk0X68N5ka4q6mU=
github.com/twmb/tlscfg v1.2.1 h1:IU2efmP9utQEIV2fufpZjPq7xgcZK4qu25viD51BB44=
github.com/twmb/tlscfg v1.2.1/go.mod h1:GameEQddljI+8Es373JfQEBvtI4dCTLKWGJbqT2kErs=
github.com/worldline-go/initializer v0.3.2 h1:HPH7AIgNdcSdj/KqCWIEMsTwTlFgqoPIUUqI9yErkCM=
diff --git a/handler/gen/wkafka/wkafka.pb.go b/handler/gen/wkafka/wkafka.pb.go
index 162995b..4c495f5 100644
--- a/handler/gen/wkafka/wkafka.pb.go
+++ b/handler/gen/wkafka/wkafka.pb.go
@@ -71,8 +71,9 @@ type CreateSkipRequest struct {
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
- Topics map[string]*Topic `protobuf:"bytes,1,rep,name=topics,proto3" json:"topics,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
- Option SkipOption `protobuf:"varint,2,opt,name=option,proto3,enum=wkafka.SkipOption" json:"option,omitempty"`
+ Topics map[string]*Topic `protobuf:"bytes,1,rep,name=topics,proto3" json:"topics,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
+ Option SkipOption `protobuf:"varint,2,opt,name=option,proto3,enum=wkafka.SkipOption" json:"option,omitempty"`
+ EnableMainTopics bool `protobuf:"varint,3,opt,name=enable_main_topics,json=enableMainTopics,proto3" json:"enable_main_topics,omitempty"`
}
func (x *CreateSkipRequest) Reset() {
@@ -121,6 +122,13 @@ func (x *CreateSkipRequest) GetOption() SkipOption {
return SkipOption_APPEND
}
+func (x *CreateSkipRequest) GetEnableMainTopics() bool {
+ if x != nil {
+ return x.EnableMainTopics
+ }
+ return false
+}
+
type Topic struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@@ -274,7 +282,7 @@ var File_wkafka_wkafka_proto protoreflect.FileDescriptor
var file_wkafka_wkafka_proto_rawDesc = []byte{
0x0a, 0x13, 0x77, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2f, 0x77, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2e,
- 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x77, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x22, 0xc8, 0x01,
+ 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x77, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x22, 0xf6, 0x01,
0x0a, 0x11, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x53, 0x6b, 0x69, 0x70, 0x52, 0x65, 0x71, 0x75,
0x65, 0x73, 0x74, 0x12, 0x3d, 0x0a, 0x06, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x18, 0x01, 0x20,
0x03, 0x28, 0x0b, 0x32, 0x25, 0x2e, 0x77, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2e, 0x43, 0x72, 0x65,
@@ -282,44 +290,47 @@ var file_wkafka_wkafka_proto_rawDesc = []byte{
0x6f, 0x70, 0x69, 0x63, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x06, 0x74, 0x6f, 0x70, 0x69,
0x63, 0x73, 0x12, 0x2a, 0x0a, 0x06, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01,
0x28, 0x0e, 0x32, 0x12, 0x2e, 0x77, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2e, 0x53, 0x6b, 0x69, 0x70,
- 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x06, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x1a, 0x48,
- 0x0a, 0x0b, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a,
- 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12,
- 0x23, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0d,
- 0x2e, 0x77, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x52, 0x05, 0x76,
- 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x98, 0x01, 0x0a, 0x05, 0x54, 0x6f, 0x70,
- 0x69, 0x63, 0x12, 0x3d, 0x0a, 0x0a, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73,
- 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x77, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2e,
- 0x54, 0x6f, 0x70, 0x69, 0x63, 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73,
- 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x0a, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e,
- 0x73, 0x1a, 0x50, 0x0a, 0x0f, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x45,
- 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28,
- 0x05, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x27, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18,
- 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x77, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2e, 0x50,
- 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a,
- 0x02, 0x38, 0x01, 0x22, 0x3d, 0x0a, 0x09, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e,
- 0x12, 0x18, 0x0a, 0x07, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28,
- 0x03, 0x52, 0x07, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x62, 0x65,
- 0x66, 0x6f, 0x72, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, 0x62, 0x65, 0x66, 0x6f,
- 0x72, 0x65, 0x22, 0x24, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18,
- 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52,
- 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2a, 0x25, 0x0a, 0x0a, 0x53, 0x6b, 0x69, 0x70,
- 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x0a, 0x0a, 0x06, 0x41, 0x50, 0x50, 0x45, 0x4e, 0x44,
- 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x52, 0x45, 0x50, 0x4c, 0x41, 0x43, 0x45, 0x10, 0x01, 0x32,
- 0x44, 0x0a, 0x0d, 0x57, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65,
- 0x12, 0x33, 0x0a, 0x04, 0x53, 0x6b, 0x69, 0x70, 0x12, 0x19, 0x2e, 0x77, 0x6b, 0x61, 0x66, 0x6b,
- 0x61, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x53, 0x6b, 0x69, 0x70, 0x52, 0x65, 0x71, 0x75,
- 0x65, 0x73, 0x74, 0x1a, 0x10, 0x2e, 0x77, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2e, 0x52, 0x65, 0x73,
- 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x84, 0x01, 0x0a, 0x0a, 0x63, 0x6f, 0x6d, 0x2e, 0x77, 0x6b,
- 0x61, 0x66, 0x6b, 0x61, 0x42, 0x0b, 0x57, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x50, 0x72, 0x6f, 0x74,
- 0x6f, 0x50, 0x01, 0x5a, 0x31, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f,
- 0x77, 0x6f, 0x72, 0x6c, 0x64, 0x6c, 0x69, 0x6e, 0x65, 0x2d, 0x67, 0x6f, 0x2f, 0x77, 0x6b, 0x61,
- 0x66, 0x6b, 0x61, 0x2f, 0x68, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x2f, 0x67, 0x65, 0x6e, 0x2f,
- 0x77, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0xa2, 0x02, 0x03, 0x57, 0x58, 0x58, 0xaa, 0x02, 0x06, 0x57,
- 0x6b, 0x61, 0x66, 0x6b, 0x61, 0xca, 0x02, 0x06, 0x57, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0xe2, 0x02,
- 0x12, 0x57, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64,
- 0x61, 0x74, 0x61, 0xea, 0x02, 0x06, 0x57, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x62, 0x06, 0x70, 0x72,
- 0x6f, 0x74, 0x6f, 0x33,
+ 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x06, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x2c,
+ 0x0a, 0x12, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x6d, 0x61, 0x69, 0x6e, 0x5f, 0x74, 0x6f,
+ 0x70, 0x69, 0x63, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x10, 0x65, 0x6e, 0x61, 0x62,
+ 0x6c, 0x65, 0x4d, 0x61, 0x69, 0x6e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x1a, 0x48, 0x0a, 0x0b,
+ 0x54, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b,
+ 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x23, 0x0a,
+ 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0d, 0x2e, 0x77,
+ 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x52, 0x05, 0x76, 0x61, 0x6c,
+ 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x98, 0x01, 0x0a, 0x05, 0x54, 0x6f, 0x70, 0x69, 0x63,
+ 0x12, 0x3d, 0x0a, 0x0a, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x01,
+ 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x77, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2e, 0x54, 0x6f,
+ 0x70, 0x69, 0x63, 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x45, 0x6e,
+ 0x74, 0x72, 0x79, 0x52, 0x0a, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x1a,
+ 0x50, 0x0a, 0x0f, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x45, 0x6e, 0x74,
+ 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52,
+ 0x03, 0x6b, 0x65, 0x79, 0x12, 0x27, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20,
+ 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x77, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2e, 0x50, 0x61, 0x72,
+ 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38,
+ 0x01, 0x22, 0x3d, 0x0a, 0x09, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x18,
+ 0x0a, 0x07, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x03, 0x52,
+ 0x07, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x62, 0x65, 0x66, 0x6f,
+ 0x72, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, 0x62, 0x65, 0x66, 0x6f, 0x72, 0x65,
+ 0x22, 0x24, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07,
+ 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d,
+ 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2a, 0x25, 0x0a, 0x0a, 0x53, 0x6b, 0x69, 0x70, 0x4f, 0x70,
+ 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x0a, 0x0a, 0x06, 0x41, 0x50, 0x50, 0x45, 0x4e, 0x44, 0x10, 0x00,
+ 0x12, 0x0b, 0x0a, 0x07, 0x52, 0x45, 0x50, 0x4c, 0x41, 0x43, 0x45, 0x10, 0x01, 0x32, 0x44, 0x0a,
+ 0x0d, 0x57, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x33,
+ 0x0a, 0x04, 0x53, 0x6b, 0x69, 0x70, 0x12, 0x19, 0x2e, 0x77, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2e,
+ 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x53, 0x6b, 0x69, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
+ 0x74, 0x1a, 0x10, 0x2e, 0x77, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f,
+ 0x6e, 0x73, 0x65, 0x42, 0x84, 0x01, 0x0a, 0x0a, 0x63, 0x6f, 0x6d, 0x2e, 0x77, 0x6b, 0x61, 0x66,
+ 0x6b, 0x61, 0x42, 0x0b, 0x57, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50,
+ 0x01, 0x5a, 0x31, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x77, 0x6f,
+ 0x72, 0x6c, 0x64, 0x6c, 0x69, 0x6e, 0x65, 0x2d, 0x67, 0x6f, 0x2f, 0x77, 0x6b, 0x61, 0x66, 0x6b,
+ 0x61, 0x2f, 0x68, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x77, 0x6b,
+ 0x61, 0x66, 0x6b, 0x61, 0xa2, 0x02, 0x03, 0x57, 0x58, 0x58, 0xaa, 0x02, 0x06, 0x57, 0x6b, 0x61,
+ 0x66, 0x6b, 0x61, 0xca, 0x02, 0x06, 0x57, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0xe2, 0x02, 0x12, 0x57,
+ 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74,
+ 0x61, 0xea, 0x02, 0x06, 0x57, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74,
+ 0x6f, 0x33,
}
var (
diff --git a/handler/handler.go b/handler/handler.go
index 2da956f..46d5768 100644
--- a/handler/handler.go
+++ b/handler/handler.go
@@ -3,8 +3,8 @@ package handler
import (
"context"
"fmt"
- "log/slog"
"net/http"
+ "slices"
"connectrpc.com/connect"
@@ -15,12 +15,40 @@ import (
type Handler struct {
Client *wkafka.Client
+
+ Logger wkafka.Logger
+}
+
+type option struct {
+ Logger wkafka.Logger
+}
+
+func (o *option) apply(opts ...Option) {
+ for _, opt := range opts {
+ opt(o)
+ }
+
+ if o.Logger == nil {
+ o.Logger = wkafka.LogNoop{}
+ }
+}
+
+type Option func(*option)
+
+func WithLogger(logger wkafka.Logger) Option {
+ return func(o *option) {
+ o.Logger = logger
+ }
}
// NewHandler returns a http.Handler implementation.
-func New(client *wkafka.Client) (string, http.Handler) {
+func New(client *wkafka.Client, opts ...Option) (string, http.Handler) {
+ o := option{}
+ o.apply(opts...)
+
return wkafkaconnect.NewWkafkaServiceHandler(&Handler{
Client: client,
+ Logger: o.Logger,
})
}
@@ -47,10 +75,21 @@ func convertSkipMap(skip map[string]*wkafkahandler.Topic) wkafka.SkipMap {
func (h *Handler) Skip(ctx context.Context, req *connect.Request[wkafkahandler.CreateSkipRequest]) (*connect.Response[wkafkahandler.Response], error) {
topics := req.Msg.GetTopics()
- slog.Debug("topics", slog.Any("topics", topics))
+ h.Logger.Debug("skip topics", "topics", topics)
+
+ if !req.Msg.GetEnableMainTopics() {
+ dlqTopics := h.Client.DLQTopics()
- for k, v := range topics {
- slog.Debug("skip", slog.String("topic", k), slog.Any("partitions", v.GetPartitions()))
+ deleteTopicsInList := make([]string, 0)
+ for k := range topics {
+ if !slices.Contains(dlqTopics, k) {
+ deleteTopicsInList = append(deleteTopicsInList, k)
+ }
+ }
+
+ for _, k := range deleteTopicsInList {
+ delete(topics, k)
+ }
}
switch req.Msg.GetOption() {
diff --git a/handler/proto/wkafka/wkafka.proto b/handler/proto/wkafka/wkafka.proto
index 3bb3059..03cebb8 100644
--- a/handler/proto/wkafka/wkafka.proto
+++ b/handler/proto/wkafka/wkafka.proto
@@ -10,6 +10,7 @@ enum SkipOption {
message CreateSkipRequest {
map topics = 1;
SkipOption option = 2;
+ bool enable_main_topics = 3;
}
message Topic {
diff --git a/logger.go b/logger.go
new file mode 100644
index 0000000..2c7be18
--- /dev/null
+++ b/logger.go
@@ -0,0 +1,15 @@
+package wkafka
+
+type Logger interface {
+ Error(msg string, keysAndValues ...interface{})
+ Info(msg string, keysAndValues ...interface{})
+ Debug(msg string, keysAndValues ...interface{})
+ Warn(msg string, keysAndValues ...interface{})
+}
+
+type LogNoop struct{}
+
+func (LogNoop) Error(_ string, _ ...interface{}) {}
+func (LogNoop) Info(_ string, _ ...interface{}) {}
+func (LogNoop) Debug(_ string, _ ...interface{}) {}
+func (LogNoop) Warn(_ string, _ ...interface{}) {}
diff --git a/partition.go b/partition.go
index 1f267c5..95f0743 100644
--- a/partition.go
+++ b/partition.go
@@ -5,11 +5,10 @@ import (
"sync"
"github.com/twmb/franz-go/pkg/kgo"
- "github.com/worldline-go/logz"
)
type partitionHandler struct {
- logger logz.Adapter
+ logger Logger
mapPartitionsRevoked map[string][]int32
mapPartitionsLost map[string][]int32
diff --git a/tkafka/connect.go b/tkafka/connect.go
index 26548ad..836ceac 100644
--- a/tkafka/connect.go
+++ b/tkafka/connect.go
@@ -8,7 +8,9 @@ import (
"github.com/worldline-go/wkafka"
)
-var DefaultBrokerAddress = []string{"127.0.0.1:9092"}
+var (
+ DefaultBrokerAddress = []string{"127.0.0.1:9092"}
+)
func BrokerAddress() []string {
brokers := DefaultBrokerAddress
@@ -28,6 +30,51 @@ func Config() wkafka.Config {
}
}
-func TestClient() (*wkafka.Client, error) {
- return wkafka.New(context.Background(), Config())
+func TestClient(opts ...OptionTestClient) (*wkafka.Client, error) {
+ o := optionTestClient{
+ Config: Config(),
+ Ctx: context.Background(),
+ WkafkaOpts: []wkafka.Option{wkafka.WithPingRetry(true)},
+ }
+
+ o.apply(opts...)
+
+ if o.Ctx == nil {
+ o.Ctx = context.Background()
+ }
+
+ return wkafka.New(o.Ctx, o.Config, o.WkafkaOpts...)
+}
+
+type optionTestClient struct {
+ Config wkafka.Config
+ Ctx context.Context
+
+ WkafkaOpts []wkafka.Option
+}
+
+func (o *optionTestClient) apply(opts ...OptionTestClient) {
+ for _, opt := range opts {
+ opt(o)
+ }
+}
+
+type OptionTestClient func(*optionTestClient)
+
+func WithContext(ctx context.Context) OptionTestClient {
+ return func(o *optionTestClient) {
+ o.Ctx = ctx
+ }
+}
+
+func WithWafkaOptions(opts ...wkafka.Option) OptionTestClient {
+ return func(o *optionTestClient) {
+ o.WkafkaOpts = append(o.WkafkaOpts, opts...)
+ }
+}
+
+func WithConfig(cfg wkafka.Config) OptionTestClient {
+ return func(o *optionTestClient) {
+ o.Config = cfg
+ }
}
diff --git a/tkafka/generate.go b/tkafka/generate.go
index c8aeb6c..c556726 100644
--- a/tkafka/generate.go
+++ b/tkafka/generate.go
@@ -10,7 +10,7 @@ import (
type Generate struct {
kadm *kadm.Client
- Topics []string
+ Topics map[string]struct{}
}
type Topic struct {
@@ -24,12 +24,35 @@ type Topic struct {
func NewGenerate(c *wkafka.Client) *Generate {
return &Generate{
- kadm: c.Admin(),
+ kadm: c.Admin(),
+ Topics: make(map[string]struct{}),
}
}
func (g *Generate) Cleanup() (kadm.DeleteTopicResponses, error) {
- return g.kadm.DeleteTopics(context.Background(), g.Topics...)
+ return g.kadm.DeleteTopics(context.Background(), g.getTopics()...)
+}
+
+func (g *Generate) getTopics() []string {
+ topics := make([]string, 0, len(g.Topics))
+ for t := range g.Topics {
+ topics = append(topics, t)
+ }
+
+ return topics
+}
+
+func (g *Generate) DeleteTopics(ctx context.Context, topics ...string) (kadm.DeleteTopicResponses, error) {
+ // remove the topics from the list of topics
+ for _, t := range topics {
+ delete(g.Topics, t)
+ }
+
+ return g.kadm.DeleteTopics(ctx, topics...)
+}
+
+func (g *Generate) DeleteGroups(ctx context.Context, groups ...string) (kadm.DeleteGroupResponses, error) {
+ return g.kadm.DeleteGroups(ctx, groups...)
}
func (g *Generate) CreateTopics(ctx context.Context, topics ...Topic) ([]kadm.CreateTopicResponse, error) {
@@ -46,11 +69,12 @@ func (g *Generate) CreateTopics(ctx context.Context, topics ...Topic) ([]kadm.Cr
}
response, err := g.kadm.CreateTopic(ctx, partitions, replicationFactor, nil, t.Name)
- g.Topics = append(g.Topics, t.Name)
if err != nil {
return nil, err
}
+ g.Topics[t.Name] = struct{}{}
+
responses = append(responses, response)
}
diff --git a/tkafka/read.go b/tkafka/read.go
new file mode 100644
index 0000000..bbe02d5
--- /dev/null
+++ b/tkafka/read.go
@@ -0,0 +1,59 @@
+package tkafka
+
+import (
+ "context"
+ "errors"
+
+ "github.com/worldline-go/wkafka"
+)
+
+type ReadConfig struct {
+ Name string
+
+ KafkaConfig wkafka.Config
+ KafkaConsumer wkafka.ConsumerConfig
+}
+
+func (c *ReadConfig) sanitize() {
+ if c.Name == "" {
+ c.Name = "tkafka"
+ }
+
+ if c.KafkaConfig.Brokers == nil {
+ c.KafkaConfig.Brokers = BrokerAddress()
+ }
+}
+
+// Consume for testing creating client and start consuming messages to your function.
+// - Cancel the context to stop this function.
+func Consume[T any](ctx context.Context, cfg ReadConfig, fn func(ctx context.Context, data T) error) error {
+ if cfg.Name == "" {
+ cfg.Name = "tkafka"
+ }
+
+ cfg.sanitize()
+
+ // create a new client with read config
+ kafkaClient, err := wkafka.New(ctx,
+ cfg.KafkaConfig,
+ wkafka.WithConsumer(cfg.KafkaConsumer),
+ wkafka.WithAppName(cfg.Name),
+ )
+ if err != nil {
+ return err
+ }
+
+ defer kafkaClient.Close()
+
+ if err := kafkaClient.Consume(ctx, wkafka.WithCallback(fn)); err != nil {
+ if errors.Is(err, context.Canceled) {
+ return nil
+ }
+
+ return err
+ }
+
+ // consume messages from topic
+
+ return nil
+}