From bd99bb786c01c16f7ea78210ddf40c3920dc6c38 Mon Sep 17 00:00:00 2001 From: Ricardo Martins Date: Mon, 12 Dec 2022 13:02:30 +0000 Subject: [PATCH] Add an example Kubernetes liveness probe to the README --- README.md | 91 ++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 86 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 7afd2cfb..de03864c 100644 --- a/README.md +++ b/README.md @@ -528,23 +528,104 @@ Please see [Compression](#compression) ### Consumer rebalance listener -You may also want to set a consuemr rebalance listener. +You may also want to set a consumer rebalance listener for whatever reason. A very simple listener that simply logs the partition assignment and rebalance events would look like this: ```ruby class MyConsumerRebalanceListener - def on_partitions_assigned(_consumer, _list) + attr_reader :logger + + def initialize(logger) + @logger = logger + end + + def on_partitions_assigned(_consumer, list) + partitions = list.to_h.map { |_key, values| values.map(&:partition) }.flatten + logger.warn("Partitions assigned: #{partitions}") end - def on_partitions_revoked(_consumer, _list) + def on_partitions_revoked(_consumer, list) + partitions = list.to_h.map { |_key, values| values.map(&:partition) }.flatten + logger.warn("Partitions revoked: #{partitions}") end end Racecar.config.consumer_rebalance_listener do - MyConsumerRebalanceListener.new + MyConsumerRebalanceListener.new(logger) end ``` -Note that the exact requirements for the consumer rebalance listener are set by rdkafka-ruby and may change from version to version. +Note that the exact requirements for the consumer rebalance listener interface are set by rdkafka-ruby and may change from version to version. + +#### Liveness probe + +You can also take advantage of the consumer rebalance listener to implement a liveness probe for Kubernetes. + +The core idea is detecting lack of progress in specific partitions by 1. updating the timestamp on files representing activity in a partition and 2. checking the timestamp of said files in a Kubernetes probe. +Since partitions can be rebalanced for a variety of reasons, some housekeeping is necessary and the consumer rebalance listener can be used for that purpose. That will avoid false positives due to partitions that have been re-assigned to a different Kafka consumer. + +``` ruby +require 'fileutils' + +class MyConsumer < Racecar::Consumer + def process(message) + do_stuff(message) + liveness_probe(message.partition) + end + + private + + def liveness_probe(partition) + FileUtils.touch("/app/tmp/kafka_partition_#{partition}_alive") + end +end +``` + +``` ruby +class RebalanceListener + def on_partitions_assigned(_consumer, list) + # Clean up every liveness file that was carried from the consumer's previous life + Dir['/app/tmp/kafka_partition_*'].each { |f| File.delete(f) } + + partitions = list.to_h.map { |_key, values| values.map(&:partition) }.flatten + Rails.logger.warn("Partitions assigned: #{partitions}") + + # Ensure new files are created to detect cases where the consumer didn't process + # any messages from a newly-assigned partition + partitions.each do |partition| + File.new("/app/tmp/kafka_partition_#{partition}_alive", 'w') + end + end + + def on_partitions_revoked(_consumer, list) + partitions = list.to_h.map { |_key, values| values.map(&:partition) }.flatten + Rails.logger.warn("Partitions revoked: #{partitions}") + end +end + +Racecar.config.consumer_rebalance_listener do + RebalanceListener.new +end +``` + +Something like the following bash script could be used by Kubernetes to detect (and log) when progress has been made in specific partitions (tweak `MAX_AGE_MINUTES` to taste): + +``` bash +#!/usr/bin/env bash + +set -euo pipefail + +MAX_AGE_MINUTES=5 +STUCK_PARTITIONS=$( + find /app/tmp/ -name 'kafka_partition_*_alive' -mmin "+$MAX_AGE_MINUTES" | \ + sed -e 's|/app/tmp/||' -e 's|_alive||' -e 's|kafka_partition_||' +) + +if [ -n "$STUCK_PARTITIONS" ]; then + echo -n "Found stuck partitions:" >&2 + echo "$STUCK_PARTITIONS" | tr '\n' ', ' | sed 's/,$/\n/' >&2 + exit 1 +fi +``` ### Logging