
feat: message prioritization with immediate peer-published dispatch and queuing for other msgs #1015

Merged: 14 commits, Feb 16, 2024
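In short, this PR routes each outgoing RPC through one of two per-peer paths: peer-published messages and control traffic are dispatched immediately as high priority, while relayed messages and IWANT replies are parked in a non-priority queue that drains only when no high-priority send is pending. A minimal sketch of that queuing model, assuming chronos; the Dispatcher type and all names below are illustrative, not the PR's actual API:

import chronos
import std/deques

type Dispatcher = ref object
  priority: Deque[Future[void]]      # futures of in-flight high-priority sends
  nonPriority: AsyncQueue[seq[byte]] # encoded messages awaiting dispatch

proc newDispatcher(): Dispatcher =
  Dispatcher(
    priority: initDeque[Future[void]](),
    nonPriority: newAsyncQueue[seq[byte]]())

proc sendNow(d: Dispatcher, msg: seq[byte]) {.async.} =
  discard # the actual wire send is elided in this sketch

proc dispatch(d: Dispatcher, msg: seq[byte], isHighPriority: bool) {.async.} =
  if isHighPriority:
    # start the send immediately and keep its future for the drain loop
    d.priority.addLast(d.sendNow(msg))
  else:
    # park the message until no high-priority send is pending
    await d.nonPriority.addLast(msg)

proc drainNonPriority(d: Dispatcher) {.async.} =
  while true:
    # wait out pending high-priority sends first
    while d.priority.len > 0:
      await d.priority[0]
      discard d.priority.popFirst()
    let msg = await d.nonPriority.popFirst()
    await d.sendNow(msg)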
47 changes: 29 additions & 18 deletions libp2p/protocols/pubsub/gossipsub.nim
@@ -220,6 +220,8 @@ method unsubscribePeer*(g: GossipSub, peer: PeerId) =
for topic, info in stats[].topicInfos.mpairs:
info.firstMessageDeliveries = 0

pubSubPeer.stopSendNonPriorityTask()

procCall FloodSub(g).unsubscribePeer(peer)

proc handleSubscribe*(g: GossipSub,
@@ -279,31 +281,40 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
respControl.prune.add(g.handleGraft(peer, control.graft))
let messages = g.handleIWant(peer, control.iwant)

if
respControl.prune.len > 0 or
respControl.iwant.len > 0 or
messages.len > 0:
# iwant and prunes from here, also messages
let
isPruneNotEmpty = respControl.prune.len > 0
isIWantNotEmpty = respControl.iwant.len > 0

if isPruneNotEmpty or isIWantNotEmpty:

if isIWantNotEmpty:
libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64)

if isPruneNotEmpty:
for prune in respControl.prune:
if g.knownTopics.contains(prune.topicId):
libp2p_pubsub_broadcast_prune.inc(labelValues = [prune.topicId])
else:
libp2p_pubsub_broadcast_prune.inc(labelValues = ["generic"])

trace "sending control message", msg = shortLog(respControl), peer
g.send(
peer,
RPCMsg(control: some(respControl)), true)

if messages.len > 0:
for smsg in messages:
for topic in smsg.topicIds:
if g.knownTopics.contains(topic):
libp2p_pubsub_broadcast_messages.inc(labelValues = [topic])
else:
libp2p_pubsub_broadcast_messages.inc(labelValues = ["generic"])

libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64)

for prune in respControl.prune:
if g.knownTopics.contains(prune.topicId):
libp2p_pubsub_broadcast_prune.inc(labelValues = [prune.topicId])
else:
libp2p_pubsub_broadcast_prune.inc(labelValues = ["generic"])

trace "sending control message", msg = shortLog(respControl), peer
# iwant replies have lower priority
trace "sending iwant reply messages", peer
g.send(
peer,
RPCMsg(control: some(respControl), messages: messages))
RPCMsg(messages: messages), false)

proc validateAndRelay(g: GossipSub,
msg: Message,
@@ -370,7 +381,7 @@ proc validateAndRelay(g: GossipSub,

# In theory, if topics are the same in all messages, we could batch - we'd
# also have to be careful to only include validated messages
g.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), false)
trace "forwarded message to peers", peers = toSendPeers.len, msgId, peer
for topic in msg.topicIds:
if topic notin g.topics: continue
@@ -441,7 +452,7 @@ method rpcHandler*(g: GossipSub,
peer.recvObservers(rpcMsg)

if rpcMsg.ping.len in 1..<64 and peer.pingBudget > 0:
g.send(peer, RPCMsg(pong: rpcMsg.ping))
g.send(peer, RPCMsg(pong: rpcMsg.ping), true)
peer.pingBudget.dec
for i in 0..<min(g.topicsHigh, rpcMsg.subscriptions.len):
template sub: untyped = rpcMsg.subscriptions[i]
@@ -655,7 +666,7 @@ method publish*(g: GossipSub,

g.mcache.put(msgId, msg)

g.broadcast(peers, RPCMsg(messages: @[msg]))
g.broadcast(peers, RPCMsg(messages: @[msg]), true)

if g.knownTopics.contains(topic):
libp2p_pubsub_messages_published.inc(peers.len.int64, labelValues = [topic])
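Note how handleControl above splits what used to be a single send into two: prune/iwant control replies now go out with high priority, while the messages answering a peer's IWANT are sent with low priority, on the grounds that gossip replies tolerate delay. Condensed (paraphrasing the diff above, not verbatim):

if isPruneNotEmpty or isIWantNotEmpty:
  g.send(peer, RPCMsg(control: some(respControl)), true)  # control: immediate
if messages.len > 0:
  g.send(peer, RPCMsg(messages: messages), false)         # iwant replies: queued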
11 changes: 6 additions & 5 deletions libp2p/protocols/pubsub/pubsub.nim
@@ -138,17 +138,18 @@ method unsubscribePeer*(p: PubSub, peerId: PeerId) {.base, gcsafe.} =

libp2p_pubsub_peers.set(p.peers.len.int64)

proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg) {.raises: [].} =
proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg, isHighPriority: bool = false) {.raises: [].} =
## Attempt to send `msg` to remote peer
##

trace "sending pubsub message to peer", peer, msg = shortLog(msg)
peer.send(msg, p.anonymize)
asyncSpawn peer.send(msg, p.anonymize, isHighPriority)

proc broadcast*(
p: PubSub,
sendPeers: auto, # Iterable[PubSubPeer]
msg: RPCMsg) {.raises: [].} =
msg: RPCMsg,
isHighPriority: bool = false) {.raises: [].} =
## Attempt to send `msg` to the given peers

let npeers = sendPeers.len.int64
@@ -195,12 +196,12 @@ proc broadcast*(

if anyIt(sendPeers, it.hasObservers):
for peer in sendPeers:
p.send(peer, msg)
p.send(peer, msg, isHighPriority)
else:
# Fast path that only encodes message once
let encoded = encodeRpcMsg(msg, p.anonymize)
for peer in sendPeers:
asyncSpawn peer.sendEncoded(encoded)
asyncSpawn peer.sendEncoded(encoded, isHighPriority)

proc sendSubs*(p: PubSub,
peer: PubSubPeer,
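Both send and broadcast default isHighPriority to false, so untouched callers keep the queuing behavior. Note also that the fast path in broadcast encodes the message once and spawns sendEncoded per peer, which is why the priority flag has to travel down to sendEncoded rather than being resolved in broadcast itself. A hedged usage sketch, where g, peers, toSendPeers, and msg are placeholders for a gossipsub instance, peer sets, and a message:

g.broadcast(peers, RPCMsg(messages: @[msg]), true)        # own publish: dispatch now
g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), false) # relay: queue behind priority sends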
115 changes: 90 additions & 25 deletions libp2p/protocols/pubsub/pubsubpeer.nim
@@ -31,6 +31,9 @@
declareCounter(libp2p_pubsub_skipped_received_messages, "number of received skipped messages", labels = ["id"])
declareCounter(libp2p_pubsub_skipped_sent_messages, "number of sent skipped messages", labels = ["id"])

declareGauge(libp2p_gossipsub_priority_queue_size, "the number of messages in the priority queue", labels = ["id"])
declareGauge(libp2p_gossipsub_non_priority_queue_size, "the number of messages in the non-priority queue", labels = ["id"])

type
PeerRateLimitError* = object of CatchableError

@@ -49,6 +52,14 @@
DropConn* = proc(peer: PubSubPeer) {.gcsafe, raises: [].} # have to pass peer as it's unknown during init
OnEvent* = proc(peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe, raises: [].}

RpcMessageQueue* = ref object
# Tracks async tasks for sending high-priority peer-published messages.
sendPriorityQueue: Deque[Future[void]]
# Queue for lower-priority messages, like "IWANT" replies and relay messages.
nonPriorityQueue: AsyncQueue[seq[byte]]
# Task for processing non-priority message queue.
sendNonPriorityTask: Future[void]

PubSubPeer* = ref object of RootObj
getConn*: GetConn # callback to establish a new send connection
onEvent*: OnEvent # Connectivity updates for peer
@@ -70,6 +81,8 @@
behaviourPenalty*: float64 # the eventual penalty score
overheadRateLimitOpt*: Opt[TokenBucket]

rpcmessagequeue: RpcMessageQueue

RPCHandler* = proc(peer: PubSubPeer, data: seq[byte]): Future[void]
{.gcsafe, raises: [].}

@@ -82,6 +95,16 @@
# so we have to read the parent's short agent
p.sendConn.getWrapped().shortAgent

proc getAgent*(peer: PubSubPeer): string =
return
when defined(libp2p_agents_metrics):
if peer.shortAgent.len > 0:
peer.shortAgent
else:
"unknown"
else:

"unknown"

func hash*(p: PubSubPeer): Hash =
p.peerId.hash

@@ -227,17 +250,7 @@
# metrics
libp2p_pubsub_sent_messages.inc(labelValues = [$p.peerId, t])

proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.async.} =
doAssert(not isNil(p), "pubsubpeer nil!")

if msg.len <= 0:
debug "empty message, skipping", p, msg = shortLog(msg)
return

if msg.len > p.maxMessageSize:
info "trying to send a msg too big for pubsub", maxSize=p.maxMessageSize, msgSize=msg.len
return

proc sendMsg(p: PubSubPeer, msg: seq[byte]) {.async.} =
if p.sendConn == nil:
# Wait for a send conn to be setup. `connectOnce` will
# complete this even if the sendConn setup failed
@@ -262,6 +275,27 @@

await conn.close() # This will clean up the send connection

proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool = false) {.async.} =
doAssert(not isNil(p), "pubsubpeer nil!")

if msg.len <= 0:
debug "empty message, skipping", p, msg = shortLog(msg)
return

if msg.len > p.maxMessageSize:
info "trying to send a msg too big for pubsub", maxSize=p.maxMessageSize, msgSize=msg.len
return

if isHighPriority:
p.rpcmessagequeue.sendPriorityQueue.addLast(p.sendMsg(msg))
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_priority_queue_size.inc(labelValues = [$p.peerId])
else:
await p.rpcmessagequeue.nonPriorityQueue.addLast(msg)
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [$p.peerId])
trace "message queued", p, msg = shortLog(msg)

iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize: bool): seq[byte] =
## This iterator takes an `RPCMsg` and sequentially repackages its Messages into new `RPCMsg` instances.
## Each new `RPCMsg` accumulates Messages until reaching the specified `maxSize`. If a single Message
@@ -297,7 +331,7 @@
else:
trace "message too big to sent", peer, rpcMsg = shortLog(currentRPCMsg)

proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [].} =
proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool = false) {.async.} =
# When sending messages, we take care to re-encode them with the right
# anonymization flag to ensure that we're not penalized for sending invalid
# or malicious data on the wire - in particular, re-encoding protects against
@@ -317,11 +351,11 @@

if encoded.len > p.maxMessageSize and msg.messages.len > 1:
for encodedSplitMsg in splitRPCMsg(p, msg, p.maxMessageSize, anonymize):
asyncSpawn p.sendEncoded(encodedSplitMsg)
await p.sendEncoded(encodedSplitMsg, isHighPriority)
else:
# If the message size is within limits, send it as is
trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg)
asyncSpawn p.sendEncoded(encoded)
await p.sendEncoded(encoded, isHighPriority)

proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
for sentIHave in p.sentIHaves.mitems():
@@ -330,6 +364,45 @@
return true
return false

proc clearSendPriorityQueue(p: PubSubPeer) =
while p.rpcmessagequeue.sendPriorityQueue.len > 0 and p.rpcmessagequeue.sendPriorityQueue[0].finished:
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_priority_queue_size.dec(labelValues = [$p.peerId])
discard p.rpcmessagequeue.sendPriorityQueue.popFirst()

proc sendNonPriorityTask(p: PubSubPeer) {.async.} =
while true:
while p.rpcmessagequeue.sendPriorityQueue.len > 0:
await p.rpcmessagequeue.sendPriorityQueue[0]
p.clearSendPriorityQueue()

# we send non-priority messages only if there are no pending priority messages
let msg = await p.rpcmessagequeue.nonPriorityQueue.popFirst()
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId])
await p.sendMsg(msg)

proc startSendNonPriorityTask(p: PubSubPeer) =
debug "starting sendNonPriorityTask", p
if p.rpcmessagequeue.sendNonPriorityTask.isNil:
p.rpcmessagequeue.sendNonPriorityTask = p.sendNonPriorityTask()

proc stopSendNonPriorityTask*(p: PubSubPeer) =
if not p.rpcmessagequeue.sendNonPriorityTask.isNil:
debug "stopping sendNonPriorityTask", p
p.rpcmessagequeue.sendNonPriorityTask.cancel()
p.rpcmessagequeue.sendNonPriorityTask = nil
p.rpcmessagequeue.sendPriorityQueue.clear()
p.rpcmessagequeue.nonPriorityQueue.clear()
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_priority_queue_size.set(labelValues = [$p.peerId], value = 0)
libp2p_gossipsub_non_priority_queue_size.set(labelValues = [$p.peerId], value = 0)

proc new(T: typedesc[RpcMessageQueue]): T =
return T(
sendPriorityQueue: initDeque[Future[void]](),
nonPriorityQueue: newAsyncQueue[seq[byte]](),
)

proc new*(
T: typedesc[PubSubPeer],
peerId: PeerId,
@@ -346,17 +419,9 @@
peerId: peerId,
connectedFut: newFuture[void](),
maxMessageSize: maxMessageSize,
overheadRateLimitOpt: overheadRateLimitOpt
overheadRateLimitOpt: overheadRateLimitOpt,
rpcmessagequeue: RpcMessageQueue.new(),
)
result.sentIHaves.addFirst(default(HashSet[MessageId]))
result.heDontWants.addFirst(default(HashSet[MessageId]))

proc getAgent*(peer: PubSubPeer): string =
return
when defined(libp2p_agents_metrics):
if peer.shortAgent.len > 0:
peer.shortAgent
else:
"unknown"
else:
"unknown"
result.startSendNonPriorityTask()
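Finally, the drain task's lifetime is tied to the peer's: PubSubPeer.new starts sendNonPriorityTask eagerly (last line above), and unsubscribePeer cancels it via stopSendNonPriorityTask, which also clears both queues and zeroes the gauges. A hedged end-to-end sketch of that pairing, reusing the illustrative Dispatcher from the first sketch in this section:

proc example() {.async.} =
  let d = newDispatcher()
  let drainer = d.drainNonPriority()     # started when the peer is created
  await d.dispatch(@[1'u8, 2, 3], true)  # publish: sent immediately
  await d.dispatch(@[4'u8, 5], false)    # relay: queued until priority drains
  drainer.cancel()                       # stopped when the peer goes away

waitFor example()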