From 1f90897c890a3b41153b332264624916b926f3a7 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 16 Aug 2024 10:43:45 +0800 Subject: [PATCH 1/2] [improve] [pip] PIP-370: configurable remote topic creation in geo-replication (#23124) --- pip/pip-370.md | 109 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 109 insertions(+) create mode 100644 pip/pip-370.md diff --git a/pip/pip-370.md b/pip/pip-370.md new file mode 100644 index 0000000000000..6699846cee105 --- /dev/null +++ b/pip/pip-370.md @@ -0,0 +1,109 @@ +# PIP-370: configurable remote topic creation in geo-replication + +# Background knowledge + +**The current topic creation behavior when enabling Geo-Replication** +Users using Geo-Replication backup data across multiple clusters, as well as Admin APIs related to Geo-Replication and internal replicators of brokers, will trigger topics of auto-creation between clusters. +- For partitioned topics. + - After enabling namespace-level Geo-Replication: the broker will create topics on the remote cluster automatically when calling `pulsar-admin topics create-partitioned-topic`. It does not depend on enabling `allowAutoTopicCreation`. + - When enabling topic-level Geo-Replication on a partitioned topic: the broker will create topics on the remote cluster automatically. It does not depend on enabling `allowAutoTopicCreation`. + - When calling `pulsar-admin topics update-partitioned-topic -p {partitions}`, the broker will also update partitions on the remote cluster automatically. +- For non-partitioned topics and partitions of partitioned topics. + - The internal Geo-Replicator will trigger topics auto-creation for remote clusters. **(Highlight)** It depends on enabling `allowAutoTopicCreation`. In fact, this behavior is not related to Geo-Replication, it is the behavior of the internal producer of Geo-Replicator, + +# Motivation + +In the following scenarios, automatic topic creation across clusters is problematic due to race conditions during deployments, and there is no choice that prevents pulsar resource creation affects each other between clusters. + +- Users want to maintain pulsar resources manually. +- Users pulsar resources using `GitOps CD` automated deployment, for which + - Clusters are deployed simultaneously without user intervention. + - Each cluster is precisely configured from git repo config variables - including the list of all tenants/namespaces/topics to be created in each cluster. + - Clusters are configured to be exact clones of each other in terms of pulsar resources. + +**Passed solution**: disable `allowAutoTopicCreation`, the APIs `pulsar-admin topics create-partitioned-topic` still create topics on the remote cluster when enabled namespace level replication, the API `enable topic-level replication` still create topics, And the internal replicator will keep printing error logs due to a not found error. + +# Goals + +- **Phase 1**: Introduce a flag to disable the replicators to automatically trigger topic creation. +- **Phase 2**: Move all topic creation/expand-partitions behaviors related to Replication to the internal Replicator, pulsar admin API that relates to pulsar topics management does not care about replication anymore. + - Move the topic creation operations from `pulsar-admin topics create-partitioned-topic` and `pulsar-admin topics set-replication-clusters` to the component Replicator in the broker internal. + - (The same as before)When calling `pulsar-admin topics update-partitioned-topic -p {partitions}`, the broker will also update partitions on the remote cluster automatically. + +Note: the proposal will only focus on phase one, and the detailed design for phase two with come up with another proposal. + +# Detailed Design + +## Configuration + +**broker.conf** +```properties +# Whether the internal replication of the local cluster will trigger topic auto-creation on the remote cluster. +# 1. After enabling namespace-level Geo-Replication: whether the local broker will create topics on the remote cluster automatically when calling `pulsar-admin topics create-partitioned-topic`. +# 2. When enabling topic-level Geo-Replication on a partitioned topic: whether the local broker will create topics on the remote cluster. +# 3. Whether the internal Geo-Replicator in the local cluster will trigger non-persistent topic auto-creation for remote clusters. +# It is not a dynamic config, the default value is "true" to preserve backward-compatible behavior. +createTopicToRemoteClusterForReplication=true +``` + +## Design & Implementation Details + +### Phase 1: Introduce a flag to disable the replicators to automatically trigger topic creation. +- If `createTopicToRemoteClusterForReplication` is set to `false`. + 1. After enabling namespace-level Geo-Replication: the broker will not create topics on the remote cluster automatically when calling `pulsar-admin topics create-partitioned-topic`. + 2. When enabling topic-level Geo-Replication on a partitioned topic: broker will not create topics on the remote cluster automatically. + 3. The internal Geo-Replicator will not trigger topic auto-creation for remote clusters, it just keeps retrying to check if the topic exists on the remote cluster, once the topic is created, the replicator starts. + 4. It does not change the behavior of creating subscriptions after enabling `enableReplicatedSubscriptions`, the subscription will also be created on the remote cluster after users enable. `enableReplicatedSubscriptions`. + 5. The config `allowAutoTopicCreation` still works for the local cluster as before, it will not be affected by the new config `createTopicToRemoteClusterForReplication`. +- If `createTopicToRemoteClusterForReplication` is set to `true`. + a. All components work as before, see details: `Motivation -> The current topic creation behavior when enabling Geo-Replication` + +### Phase 2: The replicator will check remote topics' partitioned metadata and update partitions in the remote cluster to the same as the current cluster if needed. +- If `createTopicToRemoteClusterForReplication` is set to `false`. + - The behavior is the same as Phase 1. +- If `createTopicToRemoteClusterForReplication` is set to `true`. + - Pulsar admin API that relates to pulsar topics management does not care about replication anymore. + - When a replicator for a topic partition starts, it checks the partitioned metadata in the remote cluster first and updates partitions in the remote cluster to the same as the current cluster if needed. Seem the example as follows: + +| `partitions` of local cluster | `partitions` of remote cluster | After `PIP-370 Phase 2` | Before `PIP-370 Phase 2` | +| --- | --- | --- | --- | +| `2` | no topic exists | create a partitioned topic with `2` partitions in the remote cluster | the replicator will only trigger partition creation (`{topic}-partition-0` and `{topic}-partition-1`), and will not care about partitioned metadata. | +| `2` | `1`| **In dispute:** The replicator copies messages from `partition-0` to the remote cluster, does not copy any data for `partition-1` and just prints error logs in the background. | the replicator will only trigger partition creation (`{topic}-partition-0` and `{topic}-partition-1`), and the partitioned metadata in the remote cluster is still `1` | +| `2` | `2` | modifies nothing. | The same as "After `PIP-370 Phase 2`" | +| `2` | `>2` | **In dispute:** modifies nothing, the messages will be copied to the same partition in the remote cluster, and no message will be copied to the partition who is larger than `2` in the remote cluster | The same as "After `PIP-370 Phase 2`" | +| `2` | `0`(non-partitioned topic) | **In dispute:** The replicator does not copy any data and just prints error logs in the background. | the replicator will only trigger partition creation (`{topic}-partition-0` and `{topic}-partition-1`), then users will get `3` non-partitioned topics: `[{tp}, {topic}-partition-0, {topic}-partition-1`. | +| `0`(non-partitioned topic) | `0`(non-partitioned topic) | Copy data normally | It is the same as before `PIP-370`. | +| `0`(non-partitioned topic) | no topic exists | create a non-partitioned topic in the remote cluster. | It is the same as before `PIP-370`. | +| `0`(non-partitioned topic) | `>=1` | **In dispute:** The replicator does not copy any data and just prints error logs in the background. | The replicator will only trigger a non-partitioned topic's creation, then users will get `1` non-partitioned topic and `1` partitioned topic. | + +## Metrics + + +| Name | Description | Attributes | Units| +| --- | --- | --- | --- | +| `pulsar_broker_replication_count` | Counter. The number of topics enabled replication. | cluster | - | +| `pulsar_broker_replication_disconnected_count` | Counter. The number of topics that enabled replication and its replicator failed to connect | cluster | - | + + +# Monitoring +- If `pulsar_broker_replication_disconnected_count` keeps larger than `0` for a period of time, it means some replicators do not work, we should push an alert out. + +# Backward & Forward Compatibility + +## Regarding to Phase-1 +This PIP guarantees full compatibility with default settings(the default value of `createTopicToRemoteClusterForReplication` is `true`). If you want to cherry-pick PIP-370 for another branch in the future, you need to cherry-pick PIP-344 as well. Because the behavior of disables `createTopicToRemoteClusterForReplication` depends on the API `PulsarClient.getPartitionsForTopic(String topic, boolean metadataAutoCreationEnabled)`, which was introduced by [PIP-344](https://github.com/apache/pulsar/blob/master/pip/pip-344.md). + +## Regarding to Phase-2 +The two scenarios are as follows, the replication will not work as before, which will lead backlog increase, please take care of checking your clusters before upgrading. +- `local_cluster.topic.partitions = 2` and `remote_cluster.topic.partitions = 0(non-partitioned topic)`: see detail in the section `Design & Implementation Details -> Phase-2`. +- `local_cluster.topic.partitions = 0(non-partitioned topic)` and and `remote_cluster.topic.partitions >= 1`: see detail in the section `Design & Implementation Details -> Phase-2`. + +# Links +* Mailing List discussion thread: https://lists.apache.org/thread/9fx354cqcy3412w1nx8kwdf9h141omdg +* Mailing List voting thread: https://lists.apache.org/thread/vph22st5td1rdh1gd68gkrnp9doo6ct2 From 67fc5b9f5342bd35d3fdacf37cf172a629ee15f9 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Fri, 16 Aug 2024 06:33:18 +0300 Subject: [PATCH 2/2] [fix][client] Copy orderingKey to retry letter topic and DLQ messages and fix bug in copying (#23182) Fixes #23173 Fixes #23181 ### Motivation See #23173 and #23181 ### Modifications - copy ordering key to messages sent to retry letter topic and DLQ topic --- .../client/api/DeadLetterTopicTest.java | 60 +++++++++++++++++++ .../pulsar/client/api/RetryTopicTest.java | 17 +++++- .../pulsar/client/impl/ConsumerImpl.java | 10 +++- 3 files changed, 83 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java index 83320fffa1a7f..f5a74dcd1661b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java @@ -197,6 +197,66 @@ public void testDeadLetterTopicWithBinaryMessageKey() throws Exception { consumer.close(); } + @Test + public void testDeadLetterTopicMessagesWithOrderingKey() throws Exception { + final String topic = "persistent://my-property/my-ns/dead-letter-topic"; + + final int maxRedeliveryCount = 1; + + final int sendMessages = 100; + + Consumer consumer = pulsarClient.newConsumer(Schema.BYTES) + .topic(topic) + .subscriptionName("my-subscription") + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(1, TimeUnit.SECONDS) + .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build()) + .receiverQueueSize(100) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + @Cleanup + PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection + Consumer deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES) + .topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ") + .subscriptionName("my-subscription") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + Producer producer = pulsarClient.newProducer(Schema.BYTES) + .topic(topic) + .create(); + + byte[] key = new byte[]{1, 2, 3, 4}; + for (int i = 0; i < sendMessages; i++) { + producer.newMessage() + .orderingKey(key) + .value(String.format("Hello Pulsar [%d]", i).getBytes()) + .send(); + } + + producer.close(); + + int totalReceived = 0; + do { + Message message = consumer.receive(); + log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData())); + totalReceived++; + } while (totalReceived < sendMessages * (maxRedeliveryCount + 1)); + + int totalInDeadLetter = 0; + do { + Message message = deadLetterConsumer.receive(); + assertEquals(message.getOrderingKey(), key); + log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData())); + deadLetterConsumer.acknowledge(message); + totalInDeadLetter++; + } while (totalInDeadLetter < sendMessages); + + deadLetterConsumer.close(); + consumer.close(); + } + public void testDeadLetterTopicWithProducerName() throws Exception { final String topic = "persistent://my-property/my-ns/dead-letter-topic"; final String subscription = "my-subscription"; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java index 2ccae72143443..9cb82fde04118 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java @@ -257,6 +257,9 @@ public void testAutoConsumeSchemaRetryLetter() throws Exception { public void testRetryTopicProperties() throws Exception { final String topic = "persistent://my-property/my-ns/retry-topic"; + byte[] key = "key".getBytes(); + byte[] orderingKey = "orderingKey".getBytes(); + final int maxRedeliveryCount = 3; final int sendMessages = 10; @@ -285,7 +288,11 @@ public void testRetryTopicProperties() throws Exception { Set originMessageIds = new HashSet<>(); for (int i = 0; i < sendMessages; i++) { - MessageId msgId = producer.send(String.format("Hello Pulsar [%d]", i).getBytes()); + MessageId msgId = producer.newMessage() + .value(String.format("Hello Pulsar [%d]", i).getBytes()) + .keyBytes(key) + .orderingKey(orderingKey) + .send(); originMessageIds.add(msgId.toString()); } @@ -298,6 +305,10 @@ public void testRetryTopicProperties() throws Exception { if (message.hasProperty(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES)) { // check the REAL_TOPIC property assertEquals(message.getProperty(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC), topic); + assertTrue(message.hasKey()); + assertEquals(message.getKeyBytes(), key); + assertTrue(message.hasOrderingKey()); + assertEquals(message.getOrderingKey(), orderingKey); retryMessageIds.add(message.getProperty(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID)); } consumer.reconsumeLater(message, 1, TimeUnit.SECONDS); @@ -317,6 +328,10 @@ public void testRetryTopicProperties() throws Exception { if (message.hasProperty(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES)) { // check the REAL_TOPIC property assertEquals(message.getProperty(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC), topic); + assertTrue(message.hasKey()); + assertEquals(message.getKeyBytes(), key); + assertTrue(message.hasOrderingKey()); + assertEquals(message.getOrderingKey(), orderingKey); deadLetterMessageIds.add(message.getProperty(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID)); } deadLetterConsumer.acknowledge(message); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 36cd52f955409..596e65484d1b2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -639,7 +639,7 @@ protected CompletableFuture doAcknowledge(List messageIdList, A } } - private static void copyMessageKeyIfNeeded(Message message, TypedMessageBuilder typedMessageBuilderNew) { + private static void copyMessageKeysIfNeeded(Message message, TypedMessageBuilder typedMessageBuilderNew) { if (message.hasKey()) { if (message.hasBase64EncodedKey()) { typedMessageBuilderNew.keyBytes(message.getKeyBytes()); @@ -647,6 +647,9 @@ private static void copyMessageKeyIfNeeded(Message message, TypedMessageBuild typedMessageBuilderNew.key(message.getKey()); } } + if (message.hasOrderingKey()) { + typedMessageBuilderNew.orderingKey(message.getOrderingKey()); + } } @SuppressWarnings("unchecked") @@ -704,6 +707,7 @@ protected CompletableFuture doReconsumeLater(Message message, AckType a dlqProducer.newMessage(Schema.AUTO_PRODUCE_BYTES(retryMessage.getReaderSchema().get())) .value(retryMessage.getData()) .properties(propertiesMap); + copyMessageKeysIfNeeded(message, typedMessageBuilderNew); typedMessageBuilderNew.sendAsync().thenAccept(msgId -> { consumerDlqMessagesCounter.increment(); @@ -732,7 +736,7 @@ protected CompletableFuture doReconsumeLater(Message message, AckType a if (delayTime > 0) { typedMessageBuilderNew.deliverAfter(delayTime, unit); } - copyMessageKeyIfNeeded(message, typedMessageBuilderNew); + copyMessageKeysIfNeeded(message, typedMessageBuilderNew); typedMessageBuilderNew.sendAsync() .thenCompose(__ -> doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null)) .thenAccept(v -> result.complete(null)) @@ -2196,7 +2200,7 @@ private CompletableFuture processPossibleToDLQ(MessageIdAdv messageId) producerDLQ.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get())) .value(message.getData()) .properties(getPropertiesMap(message, originMessageIdStr, originTopicNameStr)); - copyMessageKeyIfNeeded(message, typedMessageBuilderNew); + copyMessageKeysIfNeeded(message, typedMessageBuilderNew); typedMessageBuilderNew.sendAsync() .thenAccept(messageIdInDLQ -> { possibleSendToDeadLetterTopicMessages.remove(messageId);