From 3fc1236659c18325c5f5fc21121be3eb9edf4ff7 Mon Sep 17 00:00:00 2001 From: diegomrsantos Date: Fri, 3 Nov 2023 15:24:27 +0100 Subject: [PATCH] Revert "Prevent concurrent IWANT of the same message (#943)" (#977) --- libp2p/protocols/pubsub/gossipsub.nim | 4 - .../protocols/pubsub/gossipsub/behavior.nim | 16 +-- libp2p/protocols/pubsub/gossipsub/types.nim | 7 -- tests/pubsub/testgossipinternal.nim | 98 ------------------- 4 files changed, 1 insertion(+), 124 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 1ff4c5969f..5a5dd621b4 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -79,7 +79,6 @@ proc init*(_: type[GossipSubParams]): GossipSubParams = disconnectBadPeers: false, enablePX: false, bandwidthEstimatebps: 100_000_000, # 100 Mbps or 12.5 MBps - iwantTimeout: 3 * GossipSubHeartbeatInterval, overheadRateLimit: Opt.none(tuple[bytes: int, interval: Duration]), disconnectPeerAboveRateLimit: false ) @@ -461,9 +460,6 @@ method rpcHandler*(g: GossipSub, let msgId = msgIdResult.get msgIdSalted = msgId & g.seenSalt - g.outstandingIWANTs.withValue(msgId, iwantRequest): - if iwantRequest.peer.peerId == peer.peerId: - g.outstandingIWANTs.del(msgId) # addSeen adds salt to msgId to avoid # remote attacking the hash function diff --git a/libp2p/protocols/pubsub/gossipsub/behavior.nim b/libp2p/protocols/pubsub/gossipsub/behavior.nim index 008e197448..983262fa0c 100644 --- a/libp2p/protocols/pubsub/gossipsub/behavior.nim +++ b/libp2p/protocols/pubsub/gossipsub/behavior.nim @@ -254,8 +254,7 @@ proc handleIHave*(g: GossipSub, if not g.hasSeen(msgId): if peer.iHaveBudget <= 0: break - elif msgId notin res.messageIds and msgId notin g.outstandingIWANTs: - g.outstandingIWANTs[msgId] = IWANTRequest(messageId: msgId, peer: peer, timestamp: Moment.now()) + elif msgId notin res.messageIds: res.messageIds.add(msgId) dec peer.iHaveBudget trace "requested message via ihave", messageID=msgId @@ -301,17 +300,6 @@ proc handleIWant*(g: GossipSub, messages.add(msg) return messages -proc checkIWANTTimeouts(g: GossipSub, timeoutDuration: Duration) {.raises: [].} = - let currentTime = Moment.now() - var idsToRemove = newSeq[MessageId]() - for msgId, request in g.outstandingIWANTs.pairs(): - if currentTime - request.timestamp > timeoutDuration: - trace "IWANT request timed out", messageID=msgId, peer=request.peer - request.peer.behaviourPenalty += 0.1 - idsToRemove.add(msgId) - for msgId in idsToRemove: - g.outstandingIWANTs.del(msgId) - proc commitMetrics(metrics: var MeshMetrics) {.raises: [].} = libp2p_gossipsub_low_peers_topics.set(metrics.lowPeersTopics) libp2p_gossipsub_no_peers_topics.set(metrics.noPeersTopics) @@ -717,5 +705,3 @@ proc heartbeat*(g: GossipSub) {.async.} = for trigger in g.heartbeatEvents: trace "firing heartbeat event", instance = cast[int](g) trigger.fire() - - checkIWANTTimeouts(g, g.parameters.iwantTimeout) diff --git a/libp2p/protocols/pubsub/gossipsub/types.nim b/libp2p/protocols/pubsub/gossipsub/types.nim index 3387e914df..06fa55eb30 100644 --- a/libp2p/protocols/pubsub/gossipsub/types.nim +++ b/libp2p/protocols/pubsub/gossipsub/types.nim @@ -143,7 +143,6 @@ type enablePX*: bool bandwidthEstimatebps*: int # This is currently used only for limting flood publishing. 0 disables flood-limiting completely - iwantTimeout*: Duration overheadRateLimit*: Opt[tuple[bytes: int, interval: Duration]] disconnectPeerAboveRateLimit*: bool @@ -181,7 +180,6 @@ type routingRecordsHandler*: seq[RoutingRecordsHandler] # Callback for peer exchange heartbeatEvents*: seq[AsyncEvent] - outstandingIWANTs*: Table[MessageId, IWANTRequest] MeshMetrics* = object # scratch buffers for metrics @@ -192,8 +190,3 @@ type lowPeersTopics*: int64 # npeers < dlow healthyPeersTopics*: int64 # npeers >= dlow underDoutTopics*: int64 - - IWANTRequest* = object - messageId*: MessageId - peer*: PubSubPeer - timestamp*: Moment diff --git a/tests/pubsub/testgossipinternal.nim b/tests/pubsub/testgossipinternal.nim index 42809e7c2d..4f60400dba 100644 --- a/tests/pubsub/testgossipinternal.nim +++ b/tests/pubsub/testgossipinternal.nim @@ -718,104 +718,6 @@ suite "GossipSub internal": await allFuturesThrowing(conns.mapIt(it.close())) await gossipSub.switch.stop() - asyncTest "two IHAVEs should generate only one IWANT": - let gossipSub = TestGossipSub.init(newStandardSwitch()) - - var iwantCount = 0 - - proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} = - check false - - proc handler2(topic: string, data: seq[byte]) {.async.} = discard - - let topic = "foobar" - var conns = newSeq[Connection]() - gossipSub.subscribe(topic, handler2) - - # Setup two connections and two peers - var ihaveMessageId: string - var firstPeer: PubSubPeer - let seqno = @[0'u8, 1, 2, 3] - for i in 0..<2: - let conn = TestBufferStream.new(noop) - conns &= conn - let peerId = randomPeerId() - conn.peerId = peerId - let peer = gossipSub.getPubSubPeer(peerId) - if isNil(firstPeer): - firstPeer = peer - ihaveMessageId = byteutils.toHex(seqno) & $firstPeer.peerId - peer.handler = handler - - # Simulate that each peer sends an IHAVE message to our node - let msg = ControlIHave( - topicID: topic, - messageIDs: @[ihaveMessageId.toBytes()] - ) - let iwants = gossipSub.handleIHave(peer, @[msg]) - if iwants.messageIds.len > 0: - iwantCount += 1 - - # Verify that our node responds with only one IWANT message - check: iwantCount == 1 - check: gossipSub.outstandingIWANTs.contains(ihaveMessageId.toBytes()) - - # Simulate that our node receives the RPCMsg in response to the IWANT - let actualMessageData = "Hello, World!".toBytes - let rpcMsg = RPCMsg( - messages: @[Message( - fromPeer: firstPeer.peerId, - seqno: seqno, - data: actualMessageData - )] - ) - await gossipSub.rpcHandler(firstPeer, encodeRpcMsg(rpcMsg, false)) - - check: not gossipSub.outstandingIWANTs.contains(ihaveMessageId.toBytes()) - - await allFuturesThrowing(conns.mapIt(it.close())) - await gossipSub.switch.stop() - - asyncTest "handle unanswered IWANT messages": - let gossipSub = TestGossipSub.init(newStandardSwitch()) - gossipSub.parameters.heartbeatInterval = 50.milliseconds - gossipSub.parameters.iwantTimeout = 10.milliseconds - await gossipSub.start() - - proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} = discard - proc handler2(topic: string, data: seq[byte]) {.async.} = discard - - let topic = "foobar" - var conns = newSeq[Connection]() - gossipSub.subscribe(topic, handler2) - - # Setup a connection and a peer - let conn = TestBufferStream.new(noop) - conns &= conn - let peerId = randomPeerId() - conn.peerId = peerId - let peer = gossipSub.getPubSubPeer(peerId) - peer.handler = handler - - # Simulate that the peer sends an IHAVE message to our node - let ihaveMessageId = @[0'u8, 1, 2, 3] - let ihaveMsg = ControlIHave( - topicID: topic, - messageIDs: @[ihaveMessageId] - ) - discard gossipSub.handleIHave(peer, @[ihaveMsg]) - - check: gossipSub.outstandingIWANTs.contains(ihaveMessageId) - check: peer.behaviourPenalty == 0.0 - - await sleepAsync(60.milliseconds) - - check: not gossipSub.outstandingIWANTs.contains(ihaveMessageId) - check: peer.behaviourPenalty == 0.1 - - await allFuturesThrowing(conns.mapIt(it.close())) - await gossipSub.switch.stop() - proc setupTest(): Future[tuple[gossip0: GossipSub, gossip1: GossipSub, receivedMessages: ref HashSet[seq[byte]]]] {.async.} = let nodes = generateNodes(2, gossip = true, verifySignature = false)