From 3c03c4ae51cab1a5361f011a45ce391247eb01b4 Mon Sep 17 00:00:00 2001 From: xiaowei Date: Thu, 27 Oct 2022 22:39:36 +0200 Subject: [PATCH 1/5] go.mod: update to 1.19 --- go.mod | 12 +++++++++--- loggers/kplogrus/logrus.go | 2 +- loggers/kpzap/zap.go | 2 +- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/go.mod b/go.mod index 966416a..be6d638 100644 --- a/go.mod +++ b/go.mod @@ -1,15 +1,21 @@ module github.com/northvolt/kinesis-producer -go 1.16 +go 1.19 require ( github.com/aws/aws-sdk-go v1.21.10 github.com/golang/protobuf v1.3.2 github.com/jpillora/backoff v0.0.0-20180909062703-3050d21c67d7 - github.com/pkg/errors v0.8.1 // indirect github.com/sirupsen/logrus v1.4.2 + go.uber.org/zap v1.10.0 +) + +require ( + github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af // indirect + github.com/konsorten/go-windows-terminal-sequences v1.0.1 // indirect + github.com/pkg/errors v0.8.1 // indirect go.uber.org/atomic v1.4.0 // indirect go.uber.org/multierr v1.1.0 // indirect - go.uber.org/zap v1.10.0 golang.org/x/net v0.0.0-20190724013045-ca1201d0de80 // indirect + golang.org/x/sys v0.0.0-20190422165155-953cdadca894 // indirect ) diff --git a/loggers/kplogrus/logrus.go b/loggers/kplogrus/logrus.go index 27f6792..b03f827 100644 --- a/loggers/kplogrus/logrus.go +++ b/loggers/kplogrus/logrus.go @@ -1,7 +1,7 @@ package kplogrus import ( - producer "github.com/a8m/kinesis-producer" + producer "github.com/northvolt/kinesis-producer" "github.com/sirupsen/logrus" ) diff --git a/loggers/kpzap/zap.go b/loggers/kpzap/zap.go index a885437..3557241 100644 --- a/loggers/kpzap/zap.go +++ b/loggers/kpzap/zap.go @@ -3,7 +3,7 @@ package kpzap import ( "go.uber.org/zap" - producer "github.com/a8m/kinesis-producer" + producer "github.com/northvolt/kinesis-producer" ) // Logger implements a zap.Logger logger for kinesis-producer From cca95fbf53442c987d084c460bb5c1410af862d3 Mon Sep 17 00:00:00 2001 From: xiaowei Date: Thu, 27 Oct 2022 21:20:45 +0200 Subject: [PATCH 2/5] revert fix for ARM No need to support 32-bit hardware anymore --- config.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/config.go b/config.go index 5016edb..d38604c 100644 --- a/config.go +++ b/config.go @@ -14,9 +14,9 @@ const ( maxRecordSize = 1 << 20 // 1MiB maxRequestSize = 5 << 20 // 5MiB maxRecordsPerRequest = 500 - maxAggregationSize = 1048576 // 1MiB - maxAggregationCount = 2147483647 // FIX: Set it to math.MaxInt32 in order to build on ARM, see https://github.com/a8m/kinesis-producer/issues/4 - defaultAggregationSize = 51200 // 50k + maxAggregationSize = 1048576 // 1MiB + maxAggregationCount = 4294967295 + defaultAggregationSize = 51200 // 50k defaultMaxConnections = 24 defaultFlushInterval = 5 * time.Second partitionKeyIndexSize = 8 From 624e244d7fb6b3c84a0c8fabd0af2fa55abb8b14 Mon Sep 17 00:00:00 2001 From: xiaowei Date: Thu, 27 Oct 2022 22:32:14 +0200 Subject: [PATCH 3/5] logger: fix format --- logger.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/logger.go b/logger.go index c27dfee..b30d5a2 100644 --- a/logger.go +++ b/logger.go @@ -19,7 +19,7 @@ type LogValue struct { } func (v LogValue) String() string { - return fmt.Sprintf(" %s=%s", v.Name, v.Value) + return fmt.Sprintf(" %s=%v", v.Name, v.Value) } // StdLogger implements the Logger interface using standard library loggers From 41e4c96d05413673dc96e4233ea5df9312e43029 Mon Sep 17 00:00:00 2001 From: xiaowei Date: Thu, 27 Oct 2022 22:33:54 +0200 Subject: [PATCH 4/5] producer: enable logging in verbose mode --- producer.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/producer.go b/producer.go index 307c54e..d84ff52 100644 --- a/producer.go +++ b/producer.go @@ -242,7 +242,10 @@ func (p *Producer) flush(records []*kinesis.PutRecordsRequestEntry, reason strin defer p.semaphore.release() for { - //p.Logger.Info("flushing records", LogValue{"reason", reason}, LogValue{"records", len(records)}) + if p.Verbose { + p.Logger.Info("flushing records", LogValue{"reason", reason}, LogValue{"records", len(records)}) + } + out, err := p.Client.PutRecords(&kinesis.PutRecordsInput{ StreamName: &p.StreamName, Records: records, From 17ef056f61c1708bc734e3d2fc2493856f9a3613 Mon Sep 17 00:00:00 2001 From: xiaowei Date: Thu, 27 Oct 2022 22:36:05 +0200 Subject: [PATCH 5/5] producer: do not flush records when size is zero When huge amount of small records are coming in, rsize itself could be larger than p.BatchSize already, even size is still 0. This could happen for: - the very first record - the one after flush interval --- producer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/producer.go b/producer.go index d84ff52..1338641 100644 --- a/producer.go +++ b/producer.go @@ -177,7 +177,7 @@ func (p *Producer) loop() { // the record size limit applies to the total size of the // partition key and data blob. rsize := len(record.Data) + len([]byte(*record.PartitionKey)) - if size+rsize > p.BatchSize { + if size != 0 && size+rsize > p.BatchSize { flush("batch size") } size += rsize