Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent concurrent IWANT of the same message #943

Merged
merged 5 commits into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion libp2p/protocols/pubsub/gossipsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ proc init*(_: type[GossipSubParams]): GossipSubParams =
behaviourPenaltyDecay: 0.999,
disconnectBadPeers: false,
enablePX: false,
bandwidthEstimatebps: 100_000_000 # 100 Mbps or 12.5 MBps
bandwidthEstimatebps: 100_000_000, # 100 Mbps or 12.5 MBps
iwantTimeout: 3 * GossipSubHeartbeatInterval
)

proc validateParameters*(parameters: GossipSubParams): Result[void, cstring] =
Expand Down Expand Up @@ -401,6 +402,9 @@ method rpcHandler*(g: GossipSub,
let
msgId = msgIdResult.get
msgIdSalted = msgId & g.seenSalt
g.outstandingIWANTs.withValue(msgId, iwantRequest):
if iwantRequest.peer.peerId == peer.peerId:
g.outstandingIWANTs.del(msgId)

# addSeen adds salt to msgId to avoid
# remote attacking the hash function
Expand Down
16 changes: 15 additions & 1 deletion libp2p/protocols/pubsub/gossipsub/behavior.nim
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,8 @@
if not g.hasSeen(msgId):
if peer.iHaveBudget <= 0:
break
elif msgId notin res.messageIds:
elif msgId notin res.messageIds and msgId notin g.outstandingIWANTs:
g.outstandingIWANTs[msgId] = IWANTRequest(messageId: msgId, peer: peer, timestamp: Moment.now())
res.messageIds.add(msgId)
dec peer.iHaveBudget
trace "requested message via ihave", messageID=msgId
Expand Down Expand Up @@ -299,6 +300,17 @@
messages.add(msg)
return messages

proc checkIWANTTimeouts(g: GossipSub, timeoutDuration: Duration) {.raises: [].} =
let currentTime = Moment.now()
var idsToRemove = newSeq[MessageId]()
for msgId, request in g.outstandingIWANTs.pairs():
if currentTime - request.timestamp > timeoutDuration:
trace "IWANT request timed out", messageID=msgId, peer=request.peer
request.peer.behaviourPenalty += 0.1
idsToRemove.add(msgId)
for msgId in idsToRemove:
g.outstandingIWANTs.del(msgId)

proc commitMetrics(metrics: var MeshMetrics) {.raises: [].} =
libp2p_gossipsub_low_peers_topics.set(metrics.lowPeersTopics)
libp2p_gossipsub_no_peers_topics.set(metrics.noPeersTopics)
Expand Down Expand Up @@ -704,3 +716,5 @@
for trigger in g.heartbeatEvents:
trace "firing heartbeat event", instance = cast[int](g)
trigger.fire()

Check warning on line 719 in libp2p/protocols/pubsub/gossipsub/behavior.nim

View check run for this annotation

Codecov / codecov/patch

libp2p/protocols/pubsub/gossipsub/behavior.nim#L719

Added line #L719 was not covered by tests
checkIWANTTimeouts(g, g.parameters.iwantTimeout)
7 changes: 7 additions & 0 deletions libp2p/protocols/pubsub/gossipsub/types.nim
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ type
enablePX*: bool

bandwidthEstimatebps*: int # This is currently used only for limting flood publishing. 0 disables flood-limiting completely
iwantTimeout*: Duration

BackoffTable* = Table[string, Table[PeerId, Moment]]
ValidationSeenTable* = Table[MessageId, HashSet[PubSubPeer]]
Expand Down Expand Up @@ -177,6 +178,7 @@ type
routingRecordsHandler*: seq[RoutingRecordsHandler] # Callback for peer exchange

heartbeatEvents*: seq[AsyncEvent]
outstandingIWANTs*: Table[MessageId, IWANTRequest]

MeshMetrics* = object
# scratch buffers for metrics
Expand All @@ -187,3 +189,8 @@ type
lowPeersTopics*: int64 # npeers < dlow
healthyPeersTopics*: int64 # npeers >= dlow
underDoutTopics*: int64

IWANTRequest* = object
messageId*: MessageId
peer*: PubSubPeer
timestamp*: Moment
98 changes: 98 additions & 0 deletions tests/pubsub/testgossipinternal.nim
Original file line number Diff line number Diff line change
Expand Up @@ -727,3 +727,101 @@ suite "GossipSub internal":

await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()

asyncTest "two IHAVEs should generate only one IWANT":
let gossipSub = TestGossipSub.init(newStandardSwitch())

var iwantCount = 0

proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
check false

proc handler2(topic: string, data: seq[byte]) {.async.} = discard

let topic = "foobar"
var conns = newSeq[Connection]()
gossipSub.subscribe(topic, handler2)

# Setup two connections and two peers
var ihaveMessageId: string
var firstPeer: PubSubPeer
let seqno = @[0'u8, 1, 2, 3]
for i in 0..<2:
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = randomPeerId()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
if isNil(firstPeer):
firstPeer = peer
ihaveMessageId = byteutils.toHex(seqno) & $firstPeer.peerId
peer.handler = handler

# Simulate that each peer sends an IHAVE message to our node
let msg = ControlIHave(
topicID: topic,
messageIDs: @[ihaveMessageId.toBytes()]
)
let iwants = gossipSub.handleIHave(peer, @[msg])
if iwants.messageIds.len > 0:
iwantCount += 1

# Verify that our node responds with only one IWANT message
check: iwantCount == 1
check: gossipSub.outstandingIWANTs.contains(ihaveMessageId.toBytes())

# Simulate that our node receives the RPCMsg in response to the IWANT
let actualMessageData = "Hello, World!".toBytes
let rpcMsg = RPCMsg(
messages: @[Message(
fromPeer: firstPeer.peerId,
seqno: seqno,
data: actualMessageData
)]
)
await gossipSub.rpcHandler(firstPeer, rpcMsg)

check: not gossipSub.outstandingIWANTs.contains(ihaveMessageId.toBytes())

await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()

asyncTest "handle unanswered IWANT messages":
let gossipSub = TestGossipSub.init(newStandardSwitch())
gossipSub.parameters.heartbeatInterval = 50.milliseconds
gossipSub.parameters.iwantTimeout = 10.milliseconds
await gossipSub.start()

proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} = discard
proc handler2(topic: string, data: seq[byte]) {.async.} = discard

let topic = "foobar"
var conns = newSeq[Connection]()
gossipSub.subscribe(topic, handler2)

# Setup a connection and a peer
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = randomPeerId()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
peer.handler = handler

# Simulate that the peer sends an IHAVE message to our node
let ihaveMessageId = @[0'u8, 1, 2, 3]
let ihaveMsg = ControlIHave(
topicID: topic,
messageIDs: @[ihaveMessageId]
)
discard gossipSub.handleIHave(peer, @[ihaveMsg])

check: gossipSub.outstandingIWANTs.contains(ihaveMessageId)
check: peer.behaviourPenalty == 0.0

await sleepAsync(60.milliseconds)

check: not gossipSub.outstandingIWANTs.contains(ihaveMessageId)
check: peer.behaviourPenalty == 0.1

await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
Loading