Skip to content

Commit

Permalink
feat: refactor ticket manager
Browse files Browse the repository at this point in the history
  • Loading branch information
zuston committed Dec 26, 2023
1 parent 57d87be commit 08d6172
Show file tree
Hide file tree
Showing 12 changed files with 355 additions and 270 deletions.
7 changes: 7 additions & 0 deletions rust/experimental/server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust/experimental/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ prost-build = "0.11.9"

[dev-dependencies]
env_logger = "0.10.0"
awaitility = "0.3.1"

[profile.dev]
# re-enable debug assertions when pprof-rs fixed the reports for misaligned pointer dereferences
Expand Down
26 changes: 17 additions & 9 deletions rust/experimental/server/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,15 +233,6 @@ impl App {
}
}

pub fn is_buffer_ticket_exist(&self, ticket_id: i64) -> bool {
self.store
.is_buffer_ticket_exist(self.app_id.as_str(), ticket_id)
}

pub fn discard_tickets(&self, ticket_id: i64) -> i64 {
self.store.discard_tickets(self.app_id.as_str(), ticket_id)
}

pub async fn free_allocated_memory_size(&self, size: i64) -> Result<bool> {
self.store.free_hot_store_allocated_memory_size(size).await
}
Expand All @@ -258,6 +249,12 @@ impl App {
self.store.require_buffer(ctx).await
}

pub async fn release_buffer(&self, ticket_id: i64) -> Result<i64, WorkerError> {
self.store
.release_buffer(ReleaseBufferContext::from(ticket_id))
.await
}

fn get_underlying_partition_bitmap(&self, uid: PartitionedUId) -> PartitionedMeta {
let shuffle_id = uid.shuffle_id;
let partition_id = uid.partition_id;
Expand Down Expand Up @@ -344,6 +341,17 @@ pub struct RequireBufferContext {
pub size: i64,
}

#[derive(Debug, Clone)]
pub struct ReleaseBufferContext {
pub(crate) ticket_id: i64,
}

impl From<i64> for ReleaseBufferContext {
fn from(value: i64) -> Self {
Self { ticket_id: value }
}
}

impl RequireBufferContext {
pub fn new(uid: PartitionedUId, size: i64) -> Self {
Self { uid, size }
Expand Down
3 changes: 3 additions & 0 deletions rust/experimental/server/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ pub enum WorkerError {

#[error("Http request failed. {0}")]
HTTP_SERVICE_ERROR(String),

#[error("Ticket id: {0} not exist")]
TICKET_ID_NOT_EXIST(i64),
}

impl From<AcquireError> for WorkerError {
Expand Down
5 changes: 3 additions & 2 deletions rust/experimental/server/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,14 +149,15 @@ impl ShuffleServer for DefaultShuffleServer {

let app = app_option.unwrap();

if !app.is_buffer_ticket_exist(ticket_id) {
let release_result = app.release_buffer(ticket_id).await;
if release_result.is_err() {
return Ok(Response::new(SendShuffleDataResponse {
status: StatusCode::NO_BUFFER.into(),
ret_msg: "No such buffer ticket id, it may be discarded due to timeout".to_string(),
}));
}

let ticket_required_size = app.discard_tickets(ticket_id);
let ticket_required_size = release_result.unwrap();

let mut blocks_map = HashMap::new();
for shuffle_data in req.shuffle_data {
Expand Down
6 changes: 5 additions & 1 deletion rust/experimental/server/src/store/hdfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use crate::app::{
PartitionedUId, PurgeDataContext, ReadingIndexViewContext, ReadingViewContext,
RequireBufferContext, WritingViewContext,
ReleaseBufferContext, RequireBufferContext, WritingViewContext,
};
use crate::config::HdfsStoreConfig;
use crate::error::WorkerError;
Expand Down Expand Up @@ -246,6 +246,10 @@ impl Store for HdfsStore {
todo!()
}

async fn release_buffer(&self, _ctx: ReleaseBufferContext) -> Result<i64, WorkerError> {
todo!()
}

async fn purge(&self, ctx: PurgeDataContext) -> Result<()> {
let app_id = ctx.app_id;
let app_dir = self.get_app_dir(app_id.as_str());
Expand Down
15 changes: 5 additions & 10 deletions rust/experimental/server/src/store/hybrid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use crate::app::{
PartitionedUId, PurgeDataContext, ReadingIndexViewContext, ReadingOptions, ReadingViewContext,
RequireBufferContext, WritingViewContext,
ReleaseBufferContext, RequireBufferContext, WritingViewContext,
};
use crate::await_tree::AWAIT_TREE_REGISTRY;

Expand Down Expand Up @@ -261,15 +261,6 @@ impl HybridStore {
Ok(message)
}

// For app to check the ticket allocated existence and discard if it has been used
pub fn is_buffer_ticket_exist(&self, app_id: &str, ticket_id: i64) -> bool {
self.hot_store.is_ticket_exist(app_id, ticket_id)
}

pub fn discard_tickets(&self, app_id: &str, ticket_id: i64) -> i64 {
self.hot_store.discard_tickets(app_id, Some(ticket_id))
}

pub async fn free_hot_store_allocated_memory_size(&self, size: i64) -> Result<bool> {
self.hot_store.free_allocated(size).await
}
Expand Down Expand Up @@ -461,6 +452,10 @@ impl Store for HybridStore {
.await
}

async fn release_buffer(&self, ctx: ReleaseBufferContext) -> Result<i64, WorkerError> {
self.hot_store.release_buffer(ctx).await
}

async fn purge(&self, ctx: PurgeDataContext) -> Result<()> {
let app_id = &ctx.app_id;
self.hot_store.purge(ctx.clone()).await?;
Expand Down
6 changes: 5 additions & 1 deletion rust/experimental/server/src/store/localfile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use crate::app::ReadingOptions::FILE_OFFSET_AND_LEN;
use crate::app::{
PartitionedUId, PurgeDataContext, ReadingIndexViewContext, ReadingViewContext,
RequireBufferContext, WritingViewContext,
ReleaseBufferContext, RequireBufferContext, WritingViewContext,
};
use crate::config::LocalfileStoreConfig;
use crate::error::WorkerError;
Expand Down Expand Up @@ -430,6 +430,10 @@ impl Store for LocalFileStore {
todo!()
}

async fn release_buffer(&self, _ctx: ReleaseBufferContext) -> Result<i64, WorkerError> {
todo!()
}

async fn purge(&self, ctx: PurgeDataContext) -> Result<()> {
let app_id = ctx.app_id;
let shuffle_id_option = ctx.shuffle_id;
Expand Down
30 changes: 2 additions & 28 deletions rust/experimental/server/src/store/mem/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,6 @@
// specific language governing permissions and limitations
// under the License.

pub use await_tree::InstrumentAwait;

pub struct MemoryBufferTicket {
id: i64,
created_time: u64,
size: i64,
}

impl MemoryBufferTicket {
pub fn new(id: i64, created_time: u64, size: i64) -> Self {
Self {
id,
created_time,
size,
}
}
pub mod ticket;

pub fn get_size(&self) -> i64 {
self.size
}

pub fn is_timeout(&self, timeout_sec: i64) -> bool {
crate::util::current_timestamp_sec() - self.created_time > timeout_sec as u64
}

pub fn get_id(&self) -> i64 {
self.id
}
}
pub use await_tree::InstrumentAwait;
Loading

0 comments on commit 08d6172

Please sign in to comment.