Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for AWS IAM authentication #126

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 20 additions & 19 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -68,31 +68,32 @@ dependencies {
implementation 'org.slf4j:slf4j-api:1.7.36'
implementation 'org.lz4:lz4-java:1.7.1'
implementation 'org.xerial.snappy:snappy-java:1.1.8.4'

implementation('software.amazon.msk:aws-msk-iam-auth:1.1.4') {
because 'AWS MSK IAM authentication support'
}
}
task generateGemJarRequiresFile {
doLast {

task vendor {
doLast{
File jars_file = file('lib/logstash-integration-kafka_jars.rb')
String vendorPathPrefix = "vendor/jar-dependencies"
jars_file.newWriter().withWriter { w ->
w << "# AUTOGENERATED BY THE GRADLE SCRIPT. DO NOT EDIT.\n\n"
w << "require \'jar_dependencies\'\n"
configurations.runtimeClasspath.allDependencies.each {
w << "require_jar(\'${it.group}\', \'${it.name}\', \'${it.version}\')\n"
}
}
}
}
configurations.runtimeClasspath.each {
// File path in gradle cache contains the dependency group/name/version
// Eg: .gradle/caches/.../.../io.confluent/common-utils/6.2.2/<sha1-hash>/common-utils-6.2.2.jar
String version = it.getParentFile().getParentFile().getName()
String name = it.getParentFile().getParentFile().getParentFile().getName()
String group = it.getParentFile().getParentFile().getParentFile().getParentFile().getName()

task vendor {
doLast {
String vendorPathPrefix = "vendor/jar-dependencies"
configurations.runtimeClasspath.allDependencies.each { dep ->
File f = configurations.runtimeClasspath.filter { it.absolutePath.contains("${dep.group}/${dep.name}/${dep.version}") }.singleFile
String groupPath = dep.group.replaceAll('\\.', '/')
File newJarFile = file("${vendorPathPrefix}/${groupPath}/${dep.name}/${dep.version}/${dep.name}-${dep.version}.jar")
newJarFile.mkdirs()
Files.copy(f.toPath(), newJarFile.toPath(), REPLACE_EXISTING)
w << "require_jar(\'${group}\', \'${name}\', \'${version}\')\n"
String groupPath = group.replaceAll('\\.', '/')
File newJarFile = file("${vendorPathPrefix}/${groupPath}/${name}/${version}/${name}-${version}.jar")
newJarFile.mkdirs()
Files.copy(it.toPath(), newJarFile.toPath(), REPLACE_EXISTING)
}
}
}
}

vendor.dependsOn(generateGemJarRequiresFile)
23 changes: 22 additions & 1 deletion docs/input-kafka.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,19 @@ For more information see https://kafka.apache.org/{kafka_client_doc}/documentati

Kafka consumer configuration: https://kafka.apache.org/{kafka_client_doc}/documentation.html#consumerconfigs

==== AWS MSK IAM authentication
If you use AWS MSK, the AWS MSK IAM access control enables you to handle both authentication and authorization for your MSK cluster with AWS IAM.
For more information on this AWS MSK feature see the https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html[AWS documentation].

To use this Kafka input with AWS MSK IAM authentication, set the following configuration:
```
security_protocol => "SASL_SSL"
sasl_mechanism => "AWS_MSK_IAM"
sasl_jaas_config => "software.amazon.msk.auth.iam.IAMLoginModule required;"
sasl_client_callback_handler_class => "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
```
For more IAM authentication configurations, see the https://github.com/aws/aws-msk-iam-auth[AWS MSK IAM authentication library documentation].

==== Metadata fields

The following metadata from Kafka broker are added under the `[@metadata]` field:
Expand Down Expand Up @@ -131,6 +144,7 @@ See the https://kafka.apache.org/{kafka_client_doc}/documentation for more detai
| <<plugins-{type}s-{plugin}-sasl_jaas_config>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-sasl_kerberos_service_name>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-sasl_mechanism>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-sasl_client_callback_handler_class>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-schema_registry_key>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-schema_registry_proxy>> |<<uri,uri>>|No
| <<plugins-{type}s-{plugin}-schema_registry_secret>> |<<string,string>>|No
Expand Down Expand Up @@ -542,9 +556,16 @@ This can be defined either in Kafka's JAAS config or in Kafka's config.
* Default value is `"GSSAPI"`

http://kafka.apache.org/documentation.html#security_sasl[SASL mechanism] used for client connections.
This may be any mechanism for which a security provider is available.
This may be any mechanism for which a security provider is available. For AWS MSK IAM authentication use `AWS_MSK_IAM`.
GSSAPI is the default mechanism.

[id="plugins-{type}s-{plugin}-sasl_client_callback_handler_class""]
===== `sasl_client_callback_handler_class`
* Value type is <<string,string>>
* There is no default value for this setting.

The SASL client callback handler class the specified SASL mechanism should use.

[id="plugins-{type}s-{plugin}-schema_registry_key"]
===== `schema_registry_key`

Expand Down
23 changes: 22 additions & 1 deletion docs/output-kafka.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,19 @@ https://kafka.apache.org/{kafka_client_doc}/documentation.html#producerconfigs

NOTE: This plugin does not support using a proxy when communicating to the Kafka broker.

==== AWS MSK IAM authentication
If you use AWS MSK, the AWS MSK IAM access control enables you to handle both authentication and authorization for your MSK cluster with AWS IAM.
For more information on this AWS MSK feature see the https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html[AWS documentation].

To use this Kafka input with AWS MSK IAM authentication, set the following configuration:
```
security_protocol => "SASL_SSL"
sasl_mechanism => "AWS_MSK_IAM"
sasl_jaas_config => "software.amazon.msk.auth.iam.IAMLoginModule required;"
sasl_client_callback_handler_class => "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
```
For more IAM authentication configurations, see the https://github.com/aws/aws-msk-iam-auth[AWS MSK IAM authentication library documentation].

[id="plugins-{type}s-{plugin}-options"]
==== Kafka Output Configuration Options

Expand Down Expand Up @@ -103,6 +116,7 @@ See the https://kafka.apache.org/{kafka_client_doc}/documentation for more detai
| <<plugins-{type}s-{plugin}-sasl_jaas_config>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-sasl_kerberos_service_name>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-sasl_mechanism>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-sasl_client_callback_handler_class>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-security_protocol>> |<<string,string>>, one of `["PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"]`|No
| <<plugins-{type}s-{plugin}-send_buffer_bytes>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-ssl_endpoint_identification_algorithm>> |<<string,string>>|No
Expand Down Expand Up @@ -402,9 +416,16 @@ This can be defined either in Kafka's JAAS config or in Kafka's config.
* Default value is `"GSSAPI"`

http://kafka.apache.org/documentation.html#security_sasl[SASL mechanism] used for client connections.
This may be any mechanism for which a security provider is available.
This may be any mechanism for which a security provider is available. For AWS MSK IAM authentication use `AWS_MSK_IAM`.
GSSAPI is the default mechanism.

[id="plugins-{type}s-{plugin}-sasl_client_callback_handler_class""]
===== `sasl_client_callback_handler_class`
* Value type is <<string,string>>
* There is no default value for this setting.

The SASL client callback handler class the specified SASL mechanism should use.

[id="plugins-{type}s-{plugin}-security_protocol"]
===== `security_protocol`

Expand Down
21 changes: 21 additions & 0 deletions lib/logstash-integration-kafka_jars.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,24 @@
require_jar('org.slf4j', 'slf4j-api', '1.7.36')
require_jar('org.lz4', 'lz4-java', '1.7.1')
require_jar('org.xerial.snappy', 'snappy-java', '1.1.8.4')
require_jar('com.amazonaws', 'aws-java-sdk-core', '1.12.290')
require_jar('com.amazonaws', 'aws-java-sdk-sts', '1.12.290')
require_jar('software.amazon.awssdk', 'annotations', '2.17.267')
require_jar('software.amazon.awssdk', 'apache-client', '2.17.267')
require_jar('software.amazon.awssdk', 'auth', '2.17.267')
require_jar('software.amazon.awssdk', 'aws-core', '2.17.267')
require_jar('software.amazon.awssdk', 'aws-json-protocol', '2.17.267')
require_jar('software.amazon.awssdk', 'aws-query-protocol', '2.17.267')
require_jar('software.amazon.awssdk', 'http-client-spi', '2.17.267')
require_jar('software.amazon.awssdk', 'json-utils', '2.17.267')
require_jar('software.amazon.awssdk', 'metrics-spi', '2.17.267')
require_jar('software.amazon.awssdk', 'netty-nio-client', '2.17.267')
require_jar('software.amazon.awssdk', 'profiles', '2.17.267')
require_jar('software.amazon.awssdk', 'protocol-core', '2.17.267')
require_jar('software.amazon.awssdk', 'regions', '2.17.267')
require_jar('software.amazon.awssdk', 'sdk-core', '2.17.267')
require_jar('software.amazon.awssdk', 'sso', '2.17.267')
require_jar('software.amazon.awssdk', 'sts', '2.17.267')
require_jar('software.amazon.awssdk', 'third-party-jackson-core', '2.17.267')
require_jar('software.amazon.awssdk', 'utils', '2.17.267')
require_jar('software.amazon.msk', 'aws-msk-iam-auth', '1.1.4')
4 changes: 3 additions & 1 deletion lib/logstash/inputs/kafka.rb
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,8 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base
config :jaas_path, :validate => :path
# JAAS configuration settings. This allows JAAS config to be a part of the plugin configuration and allows for different JAAS configuration per each plugin config.
config :sasl_jaas_config, :validate => :string
# SASL client callback handler class
config :sasl_client_callback_handler_class, :validate => :string
# Optional path to kerberos config file. This is krb5.conf style as detailed in https://web.mit.edu/kerberos/krb5-1.12/doc/admin/conf_files/krb5_conf.html
config :kerberos_config, :validate => :path
# Option to add Kafka metadata like topic, message size and header key values to the event.
Expand Down Expand Up @@ -420,7 +422,7 @@ def create_consumer(client_id)
props.put(kafka::SEND_BUFFER_CONFIG, send_buffer_bytes.to_s) unless send_buffer_bytes.nil?
props.put(kafka::SESSION_TIMEOUT_MS_CONFIG, session_timeout_ms.to_s) unless session_timeout_ms.nil?
props.put(kafka::VALUE_DESERIALIZER_CLASS_CONFIG, value_deserializer_class)
props.put(kafka::CLIENT_RACK_CONFIG, client_rack) unless client_rack.nil?
props.put(kafka::CLIENT_RACK_CONFIG, client_rack) unless client_rack.nil?

props.put("security.protocol", security_protocol) unless security_protocol.nil?
if schema_registry_url
Expand Down
2 changes: 2 additions & 0 deletions lib/logstash/outputs/kafka.rb
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ class LogStash::Outputs::Kafka < LogStash::Outputs::Base
config :jaas_path, :validate => :path
# JAAS configuration settings. This allows JAAS config to be a part of the plugin configuration and allows for different JAAS configuration per each plugin config.
config :sasl_jaas_config, :validate => :string
# SASL client callback handler class
config :sasl_client_callback_handler_class, :validate => :string
# Optional path to kerberos config file. This is krb5.conf style as detailed in https://web.mit.edu/kerberos/krb5-1.12/doc/admin/conf_files/krb5_conf.html
config :kerberos_config, :validate => :path

Expand Down
3 changes: 2 additions & 1 deletion lib/logstash/plugin_mixins/kafka/common.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ def set_sasl_config(props)

props.put("sasl.kerberos.service.name", sasl_kerberos_service_name) unless sasl_kerberos_service_name.nil?
props.put("sasl.jaas.config", sasl_jaas_config) unless sasl_jaas_config.nil?
props.put("sasl.client.callback.handler.class", sasl_client_callback_handler_class) unless sasl_client_callback_handler_class.nil?
end

end
end end end
end end end