Skip to content

Commit

Permalink
chore: update README
Browse files Browse the repository at this point in the history
Signed-off-by: Eray Ates <[email protected]>
  • Loading branch information
rytsh committed Jan 7, 2024
1 parent bd9cf6b commit ba9b473
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 8 deletions.
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,10 @@ if err != nil {
defer client.Close()
```

Now you need to run consumer with a handler function.
There is 2 options to run consumer, batch or single (__WithCallbackBatch__ or __WithCallback__).
Now you need to run consumer with a callback function.
There is 2 options to run consumer, batch or single (__WithCallbackBatch__ or __WithCallback__).
Default decoder is json, but you can change it with __WithDecode__ option.
If you use `[]byte` as data type then raw data will be passed to the callback function, batch consumer like `[][]byte` type.

```go
// example single consumer
Expand Down
38 changes: 38 additions & 0 deletions example/consumer/byte.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package consumer

import (
"context"
"encoding/json"
"fmt"
"sync"

"github.com/worldline-go/wkafka"
)

func ProcessSingleByte(ctx context.Context, raw []byte) error {
var msg DataSingle
if err := json.Unmarshal(raw, &msg); err != nil {
return err

Check failure on line 15 in example/consumer/byte.go

View workflow job for this annotation

GitHub Actions / sonarcloud

error returned from external package is unwrapped: sig: func encoding/json.Unmarshal(data []byte, v any) error (wrapcheck)

Check failure on line 15 in example/consumer/byte.go

View workflow job for this annotation

GitHub Actions / sonarcloud

error returned from external package is unwrapped: sig: func encoding/json.Unmarshal(data []byte, v any) error (wrapcheck)
}

return ProcessSingle(ctx, msg)
}

func RunExampleSingleByte(ctx context.Context, _ *sync.WaitGroup) error {
client, err := wkafka.New(
ctx, kafkaConfigSingle,
wkafka.WithConsumer(consumeConfigSingle),
wkafka.WithClientInfo("testapp", "v0.1.0"),
)
if err != nil {
return err

Check failure on line 28 in example/consumer/byte.go

View workflow job for this annotation

GitHub Actions / sonarcloud

error returned from external package is unwrapped: sig: func github.com/worldline-go/wkafka.New(ctx context.Context, cfg github.com/worldline-go/wkafka.Config, opts ...github.com/worldline-go/wkafka.Option) (*github.com/worldline-go/wkafka.Client, error) (wrapcheck)

Check failure on line 28 in example/consumer/byte.go

View workflow job for this annotation

GitHub Actions / sonarcloud

error returned from external package is unwrapped: sig: func github.com/worldline-go/wkafka.New(ctx context.Context, cfg github.com/worldline-go/wkafka.Config, opts ...github.com/worldline-go/wkafka.Option) (*github.com/worldline-go/wkafka.Client, error) (wrapcheck)
}

defer client.Close()

if err := client.Consume(ctx, wkafka.WithCallback(ProcessSingleByte)); err != nil {
return fmt.Errorf("consume: %w", err)
}

return nil
}
13 changes: 7 additions & 6 deletions example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ import (
)

var examples = map[string]func(context.Context, *sync.WaitGroup) error{
"admin_topic": admin.RunExampleTopic,
"admin_partition": admin.RunExamplePartition,
"admin_list": admin.RunExampleList,
"consumer_batch": consumer.RunExampleBatch,
"consumer_single": consumer.RunExampleSingle,
"producer_hook": producer.RunExampleHook,
"admin_topic": admin.RunExampleTopic,
"admin_partition": admin.RunExamplePartition,
"admin_list": admin.RunExampleList,
"consumer_batch": consumer.RunExampleBatch,
"consumer_single": consumer.RunExampleSingle,
"consumer_single_byte": consumer.RunExampleSingleByte,
"producer_hook": producer.RunExampleHook,
}

func getExampleList() []string {
Expand Down

0 comments on commit ba9b473

Please sign in to comment.