[v24.1.x] rptest: upgrade kgo and fix its usage #23813

Open · wants to merge 8 commits into v24.1.x

2 changes: 1 addition & 1 deletion tests/docker/ducktape-deps/kgo-verifier
@@ -2,6 +2,6 @@
 set -e
 git -C /opt clone https://github.com/redpanda-data/kgo-verifier.git
 cd /opt/kgo-verifier
-git reset --hard 8f4fdb77f2c6173d8e1b7020c9899601a441d0d6
+git reset --hard a4dff215149f8acb4fde8617f19adeabbfa69d44
 go mod tidy
 make

4 changes: 1 addition & 3 deletions tests/rptest/redpanda_cloud_tests/high_throughput_test.py
@@ -134,11 +134,9 @@ def start(self):
     def stop(self):
         self._logger.info("Stopping all traffic generation")
         self._producer.stop()
-        self._consumer.stop()
         self._producer.wait()
         self._logger.info("Producer stopped")
         self._producer_stop_time = time.time()
         self._consumer.wait()
+        self._consumer.stop()
         self._logger.info("Consumer stopped")
         self._consumer_stop_time = time.time()

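For readability, the resulting shutdown sequence is reconstructed below from the hunk above (a sketch of the post-change method, with comments added here): the producer is still signalled to stop and then joined, while the consumer is now drained with wait() before stop(), matching the ordering the updated kgo-verifier service enforces.

    def stop(self):
        self._logger.info("Stopping all traffic generation")
        # The producer runs until told to stop: signal it, then join it.
        self._producer.stop()
        self._producer.wait()
        self._logger.info("Producer stopped")
        self._producer_stop_time = time.time()
        # The consumer is drained first; stop() must come after wait().
        self._consumer.wait()
        self._consumer.stop()
        self._logger.info("Consumer stopped")
        self._consumer_stop_time = time.time()
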
23 changes: 20 additions & 3 deletions tests/rptest/services/kgo_verifier_services.py
@@ -36,6 +36,7 @@ class KgoVerifierService(Service):
     Use ctx.cluster.alloc(ClusterSpec.simple_linux(1)) to allocate node and pass it to constructor
     """
     _status_thread: Optional[StatusThread]
+    _stopped: bool
 
     def __init__(self,
                  context,
@@ -88,6 +89,7 @@ def __init__(self,
             node.kgo_verifier_ports = {}
 
         self._status_thread = None
+        self._stopped = False
 
     def __del__(self):
         self._release_port()
@@ -199,7 +201,14 @@ def _assert_running(self, node):
     def stop_node(self, node, **kwargs):
         if self._status_thread:
             self._status_thread.stop()
+            self._status_thread.raise_on_error()
             self._status_thread = None
+            # Record that we just stopped, so that wait() can no longer be
+            # called afterwards. This is done inside this if statement because
+            # stop_node() is also called when the service starts, to stop a
+            # potential previous instance. Here we know we are stopping the
+            # service we started, because we initialized _status_thread.
+            self._stopped = True
 
         if self._pid is None:
             return
@@ -237,8 +246,9 @@ def _remote(self, node, action, timeout=60):
                 return
             except Exception as e:
                 last_error = e
-                self._redpanda.logger.warn(
+                self._redpanda.logger.warning(
                     f"{self.who_am_i()} remote call failed, {e}")
+                time.sleep(3)
         if last_error:
             raise last_error

@@ -247,13 +257,20 @@ def wait_node(self, node, timeout_sec=None):
         Wrapper to catch timeouts on wait, and send a `/print_stack` to the remote
         process in case it is experiencing a hang bug.
         """
+
+        if self._stopped:
+            raise RuntimeError(
+                f"Can't wait {self.who_am_i()}. It was already stopped."
+                f" You can either stop() a service or wait() and then stop() it"
+                f" but not the other way around.")
+
         try:
             return self._do_wait_node(node, timeout_sec)
         except:
             try:
                 self._remote(node, "print_stack")
             except Exception as e:
-                self._redpanda.logger.warn(
+                self._redpanda.logger.warning(
                     f"{self.who_am_i()} failed to print stacks during wait failure: {e}"
                 )

@@ -599,7 +616,7 @@ def wait_node(self, node, timeout_sec=None):
         # idempotency: producer records should always land at the next offset
         # after the last record they wrote.
         if self._tolerate_data_loss:
-            self._redpanda.logger.warn(
+            self._redpanda.logger.warning(
                 f"{self.who_am_i()} observed data loss: {self._status}")
         else:
             raise RuntimeError(
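Taken together, these changes make the service enforce a strict wait()-then-stop() ordering: once stop() has run, a later wait() fails fast with a RuntimeError instead of hanging or silently skipping validation. A minimal usage sketch from a test's point of view (the topic name and constructor arguments are illustrative, not taken from this diff; test_context and redpanda stand for the usual ducktape fixtures):

    from rptest.services.kgo_verifier_services import KgoVerifierSeqConsumer

    # Illustrative construction; keyword arguments are assumptions.
    consumer = KgoVerifierSeqConsumer(test_context, redpanda, "topic-a",
                                      msg_size=None)
    consumer.start()

    # Correct order: drain the consumer first, then tear it down.
    consumer.wait()   # joins the remote process; raises on validation errors
    consumer.stop()   # stops the status thread and marks the service stopped

    # Reversed order now fails fast: calling wait() after stop() raises
    # RuntimeError("Can't wait ... It was already stopped. ...").
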
5 changes: 2 additions & 3 deletions tests/rptest/tests/delete_records_test.py
@@ -772,18 +772,17 @@ class PartitionMoveExceptionReporter:
         partition_move_thread.join()
         self.redpanda.logger.info("Joining on producer thread")
         producer.wait()
-        self.redpanda.logger.info("Calling consumer::stop")
-        consumer.stop()
         self.redpanda.logger.info("Joining on consumer thread")
         consumer.wait()
+        self.redpanda.logger.info("Calling consumer::stop")
+        consumer.stop()
         if DeleteRecordsExceptionReporter.exc is not None:
             raise DeleteRecordsExceptionReporter.exc
         if PartitionMoveExceptionReporter.exc is not None:
             raise PartitionMoveExceptionReporter.exc
 
         status = consumer.consumer_status
         assert status.validator.invalid_reads == 0
         assert status.validator.out_of_scope_invalid_reads == 0
 
 
 class TopicPartitionOffset(NamedTuple):
2 changes: 1 addition & 1 deletion tests/rptest/tests/mirror_maker_test.py
@@ -220,8 +220,8 @@ def group_state_is_valid():
         wait_until(group_state_is_valid, 30)
 
         source_group = src_rpk.group_describe(consumer_group)
-        consumer.stop()
         consumer.wait()
+        consumer.stop()
         self.logger.info(f"source topics: {list(src_rpk.list_topics())}")
         target_rpk = RpkTool(self.redpanda)
         self.logger.info(f"target topics: {list(target_rpk.list_topics())}")