Skip to content

Commit

Permalink
fix(network): read_message returns None if EOF instead of empty messa…
Browse files Browse the repository at this point in the history
…ge (#1212)
  • Loading branch information
ShahakShama authored Sep 28, 2023
1 parent 8cadb8c commit ee4da76
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 11 deletions.
40 changes: 40 additions & 0 deletions crates/papyrus_network/src/messages/messages_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use std::time::Duration;

use futures::AsyncWriteExt;
use pretty_assertions::assert_eq;

use super::block::GetBlocksResponse;
use super::{read_message, write_message};
use crate::test_utils::{get_connected_streams, hardcoded_data};

#[tokio::test]
async fn read_write_positive_flow() {
let (mut stream1, mut stream2, _) = get_connected_streams().await;
let messages = hardcoded_data();
for message in &messages {
write_message(message.clone(), &mut stream1).await.unwrap();
}
for expected_message in &messages {
assert_eq!(*expected_message, read_message(&mut stream2).await.unwrap().unwrap());
}
}

#[tokio::test]
async fn read_message_returns_none_when_other_stream_is_closed() {
let (mut stream1, mut stream2, _) = get_connected_streams().await;
stream1.close().await.unwrap();
assert!(read_message::<GetBlocksResponse, _>(&mut stream2).await.unwrap().is_none());
}

#[tokio::test]
async fn read_message_is_pending_when_other_stream_didnt_send() {
let (_stream1, mut stream2, _) = get_connected_streams().await;
assert!(
tokio::time::timeout(
Duration::from_millis(10),
read_message::<GetBlocksResponse, _>(&mut stream2)
)
.await
.is_err()
);
}
13 changes: 11 additions & 2 deletions crates/papyrus_network/src/messages/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
pub mod block;
pub mod common;
#[cfg(test)]
mod messages_test;

pub mod proto {
pub mod p2p {
Expand Down Expand Up @@ -27,7 +29,14 @@ pub async fn write_message<T: Message, Stream: AsyncWrite + Unpin>(

pub async fn read_message<T: Message + Default, Stream: AsyncRead + Unpin>(
mut io: Stream,
) -> Result<T, io::Error> {
) -> Result<Option<T>, io::Error> {
let buf = read_length_prefixed(&mut io, MAX_MESSAGE_SIZE).await?;
Ok(T::decode(buf.as_slice())?)
// read_length_prefixed returns an empty vec if it reaches EOF. We opened an issue in libp2p to
// try and change this: https://github.com/libp2p/rust-libp2p/issues/4565
// TODO(shahak): This currently disables reading empty messages. fix this by copying the
// code from libp2p and changing it.
if buf.is_empty() {
return Ok(None);
}
Ok(Some(T::decode(buf.as_slice())?))
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,9 @@ async fn validate_next_event_send_query_to_handler<
async fn send_and_process_request() {
let mut behaviour = Behaviour::<GetBlocks, GetBlocksResponse>::new(SUBSTREAM_TIMEOUT);

let query = GetBlocks::default();
// TODO(shahak): Change to GetBlocks::default() when the bug that forbids sending default
// messages is fixed.
let query = GetBlocks { limit: 10, ..Default::default() };
let peer_id = PeerId::random();

let outbound_session_id = behaviour.send_query(query.clone(), peer_id);
Expand Down
7 changes: 6 additions & 1 deletion crates/papyrus_network/src/streamed_data_protocol/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,12 @@ impl<Query: QueryBound, Data: DataBound> ConnectionHandler for Handler<Query, Da
outbound_session_id,
stream! {
loop {
let result = read_message::<Data, _>(&mut stream).await;
let result_opt = read_message::<Data, _>(&mut stream).await;
let result = match result_opt {
Ok(Some(data)) => Ok(data),
Ok(None) => break,
Err(error) => Err(error),
};
let is_err = result.is_err();
yield result;
if is_err {
Expand Down
14 changes: 10 additions & 4 deletions crates/papyrus_network/src/streamed_data_protocol/handler_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ async fn validate_request_to_swarm_new_outbound_session_to_swarm_event<
async fn read_messages(stream: &mut Stream, num_messages: usize) -> Vec<GetBlocksResponse> {
let mut result = Vec::new();
for _ in 0..num_messages {
result.push(read_message::<GetBlocksResponse, _>(&mut *stream).await.unwrap());
result.push(read_message::<GetBlocksResponse, _>(&mut *stream).await.unwrap().unwrap());
}
result
}
Expand All @@ -132,7 +132,9 @@ async fn process_inbound_session() {
);

let (inbound_stream, mut outbound_stream, _) = get_connected_streams().await;
let query = GetBlocks::default();
// TODO(shahak): Change to GetBlocks::default() when the bug that forbids sending default
// messages is fixed.
let query = GetBlocks { limit: 10, ..Default::default() };
let inbound_session_id = InboundSessionId { value: 1 };

simulate_negotiated_inbound_session_from_swarm(
Expand Down Expand Up @@ -188,7 +190,9 @@ async fn process_outbound_session() {
);

let (mut inbound_stream, outbound_stream, _) = get_connected_streams().await;
let query = GetBlocks::default();
// TODO(shahak): Change to GetBlocks::default() when the bug that forbids sending default
// messages is fixed.
let query = GetBlocks { limit: 10, ..Default::default() };
let outbound_session_id = OutboundSessionId { value: 1 };

simulate_request_to_send_query_from_swarm(&mut handler, query.clone(), outbound_session_id);
Expand Down Expand Up @@ -258,7 +262,9 @@ async fn process_outbound_session() {
// async fn process_session() {
// let mut handler = Handler::new(SUBSTREAM_TIMEOUT);

// let request = GetBlocks::default();
// // TODO(shahak): Change to GetBlocks::default() when the bug that forbids sending default
// // messages is fixed.
// let request = GetBlocks { limit: 10, ..Default::default() };
// let request_id = OutboundSessionId::default();
// let response = GetBlocksResponse {
// response: Some(Response::Header(BlockHeader {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ where

fn upgrade_inbound(self, mut stream: Stream, _: Self::Info) -> Self::Future {
async move {
let request = read_message::<Query, _>(&mut stream).await?;
let request = read_message::<Query, _>(&mut stream)
.await?
.ok_or::<io::Error>(io::ErrorKind::UnexpectedEof.into())?;
Ok((request, stream))
}
.boxed()
Expand Down
27 changes: 25 additions & 2 deletions crates/papyrus_network/src/streamed_data_protocol/protocol_test.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
use std::io::ErrorKind;

use assert_matches::assert_matches;
use futures::AsyncWriteExt;
use libp2p::core::upgrade::{write_varint, InboundUpgrade, OutboundUpgrade};
use libp2p::core::UpgradeInfo;
Expand All @@ -22,7 +25,9 @@ fn both_protocols_have_same_info() {
async fn positive_flow() {
let (inbound_stream, outbound_stream, _) = get_connected_streams().await;

let query = GetBlocks::default();
// TODO(shahak): Change to GetBlocks::default() when the bug that forbids sending default
// messages is fixed.
let query = GetBlocks { limit: 10, ..Default::default() };
let outbound_protocol = OutboundProtocol { query: query.clone() };
let inbound_protocol = InboundProtocol::<GetBlocks>::new();

Expand All @@ -39,7 +44,8 @@ async fn positive_flow() {
let mut stream =
outbound_protocol.upgrade_outbound(outbound_stream, PROTOCOL_NAME).await.unwrap();
for expected_response in hardcoded_data() {
let response = read_message::<GetBlocksResponse, _>(&mut stream).await.unwrap();
let response =
read_message::<GetBlocksResponse, _>(&mut stream).await.unwrap().unwrap();
assert_eq!(response, expected_response);
}
}
Expand All @@ -63,3 +69,20 @@ async fn outbound_sends_invalid_request() {
},
);
}

#[tokio::test]
async fn outbound_sends_no_request() {
let (inbound_stream, mut outbound_stream, _) = get_connected_streams().await;
let inbound_protocol = InboundProtocol::<GetBlocks>::new();

tokio::join!(
async move {
let error =
inbound_protocol.upgrade_inbound(inbound_stream, PROTOCOL_NAME).await.unwrap_err();
assert_matches!(error.kind(), ErrorKind::UnexpectedEof);
},
async move {
outbound_stream.close().await.unwrap();
},
);
}

0 comments on commit ee4da76

Please sign in to comment.