Skip to content

Commit

Permalink
longer waits for quota error only
Browse files Browse the repository at this point in the history
  • Loading branch information
ahmedabu98 committed Jul 11, 2024
1 parent 49f0ba3 commit cbbe3f0
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@
*/
package org.apache.beam.sdk.io.gcp.bigquery;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import java.io.IOException;
import java.time.Instant;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -52,6 +51,9 @@
class RetryManager<ResultT, ContextT extends Context<ResultT>> {
private Queue<Operation<ResultT, ContextT>> operations;
private final BackOff backoff;

// Longer backoff for quota errors because AppendRows throughput takes a long time to cool off
private final BackOff quotaBackoff;
private static final ExecutorService executor =
Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setNameFormat("BeamBQRetryManager-%d").build());
Expand All @@ -61,7 +63,9 @@ enum RetryType {
// The in-flight operations will not be retried.
DONT_RETRY,
// All operations will be retried.
RETRY_ALL_OPERATIONS
RETRY_ALL_OPERATIONS,
// Retry operations due to a quota error. Tells RetryManager to wait longer between retries
RETRY_QUOTA
};

static class WrappedFailure extends Throwable {
Expand All @@ -85,6 +89,13 @@ Object getResult() {
.withMaxBackoff(maxBackoff)
.withMaxRetries(maxRetries)
.backoff();

quotaBackoff =
FluentBackoff.DEFAULT
.withInitialBackoff(initialBackoff.multipliedBy(5))
.withMaxBackoff(maxBackoff.multipliedBy(3))
.withMaxRetries(maxRetries)
.backoff();
}

RetryManager(
Expand All @@ -97,6 +108,14 @@ Object getResult() {
.withMaxRetries(maxRetries)
.withThrottledTimeCounter(throttledTimeCounter)
.backoff();

quotaBackoff =
FluentBackoff.DEFAULT
.withInitialBackoff(initialBackoff.multipliedBy(5))
.withMaxBackoff(maxBackoff.multipliedBy(3))
.withMaxRetries(maxRetries)
.withThrottledTimeCounter(throttledTimeCounter)
.backoff();
}

static class Operation<ResultT, ContextT extends Context<ResultT>> {
Expand Down Expand Up @@ -313,10 +332,9 @@ void await() throws Exception {
if (retryType == RetryType.DONT_RETRY) {
operations.clear();
} else {
checkState(RetryType.RETRY_ALL_OPERATIONS == retryType);
if (!BackOffUtils.next(Sleeper.DEFAULT, backoff)) {
throw new RuntimeException(failure);
}
sleepOrFail(
retryType == RetryType.RETRY_ALL_OPERATIONS ? backoff : quotaBackoff, failure);

for (Operation<ResultT, ?> awaitOperation : operations) {
awaitOperation.await();
}
Expand All @@ -330,4 +348,11 @@ void await() throws Exception {
}
}
}

/**
 * Advances the supplied backoff policy once and aborts when it reports that no further attempt
 * should be made.
 *
 * <p>Delegates to {@code BackOffUtils.next} with {@code Sleeper.DEFAULT}. NOTE(review):
 * presumably that call sleeps for the policy's next interval and returns {@code false} once the
 * policy is exhausted; confirm against the BackOffUtils in use.
 *
 * @param backoff the policy to advance; the caller picks either the regular or the quota policy
 * @param failure the most recent failure, used as the cause of the thrown exception; may be null
 * @throws RuntimeException wrapping {@code failure} when {@code BackOffUtils.next} returns false
 * @throws IOException propagated from {@code BackOffUtils.next}
 * @throws InterruptedException propagated from {@code BackOffUtils.next}
 */
private void sleepOrFail(BackOff backoff, @Nullable Throwable failure)
    throws IOException, InterruptedException {
  boolean shouldRetry = BackOffUtils.next(Sleeper.DEFAULT, backoff);
  if (shouldRetry) {
    return;
  }
  // The policy declined another attempt: surface the last failure to the caller.
  throw new RuntimeException(failure);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -811,6 +811,9 @@ long flush(
.inc(numRowsRetried);

appendFailures.inc();
if (quotaError) {
return RetryType.RETRY_QUOTA;
}
return RetryType.RETRY_ALL_OPERATIONS;
},
c -> {
Expand Down Expand Up @@ -967,8 +970,8 @@ void flushAll(

RetryManager<AppendRowsResponse, AppendRowsContext> retryManager =
new RetryManager<>(
Duration.standardSeconds(5),
Duration.standardSeconds(60),
Duration.standardSeconds(1),
Duration.standardSeconds(20),
maxRetries,
BigQuerySinkMetrics.throttledTimeCounter(
BigQuerySinkMetrics.RpcMethod.APPEND_ROWS));
Expand Down
14 changes: 14 additions & 0 deletions sdks/python/calc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@


# Scratch calculation: total time spent waiting across a fixed number of
# retries when each wait grows geometrically and is capped at a maximum.
# NOTE(review): the values look like they mirror a 1s initial / 20s max
# backoff with a 1.5x growth factor -- confirm against the retry
# configuration this is meant to model.

initial = 1  # first wait interval; updated in place to the next interval
backoff_multiplier = 1.5  # growth factor applied after every retry
max_backoff = 20  # upper bound for any single wait interval
num_retries = 5  # number of waits to accumulate

total = 0

for _ in range(num_retries):
    total += initial
    # Grow the interval for the next retry, never exceeding the cap.
    initial = min(initial * backoff_multiplier, max_backoff)

print(total)

0 comments on commit cbbe3f0

Please sign in to comment.