fix: change consumer functions
Signed-off-by: Eray Ates <[email protected]>
rytsh committed Jan 3, 2024
1 parent 76ea277 commit f7d119c
Showing 13 changed files with 421 additions and 263 deletions.
35 changes: 21 additions & 14 deletions client.go
@@ -12,10 +12,10 @@ type Client struct {
Kafka *kgo.Client

clientID []byte
Consumer consumer
consumer consumer
}

func NewClient(ctx context.Context, cfg Config, opts ...Option) (*Client, error) {
func New(ctx context.Context, cfg Config, opts ...Option) (*Client, error) {
o := options{
ClientID: DefaultClientID,
AutoTopicCreation: true,
@@ -59,30 +59,28 @@ func NewClient(ctx context.Context, cfg Config, opts ...Option) (*Client, error)
kgoOpt = append(kgoOpt, kgo.SASL(saslOpts...))
}

if o.Consumer != nil {
lConfig := o.Consumer.config()

if o.ConsumerEnabled {
// validate consumer
if err := lConfig.Validate(); err != nil {
if err := cfg.Consumer.Validation.Validate(o.ConsumerConfig); err != nil {
return nil, fmt.Errorf("validate consumer config: %w", err)
}

// start offset settings
startOffset := kgo.NewOffset()
switch v := lConfig.StartOffset; {
switch v := o.ConsumerConfig.StartOffset; {
case v == 0 || v == -2 || v < -2:
startOffset = startOffset.AtStart()
case v == -1:
startOffset = startOffset.AtEnd()
default:
startOffset = startOffset.At(lConfig.StartOffset)
startOffset = startOffset.At(o.ConsumerConfig.StartOffset)
}

kgoOpt = append(kgoOpt,
kgo.DisableAutoCommit(),
kgo.RequireStableFetchOffsets(),
kgo.ConsumerGroup(lConfig.GroupID),
kgo.ConsumeTopics(lConfig.Topics...),
kgo.ConsumerGroup(o.ConsumerConfig.GroupID),
kgo.ConsumeTopics(o.ConsumerConfig.Topics...),
kgo.ConsumeResetOffset(startOffset),
)
}
@@ -98,7 +96,6 @@ func NewClient(ctx context.Context, cfg Config, opts ...Option) (*Client, error)

cl := &Client{
Kafka: kgoClient,
Consumer: o.Consumer,
clientID: []byte(o.ClientID),
}

@@ -115,24 +112,34 @@ func (c *Client) Close() {
}
}

func (c *Client) Consume(ctx context.Context) error {
if c.Consumer == nil {
// Consume starts consuming messages from kafka.
// - Only works if client is created with consumer config.
func (c *Client) Consume(ctx context.Context, opts ...OptionConsumer) error {
o := optionConsumer{}
if err := o.apply(opts...); err != nil {
return err
}

c.consumer = o.Consumer
if c.consumer == nil {
return fmt.Errorf("consumer is nil: %w", ErrNotImplemented)
}

if err := c.Consumer.Consume(ctx, c.Kafka); err != nil {
if err := c.consumer.Consume(ctx, c.Kafka); err != nil {
return fmt.Errorf("failed to consume: %w", err)
}

return nil
}

// ProduceRaw sends raw records to kafka. For a typed producer, check wkafka.NewProducer.
func (c *Client) ProduceRaw(ctx context.Context, records []*kgo.Record) error {
result := c.Kafka.ProduceSync(ctx, records...)

return result.FirstErr()

Check failure on line 139 in client.go (GitHub Actions / sonarcloud): error returned from external package is unwrapped: sig: func (github.com/twmb/franz-go/pkg/kgo.ProduceResults).FirstErr() error (wrapcheck)
}

// Admin returns an admin client to manage kafka.
func (c *Client) Admin() *kadm.Client {
return kadm.NewClient(c.Kafka)
}
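For orientation, a rough usage sketch of the reworked API in this commit: the constructor is now New instead of NewClient, WithConsumer carries only a ConsumerConfig, and the processing callback is handed to Consume through an OptionConsumer. The module path, the WithCallback option name, and the Data type are assumptions for illustration; only New, WithConsumer, ConsumerConfig, Consume, and Close appear in this diff.

package main

import (
	"context"
	"log"

	"github.com/worldline-go/wkafka" // module path assumed from the repository owner
)

// Data is an assumed payload type; values are decoded as JSON by default (see codecJSON).
type Data struct {
	Name string `json:"name"`
}

func main() {
	ctx := context.Background()

	cfg := wkafka.Config{Brokers: []string{"localhost:9092"}}

	// New replaces the former NewClient; WithConsumer now only carries the ConsumerConfig.
	client, err := wkafka.New(ctx, cfg, wkafka.WithConsumer(wkafka.ConsumerConfig{
		GroupID: "example-group",
		Topics:  []string{"example-topic"},
	}))
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// The processor is now passed to Consume as an OptionConsumer;
	// WithCallback is an assumed name for that option constructor.
	if err := client.Consume(ctx, wkafka.WithCallback(func(ctx context.Context, msg Data) error {
		log.Printf("got message: %+v", msg)
		return nil
	})); err != nil {
		log.Fatal(err)
	}
}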
169 changes: 88 additions & 81 deletions clientoptions.go
@@ -1,16 +1,16 @@
package wkafka

import (
"context"

"github.com/twmb/franz-go/pkg/kgo"
)

// DefaultBatchCount is default batch count for batch consumer, if not set.
var DefaultBatchCount = 100

type options struct {
Consumer consumer
ConsumerEnabled bool
ConsumerConfig ConsumerConfig
// Consumer consumer
ClientID string
InfoVersion string
KGOOptions []kgo.Opt
@@ -56,86 +56,93 @@ func WithKGOOptions(opts ...kgo.Opt) Option {
}
}

// WithConsumer sets the listener to use.
func WithConsumer[T any](
cfg ConsumeConfig,
processor Processor[T],
opts ...OptionSingle,
) Option {
func WithConsumer(cfg ConsumerConfig) Option {
return func(o *options) {
var decodeWithRecord func([]byte, *kgo.Record) (T, error)
if v, ok := processor.(ProcessorDecodeWithRecord[T]); ok {
decodeWithRecord = v.DecodeWithRecord
}

var decode func([]byte) (T, error)
if decodeWithRecord == nil {
decode = codecJSON[T]{}.Decode
if v, ok := processor.(ProcessorDecode[T]); ok {
decode = v.Decode
}
}

var precheck func(context.Context, *kgo.Record) error
if v, ok := processor.(ProcessorPreCheck); ok {
precheck = v.PreCheck
}

// additional options
opt := optionSingle{}
opt.apply(opts...)

o.Consumer = consumerSingle[T]{
Process: processor.Process,
Cfg: cfg,
PreCheck: precheck,
DecodeWithRecord: decodeWithRecord,
Decode: decode,
Option: opt,
}
o.ConsumerConfig = cfg
o.ConsumerEnabled = true
}
}

// WithConsumer sets the listener to use.
func WithConsumerBatch[T any](
cfg ConsumeConfig,
processor Processor[[]T],
opts ...OptionBatch,
) Option {
return func(o *options) {
var decodeWithRecord func([]byte, *kgo.Record) (T, error)
if v, ok := processor.(ProcessorDecodeWithRecord[T]); ok {
decodeWithRecord = v.DecodeWithRecord
}

var decode func([]byte) (T, error)
if decodeWithRecord == nil {
decode = codecJSON[T]{}.Decode
if v, ok := processor.(ProcessorDecode[T]); ok {
decode = v.Decode
}
}

var precheck func(context.Context, *kgo.Record) error
if v, ok := processor.(ProcessorPreCheck); ok {
precheck = v.PreCheck
}

if cfg.BatchCount <= 0 {
cfg.BatchCount = DefaultBatchCount
}

// additional options
opt := optionBatch{}
opt.apply(opts...)

o.Consumer = consumerBatch[T]{
Process: processor.Process,
Cfg: cfg,
PreCheck: precheck,
DecodeWithRecord: decodeWithRecord,
Decode: decode,
Option: opt,
}
}
}
// func WithConsumer[T any](
// cfg ConsumeConfig,
// processor Processor[T],
// opts ...OptionSingle,
// ) Option {
// return func(o *options) {
// var decodeWithRecord func([]byte, *kgo.Record) (T, error)
// if v, ok := processor.(ProcessorDecodeWithRecord[T]); ok {
// decodeWithRecord = v.DecodeWithRecord
// }

// var decode func([]byte) (T, error)
// if decodeWithRecord == nil {
// decode = codecJSON[T]{}.Decode
// if v, ok := processor.(ProcessorDecode[T]); ok {
// decode = v.Decode
// }
// }

// var precheck func(context.Context, *kgo.Record) error
// if v, ok := processor.(ProcessorPreCheck); ok {
// precheck = v.PreCheck
// }

// // additional options
// opt := optionSingle{}
// opt.apply(opts...)

// o.Consumer = consumerSingle[T]{
// Process: processor.Process,
// Cfg: cfg,
// PreCheck: precheck,
// DecodeWithRecord: decodeWithRecord,
// Decode: decode,
// Option: opt,
// }
// }
// }

// WithConsumer sets the listener to use.
// func WithConsumerBatch[T any](
// cfg ConsumerConfig,
// processor Processor[[]T],
// opts ...OptionBatch,
// ) Option {
// return func(o *options) {
// var decodeWithRecord func([]byte, *kgo.Record) (T, error)
// if v, ok := processor.(ProcessorDecodeWithRecord[T]); ok {
// decodeWithRecord = v.DecodeWithRecord
// }

// var decode func([]byte) (T, error)
// if decodeWithRecord == nil {
// decode = codecJSON[T]{}.Decode
// if v, ok := processor.(ProcessorDecode[T]); ok {
// decode = v.Decode
// }
// }

// var precheck func(context.Context, *kgo.Record) error
// if v, ok := processor.(ProcessorPreCheck); ok {
// precheck = v.PreCheck
// }

// if cfg.BatchCount <= 0 {
// cfg.BatchCount = DefaultBatchCount
// }

// // additional options
// opt := optionBatch{}
// opt.apply(opts...)

// o.Consumer = consumerBatch[T]{
// Process: processor.Process,
// Cfg: cfg,
// PreCheck: precheck,
// DecodeWithRecord: decodeWithRecord,
// Decode: decode,
// Option: opt,
// }
// }
// }
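The generic WithConsumer[T] and WithConsumerBatch[T] options above are replaced by a single WithConsumer(ConsumerConfig). From the fields read elsewhere in this diff (GroupID, Topics, StartOffset, BatchCount), the config looks roughly like the sketch below; the exact struct, field types, and cfg tags are assumptions inferred from the diff, not the authoritative definition.

// Rough shape of ConsumerConfig as inferred from this diff.
type ConsumerConfig struct {
	// Topics to consume from (passed to kgo.ConsumeTopics).
	Topics []string `cfg:"topics"`
	// GroupID of the consumer group; optionally prefixed and validated via ConsumerPreConfig.
	GroupID string `cfg:"group_id"`
	// StartOffset: 0, -2, or lower starts at the beginning, -1 at the end,
	// any other value is used as an absolute offset (see the switch in client.go).
	StartOffset int64 `cfg:"start_offset"`
	// BatchCount for the batch consumer; DefaultBatchCount (100) is used when it is <= 0.
	BatchCount int `cfg:"batch_count"`
}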
2 changes: 1 addition & 1 deletion codec.go
@@ -52,7 +52,7 @@ func (codecJSON[T]) Encode(data T) ([]byte, error) {
return json.Marshal(data)

Check failure on line 52 in codec.go (GitHub Actions / sonarcloud): error returned from external package is unwrapped: sig: func encoding/json.Marshal(v any) ([]byte, error) (wrapcheck)
}

func (codecJSON[T]) Decode(raw []byte) (T, error) {
func (codecJSON[T]) Decode(raw []byte, _ *kgo.Record) (T, error) {

Check failure on line 55 in codec.go (GitHub Actions / sonarcloud): Decode returns interface (T) (ireturn)
var data T
err := json.Unmarshal(raw, &data)
if err != nil {
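The codec change above gives Decode access to the *kgo.Record in addition to the raw value, so a decoder can use record metadata such as the topic or headers. A minimal sketch of a custom decode function with the new shape; MyMessage and decodeMyMessage are illustrative names, not part of the library.

package example

import (
	"encoding/json"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

// MyMessage is an illustrative payload type.
type MyMessage struct {
	Topic string `json:"-"`
	Name  string `json:"name"`
}

// decodeMyMessage matches the new decode shape: it receives the record as well,
// so metadata such as the topic can be carried into the decoded value.
func decodeMyMessage(raw []byte, r *kgo.Record) (MyMessage, error) {
	var data MyMessage
	if err := json.Unmarshal(raw, &data); err != nil {
		return data, fmt.Errorf("decode message: %w", err)
	}

	data.Topic = r.Topic

	return data, nil
}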
76 changes: 76 additions & 0 deletions config.go
@@ -1,5 +1,10 @@
package wkafka

import (
"fmt"
"regexp"
)

type Config struct {
// Brokers is a list of kafka brokers to connect to.
// Not all brokers need to be specified, the list is so that
@@ -15,4 +20,75 @@ type Config struct {
// - lz4
// - zstd
Compressions []string `cfg:"compressions"`

Consumer ConsumerPreConfig `cfg:"consumer"`
}

type ConsumerPreConfig struct {
// PrefixGroupID add prefix to group_id.
PrefixGroupID string `cfg:"prefix_group_id"`

Validation Validation `cfg:"validation"`
}

// Apply configuration to ConsumerConfig and check validation.
func (c ConsumerPreConfig) Apply(consumerConfig ConsumerConfig) (ConsumerConfig, error) {
if c.PrefixGroupID != "" {
consumerConfig.GroupID = c.PrefixGroupID + consumerConfig.GroupID
}

if err := c.Validation.Validate(consumerConfig); err != nil {
return consumerConfig, fmt.Errorf("validate consumer config: %w", err)
}

return consumerConfig, nil
}

// Validation is a configuration for validation when consumer initialized.
type Validation struct {
GroupID GroupIDValidation `cfg:"group_id"`
}

// GroupIDValidation is a configuration for group_id validation.
type GroupIDValidation struct {
Enabled bool `cfg:"enabled"`
// RgxGroupID is a regex pattern to validate the GroupID.
RgxGroupID string `cfg:"rgx_group_id"`
// DisableWordBoundary disables the word boundary check on RgxGroupID.
DisableWordBoundary bool `cfg:"disable_word_boundary"`
}

func (v GroupIDValidation) Validate(groupID string) error {
if !v.Enabled {
return nil
}

if groupID == "" {
return fmt.Errorf("group_id is required")
}

if v.RgxGroupID != "" {
if !v.DisableWordBoundary {
v.RgxGroupID = fmt.Sprintf(`\b%s\b`, v.RgxGroupID)
}

rgx, err := regexp.Compile(v.RgxGroupID)
if err != nil {
return fmt.Errorf("group_id validation regex: %w", err)
}

if !rgx.MatchString(groupID) {
return fmt.Errorf("group_id validation failed regex [%s], value [%s]", v.RgxGroupID, groupID)
}
}

return nil
}

func (v Validation) Validate(consumerConfig ConsumerConfig) error {
if err := v.GroupID.Validate(consumerConfig.GroupID); err != nil {
return err
}

return nil
}
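A small sketch of how the new group_id validation behaves, following the GroupIDValidation code above; the module path and the example group IDs and pattern are assumptions for illustration.

package main

import (
	"fmt"

	"github.com/worldline-go/wkafka" // module path assumed from the repository owner
)

func main() {
	v := wkafka.GroupIDValidation{
		Enabled:    true,
		RgxGroupID: "finops_.*_retry",
	}

	// With DisableWordBoundary left false, the pattern is wrapped as \bfinops_.*_retry\b,
	// so it must match on word boundaries inside the group ID.
	fmt.Println(v.Validate("finops_email_retry")) // <nil>
	fmt.Println(v.Validate(""))                   // group_id is required
	fmt.Println(v.Validate("another_group"))      // group_id validation failed regex ...
}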