Skip to content

Commit

Permalink
add initial message delivery implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
AayuStark007 committed Jun 7, 2024
1 parent 16e60ef commit f3e5dd0
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.flipkart.varadhi.consumer.delivery;

import com.flipkart.varadhi.entities.Endpoint;

public record DeliveryResponse(int statusCode, Endpoint.Protocol protocol, byte[] body) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package com.flipkart.varadhi.consumer.delivery;

import com.flipkart.varadhi.consumer.MessageTracker;
import com.flipkart.varadhi.entities.Endpoint;
import com.flipkart.varadhi.exceptions.NotImplementedException;
import com.google.common.collect.Multimap;

import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;

public interface MessageDelivery {
static MessageDelivery of(Endpoint endpoint) {
return switch (endpoint.getProtocol()) {
case HTTP1_1 -> new HttpMessageDelivery(endpoint);
case HTTP2 -> throw new NotImplementedException("HTTP2 is not supported yet");
default -> throw new IllegalArgumentException("Unsupported protocol: " + endpoint.getProtocol());
};
}

CompletableFuture<DeliveryResponse> deliver(MessageTracker messageTracker)
throws Exception; // or should be PolledMessage?

class HttpMessageDelivery implements MessageDelivery {
private final Endpoint.HttpEndpoint endpoint;
private final HttpClient httpClient;

public HttpMessageDelivery(Endpoint endpoint) {
this.endpoint = (Endpoint.HttpEndpoint) endpoint;
this.httpClient = HttpClient.newBuilder()
.version(this.endpoint.isHttp2Supported() ? HttpClient.Version.HTTP_2 : HttpClient.Version.HTTP_1_1)
.connectTimeout(Duration.ofMillis(this.endpoint.getConnectTimeoutMs()))
.followRedirects(HttpClient.Redirect.NORMAL)
.build();
// TODO(aayush): SSL/TLS support? Auth support?
}

@Override
public CompletableFuture<DeliveryResponse> deliver(MessageTracker messageTracker) throws Exception {
// TODO(aayush): PolledMessage getPayload, getHeaders support required
HttpRequest.Builder requestBuilder = HttpRequest.newBuilder()
.uri(endpoint.getUrl().toURI())
.timeout(Duration.ofMillis(endpoint.getRequestTimeoutMs()))
.header("Content-Type", endpoint.getContentType())
.method(
endpoint.getMethod(),
HttpRequest.BodyPublishers.ofByteArray(messageTracker.getMessage().getPayload())
);

// apply request headers from message
Multimap<String, String> requestHeaders = messageTracker.getMessage().getRequestHeaders();
if (requestHeaders != null) {
requestHeaders.entries().forEach(entry -> requestBuilder.header(entry.getKey(), entry.getValue()));
}

HttpRequest request = requestBuilder.build();

return httpClient.sendAsync(
request, HttpResponse.BodyHandlers.ofByteArray()) // TODO(aayush): response as string or byte array?
.thenApply(response -> new DeliveryResponse(response.statusCode(), endpoint.getProtocol(),
response.body()
))
.exceptionally(e -> {
// log error
return new DeliveryResponse(500, endpoint.getProtocol(), e.getMessage().getBytes());
});
}
}
}
1 change: 1 addition & 0 deletions consumer/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@
requires com.flipkart.varadhi.core;
requires jakarta.annotation;
requires com.fasterxml.jackson.annotation;
requires java.net.http;
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.flipkart.varadhi.entities;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import lombok.Data;
Expand All @@ -16,7 +17,8 @@
})
public abstract sealed class Endpoint {

abstract Protocol getProtocol();
@JsonIgnore
public abstract Protocol getProtocol();

public enum Protocol {
HTTP1_1,
Expand All @@ -35,7 +37,7 @@ public static final class HttpEndpoint extends Endpoint {
private final boolean http2Supported;

@Override
Protocol getProtocol() {
public Protocol getProtocol() {
return http2Supported ? Protocol.HTTP2 : Protocol.HTTP1_1;
}
}
Expand Down
16 changes: 16 additions & 0 deletions server/src/testE2E/java/com/flipkart/varadhi/IamPolicyTests.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.flipkart.varadhi;

public class IamPolicyTests extends E2EBase {

/**
* Tests for iam policy, some cases to test are:
* - initial superuser should have all permissions
* - thanos creates flipkart org
* - create admin user via set policy
* - admin user should be able to create more user roles
* - non admin users should not be able to create roles
* - admin user can create team and project
* - similarly for team and project scoped roles, do the same
* - all the way down to topic level and produce level
*/
}

0 comments on commit f3e5dd0

Please sign in to comment.