Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DO NOT LAND] Process only changed metrics #236

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 75 additions & 20 deletions scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@
done chan struct{}
wg sync.WaitGroup
root bool

counterChangeNotifyCh chan *counter
gaugeChangeNotifyCh chan *gauge
}

// ScopeOptions is a set of options to construct a scope.
Expand Down Expand Up @@ -166,23 +169,25 @@
}

s := &scope{
baseReporter: baseReporter,
bucketCache: newBucketCache(),
cachedReporter: opts.CachedReporter,
counters: make(map[string]*counter),
countersSlice: make([]*counter, 0, _defaultInitialSliceSize),
defaultBuckets: opts.DefaultBuckets,
done: make(chan struct{}),
gauges: make(map[string]*gauge),
gaugesSlice: make([]*gauge, 0, _defaultInitialSliceSize),
histograms: make(map[string]*histogram),
histogramsSlice: make([]*histogram, 0, _defaultInitialSliceSize),
prefix: sanitizer.Name(opts.Prefix),
reporter: opts.Reporter,
sanitizer: sanitizer,
separator: sanitizer.Name(opts.Separator),
timers: make(map[string]*timer),
root: true,
baseReporter: baseReporter,
bucketCache: newBucketCache(),
cachedReporter: opts.CachedReporter,
counters: make(map[string]*counter),
countersSlice: make([]*counter, 0, _defaultInitialSliceSize),
defaultBuckets: opts.DefaultBuckets,
done: make(chan struct{}),
gauges: make(map[string]*gauge),
gaugesSlice: make([]*gauge, 0, _defaultInitialSliceSize),
histograms: make(map[string]*histogram),
histogramsSlice: make([]*histogram, 0, _defaultInitialSliceSize),
prefix: sanitizer.Name(opts.Prefix),
reporter: opts.Reporter,
sanitizer: sanitizer,
separator: sanitizer.Name(opts.Separator),
timers: make(map[string]*timer),
root: true,
counterChangeNotifyCh: make(chan *counter, 1024),
Copy link
Collaborator

@prateek prateek Oct 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm few thoughts about this channel

gaugeChangeNotifyCh: make(chan *gauge, 1024),
}

// NB(r): Take a copy of the tags on creation
Expand All @@ -196,7 +201,7 @@
s.wg.Add(1)
go func() {
defer s.wg.Done()
s.reportLoop(interval)
s.processLoop(interval)
}()
}

Expand Down Expand Up @@ -249,7 +254,7 @@
}

// reportLoop is used by the root scope for periodic reporting
func (s *scope) reportLoop(interval time.Duration) {

Check failure on line 257 in scope.go

View workflow job for this annotation

GitHub Actions / test (1.18.x)

func (*scope).reportLoop is unused (U1000)
ticker := time.NewTicker(interval)
defer ticker.Stop()

Expand Down Expand Up @@ -281,6 +286,48 @@
}
}

func (s *scope) processLoop(interval time.Duration) {
ticker := time.NewTicker(interval)
counters := make([]*counter, 0, _defaultInitialSliceSize)
gauges := make([]*gauge, 0, _defaultInitialSliceSize)

defer ticker.Stop()
for {
select {
case c := <-s.counterChangeNotifyCh:
counters = append(counters, c)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happens if c is part of a subscope that's closed before you process it?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed, this will have the effect of flushing out any pending accumulated metrics which is desirable.

case g := <-s.gaugeChangeNotifyCh:
gauges = append(gauges, g)
case <-ticker.C:
s.reportChanges(counters, gauges)
s.cachedReporter.Flush()
// Reset the changed counters and gauges
var zeroCounter *counter
for i := range counters {
counters[i] = zeroCounter
}
counters = counters[:0]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you need to set the elements in the slice to the zero value

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what we talked about -- https://go.dev/play/p/WIRovDFJhE5


var zeroGauge *gauge
for i := range gauges {
gauges[i] = zeroGauge
}
gauges = gauges[:0]
default:
return
}
}
}

func (s *scope) reportChanges(counters []*counter, gauges []*gauge) {
for _, c := range counters {
c.cachedReport()
}
for _, g := range gauges {
g.cachedReport()
}
}

func (s *scope) Counter(name string) Counter {
name = s.sanitizer.Name(name)
if c, ok := s.counter(name); ok {
Expand All @@ -295,14 +342,18 @@
}

var cachedCounter CachedCount
var changeNotifyFn func(c *counter)
if s.cachedReporter != nil {
cachedCounter = s.cachedReporter.AllocateCounter(
s.fullyQualifiedName(name),
s.tags,
)
changeNotifyFn = func(c *counter) {
s.counterChangeNotifyCh <- c
}
}

c := newCounter(cachedCounter)
c := newCounter(cachedCounter, changeNotifyFn)
s.counters[name] = c
s.countersSlice = append(s.countersSlice, c)

Expand Down Expand Up @@ -331,13 +382,17 @@
}

var cachedGauge CachedGauge
var changeNotifyFn func(g *gauge)
if s.cachedReporter != nil {
cachedGauge = s.cachedReporter.AllocateGauge(
s.fullyQualifiedName(name), s.tags,
)
changeNotifyFn = func(g *gauge) {
s.gaugeChangeNotifyCh <- g
}
}

g := newGauge(cachedGauge)
g := newGauge(cachedGauge, changeNotifyFn)
s.gauges[name] = g
s.gaugesSlice = append(s.gaugesSlice, g)

Expand Down
3 changes: 3 additions & 0 deletions scope_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,9 @@ func (r *scopeRegistry) Subscope(parent *scope, prefix string, tags map[string]s
timers: make(map[string]*timer),
bucketCache: parent.bucketCache,
done: make(chan struct{}),

counterChangeNotifyCh: parent.counterChangeNotifyCh,
gaugeChangeNotifyCh: parent.gaugeChangeNotifyCh,
}
subscopeBucket.s[key] = subscope
if _, ok := r.lockedLookup(subscopeBucket, preSanitizeKey); !ok {
Expand Down
60 changes: 43 additions & 17 deletions stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,30 @@ func (c *capabilities) Tagging() bool {
}

type counter struct {
prev int64
curr int64
cachedCount CachedCount
}

func newCounter(cachedCount CachedCount) *counter {
return &counter{cachedCount: cachedCount}
prev int64
curr int64
updated uint64
cachedCount CachedCount
changeNotifyFn func(c *counter)
}

func newCounter(
cachedCount CachedCount,
changeNotifyFn func(c *counter),
) *counter {
return &counter{
cachedCount: cachedCount,
changeNotifyFn: changeNotifyFn,
}
}

func (c *counter) Inc(v int64) {
atomic.AddInt64(&c.curr, v)
if c.changeNotifyFn != nil {
if atomic.SwapUint64(&c.updated, 1) == 0 {
c.changeNotifyFn(c)
}
}
}

func (c *counter) value() int64 {
Expand Down Expand Up @@ -99,26 +112,39 @@ func (c *counter) cachedReport() {
return
}

c.cachedCount.ReportCount(delta)
if atomic.SwapUint64(&c.updated, 0) == 1 {
c.cachedCount.ReportCount(delta)
}
}

func (c *counter) snapshot() int64 {
return atomic.LoadInt64(&c.curr) - atomic.LoadInt64(&c.prev)
}

type gauge struct {
updated uint64
curr uint64
cachedGauge CachedGauge
}

func newGauge(cachedGauge CachedGauge) *gauge {
return &gauge{cachedGauge: cachedGauge}
updated uint64
curr uint64
cachedGauge CachedGauge
changeNotifyFn func(g *gauge)
}

func newGauge(
cachedGauge CachedGauge,
changeNotifyFn func(g *gauge),
) *gauge {
return &gauge{
cachedGauge: cachedGauge,
changeNotifyFn: changeNotifyFn,
}
}

func (g *gauge) Update(v float64) {
atomic.StoreUint64(&g.curr, math.Float64bits(v))
atomic.StoreUint64(&g.updated, 1)
if atomic.SwapUint64(&g.updated, 1) == 0 {
if g.changeNotifyFn != nil {
g.changeNotifyFn(g)
}
}
}

func (g *gauge) value() float64 {
Expand Down Expand Up @@ -297,7 +323,7 @@ func newHistogram(
}

for i := range h.samples {
h.samples[i].counter = newCounter(nil)
h.samples[i].counter = newCounter(nil, nil)

if cachedHistogram != nil {
switch htype {
Expand Down
4 changes: 2 additions & 2 deletions stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (r *statsTestReporter) Capabilities() Capabilities {
func (r *statsTestReporter) Flush() {}

func TestCounter(t *testing.T) {
counter := newCounter(nil)
counter := newCounter(nil, nil)
r := newStatsTestReporter()

counter.Inc(1)
Expand All @@ -101,7 +101,7 @@ func TestCounter(t *testing.T) {
}

func TestGauge(t *testing.T) {
gauge := newGauge(nil)
gauge := newGauge(nil, nil)
r := newStatsTestReporter()

gauge.Update(42)
Expand Down
Loading