Skip to content

Commit

Permalink
feat: dql publish
Browse files Browse the repository at this point in the history
Signed-off-by: Eray Ates <[email protected]>
  • Loading branch information
rytsh committed Jan 5, 2024
1 parent 6337921 commit 5e66b66
Show file tree
Hide file tree
Showing 15 changed files with 341 additions and 72 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ lint: .golangci.yml bin/golangci-lint-$(GOLANGCI_LINT_VERSION) ## Lint Go files
test: ## Run unit tests
@go test -v -race ./...

test-without-cache: ## Run unit tests without cache
@go test -count=1 -v -race ./...

coverage: ## Run unit tests with coverage
@go test -v -race -cover -coverpkg=./... -coverprofile=coverage.out -covermode=atomic ./...
@go tool cover -func=coverage.out
Expand Down
112 changes: 92 additions & 20 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import (

"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"
"golang.org/x/sync/errgroup"
)

type Client struct {
Kafka *kgo.Client
Kafka *kgo.Client
KafkaDLQ *kgo.Client

clientID []byte
consumerConfig ConsumerConfig
Expand All @@ -19,11 +21,48 @@ func New(ctx context.Context, cfg Config, opts ...Option) (*Client, error) {
o := options{
ClientID: DefaultClientID,
AutoTopicCreation: true,
AppName: idProgname,
}
for _, opt := range opts {
opt(&o)

o.apply(opts...)

// validate client and add defaults to consumer config
consumerConfig, err := cfg.Consumer.Apply(o.ConsumerConfig, o.AppName)
if err != nil {
return nil, fmt.Errorf("validate config: %w", err)
}

o.ConsumerConfig = consumerConfig

kgoClient, err := newClient(ctx, cfg, o)
if err != nil {
return nil, err
}

var kgoClientDLQ *kgo.Client
if o.ConsumerEnabled {
kgoClientDLQ, err = newClient(ctx, cfg, o.WithDLQ())
if err != nil {
return nil, err
}
}

cl := &Client{
Kafka: kgoClient,
KafkaDLQ: kgoClientDLQ,
clientID: []byte(o.ClientID),
consumerConfig: o.ConsumerConfig,
}

// main and dlq use same config, ask for validation once
if err := cl.Kafka.Ping(ctx); err != nil {
return nil, fmt.Errorf("connection to kafka brokers: %w", err)
}

return cl, nil
}

func newClient(ctx context.Context, cfg Config, o options) (*kgo.Client, error) {

Check failure on line 65 in client.go

View workflow job for this annotation

GitHub Actions / sonarcloud

unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)
compressions, err := compressionOpts(cfg.Compressions)
if err != nil {
return nil, err
Expand Down Expand Up @@ -65,46 +104,52 @@ func New(ctx context.Context, cfg Config, opts ...Option) (*Client, error) {
return nil, fmt.Errorf("validate consumer config: %w", err)
}

var startOffsetCfg int64
if o.DLQ {
startOffsetCfg = o.ConsumerConfig.DLQ.StartOffset
} else {
startOffsetCfg = o.ConsumerConfig.StartOffset
}

// start offset settings
startOffset := kgo.NewOffset()
switch v := o.ConsumerConfig.StartOffset; {
switch v := startOffsetCfg; {
case v == 0 || v == -2 || v < -2:
startOffset = startOffset.AtStart()
case v == -1:
startOffset = startOffset.AtEnd()
default:
startOffset = startOffset.At(o.ConsumerConfig.StartOffset)
startOffset = startOffset.At(startOffsetCfg)
}

kgoOpt = append(kgoOpt,
kgo.DisableAutoCommit(),
kgo.RequireStableFetchOffsets(),
kgo.ConsumerGroup(o.ConsumerConfig.GroupID),
kgo.ConsumeTopics(o.ConsumerConfig.Topics...),
kgo.ConsumeResetOffset(startOffset),
)

if o.DLQ {
kgoOpt = append(kgoOpt, kgo.ConsumeTopics(o.ConsumerConfig.DLQ.Topic))
} else {
kgoOpt = append(kgoOpt, kgo.ConsumeTopics(o.ConsumerConfig.Topics...))
}
}

// Add custom options
kgoOpt = append(kgoOpt, o.KGOOptions...)
if o.DLQ {
kgoOpt = append(kgoOpt, o.KGOOptionsDLQ...)
} else {
kgoOpt = append(kgoOpt, o.KGOOptions...)
}

// Create kafka client
kgoClient, err := kgo.NewClient(kgoOpt...)
if err != nil {
return nil, fmt.Errorf("create kafka client: %w", err)
}

cl := &Client{
Kafka: kgoClient,
clientID: []byte(o.ClientID),
consumerConfig: o.ConsumerConfig,
}

if err := cl.Kafka.Ping(ctx); err != nil {
return nil, fmt.Errorf("connection to kafka brokers: %w", err)
}

return cl, nil
return kgoClient, nil
}

func (c *Client) Close() {
Expand All @@ -131,8 +176,35 @@ func (c *Client) Consume(ctx context.Context, callback CallBackFunc, opts ...Opt
return fmt.Errorf("consumer is nil: %w", ErrNotImplemented)
}

if err := o.Consumer.Consume(ctx, c.Kafka); err != nil {
return fmt.Errorf("failed to consume: %w", err)
if o.ConsumerDLQ == nil {
if err := o.Consumer.Consume(ctx, c.Kafka); err != nil {
return fmt.Errorf("failed to consume: %w", err)
}

return nil
}

// consume main and dlq concurrently
g, ctx := errgroup.WithContext(ctx)

g.Go(func() error {
if err := o.Consumer.Consume(ctx, c.Kafka); err != nil {
return fmt.Errorf("failed to consume: %w", err)
}

return nil
})

g.Go(func() error {
if err := o.ConsumerDLQ.Consume(ctx, c.KafkaDLQ); err != nil {
return fmt.Errorf("failed to consume DLQ: %w", err)
}

return nil
})

if err := g.Wait(); err != nil {
return err

Check failure on line 207 in client.go

View workflow job for this annotation

GitHub Actions / sonarcloud

error returned from external package is unwrapped: sig: func (*golang.org/x/sync/errgroup.Group).Wait() error (wrapcheck)
}

return nil
Expand Down
43 changes: 38 additions & 5 deletions clientoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,35 @@ import (
var DefaultBatchCount = 100

type options struct {
AppName string
ConsumerEnabled bool
ConsumerConfig ConsumerConfig
// Consumer consumer
ClientID string
InfoVersion string
KGOOptions []kgo.Opt
KGOOptionsDLQ []kgo.Opt
AutoTopicCreation bool
DLQ bool
}

func (o *options) apply(opts ...Option) {
for _, opt := range opts {
opt(o)
}
}

func (o options) WithDLQ() options {
o.DLQ = true

return o
}

type Option func(*options)

// WithClientID to set client_id in kafka server.
// Default is using DefaultClientID variable.
//
// No need to set most of time.
// No need to set most of time!
func WithClientID(clientID string) Option {
return func(o *options) {
o.ClientID = clientID
Expand All @@ -32,9 +46,20 @@ func WithClientID(clientID string) Option {
// WithClientInfo to set client_id in kafka server.
// Not usable with WithClientID option.
// - appname:version@hostname
func WithClientInfo(name, version string) Option {
func WithClientInfo(appName, version string) Option {
return func(o *options) {
o.ClientID = name + ":" + version + "@" + idHostname
o.ClientID = appName + ":" + version + "@" + idHostname
o.AppName = appName
}
}

// WithAppName to set app name in kafka server.
// Default is using idProgname variable.
//
// Use WithClientInfo instead if you want to set version and appname.
func WithAppName(appName string) Option {
return func(o *options) {
o.AppName = appName
}
}

Expand All @@ -50,9 +75,17 @@ func WithAutoTopicCreation(v bool) Option {
}
}

// WithKGOOptions to set kgo options.
func WithKGOOptions(opts ...kgo.Opt) Option {
return func(o *options) {
o.KGOOptions = opts
o.KGOOptions = append(o.KGOOptions, opts...)
}
}

// WithKGOOptionsDLQ to set kgo options for DLQ client.
func WithKGOOptionsDLQ(opts ...kgo.Opt) Option {
return func(o *options) {
o.KGOOptionsDLQ = append(o.KGOOptionsDLQ, opts...)
}
}

Expand Down
27 changes: 25 additions & 2 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,39 @@ type Config struct {
type ConsumerPreConfig struct {
// PrefixGroupID add prefix to group_id.
PrefixGroupID string `cfg:"prefix_group_id"`

// FormatDLQTopic is a format string to generate DLQ topic name.
// - %s is a placeholder for program name.
// - Default is "finops_%s_dlq"
FormatDLQTopic string `cfg:"format_dlq_topic"`
// Validation is a configuration for validation when consumer initialized.
Validation Validation `cfg:"validation"`
}

// Apply configuration to ConsumerConfig and check validation.
func (c ConsumerPreConfig) Apply(consumerConfig ConsumerConfig) (ConsumerConfig, error) {
func (c ConsumerPreConfig) Apply(consumerConfig ConsumerConfig, progName string) (ConsumerConfig, error) {
if c.PrefixGroupID != "" {
consumerConfig.GroupID = c.PrefixGroupID + consumerConfig.GroupID
}

// add default topic name for DLQ
if !consumerConfig.DLQ.Disable {
if consumerConfig.DLQ.Topic == "" {
if c.FormatDLQTopic == "" {
c.FormatDLQTopic = "finops_%s_dlq"
}

consumerConfig.DLQ.Topic = fmt.Sprintf(c.FormatDLQTopic, progName)
}

if consumerConfig.DLQ.SkipExtra == nil {
consumerConfig.DLQ.SkipExtra = map[string]map[int32]Offsets{
consumerConfig.DLQ.Topic: consumerConfig.DLQ.Skip,
}
} else {
consumerConfig.DLQ.SkipExtra[consumerConfig.DLQ.Topic] = consumerConfig.DLQ.Skip
}
}

if err := c.Validation.Validate(consumerConfig); err != nil {
return consumerConfig, fmt.Errorf("validate consumer config: %w", err)
}
Expand Down
8 changes: 7 additions & 1 deletion config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ func TestConsumerPreConfig_Apply(t *testing.T) {
},
want: ConsumerConfig{
GroupID: "finops_test",
DLQ: DLQ{
Topic: "finops_serviceX_dlq",
SkipExtra: map[string]map[int32]Offsets{
"finops_serviceX_dlq": nil,
},
},
},
},
}
Expand All @@ -47,7 +53,7 @@ func TestConsumerPreConfig_Apply(t *testing.T) {
PrefixGroupID: tt.fields.PrefixGroupID,
Validation: tt.fields.Validation,
}
got, err := c.Apply(tt.args.consumerConfig)
got, err := c.Apply(tt.args.consumerConfig, "serviceX")
if (err != nil) != tt.wantErr {
t.Errorf("ConsumerPreConfig.Apply() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down
Loading

0 comments on commit 5e66b66

Please sign in to comment.