Skip to content

Commit

Permalink
fix potential deadlock
Browse files Browse the repository at this point in the history
  • Loading branch information
zuston committed Jan 5, 2024
1 parent ae03258 commit 92e1fef
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 31 deletions.
3 changes: 2 additions & 1 deletion rust/experimental/server/src/store/hybrid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -776,8 +776,9 @@ mod tests {
let reading_view_ctx = ReadingViewContext {
uid: uid.clone(),
reading_options: ReadingOptions::FILE_OFFSET_AND_LEN(offset, length as i64),
serialized_expected_task_ids_bitmap: Default::default(),
serialized_expected_task_ids_bitmap: None,
};
println!("reading. offset: {:?}. len: {:?}", offset, length);
let read_data = store.get(reading_view_ctx).await.unwrap();
match read_data {
ResponseData::Local(local_data) => {
Expand Down
64 changes: 50 additions & 14 deletions rust/experimental/server/src/store/local/disk.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,33 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.


use crate::runtime::manager::RuntimeManager;
use anyhow::Result;
use await_tree::InstrumentAwait;
use bytes::Bytes;
use bytes::{Bytes, BytesMut};
use log::{error, info, warn};
use opendal::services::Fs;
use opendal::Operator;
use std::io::SeekFrom;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncSeekExt};
use tokio::sync::Semaphore;

pub struct LocalDiskConfig {
Expand Down Expand Up @@ -167,16 +187,19 @@ impl LocalDisk {
}

pub async fn read(&self, path: &str, offset: i64, length: Option<i64>) -> Result<Bytes> {
let read_len = match length {
Some(len) => len,
_ => self.get_file_len(path).await?,
} as u64;
let data = self
.operator
.read_with(path)
.range(offset as u64..read_len)
.await?;
Ok(Bytes::from(data))
if length.is_none() {
return Ok(Bytes::from(self.operator.read(path).await?));
}

let mut reader = self.operator.reader(path).await?;
reader.seek(SeekFrom::Start(offset as u64)).await?;

let mut buffer = vec![0; length.unwrap() as usize];
reader.read_exact(buffer.as_mut()).await?;

let mut bytes_buffer = BytesMut::new();
bytes_buffer.extend_from_slice(&*buffer);
Ok(bytes_buffer.freeze())
}

pub async fn delete(&self, path: String) -> Result<()> {
Expand Down Expand Up @@ -315,13 +338,26 @@ mod tests {
runtime.wait(local_disk.write(Bytes::copy_from_slice(data), relative_path));
assert!(write_result.is_ok());

let read_result =
runtime.wait(local_disk.read(relative_path, 0, Some(data.len() as i64 * 2)));
// read the first word
let read_result = runtime.wait(local_disk.read(relative_path, 0, Some(data.len() as i64)));
assert!(read_result.is_ok());
let read_data = read_result.unwrap();
let expected = b"Hello, World!Hello, World!";
let expected = b"Hello, World!";
assert_eq!(read_data.as_ref(), expected);

// read the middle word
let read_result = runtime.wait(local_disk.read(
relative_path,
data.len() as i64,
Some(data.len() as i64),
));
assert_eq!(read_result.unwrap().as_ref(), expected);

// read all words
let read_result = runtime.wait(local_disk.read(relative_path, 0, None));
let expected = b"Hello, World!Hello, World!";
assert_eq!(read_result.unwrap().as_ref(), expected);

temp_dir.close().unwrap();
}
}
18 changes: 18 additions & 0 deletions rust/experimental/server/src/store/local/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,19 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.


pub mod disk;
52 changes: 36 additions & 16 deletions rust/experimental/server/src/store/localfile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::store::{
LocalDataIndex, PartitionedLocalData, Persistent, RequireBufferResponse, ResponseData,
ResponseDataIndex, Store,
};
use std::ops::Deref;
use std::path::Path;

use anyhow::Result;
Expand All @@ -39,20 +40,30 @@ use dashmap::DashMap;
use log::{debug, error, warn};

use crate::runtime::manager::RuntimeManager;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;

use crate::store::local::disk::{LocalDisk, LocalDiskConfig};

struct LockedObj {
disk: Arc<LocalDisk>,
pointer: i64,
pointer: AtomicI64,
}

/// Wraps a disk handle into the per-partition state tracked by the store.
///
/// The resulting object shares the given [`LocalDisk`] and starts with its
/// `pointer` at the atomic's default value (zero).
impl From<Arc<LocalDisk>> for LockedObj {
    fn from(value: Arc<LocalDisk>) -> Self {
        Self {
            // `value` is already an owned `Arc`; move it in instead of cloning,
            // which would only bump and then immediately drop a refcount.
            disk: value,
            pointer: Default::default(),
        }
    }
}

pub struct LocalFileStore {
local_disks: Vec<Arc<LocalDisk>>,
healthy_check_min_disks: i32,
runtime_manager: RuntimeManager,
partition_locks: DashMap<String, LockedObj>,
partition_locks: DashMap<String, Arc<LockedObj>>,
}

impl Persistent for LocalFileStore {}
Expand Down Expand Up @@ -176,19 +187,17 @@ impl Store for LocalFileStore {
LocalFileStore::gen_relative_path_for_partition(&uid);

let mut parent_dir_is_created = false;
let mut locked_obj = self
let locked_obj = self
.partition_locks
.entry(data_file_path.clone())
.or_insert_with(|| {
parent_dir_is_created = true;
LockedObj {
disk: self.select_disk(&uid).unwrap(),
pointer: 0,
}
});
Arc::new(LockedObj::from(self.select_disk(&uid).unwrap()))
})
.clone();

let local_disk = &locked_obj.disk;
let mut next_offset = locked_obj.pointer;
let mut next_offset = locked_obj.pointer.load(Ordering::SeqCst);

if local_disk.is_corrupted()? {
return Err(WorkerError::PARTIAL_DATA_LOST(local_disk.root.to_string()));
Expand Down Expand Up @@ -240,7 +249,10 @@ impl Store for LocalFileStore {

TOTAL_LOCALFILE_USED.inc_by(total_size as u64);

locked_obj.pointer += total_size as i64;
locked_obj
.deref()
.pointer
.store(next_offset, Ordering::SeqCst);

Ok(())
}
Expand All @@ -261,8 +273,7 @@ impl Store for LocalFileStore {

let (data_file_path, _) = LocalFileStore::gen_relative_path_for_partition(&uid);

let locked_object = self.partition_locks.get_mut(&data_file_path);
if locked_object.is_none() {
if !self.partition_locks.contains_key(&data_file_path) {
warn!(
"There is no cached data in localfile store for [{:?}]",
&uid
Expand All @@ -272,7 +283,12 @@ impl Store for LocalFileStore {
}));
}

let locked_object = locked_object.unwrap();
let locked_object = self
.partition_locks
.entry(data_file_path.clone())
.or_insert_with(|| Arc::new(LockedObj::from(self.select_disk(&uid).unwrap())))
.clone();

let local_disk = &locked_object.disk;

if local_disk.is_corrupted()? {
Expand All @@ -299,8 +315,7 @@ impl Store for LocalFileStore {
let (data_file_path, index_file_path) =
LocalFileStore::gen_relative_path_for_partition(&uid);

let locked_object = self.partition_locks.get(&data_file_path);
if locked_object.is_none() {
if !self.partition_locks.contains_key(&data_file_path) {
warn!(
"There is no cached data in localfile store for [{:?}]",
&uid
Expand All @@ -311,7 +326,12 @@ impl Store for LocalFileStore {
}));
}

let locked_object = locked_object.unwrap();
let locked_object = self
.partition_locks
.entry(data_file_path.clone())
.or_insert_with(|| Arc::new(LockedObj::from(self.select_disk(&uid).unwrap())))
.clone();

let local_disk = &locked_object.disk;
if local_disk.is_corrupted()? {
return Err(WorkerError::LOCAL_DISK_OWNED_BY_PARTITION_CORRUPTED(
Expand Down

0 comments on commit 92e1fef

Please sign in to comment.