Skip to content

Commit

Permalink
feat(network): add SessionClosedByRequest event (#1221)
Browse files Browse the repository at this point in the history
  • Loading branch information
ShahakShama committed Oct 1, 2023
1 parent 71e89ee commit bdea914
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 1 deletion.
10 changes: 10 additions & 0 deletions crates/papyrus_network/src/streamed_data_protocol/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,11 +269,21 @@ impl<Query: QueryBound, Data: DataBound> ConnectionHandler for Handler<Query, Da
session_id: SessionId::InboundSessionId(inbound_session_id),
} => {
self.inbound_sessions_marked_to_end.insert(inbound_session_id);
self.pending_events.push_back(ConnectionHandlerEvent::NotifyBehaviour(
ToBehaviourEvent::SessionClosedByRequest {
session_id: SessionId::InboundSessionId(inbound_session_id),
},
));
}
RequestFromBehaviourEvent::CloseSession {
session_id: SessionId::OutboundSessionId(outbound_session_id),
} => {
self.id_to_outbound_session.remove(&outbound_session_id);
self.pending_events.push_back(ConnectionHandlerEvent::NotifyBehaviour(
ToBehaviourEvent::SessionClosedByRequest {
session_id: SessionId::OutboundSessionId(outbound_session_id),
},
));
}
}
}
Expand Down
26 changes: 26 additions & 0 deletions crates/papyrus_network/src/streamed_data_protocol/handler_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,22 @@ async fn validate_received_data_event<Query: QueryBound, Data: DataBound + Parti
);
}

async fn validate_session_closed_by_request_event<
Query: QueryBound,
Data: DataBound + PartialEq,
>(
handler: &mut Handler<Query, Data>,
session_id: SessionId,
) {
let event = handler.next().await.unwrap();
assert_matches!(
event,
ConnectionHandlerEvent::NotifyBehaviour(ToBehaviourEvent::SessionClosedByRequest {
session_id: event_session_id
}) if event_session_id == session_id
);
}

async fn validate_outbound_session_closed_by_peer_event<
Query: QueryBound,
Data: DataBound + PartialEq,
Expand Down Expand Up @@ -226,6 +242,11 @@ async fn closed_inbound_session_ignores_behaviour_request_to_send_data() {
&mut handler,
SessionId::InboundSessionId(inbound_session_id),
);
validate_session_closed_by_request_event(
&mut handler,
SessionId::InboundSessionId(inbound_session_id),
)
.await;

let hardcoded_data_vec = hardcoded_data();
for data in &hardcoded_data_vec {
Expand Down Expand Up @@ -322,6 +343,11 @@ async fn closed_outbound_session_doesnt_emit_events_when_data_is_sent() {
&mut handler,
SessionId::OutboundSessionId(outbound_session_id),
);
validate_session_closed_by_request_event(
&mut handler,
SessionId::OutboundSessionId(outbound_session_id),
)
.await;

for data in hardcoded_data() {
write_message(data, &mut inbound_stream).await.unwrap();
Expand Down
3 changes: 2 additions & 1 deletion crates/papyrus_network/src/streamed_data_protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub struct InboundSessionId {
value: usize,
}

#[derive(Debug)]
#[derive(Debug, PartialEq)]
// TODO(shahak) remove allow(dead_code).
#[allow(dead_code)]
pub(crate) enum SessionId {
Expand All @@ -38,5 +38,6 @@ pub(crate) enum GenericEvent<Query: QueryBound, Data: DataBound, SessionError> {
NewInboundSession { query: Query, inbound_session_id: InboundSessionId },
ReceivedData { outbound_session_id: OutboundSessionId, data: Data },
SessionFailed { session_id: SessionId, error: SessionError },
SessionClosedByRequest { session_id: SessionId },
OutboundSessionClosedByPeer { outbound_session_id: OutboundSessionId },
}

0 comments on commit bdea914

Please sign in to comment.