Skip to content

Commit

Permalink
fix: examples run
Browse files Browse the repository at this point in the history
Signed-off-by: Eray Ates <[email protected]>
  • Loading branch information
rytsh committed Jan 6, 2024
1 parent 5e66b66 commit 727f379
Show file tree
Hide file tree
Showing 22 changed files with 543 additions and 261 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ env-logs: ## Show env logs
env-down: ## Stop env
docker compose -p wkafka down

run-example: ## Run example
@go run ./example/main.go

.golangci.yml:
@$(MAKE) golangci

Expand Down
62 changes: 36 additions & 26 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"context"
"fmt"

"github.com/rs/zerolog/log"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/worldline-go/logz"
"golang.org/x/sync/errgroup"
)

Expand All @@ -14,55 +16,58 @@ type Client struct {
KafkaDLQ *kgo.Client

clientID []byte
consumerConfig ConsumerConfig
consumerConfig *ConsumerConfig
logger logz.Adapter
}

func New(ctx context.Context, cfg Config, opts ...Option) (*Client, error) {
o := options{
ClientID: DefaultClientID,
AutoTopicCreation: true,
AppName: idProgname,
Logger: logz.AdapterKV{Log: log.Logger},
}

o.apply(opts...)

// validate client and add defaults to consumer config
consumerConfig, err := cfg.Consumer.Apply(o.ConsumerConfig, o.AppName)
if err != nil {
return nil, fmt.Errorf("validate config: %w", err)
if o.ConsumerConfig != nil {
if err := configApply(cfg.Consumer, o.ConsumerConfig, o.AppName, o.Logger); err != nil {
return nil, fmt.Errorf("validate config: %w", err)
}
}

o.ConsumerConfig = consumerConfig
c := &Client{
consumerConfig: o.ConsumerConfig,
logger: o.Logger,
clientID: []byte(o.ClientID),
}

kgoClient, err := newClient(ctx, cfg, o)
kgoClient, err := newClient(c, cfg, &o, false)
if err != nil {
return nil, err
}

var kgoClientDLQ *kgo.Client
if o.ConsumerEnabled {
kgoClientDLQ, err = newClient(ctx, cfg, o.WithDLQ())
kgoClientDLQ, err = newClient(c, cfg, &o, true)
if err != nil {
return nil, err
}
}

cl := &Client{
Kafka: kgoClient,
KafkaDLQ: kgoClientDLQ,
clientID: []byte(o.ClientID),
consumerConfig: o.ConsumerConfig,
}
c.Kafka = kgoClient
c.KafkaDLQ = kgoClientDLQ

// main and dlq use same config, ask for validation once
if err := cl.Kafka.Ping(ctx); err != nil {
if err := c.Kafka.Ping(ctx); err != nil {
return nil, fmt.Errorf("connection to kafka brokers: %w", err)
}

return cl, nil
return c, nil
}

func newClient(ctx context.Context, cfg Config, o options) (*kgo.Client, error) {
func newClient(c *Client, cfg Config, o *options, isDLQ bool) (*kgo.Client, error) {
compressions, err := compressionOpts(cfg.Compressions)
if err != nil {
return nil, err
Expand Down Expand Up @@ -99,13 +104,8 @@ func newClient(ctx context.Context, cfg Config, o options) (*kgo.Client, error)
}

if o.ConsumerEnabled {

Check failure on line 106 in client.go

View workflow job for this annotation

GitHub Actions / sonarcloud

`if o.ConsumerEnabled` has complex nested blocks (complexity: 6) (nestif)
// validate consumer
if err := cfg.Consumer.Validation.Validate(o.ConsumerConfig); err != nil {
return nil, fmt.Errorf("validate consumer config: %w", err)
}

var startOffsetCfg int64
if o.DLQ {
if isDLQ {
startOffsetCfg = o.ConsumerConfig.DLQ.StartOffset
} else {
startOffsetCfg = o.ConsumerConfig.StartOffset
Expand All @@ -127,17 +127,24 @@ func newClient(ctx context.Context, cfg Config, o options) (*kgo.Client, error)
kgo.RequireStableFetchOffsets(),
kgo.ConsumerGroup(o.ConsumerConfig.GroupID),
kgo.ConsumeResetOffset(startOffset),
kgo.OnPartitionsLost(partitionLost(c)),
kgo.OnPartitionsRevoked(partitionRevoked(c)),
)

if o.DLQ {
kgoOpt = append(kgoOpt, kgo.ConsumeTopics(o.ConsumerConfig.DLQ.Topic))
if isDLQ {
topics := []string{o.ConsumerConfig.DLQ.Topic}
if len(o.ConsumerConfig.DLQ.TopicsExtra) > 0 {
topics = append(topics, o.ConsumerConfig.DLQ.TopicsExtra...)
}

kgoOpt = append(kgoOpt, kgo.ConsumeTopics(topics...))
} else {
kgoOpt = append(kgoOpt, kgo.ConsumeTopics(o.ConsumerConfig.Topics...))
}
}

// Add custom options
if o.DLQ {
if isDLQ {
kgoOpt = append(kgoOpt, o.KGOOptionsDLQ...)
} else {
kgoOpt = append(kgoOpt, o.KGOOptions...)
Expand All @@ -156,14 +163,17 @@ func (c *Client) Close() {
if c.Kafka != nil {
c.Kafka.Close()
}
if c.KafkaDLQ != nil {
c.KafkaDLQ.Close()
}
}

// Consume starts consuming messages from kafka.
// - Only works if client is created with consumer config.
func (c *Client) Consume(ctx context.Context, callback CallBackFunc, opts ...OptionConsumer) error {
o := optionConsumer{
Client: c,
ConsumerConfig: c.consumerConfig,
ConsumerConfig: *c.consumerConfig,
}

opts = append([]OptionConsumer{OptionConsumer(callback)}, opts...)
Expand Down
22 changes: 13 additions & 9 deletions clientoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package wkafka

import (
"github.com/twmb/franz-go/pkg/kgo"
"github.com/worldline-go/logz"
)

// DefaultBatchCount is default batch count for batch consumer, if not set.
Expand All @@ -10,13 +11,13 @@ var DefaultBatchCount = 100
type options struct {
AppName string
ConsumerEnabled bool
ConsumerConfig ConsumerConfig
ConsumerConfig *ConsumerConfig
// Consumer consumer
ClientID string
KGOOptions []kgo.Opt
KGOOptionsDLQ []kgo.Opt
AutoTopicCreation bool
DLQ bool
Logger logz.Adapter
}

func (o *options) apply(opts ...Option) {
Expand All @@ -25,12 +26,6 @@ func (o *options) apply(opts ...Option) {
}
}

func (o options) WithDLQ() options {
o.DLQ = true

return o
}

type Option func(*options)

// WithClientID to set client_id in kafka server.
Expand Down Expand Up @@ -91,7 +86,16 @@ func WithKGOOptionsDLQ(opts ...kgo.Opt) Option {

func WithConsumer(cfg ConsumerConfig) Option {
return func(o *options) {
o.ConsumerConfig = cfg
o.ConsumerConfig = &cfg
o.ConsumerEnabled = true
}
}

// WithLogger configures the client to use the provided logger.
// - For zerolog logz.AdapterKV{Log: logger} can usable.
// - Default is using zerolog's global logger.
func WithLogger(logger logz.Adapter) Option {
return func(o *options) {
o.Logger = logger
}
}
39 changes: 30 additions & 9 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,13 @@ package wkafka
import (
"fmt"
"regexp"
"time"

"github.com/worldline-go/logz"
)

var DefaultRetryInterval = 10 * time.Second

type Config struct {
// Brokers is a list of kafka brokers to connect to.
// Not all brokers need to be specified, the list is so that
Expand All @@ -28,27 +33,39 @@ type ConsumerPreConfig struct {
// PrefixGroupID add prefix to group_id.
PrefixGroupID string `cfg:"prefix_group_id"`
// FormatDLQTopic is a format string to generate DLQ topic name.
// - %s is a placeholder for program name.
// - Default is "finops_%s_dlq"
// - Example is "finops_{{.AppName}}_dlq"
// - It should be exist if DLQ is enabled and topic is not set.
//
// - Available variables:
// - AppName
FormatDLQTopic string `cfg:"format_dlq_topic"`
// Validation is a configuration for validation when consumer initialized.
Validation Validation `cfg:"validation"`
}

// Apply configuration to ConsumerConfig and check validation.
func (c ConsumerPreConfig) Apply(consumerConfig ConsumerConfig, progName string) (ConsumerConfig, error) {
// configApply configuration to ConsumerConfig and check validation.
func configApply(c ConsumerPreConfig, consumerConfig *ConsumerConfig, progName string, logger logz.Adapter) error {
if c.PrefixGroupID != "" {
consumerConfig.GroupID = c.PrefixGroupID + consumerConfig.GroupID
}

if !consumerConfig.DLQ.Disable && consumerConfig.DLQ.Topic == "" && c.FormatDLQTopic == "" {
consumerConfig.DLQ.Disable = true
logger.Warn("dlq is disabled because topic and format_dlq_topic is not set")
}

// add default topic name for DLQ
if !consumerConfig.DLQ.Disable {

Check failure on line 58 in config.go

View workflow job for this annotation

GitHub Actions / sonarcloud

`if !consumerConfig.DLQ.Disable` has complex nested blocks (complexity: 8) (nestif)
if consumerConfig.DLQ.Topic == "" {
if c.FormatDLQTopic == "" {
c.FormatDLQTopic = "finops_%s_dlq"
return fmt.Errorf("format_dlq_topic is required if dlq topic is not set")
}

consumerConfig.DLQ.Topic = fmt.Sprintf(c.FormatDLQTopic, progName)
var err error
consumerConfig.DLQ.Topic, err = templateRun(c.FormatDLQTopic, map[string]string{"AppName": progName})
if err != nil {
return fmt.Errorf("format_dlq_topic: %w", err)
}
}

if consumerConfig.DLQ.SkipExtra == nil {
Expand All @@ -58,13 +75,17 @@ func (c ConsumerPreConfig) Apply(consumerConfig ConsumerConfig, progName string)
} else {
consumerConfig.DLQ.SkipExtra[consumerConfig.DLQ.Topic] = consumerConfig.DLQ.Skip
}

if consumerConfig.DLQ.RetryInterval == 0 {
consumerConfig.DLQ.RetryInterval = DefaultRetryInterval
}
}

if err := c.Validation.Validate(consumerConfig); err != nil {
return consumerConfig, fmt.Errorf("validate consumer config: %w", err)
return fmt.Errorf("validate consumer config: %w", err)
}

return consumerConfig, nil
return nil
}

// Validation is a configuration for validation when consumer initialized.
Expand Down Expand Up @@ -108,7 +129,7 @@ func (v GroupIDValidation) Validate(groupID string) error {
return nil
}

func (v Validation) Validate(consumerConfig ConsumerConfig) error {
func (v Validation) Validate(consumerConfig *ConsumerConfig) error {
if err := v.GroupID.Validate(consumerConfig.GroupID); err != nil {
return err
}
Expand Down
10 changes: 7 additions & 3 deletions config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package wkafka
import (
"reflect"
"testing"

"github.com/worldline-go/logz"
)

func TestConsumerPreConfig_Apply(t *testing.T) {
Expand Down Expand Up @@ -50,10 +52,12 @@ func TestConsumerPreConfig_Apply(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := ConsumerPreConfig{
PrefixGroupID: tt.fields.PrefixGroupID,
Validation: tt.fields.Validation,
PrefixGroupID: tt.fields.PrefixGroupID,
Validation: tt.fields.Validation,
FormatDLQTopic: "finops_{{.AppName}}_dlq",
}
got, err := c.Apply(tt.args.consumerConfig, "serviceX")
got := tt.args.consumerConfig
err := configApply(c, &tt.args.consumerConfig, "serviceX", logz.AdapterNoop{})
if (err != nil) != tt.wantErr {
t.Errorf("ConsumerPreConfig.Apply() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down
Loading

0 comments on commit 727f379

Please sign in to comment.