Skip to content

Commit

Permalink
Use cache with weak keys in ChannelToEndpointChannel
Browse files Browse the repository at this point in the history
  • Loading branch information
pkoenig10 committed Sep 30, 2024
1 parent 5be2795 commit 5a23204
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,54 +16,25 @@

package com.palantir.dialogue.core;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.util.concurrent.ListenableFuture;
import com.palantir.dialogue.Channel;
import com.palantir.dialogue.Endpoint;
import com.palantir.dialogue.Request;
import com.palantir.dialogue.Response;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.BiFunction;

final class ChannelToEndpointChannel implements Channel {

private final Function<Endpoint, Channel> adapter;
private final Map<Object, Channel> cache;
private final LoadingCache<Endpoint, Channel> cache;

ChannelToEndpointChannel(Function<Endpoint, Channel> adapter) {
this.adapter = adapter;
this.cache = new ConcurrentHashMap<>();
ChannelToEndpointChannel(Channel channel, BiFunction<Channel, Endpoint, Channel> loader) {
this.cache = Caffeine.newBuilder().weakKeys().build(endpoint -> loader.apply(channel, endpoint));
}

@Override
public ListenableFuture<Response> execute(Endpoint endpoint, Request request) {
return channelFor(endpoint).execute(endpoint, request);
}

private Channel channelFor(Endpoint endpoint) {
return cache.computeIfAbsent(key(endpoint), _key -> adapter.apply(endpoint));
}

/**
* Constant {@link Endpoint endpoints} may be safely used as cache keys, as opposed to dynamically created
* {@link Endpoint} objects which would result in a memory leak.
*/
static boolean isConstant(Endpoint endpoint) {
// The conjure generator creates endpoints as enum values, which can safely be cached because they aren't
// dynamically created.
return endpoint instanceof Enum;
}

/**
* Creates a cache key for the given endpoint. Some consumers (CJR feign shim) may not use endpoint enums, so we
* cannot safely hold references to potentially short-lived objects. In such cases we use a string value based on
* the service-name endpoint-name tuple.
*/
private static Object key(Endpoint endpoint) {
return isConstant(endpoint) ? endpoint : stringKey(endpoint);
}

private static String stringKey(Endpoint endpoint) {
return endpoint.serviceName() + '.' + endpoint.endpointName();
return cache.get(endpoint).execute(endpoint, request);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -219,21 +219,24 @@ private static ImmutableList<LimitedChannel> createHostChannels(Config cf, List<
.build());
channel = RetryOtherValidatingChannel.create(cf, channel);
channel = HostMetricsChannel.create(cf, channel, targetUri.uri());
Channel tracingChannel =
channel =
new TraceEnrichingChannel(channel, DialogueTracing.tracingTags(cf, uriIndexForInstrumentation));
channel = cf.isConcurrencyLimitingEnabled()
? new ChannelToEndpointChannel(endpoint -> {
if (endpoint.tags().contains("dialogue-disable-endpoint-concurrency-limiting")) {
return tracingChannel;
}
LimitedChannel limited = ConcurrencyLimitedChannel.createForEndpoint(
tracingChannel, cf.channelName(), uriIndexForInstrumentation, endpoint);
return QueuedChannel.create(cf, endpoint, limited);
})
: tracingChannel;
LimitedChannel limitedChannel = cf.isConcurrencyLimitingEnabled()
? ConcurrencyLimitedChannel.createForHost(cf, channel, uriIndexForInstrumentation)
: new ChannelToLimitedChannelAdapter(channel);

LimitedChannel limitedChannel;
if (cf.isConcurrencyLimitingEnabled()) {
channel = new ChannelToEndpointChannel(channel, (unlimited, endpoint) -> {
if (endpoint.tags().contains("dialogue-disable-endpoint-concurrency-limiting")) {
return unlimited;
}
LimitedChannel limited = ConcurrencyLimitedChannel.createForEndpoint(
unlimited, cf.channelName(), uriIndexForInstrumentation, endpoint);
return QueuedChannel.create(cf, endpoint, limited);
});
limitedChannel = ConcurrencyLimitedChannel.createForHost(cf, channel, uriIndexForInstrumentation);
} else {
limitedChannel = new ChannelToLimitedChannelAdapter(channel);
}

perUriChannels.add(limitedChannel);
}
return perUriChannels.build();
Expand All @@ -253,11 +256,7 @@ private static EndpointChannelFactory createEndpointChannelFactory(Channel multi
channel = new RangeAcceptsIdentityEncodingChannel(channel);
channel = ContentEncodingChannel.of(channel, endpoint);
channel = TracedChannel.create(cf, channel, endpoint);
if (ChannelToEndpointChannel.isConstant(endpoint)) {
// Avoid producing metrics for non-constant endpoints which may produce
// high cardinality.
channel = TimingEndpointChannel.create(cf, channel, endpoint);
}
channel = TimingEndpointChannel.create(cf, channel, endpoint);
channel = new RequestBodyValidationChannel(channel);
channel = new InterruptionChannel(channel);
return new NeverThrowEndpointChannel(channel); // this must come last as a defensive backstop
Expand Down

0 comments on commit 5a23204

Please sign in to comment.