From 6647f4f9d907364522267c3ebcd1852049eac1f4 Mon Sep 17 00:00:00 2001 From: Prince Kumar Date: Mon, 14 Oct 2024 13:05:59 +0000 Subject: [PATCH 1/5] adding bucket-specific delay --- storage/bucket_delay.go | 99 ++++++++++++++++++++++++++++++++++++ storage/bucket_delay_test.go | 71 ++++++++++++++++++++++++++ storage/dynamic_delay.go | 26 +++++++++- storage/http_client.go | 18 ++++--- 4 files changed, 204 insertions(+), 10 deletions(-) create mode 100644 storage/bucket_delay.go create mode 100644 storage/bucket_delay_test.go diff --git a/storage/bucket_delay.go b/storage/bucket_delay.go new file mode 100644 index 000000000000..5f1ad8dd6a75 --- /dev/null +++ b/storage/bucket_delay.go @@ -0,0 +1,99 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "fmt" + "sync" + "time" +) + +// bucketDelay wraps dynamicDelay to provide bucket-specific delays. +type bucketDelay struct { + targetPercentile float64 + increaseRate float64 + initialDelay time.Duration + minDelay time.Duration + maxDelay time.Duration + + delays map[string]*dynamicDelay + mu sync.Mutex +} + +// newBucketDelay returns a new bucketDelay instance. +func newBucketDelay(targetPercentile float64, increaseRate float64, initialDelay, minDelay, maxDelay time.Duration) (*bucketDelay, error) { + if targetPercentile < 0 || targetPercentile > 1 { + return nil, fmt.Errorf("invalid targetPercentile (%v): must be within [0, 1]", targetPercentile) + } + if increaseRate <= 0 { + return nil, fmt.Errorf("invalid increaseRate (%v): must be > 0", increaseRate) + } + if minDelay >= maxDelay { + return nil, fmt.Errorf("invalid minDelay (%v) and maxDelay (%v) combination: minDelay must be smaller than maxDelay", minDelay, maxDelay) + } + + return &bucketDelay{ + targetPercentile: targetPercentile, + increaseRate: increaseRate, + initialDelay: initialDelay, + minDelay: minDelay, + maxDelay: maxDelay, + delays: make(map[string]*dynamicDelay), + }, nil +} + +// getDelay retrieves the dynamicDelay instance for the given bucket name. If no delay +// exists for the bucket, a new one is created with the configured parameters. +func (b *bucketDelay) getDelay(bucketName string) *dynamicDelay { + delay, ok := b.delays[bucketName] + if !ok { + // Create a new dynamicDelay for the bucket if it doesn't exist + delay = newDynamicDelayInternal(b.targetPercentile, b.increaseRate, b.initialDelay, b.minDelay, b.maxDelay) + b.delays[bucketName] = delay + } + return delay +} + +// increase notes that the operation took longer than the delay for the given bucket. +func (b *bucketDelay) increase(bucketName string) { + b.mu.Lock() + defer b.mu.Unlock() + + b.getDelay(bucketName).unsafeIncrease() +} + +// decrease notes that the operation completed before the delay for the given bucket. +func (b *bucketDelay) decrease(bucketName string) { + b.mu.Lock() + defer b.mu.Unlock() + + b.getDelay(bucketName).unsafeDecrease() +} + +// update updates the delay value for the bucket depending on the specified latency. +func (b *bucketDelay) update(bucketName string, latency time.Duration) { + b.mu.Lock() + defer b.mu.Unlock() + + b.getDelay(bucketName).unsafeUpdate(latency) +} + +// getValue returns the desired delay to wait before retrying the operation for the given bucket. +func (b *bucketDelay) getValue(bucketName string) time.Duration { + b.mu.Lock() + defer b.mu.Unlock() + + return b.getDelay(bucketName).getValueUnsafe() +} diff --git a/storage/bucket_delay_test.go b/storage/bucket_delay_test.go new file mode 100644 index 000000000000..4195c4458a70 --- /dev/null +++ b/storage/bucket_delay_test.go @@ -0,0 +1,71 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "fmt" + "sync" + "testing" + "time" +) + +func TestBucketDelay(t *testing.T) { + b, err := newBucketDelay(0.99, 1.5, 100*time.Millisecond, 100*time.Millisecond, 10*time.Second) + if err != nil { + t.Errorf("while creating bucketDelay: %v", err) + } + + // Test increase and getValue + b.increase("bucket1") + delay1 := b.getValue("bucket1") + if delay1 <= 100*time.Millisecond { + t.Errorf("Expected delay for bucket1 to be > 100ms after increase, got %v", delay1) + } + + // Test decrease and getValue + b.decrease("bucket1") + delay2 := b.getValue("bucket1") + if delay2 >= delay1 { + t.Errorf("Expected delay for bucket1 to be < %v after decrease, got %v", delay1, delay2) + } + + // Test update with latency > current delay + b.update("bucket2", 200*time.Millisecond) + delay3 := b.getValue("bucket2") + if delay3 <= 100*time.Millisecond { + t.Errorf("Expected delay for bucket2 to be > 100ms after update with higher latency, got %v", delay3) + } + + // Test update with latency < current delay + b.update("bucket2", 50*time.Millisecond) + delay4 := b.getValue("bucket2") + if delay4 >= delay3 { + t.Errorf("Expected delay for bucket2 to be < %v after update with lower latency, got %v", delay3, delay4) + } + + // Test concurrent access + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + bucketName := fmt.Sprintf("bucket%d", i%3) // Use only 3 buckets to increase concurrency + b.increase(bucketName) + b.decrease(bucketName) + b.update(bucketName, time.Duration(i)*time.Millisecond) + }(i) + } + wg.Wait() +} diff --git a/storage/dynamic_delay.go b/storage/dynamic_delay.go index 5d4c42fb82bf..4ff6d94f49fa 100644 --- a/storage/dynamic_delay.go +++ b/storage/dynamic_delay.go @@ -59,6 +59,14 @@ func newDynamicDelay(targetPercentile float64, increaseRate float64, initialDela if minDelay >= maxDelay { return nil, fmt.Errorf("invalid minDelay (%v) and maxDelay (%v) combination: minDelay must be smaller than maxDelay", minDelay, maxDelay) } + + return newDynamicDelayInternal(targetPercentile, increaseRate, initialDelay, minDelay, maxDelay), nil +} + +// newDynamicDelayInternal constructs the dynamicDelay object without validating the +// arguments. This is created specifically to use by bucketDelay which validates +// the arguments in the start. newDynamicDelay is the recommended way to create dynamicDelay. +func newDynamicDelayInternal(targetPercentile float64, increaseRate float64, initialDelay, minDelay, maxDelay time.Duration) *dynamicDelay { if initialDelay < minDelay { initialDelay = minDelay } @@ -84,7 +92,7 @@ func newDynamicDelay(targetPercentile float64, increaseRate float64, initialDela maxDelay: maxDelay, value: initialDelay, mu: &sync.RWMutex{}, - }, nil + } } func (d *dynamicDelay) unsafeIncrease() { @@ -133,6 +141,15 @@ func (d *dynamicDelay) update(latency time.Duration) { } } +// update updates the delay value depending on the specified latency. +func (d *dynamicDelay) unsafeUpdate(latency time.Duration) { + if latency > d.value { + d.unsafeIncrease() + } else { + d.unsafeDecrease() + } +} + // getValue returns the desired delay to wait before retry the operation. func (d *dynamicDelay) getValue() time.Duration { d.mu.RLock() @@ -141,7 +158,12 @@ func (d *dynamicDelay) getValue() time.Duration { return d.value } -// PrintDelay prints the state of delay, helpful in debugging. +// getValueUnsafe returns the desired delay to wait before retry the operation. +func (d *dynamicDelay) getValueUnsafe() time.Duration { + return d.value +} + +// printDelay prints the state of delay, helpful in debugging. func (d *dynamicDelay) printDelay() { d.mu.RLock() defer d.mu.RUnlock() diff --git a/storage/http_client.go b/storage/http_client.go index bf4af85c5fd4..28867fe4e8cb 100644 --- a/storage/http_client.go +++ b/storage/http_client.go @@ -55,7 +55,7 @@ type httpStorageClient struct { scheme string settings *settings config *storageConfig - dynamicReadReqStallTimeout *dynamicDelay + dynamicReadReqStallTimeout *bucketDelay } // newHTTPStorageClient initializes a new storageClient that uses the HTTP-JSON @@ -130,10 +130,10 @@ func newHTTPStorageClient(ctx context.Context, opts ...storageOption) (storageCl return nil, fmt.Errorf("supplied endpoint %q is not valid: %w", ep, err) } - var dd *dynamicDelay + var bd *bucketDelay if config.readStallTimeoutConfig != nil { drrstConfig := config.readStallTimeoutConfig - dd, err = newDynamicDelay( + bd, err = newBucketDelay( drrstConfig.TargetPercentile, getDynamicReadReqIncreaseRateFromEnv(), getDynamicReadReqInitialTimeoutSecFromEnv(drrstConfig.Min), @@ -152,7 +152,7 @@ func newHTTPStorageClient(ctx context.Context, opts ...storageOption) (storageCl scheme: u.Scheme, settings: s, config: &config, - dynamicReadReqStallTimeout: dd, + dynamicReadReqStallTimeout: bd, }, nil } @@ -892,20 +892,22 @@ func (c *httpStorageClient) newRangeReaderXML(ctx context.Context, params *newRa res, err = c.hc.Do(req.WithContext(cancelCtx)) if err == nil { reqLatency := time.Since(reqStartTime) - c.dynamicReadReqStallTimeout.update(reqLatency) + fmt.Println(reqLatency) + fmt.Println(params.bucket) + c.dynamicReadReqStallTimeout.update(params.bucket, reqLatency) } else if errors.Is(err, context.Canceled) { // context.Canceled means operation took more than current dynamicTimeout, // hence should be increased. - c.dynamicReadReqStallTimeout.increase() + c.dynamicReadReqStallTimeout.increase(params.bucket) } done <- true }() // Wait until timeout or request is successful. - timer := time.After(c.dynamicReadReqStallTimeout.getValue()) + timer := time.After(c.dynamicReadReqStallTimeout.getValue(params.bucket)) select { case <-timer: - log.Printf("stalled read-req cancelled after %fs", c.dynamicReadReqStallTimeout.getValue().Seconds()) + log.Printf("stalled read-req cancelled after %fs", c.dynamicReadReqStallTimeout.getValue(params.bucket).Seconds()) cancel() err = context.DeadlineExceeded if res != nil && res.Body != nil { From 4837d634aff52f77fcc398eb0e807b1741470b3f Mon Sep 17 00:00:00 2001 From: Prince Kumar Date: Mon, 14 Oct 2024 14:14:16 +0000 Subject: [PATCH 2/5] adding more tests --- storage/bucket_delay_test.go | 136 ++++++++++++++++++++++++++- storage/experimental/experimental.go | 6 -- 2 files changed, 135 insertions(+), 7 deletions(-) diff --git a/storage/bucket_delay_test.go b/storage/bucket_delay_test.go index 4195c4458a70..6983c253999a 100644 --- a/storage/bucket_delay_test.go +++ b/storage/bucket_delay_test.go @@ -16,11 +16,34 @@ package storage import ( "fmt" + "math" + "math/rand" "sync" "testing" "time" ) +func applySamplesBucket(numSamples int, expectedValue float64, rnd *rand.Rand, b *bucketDelay, bucketName string) int { + var samplesOverThreshold int + for i := 0; i < numSamples; i++ { + randomDelay := time.Duration(-math.Log(rnd.Float64()) * expectedValue * float64(time.Second)) + if randomDelay > b.getValue(bucketName) { + samplesOverThreshold++ + b.increase(bucketName) + } else { + b.decrease(bucketName) + } + } + return samplesOverThreshold +} + +func applySamplesWithUpdateBucket(numSamples int, expectedValue float64, rnd *rand.Rand, b *bucketDelay, bucketName string) { + for i := 0; i < numSamples; i++ { + randomDelay := time.Duration(-math.Log(rnd.Float64()) * expectedValue * float64(time.Second)) + b.update(bucketName, randomDelay) + } +} + func TestBucketDelay(t *testing.T) { b, err := newBucketDelay(0.99, 1.5, 100*time.Millisecond, 100*time.Millisecond, 10*time.Second) if err != nil { @@ -54,6 +77,13 @@ func TestBucketDelay(t *testing.T) { if delay4 >= delay3 { t.Errorf("Expected delay for bucket2 to be < %v after update with lower latency, got %v", delay3, delay4) } +} + +func TestBucketDelayConcurrentAccess(t *testing.T) { + b, err := newBucketDelay(1-0.1, 15, 1*time.Millisecond, 1*time.Millisecond, 1*time.Hour) + if err != nil { + t.Errorf("while creating bucketDelay: %v", err) + } // Test concurrent access var wg sync.WaitGroup @@ -61,11 +91,115 @@ func TestBucketDelay(t *testing.T) { wg.Add(1) go func(i int) { defer wg.Done() - bucketName := fmt.Sprintf("bucket%d", i%3) // Use only 3 buckets to increase concurrency + bucketName := fmt.Sprintf("bucket%d", i%3) // 3 buckets b.increase(bucketName) b.decrease(bucketName) b.update(bucketName, time.Duration(i)*time.Millisecond) }(i) } wg.Wait() + + // Check if the map size is as expected + b.mu.Lock() // Lock to access the map safely + defer b.mu.Unlock() + if len(b.delays) != 3 { + t.Errorf("Expected %d buckets in the map, but got %d", 3, len(b.delays)) + } +} + +func TestBucketDelayInvalidArgument(t *testing.T) { + // Test with invalid targetPercentile + _, err := newDynamicDelay(1.1, 15, 1*time.Millisecond, 1*time.Hour, 2*time.Hour) + if err == nil { + t.Fatal("unexpected, should throw error as targetPercentile is greater than 1") + } + + // Test with invalid increaseRate + _, err = newDynamicDelay(0.9, -1, 1*time.Millisecond, 1*time.Hour, 2*time.Hour) + if err == nil { + t.Fatal("unexpected, should throw error as increaseRate can't be negative") + } + + // Test with invalid minDelay and maxDelay combination + _, err = newDynamicDelay(0.9, 15, 1*time.Millisecond, 2*time.Hour, 1*time.Hour) + if err == nil { + t.Fatal("unexpected, should throw error as minDelay is greater than maxDelay") + } +} + +func TestBucketDelayOverflow(t *testing.T) { + b, err := newBucketDelay(1-0.1, 15, 1*time.Millisecond, 1*time.Millisecond, 1*time.Hour) + if err != nil { + t.Errorf("while creating bucketDelay: %v", err) + } + + bucketName := "testBucket" + + n := 10000 + for i := 0; i < n; i++ { + b.increase(bucketName) + } + for i := 0; i < 100*n; i++ { + b.decrease(bucketName) + } + if got, want := b.getValue(bucketName), 1*time.Millisecond; got != want { + t.Fatalf("unexpected delay value: got %v, want %v", got, want) + } +} + +func TestBucketDelayConvergence90(t *testing.T) { + b, err := newBucketDelay(1-0.1, 15, 1*time.Millisecond, 1*time.Millisecond, 1*time.Hour) + if err != nil { + t.Errorf("while creating bucketDelay: %v", err) + } + bucket1 := "bucket1" + bucket2 := "bucket2" + + rnd := rand.New(rand.NewSource(1)) + + // Warm up both buckets + applySamplesWithUpdateBucket(1000, 0.005, rnd, b, bucket1) + applySamplesWithUpdateBucket(1000, 0.005, rnd, b, bucket2) + + // Check convergence for bucket1 + { + samplesOverThreshold := applySamplesBucket(1000, 0.005, rnd, b, bucket1) + if samplesOverThreshold < (1000 * 0.05) { + t.Errorf("bucket1: samplesOverThreshold = %d < 1000*0.05", samplesOverThreshold) + } + if samplesOverThreshold > (1000 * 0.2) { + t.Errorf("bucket1: samplesOverThreshold = %d > 1000*0.2", samplesOverThreshold) + } + } + + // Check convergence for bucket2 + { + samplesOverThreshold := applySamplesBucket(1000, 0.005, rnd, b, bucket2) + if samplesOverThreshold < (1000 * 0.05) { + t.Errorf("bucket2: samplesOverThreshold = %d < 1000*0.05", samplesOverThreshold) + } + if samplesOverThreshold > (1000 * 0.2) { + t.Errorf("bucket2: samplesOverThreshold = %d > 1000*0.2", samplesOverThreshold) + } + } +} + +func TestBucketDelayMapSize(t *testing.T) { + b, err := newBucketDelay(1-0.1, 15, 1*time.Millisecond, 1*time.Millisecond, 1*time.Hour) + if err != nil { + t.Errorf("while creating bucketDelay: %v", err) + } + // Add delays for multiple buckets + numBuckets := 10 + for i := 0; i < numBuckets; i++ { + bucketName := fmt.Sprintf("bucket%d", i) + b.increase(bucketName) + } + + // Check if the map size is as expected + b.mu.Lock() // Lock to access the map safely + defer b.mu.Unlock() + if len(b.delays) != numBuckets { + t.Errorf("Expected %d buckets in the map, but got %d", numBuckets, len(b.delays)) + } } diff --git a/storage/experimental/experimental.go b/storage/experimental/experimental.go index 4e908712ba13..4942be411f56 100644 --- a/storage/experimental/experimental.go +++ b/storage/experimental/experimental.go @@ -39,12 +39,6 @@ import ( // [storage.NewRangeReader] calls) and only for the XML API. Other read APIs (gRPC & JSON) // will be supported soon. func WithReadStallTimeout(rstc *ReadStallTimeoutConfig) option.ClientOption { - // TODO (raj-prince): To keep separate dynamicDelay instance for different BucketHandle. - // Currently, dynamicTimeout is kept at the client and hence shared across all the - // BucketHandle, which is not the ideal state. As latency depends on location of VM - // and Bucket, and read latency of different buckets may lie in different range. - // Hence having a separate dynamicTimeout instance at BucketHandle level will - // be better. return internal.WithReadStallTimeout.(func(config *ReadStallTimeoutConfig) option.ClientOption)(rstc) } From 85ef80d20f98adfe66e1da4f988a5e038752e4ca Mon Sep 17 00:00:00 2001 From: Prince Kumar Date: Mon, 14 Oct 2024 14:17:16 +0000 Subject: [PATCH 3/5] removing unnecessary print statement --- storage/http_client.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/storage/http_client.go b/storage/http_client.go index 28867fe4e8cb..01384dc0ae8a 100644 --- a/storage/http_client.go +++ b/storage/http_client.go @@ -892,8 +892,6 @@ func (c *httpStorageClient) newRangeReaderXML(ctx context.Context, params *newRa res, err = c.hc.Do(req.WithContext(cancelCtx)) if err == nil { reqLatency := time.Since(reqStartTime) - fmt.Println(reqLatency) - fmt.Println(params.bucket) c.dynamicReadReqStallTimeout.update(params.bucket, reqLatency) } else if errors.Is(err, context.Canceled) { // context.Canceled means operation took more than current dynamicTimeout, From d44467afd7dfa6ce44cc8f88308f2aca5d9f030f Mon Sep 17 00:00:00 2001 From: Prince Kumar Date: Thu, 17 Oct 2024 16:22:48 +0000 Subject: [PATCH 4/5] addressing review comments --- storage/bucket_delay.go | 99 ----------- storage/bucket_delay_test.go | 205 ---------------------- storage/dynamic_delay.go | 125 +++++++++---- storage/dynamic_delay_test.go | 253 ++++++++++++++++++++++++--- storage/experimental/experimental.go | 5 +- storage/http_client.go | 14 +- 6 files changed, 334 insertions(+), 367 deletions(-) delete mode 100644 storage/bucket_delay.go delete mode 100644 storage/bucket_delay_test.go diff --git a/storage/bucket_delay.go b/storage/bucket_delay.go deleted file mode 100644 index 5f1ad8dd6a75..000000000000 --- a/storage/bucket_delay.go +++ /dev/null @@ -1,99 +0,0 @@ -// Copyright 2024 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package storage - -import ( - "fmt" - "sync" - "time" -) - -// bucketDelay wraps dynamicDelay to provide bucket-specific delays. -type bucketDelay struct { - targetPercentile float64 - increaseRate float64 - initialDelay time.Duration - minDelay time.Duration - maxDelay time.Duration - - delays map[string]*dynamicDelay - mu sync.Mutex -} - -// newBucketDelay returns a new bucketDelay instance. -func newBucketDelay(targetPercentile float64, increaseRate float64, initialDelay, minDelay, maxDelay time.Duration) (*bucketDelay, error) { - if targetPercentile < 0 || targetPercentile > 1 { - return nil, fmt.Errorf("invalid targetPercentile (%v): must be within [0, 1]", targetPercentile) - } - if increaseRate <= 0 { - return nil, fmt.Errorf("invalid increaseRate (%v): must be > 0", increaseRate) - } - if minDelay >= maxDelay { - return nil, fmt.Errorf("invalid minDelay (%v) and maxDelay (%v) combination: minDelay must be smaller than maxDelay", minDelay, maxDelay) - } - - return &bucketDelay{ - targetPercentile: targetPercentile, - increaseRate: increaseRate, - initialDelay: initialDelay, - minDelay: minDelay, - maxDelay: maxDelay, - delays: make(map[string]*dynamicDelay), - }, nil -} - -// getDelay retrieves the dynamicDelay instance for the given bucket name. If no delay -// exists for the bucket, a new one is created with the configured parameters. -func (b *bucketDelay) getDelay(bucketName string) *dynamicDelay { - delay, ok := b.delays[bucketName] - if !ok { - // Create a new dynamicDelay for the bucket if it doesn't exist - delay = newDynamicDelayInternal(b.targetPercentile, b.increaseRate, b.initialDelay, b.minDelay, b.maxDelay) - b.delays[bucketName] = delay - } - return delay -} - -// increase notes that the operation took longer than the delay for the given bucket. -func (b *bucketDelay) increase(bucketName string) { - b.mu.Lock() - defer b.mu.Unlock() - - b.getDelay(bucketName).unsafeIncrease() -} - -// decrease notes that the operation completed before the delay for the given bucket. -func (b *bucketDelay) decrease(bucketName string) { - b.mu.Lock() - defer b.mu.Unlock() - - b.getDelay(bucketName).unsafeDecrease() -} - -// update updates the delay value for the bucket depending on the specified latency. -func (b *bucketDelay) update(bucketName string, latency time.Duration) { - b.mu.Lock() - defer b.mu.Unlock() - - b.getDelay(bucketName).unsafeUpdate(latency) -} - -// getValue returns the desired delay to wait before retrying the operation for the given bucket. -func (b *bucketDelay) getValue(bucketName string) time.Duration { - b.mu.Lock() - defer b.mu.Unlock() - - return b.getDelay(bucketName).getValueUnsafe() -} diff --git a/storage/bucket_delay_test.go b/storage/bucket_delay_test.go deleted file mode 100644 index 6983c253999a..000000000000 --- a/storage/bucket_delay_test.go +++ /dev/null @@ -1,205 +0,0 @@ -// Copyright 2024 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package storage - -import ( - "fmt" - "math" - "math/rand" - "sync" - "testing" - "time" -) - -func applySamplesBucket(numSamples int, expectedValue float64, rnd *rand.Rand, b *bucketDelay, bucketName string) int { - var samplesOverThreshold int - for i := 0; i < numSamples; i++ { - randomDelay := time.Duration(-math.Log(rnd.Float64()) * expectedValue * float64(time.Second)) - if randomDelay > b.getValue(bucketName) { - samplesOverThreshold++ - b.increase(bucketName) - } else { - b.decrease(bucketName) - } - } - return samplesOverThreshold -} - -func applySamplesWithUpdateBucket(numSamples int, expectedValue float64, rnd *rand.Rand, b *bucketDelay, bucketName string) { - for i := 0; i < numSamples; i++ { - randomDelay := time.Duration(-math.Log(rnd.Float64()) * expectedValue * float64(time.Second)) - b.update(bucketName, randomDelay) - } -} - -func TestBucketDelay(t *testing.T) { - b, err := newBucketDelay(0.99, 1.5, 100*time.Millisecond, 100*time.Millisecond, 10*time.Second) - if err != nil { - t.Errorf("while creating bucketDelay: %v", err) - } - - // Test increase and getValue - b.increase("bucket1") - delay1 := b.getValue("bucket1") - if delay1 <= 100*time.Millisecond { - t.Errorf("Expected delay for bucket1 to be > 100ms after increase, got %v", delay1) - } - - // Test decrease and getValue - b.decrease("bucket1") - delay2 := b.getValue("bucket1") - if delay2 >= delay1 { - t.Errorf("Expected delay for bucket1 to be < %v after decrease, got %v", delay1, delay2) - } - - // Test update with latency > current delay - b.update("bucket2", 200*time.Millisecond) - delay3 := b.getValue("bucket2") - if delay3 <= 100*time.Millisecond { - t.Errorf("Expected delay for bucket2 to be > 100ms after update with higher latency, got %v", delay3) - } - - // Test update with latency < current delay - b.update("bucket2", 50*time.Millisecond) - delay4 := b.getValue("bucket2") - if delay4 >= delay3 { - t.Errorf("Expected delay for bucket2 to be < %v after update with lower latency, got %v", delay3, delay4) - } -} - -func TestBucketDelayConcurrentAccess(t *testing.T) { - b, err := newBucketDelay(1-0.1, 15, 1*time.Millisecond, 1*time.Millisecond, 1*time.Hour) - if err != nil { - t.Errorf("while creating bucketDelay: %v", err) - } - - // Test concurrent access - var wg sync.WaitGroup - for i := 0; i < 100; i++ { - wg.Add(1) - go func(i int) { - defer wg.Done() - bucketName := fmt.Sprintf("bucket%d", i%3) // 3 buckets - b.increase(bucketName) - b.decrease(bucketName) - b.update(bucketName, time.Duration(i)*time.Millisecond) - }(i) - } - wg.Wait() - - // Check if the map size is as expected - b.mu.Lock() // Lock to access the map safely - defer b.mu.Unlock() - if len(b.delays) != 3 { - t.Errorf("Expected %d buckets in the map, but got %d", 3, len(b.delays)) - } -} - -func TestBucketDelayInvalidArgument(t *testing.T) { - // Test with invalid targetPercentile - _, err := newDynamicDelay(1.1, 15, 1*time.Millisecond, 1*time.Hour, 2*time.Hour) - if err == nil { - t.Fatal("unexpected, should throw error as targetPercentile is greater than 1") - } - - // Test with invalid increaseRate - _, err = newDynamicDelay(0.9, -1, 1*time.Millisecond, 1*time.Hour, 2*time.Hour) - if err == nil { - t.Fatal("unexpected, should throw error as increaseRate can't be negative") - } - - // Test with invalid minDelay and maxDelay combination - _, err = newDynamicDelay(0.9, 15, 1*time.Millisecond, 2*time.Hour, 1*time.Hour) - if err == nil { - t.Fatal("unexpected, should throw error as minDelay is greater than maxDelay") - } -} - -func TestBucketDelayOverflow(t *testing.T) { - b, err := newBucketDelay(1-0.1, 15, 1*time.Millisecond, 1*time.Millisecond, 1*time.Hour) - if err != nil { - t.Errorf("while creating bucketDelay: %v", err) - } - - bucketName := "testBucket" - - n := 10000 - for i := 0; i < n; i++ { - b.increase(bucketName) - } - for i := 0; i < 100*n; i++ { - b.decrease(bucketName) - } - if got, want := b.getValue(bucketName), 1*time.Millisecond; got != want { - t.Fatalf("unexpected delay value: got %v, want %v", got, want) - } -} - -func TestBucketDelayConvergence90(t *testing.T) { - b, err := newBucketDelay(1-0.1, 15, 1*time.Millisecond, 1*time.Millisecond, 1*time.Hour) - if err != nil { - t.Errorf("while creating bucketDelay: %v", err) - } - bucket1 := "bucket1" - bucket2 := "bucket2" - - rnd := rand.New(rand.NewSource(1)) - - // Warm up both buckets - applySamplesWithUpdateBucket(1000, 0.005, rnd, b, bucket1) - applySamplesWithUpdateBucket(1000, 0.005, rnd, b, bucket2) - - // Check convergence for bucket1 - { - samplesOverThreshold := applySamplesBucket(1000, 0.005, rnd, b, bucket1) - if samplesOverThreshold < (1000 * 0.05) { - t.Errorf("bucket1: samplesOverThreshold = %d < 1000*0.05", samplesOverThreshold) - } - if samplesOverThreshold > (1000 * 0.2) { - t.Errorf("bucket1: samplesOverThreshold = %d > 1000*0.2", samplesOverThreshold) - } - } - - // Check convergence for bucket2 - { - samplesOverThreshold := applySamplesBucket(1000, 0.005, rnd, b, bucket2) - if samplesOverThreshold < (1000 * 0.05) { - t.Errorf("bucket2: samplesOverThreshold = %d < 1000*0.05", samplesOverThreshold) - } - if samplesOverThreshold > (1000 * 0.2) { - t.Errorf("bucket2: samplesOverThreshold = %d > 1000*0.2", samplesOverThreshold) - } - } -} - -func TestBucketDelayMapSize(t *testing.T) { - b, err := newBucketDelay(1-0.1, 15, 1*time.Millisecond, 1*time.Millisecond, 1*time.Hour) - if err != nil { - t.Errorf("while creating bucketDelay: %v", err) - } - // Add delays for multiple buckets - numBuckets := 10 - for i := 0; i < numBuckets; i++ { - bucketName := fmt.Sprintf("bucket%d", i) - b.increase(bucketName) - } - - // Check if the map size is as expected - b.mu.Lock() // Lock to access the map safely - defer b.mu.Unlock() - if len(b.delays) != numBuckets { - t.Errorf("Expected %d buckets in the map, but got %d", numBuckets, len(b.delays)) - } -} diff --git a/storage/dynamic_delay.go b/storage/dynamic_delay.go index 4ff6d94f49fa..9295480d030b 100644 --- a/storage/dynamic_delay.go +++ b/storage/dynamic_delay.go @@ -36,6 +36,23 @@ type dynamicDelay struct { mu *sync.RWMutex } +// validateDynamicDelayParams ensures +// targetPercentile is a valid fraction (between 0 and 1). +// increaseRate is a positive number. +// minDelay is less than maxDelay. +func validateDynamicDelayParams(targetPercentile, increaseRate float64, minDelay, maxDelay time.Duration) error { + if targetPercentile < 0 || targetPercentile > 1 { + return fmt.Errorf("invalid targetPercentile (%v): must be within [0, 1]", targetPercentile) + } + if increaseRate <= 0 { + return fmt.Errorf("invalid increaseRate (%v): must be > 0", increaseRate) + } + if minDelay >= maxDelay { + return fmt.Errorf("invalid minDelay (%v) and maxDelay (%v) combination: minDelay must be smaller than maxDelay", minDelay, maxDelay) + } + return nil +} + // NewDynamicDelay returns a dynamicDelay. // // targetPercentile is the desired percentile to be computed. For example, a @@ -49,24 +66,7 @@ type dynamicDelay struct { // // decrease can never lower the delay past minDelay, increase can never raise // the delay past maxDelay. -func newDynamicDelay(targetPercentile float64, increaseRate float64, initialDelay, minDelay, maxDelay time.Duration) (*dynamicDelay, error) { - if targetPercentile < 0 || targetPercentile > 1 { - return nil, fmt.Errorf("invalid targetPercentile (%v): must be within [0, 1]", targetPercentile) - } - if increaseRate <= 0 { - return nil, fmt.Errorf("invalid increaseRate (%v): must be > 0", increaseRate) - } - if minDelay >= maxDelay { - return nil, fmt.Errorf("invalid minDelay (%v) and maxDelay (%v) combination: minDelay must be smaller than maxDelay", minDelay, maxDelay) - } - - return newDynamicDelayInternal(targetPercentile, increaseRate, initialDelay, minDelay, maxDelay), nil -} - -// newDynamicDelayInternal constructs the dynamicDelay object without validating the -// arguments. This is created specifically to use by bucketDelay which validates -// the arguments in the start. newDynamicDelay is the recommended way to create dynamicDelay. -func newDynamicDelayInternal(targetPercentile float64, increaseRate float64, initialDelay, minDelay, maxDelay time.Duration) *dynamicDelay { +func newDynamicDelay(targetPercentile float64, increaseRate float64, initialDelay, minDelay, maxDelay time.Duration) *dynamicDelay { if initialDelay < minDelay { initialDelay = minDelay } @@ -141,15 +141,6 @@ func (d *dynamicDelay) update(latency time.Duration) { } } -// update updates the delay value depending on the specified latency. -func (d *dynamicDelay) unsafeUpdate(latency time.Duration) { - if latency > d.value { - d.unsafeIncrease() - } else { - d.unsafeDecrease() - } -} - // getValue returns the desired delay to wait before retry the operation. func (d *dynamicDelay) getValue() time.Duration { d.mu.RLock() @@ -158,11 +149,6 @@ func (d *dynamicDelay) getValue() time.Duration { return d.value } -// getValueUnsafe returns the desired delay to wait before retry the operation. -func (d *dynamicDelay) getValueUnsafe() time.Duration { - return d.value -} - // printDelay prints the state of delay, helpful in debugging. func (d *dynamicDelay) printDelay() { d.mu.RLock() @@ -174,3 +160,78 @@ func (d *dynamicDelay) printDelay() { fmt.Println("MaxDelay: ", d.maxDelay) fmt.Println("Value: ", d.value) } + +// bucketDelayManager wraps dynamicDelay to provide bucket-specific delays. +type bucketDelayManager struct { + targetPercentile float64 + increaseRate float64 + initialDelay time.Duration + minDelay time.Duration + maxDelay time.Duration + + // delays maps bucket names to their dynamic delay instance. + delays map[string]*dynamicDelay + + // mu guards delays. + mu *sync.RWMutex +} + +// newBucketDelayManager returns a new bucketDelayManager instance. +func newBucketDelayManager(targetPercentile float64, increaseRate float64, initialDelay, minDelay, maxDelay time.Duration) (*bucketDelayManager, error) { + err := validateDynamicDelayParams(targetPercentile, increaseRate, minDelay, maxDelay) + if err != nil { + return nil, err + } + + return &bucketDelayManager{ + targetPercentile: targetPercentile, + increaseRate: increaseRate, + initialDelay: initialDelay, + minDelay: minDelay, + maxDelay: maxDelay, + delays: make(map[string]*dynamicDelay), + mu: &sync.RWMutex{}, + }, nil +} + +// getDelay retrieves the dynamicDelay instance for the given bucket name. If no delay +// exists for the bucket, a new one is created with the configured parameters. +func (b *bucketDelayManager) getDelay(bucketName string) *dynamicDelay { + b.mu.RLock() + delay, ok := b.delays[bucketName] + b.mu.RUnlock() + + if !ok { + b.mu.Lock() + defer b.mu.Unlock() + + // Check again, as someone might create b/w the execution of mu.RUnlock() and mu.Lock(). + delay, ok = b.delays[bucketName] + if !ok { + // Create a new dynamicDelay for the bucket if it doesn't exist + delay = newDynamicDelay(b.targetPercentile, b.increaseRate, b.initialDelay, b.minDelay, b.maxDelay) + b.delays[bucketName] = delay + } + } + return delay +} + +// increase notes that the operation took longer than the delay for the given bucket. +func (b *bucketDelayManager) increase(bucketName string) { + b.getDelay(bucketName).increase() +} + +// decrease notes that the operation completed before the delay for the given bucket. +func (b *bucketDelayManager) decrease(bucketName string) { + b.getDelay(bucketName).decrease() +} + +// update updates the delay value for the bucket depending on the specified latency. +func (b *bucketDelayManager) update(bucketName string, latency time.Duration) { + b.getDelay(bucketName).update(latency) +} + +// getValue returns the desired delay to wait before retrying the operation for the given bucket. +func (b *bucketDelayManager) getValue(bucketName string) time.Duration { + return b.getDelay(bucketName).getValue() +} diff --git a/storage/dynamic_delay_test.go b/storage/dynamic_delay_test.go index 8247d7527496..57318580c454 100644 --- a/storage/dynamic_delay_test.go +++ b/storage/dynamic_delay_test.go @@ -13,8 +13,10 @@ package storage import ( + "fmt" "math" "math/rand" + "sync" "testing" "time" @@ -44,10 +46,7 @@ func applySamplesWithUpdate(numSamples int, expectedValue float64, rnd *rand.Ran } func TestNewDelay(t *testing.T) { - d, err := newDynamicDelay(1-0.01, 15, 1*time.Millisecond, 1*time.Millisecond, 1*time.Hour) - if err != nil { - t.Fatal(err) - } + d := newDynamicDelay(1-0.01, 15, 1*time.Millisecond, 1*time.Millisecond, 1*time.Hour) want := &dynamicDelay{ increaseFactor: 1.047294, @@ -84,10 +83,7 @@ func TestNewDelay(t *testing.T) { func TestConvergence99(t *testing.T) { // d should converge to the 99-percentile value. - d, err := newDynamicDelay(1-0.01, 15, 1*time.Millisecond, 1*time.Millisecond, 1*time.Hour) - if err != nil { - t.Fatal(err) - } + d := newDynamicDelay(1-0.01, 15, 1*time.Millisecond, 1*time.Millisecond, 1*time.Hour) rnd := rand.New(rand.NewSource(1)) @@ -122,10 +118,7 @@ func TestConvergence99(t *testing.T) { func TestConvergence90(t *testing.T) { // d should converge to the 90-percentile value. - d, err := newDynamicDelay(1-0.1, 15, 1*time.Millisecond, 1*time.Millisecond, 1*time.Hour) - if err != nil { - t.Fatal(err) - } + d := newDynamicDelay(1-0.1, 15, 1*time.Millisecond, 1*time.Millisecond, 1*time.Hour) rnd := rand.New(rand.NewSource(1)) @@ -145,16 +138,18 @@ func TestConvergence90(t *testing.T) { } func TestOverflow(t *testing.T) { - d, err := newDynamicDelay(1-0.1, 15, 1*time.Millisecond, 1*time.Millisecond, 1*time.Hour) - if err != nil { - t.Fatal(err) - } + d := newDynamicDelay(1-0.1, 15, 1*time.Millisecond, 1*time.Millisecond, 1*time.Hour) n := 10000 + // Should converge to maxDelay. for i := 0; i < n; i++ { d.increase() } - t.Log(d.getValue()) + if got, want := d.getValue(), 1*time.Hour; got != want { + t.Fatalf("unexpected d.Value: got %v, want %v", got, want) + } + + // Should converge to minDelay. for i := 0; i < 100*n; i++ { d.decrease() } @@ -163,14 +158,228 @@ func TestOverflow(t *testing.T) { } } -func TestInvalidArgument(t *testing.T) { - _, err := newDynamicDelay(1-0.1, 15, 1*time.Millisecond, 2*time.Hour, 1*time.Hour) +func TestValidateDynamicDelayParams(t *testing.T) { + testCases := []struct { + name string + targetPercentile float64 + increaseRate float64 + minDelay time.Duration + maxDelay time.Duration + expectErr bool + }{ + // Valid parameters + {"valid", 0.5, 0.1, 1 * time.Second, 10 * time.Second, false}, + + // Invalid targetPercentile + {"invalid targetPercentile (< 0)", -0.1, 0.1, 1 * time.Second, 10 * time.Second, true}, + {"invalid targetPercentile (> 1)", 1.1, 0.1, 1 * time.Second, 10 * time.Second, true}, + + // Invalid increaseRate + {"invalid increaseRate (<= 0)", 0.5, 0, 1 * time.Second, 10 * time.Second, true}, + + // Invalid delay combination + {"invalid delay combination (minDelay >= maxDelay)", 0.5, 0.1, 10 * time.Second, 1 * time.Second, true}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + err := validateDynamicDelayParams(tc.targetPercentile, tc.increaseRate, tc.minDelay, tc.maxDelay) + if tc.expectErr && err == nil { + t.Errorf("Expected an error, but got none") + } + if !tc.expectErr && err != nil { + t.Errorf("Unexpected error: %v", err) + } + }) + } +} + +func applySamplesBucket(numSamples int, expectedValue float64, rnd *rand.Rand, b *bucketDelayManager, bucketName string) int { + var samplesOverThreshold int + for i := 0; i < numSamples; i++ { + randomDelay := time.Duration(-math.Log(rnd.Float64()) * expectedValue * float64(time.Second)) + if randomDelay > b.getValue(bucketName) { + samplesOverThreshold++ + b.increase(bucketName) + } else { + b.decrease(bucketName) + } + } + return samplesOverThreshold +} + +func applySamplesWithUpdateBucket(numSamples int, expectedValue float64, rnd *rand.Rand, b *bucketDelayManager, bucketName string) { + for i := 0; i < numSamples; i++ { + randomDelay := time.Duration(-math.Log(rnd.Float64()) * expectedValue * float64(time.Second)) + b.update(bucketName, randomDelay) + } +} + +func TestBucketDelayManager(t *testing.T) { + b, err := newBucketDelayManager(0.99, 1.5, 100*time.Millisecond, 100*time.Millisecond, 10*time.Second) + if err != nil { + t.Errorf("while creating bucketDelayManager: %v", err) + } + + t.Logf("testing") + + // Test increase and getValue + b.increase("bucket1") + delay1 := b.getValue("bucket1") + if delay1 <= 100*time.Millisecond { + t.Errorf("Expected delay for bucket1 to be > 100ms after increase, got %v", delay1) + } + + // Test decrease and getValue + b.decrease("bucket1") + delay2 := b.getValue("bucket1") + if delay2 >= delay1 { + t.Errorf("Expected delay for bucket1 to be < %v after decrease, got %v", delay1, delay2) + } + + // Test update with latency > current delay + b.update("bucket2", 200*time.Millisecond) + delay3 := b.getValue("bucket2") + if delay3 <= 100*time.Millisecond { + t.Errorf("Expected delay for bucket2 to be > 100ms after update with higher latency, got %v", delay3) + } + + // Test update with latency < current delay + b.update("bucket2", 50*time.Millisecond) + delay4 := b.getValue("bucket2") + if delay4 >= delay3 { + t.Errorf("Expected delay for bucket2 to be < %v after update with lower latency, got %v", delay3, delay4) + } +} + +func TestBucketDelayManagerConcurrentAccess(t *testing.T) { + b, err := newBucketDelayManager(1-0.1, 15, 1*time.Millisecond, 1*time.Millisecond, 1*time.Hour) + if err != nil { + t.Errorf("while creating bucketDelayManager: %v", err) + } + + // Test concurrent access + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + bucketName := fmt.Sprintf("bucket%d", i%3) // 3 buckets + b.increase(bucketName) + b.decrease(bucketName) + b.update(bucketName, time.Duration(i)*time.Millisecond) + }(i) + } + wg.Wait() + + // Check if the map size is as expected + b.mu.Lock() // Lock to access the map safely + defer b.mu.Unlock() + if len(b.delays) != 3 { + t.Errorf("Expected %d buckets in the map, but got %d", 3, len(b.delays)) + } +} + +func TestBucketDelayManagerInvalidArgument(t *testing.T) { + // Test with invalid targetPercentile + _, err := newBucketDelayManager(1.1, 15, 1*time.Millisecond, 1*time.Hour, 2*time.Hour) if err == nil { - t.Fatal("unexpected, should throw error as minDelay is greater than maxDelay") + t.Fatal("unexpected, should throw error as targetPercentile is greater than 1") + } + + // Test with invalid increaseRate + _, err = newBucketDelayManager(0.9, -1, 1*time.Millisecond, 1*time.Hour, 2*time.Hour) + if err == nil { + t.Fatal("unexpected, should throw error as increaseRate can't be negative") } - _, err = newDynamicDelay(1-0.1, 0, 1*time.Millisecond, 2*time.Hour, 1*time.Hour) + // Test with invalid minDelay and maxDelay combination + _, err = newBucketDelayManager(0.9, 15, 1*time.Millisecond, 2*time.Hour, 1*time.Hour) if err == nil { - t.Fatal("unexpected, should throw error as increaseRate can't be zero") + t.Fatal("unexpected, should throw error as minDelay is greater than maxDelay") + } +} + +func TestBucketDelayManagerOverflow(t *testing.T) { + b, err := newBucketDelayManager(1-0.1, 15, 1*time.Millisecond, 1*time.Millisecond, 1*time.Hour) + if err != nil { + t.Errorf("while creating bucketDelayManager: %v", err) + } + + bucketName := "testBucket" + n := 10000 + + // Should converge to maxDelay. + for i := 0; i < n; i++ { + b.increase(bucketName) + } + + if got, want := b.getValue(bucketName), 1*time.Hour; got != want { + t.Fatalf("unexpected delay value: got %v, want %v", got, want) + } + + // Should converge to minDelay. + for i := 0; i < 100*n; i++ { + b.decrease(bucketName) + } + if got, want := b.getValue(bucketName), 1*time.Millisecond; got != want { + t.Fatalf("unexpected delay value: got %v, want %v", got, want) + } +} + +func TestBucketDelayManagerConvergence90(t *testing.T) { + b, err := newBucketDelayManager(1-0.1, 15, 1*time.Millisecond, 1*time.Millisecond, 1*time.Hour) + if err != nil { + t.Errorf("while creating bucketDelayManager: %v", err) + } + bucket1 := "bucket1" + bucket2 := "bucket2" + + rnd := rand.New(rand.NewSource(1)) + + // Warm up both buckets + applySamplesWithUpdateBucket(1000, 0.005, rnd, b, bucket1) + applySamplesWithUpdateBucket(1000, 0.005, rnd, b, bucket2) + + // Check convergence for bucket1 + { + samplesOverThreshold := applySamplesBucket(1000, 0.005, rnd, b, bucket1) + if samplesOverThreshold < (1000 * 0.05) { + t.Errorf("bucket1: samplesOverThreshold = %d < 1000*0.05", samplesOverThreshold) + } + if samplesOverThreshold > (1000 * 0.2) { + t.Errorf("bucket1: samplesOverThreshold = %d > 1000*0.2", samplesOverThreshold) + } + } + + // Check convergence for bucket2 + { + samplesOverThreshold := applySamplesBucket(1000, 0.005, rnd, b, bucket2) + if samplesOverThreshold < (1000 * 0.05) { + t.Errorf("bucket2: samplesOverThreshold = %d < 1000*0.05", samplesOverThreshold) + } + if samplesOverThreshold > (1000 * 0.2) { + t.Errorf("bucket2: samplesOverThreshold = %d > 1000*0.2", samplesOverThreshold) + } + } +} + +func TestBucketDelayManagerMapSize(t *testing.T) { + b, err := newBucketDelayManager(1-0.1, 15, 1*time.Millisecond, 1*time.Millisecond, 1*time.Hour) + if err != nil { + t.Errorf("while creating bucketDelayManager: %v", err) + } + // Add delays for multiple buckets + numBuckets := 10 + for i := 0; i < numBuckets; i++ { + bucketName := fmt.Sprintf("bucket%d", i) + b.increase(bucketName) + } + + // Check if the map size is as expected + b.mu.Lock() // Lock to access the map safely + defer b.mu.Unlock() + if len(b.delays) != numBuckets { + t.Errorf("Expected %d buckets in the map, but got %d", numBuckets, len(b.delays)) } } diff --git a/storage/experimental/experimental.go b/storage/experimental/experimental.go index 4942be411f56..c83d1fa0935a 100644 --- a/storage/experimental/experimental.go +++ b/storage/experimental/experimental.go @@ -33,8 +33,9 @@ import ( // Cloud Storage. If the timeout elapses with no response from the server, the request // is automatically retried. // The timeout is initially set to ReadStallTimeoutConfig.Min. The client tracks -// latency across all read requests from the client, and can adjust the timeout higher -// to the target percentile when latency from the server is high. +// latency across all read requests from the client for each bucket accessed, and can +// adjust the timeout higher to the target percentile when latency for request to that +// bucket is high. // Currently, this is supported only for downloads ([storage.NewReader] and // [storage.NewRangeReader] calls) and only for the XML API. Other read APIs (gRPC & JSON) // will be supported soon. diff --git a/storage/http_client.go b/storage/http_client.go index 01384dc0ae8a..97bbf04887b3 100644 --- a/storage/http_client.go +++ b/storage/http_client.go @@ -55,7 +55,7 @@ type httpStorageClient struct { scheme string settings *settings config *storageConfig - dynamicReadReqStallTimeout *bucketDelay + dynamicReadReqStallTimeout *bucketDelayManager } // newHTTPStorageClient initializes a new storageClient that uses the HTTP-JSON @@ -130,10 +130,10 @@ func newHTTPStorageClient(ctx context.Context, opts ...storageOption) (storageCl return nil, fmt.Errorf("supplied endpoint %q is not valid: %w", ep, err) } - var bd *bucketDelay + var bd *bucketDelayManager if config.readStallTimeoutConfig != nil { drrstConfig := config.readStallTimeoutConfig - bd, err = newBucketDelay( + bd, err = newBucketDelayManager( drrstConfig.TargetPercentile, getDynamicReadReqIncreaseRateFromEnv(), getDynamicReadReqInitialTimeoutSecFromEnv(drrstConfig.Min), @@ -1352,7 +1352,7 @@ func setRangeReaderHeaders(h http.Header, params *newRangeReaderParams) error { // readerReopen initiates a Read with offset and length, assuming we // have already read seen bytes. func readerReopen(ctx context.Context, header http.Header, params *newRangeReaderParams, s *settings, - doDownload func(context.Context) (*http.Response, error), applyConditions func() error, setGeneration func()) func(int64) (*http.Response, error) { + doDownload func(context.Context) (*http.Response, error), applyConditions func() error, setGeneration func()) func(int64) (*http.Response, error) { return func(seen int64) (*http.Response, error) { // If the context has already expired, return immediately without making a // call. @@ -1407,9 +1407,9 @@ func readerReopen(ctx context.Context, header http.Header, params *newRangeReade } partialContentNotSatisfied := - !decompressiveTranscoding(res) && - start > 0 && params.length != 0 && - res.StatusCode != http.StatusPartialContent + !decompressiveTranscoding(res) && + start > 0 && params.length != 0 && + res.StatusCode != http.StatusPartialContent if partialContentNotSatisfied { res.Body.Close() From 95fe8ec39fa61b0750df59cda0aa907fe6a062b0 Mon Sep 17 00:00:00 2001 From: Prince Kumar Date: Thu, 17 Oct 2024 16:27:09 +0000 Subject: [PATCH 5/5] fixing vet error --- storage/dynamic_delay.go | 2 +- storage/http_client.go | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/storage/dynamic_delay.go b/storage/dynamic_delay.go index 9295480d030b..5944f515d39c 100644 --- a/storage/dynamic_delay.go +++ b/storage/dynamic_delay.go @@ -36,7 +36,7 @@ type dynamicDelay struct { mu *sync.RWMutex } -// validateDynamicDelayParams ensures +// validateDynamicDelayParams ensures, // targetPercentile is a valid fraction (between 0 and 1). // increaseRate is a positive number. // minDelay is less than maxDelay. diff --git a/storage/http_client.go b/storage/http_client.go index 97bbf04887b3..6baf90547356 100644 --- a/storage/http_client.go +++ b/storage/http_client.go @@ -1352,7 +1352,7 @@ func setRangeReaderHeaders(h http.Header, params *newRangeReaderParams) error { // readerReopen initiates a Read with offset and length, assuming we // have already read seen bytes. func readerReopen(ctx context.Context, header http.Header, params *newRangeReaderParams, s *settings, - doDownload func(context.Context) (*http.Response, error), applyConditions func() error, setGeneration func()) func(int64) (*http.Response, error) { + doDownload func(context.Context) (*http.Response, error), applyConditions func() error, setGeneration func()) func(int64) (*http.Response, error) { return func(seen int64) (*http.Response, error) { // If the context has already expired, return immediately without making a // call. @@ -1407,9 +1407,9 @@ func readerReopen(ctx context.Context, header http.Header, params *newRangeReade } partialContentNotSatisfied := - !decompressiveTranscoding(res) && - start > 0 && params.length != 0 && - res.StatusCode != http.StatusPartialContent + !decompressiveTranscoding(res) && + start > 0 && params.length != 0 && + res.StatusCode != http.StatusPartialContent if partialContentNotSatisfied { res.Body.Close()