Skip to content

Commit

Permalink
fix(rust): forget to call report blockIds method await
Browse files Browse the repository at this point in the history
  • Loading branch information
zuston committed Dec 22, 2023
1 parent 07c66c1 commit 5b9f417
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 3 deletions.
11 changes: 10 additions & 1 deletion rust/experimental/server/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,16 @@ impl ShuffleServer for DefaultShuffleServer {
},
blocks: partition_to_block_id.block_ids,
};
let _ = app.report_block_ids(ctx);

match app.report_block_ids(ctx).await {
Err(e) => {
return Ok(Response::new(ReportShuffleResultResponse {
status: StatusCode::INTERNAL_ERROR.into(),
ret_msg: e.to_string(),
}))
}
_ => (),
}
}

Ok(Response::new(ReportShuffleResultResponse {
Expand Down
35 changes: 33 additions & 2 deletions rust/experimental/server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,15 @@ use crate::proto::uniffle::shuffle_server_client::ShuffleServerClient;
use crate::proto::uniffle::shuffle_server_server::ShuffleServerServer;
use crate::proto::uniffle::{
GetLocalShuffleDataRequest, GetLocalShuffleIndexRequest, GetMemoryShuffleDataRequest,
RequireBufferRequest, SendShuffleDataRequest, ShuffleBlock, ShuffleData,
ShuffleRegisterRequest,
GetShuffleResultRequest, PartitionToBlockIds, ReportShuffleResultRequest, RequireBufferRequest,
SendShuffleDataRequest, ShuffleBlock, ShuffleData, ShuffleRegisterRequest,
};
use crate::runtime::manager::RuntimeManager;
use crate::util::gen_worker_uid;
use anyhow::Result;
use bytes::{Buf, Bytes, BytesMut};
use croaring::treemap::JvmSerializer;
use croaring::Treemap;
use log::info;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::time::Duration;
Expand Down Expand Up @@ -164,6 +166,20 @@ pub async fn write_read_for_one_time(mut client: ShuffleServerClient<Channel>) -

let response = response.into_inner();
assert_eq!(0, response.status);

// report the finished block ids
client
.report_shuffle_result(ReportShuffleResultRequest {
app_id: app_id.clone(),
shuffle_id: 0,
task_attempt_id: 0,
bitmap_num: 0,
partition_to_block_ids: vec![PartitionToBlockIds {
partition_id: idx,
block_ids: vec![idx as i64],
}],
})
.await?;
}

tokio::time::sleep(Duration::from_secs(1)).await;
Expand All @@ -173,6 +189,21 @@ pub async fn write_read_for_one_time(mut client: ShuffleServerClient<Channel>) -

// firstly. read from the memory
for idx in 0..batch_size {
let block_id_result = client
.get_shuffle_result(GetShuffleResultRequest {
app_id: app_id.clone(),
shuffle_id: 0,
partition_id: idx,
})
.await?
.into_inner();

assert_eq!(0, block_id_result.status);

let block_id_bitmap = Treemap::deserialize(&*block_id_result.serialized_bitmap)?;
assert_eq!(1, block_id_bitmap.iter().count());
assert!(block_id_bitmap.contains(idx as u64));

let response_data = client
.get_memory_shuffle_data(GetMemoryShuffleDataRequest {
app_id: app_id.clone(),
Expand Down

0 comments on commit 5b9f417

Please sign in to comment.