Skip to content

Commit

Permalink
Add an example Kubernetes liveness probe to the README
Browse files Browse the repository at this point in the history
  • Loading branch information
meqif committed Jan 20, 2023
1 parent a0b1cd4 commit bd99bb7
Showing 1 changed file with 86 additions and 5 deletions.
91 changes: 86 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -528,23 +528,104 @@ Please see [Compression](#compression)

### Consumer rebalance listener

You may also want to set a consuemr rebalance listener.
You may also want to set a consumer rebalance listener for whatever reason. A very simple listener that simply logs the partition assignment and rebalance events would look like this:

```ruby
class MyConsumerRebalanceListener
def on_partitions_assigned(_consumer, _list)
attr_reader :logger
def initialize(logger)
@logger = logger
end
def on_partitions_assigned(_consumer, list)
partitions = list.to_h.map { |_key, values| values.map(&:partition) }.flatten
logger.warn("Partitions assigned: #{partitions}")
end
def on_partitions_revoked(_consumer, _list)
def on_partitions_revoked(_consumer, list)
partitions = list.to_h.map { |_key, values| values.map(&:partition) }.flatten
logger.warn("Partitions revoked: #{partitions}")
end
end
Racecar.config.consumer_rebalance_listener do
MyConsumerRebalanceListener.new
MyConsumerRebalanceListener.new(logger)
end
```

Note that the exact requirements for the consumer rebalance listener are set by rdkafka-ruby and may change from version to version.
Note that the exact requirements for the consumer rebalance listener interface are set by rdkafka-ruby and may change from version to version.

#### Liveness probe

You can also take advantage of the consumer rebalance listener to implement a liveness probe for Kubernetes.

The core idea is detecting lack of progress in specific partitions by 1. updating the timestamp on files representing activity in a partition and 2. checking the timestamp of said files in a Kubernetes probe.
Since partitions can be rebalanced for a variety of reasons, some housekeeping is necessary and the consumer rebalance listener can be used for that purpose. That will avoid false positives due to partitions that have been re-assigned to a different Kafka consumer.

``` ruby
require 'fileutils'
class MyConsumer < Racecar::Consumer
def process(message)
do_stuff(message)
liveness_probe(message.partition)
end
private
def liveness_probe(partition)
FileUtils.touch("/app/tmp/kafka_partition_#{partition}_alive")
end
end
```

``` ruby
class RebalanceListener
def on_partitions_assigned(_consumer, list)
# Clean up every liveness file that was carried from the consumer's previous life
Dir['/app/tmp/kafka_partition_*'].each { |f| File.delete(f) }
partitions = list.to_h.map { |_key, values| values.map(&:partition) }.flatten
Rails.logger.warn("Partitions assigned: #{partitions}")
# Ensure new files are created to detect cases where the consumer didn't process
# any messages from a newly-assigned partition
partitions.each do |partition|
File.new("/app/tmp/kafka_partition_#{partition}_alive", 'w')
end
end
def on_partitions_revoked(_consumer, list)
partitions = list.to_h.map { |_key, values| values.map(&:partition) }.flatten
Rails.logger.warn("Partitions revoked: #{partitions}")
end
end
Racecar.config.consumer_rebalance_listener do
RebalanceListener.new
end
```

Something like the following bash script could be used by Kubernetes to detect (and log) when progress has been made in specific partitions (tweak `MAX_AGE_MINUTES` to taste):

``` bash
#!/usr/bin/env bash
set -euo pipefail
MAX_AGE_MINUTES=5
STUCK_PARTITIONS=$(
find /app/tmp/ -name 'kafka_partition_*_alive' -mmin "+$MAX_AGE_MINUTES" | \
sed -e 's|/app/tmp/||' -e 's|_alive||' -e 's|kafka_partition_||'
)
if [ -n "$STUCK_PARTITIONS" ]; then
echo -n "Found stuck partitions:" >&2
echo "$STUCK_PARTITIONS" | tr '\n' ', ' | sed 's/,$/\n/' >&2
exit 1
fi
```

### Logging

Expand Down

0 comments on commit bd99bb7

Please sign in to comment.