Skip to content

Commit

Permalink
feat(network): add support to finishing outbound session (#1215)
Browse files Browse the repository at this point in the history
  • Loading branch information
ShahakShama authored Sep 28, 2023
1 parent ee4da76 commit 38a3c52
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 3 deletions.
4 changes: 2 additions & 2 deletions crates/papyrus_network/src/streamed_data_protocol/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,9 @@ impl<Query: QueryBound, Data: DataBound> ConnectionHandler for Handler<Query, Da
self.inbound_sessions_marked_to_end.insert(inbound_session_id);
}
RequestFromBehaviourEvent::FinishSession {
session_id: SessionId::OutboundSessionId(_outbound_session_id),
session_id: SessionId::OutboundSessionId(outbound_session_id),
} => {
unimplemented!();
self.id_to_outbound_session.remove(&outbound_session_id);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use libp2p::swarm::handler::{ConnectionEvent, FullyNegotiatedInbound, FullyNegot
use libp2p::swarm::{ConnectionHandler, ConnectionHandlerEvent, Stream};

use super::super::{DataBound, InboundSessionId, OutboundSessionId, QueryBound};
use super::{Handler, HandlerEvent, RequestFromBehaviourEvent, ToBehaviourEvent};
use super::{Handler, HandlerEvent, RequestFromBehaviourEvent, SessionId, ToBehaviourEvent};
use crate::messages::block::{GetBlocks, GetBlocksResponse};
use crate::messages::{read_message, write_message};
use crate::test_utils::{get_connected_streams, hardcoded_data};
Expand Down Expand Up @@ -50,6 +50,13 @@ fn simulate_request_to_send_query_from_swarm<Query: QueryBound, Data: DataBound>
});
}

fn simulate_request_to_finish_session<Query: QueryBound, Data: DataBound>(
handler: &mut Handler<Query, Data>,
session_id: SessionId,
) {
handler.on_behaviour_event(RequestFromBehaviourEvent::FinishSession { session_id });
}

fn simulate_negotiated_inbound_session_from_swarm<Query: QueryBound, Data: DataBound>(
handler: &mut Handler<Query, Data>,
query: Query,
Expand Down Expand Up @@ -100,6 +107,10 @@ async fn validate_received_data_event<Query: QueryBound, Data: DataBound + Parti
);
}

fn validate_no_events<Query: QueryBound, Data: DataBound>(handler: &mut Handler<Query, Data>) {
assert!(handler.next().now_or_never().is_none());
}

async fn validate_request_to_swarm_new_outbound_session_to_swarm_event<
Query: QueryBound + PartialEq,
Data: DataBound,
Expand Down Expand Up @@ -218,6 +229,34 @@ async fn process_outbound_session() {
validate_received_data_event(&mut handler, data, outbound_session_id).await;
}
}

#[tokio::test]
async fn finished_outbound_session_doesnt_emit_events_when_data_is_sent() {
let mut handler = Handler::<GetBlocks, GetBlocksResponse>::new(
SUBSTREAM_TIMEOUT,
Arc::new(Default::default()),
);

let (mut inbound_stream, outbound_stream, _) = get_connected_streams().await;
let outbound_session_id = OutboundSessionId { value: 1 };

simulate_negotiated_outbound_session_from_swarm(
&mut handler,
outbound_stream,
outbound_session_id,
);

simulate_request_to_finish_session(
&mut handler,
SessionId::OutboundSessionId(outbound_session_id),
);

for data in hardcoded_data() {
write_message(data, &mut inbound_stream).await.unwrap();
}

validate_no_events(&mut handler);
}
// async fn start_request_and_validate_event<
// Query: Message + PartialEq + Clone,
// Data: Message + Default,
Expand Down

0 comments on commit 38a3c52

Please sign in to comment.