
fix "retries" and "value_serializer" error handling in output plugin (#…
jsvd authored Feb 6, 2024
1 parent 1d9763d commit 5cbe4eb
Showing 4 changed files with 27 additions and 4 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,6 @@
+## 11.3.4
+- Fix "retries" and "value_serializer" error handling in output plugin (#160) [#160](https://github.com/logstash-plugins/logstash-integration-kafka/pull/160)
+
 ## 11.3.3
 - Fix "Can't modify frozen string" error when record value is `nil` (tombstones) [#155](https://github.com/logstash-plugins/logstash-integration-kafka/pull/155)

6 changes: 3 additions & 3 deletions lib/logstash/outputs/kafka.rb
@@ -185,15 +185,14 @@ def register

     if !@retries.nil?
       if @retries < 0
-        raise ConfigurationError, "A negative retry count (#{@retries}) is not valid. Must be a value >= 0"
+        raise LogStash::ConfigurationError, "A negative retry count (#{@retries}) is not valid. Must be a value >= 0"
       end

       logger.warn("Kafka output is configured with finite retry. This instructs Logstash to LOSE DATA after a set number of send attempts fails. If you do not want to lose data if Kafka is down, then you must remove the retry setting.", :retries => @retries)
     end

     reassign_dns_lookup

-    @producer = create_producer
     if value_serializer == 'org.apache.kafka.common.serialization.StringSerializer'
       @codec.on_event do |event, data|
         write_to_kafka(event, data)
@@ -203,8 +202,9 @@ def register
         write_to_kafka(event, data.to_java_bytes)
       end
     else
-      raise ConfigurationError, "'value_serializer' only supports org.apache.kafka.common.serialization.ByteArraySerializer and org.apache.kafka.common.serialization.StringSerializer"
+      raise LogStash::ConfigurationError, "'value_serializer' only supports org.apache.kafka.common.serialization.ByteArraySerializer and org.apache.kafka.common.serialization.StringSerializer"
     end
+    @producer = create_producer
   end

   def prepare(record)
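Why the one-word change from ConfigurationError to LogStash::ConfigurationError matters comes down to Ruby constant lookup. Assuming the plugin class is declared with the compact `class LogStash::Outputs::Kafka < LogStash::Outputs::Base` form (as Logstash plugins typically are), a bare `ConfigurationError` is not found in the class's lexical scope or ancestor chain, so the raise itself fails with a NameError instead of the intended configuration error. A standalone sketch of the effect (illustrative names only, not the plugin's actual class body):

# Sketch of the constant-lookup problem: with the compact class declaration,
# neither the lexical scope nor the ancestor chain contains
# LogStash::ConfigurationError, so the unqualified constant does not resolve.
module LogStash
  class ConfigurationError < StandardError; end
  module Outputs; end
end

class LogStash::Outputs::Kafka
  def check_retries(retries)
    # Before the fix: Ruby cannot resolve the bare constant and raises
    # NameError (uninitialized constant LogStash::Outputs::Kafka::ConfigurationError).
    raise ConfigurationError, "A negative retry count (#{retries}) is not valid" if retries < 0
  end

  def check_retries_fixed(retries)
    # After the fix: the fully qualified constant resolves, so the intended
    # configuration error reaches the user.
    raise LogStash::ConfigurationError, "A negative retry count (#{retries}) is not valid" if retries < 0
  end
end

kafka = LogStash::Outputs::Kafka.new
begin
  kafka.check_retries(-1)
rescue NameError => e
  puts e.message   # => uninitialized constant LogStash::Outputs::Kafka::ConfigurationError
end

begin
  kafka.check_retries_fixed(-1)
rescue LogStash::ConfigurationError => e
  puts e.message   # => A negative retry count (-1) is not valid
end

The other visible change, moving `@producer = create_producer` below the `value_serializer` check, has the same fail-fast intent: with the new ordering, an unsupported serializer is rejected before a Kafka producer is ever created.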
2 changes: 1 addition & 1 deletion logstash-integration-kafka.gemspec
@@ -1,6 +1,6 @@
 Gem::Specification.new do |s|
   s.name = 'logstash-integration-kafka'
-  s.version = '11.3.3'
+  s.version = '11.3.4'
   s.licenses = ['Apache-2.0']
   s.summary = "Integration with Kafka - input and output plugins"
   s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline "+
20 changes: 20 additions & 0 deletions spec/unit/outputs/kafka_spec.rb
@@ -221,6 +221,26 @@
         kafka.multi_receive([event])
       end
     end
+    context 'when retries is -1' do
+      let(:retries) { -1 }
+
+      it "should raise a Configuration error" do
+        kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge("retries" => retries))
+        expect { kafka.register }.to raise_error(LogStash::ConfigurationError)
+      end
+    end
   end
+
+  describe "value_serializer" do
+    let(:output) { LogStash::Plugin.lookup("output", "kafka").new(config) }
+
+    context "when a random string is set" do
+      let(:config) { { "topic_id" => "random", "value_serializer" => "test_string" } }
+
+      it "raises a ConfigurationError" do
+        expect { output.register }.to raise_error(LogStash::ConfigurationError)
+      end
+    end
+  end

   context 'when ssl endpoint identification disabled' do
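For reference, a rough interactive equivalent of the new specs, assuming a Logstash environment with logstash-integration-kafka 11.3.4 (or later) on the load path; the `topic_id` value and the bogus `test_string` serializer are just the throwaway values from the spec:

# Both misconfigurations now fail fast at register time with a
# LogStash::ConfigurationError instead of an unrelated exception.
require "logstash/outputs/kafka"

output = LogStash::Plugin.lookup("output", "kafka").new(
  "topic_id" => "random", "value_serializer" => "test_string"
)
begin
  output.register
rescue LogStash::ConfigurationError => e
  puts e.message   # => 'value_serializer' only supports ...
end

output = LogStash::Plugin.lookup("output", "kafka").new(
  "topic_id" => "random", "retries" => -1
)
begin
  output.register
rescue LogStash::ConfigurationError => e
  puts e.message   # => A negative retry count (-1) is not valid. Must be a value >= 0
end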
