Skip to content

Commit

Permalink
fix: consumer goroutine
Browse files Browse the repository at this point in the history
Signed-off-by: Eray Ates <[email protected]>
  • Loading branch information
rytsh committed Oct 19, 2023
1 parent d897152 commit a758327
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 47 deletions.
7 changes: 6 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"

"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"
)

Expand Down Expand Up @@ -110,8 +111,12 @@ func (c *Client) Consume(ctx context.Context) error {
return nil
}

func (c *Client) produceRaw(ctx context.Context, records []*kgo.Record) error {
func (c *Client) ProduceRaw(ctx context.Context, records []*kgo.Record) error {
result := c.Kafka.ProduceSync(ctx, records...)

return result.FirstErr()

Check failure on line 117 in client.go

View workflow job for this annotation

GitHub Actions / sonarcloud

error returned from external package is unwrapped: sig: func (github.com/twmb/franz-go/pkg/kgo.ProduceResults).FirstErr() error (wrapcheck)
}

func (c *Client) Admin() *kadm.Client {
return kadm.NewClient(c.Kafka)
}
56 changes: 20 additions & 36 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ type ConsumeConfig struct {
// - 90
// before: 20 // skip all offsets before or equal to this offset
Skip map[string]map[int32]Offsets `cfg:"skip"`
// MaxPollRecords is the maximum number of records returned in a single call to poll.
// - Default is max.poll.records in the broker configuration, usually 500.
// - Fetching messages from broker, this is not related with batch processing!
MaxPollRecords int `cfg:"max_poll_records"`
// Concurrent to run the consumer in concurrent mode for each partition and topic.
// - Default is false.
// - Each topic could have different type of value so use with processor map.
Concurrent bool `cfg:"concurrent"`
}

type Offsets struct {
Expand Down Expand Up @@ -96,53 +104,29 @@ func (c consume[T]) skip(r *kgo.Record) bool {

func (c consume[T]) Consume(ctx context.Context, cl *kgo.Client) error {
for {
fetches := cl.PollFetches(ctx)
if fetches.IsClientClosed() {
fetch := cl.PollRecords(ctx, c.Cfg.MaxPollRecords)
if fetch.IsClientClosed() {
return ErrClientClosed
}

var errP error
fetches.EachError(func(t string, p int32, err error) {
if errors.Is(err, context.Canceled) {
errP = err
if err := fetch.Err(); err != nil {
return fmt.Errorf("poll fetch err: %w", err)
}

return
for iter := fetch.RecordIter(); !iter.Done(); {
r := iter.Next()
if err := c.iteration(ctx, r); err != nil {
return wrapErr(r, err)
}

errP = fmt.Errorf("fetch err topic %s partition %d: %w", t, p, err)
})
if errP != nil {
return errP
}

for iter := fetches.RecordIter(); !iter.Done(); {
if err := c.iteration(ctx, cl, iter); err != nil {
return err
if err := cl.CommitRecords(ctx, r); err != nil {
return wrapErr(r, fmt.Errorf("commit records failed: %w", err))
}
}
}
}

func (c consume[T]) iteration(ctx context.Context, cl *kgo.Client, iter *kgo.FetchesRecordIter) (err error) {
r := iter.Next()
if r == nil {
return nil
}

defer func() {
if err != nil {
err = wrapErr(r, err)

return
}

if errCommit := cl.CommitRecords(ctx, r); errCommit != nil {
err = wrapErr(r, fmt.Errorf("commit records failed: %w", errCommit))

return
}
}()

func (c consume[T]) iteration(ctx context.Context, r *kgo.Record) error {
if c.skip(r) {
return nil
}
Expand Down
51 changes: 51 additions & 0 deletions example/admin/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package main

import (
"context"
"log/slog"
"sync"

"github.com/worldline-go/initializer"
"github.com/worldline-go/wkafka"
)

var (
kafkaConfig = wkafka.Config{
Brokers: []string{"localhost:9092"},
}
)

func main() {
initializer.Init(run)
}

func run(ctx context.Context, _ *sync.WaitGroup) error {
client, err := wkafka.NewClient(kafkaConfig)
if err != nil {
return err

Check failure on line 25 in example/admin/main.go

View workflow job for this annotation

GitHub Actions / sonarcloud

error returned from external package is unwrapped: sig: func github.com/worldline-go/wkafka.NewClient(cfg github.com/worldline-go/wkafka.Config, opts ...github.com/worldline-go/wkafka.Option) (*github.com/worldline-go/wkafka.Client, error) (wrapcheck)

Check failure on line 25 in example/admin/main.go

View workflow job for this annotation

GitHub Actions / sonarcloud

error returned from external package is unwrapped: sig: func github.com/worldline-go/wkafka.NewClient(cfg github.com/worldline-go/wkafka.Config, opts ...github.com/worldline-go/wkafka.Option) (*github.com/worldline-go/wkafka.Client, error) (wrapcheck)
}
defer client.Close()

admClient := client.Admin()

resp, err := admClient.CreateTopic(ctx, -1, -1, nil, "test-1234")
if err != nil {
return err

Check failure on line 33 in example/admin/main.go

View workflow job for this annotation

GitHub Actions / sonarcloud

error returned from external package is unwrapped: sig: func (*github.com/twmb/franz-go/pkg/kadm.Client).CreateTopic(ctx context.Context, partitions int32, replicationFactor int16, configs map[string]*string, topic string) (github.com/twmb/franz-go/pkg/kadm.CreateTopicResponse, error) (wrapcheck)

Check failure on line 33 in example/admin/main.go

View workflow job for this annotation

GitHub Actions / sonarcloud

error returned from external package is unwrapped: sig: func (*github.com/twmb/franz-go/pkg/kadm.Client).CreateTopic(ctx context.Context, partitions int32, replicationFactor int16, configs map[string]*string, topic string) (github.com/twmb/franz-go/pkg/kadm.CreateTopicResponse, error) (wrapcheck)
}

slog.Info("topic created",
slog.String("topic", resp.Topic),
slog.Int64("partitions", int64(resp.NumPartitions)),
slog.Int64("replicas", int64(resp.ReplicationFactor)),
)

// list topics
topics, err := admClient.ListTopics(ctx)
if err != nil {
return err

Check failure on line 45 in example/admin/main.go

View workflow job for this annotation

GitHub Actions / sonarcloud

error returned from external package is unwrapped: sig: func (*github.com/twmb/franz-go/pkg/kadm.Client).ListTopics(ctx context.Context, topics ...string) (github.com/twmb/franz-go/pkg/kadm.TopicDetails, error) (wrapcheck)

Check failure on line 45 in example/admin/main.go

View workflow job for this annotation

GitHub Actions / sonarcloud

error returned from external package is unwrapped: sig: func (*github.com/twmb/franz-go/pkg/kadm.Client).ListTopics(ctx context.Context, topics ...string) (github.com/twmb/franz-go/pkg/kadm.TopicDetails, error) (wrapcheck)
}

slog.Info("all topics", slog.Any("topics", topics.Names()))

return nil
}
7 changes: 4 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,22 @@ go 1.21
require (
github.com/stretchr/testify v1.8.4
github.com/twmb/franz-go v1.15.0
github.com/twmb/franz-go/pkg/kadm v1.9.2
github.com/twmb/tlscfg v1.2.1
github.com/worldline-go/initializer v0.2.3
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/klauspost/compress v1.17.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rs/zerolog v1.30.0 // indirect
github.com/twmb/franz-go/pkg/kmsg v1.6.1 // indirect
github.com/worldline-go/logz v0.5.0 // indirect
golang.org/x/crypto v0.11.0 // indirect
golang.org/x/sys v0.10.0 // indirect
golang.org/x/crypto v0.13.0 // indirect
golang.org/x/sys v0.12.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
14 changes: 8 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSV
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM=
github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
Expand All @@ -23,6 +23,8 @@ github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcU
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/twmb/franz-go v1.15.0 h1:bw5n1COKJzWpkCXG/kMtHrurcS9HSWV6e3If5CUdc+M=
github.com/twmb/franz-go v1.15.0/go.mod h1:nMAvTC2kHtK+ceaSHeHm4dlxC78389M/1DjpOswEgu4=
github.com/twmb/franz-go/pkg/kadm v1.9.2 h1:2Aj7DOaSFT5TyJ5BLEbAanXuby7CeWjpXW9ht8fy73c=
github.com/twmb/franz-go/pkg/kadm v1.9.2/go.mod h1:hUMoV4SRho+2ij/S9cL39JaLsr+XINjn0ZkCdBY2DXc=
github.com/twmb/franz-go/pkg/kmsg v1.6.1 h1:tm6hXPv5antMHLasTfKv9R+X03AjHSkSkXhQo2c5ALM=
github.com/twmb/franz-go/pkg/kmsg v1.6.1/go.mod h1:se9Mjdt0Nwzc9lnjJ0HyDtLyBnaBDAd7pCje47OhSyw=
github.com/twmb/tlscfg v1.2.1 h1:IU2efmP9utQEIV2fufpZjPq7xgcZK4qu25viD51BB44=
Expand All @@ -31,14 +33,14 @@ github.com/worldline-go/initializer v0.2.3 h1:ovQRcNzscDJD/fjcHEuvxOzmcwF6VOI6Xh
github.com/worldline-go/initializer v0.2.3/go.mod h1:UkLyW92jTTU3faHv/95dHisu36SszZRMt98Es22ye0Q=
github.com/worldline-go/logz v0.5.0 h1:o/xVrxo51Lt1F1Otu3In6wHSZlmCjzLZA8f9pEeGnE0=
github.com/worldline-go/logz v0.5.0/go.mod h1:CWLYHvL+YkzCRfZmQ86zbMyWR/9mmZ2dIQEeDugaIXo=
golang.org/x/crypto v0.11.0 h1:6Ewdq3tDic1mg5xRO4milcWCfMVQhI4NkqWWvqejpuA=
golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio=
golang.org/x/crypto v0.13.0 h1:mvySKfSWJ+UKUii46M40LOvyWfN0s2U+46/jDd0e6Ck=
golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA=
golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
Expand Down
2 changes: 1 addition & 1 deletion producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func NewProducer[T any](client *Client, cfg ProducerConfig[T]) (Producer[T], err

return &produce[T]{
ProducerConfig: setCfg,
produceRaw: client.produceRaw,
produceRaw: client.ProduceRaw,
}, nil
}

Expand Down

0 comments on commit a758327

Please sign in to comment.