Merge branch 'apache:master' into dev/bk_batch_read
dao-jun authored Aug 16, 2024
2 parents 9c09343 + 67fc5b9 commit 3dcb6b9
Showing 4 changed files with 192 additions and 4 deletions.
109 changes: 109 additions & 0 deletions pip/pip-370.md
@@ -0,0 +1,109 @@
# PIP-370: configurable remote topic creation in geo-replication

# Background knowledge

**The current topic creation behavior when enabling Geo-Replication**
Users rely on Geo-Replication to back up data across multiple clusters. The Admin APIs related to Geo-Replication, as well as the brokers' internal replicators, trigger automatic topic creation between clusters.
- For partitioned topics:
  - After enabling namespace-level Geo-Replication: the broker will create topics on the remote cluster automatically when calling `pulsar-admin topics create-partitioned-topic`. This does not depend on enabling `allowAutoTopicCreation`.
  - When enabling topic-level Geo-Replication on a partitioned topic: the broker will create topics on the remote cluster automatically. This does not depend on enabling `allowAutoTopicCreation`.
  - When calling `pulsar-admin topics update-partitioned-topic -p {partitions}`, the broker will also update the partitions on the remote cluster automatically.
- For non-partitioned topics and partitions of partitioned topics:
  - The internal Geo-Replicator will trigger topic auto-creation for remote clusters. **(Highlight)** This depends on enabling `allowAutoTopicCreation`. Strictly speaking, this behavior is not specific to Geo-Replication; it is the behavior of the Geo-Replicator's internal producer.

# Motivation

In the following scenarios, automatic topic creation across clusters is problematic due to race conditions during deployments, and there is no option to prevent Pulsar resource creation in one cluster from affecting another.

- Users want to maintain Pulsar resources manually.
- Users manage Pulsar resources through `GitOps CD` automated deployment, for which:
  - Clusters are deployed simultaneously without user intervention.
  - Each cluster is precisely configured from git repo config variables, including the list of all tenants/namespaces/topics to be created in each cluster.
  - Clusters are configured to be exact clones of each other in terms of Pulsar resources.

**Attempted solution**: disable `allowAutoTopicCreation`. This does not help: `pulsar-admin topics create-partitioned-topic` still creates topics on the remote cluster when namespace-level replication is enabled, enabling topic-level replication still creates topics, and the internal replicator keeps printing error logs due to not-found errors.

# Goals

- **Phase 1**: Introduce a flag to prevent the replicators from automatically triggering topic creation.
- **Phase 2**: Move all topic creation/partition-expansion behaviors related to replication into the internal Replicator; the Pulsar admin APIs for topic management will no longer concern themselves with replication.
  - Move the topic creation operations from `pulsar-admin topics create-partitioned-topic` and `pulsar-admin topics set-replication-clusters` into the broker's internal Replicator component.
  - (Same as before) When calling `pulsar-admin topics update-partitioned-topic -p {partitions}`, the broker will also update the partitions on the remote cluster automatically.

Note: this proposal focuses only on phase one; the detailed design for phase two will come in a separate proposal.

# Detailed Design

## Configuration

**broker.conf**
```properties
# Whether the internal replication of the local cluster will trigger topic auto-creation on the remote cluster.
# 1. After enabling namespace-level Geo-Replication: whether the local broker will create topics on the remote cluster automatically when calling `pulsar-admin topics create-partitioned-topic`.
# 2. When enabling topic-level Geo-Replication on a partitioned topic: whether the local broker will create topics on the remote cluster.
# 3. Whether the internal Geo-Replicator in the local cluster will trigger topic auto-creation (for non-partitioned topics and partitions) on remote clusters.
# This is not a dynamic config; the default value is "true" to preserve backward-compatible behavior.
createTopicToRemoteClusterForReplication=true
```
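
As an illustration only, here is a minimal sketch of how a broker-side guard could consult this flag before propagating topic creation to a remote cluster. The `isCreateTopicToRemoteClusterForReplication()` getter is assumed to follow the usual `ServiceConfiguration` naming convention for boolean fields, and `createTopicOnRemoteCluster()` is a hypothetical stand-in for the existing remote-creation path, not actual broker code.

```java
import java.util.concurrent.CompletableFuture;

import org.apache.pulsar.broker.ServiceConfiguration;

public class RemoteTopicCreationGuard {

    // Hypothetical guard: only propagate topic creation to the remote cluster
    // when the new flag is enabled (its default, preserving current behavior).
    CompletableFuture<Void> maybeCreateTopicOnRemoteCluster(
            ServiceConfiguration config, String remoteCluster, String topic, int partitions) {
        if (!config.isCreateTopicToRemoteClusterForReplication()) {
            // Flag disabled: skip remote creation entirely; the Geo-Replicator
            // will keep checking until the topic appears on the remote side.
            return CompletableFuture.completedFuture(null);
        }
        return createTopicOnRemoteCluster(remoteCluster, topic, partitions);
    }

    // Hypothetical stand-in for the existing remote-creation path.
    private CompletableFuture<Void> createTopicOnRemoteCluster(
            String remoteCluster, String topic, int partitions) {
        return CompletableFuture.completedFuture(null);
    }
}
```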

## Design & Implementation Details

### Phase 1: Introduce a flag to prevent the replicators from automatically triggering topic creation.
- If `createTopicToRemoteClusterForReplication` is set to `false`:
  1. After enabling namespace-level Geo-Replication: the broker will not create topics on the remote cluster automatically when calling `pulsar-admin topics create-partitioned-topic`.
  2. When enabling topic-level Geo-Replication on a partitioned topic: the broker will not create topics on the remote cluster automatically.
  3. The internal Geo-Replicator will not trigger topic auto-creation for remote clusters; it just keeps retrying to check whether the topic exists on the remote cluster, and once the topic is created, the replicator starts (see the sketch after this list).
  4. It does not change the behavior of creating subscriptions: after users enable `enableReplicatedSubscriptions`, the subscription will still be created on the remote cluster.
  5. The config `allowAutoTopicCreation` still works for the local cluster as before; it is not affected by the new config `createTopicToRemoteClusterForReplication`.
- If `createTopicToRemoteClusterForReplication` is set to `true`:
  - All components work as before; see details in `Background knowledge -> The current topic creation behavior when enabling Geo-Replication`.
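
Below is a hedged sketch of the retry-check described in item 3 of the list above. It relies on `PulsarClient.getPartitionsForTopic(String topic, boolean metadataAutoCreationEnabled)` from PIP-344 (see the compatibility section below), passing `false` so the lookup itself never triggers auto-creation. The retry delay, scheduler wiring, and `startReplicator` hook are illustrative assumptions, not the actual replicator implementation.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.PulsarClient;

public class RemoteTopicChecker {

    // Assumed retry interval; the real broker would use its own backoff policy.
    private static final long RETRY_DELAY_SECONDS = 60;

    public void awaitRemoteTopic(PulsarClient remoteClient, String topic,
                                 ScheduledExecutorService scheduler, Runnable startReplicator) {
        // metadataAutoCreationEnabled=false: the lookup must not create the topic.
        remoteClient.getPartitionsForTopic(topic, false)
                .thenAccept(partitions -> startReplicator.run())
                .exceptionally(ex -> {
                    // Simplification: treat any failure as "topic not there yet"
                    // and schedule another check instead of creating the topic.
                    scheduler.schedule(
                            () -> awaitRemoteTopic(remoteClient, topic, scheduler, startReplicator),
                            RETRY_DELAY_SECONDS, TimeUnit.SECONDS);
                    return null;
                });
    }
}
```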

### Phase 2: The replicator will check the remote topic's partitioned metadata and, if needed, update the partitions in the remote cluster to match the current cluster.
- If `createTopicToRemoteClusterForReplication` is set to `false`:
  - The behavior is the same as in Phase 1.
- If `createTopicToRemoteClusterForReplication` is set to `true`:
  - The Pulsar admin APIs for topic management no longer concern themselves with replication.
  - When a replicator for a topic partition starts, it first checks the partitioned metadata in the remote cluster and, if needed, updates the partitions in the remote cluster to match the current cluster. See the examples in the table below; a sketch of this reconciliation follows the table.

| `partitions` of local cluster | `partitions` of remote cluster | After `PIP-370 Phase 2` | Before `PIP-370 Phase 2` |
| --- | --- | --- | --- |
| `2` | no topic exists | create a partitioned topic with `2` partitions in the remote cluster | the replicator will only trigger partition creation (`{topic}-partition-0` and `{topic}-partition-1`), and will not care about partitioned metadata. |
| `2` | `1` | **In dispute:** The replicator copies messages from `partition-0` to the remote cluster, does not copy any data for `partition-1`, and just prints error logs in the background. | the replicator will only trigger partition creation (`{topic}-partition-0` and `{topic}-partition-1`), and the partitioned metadata in the remote cluster remains `1`. |
| `2` | `2` | modifies nothing. | The same as "After `PIP-370 Phase 2`". |
| `2` | `>2` | **In dispute:** modifies nothing; messages will be copied to the same-index partitions in the remote cluster, and no messages will be copied to remote partitions with an index of `2` or higher. | The same as "After `PIP-370 Phase 2`". |
| `2` | `0` (non-partitioned topic) | **In dispute:** The replicator does not copy any data and just prints error logs in the background. | the replicator will only trigger partition creation (`{topic}-partition-0` and `{topic}-partition-1`), so users end up with `3` non-partitioned topics: `[{topic}, {topic}-partition-0, {topic}-partition-1]`. |
| `0` (non-partitioned topic) | `0` (non-partitioned topic) | copies data normally. | The same as before `PIP-370`. |
| `0` (non-partitioned topic) | no topic exists | create a non-partitioned topic in the remote cluster. | The same as before `PIP-370`. |
| `0` (non-partitioned topic) | `>=1` | **In dispute:** The replicator does not copy any data and just prints error logs in the background. | The replicator will only trigger a non-partitioned topic's creation, so users end up with `1` non-partitioned topic and `1` partitioned topic. |
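
For illustration, a minimal sketch of the reconciliation step using the `PulsarAdmin` topics API. The class itself, the simplified treatment of `partitions == 0`, and the choice to expand the remote topic in the `remote < local` case (one of the disputed rows above) are assumptions, not the final design.

```java
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;

public class RemotePartitionReconciler {

    public void reconcile(PulsarAdmin remoteAdmin, String topic, int localPartitions)
            throws PulsarAdminException {
        // Simplification: partitions == 0 covers both "no partitioned metadata"
        // and "non-partitioned topic"; a real implementation would distinguish
        // them (the non-partitioned case is a disputed row in the table above).
        int remotePartitions = remoteAdmin.topics()
                .getPartitionedTopicMetadata(topic).partitions;
        if (remotePartitions == 0 && localPartitions > 0) {
            remoteAdmin.topics().createPartitionedTopic(topic, localPartitions);
        } else if (remotePartitions < localPartitions) {
            // Also disputed: shown here as "expand to match the local count",
            // per this section's heading; the table describes an alternative.
            remoteAdmin.topics().updatePartitionedTopic(topic, localPartitions);
        }
        // remotePartitions >= localPartitions: modify nothing.
    }
}
```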

## Metrics

<!--
For each metric provide:
* Full name
* Description
* Attributes (labels)
* Unit
-->
| Name | Description | Attributes | Units|
| --- | --- | --- | --- |
| `pulsar_broker_replication_count` | Counter. The number of topics with replication enabled. | cluster | - |
| `pulsar_broker_replication_disconnected_count` | Counter. The number of topics with replication enabled whose replicator failed to connect. | cluster | - |


# Monitoring
- If `pulsar_broker_replication_disconnected_count` stays greater than `0` for a period of time, some replicators are not working, and an alert should be raised.

# Backward & Forward Compatibility

## Regarding Phase 1
This PIP guarantees full compatibility with default settings (the default value of `createTopicToRemoteClusterForReplication` is `true`). If you want to cherry-pick PIP-370 to another branch in the future, you need to cherry-pick PIP-344 as well, because the behavior when `createTopicToRemoteClusterForReplication` is disabled depends on the API `PulsarClient.getPartitionsForTopic(String topic, boolean metadataAutoCreationEnabled)`, which was introduced by [PIP-344](https://github.com/apache/pulsar/blob/master/pip/pip-344.md).

## Regarding Phase 2
In the following two scenarios, replication will not work as before, which will lead to backlog growth; please check your clusters carefully before upgrading.
- `local_cluster.topic.partitions = 2` and `remote_cluster.topic.partitions = 0` (non-partitioned topic): see details in the section `Design & Implementation Details -> Phase 2`.
- `local_cluster.topic.partitions = 0` (non-partitioned topic) and `remote_cluster.topic.partitions >= 1`: see details in the section `Design & Implementation Details -> Phase 2`.

# Links
* Mailing List discussion thread: https://lists.apache.org/thread/9fx354cqcy3412w1nx8kwdf9h141omdg
* Mailing List voting thread: https://lists.apache.org/thread/vph22st5td1rdh1gd68gkrnp9doo6ct2
@@ -197,6 +197,66 @@ public void testDeadLetterTopicWithBinaryMessageKey() throws Exception {
        consumer.close();
    }

    @Test
    public void testDeadLetterTopicMessagesWithOrderingKey() throws Exception {
        final String topic = "persistent://my-property/my-ns/dead-letter-topic";

        final int maxRedeliveryCount = 1;

        final int sendMessages = 100;

        Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
                .topic(topic)
                .subscriptionName("my-subscription")
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(1, TimeUnit.SECONDS)
                .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
                .receiverQueueSize(100)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();

        @Cleanup
        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0); // Creates a new client connection
        Consumer<byte[]> deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES)
                .topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ")
                .subscriptionName("my-subscription")
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();

        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
                .topic(topic)
                .create();

        byte[] key = new byte[]{1, 2, 3, 4};
        for (int i = 0; i < sendMessages; i++) {
            producer.newMessage()
                    .orderingKey(key)
                    .value(String.format("Hello Pulsar [%d]", i).getBytes())
                    .send();
        }

        producer.close();

        int totalReceived = 0;
        do {
            Message<byte[]> message = consumer.receive();
            log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
            totalReceived++;
        } while (totalReceived < sendMessages * (maxRedeliveryCount + 1));

        int totalInDeadLetter = 0;
        do {
            Message<byte[]> message = deadLetterConsumer.receive();
            assertEquals(message.getOrderingKey(), key);
            log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
            deadLetterConsumer.acknowledge(message);
            totalInDeadLetter++;
        } while (totalInDeadLetter < sendMessages);

        deadLetterConsumer.close();
        consumer.close();
    }

    public void testDeadLetterTopicWithProducerName() throws Exception {
        final String topic = "persistent://my-property/my-ns/dead-letter-topic";
        final String subscription = "my-subscription";
@@ -257,6 +257,9 @@ public void testAutoConsumeSchemaRetryLetter() throws Exception {
    public void testRetryTopicProperties() throws Exception {
        final String topic = "persistent://my-property/my-ns/retry-topic";

        byte[] key = "key".getBytes();
        byte[] orderingKey = "orderingKey".getBytes();

        final int maxRedeliveryCount = 3;

        final int sendMessages = 10;
@@ -285,7 +288,11 @@

        Set<String> originMessageIds = new HashSet<>();
        for (int i = 0; i < sendMessages; i++) {
            MessageId msgId = producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
            MessageId msgId = producer.newMessage()
                    .value(String.format("Hello Pulsar [%d]", i).getBytes())
                    .keyBytes(key)
                    .orderingKey(orderingKey)
                    .send();
            originMessageIds.add(msgId.toString());
        }

@@ -298,6 +305,10 @@
            if (message.hasProperty(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES)) {
                // check the REAL_TOPIC property
                assertEquals(message.getProperty(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC), topic);
                assertTrue(message.hasKey());
                assertEquals(message.getKeyBytes(), key);
                assertTrue(message.hasOrderingKey());
                assertEquals(message.getOrderingKey(), orderingKey);
                retryMessageIds.add(message.getProperty(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID));
            }
            consumer.reconsumeLater(message, 1, TimeUnit.SECONDS);
@@ -317,6 +328,10 @@
            if (message.hasProperty(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES)) {
                // check the REAL_TOPIC property
                assertEquals(message.getProperty(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC), topic);
                assertTrue(message.hasKey());
                assertEquals(message.getKeyBytes(), key);
                assertTrue(message.hasOrderingKey());
                assertEquals(message.getOrderingKey(), orderingKey);
                deadLetterMessageIds.add(message.getProperty(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID));
            }
            deadLetterConsumer.acknowledge(message);
@@ -639,14 +639,17 @@ protected CompletableFuture<Void> doAcknowledge(List<MessageId> messageIdList, A
        }
    }

    private static void copyMessageKeyIfNeeded(Message<?> message, TypedMessageBuilder<?> typedMessageBuilderNew) {
    private static void copyMessageKeysIfNeeded(Message<?> message, TypedMessageBuilder<?> typedMessageBuilderNew) {
        if (message.hasKey()) {
            if (message.hasBase64EncodedKey()) {
                typedMessageBuilderNew.keyBytes(message.getKeyBytes());
            } else {
                typedMessageBuilderNew.key(message.getKey());
            }
        }
        if (message.hasOrderingKey()) {
            typedMessageBuilderNew.orderingKey(message.getOrderingKey());
        }
    }

    @SuppressWarnings("unchecked")
@@ -704,6 +707,7 @@ protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType a
            dlqProducer.newMessage(Schema.AUTO_PRODUCE_BYTES(retryMessage.getReaderSchema().get()))
                    .value(retryMessage.getData())
                    .properties(propertiesMap);
            copyMessageKeysIfNeeded(message, typedMessageBuilderNew);
            typedMessageBuilderNew.sendAsync().thenAccept(msgId -> {
                consumerDlqMessagesCounter.increment();

@@ -732,7 +736,7 @@ protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType a
            if (delayTime > 0) {
                typedMessageBuilderNew.deliverAfter(delayTime, unit);
            }
            copyMessageKeyIfNeeded(message, typedMessageBuilderNew);
            copyMessageKeysIfNeeded(message, typedMessageBuilderNew);
            typedMessageBuilderNew.sendAsync()
                    .thenCompose(__ -> doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null))
                    .thenAccept(v -> result.complete(null))
@@ -2196,7 +2200,7 @@ private CompletableFuture<Boolean> processPossibleToDLQ(MessageIdAdv messageId)
            producerDLQ.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
                    .value(message.getData())
                    .properties(getPropertiesMap(message, originMessageIdStr, originTopicNameStr));
            copyMessageKeyIfNeeded(message, typedMessageBuilderNew);
            copyMessageKeysIfNeeded(message, typedMessageBuilderNew);
            typedMessageBuilderNew.sendAsync()
                    .thenAccept(messageIdInDLQ -> {
                        possibleSendToDeadLetterTopicMessages.remove(messageId);
