Skip to content

Commit

Permalink
fix: ping broker after initialize
Browse files Browse the repository at this point in the history
Signed-off-by: Eray Ates <[email protected]>
  • Loading branch information
rytsh committed Dec 14, 2023
1 parent 48baa9c commit 48c4aef
Show file tree
Hide file tree
Showing 9 changed files with 25 additions and 19 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ For creating a consumer we need to give config while creating a client with a pr
### Producer
Use consumer client or create without consumer settings.
Use consumer client or create without consumer settings, `NewClient` also try to connect to brokers.

```go
client, err := wkafka.NewClient(kafkaConfig)
Expand Down
6 changes: 5 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type Client struct {
Consumer consumer
}

func NewClient(cfg Config, opts ...Option) (*Client, error) {
func NewClient(ctx context.Context, cfg Config, opts ...Option) (*Client, error) {
o := options{
ClientID: DefaultClientID,
AutoTopicCreation: true,
Expand Down Expand Up @@ -96,6 +96,10 @@ func NewClient(cfg Config, opts ...Option) (*Client, error) {
clientID: []byte(o.ClientID),
}

if err := cl.Kafka.Ping(ctx); err != nil {
return nil, fmt.Errorf("connection to kafka brokers: %w", err)
}

return cl, nil
}

Expand Down
1 change: 1 addition & 0 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ func Test_GroupConsuming(t *testing.T) {
for _, c := range tt.consumers {
process := p.SetWait(c.MessageWait).SetClientID(c.ClientID)
client, err := wkafka.NewClient(
ctx,
tkafka.Config(),
wkafka.WithClientID(c.ClientID),
wkafka.WithConsumer(c.Config, process),
Expand Down
2 changes: 1 addition & 1 deletion example/admin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func main() {
}

func run(ctx context.Context, _ *sync.WaitGroup) error {
client, err := wkafka.NewClient(kafkaConfig)
client, err := wkafka.NewClient(ctx, kafkaConfig)
if err != nil {
return err

Check failure on line 25 in example/admin/main.go

View workflow job for this annotation

GitHub Actions / sonarcloud

error returned from external package is unwrapped: sig: func github.com/worldline-go/wkafka.NewClient(ctx context.Context, cfg github.com/worldline-go/wkafka.Config, opts ...github.com/worldline-go/wkafka.Option) (*github.com/worldline-go/wkafka.Client, error) (wrapcheck)

Check failure on line 25 in example/admin/main.go

View workflow job for this annotation

GitHub Actions / sonarcloud

error returned from external package is unwrapped: sig: func github.com/worldline-go/wkafka.NewClient(ctx context.Context, cfg github.com/worldline-go/wkafka.Config, opts ...github.com/worldline-go/wkafka.Option) (*github.com/worldline-go/wkafka.Client, error) (wrapcheck)
}
Expand Down
4 changes: 2 additions & 2 deletions example/consume/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

var (
kafkaConfig = wkafka.Config{
Brokers: []string{"localhost:9092"},
Brokers: []string{"localhost:9095"},
}
consumeConfig = wkafka.ConsumeConfig{
Topics: []string{"test"},
Expand Down Expand Up @@ -67,7 +67,7 @@ func main() {

func run(ctx context.Context, _ *sync.WaitGroup) error {
p := Processor{}
client, err := wkafka.NewClient(kafkaConfig, wkafka.WithConsumerBatch(consumeConfig, p))
client, err := wkafka.NewClient(ctx, kafkaConfig, wkafka.WithConsumerBatch(consumeConfig, p))
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion example/produce/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func main() {
}

func run(ctx context.Context, _ *sync.WaitGroup) error {
client, err := wkafka.NewClient(kafkaConfig)
client, err := wkafka.NewClient(ctx, kafkaConfig)
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ go 1.21

require (
github.com/stretchr/testify v1.8.4
github.com/twmb/franz-go v1.15.0
github.com/twmb/franz-go/pkg/kadm v1.9.2
github.com/twmb/franz-go v1.15.3
github.com/twmb/franz-go/pkg/kadm v1.10.0
github.com/twmb/tlscfg v1.2.1
github.com/worldline-go/initializer v0.2.3
github.com/worldline-go/initializer v0.2.4
)

require (
Expand All @@ -18,7 +18,7 @@ require (
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rs/zerolog v1.30.0 // indirect
github.com/twmb/franz-go/pkg/kmsg v1.6.1 // indirect
github.com/twmb/franz-go/pkg/kmsg v1.7.0 // indirect
github.com/worldline-go/logz v0.5.0 // indirect
golang.org/x/crypto v0.13.0 // indirect
golang.org/x/sys v0.12.0 // indirect
Expand Down
16 changes: 8 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@ github.com/rs/zerolog v1.30.0 h1:SymVODrcRsaRaSInD9yQtKbtWqwsfoPcRff/oRXLj4c=
github.com/rs/zerolog v1.30.0/go.mod h1:/tk+P47gFdPXq4QYjvCmT5/Gsug2nagsFWBWhAiSi1w=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/twmb/franz-go v1.15.0 h1:bw5n1COKJzWpkCXG/kMtHrurcS9HSWV6e3If5CUdc+M=
github.com/twmb/franz-go v1.15.0/go.mod h1:nMAvTC2kHtK+ceaSHeHm4dlxC78389M/1DjpOswEgu4=
github.com/twmb/franz-go/pkg/kadm v1.9.2 h1:2Aj7DOaSFT5TyJ5BLEbAanXuby7CeWjpXW9ht8fy73c=
github.com/twmb/franz-go/pkg/kadm v1.9.2/go.mod h1:hUMoV4SRho+2ij/S9cL39JaLsr+XINjn0ZkCdBY2DXc=
github.com/twmb/franz-go/pkg/kmsg v1.6.1 h1:tm6hXPv5antMHLasTfKv9R+X03AjHSkSkXhQo2c5ALM=
github.com/twmb/franz-go/pkg/kmsg v1.6.1/go.mod h1:se9Mjdt0Nwzc9lnjJ0HyDtLyBnaBDAd7pCje47OhSyw=
github.com/twmb/franz-go v1.15.3 h1:96nCgxz4DvGPSCumz6giquYy8GGDNsYCwWcloBdjJ4w=
github.com/twmb/franz-go v1.15.3/go.mod h1:aos+d/UBuigWkOs+6WoqEPto47EvC2jipLAO5qrAu48=
github.com/twmb/franz-go/pkg/kadm v1.10.0 h1:3oYKNP+e3HGo4GYadrDeRxOaAIsOXmX6LBVMz9PxpCU=
github.com/twmb/franz-go/pkg/kadm v1.10.0/go.mod h1:hUMoV4SRho+2ij/S9cL39JaLsr+XINjn0ZkCdBY2DXc=
github.com/twmb/franz-go/pkg/kmsg v1.7.0 h1:a457IbvezYfA5UkiBvyV3zj0Is3y1i8EJgqjJYoij2E=
github.com/twmb/franz-go/pkg/kmsg v1.7.0/go.mod h1:se9Mjdt0Nwzc9lnjJ0HyDtLyBnaBDAd7pCje47OhSyw=
github.com/twmb/tlscfg v1.2.1 h1:IU2efmP9utQEIV2fufpZjPq7xgcZK4qu25viD51BB44=
github.com/twmb/tlscfg v1.2.1/go.mod h1:GameEQddljI+8Es373JfQEBvtI4dCTLKWGJbqT2kErs=
github.com/worldline-go/initializer v0.2.3 h1:ovQRcNzscDJD/fjcHEuvxOzmcwF6VOI6XhWZY2ou4cQ=
github.com/worldline-go/initializer v0.2.3/go.mod h1:UkLyW92jTTU3faHv/95dHisu36SszZRMt98Es22ye0Q=
github.com/worldline-go/initializer v0.2.4 h1:niJKtvdE9T3fupzhc4d1+FfHin4QHn0Go+qxLjwpsD4=
github.com/worldline-go/initializer v0.2.4/go.mod h1:UkLyW92jTTU3faHv/95dHisu36SszZRMt98Es22ye0Q=
github.com/worldline-go/logz v0.5.0 h1:o/xVrxo51Lt1F1Otu3In6wHSZlmCjzLZA8f9pEeGnE0=
github.com/worldline-go/logz v0.5.0/go.mod h1:CWLYHvL+YkzCRfZmQ86zbMyWR/9mmZ2dIQEeDugaIXo=
golang.org/x/crypto v0.13.0 h1:mvySKfSWJ+UKUii46M40LOvyWfN0s2U+46/jDd0e6Ck=
Expand Down
3 changes: 2 additions & 1 deletion tkafka/connect.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package tkafka

import (
"context"
"os"
"strings"

Expand Down Expand Up @@ -28,5 +29,5 @@ func Config() wkafka.Config {
}

func TestClient() (*wkafka.Client, error) {
return wkafka.NewClient(Config())
return wkafka.NewClient(context.Background(), Config())
}

0 comments on commit 48c4aef

Please sign in to comment.