Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle new QoS Metadata #2375

Merged
merged 9 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelog/@unreleased/pr-2375.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type: improvement
improvement:
description: Handle new QoS Metadata
links:
- https://github.com/palantir/dialogue/pull/2375
1 change: 1 addition & 0 deletions dialogue-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ dependencies {
implementation 'com.palantir.tritium:tritium-metrics'
implementation 'com.google.code.findbugs:jsr305'
implementation 'com.google.errorprone:error_prone_annotations'
implementation 'com.palantir.conjure.java.api:errors'
implementation 'com.palantir.conjure.java.api:service-config'
implementation 'com.palantir.tracing:tracing-api'
implementation 'com.palantir.refreshable:refreshable'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ public void undoStartRequest() {
public void onSuccess(Response response) {
inflight.decrementAndGet();

if (Responses.isQosDueToCustom(response)) {
// The server has marked this QoS exception as something that the balanced score
// tracker cannot understand.
return;
}
if (isGlobalQosStatus(response) || Responses.isServerErrorRange(response)) {
recentFailuresReservoir.update(FAILURE_WEIGHT);
observability.debugLogStatusFailure(response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,10 @@ enum Behavior {
HOST_LEVEL() {
@Override
void onSuccess(Response result, PermitControl control) {
if (Responses.isTooManyRequests(result) || Responses.isInternalServerError(result)) {
// 429 or 500
if (Responses.isTooManyRequests(result)
|| Responses.isInternalServerError(result)
|| Responses.isQosDueToCustom(result)) {
// 429, 500, or QoS due to a custom reason
control.ignore();
} else if ((Responses.isQosStatus(result) && !Responses.isTooManyRequests(result))
|| Responses.isServerErrorRange(result)) {
Expand All @@ -123,8 +125,9 @@ void onFailure(Throwable throwable, PermitControl control) {
ENDPOINT_LEVEL() {
@Override
void onSuccess(Response result, PermitControl control) {
if (Responses.isTooManyRequests(result) || Responses.isInternalServerError(result)) {
// 429 or 500
if ((Responses.isTooManyRequests(result) && !Responses.isQosDueToCustom(result))
|| Responses.isInternalServerError(result)) {
// non-custom 429 or 500
control.dropped();
} else if (Responses.isServerErrorRange(result)) {
// 501-599
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* (c) Copyright 2024 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.dialogue.core;

import com.palantir.conjure.java.api.errors.QosReason;
import com.palantir.conjure.java.api.errors.QosReasons;
import com.palantir.conjure.java.api.errors.QosReasons.QosResponseDecodingAdapter;
import com.palantir.dialogue.Response;
import java.util.Optional;

enum DialogueQosReasonDecoder implements QosResponseDecodingAdapter<Response> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any reason this could not live along side Response in dialogue-target and be shared across projects?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It’s an implementation detail, not part of the codegen target api. I don’t mind duplication for such a simple utility, primary goal is to avoid expanding the public api unnecessarily

INSTANCE;

@Override
public Optional<String> getFirstHeader(Response response, String headerName) {
return response.getFirstHeader(headerName);
}

static QosReason parse(Response response) {
return QosReasons.parseFromResponse(response, DialogueQosReasonDecoder.INSTANCE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ public void onSuccess(Response response) {
// workflows where it is important for a large number of requests to all land on the same node,
// even if a couple of them get rate limited in the middle.
if (Responses.isServerErrorRange(response)
|| (Responses.isQosStatus(response) && !Responses.isTooManyRequests(response))) {
|| (Responses.isQosStatus(response)
&& !Responses.isQosDueToCustom(response)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if i understand this correctly, a Due-To=CUSTOM will not count as an error for node pinning, and we will stay on the same node? What's the rationale behind that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Due-to custom means the client shouldn’t attempt to interpret meaning from the failure, so we don’t route to another node in that case. Changing pinned nodes can have dramatic impacts on perf, some clients use it for cache locality, which we don’t want to give up unless we really need to (instability/etc)

&& !Responses.isTooManyRequests(response))) {
OptionalInt next = incrementHostIfNecessary(pin);
instrumentation.receivedErrorStatus(pin, channel, response, next);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
package com.palantir.dialogue.core;

import com.google.common.net.HttpHeaders;
import com.palantir.conjure.java.api.errors.QosReason;
import com.palantir.conjure.java.api.errors.QosReason.DueTo;
import com.palantir.conjure.java.api.errors.QosReason.RetryHint;
import com.palantir.dialogue.Response;

/** Utility functionality for {@link Response} handling. */
Expand All @@ -41,6 +44,23 @@ static boolean isQosStatus(Response response) {
return isRetryOther(response) || isTooManyRequests(response) || isUnavailable(response);
}

static boolean isQosDueToCustom(Response result) {
if (!isQosStatus(result)) {
return false;
}
QosReason reason = DialogueQosReasonDecoder.parse(result);
return reason.dueTo().isPresent() && DueTo.CUSTOM.equals(reason.dueTo().get());
}

static boolean isRetryableQos(Response result) {
if (!isQosStatus(result)) {
return false;
}
QosReason reason = DialogueQosReasonDecoder.parse(result);
return reason.retryHint().isEmpty()
|| !RetryHint.DO_NOT_RETRY.equals(reason.retryHint().get());
}

static boolean isServerErrorRange(Response response) {
return response.code() / 100 == 5;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,11 +381,12 @@ private long getBackoffNanoseconds() {
private boolean isRetryableQosStatus(Response response) {
switch (serverQoS) {
case AUTOMATIC_RETRY:
return Responses.isQosStatus(response);
return Responses.isRetryableQos(response);
case PROPAGATE_429_and_503_TO_CALLER:
return Responses.isQosStatus(response)
&& !Responses.isTooManyRequests(response)
&& !Responses.isUnavailable(response);
&& !Responses.isUnavailable(response)
&& Responses.isRetryableQos(response);
}
throw new SafeIllegalStateException(
"Encountered unknown propagate QoS configuration", SafeArg.of("serverQoS", serverQoS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,19 @@
package com.palantir.dialogue.core;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import com.palantir.conjure.java.api.errors.QosReason;
import com.palantir.conjure.java.api.errors.QosReason.DueTo;
import com.palantir.conjure.java.api.errors.QosReason.RetryHint;
import com.palantir.conjure.java.api.errors.QosReasons;
import com.palantir.dialogue.Response;
import com.palantir.dialogue.TestResponse;
import com.palantir.dialogue.TestResponseQosEncoder;
import com.palantir.dialogue.core.CautiousIncreaseAggressiveDecreaseConcurrencyLimiter.Behavior;
import com.palantir.dialogue.core.CautiousIncreaseAggressiveDecreaseConcurrencyLimiter.Permit;
import com.palantir.dialogue.core.LimitedChannel.LimitEnforcement;
Expand Down Expand Up @@ -177,8 +180,7 @@ public void success_increasesLimitOnlyIfSufficientNumberOfRequestsAreInflight(Be
@EnumSource(Behavior.class)
public void onSuccess_releasesSuccessfully(Behavior behavior) {
CautiousIncreaseAggressiveDecreaseConcurrencyLimiter limiter = limiter(behavior);
Response response = mock(Response.class);
when(response.code()).thenReturn(200);
Response response = new TestResponse().code(200);

double max = limiter.getLimit();
limiter.acquire(LimitEnforcement.DEFAULT_ENABLED).get().onSuccess(response);
Expand All @@ -188,9 +190,7 @@ public void onSuccess_releasesSuccessfully(Behavior behavior) {
@Test
public void onSuccess_dropsIfResponseIndicatesQosOrError_host_308() {
CautiousIncreaseAggressiveDecreaseConcurrencyLimiter limiter = limiter(Behavior.HOST_LEVEL);
Response response = mock(Response.class);
when(response.getFirstHeader(eq("Location"))).thenReturn(Optional.of("https://localhost"));
when(response.code()).thenReturn(308);
Response response = new TestResponse().code(308).withHeader("Location", "https://localhost");

double max = limiter.getLimit();
limiter.acquire(LimitEnforcement.DEFAULT_ENABLED).get().onSuccess(response);
Expand All @@ -202,8 +202,7 @@ public void onSuccess_successIfResponseIndicatesNonQos308() {
// This represents google chunked-upload APIs which respond '308 Resume Incomplete' to indicate
// a successful chunk upload request.
CautiousIncreaseAggressiveDecreaseConcurrencyLimiter limiter = limiter(Behavior.HOST_LEVEL);
Response response = mock(Response.class);
when(response.code()).thenReturn(308);
Response response = new TestResponse().code(308);

double max = limiter.getLimit();
limiter.acquire(LimitEnforcement.DEFAULT_ENABLED).get().onSuccess(response);
Expand All @@ -213,32 +212,89 @@ public void onSuccess_successIfResponseIndicatesNonQos308() {
@Test
public void onSuccess_dropsIfResponseIndicatesQosOrError_host_503() {
CautiousIncreaseAggressiveDecreaseConcurrencyLimiter limiter = limiter(Behavior.HOST_LEVEL);
Response response = mock(Response.class);
when(response.code()).thenReturn(503);
Response response = new TestResponse().code(503);

double max = limiter.getLimit();
limiter.acquire(LimitEnforcement.DEFAULT_ENABLED).get().onSuccess(response);
assertThat(limiter.getLimit()).as("For status %d", 503).isCloseTo(max * 0.9, Percentage.withPercentage(5));
}

@Test
public void onSuccess_ignoresIfResponseIndicatesQos_host_custom503() {
CautiousIncreaseAggressiveDecreaseConcurrencyLimiter limiter = limiter(Behavior.HOST_LEVEL);
TestResponse response = new TestResponse().code(503);
QosReason reason =
QosReason.builder().reason("reason").dueTo(DueTo.CUSTOM).build();
QosReasons.encodeToResponse(reason, response, TestResponseQosEncoder.INSTANCE);

double max = limiter.getLimit();
limiter.acquire(LimitEnforcement.DEFAULT_ENABLED).get().onSuccess(response);
assertThat(limiter.getLimit()).as("For DueTo.CUSTOM status %d", 503).isEqualTo(max);
}

@Test
public void onSuccess_ignoresIfResponseIndicatesQos_host_nonRetryable503() {
CautiousIncreaseAggressiveDecreaseConcurrencyLimiter limiter = limiter(Behavior.HOST_LEVEL);
TestResponse response = new TestResponse().code(503);
QosReason reason = QosReason.builder()
.reason("reason")
.retryHint(RetryHint.DO_NOT_RETRY)
.build();
QosReasons.encodeToResponse(reason, response, TestResponseQosEncoder.INSTANCE);

double max = limiter.getLimit();
limiter.acquire(LimitEnforcement.DEFAULT_ENABLED).get().onSuccess(response);
assertThat(limiter.getLimit())
.as("For status %d not impacted by RetryHint.DO_NOT_RETRY", 503)
.isCloseTo(max * 0.9, Percentage.withPercentage(5));
}

@Test
public void onSuccess_dropsIfResponseIndicatesQosOrError_endpoint() {
CautiousIncreaseAggressiveDecreaseConcurrencyLimiter limiter = limiter(Behavior.ENDPOINT_LEVEL);
int code = 429;
Response response = mock(Response.class);
when(response.code()).thenReturn(code);
Response response = new TestResponse().code(code);

double max = limiter.getLimit();
limiter.acquire(LimitEnforcement.DEFAULT_ENABLED).get().onSuccess(response);
assertThat(limiter.getLimit()).as("For status %d", code).isCloseTo(max * 0.9, Percentage.withPercentage(5));
}

@Test
public void onSuccess_ignoresIfResponseIndicatesQos_endpoint_custom429() {
CautiousIncreaseAggressiveDecreaseConcurrencyLimiter limiter = limiter(Behavior.ENDPOINT_LEVEL);
TestResponse response = new TestResponse().code(429);
QosReason reason =
QosReason.builder().reason("reason").dueTo(DueTo.CUSTOM).build();
QosReasons.encodeToResponse(reason, response, TestResponseQosEncoder.INSTANCE);

double max = limiter.getLimit();
limiter.acquire(LimitEnforcement.DEFAULT_ENABLED).get().onSuccess(response);
assertThat(limiter.getLimit()).as("For DueTo.CUSTOM status %d", 429).isEqualTo(max);
}

@Test
public void onSuccess_ignoresIfResponseIndicatesQos_endpoint_nonRetryable429() {
CautiousIncreaseAggressiveDecreaseConcurrencyLimiter limiter = limiter(Behavior.ENDPOINT_LEVEL);
TestResponse response = new TestResponse().code(429);
QosReason reason = QosReason.builder()
.reason("reason")
.retryHint(RetryHint.DO_NOT_RETRY)
.build();
QosReasons.encodeToResponse(reason, response, TestResponseQosEncoder.INSTANCE);

double max = limiter.getLimit();
limiter.acquire(LimitEnforcement.DEFAULT_ENABLED).get().onSuccess(response);
assertThat(limiter.getLimit())
.as("For status %d not impacted by RetryHint.DO_NOT_RETRY", 429)
.isCloseTo(max * 0.9, Percentage.withPercentage(5));
}

@Test
public void onSuccess_releasesSuccessfullyIfResponseIndicatesQosOrError_sticky() {
CautiousIncreaseAggressiveDecreaseConcurrencyLimiter limiter = limiter(Behavior.STICKY);
int code = 429;
Response response = mock(Response.class);
when(response.code()).thenReturn(code);
Response response = new TestResponse().code(code);

double max = limiter.getLimit();
limiter.acquire(LimitEnforcement.DEFAULT_ENABLED).get().onSuccess(response);
Expand All @@ -249,8 +305,7 @@ public void onSuccess_releasesSuccessfullyIfResponseIndicatesQosOrError_sticky()
public void onSuccess_ignoresIfResponseIndicatesUnknownServerError_endpoint() {
CautiousIncreaseAggressiveDecreaseConcurrencyLimiter limiter = limiter(Behavior.ENDPOINT_LEVEL);
int code = 599;
Response response = mock(Response.class);
when(response.code()).thenReturn(code);
Response response = new TestResponse().code(code);

double max = limiter.getLimit();
limiter.acquire(LimitEnforcement.DEFAULT_ENABLED).get().onSuccess(response);
Expand All @@ -261,8 +316,7 @@ public void onSuccess_ignoresIfResponseIndicatesUnknownServerError_endpoint() {
public void onSuccess_ignoresIfResponseIndicatesUnknownServerError_sticky() {
CautiousIncreaseAggressiveDecreaseConcurrencyLimiter limiter = limiter(Behavior.STICKY);
int code = 599;
Response response = mock(Response.class);
when(response.code()).thenReturn(code);
Response response = new TestResponse().code(code);

double max = limiter.getLimit();
limiter.acquire(LimitEnforcement.DEFAULT_ENABLED).get().onSuccess(response);
Expand All @@ -273,8 +327,7 @@ public void onSuccess_ignoresIfResponseIndicatesUnknownServerError_sticky() {
public void onSuccess_dropsIfResponseIndicatesUnknownServerError_host() {
CautiousIncreaseAggressiveDecreaseConcurrencyLimiter limiter = limiter(Behavior.HOST_LEVEL);
int code = 599;
Response response = mock(Response.class);
when(response.code()).thenReturn(code);
Response response = new TestResponse().code(code);

double max = limiter.getLimit();
limiter.acquire(LimitEnforcement.DEFAULT_ENABLED).get().onSuccess(response);
Expand Down
Loading
Loading