diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 0b9194f420..cc9ade6ca9 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -83,7 +83,8 @@ proc init*(_: type[GossipSubParams]): GossipSubParams = enablePX: false, bandwidthEstimatebps: 100_000_000, # 100 Mbps or 12.5 MBps overheadRateLimit: Opt.none(tuple[bytes: int, interval: Duration]), - disconnectPeerAboveRateLimit: false + disconnectPeerAboveRateLimit: false, + maxDurationInNonPriorityQueue: Opt.none(Duration), ) proc validateParameters*(parameters: GossipSubParams): Result[void, cstring] = @@ -751,4 +752,5 @@ method getOrCreatePeer*( let peer = procCall PubSub(g).getOrCreatePeer(peerId, protos) g.parameters.overheadRateLimit.withValue(overheadRateLimit): peer.overheadRateLimitOpt = Opt.some(TokenBucket.new(overheadRateLimit.bytes, overheadRateLimit.interval)) + peer.rpcmessagequeue.maxDurationInNonPriorityQueue = g.parameters.maxDurationInNonPriorityQueue return peer diff --git a/libp2p/protocols/pubsub/gossipsub/types.nim b/libp2p/protocols/pubsub/gossipsub/types.nim index 06fa55eb30..81db8f3793 100644 --- a/libp2p/protocols/pubsub/gossipsub/types.nim +++ b/libp2p/protocols/pubsub/gossipsub/types.nim @@ -147,6 +147,10 @@ type overheadRateLimit*: Opt[tuple[bytes: int, interval: Duration]] disconnectPeerAboveRateLimit*: bool + # The maximum duration a message can stay in the non-priority queue. If it exceeds this duration, it will be discarded + # as soon as it is dequeued, instead of being sent to the remote peer. The default value is none, i.e., no maximum duration. + maxDurationInNonPriorityQueue*: Opt[Duration] + BackoffTable* = Table[string, Table[PeerId, Moment]] ValidationSeenTable* = Table[MessageId, HashSet[PubSubPeer]] diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 362b1984d2..bc5122dc14 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -67,7 +67,7 @@ type # Task for processing non-priority message queue. sendNonPriorityTask: Future[void] # The max duration a message to be relayed can wait to be sent before it is dropped. The default is 500ms. - maxDurationInNonPriorityQueue: Duration + maxDurationInNonPriorityQueue*: Opt[Duration] PubSubPeer* = ref object of RootObj getConn*: GetConn # callback to establish a new send connection @@ -90,7 +90,7 @@ type behaviourPenalty*: float64 # the eventual penalty score overheadRateLimitOpt*: Opt[TokenBucket] - rpcmessagequeue: RpcMessageQueue + rpcmessagequeue*: RpcMessageQueue RPCHandler* = proc(peer: PubSubPeer, data: seq[byte]): Future[void] {.gcsafe, raises: [].} @@ -395,10 +395,11 @@ proc sendNonPriorityTask(p: PubSubPeer) {.async.} = await p.rpcmessagequeue.sendPriorityQueue[^1] when defined(libp2p_expensive_metrics): libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId]) - if Moment.now() - ttlMsg.addedAt >= p.rpcmessagequeue.maxDurationInNonPriorityQueue: - when defined(libp2p_expensive_metrics): - libp2p_gossipsub_non_priority_msgs_dropped.inc(labelValues = [$p.peerId]) - continue + p.rpcmessagequeue.maxDurationInNonPriorityQueue.withValue(maxDurationInNonPriorityQueue): + if Moment.now() - ttlMsg.addedAt >= maxDurationInNonPriorityQueue: + when defined(libp2p_expensive_metrics): + libp2p_gossipsub_non_priority_msgs_dropped.inc(labelValues = [$p.peerId]) + continue await p.sendMsg(ttlMsg.msg) proc startSendNonPriorityTask(p: PubSubPeer) = @@ -417,7 +418,7 @@ proc stopSendNonPriorityTask*(p: PubSubPeer) = libp2p_gossipsub_priority_queue_size.set(labelValues = [$p.peerId], value = 0) libp2p_gossipsub_non_priority_queue_size.set(labelValues = [$p.peerId], value = 0) -proc new(T: typedesc[RpcMessageQueue], maxDurationInNonPriorityQueue = 1.seconds): T = +proc new(T: typedesc[RpcMessageQueue], maxDurationInNonPriorityQueue = Opt.none(Duration)): T = return T( sendPriorityQueue: initDeque[Future[void]](), nonPriorityQueue: newAsyncQueue[QueuedMessage](), @@ -431,7 +432,8 @@ proc new*( onEvent: OnEvent, codec: string, maxMessageSize: int, - overheadRateLimitOpt: Opt[TokenBucket] = Opt.none(TokenBucket)): T = + overheadRateLimitOpt: Opt[TokenBucket] = Opt.none(TokenBucket), + maxDurationInNonPriorityQueue = Opt.none(Duration)): T = result = T( getConn: getConn, @@ -441,7 +443,7 @@ proc new*( connectedFut: newFuture[void](), maxMessageSize: maxMessageSize, overheadRateLimitOpt: overheadRateLimitOpt, - rpcmessagequeue: RpcMessageQueue.new(), + rpcmessagequeue: RpcMessageQueue.new(maxDurationInNonPriorityQueue), ) result.sentIHaves.addFirst(default(HashSet[MessageId])) result.heDontWants.addFirst(default(HashSet[MessageId]))