Skip to content

Commit

Permalink
Merge pull request #6 from northvolt/debug
Browse files Browse the repository at this point in the history
do not flush records when size is zero
  • Loading branch information
loivis authored Oct 27, 2022
2 parents 4886478 + 17ef056 commit 14a5278
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 11 deletions.
6 changes: 3 additions & 3 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ const (
maxRecordSize = 1 << 20 // 1MiB
maxRequestSize = 5 << 20 // 5MiB
maxRecordsPerRequest = 500
maxAggregationSize = 1048576 // 1MiB
maxAggregationCount = 2147483647 // FIX: Set it to math.MaxInt32 in order to build on ARM, see https://github.com/a8m/kinesis-producer/issues/4
defaultAggregationSize = 51200 // 50k
maxAggregationSize = 1048576 // 1MiB
maxAggregationCount = 4294967295
defaultAggregationSize = 51200 // 50k
defaultMaxConnections = 24
defaultFlushInterval = 5 * time.Second
partitionKeyIndexSize = 8
Expand Down
12 changes: 9 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
module github.com/northvolt/kinesis-producer

go 1.16
go 1.19

require (
github.com/aws/aws-sdk-go v1.21.10
github.com/golang/protobuf v1.3.2
github.com/jpillora/backoff v0.0.0-20180909062703-3050d21c67d7
github.com/pkg/errors v0.8.1 // indirect
github.com/sirupsen/logrus v1.4.2
go.uber.org/zap v1.10.0
)

require (
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af // indirect
github.com/konsorten/go-windows-terminal-sequences v1.0.1 // indirect
github.com/pkg/errors v0.8.1 // indirect
go.uber.org/atomic v1.4.0 // indirect
go.uber.org/multierr v1.1.0 // indirect
go.uber.org/zap v1.10.0
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80 // indirect
golang.org/x/sys v0.0.0-20190422165155-953cdadca894 // indirect
)
2 changes: 1 addition & 1 deletion logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type LogValue struct {
}

func (v LogValue) String() string {
return fmt.Sprintf(" %s=%s", v.Name, v.Value)
return fmt.Sprintf(" %s=%v", v.Name, v.Value)
}

// StdLogger implements the Logger interface using standard library loggers
Expand Down
2 changes: 1 addition & 1 deletion loggers/kplogrus/logrus.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package kplogrus

import (
producer "github.com/a8m/kinesis-producer"
producer "github.com/northvolt/kinesis-producer"
"github.com/sirupsen/logrus"
)

Expand Down
2 changes: 1 addition & 1 deletion loggers/kpzap/zap.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package kpzap
import (
"go.uber.org/zap"

producer "github.com/a8m/kinesis-producer"
producer "github.com/northvolt/kinesis-producer"
)

// Logger implements a zap.Logger logger for kinesis-producer
Expand Down
7 changes: 5 additions & 2 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (p *Producer) loop() {
// the record size limit applies to the total size of the
// partition key and data blob.
rsize := len(record.Data) + len([]byte(*record.PartitionKey))
if size+rsize > p.BatchSize {
if size != 0 && size+rsize > p.BatchSize {
flush("batch size")
}
size += rsize
Expand Down Expand Up @@ -242,7 +242,10 @@ func (p *Producer) flush(records []*kinesis.PutRecordsRequestEntry, reason strin
defer p.semaphore.release()

for {
//p.Logger.Info("flushing records", LogValue{"reason", reason}, LogValue{"records", len(records)})
if p.Verbose {
p.Logger.Info("flushing records", LogValue{"reason", reason}, LogValue{"records", len(records)})
}

out, err := p.Client.PutRecords(&kinesis.PutRecordsInput{
StreamName: &p.StreamName,
Records: records,
Expand Down

0 comments on commit 14a5278

Please sign in to comment.