Skip to content

Commit

Permalink
feat: dead letter que
Browse files Browse the repository at this point in the history
Signed-off-by: Eray Ates <[email protected]>
  • Loading branch information
rytsh committed Dec 18, 2023
1 parent 7fc13a7 commit 614decc
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 8 deletions.
7 changes: 2 additions & 5 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,13 @@ jobs:
- name: golangci config
run: |
[[ ! -f ".golangci.yml" ]] && curl -kfsSL -O https://raw.githubusercontent.com/worldline-go/guide/main/lint/.golangci.yml
echo GOPATH="$(dirname ${PWD})" >> $GITHUB_ENV
- name: golangci-lint general
uses: golangci/golangci-lint-action@v3
with:
args: --issues-exit-code 0 --out-format checkstyle:golangci-lint-report.out,colored-line-number
- name: golangci-lint critical check
run: |
golangci-lint run --new-from-rev remotes/origin/${{ github.event.repository.default_branch }} ./...
args: --timeout 5m --new-from-rev remotes/origin/${{ github.event.repository.default_branch }}
- name: Run tests
run: |
GOPATH="$(dirname ${PWD})" golangci-lint run --issues-exit-code 0 --out-format checkstyle ./... > golangci-lint-report.out
go test -coverprofile=coverage.out -json ./... > test-report.out
- name: SonarCloud Scan
uses: sonarsource/sonarcloud-github-action@master
Expand Down
20 changes: 18 additions & 2 deletions consumerbatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,32 @@ func (c consumerBatch[T]) Consume(ctx context.Context, cl *kgo.Client) error {
}

if !c.Cfg.Concurrent {
if err := c.batchIteration(ctx, cl, fetch); err != nil {
if err := c.iteration(ctx, cl, fetch); err != nil {
return err
}

continue
}

if err := c.concurrentIteration(ctx, cl, fetch); err != nil {
return err
}
}
}

func (c consumerBatch[T]) batchIteration(ctx context.Context, cl *kgo.Client, fetch kgo.Fetches) error {
/////////////////////////////////
// SINGLE - CONCURRENT ITERATION
/////////////////////////////////

func (c consumerBatch[T]) concurrentIteration(ctx context.Context, cl *kgo.Client, fetch kgo.Fetches) error {

Check failure on line 58 in consumerbatch.go

View workflow job for this annotation

GitHub Actions / sonarcloud

unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)
return nil
}

////////////////////
// BATCH - ITERATION
////////////////////

func (c consumerBatch[T]) iteration(ctx context.Context, cl *kgo.Client, fetch kgo.Fetches) error {
batch := make([]T, 0, c.Cfg.BatchCount)
records := make([]*kgo.Record, 0, c.Cfg.BatchCount)
for iter := fetch.RecordIter(); !iter.Done(); {
Expand Down
2 changes: 1 addition & 1 deletion consumersingle.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (c consumerSingle[T]) concurrentIteration(ctx context.Context, cl *kgo.Clie
}

////////////////////
// SINGLE ITERATION
// SINGLE - ITERATION
////////////////////

func (c consumerSingle[T]) iteration(ctx context.Context, cl *kgo.Client, fetch kgo.Fetches) error {
Expand Down
1 change: 1 addition & 0 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const (

// GetRecord returns the Record from the context in callback function.
// - If the context is nil, or the Record is not set, nil is returned.
// - Batch consumer doesn't have context record.
func GetRecord(ctx context.Context) *Record {
if ctx == nil {
return nil
Expand Down

0 comments on commit 614decc

Please sign in to comment.