Skip to content

Commit

Permalink
fix: partition revoked on single
Browse files Browse the repository at this point in the history
Signed-off-by: Eray Ates <[email protected]>
  • Loading branch information
rytsh committed Jan 6, 2024
1 parent 727f379 commit d5d779a
Show file tree
Hide file tree
Showing 13 changed files with 190 additions and 75 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ env-logs: ## Show env logs
env-down: ## Stop env
docker compose -p wkafka down

run-example: LOG_LEVEL ?= debug
run-example: ## Run example
@go run ./example/main.go
LOG_LEVEL=$(LOG_LEVEL) go run ./example/main.go

.golangci.yml:
@$(MAKE) golangci
Expand Down
33 changes: 25 additions & 8 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ import (
)

type Client struct {
Kafka *kgo.Client
KafkaDLQ *kgo.Client
Kafka *kgo.Client
KafkaDLQ *kgo.Client
partitionHandler *partitionHandler
partitionHandlerDLQ *partitionHandler

clientID []byte
consumerConfig *ConsumerConfig
Expand Down Expand Up @@ -122,13 +124,27 @@ func newClient(c *Client, cfg Config, o *options, isDLQ bool) (*kgo.Client, erro
startOffset = startOffset.At(startOffsetCfg)
}

// create partition handler
var partitionH *partitionHandler
if isDLQ {
c.partitionHandlerDLQ = &partitionHandler{
logger: c.logger,
}
partitionH = c.partitionHandlerDLQ
} else {
c.partitionHandler = &partitionHandler{
logger: c.logger,
}
partitionH = c.partitionHandler
}

kgoOpt = append(kgoOpt,
kgo.DisableAutoCommit(),
kgo.RequireStableFetchOffsets(),
kgo.ConsumerGroup(o.ConsumerConfig.GroupID),
kgo.ConsumeResetOffset(startOffset),
kgo.OnPartitionsLost(partitionLost(c)),
kgo.OnPartitionsRevoked(partitionRevoked(c)),
kgo.OnPartitionsLost(partitionLost(partitionH)),
kgo.OnPartitionsRevoked(partitionRevoked(partitionH)),
)

if isDLQ {
Expand All @@ -143,14 +159,14 @@ func newClient(c *Client, cfg Config, o *options, isDLQ bool) (*kgo.Client, erro
}
}

// Add custom options
// add custom options
if isDLQ {
kgoOpt = append(kgoOpt, o.KGOOptionsDLQ...)
} else {
kgoOpt = append(kgoOpt, o.KGOOptions...)
}

// Create kafka client
// create kafka client
kgoClient, err := kgo.NewClient(kgoOpt...)
if err != nil {
return nil, fmt.Errorf("create kafka client: %w", err)
Expand All @@ -168,8 +184,9 @@ func (c *Client) Close() {
}
}

// Consume starts consuming messages from kafka.
// Consume starts consuming messages from kafka and blocks until context is done or an error occurs.
// - Only works if client is created with consumer config.
// - Just run one time.
func (c *Client) Consume(ctx context.Context, callback CallBackFunc, opts ...OptionConsumer) error {
o := optionConsumer{
Client: c,
Expand All @@ -183,7 +200,7 @@ func (c *Client) Consume(ctx context.Context, callback CallBackFunc, opts ...Opt
}

if o.Consumer == nil {
return fmt.Errorf("consumer is nil: %w", ErrNotImplemented)
return fmt.Errorf("consumer is nil: %w", errNotImplemented)
}

if o.ConsumerDLQ == nil {
Expand Down
9 changes: 9 additions & 0 deletions clientoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,12 @@ func WithLogger(logger logz.Adapter) Option {
o.Logger = logger
}
}

// WithNoLogger disables logging when v is true by installing a no-op adapter.
func WithNoLogger(v bool) Option {
	return func(o *options) {
		if !v {
			return
		}

		o.Logger = logz.AdapterNoop{}
	}
}
2 changes: 1 addition & 1 deletion codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func compressionVerify(c []string) error {
switch v {
case "gzip", "snappy", "lz4", "zstd":
default:
return fmt.Errorf("%w: %q", ErrInvalidCompression, v)
return fmt.Errorf("invalid compression: %q", v)
}
}

Expand Down
52 changes: 28 additions & 24 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,21 +200,23 @@ func WithCallbackBatch[T any](fn func(ctx context.Context, msg []T) error) CallB
decode, produceDLQ := getDecodeProduceDLQ[T](o)

o.Consumer = &consumerBatch[T]{
Process: fn,
Decode: decode,
ProduceDLQ: produceDLQ,
Cfg: o.ConsumerConfig,
Skip: skip,
Logger: o.Client.logger,
Process: fn,
Decode: decode,
ProduceDLQ: produceDLQ,
Cfg: o.ConsumerConfig,
Skip: skip,
Logger: o.Client.logger,
PartitionHandler: o.Client.partitionHandler,
}

o.ConsumerDLQ = &consumerBatch[T]{
Process: fn,
Decode: decode,
Cfg: o.ConsumerConfig,
Skip: skipDLQ,
IsDLQ: true,
Logger: o.Client.logger,
Process: fn,
Decode: decode,
Cfg: o.ConsumerConfig,
Skip: skipDLQ,
IsDLQ: true,
Logger: o.Client.logger,
PartitionHandler: o.Client.partitionHandlerDLQ,
}

return nil
Expand All @@ -229,21 +231,23 @@ func WithCallback[T any](fn func(ctx context.Context, msg T) error) CallBackFunc
decode, produceDLQ := getDecodeProduceDLQ[T](o)

o.Consumer = &consumerSingle[T]{
Process: fn,
Decode: decode,
ProduceDLQ: produceDLQ,
Cfg: o.ConsumerConfig,
Skip: skip,
Logger: o.Client.logger,
Process: fn,
Decode: decode,
ProduceDLQ: produceDLQ,
Cfg: o.ConsumerConfig,
Skip: skip,
Logger: o.Client.logger,
PartitionHandler: o.Client.partitionHandler,
}

o.ConsumerDLQ = &consumerSingle[T]{
Process: fn,
Decode: decode,
Cfg: o.ConsumerConfig,
Skip: skipDLQ,
IsDLQ: true,
Logger: o.Client.logger,
Process: fn,
Decode: decode,
Cfg: o.ConsumerConfig,
Skip: skipDLQ,
IsDLQ: true,
Logger: o.Client.logger,
PartitionHandler: o.Client.partitionHandlerDLQ,
}

return nil
Expand Down
15 changes: 8 additions & 7 deletions consumerbatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ type consumerBatch[T any] struct {
Cfg ConsumerConfig
Decode func(raw []byte, r *kgo.Record) (T, error)
// PreCheck is a function that is called before the callback and decode.
PreCheck func(ctx context.Context, r *kgo.Record) error
Option optionConsumer
ProduceDLQ func(ctx context.Context, err error, records []*kgo.Record) error
Skip func(cfg *ConsumerConfig, r *kgo.Record) bool
Logger logz.Adapter
IsDLQ bool
PreCheck func(ctx context.Context, r *kgo.Record) error
Option optionConsumer
ProduceDLQ func(ctx context.Context, err error, records []*kgo.Record) error
Skip func(cfg *ConsumerConfig, r *kgo.Record) bool
Logger logz.Adapter
PartitionHandler *partitionHandler
IsDLQ bool
}

func (c *consumerBatch[T]) setPreCheck(fn func(ctx context.Context, r *kgo.Record) error) {
Expand All @@ -30,7 +31,7 @@ func (c *consumerBatch[T]) Consume(ctx context.Context, cl *kgo.Client) error {
for {
fetch := cl.PollRecords(ctx, c.Cfg.MaxPollRecords)
if fetch.IsClientClosed() {
return ErrClientClosed
return errClientClosed
}

if err := fetch.Err(); err != nil {
Expand Down
39 changes: 31 additions & 8 deletions consumersingle.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ type consumerSingle[T any] struct {
Cfg ConsumerConfig
Decode func(raw []byte, r *kgo.Record) (T, error)
// PreCheck is a function that is called before the callback and decode.
PreCheck func(ctx context.Context, r *kgo.Record) error
Option optionConsumer
ProduceDLQ func(ctx context.Context, err error, records []*kgo.Record) error
Skip func(cfg *ConsumerConfig, r *kgo.Record) bool
Logger logz.Adapter
IsDLQ bool
PreCheck func(ctx context.Context, r *kgo.Record) error
Option optionConsumer
ProduceDLQ func(ctx context.Context, err error, records []*kgo.Record) error
Skip func(cfg *ConsumerConfig, r *kgo.Record) bool
Logger logz.Adapter
PartitionHandler *partitionHandler
IsDLQ bool
}

func (c *consumerSingle[T]) setPreCheck(fn func(ctx context.Context, r *kgo.Record) error) {
Expand All @@ -28,9 +29,12 @@ func (c *consumerSingle[T]) setPreCheck(fn func(ctx context.Context, r *kgo.Reco

func (c *consumerSingle[T]) Consume(ctx context.Context, cl *kgo.Client) error {
for {
// flush the partition handler, it will be ready next poll
c.PartitionHandler.Flush()

fetch := cl.PollRecords(ctx, c.Cfg.MaxPollRecords)
if fetch.IsClientClosed() {
return ErrClientClosed
return errClientClosed
}

if err := fetch.Err(); err != nil {
Expand All @@ -56,18 +60,32 @@ func (c *consumerSingle[T]) iteration(ctx context.Context, cl *kgo.Client, fetch
for iter := fetch.RecordIter(); !iter.Done(); {
r := iter.Next()

// listening DLQ topics
// check partition is revoked
if c.PartitionHandler.IsRevokedRecord(r) {
continue
}

if c.IsDLQ {
// listening DLQ topics
// check partition is revoked and not commit it!
// when error return than it will not be committed
if err := c.iterationDLQ(ctx, r); err != nil {
if errors.Is(err, errPartitionRevoked) {
// don't commit revoked record
continue
}

return wrapErr(r, err, c.IsDLQ)
}
} else {
// listening main topics
// checking revoked partition already on above no need to check again
if err := c.iterationMain(ctx, r); err != nil {
return wrapErr(r, err, c.IsDLQ)
}
}

// commit if not see any error
if err := cl.CommitRecords(ctx, r); err != nil {
return wrapErr(r, fmt.Errorf("commit records failed: %w", err), c.IsDLQ)
}
Expand All @@ -77,6 +95,7 @@ func (c *consumerSingle[T]) iteration(ctx context.Context, cl *kgo.Client, fetch
}

// iterationDLQ is used to listen DLQ topics, error usually comes from context cancellation.
// any kind of error will be retry with interval.
func (c *consumerSingle[T]) iterationDLQ(ctx context.Context, r *kgo.Record) error {
wait := waitRetry{
Interval: c.Cfg.DLQ.RetryInterval,
Expand All @@ -89,6 +108,10 @@ func (c *consumerSingle[T]) iterationDLQ(ctx context.Context, r *kgo.Record) err
default:
}

if c.PartitionHandler.IsRevokedRecord(r) {
return errPartitionRevoked
}

if err := c.iterationRecord(ctx, r); err != nil {
errOrg, _ := isDQLError(err)
errWrapped := wrapErr(r, errOrg, c.IsDLQ)
Expand Down
23 changes: 10 additions & 13 deletions env/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,23 @@ version: '3.8'
services:
kafka:
image: docker.io/bitnami/kafka:3.5.1
# ports:
# - "9092:9092"
network_mode: host
ports:
- "9092:9092"
environment:
- ALLOW_PLAINTEXT_LISTENER=yes
# KRaft settings
- KAFKA_CFG_NODE_ID=0
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@localhost:9093
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@:9093
# Listeners
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,INTERNAL://:9094
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092,INTERNAL://kafka:9094
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,INTERNAL:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
redpanda:
image: docker.io/redpandadata/console:v2.3.5
# ports:
# - "7071:7071"
network_mode: host
image: docker.io/redpandadata/console:v2.3.8
ports:
- "7071:7071"
environment:
- KAFKA_BROKERS=localhost:9092
- KAFKA_BROKERS=kafka:9094
- SERVER_LISTENPORT=7071
8 changes: 4 additions & 4 deletions error.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ import (
)

var (
ErrNotImplemented = fmt.Errorf("not implemented")
ErrClientClosed = fmt.Errorf("client closed")
errNotImplemented = fmt.Errorf("not implemented")
errClientClosed = fmt.Errorf("client closed")
errPartitionRevoked = fmt.Errorf("partition revoked")

// ErrSkip is use to skip message in the PreCheck hook or Decode function.
ErrSkip = fmt.Errorf("skip message")
// ErrInvalidCompression for producer setting check.
ErrInvalidCompression = fmt.Errorf("invalid compression")
// ErrDLQ use with callback function to send message to DLQ topic.
ErrDLQ = fmt.Errorf("send to DLQ")
)
Expand Down
5 changes: 5 additions & 0 deletions example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"log/slog"
"os"
"sort"
"sync"

"github.com/worldline-go/initializer"
Expand All @@ -26,6 +27,8 @@ func getExampleList() []string {
exampleNames = append(exampleNames, k)
}

sort.Strings(exampleNames)

return exampleNames
}

Expand All @@ -41,6 +44,8 @@ func main() {
run := examples[exampleName]
if run == nil {
slog.Error("unknown example", slog.String("example", exampleName))

return
}

initializer.Init(run, initializer.WithOptionsLogz(logz.WithCaller(false)))
Expand Down
Loading

0 comments on commit d5d779a

Please sign in to comment.