Skip to content

Commit

Permalink
Handle new QoS Metadata (#2375)
Browse files Browse the repository at this point in the history
Handle new QoS Metadata
  • Loading branch information
carterkozak authored Oct 21, 2024
1 parent 6c49518 commit 882755a
Show file tree
Hide file tree
Showing 15 changed files with 410 additions and 58 deletions.
5 changes: 5 additions & 0 deletions changelog/@unreleased/pr-2375.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type: improvement
improvement:
description: Handle new QoS Metadata
links:
- https://github.com/palantir/dialogue/pull/2375
1 change: 1 addition & 0 deletions dialogue-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ dependencies {
implementation 'com.palantir.tritium:tritium-metrics'
implementation 'com.google.code.findbugs:jsr305'
implementation 'com.google.errorprone:error_prone_annotations'
implementation 'com.palantir.conjure.java.api:errors'
implementation 'com.palantir.conjure.java.api:service-config'
implementation 'com.palantir.tracing:tracing-api'
implementation 'com.palantir.refreshable:refreshable'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ public void undoStartRequest() {
public void onSuccess(Response response) {
inflight.decrementAndGet();

if (Responses.isQosDueToCustom(response)) {
// The server has marked this QoS exception as something that the balanced score
// tracker cannot understand.
return;
}
if (isGlobalQosStatus(response) || Responses.isServerErrorRange(response)) {
recentFailuresReservoir.update(FAILURE_WEIGHT);
observability.debugLogStatusFailure(response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,10 @@ enum Behavior {
HOST_LEVEL() {
@Override
void onSuccess(Response result, PermitControl control) {
if (Responses.isTooManyRequests(result) || Responses.isInternalServerError(result)) {
// 429 or 500
if (Responses.isTooManyRequests(result)
|| Responses.isInternalServerError(result)
|| Responses.isQosDueToCustom(result)) {
// 429, 500, or QoS due to a custom reason
control.ignore();
} else if ((Responses.isQosStatus(result) && !Responses.isTooManyRequests(result))
|| Responses.isServerErrorRange(result)) {
Expand All @@ -123,8 +125,9 @@ void onFailure(Throwable throwable, PermitControl control) {
ENDPOINT_LEVEL() {
@Override
void onSuccess(Response result, PermitControl control) {
if (Responses.isTooManyRequests(result) || Responses.isInternalServerError(result)) {
// 429 or 500
if ((Responses.isTooManyRequests(result) && !Responses.isQosDueToCustom(result))
|| Responses.isInternalServerError(result)) {
// non-custom 429 or 500
control.dropped();
} else if (Responses.isServerErrorRange(result)) {
// 501-599
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* (c) Copyright 2024 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.dialogue.core;

import com.palantir.conjure.java.api.errors.QosReason;
import com.palantir.conjure.java.api.errors.QosReasons;
import com.palantir.conjure.java.api.errors.QosReasons.QosResponseDecodingAdapter;
import com.palantir.dialogue.Response;
import java.util.Optional;

enum DialogueQosReasonDecoder implements QosResponseDecodingAdapter<Response> {
INSTANCE;

@Override
public Optional<String> getFirstHeader(Response response, String headerName) {
return response.getFirstHeader(headerName);
}

static QosReason parse(Response response) {
return QosReasons.parseFromResponse(response, DialogueQosReasonDecoder.INSTANCE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ public void onSuccess(Response response) {
// workflows where it is important for a large number of requests to all land on the same node,
// even if a couple of them get rate limited in the middle.
if (Responses.isServerErrorRange(response)
|| (Responses.isQosStatus(response) && !Responses.isTooManyRequests(response))) {
|| (Responses.isQosStatus(response)
&& !Responses.isQosDueToCustom(response)
&& !Responses.isTooManyRequests(response))) {
OptionalInt next = incrementHostIfNecessary(pin);
instrumentation.receivedErrorStatus(pin, channel, response, next);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
package com.palantir.dialogue.core;

import com.google.common.net.HttpHeaders;
import com.palantir.conjure.java.api.errors.QosReason;
import com.palantir.conjure.java.api.errors.QosReason.DueTo;
import com.palantir.conjure.java.api.errors.QosReason.RetryHint;
import com.palantir.dialogue.Response;

/** Utility functionality for {@link Response} handling. */
Expand All @@ -41,6 +44,23 @@ static boolean isQosStatus(Response response) {
return isRetryOther(response) || isTooManyRequests(response) || isUnavailable(response);
}

static boolean isQosDueToCustom(Response result) {
if (!isQosStatus(result)) {
return false;
}
QosReason reason = DialogueQosReasonDecoder.parse(result);
return reason.dueTo().isPresent() && DueTo.CUSTOM.equals(reason.dueTo().get());
}

static boolean isRetryableQos(Response result) {
if (!isQosStatus(result)) {
return false;
}
QosReason reason = DialogueQosReasonDecoder.parse(result);
return reason.retryHint().isEmpty()
|| !RetryHint.DO_NOT_RETRY.equals(reason.retryHint().get());
}

static boolean isServerErrorRange(Response response) {
return response.code() / 100 == 5;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,11 +381,12 @@ private long getBackoffNanoseconds() {
private boolean isRetryableQosStatus(Response response) {
switch (serverQoS) {
case AUTOMATIC_RETRY:
return Responses.isQosStatus(response);
return Responses.isRetryableQos(response);
case PROPAGATE_429_and_503_TO_CALLER:
return Responses.isQosStatus(response)
&& !Responses.isTooManyRequests(response)
&& !Responses.isUnavailable(response);
&& !Responses.isUnavailable(response)
&& Responses.isRetryableQos(response);
}
throw new SafeIllegalStateException(
"Encountered unknown propagate QoS configuration", SafeArg.of("serverQoS", serverQoS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,19 @@
package com.palantir.dialogue.core;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import com.palantir.conjure.java.api.errors.QosReason;
import com.palantir.conjure.java.api.errors.QosReason.DueTo;
import com.palantir.conjure.java.api.errors.QosReason.RetryHint;
import com.palantir.conjure.java.api.errors.QosReasons;
import com.palantir.dialogue.Response;
import com.palantir.dialogue.TestResponse;
import com.palantir.dialogue.TestResponseQosEncoder;
import com.palantir.dialogue.core.CautiousIncreaseAggressiveDecreaseConcurrencyLimiter.Behavior;
import com.palantir.dialogue.core.CautiousIncreaseAggressiveDecreaseConcurrencyLimiter.Permit;
import com.palantir.dialogue.core.LimitedChannel.LimitEnforcement;
Expand Down Expand Up @@ -177,8 +180,7 @@ public void success_increasesLimitOnlyIfSufficientNumberOfRequestsAreInflight(Be
@EnumSource(Behavior.class)
public void onSuccess_releasesSuccessfully(Behavior behavior) {
CautiousIncreaseAggressiveDecreaseConcurrencyLimiter limiter = limiter(behavior);
Response response = mock(Response.class);
when(response.code()).thenReturn(200);
Response response = new TestResponse().code(200);

double max = limiter.getLimit();
limiter.acquire(LimitEnforcement.DEFAULT_ENABLED).get().onSuccess(response);
Expand All @@ -188,9 +190,7 @@ public void onSuccess_releasesSuccessfully(Behavior behavior) {
@Test
public void onSuccess_dropsIfResponseIndicatesQosOrError_host_308() {
CautiousIncreaseAggressiveDecreaseConcurrencyLimiter limiter = limiter(Behavior.HOST_LEVEL);
Response response = mock(Response.class);
when(response.getFirstHeader(eq("Location"))).thenReturn(Optional.of("https://localhost"));
when(response.code()).thenReturn(308);
Response response = new TestResponse().code(308).withHeader("Location", "https://localhost");

double max = limiter.getLimit();
limiter.acquire(LimitEnforcement.DEFAULT_ENABLED).get().onSuccess(response);
Expand All @@ -202,8 +202,7 @@ public void onSuccess_successIfResponseIndicatesNonQos308() {
// This represents google chunked-upload APIs which respond '308 Resume Incomplete' to indicate
// a successful chunk upload request.
CautiousIncreaseAggressiveDecreaseConcurrencyLimiter limiter = limiter(Behavior.HOST_LEVEL);
Response response = mock(Response.class);
when(response.code()).thenReturn(308);
Response response = new TestResponse().code(308);

double max = limiter.getLimit();
limiter.acquire(LimitEnforcement.DEFAULT_ENABLED).get().onSuccess(response);
Expand All @@ -213,32 +212,89 @@ public void onSuccess_successIfResponseIndicatesNonQos308() {
@Test
public void onSuccess_dropsIfResponseIndicatesQosOrError_host_503() {
CautiousIncreaseAggressiveDecreaseConcurrencyLimiter limiter = limiter(Behavior.HOST_LEVEL);
Response response = mock(Response.class);
when(response.code()).thenReturn(503);
Response response = new TestResponse().code(503);

double max = limiter.getLimit();
limiter.acquire(LimitEnforcement.DEFAULT_ENABLED).get().onSuccess(response);
assertThat(limiter.getLimit()).as("For status %d", 503).isCloseTo(max * 0.9, Percentage.withPercentage(5));
}

@Test
public void onSuccess_ignoresIfResponseIndicatesQos_host_custom503() {
CautiousIncreaseAggressiveDecreaseConcurrencyLimiter limiter = limiter(Behavior.HOST_LEVEL);
TestResponse response = new TestResponse().code(503);
QosReason reason =
QosReason.builder().reason("reason").dueTo(DueTo.CUSTOM).build();
QosReasons.encodeToResponse(reason, response, TestResponseQosEncoder.INSTANCE);

double max = limiter.getLimit();
limiter.acquire(LimitEnforcement.DEFAULT_ENABLED).get().onSuccess(response);
assertThat(limiter.getLimit()).as("For DueTo.CUSTOM status %d", 503).isEqualTo(max);
}

@Test
public void onSuccess_ignoresIfResponseIndicatesQos_host_nonRetryable503() {
CautiousIncreaseAggressiveDecreaseConcurrencyLimiter limiter = limiter(Behavior.HOST_LEVEL);
TestResponse response = new TestResponse().code(503);
QosReason reason = QosReason.builder()
.reason("reason")
.retryHint(RetryHint.DO_NOT_RETRY)
.build();
QosReasons.encodeToResponse(reason, response, TestResponseQosEncoder.INSTANCE);

double max = limiter.getLimit();
limiter.acquire(LimitEnforcement.DEFAULT_ENABLED).get().onSuccess(response);
assertThat(limiter.getLimit())
.as("For status %d not impacted by RetryHint.DO_NOT_RETRY", 503)
.isCloseTo(max * 0.9, Percentage.withPercentage(5));
}

@Test
public void onSuccess_dropsIfResponseIndicatesQosOrError_endpoint() {
CautiousIncreaseAggressiveDecreaseConcurrencyLimiter limiter = limiter(Behavior.ENDPOINT_LEVEL);
int code = 429;
Response response = mock(Response.class);
when(response.code()).thenReturn(code);
Response response = new TestResponse().code(code);

double max = limiter.getLimit();
limiter.acquire(LimitEnforcement.DEFAULT_ENABLED).get().onSuccess(response);
assertThat(limiter.getLimit()).as("For status %d", code).isCloseTo(max * 0.9, Percentage.withPercentage(5));
}

@Test
public void onSuccess_ignoresIfResponseIndicatesQos_endpoint_custom429() {
CautiousIncreaseAggressiveDecreaseConcurrencyLimiter limiter = limiter(Behavior.ENDPOINT_LEVEL);
TestResponse response = new TestResponse().code(429);
QosReason reason =
QosReason.builder().reason("reason").dueTo(DueTo.CUSTOM).build();
QosReasons.encodeToResponse(reason, response, TestResponseQosEncoder.INSTANCE);

double max = limiter.getLimit();
limiter.acquire(LimitEnforcement.DEFAULT_ENABLED).get().onSuccess(response);
assertThat(limiter.getLimit()).as("For DueTo.CUSTOM status %d", 429).isEqualTo(max);
}

@Test
public void onSuccess_ignoresIfResponseIndicatesQos_endpoint_nonRetryable429() {
CautiousIncreaseAggressiveDecreaseConcurrencyLimiter limiter = limiter(Behavior.ENDPOINT_LEVEL);
TestResponse response = new TestResponse().code(429);
QosReason reason = QosReason.builder()
.reason("reason")
.retryHint(RetryHint.DO_NOT_RETRY)
.build();
QosReasons.encodeToResponse(reason, response, TestResponseQosEncoder.INSTANCE);

double max = limiter.getLimit();
limiter.acquire(LimitEnforcement.DEFAULT_ENABLED).get().onSuccess(response);
assertThat(limiter.getLimit())
.as("For status %d not impacted by RetryHint.DO_NOT_RETRY", 429)
.isCloseTo(max * 0.9, Percentage.withPercentage(5));
}

@Test
public void onSuccess_releasesSuccessfullyIfResponseIndicatesQosOrError_sticky() {
CautiousIncreaseAggressiveDecreaseConcurrencyLimiter limiter = limiter(Behavior.STICKY);
int code = 429;
Response response = mock(Response.class);
when(response.code()).thenReturn(code);
Response response = new TestResponse().code(code);

double max = limiter.getLimit();
limiter.acquire(LimitEnforcement.DEFAULT_ENABLED).get().onSuccess(response);
Expand All @@ -249,8 +305,7 @@ public void onSuccess_releasesSuccessfullyIfResponseIndicatesQosOrError_sticky()
public void onSuccess_ignoresIfResponseIndicatesUnknownServerError_endpoint() {
CautiousIncreaseAggressiveDecreaseConcurrencyLimiter limiter = limiter(Behavior.ENDPOINT_LEVEL);
int code = 599;
Response response = mock(Response.class);
when(response.code()).thenReturn(code);
Response response = new TestResponse().code(code);

double max = limiter.getLimit();
limiter.acquire(LimitEnforcement.DEFAULT_ENABLED).get().onSuccess(response);
Expand All @@ -261,8 +316,7 @@ public void onSuccess_ignoresIfResponseIndicatesUnknownServerError_endpoint() {
public void onSuccess_ignoresIfResponseIndicatesUnknownServerError_sticky() {
CautiousIncreaseAggressiveDecreaseConcurrencyLimiter limiter = limiter(Behavior.STICKY);
int code = 599;
Response response = mock(Response.class);
when(response.code()).thenReturn(code);
Response response = new TestResponse().code(code);

double max = limiter.getLimit();
limiter.acquire(LimitEnforcement.DEFAULT_ENABLED).get().onSuccess(response);
Expand All @@ -273,8 +327,7 @@ public void onSuccess_ignoresIfResponseIndicatesUnknownServerError_sticky() {
public void onSuccess_dropsIfResponseIndicatesUnknownServerError_host() {
CautiousIncreaseAggressiveDecreaseConcurrencyLimiter limiter = limiter(Behavior.HOST_LEVEL);
int code = 599;
Response response = mock(Response.class);
when(response.code()).thenReturn(code);
Response response = new TestResponse().code(code);

double max = limiter.getLimit();
limiter.acquire(LimitEnforcement.DEFAULT_ENABLED).get().onSuccess(response);
Expand Down
Loading

0 comments on commit 882755a

Please sign in to comment.