Skip to content

Commit

Permalink
few more metrics emissions, tag some metrics by subscription ids
Browse files Browse the repository at this point in the history
  • Loading branch information
jonfung-dydx committed Oct 14, 2024
1 parent 857eaa1 commit 5d64f9f
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 7 deletions.
2 changes: 2 additions & 0 deletions protocol/lib/metrics/metric_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,13 @@ const (
GrpcSendResponseToSubscriberCount = "grpc_send_response_to_subscriber_count"
GrpcStreamSubscriberCount = "grpc_stream_subscriber_count"
GrpcStreamNumUpdatesBuffered = "grpc_stream_num_updates_buffered"
GrpcStreamNumUpdatesBufferedHistogram = "grpc_stream_num_updates_buffered_histogram"
GrpcFlushUpdatesLatency = "grpc_flush_updates_latency"
GrpcSubscriptionChannelLength = "grpc_subscription_channel_length"
GrpcStagedAllFinalizeBlockUpdatesCount = "grpc_staged_all_finalize_block_updates_count"
GrpcStagedFillFinalizeBlockUpdatesCount = "grpc_staged_finalize_block_fill_updates_count"
GrpcStagedSubaccountFinalizeBlockUpdatesCount = "grpc_staged_finalize_block_subaccount_updates_count"
SubscriptionId = "subscription_id"

EndBlocker = "end_blocker"
EndBlockerLag = "end_blocker_lag"
Expand Down
28 changes: 21 additions & 7 deletions protocol/streaming/full_node_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,18 +145,19 @@ func (sm *FullNodeStreamingManagerImpl) Enabled() bool {
}

func (sm *FullNodeStreamingManagerImpl) EmitMetrics() {
metrics.SetGauge(
metrics.GrpcStreamNumUpdatesBuffered,
metrics.AddSample(
metrics.GrpcStreamNumUpdatesBufferedHistogram,
float32(len(sm.streamUpdateCache)),
)
metrics.SetGauge(
metrics.GrpcStreamSubscriberCount,
float32(len(sm.orderbookSubscriptions)),
)
for _, subscription := range sm.orderbookSubscriptions {
metrics.AddSample(
metrics.AddSampleWithLabels(
metrics.GrpcSubscriptionChannelLength,
float32(len(subscription.updatesChannel)),
metrics.GetLabelForIntValue(metrics.SubscriptionId, int(subscription.subscriptionId)),
)
}
}
Expand Down Expand Up @@ -226,9 +227,10 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
// Use current goroutine to consistently poll subscription channel for updates
// to send through stream.
for updates := range subscription.updatesChannel {
metrics.IncrCounter(
metrics.IncrCounterWithLabels(
metrics.GrpcSendResponseToSubscriberCount,
1,
metrics.GetLabelForIntValue(metrics.SubscriptionId, int(subscription.subscriptionId)),
)
err = subscription.messageSender.Send(
&clobtypes.StreamOrderbookUpdatesResponse{
Expand Down Expand Up @@ -364,9 +366,17 @@ func (sm *FullNodeStreamingManagerImpl) sendStreamUpdates(
return
}

metrics.IncrCounterWithLabels(
metrics.GrpcAddToSubscriptionChannelCount,
1,
metrics.GetLabelForIntValue(metrics.SubscriptionId, int(subscriptionId)),
)

select {
case subscription.updatesChannel <- streamUpdates:
default:
// Buffer is full. Emit metric and drop subscription.
sm.EmitMetrics()
sm.logger.Error(
fmt.Sprintf(
"Streaming subscription id %+v channel full capacity. Dropping subscription connection.",
Expand Down Expand Up @@ -752,9 +762,9 @@ func (sm *FullNodeStreamingManagerImpl) AddOrderUpdatesToCache(

sm.cacheStreamUpdatesByClobPairWithLock(updates, clobPairIds)

sm.EmitMetrics()
// Remove all subscriptions and wipe the buffer if buffer overflows.
sm.RemoveSubscriptionsAndClearBufferIfFull()
sm.EmitMetrics()
}

// AddSubaccountUpdatesToCache adds a series of updates to the full node streaming cache.
Expand All @@ -773,8 +783,8 @@ func (sm *FullNodeStreamingManagerImpl) AddSubaccountUpdatesToCache(

sm.cacheStreamUpdatesBySubaccountWithLock(updates, subaccountIds)

sm.RemoveSubscriptionsAndClearBufferIfFull()
sm.EmitMetrics()
sm.RemoveSubscriptionsAndClearBufferIfFull()
}

// RemoveSubscriptionsAndClearBufferIfFull removes all subscriptions and wipes the buffer if buffer overflows.
Expand All @@ -790,6 +800,7 @@ func (sm *FullNodeStreamingManagerImpl) RemoveSubscriptionsAndClearBufferIfFull(
}
sm.streamUpdateCache = nil
sm.streamUpdateSubscriptionCache = nil
sm.EmitMetrics()
}
}

Expand Down Expand Up @@ -825,13 +836,16 @@ func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdatesWithLock() {
// If the buffer is full, drop the subscription.
for id, updates := range subscriptionUpdates {
if subscription, ok := sm.orderbookSubscriptions[id]; ok {
metrics.IncrCounter(
metrics.IncrCounterWithLabels(
metrics.GrpcAddToSubscriptionChannelCount,
1,
metrics.GetLabelForIntValue(metrics.SubscriptionId, int(id)),
)
select {
case subscription.updatesChannel <- updates:
default:
// Buffer is full. Emit metric and drop subscription.
sm.EmitMetrics()
idsToRemove = append(idsToRemove, id)
}
}
Expand Down

0 comments on commit 5d64f9f

Please sign in to comment.