-
Notifications
You must be signed in to change notification settings - Fork 11
/
sweeper.go
211 lines (175 loc) · 4.88 KB
/
sweeper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
package flow
import (
"math"
"sync"
"time"
"github.com/benbjohnson/clock"
)
// Package-level configuration and shared state used by the sweeper.
var (
	// IdleRate is the rate threshold at or below which the sweeper declares
	// a meter idle and stops tracking it until it is registered again.
	//
	// The default is small enough that roughly one event every ~30s keeps a
	// meter from going idle.
	IdleRate = 1e-13

	// alpha is the smoothing factor of the exponentially weighted moving
	// average, chosen for a 1s window.
	alpha = 1 - math.Exp(-1.0)

	// globalSweeper is the single, package-wide sweeper instance.
	globalSweeper sweeper

	// cl is the clock the sweeper takes the current time and its ticker
	// from. It can be swapped out with SetClock.
	cl = clock.New()
)
// SetClock replaces the package-level clock that the sweeper uses to read the
// current time and to create its ticker.
//
// This will probably only ever be useful for testing purposes.
//
// NOTE(review): the assignment is not synchronized with the sweeper's reads of
// the clock, so it looks like this should be called before any meter is
// registered — confirm against callers.
func SetClock(c clock.Clock) {
cl = c
}
// ewmaRate is the sweeper's tick interval: it is the period of the ticker
// created in runActive and the reference interval that update normalizes
// measured rates to. We tick every second.
var ewmaRate = time.Second
// sweeper keeps the list of registered meters and periodically refreshes
// their snapshots.
type sweeper struct {
// sweepOnce makes sure start runs only once, on the first call to Register.
sweepOnce sync.Once
// snapshotMu is held for writing while update modifies meter snapshots.
snapshotMu sync.RWMutex
// meters holds the registered meters. The first activeMeters entries are
// the ones update computes rates for; any entries after that were
// registered since the last full update.
meters []*Meter
// activeMeters is the number of leading entries of meters that are active.
activeMeters int
// lastUpdateTime is the clock reading recorded when runActive started or
// by the most recent update that was not skipped for arriving too early.
lastUpdateTime time.Time
// registerChannel carries meters from Register to the sweeper goroutine.
registerChannel chan *Meter
}
// start allocates the buffered registration channel and launches the
// background goroutine that executes run.
func (sw *sweeper) start() {
	// Number of registrations that can be queued before a send blocks.
	const registerBacklog = 16

	sw.registerChannel = make(chan *Meter, registerBacklog)
	go sw.run()
}
// run is the body of the sweeper goroutine. It waits for a meter to arrive on
// the registration channel, registers it, and then stays in runActive until
// runActive returns, after which it goes back to waiting. It returns only if
// the registration channel is closed.
func (sw *sweeper) run() {
	incoming := sw.registerChannel
	for {
		m, ok := <-incoming
		if !ok {
			// Channel closed: no further registrations can arrive.
			return
		}
		sw.register(m)
		sw.runActive()
	}
}
// register appends m to the list of tracked meters and flags it as
// registered. A meter that is already flagged is left alone, so registering
// the same meter twice has no additional effect.
func (sw *sweeper) register(m *Meter) {
	if !m.registered {
		m.registered = true
		sw.meters = append(sw.meters, m)
	}
}
// runActive services the sweeper for as long as at least one meter is being
// tracked. On every tick it calls update; in between it accepts further
// registrations. It returns, releasing the meter list and stopping the
// ticker, once no meters are left.
func (sw *sweeper) runActive() {
	ticker := cl.Ticker(ewmaRate)
	defer ticker.Stop()

	sw.lastUpdateTime = cl.Now()
	for {
		n := len(sw.meters)
		if n == 0 {
			break
		}

		// Shrink the backing array when less than half of it is in use.
		if 2*n < cap(sw.meters) {
			shrunk := make([]*Meter, n)
			copy(shrunk, sw.meters)
			sw.meters = shrunk
		}

		select {
		case m := <-sw.registerChannel:
			sw.register(m)
		case <-ticker.C:
			sw.update()
		}
	}

	// Nothing left to track; drop the slice until the next registration.
	sw.meters = nil
}
// update refreshes the snapshots of the tracked meters. It is called by
// runActive on every tick and holds snapshotMu for writing throughout.
//
// What it does depends on how much time has passed since lastUpdateTime:
//
//   - Negative (the clock went backwards): record the new time, refresh the
//     totals of the active meters and leave every rate untouched.
//   - At most a tenth of ewmaRate: do nothing and wait for the next tick.
//   - Otherwise: recompute the rate of every active meter, drop the meters
//     that have gone idle, fold newly registered meters into the active set
//     and compact the meter list.
func (sw *sweeper) update() {
	sw.snapshotMu.Lock()
	defer sw.snapshotMu.Unlock()

	now := cl.Now()
	tdiff := now.Sub(sw.lastUpdateTime)
	if tdiff < 0 {
		// We went back in time, skip this update.
		// Note: if we go _forward_ in time, we don't really care as
		// we'll just log really low bandwidth for a second.
		sw.lastUpdateTime = now

		// Update the totals but leave the rates alone.
		//
		// Only the active meters are refreshed. Meters past activeMeters
		// have not yet had their snapshot total added to their
		// accumulator (the promotion loop below does that on the next
		// full update); overwriting their snapshot total here would make
		// that loop count the accumulator twice and lose the old total.
		for _, m := range sw.meters[:sw.activeMeters] {
			m.snapshot.Total = m.accumulator.Load()
		}
		return
	}
	if tdiff <= ewmaRate/10 {
		// If the time-delta is too small, wait a bit. Otherwise, we can
		// end up logging a very large spike.
		//
		// This won't fix the case where a user passes a large update
		// (spanning multiple seconds) to `Meter.Mark`, but it will fix
		// the case where the system fails to accurately schedule the
		// sweeper goroutine.
		return
	}
	sw.lastUpdateTime = now

	// Scales a count observed over tdiff to a count per ewmaRate.
	timeMultiplier := float64(ewmaRate) / float64(tdiff)

	// Calculate the bandwidth for all active meters.
	for i, m := range sw.meters[:sw.activeMeters] {
		total := m.accumulator.Load()
		diff := total - m.snapshot.Total
		instant := timeMultiplier * float64(diff)

		if diff > 0 {
			m.snapshot.LastUpdate = now
		}

		// Seed the average with the first sample, then smooth.
		if m.snapshot.Rate == 0 {
			m.snapshot.Rate = instant
		} else {
			m.snapshot.Rate += alpha * (instant - m.snapshot.Rate)
		}
		m.snapshot.Total = total

		// A rate at or below IdleRate means the meter is idle. With the
		// default threshold, a single event followed by roughly 30 quiet
		// ticks decays the rate that far.
		if m.snapshot.Rate > IdleRate {
			continue
		}

		// Ok, so we are idle...

		// Mark this as idle by zeroing the accumulator.
		swappedTotal := m.accumulator.Swap(0)

		// So..., are we really idle?
		if swappedTotal > total {
			// Not so idle: the accumulator grew between the load and
			// the swap. Now we need to make sure this meter gets
			// re-registered.

			// First, add back what we removed. If we can do this fast
			// enough, we can put it back before anyone notices.
			currentTotal := m.accumulator.Add(swappedTotal)

			// Did we make it?
			if currentTotal == swappedTotal {
				// Yes! Nobody noticed, move along.
				continue
			}
			// No. Someone noticed and will (or has) put the meter back
			// into the registration channel.
			//
			// Remove the snapshot total, it'll get added back on
			// registration.
			//
			// `^uint64(x - 1)` is the two's complement of `x`. It's
			// the "correct" way to subtract atomically in Go.
			m.accumulator.Add(^uint64(m.snapshot.Total - 1))
		}

		// Reset the rate, keep the total, and leave a hole in the list
		// that the compaction pass below removes.
		m.registered = false
		m.snapshot.Rate = 0
		sw.meters[i] = nil
	}

	// Re-add the total to all the newly active accumulators and set the
	// snapshot to the total.
	// 1. We don't do this on register to avoid having to take the snapshot
	//    lock.
	// 2. We skip calculating the bandwidth for this round so we get an
	//    _accurate_ bandwidth calculation.
	for _, m := range sw.meters[sw.activeMeters:] {
		total := m.accumulator.Add(m.snapshot.Total)
		if total > m.snapshot.Total {
			m.snapshot.LastUpdate = now
		}
		m.snapshot.Total = total
	}

	// Compress and trim the meter list, dropping the nil holes left by
	// meters that went idle.
	var newLen int
	for _, m := range sw.meters {
		if m != nil {
			sw.meters[newLen] = m
			newLen++
		}
	}
	sw.meters = sw.meters[:newLen]

	// Finally, mark all meters still in the list as "active".
	sw.activeMeters = len(sw.meters)
}
// Register hands m to the sweeper so that it starts being tracked.
//
// The first call starts the sweeper (via start); every call then sends m on
// the registration channel, which blocks while the channel's buffer is full.
func (sw *sweeper) Register(m *Meter) {
sw.sweepOnce.Do(sw.start)
sw.registerChannel <- m
}