Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add producer config to enable producer idempotence #152

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

hellopeera
Copy link

Release notes

Add Kafka producer configs enable_idempotence and max_in_flight_requests_per_connection to help ensure that exactly one copy of each message is written in the stream in face of producer retries

What does this PR do?

With producer idempotence, it ensures that duplicates are not introduced due to unexpected retries due to some intermittent issue e.g. network problem, etc.

This PR exposes the Kafka producer configurations: enable_idempotence and max_in_flight_requests_per_connection which is optional to Logstash users.

How to enable idempotence for Kafka producer

Enabling idempotence requires max.in.flight.requests.per.connection to be less than or equal to 5 (with message ordering preserved for any allowable value), retries to be greater than 0, and acks must be 'all'.

acks = -1
enable.idempotence = true
max.in.flight.requests.per.connection = 5

How is this PR test locally

Runs a local Kafka cluster

Launch the test Kafka script

./kafka_test_setup.sh

Connect a consumer

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic logstash_integration_enable_idempotence_topic

Setup Logstash Kafka output & run

Install the plugin in development mode

bin/logstash-plugin install --no-verify /path/to/logstash_plugins/ogstash-integration-kafka/logstash-integration-kafka-*.gem

Run logstash

bin/logstash -e 'input { stdin { } } output { kafka { topic_id => "logstash_integration_enable_idempotence_topic" enable_idempotence => "true" acks => "all" max_in_flight_requests_per_connection => 5 } }'

Verify if Logstash producer config is according to what is configured

Observe Kafka producer config printed on Logstash console

acks = -1
enable.idempotence = true
max.in.flight.requests.per.connection = 5

Verify if Logstash is able to producing messages

From the Logstash console, enter some messages via stdin. Verify the logstash consumer console if the message is received.

Related issues

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants