Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow configuring basicAuth for avro schema registry #209

Merged
merged 1 commit into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]

## 5.1.0 - 2024-08-07
- [#209](https://github.com/deviceinsight/kafkactl/pull/209) Allow configuring basicAuth for avro schema registry

### Added
- [#207](https://github.com/deviceinsight/kafkactl/pull/207) Allow configuring TLS for avro schema registry
Expand Down
4 changes: 4 additions & 0 deletions README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ contexts:
# optional: timeout for requests (defaults to 5s)
requestTimeout: 10s

# optional: basic auth credentials
username: admin
password: admin

# optional: tls config for avro
tls:
enabled: true
Expand Down
16 changes: 16 additions & 0 deletions cmd/root_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ func TestEnvironmentVariableLoadingAliases(t *testing.T) {
_ = os.Setenv(global.KafkaVersion, "2.0.1")
_ = os.Setenv(global.AvroSchemaRegistry, "registry:8888")
_ = os.Setenv(global.AvroJSONCodec, "avro")
_ = os.Setenv(global.AvroRequestTimeout, "10")
_ = os.Setenv(global.AvroTLSEnabled, "true")
_ = os.Setenv(global.AvroTLSCa, "my-avro-ca")
_ = os.Setenv(global.AvroTLSCert, "my-avro-cert")
_ = os.Setenv(global.AvroTLSCertKey, "my-avro-cert-key")
_ = os.Setenv(global.AvroTLSInsecure, "true")
_ = os.Setenv(global.AvroUsername, "avro-user")
_ = os.Setenv(global.AvroPassword, "avro-pass")
_ = os.Setenv(global.ProtobufProtoSetFiles, "/usr/include/protosets/ps1.protoset /usr/lib/ps2.protoset")
_ = os.Setenv(global.ProtobufImportPaths, "/usr/include/protobuf /usr/lib/protobuf")
_ = os.Setenv(global.ProtobufProtoFiles, "message.proto other.proto")
Expand Down Expand Up @@ -102,6 +110,14 @@ func TestEnvironmentVariableLoadingAliases(t *testing.T) {
testutil.AssertEquals(t, "2.0.1", viper.GetString("contexts.default.kafkaVersion"))
testutil.AssertEquals(t, "registry:8888", viper.GetString("contexts.default.avro.schemaRegistry"))
testutil.AssertEquals(t, "avro", viper.GetString("contexts.default.avro.jsonCodec"))
testutil.AssertEquals(t, "10", viper.GetString("contexts.default.avro.requestTimeout"))
testutil.AssertEquals(t, "true", viper.GetString("contexts.default.avro.tls.enabled"))
testutil.AssertEquals(t, "my-avro-ca", viper.GetString("contexts.default.avro.tls.ca"))
testutil.AssertEquals(t, "my-avro-cert", viper.GetString("contexts.default.avro.tls.cert"))
testutil.AssertEquals(t, "my-avro-cert-key", viper.GetString("contexts.default.avro.tls.certKey"))
testutil.AssertEquals(t, "true", viper.GetString("contexts.default.avro.tls.insecure"))
testutil.AssertEquals(t, "avro-user", viper.GetString("contexts.default.avro.username"))
testutil.AssertEquals(t, "avro-pass", viper.GetString("contexts.default.avro.password"))
testutil.AssertEquals(t, "/usr/include/protosets/ps1.protoset", viper.GetStringSlice("contexts.default.protobuf.protosetFiles")[0])
testutil.AssertEquals(t, "/usr/include/protobuf", viper.GetStringSlice("contexts.default.protobuf.importPaths")[0])
testutil.AssertEquals(t, "message.proto", viper.GetStringSlice("contexts.default.protobuf.protoFiles")[0])
Expand Down
77 changes: 46 additions & 31 deletions internal/common-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@ type SaslConfig struct {
TokenProvider TokenProvider
}

type AvroConfig struct {
SchemaRegistry string
JSONCodec avro.JSONCodec
RequestTimeout time.Duration
TLS TLSConfig
Username string
Password string
}

type TLSConfig struct {
Enabled bool
CA string
Expand Down Expand Up @@ -78,21 +87,18 @@ type ProducerConfig struct {
}

type ClientContext struct {
Name string
Brokers []string
TLS TLSConfig
Sasl SaslConfig
Kubernetes K8sConfig
RequestTimeout time.Duration
ClientID string
KafkaVersion sarama.KafkaVersion
AvroSchemaRegistry string
AvroJSONCodec avro.JSONCodec
AvroRequestTimeout time.Duration
AvroTLS TLSConfig
Protobuf protobuf.SearchContext
Producer ProducerConfig
Consumer ConsumerConfig
Name string
Brokers []string
TLS TLSConfig
Sasl SaslConfig
Kubernetes K8sConfig
RequestTimeout time.Duration
ClientID string
KafkaVersion sarama.KafkaVersion
Avro AvroConfig
Protobuf protobuf.SearchContext
Producer ProducerConfig
Consumer ConsumerConfig
}

type Config struct {
Expand Down Expand Up @@ -131,14 +137,16 @@ func CreateClientContext() (ClientContext, error) {
} else {
return context, err
}
context.AvroSchemaRegistry = viper.GetString("contexts." + context.Name + ".avro.schemaRegistry")
context.AvroJSONCodec = avro.ParseJSONCodec(viper.GetString("contexts." + context.Name + ".avro.jsonCodec"))
context.AvroRequestTimeout = viper.GetDuration("contexts." + context.Name + ".avro.requestTimeout")
context.AvroTLS.Enabled = viper.GetBool("contexts." + context.Name + ".avro.tls.enabled")
context.AvroTLS.CA = viper.GetString("contexts." + context.Name + ".avro.tls.ca")
context.AvroTLS.Cert = viper.GetString("contexts." + context.Name + ".avro.tls.cert")
context.AvroTLS.CertKey = viper.GetString("contexts." + context.Name + ".avro.tls.certKey")
context.AvroTLS.Insecure = viper.GetBool("contexts." + context.Name + ".avro.tls.insecure")
context.Avro.SchemaRegistry = viper.GetString("contexts." + context.Name + ".avro.schemaRegistry")
context.Avro.JSONCodec = avro.ParseJSONCodec(viper.GetString("contexts." + context.Name + ".avro.jsonCodec"))
context.Avro.RequestTimeout = viper.GetDuration("contexts." + context.Name + ".avro.requestTimeout")
context.Avro.TLS.Enabled = viper.GetBool("contexts." + context.Name + ".avro.tls.enabled")
context.Avro.TLS.CA = viper.GetString("contexts." + context.Name + ".avro.tls.ca")
context.Avro.TLS.Cert = viper.GetString("contexts." + context.Name + ".avro.tls.cert")
context.Avro.TLS.CertKey = viper.GetString("contexts." + context.Name + ".avro.tls.certKey")
context.Avro.TLS.Insecure = viper.GetBool("contexts." + context.Name + ".avro.tls.insecure")
context.Avro.Username = viper.GetString("contexts." + context.Name + ".avro.username")
context.Avro.Password = viper.GetString("contexts." + context.Name + ".avro.password")
context.Protobuf.ProtosetFiles = viper.GetStringSlice("contexts." + context.Name + ".protobuf.protosetFiles")
context.Protobuf.ProtoImportPaths = viper.GetStringSlice("contexts." + context.Name + ".protobuf.importPaths")
context.Protobuf.ProtoFiles = viper.GetStringSlice("contexts." + context.Name + ".protobuf.protoFiles")
Expand Down Expand Up @@ -254,29 +262,36 @@ func CreateClientConfig(context *ClientContext) (*sarama.Config, error) {

func CreateAvroSchemaRegistryClient(context *ClientContext) (srclient.ISchemaRegistryClient, error) {

timeout := context.AvroRequestTimeout
timeout := context.Avro.RequestTimeout

if context.AvroRequestTimeout <= 0 {
if context.Avro.RequestTimeout <= 0 {
timeout = 5 * time.Second
}

client := &http.Client{Timeout: timeout}
httpClient := &http.Client{Timeout: timeout}

if context.AvroTLS.Enabled {
if context.Avro.TLS.Enabled {
output.Debugf("avro TLS is enabled.")

tlsConfig, err := setupTLSConfig(context.AvroTLS)
tlsConfig, err := setupTLSConfig(context.Avro.TLS)
if err != nil {
return nil, errors.Wrap(err, "failed to setup avro tls config")
}

client.Transport = &http.Transport{
httpClient.Transport = &http.Transport{
TLSClientConfig: tlsConfig,
}
}

baseURL := avro.FormatBaseURL(context.AvroSchemaRegistry)
return srclient.CreateSchemaRegistryClientWithOptions(baseURL, client, 16), nil
baseURL := avro.FormatBaseURL(context.Avro.SchemaRegistry)
client := srclient.CreateSchemaRegistryClientWithOptions(baseURL, httpClient, 16)

if context.Avro.Username != "" {
output.Debugf("avro BasicAuth is enabled.")
client.SetCredentials(context.Avro.Username, context.Avro.Password)
}

return client, nil
}

func GetClientID(context *ClientContext, defaultPrefix string) string {
Expand Down
4 changes: 2 additions & 2 deletions internal/consume/consume-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,14 @@ func (operation *Operation) Consume(topic string, flags Flags) error {

var deserializers MessageDeserializerChain

if clientContext.AvroSchemaRegistry != "" {
if clientContext.Avro.SchemaRegistry != "" {
client, err := internal.CreateAvroSchemaRegistryClient(&clientContext)
if err != nil {
return err
}

deserializer := AvroMessageDeserializer{topic: topic, registry: CreateCachingSchemaRegistry(client),
jsonCodec: clientContext.AvroJSONCodec}
jsonCodec: clientContext.Avro.JSONCodec}

deserializers = append(deserializers, deserializer)
}
Expand Down
16 changes: 16 additions & 0 deletions internal/global/env-variables.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@ const (
KafkaVersion = "KAFKAVERSION"
AvroSchemaRegistry = "AVRO_SCHEMAREGISTRY"
AvroJSONCodec = "AVRO_JSONCODEC"
AvroRequestTimeout = "AVRO_REQUESTTIMEOUT"
AvroTLSEnabled = "AVRO_TLS_ENABLED"
AvroTLSCa = "AVRO_TLS_CA"
AvroTLSCert = "AVRO_TLS_CERT"
AvroTLSCertKey = "AVRO_TLS_CERTKEY"
AvroTLSInsecure = "AVRO_TLS_INSECURE"
AvroUsername = "AVRO_USERNAME"
AvroPassword = "AVRO_PASSWORD"
ProtobufProtoSetFiles = "PROTOBUF_PROTOSETFILES"
ProtobufImportPaths = "PROTOBUF_IMPORTPATHS"
ProtobufProtoFiles = "PROTOBUF_PROTOFILES"
Expand All @@ -44,6 +52,14 @@ var EnvVariables = []string{
KafkaVersion,
AvroSchemaRegistry,
AvroJSONCodec,
AvroRequestTimeout,
AvroTLSEnabled,
AvroTLSCa,
AvroTLSCert,
AvroTLSCertKey,
AvroTLSInsecure,
AvroUsername,
AvroPassword,
ProtobufProtoSetFiles,
ProtobufImportPaths,
ProtobufProtoFiles,
Expand Down
12 changes: 10 additions & 2 deletions internal/k8s/k8s-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,16 @@ func parsePodEnvironment(context internal.ClientContext) []string {
envVariables = appendStringIfDefined(envVariables, global.RequestTimeout, context.RequestTimeout.String())
envVariables = appendStringIfDefined(envVariables, global.ClientID, context.ClientID)
envVariables = appendStringIfDefined(envVariables, global.KafkaVersion, context.KafkaVersion.String())
envVariables = appendStringIfDefined(envVariables, global.AvroSchemaRegistry, context.AvroSchemaRegistry)
envVariables = appendStringIfDefined(envVariables, global.AvroJSONCodec, context.AvroJSONCodec.String())
envVariables = appendStringIfDefined(envVariables, global.AvroSchemaRegistry, context.Avro.SchemaRegistry)
envVariables = appendStringIfDefined(envVariables, global.AvroJSONCodec, context.Avro.JSONCodec.String())
envVariables = appendStringIfDefined(envVariables, global.AvroRequestTimeout, context.Avro.RequestTimeout.String())
envVariables = appendBool(envVariables, global.AvroTLSEnabled, context.Avro.TLS.Enabled)
envVariables = appendStringIfDefined(envVariables, global.AvroTLSCa, context.Avro.TLS.CA)
envVariables = appendStringIfDefined(envVariables, global.AvroTLSCert, context.Avro.TLS.Cert)
envVariables = appendStringIfDefined(envVariables, global.AvroTLSCertKey, context.Avro.TLS.CertKey)
envVariables = appendBool(envVariables, global.AvroTLSInsecure, context.Avro.TLS.Insecure)
envVariables = appendStringIfDefined(envVariables, global.AvroUsername, context.Avro.Username)
envVariables = appendStringIfDefined(envVariables, global.AvroPassword, context.Avro.Password)
envVariables = appendStrings(envVariables, global.ProtobufProtoSetFiles, context.Protobuf.ProtosetFiles)
envVariables = appendStrings(envVariables, global.ProtobufImportPaths, context.Protobuf.ProtoImportPaths)
envVariables = appendStrings(envVariables, global.ProtobufProtoFiles, context.Protobuf.ProtoFiles)
Expand Down
20 changes: 18 additions & 2 deletions internal/k8s/k8s-operation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,16 @@ func TestAllAvailableEnvironmentVariablesAreParsed(t *testing.T) {
context.Sasl.TokenProvider.Options["int-key"] = 12
context.ClientID = "my-client"
context.KafkaVersion = sarama.V2_0_1_0
context.AvroSchemaRegistry = "registry:8888"
context.AvroJSONCodec = avro.Avro
context.Avro.SchemaRegistry = "registry:8888"
context.Avro.JSONCodec = avro.Avro
context.Avro.RequestTimeout = 10 * time.Second
context.Avro.TLS.Enabled = true
context.Avro.TLS.CA = "my-avro-ca"
context.Avro.TLS.Cert = "my-avro-cert"
context.Avro.TLS.CertKey = "my-avro-cert-key"
context.Avro.TLS.Insecure = true
context.Avro.Username = "avro-user"
context.Avro.Password = "avro-pass"
context.Protobuf.ProtosetFiles = []string{"/usr/include/protosets/ps1.protoset", "/usr/lib/ps2.protoset"}
context.Protobuf.ProtoImportPaths = []string{"/usr/include/protobuf", "/usr/lib/protobuf"}
context.Protobuf.ProtoFiles = []string{"message.proto", "other.proto"}
Expand Down Expand Up @@ -76,6 +84,14 @@ func TestAllAvailableEnvironmentVariablesAreParsed(t *testing.T) {
testutil.AssertEquals(t, "2.0.1", envMap[global.KafkaVersion])
testutil.AssertEquals(t, "registry:8888", envMap[global.AvroSchemaRegistry])
testutil.AssertEquals(t, "avro", envMap[global.AvroJSONCodec])
testutil.AssertEquals(t, "10s", envMap[global.AvroRequestTimeout])
testutil.AssertEquals(t, "true", envMap[global.AvroTLSEnabled])
testutil.AssertEquals(t, "my-avro-ca", envMap[global.AvroTLSCa])
testutil.AssertEquals(t, "my-avro-cert", envMap[global.AvroTLSCert])
testutil.AssertEquals(t, "my-avro-cert-key", envMap[global.AvroTLSCertKey])
testutil.AssertEquals(t, "true", envMap[global.AvroTLSInsecure])
testutil.AssertEquals(t, "avro-user", envMap[global.AvroUsername])
testutil.AssertEquals(t, "avro-pass", envMap[global.AvroPassword])
testutil.AssertEquals(t, "/usr/include/protosets/ps1.protoset /usr/lib/ps2.protoset", envMap[global.ProtobufProtoSetFiles])
testutil.AssertEquals(t, "/usr/include/protobuf /usr/lib/protobuf", envMap[global.ProtobufImportPaths])
testutil.AssertEquals(t, "message.proto other.proto", envMap[global.ProtobufProtoFiles])
Expand Down
4 changes: 2 additions & 2 deletions internal/producer/producer-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,12 @@ func (operation *Operation) Produce(topic string, flags Flags) error {

serializers := MessageSerializerChain{topic: topic}

if clientContext.AvroSchemaRegistry != "" {
if clientContext.Avro.SchemaRegistry != "" {
client, err := internal.CreateAvroSchemaRegistryClient(&clientContext)
if err != nil {
return err
}
serializer := AvroMessageSerializer{topic: topic, client: client, jsonCodec: clientContext.AvroJSONCodec}
serializer := AvroMessageSerializer{topic: topic, client: client, jsonCodec: clientContext.Avro.JSONCodec}

serializers.serializers = append(serializers.serializers, serializer)
}
Expand Down
Loading