Skip to content

Commit

Permalink
fix: update packages
Browse files Browse the repository at this point in the history
Signed-off-by: Eray Ates <[email protected]>
  • Loading branch information
rytsh committed Sep 26, 2024
1 parent adf8acd commit f05e6fd
Show file tree
Hide file tree
Showing 14 changed files with 141 additions and 62 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ topics: [] # list of topics to subscribe
group_id: "" # group id to subscribe, make is as unique as possible per service
# start offset to consume, 0 is the earliest offset, -1 is the latest offset and more than 0 is the offset number
# group_id has already committed offset then this will be ignored
start_offset: 0
start_offset: 0 # -1 to start end of the offsets
skip: # this is programatically skip, kafka will still consume the message
# example skip topic and offset
mytopic: # topic name to skip
Expand All @@ -72,10 +72,10 @@ max_poll_records: 0
# if this value is more than max_poll_records then max_poll_records will be used
batch_count: 100
dlq:
disabled: false # disable dead letter queue
disable: false # disable dead letter queue
topic: "" # dead letter topic name, it can be assigned in the kafka config's format_dlq_topic
retry_interval: "10s" # retry time interval of the message if can't be processed, default is 10s
start_offset: 0 # same as start_offset but for dead letter topic
start_offset: 0 # -1 to start end of the offsets
skip: # same as skip but just for dead letter topic and not need to specify topic name
# example skip offset
0:
Expand Down
3 changes: 1 addition & 2 deletions example/admin/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package admin
import (
"context"
"log/slog"
"sync"

"github.com/worldline-go/wkafka"
)
Expand All @@ -14,7 +13,7 @@ var (
}
)

func RunExampleList(ctx context.Context, _ *sync.WaitGroup) error {
func RunExampleList(ctx context.Context) error {
client, err := wkafka.New(ctx, kafkaConfigList, wkafka.WithPingRetry(true))
if err != nil {
return err
Expand Down
3 changes: 1 addition & 2 deletions example/admin/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package admin
import (
"context"
"log/slog"
"sync"

"github.com/worldline-go/wkafka"
)
Expand All @@ -14,7 +13,7 @@ var (
}
)

func RunExamplePartition(ctx context.Context, _ *sync.WaitGroup) error {
func RunExamplePartition(ctx context.Context) error {
client, err := wkafka.New(ctx, kafkaConfigPartition)
if err != nil {
return err
Expand Down
3 changes: 1 addition & 2 deletions example/admin/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package admin
import (
"context"
"log/slog"
"sync"

"github.com/worldline-go/wkafka"
)
Expand All @@ -14,7 +13,7 @@ var (
}
)

func RunExampleTopic(ctx context.Context, _ *sync.WaitGroup) error {
func RunExampleTopic(ctx context.Context) error {
client, err := wkafka.New(ctx, kafkaConfigTopic)
if err != nil {
return err
Expand Down
3 changes: 1 addition & 2 deletions example/consumer/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"log/slog"
"sync"

"github.com/worldline-go/wkafka"
)
Expand Down Expand Up @@ -56,7 +55,7 @@ func ProcessBatch(_ context.Context, msg []DataBatch) error {
return nil
}

func RunExampleBatch(ctx context.Context, _ *sync.WaitGroup) error {
func RunExampleBatch(ctx context.Context) error {
client, err := wkafka.New(
ctx, kafkaConfigBatch,
wkafka.WithConsumer(consumeConfigBatch),
Expand Down
3 changes: 1 addition & 2 deletions example/consumer/batcherr.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"log/slog"
"sync"

"github.com/rs/zerolog/log"
"github.com/worldline-go/wkafka"
Expand Down Expand Up @@ -70,7 +69,7 @@ func ProcessBatchErr(ctx context.Context, msg []DataBatchErr) error {
return nil
}

func RunExampleBatchErr(ctx context.Context, _ *sync.WaitGroup) error {
func RunExampleBatchErr(ctx context.Context) error {
client, err := wkafka.New(
ctx, kafkaConfigBatchErr,
wkafka.WithConsumer(consumeConfigBatchErr),
Expand Down
3 changes: 1 addition & 2 deletions example/consumer/byte.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"sync"

"github.com/worldline-go/wkafka"
)
Expand All @@ -18,7 +17,7 @@ func ProcessSingleByte(ctx context.Context, raw []byte) error {
return ProcessSingle(ctx, msg)
}

func RunExampleSingleByte(ctx context.Context, _ *sync.WaitGroup) error {
func RunExampleSingleByte(ctx context.Context) error {
client, err := wkafka.New(
ctx, kafkaConfigSingle,
wkafka.WithConsumer(consumeConfigSingle),
Expand Down
9 changes: 4 additions & 5 deletions example/consumer/single.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ import (
"fmt"
"log/slog"
"net/http"
"sync"
"time"

"connectrpc.com/grpcreflect"
"github.com/worldline-go/initializer"
"github.com/rakunlabs/into"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -60,7 +59,7 @@ func ProcessSingle(_ context.Context, msg DataSingle) error {
return nil
}

func RunExampleSingle(ctx context.Context, _ *sync.WaitGroup) error {
func RunExampleSingle(ctx context.Context) error {
client, err := wkafka.New(
ctx, kafkaConfigSingle,
wkafka.WithConsumer(consumeConfigSingle),
Expand All @@ -79,7 +78,7 @@ func RunExampleSingle(ctx context.Context, _ *sync.WaitGroup) error {
return nil
}

func RunExampleSingleWithHandler(ctx context.Context, _ *sync.WaitGroup) error {
func RunExampleSingleWithHandler(ctx context.Context) error {
client, err := wkafka.New(
ctx, kafkaConfigSingle,
wkafka.WithConsumer(consumeConfigSingle),
Expand All @@ -105,7 +104,7 @@ func RunExampleSingleWithHandler(ctx context.Context, _ *sync.WaitGroup) error {
Handler: h2c.NewHandler(mux, &http2.Server{}),
}

initializer.Shutdown.Add(s.Close)
into.ShutdownAdd(s.Close, "http server")

g, ctx := errgroup.WithContext(ctx)

Expand Down
34 changes: 34 additions & 0 deletions example/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
module github.com/worldline-go/wkafka/example

go 1.22

replace github.com/worldline-go/wkafka => ../

require (
connectrpc.com/grpcreflect v1.2.0
github.com/rakunlabs/into v0.4.0
github.com/rakunlabs/logi v0.3.3
github.com/rs/zerolog v1.33.0
github.com/worldline-go/wkafka v0.0.0-00010101000000-000000000000
golang.org/x/net v0.29.0
golang.org/x/sync v0.8.0
)

require (
connectrpc.com/connect v1.17.0 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/klauspost/compress v1.17.8 // indirect
github.com/lmittmann/tint v1.0.5 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/twmb/franz-go v1.17.1 // indirect
github.com/twmb/franz-go/pkg/kadm v1.13.0 // indirect
github.com/twmb/franz-go/pkg/kmsg v1.8.0 // indirect
github.com/twmb/tlscfg v1.2.1 // indirect
github.com/worldline-go/logz v0.5.1 // indirect
golang.org/x/crypto v0.27.0 // indirect
golang.org/x/sys v0.25.0 // indirect
golang.org/x/text v0.18.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
)
63 changes: 63 additions & 0 deletions example/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
connectrpc.com/connect v1.17.0 h1:W0ZqMhtVzn9Zhn2yATuUokDLO5N+gIuBWMOnsQrfmZk=
connectrpc.com/connect v1.17.0/go.mod h1:0292hj1rnx8oFrStN7cB4jjVBeqs+Yx5yDIC2prWDO8=
connectrpc.com/grpcreflect v1.2.0 h1:Q6og1S7HinmtbEuBvARLNwYmTbhEGRpHDhqrPNlmK+U=
connectrpc.com/grpcreflect v1.2.0/go.mod h1:nwSOKmE8nU5u/CidgHtPYk1PFI3U9ignz7iDMxOYkSY=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/lmittmann/tint v1.0.5 h1:NQclAutOfYsqs2F1Lenue6OoWCajs5wJcP3DfWVpePw=
github.com/lmittmann/tint v1.0.5/go.mod h1:HIS3gSy7qNwGCj+5oRjAutErFBl4BzdQP6cJZ0NfMwE=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rakunlabs/into v0.4.0 h1:FcG/EWWovQZjLrTb83ic/M0i1GndtMQVqA1nY5e+koU=
github.com/rakunlabs/into v0.4.0/go.mod h1:1fWgREm1FXNLCnfFPTPf6mCzgmi9jlqevYQbWLMnXeo=
github.com/rakunlabs/logi v0.3.3 h1:8uSmch05+yULJiDWQzaKmFy6qd/iWTUtSB50+8TkkFU=
github.com/rakunlabs/logi v0.3.3/go.mod h1:kGFfpXq6EmJ2UcC74lzTwcIqAeJY7rBFQj4p3KFkkl8=
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8=
github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/twmb/franz-go v1.17.1 h1:0LwPsbbJeJ9R91DPUHSEd4su82WJWcTY1Zzbgbg4CeQ=
github.com/twmb/franz-go v1.17.1/go.mod h1:NreRdJ2F7dziDY/m6VyspWd6sNxHKXdMZI42UfQ3GXM=
github.com/twmb/franz-go/pkg/kadm v1.13.0 h1:bJq4C2ZikUE2jh/wl9MtMTQ/kpmnBgVFh8XMQBEC+60=
github.com/twmb/franz-go/pkg/kadm v1.13.0/go.mod h1:VMvpfjz/szpH9WB+vGM+rteTzVv0djyHFimci9qm2C0=
github.com/twmb/franz-go/pkg/kmsg v1.8.0 h1:lAQB9Z3aMrIP9qF9288XcFf/ccaSxEitNA1CDTEIeTA=
github.com/twmb/franz-go/pkg/kmsg v1.8.0/go.mod h1:HzYEb8G3uu5XevZbtU0dVbkphaKTHk0X68N5ka4q6mU=
github.com/twmb/tlscfg v1.2.1 h1:IU2efmP9utQEIV2fufpZjPq7xgcZK4qu25viD51BB44=
github.com/twmb/tlscfg v1.2.1/go.mod h1:GameEQddljI+8Es373JfQEBvtI4dCTLKWGJbqT2kErs=
github.com/worldline-go/logz v0.5.1 h1:jpbKUd1FbecEzs7/p1zFH7tFtQ5demmc89Fa2YTHL/Q=
github.com/worldline-go/logz v0.5.1/go.mod h1:CbiHbwLTA6oKud62HbmfwG5fgSZ6ppVbkSdRv44X8Vc=
golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A=
golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70=
golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo=
golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224=
golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
13 changes: 8 additions & 5 deletions example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@ import (
"log/slog"
"os"
"sort"
"sync"

"github.com/worldline-go/initializer"
"github.com/worldline-go/logz"
"github.com/rakunlabs/into"
"github.com/rakunlabs/logi"

"github.com/worldline-go/wkafka/example/admin"
"github.com/worldline-go/wkafka/example/consumer"
"github.com/worldline-go/wkafka/example/producer"
)

var examples = map[string]func(context.Context, *sync.WaitGroup) error{
var examples = map[string]func(context.Context) error{
"admin_topic": admin.RunExampleTopic,
"admin_partition": admin.RunExamplePartition,
"admin_list": admin.RunExampleList,
Expand Down Expand Up @@ -53,5 +53,8 @@ func main() {
return
}

initializer.Init(run, initializer.WithLogger(initializer.Slog), initializer.WithOptionsLogz(logz.WithCaller(false)))
into.Init(
run,
into.WithLogger(logi.InitializeLog(logi.WithCaller(false))),
)
}
3 changes: 1 addition & 2 deletions example/producer/hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package producer

import (
"context"
"sync"

"github.com/worldline-go/wkafka"
)
Expand Down Expand Up @@ -31,7 +30,7 @@ func ProduceHook(d *Data, r *wkafka.Record) error {
return nil
}

func RunExampleHook(ctx context.Context, _ *sync.WaitGroup) error {
func RunExampleHook(ctx context.Context) error {
client, err := wkafka.New(ctx, kafkaConfig)
if err != nil {
return err
Expand Down
21 changes: 8 additions & 13 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,29 @@ module github.com/worldline-go/wkafka
go 1.22

require (
connectrpc.com/connect v1.16.2
connectrpc.com/grpcreflect v1.2.0
connectrpc.com/connect v1.17.0
github.com/cenkalti/backoff/v4 v4.3.0
github.com/rs/zerolog v1.32.0
github.com/stretchr/testify v1.8.4
github.com/twmb/franz-go v1.17.0
github.com/twmb/franz-go/pkg/kadm v1.12.0
github.com/rs/zerolog v1.33.0
github.com/stretchr/testify v1.9.0
github.com/twmb/franz-go v1.17.1
github.com/twmb/franz-go/pkg/kadm v1.13.0
github.com/twmb/tlscfg v1.2.1
github.com/worldline-go/initializer v0.3.2
github.com/worldline-go/logz v0.5.0
golang.org/x/net v0.25.0
golang.org/x/sync v0.6.0
github.com/worldline-go/logz v0.5.1
golang.org/x/sync v0.8.0
google.golang.org/protobuf v1.34.2
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/klauspost/compress v1.17.8 // indirect
github.com/lmittmann/tint v1.0.4 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rakunlabs/logi v0.2.1 // indirect
github.com/twmb/franz-go/pkg/kmsg v1.8.0 // indirect
golang.org/x/crypto v0.23.0 // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/text v0.15.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading

0 comments on commit f05e6fd

Please sign in to comment.