diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index d1f531353b..1fd9f18777 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -402,9 +402,8 @@ method rpcHandler*(g: GossipSub, let msgId = msgIdResult.get msgIdSalted = msgId & g.seenSalt - g.outstandingIWANTs.withValue(msgId, iwantRequest): - if iwantRequest.peer == peer: + if iwantRequest.peer.peerId == peer.peerId: g.outstandingIWANTs.del(msgId) # addSeen adds salt to msgId to avoid diff --git a/libp2p/protocols/pubsub/gossipsub/types.nim b/libp2p/protocols/pubsub/gossipsub/types.nim index edc83ef156..25a9e636a8 100644 --- a/libp2p/protocols/pubsub/gossipsub/types.nim +++ b/libp2p/protocols/pubsub/gossipsub/types.nim @@ -193,4 +193,4 @@ type IWANTRequest* = object messageId*: MessageId peer*: PubSubPeer - timestamp*: Moment \ No newline at end of file + timestamp*: Moment diff --git a/tests/pubsub/testgossipinternal.nim b/tests/pubsub/testgossipinternal.nim index e574ce611c..302d5bf7b0 100644 --- a/tests/pubsub/testgossipinternal.nim +++ b/tests/pubsub/testgossipinternal.nim @@ -2,7 +2,7 @@ include ../../libp2p/protocols/pubsub/gossipsub {.used.} -import std/[options, deques] +import hashes, std/[options, deques] import stew/byteutils import ../../libp2p/builders import ../../libp2p/errors @@ -727,3 +727,61 @@ suite "GossipSub internal": await allFuturesThrowing(conns.mapIt(it.close())) await gossipSub.switch.stop() + + asyncTest "two IHAVEs should generate only one IWANT": + let gossipSub = TestGossipSub.init(newStandardSwitch()) + + var iwantCount = 0 + + proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} = + check false + + proc handler2(topic: string, data: seq[byte]) {.async.} = discard + + let topic = "foobar" + var conns = newSeq[Connection]() + gossipSub.subscribe(topic, handler2) + + # Setup two connections and two peers + var ihaveMessageId: string + var firstPeer: PubSubPeer + let seqno = @[0'u8, 1, 2, 3] + for i in 0..<2: + let conn = TestBufferStream.new(noop) + conns &= conn + let peerId = randomPeerId() + conn.peerId = peerId + let peer = gossipSub.getPubSubPeer(peerId) + if isNil(firstPeer): + firstPeer = peer + ihaveMessageId = byteutils.toHex(seqno) & $firstPeer.peerId + peer.handler = handler + + # Action 1: Simulate that each peer sends an IHAVE message to our node + let msg = ControlIHave( + topicID: topic, + messageIDs: @[ihaveMessageId.toBytes()] + ) + let iwants = gossipSub.handleIHave(peer, @[msg]) + if iwants.messageIds.len > 0: + iwantCount += 1 + + # Check: Verify that our node responds with only one IWANT message + check: iwantCount == 1 + check: gossipSub.outstandingIWANTs.contains(ihaveMessageId.toBytes()) + + # Action 2: Simulate that our node receives the RPCMsg in response to the IWANT + let actualMessageData = "Hello, World!".toBytes + let rpcMsg = RPCMsg( + messages: @[Message( + fromPeer: firstPeer.peerId, + seqno: seqno, + data: actualMessageData + )] + ) + await gossipSub.rpcHandler(firstPeer, rpcMsg) + + check: not gossipSub.outstandingIWANTs.contains(ihaveMessageId.toBytes()) + + await allFuturesThrowing(conns.mapIt(it.close())) + await gossipSub.switch.stop()