diff --git a/scope_registry.go b/scope_registry.go index 57904c0..1692829 100644 --- a/scope_registry.go +++ b/scope_registry.go @@ -25,8 +25,6 @@ import ( "runtime" "sync" "unsafe" - - "go.uber.org/atomic" ) var ( @@ -304,41 +302,50 @@ func (r *scopeRegistry) reportInternalMetrics() { return } - counters, gauges, histograms, scopes := atomic.Int64{}, atomic.Int64{}, atomic.Int64{}, atomic.Int64{} - rootCounters, rootGauges, rootHistograms := atomic.Int64{}, atomic.Int64{}, atomic.Int64{} - scopes.Inc() // Account for root scope. + var counters, gauges, histograms int64 + var rootCounters, rootGauges, rootHistograms int64 + scopes := 1 // Account for root scope. r.ForEachScope( func(ss *scope) { ss.cm.RLock() - defer ss.cm.RUnlock() - counterSliceLen, gaugeSliceLen, histogramSliceLen := int64(len(ss.countersSlice)), int64(len(ss.gaugesSlice)), int64(len(ss.histogramsSlice)) + counterSliceLen := int64(len(ss.countersSlice)) + ss.cm.RUnlock() + + ss.gm.RLock() + gaugeSliceLen := int64(len(ss.gaugesSlice)) + ss.gm.RUnlock() + + ss.hm.RLock() + histogramSliceLen := int64(len(ss.histogramsSlice)) + ss.hm.RUnlock() + if ss.root { // Root scope is referenced across all buckets. - rootCounters.Store(counterSliceLen) - rootGauges.Store(gaugeSliceLen) - rootHistograms.Store(histogramSliceLen) + rootCounters = counterSliceLen + rootGauges = gaugeSliceLen + rootHistograms = histogramSliceLen return } - counters.Add(counterSliceLen) - gauges.Add(gaugeSliceLen) - histograms.Add(histogramSliceLen) - scopes.Inc() + counters += counterSliceLen + gauges += gaugeSliceLen + histograms += histogramSliceLen + scopes++ }, ) - counters.Add(rootCounters.Load()) - gauges.Add(rootGauges.Load()) - histograms.Add(rootHistograms.Load()) + counters += rootCounters + gauges += rootGauges + histograms += rootHistograms if r.root.reporter != nil { - r.root.reporter.ReportGauge(r.sanitizedCounterCardinalityName, r.cardinalityMetricsTags, float64(counters.Load())) - r.root.reporter.ReportGauge(r.sanitizedGaugeCardinalityName, r.cardinalityMetricsTags, float64(gauges.Load())) - r.root.reporter.ReportGauge(r.sanitizedHistogramCardinalityName, r.cardinalityMetricsTags, float64(histograms.Load())) - r.root.reporter.ReportGauge(r.sanitizedScopeCardinalityName, r.cardinalityMetricsTags, float64(scopes.Load())) + r.root.reporter.ReportGauge(r.sanitizedCounterCardinalityName, r.cardinalityMetricsTags, float64(counters)) + r.root.reporter.ReportGauge(r.sanitizedGaugeCardinalityName, r.cardinalityMetricsTags, float64(gauges)) + r.root.reporter.ReportGauge(r.sanitizedHistogramCardinalityName, r.cardinalityMetricsTags, float64(histograms)) + r.root.reporter.ReportGauge(r.sanitizedScopeCardinalityName, r.cardinalityMetricsTags, float64(scopes)) } if r.root.cachedReporter != nil { - r.cachedCounterCardinalityGauge.ReportGauge(float64(counters.Load())) - r.cachedGaugeCardinalityGauge.ReportGauge(float64(gauges.Load())) - r.cachedHistogramCardinalityGauge.ReportGauge(float64(histograms.Load())) - r.cachedScopeCardinalityGauge.ReportGauge(float64(scopes.Load())) + r.cachedCounterCardinalityGauge.ReportGauge(float64(counters)) + r.cachedGaugeCardinalityGauge.ReportGauge(float64(gauges)) + r.cachedHistogramCardinalityGauge.ReportGauge(float64(histograms)) + r.cachedScopeCardinalityGauge.ReportGauge(float64(scopes)) } } diff --git a/scope_registry_test.go b/scope_registry_test.go index 66dc189..6647fa3 100644 --- a/scope_registry_test.go +++ b/scope_registry_test.go @@ -22,7 +22,10 @@ package tally import ( "fmt" + "strconv" + "sync" "testing" + "time" "github.com/stretchr/testify/assert" ) @@ -257,3 +260,66 @@ func TestCachedReporterInternalMetricsAlloc(t *testing.T) { ) } } + +func TestCachedReporterInternalMetricsConcurrent(t *testing.T) { + tr := newTestStatsReporter() + root, closer := NewRootScope(ScopeOptions{ + CachedReporter: tr, + OmitCardinalityMetrics: false, + }, 0) + s := root.(*scope) + + var wg sync.WaitGroup + + done := make(chan struct{}) + time.AfterFunc(time.Second, func() { + close(done) + }) + + wg.Add(1) + go func() { + defer wg.Done() + var i int + for { + select { + case <-done: + return + default: + } + suffix := strconv.Itoa(i) + tr.gg.Add(1) + tr.tg.Add(1) + tr.cg.Add(1) + s.Gauge("gauge-foo" + suffix).Update(42) + s.Timer("timer-foo" + suffix).Record(42) + s.Counter("counter-foo" + suffix).Inc(42) + i++ + time.Sleep(time.Microsecond) + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + + ticker := time.NewTicker(time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-done: + return + case <-ticker.C: + // kick off report loop manually, so we can keep track of how many internal metrics + // we emitted. + tr.gg.Add(numInternalMetrics) + s.reportLoopRun() + } + } + }() + wg.Wait() + + // Close should also trigger internal metric report. + tr.gg.Add(numInternalMetrics) + closer.Close() +} diff --git a/scope_test.go b/scope_test.go index eb1a879..85440da 100644 --- a/scope_test.go +++ b/scope_test.go @@ -101,6 +101,8 @@ func newTestHistogramValue() *testHistogramValue { } type testStatsReporter struct { + mtx sync.Mutex + cg sync.WaitGroup gg sync.WaitGroup tg sync.WaitGroup @@ -125,6 +127,9 @@ func newTestStatsReporter() *testStatsReporter { } func (r *testStatsReporter) getCounters() map[string]*testIntValue { + r.mtx.Lock() + defer r.mtx.Unlock() + dst := make(map[string]*testIntValue, len(r.counters)) for k, v := range r.counters { var ( @@ -142,6 +147,9 @@ func (r *testStatsReporter) getCounters() map[string]*testIntValue { } func (r *testStatsReporter) getGauges() map[string]*testFloatValue { + r.mtx.Lock() + defer r.mtx.Unlock() + dst := make(map[string]*testFloatValue, len(r.gauges)) for k, v := range r.gauges { var ( @@ -159,6 +167,9 @@ func (r *testStatsReporter) getGauges() map[string]*testFloatValue { } func (r *testStatsReporter) getTimers() map[string]*testIntValue { + r.mtx.Lock() + defer r.mtx.Unlock() + dst := make(map[string]*testIntValue, len(r.timers)) for k, v := range r.timers { var ( @@ -176,6 +187,9 @@ func (r *testStatsReporter) getTimers() map[string]*testIntValue { } func (r *testStatsReporter) getHistograms() map[string]*testHistogramValue { + r.mtx.Lock() + defer r.mtx.Unlock() + dst := make(map[string]*testHistogramValue, len(r.histograms)) for k, v := range r.histograms { var ( @@ -202,6 +216,9 @@ func (r *testStatsReporter) WaitAll() { func (r *testStatsReporter) AllocateCounter( name string, tags map[string]string, ) CachedCount { + r.mtx.Lock() + defer r.mtx.Unlock() + counter := &testIntValue{ val: 0, tags: tags, @@ -212,6 +229,9 @@ func (r *testStatsReporter) AllocateCounter( } func (r *testStatsReporter) ReportCounter(name string, tags map[string]string, value int64) { + r.mtx.Lock() + defer r.mtx.Unlock() + r.counters[name] = &testIntValue{ val: value, tags: tags, @@ -222,6 +242,9 @@ func (r *testStatsReporter) ReportCounter(name string, tags map[string]string, v func (r *testStatsReporter) AllocateGauge( name string, tags map[string]string, ) CachedGauge { + r.mtx.Lock() + defer r.mtx.Unlock() + gauge := &testFloatValue{ val: 0, tags: tags, @@ -232,6 +255,9 @@ func (r *testStatsReporter) AllocateGauge( } func (r *testStatsReporter) ReportGauge(name string, tags map[string]string, value float64) { + r.mtx.Lock() + defer r.mtx.Unlock() + r.gauges[name] = &testFloatValue{ val: value, tags: tags, @@ -242,6 +268,9 @@ func (r *testStatsReporter) ReportGauge(name string, tags map[string]string, val func (r *testStatsReporter) AllocateTimer( name string, tags map[string]string, ) CachedTimer { + r.mtx.Lock() + defer r.mtx.Unlock() + timer := &testIntValue{ val: 0, tags: tags, @@ -252,6 +281,9 @@ func (r *testStatsReporter) AllocateTimer( } func (r *testStatsReporter) ReportTimer(name string, tags map[string]string, interval time.Duration) { + r.mtx.Lock() + defer r.mtx.Unlock() + r.timers[name] = &testIntValue{ val: int64(interval), tags: tags, @@ -320,6 +352,9 @@ func (r *testStatsReporter) ReportHistogramValueSamples( bucketUpperBound float64, samples int64, ) { + r.mtx.Lock() + defer r.mtx.Unlock() + key := KeyForPrefixedStringMap(name, tags) value, ok := r.histograms[key] if !ok { @@ -339,6 +374,9 @@ func (r *testStatsReporter) ReportHistogramDurationSamples( bucketUpperBound time.Duration, samples int64, ) { + r.mtx.Lock() + defer r.mtx.Unlock() + key := KeyForPrefixedStringMap(name, tags) value, ok := r.histograms[key] if !ok {