diff --git a/config.go b/config.go index 5016edb..d38604c 100644 --- a/config.go +++ b/config.go @@ -14,9 +14,9 @@ const ( maxRecordSize = 1 << 20 // 1MiB maxRequestSize = 5 << 20 // 5MiB maxRecordsPerRequest = 500 - maxAggregationSize = 1048576 // 1MiB - maxAggregationCount = 2147483647 // FIX: Set it to math.MaxInt32 in order to build on ARM, see https://github.com/a8m/kinesis-producer/issues/4 - defaultAggregationSize = 51200 // 50k + maxAggregationSize = 1048576 // 1MiB + maxAggregationCount = 4294967295 + defaultAggregationSize = 51200 // 50k defaultMaxConnections = 24 defaultFlushInterval = 5 * time.Second partitionKeyIndexSize = 8 diff --git a/go.mod b/go.mod index 966416a..be6d638 100644 --- a/go.mod +++ b/go.mod @@ -1,15 +1,21 @@ module github.com/northvolt/kinesis-producer -go 1.16 +go 1.19 require ( github.com/aws/aws-sdk-go v1.21.10 github.com/golang/protobuf v1.3.2 github.com/jpillora/backoff v0.0.0-20180909062703-3050d21c67d7 - github.com/pkg/errors v0.8.1 // indirect github.com/sirupsen/logrus v1.4.2 + go.uber.org/zap v1.10.0 +) + +require ( + github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af // indirect + github.com/konsorten/go-windows-terminal-sequences v1.0.1 // indirect + github.com/pkg/errors v0.8.1 // indirect go.uber.org/atomic v1.4.0 // indirect go.uber.org/multierr v1.1.0 // indirect - go.uber.org/zap v1.10.0 golang.org/x/net v0.0.0-20190724013045-ca1201d0de80 // indirect + golang.org/x/sys v0.0.0-20190422165155-953cdadca894 // indirect ) diff --git a/logger.go b/logger.go index c27dfee..b30d5a2 100644 --- a/logger.go +++ b/logger.go @@ -19,7 +19,7 @@ type LogValue struct { } func (v LogValue) String() string { - return fmt.Sprintf(" %s=%s", v.Name, v.Value) + return fmt.Sprintf(" %s=%v", v.Name, v.Value) } // StdLogger implements the Logger interface using standard library loggers diff --git a/loggers/kplogrus/logrus.go b/loggers/kplogrus/logrus.go index 27f6792..b03f827 100644 --- a/loggers/kplogrus/logrus.go +++ b/loggers/kplogrus/logrus.go @@ -1,7 +1,7 @@ package kplogrus import ( - producer "github.com/a8m/kinesis-producer" + producer "github.com/northvolt/kinesis-producer" "github.com/sirupsen/logrus" ) diff --git a/loggers/kpzap/zap.go b/loggers/kpzap/zap.go index a885437..3557241 100644 --- a/loggers/kpzap/zap.go +++ b/loggers/kpzap/zap.go @@ -3,7 +3,7 @@ package kpzap import ( "go.uber.org/zap" - producer "github.com/a8m/kinesis-producer" + producer "github.com/northvolt/kinesis-producer" ) // Logger implements a zap.Logger logger for kinesis-producer diff --git a/producer.go b/producer.go index 307c54e..1338641 100644 --- a/producer.go +++ b/producer.go @@ -177,7 +177,7 @@ func (p *Producer) loop() { // the record size limit applies to the total size of the // partition key and data blob. rsize := len(record.Data) + len([]byte(*record.PartitionKey)) - if size+rsize > p.BatchSize { + if size != 0 && size+rsize > p.BatchSize { flush("batch size") } size += rsize @@ -242,7 +242,10 @@ func (p *Producer) flush(records []*kinesis.PutRecordsRequestEntry, reason strin defer p.semaphore.release() for { - //p.Logger.Info("flushing records", LogValue{"reason", reason}, LogValue{"records", len(records)}) + if p.Verbose { + p.Logger.Info("flushing records", LogValue{"reason", reason}, LogValue{"records", len(records)}) + } + out, err := p.Client.PutRecords(&kinesis.PutRecordsInput{ StreamName: &p.StreamName, Records: records,