From 2506dab6294870cf678e1eb078604bf0505202b8 Mon Sep 17 00:00:00 2001 From: Peter Marton Date: Wed, 4 Sep 2024 06:13:20 +0200 Subject: [PATCH] feat(config): clickhouse connection settings (#1471) --- config/aggregation.go | 44 ++++++++++++++++++++++++++++++++++++++----- config/config_test.go | 15 ++++++++++----- 2 files changed, 49 insertions(+), 10 deletions(-) diff --git a/config/aggregation.go b/config/aggregation.go index 2829e2aa7..5e80cb8db 100644 --- a/config/aggregation.go +++ b/config/aggregation.go @@ -35,6 +35,13 @@ type ClickHouseAggregationConfiguration struct { Username string Password string Database string + + // ClickHouse connection options + DialTimeout time.Duration + MaxOpenConns int + MaxIdleConns int + ConnMaxLifetime time.Duration + BlockBufferSize uint8 } func (c ClickHouseAggregationConfiguration) Validate() error { @@ -42,6 +49,26 @@ func (c ClickHouseAggregationConfiguration) Validate() error { return errors.New("address is required") } + if c.DialTimeout <= 0 { + return errors.New("dial timeout must be greater than 0") + } + + if c.MaxOpenConns <= 0 { + return errors.New("max open connections must be greater than 0") + } + + if c.MaxIdleConns <= 0 { + return errors.New("max idle connections must be greater than 0") + } + + if c.ConnMaxLifetime <= 0 { + return errors.New("connection max lifetime must be greater than 0") + } + + if c.BlockBufferSize <= 0 { + return errors.New("block buffer size must be greater than 0") + } + return nil } @@ -53,12 +80,12 @@ func (c ClickHouseAggregationConfiguration) GetClientOptions() *clickhouse.Optio Username: c.Username, Password: c.Password, }, - DialTimeout: time.Duration(10) * time.Second, - MaxOpenConns: 5, - MaxIdleConns: 5, - ConnMaxLifetime: time.Duration(10) * time.Minute, + DialTimeout: c.DialTimeout, + MaxOpenConns: c.MaxOpenConns, + MaxIdleConns: c.MaxIdleConns, + ConnMaxLifetime: c.ConnMaxLifetime, ConnOpenStrategy: clickhouse.ConnOpenInOrder, - BlockBufferSize: 10, + BlockBufferSize: c.BlockBufferSize, } // This minimal TLS.Config is normally sufficient to connect to the secure native port (normally 9440) on a ClickHouse server. // See: https://clickhouse.com/docs/en/integrations/go#using-tls @@ -76,4 +103,11 @@ func ConfigureAggregation(v *viper.Viper) { v.SetDefault("aggregation.clickhouse.database", "openmeter") v.SetDefault("aggregation.clickhouse.username", "default") v.SetDefault("aggregation.clickhouse.password", "default") + + // ClickHouse connection options + v.SetDefault("aggregation.clickhouse.dialTimeout", "10s") + v.SetDefault("aggregation.clickhouse.maxOpenConns", 5) + v.SetDefault("aggregation.clickhouse.maxIdleConns", 5) + v.SetDefault("aggregation.clickhouse.connMaxLifetime", "10m") + v.SetDefault("aggregation.clickhouse.blockBufferSize", 10) } diff --git a/config/config_test.go b/config/config_test.go index ec2eac40c..41af984b6 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -105,11 +105,16 @@ func TestComplete(t *testing.T) { }, Aggregation: AggregationConfiguration{ ClickHouse: ClickHouseAggregationConfiguration{ - Address: "127.0.0.1:9440", - TLS: true, - Username: "default", - Password: "default", - Database: "openmeter", + Address: "127.0.0.1:9440", + TLS: true, + Username: "default", + Password: "default", + Database: "openmeter", + DialTimeout: 10 * time.Second, + MaxOpenConns: 5, + MaxIdleConns: 5, + ConnMaxLifetime: 10 * time.Minute, + BlockBufferSize: 10, }, }, Sink: SinkConfiguration{