Skip to content

Commit

Permalink
[improve][cli] Add compactor type option for compaction tool (#23446)
Browse files Browse the repository at this point in the history
  • Loading branch information
summeriiii authored Oct 13, 2024
1 parent aa6bc09 commit fa9b521
Showing 1 changed file with 20 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@
public class CompactorTool {

private static class Arguments {
public enum CompactorType {
PUBLISHING,
EVENT_TIME
}

@Option(names = {"-c", "--broker-conf"}, description = "Configuration file for Broker")
private String brokerConfigFile = "conf/broker.conf";

Expand All @@ -66,6 +71,10 @@ private static class Arguments {

@Option(names = {"-g", "--generate-docs"}, description = "Generate docs")
private boolean generateDocs = false;

@Option(names = {"-ct", "--compactor-type"}, description = "Choose compactor type, "
+ "valid types are [PUBLISHING, EVENT_TIME]")
private CompactorType compactorType = CompactorType.PUBLISHING;
}

public static PulsarClient createClient(ServiceConfiguration brokerConfig) throws PulsarClientException {
Expand Down Expand Up @@ -172,7 +181,17 @@ public static void main(String[] args) throws Exception {
@Cleanup
PulsarClient pulsar = createClient(brokerConfig);

Compactor compactor = new PublishingOrderCompactor(brokerConfig, pulsar, bk, scheduler);
Compactor compactor = null;

switch (arguments.compactorType) {
case PUBLISHING:
compactor = new PublishingOrderCompactor(brokerConfig, pulsar, bk, scheduler);
break;
case EVENT_TIME:
compactor = new EventTimeOrderCompactor(brokerConfig, pulsar, bk, scheduler);
break;
}

long ledgerId = compactor.compact(arguments.topic).get();
log.info("Compaction of topic {} complete. Compacted to ledger {}", arguments.topic, ledgerId);
}
Expand Down

0 comments on commit fa9b521

Please sign in to comment.