diff --git a/kroxylicious-filter/pom.xml b/kroxylicious-filter/pom.xml index f599cef..9b9517b 100644 --- a/kroxylicious-filter/pom.xml +++ b/kroxylicious-filter/pom.xml @@ -6,6 +6,11 @@ topic-encryption 0.0.1-SNAPSHOT + + 3.1.6 + 3.24.2 + 5.4.0 + 4.0.0 kroxylicious-filter Kroxylicious Filter @@ -46,6 +51,11 @@ kafka-clients 3.4.0 + + com.github.ben-manes.caffeine + caffeine + ${caffeine.version} + io.kroxylicious.testing testing-junit5-extension @@ -76,6 +86,24 @@ 1.18.3 test + + org.assertj + assertj-core + ${assertj.version} + test + + + org.mockito + mockito-core + ${mockito.version} + test + + + org.mockito + mockito-junit-jupiter + ${mockito.version} + test + diff --git a/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/ContextCacheLoader.java b/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/ContextCacheLoader.java new file mode 100644 index 0000000..2304dad --- /dev/null +++ b/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/ContextCacheLoader.java @@ -0,0 +1,16 @@ +package io.strimzi.kafka.topicenc.kroxylicious; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +import org.apache.kafka.common.Uuid; + +import com.github.benmanes.caffeine.cache.AsyncCacheLoader; + +public class ContextCacheLoader implements AsyncCacheLoader { + + @Override + public CompletableFuture asyncLoad(Uuid key, Executor executor) throws Exception { + return null; + } +} diff --git a/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/FetchDecryptFilter.java b/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/FetchDecryptFilter.java index 7d391e2..a2026ed 100644 --- a/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/FetchDecryptFilter.java +++ b/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/FetchDecryptFilter.java @@ -1,14 +1,12 @@ package io.strimzi.kafka.topicenc.kroxylicious; -import io.kroxylicious.proxy.filter.FetchRequestFilter; -import io.kroxylicious.proxy.filter.FetchResponseFilter; -import io.kroxylicious.proxy.filter.KrpcFilterContext; -import io.kroxylicious.proxy.filter.MetadataResponseFilter; -import io.strimzi.kafka.topicenc.EncryptionModule; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.function.Predicate; + import org.apache.kafka.common.Uuid; import org.apache.kafka.common.message.FetchRequestData; import org.apache.kafka.common.message.FetchResponseData; -import org.apache.kafka.common.message.MetadataRequestData; import org.apache.kafka.common.message.MetadataResponseData; import org.apache.kafka.common.message.MetadataResponseDataJsonConverter; import org.apache.kafka.common.message.RequestHeaderData; @@ -17,11 +15,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CompletionStage; -import java.util.function.Predicate; +import io.strimzi.kafka.topicenc.EncryptionModule; + +import io.kroxylicious.proxy.filter.FetchRequestFilter; +import io.kroxylicious.proxy.filter.FetchResponseFilter; +import io.kroxylicious.proxy.filter.KrpcFilterContext; +import io.kroxylicious.proxy.filter.MetadataResponseFilter; import static io.strimzi.kafka.topicenc.common.Strings.isNullOrEmpty; import static java.util.stream.Collectors.toSet; @@ -30,12 +29,13 @@ public class FetchDecryptFilter implements FetchRequestFilter, FetchResponseFilt private static final Logger log = LoggerFactory.getLogger(FetchDecryptFilter.class); public static final short METADATA_VERSION_SUPPORTING_TOPIC_IDS = (short) 12; - private final Map topicUuidToName = new HashMap<>(); private final EncryptionModule module; + private final TopicIdCache topicUuidToNameCache; public FetchDecryptFilter(TopicEncryptionConfig config) { module = new EncryptionModule(config.getPolicyRepository()); + topicUuidToNameCache = config.getTopicUuidToNameCache(); } @Override @@ -54,8 +54,10 @@ public void onFetchResponse(short apiVersion, ResponseHeaderData header, FetchRe var unresolvedTopicIds = getUnresolvedTopicIds(response); if (unresolvedTopicIds.isEmpty()) { decryptFetchResponse(header, response, context); - } else { - log.warn("We did not know all topic names for {} topic ids within a fetch response, requesting metadata and returning error response", unresolvedTopicIds.size()); + } + else { + log.warn("We did not know all topic names for {} topic ids within a fetch response, requesting metadata and returning error response", + unresolvedTopicIds.size()); log.debug("We did not know all topic names for topic ids {} within a fetch response, requesting metadata and returning error response", unresolvedTopicIds); // we return an error rather than delaying the response to prevent out-of-order responses to the Consumer client. // The Filter API only supports synchronous work currently. @@ -83,16 +85,7 @@ private void resolveTopicsAndReturnError(ResponseHeaderData header, KrpcFilterCo } private void resolveAndCache(KrpcFilterContext context, Set topicIdsToResolve) { - MetadataRequestData request = new MetadataRequestData(); - topicIdsToResolve.forEach(uuid -> { - MetadataRequestData.MetadataRequestTopic e = new MetadataRequestData.MetadataRequestTopic(); - e.setTopicId(uuid); - request.topics().add(e); - }); - // if the client is sending topic ids we will assume the broker can support at least the lowest metadata apiVersion - // supporting topicIds - CompletionStage stage = context.sendRequest(METADATA_VERSION_SUPPORTING_TOPIC_IDS, request); - stage.thenAccept(response -> cacheTopicIdToName(response, METADATA_VERSION_SUPPORTING_TOPIC_IDS)); + topicUuidToNameCache.resolveTopicNames(context, topicIdsToResolve); } private void decryptFetchResponse(ResponseHeaderData header, FetchResponseData response, KrpcFilterContext context) { @@ -100,12 +93,13 @@ private void decryptFetchResponse(ResponseHeaderData header, FetchResponseData r Uuid originalUuid = fetchResponse.topicId(); String originalName = fetchResponse.topic(); if (isNullOrEmpty(originalName)) { - fetchResponse.setTopic(topicUuidToName.get(originalUuid)); + fetchResponse.setTopic(getTopicNameForUuid(originalUuid)); fetchResponse.setTopicId(null); } try { module.decrypt(fetchResponse); - } catch (Exception e) { + } + catch (Exception e) { log.error("Failed to decrypt a fetchResponse for topic: " + fetchResponse.topic(), e); throw new RuntimeException(e); } @@ -115,19 +109,28 @@ private void decryptFetchResponse(ResponseHeaderData header, FetchResponseData r context.forwardResponse(header, response); } + private String getTopicNameForUuid(Uuid originalUuid) { + //TODO revisit error handling + final CompletableFuture topicNameFuture = topicUuidToNameCache.getTopicName(originalUuid); + return topicNameFuture != null ? topicNameFuture.getNow(null) : null; + } + + @Override + public void onMetadataResponse(short apiVersion, ResponseHeaderData header, MetadataResponseData response, KrpcFilterContext context) { + cacheTopicIdToName(response, apiVersion); + context.forwardResponse(header, response); + } private boolean isResolvable(FetchResponseData.FetchableTopicResponse fetchableTopicResponse) { - return !isNullOrEmpty(fetchableTopicResponse.topic()) || topicUuidToName.containsKey(fetchableTopicResponse.topicId()); + return hasTopicName(fetchableTopicResponse.topicId(), fetchableTopicResponse.topic()); } private boolean isResolvable(FetchRequestData.FetchTopic fetchTopic) { - return !isNullOrEmpty(fetchTopic.topic()) || topicUuidToName.containsKey(fetchTopic.topicId()); + return hasTopicName(fetchTopic.topicId(), fetchTopic.topic()); } - @Override - public void onMetadataResponse(short apiVersion, ResponseHeaderData header, MetadataResponseData response, KrpcFilterContext context) { - cacheTopicIdToName(response, apiVersion); - context.forwardResponse(header, response); + private boolean hasTopicName(Uuid topicId, String topicName) { + return !isNullOrEmpty(topicName) || topicUuidToNameCache.hasResolvedTopic(topicId); } private void cacheTopicIdToName(MetadataResponseData response, short apiVersion) { @@ -135,15 +138,6 @@ private void cacheTopicIdToName(MetadataResponseData response, short apiVersion) log.trace("received metadata response: {}", MetadataResponseDataJsonConverter.write(response, apiVersion)); } response.topics().forEach(topic -> { - if (topic.errorCode() == 0) { - if (topic.topicId() != null && !isNullOrEmpty(topic.name())) { - topicUuidToName.put(topic.topicId(), topic.name()); - } else { - log.info("not caching uuid to name because a component was null or empty, topic id {}, topic name {}", topic.topicId(), topic.name()); - } - } else { - log.warn("error {} on metadata request for topic id {}, topic name {}", Errors.forCode(topic.errorCode()), topic.topicId(), topic.name()); - } }); } } diff --git a/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/TopicEncryptionConfig.java b/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/TopicEncryptionConfig.java index 43e1cbc..85a8be7 100644 --- a/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/TopicEncryptionConfig.java +++ b/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/TopicEncryptionConfig.java @@ -1,17 +1,24 @@ package io.strimzi.kafka.topicenc.kroxylicious; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; -import io.kroxylicious.proxy.config.BaseConfig; + import io.strimzi.kafka.topicenc.policy.PolicyRepository; -import java.util.Objects; +import io.kroxylicious.proxy.config.BaseConfig; public class TopicEncryptionConfig extends BaseConfig { public static final String IN_MEMORY_POLICY_REPOSITORY_PROP_NAME = "inMemoryPolicyRepository"; private final InMemoryPolicyRepositoryConfig inMemoryPolicyRepository; + @JsonIgnore + private static final ConcurrentHashMap virtualClusterToTopicUUIDToTopicNameCache = new ConcurrentHashMap<>(); + @JsonCreator public TopicEncryptionConfig(@JsonProperty(value = IN_MEMORY_POLICY_REPOSITORY_PROP_NAME) InMemoryPolicyRepositoryConfig inMemoryPolicyRepository) { this.inMemoryPolicyRepository = inMemoryPolicyRepository; @@ -22,4 +29,9 @@ public TopicEncryptionConfig(@JsonProperty(value = IN_MEMORY_POLICY_REPOSITORY_P public PolicyRepository getPolicyRepository() { return inMemoryPolicyRepository.getPolicyRepository(); } + + public TopicIdCache getTopicUuidToNameCache() { + return virtualClusterToTopicUUIDToTopicNameCache.computeIfAbsent("VIRTUAL_CLUSTER_ID", (key) -> new TopicIdCache()); + } + } diff --git a/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/TopicIdCache.java b/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/TopicIdCache.java new file mode 100644 index 0000000..3e09f97 --- /dev/null +++ b/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/TopicIdCache.java @@ -0,0 +1,62 @@ +package io.strimzi.kafka.topicenc.kroxylicious; + +import java.time.Duration; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.MetadataResponseData; +import org.apache.kafka.common.requests.MetadataRequest; + +import com.github.benmanes.caffeine.cache.AsyncCache; +import com.github.benmanes.caffeine.cache.Caffeine; + +import io.kroxylicious.proxy.filter.KrpcFilterContext; + +public class TopicIdCache { + private final AsyncCache topicNamesById; + + public TopicIdCache() { + this(Caffeine.newBuilder().expireAfterAccess(Duration.ofMinutes(10)).buildAsync()); + } + + TopicIdCache(AsyncCache topicNamesById) { + this.topicNamesById = topicNamesById; + } + + /** + * Exposes a future to avoid multiple clients triggering metadata requests for the same topicId. + * @param topicId to convert to a name + * @return the Future which will be completed when the topic name is resolved or null if the topic is not known (and is not currently being resolved) + */ + public CompletableFuture getTopicName(Uuid topicId) { + return topicNamesById.getIfPresent(topicId); + } + + public boolean hasResolvedTopic(Uuid topicId) { + final CompletableFuture topicNameFuture = topicNamesById.getIfPresent(topicId); + //Caffeine converts failed or cancelled futures to null internally, so we don't have to handle them explicitly + return topicNameFuture != null && topicNameFuture.isDone(); + } + + public void resolveTopicNames(KrpcFilterContext context, Set topicIdsToResolve) { + final MetadataRequest.Builder builder = new MetadataRequest.Builder(List.copyOf(topicIdsToResolve)); + final MetadataRequest metadataRequest = builder.build(builder.latestAllowedVersion()); + topicIdsToResolve.forEach(uuid -> topicNamesById.put(uuid, new CompletableFuture<>())); + context. sendRequest(metadataRequest.version(), metadataRequest.data()) + .whenComplete((metadataResponseData, throwable) -> { + if (throwable != null) { + //TODO something sensible + } + else { + metadataResponseData.topics() + .forEach(metadataResponseTopic -> Objects.requireNonNull(topicNamesById.getIfPresent(metadataResponseTopic.topicId())) + .complete(metadataResponseTopic.name())); + //If we were to get null from getIfPresent it would imply we got a result for a topic we didn't expect + } + }); + } + +} diff --git a/kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/TopicIdCacheTest.java b/kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/TopicIdCacheTest.java new file mode 100644 index 0000000..7ca7e72 --- /dev/null +++ b/kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/TopicIdCacheTest.java @@ -0,0 +1,232 @@ +package io.strimzi.kafka.topicenc.kroxylicious; + +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.MetadataRequestData; +import org.apache.kafka.common.message.MetadataResponseData; +import org.apache.kafka.common.protocol.ApiMessage; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; + +import com.github.benmanes.caffeine.cache.AsyncLoadingCache; +import com.github.benmanes.caffeine.cache.Caffeine; + +import io.kroxylicious.proxy.filter.KrpcFilterContext; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyShort; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +@ExtendWith(MockitoExtension.class) +class TopicIdCacheTest { + + private static final Uuid UNKNOWN_TOPIC_ID = Uuid.randomUuid(); + private static final Uuid KNOWN_TOPIC_ID = Uuid.randomUuid(); + private static final Uuid PENDING_TOPIC_ID = Uuid.randomUuid(); + private static final String KNOWN_TOPIC_NAME = "TOPIC_WIBBLE"; + private static final String RESOLVED_TOPIC_NAME = "TOPIC_RESOLVED"; + private TopicIdCache topicIdCache; + private AsyncLoadingCache underlyingCache; + private KrpcFilterContext filterContext; + private CompletableFuture pendingFuture; + + @BeforeEach + void setUp() { + underlyingCache = Caffeine.newBuilder().buildAsync((key, executor) -> null); + underlyingCache.put(KNOWN_TOPIC_ID, CompletableFuture.completedFuture(KNOWN_TOPIC_NAME)); + underlyingCache.put(PENDING_TOPIC_ID, new CompletableFuture<>()); + topicIdCache = new TopicIdCache(underlyingCache); + filterContext = mock(KrpcFilterContext.class); + pendingFuture = new CompletableFuture<>(); + + lenient().when(filterContext.sendRequest(anyShort(), any())).thenReturn(pendingFuture); + } + + @Test + void shouldReturnFalseForUnknownTopicId() { + //Given + + //When + final boolean isResolved = topicIdCache.hasResolvedTopic(UNKNOWN_TOPIC_ID); + + //Then + assertThat(isResolved).isFalse(); + } + + @Test + void shouldReturnFalseForTopicIdWhichIsStillInProgress() { + //Given + + //When + final boolean isResolved = topicIdCache.hasResolvedTopic(PENDING_TOPIC_ID); + + //Then + assertThat(isResolved).isFalse(); + } + + @Test + void shouldReturnFalseForTopicIdWhichHasFailed() { + //Given + underlyingCache.put(PENDING_TOPIC_ID, CompletableFuture.failedFuture(new IllegalStateException("boom boom boom"))); + + //When + final boolean isResolved = topicIdCache.hasResolvedTopic(PENDING_TOPIC_ID); + + //Then + assertThat(isResolved).isFalse(); + } + + @Test + void shouldReturnFalseForTopicIdOfWhichResolutionWasCancelled() { + //Given + final CompletableFuture cancelledFuture = new CompletableFuture<>(); + cancelledFuture.cancel(true); + underlyingCache.put(PENDING_TOPIC_ID, cancelledFuture); + + //When + final boolean isResolved = topicIdCache.hasResolvedTopic(PENDING_TOPIC_ID); + + //Then + assertThat(isResolved).isFalse(); + } + + @Test + void shouldReturnFalseForTopicIdOfWhichResolutionWasCancelledAfterCaching() { + //Given + final CompletableFuture cancelledFuture = new CompletableFuture<>(); + underlyingCache.put(PENDING_TOPIC_ID, cancelledFuture); + cancelledFuture.cancel(true); + + //When + final boolean isResolved = topicIdCache.hasResolvedTopic(PENDING_TOPIC_ID); + + //Then + assertThat(isResolved).isFalse(); + } + + @Test + void shouldReturnTrueForKnownTopicId() { + //Given + + //When + final boolean isResolved = topicIdCache.hasResolvedTopic(KNOWN_TOPIC_ID); + + //Then + assertThat(isResolved).isTrue(); + } + + @Test + void shouldReturnNameFromGetForKnownTopic() { + //Given + + //When + final CompletableFuture topicName = topicIdCache.getTopicName(KNOWN_TOPIC_ID); + + //Then + Assertions.assertThat(topicName).isCompletedWithValue(KNOWN_TOPIC_NAME); + } + + @Test + void shouldReturnNullFromGetForUnresolvedTopic() { + //Given + + //When + final CompletableFuture topicName = topicIdCache.getTopicName(UNKNOWN_TOPIC_ID); + + //Then + Assertions.assertThat(topicName).isNull(); + } + + @Test + void shouldSendMetadataRequestToResolveTopicNames() { + //Given + + //When + topicIdCache.resolveTopicNames(filterContext, Set.of(UNKNOWN_TOPIC_ID)); + + //Then + verify(filterContext).sendRequest(anyShort(), any(MetadataRequestData.class)); + } + + @Test + void shouldIncludeTopicId() { + //Given + + //When + topicIdCache.resolveTopicNames(filterContext, Set.of(UNKNOWN_TOPIC_ID)); + + //Then + verify(filterContext).sendRequest(anyShort(), Mockito.argThat(apiMessage -> { + assertThat(apiMessage).isInstanceOf(MetadataRequestData.class); + assertThat(((MetadataRequestData) apiMessage).topics()).anyMatch(metadataRequestTopic -> UNKNOWN_TOPIC_ID.equals(metadataRequestTopic.topicId())); + return true; + })); + } + + @Test + void shouldIncludeMultipleTopicId() { + //Given + + //When + topicIdCache.resolveTopicNames(filterContext, Set.of(UNKNOWN_TOPIC_ID, PENDING_TOPIC_ID)); + + //Then + verify(filterContext).sendRequest(anyShort(), Mockito.argThat(apiMessage -> { + assertThat(apiMessage).isInstanceOf(MetadataRequestData.class); + assertThat(((MetadataRequestData) apiMessage).topics()) + .allMatch( + metadataRequestTopic -> UNKNOWN_TOPIC_ID.equals(metadataRequestTopic.topicId()) || PENDING_TOPIC_ID.equals(metadataRequestTopic.topicId())); + return true; + })); + } + + @Test + void shouldReturnPendingFutures() { + //Given + + //When + topicIdCache.resolveTopicNames(filterContext, Set.of(UNKNOWN_TOPIC_ID, PENDING_TOPIC_ID)); + + //Then + assertThat(topicIdCache.getTopicName(UNKNOWN_TOPIC_ID)).isNotNull().isNotDone(); + assertThat(topicIdCache.getTopicName(PENDING_TOPIC_ID)).isNotNull().isNotDone(); + } + + @Test + void shouldCacheFutureForTopicId() { + //Given + topicIdCache.resolveTopicNames(filterContext, Set.of(UNKNOWN_TOPIC_ID)); + + //When + final CompletableFuture actualFuture = topicIdCache.getTopicName(UNKNOWN_TOPIC_ID); + + //Then + assertThat(actualFuture).isNotNull().isNotDone(); + } + + @Test + void shouldCompleteFutureWhenMetadataResponseDelivered() { + //Given + topicIdCache.resolveTopicNames(filterContext, Set.of(UNKNOWN_TOPIC_ID)); + final MetadataResponseData.MetadataResponseTopic responseTopic = new MetadataResponseData.MetadataResponseTopic(); + responseTopic.setTopicId(UNKNOWN_TOPIC_ID).setName(RESOLVED_TOPIC_NAME); + final MetadataResponseData.MetadataResponseTopicCollection metadataResponseTopics = new MetadataResponseData.MetadataResponseTopicCollection(); + metadataResponseTopics.add(responseTopic); + + //When + pendingFuture.complete(new MetadataResponseData().setTopics(metadataResponseTopics)); + + //Then + final CompletableFuture actualFuture = topicIdCache.getTopicName(UNKNOWN_TOPIC_ID); + assertThat(actualFuture).isCompletedWithValue(RESOLVED_TOPIC_NAME); + } +} \ No newline at end of file