Skip to content

Commit

Permalink
tests/migrations: test disambiguating source topics by specifying a r…
Browse files Browse the repository at this point in the history
…evision
  • Loading branch information
bashtanov committed Oct 17, 2024
1 parent fe0dfb1 commit f17e24f
Showing 1 changed file with 43 additions and 2 deletions.
45 changes: 43 additions & 2 deletions tests/rptest/tests/data_migrations_api_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,9 +474,50 @@ def test_creating_and_listing_migrations(self):
{"ntr_no_topic_manifest", "missing_segments"})

@cluster(num_nodes=3, log_allow_list=MIGRATION_LOG_ALLOW_LIST)
def test_higher_level_migration_api(self):
rpk = RpkTool(self.redpanda)
def test_conflicting_names(self):
def make_msg(i: int):
return {
component: str.encode(f"{component}{i}")
for component in ('key', 'value')
}

topic = TopicSpec(partition_count=3)
ns_topic = make_namespaced_topic(topic.name)
producer = self.get_ck_producer()

# create, populate and unmount 3 topics
revisions = {}
for i in range(3):
self.client().create_topic(topic)
revisions[i] = self.get_topic_initial_revision(topic.name)
producer.produce(topic.name, **make_msg(i))

out_migr_id = self.admin.unmount_topics([ns_topic]).json()["id"]
self.wait_partitions_disappear([topic])
self.wait_migration_disappear(out_migr_id)

# mount and consume from them in random order
cluster_uuid = self.admin.get_cluster_uuid(self.redpanda.nodes[0])
for i in sorted(range(3), key=lambda element: random.random()):
hinted_ns_topic = make_namespaced_topic(
f"{ns_topic.topic}/{cluster_uuid}/{revisions[i]}")
in_topic = InboundTopic(hinted_ns_topic)
in_migr_id = self.admin.mount_topics([in_topic]).json()["id"]
self.wait_partitions_appear([topic])
self.wait_migration_disappear(in_migr_id)

with self.ck_consumer() as consumer:
consumer.subscribe([topic.name])
records = consumer.consume(1, 10)
assert len(records) == 1
assert {
'key': records[0].key(),
'value': records[0].value()
} == make_msg(i)
self.client().delete_topic(topic.name)

@cluster(num_nodes=3, log_allow_list=MIGRATION_LOG_ALLOW_LIST)
def test_higher_level_migration_api(self):
topics = [TopicSpec(partition_count=3) for i in range(5)]
for t in topics:
self.client().create_topic(t)
Expand Down

0 comments on commit f17e24f

Please sign in to comment.