diff --git a/changelog/@unreleased/pr-2367.v2.yml b/changelog/@unreleased/pr-2367.v2.yml new file mode 100644 index 000000000..f39e8bd89 --- /dev/null +++ b/changelog/@unreleased/pr-2367.v2.yml @@ -0,0 +1,5 @@ +type: feature +feature: + description: Dialogue produces timer metrics for all endpoints. + links: + - https://github.com/palantir/dialogue/pull/2367 diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/ChannelToEndpointChannel.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/ChannelToEndpointChannel.java index 29fdc828b..b6f2ba3c1 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/ChannelToEndpointChannel.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/ChannelToEndpointChannel.java @@ -16,54 +16,25 @@ package com.palantir.dialogue.core; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; import com.google.common.util.concurrent.ListenableFuture; import com.palantir.dialogue.Channel; import com.palantir.dialogue.Endpoint; import com.palantir.dialogue.Request; import com.palantir.dialogue.Response; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; final class ChannelToEndpointChannel implements Channel { - private final Function adapter; - private final Map cache; + private final LoadingCache cache; - ChannelToEndpointChannel(Function adapter) { - this.adapter = adapter; - this.cache = new ConcurrentHashMap<>(); + ChannelToEndpointChannel(Function loader) { + this.cache = Caffeine.newBuilder().weakKeys().maximumSize(10_000).build(loader::apply); } @Override public ListenableFuture execute(Endpoint endpoint, Request request) { - return channelFor(endpoint).execute(endpoint, request); - } - - private Channel channelFor(Endpoint endpoint) { - return cache.computeIfAbsent(key(endpoint), _key -> adapter.apply(endpoint)); - } - - /** - * Constant {@link Endpoint endpoints} may be safely used as cache keys, as opposed to dynamically created - * {@link Endpoint} objects which would result in a memory leak. - */ - static boolean isConstant(Endpoint endpoint) { - // The conjure generator creates endpoints as enum values, which can safely be cached because they aren't - // dynamically created. - return endpoint instanceof Enum; - } - - /** - * Creates a cache key for the given endpoint. Some consumers (CJR feign shim) may not use endpoint enums, so we - * cannot safely hold references to potentially short-lived objects. In such cases we use a string value based on - * the service-name endpoint-name tuple. - */ - private static Object key(Endpoint endpoint) { - return isConstant(endpoint) ? endpoint : stringKey(endpoint); - } - - private static String stringKey(Endpoint endpoint) { - return endpoint.serviceName() + '.' + endpoint.endpointName(); + return cache.get(endpoint).execute(endpoint, request); } } diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueChannel.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueChannel.java index e4424d0e8..955e725b3 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueChannel.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueChannel.java @@ -219,21 +219,25 @@ private static ImmutableList createHostChannels(Config cf, List< .build()); channel = RetryOtherValidatingChannel.create(cf, channel); channel = HostMetricsChannel.create(cf, channel, targetUri.uri()); - Channel tracingChannel = + channel = new TraceEnrichingChannel(channel, DialogueTracing.tracingTags(cf, uriIndexForInstrumentation)); - channel = cf.isConcurrencyLimitingEnabled() - ? new ChannelToEndpointChannel(endpoint -> { - if (endpoint.tags().contains("dialogue-disable-endpoint-concurrency-limiting")) { - return tracingChannel; - } - LimitedChannel limited = ConcurrencyLimitedChannel.createForEndpoint( - tracingChannel, cf.channelName(), uriIndexForInstrumentation, endpoint); - return QueuedChannel.create(cf, endpoint, limited); - }) - : tracingChannel; - LimitedChannel limitedChannel = cf.isConcurrencyLimitingEnabled() - ? ConcurrencyLimitedChannel.createForHost(cf, channel, uriIndexForInstrumentation) - : new ChannelToLimitedChannelAdapter(channel); + + LimitedChannel limitedChannel; + if (cf.isConcurrencyLimitingEnabled()) { + Channel unlimited = channel; + channel = new ChannelToEndpointChannel(endpoint -> { + if (endpoint.tags().contains("dialogue-disable-endpoint-concurrency-limiting")) { + return unlimited; + } + LimitedChannel limited = ConcurrencyLimitedChannel.createForEndpoint( + unlimited, cf.channelName(), uriIndexForInstrumentation, endpoint); + return QueuedChannel.create(cf, endpoint, limited); + }); + limitedChannel = ConcurrencyLimitedChannel.createForHost(cf, channel, uriIndexForInstrumentation); + } else { + limitedChannel = new ChannelToLimitedChannelAdapter(channel); + } + perUriChannels.add(limitedChannel); } return perUriChannels.build(); @@ -253,11 +257,7 @@ private static EndpointChannelFactory createEndpointChannelFactory(Channel multi channel = new RangeAcceptsIdentityEncodingChannel(channel); channel = ContentEncodingChannel.of(channel, endpoint); channel = TracedChannel.create(cf, channel, endpoint); - if (ChannelToEndpointChannel.isConstant(endpoint)) { - // Avoid producing metrics for non-constant endpoints which may produce - // high cardinality. - channel = TimingEndpointChannel.create(cf, channel, endpoint); - } + channel = TimingEndpointChannel.create(cf, channel, endpoint); channel = new RequestBodyValidationChannel(channel); channel = new InterruptionChannel(channel); return new NeverThrowEndpointChannel(channel); // this must come last as a defensive backstop