Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(gossipsub): Topic Membership Tests #1201

Open
wants to merge 35 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
871efab
added test wrt subscribe and unsubscribe
shashankshampi Sep 26, 2024
dc7f8d4
added tests/pubsub/testgossipinternal2 file
shashankshampi Sep 26, 2024
5790b6f
linters
shashankshampi Sep 26, 2024
eced002
Merge branch 'master' into block6Test
shashankshampi Sep 26, 2024
1c2e221
refactor and suite name refactor
shashankshampi Sep 27, 2024
2923a2d
test(gossipsub): added test for membership for join and leave topic
shashankshampi Sep 30, 2024
fda0d2b
test(gossipsub): import optimization
shashankshampi Oct 1, 2024
eb2f6bf
test(gossipsub): Test cases covering subscribe and unsubscribe Events
shashankshampi Sep 26, 2024
9c0966e
Merge branch 'master' into block6Test
shashankshampi Oct 2, 2024
27c2850
Merge branch 'master' into block6Test2
shashankshampi Oct 2, 2024
25df50d
updtaed as per review comment
shashankshampi Oct 3, 2024
f0c8c5b
review comments to remove unwanted comments
shashankshampi Oct 3, 2024
46b7125
removed internal subscribe and unsubscribe test
shashankshampi Oct 3, 2024
19d3ead
removed unwanted check
shashankshampi Oct 3, 2024
f42a763
added assertion for handle SUBSCRIBE to the topic
shashankshampi Oct 3, 2024
e10e4d0
rebase with block6Test
shashankshampi Oct 5, 2024
89473da
TC fix
shashankshampi Oct 5, 2024
806592d
PR update
shashankshampi Oct 7, 2024
2d38e8a
fix in test logic for multiple peers join and leave topic simultaneously
shashankshampi Oct 8, 2024
d594c04
addressed wornderful review comments
shashankshampi Oct 10, 2024
a12b56c
Merge branch 'block6Test' into block6Test2
shashankshampi Oct 10, 2024
cb7ccae
updated as per review comment
shashankshampi Oct 10, 2024
ff6b274
review comment fix
shashankshampi Oct 10, 2024
567a456
Merge branch 'block6Test' into block6Test2
shashankshampi Oct 10, 2024
cc8e976
removed PubSub in sunscribe
shashankshampi Oct 10, 2024
e68685c
added part 2 PR in same
shashankshampi Oct 16, 2024
44dd7d1
added updated logic to use common handler and create peers nodes as r…
shashankshampi Oct 21, 2024
d80abe0
review comment fix
shashankshampi Oct 22, 2024
a7e80de
rebase
shashankshampi Oct 24, 2024
be818b9
Remove tests/pubsub/testgossipmembership from the PR
shashankshampi Oct 24, 2024
a3c29ae
rebase
shashankshampi Oct 24, 2024
aa34c7f
fix in last test
shashankshampi Oct 24, 2024
df674d5
another set of review comment fix
shashankshampi Oct 24, 2024
1bfdded
added documentation as per request
shashankshampi Oct 25, 2024
49ce782
added alex changes
shashankshampi Oct 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 98 additions & 38 deletions tests/pubsub/testgossipmembership.nim
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,17 @@ import ../../libp2p/muxers/muxer
import ../../libp2p/protocols/pubsub/rpc/protobuf
import utils
import chronos

import chronicles
import ../helpers

proc noop(data: seq[byte]) {.async: (raises: [CancelledError, LPStreamError]).} =
discard

proc voidTopicHandler(topic: string, data: seq[byte]) {.async.} =
discard

const MsgIdSuccess = "msg id gen success"
let DURATION_TIMEOUT = 500.milliseconds

suite "GossipSub Topic Membership Tests":
teardown:
Expand Down Expand Up @@ -78,6 +82,26 @@ suite "GossipSub Topic Membership Tests":
for topic in topics:
gossipSub.unsubscribeAll(topic)

proc commonSubscribe(
nodes: seq[TestGossipSub],
topic: string,
handler: proc(topic: string, data: seq[byte]) {.async.},
) =
# Subscribe all nodes to the topic
for node in nodes:
node.subscribe(topic, handler)
echo "Subscribed all nodes to the topic: ", topic

proc commonUnsubscribe(
nodes: seq[TestGossipSub],
topic: string,
handler: proc(topic: string, data: seq[byte]) {.async.},
) =
# Unsubscribe all nodes from the topic
for node in nodes:
node.unsubscribe(topic, handler)
echo "Unsubscribed all nodes from the topic: ", topic

# Simulate the `SUBSCRIBE` to the topic and check proper handling in the mesh and gossipsub structures
asyncTest "handle SUBSCRIBE to the topic":
AlejandroCabeza marked this conversation as resolved.
Show resolved Hide resolved
AlejandroCabeza marked this conversation as resolved.
Show resolved Hide resolved
let topic = "test-topic"
Expand Down Expand Up @@ -115,7 +139,6 @@ suite "GossipSub Topic Membership Tests":
check topic in gossipSub.gossipsub

# The topic should remain in gossipsub

await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()

Expand Down Expand Up @@ -182,8 +205,8 @@ suite "GossipSub Topic Membership Tests":
# Initialize the GossipSub system and simulate peer connections
let (gossipSub, conns) = setupGossipSub(topic, 5)

# Simulate peer joining the topic
subscribeToTopics(gossipSub, @[topic])
# Simulate peer joining the topic using commonSubscribe
commonSubscribe(@[gossipSub], topic, voidTopicHandler)

# Check that peers are added to the mesh and the topic is tracked
check gossipSub.mesh[topic].len == 5
Expand All @@ -199,11 +222,11 @@ suite "GossipSub Topic Membership Tests":
# Initialize the GossipSub system and simulate peer connections
let (gossipSub, conns) = setupGossipSub(topic, 5)

# Simulate peer joining the topic first
subscribeToTopics(gossipSub, @[topic])
# Simulate peer joining the topic using commonSubscribe
commonSubscribe(@[gossipSub], topic, voidTopicHandler)

# Now simulate peer leaving the topic
unsubscribeFromTopics(gossipSub, @[topic])
# Now simulate peer leaving the topic using commonUnsubscribe
commonUnsubscribe(@[gossipSub], topic, voidTopicHandler)

# Check that peers are removed from the mesh but the topic remains in gossipsub
check topic notin gossipSub.mesh
Expand All @@ -214,39 +237,76 @@ suite "GossipSub Topic Membership Tests":

# Test the behavior when multiple peers join and leave a topic simultaneously.
asyncTest "multiple peers join and leave topic simultaneously":
let topic = "test-multi-join-leave"

# Initialize the GossipSub system and simulate peer connections for 6 peers
let (gossipSub, conns) = setupGossipSub(@[topic], 6)

# Ensure the topic is correctly initialized in mesh and gossipsub
doAssert gossipSub.mesh.contains(topic), "Topic not found in mesh"
doAssert gossipSub.gossipsub.contains(topic), "Topic not found in gossipsub"

# Simulate 6 peers joining the topic
subscribeToTopics(gossipSub, @[topic])

# Assert that 6 peers have joined the mesh
doAssert gossipSub.mesh[topic].len == 6, "Expected 6 peers to join the mesh"

# Define a simple handler for unsubscribing the peers
proc dummyHandler(topic: string, data: seq[byte]): Future[void] {.async.} =
discard

# Simulate 3 peers leaving the topic by unsubscribing them
var peersToUnsubscribe = gossipSub.mesh[topic].toSeq()[0 .. 2]
let
numberOfNodes = 6
topic = "foobar"
nodes = generateNodes(numberOfNodes, gossip = true)
nodesFut = await allFinished(nodes.mapIt(it.switch.start()))

# Subscribe all nodes to the topic
await subscribeNodes(nodes)
AlejandroCabeza marked this conversation as resolved.
Show resolved Hide resolved
for node in nodes:
node.subscribe(topic, voidTopicHandler)

# Allow time for subscription propagation
await sleepAsync(2 * DURATION_TIMEOUT)

# Ensure each node is subscribed by checking the gossipsub field
AlejandroCabeza marked this conversation as resolved.
Show resolved Hide resolved
for i in 0 ..< numberOfNodes:
let currentGossip = GossipSub(nodes[i]) # Rename to 'currentGossip'
doAssert currentGossip.gossipsub.hasKey(topic),
"Node is not subscribed to the topic"

# Print mesh status before connecting nodes
AlejandroCabeza marked this conversation as resolved.
Show resolved Hide resolved
echo "Initial mesh size:"
for i in 0 ..< numberOfNodes:
let currentGossip = GossipSub(nodes[i]) # Rename to 'currentGossip'
echo "Node ", i, " mesh size: ", currentGossip.mesh.getOrDefault(topic).len

# Connect all nodes to each other and ensure subscription propagation
AlejandroCabeza marked this conversation as resolved.
Show resolved Hide resolved
for x in 0 ..< numberOfNodes:
for y in 0 ..< numberOfNodes:
if x != y:
await waitSub(nodes[x], nodes[y], topic)

# Allow time for mesh stabilization
await sleepAsync(2 * DURATION_TIMEOUT)

# Print mesh status after stabilization
AlejandroCabeza marked this conversation as resolved.
Show resolved Hide resolved
echo "Mesh size after connecting:"
for i in 0 ..< numberOfNodes:
let currentGossip = GossipSub(nodes[i]) # Rename to 'currentGossip'
echo "Node ", i, " mesh size: ", currentGossip.mesh.getOrDefault(topic).len
AlejandroCabeza marked this conversation as resolved.
Show resolved Hide resolved

# Expected number of peers in the mesh
let expectedNumberOfPeers = numberOfNodes - 1
for i in 0 ..< numberOfNodes:
let currentGossip = GossipSub(nodes[i]) # Rename to 'currentGossip'
check:
currentGossip.gossipsub[topic].len == expectedNumberOfPeers
currentGossip.mesh[topic].len == expectedNumberOfPeers
currentGossip.fanout.len == 0

# Simulate unsubscription of 3 peers
let firstNodeGossip = GossipSub(nodes[0]) # Initialize for the first node
let peersToUnsubscribe = firstNodeGossip.mesh[topic].toSeq()[0 .. 2]
for peer in peersToUnsubscribe:
firstNodeGossip.unsubscribe(topic, voidTopicHandler)
AlejandroCabeza marked this conversation as resolved.
Show resolved Hide resolved
echo "Unsubscribing peer: ", peer.peerId
gossipSub.unsubscribe(topic, dummyHandler)

# Now assert that 6 peers still remain in the mesh because the mesh retains peers
doAssert gossipSub.mesh[topic].len == 6,
"Expected 6 peers to still be in mesh after unsubscription"
# Allow time for heartbeat to adjust the mesh
await sleepAsync(3 * DURATION_TIMEOUT) # Increased delay for mesh stabilization

# Check mesh status again, expecting remaining peers
echo "Mesh size after unsubscription: ",
firstNodeGossip.mesh.getOrDefault(topic).len
doAssert firstNodeGossip.mesh.getOrDefault(topic).len == 3,
"Expected 3 peers to remain in the mesh"

# Assert that unsubscribed peers should remain in the mesh but should no longer receive messages
# Assert that unsubscribed peers are no longer in the mesh
for peer in peersToUnsubscribe:
doAssert gossipSub.mesh[topic].contains(peer),
"Peer should still be in mesh even after unsubscription"
doAssert not firstNodeGossip.mesh[topic].contains(peer),
"Unsubscribed peer should not be in the mesh"

await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
# Cleanup: stop all nodes
await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop())))
Loading