diff --git a/rust/experimental/server/Cargo.lock b/rust/experimental/server/Cargo.lock index 796f2ffc7e..de1cc8edba 100644 --- a/rust/experimental/server/Cargo.lock +++ b/rust/experimental/server/Cargo.lock @@ -127,6 +127,12 @@ dependencies = [ "tracing", ] +[[package]] +name = "awaitility" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46ee60785cbb3a23bde2c462098564a6337432b9fa032a62bb7ddb5e734c135d" + [[package]] name = "axum" version = "0.6.20" @@ -3180,6 +3186,7 @@ dependencies = [ "async-channel", "async-trait", "await-tree", + "awaitility", "bytes 1.5.0", "cap", "clap", diff --git a/rust/experimental/server/Cargo.toml b/rust/experimental/server/Cargo.toml index e5b635347d..2db63dd5d7 100644 --- a/rust/experimental/server/Cargo.toml +++ b/rust/experimental/server/Cargo.toml @@ -123,6 +123,7 @@ prost-build = "0.11.9" [dev-dependencies] env_logger = "0.10.0" +awaitility = "0.3.1" [profile.dev] # re-enable debug assertions when pprof-rs fixed the reports for misaligned pointer dereferences diff --git a/rust/experimental/server/src/app.rs b/rust/experimental/server/src/app.rs index c6f789be91..9298fb6e3f 100644 --- a/rust/experimental/server/src/app.rs +++ b/rust/experimental/server/src/app.rs @@ -233,15 +233,6 @@ impl App { } } - pub fn is_buffer_ticket_exist(&self, ticket_id: i64) -> bool { - self.store - .is_buffer_ticket_exist(self.app_id.as_str(), ticket_id) - } - - pub fn discard_tickets(&self, ticket_id: i64) -> i64 { - self.store.discard_tickets(self.app_id.as_str(), ticket_id) - } - pub async fn free_allocated_memory_size(&self, size: i64) -> Result { self.store.free_hot_store_allocated_memory_size(size).await } @@ -258,6 +249,12 @@ impl App { self.store.require_buffer(ctx).await } + pub async fn release_buffer(&self, ticket_id: i64) -> Result { + self.store + .release_buffer(ReleaseBufferContext::from(ticket_id)) + .await + } + fn get_underlying_partition_bitmap(&self, uid: PartitionedUId) -> 
PartitionedMeta { let shuffle_id = uid.shuffle_id; let partition_id = uid.partition_id; @@ -344,6 +341,17 @@ pub struct RequireBufferContext { pub size: i64, } +#[derive(Debug, Clone)] +pub struct ReleaseBufferContext { + pub(crate) ticket_id: i64, +} + +impl From for ReleaseBufferContext { + fn from(value: i64) -> Self { + Self { ticket_id: value } + } +} + impl RequireBufferContext { pub fn new(uid: PartitionedUId, size: i64) -> Self { Self { uid, size } diff --git a/rust/experimental/server/src/error.rs b/rust/experimental/server/src/error.rs index 0f411af0ce..cc07536a50 100644 --- a/rust/experimental/server/src/error.rs +++ b/rust/experimental/server/src/error.rs @@ -48,6 +48,9 @@ pub enum WorkerError { #[error("Http request failed. {0}")] HTTP_SERVICE_ERROR(String), + + #[error("Ticket id: {0} not exist")] + TICKET_ID_NOT_EXIST(i64), } impl From for WorkerError { diff --git a/rust/experimental/server/src/grpc.rs b/rust/experimental/server/src/grpc.rs index 6ad8f470e7..7c784c2456 100644 --- a/rust/experimental/server/src/grpc.rs +++ b/rust/experimental/server/src/grpc.rs @@ -149,14 +149,15 @@ impl ShuffleServer for DefaultShuffleServer { let app = app_option.unwrap(); - if !app.is_buffer_ticket_exist(ticket_id) { + let release_result = app.release_buffer(ticket_id).await; + if release_result.is_err() { return Ok(Response::new(SendShuffleDataResponse { status: StatusCode::NO_BUFFER.into(), ret_msg: "No such buffer ticket id, it may be discarded due to timeout".to_string(), })); } - let ticket_required_size = app.discard_tickets(ticket_id); + let ticket_required_size = release_result.unwrap(); let mut blocks_map = HashMap::new(); for shuffle_data in req.shuffle_data { diff --git a/rust/experimental/server/src/store/hdfs.rs b/rust/experimental/server/src/store/hdfs.rs index d99f667993..c36af4732b 100644 --- a/rust/experimental/server/src/store/hdfs.rs +++ b/rust/experimental/server/src/store/hdfs.rs @@ -17,7 +17,7 @@ use crate::app::{ PartitionedUId, 
PurgeDataContext, ReadingIndexViewContext, ReadingViewContext, - RequireBufferContext, WritingViewContext, + ReleaseBufferContext, RequireBufferContext, WritingViewContext, }; use crate::config::HdfsStoreConfig; use crate::error::WorkerError; @@ -246,6 +246,10 @@ impl Store for HdfsStore { todo!() } + async fn release_buffer(&self, _ctx: ReleaseBufferContext) -> Result { + todo!() + } + async fn purge(&self, ctx: PurgeDataContext) -> Result<()> { let app_id = ctx.app_id; let app_dir = self.get_app_dir(app_id.as_str()); diff --git a/rust/experimental/server/src/store/hybrid.rs b/rust/experimental/server/src/store/hybrid.rs index 8b1eb17123..6824cbccb9 100644 --- a/rust/experimental/server/src/store/hybrid.rs +++ b/rust/experimental/server/src/store/hybrid.rs @@ -17,7 +17,7 @@ use crate::app::{ PartitionedUId, PurgeDataContext, ReadingIndexViewContext, ReadingOptions, ReadingViewContext, - RequireBufferContext, WritingViewContext, + ReleaseBufferContext, RequireBufferContext, WritingViewContext, }; use crate::await_tree::AWAIT_TREE_REGISTRY; @@ -261,15 +261,6 @@ impl HybridStore { Ok(message) } - // For app to check the ticket allocated existence and discard if it has been used - pub fn is_buffer_ticket_exist(&self, app_id: &str, ticket_id: i64) -> bool { - self.hot_store.is_ticket_exist(app_id, ticket_id) - } - - pub fn discard_tickets(&self, app_id: &str, ticket_id: i64) -> i64 { - self.hot_store.discard_tickets(app_id, Some(ticket_id)) - } - pub async fn free_hot_store_allocated_memory_size(&self, size: i64) -> Result { self.hot_store.free_allocated(size).await } @@ -461,6 +452,10 @@ impl Store for HybridStore { .await } + async fn release_buffer(&self, ctx: ReleaseBufferContext) -> Result { + self.hot_store.release_buffer(ctx).await + } + async fn purge(&self, ctx: PurgeDataContext) -> Result<()> { let app_id = &ctx.app_id; self.hot_store.purge(ctx.clone()).await?; diff --git a/rust/experimental/server/src/store/localfile.rs 
b/rust/experimental/server/src/store/localfile.rs index a569333666..e527d8cdb1 100644 --- a/rust/experimental/server/src/store/localfile.rs +++ b/rust/experimental/server/src/store/localfile.rs @@ -18,7 +18,7 @@ use crate::app::ReadingOptions::FILE_OFFSET_AND_LEN; use crate::app::{ PartitionedUId, PurgeDataContext, ReadingIndexViewContext, ReadingViewContext, - RequireBufferContext, WritingViewContext, + ReleaseBufferContext, RequireBufferContext, WritingViewContext, }; use crate::config::LocalfileStoreConfig; use crate::error::WorkerError; @@ -430,6 +430,10 @@ impl Store for LocalFileStore { todo!() } + async fn release_buffer(&self, _ctx: ReleaseBufferContext) -> Result { + todo!() + } + async fn purge(&self, ctx: PurgeDataContext) -> Result<()> { let app_id = ctx.app_id; let shuffle_id_option = ctx.shuffle_id; diff --git a/rust/experimental/server/src/store/mem/mod.rs b/rust/experimental/server/src/store/mem/mod.rs index cae795981b..84061067d9 100644 --- a/rust/experimental/server/src/store/mem/mod.rs +++ b/rust/experimental/server/src/store/mem/mod.rs @@ -15,32 +15,6 @@ // specific language governing permissions and limitations // under the License. 
-pub use await_tree::InstrumentAwait; - -pub struct MemoryBufferTicket { - id: i64, - created_time: u64, - size: i64, -} - -impl MemoryBufferTicket { - pub fn new(id: i64, created_time: u64, size: i64) -> Self { - Self { - id, - created_time, - size, - } - } +pub mod ticket; - pub fn get_size(&self) -> i64 { - self.size - } - - pub fn is_timeout(&self, timeout_sec: i64) -> bool { - crate::util::current_timestamp_sec() - self.created_time > timeout_sec as u64 - } - - pub fn get_id(&self) -> i64 { - self.id - } -} +pub use await_tree::InstrumentAwait; diff --git a/rust/experimental/server/src/store/mem/ticket.rs b/rust/experimental/server/src/store/mem/ticket.rs new file mode 100644 index 0000000000..eaad3796e0 --- /dev/null +++ b/rust/experimental/server/src/store/mem/ticket.rs @@ -0,0 +1,231 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::error::WorkerError; +use crate::runtime::manager::RuntimeManager; +use anyhow::Result; +use dashmap::DashMap; +use log::debug; +use std::sync::Arc; +use std::thread; +use std::time::Duration; + +#[derive(Clone)] +pub struct Ticket { + id: i64, + created_time: u64, + size: i64, + owned_by_app_id: String, +} + +impl Ticket { + pub fn new(ticket_id: i64, created_time: u64, size: i64, app_id: &str) -> Self { + Self { + id: ticket_id, + created_time, + size, + owned_by_app_id: app_id.into(), + } + } + + pub fn get_size(&self) -> i64 { + self.size + } + + pub fn is_timeout(&self, timeout_sec: i64) -> bool { + crate::util::current_timestamp_sec().saturating_sub(self.created_time) > timeout_sec as u64 + } + + pub fn get_id(&self) -> i64 { + self.id + } +} + +#[derive(Clone)] +pub struct TicketManager { + // key: ticket_id + ticket_store: Arc>, + + ticket_timeout_sec: i64, + ticket_timeout_check_interval_sec: i64, +} + +impl TicketManager { + pub fn new bool + Send + 'static>( + ticket_timeout_sec: i64, + ticket_timeout_check_interval_sec: i64, + free_allocated_size_func: F, + runtime_manager: RuntimeManager, + ) -> Self { + let manager = Self { + ticket_store: Default::default(), + ticket_timeout_sec, + ticket_timeout_check_interval_sec, + }; + Self::schedule_ticket_check(manager.clone(), free_allocated_size_func, runtime_manager); + manager + } + + /// check the ticket existence + pub fn exist(&self, ticket_id: i64) -> bool { + self.ticket_store.contains_key(&ticket_id) + } + + /// Delete one ticket by its id, and it will return the allocated size for this ticket + pub fn delete(&self, ticket_id: i64) -> Result { + if let Some(entry) = self.ticket_store.remove(&ticket_id) { + Ok(entry.1.size) + } else { + Err(WorkerError::TICKET_ID_NOT_EXIST(ticket_id)) + } + } + + /// Delete all the ticket owned by the app id.
And + /// it will return all the allocated size of ticket ids that owned by this app_id + pub fn delete_by_app_id(&self, app_id: &str) -> i64 { + let read_view = self.ticket_store.clone(); + let mut deleted_ids = vec![]; + for ticket in read_view.iter() { + if ticket.owned_by_app_id == *app_id { + deleted_ids.push(ticket.id); + } + } + + let mut size = 0i64; + for deleted_id in deleted_ids { + size += self + .ticket_store + .remove(&deleted_id) + .map_or(0, |val| val.1.size); + } + size + } + + /// insert one ticket managed by this ticket manager + pub fn insert(&self, ticket_id: i64, size: i64, created_timestamp: u64, app_id: &str) -> bool { + let ticket = Ticket { + id: ticket_id, + created_time: created_timestamp, + size, + owned_by_app_id: app_id.into(), + }; + + self.ticket_store + .insert(ticket_id, ticket) + .map_or(false, |_| true) + } + + fn schedule_ticket_check bool + Send + 'static>( + ticket_manager: TicketManager, + mut free_allocated_fn: F, + _runtime_manager: RuntimeManager, + ) { + thread::spawn(move || { + let ticket_store = ticket_manager.ticket_store; + loop { + let read_view = ticket_store.clone(); + let mut timeout_tickets = vec![]; + for ticket in read_view.iter() { + if ticket.is_timeout(ticket_manager.ticket_timeout_sec) { + timeout_tickets.push(ticket.id); + } + } + + let mut total_removed_size = 0i64; + for timeout_ticket_id in timeout_tickets.iter() { + total_removed_size += ticket_store + .remove(timeout_ticket_id) + .map_or(0, |val| val.1.size); + } + if total_removed_size != 0 { + free_allocated_fn(total_removed_size); + debug!("remove {:#?} memory allocated tickets, release pre-allocated memory size: {:?}", timeout_tickets, total_removed_size); + } + thread::sleep(Duration::from_secs( + ticket_manager.ticket_timeout_check_interval_sec as u64, + )); + } + }); + } +} + +#[cfg(test)] +mod test { + use crate::runtime::manager::RuntimeManager; + use crate::store::mem::ticket::TicketManager; + use dashmap::DashMap; + use std::sync::{Arc, 
Mutex}; + use std::thread; + use std::thread::JoinHandle; + use std::time::Duration; + + #[test] + fn test_closure() { + let state = Arc::new(DashMap::new()); + state.insert(1, 1); + + fn schedule(mut callback: impl FnMut(i64) -> i64 + Send + 'static) -> JoinHandle { + thread::spawn(move || callback(2)) + } + + let state_cloned = state.clone(); + let callback = move |a: i64| { + state_cloned.insert(a, a); + a + 1 + }; + schedule(callback).join().expect(""); + + assert!(state.contains_key(&2)); + } + + #[test] + fn test_ticket_manager() { + let released_size = Arc::new(Mutex::new(0)); + + let release_size_cloned = released_size.clone(); + let free_allocated_size_func = move |size: i64| { + *(release_size_cloned.lock().unwrap()) += size; + true + }; + let ticket_manager = + TicketManager::new(1, 1, free_allocated_size_func, RuntimeManager::default()); + let app_id = "test_ticket_manager_app_id"; + + assert!(ticket_manager.delete(1000).is_err()); + + // case1 + ticket_manager.insert(1, 10, crate::util::current_timestamp_sec() + 1, app_id); + ticket_manager.insert(2, 10, crate::util::current_timestamp_sec() + 1, app_id); + assert!(ticket_manager.exist(1)); + assert!(ticket_manager.exist(2)); + + // case2 + ticket_manager.delete(1).expect(""); + assert!(!ticket_manager.exist(1)); + assert!(ticket_manager.exist(2)); + + // case3 + ticket_manager.delete_by_app_id(app_id); + assert!(!ticket_manager.exist(2)); + + // case4 + ticket_manager.insert(3, 10, crate::util::current_timestamp_sec() + 1, app_id); + assert!(ticket_manager.exist(3)); + awaitility::at_most(Duration::from_secs(5)).until(|| !ticket_manager.exist(3)); + assert_eq!(10, *released_size.lock().unwrap()); + } +} diff --git a/rust/experimental/server/src/store/memory.rs b/rust/experimental/server/src/store/memory.rs index 93a7184dc9..c2756f1d6c 100644 --- a/rust/experimental/server/src/store/memory.rs +++ b/rust/experimental/server/src/store/memory.rs @@ -18,7 +18,7 @@ use 
crate::app::ReadingOptions::MEMORY_LAST_BLOCK_ID_AND_MAX_SIZE; use crate::app::{ PartitionedUId, PurgeDataContext, ReadingIndexViewContext, ReadingViewContext, - RequireBufferContext, WritingViewContext, + ReleaseBufferContext, RequireBufferContext, WritingViewContext, }; use crate::config::MemoryStoreConfig; use crate::error::WorkerError; @@ -39,27 +39,22 @@ use std::collections::{BTreeMap, HashMap}; use std::str::FromStr; +use crate::store::mem::ticket::TicketManager; use crate::store::mem::InstrumentAwait; -use crate::store::mem::MemoryBufferTicket; use croaring::Treemap; -use log::error; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; -use std::time::Duration; use tokio::sync::Mutex; -use tokio::time::sleep as delay_for; pub struct MemoryStore { // todo: change to RW lock state: DashMap>>, budget: MemoryBudget, // key: app_id, value: allocated memory size - memory_allocated_of_app: DashMap>, memory_capacity: i64, - buffer_ticket_timeout_sec: i64, - buffer_ticket_check_interval_sec: i64, in_flush_buffer_size: AtomicU64, runtime_manager: RuntimeManager, + ticket_manager: TicketManager, } unsafe impl Send for MemoryStore {} @@ -68,45 +63,59 @@ unsafe impl Sync for MemoryStore {} impl MemoryStore { // only for test cases pub fn new(max_memory_size: i64) -> Self { + let budget = MemoryBudget::new(max_memory_size); + let runtime_manager: RuntimeManager = Default::default(); + + let budget_clone = budget.clone(); + let free_allocated_size_func = + move |size: i64| budget_clone.free_allocated(size).map_or(false, |v| v); + let ticket_manager = TicketManager::new( + 5 * 60, + 10, + free_allocated_size_func, + runtime_manager.clone(), + ); MemoryStore { + budget, state: DashMap::new(), - budget: MemoryBudget::new(max_memory_size), - memory_allocated_of_app: DashMap::new(), memory_capacity: max_memory_size, - buffer_ticket_timeout_sec: 5 * 60, - buffer_ticket_check_interval_sec: 10, + ticket_manager, in_flush_buffer_size: Default::default(), - 
runtime_manager: Default::default(), + runtime_manager, } } pub fn from(conf: MemoryStoreConfig, runtime_manager: RuntimeManager) -> Self { let capacity = ReadableSize::from_str(&conf.capacity).unwrap(); + let budget = MemoryBudget::new(capacity.as_bytes() as i64); + + let budget_clone = budget.clone(); + let free_allocated_size_func = + move |size: i64| budget_clone.free_allocated(size).map_or(false, |v| v); + let ticket_manager = TicketManager::new( + conf.buffer_ticket_timeout_sec.unwrap_or(5 * 60), + 10, + free_allocated_size_func, + runtime_manager.clone(), + ); MemoryStore { state: DashMap::new(), - budget: MemoryBudget::new(capacity.as_bytes() as i64), + budget, - memory_allocated_of_app: DashMap::new(), memory_capacity: capacity.as_bytes() as i64, - buffer_ticket_timeout_sec: conf.buffer_ticket_timeout_sec.unwrap_or(5 * 60), - buffer_ticket_check_interval_sec: 10, + ticket_manager, in_flush_buffer_size: Default::default(), runtime_manager, } } - // only for tests - fn refresh_buffer_ticket_check_interval_sec(&mut self, interval: i64) { - self.buffer_ticket_check_interval_sec = interval - } - // todo: make this used size as a var pub async fn memory_usage_ratio(&self) -> f32 { - let snapshot = self.budget.snapshot().await; + let snapshot = self.budget.snapshot(); snapshot.get_used_percent() } pub async fn memory_snapshot(&self) -> Result { - Ok(self.budget.snapshot().await) + Ok(self.budget.snapshot()) } pub fn get_capacity(&self) -> Result { @@ -114,7 +123,7 @@ impl MemoryStore { } pub async fn memory_used_ratio(&self) -> f32 { - let snapshot = self.budget.snapshot().await; + let snapshot = self.budget.snapshot(); (snapshot.used + snapshot.allocated - self.in_flush_buffer_size.load(Ordering::SeqCst) as i64) as f32 / snapshot.capacity as f32 @@ -129,11 +138,11 @@ impl MemoryStore { } pub async fn free_used(&self, size: i64) -> Result { - self.budget.free_used(size).await + self.budget.free_used(size) } pub async fn free_allocated(&self, size: i64) -> Result { - self.budget.free_allocated(size).await + 
self.budget.free_allocated(size) } pub async fn get_required_spill_buffer( @@ -143,7 +152,7 @@ impl MemoryStore { // sort // get the spill buffers - let snapshot = self.budget.snapshot().await; + let snapshot = self.budget.snapshot(); let removed_size = snapshot.used - target_len; if removed_size <= 0 { return HashMap::new(); @@ -238,90 +247,12 @@ impl MemoryStore { (fetched, fetched_size) } - - pub(crate) fn is_ticket_exist(&self, app_id: &str, ticket_id: i64) -> bool { - self.memory_allocated_of_app - .get(app_id) - .map_or(false, |app_entry| app_entry.contains_key(&ticket_id)) - } - - /// return the discarded memory allocated size - pub(crate) fn discard_tickets(&self, app_id: &str, ticket_id_option: Option) -> i64 { - match ticket_id_option { - None => self - .memory_allocated_of_app - .remove(app_id) - .map_or(0, |app| app.1.iter().map(|x| x.get_size()).sum()), - Some(ticket_id) => self.memory_allocated_of_app.get(app_id).map_or(0, |app| { - app.remove(&ticket_id) - .map_or(0, |ticket| ticket.1.get_size()) - }), - } - } - - fn cache_buffer_required_ticket( - &self, - app_id: &str, - require_buffer: &RequireBufferResponse, - size: i64, - ) { - let app_entry = self - .memory_allocated_of_app - .entry(app_id.to_string()) - .or_insert_with(|| DashMap::default()); - app_entry.insert( - require_buffer.ticket_id, - MemoryBufferTicket::new( - require_buffer.ticket_id, - require_buffer.allocated_timestamp, - size, - ), - ); - } - - async fn check_allocated_tickets(&self) { - // if the ticket is timeout, discard this. 
- let mut timeout_ids = vec![]; - let iter = self.memory_allocated_of_app.iter(); - for app in iter { - let app_id = &app.key().to_string(); - let app_iter = app.iter(); - for ticket in app_iter { - if ticket.is_timeout(self.buffer_ticket_timeout_sec) { - timeout_ids.push((app_id.to_string(), ticket.key().clone())) - } - } - } - for (app_id, ticket_id) in timeout_ids { - info!( - "Releasing timeout ticket of id:{}, app_id:{}", - ticket_id, app_id - ); - let released = self.discard_tickets(&app_id, Some(ticket_id)); - if let Err(e) = self.budget.free_allocated(released).await { - error!( - "Errors on removing the timeout ticket of id:{}, app_id:{}. error: {:?}", - ticket_id, app_id, e - ); - } - } - } } #[async_trait] impl Store for MemoryStore { fn start(self: Arc) { - // schedule check to find out the timeout allocated buffer ticket - let mem_store = self.clone(); - self.runtime_manager.default_runtime.spawn(async move { - loop { - mem_store.check_allocated_tickets().await; - delay_for(Duration::from_secs( - mem_store.buffer_ticket_check_interval_sec as u64, - )) - .await; - } - }); + // ignore } async fn insert(&self, ctx: WritingViewContext) -> Result<(), WorkerError> { @@ -336,10 +267,7 @@ impl Store for MemoryStore { let inserted_size = buffer_guarded.add(blocks)?; drop(buffer_guarded); - self.budget - .allocated_to_used(inserted_size) - .instrument_await("make budget allocated -> used") - .await?; + self.budget.allocated_to_used(inserted_size)?; TOTAL_MEMORY_USED.inc_by(inserted_size as u64); @@ -466,40 +394,10 @@ impl Store for MemoryStore { panic!("It should not be invoked.") } - async fn require_buffer( - &self, - ctx: RequireBufferContext, - ) -> Result { - let (succeed, ticket_id) = self.budget.pre_allocate(ctx.size).await?; - match succeed { - true => { - let require_buffer_resp = RequireBufferResponse::new(ticket_id); - self.cache_buffer_required_ticket( - ctx.uid.app_id.as_str(), - &require_buffer_resp, - ctx.size, - ); - Ok(require_buffer_resp) - } - 
_ => Err(WorkerError::NO_ENOUGH_MEMORY_TO_BE_ALLOCATED), - } - } - async fn purge(&self, ctx: PurgeDataContext) -> Result<()> { let app_id = ctx.app_id; let shuffle_id_option = ctx.shuffle_id; - if shuffle_id_option.is_none() { - // free allocated for the whole app - let released_size = self.discard_tickets(app_id.as_str(), None); - self.budget.free_allocated(released_size).await?; - info!( - "free allocated buffer size:[{}] for app:[{}]", - released_size, - app_id.as_str() - ); - } - // remove the corresponding app's data let read_only_state_view = self.state.clone().into_read_only(); let mut _removed_list = vec![]; @@ -526,7 +424,7 @@ impl Store for MemoryStore { } // free used - self.budget.free_used(used).await?; + self.budget.free_used(used)?; info!( "removed used buffer size:[{}] for [{:?}], [{:?}]", @@ -539,6 +437,31 @@ impl Store for MemoryStore { async fn is_healthy(&self) -> Result { Ok(true) } + + async fn require_buffer( + &self, + ctx: RequireBufferContext, + ) -> Result { + let (succeed, ticket_id) = self.budget.pre_allocate(ctx.size)?; + match succeed { + true => { + let require_buffer_resp = RequireBufferResponse::new(ticket_id); + self.ticket_manager.insert( + ticket_id, + ctx.size, + require_buffer_resp.allocated_timestamp, + &ctx.uid.app_id, + ); + Ok(require_buffer_resp) + } + _ => Err(WorkerError::NO_ENOUGH_MEMORY_TO_BE_ALLOCATED), + } + } + + async fn release_buffer(&self, ctx: ReleaseBufferContext) -> Result { + let ticket_id = ctx.ticket_id; + self.ticket_manager.delete(ticket_id) + } } /// thread safe, this will be guarded by the lock @@ -678,12 +601,12 @@ impl MemoryBudget { } } - pub async fn snapshot(&self) -> MemorySnapshot { + pub fn snapshot(&self) -> MemorySnapshot { let inner = self.inner.lock().unwrap(); (inner.capacity, inner.allocated, inner.used).into() } - async fn pre_allocate(&self, size: i64) -> Result<(bool, i64)> { + fn pre_allocate(&self, size: i64) -> Result<(bool, i64)> { let mut inner = self.inner.lock().unwrap(); let 
free_space = inner.capacity - inner.allocated - inner.used; if free_space < size { @@ -697,7 +620,7 @@ impl MemoryBudget { } } - async fn allocated_to_used(&self, size: i64) -> Result { + fn allocated_to_used(&self, size: i64) -> Result { let mut inner = self.inner.lock().unwrap(); if inner.allocated < size { inner.allocated = 0; @@ -710,7 +633,7 @@ impl MemoryBudget { Ok(true) } - async fn free_used(&self, size: i64) -> Result { + fn free_used(&self, size: i64) -> Result { let mut inner = self.inner.lock().unwrap(); if inner.used < size { inner.used = 0; @@ -722,7 +645,7 @@ impl MemoryBudget { Ok(true) } - async fn free_allocated(&self, size: i64) -> Result { + fn free_allocated(&self, size: i64) -> Result { let mut inner = self.inner.lock().unwrap(); if inner.allocated < size { inner.allocated = 0; @@ -749,75 +672,10 @@ mod test { use bytes::BytesMut; use core::panic; use std::sync::Arc; - use std::thread; - use std::time::Duration; - use crate::config::MemoryStoreConfig; - use crate::runtime::manager::RuntimeManager; use anyhow::Result; use croaring::Treemap; - #[test] - fn test_ticket_timeout() -> Result<()> { - let cfg = MemoryStoreConfig::from("2M".to_string(), 1); - let runtime_manager: RuntimeManager = Default::default(); - let mut store = MemoryStore::from(cfg, runtime_manager.clone()); - - store.refresh_buffer_ticket_check_interval_sec(1); - - let store = Arc::new(store); - store.clone().start(); - - let app_id = "mocked-app-id"; - let ctx = RequireBufferContext::new(PartitionedUId::from(app_id.to_string(), 1, 1), 1000); - let resp = runtime_manager.wait(store.require_buffer(ctx.clone()))?; - assert!(store.is_ticket_exist(app_id, resp.ticket_id)); - - let snapshot = runtime_manager.wait(store.budget.snapshot()); - assert_eq!(snapshot.allocated, 1000); - assert_eq!(snapshot.used, 0); - - thread::sleep(Duration::from_secs(5)); - - assert!(!store.is_ticket_exist(app_id, resp.ticket_id)); - - let snapshot = runtime_manager.wait(store.budget.snapshot()); - 
assert_eq!(snapshot.allocated, 0); - assert_eq!(snapshot.used, 0); - - Ok(()) - } - - #[test] - fn test_memory_buffer_ticket() -> Result<()> { - let store = MemoryStore::new(1024 * 1000); - let runtime = store.runtime_manager.clone(); - - let app_id = "mocked-app-id"; - let ctx = RequireBufferContext::new(PartitionedUId::from(app_id.to_string(), 1, 1), 1000); - let resp = runtime.wait(store.require_buffer(ctx.clone()))?; - let ticket_id_1 = resp.ticket_id; - - let resp = runtime.wait(store.require_buffer(ctx.clone()))?; - let ticket_id_2 = resp.ticket_id; - - assert!(store.is_ticket_exist(app_id, ticket_id_1)); - assert!(store.is_ticket_exist(app_id, ticket_id_2)); - assert!(!store.is_ticket_exist(app_id, 100239)); - - let snapshot = runtime.wait(store.budget.snapshot()); - assert_eq!(snapshot.allocated, 1000 * 2); - assert_eq!(snapshot.used, 0); - - runtime.wait(store.purge(app_id.into()))?; - - let snapshot = runtime.wait(store.budget.snapshot()); - assert_eq!(snapshot.allocated, 0); - assert_eq!(snapshot.used, 0); - - Ok(()) - } - #[test] fn test_read_buffer_in_flight() { let store = MemoryStore::new(1024); @@ -1055,7 +913,6 @@ mod test { } let budget = store.budget.inner.lock().unwrap(); - assert_eq!(0, budget.allocated); assert_eq!(0, budget.used); assert_eq!(1024 * 1024 * 1024, budget.capacity); } @@ -1124,10 +981,8 @@ mod test { weak_ref_before.clone().unwrap().upgrade().is_none(), "Arc should not exist after purge" ); - let snapshot = runtime.wait(store.budget.snapshot()); + let snapshot = store.budget.snapshot(); assert_eq!(snapshot.used, 0); - // the remaining allocated will be removed. 
- assert_eq!(snapshot.allocated, 0); assert_eq!(snapshot.capacity, 1024); let data = runtime.wait(store.get(reading_ctx.clone())).expect(""); assert_eq!(0, data.from_memory().shuffle_data_block_segments.len()); diff --git a/rust/experimental/server/src/store/mod.rs b/rust/experimental/server/src/store/mod.rs index 4f601303ff..f29c7bf564 100644 --- a/rust/experimental/server/src/store/mod.rs +++ b/rust/experimental/server/src/store/mod.rs @@ -23,8 +23,8 @@ pub mod mem; pub mod memory; use crate::app::{ - PurgeDataContext, ReadingIndexViewContext, ReadingViewContext, RequireBufferContext, - WritingViewContext, + PurgeDataContext, ReadingIndexViewContext, ReadingViewContext, ReleaseBufferContext, + RequireBufferContext, WritingViewContext, }; use crate::config::Config; use crate::error::WorkerError; @@ -167,12 +167,14 @@ pub trait Store { &self, ctx: ReadingIndexViewContext, ) -> Result; + async fn purge(&self, ctx: PurgeDataContext) -> Result<()>; + async fn is_healthy(&self) -> Result; + async fn require_buffer( &self, ctx: RequireBufferContext, ) -> Result; - async fn purge(&self, ctx: PurgeDataContext) -> Result<()>; - async fn is_healthy(&self) -> Result; + async fn release_buffer(&self, ctx: ReleaseBufferContext) -> Result; } pub trait Persistent {}