Skip to content

Commit

Permalink
feat(network): impl behaviour's on_connection_handler_event
Browse files Browse the repository at this point in the history
  • Loading branch information
ShahakShama committed Oct 1, 2023
1 parent 638623d commit b1441fb
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,9 @@ impl<Query: QueryBound, Data: DataBound> NetworkBehaviour for Behaviour<Query, D
&mut self,
_peer_id: PeerId,
_connection_id: ConnectionId,
_event: <Self::ConnectionHandler as ConnectionHandler>::ToBehaviour,
event: <Self::ConnectionHandler as ConnectionHandler>::ToBehaviour,
) {
unimplemented!();
self.pending_events.push_back(ToSwarm::GenerateEvent(event.into()));
}

fn poll(
Expand Down
100 changes: 97 additions & 3 deletions crates/papyrus_network/src/streamed_data_protocol/behaviour_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ use libp2p::swarm::behaviour::ConnectionEstablished;
use libp2p::swarm::{ConnectionId, FromSwarm, NetworkBehaviour, PollParameters, ToSwarm};
use libp2p::{Multiaddr, PeerId};

use super::super::handler::RequestFromBehaviourEvent;
use super::super::handler::{RequestFromBehaviourEvent, ToBehaviourEvent};
use super::super::protocol::PROTOCOL_NAME;
use super::super::{DataBound, OutboundSessionId, QueryBound};
use super::{Behaviour, Event};
use crate::messages::block::{GetBlocks, GetBlocksResponse};
use crate::test_utils::hardcoded_data;

pub struct GetBlocksPollParameters {}

Expand Down Expand Up @@ -59,6 +60,31 @@ fn simulate_dial_finished_from_swarm<Query: QueryBound, Data: DataBound>(
}));
}

fn simulate_received_data_from_swarm<Query: QueryBound, Data: DataBound>(
behaviour: &mut Behaviour<Query, Data>,
peer_id: PeerId,
data: Data,
outbound_session_id: OutboundSessionId,
) {
behaviour.on_connection_handler_event(
peer_id,
ConnectionId::new_unchecked(0),
ToBehaviourEvent::ReceivedData { data, outbound_session_id },
);
}

fn simulate_outbound_session_closed_by_peer<Query: QueryBound, Data: DataBound>(
behaviour: &mut Behaviour<Query, Data>,
peer_id: PeerId,
outbound_session_id: OutboundSessionId,
) {
behaviour.on_connection_handler_event(
peer_id,
ConnectionId::new_unchecked(0),
ToBehaviourEvent::OutboundSessionClosedByPeer { outbound_session_id },
);
}

async fn validate_dial_event<Query: QueryBound, Data: DataBound>(
behaviour: &mut Behaviour<Query, Data>,
peer_id: &PeerId,
Expand Down Expand Up @@ -89,6 +115,36 @@ async fn validate_create_outbound_session_event<Query: QueryBound + PartialEq, D
);
}

async fn validate_received_data_event<Query: QueryBound, Data: DataBound + PartialEq>(
behaviour: &mut Behaviour<Query, Data>,
data: &Data,
outbound_session_id: OutboundSessionId,
) {
let event = behaviour.next().await.unwrap();
assert_matches!(
event,
ToSwarm::GenerateEvent(Event::ReceivedData {
data: event_data, outbound_session_id: event_outbound_session_id
}) if event_data == *data && event_outbound_session_id == outbound_session_id
);
}

async fn validate_outbound_session_closed_by_peer_event<
Query: QueryBound,
Data: DataBound + PartialEq,
>(
behaviour: &mut Behaviour<Query, Data>,
outbound_session_id: OutboundSessionId,
) {
let event = behaviour.next().await.unwrap();
assert_matches!(
event,
ToSwarm::GenerateEvent(Event::OutboundSessionClosedByPeer {
outbound_session_id: event_outbound_session_id
}) if event_outbound_session_id == outbound_session_id
);
}

// TODO(shahak): Fix code duplication with handler test.
fn validate_no_events<Query: QueryBound, Data: DataBound>(behaviour: &mut Behaviour<Query, Data>) {
assert!(behaviour.next().now_or_never().is_none());
Expand All @@ -102,8 +158,8 @@ async fn create_and_process_outbound_session() {
// messages is fixed.
let query = GetBlocks { limit: 10, ..Default::default() };
let peer_id = PeerId::random();

let outbound_session_id = behaviour.send_query(query.clone(), peer_id);

validate_dial_event(&mut behaviour, &peer_id).await;
validate_no_events(&mut behaviour);

Expand All @@ -113,5 +169,43 @@ async fn create_and_process_outbound_session() {
.await;
validate_no_events(&mut behaviour);

// TODO(shahak): Send responses from the handler.
let hardcoded_data_vec = hardcoded_data();
for data in &hardcoded_data_vec {
simulate_received_data_from_swarm(
&mut behaviour,
peer_id.clone(),
data.clone(),
outbound_session_id,
);
}

for data in &hardcoded_data_vec {
validate_received_data_event(&mut behaviour, &data, outbound_session_id).await;
}
validate_no_events(&mut behaviour);

// TODO(shahak): Close session and validate SessionClosedByRequest event.
}

#[tokio::test]
async fn outbound_session_closed_by_peer() {
let mut behaviour = Behaviour::<GetBlocks, GetBlocksResponse>::new(SUBSTREAM_TIMEOUT);

// TODO(shahak): Change to GetBlocks::default() when the bug that forbids sending default
// messages is fixed.
let query = GetBlocks { limit: 10, ..Default::default() };
let peer_id = PeerId::random();
let outbound_session_id = behaviour.send_query(query.clone(), peer_id);

// Consume the dial event.
behaviour.next().await.unwrap();
simulate_dial_finished_from_swarm(&mut behaviour, &peer_id);

// Consume the event to create an outbound session.
behaviour.next().await.unwrap();

simulate_outbound_session_closed_by_peer(&mut behaviour, peer_id, outbound_session_id);

validate_outbound_session_closed_by_peer_event(&mut behaviour, outbound_session_id).await;
validate_no_events(&mut behaviour);
}

0 comments on commit b1441fb

Please sign in to comment.