Skip to content

Commit

Permalink
Backport incremental bulk execution (elastic#113215)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tim-Brooks authored Sep 20, 2024
1 parent 0507199 commit 69c4a4f
Show file tree
Hide file tree
Showing 46 changed files with 3,163 additions and 203 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,9 @@ public void testIndexMissingBody() throws IOException {
}

public void testBulkMissingBody() throws IOException {
ResponseException responseException = expectThrows(
ResponseException.class,
() -> client().performRequest(new Request(randomBoolean() ? "POST" : "PUT", "/_bulk"))
);
Request request = new Request(randomBoolean() ? "POST" : "PUT", "/_bulk");
request.setJsonEntity("");
ResponseException responseException = expectThrows(ResponseException.class, () -> client().performRequest(request));
assertResponseException(responseException, "request body is required");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.netty.util.ReferenceCounted;

import org.elasticsearch.ESNetty4IntegTestCase;
import org.elasticsearch.action.bulk.IncrementalBulkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
Expand Down Expand Up @@ -52,6 +53,8 @@ protected boolean addMockHttpTransport() {
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal, otherSettings))
// TODO: We do not currently support in flight circuit breaker limits for bulk. However, IndexingPressure applies
.put(IncrementalBulkService.INCREMENTAL_BULK.getKey(), false)
.put(HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), LIMIT)
.build();
}
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.http.netty4;

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;

import org.elasticsearch.http.HttpPreRequest;
import org.elasticsearch.http.netty4.internal.HttpHeadersAuthenticatorUtils;

import java.util.function.Predicate;

/**
* A wrapper around {@link HttpObjectAggregator}. Provides optional content aggregation based on
* predicate. {@link HttpObjectAggregator} also handles Expect: 100-continue and oversized content.
* Unfortunately, Netty does not provide handlers for oversized messages beyond HttpObjectAggregator.
*/
public class Netty4HttpAggregator extends HttpObjectAggregator {
private static final Predicate<HttpPreRequest> IGNORE_TEST = (req) -> req.uri().startsWith("/_test/request-stream") == false;

private final Predicate<HttpPreRequest> decider;
private boolean aggregating = true;
private boolean ignoreContentAfterContinueResponse = false;

public Netty4HttpAggregator(int maxContentLength, Predicate<HttpPreRequest> decider) {
super(maxContentLength);
this.decider = decider;
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
assert msg instanceof HttpObject;
if (msg instanceof HttpRequest request) {
var preReq = HttpHeadersAuthenticatorUtils.asHttpPreRequest(request);
aggregating = decider.test(preReq) && IGNORE_TEST.test(preReq);
}
if (aggregating || msg instanceof FullHttpRequest) {
super.channelRead(ctx, msg);
} else {
handle(ctx, (HttpObject) msg);
}
}

private void handle(ChannelHandlerContext ctx, HttpObject msg) {
if (msg instanceof HttpRequest request) {
var continueResponse = newContinueResponse(request, maxContentLength(), ctx.pipeline());
if (continueResponse != null) {
// there are 3 responses expected: 100, 413, 417
// on 100 we pass request further and reply to client to continue
// on 413/417 we ignore following content
ctx.writeAndFlush(continueResponse);
var resp = (FullHttpResponse) continueResponse;
if (resp.status() != HttpResponseStatus.CONTINUE) {
ignoreContentAfterContinueResponse = true;
return;
}
HttpUtil.set100ContinueExpected(request, false);
}
ignoreContentAfterContinueResponse = false;
ctx.fireChannelRead(msg);
} else {
var httpContent = (HttpContent) msg;
if (ignoreContentAfterContinueResponse) {
httpContent.release();
} else {
ctx.fireChannelRead(msg);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
pending.add(ReferenceCountUtil.retain(httpObject));
requestStart(ctx);
assert state == QUEUEING_DATA;
assert ctx.channel().config().isAutoRead() == false;
break;
case QUEUEING_DATA:
pending.add(ReferenceCountUtil.retain(httpObject));
Expand All @@ -77,14 +78,14 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
if (httpObject instanceof LastHttpContent) {
state = WAITING_TO_START;
}
// fall-through
ReferenceCountUtil.release(httpObject);
break;
case DROPPING_DATA_PERMANENTLY:
assert pending.isEmpty();
ReferenceCountUtil.release(httpObject); // consume without enqueuing
ctx.channel().config().setAutoRead(false);
break;
}

setAutoReadForState(ctx, state);
}

private void requestStart(ChannelHandlerContext ctx) {
Expand All @@ -105,6 +106,7 @@ private void requestStart(ChannelHandlerContext ctx) {
}

state = QUEUEING_DATA;
ctx.channel().config().setAutoRead(false);

if (httpRequest == null) {
// this looks like a malformed request and will forward without validation
Expand Down Expand Up @@ -150,6 +152,7 @@ private void forwardFullRequest(ChannelHandlerContext ctx) {
assert ctx.channel().config().isAutoRead() == false;
assert state == QUEUEING_DATA;

ctx.channel().config().setAutoRead(true);
boolean fullRequestForwarded = forwardData(ctx, pending);

assert fullRequestForwarded || pending.isEmpty();
Expand All @@ -161,7 +164,6 @@ private void forwardFullRequest(ChannelHandlerContext ctx) {
}

assert state == WAITING_TO_START || state == QUEUEING_DATA || state == FORWARDING_DATA_UNTIL_NEXT_REQUEST;
setAutoReadForState(ctx, state);
}

private void forwardRequestWithDecoderExceptionAndNoContent(ChannelHandlerContext ctx, Exception e) {
Expand All @@ -177,6 +179,8 @@ private void forwardRequestWithDecoderExceptionAndNoContent(ChannelHandlerContex
messageToForward = toReplace.replace(Unpooled.EMPTY_BUFFER);
}
messageToForward.setDecoderResult(DecoderResult.failure(e));

ctx.channel().config().setAutoRead(true);
ctx.fireChannelRead(messageToForward);

assert fullRequestDropped || pending.isEmpty();
Expand All @@ -188,7 +192,6 @@ private void forwardRequestWithDecoderExceptionAndNoContent(ChannelHandlerContex
}

assert state == WAITING_TO_START || state == QUEUEING_DATA || state == DROPPING_DATA_UNTIL_NEXT_REQUEST;
setAutoReadForState(ctx, state);
}

@Override
Expand Down Expand Up @@ -244,10 +247,6 @@ private static void maybeResizePendingDown(int largeSize, ArrayDeque<HttpObject>
}
}

private static void setAutoReadForState(ChannelHandlerContext ctx, State state) {
ctx.channel().config().setAutoRead((state == QUEUEING_DATA || state == DROPPING_DATA_PERMANENTLY) == false);
}

enum State {
WAITING_TO_START,
QUEUEING_DATA,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.ssl.SslCloseCompletionEvent;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.Future;
Expand Down Expand Up @@ -71,6 +74,12 @@ private record ChunkedWrite(PromiseCombiner combiner, ChannelPromise onDone, Chu
@Nullable
private ChunkedWrite currentChunkedWrite;

/**
* HTTP request content stream for current request, it's null if there is no current request or request is fully-aggregated
*/
@Nullable
private Netty4HttpRequestBodyStream currentRequestStream;

/*
* The current read and write sequence numbers. Read sequence numbers are attached to requests in the order they are read from the
* channel, and then transferred to responses. A response is not written to the channel context until its sequence number matches the
Expand Down Expand Up @@ -110,23 +119,38 @@ public Netty4HttpPipeliningHandler(
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
activityTracker.startActivity();
try {
assert msg instanceof FullHttpRequest : "Should have fully aggregated message already but saw [" + msg + "]";
final FullHttpRequest fullHttpRequest = (FullHttpRequest) msg;
final Netty4HttpRequest netty4HttpRequest;
if (fullHttpRequest.decoderResult().isFailure()) {
final Throwable cause = fullHttpRequest.decoderResult().cause();
final Exception nonError;
if (cause instanceof Error) {
ExceptionsHelper.maybeDieOnAnotherThread(cause);
nonError = new Exception(cause);
if (msg instanceof HttpRequest request) {
final Netty4HttpRequest netty4HttpRequest;
if (request.decoderResult().isFailure()) {
final Throwable cause = request.decoderResult().cause();
final Exception nonError;
if (cause instanceof Error) {
ExceptionsHelper.maybeDieOnAnotherThread(cause);
nonError = new Exception(cause);
} else {
nonError = (Exception) cause;
}
netty4HttpRequest = new Netty4HttpRequest(readSequence++, (FullHttpRequest) request, nonError);
} else {
nonError = (Exception) cause;
assert currentRequestStream == null : "current stream must be null for new request";
if (request instanceof FullHttpRequest fullHttpRequest) {
netty4HttpRequest = new Netty4HttpRequest(readSequence++, fullHttpRequest);
currentRequestStream = null;
} else {
var contentStream = new Netty4HttpRequestBodyStream(ctx.channel());
currentRequestStream = contentStream;
netty4HttpRequest = new Netty4HttpRequest(readSequence++, request, contentStream);
}
}
netty4HttpRequest = new Netty4HttpRequest(readSequence++, fullHttpRequest, nonError);
handlePipelinedRequest(ctx, netty4HttpRequest);
} else {
netty4HttpRequest = new Netty4HttpRequest(readSequence++, fullHttpRequest);
assert msg instanceof HttpContent : "expect HttpContent got " + msg;
assert currentRequestStream != null : "current stream must exists before handling http content";
currentRequestStream.handleNettyContent((HttpContent) msg);
if (msg instanceof LastHttpContent) {
currentRequestStream = null;
}
}
handlePipelinedRequest(ctx, netty4HttpRequest);
} finally {
activityTracker.stopActivity();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.EmptyHttpHeaders;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaders;
Expand All @@ -21,6 +22,7 @@
import io.netty.handler.codec.http.cookie.ServerCookieEncoder;

import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.http.HttpBody;
import org.elasticsearch.http.HttpRequest;
import org.elasticsearch.http.HttpResponse;
import org.elasticsearch.rest.ChunkedRestResponseBodyPart;
Expand All @@ -40,22 +42,40 @@
public class Netty4HttpRequest implements HttpRequest {

private final FullHttpRequest request;
private final BytesReference content;
private final HttpBody content;
private final Map<String, List<String>> headers;
private final AtomicBoolean released;
private final Exception inboundException;
private final boolean pooled;
private final int sequence;

Netty4HttpRequest(int sequence, io.netty.handler.codec.http.HttpRequest request, Netty4HttpRequestBodyStream contentStream) {
this(
sequence,
new DefaultFullHttpRequest(
request.protocolVersion(),
request.method(),
request.uri(),
Unpooled.EMPTY_BUFFER,
request.headers(),
EmptyHttpHeaders.INSTANCE
),
new AtomicBoolean(false),
true,
contentStream,
null
);
}

Netty4HttpRequest(int sequence, FullHttpRequest request) {
this(sequence, request, new AtomicBoolean(false), true, Netty4Utils.toBytesReference(request.content()));
this(sequence, request, new AtomicBoolean(false), true, Netty4Utils.fullHttpBodyFrom(request.content()));
}

Netty4HttpRequest(int sequence, FullHttpRequest request, Exception inboundException) {
this(sequence, request, new AtomicBoolean(false), true, Netty4Utils.toBytesReference(request.content()), inboundException);
this(sequence, request, new AtomicBoolean(false), true, Netty4Utils.fullHttpBodyFrom(request.content()), inboundException);
}

private Netty4HttpRequest(int sequence, FullHttpRequest request, AtomicBoolean released, boolean pooled, BytesReference content) {
private Netty4HttpRequest(int sequence, FullHttpRequest request, AtomicBoolean released, boolean pooled, HttpBody content) {
this(sequence, request, released, pooled, content, null);
}

Expand All @@ -64,7 +84,7 @@ private Netty4HttpRequest(
FullHttpRequest request,
AtomicBoolean released,
boolean pooled,
BytesReference content,
HttpBody content,
Exception inboundException
) {
this.sequence = sequence;
Expand All @@ -87,7 +107,7 @@ public String uri() {
}

@Override
public BytesReference content() {
public HttpBody body() {
assert released.get() == false;
return content;
}
Expand All @@ -96,6 +116,7 @@ public BytesReference content() {
public void release() {
if (pooled && released.compareAndSet(false, true)) {
request.release();
content.close();
}
}

Expand All @@ -107,6 +128,12 @@ public HttpRequest releaseAndCopy() {
}
try {
final ByteBuf copiedContent = Unpooled.copiedBuffer(request.content());
HttpBody newContent;
if (content.isStream()) {
newContent = content;
} else {
newContent = Netty4Utils.fullHttpBodyFrom(copiedContent);
}
return new Netty4HttpRequest(
sequence,
new DefaultFullHttpRequest(
Expand All @@ -119,7 +146,7 @@ public HttpRequest releaseAndCopy() {
),
new AtomicBoolean(false),
false,
Netty4Utils.toBytesReference(copiedContent)
newContent
);
} finally {
release();
Expand Down
Loading

0 comments on commit 69c4a4f

Please sign in to comment.