diff --git a/docs/input-kafka.asciidoc b/docs/input-kafka.asciidoc index 5c37688..42022e5 100644 --- a/docs/input-kafka.asciidoc +++ b/docs/input-kafka.asciidoc @@ -131,6 +131,8 @@ See the https://kafka.apache.org/{kafka_client_doc}/documentation for more detai | <> |<>|No | <> |<>|No | <> |<>|No +| <> |<>|No +| <> |<>|No | <> |<>|No | <> |<>|No | <> |<>|No @@ -563,6 +565,20 @@ to a given topic partition. This avoids repeated fetching-and-failing in a tight The SASL client callback handler class the specified SASL mechanism should use. +[id="plugins-{type}s-{plugin}-sasl_login_callback_handler_class""] +===== `sasl_login_callback_handler_class` +* Value type is <> +* There is no default value for this setting. + +The SASL login callback handler class the specified SASL mechanism should use. + +[id="plugins-{type}s-{plugin}-sasl_oauthbearer_token_endpoint_url""] +===== `sasl_oauthbearer_token_endpoint_url` +* Value type is <> +* There is no default value for this setting. + +The URL where the Kafka client requests OAuth 2.0 tokens from an authorization server for integration with OAuth 2.0 identity providers. + [id="plugins-{type}s-{plugin}-sasl_jaas_config"] ===== `sasl_jaas_config` diff --git a/docs/output-kafka.asciidoc b/docs/output-kafka.asciidoc index 716d3dd..990a096 100644 --- a/docs/output-kafka.asciidoc +++ b/docs/output-kafka.asciidoc @@ -102,6 +102,8 @@ See the https://kafka.apache.org/{kafka_client_doc}/documentation for more detai | <> |<>|No | <> |<>|No | <> |<>|No +| <> |<>|No +| <> |<>|No | <> |<>|No | <> |<>|No | <> |<>|No @@ -399,6 +401,20 @@ The amount of time to wait before attempting to retry a failed produce request t The SASL client callback handler class the specified SASL mechanism should use. +[id="plugins-{type}s-{plugin}-sasl_login_callback_handler_class""] +===== `sasl_login_callback_handler_class` +* Value type is <> +* There is no default value for this setting. + +The SASL login callback handler class the specified SASL mechanism should use. + +[id="plugins-{type}s-{plugin}-sasl_oauthbearer_token_endpoint_url""] +===== `sasl_oauthbearer_token_endpoint_url` +* Value type is <> +* There is no default value for this setting. + +The URL where the Kafka client requests OAuth 2.0 tokens from an authorization server for integration with OAuth 2.0 identity providers. + [id="plugins-{type}s-{plugin}-sasl_jaas_config"] ===== `sasl_jaas_config` diff --git a/lib/logstash/inputs/kafka.rb b/lib/logstash/inputs/kafka.rb index 604bc68..0754f6c 100644 --- a/lib/logstash/inputs/kafka.rb +++ b/lib/logstash/inputs/kafka.rb @@ -210,6 +210,10 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base config :security_protocol, :validate => ["PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"], :default => "PLAINTEXT" # SASL client callback handler class config :sasl_client_callback_handler_class, :validate => :string + # SASL login callback handler class + config :sasl_login_callback_handler_class, :validate => :string + # The URL where the Kafka client requests OAuth 2.0 tokens from an authorization server. + config :sasl_oauthbearer_token_endpoint_url, :validate => :string # http://kafka.apache.org/documentation.html#security_sasl[SASL mechanism] used for client connections. # This may be any mechanism for which a security provider is available. # GSSAPI is the default mechanism. diff --git a/lib/logstash/outputs/kafka.rb b/lib/logstash/outputs/kafka.rb index ebf233f..1dabd1d 100644 --- a/lib/logstash/outputs/kafka.rb +++ b/lib/logstash/outputs/kafka.rb @@ -149,6 +149,10 @@ class LogStash::Outputs::Kafka < LogStash::Outputs::Base config :security_protocol, :validate => ["PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"], :default => "PLAINTEXT" # SASL client callback handler class config :sasl_client_callback_handler_class, :validate => :string + # SASL login callback handler class + config :sasl_login_callback_handler_class, :validate => :string + # The URL where the Kafka client requests OAuth 2.0 tokens from an authorization server. + config :sasl_oauthbearer_token_endpoint_url, :validate => :string # http://kafka.apache.org/documentation.html#security_sasl[SASL mechanism] used for client connections. # This may be any mechanism for which a security provider is available. # GSSAPI is the default mechanism. @@ -363,6 +367,8 @@ def create_producer props.put(kafka::VALUE_SERIALIZER_CLASS_CONFIG, value_serializer) props.put("security.protocol", security_protocol) unless security_protocol.nil? + props.put("sasl.login.callback.handler.class", sasl_login_callback_handler_class) unless sasl_login_callback_handler_class.nil? + props.put("sasl.oauthbearer.token.endpoint.url", sasl_oauthbearer_token_endpoint_url) unless sasl_oauthbearer_token_endpoint_url.nil? if security_protocol == "SSL" set_trustore_keystore_config(props) diff --git a/lib/logstash/plugin_mixins/kafka/common.rb b/lib/logstash/plugin_mixins/kafka/common.rb index 1ae8546..09d14e7 100644 --- a/lib/logstash/plugin_mixins/kafka/common.rb +++ b/lib/logstash/plugin_mixins/kafka/common.rb @@ -42,6 +42,8 @@ def set_sasl_config(props) props.put("sasl.kerberos.service.name", sasl_kerberos_service_name) unless sasl_kerberos_service_name.nil? props.put("sasl.jaas.config", sasl_jaas_config) unless sasl_jaas_config.nil? props.put("sasl.client.callback.handler.class", sasl_client_callback_handler_class) unless sasl_client_callback_handler_class.nil? + props.put("sasl.login.callback.handler.class", sasl_login_callback_handler_class) unless sasl_login_callback_handler_class.nil? + props.put("sasl.oauthbearer.token.endpoint.url", sasl_oauthbearer_token_endpoint_url) unless sasl_oauthbearer_token_endpoint_url.nil? end def reassign_dns_lookup