Skip to content

Commit

Permalink
Revert "Prevent concurrent IWANT of the same message (#943)" (#977)
Browse files Browse the repository at this point in the history
  • Loading branch information
diegomrsantos authored and dryajov committed Dec 4, 2023
1 parent 974a7cf commit 2f35824
Show file tree
Hide file tree
Showing 4 changed files with 1 addition and 124 deletions.
4 changes: 0 additions & 4 deletions libp2p/protocols/pubsub/gossipsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ proc init*(_: type[GossipSubParams]): GossipSubParams =
disconnectBadPeers: false,
enablePX: false,
bandwidthEstimatebps: 100_000_000, # 100 Mbps or 12.5 MBps
iwantTimeout: 3 * GossipSubHeartbeatInterval,
overheadRateLimit: Opt.none(tuple[bytes: int, interval: Duration]),
disconnectPeerAboveRateLimit: false
)
Expand Down Expand Up @@ -461,9 +460,6 @@ method rpcHandler*(g: GossipSub,
let
msgId = msgIdResult.get
msgIdSalted = msgId & g.seenSalt
g.outstandingIWANTs.withValue(msgId, iwantRequest):
if iwantRequest.peer.peerId == peer.peerId:
g.outstandingIWANTs.del(msgId)

# addSeen adds salt to msgId to avoid
# remote attacking the hash function
Expand Down
16 changes: 1 addition & 15 deletions libp2p/protocols/pubsub/gossipsub/behavior.nim
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,7 @@ proc handleIHave*(g: GossipSub,
if not g.hasSeen(msgId):
if peer.iHaveBudget <= 0:
break
elif msgId notin res.messageIds and msgId notin g.outstandingIWANTs:
g.outstandingIWANTs[msgId] = IWANTRequest(messageId: msgId, peer: peer, timestamp: Moment.now())
elif msgId notin res.messageIds:
res.messageIds.add(msgId)
dec peer.iHaveBudget
trace "requested message via ihave", messageID=msgId
Expand Down Expand Up @@ -301,17 +300,6 @@ proc handleIWant*(g: GossipSub,
messages.add(msg)
return messages

proc checkIWANTTimeouts(g: GossipSub, timeoutDuration: Duration) {.raises: [].} =
let currentTime = Moment.now()
var idsToRemove = newSeq[MessageId]()
for msgId, request in g.outstandingIWANTs.pairs():
if currentTime - request.timestamp > timeoutDuration:
trace "IWANT request timed out", messageID=msgId, peer=request.peer
request.peer.behaviourPenalty += 0.1
idsToRemove.add(msgId)
for msgId in idsToRemove:
g.outstandingIWANTs.del(msgId)

proc commitMetrics(metrics: var MeshMetrics) {.raises: [].} =
libp2p_gossipsub_low_peers_topics.set(metrics.lowPeersTopics)
libp2p_gossipsub_no_peers_topics.set(metrics.noPeersTopics)
Expand Down Expand Up @@ -717,5 +705,3 @@ proc heartbeat*(g: GossipSub) {.async.} =
for trigger in g.heartbeatEvents:
trace "firing heartbeat event", instance = cast[int](g)
trigger.fire()

checkIWANTTimeouts(g, g.parameters.iwantTimeout)
7 changes: 0 additions & 7 deletions libp2p/protocols/pubsub/gossipsub/types.nim
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ type
enablePX*: bool

bandwidthEstimatebps*: int # This is currently used only for limting flood publishing. 0 disables flood-limiting completely
iwantTimeout*: Duration

overheadRateLimit*: Opt[tuple[bytes: int, interval: Duration]]
disconnectPeerAboveRateLimit*: bool
Expand Down Expand Up @@ -181,7 +180,6 @@ type
routingRecordsHandler*: seq[RoutingRecordsHandler] # Callback for peer exchange

heartbeatEvents*: seq[AsyncEvent]
outstandingIWANTs*: Table[MessageId, IWANTRequest]

MeshMetrics* = object
# scratch buffers for metrics
Expand All @@ -192,8 +190,3 @@ type
lowPeersTopics*: int64 # npeers < dlow
healthyPeersTopics*: int64 # npeers >= dlow
underDoutTopics*: int64

IWANTRequest* = object
messageId*: MessageId
peer*: PubSubPeer
timestamp*: Moment
98 changes: 0 additions & 98 deletions tests/pubsub/testgossipinternal.nim
Original file line number Diff line number Diff line change
Expand Up @@ -718,104 +718,6 @@ suite "GossipSub internal":
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()

asyncTest "two IHAVEs should generate only one IWANT":
let gossipSub = TestGossipSub.init(newStandardSwitch())

var iwantCount = 0

proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} =
check false

proc handler2(topic: string, data: seq[byte]) {.async.} = discard

let topic = "foobar"
var conns = newSeq[Connection]()
gossipSub.subscribe(topic, handler2)

# Setup two connections and two peers
var ihaveMessageId: string
var firstPeer: PubSubPeer
let seqno = @[0'u8, 1, 2, 3]
for i in 0..<2:
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = randomPeerId()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
if isNil(firstPeer):
firstPeer = peer
ihaveMessageId = byteutils.toHex(seqno) & $firstPeer.peerId
peer.handler = handler

# Simulate that each peer sends an IHAVE message to our node
let msg = ControlIHave(
topicID: topic,
messageIDs: @[ihaveMessageId.toBytes()]
)
let iwants = gossipSub.handleIHave(peer, @[msg])
if iwants.messageIds.len > 0:
iwantCount += 1

# Verify that our node responds with only one IWANT message
check: iwantCount == 1
check: gossipSub.outstandingIWANTs.contains(ihaveMessageId.toBytes())

# Simulate that our node receives the RPCMsg in response to the IWANT
let actualMessageData = "Hello, World!".toBytes
let rpcMsg = RPCMsg(
messages: @[Message(
fromPeer: firstPeer.peerId,
seqno: seqno,
data: actualMessageData
)]
)
await gossipSub.rpcHandler(firstPeer, encodeRpcMsg(rpcMsg, false))

check: not gossipSub.outstandingIWANTs.contains(ihaveMessageId.toBytes())

await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()

asyncTest "handle unanswered IWANT messages":
let gossipSub = TestGossipSub.init(newStandardSwitch())
gossipSub.parameters.heartbeatInterval = 50.milliseconds
gossipSub.parameters.iwantTimeout = 10.milliseconds
await gossipSub.start()

proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} = discard
proc handler2(topic: string, data: seq[byte]) {.async.} = discard

let topic = "foobar"
var conns = newSeq[Connection]()
gossipSub.subscribe(topic, handler2)

# Setup a connection and a peer
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = randomPeerId()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
peer.handler = handler

# Simulate that the peer sends an IHAVE message to our node
let ihaveMessageId = @[0'u8, 1, 2, 3]
let ihaveMsg = ControlIHave(
topicID: topic,
messageIDs: @[ihaveMessageId]
)
discard gossipSub.handleIHave(peer, @[ihaveMsg])

check: gossipSub.outstandingIWANTs.contains(ihaveMessageId)
check: peer.behaviourPenalty == 0.0

await sleepAsync(60.milliseconds)

check: not gossipSub.outstandingIWANTs.contains(ihaveMessageId)
check: peer.behaviourPenalty == 0.1

await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()

proc setupTest(): Future[tuple[gossip0: GossipSub, gossip1: GossipSub, receivedMessages: ref HashSet[seq[byte]]]] {.async.} =
let
nodes = generateNodes(2, gossip = true, verifySignature = false)
Expand Down

0 comments on commit 2f35824

Please sign in to comment.