diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 245731a6de..fa0c06d956 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -56,13 +56,13 @@ type DropConn* = proc(peer: PubSubPeer) {.gcsafe, raises: [].} # have to pass peer as it's unknown during init OnEvent* = proc(peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe, raises: [].} - Ttlmessage* = object + QueuedMessage* = object msg*: seq[byte] - ttl*: Moment + addedAt*: Moment RpcMessageQueue* = ref object sendPriorityQueue: Deque[Future[void]] - nonPriorityQueue: AsyncQueue[Ttlmessage] + nonPriorityQueue: AsyncQueue[QueuedMessage] sendNonPriorityTask: Future[void] # The max duration a message to be relayed can wait to be sent before it is dropped. The default is 500ms. maxDurationInNonPriorityQueue: Duration @@ -298,7 +298,7 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool = false) { when defined(libp2p_expensive_metrics): libp2p_gossipsub_priority_queue_size.inc(labelValues = [$p.peerId]) else: - await p.rpcmessagequeue.nonPriorityQueue.addLast(Ttlmessage(msg: msg, ttl: Moment.now())) + await p.rpcmessagequeue.nonPriorityQueue.addLast(QueuedMessage(msg: msg, addedAt: Moment.now())) when defined(libp2p_expensive_metrics): libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [$p.peerId]) trace "message queued", p, msg = shortLog(msg) @@ -385,7 +385,7 @@ proc sendNonPriorityTask(p: PubSubPeer) {.async.} = let ttlMsg = await p.rpcmessagequeue.nonPriorityQueue.popFirst() when defined(libp2p_expensive_metrics): libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId]) - if Moment.now() - ttlMsg.ttl >= p.rpcmessagequeue.maxDurationInNonPriorityQueue: + if Moment.now() - ttlMsg.addedAt >= p.rpcmessagequeue.maxDurationInNonPriorityQueue: when defined(libp2p_expensive_metrics): libp2p_gossipsub_non_priority_msgs_dropped.inc(labelValues = [$p.peerId]) continue @@ -410,7 +410,7 @@ proc stopSendNonPriorityTask*(p: PubSubPeer) = proc new(T: typedesc[RpcMessageQueue], maxDurationInNonPriorityQueue = 1.seconds): T = return T( sendPriorityQueue: initDeque[Future[void]](), - nonPriorityQueue: newAsyncQueue[Ttlmessage](), + nonPriorityQueue: newAsyncQueue[QueuedMessage](), maxDurationInNonPriorityQueue: maxDurationInNonPriorityQueue, )