Skip to content

Commit

Permalink
feat(storage): adding bucket-specific dynamicDelay (#10987)
Browse files Browse the repository at this point in the history
* adding bucket-specific delay

* adding more tests

* removing unnecessary print statement

* addressing review comments

* fixing vet error

---------

Co-authored-by: Chris Cotter <[email protected]>
  • Loading branch information
raj-prince and tritone authored Oct 17, 2024
1 parent 70d82fe commit a807a7e
Show file tree
Hide file tree
Showing 4 changed files with 337 additions and 50 deletions.
107 changes: 95 additions & 12 deletions storage/dynamic_delay.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,23 @@ type dynamicDelay struct {
mu *sync.RWMutex
}

// validateDynamicDelayParams ensures,
// targetPercentile is a valid fraction (between 0 and 1).
// increaseRate is a positive number.
// minDelay is less than maxDelay.
func validateDynamicDelayParams(targetPercentile, increaseRate float64, minDelay, maxDelay time.Duration) error {
if targetPercentile < 0 || targetPercentile > 1 {
return fmt.Errorf("invalid targetPercentile (%v): must be within [0, 1]", targetPercentile)
}
if increaseRate <= 0 {
return fmt.Errorf("invalid increaseRate (%v): must be > 0", increaseRate)
}
if minDelay >= maxDelay {
return fmt.Errorf("invalid minDelay (%v) and maxDelay (%v) combination: minDelay must be smaller than maxDelay", minDelay, maxDelay)
}
return nil
}

// NewDynamicDelay returns a dynamicDelay.
//
// targetPercentile is the desired percentile to be computed. For example, a
Expand All @@ -49,16 +66,7 @@ type dynamicDelay struct {
//
// decrease can never lower the delay past minDelay, increase can never raise
// the delay past maxDelay.
func newDynamicDelay(targetPercentile float64, increaseRate float64, initialDelay, minDelay, maxDelay time.Duration) (*dynamicDelay, error) {
if targetPercentile < 0 || targetPercentile > 1 {
return nil, fmt.Errorf("invalid targetPercentile (%v): must be within [0, 1]", targetPercentile)
}
if increaseRate <= 0 {
return nil, fmt.Errorf("invalid increaseRate (%v): must be > 0", increaseRate)
}
if minDelay >= maxDelay {
return nil, fmt.Errorf("invalid minDelay (%v) and maxDelay (%v) combination: minDelay must be smaller than maxDelay", minDelay, maxDelay)
}
func newDynamicDelay(targetPercentile float64, increaseRate float64, initialDelay, minDelay, maxDelay time.Duration) *dynamicDelay {
if initialDelay < minDelay {
initialDelay = minDelay
}
Expand All @@ -84,7 +92,7 @@ func newDynamicDelay(targetPercentile float64, increaseRate float64, initialDela
maxDelay: maxDelay,
value: initialDelay,
mu: &sync.RWMutex{},
}, nil
}
}

func (d *dynamicDelay) unsafeIncrease() {
Expand Down Expand Up @@ -141,7 +149,7 @@ func (d *dynamicDelay) getValue() time.Duration {
return d.value
}

// PrintDelay prints the state of delay, helpful in debugging.
// printDelay prints the state of delay, helpful in debugging.
func (d *dynamicDelay) printDelay() {
d.mu.RLock()
defer d.mu.RUnlock()
Expand All @@ -152,3 +160,78 @@ func (d *dynamicDelay) printDelay() {
fmt.Println("MaxDelay: ", d.maxDelay)
fmt.Println("Value: ", d.value)
}

// bucketDelayManager wraps dynamicDelay to provide bucket-specific delays.
type bucketDelayManager struct {
targetPercentile float64
increaseRate float64
initialDelay time.Duration
minDelay time.Duration
maxDelay time.Duration

// delays maps bucket names to their dynamic delay instance.
delays map[string]*dynamicDelay

// mu guards delays.
mu *sync.RWMutex
}

// newBucketDelayManager returns a new bucketDelayManager instance.
func newBucketDelayManager(targetPercentile float64, increaseRate float64, initialDelay, minDelay, maxDelay time.Duration) (*bucketDelayManager, error) {
err := validateDynamicDelayParams(targetPercentile, increaseRate, minDelay, maxDelay)
if err != nil {
return nil, err
}

return &bucketDelayManager{
targetPercentile: targetPercentile,
increaseRate: increaseRate,
initialDelay: initialDelay,
minDelay: minDelay,
maxDelay: maxDelay,
delays: make(map[string]*dynamicDelay),
mu: &sync.RWMutex{},
}, nil
}

// getDelay retrieves the dynamicDelay instance for the given bucket name. If no delay
// exists for the bucket, a new one is created with the configured parameters.
func (b *bucketDelayManager) getDelay(bucketName string) *dynamicDelay {
b.mu.RLock()
delay, ok := b.delays[bucketName]
b.mu.RUnlock()

if !ok {
b.mu.Lock()
defer b.mu.Unlock()

// Check again, as someone might create b/w the execution of mu.RUnlock() and mu.Lock().
delay, ok = b.delays[bucketName]
if !ok {
// Create a new dynamicDelay for the bucket if it doesn't exist
delay = newDynamicDelay(b.targetPercentile, b.increaseRate, b.initialDelay, b.minDelay, b.maxDelay)
b.delays[bucketName] = delay
}
}
return delay
}

// increase notes that the operation took longer than the delay for the given bucket.
func (b *bucketDelayManager) increase(bucketName string) {
b.getDelay(bucketName).increase()
}

// decrease notes that the operation completed before the delay for the given bucket.
func (b *bucketDelayManager) decrease(bucketName string) {
b.getDelay(bucketName).decrease()
}

// update updates the delay value for the bucket depending on the specified latency.
func (b *bucketDelayManager) update(bucketName string, latency time.Duration) {
b.getDelay(bucketName).update(latency)
}

// getValue returns the desired delay to wait before retrying the operation for the given bucket.
func (b *bucketDelayManager) getValue(bucketName string) time.Duration {
return b.getDelay(bucketName).getValue()
}
Loading

0 comments on commit a807a7e

Please sign in to comment.