Skip to content

Commit

Permalink
chore: Drop the heap
Browse files Browse the repository at this point in the history
Use a slice of waiters, sort it each `Add` call.
Simpler, less efficient implementation.
  • Loading branch information
abhinav committed Sep 8, 2023
1 parent dee5232 commit 74f27a8
Showing 1 changed file with 24 additions and 70 deletions.
94 changes: 24 additions & 70 deletions internal/ztest/clock.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
package ztest

import (
"container/heap"
"sort"
"sync"
"time"
)
Expand All @@ -39,7 +39,7 @@ type MockClock struct {
// Each waiter knows the time at which it should be resolved.
// When the clock advances, all waiters that are in range are resolved
// in chronological order.
waiters waiters
waiters []waiter
}

// NewMockClock builds a new mock clock
Expand Down Expand Up @@ -88,6 +88,19 @@ func (c *MockClock) NewTicker(d time.Duration) *time.Ticker {
return &time.Ticker{C: ch}
}

// runAt schedules the given function to be run at the given time.
// The function runs without a lock held, so it may schedule more work.
func (c *MockClock) runAt(t time.Time, fn func()) {
c.mu.Lock()
defer c.mu.Unlock()
c.waiters = append(c.waiters, waiter{until: t, fn: fn})
}

type waiter struct {
until time.Time
fn func()
}

// Add progresses time by the given duration.
// Other operations waiting for the time to advance
// will be resolved if they are within range.
Expand All @@ -105,16 +118,22 @@ func (c *MockClock) Add(d time.Duration) {
c.mu.Lock()
defer c.mu.Unlock()

sort.Slice(c.waiters, func(i, j int) bool {
return c.waiters[i].until.Before(c.waiters[j].until)
})

newTime := c.now.Add(d)
// newTime won't be recorded until the end of this method.
// This ensures that any waiters that are resolved
// are resolved at the time they were expecting.

for {
w, ok := c.waiters.PopLTE(newTime)
if !ok {
for len(c.waiters) > 0 {
w := c.waiters[0]
if w.until.After(newTime) {
break
}
c.waiters[0] = waiter{} // avoid memory leak
c.waiters = c.waiters[1:]

// The waiter is within range.
// Travel to the time of the waiter and resolve it.
Expand All @@ -132,68 +151,3 @@ func (c *MockClock) Add(d time.Duration) {

c.now = newTime
}

// runAt schedules the given function to be run at the given time.
// The function runs without a lock held, so it may schedule more work.
func (c *MockClock) runAt(t time.Time, fn func()) {
c.mu.Lock()
defer c.mu.Unlock()
c.waiters.Push(waiter{until: t, fn: fn})
}

type waiter struct {
until time.Time
fn func()
}

// waiters is a thread-safe collection of waiters
// with the next waiter to be resolved at the front.
//
// Use the methods on this type to manipulate the collection.
// Do not modify the slice directly.
type waiters struct{ heap waiterHeap }

// Push adds a new waiter to the collection.
func (w *waiters) Push(v waiter) {
heap.Push(&w.heap, v)
}

// PopLTE removes and returns the next waiter to be resolved
// if it is scheduled to be resolved at or before the given time.
//
// Returns false if there are no waiters in range.
func (w *waiters) PopLTE(t time.Time) (_ waiter, ok bool) {
if len(w.heap) == 0 || w.heap[0].until.After(t) {
return waiter{}, false
}

return heap.Pop(&w.heap).(waiter), true
}

// waiterHeap implements a min-heap of waiters based on their 'until' time.
//
// This is separate from the waiters type so that we can implement heap.Interface
// while still exposing a type-safe API on waiters.
type waiterHeap []waiter

var _ heap.Interface = (*waiterHeap)(nil)

func (h waiterHeap) Len() int { return len(h) }
func (h waiterHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }

func (h waiterHeap) Less(i, j int) bool {
return h[i].until.Before(h[j].until)
}

func (h *waiterHeap) Push(x interface{}) {
*h = append(*h, x.(waiter))
}

func (h *waiterHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
old[n-1] = waiter{} // avoid memory leak
*h = old[:n-1]
return x
}

0 comments on commit 74f27a8

Please sign in to comment.