Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): adding bucket-specific dynamicDelay #10987

Merged
merged 7 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 99 additions & 0 deletions storage/bucket_delay.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright 2024 Google LLC
raj-prince marked this conversation as resolved.
Show resolved Hide resolved
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package storage

import (
"fmt"
"sync"
"time"
)

// bucketDelay wraps dynamicDelay to provide bucket-specific delays.
type bucketDelay struct {
raj-prince marked this conversation as resolved.
Show resolved Hide resolved
targetPercentile float64
increaseRate float64
initialDelay time.Duration
minDelay time.Duration
maxDelay time.Duration

delays map[string]*dynamicDelay
raj-prince marked this conversation as resolved.
Show resolved Hide resolved
mu sync.Mutex
}

// newBucketDelay returns a new bucketDelay instance.
func newBucketDelay(targetPercentile float64, increaseRate float64, initialDelay, minDelay, maxDelay time.Duration) (*bucketDelay, error) {
if targetPercentile < 0 || targetPercentile > 1 {
return nil, fmt.Errorf("invalid targetPercentile (%v): must be within [0, 1]", targetPercentile)
}
if increaseRate <= 0 {
return nil, fmt.Errorf("invalid increaseRate (%v): must be > 0", increaseRate)
}
if minDelay >= maxDelay {
return nil, fmt.Errorf("invalid minDelay (%v) and maxDelay (%v) combination: minDelay must be smaller than maxDelay", minDelay, maxDelay)
}

return &bucketDelay{
targetPercentile: targetPercentile,
increaseRate: increaseRate,
initialDelay: initialDelay,
minDelay: minDelay,
maxDelay: maxDelay,
delays: make(map[string]*dynamicDelay),
}, nil
}

// getDelay retrieves the dynamicDelay instance for the given bucket name. If no delay
// exists for the bucket, a new one is created with the configured parameters.
func (b *bucketDelay) getDelay(bucketName string) *dynamicDelay {
delay, ok := b.delays[bucketName]
if !ok {
// Create a new dynamicDelay for the bucket if it doesn't exist
delay = newDynamicDelayInternal(b.targetPercentile, b.increaseRate, b.initialDelay, b.minDelay, b.maxDelay)
b.delays[bucketName] = delay
}
return delay
}

// increase notes that the operation took longer than the delay for the given bucket.
func (b *bucketDelay) increase(bucketName string) {
b.mu.Lock()
raj-prince marked this conversation as resolved.
Show resolved Hide resolved
defer b.mu.Unlock()

b.getDelay(bucketName).unsafeIncrease()
}

// decrease notes that the operation completed before the delay for the given bucket.
func (b *bucketDelay) decrease(bucketName string) {
b.mu.Lock()
defer b.mu.Unlock()

b.getDelay(bucketName).unsafeDecrease()
}

// update updates the delay value for the bucket depending on the specified latency.
func (b *bucketDelay) update(bucketName string, latency time.Duration) {
b.mu.Lock()
defer b.mu.Unlock()

b.getDelay(bucketName).unsafeUpdate(latency)
}

// getValue returns the desired delay to wait before retrying the operation for the given bucket.
func (b *bucketDelay) getValue(bucketName string) time.Duration {
b.mu.Lock()
defer b.mu.Unlock()

return b.getDelay(bucketName).getValueUnsafe()
}
205 changes: 205 additions & 0 deletions storage/bucket_delay_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package storage

import (
"fmt"
"math"
"math/rand"
"sync"
"testing"
"time"
)

func applySamplesBucket(numSamples int, expectedValue float64, rnd *rand.Rand, b *bucketDelay, bucketName string) int {
var samplesOverThreshold int
for i := 0; i < numSamples; i++ {
randomDelay := time.Duration(-math.Log(rnd.Float64()) * expectedValue * float64(time.Second))
if randomDelay > b.getValue(bucketName) {
samplesOverThreshold++
b.increase(bucketName)
} else {
b.decrease(bucketName)
}
}
return samplesOverThreshold
}

func applySamplesWithUpdateBucket(numSamples int, expectedValue float64, rnd *rand.Rand, b *bucketDelay, bucketName string) {
for i := 0; i < numSamples; i++ {
randomDelay := time.Duration(-math.Log(rnd.Float64()) * expectedValue * float64(time.Second))
b.update(bucketName, randomDelay)
}
}

func TestBucketDelay(t *testing.T) {
b, err := newBucketDelay(0.99, 1.5, 100*time.Millisecond, 100*time.Millisecond, 10*time.Second)
if err != nil {
t.Errorf("while creating bucketDelay: %v", err)
}

// Test increase and getValue
b.increase("bucket1")
delay1 := b.getValue("bucket1")
if delay1 <= 100*time.Millisecond {
t.Errorf("Expected delay for bucket1 to be > 100ms after increase, got %v", delay1)
}

// Test decrease and getValue
b.decrease("bucket1")
delay2 := b.getValue("bucket1")
if delay2 >= delay1 {
t.Errorf("Expected delay for bucket1 to be < %v after decrease, got %v", delay1, delay2)
}

// Test update with latency > current delay
b.update("bucket2", 200*time.Millisecond)
delay3 := b.getValue("bucket2")
if delay3 <= 100*time.Millisecond {
t.Errorf("Expected delay for bucket2 to be > 100ms after update with higher latency, got %v", delay3)
}

// Test update with latency < current delay
b.update("bucket2", 50*time.Millisecond)
delay4 := b.getValue("bucket2")
if delay4 >= delay3 {
t.Errorf("Expected delay for bucket2 to be < %v after update with lower latency, got %v", delay3, delay4)
}
raj-prince marked this conversation as resolved.
Show resolved Hide resolved
}

func TestBucketDelayConcurrentAccess(t *testing.T) {
b, err := newBucketDelay(1-0.1, 15, 1*time.Millisecond, 1*time.Millisecond, 1*time.Hour)
if err != nil {
t.Errorf("while creating bucketDelay: %v", err)
}

// Test concurrent access
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
bucketName := fmt.Sprintf("bucket%d", i%3) // 3 buckets
b.increase(bucketName)
b.decrease(bucketName)
b.update(bucketName, time.Duration(i)*time.Millisecond)
}(i)
}
wg.Wait()

// Check if the map size is as expected
b.mu.Lock() // Lock to access the map safely
defer b.mu.Unlock()
if len(b.delays) != 3 {
t.Errorf("Expected %d buckets in the map, but got %d", 3, len(b.delays))
}
}

func TestBucketDelayInvalidArgument(t *testing.T) {
// Test with invalid targetPercentile
_, err := newDynamicDelay(1.1, 15, 1*time.Millisecond, 1*time.Hour, 2*time.Hour)
if err == nil {
t.Fatal("unexpected, should throw error as targetPercentile is greater than 1")
}

// Test with invalid increaseRate
_, err = newDynamicDelay(0.9, -1, 1*time.Millisecond, 1*time.Hour, 2*time.Hour)
if err == nil {
t.Fatal("unexpected, should throw error as increaseRate can't be negative")
}

// Test with invalid minDelay and maxDelay combination
_, err = newDynamicDelay(0.9, 15, 1*time.Millisecond, 2*time.Hour, 1*time.Hour)
if err == nil {
t.Fatal("unexpected, should throw error as minDelay is greater than maxDelay")
}
}

func TestBucketDelayOverflow(t *testing.T) {
b, err := newBucketDelay(1-0.1, 15, 1*time.Millisecond, 1*time.Millisecond, 1*time.Hour)
if err != nil {
t.Errorf("while creating bucketDelay: %v", err)
}

bucketName := "testBucket"

n := 10000
for i := 0; i < n; i++ {
b.increase(bucketName)
}
for i := 0; i < 100*n; i++ {
b.decrease(bucketName)
}
if got, want := b.getValue(bucketName), 1*time.Millisecond; got != want {
t.Fatalf("unexpected delay value: got %v, want %v", got, want)
}
}

func TestBucketDelayConvergence90(t *testing.T) {
b, err := newBucketDelay(1-0.1, 15, 1*time.Millisecond, 1*time.Millisecond, 1*time.Hour)
if err != nil {
t.Errorf("while creating bucketDelay: %v", err)
}
bucket1 := "bucket1"
bucket2 := "bucket2"

rnd := rand.New(rand.NewSource(1))

// Warm up both buckets
applySamplesWithUpdateBucket(1000, 0.005, rnd, b, bucket1)
applySamplesWithUpdateBucket(1000, 0.005, rnd, b, bucket2)

// Check convergence for bucket1
{
samplesOverThreshold := applySamplesBucket(1000, 0.005, rnd, b, bucket1)
if samplesOverThreshold < (1000 * 0.05) {
t.Errorf("bucket1: samplesOverThreshold = %d < 1000*0.05", samplesOverThreshold)
}
if samplesOverThreshold > (1000 * 0.2) {
t.Errorf("bucket1: samplesOverThreshold = %d > 1000*0.2", samplesOverThreshold)
}
}

// Check convergence for bucket2
{
samplesOverThreshold := applySamplesBucket(1000, 0.005, rnd, b, bucket2)
if samplesOverThreshold < (1000 * 0.05) {
t.Errorf("bucket2: samplesOverThreshold = %d < 1000*0.05", samplesOverThreshold)
}
if samplesOverThreshold > (1000 * 0.2) {
t.Errorf("bucket2: samplesOverThreshold = %d > 1000*0.2", samplesOverThreshold)
}
}
}

func TestBucketDelayMapSize(t *testing.T) {
b, err := newBucketDelay(1-0.1, 15, 1*time.Millisecond, 1*time.Millisecond, 1*time.Hour)
if err != nil {
t.Errorf("while creating bucketDelay: %v", err)
}
// Add delays for multiple buckets
numBuckets := 10
for i := 0; i < numBuckets; i++ {
bucketName := fmt.Sprintf("bucket%d", i)
b.increase(bucketName)
}

// Check if the map size is as expected
b.mu.Lock() // Lock to access the map safely
defer b.mu.Unlock()
if len(b.delays) != numBuckets {
t.Errorf("Expected %d buckets in the map, but got %d", numBuckets, len(b.delays))
}
}
26 changes: 24 additions & 2 deletions storage/dynamic_delay.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ func newDynamicDelay(targetPercentile float64, increaseRate float64, initialDela
if minDelay >= maxDelay {
return nil, fmt.Errorf("invalid minDelay (%v) and maxDelay (%v) combination: minDelay must be smaller than maxDelay", minDelay, maxDelay)
}

return newDynamicDelayInternal(targetPercentile, increaseRate, initialDelay, minDelay, maxDelay), nil
}

// newDynamicDelayInternal constructs the dynamicDelay object without validating the
// arguments. This is created specifically to use by bucketDelay which validates
// the arguments in the start. newDynamicDelay is the recommended way to create dynamicDelay.
func newDynamicDelayInternal(targetPercentile float64, increaseRate float64, initialDelay, minDelay, maxDelay time.Duration) *dynamicDelay {
raj-prince marked this conversation as resolved.
Show resolved Hide resolved
if initialDelay < minDelay {
initialDelay = minDelay
}
Expand All @@ -84,7 +92,7 @@ func newDynamicDelay(targetPercentile float64, increaseRate float64, initialDela
maxDelay: maxDelay,
value: initialDelay,
mu: &sync.RWMutex{},
}, nil
}
}

func (d *dynamicDelay) unsafeIncrease() {
Expand Down Expand Up @@ -133,6 +141,15 @@ func (d *dynamicDelay) update(latency time.Duration) {
}
}

// update updates the delay value depending on the specified latency.
func (d *dynamicDelay) unsafeUpdate(latency time.Duration) {
raj-prince marked this conversation as resolved.
Show resolved Hide resolved
if latency > d.value {
d.unsafeIncrease()
} else {
d.unsafeDecrease()
}
}

// getValue returns the desired delay to wait before retry the operation.
func (d *dynamicDelay) getValue() time.Duration {
d.mu.RLock()
Expand All @@ -141,7 +158,12 @@ func (d *dynamicDelay) getValue() time.Duration {
return d.value
}

// PrintDelay prints the state of delay, helpful in debugging.
// getValueUnsafe returns the desired delay to wait before retry the operation.
func (d *dynamicDelay) getValueUnsafe() time.Duration {
return d.value
}

// printDelay prints the state of delay, helpful in debugging.
func (d *dynamicDelay) printDelay() {
d.mu.RLock()
defer d.mu.RUnlock()
Expand Down
6 changes: 0 additions & 6 deletions storage/experimental/experimental.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,6 @@ import (
// [storage.NewRangeReader] calls) and only for the XML API. Other read APIs (gRPC & JSON)
raj-prince marked this conversation as resolved.
Show resolved Hide resolved
// will be supported soon.
func WithReadStallTimeout(rstc *ReadStallTimeoutConfig) option.ClientOption {
// TODO (raj-prince): To keep separate dynamicDelay instance for different BucketHandle.
// Currently, dynamicTimeout is kept at the client and hence shared across all the
// BucketHandle, which is not the ideal state. As latency depends on location of VM
// and Bucket, and read latency of different buckets may lie in different range.
// Hence having a separate dynamicTimeout instance at BucketHandle level will
// be better.
return internal.WithReadStallTimeout.(func(config *ReadStallTimeoutConfig) option.ClientOption)(rstc)
}

Expand Down
Loading
Loading