Skip to content

Commit

Permalink
fix(network): flatten output of db executor query (#2105)
Browse files Browse the repository at this point in the history
  • Loading branch information
ShahakShama authored Jun 13, 2024
1 parent c2ed014 commit 14ee3bb
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 28 deletions.
10 changes: 6 additions & 4 deletions crates/papyrus_network/src/db_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ pub trait DBExecutorTrait {
&mut self,
query: Query,
data_type: impl FetchBlockDataFromDb + Send + 'static,
sender: Sender<Vec<Data>>,
sender: Sender<Data>,
);

/// Polls incoming queries.
Expand All @@ -154,7 +154,7 @@ impl DBExecutorTrait for DBExecutor {
&mut self,
query: Query,
data_type: impl FetchBlockDataFromDb + Send + 'static,
mut sender: Sender<Vec<Data>>,
mut sender: Sender<Data>,
) {
let storage_reader_clone = self.storage_reader.clone();
tokio::task::spawn(async move {
Expand All @@ -179,8 +179,10 @@ impl DBExecutorTrait for DBExecutor {
let data_vec = data_type.fetch_block_data_from_db(block_number, &txn)?;
// Using poll_fn because Sender::poll_ready is not a future
poll_fn(|cx| sender.poll_ready(cx)).await?;
// TODO: consider implement retry mechanism.
sender.start_send(data_vec)?;
for data in data_vec {
// TODO: consider implement retry mechanism.
sender.start_send(data)?;
}
}
Ok(())
};
Expand Down
36 changes: 17 additions & 19 deletions crates/papyrus_network/src/db_executor/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ async fn header_db_executor_can_register_and_run_a_query() {
limit: NUM_OF_BLOCKS,
step: 1,
};
type ReceiversType = Vec<(Receiver<Vec<Data>>, DataType)>;
type ReceiversType = Vec<(Receiver<Data>, DataType)>;
let mut receivers: ReceiversType = enum_iterator::all::<DataType>()
.map(|data_type| {
let (sender, receiver) = futures::channel::mpsc::channel(BUFFER_SIZE);
Expand Down Expand Up @@ -60,20 +60,20 @@ async fn header_db_executor_can_register_and_run_a_query() {
_ = async {
while let Some(res) = receivers_stream.next().await {
let (data, requested_data_type) = res.await;
assert_eq!(data.len(), NUM_OF_BLOCKS as usize);
if matches!(requested_data_type, DataType::SignedBlockHeader) {
assert_eq!(data.len(), NUM_OF_BLOCKS as usize);
}
for (i, data) in data.into_iter().enumerate() {
for data in data.iter() {
match data {
Data::BlockHeaderAndSignature(signed_header) => {
assert_eq!(signed_header.block_header.block_number.0, i as u64);
assert_eq!(*requested_data_type, DataType::SignedBlockHeader);
}
Data::StateDiffChunk (_state_diff) => {
// TODO: check the state diff.
assert_eq!(*requested_data_type, DataType::StateDiff);
}
_ => panic!("Unexpected data type"),
match data {
Data::BlockHeaderAndSignature(signed_header) => {
assert_eq!(signed_header.block_header.block_number.0, i as u64);
assert_eq!(*requested_data_type, DataType::SignedBlockHeader);
}
Data::StateDiffChunk (_state_diff) => {
// TODO: check the state diff.
assert_eq!(*requested_data_type, DataType::StateDiff);
}
_ => panic!("Unexpected data type"),
}
}
}
Expand Down Expand Up @@ -117,12 +117,10 @@ async fn header_db_executor_start_block_given_by_hash() {
res = receiver.collect::<Vec<_>>() => {
assert_eq!(res.len(), NUM_OF_BLOCKS as usize);
for (i, data) in res.into_iter().enumerate() {
for data in data.iter() {
let Data::BlockHeaderAndSignature(signed_header) = data else {
panic!("Unexpected data type");
};
assert_eq!(signed_header.block_header.block_number.0, i as u64);
}
let Data::BlockHeaderAndSignature(signed_header) = data else {
panic!("Unexpected data type");
};
assert_eq!(signed_header.block_header.block_number.0, i as u64);
}
}
}
Expand Down
1 change: 0 additions & 1 deletion crates/papyrus_network/src/network_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,6 @@ impl<DBExecutorT: DBExecutorTrait, SwarmT: SwarmTrait> GenericNetworkManager<DBE
self.db_executor.register_query(internal_query, data_type, sender);
self.query_results_router.push(
receiver
.flat_map(futures::stream::iter)
.chain(stream::once(async move { Data::Fin(data_type) }))
.map(move |data| (data, inbound_session_id))
.boxed(),
Expand Down
9 changes: 5 additions & 4 deletions crates/papyrus_network/src/network_manager/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,17 +228,18 @@ impl DBExecutorTrait for MockDBExecutor {
&mut self,
query: Query,
_data_type: impl FetchBlockDataFromDb + Send,
mut sender: Sender<Vec<Data>>,
mut sender: Sender<Data>,
) {
let headers = self.query_to_headers.get(&query).unwrap().clone();
self.query_execution_set.push(tokio::task::spawn(async move {
{
for header in headers.iter().cloned() {
// Using poll_fn because Sender::poll_ready is not a future
if let Ok(()) = poll_fn(|cx| sender.poll_ready(cx)).await {
sender.start_send(vec![Data::BlockHeaderAndSignature(
SignedBlockHeader { block_header: header, signatures: vec![] },
)])?;
sender.start_send(Data::BlockHeaderAndSignature(SignedBlockHeader {
block_header: header,
signatures: vec![],
}))?;
}
}
Ok(())
Expand Down

0 comments on commit 14ee3bb

Please sign in to comment.