Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add producer config to enable producer idempotence #152

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions docs/output-kafka.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,14 @@ See the https://kafka.apache.org/{kafka_client_doc}/documentation for more detai
| <<plugins-{type}s-{plugin}-buffer_memory>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-client_dns_lookup>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-client_id>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-enable_idempotence>> |<<boolean,boolean>>|No
| <<plugins-{type}s-{plugin}-compression_type>> |<<string,string>>, one of `["none", "gzip", "snappy", "lz4", "zstd"]`|No
| <<plugins-{type}s-{plugin}-connections_max_idle_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-jaas_path>> |a valid filesystem path|No
| <<plugins-{type}s-{plugin}-kerberos_config>> |a valid filesystem path|No
| <<plugins-{type}s-{plugin}-key_serializer>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-linger_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-max_in_flight_requests_per_connection>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-max_request_size>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-message_key>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-metadata_fetch_timeout_ms>> |<<number,number>>|No
Expand Down Expand Up @@ -197,6 +199,22 @@ The id string to pass to the server when making requests.
The purpose of this is to be able to track the source of requests beyond just
ip/port by allowing a logical application name to be included with the request

[id="plugins-{type}s-{plugin}-enable_idempotence"]
===== `enable_idempotence`

* Value type is <<boolean,boolean>>
* There is no default value for this setting.

When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream.
If 'false', producer retries due to broker failures, etc., may write duplicates of the retried message in the stream.
Note that enabling idempotence requires `max.in.flight.requests.per.connection` to be less than or equal to 5
(with message ordering preserved for any allowable value), `retries` to be greater than 0, and `acks` must be 'all'.

Idempotence is enabled by default if no conflicting configurations are set.
If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled.
If idempotence is explicitly enabled and conflicting configurations are set,
a https://kafka.apache.org/{kafka_client_doc}/javadoc/org/apache/kafka/common/config/ConfigException.html[ConfigException] is thrown.

[id="plugins-{type}s-{plugin}-compression_type"]
===== `compression_type`

Expand Down Expand Up @@ -267,6 +285,19 @@ This setting accomplishes this by adding a small amount of artificial delay—th
rather than immediately sending out a record the producer will wait for up to the given delay
to allow other records to be sent so that the sends can be batched together.

[id="plugins-{type}s-{plugin}-max_in_flight_requests_per_connection"]
===== `max_in_flight_requests_per_connection`

* Value type is <<number,number>>
* Default value is `5`.

The maximum number of unacknowledged requests the client will send on a single connection before blocking.
Note that if this configuration is set to be greater than 1 and `enable.idempotence` is set to false,
there is a risk of message reordering after a failed send due to retries (i.e., if retries are enabled);
if retries are disabled or if `enable.idempotence` is set to true, ordering will be preserved.
Additionally, enabling idempotence requires the value of this configuration to be less than or equal to 5.
If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled.

[id="plugins-{type}s-{plugin}-max_request_size"]
===== `max_request_size`

Expand Down
9 changes: 9 additions & 0 deletions lib/logstash/outputs/kafka.rb
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ class LogStash::Outputs::Kafka < LogStash::Outputs::Base
# The purpose of this is to be able to track the source of requests beyond just
# ip/port by allowing a logical application name to be included with the request
config :client_id, :validate => :string
# When set to ‘true’, the producer will ensure that exactly one copy of each message is written in the stream.
# If ‘false’, producer retries due to broker failures, etc., may write duplicates of the retried message in the stream.
# Note that enabling idempotence requires max.in.flight.requests.per.connection to be less than or equal to 5
# (with message ordering preserved for any allowable value), retries to be greater than 0, and acks must be ‘all’.
config :enable_idempotence, :validate => :boolean
# Serializer class for the key of the message
config :key_serializer, :validate => :string, :default => 'org.apache.kafka.common.serialization.StringSerializer'
# The producer groups together any records that arrive in between request
Expand All @@ -102,6 +107,8 @@ class LogStash::Outputs::Kafka < LogStash::Outputs::Base
# rather than immediately sending out a record the producer will wait for up to the given delay
# to allow other records to be sent so that the sends can be batched together.
config :linger_ms, :validate => :number, :default => 0 # Kafka default
# The maximum number of unacknowledged requests the client will send on a single connection before blocking.
config :max_in_flight_requests_per_connection, :validate => :number, :default => 5 # Kafka default
# The maximum size of a request
config :max_request_size, :validate => :number, :default => 1_048_576 # (1MB) Kafka default
# The key for the message
Expand Down Expand Up @@ -334,8 +341,10 @@ def create_producer
props.put(kafka::COMPRESSION_TYPE_CONFIG, compression_type)
props.put(kafka::CLIENT_DNS_LOOKUP_CONFIG, client_dns_lookup)
props.put(kafka::CLIENT_ID_CONFIG, client_id) unless client_id.nil?
props.put(kafka::ENABLE_IDEMPOTENCE_CONFIG, enable_idempotence.to_s) unless enable_idempotence.nil?
props.put(kafka::KEY_SERIALIZER_CLASS_CONFIG, key_serializer)
props.put(kafka::LINGER_MS_CONFIG, linger_ms.to_s)
props.put(kafka::MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, max_in_flight_requests_per_connection.to_s)
props.put(kafka::MAX_REQUEST_SIZE_CONFIG, max_request_size.to_s)
props.put(kafka::METADATA_MAX_AGE_CONFIG, metadata_max_age_ms.to_s) unless metadata_max_age_ms.nil?
unless partitioner.nil?
Expand Down