Skip to content

Commit

Permalink
reuse subscription ids
Browse files Browse the repository at this point in the history
  • Loading branch information
jonfung-dydx committed Oct 16, 2024
1 parent 40f8a79 commit 66fe962
Showing 1 changed file with 13 additions and 6 deletions.
19 changes: 13 additions & 6 deletions protocol/streaming/full_node_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type FullNodeStreamingManagerImpl struct {

// orderbookSubscriptions maps subscription IDs to their respective orderbook subscriptions.
orderbookSubscriptions map[uint32]*OrderbookSubscription
nextSubscriptionId uint32
activeSubscriptionIds map[uint32]struct{}

// stream will batch and flush out messages every 10 ms.
ticker *time.Ticker
Expand Down Expand Up @@ -104,7 +104,7 @@ func NewFullNodeStreamingManager(
fullNodeStreamingManager := &FullNodeStreamingManagerImpl{
logger: logger,
orderbookSubscriptions: make(map[uint32]*OrderbookSubscription),
nextSubscriptionId: 0,
activeSubscriptionIds: make(map[uint32]struct{}),

ticker: time.NewTicker(time.Duration(flushIntervalMs) * time.Millisecond),
done: make(chan bool),
Expand Down Expand Up @@ -180,8 +180,13 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
for i, subaccountId := range subaccountIds {
sIds[i] = *subaccountId
}
issuedSubscriptionId := uint32(0)
for _, exists := sm.activeSubscriptionIds[issuedSubscriptionId]; exists; {
issuedSubscriptionId += 1
}

subscription := &OrderbookSubscription{
subscriptionId: sm.nextSubscriptionId,
subscriptionId: issuedSubscriptionId,
initialized: &atomic.Bool{}, // False by default.
clobPairIds: clobPairIds,
subaccountIds: sIds,
Expand All @@ -196,7 +201,7 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
}
sm.clobPairIdToSubscriptionIdMapping[clobPairId] = append(
sm.clobPairIdToSubscriptionIdMapping[clobPairId],
sm.nextSubscriptionId,
issuedSubscriptionId,
)
}
for _, subaccountId := range sIds {
Expand All @@ -207,7 +212,7 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
}
sm.subaccountIdToSubscriptionIdMapping[subaccountId] = append(
sm.subaccountIdToSubscriptionIdMapping[subaccountId],
sm.nextSubscriptionId,
issuedSubscriptionId,
)
}

Expand All @@ -220,7 +225,7 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
),
)
sm.orderbookSubscriptions[subscription.subscriptionId] = subscription
sm.nextSubscriptionId++
sm.activeSubscriptionIds[issuedSubscriptionId] = struct{}{}
sm.EmitMetrics()
sm.Unlock()

Expand Down Expand Up @@ -303,6 +308,8 @@ func (sm *FullNodeStreamingManagerImpl) removeSubscription(
}
}

delete(sm.activeSubscriptionIds, subscriptionIdToRemove)

sm.logger.Info(
fmt.Sprintf("Removed streaming subscription id %+v", subscriptionIdToRemove),
)
Expand Down

0 comments on commit 66fe962

Please sign in to comment.