From 5d64f9f2b848fcf7b5404922613ebfe692fda2ac Mon Sep 17 00:00:00 2001 From: Jonathan Fung Date: Mon, 14 Oct 2024 13:46:57 -0400 Subject: [PATCH] few more metrics emissions, tag some metrics by subscription ids --- protocol/lib/metrics/metric_keys.go | 2 ++ .../streaming/full_node_streaming_manager.go | 28 ++++++++++++++----- 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/protocol/lib/metrics/metric_keys.go b/protocol/lib/metrics/metric_keys.go index ac8e8a17d1..bcbcfdfe84 100644 --- a/protocol/lib/metrics/metric_keys.go +++ b/protocol/lib/metrics/metric_keys.go @@ -77,11 +77,13 @@ const ( GrpcSendResponseToSubscriberCount = "grpc_send_response_to_subscriber_count" GrpcStreamSubscriberCount = "grpc_stream_subscriber_count" GrpcStreamNumUpdatesBuffered = "grpc_stream_num_updates_buffered" + GrpcStreamNumUpdatesBufferedHistogram = "grpc_stream_num_updates_buffered_histogram" GrpcFlushUpdatesLatency = "grpc_flush_updates_latency" GrpcSubscriptionChannelLength = "grpc_subscription_channel_length" GrpcStagedAllFinalizeBlockUpdatesCount = "grpc_staged_all_finalize_block_updates_count" GrpcStagedFillFinalizeBlockUpdatesCount = "grpc_staged_finalize_block_fill_updates_count" GrpcStagedSubaccountFinalizeBlockUpdatesCount = "grpc_staged_finalize_block_subaccount_updates_count" + SubscriptionId = "subscription_id" EndBlocker = "end_blocker" EndBlockerLag = "end_blocker_lag" diff --git a/protocol/streaming/full_node_streaming_manager.go b/protocol/streaming/full_node_streaming_manager.go index 85c265f12e..8d9a990385 100644 --- a/protocol/streaming/full_node_streaming_manager.go +++ b/protocol/streaming/full_node_streaming_manager.go @@ -145,8 +145,8 @@ func (sm *FullNodeStreamingManagerImpl) Enabled() bool { } func (sm *FullNodeStreamingManagerImpl) EmitMetrics() { - metrics.SetGauge( - metrics.GrpcStreamNumUpdatesBuffered, + metrics.AddSample( + metrics.GrpcStreamNumUpdatesBufferedHistogram, float32(len(sm.streamUpdateCache)), ) metrics.SetGauge( @@ -154,9 +154,10 @@ func (sm *FullNodeStreamingManagerImpl) EmitMetrics() { float32(len(sm.orderbookSubscriptions)), ) for _, subscription := range sm.orderbookSubscriptions { - metrics.AddSample( + metrics.AddSampleWithLabels( metrics.GrpcSubscriptionChannelLength, float32(len(subscription.updatesChannel)), + metrics.GetLabelForIntValue(metrics.SubscriptionId, int(subscription.subscriptionId)), ) } } @@ -226,9 +227,10 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe( // Use current goroutine to consistently poll subscription channel for updates // to send through stream. for updates := range subscription.updatesChannel { - metrics.IncrCounter( + metrics.IncrCounterWithLabels( metrics.GrpcSendResponseToSubscriberCount, 1, + metrics.GetLabelForIntValue(metrics.SubscriptionId, int(subscription.subscriptionId)), ) err = subscription.messageSender.Send( &clobtypes.StreamOrderbookUpdatesResponse{ @@ -364,9 +366,17 @@ func (sm *FullNodeStreamingManagerImpl) sendStreamUpdates( return } + metrics.IncrCounterWithLabels( + metrics.GrpcAddToSubscriptionChannelCount, + 1, + metrics.GetLabelForIntValue(metrics.SubscriptionId, int(subscriptionId)), + ) + select { case subscription.updatesChannel <- streamUpdates: default: + // Buffer is full. Emit metric and drop subscription. + sm.EmitMetrics() sm.logger.Error( fmt.Sprintf( "Streaming subscription id %+v channel full capacity. Dropping subscription connection.", @@ -752,9 +762,9 @@ func (sm *FullNodeStreamingManagerImpl) AddOrderUpdatesToCache( sm.cacheStreamUpdatesByClobPairWithLock(updates, clobPairIds) + sm.EmitMetrics() // Remove all subscriptions and wipe the buffer if buffer overflows. sm.RemoveSubscriptionsAndClearBufferIfFull() - sm.EmitMetrics() } // AddSubaccountUpdatesToCache adds a series of updates to the full node streaming cache. @@ -773,8 +783,8 @@ func (sm *FullNodeStreamingManagerImpl) AddSubaccountUpdatesToCache( sm.cacheStreamUpdatesBySubaccountWithLock(updates, subaccountIds) - sm.RemoveSubscriptionsAndClearBufferIfFull() sm.EmitMetrics() + sm.RemoveSubscriptionsAndClearBufferIfFull() } // RemoveSubscriptionsAndClearBufferIfFull removes all subscriptions and wipes the buffer if buffer overflows. @@ -790,6 +800,7 @@ func (sm *FullNodeStreamingManagerImpl) RemoveSubscriptionsAndClearBufferIfFull( } sm.streamUpdateCache = nil sm.streamUpdateSubscriptionCache = nil + sm.EmitMetrics() } } @@ -825,13 +836,16 @@ func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdatesWithLock() { // If the buffer is full, drop the subscription. for id, updates := range subscriptionUpdates { if subscription, ok := sm.orderbookSubscriptions[id]; ok { - metrics.IncrCounter( + metrics.IncrCounterWithLabels( metrics.GrpcAddToSubscriptionChannelCount, 1, + metrics.GetLabelForIntValue(metrics.SubscriptionId, int(id)), ) select { case subscription.updatesChannel <- updates: default: + // Buffer is full. Emit metric and drop subscription. + sm.EmitMetrics() idsToRemove = append(idsToRemove, id) } }