Skip to content

Commit

Permalink
[improve][broker] Add callback parameters to the SendCallback.sendCom…
Browse files Browse the repository at this point in the history
…plete (apache#23196)
  • Loading branch information
crossoverJie authored Aug 21, 2024
1 parent a605ea3 commit 18cb458
Show file tree
Hide file tree
Showing 8 changed files with 224 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.OpSendMsgStats;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.SendCallback;
Expand Down Expand Up @@ -173,7 +174,7 @@ private static final class ProducerSendCallback implements SendCallback {
private MessageImpl msg;

@Override
public void sendComplete(Exception exception) {
public void sendComplete(Throwable exception, OpSendMsgStats opSendMsgStats) {
if (exception != null) {
log.error("[{}] Error producing on remote broker", replicator.replicatorId, exception);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.OpSendMsgStats;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.SendCallback;
Expand Down Expand Up @@ -377,7 +378,7 @@ protected static final class ProducerSendCallback implements SendCallback {
private MessageImpl msg;

@Override
public void sendComplete(Exception exception) {
public void sendComplete(Throwable exception, OpSendMsgStats opSendMsgStats) {
if (exception != null && !(exception instanceof PulsarClientException.InvalidMessageException)) {
log.error("[{}] Error producing on remote broker", replicator.replicatorId, exception);
// cursor should be rewinded since it was incremented when readMoreEntries
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@
*/
package org.apache.pulsar.client.impl;

import static org.apache.pulsar.client.impl.AbstractBatchMessageContainer.INITIAL_BATCH_BUFFER_SIZE;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MockBrokerService;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.api.proto.MessageMetadata;
Expand All @@ -38,13 +42,20 @@

@Test(groups = "broker-impl")
@Slf4j
public class ProduceWithMessageIdTest {
public class ProduceWithMessageIdTest extends ProducerConsumerBase {
MockBrokerService mockBrokerService;

@BeforeClass(alwaysRun = true)
public void setup() {
public void setup() throws Exception {
mockBrokerService = new MockBrokerService();
mockBrokerService.start();
super.internalSetup();
super.producerBaseSetup();
}

@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

@AfterClass(alwaysRun = true)
Expand Down Expand Up @@ -86,7 +97,7 @@ public void testSend() throws Exception {
AtomicBoolean result = new AtomicBoolean(false);
producer.sendAsync(msg, new SendCallback() {
@Override
public void sendComplete(Exception e) {
public void sendComplete(Throwable e, OpSendMsgStats opSendMsgStats) {
log.info("sendComplete", e);
result.set(e == null);
}
Expand Down Expand Up @@ -115,4 +126,72 @@ public CompletableFuture<MessageId> getFuture() {
// the result is true only if broker received right message id.
Awaitility.await().untilTrue(result);
}

@Test
public void sendWithCallBack() throws Exception {

int batchSize = 10;

String topic = "persistent://public/default/testSendWithCallBack";
ProducerImpl<byte[]> producer =
(ProducerImpl<byte[]>) pulsarClient.newProducer().topic(topic)
.enableBatching(true)
.batchingMaxMessages(batchSize)
.create();

CountDownLatch cdl = new CountDownLatch(1);
AtomicReference<OpSendMsgStats> sendMsgStats = new AtomicReference<>();
SendCallback sendComplete = new SendCallback() {
@Override
public void sendComplete(Throwable e, OpSendMsgStats opSendMsgStats) {
log.info("sendComplete", e);
if (e == null){
cdl.countDown();
sendMsgStats.set(opSendMsgStats);
}
}

@Override
public void addCallback(MessageImpl<?> msg, SendCallback scb) {

}

@Override
public SendCallback getNextSendCallback() {
return null;
}

@Override
public MessageImpl<?> getNextMessage() {
return null;
}

@Override
public CompletableFuture<MessageId> getFuture() {
return null;
}
};
int totalReadabled = 0;
int totalUncompressedSize = 0;
for (int i = 0; i < batchSize; i++) {
MessageMetadata metadata = new MessageMetadata();
ByteBuffer buffer = ByteBuffer.wrap("data".getBytes(StandardCharsets.UTF_8));
MessageImpl<byte[]> msg = MessageImpl.create(metadata, buffer, Schema.BYTES, topic);
msg.getDataBuffer().retain();
totalReadabled += msg.getDataBuffer().readableBytes();
totalUncompressedSize += msg.getUncompressedSize();
producer.sendAsync(msg, sendComplete);
}

cdl.await();
OpSendMsgStats opSendMsgStats = sendMsgStats.get();
Assert.assertEquals(opSendMsgStats.getUncompressedSize(), totalUncompressedSize + INITIAL_BATCH_BUFFER_SIZE);
Assert.assertEquals(opSendMsgStats.getSequenceId(), 0);
Assert.assertEquals(opSendMsgStats.getRetryCount(), 1);
Assert.assertEquals(opSendMsgStats.getBatchSizeByte(), totalReadabled);
Assert.assertEquals(opSendMsgStats.getNumMessagesInBatch(), batchSize);
Assert.assertEquals(opSendMsgStats.getHighestSequenceId(), batchSize-1);
Assert.assertEquals(opSendMsgStats.getTotalChunks(), 0);
Assert.assertEquals(opSendMsgStats.getChunkId(), -1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ public void discard(Exception ex) {
try {
// Need to protect ourselves from any exception being thrown in the future handler from the application
if (firstCallback != null) {
firstCallback.sendComplete(ex);
firstCallback.sendComplete(ex, null);
}
if (batchedMessageMetadataAndPayload != null) {
ReferenceCountUtil.safeRelease(batchedMessageMetadataAndPayload);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;


public interface OpSendMsgStats {
long getUncompressedSize();

long getSequenceId();

int getRetryCount();

long getBatchSizeByte();

int getNumMessagesInBatch();

long getHighestSequenceId();

int getTotalChunks();

int getChunkId();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import lombok.Builder;

@Builder
public class OpSendMsgStatsImpl implements OpSendMsgStats {
private long uncompressedSize;
private long sequenceId;
private int retryCount;
private long batchSizeByte;
private int numMessagesInBatch;
private long highestSequenceId;
private int totalChunks;
private int chunkId;

@Override
public long getUncompressedSize() {
return uncompressedSize;
}

@Override
public long getSequenceId() {
return sequenceId;
}

@Override
public int getRetryCount() {
return retryCount;
}

@Override
public long getBatchSizeByte() {
return batchSizeByte;
}

@Override
public int getNumMessagesInBatch() {
return numMessagesInBatch;
}

@Override
public long getHighestSequenceId() {
return highestSequenceId;
}

@Override
public int getTotalChunks() {
return totalChunks;
}

@Override
public int getChunkId() {
return chunkId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ public MessageImpl<?> getNextMessage() {
}

@Override
public void sendComplete(Exception e) {
public void sendComplete(Throwable e, OpSendMsgStats opSendMsgStats) {
SendCallback loopingCallback = this;
MessageImpl<?> loopingMsg = currentMsg;
while (loopingCallback != null) {
Expand All @@ -424,7 +424,7 @@ public void sendComplete(Exception e) {
}
}

private void onSendComplete(Exception e, SendCallback sendCallback, MessageImpl<?> msg) {
private void onSendComplete(Throwable e, SendCallback sendCallback, MessageImpl<?> msg) {
long createdAt = (sendCallback instanceof ProducerImpl.DefaultSendMessageCallback)
? ((DefaultSendMessageCallback) sendCallback).createdAt : this.createdAt;
long latencyNanos = System.nanoTime() - createdAt;
Expand Down Expand Up @@ -842,7 +842,7 @@ private void tryRegisterSchema(ClientCnx cnx, MessageImpl msg, SendCallback call
log.warn("[{}] [{}] GetOrCreateSchema error", topic, producerName, t);
if (t instanceof PulsarClientException.IncompatibleSchemaException) {
msg.setSchemaState(MessageImpl.SchemaState.Broken);
callback.sendComplete((PulsarClientException.IncompatibleSchemaException) t);
callback.sendComplete(t, null);
}
} else {
log.info("[{}] [{}] GetOrCreateSchema succeed", topic, producerName);
Expand Down Expand Up @@ -985,19 +985,19 @@ private boolean isValidProducerState(SendCallback callback, long sequenceId) {
case Closing:
case Closed:
callback.sendComplete(
new PulsarClientException.AlreadyClosedException("Producer already closed", sequenceId));
new PulsarClientException.AlreadyClosedException("Producer already closed", sequenceId), null);
return false;
case ProducerFenced:
callback.sendComplete(new PulsarClientException.ProducerFencedException("Producer was fenced"));
callback.sendComplete(new PulsarClientException.ProducerFencedException("Producer was fenced"), null);
return false;
case Terminated:
callback.sendComplete(
new PulsarClientException.TopicTerminatedException("Topic was terminated", sequenceId));
new PulsarClientException.TopicTerminatedException("Topic was terminated", sequenceId), null);
return false;
case Failed:
case Uninitialized:
default:
callback.sendComplete(new PulsarClientException.NotConnectedException(sequenceId));
callback.sendComplete(new PulsarClientException.NotConnectedException(sequenceId), null);
return false;
}
}
Expand All @@ -1012,20 +1012,20 @@ private boolean canEnqueueRequest(SendCallback callback, long sequenceId, int pa
} else {
if (!semaphore.map(Semaphore::tryAcquire).orElse(true)) {
callback.sendComplete(new PulsarClientException.ProducerQueueIsFullError(
"Producer send queue is full", sequenceId));
"Producer send queue is full", sequenceId), null);
return false;
}

if (!client.getMemoryLimitController().tryReserveMemory(payloadSize)) {
semaphore.ifPresent(Semaphore::release);
callback.sendComplete(new PulsarClientException.MemoryBufferIsFullError(
"Client memory buffer is full", sequenceId));
"Client memory buffer is full", sequenceId), null);
return false;
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
callback.sendComplete(new PulsarClientException(e, sequenceId));
callback.sendComplete(new PulsarClientException(e, sequenceId), null);
return false;
}

Expand Down Expand Up @@ -1302,7 +1302,7 @@ private void releaseSemaphoreForSendOp(OpSendMsg op) {
private void completeCallbackAndReleaseSemaphore(long payloadSize, SendCallback callback, Exception exception) {
semaphore.ifPresent(Semaphore::release);
client.getMemoryLimitController().releaseMemory(payloadSize);
callback.sendComplete(exception);
callback.sendComplete(exception, null);
}

/**
Expand Down Expand Up @@ -1595,7 +1595,17 @@ void sendComplete(final Exception e) {
rpcLatencyHistogram.recordFailure(now - this.lastSentAt);
}

callback.sendComplete(finalEx);
OpSendMsgStats opSendMsgStats = OpSendMsgStatsImpl.builder()
.uncompressedSize(uncompressedSize)
.sequenceId(sequenceId)
.retryCount(retryCount)
.batchSizeByte(batchSizeByte)
.numMessagesInBatch(numMessagesInBatch)
.highestSequenceId(highestSequenceId)
.totalChunks(totalChunks)
.chunkId(chunkId)
.build();
callback.sendComplete(finalEx, opSendMsgStats);
}
}

Expand Down
Loading

0 comments on commit 18cb458

Please sign in to comment.