From cbbe3f004a4d22a36745cedae83a4bd3cb1b28fb Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 10 Jul 2024 21:37:48 -0400 Subject: [PATCH] longer waits for quota error only --- .../sdk/io/gcp/bigquery/RetryManager.java | 39 +++++++++++++++---- .../StorageApiWriteUnshardedRecords.java | 7 +++- sdks/python/calc.py | 14 +++++++ 3 files changed, 51 insertions(+), 9 deletions(-) create mode 100644 sdks/python/calc.py diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RetryManager.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RetryManager.java index 1a7202de0a56d..f5d1e56ad83e6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RetryManager.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RetryManager.java @@ -17,11 +17,10 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; - import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; +import java.io.IOException; import java.time.Instant; import java.util.Queue; import java.util.concurrent.CountDownLatch; @@ -52,6 +51,9 @@ class RetryManager> { private Queue> operations; private final BackOff backoff; + + // Longer backoff for quota errors because AppendRows throughput takes a long time to cool off + private final BackOff quotaBackoff; private static final ExecutorService executor = Executors.newCachedThreadPool( new ThreadFactoryBuilder().setNameFormat("BeamBQRetryManager-%d").build()); @@ -61,7 +63,9 @@ enum RetryType { // The in-flight operations will not be retried. DONT_RETRY, // All operations will be retried. - RETRY_ALL_OPERATIONS + RETRY_ALL_OPERATIONS, + // Retry operations due to a quota error. Tells RetryManager to wait longer between retries + RETRY_QUOTA }; static class WrappedFailure extends Throwable { @@ -85,6 +89,13 @@ Object getResult() { .withMaxBackoff(maxBackoff) .withMaxRetries(maxRetries) .backoff(); + + quotaBackoff = + FluentBackoff.DEFAULT + .withInitialBackoff(initialBackoff.multipliedBy(5)) + .withMaxBackoff(maxBackoff.multipliedBy(3)) + .withMaxRetries(maxRetries) + .backoff(); } RetryManager( @@ -97,6 +108,14 @@ Object getResult() { .withMaxRetries(maxRetries) .withThrottledTimeCounter(throttledTimeCounter) .backoff(); + + quotaBackoff = + FluentBackoff.DEFAULT + .withInitialBackoff(initialBackoff.multipliedBy(5)) + .withMaxBackoff(maxBackoff.multipliedBy(3)) + .withMaxRetries(maxRetries) + .withThrottledTimeCounter(throttledTimeCounter) + .backoff(); } static class Operation> { @@ -313,10 +332,9 @@ void await() throws Exception { if (retryType == RetryType.DONT_RETRY) { operations.clear(); } else { - checkState(RetryType.RETRY_ALL_OPERATIONS == retryType); - if (!BackOffUtils.next(Sleeper.DEFAULT, backoff)) { - throw new RuntimeException(failure); - } + sleepOrFail( + retryType == RetryType.RETRY_ALL_OPERATIONS ? backoff : quotaBackoff, failure); + for (Operation awaitOperation : operations) { awaitOperation.await(); } @@ -330,4 +348,11 @@ void await() throws Exception { } } } + + private void sleepOrFail(BackOff backoff, @Nullable Throwable failure) + throws IOException, InterruptedException { + if (!BackOffUtils.next(Sleeper.DEFAULT, backoff)) { + throw new RuntimeException(failure); + } + } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java index 8ace778d16086..11e531e5c9c7a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java @@ -811,6 +811,9 @@ long flush( .inc(numRowsRetried); appendFailures.inc(); + if (quotaError) { + return RetryType.RETRY_QUOTA; + } return RetryType.RETRY_ALL_OPERATIONS; }, c -> { @@ -967,8 +970,8 @@ void flushAll( RetryManager retryManager = new RetryManager<>( - Duration.standardSeconds(5), - Duration.standardSeconds(60), + Duration.standardSeconds(1), + Duration.standardSeconds(20), maxRetries, BigQuerySinkMetrics.throttledTimeCounter( BigQuerySinkMetrics.RpcMethod.APPEND_ROWS)); diff --git a/sdks/python/calc.py b/sdks/python/calc.py new file mode 100644 index 0000000000000..afd1fd6ec5264 --- /dev/null +++ b/sdks/python/calc.py @@ -0,0 +1,14 @@ + + +initial = 1 +backoff_multiplier = 1.5 + +total = 0 + +for i in range(5): + total += initial + initial *= backoff_multiplier + if initial > 20: + initial = 20 + +print(total) \ No newline at end of file