From 7e35a9040b76249e66855932183c3b21d9e18da4 Mon Sep 17 00:00:00 2001 From: Yingrong Zhao <22300958+VinozzZ@users.noreply.github.com> Date: Fri, 1 Nov 2024 14:48:53 -0400 Subject: [PATCH 01/15] fix: debug incoming_queue and peer_queue throughput --- collect/collect.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/collect/collect.go b/collect/collect.go index 17c6412654..216bcd5150 100644 --- a/collect/collect.go +++ b/collect/collect.go @@ -158,6 +158,8 @@ var inMemCollectorMetrics = []metrics.Metadata{ {Name: "collector_drop_decision_batch_count", Type: metrics.Histogram, Unit: metrics.Dimensionless, Description: "number of drop decisions sent in a batch"}, {Name: "collector_expired_traces_missing_decisions", Type: metrics.Gauge, Unit: metrics.Dimensionless, Description: "number of decision spans forwarded for expired traces missing trace decision"}, {Name: "collector_expired_traces_orphans", Type: metrics.Gauge, Unit: metrics.Dimensionless, Description: "number of expired traces missing trace decision when they are sent"}, + {Name: "kept_decisions_received", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of kept decision message received"}, + {Name: "drop_decisions_received", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of drop decision message received"}, } func (i *InMemCollector) Start() error { @@ -210,7 +212,7 @@ func (i *InMemCollector) Start() error { i.PubSub.Subscribe(context.Background(), keptTraceDecisionTopic, i.signalKeptTraceDecisions) i.PubSub.Subscribe(context.Background(), droppedTraceDecisionTopic, i.signalDroppedTraceDecisions) - i.dropDecisionBatch = make(chan string, 1000) + i.dropDecisionBatch = make(chan string, i.Config.GetCollectionConfig().MaxDropDecisionBatchSize*3) } // spin up one collector because this is a single threaded collector @@ -401,13 +403,6 @@ func (i *InMemCollector) collect() { return case <-i.redistributeTimer.Notify(): i.redistributeTraces(ctx) - case msg, ok := <-i.keptDecisionMessages: - if !ok { - // channel's been closed; we should shut down. - return - } - - i.processKeptDecision(msg) case sp, ok := <-i.fromPeer: if !ok { // channel's been closed; we should shut down. @@ -1463,6 +1458,7 @@ func (i *InMemCollector) signalDroppedTraceDecisions(ctx context.Context, msg st func (i *InMemCollector) processDropDecisions(msg string) { ids := newDroppedTraceDecision(msg) + i.Metrics.Increment("drop_decisions_received") if len(ids) == 0 { return @@ -1492,6 +1488,8 @@ func (i *InMemCollector) processKeptDecision(msg string) { i.Logger.Error().Logf("Failed to unmarshal trace decision message. %s", err) return } + i.Metrics.Increment("kept_decisions_received") + toDelete := generics.NewSet[string]() trace := i.cache.Get(td.TraceID) // if we don't have the trace in the cache, we don't need to do anything From 8e8cc197d55585a9e63aafe24e44f696f6a89fe1 Mon Sep 17 00:00:00 2001 From: Yingrong Zhao <22300958+VinozzZ@users.noreply.github.com> Date: Fri, 1 Nov 2024 17:18:45 -0400 Subject: [PATCH 02/15] add duration metrics --- collect/collect.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/collect/collect.go b/collect/collect.go index 216bcd5150..be7cb7b349 100644 --- a/collect/collect.go +++ b/collect/collect.go @@ -564,6 +564,9 @@ func (i *InMemCollector) sendExpiredTracesInCache(ctx context.Context, now time. 
defer span.End() startTime := time.Now() + defer func() { + i.Metrics.Histogram("collector_send_expired_traces_in_cache_dur_ms", time.Since(startTime).Milliseconds()) + }() expiredTraces := make([]*types.Trace, 0) traceTimeout := i.Config.GetTracesConfig().GetTraceTimeout() var orphanTraceCount int @@ -1457,6 +1460,11 @@ func (i *InMemCollector) signalDroppedTraceDecisions(ctx context.Context, msg st } func (i *InMemCollector) processDropDecisions(msg string) { + start := time.Now() + defer func() { + i.Metrics.Histogram("collector_process_drop_decisions_dur_ms", time.Since(start).Milliseconds()) + }() + ids := newDroppedTraceDecision(msg) i.Metrics.Increment("drop_decisions_received") @@ -1483,6 +1491,11 @@ func (i *InMemCollector) processDropDecisions(msg string) { } func (i *InMemCollector) processKeptDecision(msg string) { + start := time.Now() + defer func() { + i.Metrics.Histogram("collector_process_kept_decisions_dur_ms", time.Since(start).Milliseconds()) + }() + td, err := newKeptTraceDecision(msg) if err != nil { i.Logger.Error().Logf("Failed to unmarshal trace decision message. %s", err) @@ -1596,6 +1609,11 @@ func (i *InMemCollector) IsMyTrace(traceID string) bool { } func (i *InMemCollector) publishTraceDecision(ctx context.Context, td TraceDecision) { + start := time.Now() + defer func() { + i.Metrics.Histogram("collector_publish_trace_decision_dur_ms", time.Since(start).Milliseconds()) + }() + var ( decisionMsg string err error From 0532b2c5dfd68bf5bdc547a37e95c28121f7aaaf Mon Sep 17 00:00:00 2001 From: Yingrong Zhao <22300958+VinozzZ@users.noreply.github.com> Date: Mon, 4 Nov 2024 10:13:47 -0500 Subject: [PATCH 03/15] change orphan traces timeout definition --- collect/collect.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/collect/collect.go b/collect/collect.go index be7cb7b349..0972f35de6 100644 --- a/collect/collect.go +++ b/collect/collect.go @@ -578,14 +578,14 @@ func (i *InMemCollector) sendExpiredTracesInCache(ctx context.Context, now time.
timeoutDuration := now.Sub(t.SendBy) // if a trace has expired more than 4 times the trace timeout, we should just make a decision for it // instead of waiting for the decider node - if timeoutDuration > traceTimeout*4 { + if timeoutDuration > traceTimeout*8 { orphanTraceCount++ return true } // if a trace has expired more than 2 times the trace timeout, we should forward it to its decider // and wait for the decider to publish the trace decision again - if timeoutDuration > traceTimeout*2 { + if timeoutDuration > traceTimeout*4 { expiredTraces = append(expiredTraces, t) } From 8227b48f0633c4fa494a3660b95b8e585e03139e Mon Sep 17 00:00:00 2001 From: Yingrong Zhao <22300958+VinozzZ@users.noreply.github.com> Date: Mon, 4 Nov 2024 16:00:57 -0500 Subject: [PATCH 04/15] only allow stress relief activation for the entire cluster --- collect/collect.go | 9 +++++---- collect/stressRelief.go | 30 +++++++++++++++++++----------- types/event.go | 3 ++- 3 files changed, 26 insertions(+), 16 deletions(-) diff --git a/collect/collect.go b/collect/collect.go index 0972f35de6..b6a52e8c1e 100644 --- a/collect/collect.go +++ b/collect/collect.go @@ -167,7 +167,7 @@ func (i *InMemCollector) Start() error { defer func() { i.Logger.Debug().Logf("Finished starting InMemCollector") }() imcConfig := i.Config.GetCollectionConfig() i.cache = cache.NewInMemCache(imcConfig.CacheCapacity, i.Metrics, i.Logger) - i.StressRelief.UpdateFromConfig(i.Config.GetStressReliefConfig()) + i.StressRelief.UpdateFromConfig() // listen for config reloads i.Config.RegisterReloadCallback(i.sendReloadSignal) @@ -578,15 +578,17 @@ func (i *InMemCollector) sendExpiredTracesInCache(ctx context.Context, now time. timeoutDuration := now.Sub(t.SendBy) // if a trace has expired more than 4 times the trace timeout, we should just make a decision for it // instead of waiting for the decider node - if timeoutDuration > traceTimeout*8 { + if timeoutDuration > traceTimeout*4 { orphanTraceCount++ return true } // if a trace has expired more than 2 times the trace timeout, we should forward it to its decider // and wait for the decider to publish the trace decision again - if timeoutDuration > traceTimeout*4 { + // only retry it once + if timeoutDuration > traceTimeout*2 && !t.Retried { expiredTraces = append(expiredTraces, t) + t.Retried = true } // by returning false we will not remove the trace from the cache @@ -1521,7 +1523,6 @@ func (i *InMemCollector) processKeptDecision(msg string) { i.cache.RemoveTraces(toDelete) } func (i *InMemCollector) makeDecision(trace *types.Trace, sendReason string) (*TraceDecision, error) { - if trace.Sent { return nil, errors.New("trace already sent") } diff --git a/collect/stressRelief.go b/collect/stressRelief.go index 83da176283..76bdea9df3 100644 --- a/collect/stressRelief.go +++ b/collect/stressRelief.go @@ -23,7 +23,7 @@ import ( const stressReliefTopic = "refinery-stress-relief" type StressReliever interface { - UpdateFromConfig(cfg config.StressReliefConfig) + UpdateFromConfig() Recalc() uint Stressed() bool GetSampleRate(traceID string) (rate uint, keep bool, reason string) @@ -41,10 +41,10 @@ type MockStressReliever struct { SampleRate uint } -func (m *MockStressReliever) Start() error { return nil } -func (m *MockStressReliever) UpdateFromConfig(cfg config.StressReliefConfig) {} -func (m *MockStressReliever) Recalc() uint { return 0 } -func (m *MockStressReliever) Stressed() bool { return m.IsStressed } +func (m *MockStressReliever) Start() error { return nil } +func (m *MockStressReliever) 
UpdateFromConfig() {} +func (m *MockStressReliever) Recalc() uint { return 0 } +func (m *MockStressReliever) Stressed() bool { return m.IsStressed } func (m *MockStressReliever) GetSampleRate(traceID string) (rate uint, keep bool, reason string) { return m.SampleRate, m.ShouldKeep, "mock" } @@ -76,6 +76,7 @@ var _ StressReliever = &StressRelief{} type StressRelief struct { RefineryMetrics metrics.Metrics `inject:"metrics"` + Config config.Config `inject:""` Logger logger.Logger `inject:""` Health health.Recorder `inject:""` PubSub pubsub.PubSub `inject:""` @@ -141,8 +142,8 @@ func (s *StressRelief) Start() error { // All of the numerator metrics are gauges. The denominator metrics are constants. s.calcs = []StressReliefCalculation{ - {Numerator: "collector_peer_queue_length", Denominator: "PEER_CAP", Algorithm: "sqrt", Reason: "CacheCapacity (peer)"}, - {Numerator: "collector_incoming_queue_length", Denominator: "INCOMING_CAP", Algorithm: "sqrt", Reason: "CacheCapacity (incoming)"}, + {Numerator: "collector_peer_queue_length", Denominator: "PEER_CAP", Algorithm: "sqrt", Reason: "PeerQueueSize"}, + {Numerator: "collector_incoming_queue_length", Denominator: "INCOMING_CAP", Algorithm: "sqrt", Reason: "IncomingQueueSize"}, {Numerator: "libhoney_peer_queue_length", Denominator: "PEER_BUFFER_SIZE", Algorithm: "sqrt", Reason: "PeerBufferSize"}, {Numerator: "libhoney_upstream_queue_length", Denominator: "UPSTREAM_BUFFER_SIZE", Algorithm: "sqrt", Reason: "UpstreamBufferSize"}, {Numerator: "memory_heap_allocation", Denominator: "MEMORY_MAX_ALLOC", Algorithm: "sigmoid", Reason: "MaxAlloc"}, @@ -248,10 +249,12 @@ func (s *StressRelief) onStressLevelUpdate(ctx context.Context, msg string) { } } -func (s *StressRelief) UpdateFromConfig(cfg config.StressReliefConfig) { +func (s *StressRelief) UpdateFromConfig() { s.lock.Lock() defer s.lock.Unlock() + cfg := s.Config.GetStressReliefConfig() + switch cfg.Mode { case "never", "": s.mode = Never @@ -416,9 +419,14 @@ func (s *StressRelief) Recalc() uint { s.lock.Lock() defer s.lock.Unlock() - // The overall stress level is the max of the individual and cluster stress levels - // If a single node is under significant stress, it can activate stress relief mode - s.overallStressLevel = uint(math.Max(float64(clusterStressLevel), float64(localLevel))) + overallStressLevel := clusterStressLevel + + if s.Config.GetCollectionConfig().EnableTraceLocality { + // The overall stress level is the max of the individual and cluster stress levels + // If a single node is under significant stress, it can activate stress relief mode + overallStressLevel = uint(math.Max(float64(clusterStressLevel), float64(localLevel))) + } + s.overallStressLevel = overallStressLevel s.RefineryMetrics.Gauge("stress_level", s.overallStressLevel) s.reason = reason diff --git a/types/event.go b/types/event.go index dd891e4b14..903c04fead 100644 --- a/types/event.go +++ b/types/event.go @@ -50,7 +50,8 @@ type Trace struct { Sent bool keptReason uint - SendBy time.Time + SendBy time.Time + Retried bool // ArrivalTime is the server time when the first span arrived for this trace. 
// Used to calculate how long traces spend sitting in Refinery From c279d841fda474c340bb8be770e6089d290e5b5f Mon Sep 17 00:00:00 2001 From: Yingrong Zhao <22300958+VinozzZ@users.noreply.github.com> Date: Mon, 4 Nov 2024 16:05:23 -0500 Subject: [PATCH 05/15] fix build --- collect/collect.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/collect/collect.go b/collect/collect.go index b6a52e8c1e..ee512d548b 100644 --- a/collect/collect.go +++ b/collect/collect.go @@ -240,7 +240,7 @@ func (i *InMemCollector) reloadConfigs() { i.sampleTraceCache.Resize(i.Config.GetSampleCacheConfig()) - i.StressRelief.UpdateFromConfig(i.Config.GetStressReliefConfig()) + i.StressRelief.UpdateFromConfig() // clear out any samplers that we have previously created // so that the new configuration will be propagated From db1fe8d9b5d6c0a0ee454a9f8ca62a7671c26a7e Mon Sep 17 00:00:00 2001 From: Yingrong Zhao <22300958+VinozzZ@users.noreply.github.com> Date: Mon, 4 Nov 2024 16:17:20 -0500 Subject: [PATCH 06/15] fix tests --- collect/stress_relief_test.go | 56 ++++++++++++++++++++--------------- 1 file changed, 32 insertions(+), 24 deletions(-) diff --git a/collect/stress_relief_test.go b/collect/stress_relief_test.go index 6db1ee848b..8a250138e4 100644 --- a/collect/stress_relief_test.go +++ b/collect/stress_relief_test.go @@ -33,17 +33,18 @@ func TestStressRelief_Monitor(t *testing.T) { }) sr.RefineryMetrics.Store("INCOMING_CAP", 1200) - - cfg := config.StressReliefConfig{ - Mode: "monitor", - ActivationLevel: 80, - DeactivationLevel: 50, - SamplingRate: 2, - MinimumActivationDuration: config.Duration(5 * time.Second), + sr.Config = &config.MockConfig{ + StressRelief: config.StressReliefConfig{ + Mode: "monitor", + ActivationLevel: 80, + DeactivationLevel: 50, + SamplingRate: 2, + MinimumActivationDuration: config.Duration(5 * time.Second), + }, } // On startup, the stress relief should not be active - sr.UpdateFromConfig(cfg) + sr.UpdateFromConfig() require.False(t, sr.Stressed()) // Test 1 @@ -90,17 +91,18 @@ func TestStressRelief_Peer(t *testing.T) { }) sr.RefineryMetrics.Store("INCOMING_CAP", 1200) - - cfg := config.StressReliefConfig{ - Mode: "monitor", - ActivationLevel: 80, - DeactivationLevel: 65, - SamplingRate: 2, - MinimumActivationDuration: config.Duration(5 * time.Second), + sr.Config = &config.MockConfig{ + StressRelief: config.StressReliefConfig{ + Mode: "monitor", + ActivationLevel: 80, + DeactivationLevel: 65, + SamplingRate: 2, + MinimumActivationDuration: config.Duration(5 * time.Second), + }, } // On startup, the stress relief should not be active - sr.UpdateFromConfig(cfg) + sr.UpdateFromConfig() require.False(t, sr.Stressed()) // activate stress relief in one refinery @@ -136,7 +138,9 @@ func TestStressRelief_Peer(t *testing.T) { }, 2*time.Second, 100*time.Millisecond, "stress relief should be false") } -func TestStressRelief_OverallStressLevel(t *testing.T) { +//TODO: Add a test for OverallStressLevel calculation with EnableTraceLocality to false + +func TestStressRelief_OverallStressLevel_EnableTraceLocality(t *testing.T) { clock := clockwork.NewFakeClock() sr, stop := newStressRelief(t, clock, nil) defer stop() @@ -151,16 +155,20 @@ func TestStressRelief_OverallStressLevel(t *testing.T) { }) sr.RefineryMetrics.Store("INCOMING_CAP", 1200) - - cfg := config.StressReliefConfig{ - Mode: "monitor", - ActivationLevel: 80, - DeactivationLevel: 65, - MinimumActivationDuration: config.Duration(5 * time.Second), + sr.Config = &config.MockConfig{ + StressRelief: 
config.StressReliefConfig{ + Mode: "monitor", + ActivationLevel: 80, + DeactivationLevel: 65, + MinimumActivationDuration: config.Duration(5 * time.Second), + }, + GetCollectionConfigVal: config.CollectionConfig{ + EnableTraceLocality: true, + }, } // On startup, the stress relief should not be active - sr.UpdateFromConfig(cfg) + sr.UpdateFromConfig() require.False(t, sr.Stressed()) // Test 1 From 3690252e282aaeae10413f2b0330892c288dae83 Mon Sep 17 00:00:00 2001 From: Yingrong Zhao <22300958+VinozzZ@users.noreply.github.com> Date: Mon, 4 Nov 2024 17:14:11 -0500 Subject: [PATCH 07/15] separate publishing kept decisions into its own goroutine --- collect/collect.go | 62 ++++++++++++++++++++++++++--------------- collect/collect_test.go | 19 +++++++++++++ 2 files changed, 59 insertions(+), 22 deletions(-) diff --git a/collect/collect.go b/collect/collect.go index ee512d548b..239e105306 100644 --- a/collect/collect.go +++ b/collect/collect.go @@ -115,8 +115,9 @@ type InMemCollector struct { dropDecisionMessages chan string keptDecisionMessages chan string - dropDecisionBatch chan string - hostname string + dropDecisionBatch chan string + keptDecisionBuffer chan string + hostname string } var inMemCollectorMetrics = []metrics.Metadata{ @@ -212,7 +213,8 @@ func (i *InMemCollector) Start() error { i.PubSub.Subscribe(context.Background(), keptTraceDecisionTopic, i.signalKeptTraceDecisions) i.PubSub.Subscribe(context.Background(), droppedTraceDecisionTopic, i.signalDroppedTraceDecisions) - i.dropDecisionBatch = make(chan string, i.Config.GetCollectionConfig().MaxDropDecisionBatchSize*3) + i.dropDecisionBatch = make(chan string, i.Config.GetCollectionConfig().MaxDropDecisionBatchSize*5) + i.keptDecisionBuffer = make(chan string, i.Config.GetCollectionConfig().MaxDropDecisionBatchSize) } // spin up one collector because this is a single threaded collector @@ -220,6 +222,7 @@ func (i *InMemCollector) Start() error { go i.sendTraces() // spin up a drop decision batch sender go i.sendDropDecisions() + go i.sendKeptDecisions() return nil } @@ -294,6 +297,9 @@ func (i *InMemCollector) checkAlloc(ctx context.Context) { totalDataSizeSent := 0 tracesSent := generics.NewSet[string]() // Send the traces we can't keep. + // should we also send off orphan traces here? 
+ // not expired, expired, orphaned + // if it's more than 2 times of the trace timeout and it's not my trace, then it should be eligible to eject for _, trace := range allTraces { if !i.IsMyTrace(trace.ID()) { i.Logger.Debug().WithFields(map[string]interface{}{ @@ -656,6 +662,9 @@ func (i *InMemCollector) processSpan(ctx context.Context, sp *types.Span) { span.End() }() + // TODO: + // if we check trace ownership here before we add it into the cache + // then we don't need to run redistribution multiple times on one signal tcfg := i.Config.GetTracesConfig() trace := i.cache.Get(sp.TraceID) @@ -1070,6 +1079,7 @@ func (i *InMemCollector) Stop() error { if !i.Config.GetCollectionConfig().EnableTraceLocality { close(i.dropDecisionBatch) + close(i.keptDecisionBuffer) } return nil @@ -1622,33 +1632,41 @@ func (i *InMemCollector) publishTraceDecision(ctx context.Context, td TraceDecis if td.Kept { decisionMsg, err = newKeptDecisionMessage(td) + if err != nil { + i.Logger.Error().WithFields(map[string]interface{}{ + "trace_id": td.TraceID, + "kept": td.Kept, + "reason": td.KeptReason, + "sampler": td.SamplerKey, + "selector": td.SamplerSelector, + "error": err.Error(), + }).Logf("Failed to create trace decision message") + return + } + + i.keptDecisionBuffer <- decisionMsg + return } else { // if we're dropping the trace, we should add it to the batch so we can send it later i.dropDecisionBatch <- td.TraceID return } +} - if err != nil { - i.Logger.Error().WithFields(map[string]interface{}{ - "trace_id": td.TraceID, - "kept": td.Kept, - "reason": td.KeptReason, - "sampler": td.SamplerKey, - "selector": td.SamplerSelector, - "error": err.Error(), - }).Logf("Failed to create trace decision message") +func (i *InMemCollector) sendKeptDecisions() { + if i.Config.GetCollectionConfig().EnableTraceLocality { + return } - err = i.PubSub.Publish(ctx, keptTraceDecisionTopic, decisionMsg) - if err != nil { - i.Logger.Error().WithFields(map[string]interface{}{ - "trace_id": td.TraceID, - "kept": td.Kept, - "reason": td.KeptReason, - "sampler": td.SamplerKey, - "selector": td.SamplerSelector, - "error": err.Error(), - }).Logf("Failed to publish trace decision") + ctx := context.Background() + for msg := range i.keptDecisionBuffer { + err := i.PubSub.Publish(ctx, keptTraceDecisionTopic, msg) + if err != nil { + i.Logger.Error().WithFields(map[string]interface{}{ + "error": err.Error(), + }).Logf("Failed to publish trace decision") + } + } } diff --git a/collect/collect_test.go b/collect/collect_test.go index 972db401ba..0b89afcf00 100644 --- a/collect/collect_test.go +++ b/collect/collect_test.go @@ -133,6 +133,7 @@ func TestAddRootSpan(t *testing.T) { coll.incoming = make(chan *types.Span, 5) coll.fromPeer = make(chan *types.Span, 5) coll.outgoingTraces = make(chan sendableTrace, 5) + coll.keptDecisionBuffer = make(chan string, 5) coll.datasetSamplers = make(map[string]sample.Sampler) go coll.collect() go coll.sendTraces() @@ -265,6 +266,8 @@ func TestOriginalSampleRateIsNotedInMetaField(t *testing.T) { coll.incoming = make(chan *types.Span, 5) coll.fromPeer = make(chan *types.Span, 5) coll.dropDecisionBatch = make(chan string, 5) + coll.keptDecisionBuffer = make(chan string, 5) + coll.outgoingTraces = make(chan sendableTrace, 5) coll.datasetSamplers = make(map[string]sample.Sampler) go coll.collect() @@ -366,6 +369,8 @@ func TestTransmittedSpansShouldHaveASampleRateOfAtLeastOne(t *testing.T) { coll.incoming = make(chan *types.Span, 5) coll.fromPeer = make(chan *types.Span, 5) + coll.keptDecisionBuffer = 
make(chan string, 5) + coll.outgoingTraces = make(chan sendableTrace, 5) coll.datasetSamplers = make(map[string]sample.Sampler) go coll.collect() @@ -424,6 +429,7 @@ func TestAddSpan(t *testing.T) { coll.incoming = make(chan *types.Span, 5) coll.fromPeer = make(chan *types.Span, 5) coll.outgoingTraces = make(chan sendableTrace, 5) + coll.keptDecisionBuffer = make(chan string, 5) coll.datasetSamplers = make(map[string]sample.Sampler) go coll.collect() go coll.sendTraces() @@ -530,6 +536,7 @@ func TestDryRunMode(t *testing.T) { coll.incoming = make(chan *types.Span, 5) coll.fromPeer = make(chan *types.Span, 5) coll.outgoingTraces = make(chan sendableTrace, 5) + coll.keptDecisionBuffer = make(chan string, 5) coll.datasetSamplers = make(map[string]sample.Sampler) go coll.collect() go coll.sendTraces() @@ -756,6 +763,7 @@ func TestStableMaxAlloc(t *testing.T) { coll.incoming = make(chan *types.Span, 1000) coll.fromPeer = make(chan *types.Span, 5) coll.outgoingTraces = make(chan sendableTrace, 500) + coll.keptDecisionBuffer = make(chan string, 500) coll.datasetSamplers = make(map[string]sample.Sampler) go coll.collect() go coll.sendTraces() @@ -948,6 +956,7 @@ func TestAddCountsToRoot(t *testing.T) { coll.incoming = make(chan *types.Span, 5) coll.fromPeer = make(chan *types.Span, 5) coll.outgoingTraces = make(chan sendableTrace, 5) + coll.keptDecisionBuffer = make(chan string, 5) coll.datasetSamplers = make(map[string]sample.Sampler) go coll.collect() go coll.sendTraces() @@ -1038,6 +1047,7 @@ func TestLateRootGetsCounts(t *testing.T) { coll.incoming = make(chan *types.Span, 5) coll.fromPeer = make(chan *types.Span, 5) coll.outgoingTraces = make(chan sendableTrace, 5) + coll.keptDecisionBuffer = make(chan string, 5) coll.datasetSamplers = make(map[string]sample.Sampler) go coll.collect() go coll.sendTraces() @@ -1132,6 +1142,7 @@ func TestAddSpanCount(t *testing.T) { coll.incoming = make(chan *types.Span, 5) coll.fromPeer = make(chan *types.Span, 5) coll.outgoingTraces = make(chan sendableTrace, 5) + coll.keptDecisionBuffer = make(chan string, 5) coll.datasetSamplers = make(map[string]sample.Sampler) go coll.collect() go coll.sendTraces() @@ -1223,6 +1234,7 @@ func TestLateRootGetsSpanCount(t *testing.T) { coll.incoming = make(chan *types.Span, 5) coll.fromPeer = make(chan *types.Span, 5) coll.outgoingTraces = make(chan sendableTrace, 5) + coll.keptDecisionBuffer = make(chan string, 5) coll.datasetSamplers = make(map[string]sample.Sampler) go coll.collect() go coll.sendTraces() @@ -1301,6 +1313,7 @@ func TestLateSpanNotDecorated(t *testing.T) { coll.incoming = make(chan *types.Span, 5) coll.fromPeer = make(chan *types.Span, 5) coll.outgoingTraces = make(chan sendableTrace, 5) + coll.keptDecisionBuffer = make(chan string, 5) coll.datasetSamplers = make(map[string]sample.Sampler) go coll.collect() go coll.sendTraces() @@ -1373,6 +1386,7 @@ func TestAddAdditionalAttributes(t *testing.T) { coll.incoming = make(chan *types.Span, 5) coll.fromPeer = make(chan *types.Span, 5) coll.outgoingTraces = make(chan sendableTrace, 5) + coll.keptDecisionBuffer = make(chan string, 5) coll.datasetSamplers = make(map[string]sample.Sampler) go coll.collect() go coll.sendTraces() @@ -1534,6 +1548,7 @@ func TestStressReliefDecorateHostname(t *testing.T) { coll.incoming = make(chan *types.Span, 5) coll.fromPeer = make(chan *types.Span, 5) coll.outgoingTraces = make(chan sendableTrace, 5) + coll.keptDecisionBuffer = make(chan string, 5) coll.datasetSamplers = make(map[string]sample.Sampler) go coll.collect() go 
coll.sendTraces() @@ -1641,6 +1656,7 @@ func TestSpanWithRuleReasons(t *testing.T) { coll.incoming = make(chan *types.Span, 5) coll.fromPeer = make(chan *types.Span, 5) coll.outgoingTraces = make(chan sendableTrace, 5) + coll.keptDecisionBuffer = make(chan string, 5) coll.datasetSamplers = make(map[string]sample.Sampler) go coll.collect() go coll.sendTraces() @@ -1850,6 +1866,7 @@ func TestDrainTracesOnShutdown(t *testing.T) { coll.fromPeer = make(chan *types.Span, 5) coll.outgoingTraces = make(chan sendableTrace, 5) + coll.keptDecisionBuffer = make(chan string, 5) coll.datasetSamplers = make(map[string]sample.Sampler) sentTraceChan := make(chan sentRecord, 1) @@ -1980,6 +1997,7 @@ func TestBigTracesGoEarly(t *testing.T) { coll.incoming = make(chan *types.Span, 500) coll.fromPeer = make(chan *types.Span, 500) coll.outgoingTraces = make(chan sendableTrace, 500) + coll.keptDecisionBuffer = make(chan string, 5) coll.datasetSamplers = make(map[string]sample.Sampler) go coll.collect() go coll.sendTraces() @@ -2213,6 +2231,7 @@ func TestExpiredTracesCleanup(t *testing.T) { coll.incoming = make(chan *types.Span, 5) coll.fromPeer = make(chan *types.Span, 5) coll.outgoingTraces = make(chan sendableTrace, 5) + coll.keptDecisionBuffer = make(chan string, 5) coll.datasetSamplers = make(map[string]sample.Sampler) for _, traceID := range peerTraceIDs { From 90aa76f37b39519ab7673b7ddfb4733f7f9d8cdb Mon Sep 17 00:00:00 2001 From: Yingrong Zhao <22300958+VinozzZ@users.noreply.github.com> Date: Mon, 4 Nov 2024 17:19:17 -0500 Subject: [PATCH 08/15] prioritize incoming traffic --- collect/collect.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/collect/collect.go b/collect/collect.go index 239e105306..5eee08485b 100644 --- a/collect/collect.go +++ b/collect/collect.go @@ -409,6 +409,13 @@ func (i *InMemCollector) collect() { return case <-i.redistributeTimer.Notify(): i.redistributeTraces(ctx) + case sp, ok := <-i.incoming: + if !ok { + // channel's been closed; we should shut down. + span.End() + return + } + i.processSpan(ctx, sp) case sp, ok := <-i.fromPeer: if !ok { // channel's been closed; we should shut down. 
From 0a32461066576650669033096f02383518f54e38 Mon Sep 17 00:00:00 2001 From: Yingrong Zhao <22300958+VinozzZ@users.noreply.github.com> Date: Tue, 5 Nov 2024 10:17:06 -0500 Subject: [PATCH 09/15] add metrics for trace decisions queue overflow --- collect/collect.go | 19 ++++++++++++++++--- collect/collect_test.go | 3 +++ 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/collect/collect.go b/collect/collect.go index 5eee08485b..1a8f2848d0 100644 --- a/collect/collect.go +++ b/collect/collect.go @@ -161,6 +161,8 @@ var inMemCollectorMetrics = []metrics.Metadata{ {Name: "collector_expired_traces_orphans", Type: metrics.Gauge, Unit: metrics.Dimensionless, Description: "number of expired traces missing trace decision when they are sent"}, {Name: "kept_decisions_received", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of kept decision message received"}, {Name: "drop_decisions_received", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of drop decision message received"}, + {Name: "collector_kept_decisions_queue_full", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of times kept trace decision queue is full"}, + {Name: "collector_drop_decisions_queue_full", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of times drop trace decision queue is full"}, } func (i *InMemCollector) Start() error { @@ -214,7 +216,7 @@ func (i *InMemCollector) Start() error { i.PubSub.Subscribe(context.Background(), droppedTraceDecisionTopic, i.signalDroppedTraceDecisions) i.dropDecisionBatch = make(chan string, i.Config.GetCollectionConfig().MaxDropDecisionBatchSize*5) - i.keptDecisionBuffer = make(chan string, i.Config.GetCollectionConfig().MaxDropDecisionBatchSize) + i.keptDecisionBuffer = make(chan string, 100_000) } // spin up one collector because this is a single threaded collector @@ -1651,11 +1653,22 @@ func (i *InMemCollector) publishTraceDecision(ctx context.Context, td TraceDecis return } - i.keptDecisionBuffer <- decisionMsg + select { + case i.keptDecisionBuffer <- decisionMsg: + default: + i.Metrics.Increment("collector_kept_decisions_queue_full") + i.Logger.Warn().Logf("kept trace decision buffer is full. Dropping message") + } return } else { // if we're dropping the trace, we should add it to the batch so we can send it later - i.dropDecisionBatch <- td.TraceID + + select { + case i.dropDecisionBatch <- td.TraceID: + default: + i.Metrics.Increment("collector_drop_decisions_queue_full") + i.Logger.Warn().Logf("drop trace decision buffer is full. 
Dropping message") + } return } } diff --git a/collect/collect_test.go b/collect/collect_test.go index 0b89afcf00..6ca2ac48b8 100644 --- a/collect/collect_test.go +++ b/collect/collect_test.go @@ -985,6 +985,8 @@ func TestAddCountsToRoot(t *testing.T) { coll.AddSpanFromPeer(span) } + time.Sleep(conf.GetTracesConfig().GetSendTickerValue() * 2) + // ok now let's add the root span and verify that both got sent rootSpan := &types.Span{ TraceID: traceID, @@ -996,6 +998,7 @@ func TestAddCountsToRoot(t *testing.T) { IsRoot: true, } coll.AddSpan(rootSpan) + events := transmission.GetBlock(3) assert.Equal(t, 3, len(events), "adding a root span should send all spans in the trace") assert.Equal(t, nil, events[0].Data["meta.span_count"], "child span metadata should NOT be populated with span count") From 470b188a8111a0a63efc911daf5ff21513e489d7 Mon Sep 17 00:00:00 2001 From: Yingrong Zhao <22300958+VinozzZ@users.noreply.github.com> Date: Tue, 5 Nov 2024 10:33:46 -0500 Subject: [PATCH 10/15] add missing return in processSpan after dealing with sent trace --- collect/collect.go | 1 + 1 file changed, 1 insertion(+) diff --git a/collect/collect.go b/collect/collect.go index 1a8f2848d0..ccdd16d58f 100644 --- a/collect/collect.go +++ b/collect/collect.go @@ -724,6 +724,7 @@ func (i *InMemCollector) processSpan(ctx context.Context, sp *types.Span) { // we will just use the default late span reason as the sent reason which is // set inside the dealWithSentTrace function i.dealWithSentTrace(ctx, cache.NewKeptTraceCacheEntry(trace), "", sp) + return } // great! trace is live. add the span. From bb4a6474689a1e9ff1237fd990c66483a43d169e Mon Sep 17 00:00:00 2001 From: Yingrong Zhao <22300958+VinozzZ@users.noreply.github.com> Date: Tue, 5 Nov 2024 10:43:25 -0500 Subject: [PATCH 11/15] revert previous commit --- collect/collect.go | 1 - 1 file changed, 1 deletion(-) diff --git a/collect/collect.go b/collect/collect.go index ccdd16d58f..1a8f2848d0 100644 --- a/collect/collect.go +++ b/collect/collect.go @@ -724,7 +724,6 @@ func (i *InMemCollector) processSpan(ctx context.Context, sp *types.Span) { // we will just use the default late span reason as the sent reason which is // set inside the dealWithSentTrace function i.dealWithSentTrace(ctx, cache.NewKeptTraceCacheEntry(trace), "", sp) - return } // great! trace is live. add the span. From 0a7358640458996e1ac3138943a0cd1a89be388f Mon Sep 17 00:00:00 2001 From: Yingrong Zhao <22300958+VinozzZ@users.noreply.github.com> Date: Tue, 5 Nov 2024 16:09:14 -0500 Subject: [PATCH 12/15] do not add signal spans into the trace cache --- collect/collect.go | 49 +++++++++++++++++++++++++++------------------- 1 file changed, 29 insertions(+), 20 deletions(-) diff --git a/collect/collect.go b/collect/collect.go index 1a8f2848d0..98c3902819 100644 --- a/collect/collect.go +++ b/collect/collect.go @@ -310,7 +310,7 @@ func (i *InMemCollector) checkAlloc(ctx context.Context) { continue } - td, err := i.makeDecision(trace, TraceSendEjectedMemsize) + td, err := i.makeDecision(ctx, trace, TraceSendEjectedMemsize) if err != nil { continue } @@ -411,20 +411,13 @@ func (i *InMemCollector) collect() { return case <-i.redistributeTimer.Notify(): i.redistributeTraces(ctx) - case sp, ok := <-i.incoming: - if !ok { - // channel's been closed; we should shut down. - span.End() - return - } - i.processSpan(ctx, sp) case sp, ok := <-i.fromPeer: if !ok { // channel's been closed; we should shut down. 
span.End() return } - i.processSpan(ctx, sp) + i.processSpan(ctx, sp, "peer") default: select { case <-i.done: @@ -468,14 +461,14 @@ func (i *InMemCollector) collect() { span.End() return } - i.processSpan(ctx, sp) + i.processSpan(ctx, sp, "incoming") case sp, ok := <-i.fromPeer: if !ok { // channel's been closed; we should shut down. span.End() return } - i.processSpan(ctx, sp) + i.processSpan(ctx, sp, "peer") case <-i.reload: i.reloadConfigs() } @@ -622,23 +615,24 @@ func (i *InMemCollector) sendExpiredTracesInCache(ctx context.Context, now time. var totalSpansSent int64 for _, t := range traces { + ctx, span := otelutil.StartSpan(ctx, i.Tracer, "sendExpiredTrace") totalSpansSent += int64(t.DescendantCount()) if t.RootSpan != nil { - td, err := i.makeDecision(t, TraceSendGotRoot) + td, err := i.makeDecision(ctx, t, TraceSendGotRoot) if err != nil { continue } i.send(ctx, t, td) } else { if spanLimit > 0 && t.DescendantCount() > spanLimit { - td, err := i.makeDecision(t, TraceSendSpanLimit) + td, err := i.makeDecision(ctx, t, TraceSendSpanLimit) if err != nil { continue } i.send(ctx, t, td) } else { - td, err := i.makeDecision(t, TraceSendExpired) + td, err := i.makeDecision(ctx, t, TraceSendExpired) if err != nil { continue } @@ -646,24 +640,27 @@ func (i *InMemCollector) sendExpiredTracesInCache(ctx context.Context, now time. } } + span.End() } for _, trace := range expiredTraces { // if a trace has expired and it doesn't belong to this peer, we should ask its decider to // publish the trace decision again - i.PeerTransmission.EnqueueEvent(i.createDecisionSpan(&types.Span{ + dc := i.createDecisionSpan(&types.Span{ TraceID: trace.ID(), Event: types.Event{ Context: trace.GetSpans()[0].Context, }, - }, trace, i.Sharder.WhichShard(trace.ID()))) + }, trace, i.Sharder.WhichShard(trace.ID())) + dc.Data["meta.refinery.expired_trace"] = true + i.PeerTransmission.EnqueueEvent(dc) } span.SetAttributes(attribute.Int64("total_spans_sent", totalSpansSent)) } // processSpan does all the stuff necessary to take an incoming span and add it // to (or create a new placeholder for) a trace. -func (i *InMemCollector) processSpan(ctx context.Context, sp *types.Span) { +func (i *InMemCollector) processSpan(ctx context.Context, sp *types.Span, source string) { ctx, span := otelutil.StartSpan(ctx, i.Tracer, "processSpan") defer func() { i.Metrics.Increment("span_processed") @@ -726,6 +723,12 @@ func (i *InMemCollector) processSpan(ctx context.Context, sp *types.Span) { i.dealWithSentTrace(ctx, cache.NewKeptTraceCacheEntry(trace), "", sp) } + // if the span is sent for signaling expired traces, + // we should not add it to the cache + if sp.Data["meta.refinery.expired_trace"] != nil { + return + } + // great! trace is live. add the span. trace.AddSpan(sp) span.SetAttributes(attribute.String("disposition", "live_trace")) @@ -735,8 +738,9 @@ func (i *InMemCollector) processSpan(ctx context.Context, sp *types.Span) { if !i.Config.GetCollectionConfig().EnableTraceLocality { // if this trace doesn't belong to us, we should forward a decision span to its decider targetShard := i.Sharder.WhichShard(trace.ID()) + if !targetShard.Equals(i.Sharder.MyShard()) && !sp.IsDecisionSpan() { - i.Metrics.Increment("incoming_router_peer") + i.Metrics.Increment(source + "_router_peer") i.Logger.Debug(). WithString("peer", targetShard.GetAddress()). 
Logf("Sending span to peer") @@ -996,6 +1000,8 @@ func (i *InMemCollector) send(ctx context.Context, trace *types.Trace, td *Trace return } trace.Sent = true + _, span := otelutil.StartSpan(ctx, i.Tracer, "send") + defer span.End() traceDur := i.Clock.Since(trace.ArrivalTime) i.Metrics.Histogram("trace_duration_ms", float64(traceDur.Milliseconds())) @@ -1541,12 +1547,12 @@ func (i *InMemCollector) processKeptDecision(msg string) { i.cache.RemoveTraces(toDelete) } -func (i *InMemCollector) makeDecision(trace *types.Trace, sendReason string) (*TraceDecision, error) { +func (i *InMemCollector) makeDecision(ctx context.Context, trace *types.Trace, sendReason string) (*TraceDecision, error) { if trace.Sent { return nil, errors.New("trace already sent") } - ctx, span := otelutil.StartSpan(context.Background(), i.Tracer, "makeDecision") + ctx, span := otelutil.StartSpan(ctx, i.Tracer, "makeDecision") defer span.End() i.Metrics.Histogram("trace_span_count", float64(trace.DescendantCount())) @@ -1634,6 +1640,9 @@ func (i *InMemCollector) publishTraceDecision(ctx context.Context, td TraceDecis i.Metrics.Histogram("collector_publish_trace_decision_dur_ms", time.Since(start).Milliseconds()) }() + _, span := otelutil.StartSpanWith(ctx, i.Tracer, "publishTraceDecision", "decision", td.Kept) + defer span.End() + var ( decisionMsg string err error From cdb839fdb657406d233953df0bf1f239a1270f2b Mon Sep 17 00:00:00 2001 From: Yingrong Zhao <22300958+VinozzZ@users.noreply.github.com> Date: Tue, 5 Nov 2024 17:31:03 -0500 Subject: [PATCH 13/15] publish trace decision for late spans through a goroutine --- collect/collect.go | 71 ++++++++++++---------------------------------- 1 file changed, 18 insertions(+), 53 deletions(-) diff --git a/collect/collect.go b/collect/collect.go index 98c3902819..232fb1b146 100644 --- a/collect/collect.go +++ b/collect/collect.go @@ -685,6 +685,13 @@ func (i *InMemCollector) processSpan(ctx context.Context, sp *types.Span, source i.dealWithSentTrace(ctx, sr, keptReason, sp) return } + + // if the span is sent for signaling expired traces, + // we should not add it to the cache + if sp.Data["meta.refinery.expired_trace"] != nil { + return + } + // trace hasn't already been sent (or this span is really old); let's // create a new trace to hold it span.SetAttributes(attribute.Bool("create_new_trace", true)) @@ -723,12 +730,6 @@ func (i *InMemCollector) processSpan(ctx context.Context, sp *types.Span, source i.dealWithSentTrace(ctx, cache.NewKeptTraceCacheEntry(trace), "", sp) } - // if the span is sent for signaling expired traces, - // we should not add it to the cache - if sp.Data["meta.refinery.expired_trace"] != nil { - return - } - // great! trace is live. add the span. 
trace.AddSpan(sp) span.SetAttributes(attribute.String("disposition", "live_trace")) @@ -856,54 +857,18 @@ func (i *InMemCollector) dealWithSentTrace(ctx context.Context, tr cache.TraceSe // if we receive a proxy span after a trace decision has been made, // we should just broadcast the decision again if sp.IsDecisionSpan() { - var ( - msg string - err error - ) - topic := keptTraceDecisionTopic - if tr.Kept() { - // late span in this case won't get HasRoot - // this means the late span won't be decorated with some metadata - // like span count, event count, link count - msg, err = newKeptDecisionMessage(TraceDecision{ - TraceID: sp.TraceID, - Kept: tr.Kept(), - KeptReason: keptReason, - SendReason: TraceSendLateSpan, - SampleRate: tr.Rate(), - Count: uint32(tr.SpanCount()), - EventCount: uint32(tr.SpanEventCount()), - LinkCount: uint32(tr.SpanLinkCount()), - }) - if err != nil { - i.Logger.Error().WithFields(map[string]interface{}{ - "trace_id": sp.TraceID, - "kept": tr.Kept(), - "late_span": true, - }).Logf("Failed to create new kept decision message") - return - } - } else { - topic = droppedTraceDecisionTopic - msg, err = newDroppedDecisionMessage(sp.TraceID) - if err != nil { - i.Logger.Error().WithFields(map[string]interface{}{ - "trace_id": sp.TraceID, - "kept": tr.Kept(), - "late_span": true, - }).Logf("Failed to create new dropped decision message") - return - } - } - - err = i.PubSub.Publish(ctx, topic, msg) - if err != nil { - i.Logger.Error().WithFields(map[string]interface{}{ - "trace_id": sp.TraceID, - "kept": tr.Kept(), - "late_span": true, - }).Logf("Failed to publish trace decision") + // late span in this case won't get HasRoot + td := TraceDecision{ + TraceID: sp.TraceID, + Kept: tr.Kept(), + KeptReason: keptReason, + SendReason: TraceSendLateSpan, + SampleRate: tr.Rate(), + Count: uint32(tr.SpanCount()), + EventCount: uint32(tr.SpanEventCount()), + LinkCount: uint32(tr.SpanLinkCount()), } + i.publishTraceDecision(ctx, td) return } From bf41091922c694350a2d5cf13f4a3f2b3804cc4f Mon Sep 17 00:00:00 2001 From: Mike Goldsmth Date: Wed, 6 Nov 2024 14:30:28 +0000 Subject: [PATCH 14/15] clean up some span.End() calls --- collect/collect.go | 60 +++++++++++++++++++--------------------------- 1 file changed, 25 insertions(+), 35 deletions(-) diff --git a/collect/collect.go b/collect/collect.go index 232fb1b146..5d863900a0 100644 --- a/collect/collect.go +++ b/collect/collect.go @@ -420,41 +420,31 @@ func (i *InMemCollector) collect() { i.processSpan(ctx, sp, "peer") default: select { - case <-i.done: - span.End() - return case msg, ok := <-i.dropDecisionMessages: if !ok { // channel's been closed; we should shut down. + span.End() return } - i.processDropDecisions(msg) case msg, ok := <-i.keptDecisionMessages: if !ok { // channel's been closed; we should shut down. + span.End() return } - i.processKeptDecision(msg) case <-ticker.C: - select { - case <-i.done: - span.End() - return - default: - i.sendExpiredTracesInCache(ctx, i.Clock.Now()) - i.checkAlloc(ctx) - - // Briefly unlock the cache, to allow test access. - _, span3 := otelutil.StartSpan(ctx, i.Tracer, "Gosched") - i.mutex.Unlock() - runtime.Gosched() - i.mutex.Lock() - span3.End() - } - case <-i.redistributeTimer.Notify(): - i.redistributeTraces(ctx) + i.sendExpiredTracesInCache(ctx, i.Clock.Now()) + i.checkAlloc(ctx) + + // maybe only do this if in test mode? + // Briefly unlock the cache, to allow test access. 
+ _, goSchedSpan := otelutil.StartSpan(ctx, i.Tracer, "Gosched") + i.mutex.Unlock() + runtime.Gosched() + i.mutex.Lock() + goSchedSpan.End() case sp, ok := <-i.incoming: if !ok { // channel's been closed; we should shut down. @@ -480,7 +470,7 @@ func (i *InMemCollector) collect() { } func (i *InMemCollector) redistributeTraces(ctx context.Context) { - _, span := otelutil.StartSpan(ctx, i.Tracer, "redistributeTraces") + ctx, span := otelutil.StartSpan(ctx, i.Tracer, "redistributeTraces") redistrubutionStartTime := i.Clock.Now() defer func() { @@ -508,24 +498,22 @@ func (i *InMemCollector) redistributeTraces(ctx context.Context) { if trace == nil { continue } - _, span2 := otelutil.StartSpanWith(ctx, i.Tracer, "distributeTrace", "num_spans", trace.DescendantCount()) + _, redistributeTraceSpan := otelutil.StartSpanWith(ctx, i.Tracer, "distributeTrace", "num_spans", trace.DescendantCount()) newTarget := i.Sharder.WhichShard(trace.TraceID) - span2.SetAttributes(attribute.String("shard", newTarget.GetAddress())) + redistributeTraceSpan.SetAttributes(attribute.String("shard", newTarget.GetAddress())) if newTarget.Equals(i.Sharder.MyShard()) { if !i.Config.GetCollectionConfig().EnableTraceLocality { // Drop all proxy spans since peers will resend them trace.RemoveDecisionSpans() } - span2.SetAttributes(attribute.Bool("self", true)) - span2.End() + redistributeTraceSpan.SetAttributes(attribute.Bool("self", true)) + redistributeTraceSpan.End() continue } - span2.SetAttributes(attribute.String("shard", newTarget.GetAddress())) - for _, sp := range trace.GetSpans() { if sp.IsDecisionSpan() { continue @@ -552,7 +540,7 @@ func (i *InMemCollector) redistributeTraces(ctx context.Context) { } forwardedTraces.Add(trace.TraceID) - span2.End() + redistributeTraceSpan.End() } otelutil.AddSpanFields(span, map[string]interface{}{ @@ -569,12 +557,12 @@ func (i *InMemCollector) redistributeTraces(ctx context.Context) { func (i *InMemCollector) sendExpiredTracesInCache(ctx context.Context, now time.Time) { ctx, span := otelutil.StartSpan(ctx, i.Tracer, "sendExpiredTracesInCache") - defer span.End() - startTime := time.Now() defer func() { i.Metrics.Histogram("collector_send_expired_traces_in_cache_dur_ms", time.Since(startTime).Milliseconds()) + span.End() }() + expiredTraces := make([]*types.Trace, 0) traceTimeout := i.Config.GetTracesConfig().GetTraceTimeout() var orphanTraceCount int @@ -615,12 +603,13 @@ func (i *InMemCollector) sendExpiredTracesInCache(ctx context.Context, now time. var totalSpansSent int64 for _, t := range traces { - ctx, span := otelutil.StartSpan(ctx, i.Tracer, "sendExpiredTrace") + ctx, sendExpiredTraceSpan := otelutil.StartSpan(ctx, i.Tracer, "sendExpiredTrace") totalSpansSent += int64(t.DescendantCount()) if t.RootSpan != nil { td, err := i.makeDecision(ctx, t, TraceSendGotRoot) if err != nil { + sendExpiredTraceSpan.End() continue } i.send(ctx, t, td) @@ -628,19 +617,20 @@ func (i *InMemCollector) sendExpiredTracesInCache(ctx context.Context, now time. 
if spanLimit > 0 && t.DescendantCount() > spanLimit { td, err := i.makeDecision(ctx, t, TraceSendSpanLimit) if err != nil { + sendExpiredTraceSpan.End() continue } i.send(ctx, t, td) } else { td, err := i.makeDecision(ctx, t, TraceSendExpired) if err != nil { + sendExpiredTraceSpan.End() continue } i.send(ctx, t, td) } } - - span.End() + sendExpiredTraceSpan.End() } for _, trace := range expiredTraces { From 68a7201b8bbcded4dcb03e959b44f029fb4107d2 Mon Sep 17 00:00:00 2001 From: Yingrong Zhao <22300958+VinozzZ@users.noreply.github.com> Date: Wed, 6 Nov 2024 14:07:10 -0500 Subject: [PATCH 15/15] fix test --- collect/collect_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/collect/collect_test.go b/collect/collect_test.go index 66f7eb7bb4..15335af6a5 100644 --- a/collect/collect_test.go +++ b/collect/collect_test.go @@ -1772,6 +1772,7 @@ func TestRedistributeTraces(t *testing.T) { coll.incoming = make(chan *types.Span, 5) coll.fromPeer = make(chan *types.Span, 5) coll.outgoingTraces = make(chan sendableTrace, 5) + coll.keptDecisionBuffer = make(chan string, 5) coll.datasetSamplers = make(map[string]sample.Sampler) c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{})