From fa9b5211a209226a7560187b0bf78f180e6ec276 Mon Sep 17 00:00:00 2001 From: Jiawen Wang <74594733+summeriiii@users.noreply.github.com> Date: Mon, 14 Oct 2024 01:09:02 +0800 Subject: [PATCH] [improve][cli] Add compactor type option for compaction tool (#23446) --- .../pulsar/compaction/CompactorTool.java | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java index ba68e07cf5b0d..ea042071e1080 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java @@ -55,6 +55,11 @@ public class CompactorTool { private static class Arguments { + public enum CompactorType { + PUBLISHING, + EVENT_TIME + } + @Option(names = {"-c", "--broker-conf"}, description = "Configuration file for Broker") private String brokerConfigFile = "conf/broker.conf"; @@ -66,6 +71,10 @@ private static class Arguments { @Option(names = {"-g", "--generate-docs"}, description = "Generate docs") private boolean generateDocs = false; + + @Option(names = {"-ct", "--compactor-type"}, description = "Choose compactor type, " + + "valid types are [PUBLISHING, EVENT_TIME]") + private CompactorType compactorType = CompactorType.PUBLISHING; } public static PulsarClient createClient(ServiceConfiguration brokerConfig) throws PulsarClientException { @@ -172,7 +181,17 @@ public static void main(String[] args) throws Exception { @Cleanup PulsarClient pulsar = createClient(brokerConfig); - Compactor compactor = new PublishingOrderCompactor(brokerConfig, pulsar, bk, scheduler); + Compactor compactor = null; + + switch (arguments.compactorType) { + case PUBLISHING: + compactor = new PublishingOrderCompactor(brokerConfig, pulsar, bk, scheduler); + break; + case EVENT_TIME: + compactor = new EventTimeOrderCompactor(brokerConfig, pulsar, bk, scheduler); + break; + } + long ledgerId = compactor.compact(arguments.topic).get(); log.info("Compaction of topic {} complete. Compacted to ledger {}", arguments.topic, ledgerId); }