Virtual cluster wide topic id cache #11

Closed
28 changes: 28 additions & 0 deletions kroxylicious-filter/pom.xml
@@ -6,6 +6,11 @@
<artifactId>topic-encryption</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<properties>
<caffeine.version>3.1.6</caffeine.version>
<assertj.version>3.24.2</assertj.version>
<mockito.version>5.4.0</mockito.version>
</properties>
<modelVersion>4.0.0</modelVersion>
<artifactId>kroxylicious-filter</artifactId>
<name>Kroxylicious Filter</name>
@@ -46,6 +51,11 @@
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>${caffeine.version}</version>
</dependency>
<dependency>
<groupId>io.kroxylicious.testing</groupId>
<artifactId>testing-junit5-extension</artifactId>
Expand Down Expand Up @@ -76,6 +86,24 @@
<version>1.18.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
16 changes: 16 additions & 0 deletions kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/ContextCacheLoader.java
@@ -0,0 +1,16 @@
package io.strimzi.kafka.topicenc.kroxylicious;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

import org.apache.kafka.common.Uuid;

import com.github.benmanes.caffeine.cache.AsyncCacheLoader;

public class ContextCacheLoader implements AsyncCacheLoader<Uuid, String> {

@Override
public CompletableFuture<? extends String> asyncLoad(Uuid key, Executor executor) throws Exception {
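// Placeholder load function: this draft PR fills the cache explicitly in
// TopicIdCache.resolveTopicNames rather than loading through this AsyncCacheLoader yet.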
return null;
}
}
kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/FetchDecryptFilter.java
@@ -1,14 +1,12 @@
package io.strimzi.kafka.topicenc.kroxylicious;

import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.MetadataRequestData;
import org.apache.kafka.common.message.MetadataResponseData;
import org.apache.kafka.common.message.MetadataResponseDataJsonConverter;
import org.apache.kafka.common.message.RequestHeaderData;
import org.apache.kafka.common.message.ResponseHeaderData;
import org.apache.kafka.common.protocol.Errors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.strimzi.kafka.topicenc.EncryptionModule;

import io.kroxylicious.proxy.filter.FetchRequestFilter;
import io.kroxylicious.proxy.filter.FetchResponseFilter;
import io.kroxylicious.proxy.filter.KrpcFilterContext;
import io.kroxylicious.proxy.filter.MetadataResponseFilter;

import static io.strimzi.kafka.topicenc.common.Strings.isNullOrEmpty;
import static java.util.stream.Collectors.toSet;
@@ -30,12 +29,13 @@ public class FetchDecryptFilter implements FetchRequestFilter, FetchResponseFilter, MetadataResponseFilter {

private static final Logger log = LoggerFactory.getLogger(FetchDecryptFilter.class);
public static final short METADATA_VERSION_SUPPORTING_TOPIC_IDS = (short) 12;
private final EncryptionModule module;
private final TopicIdCache topicUuidToNameCache;

public FetchDecryptFilter(TopicEncryptionConfig config) {
module = new EncryptionModule(config.getPolicyRepository());
topicUuidToNameCache = config.getTopicUuidToNameCache();
}

@Override
@@ -54,8 +54,10 @@ public void onFetchResponse(short apiVersion, ResponseHeaderData header, FetchResponseData response, KrpcFilterContext context) {
var unresolvedTopicIds = getUnresolvedTopicIds(response);
if (unresolvedTopicIds.isEmpty()) {
decryptFetchResponse(header, response, context);
}
else {
log.warn("We did not know all topic names for {} topic ids within a fetch response, requesting metadata and returning error response",
unresolvedTopicIds.size());
log.debug("We did not know all topic names for topic ids {} within a fetch response, requesting metadata and returning error response", unresolvedTopicIds);
// we return an error rather than delaying the response to prevent out-of-order responses to the Consumer client.
// The Filter API only supports synchronous work currently.
@@ -83,29 +85,21 @@ private void resolveTopicsAndReturnError(ResponseHeaderData header, KrpcFilterCo
}

private void resolveAndCache(KrpcFilterContext context, Set<Uuid> topicIdsToResolve) {
topicUuidToNameCache.resolveTopicNames(context, topicIdsToResolve);
}

private void decryptFetchResponse(ResponseHeaderData header, FetchResponseData response, KrpcFilterContext context) {
for (FetchResponseData.FetchableTopicResponse fetchResponse : response.responses()) {
Uuid originalUuid = fetchResponse.topicId();
String originalName = fetchResponse.topic();
if (isNullOrEmpty(originalName)) {
fetchResponse.setTopic(getTopicNameForUuid(originalUuid));
fetchResponse.setTopicId(null);
}
try {
module.decrypt(fetchResponse);
}
catch (Exception e) {
log.error("Failed to decrypt a fetchResponse for topic: " + fetchResponse.topic(), e);
throw new RuntimeException(e);
}
@@ -115,35 +109,35 @@ private void decryptFetchResponse(ResponseHeaderData header, FetchResponseData r
context.forwardResponse(header, response);
}

private String getTopicNameForUuid(Uuid originalUuid) {
//TODO revisit error handling
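// getNow(null) is non-blocking; decryptFetchResponse only runs once every topic id has
// passed hasResolvedTopic, so the future should already be complete by this point.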
final CompletableFuture<String> topicNameFuture = topicUuidToNameCache.getTopicName(originalUuid);
return topicNameFuture != null ? topicNameFuture.getNow(null) : null;
}

@Override
public void onMetadataResponse(short apiVersion, ResponseHeaderData header, MetadataResponseData response, KrpcFilterContext context) {
cacheTopicIdToName(response, apiVersion);
context.forwardResponse(header, response);
}

private boolean isResolvable(FetchResponseData.FetchableTopicResponse fetchableTopicResponse) {
return hasTopicName(fetchableTopicResponse.topicId(), fetchableTopicResponse.topic());
}

private boolean isResolvable(FetchRequestData.FetchTopic fetchTopic) {
return hasTopicName(fetchTopic.topicId(), fetchTopic.topic());
}

private boolean hasTopicName(Uuid topicId, String topicName) {
return !isNullOrEmpty(topicName) || topicUuidToNameCache.hasResolvedTopic(topicId);
}

private void cacheTopicIdToName(MetadataResponseData response, short apiVersion) {
if (log.isTraceEnabled()) {
log.trace("received metadata response: {}", MetadataResponseDataJsonConverter.write(response, apiVersion));
}
response.topics().forEach(topic -> {
if (topic.errorCode() == 0) {
if (topic.topicId() != null && !isNullOrEmpty(topic.name())) {
topicUuidToName.put(topic.topicId(), topic.name());
} else {
log.info("not caching uuid to name because a component was null or empty, topic id {}, topic name {}", topic.topicId(), topic.name());
}
} else {
log.warn("error {} on metadata request for topic id {}, topic name {}", Errors.forCode(topic.errorCode()), topic.topicId(), topic.name());
}
});
}
}
kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/TopicEncryptionConfig.java
@@ -1,17 +1,24 @@
package io.strimzi.kafka.topicenc.kroxylicious;

import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;

import io.strimzi.kafka.topicenc.policy.PolicyRepository;

import io.kroxylicious.proxy.config.BaseConfig;

public class TopicEncryptionConfig extends BaseConfig {

public static final String IN_MEMORY_POLICY_REPOSITORY_PROP_NAME = "inMemoryPolicyRepository";
private final InMemoryPolicyRepositoryConfig inMemoryPolicyRepository;

@JsonIgnore
private static final ConcurrentHashMap<String, TopicIdCache> virtualClusterToTopicUUIDToTopicNameCache = new ConcurrentHashMap<>();

@JsonCreator
public TopicEncryptionConfig(@JsonProperty(value = IN_MEMORY_POLICY_REPOSITORY_PROP_NAME) InMemoryPolicyRepositoryConfig inMemoryPolicyRepository) {
this.inMemoryPolicyRepository = inMemoryPolicyRepository;
@@ -22,4 +29,9 @@ public TopicEncryptionConfig(@JsonProperty(value = IN_MEMORY_POLICY_REPOSITORY_P
public PolicyRepository getPolicyRepository() {
return inMemoryPolicyRepository.getPolicyRepository();
}

public TopicIdCache getTopicUuidToNameCache() {
return virtualClusterToTopicUUIDToTopicNameCache.computeIfAbsent("VIRTUAL_CLUSTER_ID", (key) -> new TopicIdCache());

Reviewer: Is the intent here that the config will be given access to the VirtualCluster name, or its UID?

Member Author: Haha, you spotted my deliberate fudge. I'm currently working on https://github.com/sambarker/kroxylicious/tree/name_that_cluster. My current suspicion is it will need to be name-based, as we are leaning towards relaxed restrictions on cluster IDs.

}

}
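
If the config does eventually key by virtual cluster name, a minimal sketch might look like this (virtualClusterName is hypothetical; the PR deliberately hard-codes "VIRTUAL_CLUSTER_ID" above):

    // Hypothetical variant: one cache per virtual cluster, keyed by name.
    public TopicIdCache getTopicUuidToNameCache(String virtualClusterName) {
        return virtualClusterToTopicUUIDToTopicNameCache.computeIfAbsent(virtualClusterName, key -> new TopicIdCache());
    }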
62 changes: 62 additions & 0 deletions kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/TopicIdCache.java
@@ -0,0 +1,62 @@
package io.strimzi.kafka.topicenc.kroxylicious;

import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.MetadataResponseData;
import org.apache.kafka.common.requests.MetadataRequest;

import com.github.benmanes.caffeine.cache.AsyncCache;
import com.github.benmanes.caffeine.cache.Caffeine;

import io.kroxylicious.proxy.filter.KrpcFilterContext;

public class TopicIdCache {
private final AsyncCache<Uuid, String> topicNamesById;

public TopicIdCache() {
this(Caffeine.newBuilder().expireAfterAccess(Duration.ofMinutes(10)).buildAsync());

Reviewer: Any reason for expiring? Is this to keep only relevant/used mappings cached?

Member Author: Yeah. Can't say I gave it too much thought. I wanted to avoid the data going stale/leaking in case the proxy missed a metadata update which deleted a topic.

Reviewer: I suppose if a Kafka use-case used short-lived topics, then this would be a concern.

Member Author: I think it should probably be a bounded cache as well, given we are going to have one per VirtualCluster.

}

TopicIdCache(AsyncCache<Uuid, String> topicNamesById) {
this.topicNamesById = topicNamesById;
}
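
Picking up the bounded-cache idea from the thread above, a minimal sketch (the maximumSize bound is illustrative, not a value chosen in this PR):

    // Bound the cache as well as expiring idle entries; Caffeine evicts
    // entries once the size limit is reached.
    AsyncCache<Uuid, String> bounded = Caffeine.newBuilder()
            .maximumSize(10_000) // illustrative bound
            .expireAfterAccess(Duration.ofMinutes(10))
            .buildAsync();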

/**
* Exposes a future to avoid multiple clients triggering metadata requests for the same topicId.
* @param topicId to convert to a name
* @return the Future which will be completed when the topic name is resolved or <code>null</code> if the topic is not known (and is not currently being resolved)
*/
public CompletableFuture<String> getTopicName(Uuid topicId) {
return topicNamesById.getIfPresent(topicId);
}

public boolean hasResolvedTopic(Uuid topicId) {
final CompletableFuture<String> topicNameFuture = topicNamesById.getIfPresent(topicId);
//Caffeine converts failed or cancelled futures to null internally, so we don't have to handle them explicitly
return topicNameFuture != null && topicNameFuture.isDone();
}
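
A quick illustration of the lookup contract (hypothetical usage, not part of the PR):

    // Unknown ids return null from getTopicName and false from hasResolvedTopic;
    // ids that are mid-resolution return an incomplete future.
    TopicIdCache cache = new TopicIdCache();
    Uuid topicId = Uuid.randomUuid();
    assert cache.getTopicName(topicId) == null;
    assert !cache.hasResolvedTopic(topicId);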

public void resolveTopicNames(KrpcFilterContext context, Set<Uuid> topicIdsToResolve) {
final MetadataRequest.Builder builder = new MetadataRequest.Builder(List.copyOf(topicIdsToResolve));
final MetadataRequest metadataRequest = builder.build(builder.latestAllowedVersion());
topicIdsToResolve.forEach(uuid -> topicNamesById.put(uuid, new CompletableFuture<>()));
context.<MetadataResponseData> sendRequest(metadataRequest.version(), metadataRequest.data())
.whenComplete((metadataResponseData, throwable) -> {

Reviewer: What path is followed when topicId is not found?

Member Author (@SamBarker, Jul 20, 2023): None yet 😁 As I haven't spun up a real cluster to work out what that would look like (this is the sort of reason it's still a draft PR). I suspect it will need to fail the future or even just complete it with null and let it get re-queried.


if (throwable != null) {
//TODO something sensible
}
else {
metadataResponseData.topics()
.forEach(metadataResponseTopic -> Objects.requireNonNull(topicNamesById.getIfPresent(metadataResponseTopic.topicId()))
.complete(metadataResponseTopic.name()));
//If we were to get null from getIfPresent it would imply we got a result for a topic we didn't expect
}
});
}

}
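
As a sketch of the failure handling the author suggests above (not what the PR implements; this would replace the TODO branch in resolveTopicNames):

    // On metadata failure, fail the pending futures; Caffeine discards
    // exceptionally completed futures, so the ids can be re-queried later.
    if (throwable != null) {
        topicIdsToResolve.forEach(uuid -> {
            CompletableFuture<String> pending = topicNamesById.getIfPresent(uuid);
            if (pending != null) {
                pending.completeExceptionally(throwable);
            }
        });
    }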