Commit
feat(storage): dynamic read request stall timeout (#10958)
- Add a new storage client option to enable proactive retries of stalled read requests based on a dynamic timeout (see the usage sketch below).
- Add an emulator test to validate the changes.
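
A minimal usage sketch, assuming the option is passed to `storage.NewClient` as described by the new `experimental` package in this diff; the `Min` and `TargetPercentile` values below are illustrative, not recommendations:

```go
package main

import (
	"context"
	"log"
	"time"

	"cloud.google.com/go/storage"
	"cloud.google.com/go/storage/experimental"
)

func main() {
	ctx := context.Background()
	// Enable proactive retries of stalled downloads. The stall timeout starts
	// at Min and adapts toward the configured latency percentile.
	client, err := storage.NewClient(ctx, experimental.WithReadStallTimeout(
		&experimental.ReadStallTimeoutConfig{
			Min:              500 * time.Millisecond,
			TargetPercentile: 0.99,
		}))
	if err != nil {
		log.Fatalf("storage.NewClient: %v", err)
	}
	defer client.Close()

	// Reads via NewReader/NewRangeReader on this client now retry requests
	// whose response headers stall past the dynamic timeout (XML API only).
}
```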
raj-prince authored Oct 14, 2024
1 parent 4c98f7a commit a09f00e
Showing 7 changed files with 408 additions and 40 deletions.
104 changes: 86 additions & 18 deletions storage/client_test.go
@@ -15,9 +15,11 @@
package storage

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"log"
"net/url"
"os"
@@ -27,6 +29,7 @@ import (
"time"

"cloud.google.com/go/iam/apiv1/iampb"
"cloud.google.com/go/storage/experimental"
"github.com/google/go-cmp/cmp"
"github.com/googleapis/gax-go/v2"
"github.com/googleapis/gax-go/v2/apierror"
@@ -948,7 +951,6 @@ func initEmulatorClients() func() error {
log.Fatalf("Error setting up HTTP client for emulator tests: %v", err)
return noopCloser
}

emulatorClients = map[string]storageClient{
"http": httpClient,
"grpc": grpcClient,
@@ -1335,10 +1337,14 @@ func TestObjectConditionsEmulated(t *testing.T) {
// Test that RetryNever prevents any retries from happening in both transports.
func TestRetryNeverEmulated(t *testing.T) {
transportClientTest(context.Background(), t, func(t *testing.T, ctx context.Context, project, bucket string, client storageClient) {
_, err := client.CreateBucket(ctx, project, bucket, &BucketAttrs{}, nil)
if err != nil {
t.Fatalf("creating bucket: %v", err)
}
instructions := map[string][]string{"storage.buckets.get": {"return-503"}}
testID := createRetryTest(t, project, bucket, client, instructions)
testID := createRetryTest(t, client, instructions)
ctx = callctx.SetHeaders(ctx, "x-retry-test-id", testID)
_, err := client.GetBucket(ctx, bucket, nil, withRetryConfig(&retryConfig{policy: RetryNever}))
_, err = client.GetBucket(ctx, bucket, nil, withRetryConfig(&retryConfig{policy: RetryNever}))

var ae *apierror.APIError
if errors.As(err, &ae) {
@@ -1354,12 +1360,16 @@ func TestRetryNeverEmulated(t *testing.T) {
// Test that errors are wrapped correctly if retry happens until a timeout.
func TestRetryTimeoutEmulated(t *testing.T) {
transportClientTest(context.Background(), t, func(t *testing.T, ctx context.Context, project, bucket string, client storageClient) {
_, err := client.CreateBucket(ctx, project, bucket, &BucketAttrs{}, nil)
if err != nil {
t.Fatalf("creating bucket: %v", err)
}
instructions := map[string][]string{"storage.buckets.get": {"return-503", "return-503", "return-503", "return-503", "return-503"}}
testID := createRetryTest(t, project, bucket, client, instructions)
testID := createRetryTest(t, client, instructions)
ctx = callctx.SetHeaders(ctx, "x-retry-test-id", testID)
ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
defer cancel()
_, err := client.GetBucket(ctx, bucket, nil, idempotent(true))
_, err = client.GetBucket(ctx, bucket, nil, idempotent(true))

var ae *apierror.APIError
if errors.As(err, &ae) {
@@ -1379,11 +1389,15 @@ func TestRetryTimeoutEmulated(t *testing.T) {
// Test that errors are wrapped correctly if retry happens until max attempts.
func TestRetryMaxAttemptsEmulated(t *testing.T) {
transportClientTest(context.Background(), t, func(t *testing.T, ctx context.Context, project, bucket string, client storageClient) {
_, err := client.CreateBucket(ctx, project, bucket, &BucketAttrs{}, nil)
if err != nil {
t.Fatalf("creating bucket: %v", err)
}
instructions := map[string][]string{"storage.buckets.get": {"return-503", "return-503", "return-503", "return-503", "return-503"}}
testID := createRetryTest(t, project, bucket, client, instructions)
testID := createRetryTest(t, client, instructions)
ctx = callctx.SetHeaders(ctx, "x-retry-test-id", testID)
config := &retryConfig{maxAttempts: expectedAttempts(3), backoff: &gax.Backoff{Initial: 10 * time.Millisecond}}
_, err := client.GetBucket(ctx, bucket, nil, idempotent(true), withRetryConfig(config))
_, err = client.GetBucket(ctx, bucket, nil, idempotent(true), withRetryConfig(config))

var ae *apierror.APIError
if errors.As(err, &ae) {
@@ -1426,8 +1440,12 @@ func TestTimeoutErrorEmulated(t *testing.T) {
// Test that server-side DEADLINE_EXCEEDED errors are retried as expected with gRPC.
func TestRetryDeadlineExceedeEmulated(t *testing.T) {
transportClientTest(context.Background(), t, func(t *testing.T, ctx context.Context, project, bucket string, client storageClient) {
_, err := client.CreateBucket(ctx, project, bucket, &BucketAttrs{}, nil)
if err != nil {
t.Fatalf("creating bucket: %v", err)
}
instructions := map[string][]string{"storage.buckets.get": {"return-504", "return-504"}}
testID := createRetryTest(t, project, bucket, client, instructions)
testID := createRetryTest(t, client, instructions)
ctx = callctx.SetHeaders(ctx, "x-retry-test-id", testID)
config := &retryConfig{maxAttempts: expectedAttempts(4), backoff: &gax.Backoff{Initial: 10 * time.Millisecond}}
if _, err := client.GetBucket(ctx, bucket, nil, idempotent(true), withRetryConfig(config)); err != nil {
@@ -1436,17 +1454,61 @@ func TestRetryDeadlineExceedeEmulated(t *testing.T) {
})
}

// Test validates retry of a stalled read request when the client is created
// with WithReadStallTimeout.
func TestRetryReadReqStallEmulated(t *testing.T) {
multiTransportTest(skipJSONReads(skipGRPC("not supported"), "not supported"), t, func(t *testing.T, ctx context.Context, project, _ string, client *Client) {
// Setup bucket and upload object.
bucket := fmt.Sprintf("http-bucket-%d", time.Now().Nanosecond())
if _, err := client.tc.CreateBucket(context.Background(), project, bucket, &BucketAttrs{Name: bucket}, nil); err != nil {
t.Fatalf("client.CreateBucket: %v", err)
}

name, _, _, err := createObjectWithContent(ctx, bucket, randomBytes3MiB)
if err != nil {
t.Fatalf("createObject: %v", err)
}

// Plant stall at start for 2s.
instructions := map[string][]string{"storage.objects.get": {"stall-for-2s-after-0K"}}
testID := createRetryTest(t, client.tc, instructions)
ctx = callctx.SetHeaders(ctx, "x-retry-test-id", testID)

ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

r, err := client.tc.NewRangeReader(ctx, &newRangeReaderParams{
bucket: bucket,
object: name,
gen: defaultGen,
offset: 0,
length: -1,
}, idempotent(true))
if err != nil {
t.Fatalf("NewRangeReader: %v", err)
}
defer r.Close()

buf := &bytes.Buffer{}
if _, err := io.Copy(buf, r); err != nil {
t.Fatalf("io.Copy: %v", err)
}
if !bytes.Equal(buf.Bytes(), randomBytes3MiB) {
t.Errorf("content does not match, got len %v, want len %v", buf.Len(), len(randomBytes3MiB))
}

}, experimental.WithReadStallTimeout(
&experimental.ReadStallTimeoutConfig{
TargetPercentile: 0.99,
Min: time.Second,
}))
}

// createRetryTest creates a bucket in the emulator and sets up a test using the
// Retry Test API for the given instructions. This is intended for emulator tests
// of retry behavior that are not covered by conformance tests.
func createRetryTest(t *testing.T, project, bucket string, client storageClient, instructions map[string][]string) string {
func createRetryTest(t *testing.T, client storageClient, instructions map[string][]string) string {
t.Helper()
ctx := context.Background()

_, err := client.CreateBucket(ctx, project, bucket, &BucketAttrs{}, nil)
if err != nil {
t.Fatalf("creating bucket: %v", err)
}

// Need the HTTP hostname to set up a retry test, as well as knowledge of
// underlying transport to specify instructions.
@@ -1470,14 +1532,20 @@ func createRetryTest(t *testing.T, project, bucket string, client storageClient,
return et.id
}

// createObject creates an object in the emulator and returns its name, generation, and
// metageneration.
// createObject creates an object in the emulator with content randomBytesToWrite and
// returns its name, generation, and metageneration.
func createObject(ctx context.Context, bucket string) (string, int64, int64, error) {
return createObjectWithContent(ctx, bucket, randomBytesToWrite)
}

// createObjectWithContent creates an object in the emulator with the provided []byte contents,
// and returns its name, generation, and metageneration.
func createObjectWithContent(ctx context.Context, bucket string, bytes []byte) (string, int64, int64, error) {
prefix := time.Now().Nanosecond()
objName := fmt.Sprintf("%d-object", prefix)

w := veneerClient.Bucket(bucket).Object(objName).NewWriter(ctx)
if _, err := w.Write(randomBytesToWrite); err != nil {
if _, err := w.Write(bytes); err != nil {
return "", 0, 0, fmt.Errorf("failed to populate test data: %w", err)
}
if err := w.Close(); err != nil {
64 changes: 64 additions & 0 deletions storage/experimental/experimental.go
@@ -0,0 +1,64 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package experimental is a collection of experimental features that might
// have some rough edges to them. Housing experimental features in this package
// results in a user accessing these APIs as `experimental.Foo`, thereby making
// it explicit that the feature is experimental and using them in production
// code is at their own risk.
//
// All APIs in this package are experimental.
package experimental

import (
"time"

"cloud.google.com/go/storage/internal"
"google.golang.org/api/option"
)

// WithReadStallTimeout provides a [ClientOption] that may be passed to [storage.NewClient].
// It enables the client to retry stalled requests when starting a download from
// Cloud Storage. If the timeout elapses with no response from the server, the request
// is automatically retried.
// The timeout is initially set to ReadStallTimeoutConfig.Min. The client tracks
// latency across all read requests from the client, and can adjust the timeout higher
// to the target percentile when latency from the server is high.
// Currently, this is supported only for downloads ([storage.NewReader] and
// [storage.NewRangeReader] calls) and only for the XML API. Other read APIs (gRPC & JSON)
// will be supported soon.
func WithReadStallTimeout(rstc *ReadStallTimeoutConfig) option.ClientOption {
// TODO (raj-prince): Keep a separate dynamicDelay instance per BucketHandle.
// Currently the dynamic timeout is kept at the client and hence shared across
// all BucketHandles, which is not ideal: latency depends on the location of the
// VM and the bucket, so read latencies of different buckets may lie in different
// ranges. A separate dynamicDelay instance at the BucketHandle level would
// therefore be better.
return internal.WithReadStallTimeout.(func(config *ReadStallTimeoutConfig) option.ClientOption)(rstc)
}

// ReadStallTimeoutConfig defines the timeout which is adjusted dynamically based on
// past observed latencies.
type ReadStallTimeoutConfig struct {
// Min is the minimum duration of the timeout. The default value is 500ms. Requests
// taking shorter than this value to return response headers will never time out.
// In general, you should choose a Min value that is greater than the typical value
// for the target percentile.
Min time.Duration

// TargetPercentile is the percentile to target for the dynamic timeout. The default
// value is 0.99. At the default percentile, at most 1% of requests will be timed out
// and retried.
TargetPercentile float64
}
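
The `dynamicDelay` type that `http_client.go` uses below is implemented elsewhere in the package and is not part of this diff. As a rough, hypothetical sketch of the adjustment the doc comment above describes (not the package's actual implementation), a percentile-targeting timeout can be maintained with asymmetric multiplicative steps:

```go
package main

import (
	"fmt"
	"math"
	"sync"
	"time"
)

// adaptiveTimeout is an illustrative stand-in for the package's dynamicDelay.
type adaptiveTimeout struct {
	mu        sync.Mutex
	value     time.Duration
	min, max  time.Duration
	incFactor float64 // applied when a request stalls past the timeout
	decFactor float64 // applied when a request finishes within the timeout
}

func newAdaptiveTimeout(targetPercentile float64, min, max time.Duration) *adaptiveTimeout {
	inc := 1.5
	// At equilibrium, targetPercentile of samples shrink the value and the rest
	// grow it: p*ln(dec) + (1-p)*ln(inc) = 0  =>  dec = inc^(-(1-p)/p).
	dec := math.Exp(-(1 - targetPercentile) / targetPercentile * math.Log(inc))
	return &adaptiveTimeout{value: min, min: min, max: max, incFactor: inc, decFactor: dec}
}

func (a *adaptiveTimeout) clampLocked() {
	if a.value < a.min {
		a.value = a.min
	}
	if a.value > a.max {
		a.value = a.max
	}
}

// update records the latency of a request that completed.
func (a *adaptiveTimeout) update(latency time.Duration) {
	a.mu.Lock()
	defer a.mu.Unlock()
	if latency > a.value {
		a.value = time.Duration(float64(a.value) * a.incFactor)
	} else {
		a.value = time.Duration(float64(a.value) * a.decFactor)
	}
	a.clampLocked()
}

// increase grows the timeout after a request was cancelled for stalling.
func (a *adaptiveTimeout) increase() {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.value = time.Duration(float64(a.value) * a.incFactor)
	a.clampLocked()
}

// getValue returns the timeout to apply to the next read request.
func (a *adaptiveTimeout) getValue() time.Duration {
	a.mu.Lock()
	defer a.mu.Unlock()
	return a.value
}

func main() {
	t := newAdaptiveTimeout(0.99, 500*time.Millisecond, 10*time.Second)
	t.update(800 * time.Millisecond) // a slow but successful request nudges the timeout up
	t.increase()                     // a stalled request grows it multiplicatively
	fmt.Println(t.getValue())
}
```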
86 changes: 71 additions & 15 deletions storage/http_client.go
@@ -22,6 +22,7 @@ import (
"hash/crc32"
"io"
"io/ioutil"
"log"
"net/http"
"net/url"
"os"
@@ -47,13 +48,14 @@ import (
// httpStorageClient is the HTTP-JSON API implementation of the transport-agnostic
// storageClient interface.
type httpStorageClient struct {
creds *google.Credentials
hc *http.Client
xmlHost string
raw *raw.Service
scheme string
settings *settings
config *storageConfig
creds *google.Credentials
hc *http.Client
xmlHost string
raw *raw.Service
scheme string
settings *settings
config *storageConfig
dynamicReadReqStallTimeout *dynamicDelay
}

// newHTTPStorageClient initializes a new storageClient that uses the HTTP-JSON
@@ -128,14 +130,29 @@ func newHTTPStorageClient(ctx context.Context, opts ...storageOption) (storageCl
return nil, fmt.Errorf("supplied endpoint %q is not valid: %w", ep, err)
}

var dd *dynamicDelay
if config.readStallTimeoutConfig != nil {
drrstConfig := config.readStallTimeoutConfig
dd, err = newDynamicDelay(
drrstConfig.TargetPercentile,
getDynamicReadReqIncreaseRateFromEnv(),
getDynamicReadReqInitialTimeoutSecFromEnv(drrstConfig.Min),
drrstConfig.Min,
defaultDynamicReqdReqMaxTimeout)
if err != nil {
return nil, fmt.Errorf("creating dynamic-delay: %w", err)
}
}

return &httpStorageClient{
creds: creds,
hc: hc,
xmlHost: u.Host,
raw: rawService,
scheme: u.Scheme,
settings: s,
config: &config,
creds: creds,
hc: hc,
xmlHost: u.Host,
raw: rawService,
scheme: u.Scheme,
settings: s,
config: &config,
dynamicReadReqStallTimeout: dd,
}, nil
}

@@ -858,7 +875,46 @@ func (c *httpStorageClient) newRangeReaderXML(ctx context.Context, params *newRa
reopen := readerReopen(ctx, req.Header, params, s,
func(ctx context.Context) (*http.Response, error) {
setHeadersFromCtx(ctx, req.Header)
return c.hc.Do(req.WithContext(ctx))

if c.dynamicReadReqStallTimeout == nil {
return c.hc.Do(req.WithContext(ctx))
}

cancelCtx, cancel := context.WithCancel(ctx)
var (
res *http.Response
err error
)

done := make(chan bool)
go func() {
reqStartTime := time.Now()
res, err = c.hc.Do(req.WithContext(cancelCtx))
if err == nil {
reqLatency := time.Since(reqStartTime)
c.dynamicReadReqStallTimeout.update(reqLatency)
} else if errors.Is(err, context.Canceled) {
// context.Canceled means the operation took longer than the current dynamic
// timeout, hence the timeout should be increased.
c.dynamicReadReqStallTimeout.increase()
}
done <- true
}()

// Wait until the timeout fires or the request completes.
timer := time.After(c.dynamicReadReqStallTimeout.getValue())
select {
case <-timer:
log.Printf("stalled read-req cancelled after %fs", c.dynamicReadReqStallTimeout.getValue().Seconds())
cancel()
err = context.DeadlineExceeded
if res != nil && res.Body != nil {
res.Body.Close()
}
case <-done:
cancel = nil
}
return res, err
},
func() error { return setConditionsHeaders(req.Header, params.conds) },
func() { req.URL.RawQuery = fmt.Sprintf("generation=%d", params.gen) })
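
For readers skimming the `newRangeReaderXML` change above, here is a simplified, standalone sketch of the same stall-detection pattern: race the request against the current stall timeout and cancel it if the response headers do not arrive in time. The helper names and the body-wrapping cleanup are additions of this sketch, not code from the diff.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"net/http"
	"time"
)

// cancelOnClose releases the request's cancel func once the body is closed,
// so the response body can still be streamed after the race below resolves.
type cancelOnClose struct {
	io.ReadCloser
	cancel context.CancelFunc
}

func (c *cancelOnClose) Close() error {
	err := c.ReadCloser.Close()
	c.cancel()
	return err
}

// doWithStallTimeout issues req and cancels it if response headers have not
// arrived within stallTimeout, returning context.DeadlineExceeded so the
// caller's retry logic can reissue the request.
func doWithStallTimeout(ctx context.Context, hc *http.Client, req *http.Request, stallTimeout time.Duration) (*http.Response, error) {
	cancelCtx, cancel := context.WithCancel(ctx)

	type result struct {
		res *http.Response
		err error
	}
	done := make(chan result, 1)
	go func() {
		res, err := hc.Do(req.WithContext(cancelCtx))
		done <- result{res, err}
	}()

	timer := time.NewTimer(stallTimeout)
	defer timer.Stop()

	select {
	case <-timer.C:
		// Headers stalled: cancel the in-flight request and drain the goroutine.
		cancel()
		if r := <-done; r.res != nil {
			r.res.Body.Close()
		}
		return nil, context.DeadlineExceeded
	case r := <-done:
		if r.err != nil {
			cancel()
			return nil, r.err
		}
		// Defer cancellation until the caller closes the body.
		r.res.Body = &cancelOnClose{ReadCloser: r.res.Body, cancel: cancel}
		return r.res, nil
	}
}

func main() {
	req, err := http.NewRequest("GET", "https://storage.googleapis.com/", nil)
	if err != nil {
		panic(err)
	}
	res, err := doWithStallTimeout(context.Background(), http.DefaultClient, req, 500*time.Millisecond)
	if errors.Is(err, context.DeadlineExceeded) {
		fmt.Println("request stalled; a real client would retry with a larger timeout")
		return
	}
	if err != nil {
		fmt.Println("request error:", err)
		return
	}
	defer res.Body.Close()
	fmt.Println("status:", res.Status)
}
```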
2 changes: 0 additions & 2 deletions storage/integration_test.go
@@ -59,7 +59,6 @@ import (
itesting "google.golang.org/api/iterator/testing"
"google.golang.org/api/option"
"google.golang.org/api/transport"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
@@ -93,7 +92,6 @@ var (
)

func TestMain(m *testing.M) {
grpc.EnableTracing = true
cleanup := initIntegrationTest()
cleanupEmulatorClients := initEmulatorClients()
exit := m.Run()