Document how a Kubernetes-style liveness probe may be implemented #309

Draft
wants to merge 2 commits into
base: master
101 changes: 101 additions & 0 deletions README.md
@@ -526,6 +526,107 @@ E, [2022-10-09T11:28:29.976548 #15] ERROR -- : (try 5/10): Error for topic subsc

Please see [Compression](#compression)

### Consumer rebalance listener

You can register a consumer rebalance listener to be notified when partitions are assigned to or revoked from the consumer. A minimal listener that only logs these events could look like this:

```ruby
class MyConsumerRebalanceListener
  attr_reader :logger

  def initialize(logger)
    @logger = logger
  end

  def on_partitions_assigned(_consumer, list)
    partitions = list.to_h.map { |_key, values| values.map(&:partition) }.flatten
    logger.warn("Partitions assigned: #{partitions}")
  end

  def on_partitions_revoked(_consumer, list)
    partitions = list.to_h.map { |_key, values| values.map(&:partition) }.flatten
    logger.warn("Partitions revoked: #{partitions}")
  end
end

Racecar.config.consumer_rebalance_listener do
  # Use Racecar's configured logger; a bare `logger` would be undefined at the top level
  MyConsumerRebalanceListener.new(Racecar.config.logger)
end
```

Note that the exact requirements for the consumer rebalance listener interface are set by rdkafka-ruby and may change from version to version.

#### Liveness probe

You can also take advantage of the consumer rebalance listener to implement a liveness probe for Kubernetes.

The core idea is to detect a lack of progress in specific partitions by (1) updating the timestamp of a file that represents activity in a partition whenever a message from it is processed and (2) checking the timestamps of those files in a Kubernetes probe.
Since partitions can be rebalanced for a variety of reasons, some housekeeping is necessary, and the consumer rebalance listener can be used for that purpose. It avoids false positives caused by partitions that have been re-assigned to a different Kafka consumer.

``` ruby
require 'fileutils'

class MyConsumer < Racecar::Consumer
  def process(message)
    do_stuff(message)
    liveness_probe(message.partition)
  end

  private

  # Record activity for the partition by bumping its file's modification time
  def liveness_probe(partition)
    FileUtils.touch("/app/tmp/kafka_partition_#{partition}_alive")
  end
end
```

``` ruby
require 'fileutils'

class RebalanceListener
  def on_partitions_assigned(_consumer, list)
    # Clean up every liveness file that was carried over from the consumer's previous life
    Dir['/app/tmp/kafka_partition_*'].each { |f| File.delete(f) }

    partitions = list.to_h.map { |_key, values| values.map(&:partition) }.flatten
    Rails.logger.warn("Partitions assigned: #{partitions}")

    # Create fresh files so the probe also catches a consumer that never processes
    # any messages from a newly assigned partition
    partitions.each do |partition|
      # FileUtils.touch creates the file without leaving an open file handle behind
      FileUtils.touch("/app/tmp/kafka_partition_#{partition}_alive")
    end
  end

  def on_partitions_revoked(_consumer, list)
    partitions = list.to_h.map { |_key, values| values.map(&:partition) }.flatten
    Rails.logger.warn("Partitions revoked: #{partitions}")
  end
end

Racecar.config.consumer_rebalance_listener do
  RebalanceListener.new
end
```

Kubernetes could then run something like the following bash script as a liveness probe to detect (and log) partitions in which no progress has been made (tweak `MAX_AGE_MINUTES` to taste):

``` bash
#!/usr/bin/env bash

set -euo pipefail

MAX_AGE_MINUTES=5

# List partitions whose liveness file has not been touched for more than MAX_AGE_MINUTES
STUCK_PARTITIONS=$(
  find /app/tmp/ -name 'kafka_partition_*_alive' -mmin "+$MAX_AGE_MINUTES" |
    sed -e 's|/app/tmp/||' -e 's|_alive||' -e 's|kafka_partition_||'
)

if [ -n "$STUCK_PARTITIONS" ]; then
  # Join the partition numbers with commas and report them on stderr
  echo "Found stuck partitions: $(echo "$STUCK_PARTITIONS" | paste -sd, -)" >&2
  exit 1
fi
```
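
If the container image ships Ruby but not `find` or `sed`, roughly the same check could be written in Ruby. This is a minimal sketch that assumes the file naming used above; the `stuck_partitions` helper and its parameters are hypothetical names, not part of Racecar, and the age comparison is done in seconds rather than in the whole minutes that `find -mmin` uses.

``` ruby
# Returns the partition numbers whose liveness file is older than max_age_seconds
def stuck_partitions(dir:, max_age_seconds:, now: Time.now)
  stale = Dir[File.join(dir, 'kafka_partition_*_alive')].select do |path|
    now - File.mtime(path) > max_age_seconds
  end

  # Extract the partition number from each file name
  stale.map { |path| File.basename(path)[/\Akafka_partition_(\d+)_alive\z/, 1] }
       .compact
       .map(&:to_i)
       .sort
end

if $PROGRAM_NAME == __FILE__
  stuck = stuck_partitions(dir: '/app/tmp', max_age_seconds: 5 * 60)

  unless stuck.empty?
    warn "Found stuck partitions: #{stuck.join(', ')}"
    exit 1
  end
end
```

As with the bash version, a non-zero exit status is what tells Kubernetes that the probe failed.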

### Logging

By default, Racecar will log to `STDOUT`. If you're using Rails, your application code will use whatever logger you've configured there.
9 changes: 7 additions & 2 deletions lib/racecar/config.rb
@@ -167,8 +167,8 @@ class Config < KingKonf::Config
for backward compatibility, however this can be quite memory intensive"
integer :statistics_interval, default: 1

# The error handler must be set directly on the object.
attr_reader :error_handler
# The error handler and consumer rebalance listener must be set directly on the object.
attr_reader :error_handler, :consumer_rebalance_listener

attr_accessor :subscriptions, :logger, :parallel_workers

@@ -187,6 +187,7 @@ def max_wait_time_ms
def initialize(env: ENV)
super(env: env)
@error_handler = proc {}
@consumer_rebalance_listener = nil
@subscriptions = []
@logger = Logger.new(STDOUT)
end
@@ -234,6 +235,10 @@ def on_error(&handler)
@error_handler = handler
end

def consumer_rebalance_listener(&handler)
@consumer_rebalance_listener = handler || @consumer_rebalance_listener
end

def rdkafka_consumer
consumer_config = consumer.map do |param|
param.split("=", 2).map(&:strip)
10 changes: 9 additions & 1 deletion lib/racecar/consumer_set.rb
@@ -69,10 +69,18 @@ def close

def current
@consumers[@consumer_id_iterator.peek] ||= begin
consumer = Rdkafka::Config.new(rdkafka_config(current_subscription)).consumer
consumer_config = Rdkafka::Config.new(rdkafka_config(current_subscription))

if @config.consumer_rebalance_listener.respond_to?(:call)
consumer_config.consumer_rebalance_listener = @config.consumer_rebalance_listener.call
end

consumer = consumer_config.consumer

@instrumenter.instrument('join_group') do
consumer.subscribe current_subscription.topic
end

consumer
end
end