Skip to content

Commit

Permalink
feat: add options for consumers
Browse files Browse the repository at this point in the history
Signed-off-by: Eray Ates <[email protected]>
  • Loading branch information
rytsh committed Dec 28, 2023
1 parent 7fc13a7 commit 76ea277
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 24 deletions.
6 changes: 6 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ func NewClient(ctx context.Context, cfg Config, opts ...Option) (*Client, error)

if o.Consumer != nil {
lConfig := o.Consumer.config()

// validate consumer
if err := lConfig.Validate(); err != nil {
return nil, fmt.Errorf("validate consumer config: %w", err)
}

// start offset settings
startOffset := kgo.NewOffset()
switch v := lConfig.StartOffset; {
Expand Down
17 changes: 16 additions & 1 deletion clientoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import (
"github.com/twmb/franz-go/pkg/kgo"
)

// DefaultBatchCount is default batch count for batch consumer, if not set.
var DefaultBatchCount = 100

type options struct {
Consumer consumer
ClientID string
Expand Down Expand Up @@ -57,6 +60,7 @@ func WithKGOOptions(opts ...kgo.Opt) Option {
func WithConsumer[T any](
cfg ConsumeConfig,
processor Processor[T],
opts ...OptionSingle,
) Option {
return func(o *options) {
var decodeWithRecord func([]byte, *kgo.Record) (T, error)
Expand All @@ -77,12 +81,17 @@ func WithConsumer[T any](
precheck = v.PreCheck
}

// additional options
opt := optionSingle{}
opt.apply(opts...)

o.Consumer = consumerSingle[T]{
Process: processor.Process,
Cfg: cfg,
PreCheck: precheck,
DecodeWithRecord: decodeWithRecord,
Decode: decode,
Option: opt,
}
}
}
Expand All @@ -91,6 +100,7 @@ func WithConsumer[T any](
func WithConsumerBatch[T any](
cfg ConsumeConfig,
processor Processor[[]T],
opts ...OptionBatch,
) Option {
return func(o *options) {
var decodeWithRecord func([]byte, *kgo.Record) (T, error)
Expand All @@ -112,15 +122,20 @@ func WithConsumerBatch[T any](
}

if cfg.BatchCount <= 0 {
cfg.BatchCount = 1
cfg.BatchCount = DefaultBatchCount
}

// additional options
opt := optionBatch{}
opt.apply(opts...)

o.Consumer = consumerBatch[T]{
Process: processor.Process,
Cfg: cfg,
PreCheck: precheck,
DecodeWithRecord: decodeWithRecord,
Decode: decode,
Option: opt,
}
}
}
36 changes: 26 additions & 10 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package wkafka

import (
"context"
"fmt"
"regexp"

"github.com/twmb/franz-go/pkg/kgo"
)
Expand Down Expand Up @@ -39,24 +41,38 @@ type ConsumeConfig struct {
// - Default is max.poll.records in the broker configuration, usually 500.
// - Fetching messages from broker, this is not related with batch processing!
MaxPollRecords int `cfg:"max_poll_records"`
// Concurrent to run the consumer in concurrent mode for each partition and topic.
// - Default is false.
// - Each topic could have different type of value so use with processor map.
Concurrent bool `cfg:"concurrent"`
// BatchCount is a number of messages processed in a single batch.
// - <= 1 is 1 message per batch.
// - Processing count could be less than BatchCount if the batch is not full.
// - Usable with WithConsumerBatch
BatchCount int `cfg:"batch_count"`

Validation Validation `cfg:"validation"`
}

// Validation is a configuration for validation when consumer initialized.
type Validation struct {
// GroupID is a regex pattern to validate GroupID.
GroupID string `cfg:"group_id"`
}

// WithConcurrent to run the consumer config with on/off concurrent mode.
//
// Concurrent should be set in the programmatic way.
func (c ConsumeConfig) WithConcurrent(v bool) ConsumeConfig {
c.Concurrent = v
func (c ConsumeConfig) Validate() error {
if c.GroupID == "" {
return fmt.Errorf("group_id is required")
}

if c.Validation.GroupID != "" {
rgx, err := regexp.Compile(c.Validation.GroupID)
if err != nil {
return fmt.Errorf("group_id validation regex: %w", err)
}

if !rgx.MatchString(c.GroupID) {
return fmt.Errorf("group_id validation failed regex [%s], value [%s]", c.Validation.GroupID, c.GroupID)
}
}

return c
return nil
}

type Offsets struct {
Expand Down
40 changes: 37 additions & 3 deletions consumerbatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,45 @@ import (
"github.com/twmb/franz-go/pkg/kgo"
)

type optionBatch struct {
DisableCommit bool
// Concurrent to run the consumer in concurrent mode for each partition and topic.
// - Default is false.
// - Each topic could have different type of value so use with processor map.
Concurrent bool `cfg:"concurrent"`
}

type OptionBatch func(*optionBatch)

func (o *optionBatch) apply(opts ...OptionBatch) {
for _, opt := range opts {
opt(o)
}
}

// WithBatchDisableCommit to set wkafka consumer's commit messages.
// - Need to run manually commit messages command.
// - Use this option only what you know what you are doing!
func WithBatchDisableCommit() OptionBatch {
return func(o *optionBatch) {
o.DisableCommit = true
}
}

func WithBatchConcurrent() OptionBatch {
return func(o *optionBatch) {
o.Concurrent = true
}
}

type consumerBatch[T any] struct {
Process func(ctx context.Context, msg []T) error
Cfg ConsumeConfig
Decode func(raw []byte) (T, error)
DecodeWithRecord func(raw []byte, r *kgo.Record) (T, error)
// PreCheck is a function that is called before the callback and decode.
PreCheck func(ctx context.Context, r *kgo.Record) error
Option optionBatch
}

func (c consumerBatch[T]) config() ConsumeConfig {
Expand All @@ -37,7 +69,7 @@ func (c consumerBatch[T]) Consume(ctx context.Context, cl *kgo.Client) error {
continue
}

if !c.Cfg.Concurrent {
if !c.Option.Concurrent {
if err := c.batchIteration(ctx, cl, fetch); err != nil {
return err
}
Expand Down Expand Up @@ -100,8 +132,10 @@ func (c consumerBatch[T]) batchIteration(ctx context.Context, cl *kgo.Client, fe
batch = make([]T, 0, c.Cfg.BatchCount)
}

if err := cl.CommitRecords(ctx, records...); err != nil {
return fmt.Errorf("commit batch records failed: %w", err)
if !c.Option.DisableCommit {
if err := cl.CommitRecords(ctx, records...); err != nil {
return fmt.Errorf("commit batch records failed: %w", err)
}
}

return nil
Expand Down
40 changes: 37 additions & 3 deletions consumersingle.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,45 @@ import (
"github.com/twmb/franz-go/pkg/kgo"
)

type optionSingle struct {
DisableCommit bool
// Concurrent to run the consumer in concurrent mode for each partition and topic.
// - Default is false.
// - Each topic could have different type of value so use with processor map.
Concurrent bool `cfg:"concurrent"`
}

type OptionSingle func(*optionSingle)

func (o *optionSingle) apply(opts ...OptionSingle) {
for _, opt := range opts {
opt(o)
}
}

// WithSingleDisableCommit to set wkafka consumer's commit messages.
// - Need to run manually commit messages command.
// - Use this option only what you know what you are doing!
func WithSingleDisableCommit() OptionBatch {
return func(o *optionBatch) {
o.DisableCommit = true
}
}

func WithSingleConcurrent() OptionBatch {
return func(o *optionBatch) {
o.Concurrent = true
}
}

type consumerSingle[T any] struct {
Process func(ctx context.Context, msg T) error
Cfg ConsumeConfig
Decode func(raw []byte) (T, error)
DecodeWithRecord func(raw []byte, r *kgo.Record) (T, error)
// PreCheck is a function that is called before the callback and decode.
PreCheck func(ctx context.Context, r *kgo.Record) error
Option optionSingle
}

func (c consumerSingle[T]) config() ConsumeConfig {
Expand All @@ -37,7 +69,7 @@ func (c consumerSingle[T]) Consume(ctx context.Context, cl *kgo.Client) error {
continue
}

if !c.Cfg.Concurrent {
if !c.Option.Concurrent {
if err := c.iteration(ctx, cl, fetch); err != nil {
return err
}
Expand Down Expand Up @@ -72,8 +104,10 @@ func (c consumerSingle[T]) iteration(ctx context.Context, cl *kgo.Client, fetch
}
}

if err := cl.CommitRecords(ctx, r); err != nil {
return wrapErr(r, fmt.Errorf("commit records failed: %w", err))
if !c.Option.DisableCommit {
if err := cl.CommitRecords(ctx, r); err != nil {
return wrapErr(r, fmt.Errorf("commit records failed: %w", err))
}
}
}

Expand Down
8 changes: 1 addition & 7 deletions example/produce/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,14 @@ type Data struct {
Details map[string]interface{}
}

func (d *Data) ProducerHook(r *wkafka.Record) *wkafka.Record {
if d == nil {
return r
}

func (d *Data) ProduceHook(r *wkafka.Record) {
r.Value = []byte(d.Name)
r.Headers = append(r.Headers, wkafka.Header{
Key: "name",
Value: []byte(d.Name),
})
r.Key = []byte(d.Name)
r.Topic = d.Topic

return r
}

func main() {
Expand Down

0 comments on commit 76ea277

Please sign in to comment.