Skip to content

Commit

Permalink
fix: compression added
Browse files Browse the repository at this point in the history
Signed-off-by: Eray Ates <[email protected]>
  • Loading branch information
rytsh committed Oct 20, 2023
1 parent ad9c209 commit 60df08a
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 2 deletions.
7 changes: 6 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,15 @@ func NewClient(cfg Config, opts ...Option) (*Client, error) {
opt(&o)
}

compressions, err := compressionOpts(cfg.Compression)
if err != nil {
return nil, err
}

kgoOpt := []kgo.Opt{
kgo.SeedBrokers(cfg.Brokers...),
kgo.ClientID(o.ClientID),
kgo.ProducerBatchCompression(kgo.NoCompression()),
kgo.ProducerBatchCompression(compressions...),
// kgo.WithLogger(kgo.BasicLogger(os.Stderr, kgo.LogLevelDebug, nil)),
}

Expand Down
46 changes: 45 additions & 1 deletion codec.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,50 @@
package wkafka

import "encoding/json"
import (
"encoding/json"
"fmt"

"github.com/twmb/franz-go/pkg/kgo"
)

// Compression

Check failure on line 10 in codec.go

View workflow job for this annotation

GitHub Actions / sonarcloud

Comment should end in a period (godot)

Check failure on line 10 in codec.go

View workflow job for this annotation

GitHub Actions / sonarcloud

Comment should end in a period (godot)
func compressionOpts(c []string) ([]kgo.CompressionCodec, error) {
if err := compressionVerify(c); err != nil {
return nil, err
}

opts := make([]kgo.CompressionCodec, 0, len(c)+1)
for _, v := range c {
switch v {
case "gzip":
opts = append(opts, kgo.GzipCompression())
case "snappy":
opts = append(opts, kgo.SnappyCompression())
case "lz4":
opts = append(opts, kgo.Lz4Compression())
case "zstd":
opts = append(opts, kgo.ZstdCompression())
}
}

opts = append(opts, kgo.NoCompression())

return opts, nil
}

func compressionVerify(c []string) error {
for _, v := range c {
switch v {
case "gzip", "snappy", "lz4", "zstd":
default:
return fmt.Errorf("%w: %q", ErrInvalidCompression, v)
}
}

return nil
}

// Codec is use to marshal/unmarshal data to bytes.

type codecJSON[T any] struct{}

Expand Down
54 changes: 54 additions & 0 deletions codec_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package wkafka

import (
"reflect"
"testing"

"github.com/twmb/franz-go/pkg/kgo"
)

func Test_compressionOpts(t *testing.T) {
tests := []struct {
name string
c []string
want []kgo.CompressionCodec
wantErr bool
}{
{
name: "valid compression options",
c: []string{"gzip", "snappy", "lz4", "zstd"},
want: []kgo.CompressionCodec{
kgo.GzipCompression(),
kgo.SnappyCompression(),
kgo.Lz4Compression(),
kgo.ZstdCompression(),
kgo.NoCompression(),
},
wantErr: false,
},
{
name: "invalid compression option",
c: []string{"gzip", "invalid", "lz4"},
want: nil,
wantErr: true,
},
{
name: "empty compression option",
c: []string{},
want: []kgo.CompressionCodec{kgo.NoCompression()},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := compressionOpts(tt.c)
if (err != nil) != tt.wantErr {
t.Errorf("compressionOpts() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("compressionOpts() = %v, want %v", got, tt.want)
}
})
}
}
8 changes: 8 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,12 @@ type Config struct {
// Required at least one broker. Default is "localhost:9092" for local development.
Brokers []string `cfg:"brokers" default:"localhost:9092"`
Security SecurityConfig `cfg:"security"`
// Compression is chosen in the order preferred based on broker support.
// The default is to use no compression.
// Available:
// - gzip
// - snappy
// - lz4
// - zstd
Compression []string
}
2 changes: 2 additions & 0 deletions error.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ var (
ErrNilData = fmt.Errorf("nil data")
// ErrSkip is use to skip message in the PreCheck hook.
ErrSkip = fmt.Errorf("skip message")
// ErrInvalidCompression for producer setting check.
ErrInvalidCompression = fmt.Errorf("invalid compression")
)

func wrapErr(r *kgo.Record, err error) error {
Expand Down

0 comments on commit 60df08a

Please sign in to comment.