feat: dlq process option
Signed-off-by: Eray Ates <[email protected]>
rytsh committed Jan 6, 2024
1 parent d5d779a commit 9c331a8
Showing 9 changed files with 389 additions and 32 deletions.
98 changes: 91 additions & 7 deletions README.md
@@ -17,7 +17,7 @@ This library is using [franz-go](https://github.com/twmb/franz-go).
## Usage

First set the connection config to create a new kafka client.
Main config struct that contains brokers and security settings.
The main config struct contains brokers, security settings, and consumer validation.

```yaml
brokers:
@@ -38,37 +38,121 @@ security:
algorithm: "" # "SCRAM-SHA-256" or "SCRAM-SHA-512"
user: ""
pass: ""
consumer: # consumer validation and default values
  prefix_group_id: "" # always add a prefix to the group id
  format_dlq_topic: "" # format of the dead letter topic name, ex: "finops_{{.AppName}}_dlq"
  validation:
    group_id: # validate group id
      enabled: false
      rgx_group_id: "" # regex to validate the group id, ex: "^finops_.*$"
```
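The `format_dlq_topic` value is a Go template rendered with the application name. A minimal sketch of that expansion (the helper name `formatDLQTopic` and the single-field template data are assumptions for illustration; the library may pass more fields):

```go
package main

import (
	"bytes"
	"fmt"
	"text/template"
)

// formatDLQTopic renders a format_dlq_topic template such as
// "finops_{{.AppName}}_dlq" into a concrete topic name.
func formatDLQTopic(format, appName string) (string, error) {
	tmpl, err := template.New("dlq").Parse(format)
	if err != nil {
		return "", fmt.Errorf("parse format_dlq_topic: %w", err)
	}
	var buf bytes.Buffer
	if err := tmpl.Execute(&buf, struct{ AppName string }{AppName: appName}); err != nil {
		return "", fmt.Errorf("execute format_dlq_topic: %w", err)
	}
	return buf.String(), nil
}

func main() {
	topic, _ := formatDLQTopic("finops_{{.AppName}}_dlq", "serviceX")
	fmt.Println(topic) // finops_serviceX_dlq
}
```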
### Consumer
For creating a consumer we need to give config while creating a client with a processor struct.
To create a consumer, give an additional consumer config when initializing the client.
```yaml
topics: [] # list of topics to subscribe to
group_id: "" # group id to subscribe with; make it as unique as possible per service
# start offset to consume: 0 is the earliest offset, -1 is the latest offset, and a value greater than 0 is an exact offset number
# if the group_id already has a committed offset, this setting is ignored
start_offset: 0
skip: # records are skipped programmatically; kafka still consumes the messages
  # example: skip by topic and offset
  mytopic: # topic name to skip
    0: # partition number
      offsets: # list of offsets to skip
        - 31
        - 90
      before: 20 # skip all offsets at or before this offset
# max records to consume per poll; 0 uses kafka's default (usually 500)
# usually no need to change, but batch consume's count is capped at max_poll_records
max_poll_records: 0
# max records passed to the callback function per batch, default is 100
# if this value is greater than max_poll_records, max_poll_records is used instead
batch_count: 100
dlq:
  disabled: false # disable the dead letter queue
  topic: "" # dead letter topic name; can also be set via the kafka config's format_dlq_topic
  retry_interval: "10s" # retry interval for messages that can't be processed
  start_offset: 0 # same as start_offset but for the dead letter topic
  skip: # same as skip but only for the dead letter topic; no topic name needed
    # example: skip by offset
    0:
      offsets:
        - 31
      before: 20
```
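The skip rules above are evaluated in the consumer before records reach the callback; Kafka still delivers the records. A self-contained sketch of the per-partition matching logic (hypothetical types and helper, not the library's internals):

```go
package main

import "fmt"

// offsets mirrors the skip config for a single partition:
// an explicit offset list plus a "before" threshold.
type offsets struct {
	Offsets []int64 // individual offsets to skip
	Before  int64   // skip every offset at or before this value
}

// shouldSkip reports whether a record at the given offset
// would be skipped under this partition's rules.
func shouldSkip(o offsets, offset int64) bool {
	if o.Before > 0 && offset <= o.Before {
		return true
	}
	for _, skip := range o.Offsets {
		if offset == skip {
			return true
		}
	}
	return false
}

func main() {
	rules := offsets{Offsets: []int64{31, 90}, Before: 20}
	for _, off := range []int64{5, 31, 50} {
		fmt.Printf("offset %d skipped: %v\n", off, shouldSkip(rules, off))
	}
}
```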
Always provide the client information so it can be viewed in produced message headers and the Kafka UI.
```go
client, err := wkafka.New(
ctx, kafkaConfig,
wkafka.WithConsumer(consumeConfig),
wkafka.WithClientInfo("testapp", "v0.1.0"),
)
if err != nil {
return err
}

defer client.Close()
```

Now run the consumer with a handler function.
There are two options to run the consumer, batch or single (__WithCallbackBatch__ or __WithCallback__).

```go
// example single consumer
if err := client.Consume(ctx, wkafka.WithCallback(ProcessSingle)); err != nil {
return fmt.Errorf("consume: %w", err)
}
```

> Check the additional options for custom decode and precheck.
### Producer

Use consumer client or create without consumer settings, `NewClient` also try to connect to brokers.
Use the consumer client or create one without consumer settings; `New` also tries to connect to the brokers.

```go
client, err := wkafka.NewClient(kafkaConfig)
client, err := wkafka.New(kafkaConfig)
if err != nil {
return err
}
defer client.Close()
```

Create a producer based of client to set default values.
Create a producer based on the client and a specific data type.

> __WithHook__, __WithEncoder__, __WithHeaders__ options are optional.
> Use __WithHook__ to get the record's metadata and modify the record before producing.
> TODO add example
```go
producer, err := wkafka.NewProducer[*Data](client, "test", wkafka.WithHook(ProduceHook))
if err != nil {
return err
}

return producer.Produce(ctx, data)
```

## Development

Initialize kafka and redpanda console with docker-compose
Initialize kafka and redpanda console with docker-compose.

```sh
# uses the "docker compose" command; if you use podman, add the compose extension and link docker to podman
make env
```

| Service | Description |
| -------------- | ---------------- |
| localhost:9092 | Kafka broker |
| localhost:7071 | Redpanda console |

Use examples with `EXAMPLE` env variable:

```sh
EXAMPLE=... make example
```
6 changes: 0 additions & 6 deletions config.go
@@ -98,8 +98,6 @@ type GroupIDValidation struct {
Enabled bool `cfg:"enabled"`
// RgxGroupID is a regex pattern to validate RgxGroupID.
RgxGroupID string `cfg:"rgx_group_id"`
// DisableWord boundary check.
DisableWordBoundary bool `cfg:"disable_word_boundary"`
}

func (v GroupIDValidation) Validate(groupID string) error {
@@ -112,10 +110,6 @@ func (v GroupIDValidation) Validate(groupID string) error {
}

if v.RgxGroupID != "" {
if !v.DisableWordBoundary {
v.RgxGroupID = fmt.Sprintf(`\b%s\b`, v.RgxGroupID)
}

rgx, err := regexp.Compile(v.RgxGroupID)
if err != nil {
return fmt.Errorf("group_id validation regex: %w", err)
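Since this commit removes the implicit `\b` word-boundary wrapping, patterns should carry their own anchors (e.g. `^finops_.*$`). A standalone sketch of the same regex check:

```go
package main

import (
	"fmt"
	"regexp"
)

// validateGroupID mirrors the GroupIDValidation regex check:
// the pattern must carry its own anchors now that the implicit
// \b word-boundary wrapping has been removed.
func validateGroupID(pattern, groupID string) error {
	rgx, err := regexp.Compile(pattern)
	if err != nil {
		return fmt.Errorf("group_id validation regex: %w", err)
	}
	if !rgx.MatchString(groupID) {
		return fmt.Errorf("group_id %q does not match %q", groupID, pattern)
	}
	return nil
}

func main() {
	fmt.Println(validateGroupID(`^finops_.*$`, "finops_serviceX")) // nil error
	fmt.Println(validateGroupID(`^finops_.*$`, "other_serviceX"))  // mismatch error
}
```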
5 changes: 3 additions & 2 deletions config_test.go
@@ -29,7 +29,7 @@ func TestConsumerPreConfig_Apply(t *testing.T) {
Validation: Validation{
GroupID: GroupIDValidation{
Enabled: true,
RgxGroupID: `finops_.*`,
RgxGroupID: `^finops_.*$`,
},
},
},
@@ -45,6 +45,7 @@ func TestConsumerPreConfig_Apply(t *testing.T) {
SkipExtra: map[string]map[int32]Offsets{
"finops_serviceX_dlq": nil,
},
RetryInterval: DefaultRetryInterval,
},
},
},
@@ -56,12 +57,12 @@ func TestConsumerPreConfig_Apply(t *testing.T) {
Validation: tt.fields.Validation,
FormatDLQTopic: "finops_{{.AppName}}_dlq",
}
got := tt.args.consumerConfig
err := configApply(c, &tt.args.consumerConfig, "serviceX", logz.AdapterNoop{})
if (err != nil) != tt.wantErr {
t.Errorf("ConsumerPreConfig.Apply() error = %v, wantErr %v", err, tt.wantErr)
return
}
got := tt.args.consumerConfig
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("ConsumerPreConfig.Apply() = %v, want %v", got, tt.want)
}
30 changes: 26 additions & 4 deletions consumer.go
@@ -41,9 +41,9 @@ type ConsumerConfig struct {
// - Fetching messages from the broker; this is not related to batch processing!
MaxPollRecords int `cfg:"max_poll_records"`
// BatchCount is a number of messages processed in a single batch.
// - <= 1 is 1 message per batch.
// - Processing count could be less than BatchCount if the batch is not full.
// - Usable with WithConsumerBatch
// - Default is 100.
BatchCount int `cfg:"batch_count"`
// DLQ is a dead letter queue configuration.
DLQ DLQ `cfg:"dlq"`
@@ -143,7 +143,7 @@ func skipDLQ(cfg *ConsumerConfig, r *kgo.Record) bool {
return false
}

if dlqCfg.SkipExtra == nil {
if len(dlqCfg.SkipExtra) == 0 {
return false
}

@@ -192,6 +192,13 @@ func (o *optionConsumer) apply(opts ...OptionConsumer) error {
return nil
}

// dlqProcessBatch wraps a batch callback so that a single DLQ message is converted into a one-element batch before processing.
func dlqProcessBatch[T any](fn func(ctx context.Context, msg []T) error) func(ctx context.Context, msg T) error {
return func(ctx context.Context, msg T) error {
return fn(ctx, []T{msg})
}
}

// WithCallbackBatch to set wkafka consumer's callback function.
// - Default is json.Unmarshal, use WithDecode option to add custom decode function.
// - If [][]byte then default decode function will be skipped.
@@ -210,7 +217,7 @@ func WithCallbackBatch[T any](fn func(ctx context.Context, msg []T) error) CallBackFunc
}

o.ConsumerDLQ = &consumerBatch[T]{
Process: fn,
ProcessDLQ: dlqProcessBatch(fn),
Decode: decode,
Cfg: o.ConsumerConfig,
Skip: skipDLQ,
@@ -241,7 +248,7 @@ func WithCallback[T any](fn func(ctx context.Context, msg T) error) CallBackFunc
}

o.ConsumerDLQ = &consumerSingle[T]{
Process: fn,
ProcessDLQ: fn,
Decode: decode,
Cfg: o.ConsumerConfig,
Skip: skipDLQ,
@@ -298,3 +305,18 @@ func WithPreCheck(fn func(ctx context.Context, r *kgo.Record) error) OptionConsumer
return nil
}
}

// WithCallbackDLQ to set wkafka consumer's callback function for DLQ.
// - Use this option if you want to process DLQ messages in a different function.
func WithCallbackDLQ[T any](fn func(ctx context.Context, msg T) error) OptionConsumer {
return func(o *optionConsumer) error {
switch v := o.ConsumerDLQ.(type) {
case *consumerBatch[T]:
v.ProcessDLQ = fn
case *consumerSingle[T]:
v.ProcessDLQ = fn
}

return nil
}
}