diff --git a/README.md b/README.md index 4eb9cab..e7aad35 100644 --- a/README.md +++ b/README.md @@ -99,8 +99,10 @@ if err != nil { defer client.Close() ``` -Now you need to run consumer with a handler function. -There is 2 options to run consumer, batch or single (__WithCallbackBatch__ or __WithCallback__). +Now you need to run consumer with a callback function. +There is 2 options to run consumer, batch or single (__WithCallbackBatch__ or __WithCallback__). +Default decoder is json, but you can change it with __WithDecode__ option. +If you use `[]byte` as data type then raw data will be passed to the callback function, batch consumer like `[][]byte` type. ```go // example single consumer diff --git a/example/consumer/byte.go b/example/consumer/byte.go new file mode 100644 index 0000000..00f0efd --- /dev/null +++ b/example/consumer/byte.go @@ -0,0 +1,38 @@ +package consumer + +import ( + "context" + "encoding/json" + "fmt" + "sync" + + "github.com/worldline-go/wkafka" +) + +func ProcessSingleByte(ctx context.Context, raw []byte) error { + var msg DataSingle + if err := json.Unmarshal(raw, &msg); err != nil { + return err + } + + return ProcessSingle(ctx, msg) +} + +func RunExampleSingleByte(ctx context.Context, _ *sync.WaitGroup) error { + client, err := wkafka.New( + ctx, kafkaConfigSingle, + wkafka.WithConsumer(consumeConfigSingle), + wkafka.WithClientInfo("testapp", "v0.1.0"), + ) + if err != nil { + return err + } + + defer client.Close() + + if err := client.Consume(ctx, wkafka.WithCallback(ProcessSingleByte)); err != nil { + return fmt.Errorf("consume: %w", err) + } + + return nil +} diff --git a/example/main.go b/example/main.go index 6bfd4c7..84a052c 100644 --- a/example/main.go +++ b/example/main.go @@ -15,12 +15,13 @@ import ( ) var examples = map[string]func(context.Context, *sync.WaitGroup) error{ - "admin_topic": admin.RunExampleTopic, - "admin_partition": admin.RunExamplePartition, - "admin_list": admin.RunExampleList, - "consumer_batch": consumer.RunExampleBatch, - "consumer_single": consumer.RunExampleSingle, - "producer_hook": producer.RunExampleHook, + "admin_topic": admin.RunExampleTopic, + "admin_partition": admin.RunExamplePartition, + "admin_list": admin.RunExampleList, + "consumer_batch": consumer.RunExampleBatch, + "consumer_single": consumer.RunExampleSingle, + "consumer_single_byte": consumer.RunExampleSingleByte, + "producer_hook": producer.RunExampleHook, } func getExampleList() []string {