Skip to content

Commit

Permalink
feat: wkafka handler ui
Browse files Browse the repository at this point in the history
Signed-off-by: Eray Ates <[email protected]>
  • Loading branch information
rytsh committed Oct 2, 2024
1 parent f05e6fd commit 5df6d6b
Show file tree
Hide file tree
Showing 9 changed files with 230 additions and 137 deletions.
40 changes: 8 additions & 32 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
LOCAL_BIN_DIR := $(PWD)/bin

## golangci configuration
GOLANGCI_CONFIG_URL := https://raw.githubusercontent.com/worldline-go/guide/main/lint/.golangci.yml
GOLANGCI_LINT_VERSION := v1.55.2

.DEFAULT_GOAL := help

.PHONY: generate
generate: ## Generate gRPC code
cd handler/proto && buf generate --path wkafka/wkafka.proto
@echo "> Successfully generated gRPC code"

.PHONY: env
env: ## Start env
docker compose -p wkafka -f env/docker-compose.yml up -d
Expand All @@ -31,21 +30,10 @@ example: ## Run example
ci-run: ## Run CI in local with act
act -j sonarcloud

.golangci.yml:
@$(MAKE) golangci

.PHONY: golangci
golangci: ## Download .golangci.yml file
@curl --insecure -o .golangci.yml -L'#' $(GOLANGCI_CONFIG_URL)

bin/golangci-lint-$(GOLANGCI_LINT_VERSION):
@curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(LOCAL_BIN_DIR) $(GOLANGCI_LINT_VERSION)
@mv $(LOCAL_BIN_DIR)/golangci-lint $(LOCAL_BIN_DIR)/golangci-lint-$(GOLANGCI_LINT_VERSION)

.PHONY: lint
lint: .golangci.yml bin/golangci-lint-$(GOLANGCI_LINT_VERSION) ## Lint Go files
@$(LOCAL_BIN_DIR)/golangci-lint-$(GOLANGCI_LINT_VERSION) --version
@GOPATH="$(shell dirname $(PWD))" $(LOCAL_BIN_DIR)/golangci-lint-$(GOLANGCI_LINT_VERSION) run ./...
lint: ## Lint Go files
golangci-lint--version
GOPATH="$(shell dirname $(PWD))" golangci-lint run ./...

.PHONY: test
test: ## Run unit tests
Expand All @@ -64,18 +52,6 @@ coverage: ## Run unit tests with coverage
@go test -v -race -cover -coverpkg=./... -coverprofile=coverage.out -covermode=atomic ./...
@go tool cover -func=coverage.out

.PHONY: html
html: ## Show html coverage result
@go tool cover -html=./coverage.out

.PHONY: html-gen
html-gen: ## Export html coverage result
@go tool cover -html=./coverage.out -o ./coverage.html

.PHONY: html-wsl
html-wsl: html-gen ## Open html coverage result in wsl
@explorer.exe `wslpath -w ./coverage.html` || true

.PHONY: help
help: ## Display this help screen
@grep -h -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}'
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ max_poll_records: 0
# if this value is more than max_poll_records then max_poll_records will be used
batch_count: 100
dlq:
disable: false # disable dead letter queue
disabled: false # disable dead letter queue
topic: "" # dead letter topic name, it can be assigned in the kafka config's format_dlq_topic
retry_interval: "10s" # retry time interval of the message if can't be processed, default is 10s
start_offset: 0 # -1 to start end of the offsets
Expand Down Expand Up @@ -121,6 +121,8 @@ Send record to dead letter queue, use __WrapErrDLQ__ function with to wrap the e
Editing the skip map and use our handler to initialize server mux.

```go
// import github.com/worldline-go/wkafka/handler

mux := http.NewServeMux()
mux.Handle(handler.New(client))

Expand Down
2 changes: 1 addition & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func New(ctx context.Context, cfg Config, opts ...Option) (*Client, error) {
return nil, fmt.Errorf("validate config: %w", err)
}

if !o.ConsumerConfig.DLQ.Disable {
if !o.ConsumerConfig.DLQ.Disabled {
o.ConsumerDLQEnabled = true
}
}
Expand Down
6 changes: 3 additions & 3 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ func configApply(c ConsumerPreConfig, consumerConfig *ConsumerConfig, progName s
consumerConfig.GroupID = c.PrefixGroupID + consumerConfig.GroupID
}

if !consumerConfig.DLQ.Disable && consumerConfig.DLQ.Topic == "" && c.FormatDLQTopic == "" {
consumerConfig.DLQ.Disable = true
if !consumerConfig.DLQ.Disabled && consumerConfig.DLQ.Topic == "" && c.FormatDLQTopic == "" {
consumerConfig.DLQ.Disabled = true
logger.Warn("dlq is disabled because topic and format_dlq_topic is not set")
}

// add default topic name for DLQ
if !consumerConfig.DLQ.Disable {
if !consumerConfig.DLQ.Disabled {
if consumerConfig.DLQ.Topic == "" {
if c.FormatDLQTopic == "" {
return fmt.Errorf("format_dlq_topic is required if dlq topic is not set")
Expand Down
14 changes: 7 additions & 7 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ type ConsumerConfig struct {
}

type DLQConfig struct {
// Disable is a flag to disable DLQ.
// Disabled is a flag to disable DLQ.
// - Default is false.
// - If topic is not set, it will be generated from format_dlq_topic.
// - If topic and format_dlq_topic is not set, dlq will be disabled!
Disable bool `cfg:"disable"`
Disabled bool `cfg:"disabled"`
// RetryInterval is a time interval to retry again of DLQ messages.
// - Default is 10 seconds.
RetryInterval time.Duration `cfg:"retry_interval"`
Expand Down Expand Up @@ -158,15 +158,15 @@ func WithCallbackBatch[T any](fn func(ctx context.Context, msg []T) error) CallB
Meter: o.Meter,
}

if o.ConsumerConfig.DLQ.Disable {
if o.ConsumerConfig.DLQ.Disabled {
return nil
}

o.ConsumerDLQ = &consumerBatch[T]{
Decode: decode,
ProcessDLQ: dlqProcessBatch(fn),
Cfg: o.ConsumerConfig,
Skip: newSkipper(&o.Client.consumerMutex, o.ConsumerConfig.DLQ.Disable),
Skip: newSkipper(&o.Client.consumerMutex, o.ConsumerConfig.DLQ.Disabled),
IsDLQ: true,
Logger: o.Client.logger,
PartitionHandler: o.Client.partitionHandlerDLQ,
Expand Down Expand Up @@ -195,15 +195,15 @@ func WithCallback[T any](fn func(ctx context.Context, msg T) error) CallBackFunc
Meter: o.Meter,
}

if o.ConsumerConfig.DLQ.Disable {
if o.ConsumerConfig.DLQ.Disabled {
return nil
}

o.ConsumerDLQ = &consumerSingle[T]{
ProcessDLQ: fn,
Decode: decode,
Cfg: o.ConsumerConfig,
Skip: newSkipper(&o.Client.consumerMutex, o.ConsumerConfig.DLQ.Disable),
Skip: newSkipper(&o.Client.consumerMutex, o.ConsumerConfig.DLQ.Disabled),
IsDLQ: true,
Logger: o.Client.logger,
PartitionHandler: o.Client.partitionHandlerDLQ,
Expand All @@ -226,7 +226,7 @@ func getDecodeProduceDLQ[T any](o *optionConsumer) (func(raw []byte, r *kgo.Reco
}

var produceDLQ func(ctx context.Context, err *DLQError, records []*kgo.Record) error
if !o.ConsumerConfig.DLQ.Disable {
if !o.ConsumerConfig.DLQ.Disabled {
produceDLQ = producerDLQ(o.ConsumerConfig.DLQ.Topic, o.Client.clientID, o.Client.ProduceRaw)
}

Expand Down
Loading

0 comments on commit 5df6d6b

Please sign in to comment.