Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add initial message delivery implementation #151

Merged
merged 8 commits into from
Jun 24, 2024
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.flipkart.varadhi.consumer.delivery;

import com.flipkart.varadhi.entities.Endpoint;

public record DeliveryResponse(int statusCode, Endpoint.Protocol protocol, byte[] body) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we keep the Endpoint object itself here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure if it would be helpful since the caller will already have the endpoint object.
might as well remove the protocol field itself

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm. sure if that is the case

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package com.flipkart.varadhi.consumer.delivery;

import com.flipkart.varadhi.entities.Endpoint;
import com.flipkart.varadhi.entities.Message;
import com.flipkart.varadhi.exceptions.NotImplementedException;
import com.google.common.collect.Multimap;
import org.apache.commons.lang3.ArrayUtils;

import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;

public interface MessageDelivery {
static MessageDelivery of(Endpoint endpoint) {
return switch (endpoint.getProtocol()) {
case HTTP1_1 -> new HttpMessageDelivery(endpoint);
case HTTP2 -> throw new NotImplementedException("HTTP2 is not supported yet");
default -> throw new IllegalArgumentException("Unsupported protocol: " + endpoint.getProtocol());
};
}

CompletableFuture<DeliveryResponse> deliver(Message message)
throws Exception;

class HttpMessageDelivery implements MessageDelivery {
private final Endpoint.HttpEndpoint endpoint;
private final HttpClient httpClient;

public HttpMessageDelivery(Endpoint endpoint) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The constructor should take HttpEndpoint as param.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

this.endpoint = (Endpoint.HttpEndpoint) endpoint;
this.httpClient = HttpClient.newBuilder()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will likely not create a client per endpoint. There will be a single client used for all endpoints in an app.

Assume a ConsumerContext {
Threadpools..
eventloops....
HttpClient client,
... something related to grpc as well.
}

And expect the context to be passed to

MessageDelivery of(Endpoint endpoint) {

Copy link
Collaborator Author

@aayustark007-fk aayustark007-fk Jun 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, can I model it as a Supplier<HttpClient> that needs to be passed, since the ConsumerContext does not exist yet

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

supplier works

.version(this.endpoint.isHttp2Supported() ? HttpClient.Version.HTTP_2 : HttpClient.Version.HTTP_1_1)
.connectTimeout(Duration.ofMillis(this.endpoint.getConnectTimeoutMs()))
.followRedirects(HttpClient.Redirect.NORMAL)
.build();
}

@Override
public CompletableFuture<DeliveryResponse> deliver(Message message) throws Exception {
HttpRequest.Builder requestBuilder = HttpRequest.newBuilder()
.uri(endpoint.getUrl().toURI())

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fact that we are forced to do this endpoint.getUrl().toURI()) , raises a question. Should we also just keep the URI object in the Endpoint object instead of URL?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, I can make this change

.timeout(Duration.ofMillis(endpoint.getRequestTimeoutMs()))
.header("Content-Type", endpoint.getContentType())
.method(
endpoint.getMethod(),
ArrayUtils.isEmpty(message.getPayload()) ? HttpRequest.BodyPublishers.noBody() :
HttpRequest.BodyPublishers.ofByteArray(message.getPayload())

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thing to note here:
HttpRequest.BodyPublishers.ofByteArray does not make a copy of the array we are passing, which is good. But turns out that httpClient from jdk, does not support ByteBuffer object directly (and does not support off-heap objects as well.)

If that is what we ned in future, then we will want to look into using netty directly or an http client that does support offheap buffers. I wonder if vertx's http client supports it or not!!

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Screenshot 2024-06-18 at 8 45 39 AM

Interesting. Something to keep in mind. For now jdk's http client is fine.

);

// apply request headers from message
Multimap<String, String> requestHeaders = message.getRequestHeaders();
if (requestHeaders != null) {
requestHeaders.entries().forEach(entry -> requestBuilder.header(entry.getKey(), entry.getValue()));
}

HttpRequest request = requestBuilder.build();

return httpClient.sendAsync(
request, HttpResponse.BodyHandlers.ofByteArray())
.thenApply(response -> new DeliveryResponse(response.statusCode(), endpoint.getProtocol(),
response.body()
))
.exceptionally(e -> {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this exceptionally block even required? It does not look like it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed it, had added for testing

// log error
return new DeliveryResponse(500, endpoint.getProtocol(), e.getMessage().getBytes());
});
}
}
}
1 change: 1 addition & 0 deletions consumer/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@
requires com.flipkart.varadhi.core;
requires jakarta.annotation;
requires com.fasterxml.jackson.annotation;
requires java.net.http;
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.flipkart.varadhi.entities;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import lombok.Data;
Expand All @@ -16,7 +17,8 @@
})
public abstract sealed class Endpoint {

abstract Protocol getProtocol();
@JsonIgnore
public abstract Protocol getProtocol();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of of using raw type name, we can actually use protocol string only in the JsonSubTypes annotation. It will make for nice json.

Eg:

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "protocol")
@JsonSubTypes({
    @JsonSubTypes.Type(value = HttpEndpoint.class, name = "HTTP1_1"),
    @JsonSubTypes.Type(value = HttpEndpoint.class, name = "HTTP2"),

See if this works and would it result in a better serialized form?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes it works and this is the serialized json:

"endpoint" : {
    "protocol" : "HTTP1_1",
    "url" : "http://localhosthello",
    "method" : "GET",
    "contentType" : "",
    "connectTimeoutMs" : 500,
    "requestTimeoutMs" : 500,
    "http2Supported" : false
}


public enum Protocol {
HTTP1_1,
Expand All @@ -35,7 +37,7 @@ public static final class HttpEndpoint extends Endpoint {
private final boolean http2Supported;

@Override
Protocol getProtocol() {
public Protocol getProtocol() {
return http2Supported ? Protocol.HTTP2 : Protocol.HTTP1_1;
}
}
Expand Down
Loading