Skip to content

Commit

Permalink
Fix race when reporting internal cardinality metrics (#257)
Browse files Browse the repository at this point in the history
  • Loading branch information
vdarulis authored Apr 12, 2024
1 parent c83f06b commit 22fe011
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 25 deletions.
57 changes: 32 additions & 25 deletions scope_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ import (
"runtime"
"sync"
"unsafe"

"go.uber.org/atomic"
)

var (
Expand Down Expand Up @@ -304,41 +302,50 @@ func (r *scopeRegistry) reportInternalMetrics() {
return
}

counters, gauges, histograms, scopes := atomic.Int64{}, atomic.Int64{}, atomic.Int64{}, atomic.Int64{}
rootCounters, rootGauges, rootHistograms := atomic.Int64{}, atomic.Int64{}, atomic.Int64{}
scopes.Inc() // Account for root scope.
var counters, gauges, histograms int64
var rootCounters, rootGauges, rootHistograms int64
scopes := 1 // Account for root scope.
r.ForEachScope(
func(ss *scope) {
ss.cm.RLock()
defer ss.cm.RUnlock()
counterSliceLen, gaugeSliceLen, histogramSliceLen := int64(len(ss.countersSlice)), int64(len(ss.gaugesSlice)), int64(len(ss.histogramsSlice))
counterSliceLen := int64(len(ss.countersSlice))
ss.cm.RUnlock()

ss.gm.RLock()
gaugeSliceLen := int64(len(ss.gaugesSlice))
ss.gm.RUnlock()

ss.hm.RLock()
histogramSliceLen := int64(len(ss.histogramsSlice))
ss.hm.RUnlock()

if ss.root { // Root scope is referenced across all buckets.
rootCounters.Store(counterSliceLen)
rootGauges.Store(gaugeSliceLen)
rootHistograms.Store(histogramSliceLen)
rootCounters = counterSliceLen
rootGauges = gaugeSliceLen
rootHistograms = histogramSliceLen
return
}
counters.Add(counterSliceLen)
gauges.Add(gaugeSliceLen)
histograms.Add(histogramSliceLen)
scopes.Inc()
counters += counterSliceLen
gauges += gaugeSliceLen
histograms += histogramSliceLen
scopes++
},
)

counters.Add(rootCounters.Load())
gauges.Add(rootGauges.Load())
histograms.Add(rootHistograms.Load())
counters += rootCounters
gauges += rootGauges
histograms += rootHistograms
if r.root.reporter != nil {
r.root.reporter.ReportGauge(r.sanitizedCounterCardinalityName, r.cardinalityMetricsTags, float64(counters.Load()))
r.root.reporter.ReportGauge(r.sanitizedGaugeCardinalityName, r.cardinalityMetricsTags, float64(gauges.Load()))
r.root.reporter.ReportGauge(r.sanitizedHistogramCardinalityName, r.cardinalityMetricsTags, float64(histograms.Load()))
r.root.reporter.ReportGauge(r.sanitizedScopeCardinalityName, r.cardinalityMetricsTags, float64(scopes.Load()))
r.root.reporter.ReportGauge(r.sanitizedCounterCardinalityName, r.cardinalityMetricsTags, float64(counters))
r.root.reporter.ReportGauge(r.sanitizedGaugeCardinalityName, r.cardinalityMetricsTags, float64(gauges))
r.root.reporter.ReportGauge(r.sanitizedHistogramCardinalityName, r.cardinalityMetricsTags, float64(histograms))
r.root.reporter.ReportGauge(r.sanitizedScopeCardinalityName, r.cardinalityMetricsTags, float64(scopes))
}

if r.root.cachedReporter != nil {
r.cachedCounterCardinalityGauge.ReportGauge(float64(counters.Load()))
r.cachedGaugeCardinalityGauge.ReportGauge(float64(gauges.Load()))
r.cachedHistogramCardinalityGauge.ReportGauge(float64(histograms.Load()))
r.cachedScopeCardinalityGauge.ReportGauge(float64(scopes.Load()))
r.cachedCounterCardinalityGauge.ReportGauge(float64(counters))
r.cachedGaugeCardinalityGauge.ReportGauge(float64(gauges))
r.cachedHistogramCardinalityGauge.ReportGauge(float64(histograms))
r.cachedScopeCardinalityGauge.ReportGauge(float64(scopes))
}
}
66 changes: 66 additions & 0 deletions scope_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ package tally

import (
"fmt"
"strconv"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -257,3 +260,66 @@ func TestCachedReporterInternalMetricsAlloc(t *testing.T) {
)
}
}

func TestCachedReporterInternalMetricsConcurrent(t *testing.T) {
tr := newTestStatsReporter()
root, closer := NewRootScope(ScopeOptions{
CachedReporter: tr,
OmitCardinalityMetrics: false,
}, 0)
s := root.(*scope)

var wg sync.WaitGroup

done := make(chan struct{})
time.AfterFunc(time.Second, func() {
close(done)
})

wg.Add(1)
go func() {
defer wg.Done()
var i int
for {
select {
case <-done:
return
default:
}
suffix := strconv.Itoa(i)
tr.gg.Add(1)
tr.tg.Add(1)
tr.cg.Add(1)
s.Gauge("gauge-foo" + suffix).Update(42)
s.Timer("timer-foo" + suffix).Record(42)
s.Counter("counter-foo" + suffix).Inc(42)
i++
time.Sleep(time.Microsecond)
}
}()

wg.Add(1)
go func() {
defer wg.Done()

ticker := time.NewTicker(time.Millisecond)
defer ticker.Stop()

for {
select {
case <-done:
return
case <-ticker.C:
// kick off report loop manually, so we can keep track of how many internal metrics
// we emitted.
tr.gg.Add(numInternalMetrics)
s.reportLoopRun()
}
}
}()
wg.Wait()

// Close should also trigger internal metric report.
tr.gg.Add(numInternalMetrics)
closer.Close()
}
38 changes: 38 additions & 0 deletions scope_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ func newTestHistogramValue() *testHistogramValue {
}

type testStatsReporter struct {
mtx sync.Mutex

cg sync.WaitGroup
gg sync.WaitGroup
tg sync.WaitGroup
Expand All @@ -125,6 +127,9 @@ func newTestStatsReporter() *testStatsReporter {
}

func (r *testStatsReporter) getCounters() map[string]*testIntValue {
r.mtx.Lock()
defer r.mtx.Unlock()

dst := make(map[string]*testIntValue, len(r.counters))
for k, v := range r.counters {
var (
Expand All @@ -142,6 +147,9 @@ func (r *testStatsReporter) getCounters() map[string]*testIntValue {
}

func (r *testStatsReporter) getGauges() map[string]*testFloatValue {
r.mtx.Lock()
defer r.mtx.Unlock()

dst := make(map[string]*testFloatValue, len(r.gauges))
for k, v := range r.gauges {
var (
Expand All @@ -159,6 +167,9 @@ func (r *testStatsReporter) getGauges() map[string]*testFloatValue {
}

func (r *testStatsReporter) getTimers() map[string]*testIntValue {
r.mtx.Lock()
defer r.mtx.Unlock()

dst := make(map[string]*testIntValue, len(r.timers))
for k, v := range r.timers {
var (
Expand All @@ -176,6 +187,9 @@ func (r *testStatsReporter) getTimers() map[string]*testIntValue {
}

func (r *testStatsReporter) getHistograms() map[string]*testHistogramValue {
r.mtx.Lock()
defer r.mtx.Unlock()

dst := make(map[string]*testHistogramValue, len(r.histograms))
for k, v := range r.histograms {
var (
Expand All @@ -202,6 +216,9 @@ func (r *testStatsReporter) WaitAll() {
func (r *testStatsReporter) AllocateCounter(
name string, tags map[string]string,
) CachedCount {
r.mtx.Lock()
defer r.mtx.Unlock()

counter := &testIntValue{
val: 0,
tags: tags,
Expand All @@ -212,6 +229,9 @@ func (r *testStatsReporter) AllocateCounter(
}

func (r *testStatsReporter) ReportCounter(name string, tags map[string]string, value int64) {
r.mtx.Lock()
defer r.mtx.Unlock()

r.counters[name] = &testIntValue{
val: value,
tags: tags,
Expand All @@ -222,6 +242,9 @@ func (r *testStatsReporter) ReportCounter(name string, tags map[string]string, v
func (r *testStatsReporter) AllocateGauge(
name string, tags map[string]string,
) CachedGauge {
r.mtx.Lock()
defer r.mtx.Unlock()

gauge := &testFloatValue{
val: 0,
tags: tags,
Expand All @@ -232,6 +255,9 @@ func (r *testStatsReporter) AllocateGauge(
}

func (r *testStatsReporter) ReportGauge(name string, tags map[string]string, value float64) {
r.mtx.Lock()
defer r.mtx.Unlock()

r.gauges[name] = &testFloatValue{
val: value,
tags: tags,
Expand All @@ -242,6 +268,9 @@ func (r *testStatsReporter) ReportGauge(name string, tags map[string]string, val
func (r *testStatsReporter) AllocateTimer(
name string, tags map[string]string,
) CachedTimer {
r.mtx.Lock()
defer r.mtx.Unlock()

timer := &testIntValue{
val: 0,
tags: tags,
Expand All @@ -252,6 +281,9 @@ func (r *testStatsReporter) AllocateTimer(
}

func (r *testStatsReporter) ReportTimer(name string, tags map[string]string, interval time.Duration) {
r.mtx.Lock()
defer r.mtx.Unlock()

r.timers[name] = &testIntValue{
val: int64(interval),
tags: tags,
Expand Down Expand Up @@ -320,6 +352,9 @@ func (r *testStatsReporter) ReportHistogramValueSamples(
bucketUpperBound float64,
samples int64,
) {
r.mtx.Lock()
defer r.mtx.Unlock()

key := KeyForPrefixedStringMap(name, tags)
value, ok := r.histograms[key]
if !ok {
Expand All @@ -339,6 +374,9 @@ func (r *testStatsReporter) ReportHistogramDurationSamples(
bucketUpperBound time.Duration,
samples int64,
) {
r.mtx.Lock()
defer r.mtx.Unlock()

key := KeyForPrefixedStringMap(name, tags)
value, ok := r.histograms[key]
if !ok {
Expand Down

0 comments on commit 22fe011

Please sign in to comment.