Skip to content

Commit

Permalink
feat: support unregister grpc interface
Browse files Browse the repository at this point in the history
  • Loading branch information
zuston committed Dec 23, 2023
1 parent 5b9f417 commit acc91d0
Show file tree
Hide file tree
Showing 8 changed files with 175 additions and 103 deletions.
77 changes: 44 additions & 33 deletions rust/experimental/server/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::store::{
StoreProvider,
};
use crate::util::current_timestamp_sec;
use anyhow::Result;
use anyhow::{anyhow, Result};
use bytes::Bytes;
use croaring::treemap::JvmSerializer;
use croaring::Treemap;
Expand Down Expand Up @@ -277,13 +277,30 @@ impl App {
}

pub async fn purge(&self, app_id: String, shuffle_id: Option<i32>) -> Result<()> {
if shuffle_id.is_some() {
error!("Partial purge is not supported.");
} else {
self.store.purge(app_id).await?
}
self.store
.purge(PurgeDataContext::new(app_id, shuffle_id))
.await
}
}

Ok(())
/// Describes the target of a purge request: an application and, optionally,
/// one shuffle belonging to that application.
#[derive(Debug, Clone)]
pub struct PurgeDataContext {
    /// Identifier of the application the purge refers to.
    pub(crate) app_id: String,
    /// Shuffle named by the request; `None` when no specific shuffle was given.
    pub(crate) shuffle_id: Option<i32>,
}

impl PurgeDataContext {
    /// Builds a context from an owned application id and an optional shuffle id.
    pub fn new(app_id: String, shuffle_id: Option<i32>) -> PurgeDataContext {
        Self { app_id, shuffle_id }
    }
}

impl From<&str> for PurgeDataContext {
    /// Converts a borrowed application id into a context whose `shuffle_id`
    /// is `None`.
    fn from(app_id_ref: &str) -> Self {
        Self::new(String::from(app_id_ref), None)
    }
}

Expand Down Expand Up @@ -343,7 +360,7 @@ pub enum PurgeEvent {
// app_id
HEART_BEAT_TIMEOUT(String),
// app_id + shuffle_id
APP_PARTIAL_SHUFFLES_PURGE(String, Vec<i32>),
APP_PARTIAL_SHUFFLES_PURGE(String, i32),
// app_id
APP_PURGE(String),
}
Expand Down Expand Up @@ -424,18 +441,18 @@ impl AppManager {
"The app:[{}]'s data will be purged due to heartbeat timeout",
&app_id
);
app_manager_cloned.purge_app_data(app_id).await
app_manager_cloned.purge_app_data(app_id, None).await
}
PurgeEvent::APP_PURGE(app_id) => {
info!(
"The app:[{}] has been finished, its data will be purged.",
&app_id
);
app_manager_cloned.purge_app_data(app_id).await
app_manager_cloned.purge_app_data(app_id, None).await
}
PurgeEvent::APP_PARTIAL_SHUFFLES_PURGE(_app_id, _shuffle_ids) => {
info!("Partial data purge is not supported currently");
Ok(())
PurgeEvent::APP_PARTIAL_SHUFFLES_PURGE(app_id, shuffle_id) => {
info!("The app:[{:?}] with shuffleId: [{:?}] will be purged due to unregister grpc interface", &app_id, shuffle_id);
app_manager_cloned.purge_app_data(app_id, Some(shuffle_id)).await
}
}
.map_err(|err| error!("Errors on purging data. error: {:?}", err));
Expand All @@ -457,19 +474,16 @@ impl AppManager {
self.store.memory_spill_event_num()
}

async fn purge_app_data(&self, app_id: String) -> Result<()> {
let app = self.get_app(&app_id);
if app.is_none() {
error!(
"App:{} don't exist when purging data, this should not happen",
&app_id
);
} else {
let app = app.unwrap();
app.purge(app_id.clone(), None).await?;
}
async fn purge_app_data(&self, app_id: String, shuffle_id_option: Option<i32>) -> Result<()> {
let app = self.get_app(&app_id).ok_or(anyhow!(format!(
"App:{} don't exist when purging data, this should not happen",
&app_id
)))?;
app.purge(app_id.clone(), shuffle_id_option).await?;

self.apps.remove(&app_id);
if shuffle_id_option.is_none() {
self.apps.remove(&app_id);
}

Ok(())
}
Expand Down Expand Up @@ -520,13 +534,10 @@ impl AppManager {
app_ref.register_shuffle(shuffle_id)
}

pub async fn unregister(&self, app_id: String, shuffle_ids: Option<Vec<i32>>) -> Result<()> {
let event = match shuffle_ids {
Some(ids) => PurgeEvent::APP_PARTIAL_SHUFFLES_PURGE(app_id, ids),
_ => PurgeEvent::APP_PURGE(app_id),
};

self.sender.send(event).await?;
pub async fn unregister(&self, app_id: String, shuffle_id: i32) -> Result<()> {
self.sender
.send(PurgeEvent::APP_PARTIAL_SHUFFLES_PURGE(app_id, shuffle_id))
.await?;
Ok(())
}
}
Expand Down Expand Up @@ -649,7 +660,7 @@ mod test {

// case3: purge
app_manager_ref
.purge_app_data(app_id.to_string())
.purge_app_data(app_id.to_string(), None)
.await
.expect("");

Expand Down
20 changes: 16 additions & 4 deletions rust/experimental/server/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,24 @@ impl ShuffleServer for DefaultShuffleServer {

async fn unregister_shuffle(
&self,
_request: Request<ShuffleUnregisterRequest>,
request: Request<ShuffleUnregisterRequest>,
) -> Result<Response<ShuffleUnregisterResponse>, Status> {
// todo: implement shuffle level deletion
info!("Accepted unregister shuffle info....");
let request = request.into_inner();
let shuffle_id = request.shuffle_id;
let app_id = request.app_id;

info!(
"Accepted unregister shuffle info for [app:{:?}, shuffle_id:{:?}]",
&app_id, shuffle_id
);
let status_code = self
.app_manager_ref
.unregister(app_id, shuffle_id)
.await
.map_or_else(|_e| StatusCode::INTERNAL_ERROR, |_| StatusCode::SUCCESS);

Ok(Response::new(ShuffleUnregisterResponse {
status: StatusCode::SUCCESS.into(),
status: status_code.into(),
ret_msg: "".to_string(),
}))
}
Expand Down
7 changes: 4 additions & 3 deletions rust/experimental/server/src/store/hdfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
// under the License.

use crate::app::{
PartitionedUId, ReadingIndexViewContext, ReadingViewContext, RequireBufferContext,
WritingViewContext,
PartitionedUId, PurgeDataContext, ReadingIndexViewContext, ReadingViewContext,
RequireBufferContext, WritingViewContext,
};
use crate::config::HdfsStoreConfig;
use crate::error::WorkerError;
Expand Down Expand Up @@ -246,7 +246,8 @@ impl Store for HdfsStore {
todo!()
}

async fn purge(&self, app_id: String) -> Result<()> {
async fn purge(&self, ctx: PurgeDataContext) -> Result<()> {
let app_id = ctx.app_id;
let app_dir = self.get_app_dir(app_id.as_str());

let keys_to_delete: Vec<_> = self
Expand Down
25 changes: 9 additions & 16 deletions rust/experimental/server/src/store/hybrid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use crate::app::{
PartitionedUId, ReadingIndexViewContext, ReadingOptions, ReadingViewContext,
PartitionedUId, PurgeDataContext, ReadingIndexViewContext, ReadingOptions, ReadingViewContext,
RequireBufferContext, WritingViewContext,
};
use crate::await_tree::AWAIT_TREE_REGISTRY;
Expand Down Expand Up @@ -457,24 +457,17 @@ impl Store for HybridStore {
.await
}

async fn purge(&self, app_id: String) -> Result<()> {
self.hot_store.purge(app_id.clone()).await?;
info!("Removed data of app:[{}] in hot store", &app_id);
async fn purge(&self, ctx: PurgeDataContext) -> Result<()> {
let app_id = &ctx.app_id;
self.hot_store.purge(ctx.clone()).await?;
info!("Removed data of app:[{}] in hot store", app_id);
if self.warm_store.is_some() {
self.warm_store
.as_ref()
.unwrap()
.purge(app_id.clone())
.await?;
info!("Removed data of app:[{}] in warm store", &app_id);
self.warm_store.as_ref().unwrap().purge(ctx.clone()).await?;
info!("Removed data of app:[{}] in warm store", app_id);
}
if self.cold_store.is_some() {
self.cold_store
.as_ref()
.unwrap()
.purge(app_id.clone())
.await?;
info!("Removed data of app:[{}] in cold store", &app_id);
self.cold_store.as_ref().unwrap().purge(ctx.clone()).await?;
info!("Removed data of app:[{}] in cold store", app_id);
}
Ok(())
}
Expand Down
78 changes: 54 additions & 24 deletions rust/experimental/server/src/store/localfile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

use crate::app::ReadingOptions::FILE_OFFSET_AND_LEN;
use crate::app::{
PartitionedUId, ReadingIndexViewContext, ReadingViewContext, RequireBufferContext,
WritingViewContext,
PartitionedUId, PurgeDataContext, ReadingIndexViewContext, ReadingViewContext,
RequireBufferContext, WritingViewContext,
};
use crate::config::LocalfileStoreConfig;
use crate::error::WorkerError;
Expand Down Expand Up @@ -112,6 +112,10 @@ impl LocalFileStore {
format!("{}", app_id)
}

/// Builds the relative path of a single shuffle, in the form
/// `<app_id>/<shuffle_id>`.
fn gen_relative_path_for_shuffle(app_id: &str, shuffle_id: i32) -> String {
    let shuffle_segment = shuffle_id.to_string();
    [app_id, shuffle_segment.as_str()].join("/")
}

fn gen_relative_path_for_partition(uid: &PartitionedUId) -> (String, String) {
(
format!(
Expand Down Expand Up @@ -426,32 +430,40 @@ impl Store for LocalFileStore {
todo!()
}

async fn purge(&self, app_id: String) -> Result<()> {
let app_relative_dir_path = LocalFileStore::gen_relative_path_for_app(&app_id);
async fn purge(&self, ctx: PurgeDataContext) -> Result<()> {
let app_id = ctx.app_id;
let shuffle_id_option = ctx.shuffle_id;

let all_partition_ids = self.get_app_all_partitions(&app_id);
if all_partition_ids.is_empty() {
return Ok(());
}
let data_relative_dir_path = match shuffle_id_option {
Some(shuffle_id) => LocalFileStore::gen_relative_path_for_shuffle(&app_id, shuffle_id),
_ => LocalFileStore::gen_relative_path_for_app(&app_id),
};

for local_disk_ref in &self.local_disks {
let disk = local_disk_ref.clone();
disk.delete(app_relative_dir_path.to_string()).await?;
disk.delete(data_relative_dir_path.to_string()).await?;
}

for (shuffle_id, partition_id) in all_partition_ids.into_iter() {
// delete lock
let uid = PartitionedUId {
app_id: app_id.clone(),
shuffle_id,
partition_id,
};
let (data_file_path, _) = LocalFileStore::gen_relative_path_for_partition(&uid);
self.partition_file_locks.remove(&data_file_path);
}
if shuffle_id_option.is_none() {
let all_partition_ids = self.get_app_all_partitions(&app_id);
if all_partition_ids.is_empty() {
return Ok(());
}

for (shuffle_id, partition_id) in all_partition_ids.into_iter() {
// delete lock
let uid = PartitionedUId {
app_id: app_id.clone(),
shuffle_id,
partition_id,
};
let (data_file_path, _) = LocalFileStore::gen_relative_path_for_partition(&uid);
self.partition_file_locks.remove(&data_file_path);
}

// delete disk mapping
self.delete_app(&app_id)?;
// delete disk mapping
self.delete_app(&app_id)?;
}

Ok(())
}
Expand Down Expand Up @@ -739,8 +751,8 @@ impl LocalDisk {
#[cfg(test)]
mod test {
use crate::app::{
PartitionedUId, ReadingIndexViewContext, ReadingOptions, ReadingViewContext,
WritingViewContext,
PartitionedUId, PurgeDataContext, ReadingIndexViewContext, ReadingOptions,
ReadingViewContext, WritingViewContext,
};
use crate::store::localfile::{LocalDisk, LocalDiskConfig, LocalFileStore};

Expand Down Expand Up @@ -805,11 +817,29 @@ mod test {
&temp_path, &app_id, "0", "0"
)))?
);
runtime.wait(local_store.purge(app_id.clone()))?;

// shuffle level purge
runtime
.wait(local_store.purge(PurgeDataContext::new(app_id.to_string(), Some(0))))
.expect("");
assert_eq!(
false,
runtime.wait(tokio::fs::try_exists(format!(
"{}/{}/{}",
&temp_path, &app_id, 0
)))?
);

// app level purge
runtime.wait(local_store.purge((&*app_id).into()))?;
assert_eq!(
false,
runtime.wait(tokio::fs::try_exists(format!("{}/{}", &temp_path, &app_id)))?
);
assert!(!local_store
.partition_file_locks
.contains_key(&format!("{}/{}/{}/{}.data", &temp_path, &app_id, 0, 0)));
assert!(!local_store.partition_written_disk_map.contains_key(&app_id));

Ok(())
}
Expand Down
Loading

0 comments on commit acc91d0

Please sign in to comment.