Merge pull request #162 from LaithSiriani/main
Kafka Output: support manipulating Kafka message headers.
mashhurs authored Apr 9, 2024
2 parents 5cbe4eb + dcd49da commit e521d0c
Showing 6 changed files with 60 additions and 1 deletion.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,6 @@
## 11.4.0
- Add support for setting Kafka message headers in output plugin [#162](https://github.com/logstash-plugins/logstash-integration-kafka/pull/162)

## 11.3.4
- Fix "retries" and "value_serializer" error handling in output plugin [#160](https://github.com/logstash-plugins/logstash-integration-kafka/pull/160)

17 changes: 17 additions & 0 deletions docs/output-kafka.asciidoc
@@ -91,6 +91,7 @@ See the https://kafka.apache.org/{kafka_client_doc}/documentation for more details.
| <<plugins-{type}s-{plugin}-key_serializer>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-linger_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-max_request_size>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-message_headers>> |<<hash,hash>>|No
| <<plugins-{type}s-{plugin}-message_key>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-metadata_fetch_timeout_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-metadata_max_age_ms>> |<<number,number>>|No
@@ -275,6 +276,22 @@ to allow other records to be sent so that the sends can be batched together.

The maximum size of a request

[id="plugins-{type}s-{plugin}-message_headers"]
===== `message_headers`

* Value type is <<hash,hash>>
** Keys are header names and must be <<string,string>>
** Values are header values and must be <<string,string>>
** Values support interpolation from event field values
* There is no default value for this setting.

A map of key-value pairs, where each key is a header name and each value is the corresponding header value.
Example:
[source,ruby]
----------------------------------
message_headers => { "event_timestamp" => "%{@timestamp}" }
----------------------------------
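
Header values may combine literal strings and event field references. As a fuller, illustrative sketch (the `bootstrap_servers` address and `topic_id` below are placeholders, not defaults):
[source,ruby]
----------------------------------
output {
  kafka {
    bootstrap_servers => "localhost:9092"
    topic_id => "events"
    message_headers => {
      "source" => "logstash"
      "event_timestamp" => "%{@timestamp}"
      "origin_host" => "%{host}"
    }
  }
}
----------------------------------
Header values are written to the Kafka record as the bytes of the resolved string, so consumers should decode them accordingly.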

[id="plugins-{type}s-{plugin}-message_key"]
===== `message_key`

10 changes: 10 additions & 0 deletions lib/logstash/outputs/kafka.rb
@@ -106,6 +106,8 @@ class LogStash::Outputs::Kafka < LogStash::Outputs::Base
config :max_request_size, :validate => :number, :default => 1_048_576 # (1MB) Kafka default
# The key for the message
config :message_key, :validate => :string
# Headers to add to each Kafka message, as key-value pairs
config :message_headers, :validate => :hash, :default => {}
# the timeout setting for initial metadata request to fetch topic metadata.
config :metadata_fetch_timeout_ms, :validate => :number, :default => 60_000
# Partitioner to use - can be `default`, `uniform_sticky`, `round_robin` or a fully qualified class name of a custom partitioner.
@@ -204,6 +206,11 @@ def register
else
raise LogStash::ConfigurationError, "'value_serializer' only supports org.apache.kafka.common.serialization.ByteArraySerializer and org.apache.kafka.common.serialization.StringSerializer"
end
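# Header names must be literal strings; header values are resolved per event via sprintf at send time.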
@message_headers.each_key do |key|
  raise LogStash::ConfigurationError, "'message_headers' contains a key that is not a string!" unless key.is_a?(String)
end
@producer = create_producer
end

@@ -315,6 +322,9 @@ def write_to_kafka(event, serialized_data)
else
record = ProducerRecord.new(event.sprintf(@topic_id), event.sprintf(@message_key), serialized_data)
end
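# Resolve each configured header value against the event (supports %{field} references) and attach it as bytes.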
@message_headers.each do |key, value|
record.headers().add(key, event.sprintf(value).to_java_bytes)
end
prepare(record)
rescue LogStash::ShutdownSignal
logger.debug('producer received shutdown signal')
2 changes: 1 addition & 1 deletion logstash-integration-kafka.gemspec
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-integration-kafka'
s.version = '11.3.4'
s.version = '11.4.0'
s.licenses = ['Apache-2.0']
s.summary = "Integration with Kafka - input and output plugins"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline "+
19 changes: 19 additions & 0 deletions spec/integration/outputs/kafka_spec.rb
@@ -191,6 +191,25 @@
end
end

context 'when setting message_headers' do
let(:num_events) { 10 }
let(:test_topic) { 'logstash_integration_topic4' }

before :each do
config = base_config.merge({"topic_id" => test_topic, "message_headers" => {"event_timestamp" => "%{@timestamp}"}})
load_kafka_data(config)
end

it 'produces messages that contain the configured headers' do
messages = fetch_messages(test_topic)

expect(messages.size).to eq(num_events)
messages.each do |m|
expect(m.headers).to eq({"event_timestamp" => LogStash::Timestamp.at(0).to_s})
end
end
end

context 'setting partitioner' do
let(:test_topic) { 'logstash_integration_partitioner_topic' }
let(:partitioner) { nil }
10 changes: 10 additions & 0 deletions spec/unit/outputs/kafka_spec.rb
@@ -60,6 +60,16 @@
kafka.multi_receive([event])
end

it 'should support field-referenced message_headers' do
expect(org.apache.kafka.clients.producer.ProducerRecord).to receive(:new).
with("test", event.to_s).and_call_original
expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send)
expect_any_instance_of(org.apache.kafka.common.header.internals.RecordHeaders).to receive(:add).
  with("host", "172.0.0.1".to_java_bytes).and_call_original
kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge({"message_headers" => { "host" => "%{host}"}}))
kafka.register
kafka.multi_receive([event])
end

it 'should not raise config error when truststore location is not set and ssl is enabled' do
kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge("security_protocol" => "SSL"))
expect(org.apache.kafka.clients.producer.KafkaProducer).to receive(:new)
Expand Down
