Skip to content

Commit

Permalink
Test with Kafka 3.6
Browse files Browse the repository at this point in the history
  • Loading branch information
zmstone committed Jun 16, 2024
1 parent adda7c3 commit 2de221d
Show file tree
Hide file tree
Showing 14 changed files with 157 additions and 113 deletions.
11 changes: 6 additions & 5 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ on:
branches:
- master
env:
OTP_VERSION: "24.1"
REBAR_VERSION: "3.17.0"
OTP_VERSION: "26"
REBAR_VERSION: "3.20.0"

jobs:
lint:
Expand Down Expand Up @@ -44,8 +44,8 @@ jobs:
strategy:
fail-fast: false
matrix:
otp: ["24.1", "23.3.4.7", "22.3.4.21"]
kafka: ["2.4", "1.1", "0.11"]
otp: ["26"]
kafka: ["0.9", "0.10", "0.11", "2.8", "1.1", "3.6"]
steps:
- name: Checkout
uses: actions/checkout@v2
Expand All @@ -69,7 +69,8 @@ jobs:
run: |
export KAFKA_VERSION=${{ matrix.kafka }}
echo "Running Kafka ${KAFKA_VERSION}"
scripts/setup-test-env.sh && rebar3 do ct,eunit
make test-env
make t
- name: Store test logs
uses: actions/upload-artifact@v1
if: always()
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ relx
docker/
TAGS
.vscode/
test/data/ssl/*.pem
6 changes: 6 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
KAFKA_VERSION ?= 3.6
export KAFKA_VERSION
all: compile

compile:
Expand All @@ -8,6 +10,10 @@ lint:

test-env:
@./scripts/setup-test-env.sh
@mkdir -p ./test/data/ssl
@docker cp kafka-1:/localhost-ca-crt.pem ./test/data/ssl/ca.pem
@docker cp kafka-1:/localhost-client-key.pem ./test/data/ssl/client-key.pem
@docker cp kafka-1:/localhost-client-crt.pem ./test/data/ssl/client-crt.pem

ut:
@rebar3 eunit -v --cover_export_name ut-$(KAFKA_VERSION)
Expand Down
6 changes: 3 additions & 3 deletions scripts/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ version: "2"

services:
zookeeper:
image: "zmstone/kafka:${KAFKA_VERSION}"
image: "zmstone/kafka:${KAFKA_IMAGE_VERSION}"
container_name: zookeeper
command: run zookeeper
network_mode: host
kafka_1:
depends_on:
- zookeeper
image: "zmstone/kafka:${KAFKA_VERSION}"
image: "zmstone/kafka:${KAFKA_IMAGE_VERSION}"
container_name: "kafka-1"
network_mode: host
environment:
Expand All @@ -23,7 +23,7 @@ services:
kafka_2:
depends_on:
- zookeeper
image: "zmstone/kafka:${KAFKA_VERSION}"
image: "zmstone/kafka:${KAFKA_IMAGE_VERSION}"
container_name: "kafka-2"
network_mode: host
environment:
Expand Down
77 changes: 56 additions & 21 deletions scripts/setup-test-env.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
#!/bin/bash -eu

if [ -n "${DEBUG:-}" ]; then
set -x
fi

docker ps > /dev/null || {
echo "You must be a member of docker group to run this script"
exit 1
Expand All @@ -18,46 +22,77 @@ function docker_compose {
fi
}

VERSION=${KAFKA_VERSION:-2.4}
if [ -z $VERSION ]; then VERSION=$1; fi
KAFKA_VERSION=${KAFKA_VERSION:-3.6}
if [ -z $KAFKA_VERSION ]; then KAFKA_VERSION=$1; fi

case $VERSION in
case $KAFKA_VERSION in
0.9*)
KAFKA_VERSION="0.9";;
0.10*)
VERSION="0.10";;
KAFKA_VERSION="0.10";;
0.11*)
VERSION="0.11";;
KAFKA_VERSION="0.11";;
1.*)
VERSION="1.1";;
KAFKA_VERSION="1.1";;
2.*)
VERSION="2.4";;
KAFKA_VERSION="2.8";;
3.*)
KAFKA_VERSION="3.6";;
*)
VERSION="2.4";;
KAFKA_VERSION="3.6";;
esac

echo "Using KAFKA_VERSION=$VERSION"
export KAFKA_VERSION=$VERSION
export KAFKA_IMAGE_VERSION="1.1-${KAFKA_VERSION}"
echo "env KAFKA_IMAGE_VERSION=$KAFKA_IMAGE_VERSION"

TD="$(cd "$(dirname "$0")" && pwd)"

docker_compose -f $TD/docker-compose.yml down || true
docker_compose -f $TD/docker-compose.yml up -d

if [[ "$KAFKA_VERSION" == 2* ]] || [[ "$KAFKA_VERSION" == 3* ]]; then
MAYBE_ZOOKEEPER="--bootstrap-server localhost:9092"
else
MAYBE_ZOOKEEPER="--zookeeper localhost:2181"
fi

n=0
while [ "$(docker exec kafka-1 bash -c '/opt/kafka/bin/kafka-topics.sh --zookeeper localhost --list')" != '' ]; do
if [ $n -gt 4 ]; then
echo "timeout waiting for kakfa_1"
exit 1
TOPIC_LIST_CMD="/opt/kafka/bin/kafka-topics.sh $MAYBE_ZOOKEEPER --list"
MAX_WAIT_SEC=10

function wait_for_kafka {
local which_kafka="$1"
local n=0
local port=':9092'
local topic_list listener
if [ "$which_kafka" = 'kafka-2' ]; then
port=':9192'
fi
n=$(( n + 1 ))
sleep 1
done
while true; do
listener="$(netstat -tnlp 2>&1 | grep $port || true)"
if [ "$listener" != '' ]; then
topic_list="$(docker exec $which_kafka $TOPIC_LIST_CMD 2>&1)"
if [ "${topic_list-}" = '' ]; then
break
fi
fi
if [ $n -gt $MAX_WAIT_SEC ]; then
echo "timeout waiting for kafka-1"
echo "last print: ${topic_list:-}"
exit 1
fi
n=$(( n + 1 ))
sleep 1
done
}

wait_for_kafka kafka-1
wait_for_kafka kafka-2

function create_topic {
TOPIC_NAME="$1"
PARTITIONS="${2:-1}"
REPLICAS="${3:-1}"
CMD="/opt/kafka/bin/kafka-topics.sh --zookeeper localhost --create --partitions $PARTITIONS --replication-factor $REPLICAS --topic $TOPIC_NAME --config min.insync.replicas=1"
CMD="/opt/kafka/bin/kafka-topics.sh $MAYBE_ZOOKEEPER --create --partitions $PARTITIONS --replication-factor $REPLICAS --topic $TOPIC_NAME --config min.insync.replicas=1"
docker exec kafka-1 bash -c "$CMD"
}

Expand All @@ -80,7 +115,7 @@ create_topic "brod_compression_SUITE"
create_topic "lz4-test"
create_topic "test-topic"

if [[ "$KAFKA_VERSION" = 2* ]]; then
if [[ "$KAFKA_VERSION" = 2* ]] || [[ "$KAFKA_VERSION" = 3* ]]; then
MAYBE_NEW_CONSUMER=""
else
MAYBE_NEW_CONSUMER="--new-consumer"
Expand All @@ -90,5 +125,5 @@ docker exec kafka-1 /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server l

# for kafka 0.11 or later, add sasl-scram test credentials
if [[ "$KAFKA_VERSION" != 0.9* ]] && [[ "$KAFKA_VERSION" != 0.10* ]]; then
docker exec kafka-1 /opt/kafka/bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=ecila],SCRAM-SHA-512=[password=ecila]' --entity-type users --entity-name alice
docker exec kafka-1 /opt/kafka/bin/kafka-configs.sh $MAYBE_ZOOKEEPER --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=ecila],SCRAM-SHA-512=[password=ecila]' --entity-type users --entity-name alice
fi
2 changes: 1 addition & 1 deletion src/brod_consumer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ handle_batches(Header, [], #state{begin_offset = BeginOffset} = State0) ->
State =
case BeginOffset < StableOffset of
true ->
%% There are chances that kafka may return empty message set
%% There are chances that Kafka may return empty message set
%% when messages are deleted from a compacted topic.
%% Since there is no way to know how big the 'hole' is
%% we can only bump begin_offset with +1 and try again.
Expand Down
21 changes: 17 additions & 4 deletions src/brod_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -453,12 +453,25 @@ fetch(Conn, ReqFun, Offset, MaxBytes) ->
fetch(Conn, ReqFun, Offset, Size);
{ok, #{header := Header, batches := Batches}} ->
StableOffset = get_stable_offset(Header),
{NewBeginOffset, Msgs} = flatten_batches(Offset, Header, Batches),
{NewBeginOffset0, Msgs} = flatten_batches(Offset, Header, Batches),
case Offset < StableOffset andalso Msgs =:= [] of
true ->
%% Not reached the latest stable offset yet,
%% but received an empty batch-set (all messages are dropped).
%% try again with new begin-offset
NewBeginOffset =
case NewBeginOffset0 > Offset of
true ->
%% Not reached the latest stable offset yet,
%% but resulted in an empty batch-set,
%% i.e. all messages are dropped due to they are before
%% the last fetch Offset.
%% try again with new begin-offset.
NewBeginOffset0;
false when NewBeginOffset0 =:= Offset ->
%% There are chances that Kafka may return empty message set
%% when messages are deleted from a compacted topic.
%% Since there is no way to know how big the 'hole' is
%% we can only bump begin_offset with +1 and try again.
NewBeginOffset0 + 1
end,
fetch(Conn, ReqFun, NewBeginOffset, MaxBytes);
false ->
{ok, {StableOffset, Msgs}}
Expand Down
35 changes: 30 additions & 5 deletions test/brod_consumer_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,21 @@ end_per_testcase(Case, Config) ->
ok
end.

all() -> [F || {F, _A} <- module_info(exports),
all() ->
Cases = [F || {F, _A} <- module_info(exports),
case atom_to_list(F) of
"t_" ++ _ -> true;
_ -> false
end].

end],
Filter = fun(Case) ->
try
?MODULE:Case(kafka_version_match)
catch
_:_ ->
true
end
end,
lists:filter(Filter, Cases).

%%%_* Test functions ===========================================================

Expand Down Expand Up @@ -413,6 +422,12 @@ t_fold(Config) when is_list(Config) ->
0, ErrorFoldF, #{})),
ok.

%% This test case does not work with Kafka 0.9, not sure aobut 0.10 and 0.11
%% since all 0.x versions are old enough, we only try to verify this against
%% 1.x or newer
t_direct_fetch_with_small_max_bytes(kafka_version_match) ->
{Major, _Minor} = kafka_test_helper:kafka_version(),
Major > 1;
t_direct_fetch_with_small_max_bytes(Config) when is_list(Config) ->
Client = ?config(client),
Topic = ?TOPIC,
Expand All @@ -428,6 +443,11 @@ t_direct_fetch_with_small_max_bytes(Config) when is_list(Config) ->
?assertEqual(Key, Msg#kafka_message.key),
ok.

%% Starting from version 3, Kafka no longer returns incomplete batch
%% for Fetch request v0, cannot test max_bytes expansion anymore.
t_direct_fetch_expand_max_bytes(kafka_version_match) ->
{Major, _Minor} = kafka_test_helper:kafka_version(),
Major < 3;
t_direct_fetch_expand_max_bytes({init, Config}) when is_list(Config) ->
%% kafka returns empty message set when it's 0.9
%% or when fetch request sent was version 0
Expand All @@ -441,7 +461,7 @@ t_direct_fetch_expand_max_bytes(Config) when is_list(Config) ->
Value = crypto:strong_rand_bytes(100),
ok = brod:produce_sync(Client, ?TOPIC, Partition, Key, Value),
{ok, Offset} = brod:resolve_offset(?HOSTS, Topic, Partition,
?OFFSET_LATEST, ?config(client_config)),
?OFFSET_LATEST, ?config(client_config)),
{ok, {_, [Msg]}} = brod:fetch({?HOSTS, ?config(client_config)},
Topic, Partition, Offset - 1,
#{max_bytes => 13}),
Expand All @@ -450,6 +470,9 @@ t_direct_fetch_expand_max_bytes(Config) when is_list(Config) ->

%% @doc Consumer should be smart enough to try greater max_bytes
%% when it's not great enough to fetch one single message
t_consumer_max_bytes_too_small(kafka_version_match) ->
{Major, _Minor} = kafka_test_helper:kafka_version(),
Major < 3;
t_consumer_max_bytes_too_small({init, Config}) ->
meck:new(brod_kafka_request, [passthrough, no_passthrough_cover, no_history]),
%% kafka returns empty message set when it's 0.9
Expand Down Expand Up @@ -843,7 +866,9 @@ wait_for_max_bytes_sequence([{Compare, MaxBytes} | Rest] = Waiting, Cnt) ->
wait_for_max_bytes_sequence(Waiting, Cnt + 1);
_ ->
ct:fail("unexpected ~p, expecting ~p", [Bytes, {Compare, MaxBytes}])
end
end;
Other ->
error(Other)
after
3000 ->
ct:fail("timeout", [])
Expand Down
12 changes: 10 additions & 2 deletions test/brod_offset_txn_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,19 @@ end_per_testcase(_Case, Config) ->
end,
Config.

all() -> [F || {F, _A} <- module_info(exports),
all() ->
Cases = [F || {F, _A} <- module_info(exports),
case atom_to_list(F) of
"t_" ++ _ -> true;
_ -> false
end].
end],
%% no transaction before 0.11
case kafka_test_helper:kafka_version() of
{0, Minor} when Minor < 11 ->
[];
_ ->
Cases
end.

client_config() ->
case os:getenv("KAFKA_VERSION") of
Expand Down
3 changes: 3 additions & 0 deletions test/data/ssl/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
This dir holds files for TLS/SSL tests.
The files are copied from Kafka docker image in the `make test-env` step.
See how the docker image is built here: https://github.com/zmstone/docker-kafka
22 changes: 0 additions & 22 deletions test/data/ssl/ca.pem

This file was deleted.

20 changes: 0 additions & 20 deletions test/data/ssl/client-crt.pem

This file was deleted.

Loading

0 comments on commit 2de221d

Please sign in to comment.