diff --git a/CHANGELOG.md b/CHANGELOG.md
index a97d45c5..3f0a8e24 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,6 @@
+## 11.4.0
+  - Add support for setting Kafka message headers in output plugin [#162](https://github.com/logstash-plugins/logstash-integration-kafka/pull/162)
+
 ## 11.3.4
   - Fix "retries" and "value_serializer" error handling in output plugin (#160) [#160](https://github.com/logstash-plugins/logstash-integration-kafka/pull/160)
 
diff --git a/docs/output-kafka.asciidoc b/docs/output-kafka.asciidoc
index b95784d6..c6779b53 100644
--- a/docs/output-kafka.asciidoc
+++ b/docs/output-kafka.asciidoc
@@ -91,6 +91,7 @@ See the https://kafka.apache.org/{kafka_client_doc}/documentation for more detai
 | <> |<>|No
 | <> |<>|No
 | <> |<>|No
+| <<plugins-{type}s-{plugin}-message_headers>> |<<hash,hash>>|No
 | <> |<>|No
 | <> |<>|No
 | <> |<>|No
@@ -275,6 +276,22 @@ to allow other records to be sent so that the sends can be batched together.
 
 The maximum size of a request
 
+[id="plugins-{type}s-{plugin}-message_headers"]
+===== `message_headers`
+
+ * Value type is <<hash,hash>>
+ ** Keys are header names, and must be <<string,string>>
+ ** Values are header values, and must be <<string,string>>
+ ** Values support interpolation from event field values
+ * There is no default value for this setting.
+
+A map of key-value pairs, each corresponding to a header name and its value respectively.
+Example:
+[source,ruby]
+----------------------------------
+  message_headers => { "event_timestamp" => "%{@timestamp}" }
+----------------------------------
+
 [id="plugins-{type}s-{plugin}-message_key"]
 ===== `message_key`
 
diff --git a/lib/logstash/outputs/kafka.rb b/lib/logstash/outputs/kafka.rb
index 762521f7..6c9de423 100644
--- a/lib/logstash/outputs/kafka.rb
+++ b/lib/logstash/outputs/kafka.rb
@@ -106,6 +106,8 @@ class LogStash::Outputs::Kafka < LogStash::Outputs::Base
   config :max_request_size, :validate => :number, :default => 1_048_576 # (1MB) Kafka default
   # The key for the message
   config :message_key, :validate => :string
+  # Headers added to the Kafka message in the form of key-value pairs
+  config :message_headers, :validate => :hash, :default => {}
   # the timeout setting for initial metadata request to fetch topic metadata.
   config :metadata_fetch_timeout_ms, :validate => :number, :default => 60_000
   # Partitioner to use - can be `default`, `uniform_sticky`, `round_robin` or a fully qualified class name of a custom partitioner.
@@ -204,6 +206,11 @@ def register
     else
       raise LogStash::ConfigurationError, "'value_serializer' only supports org.apache.kafka.common.serialization.ByteArraySerializer and org.apache.kafka.common.serialization.StringSerializer"
     end
+    @message_headers.each do |key, value|
+      if !key.is_a? String
+        raise LogStash::ConfigurationError, "'message_headers' contains a key that is not a string!"
+      end
+    end
     @producer = create_producer
   end
 
@@ -315,6 +322,9 @@ def write_to_kafka(event, serialized_data)
     else
       record = ProducerRecord.new(event.sprintf(@topic_id), event.sprintf(@message_key), serialized_data)
     end
+    @message_headers.each do |key, value|
+      record.headers().add(key, event.sprintf(value).to_java_bytes)
+    end
     prepare(record)
   rescue LogStash::ShutdownSignal
     logger.debug('producer received shutdown signal')
diff --git a/logstash-integration-kafka.gemspec b/logstash-integration-kafka.gemspec
index 1618975f..c88f6131 100644
--- a/logstash-integration-kafka.gemspec
+++ b/logstash-integration-kafka.gemspec
@@ -1,6 +1,6 @@
 Gem::Specification.new do |s|
   s.name = 'logstash-integration-kafka'
-  s.version = '11.3.4'
+  s.version = '11.4.0'
   s.licenses = ['Apache-2.0']
   s.summary = "Integration with Kafka - input and output plugins"
   s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline "+
diff --git a/spec/integration/outputs/kafka_spec.rb b/spec/integration/outputs/kafka_spec.rb
index b562c028..c5f5d72d 100644
--- a/spec/integration/outputs/kafka_spec.rb
+++ b/spec/integration/outputs/kafka_spec.rb
@@ -191,6 +191,25 @@
     end
   end
 
+  context 'when setting message_headers' do
+    let(:num_events) { 10 }
+    let(:test_topic) { 'logstash_integration_topic4' }
+
+    before :each do
+      config = base_config.merge({"topic_id" => test_topic, "message_headers" => {"event_timestamp" => "%{@timestamp}"}})
+      load_kafka_data(config)
+    end
+
+    it 'messages should contain headers' do
+      messages = fetch_messages(test_topic)
+
+      expect(messages.size).to eq(num_events)
+      messages.each do |m|
+        expect(m.headers).to eq({"event_timestamp" => LogStash::Timestamp.at(0).to_s})
+      end
+    end
+  end
+
   context 'setting partitioner' do
     let(:test_topic) { 'logstash_integration_partitioner_topic' }
     let(:partitioner) { nil }
diff --git a/spec/unit/outputs/kafka_spec.rb b/spec/unit/outputs/kafka_spec.rb
index df5613eb..97fcb7c0 100644
--- a/spec/unit/outputs/kafka_spec.rb
+++ b/spec/unit/outputs/kafka_spec.rb
@@ -60,6 +60,16 @@
     kafka.multi_receive([event])
   end
 
+  it 'should support field referenced message_headers' do
+    expect(org.apache.kafka.clients.producer.ProducerRecord).to receive(:new).
+      with("test", event.to_s).and_call_original
+    expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send)
+    expect_any_instance_of(org.apache.kafka.common.header.internals.RecordHeaders).to receive(:add).with("host", "172.0.0.1".to_java_bytes).and_call_original
+    kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge({"message_headers" => { "host" => "%{host}"}}))
+    kafka.register
+    kafka.multi_receive([event])
+  end
+
   it 'should not raise config error when truststore location is not set and ssl is enabled' do
     kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge("security_protocol" => "SSL"))
     expect(org.apache.kafka.clients.producer.KafkaProducer).to receive(:new)
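
For reference, a minimal pipeline sketch exercising the new option might look like the following. The broker address, topic name, and header names here are assumptions for illustration only, not part of the change; the interpolation behavior matches the `event.sprintf(value)` call in `write_to_kafka` above.

[source,ruby]
----------------------------------
output {
  kafka {
    # Assumed local broker; adjust to your environment.
    bootstrap_servers => "localhost:9092"
    # Hypothetical topic name for this example.
    topic_id => "example_topic"
    # Each key becomes a header name; each value is sprintf-interpolated
    # against the event before being attached as the header value.
    message_headers => {
      "event_timestamp" => "%{@timestamp}"
      "origin_host"     => "%{host}"
    }
  }
}
----------------------------------

Note that header keys must be static strings (enforced at `register` time), while header values may reference event fields, since they are interpolated per event before being converted to bytes.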