Skip to content

Commit

Permalink
Merge pull request #209 from deviceinsight/feature/avro-basic
Browse files Browse the repository at this point in the history
Allow configuring basicAuth for avro schema registry
  • Loading branch information
d-rk authored Aug 8, 2024
2 parents 13a0cae + b1ba517 commit 914a53f
Show file tree
Hide file tree
Showing 9 changed files with 115 additions and 39 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]

## 5.1.0 - 2024-08-07
- [#209](https://github.com/deviceinsight/kafkactl/pull/209) Allow configuring basicAuth for avro schema registry

### Added
- [#207](https://github.com/deviceinsight/kafkactl/pull/207) Allow configuring TLS for avro schema registry
Expand Down
4 changes: 4 additions & 0 deletions README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ contexts:
# optional: timeout for requests (defaults to 5s)
requestTimeout: 10s
# optional: basic auth credentials
username: admin
password: admin
# optional: tls config for avro
tls:
enabled: true
Expand Down
16 changes: 16 additions & 0 deletions cmd/root_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ func TestEnvironmentVariableLoadingAliases(t *testing.T) {
_ = os.Setenv(global.KafkaVersion, "2.0.1")
_ = os.Setenv(global.AvroSchemaRegistry, "registry:8888")
_ = os.Setenv(global.AvroJSONCodec, "avro")
_ = os.Setenv(global.AvroRequestTimeout, "10")
_ = os.Setenv(global.AvroTLSEnabled, "true")
_ = os.Setenv(global.AvroTLSCa, "my-avro-ca")
_ = os.Setenv(global.AvroTLSCert, "my-avro-cert")
_ = os.Setenv(global.AvroTLSCertKey, "my-avro-cert-key")
_ = os.Setenv(global.AvroTLSInsecure, "true")
_ = os.Setenv(global.AvroUsername, "avro-user")
_ = os.Setenv(global.AvroPassword, "avro-pass")
_ = os.Setenv(global.ProtobufProtoSetFiles, "/usr/include/protosets/ps1.protoset /usr/lib/ps2.protoset")
_ = os.Setenv(global.ProtobufImportPaths, "/usr/include/protobuf /usr/lib/protobuf")
_ = os.Setenv(global.ProtobufProtoFiles, "message.proto other.proto")
Expand Down Expand Up @@ -102,6 +110,14 @@ func TestEnvironmentVariableLoadingAliases(t *testing.T) {
testutil.AssertEquals(t, "2.0.1", viper.GetString("contexts.default.kafkaVersion"))
testutil.AssertEquals(t, "registry:8888", viper.GetString("contexts.default.avro.schemaRegistry"))
testutil.AssertEquals(t, "avro", viper.GetString("contexts.default.avro.jsonCodec"))
testutil.AssertEquals(t, "10", viper.GetString("contexts.default.avro.requestTimeout"))
testutil.AssertEquals(t, "true", viper.GetString("contexts.default.avro.tls.enabled"))
testutil.AssertEquals(t, "my-avro-ca", viper.GetString("contexts.default.avro.tls.ca"))
testutil.AssertEquals(t, "my-avro-cert", viper.GetString("contexts.default.avro.tls.cert"))
testutil.AssertEquals(t, "my-avro-cert-key", viper.GetString("contexts.default.avro.tls.certKey"))
testutil.AssertEquals(t, "true", viper.GetString("contexts.default.avro.tls.insecure"))
testutil.AssertEquals(t, "avro-user", viper.GetString("contexts.default.avro.username"))
testutil.AssertEquals(t, "avro-pass", viper.GetString("contexts.default.avro.password"))
testutil.AssertEquals(t, "/usr/include/protosets/ps1.protoset", viper.GetStringSlice("contexts.default.protobuf.protosetFiles")[0])
testutil.AssertEquals(t, "/usr/include/protobuf", viper.GetStringSlice("contexts.default.protobuf.importPaths")[0])
testutil.AssertEquals(t, "message.proto", viper.GetStringSlice("contexts.default.protobuf.protoFiles")[0])
Expand Down
77 changes: 46 additions & 31 deletions internal/common-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@ type SaslConfig struct {
TokenProvider TokenProvider
}

type AvroConfig struct {
SchemaRegistry string
JSONCodec avro.JSONCodec
RequestTimeout time.Duration
TLS TLSConfig
Username string
Password string
}

type TLSConfig struct {
Enabled bool
CA string
Expand Down Expand Up @@ -78,21 +87,18 @@ type ProducerConfig struct {
}

type ClientContext struct {
Name string
Brokers []string
TLS TLSConfig
Sasl SaslConfig
Kubernetes K8sConfig
RequestTimeout time.Duration
ClientID string
KafkaVersion sarama.KafkaVersion
AvroSchemaRegistry string
AvroJSONCodec avro.JSONCodec
AvroRequestTimeout time.Duration
AvroTLS TLSConfig
Protobuf protobuf.SearchContext
Producer ProducerConfig
Consumer ConsumerConfig
Name string
Brokers []string
TLS TLSConfig
Sasl SaslConfig
Kubernetes K8sConfig
RequestTimeout time.Duration
ClientID string
KafkaVersion sarama.KafkaVersion
Avro AvroConfig
Protobuf protobuf.SearchContext
Producer ProducerConfig
Consumer ConsumerConfig
}

type Config struct {
Expand Down Expand Up @@ -131,14 +137,16 @@ func CreateClientContext() (ClientContext, error) {
} else {
return context, err
}
context.AvroSchemaRegistry = viper.GetString("contexts." + context.Name + ".avro.schemaRegistry")
context.AvroJSONCodec = avro.ParseJSONCodec(viper.GetString("contexts." + context.Name + ".avro.jsonCodec"))
context.AvroRequestTimeout = viper.GetDuration("contexts." + context.Name + ".avro.requestTimeout")
context.AvroTLS.Enabled = viper.GetBool("contexts." + context.Name + ".avro.tls.enabled")
context.AvroTLS.CA = viper.GetString("contexts." + context.Name + ".avro.tls.ca")
context.AvroTLS.Cert = viper.GetString("contexts." + context.Name + ".avro.tls.cert")
context.AvroTLS.CertKey = viper.GetString("contexts." + context.Name + ".avro.tls.certKey")
context.AvroTLS.Insecure = viper.GetBool("contexts." + context.Name + ".avro.tls.insecure")
context.Avro.SchemaRegistry = viper.GetString("contexts." + context.Name + ".avro.schemaRegistry")
context.Avro.JSONCodec = avro.ParseJSONCodec(viper.GetString("contexts." + context.Name + ".avro.jsonCodec"))
context.Avro.RequestTimeout = viper.GetDuration("contexts." + context.Name + ".avro.requestTimeout")
context.Avro.TLS.Enabled = viper.GetBool("contexts." + context.Name + ".avro.tls.enabled")
context.Avro.TLS.CA = viper.GetString("contexts." + context.Name + ".avro.tls.ca")
context.Avro.TLS.Cert = viper.GetString("contexts." + context.Name + ".avro.tls.cert")
context.Avro.TLS.CertKey = viper.GetString("contexts." + context.Name + ".avro.tls.certKey")
context.Avro.TLS.Insecure = viper.GetBool("contexts." + context.Name + ".avro.tls.insecure")
context.Avro.Username = viper.GetString("contexts." + context.Name + ".avro.username")
context.Avro.Password = viper.GetString("contexts." + context.Name + ".avro.password")
context.Protobuf.ProtosetFiles = viper.GetStringSlice("contexts." + context.Name + ".protobuf.protosetFiles")
context.Protobuf.ProtoImportPaths = viper.GetStringSlice("contexts." + context.Name + ".protobuf.importPaths")
context.Protobuf.ProtoFiles = viper.GetStringSlice("contexts." + context.Name + ".protobuf.protoFiles")
Expand Down Expand Up @@ -254,29 +262,36 @@ func CreateClientConfig(context *ClientContext) (*sarama.Config, error) {

func CreateAvroSchemaRegistryClient(context *ClientContext) (srclient.ISchemaRegistryClient, error) {

timeout := context.AvroRequestTimeout
timeout := context.Avro.RequestTimeout

if context.AvroRequestTimeout <= 0 {
if context.Avro.RequestTimeout <= 0 {
timeout = 5 * time.Second
}

client := &http.Client{Timeout: timeout}
httpClient := &http.Client{Timeout: timeout}

if context.AvroTLS.Enabled {
if context.Avro.TLS.Enabled {
output.Debugf("avro TLS is enabled.")

tlsConfig, err := setupTLSConfig(context.AvroTLS)
tlsConfig, err := setupTLSConfig(context.Avro.TLS)
if err != nil {
return nil, errors.Wrap(err, "failed to setup avro tls config")
}

client.Transport = &http.Transport{
httpClient.Transport = &http.Transport{
TLSClientConfig: tlsConfig,
}
}

baseURL := avro.FormatBaseURL(context.AvroSchemaRegistry)
return srclient.CreateSchemaRegistryClientWithOptions(baseURL, client, 16), nil
baseURL := avro.FormatBaseURL(context.Avro.SchemaRegistry)
client := srclient.CreateSchemaRegistryClientWithOptions(baseURL, httpClient, 16)

if context.Avro.Username != "" {
output.Debugf("avro BasicAuth is enabled.")
client.SetCredentials(context.Avro.Username, context.Avro.Password)
}

return client, nil
}

func GetClientID(context *ClientContext, defaultPrefix string) string {
Expand Down
4 changes: 2 additions & 2 deletions internal/consume/consume-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,14 @@ func (operation *Operation) Consume(topic string, flags Flags) error {

var deserializers MessageDeserializerChain

if clientContext.AvroSchemaRegistry != "" {
if clientContext.Avro.SchemaRegistry != "" {
client, err := internal.CreateAvroSchemaRegistryClient(&clientContext)
if err != nil {
return err
}

deserializer := AvroMessageDeserializer{topic: topic, registry: CreateCachingSchemaRegistry(client),
jsonCodec: clientContext.AvroJSONCodec}
jsonCodec: clientContext.Avro.JSONCodec}

deserializers = append(deserializers, deserializer)
}
Expand Down
16 changes: 16 additions & 0 deletions internal/global/env-variables.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@ const (
KafkaVersion = "KAFKAVERSION"
AvroSchemaRegistry = "AVRO_SCHEMAREGISTRY"
AvroJSONCodec = "AVRO_JSONCODEC"
AvroRequestTimeout = "AVRO_REQUESTTIMEOUT"
AvroTLSEnabled = "AVRO_TLS_ENABLED"
AvroTLSCa = "AVRO_TLS_CA"
AvroTLSCert = "AVRO_TLS_CERT"
AvroTLSCertKey = "AVRO_TLS_CERTKEY"
AvroTLSInsecure = "AVRO_TLS_INSECURE"
AvroUsername = "AVRO_USERNAME"
AvroPassword = "AVRO_PASSWORD"
ProtobufProtoSetFiles = "PROTOBUF_PROTOSETFILES"
ProtobufImportPaths = "PROTOBUF_IMPORTPATHS"
ProtobufProtoFiles = "PROTOBUF_PROTOFILES"
Expand All @@ -44,6 +52,14 @@ var EnvVariables = []string{
KafkaVersion,
AvroSchemaRegistry,
AvroJSONCodec,
AvroRequestTimeout,
AvroTLSEnabled,
AvroTLSCa,
AvroTLSCert,
AvroTLSCertKey,
AvroTLSInsecure,
AvroUsername,
AvroPassword,
ProtobufProtoSetFiles,
ProtobufImportPaths,
ProtobufProtoFiles,
Expand Down
12 changes: 10 additions & 2 deletions internal/k8s/k8s-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,16 @@ func parsePodEnvironment(context internal.ClientContext) []string {
envVariables = appendStringIfDefined(envVariables, global.RequestTimeout, context.RequestTimeout.String())
envVariables = appendStringIfDefined(envVariables, global.ClientID, context.ClientID)
envVariables = appendStringIfDefined(envVariables, global.KafkaVersion, context.KafkaVersion.String())
envVariables = appendStringIfDefined(envVariables, global.AvroSchemaRegistry, context.AvroSchemaRegistry)
envVariables = appendStringIfDefined(envVariables, global.AvroJSONCodec, context.AvroJSONCodec.String())
envVariables = appendStringIfDefined(envVariables, global.AvroSchemaRegistry, context.Avro.SchemaRegistry)
envVariables = appendStringIfDefined(envVariables, global.AvroJSONCodec, context.Avro.JSONCodec.String())
envVariables = appendStringIfDefined(envVariables, global.AvroRequestTimeout, context.Avro.RequestTimeout.String())
envVariables = appendBool(envVariables, global.AvroTLSEnabled, context.Avro.TLS.Enabled)
envVariables = appendStringIfDefined(envVariables, global.AvroTLSCa, context.Avro.TLS.CA)
envVariables = appendStringIfDefined(envVariables, global.AvroTLSCert, context.Avro.TLS.Cert)
envVariables = appendStringIfDefined(envVariables, global.AvroTLSCertKey, context.Avro.TLS.CertKey)
envVariables = appendBool(envVariables, global.AvroTLSInsecure, context.Avro.TLS.Insecure)
envVariables = appendStringIfDefined(envVariables, global.AvroUsername, context.Avro.Username)
envVariables = appendStringIfDefined(envVariables, global.AvroPassword, context.Avro.Password)
envVariables = appendStrings(envVariables, global.ProtobufProtoSetFiles, context.Protobuf.ProtosetFiles)
envVariables = appendStrings(envVariables, global.ProtobufImportPaths, context.Protobuf.ProtoImportPaths)
envVariables = appendStrings(envVariables, global.ProtobufProtoFiles, context.Protobuf.ProtoFiles)
Expand Down
20 changes: 18 additions & 2 deletions internal/k8s/k8s-operation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,16 @@ func TestAllAvailableEnvironmentVariablesAreParsed(t *testing.T) {
context.Sasl.TokenProvider.Options["int-key"] = 12
context.ClientID = "my-client"
context.KafkaVersion = sarama.V2_0_1_0
context.AvroSchemaRegistry = "registry:8888"
context.AvroJSONCodec = avro.Avro
context.Avro.SchemaRegistry = "registry:8888"
context.Avro.JSONCodec = avro.Avro
context.Avro.RequestTimeout = 10 * time.Second
context.Avro.TLS.Enabled = true
context.Avro.TLS.CA = "my-avro-ca"
context.Avro.TLS.Cert = "my-avro-cert"
context.Avro.TLS.CertKey = "my-avro-cert-key"
context.Avro.TLS.Insecure = true
context.Avro.Username = "avro-user"
context.Avro.Password = "avro-pass"
context.Protobuf.ProtosetFiles = []string{"/usr/include/protosets/ps1.protoset", "/usr/lib/ps2.protoset"}
context.Protobuf.ProtoImportPaths = []string{"/usr/include/protobuf", "/usr/lib/protobuf"}
context.Protobuf.ProtoFiles = []string{"message.proto", "other.proto"}
Expand Down Expand Up @@ -76,6 +84,14 @@ func TestAllAvailableEnvironmentVariablesAreParsed(t *testing.T) {
testutil.AssertEquals(t, "2.0.1", envMap[global.KafkaVersion])
testutil.AssertEquals(t, "registry:8888", envMap[global.AvroSchemaRegistry])
testutil.AssertEquals(t, "avro", envMap[global.AvroJSONCodec])
testutil.AssertEquals(t, "10s", envMap[global.AvroRequestTimeout])
testutil.AssertEquals(t, "true", envMap[global.AvroTLSEnabled])
testutil.AssertEquals(t, "my-avro-ca", envMap[global.AvroTLSCa])
testutil.AssertEquals(t, "my-avro-cert", envMap[global.AvroTLSCert])
testutil.AssertEquals(t, "my-avro-cert-key", envMap[global.AvroTLSCertKey])
testutil.AssertEquals(t, "true", envMap[global.AvroTLSInsecure])
testutil.AssertEquals(t, "avro-user", envMap[global.AvroUsername])
testutil.AssertEquals(t, "avro-pass", envMap[global.AvroPassword])
testutil.AssertEquals(t, "/usr/include/protosets/ps1.protoset /usr/lib/ps2.protoset", envMap[global.ProtobufProtoSetFiles])
testutil.AssertEquals(t, "/usr/include/protobuf /usr/lib/protobuf", envMap[global.ProtobufImportPaths])
testutil.AssertEquals(t, "message.proto other.proto", envMap[global.ProtobufProtoFiles])
Expand Down
4 changes: 2 additions & 2 deletions internal/producer/producer-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,12 @@ func (operation *Operation) Produce(topic string, flags Flags) error {

serializers := MessageSerializerChain{topic: topic}

if clientContext.AvroSchemaRegistry != "" {
if clientContext.Avro.SchemaRegistry != "" {
client, err := internal.CreateAvroSchemaRegistryClient(&clientContext)
if err != nil {
return err
}
serializer := AvroMessageSerializer{topic: topic, client: client, jsonCodec: clientContext.AvroJSONCodec}
serializer := AvroMessageSerializer{topic: topic, client: client, jsonCodec: clientContext.Avro.JSONCodec}

serializers.serializers = append(serializers.serializers, serializer)
}
Expand Down

0 comments on commit 914a53f

Please sign in to comment.