Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): adding bucket-specific dynamicDelay #10987

Merged
merged 7 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 95 additions & 12 deletions storage/dynamic_delay.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,23 @@ type dynamicDelay struct {
mu *sync.RWMutex
}

// validateDynamicDelayParams ensures,
// targetPercentile is a valid fraction (between 0 and 1).
// increaseRate is a positive number.
// minDelay is less than maxDelay.
func validateDynamicDelayParams(targetPercentile, increaseRate float64, minDelay, maxDelay time.Duration) error {
if targetPercentile < 0 || targetPercentile > 1 {
return fmt.Errorf("invalid targetPercentile (%v): must be within [0, 1]", targetPercentile)
}
if increaseRate <= 0 {
return fmt.Errorf("invalid increaseRate (%v): must be > 0", increaseRate)
}
if minDelay >= maxDelay {
return fmt.Errorf("invalid minDelay (%v) and maxDelay (%v) combination: minDelay must be smaller than maxDelay", minDelay, maxDelay)
}
return nil
}

// NewDynamicDelay returns a dynamicDelay.
//
// targetPercentile is the desired percentile to be computed. For example, a
Expand All @@ -49,16 +66,7 @@ type dynamicDelay struct {
//
// decrease can never lower the delay past minDelay, increase can never raise
// the delay past maxDelay.
func newDynamicDelay(targetPercentile float64, increaseRate float64, initialDelay, minDelay, maxDelay time.Duration) (*dynamicDelay, error) {
if targetPercentile < 0 || targetPercentile > 1 {
return nil, fmt.Errorf("invalid targetPercentile (%v): must be within [0, 1]", targetPercentile)
}
if increaseRate <= 0 {
return nil, fmt.Errorf("invalid increaseRate (%v): must be > 0", increaseRate)
}
if minDelay >= maxDelay {
return nil, fmt.Errorf("invalid minDelay (%v) and maxDelay (%v) combination: minDelay must be smaller than maxDelay", minDelay, maxDelay)
}
func newDynamicDelay(targetPercentile float64, increaseRate float64, initialDelay, minDelay, maxDelay time.Duration) *dynamicDelay {
if initialDelay < minDelay {
initialDelay = minDelay
}
Expand All @@ -84,7 +92,7 @@ func newDynamicDelay(targetPercentile float64, increaseRate float64, initialDela
maxDelay: maxDelay,
value: initialDelay,
mu: &sync.RWMutex{},
}, nil
}
}

func (d *dynamicDelay) unsafeIncrease() {
Expand Down Expand Up @@ -141,7 +149,7 @@ func (d *dynamicDelay) getValue() time.Duration {
return d.value
}

// PrintDelay prints the state of delay, helpful in debugging.
// printDelay prints the state of delay, helpful in debugging.
func (d *dynamicDelay) printDelay() {
d.mu.RLock()
defer d.mu.RUnlock()
Expand All @@ -152,3 +160,78 @@ func (d *dynamicDelay) printDelay() {
fmt.Println("MaxDelay: ", d.maxDelay)
fmt.Println("Value: ", d.value)
}

// bucketDelayManager wraps dynamicDelay to provide bucket-specific delays.
type bucketDelayManager struct {
targetPercentile float64
increaseRate float64
initialDelay time.Duration
minDelay time.Duration
maxDelay time.Duration

// delays maps bucket names to their dynamic delay instance.
delays map[string]*dynamicDelay

// mu guards delays.
mu *sync.RWMutex
}

// newBucketDelayManager returns a new bucketDelayManager instance.
func newBucketDelayManager(targetPercentile float64, increaseRate float64, initialDelay, minDelay, maxDelay time.Duration) (*bucketDelayManager, error) {
err := validateDynamicDelayParams(targetPercentile, increaseRate, minDelay, maxDelay)
if err != nil {
return nil, err
}

return &bucketDelayManager{
targetPercentile: targetPercentile,
increaseRate: increaseRate,
initialDelay: initialDelay,
minDelay: minDelay,
maxDelay: maxDelay,
delays: make(map[string]*dynamicDelay),
mu: &sync.RWMutex{},
}, nil
}

// getDelay retrieves the dynamicDelay instance for the given bucket name. If no delay
// exists for the bucket, a new one is created with the configured parameters.
func (b *bucketDelayManager) getDelay(bucketName string) *dynamicDelay {
b.mu.RLock()
delay, ok := b.delays[bucketName]
b.mu.RUnlock()

if !ok {
b.mu.Lock()
defer b.mu.Unlock()

// Check again, as someone might create b/w the execution of mu.RUnlock() and mu.Lock().
delay, ok = b.delays[bucketName]
if !ok {
// Create a new dynamicDelay for the bucket if it doesn't exist
delay = newDynamicDelay(b.targetPercentile, b.increaseRate, b.initialDelay, b.minDelay, b.maxDelay)
b.delays[bucketName] = delay
}
}
return delay
}

// increase notes that the operation took longer than the delay for the given bucket.
func (b *bucketDelayManager) increase(bucketName string) {
b.getDelay(bucketName).increase()
}

// decrease notes that the operation completed before the delay for the given bucket.
func (b *bucketDelayManager) decrease(bucketName string) {
b.getDelay(bucketName).decrease()
}

// update updates the delay value for the bucket depending on the specified latency.
func (b *bucketDelayManager) update(bucketName string, latency time.Duration) {
b.getDelay(bucketName).update(latency)
}

// getValue returns the desired delay to wait before retrying the operation for the given bucket.
func (b *bucketDelayManager) getValue(bucketName string) time.Duration {
return b.getDelay(bucketName).getValue()
}
Loading
Loading