diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index a71cf5cecc0ea..689a861ffd73c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -244,6 +244,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { private final ExecutorProvider brokerClientSharedExternalExecutorProvider; private final ScheduledExecutorProvider brokerClientSharedScheduledExecutorProvider; private final Timer brokerClientSharedTimer; + private final ExecutorProvider brokerClientSharedLookupExecutorProvider; private MetricsGenerator metricsGenerator; @@ -353,6 +354,8 @@ public PulsarService(ServiceConfiguration config, new ScheduledExecutorProvider(1, "broker-client-shared-scheduled-executor"); this.brokerClientSharedTimer = new HashedWheelTimer(new DefaultThreadFactory("broker-client-shared-timer"), 1, TimeUnit.MILLISECONDS); + this.brokerClientSharedLookupExecutorProvider = + new ScheduledExecutorProvider(1, "broker-client-shared-lookup-executor"); // here in the constructor we don't have the offloader scheduler yet this.offloaderStats = LedgerOffloaderStats.create(false, false, null, 0); @@ -593,6 +596,7 @@ public CompletableFuture closeAsync() { brokerClientSharedExternalExecutorProvider.shutdownNow(); brokerClientSharedInternalExecutorProvider.shutdownNow(); brokerClientSharedScheduledExecutorProvider.shutdownNow(); + brokerClientSharedLookupExecutorProvider.shutdownNow(); brokerClientSharedTimer.stop(); asyncCloseFutures.add(EventLoopUtil.shutdownGracefully(ioEventLoopGroup)); @@ -1587,6 +1591,7 @@ public PulsarClientImpl createClientImpl(ClientConfigurationData clientConf) .internalExecutorProvider(brokerClientSharedInternalExecutorProvider) .externalExecutorProvider(brokerClientSharedExternalExecutorProvider) .scheduledExecutorProvider(brokerClientSharedScheduledExecutorProvider) + .lookupExecutorProvider(brokerClientSharedLookupExecutorProvider) .build(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java index fb6b84b1096a1..1f08c22caa5bf 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java @@ -21,12 +21,14 @@ import static java.lang.String.format; import static org.apache.pulsar.client.api.PulsarClientException.FailedFeatureCheck.SupportsGetPartitionedMetadataWithoutAutoCreation; import io.netty.buffer.ByteBuf; +import io.netty.util.concurrent.DefaultThreadFactory; import java.net.InetSocketAddress; import java.net.URI; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -55,9 +57,11 @@ public class BinaryProtoLookupService implements LookupService { private final PulsarClientImpl client; private final ServiceNameResolver serviceNameResolver; private final boolean useTls; - private final ExecutorService executor; + private final ExecutorService scheduleExecutor; private final String listenerName; private final int maxLookupRedirects; + private final ExecutorService lookupPinnedExecutor; + private final boolean createdLookupPinnedExecutor; private final ConcurrentHashMap>> lookupInProgress = new ConcurrentHashMap<>(); @@ -65,27 +69,56 @@ public class BinaryProtoLookupService implements LookupService { private final ConcurrentHashMap> partitionedMetadataInProgress = new ConcurrentHashMap<>(); + /** + * @deprecated use {@link + * #BinaryProtoLookupService(PulsarClientImpl, String, String, boolean, ExecutorService, ExecutorService)} instead. + */ + @Deprecated + public BinaryProtoLookupService(PulsarClientImpl client, + String serviceUrl, + boolean useTls, + ExecutorService scheduleExecutor) + throws PulsarClientException { + this(client, serviceUrl, null, useTls, scheduleExecutor); + } + + /** + * @deprecated use {@link + * #BinaryProtoLookupService(PulsarClientImpl, String, String, boolean, ExecutorService, ExecutorService)} instead. + */ + @Deprecated public BinaryProtoLookupService(PulsarClientImpl client, String serviceUrl, + String listenerName, boolean useTls, - ExecutorService executor) + ExecutorService scheduleExecutor) throws PulsarClientException { - this(client, serviceUrl, null, useTls, executor); + this(client, serviceUrl, listenerName, useTls, scheduleExecutor, null); } public BinaryProtoLookupService(PulsarClientImpl client, String serviceUrl, String listenerName, boolean useTls, - ExecutorService executor) + ExecutorService scheduleExecutor, + ExecutorService lookupPinnedExecutor) throws PulsarClientException { this.client = client; this.useTls = useTls; - this.executor = executor; + this.scheduleExecutor = scheduleExecutor; this.maxLookupRedirects = client.getConfiguration().getMaxLookupRedirects(); this.serviceNameResolver = new PulsarServiceNameResolver(); this.listenerName = listenerName; updateServiceUrl(serviceUrl); + + if (lookupPinnedExecutor == null) { + this.createdLookupPinnedExecutor = true; + this.lookupPinnedExecutor = + Executors.newSingleThreadExecutor(new DefaultThreadFactory("pulsar-client-binary-proto-lookup")); + } else { + this.createdLookupPinnedExecutor = false; + this.lookupPinnedExecutor = lookupPinnedExecutor; + } } @Override @@ -153,7 +186,7 @@ private CompletableFuture> findBroker return addressFuture; } - client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> { + client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> { long requestId = client.newRequestId(); ByteBuf request = Commands.newLookup(topicName.toString(), listenerName, authoritative, requestId); clientCnx.newLookup(request, requestId).whenComplete((r, t) -> { @@ -218,7 +251,7 @@ private CompletableFuture> findBroker } client.getCnxPool().releaseConnection(clientCnx); }); - }).exceptionally(connectionException -> { + }, lookupPinnedExecutor).exceptionally(connectionException -> { addressFuture.completeExceptionally(FutureUtil.unwrapCompletionException(connectionException)); return null; }); @@ -230,7 +263,7 @@ private CompletableFuture getPartitionedTopicMetadata( CompletableFuture partitionFuture = new CompletableFuture<>(); - client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> { + client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> { boolean finalAutoCreationEnabled = metadataAutoCreationEnabled; if (!metadataAutoCreationEnabled && !clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation()) { if (useFallbackForNonPIP344Brokers) { @@ -269,7 +302,7 @@ private CompletableFuture getPartitionedTopicMetadata( } client.getCnxPool().releaseConnection(clientCnx); }); - }).exceptionally(connectionException -> { + }, lookupPinnedExecutor).exceptionally(connectionException -> { partitionFuture.completeExceptionally(FutureUtil.unwrapCompletionException(connectionException)); return null; }); @@ -291,7 +324,7 @@ public CompletableFuture> getSchema(TopicName topicName, by return schemaFuture; } InetSocketAddress socketAddress = serviceNameResolver.resolveHost(); - client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> { + client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> { long requestId = client.newRequestId(); ByteBuf request = Commands.newGetSchema(requestId, topicName.toString(), Optional.ofNullable(BytesSchemaVersion.of(version))); @@ -305,7 +338,7 @@ public CompletableFuture> getSchema(TopicName topicName, by } client.getCnxPool().releaseConnection(clientCnx); }); - }).exceptionally(ex -> { + }, lookupPinnedExecutor).exceptionally(ex -> { schemaFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex)); return null; }); @@ -348,7 +381,7 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress, Mode mode, String topicsPattern, String topicsHash) { - client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> { + client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> { long requestId = client.newRequestId(); ByteBuf request = Commands.newGetTopicsOfNamespaceRequest( namespace.toString(), requestId, mode, topicsPattern, topicsHash); @@ -365,7 +398,7 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress, } client.getCnxPool().releaseConnection(clientCnx); }); - }).exceptionally((e) -> { + }, lookupPinnedExecutor).exceptionally((e) -> { long nextDelay = Math.min(backoff.next(), remainingTime.get()); if (nextDelay <= 0) { getTopicsResultFuture.completeExceptionally( @@ -375,7 +408,7 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress, return null; } - ((ScheduledExecutorService) executor).schedule(() -> { + ((ScheduledExecutorService) scheduleExecutor).schedule(() -> { log.warn("[namespace: {}] Could not get connection while getTopicsUnderNamespace -- Will try again in" + " {} ms", namespace, nextDelay); remainingTime.addAndGet(-nextDelay); @@ -389,7 +422,9 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress, @Override public void close() throws Exception { - // no-op + if (createdLookupPinnedExecutor && lookupPinnedExecutor != null && !lookupPinnedExecutor.isShutdown()) { + lookupPinnedExecutor.shutdown(); + } } public static class LookupDataResult { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 3bf1a57649bc3..82e7c82810435 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -109,6 +109,7 @@ public class PulsarClientImpl implements PulsarClient { private final boolean createdExecutorProviders; private final boolean createdScheduledProviders; + private final boolean createdLookupProviders; private LookupService lookup; private Map urlLookupMap = new ConcurrentHashMap<>(); private final ConnectionPool cnxPool; @@ -117,6 +118,7 @@ public class PulsarClientImpl implements PulsarClient { private boolean needStopTimer; private final ExecutorProvider externalExecutorProvider; private final ExecutorProvider internalExecutorProvider; + private final ExecutorProvider lookupExecutorProvider; private final ScheduledExecutorProvider scheduledExecutorProvider; private final boolean createdEventLoopGroup; @@ -157,29 +159,40 @@ public SchemaInfoProvider load(String topicName) { private TransactionCoordinatorClientImpl tcClient; public PulsarClientImpl(ClientConfigurationData conf) throws PulsarClientException { - this(conf, null, null, null, null, null, null); + this(conf, null, null, null, null, null, null, null); } public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException { - this(conf, eventLoopGroup, null, null, null, null, null); + this(conf, eventLoopGroup, null, null, null, null, null, null); } public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool) throws PulsarClientException { - this(conf, eventLoopGroup, cnxPool, null, null, null, null); + this(conf, eventLoopGroup, cnxPool, null, null, null, null, null); } public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool, Timer timer) throws PulsarClientException { - this(conf, eventLoopGroup, cnxPool, timer, null, null, null); + this(conf, eventLoopGroup, cnxPool, timer, null, null, null, null); + } + + public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool connectionPool, + Timer timer, ExecutorProvider externalExecutorProvider, + ExecutorProvider internalExecutorProvider, + ScheduledExecutorProvider scheduledExecutorProvider) + throws PulsarClientException { + this(conf, eventLoopGroup, connectionPool, timer, externalExecutorProvider, internalExecutorProvider, + scheduledExecutorProvider, null); } @Builder(builderClassName = "PulsarClientImplBuilder") private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool connectionPool, Timer timer, ExecutorProvider externalExecutorProvider, ExecutorProvider internalExecutorProvider, - ScheduledExecutorProvider scheduledExecutorProvider) throws PulsarClientException { + ScheduledExecutorProvider scheduledExecutorProvider, + ExecutorProvider lookupExecutorProvider) throws PulsarClientException { + EventLoopGroup eventLoopGroupReference = null; ConnectionPool connectionPoolReference = null; try { @@ -191,6 +204,7 @@ private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopG } this.createdExecutorProviders = externalExecutorProvider == null; this.createdScheduledProviders = scheduledExecutorProvider == null; + this.createdLookupProviders = lookupExecutorProvider == null; eventLoopGroupReference = eventLoopGroup != null ? eventLoopGroup : getEventLoopGroup(conf); this.eventLoopGroup = eventLoopGroupReference; if (conf == null || isBlank(conf.getServiceUrl())) { @@ -206,13 +220,16 @@ private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopG new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener"); this.internalExecutorProvider = internalExecutorProvider != null ? internalExecutorProvider : new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal"); + this.lookupExecutorProvider = lookupExecutorProvider != null ? lookupExecutorProvider : + new ExecutorProvider(1, "pulsar-client-lookup"); this.scheduledExecutorProvider = scheduledExecutorProvider != null ? scheduledExecutorProvider : new ScheduledExecutorProvider(conf.getNumIoThreads(), "pulsar-client-scheduled"); if (conf.getServiceUrl().startsWith("http")) { lookup = new HttpLookupService(conf, this.eventLoopGroup); } else { lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.getListenerName(), - conf.isUseTls(), this.scheduledExecutorProvider.getExecutor()); + conf.isUseTls(), this.scheduledExecutorProvider.getExecutor(), + this.lookupExecutorProvider.getExecutor()); } if (timer == null) { this.timer = new HashedWheelTimer(getThreadFactory("pulsar-timer"), 1, TimeUnit.MILLISECONDS); @@ -965,6 +982,16 @@ private void shutdownExecutors() throws PulsarClientException { pulsarClientException = PulsarClientException.unwrap(t); } } + + if (createdLookupProviders && lookupExecutorProvider != null && !lookupExecutorProvider.isShutdown()) { + try { + lookupExecutorProvider.shutdownNow(); + } catch (Throwable t) { + log.warn("Failed to shutdown lookupExecutorProvider", t); + pulsarClientException = PulsarClientException.unwrap(t); + } + } + if (pulsarClientException != null) { throw pulsarClientException; } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java index 0254cf8d44cea..984b201d1ce64 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java @@ -25,28 +25,44 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import io.netty.buffer.ByteBuf; +import io.netty.util.concurrent.DefaultThreadFactory; import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.client.api.PulsarClientException.LookupException; import org.apache.pulsar.client.impl.BinaryProtoLookupService.LookupDataResult; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.common.api.proto.BaseCommand; +import org.apache.pulsar.common.api.proto.BaseCommand.Type; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.protocol.Commands; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; public class BinaryProtoLookupServiceTest { private BinaryProtoLookupService lookup; private TopicName topicName; + private ExecutorService internalExecutor; + + @AfterMethod + public void cleanup() throws Exception { + internalExecutor.shutdown(); + lookup.close(); + } @BeforeMethod public void setup() throws Exception { @@ -72,9 +88,15 @@ public void setup() throws Exception { doReturn(cnxPool).when(client).getCnxPool(); doReturn(clientConfig).when(client).getConfiguration(); doReturn(1L).when(client).newRequestId(); + ClientConfigurationData data = new ClientConfigurationData(); + doReturn(data).when(client).getConfiguration(); + internalExecutor = + Executors.newSingleThreadExecutor(new DefaultThreadFactory("pulsar-client-test-internal-executor")); + doReturn(internalExecutor).when(client).getInternalExecutorService(); + + lookup = spy(new BinaryProtoLookupService(client, "pulsar://localhost:6650", null, false, + mock(ExecutorService.class), internalExecutor)); - lookup = spy( - new BinaryProtoLookupService(client, "pulsar://localhost:6650", false, mock(ExecutorService.class))); topicName = TopicName.get("persistent://tenant1/ns1/t1"); } @@ -116,6 +138,37 @@ public void maxLookupRedirectsTest3() throws Exception { } } + @Test + public void testCommandUnChangedInDifferentThread() throws Exception { + BaseCommand successCommand = Commands.newSuccessCommand(10000); + lookup.getBroker(topicName).get(); + assertEquals(successCommand.getType(), Type.SUCCESS); + lookup.getPartitionedTopicMetadata(topicName, true, true).get(); + assertEquals(successCommand.getType(), Type.SUCCESS); + } + + @Test + public void testCommandChangedInSameThread() throws Exception { + AtomicReference successCommand = new AtomicReference<>(); + internalExecutor.execute(() -> successCommand.set(Commands.newSuccessCommand(10000))); + Awaitility.await().untilAsserted(() -> { + BaseCommand baseCommand = successCommand.get(); + assertNotNull(baseCommand); + assertEquals(baseCommand.getType(), Type.SUCCESS); + }); + lookup.getBroker(topicName).get(); + assertEquals(successCommand.get().getType(), Type.LOOKUP); + + internalExecutor.execute(() -> successCommand.set(Commands.newSuccessCommand(10000))); + Awaitility.await().untilAsserted(() -> { + BaseCommand baseCommand = successCommand.get(); + assertNotNull(baseCommand); + assertEquals(baseCommand.getType(), Type.SUCCESS); + }); + lookup.getPartitionedTopicMetadata(topicName, true, true).get(); + assertEquals(successCommand.get().getType(), Type.PARTITIONED_METADATA); + } + private static LookupDataResult createLookupDataResult(String brokerUrl, boolean redirect) throws Exception { LookupDataResult lookupResult = new LookupDataResult(-1);