Skip to content

Commit

Permalink
Use cache with weak keys in ChannelToEndpointChannel (#2367)
Browse files Browse the repository at this point in the history
  • Loading branch information
pkoenig10 authored Oct 1, 2024
1 parent 9f09405 commit f5498d1
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 54 deletions.
5 changes: 5 additions & 0 deletions changelog/@unreleased/pr-2367.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type: feature
feature:
description: Dialogue produces timer metrics for all endpoints.
links:
- https://github.com/palantir/dialogue/pull/2367
Original file line number Diff line number Diff line change
Expand Up @@ -16,54 +16,25 @@

package com.palantir.dialogue.core;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.util.concurrent.ListenableFuture;
import com.palantir.dialogue.Channel;
import com.palantir.dialogue.Endpoint;
import com.palantir.dialogue.Request;
import com.palantir.dialogue.Response;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class ChannelToEndpointChannel implements Channel {

private final Function<Endpoint, Channel> adapter;
private final Map<Object, Channel> cache;
private final LoadingCache<Endpoint, Channel> cache;

ChannelToEndpointChannel(Function<Endpoint, Channel> adapter) {
this.adapter = adapter;
this.cache = new ConcurrentHashMap<>();
ChannelToEndpointChannel(Function<Endpoint, Channel> loader) {
this.cache = Caffeine.newBuilder().weakKeys().maximumSize(10_000).build(loader::apply);
}

@Override
public ListenableFuture<Response> execute(Endpoint endpoint, Request request) {
return channelFor(endpoint).execute(endpoint, request);
}

private Channel channelFor(Endpoint endpoint) {
return cache.computeIfAbsent(key(endpoint), _key -> adapter.apply(endpoint));
}

/**
* Constant {@link Endpoint endpoints} may be safely used as cache keys, as opposed to dynamically created
* {@link Endpoint} objects which would result in a memory leak.
*/
static boolean isConstant(Endpoint endpoint) {
// The conjure generator creates endpoints as enum values, which can safely be cached because they aren't
// dynamically created.
return endpoint instanceof Enum;
}

/**
* Creates a cache key for the given endpoint. Some consumers (CJR feign shim) may not use endpoint enums, so we
* cannot safely hold references to potentially short-lived objects. In such cases we use a string value based on
* the service-name endpoint-name tuple.
*/
private static Object key(Endpoint endpoint) {
return isConstant(endpoint) ? endpoint : stringKey(endpoint);
}

private static String stringKey(Endpoint endpoint) {
return endpoint.serviceName() + '.' + endpoint.endpointName();
return cache.get(endpoint).execute(endpoint, request);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -219,21 +219,25 @@ private static ImmutableList<LimitedChannel> createHostChannels(Config cf, List<
.build());
channel = RetryOtherValidatingChannel.create(cf, channel);
channel = HostMetricsChannel.create(cf, channel, targetUri.uri());
Channel tracingChannel =
channel =
new TraceEnrichingChannel(channel, DialogueTracing.tracingTags(cf, uriIndexForInstrumentation));
channel = cf.isConcurrencyLimitingEnabled()
? new ChannelToEndpointChannel(endpoint -> {
if (endpoint.tags().contains("dialogue-disable-endpoint-concurrency-limiting")) {
return tracingChannel;
}
LimitedChannel limited = ConcurrencyLimitedChannel.createForEndpoint(
tracingChannel, cf.channelName(), uriIndexForInstrumentation, endpoint);
return QueuedChannel.create(cf, endpoint, limited);
})
: tracingChannel;
LimitedChannel limitedChannel = cf.isConcurrencyLimitingEnabled()
? ConcurrencyLimitedChannel.createForHost(cf, channel, uriIndexForInstrumentation)
: new ChannelToLimitedChannelAdapter(channel);

LimitedChannel limitedChannel;
if (cf.isConcurrencyLimitingEnabled()) {
Channel unlimited = channel;
channel = new ChannelToEndpointChannel(endpoint -> {
if (endpoint.tags().contains("dialogue-disable-endpoint-concurrency-limiting")) {
return unlimited;
}
LimitedChannel limited = ConcurrencyLimitedChannel.createForEndpoint(
unlimited, cf.channelName(), uriIndexForInstrumentation, endpoint);
return QueuedChannel.create(cf, endpoint, limited);
});
limitedChannel = ConcurrencyLimitedChannel.createForHost(cf, channel, uriIndexForInstrumentation);
} else {
limitedChannel = new ChannelToLimitedChannelAdapter(channel);
}

perUriChannels.add(limitedChannel);
}
return perUriChannels.build();
Expand All @@ -253,11 +257,7 @@ private static EndpointChannelFactory createEndpointChannelFactory(Channel multi
channel = new RangeAcceptsIdentityEncodingChannel(channel);
channel = ContentEncodingChannel.of(channel, endpoint);
channel = TracedChannel.create(cf, channel, endpoint);
if (ChannelToEndpointChannel.isConstant(endpoint)) {
// Avoid producing metrics for non-constant endpoints which may produce
// high cardinality.
channel = TimingEndpointChannel.create(cf, channel, endpoint);
}
channel = TimingEndpointChannel.create(cf, channel, endpoint);
channel = new RequestBodyValidationChannel(channel);
channel = new InterruptionChannel(channel);
return new NeverThrowEndpointChannel(channel); // this must come last as a defensive backstop
Expand Down

0 comments on commit f5498d1

Please sign in to comment.