diff --git a/CHANGELOG.md b/CHANGELOG.md index fc61f16a..ad1e57b0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ## 5.1.0 - 2024-08-07 +- [#209](https://github.com/deviceinsight/kafkactl/pull/209) Allow configuring basicAuth for avro schema registry ### Added - [#207](https://github.com/deviceinsight/kafkactl/pull/207) Allow configuring TLS for avro schema registry diff --git a/README.adoc b/README.adoc index db9a00b9..bd9a1c30 100644 --- a/README.adoc +++ b/README.adoc @@ -163,6 +163,10 @@ contexts: # optional: timeout for requests (defaults to 5s) requestTimeout: 10s + # optional: basic auth credentials + username: admin + password: admin + # optional: tls config for avro tls: enabled: true diff --git a/cmd/root_test.go b/cmd/root_test.go index b3258ee5..47cf35c4 100644 --- a/cmd/root_test.go +++ b/cmd/root_test.go @@ -60,6 +60,14 @@ func TestEnvironmentVariableLoadingAliases(t *testing.T) { _ = os.Setenv(global.KafkaVersion, "2.0.1") _ = os.Setenv(global.AvroSchemaRegistry, "registry:8888") _ = os.Setenv(global.AvroJSONCodec, "avro") + _ = os.Setenv(global.AvroRequestTimeout, "10") + _ = os.Setenv(global.AvroTLSEnabled, "true") + _ = os.Setenv(global.AvroTLSCa, "my-avro-ca") + _ = os.Setenv(global.AvroTLSCert, "my-avro-cert") + _ = os.Setenv(global.AvroTLSCertKey, "my-avro-cert-key") + _ = os.Setenv(global.AvroTLSInsecure, "true") + _ = os.Setenv(global.AvroUsername, "avro-user") + _ = os.Setenv(global.AvroPassword, "avro-pass") _ = os.Setenv(global.ProtobufProtoSetFiles, "/usr/include/protosets/ps1.protoset /usr/lib/ps2.protoset") _ = os.Setenv(global.ProtobufImportPaths, "/usr/include/protobuf /usr/lib/protobuf") _ = os.Setenv(global.ProtobufProtoFiles, "message.proto other.proto") @@ -102,6 +110,14 @@ func TestEnvironmentVariableLoadingAliases(t *testing.T) { testutil.AssertEquals(t, "2.0.1", viper.GetString("contexts.default.kafkaVersion")) testutil.AssertEquals(t, "registry:8888", viper.GetString("contexts.default.avro.schemaRegistry")) testutil.AssertEquals(t, "avro", viper.GetString("contexts.default.avro.jsonCodec")) + testutil.AssertEquals(t, "10", viper.GetString("contexts.default.avro.requestTimeout")) + testutil.AssertEquals(t, "true", viper.GetString("contexts.default.avro.tls.enabled")) + testutil.AssertEquals(t, "my-avro-ca", viper.GetString("contexts.default.avro.tls.ca")) + testutil.AssertEquals(t, "my-avro-cert", viper.GetString("contexts.default.avro.tls.cert")) + testutil.AssertEquals(t, "my-avro-cert-key", viper.GetString("contexts.default.avro.tls.certKey")) + testutil.AssertEquals(t, "true", viper.GetString("contexts.default.avro.tls.insecure")) + testutil.AssertEquals(t, "avro-user", viper.GetString("contexts.default.avro.username")) + testutil.AssertEquals(t, "avro-pass", viper.GetString("contexts.default.avro.password")) testutil.AssertEquals(t, "/usr/include/protosets/ps1.protoset", viper.GetStringSlice("contexts.default.protobuf.protosetFiles")[0]) testutil.AssertEquals(t, "/usr/include/protobuf", viper.GetStringSlice("contexts.default.protobuf.importPaths")[0]) testutil.AssertEquals(t, "message.proto", viper.GetStringSlice("contexts.default.protobuf.protoFiles")[0]) diff --git a/internal/common-operation.go b/internal/common-operation.go index 9e257551..9d264f9b 100644 --- a/internal/common-operation.go +++ b/internal/common-operation.go @@ -44,6 +44,15 @@ type SaslConfig struct { TokenProvider TokenProvider } +type AvroConfig struct { + SchemaRegistry string + JSONCodec avro.JSONCodec + RequestTimeout time.Duration + TLS TLSConfig + Username string + Password string +} + type TLSConfig struct { Enabled bool CA string @@ -78,21 +87,18 @@ type ProducerConfig struct { } type ClientContext struct { - Name string - Brokers []string - TLS TLSConfig - Sasl SaslConfig - Kubernetes K8sConfig - RequestTimeout time.Duration - ClientID string - KafkaVersion sarama.KafkaVersion - AvroSchemaRegistry string - AvroJSONCodec avro.JSONCodec - AvroRequestTimeout time.Duration - AvroTLS TLSConfig - Protobuf protobuf.SearchContext - Producer ProducerConfig - Consumer ConsumerConfig + Name string + Brokers []string + TLS TLSConfig + Sasl SaslConfig + Kubernetes K8sConfig + RequestTimeout time.Duration + ClientID string + KafkaVersion sarama.KafkaVersion + Avro AvroConfig + Protobuf protobuf.SearchContext + Producer ProducerConfig + Consumer ConsumerConfig } type Config struct { @@ -131,14 +137,16 @@ func CreateClientContext() (ClientContext, error) { } else { return context, err } - context.AvroSchemaRegistry = viper.GetString("contexts." + context.Name + ".avro.schemaRegistry") - context.AvroJSONCodec = avro.ParseJSONCodec(viper.GetString("contexts." + context.Name + ".avro.jsonCodec")) - context.AvroRequestTimeout = viper.GetDuration("contexts." + context.Name + ".avro.requestTimeout") - context.AvroTLS.Enabled = viper.GetBool("contexts." + context.Name + ".avro.tls.enabled") - context.AvroTLS.CA = viper.GetString("contexts." + context.Name + ".avro.tls.ca") - context.AvroTLS.Cert = viper.GetString("contexts." + context.Name + ".avro.tls.cert") - context.AvroTLS.CertKey = viper.GetString("contexts." + context.Name + ".avro.tls.certKey") - context.AvroTLS.Insecure = viper.GetBool("contexts." + context.Name + ".avro.tls.insecure") + context.Avro.SchemaRegistry = viper.GetString("contexts." + context.Name + ".avro.schemaRegistry") + context.Avro.JSONCodec = avro.ParseJSONCodec(viper.GetString("contexts." + context.Name + ".avro.jsonCodec")) + context.Avro.RequestTimeout = viper.GetDuration("contexts." + context.Name + ".avro.requestTimeout") + context.Avro.TLS.Enabled = viper.GetBool("contexts." + context.Name + ".avro.tls.enabled") + context.Avro.TLS.CA = viper.GetString("contexts." + context.Name + ".avro.tls.ca") + context.Avro.TLS.Cert = viper.GetString("contexts." + context.Name + ".avro.tls.cert") + context.Avro.TLS.CertKey = viper.GetString("contexts." + context.Name + ".avro.tls.certKey") + context.Avro.TLS.Insecure = viper.GetBool("contexts." + context.Name + ".avro.tls.insecure") + context.Avro.Username = viper.GetString("contexts." + context.Name + ".avro.username") + context.Avro.Password = viper.GetString("contexts." + context.Name + ".avro.password") context.Protobuf.ProtosetFiles = viper.GetStringSlice("contexts." + context.Name + ".protobuf.protosetFiles") context.Protobuf.ProtoImportPaths = viper.GetStringSlice("contexts." + context.Name + ".protobuf.importPaths") context.Protobuf.ProtoFiles = viper.GetStringSlice("contexts." + context.Name + ".protobuf.protoFiles") @@ -254,29 +262,36 @@ func CreateClientConfig(context *ClientContext) (*sarama.Config, error) { func CreateAvroSchemaRegistryClient(context *ClientContext) (srclient.ISchemaRegistryClient, error) { - timeout := context.AvroRequestTimeout + timeout := context.Avro.RequestTimeout - if context.AvroRequestTimeout <= 0 { + if context.Avro.RequestTimeout <= 0 { timeout = 5 * time.Second } - client := &http.Client{Timeout: timeout} + httpClient := &http.Client{Timeout: timeout} - if context.AvroTLS.Enabled { + if context.Avro.TLS.Enabled { output.Debugf("avro TLS is enabled.") - tlsConfig, err := setupTLSConfig(context.AvroTLS) + tlsConfig, err := setupTLSConfig(context.Avro.TLS) if err != nil { return nil, errors.Wrap(err, "failed to setup avro tls config") } - client.Transport = &http.Transport{ + httpClient.Transport = &http.Transport{ TLSClientConfig: tlsConfig, } } - baseURL := avro.FormatBaseURL(context.AvroSchemaRegistry) - return srclient.CreateSchemaRegistryClientWithOptions(baseURL, client, 16), nil + baseURL := avro.FormatBaseURL(context.Avro.SchemaRegistry) + client := srclient.CreateSchemaRegistryClientWithOptions(baseURL, httpClient, 16) + + if context.Avro.Username != "" { + output.Debugf("avro BasicAuth is enabled.") + client.SetCredentials(context.Avro.Username, context.Avro.Password) + } + + return client, nil } func GetClientID(context *ClientContext, defaultPrefix string) string { diff --git a/internal/consume/consume-operation.go b/internal/consume/consume-operation.go index 5867a5c7..4ed7aa73 100644 --- a/internal/consume/consume-operation.go +++ b/internal/consume/consume-operation.go @@ -90,14 +90,14 @@ func (operation *Operation) Consume(topic string, flags Flags) error { var deserializers MessageDeserializerChain - if clientContext.AvroSchemaRegistry != "" { + if clientContext.Avro.SchemaRegistry != "" { client, err := internal.CreateAvroSchemaRegistryClient(&clientContext) if err != nil { return err } deserializer := AvroMessageDeserializer{topic: topic, registry: CreateCachingSchemaRegistry(client), - jsonCodec: clientContext.AvroJSONCodec} + jsonCodec: clientContext.Avro.JSONCodec} deserializers = append(deserializers, deserializer) } diff --git a/internal/global/env-variables.go b/internal/global/env-variables.go index 5f57b3a8..5db84810 100644 --- a/internal/global/env-variables.go +++ b/internal/global/env-variables.go @@ -18,6 +18,14 @@ const ( KafkaVersion = "KAFKAVERSION" AvroSchemaRegistry = "AVRO_SCHEMAREGISTRY" AvroJSONCodec = "AVRO_JSONCODEC" + AvroRequestTimeout = "AVRO_REQUESTTIMEOUT" + AvroTLSEnabled = "AVRO_TLS_ENABLED" + AvroTLSCa = "AVRO_TLS_CA" + AvroTLSCert = "AVRO_TLS_CERT" + AvroTLSCertKey = "AVRO_TLS_CERTKEY" + AvroTLSInsecure = "AVRO_TLS_INSECURE" + AvroUsername = "AVRO_USERNAME" + AvroPassword = "AVRO_PASSWORD" ProtobufProtoSetFiles = "PROTOBUF_PROTOSETFILES" ProtobufImportPaths = "PROTOBUF_IMPORTPATHS" ProtobufProtoFiles = "PROTOBUF_PROTOFILES" @@ -44,6 +52,14 @@ var EnvVariables = []string{ KafkaVersion, AvroSchemaRegistry, AvroJSONCodec, + AvroRequestTimeout, + AvroTLSEnabled, + AvroTLSCa, + AvroTLSCert, + AvroTLSCertKey, + AvroTLSInsecure, + AvroUsername, + AvroPassword, ProtobufProtoSetFiles, ProtobufImportPaths, ProtobufProtoFiles, diff --git a/internal/k8s/k8s-operation.go b/internal/k8s/k8s-operation.go index 87622ed4..e72c0527 100644 --- a/internal/k8s/k8s-operation.go +++ b/internal/k8s/k8s-operation.go @@ -190,8 +190,16 @@ func parsePodEnvironment(context internal.ClientContext) []string { envVariables = appendStringIfDefined(envVariables, global.RequestTimeout, context.RequestTimeout.String()) envVariables = appendStringIfDefined(envVariables, global.ClientID, context.ClientID) envVariables = appendStringIfDefined(envVariables, global.KafkaVersion, context.KafkaVersion.String()) - envVariables = appendStringIfDefined(envVariables, global.AvroSchemaRegistry, context.AvroSchemaRegistry) - envVariables = appendStringIfDefined(envVariables, global.AvroJSONCodec, context.AvroJSONCodec.String()) + envVariables = appendStringIfDefined(envVariables, global.AvroSchemaRegistry, context.Avro.SchemaRegistry) + envVariables = appendStringIfDefined(envVariables, global.AvroJSONCodec, context.Avro.JSONCodec.String()) + envVariables = appendStringIfDefined(envVariables, global.AvroRequestTimeout, context.Avro.RequestTimeout.String()) + envVariables = appendBool(envVariables, global.AvroTLSEnabled, context.Avro.TLS.Enabled) + envVariables = appendStringIfDefined(envVariables, global.AvroTLSCa, context.Avro.TLS.CA) + envVariables = appendStringIfDefined(envVariables, global.AvroTLSCert, context.Avro.TLS.Cert) + envVariables = appendStringIfDefined(envVariables, global.AvroTLSCertKey, context.Avro.TLS.CertKey) + envVariables = appendBool(envVariables, global.AvroTLSInsecure, context.Avro.TLS.Insecure) + envVariables = appendStringIfDefined(envVariables, global.AvroUsername, context.Avro.Username) + envVariables = appendStringIfDefined(envVariables, global.AvroPassword, context.Avro.Password) envVariables = appendStrings(envVariables, global.ProtobufProtoSetFiles, context.Protobuf.ProtosetFiles) envVariables = appendStrings(envVariables, global.ProtobufImportPaths, context.Protobuf.ProtoImportPaths) envVariables = appendStrings(envVariables, global.ProtobufProtoFiles, context.Protobuf.ProtoFiles) diff --git a/internal/k8s/k8s-operation_test.go b/internal/k8s/k8s-operation_test.go index 5654e338..4205ebcb 100644 --- a/internal/k8s/k8s-operation_test.go +++ b/internal/k8s/k8s-operation_test.go @@ -35,8 +35,16 @@ func TestAllAvailableEnvironmentVariablesAreParsed(t *testing.T) { context.Sasl.TokenProvider.Options["int-key"] = 12 context.ClientID = "my-client" context.KafkaVersion = sarama.V2_0_1_0 - context.AvroSchemaRegistry = "registry:8888" - context.AvroJSONCodec = avro.Avro + context.Avro.SchemaRegistry = "registry:8888" + context.Avro.JSONCodec = avro.Avro + context.Avro.RequestTimeout = 10 * time.Second + context.Avro.TLS.Enabled = true + context.Avro.TLS.CA = "my-avro-ca" + context.Avro.TLS.Cert = "my-avro-cert" + context.Avro.TLS.CertKey = "my-avro-cert-key" + context.Avro.TLS.Insecure = true + context.Avro.Username = "avro-user" + context.Avro.Password = "avro-pass" context.Protobuf.ProtosetFiles = []string{"/usr/include/protosets/ps1.protoset", "/usr/lib/ps2.protoset"} context.Protobuf.ProtoImportPaths = []string{"/usr/include/protobuf", "/usr/lib/protobuf"} context.Protobuf.ProtoFiles = []string{"message.proto", "other.proto"} @@ -76,6 +84,14 @@ func TestAllAvailableEnvironmentVariablesAreParsed(t *testing.T) { testutil.AssertEquals(t, "2.0.1", envMap[global.KafkaVersion]) testutil.AssertEquals(t, "registry:8888", envMap[global.AvroSchemaRegistry]) testutil.AssertEquals(t, "avro", envMap[global.AvroJSONCodec]) + testutil.AssertEquals(t, "10s", envMap[global.AvroRequestTimeout]) + testutil.AssertEquals(t, "true", envMap[global.AvroTLSEnabled]) + testutil.AssertEquals(t, "my-avro-ca", envMap[global.AvroTLSCa]) + testutil.AssertEquals(t, "my-avro-cert", envMap[global.AvroTLSCert]) + testutil.AssertEquals(t, "my-avro-cert-key", envMap[global.AvroTLSCertKey]) + testutil.AssertEquals(t, "true", envMap[global.AvroTLSInsecure]) + testutil.AssertEquals(t, "avro-user", envMap[global.AvroUsername]) + testutil.AssertEquals(t, "avro-pass", envMap[global.AvroPassword]) testutil.AssertEquals(t, "/usr/include/protosets/ps1.protoset /usr/lib/ps2.protoset", envMap[global.ProtobufProtoSetFiles]) testutil.AssertEquals(t, "/usr/include/protobuf /usr/lib/protobuf", envMap[global.ProtobufImportPaths]) testutil.AssertEquals(t, "message.proto other.proto", envMap[global.ProtobufProtoFiles]) diff --git a/internal/producer/producer-operation.go b/internal/producer/producer-operation.go index 4d3ea866..ee0163a7 100644 --- a/internal/producer/producer-operation.go +++ b/internal/producer/producer-operation.go @@ -79,12 +79,12 @@ func (operation *Operation) Produce(topic string, flags Flags) error { serializers := MessageSerializerChain{topic: topic} - if clientContext.AvroSchemaRegistry != "" { + if clientContext.Avro.SchemaRegistry != "" { client, err := internal.CreateAvroSchemaRegistryClient(&clientContext) if err != nil { return err } - serializer := AvroMessageSerializer{topic: topic, client: client, jsonCodec: clientContext.AvroJSONCodec} + serializer := AvroMessageSerializer{topic: topic, client: client, jsonCodec: clientContext.Avro.JSONCodec} serializers.serializers = append(serializers.serializers, serializer) }