diff --git a/consumer/src/main/java/com/flipkart/varadhi/consumer/delivery/DeliveryResponse.java b/consumer/src/main/java/com/flipkart/varadhi/consumer/delivery/DeliveryResponse.java new file mode 100644 index 00000000..99e56403 --- /dev/null +++ b/consumer/src/main/java/com/flipkart/varadhi/consumer/delivery/DeliveryResponse.java @@ -0,0 +1,6 @@ +package com.flipkart.varadhi.consumer.delivery; + +import com.flipkart.varadhi.entities.Endpoint; + +public record DeliveryResponse(int statusCode, Endpoint.Protocol protocol, byte[] body) { +} diff --git a/consumer/src/main/java/com/flipkart/varadhi/consumer/delivery/MessageDelivery.java b/consumer/src/main/java/com/flipkart/varadhi/consumer/delivery/MessageDelivery.java new file mode 100644 index 00000000..9842beb1 --- /dev/null +++ b/consumer/src/main/java/com/flipkart/varadhi/consumer/delivery/MessageDelivery.java @@ -0,0 +1,62 @@ +package com.flipkart.varadhi.consumer.delivery; + +import com.flipkart.varadhi.entities.Endpoint; +import com.flipkart.varadhi.entities.Message; +import com.google.common.collect.Multimap; +import org.apache.commons.lang3.ArrayUtils; + +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; + +public interface MessageDelivery { + static MessageDelivery of(Endpoint endpoint, Supplier httpClientSupplier) { + return switch (endpoint.getProtocol()) { + case HTTP1_1, HTTP2 -> new HttpMessageDelivery((Endpoint.HttpEndpoint) endpoint, httpClientSupplier.get()); + default -> throw new IllegalArgumentException("Unsupported protocol: " + endpoint.getProtocol()); + }; + } + + CompletableFuture deliver(Message message) + throws Exception; + + class HttpMessageDelivery implements MessageDelivery { + private final Endpoint.HttpEndpoint endpoint; + private final HttpClient httpClient; + + public HttpMessageDelivery(Endpoint.HttpEndpoint endpoint, HttpClient client) { + this.endpoint = endpoint; + this.httpClient = client; + } + + @Override + public CompletableFuture deliver(Message message) throws Exception { + HttpRequest.Builder requestBuilder = HttpRequest.newBuilder() + .uri(endpoint.getUri()) + .timeout(Duration.ofMillis(endpoint.getRequestTimeoutMs())) + .header("Content-Type", endpoint.getContentType()) + .method( + endpoint.getMethod(), + ArrayUtils.isEmpty(message.getPayload()) ? HttpRequest.BodyPublishers.noBody() : + HttpRequest.BodyPublishers.ofByteArray(message.getPayload()) + ); + + // apply request headers from message + Multimap requestHeaders = message.getRequestHeaders(); + if (requestHeaders != null) { + requestHeaders.entries().forEach(entry -> requestBuilder.header(entry.getKey(), entry.getValue())); + } + + HttpRequest request = requestBuilder.build(); + + return httpClient.sendAsync( + request, HttpResponse.BodyHandlers.ofByteArray()) + .thenApply(response -> new DeliveryResponse(response.statusCode(), endpoint.getProtocol(), + response.body() + )); + } + } +} diff --git a/consumer/src/main/java/module-info.java b/consumer/src/main/java/module-info.java index b1bbba2f..cdf85d6b 100644 --- a/consumer/src/main/java/module-info.java +++ b/consumer/src/main/java/module-info.java @@ -12,4 +12,5 @@ requires com.flipkart.varadhi.core; requires jakarta.annotation; requires com.fasterxml.jackson.annotation; + requires java.net.http; } diff --git a/entities/src/main/java/com/flipkart/varadhi/entities/Endpoint.java b/entities/src/main/java/com/flipkart/varadhi/entities/Endpoint.java index 582bd22b..d5b26ae4 100644 --- a/entities/src/main/java/com/flipkart/varadhi/entities/Endpoint.java +++ b/entities/src/main/java/com/flipkart/varadhi/entities/Endpoint.java @@ -1,22 +1,25 @@ package com.flipkart.varadhi.entities; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; import lombok.Data; import lombok.EqualsAndHashCode; -import java.net.URL; +import java.net.URI; @JsonTypeInfo( use = JsonTypeInfo.Id.NAME, - property = "@endpointType" + property = "protocol" ) @JsonSubTypes({ - @JsonSubTypes.Type(Endpoint.HttpEndpoint.class), + @JsonSubTypes.Type(value = Endpoint.HttpEndpoint.class, name = "HTTP1_1"), + @JsonSubTypes.Type(value = Endpoint.HttpEndpoint.class, name = "HTTP2"), }) public abstract sealed class Endpoint { - abstract Protocol getProtocol(); + @JsonIgnore + public abstract Protocol getProtocol(); public enum Protocol { HTTP1_1, @@ -26,7 +29,7 @@ public enum Protocol { @EqualsAndHashCode(callSuper = true) @Data public static final class HttpEndpoint extends Endpoint { - private final URL url; + private final URI uri; private final String method; private final String contentType; private final long connectTimeoutMs; @@ -35,7 +38,7 @@ public static final class HttpEndpoint extends Endpoint { private final boolean http2Supported; @Override - Protocol getProtocol() { + public Protocol getProtocol() { return http2Supported ? Protocol.HTTP2 : Protocol.HTTP1_1; } } diff --git a/server/src/test/java/com/flipkart/varadhi/services/SubscriptionServiceTest.java b/server/src/test/java/com/flipkart/varadhi/services/SubscriptionServiceTest.java index 88f753f5..6f0ff659 100644 --- a/server/src/test/java/com/flipkart/varadhi/services/SubscriptionServiceTest.java +++ b/server/src/test/java/com/flipkart/varadhi/services/SubscriptionServiceTest.java @@ -39,7 +39,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import java.net.MalformedURLException; -import java.net.URL; +import java.net.URI; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -115,11 +115,7 @@ void testSubscriptionEntitiesSerDe() { 1, 1, 1, 1 ); ConsumptionPolicy consumptionPolicy = new ConsumptionPolicy(1, 1, false, 1, null); - try { - endpoint = new Endpoint.HttpEndpoint(new URL("http", "localhost", "hello"), "GET", "", 500, 500, false); - } catch (MalformedURLException e) { - throw new RuntimeException(e); - } + endpoint = new Endpoint.HttpEndpoint(URI.create("http://localhost:8080"), "GET", "", 500, 500, false); TopicCapacityPolicy capacity = Constants.DefaultTopicCapacity; String region = "default"; diff --git a/server/src/test/java/com/flipkart/varadhi/web/admin/SubscriptionHandlersTest.java b/server/src/test/java/com/flipkart/varadhi/web/admin/SubscriptionHandlersTest.java index 3880ca8e..f8b4e0a8 100644 --- a/server/src/test/java/com/flipkart/varadhi/web/admin/SubscriptionHandlersTest.java +++ b/server/src/test/java/com/flipkart/varadhi/web/admin/SubscriptionHandlersTest.java @@ -1,10 +1,10 @@ package com.flipkart.varadhi.web.admin; -import com.flipkart.varadhi.services.VaradhiTopicService; import com.flipkart.varadhi.entities.*; import com.flipkart.varadhi.exceptions.ResourceNotFoundException; import com.flipkart.varadhi.services.ProjectService; import com.flipkart.varadhi.services.SubscriptionService; +import com.flipkart.varadhi.services.VaradhiTopicService; import com.flipkart.varadhi.utils.VaradhiSubscriptionFactory; import com.flipkart.varadhi.web.ErrorResponse; import com.flipkart.varadhi.web.WebTestBase; @@ -18,8 +18,7 @@ import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; -import java.net.MalformedURLException; -import java.net.URL; +import java.net.URI; import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -29,7 +28,8 @@ import static org.mockito.Mockito.*; public class SubscriptionHandlersTest extends WebTestBase { - private static final Endpoint endpoint; + private static final Endpoint endpoint = + new Endpoint.HttpEndpoint(URI.create("http://localhost:8080"), "GET", "", 500, 500, false); private static final RetryPolicy retryPolicy = new RetryPolicy( new CodeRange[]{new CodeRange(500, 502)}, RetryPolicy.BackoffType.LINEAR, @@ -39,14 +39,6 @@ public class SubscriptionHandlersTest extends WebTestBase { private static final TopicCapacityPolicy capacityPolicy = new TopicCapacityPolicy(1, 10, 1); private static final SubscriptionShards shards = new SubscriptionUnitShard(0, capacityPolicy, null, null, null); - static { - try { - endpoint = new Endpoint.HttpEndpoint(new URL("http", "localhost", "hello"), "GET", "", 500, 500, false); - } catch (MalformedURLException e) { - throw new RuntimeException(e); - } - } - private final Project project = new Project("project1", 0, "", "team1", "org1"); private final TopicResource topicResource = new TopicResource("topic1", 0, "project2", false, null); SubscriptionHandlers subscriptionHandlers; diff --git a/server/src/testE2E/java/com/flipkart/varadhi/SubscriptionTests.java b/server/src/testE2E/java/com/flipkart/varadhi/SubscriptionTests.java index 77d220f9..6c6dd1d2 100644 --- a/server/src/testE2E/java/com/flipkart/varadhi/SubscriptionTests.java +++ b/server/src/testE2E/java/com/flipkart/varadhi/SubscriptionTests.java @@ -5,8 +5,7 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import java.net.MalformedURLException; -import java.net.URL; +import java.net.URI; import java.util.List; import static com.flipkart.varadhi.entities.VersionedEntity.INITIAL_VERSION; @@ -14,21 +13,14 @@ public class SubscriptionTests extends E2EBase { - private static final Endpoint endpoint; + private static final Endpoint endpoint = + new Endpoint.HttpEndpoint(URI.create("http://localhost:8080"), "GET", "", 500, 500, false); private static Org o1; private static Team o1t1; private static Project o1t1p1; private static TopicResource p1t1; private static TopicResource p1t2; - static { - try { - endpoint = new Endpoint.HttpEndpoint(new URL("http", "localhost", "hello"), "GET", "", 500, 500, false); - } catch (MalformedURLException e) { - throw new RuntimeException(e); - } - } - private final RetryPolicy retryPolicy = new RetryPolicy( new CodeRange[]{new CodeRange(500, 502)}, RetryPolicy.BackoffType.LINEAR,