diff --git a/fraos/README.md b/fraos/README.md
index c040649..b7cc158 100644
--- a/fraos/README.md
+++ b/fraos/README.md
@@ -2,7 +2,8 @@
 Guarantees:
 * thread-safe
-* changes are dumped to the disk at the moment of insertion
+* changes are dumped to the disk at the moment of insertion, or the insertion fails
+* records are stored and iterated in the order of insertion

 This work was inspired by the original data-pile library by Eugene Babichenko. Currently maintained by Eugene Gostkin and the dcSpark team.

@@ -14,18 +15,112 @@ This work was inspired by the original data-pile library by Eugene Babichenko.
 * The storage should have a minimal dependency footprint.
 * Thread-safety

-## Usage guide
-
-### Example
+## Example

 ```rust
-use data_pile::Database;
-let db = Database::file("./pile").unwrap();
+let db = Database::file(path).unwrap();
 let value = b"some data";
 db.put(&value).unwrap();
 ```

-### Notes
+## How it works
+
+### Field schema
+```rust
+pub struct Database {
+    flatfile: Arc<FlatFile>,
+    seqno_index: Arc<SeqNoIndex>,
+    write_lock: Arc<Mutex<()>>,
+}
+```
+
+Field roles:
+* `flatfile` - the **raw data file**, where record bytes are stored sequentially
+* `seqno_index` - sequentially stored `(offset, length)` pairs that point to records in the **raw data file**
+  * pair `n` can be accessed by its `sequential index`: its offset in the index file is `2 * size_of::<usize>() * n`
+* `write_lock` - serializes concurrent writers
+
+### Memory allocation
+
+Both `flatfile` and `seqno_index` are built on the `Appender` concept:
+```rust
+pub(crate) struct Appender {
+    // This is used to trick the compiler so that we have parallel reads and
+    // writes.
+    mmap: UnsafeCell<GrowableMmap>,
+    // Atomic is used to ensure that we can have lock-free and memory-safe
+    // reads. Since this value is updated only after the write has finished it
+    // is safe to use it as the upper boundary for reads.
+    actual_size: AtomicUsize,
+}
+
+impl Appender {
+    /// Open a flatfile.
+    ///
+    /// # Arguments
+    ///
+    /// * `path` - the path to the file. It will be created if it does not exist.
+    /// * `existing_length` - the actual number of bytes already written, if known (used on recovery)
+    /// * `writable` - flag that indicates whether the storage is read-only
+    pub fn new(
+        path: Option<PathBuf>,
+        existing_length: Option<usize>,
+        writable: bool,
+    ) -> Result<Self, FraosError> { ... }
+
+    /// Append data to the file. The mutable pointer to the new data location is
+    /// given to `f` which should write the data. This function will block if
+    /// another write is in progress.
+    pub fn append<F>(&self, size_inc: usize, f: F) -> Result<(), FraosError>
+    where
+        F: Fn(&mut [u8]) -> Result<(), FraosError>,
+    { ... }
+
+    /// The whole data buffer is given to `f` which should return the data back
+    /// or return None if something went wrong.
+    pub fn get_data<F>(&self, offset: usize, f: F) -> Result<Option<Vec<u8>>, FraosError>
+    where
+        F: Fn(&[u8]) -> Result<Option<Vec<u8>>, FraosError>,
+    { ... }
+
+    pub fn memory_size(&self) -> usize { ... }
+
+    pub fn shrink_to_size(&self) -> Result<(), FraosError> { ... }
+}
+```
+
+```rust
+pub(crate) struct GrowableMmap {
+    storage: RwLock<Storage>,
+    file: Option<File>,
+}
+
+struct Storage {
+    inactive_mmaps: InactiveMmaps,
+    active_map: Option<MmapMut>,
+}
+```
+
+`GrowableMmap` has an active mutable mmap tail (`active_map`) and an inactive prefix (`inactive_mmaps`).
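+
+To make the policy concrete, here is a minimal, self-contained sketch of the growth strategy. This is an illustration only: plain `Vec<u8>` buffers stand in for the real memory-mapped files, the constant values are placeholders, and `SketchMmap` is an invented name.
+
+```rust
+const MIN_MMAP_BYTES: usize = 4096; // placeholder value, for illustration
+const MAX_MMAPS_COUNT: usize = 100; // placeholder value, for illustration
+
+struct SketchMmap {
+    inactive: Vec<Vec<u8>>, // frozen, read-only prefix
+    active: Vec<u8>,        // mutable tail with spare capacity
+    used: usize,            // bytes actually written into `active`
+}
+
+impl SketchMmap {
+    fn append(&mut self, record: &[u8]) {
+        if self.used + record.len() > self.active.len() {
+            // freeze the active buffer at the actual end of writes...
+            self.active.truncate(self.used);
+            let frozen = std::mem::take(&mut self.active);
+            if !frozen.is_empty() {
+                self.inactive.push(frozen);
+            }
+            // ...and allocate a new one: MIN_MMAP_BYTES, or more if the record is bigger
+            self.active = vec![0; record.len().max(MIN_MMAP_BYTES)];
+            self.used = 0;
+        }
+        self.active[self.used..self.used + record.len()].copy_from_slice(record);
+        self.used += record.len();
+        if self.inactive.len() > MAX_MMAPS_COUNT {
+            // "remap": collapse the inactive prefix into a single buffer
+            let merged: Vec<u8> = self.inactive.concat();
+            self.inactive = vec![merged];
+        }
+    }
+}
+```
+
+The rules the real implementation follows: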
+* If there is enough space we add records to the active mmap
+* If there is not enough space:
+  * we slice the active mmap to the actual end of writes
+  * put it into the inactive mmaps
+  * create a new mmap sized either to the data or to `MIN_MMAP_BYTES`, whichever is larger
+* If `inactive_mmaps` holds more than `MAX_MMAPS_COUNT` mmaps we remap the existing data and create a single mmap for all of it
+  * This is needed because UNIX-like systems limit how many mmaps a process can hold at a time. If the limit were exceeded the storage would stop working
+
+When data is appended:
+* We check whether the `GrowableMmap` of `flatfile` has an active section.
+  * If the free space in the active section is enough, the data is written into it and dumped to disk
+  * If the free space is not enough, the current active mmap is cut, added to the list of inactive mmaps, and a new chunk is allocated; the data is written to the allocated section and dumped to disk. If the record is too big the new active mmap is sized to the record
+* The same applies to the `GrowableMmap` of `seqno_index`:
+  * We append the `(offset, length)` pair to its active section, so the index knows where to find the data in `flatfile`
+
+Reload note:
-Values are accessible only by their sequential numbers. You will need an
-external index if you want any other kind of keys.
+* If the storage is reloaded without a proper `drop`, the end of the storage may be filled with zeros, i.e. the actual amount of stored data is less than the amount of allocated memory. In that case:
+  * for `flatfile` this is not a problem: we never read a location unless `seqno_index` points to it
+  * for `seqno_index` this is a problem: we need to identify where the data actually ends:
+    * note that the `(offset, length)` pairs have a special structure: `offset` is a monotonically increasing sequence and `length` is always non-zero
+    * when we reload the `seqno_index` and see that it is not empty but has zeros at the end, we binary-search for the actual end of the index and reload the storage with that size already known, as sketched below
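+
+A minimal sketch of that recovery search over an in-memory slice (a hypothetical stand-in for the mmap-backed index; the real implementation lives in `SeqNoIndex::find_actual_end`):
+
+```rust
+/// Given (offset, length) pairs where real entries have a non-zero length and
+/// the tail may be zero-filled, return the number of real entries.
+fn find_actual_end(pairs: &[(usize, usize)]) -> usize {
+    if pairs.is_empty() || pairs[0].1 == 0 {
+        return 0; // empty index, or zeros from the very start
+    }
+    if pairs[pairs.len() - 1].1 != 0 {
+        return pairs.len(); // no zero tail at all
+    }
+    // invariant: pairs[start].1 != 0 and pairs[end].1 == 0
+    let (mut start, mut end) = (0, pairs.len() - 1);
+    while end - start > 1 {
+        let mid = (start + end) / 2;
+        if pairs[mid].1 == 0 {
+            end = mid;
+        } else {
+            start = mid;
+        }
+    }
+    end // index of the first zero pair == number of real entries
+}
+```
+
+For example, `find_actual_end(&[(0, 5), (5, 6), (0, 0), (0, 0)])` returns `2`: the two trailing zero pairs are cut off on reload.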
diff --git a/fraos/src/database.rs b/fraos/src/database.rs
index b4dc576..1860907 100644
--- a/fraos/src/database.rs
+++ b/fraos/src/database.rs
@@ -95,8 +95,8 @@ impl Database {
             offset += record.len();
         }

-        let seqno = self.seqno_index.append(&seqno_index_update)?;
         self.flatfile.append(records)?;
+        let seqno = self.seqno_index.append(&seqno_index_update)?;

         Ok(seqno)
     }
diff --git a/fraos/src/growable_mmap.rs b/fraos/src/growable_mmap.rs
index f1f4998..39bfcb3 100644
--- a/fraos/src/growable_mmap.rs
+++ b/fraos/src/growable_mmap.rs
@@ -29,11 +29,10 @@ struct Storage {
     active_map: Option<MmapMut>,
 }

-/// the struct has an active mutable mmap and inactive tail
+/// the struct has an active mutable mmap (tail) and an inactive prefix
 /// if we have enough space we add records to the active mmap
 /// if not we slice the active mmap to the actual end of writes and put it to inactive mmaps
-/// then we create a new mmap with 2x size from previous
-/// if 2x is not enough we create an mmap with size of the data
+/// then we create a new mmap sized either to the data or to MIN_MMAP_BYTES
 ///
 pub(crate) struct GrowableMmap {
     storage: RwLock<Storage>,
     file: Option<File>,
 }
@@ -57,8 +56,9 @@ impl GrowableMmap {
         }
     }

-        if file_length > 0 {
-            let upper_cap = existing_length.unwrap_or(file_length);
+        let upper_cap = existing_length.unwrap_or(file_length);
+
+        if upper_cap > 0 {
             let mmap = SharedMmap::new(
                 unsafe { MmapOptions::new().offset(0).len(upper_cap).map(file) }
                     .map_err(|err| FraosError::MmapError(MmapError::Mmap(err)))?,
@@ -327,6 +327,7 @@ impl GrowableMmap {
     fn create_mmap(&self, new_mmap_size: usize, offset: usize) -> Result<MmapMut, FraosError> {
         if let Some(file) = &self.file {
+            // set_len extends the file and fills the new area with zeros
            file.set_len((offset + new_mmap_size) as u64)
                .map_err(|err| FraosError::FileError(FileError::Extend(err)))?;
            unsafe {
diff --git a/fraos/src/seqno.rs b/fraos/src/seqno.rs
index f5a6343..4898eae 100644
--- a/fraos/src/seqno.rs
+++ b/fraos/src/seqno.rs
@@ -17,7 +17,21 @@ impl SeqNoIndex {
     ///
     /// * `path` - the path to the file. It will be created if it does not exist.
     pub fn new(path: Option<PathBuf>, writable: bool) -> Result<Self, FraosError> {
-        Appender::new(path, None, writable).map(|inner| Self { inner })
+        let mut appender =
+            Appender::new(path.clone(), None, writable).map(|inner| Self { inner })?;
+        let (_, last_len) = match appender.last()? {
+            None => return Ok(appender),
+            Some(some) => some,
+        };
+
+        if last_len == 0 {
+            // the storage wasn't shrunk to fit on drop: find where the index actually ends
+            let actual_len = appender.find_actual_end()?;
+            appender = Appender::new(path, Some(2 * Self::SIZE_OF_USIZE * actual_len), writable)
+                .map(|inner| Self { inner })?;
+        }
+
+        Ok(appender)
     }

     /// Add records to index. This function will block if another write is still
@@ -54,6 +68,21 @@
         Ok(Some(current_seqno))
     }

+    pub fn get_length_at(&self, at: usize) -> Result<usize, FraosError> {
+        Ok(self
+            .get_offset_and_length(at)?
+            .ok_or(FraosError::IndexFileDamaged)?
+            .1)
+    }
+
+    #[allow(unused)]
+    pub fn get_offset_at(&self, at: usize) -> Result<usize, FraosError> {
+        Ok(self
+            .get_offset_and_length(at)?
+            .ok_or(FraosError::IndexFileDamaged)?
+            .0)
+    }
+
     /// Get the location of a record with the given number.
     pub fn get_offset_and_length(
         &self,
@@ -114,12 +143,53 @@
     pub(crate) fn mmaps_count(&self) -> Result<usize, FraosError> {
         self.inner.mmaps_count()
     }
+
+    // The seqno index stores (offset, length) pairs: offsets grow monotonically and lengths are always non-zero.
+    // If the storage is still open, or it wasn't shrunk to fit properly when dropped, the file may end with trailing zeros.
+    // To find out the actual storage size, and thus what is indexed, we need to locate the real end of the data when the index is not empty.
+    // We use binary search to find the last non-zero pair: that is the actual end.
+    pub(crate) fn find_actual_end(&self) -> Result<usize, FraosError> {
+        let mut start = 0;
+        let len = self.len();
+        let mut end = self.len();
+
+        // the index is empty, or only an empty index file was created
+        // (check `end` first: `get_length_at(0)` would fail on an empty index)
+        if end == 0 || self.get_length_at(start)? == 0 {
+            return Ok(0);
+        }
+
+        // all elements are non-zero
+        if self.get_length_at(end.saturating_sub(1))? != 0 {
+            return Ok(end);
+        }
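+
+        // Invariant for the search below (implied by the checks above): the
+        // pair at `start` has a non-zero length and the zero-filled tail
+        // begins somewhere in (start, end], so halving the window keeps the
+        // answer inside it.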
+        // the empty and fully non-zero cases have already returned above
+        while start < len.saturating_sub(1) {
+            // we know at least one zero pair exists, so accessing start + 1 is safe
+            if self.get_length_at(start + 1)? == 0 {
+                return Ok(start + 1);
+            }
+            let mid = (start + end) / 2;
+            if self.get_length_at(mid)? == 0 {
+                end = mid;
+            } else {
+                start = mid;
+            }
+        }
+
+        Err(FraosError::IndexFileDamaged)
+    }
 }

 #[cfg(test)]
 mod tests {
     use super::SeqNoIndex;
-    use crate::FraosError;
+
+    use crate::{FileError, FraosError, MmapError};
+    use memmap2::{MmapMut, MmapOptions};
+    use std::fs::{File, OpenOptions};
+    use std::mem::size_of;
+    use std::path::PathBuf;

     #[quickcheck]
     fn test_read_write(records: Vec<(usize, usize)>) {
@@ -186,4 +256,113 @@
             FraosError::EmptyRecordAppended
         ));
     }
+
+    fn get_file(path: PathBuf, writable: bool) -> Result<File, FraosError> {
+        let mut options = OpenOptions::new();
+        options.read(true);
+        if writable {
+            options.write(true).create(true);
+        };
+
+        options
+            .open(&path)
+            .map_err(|err| FraosError::FileError(FileError::FileOpen(path.clone(), err)))
+    }
+
+    fn allocate_mmap(file: &File, size: usize) -> Result<MmapMut, FraosError> {
+        // set_len fills the new area of the file with zeros
+        file.set_len(size as u64)
+            .map_err(|err| FraosError::FileError(FileError::Extend(err)))?;
+        unsafe { MmapOptions::new().len(size).offset(0u64).map_mut(file) }
+            .map_err(|err| FraosError::MmapError(MmapError::Mmap(err)))
+    }
+
+    #[test]
+    fn check_index_recovery_zero_length() {
+        for i in 0..20 {
+            let tmp = tempfile::NamedTempFile::new().unwrap();
+
+            let file = get_file(tmp.path().to_path_buf(), true).unwrap();
+
+            if i != 0 {
+                let mmap = allocate_mmap(&file, size_of::<usize>() * i).unwrap();
+                mmap.flush().unwrap();
+            }
+
+            let index = SeqNoIndex::new(Some(tmp.path().to_path_buf()), true);
+            assert!(index.is_ok(), "can't create seqno index with {} usizes", i);
+            let index = index.unwrap();
+            assert!(index.is_empty());
+
+            index.append(&[(5000, 5), (5005, 6)]).unwrap();
+            drop(index);
+
+            let index = SeqNoIndex::new(Some(tmp.path().to_path_buf()), true);
+            assert!(
+                index.is_ok(),
+                "can't create seqno index with {} usizes after append",
+                i,
+            );
+            let index = index.unwrap();
+            assert_eq!(
+                index.len(),
+                2,
+                "seqno index should have len 2 after append at {}",
+                i
+            );
+        }
+    }
+
+    #[test]
+    fn check_index_recovery_non_zero_length() {
+        for (non_zeros, zeros) in [(2, 0), (100, 0), (2, 1), (2, 5), (2, 10), (258, 400)] {
+            let tmp = tempfile::NamedTempFile::new().unwrap();
+
+            let file = get_file(tmp.path().to_path_buf(), true).unwrap();
+
+            let mut mmap = allocate_mmap(&file, size_of::<usize>() * (non_zeros + zeros)).unwrap();
+            for i in 0..non_zeros {
+                mmap.as_mut()[i * size_of::<usize>()..(i + 1) * size_of::<usize>()]
+                    .copy_from_slice(&i.to_le_bytes()[..]);
+            }
+            mmap.flush().unwrap();
+
+            let index = SeqNoIndex::new(Some(tmp.path().to_path_buf()), true);
+            assert!(
+                index.is_ok(),
+                "can't create seqno index with {} non zeros and {} zeros",
+                non_zeros,
+                zeros
+            );
+            let index = index.unwrap();
+            assert_eq!(
+                index.len(),
+                non_zeros / 2,
+                "seqno index with {} non zeros and {} zeros should have len {}",
+                non_zeros,
+                zeros,
+                non_zeros / 2
+            );
+
+            index.append(&[(5000, 5), (5005, 6)]).unwrap();
+            drop(index);
+
+            let index = SeqNoIndex::new(Some(tmp.path().to_path_buf()), true);
+            assert!(
+                index.is_ok(),
+                "can't create seqno index with {} non zeros and {} zeros after append",
+                non_zeros,
+                zeros
+            );
+            let index = index.unwrap();
+            assert_eq!(
+                index.len(),
+                non_zeros / 2 + 2,
+                "seqno index with {} non zeros and {} zeros should have len {} after append",
+                non_zeros,
+                zeros,
+                non_zeros / 2 + 2
+            );
+        }
+    }
 }
diff --git a/indexed-log-map/README.md b/indexed-log-map/README.md
index 71a85f4..f738f98 100644
--- a/indexed-log-map/README.md
+++ b/indexed-log-map/README.md
@@ -1,5 +1,97 @@
-# Indexed Log Map Storage
+# Indexed Log Map Store

-Indexed Log Map Storage - storage built on top of fraos library adding advanced indexing to that.
+Indexed Log Map Store - a persistent, insert-ordered, append-only, thread-safe KV store. It is similar to `BTreeMap`, but disk-based.
+
+## Use cases
+
+The storage is handy whenever insert-ordered data needs to be preserved between service restarts, e.g.:
+* Blockchain data indexing & backup: if you want to fetch specific data from blocks and store the related events for future (re)use. This way you can e.g. deploy new services from the events backup without re-reading the whole blockchain to re-fetch those events, which can save a lot of sync time
+* Blockchain backend event fetching & reading: the storage works similarly to a Kafka topic. If you need to make sure that no event is lost and that events are handled in their on-chain order of appearance, this storage is for you.
+  * You can remember the position where you finished reading using e.g. `SaveProgressSource` from [blockchain-source-library](../blockchain-source)
+
+## Example
+
+```rust
+// key, value, and path (as well as the Key/Value types) are assumed to be initialized already
+let config = IndexedLogMapConfig {
+    storage_path: Some(path),
+    use_key_indexing: true,
+    readonly: false,
+};
+let storage = IndexedLogMap::<Key, Value>::new(config)?;
+let storage = Arc::new(storage);
+
+storage.append(key.clone(), value.clone())?;
+assert_eq!(value, storage.get(&key)?.unwrap());
+```
+
+## How it works
+
+The storage is based on `sled::Db` and `fraos`.
+`fraos` is a thread-safe, fast, reindexable, append-only storage.
+See more about its design and memory structure [here](../fraos/README.md).
+
+### Field schema
+```rust
+pub struct IndexedLogMap<Key, Value> {
+    storage: fraos::Database,       // raw storage
+    key_to_seqno: Option<sled::Db>, // key mapping
+    phantom_data: PhantomData<(Key, Value)>,
+}
+```
+
+Field roles:
+* `storage` - the raw data plus the sequential index on top of it
+* `key_to_seqno` - the mapping from key to sequential index
+
+### Lookups technique
+When you call `storage.get(&key)`:
+* the storage looks up the `key` -> `sequential index` mapping in `key_to_seqno`
+* then the inner `storage` looks up the `sequential index` in the sequential index file to get the `(offset, length)` pair
+* then the inner `storage` seeks to `offset` in the raw data file and tries to read `length` bytes
+* on a successful read the data is deserialized to the proper type and returned, as sketched below
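+
+As a sketch, the `get` path boils down to the following (the helper shapes here are assumptions for illustration: `sled::Db::get` is the real sled call, while `get_by_seqno` and the little-endian seqno encoding are hypothetical stand-ins for the fraos side):
+
+```rust
+fn get_sketch(
+    key_to_seqno: &sled::Db,                         // key -> seqno mapping
+    get_by_seqno: impl Fn(usize) -> Option<Vec<u8>>, // stands in for the fraos lookup
+    key_bytes: &[u8],                                // serialized key
+) -> Option<Vec<u8>> {
+    // 1. key -> sequential index (assuming the seqno is stored as little-endian bytes)
+    let seqno_bytes = key_to_seqno.get(key_bytes).ok()??;
+    let seqno = usize::from_le_bytes(seqno_bytes.as_ref().try_into().ok()?);
+    // 2. seqno -> (offset, length) -> raw bytes, both handled inside fraos
+    let raw = get_by_seqno(seqno)?;
+    // 3. the caller then deserializes `raw` into `Value`
+    Some(raw)
+}
+```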
+
+When you call `storage.iter_from(&key)`:
+* the storage looks up the `key` -> `sequential index` mapping in `key_to_seqno`
+* this `sequential index` identifies the position from which to read data in the underlying storage
+* then, for each record:
+  * the inner storage sequentially iterates the sequential index file from `sequential index` on, yielding `(offset, length)` pairs
+  * then the inner `storage` seeks to `offset` in the raw data file and tries to read `length` bytes
+  * on a successful read the data is deserialized to the proper type and returned
+
+### Memory structure
+![Memory structure](docs/assets/indexed_log_map.jpg)
+
+### Main functionality
+Methods:
+```rust
+impl<Key, Value> IndexedLogMap<Key, Value>
+{
+    pub fn new(config: IndexedLogMapConfig) -> Result<Self> { ... }
+
+    pub fn append(&self, key: Key, value: Value) -> Result<()> { ... }
+    pub fn get(&self, key: &Key) -> Result<Option<Value>> { ... }
+
+    pub fn iter_from(&self, key: &Key) -> Result<Option<impl Iterator<Item = (Key, Value)>>> { ... }
+    pub fn iter(&self) -> Option<impl Iterator<Item = (Key, Value)>> { ... }
+
+    pub fn last(&self) -> Result<Option<(Key, Value)>> { ... }
+
+    pub fn contains(&self, key: &Key) -> Result<bool> { ... }
+    pub fn is_empty(&self) -> Result<bool> { ... }
+}
+```
+
+* `new(config: IndexedLogMapConfig)` - creates the storage based on the config
+* `append(&self, key: Key, value: Value)` - appends the `key`-`value` pair to the storage (so you can `iter_from` it afterwards)
+  * if you append an already existing `key`, the old value is still kept in the storage history. So if you do:
+    * `storage.get(&key)` - the new value is returned
+    * `storage.iter()` - both values are seen, in the order of insertion (see the sketch after this list)
+* `get(&self, key: &Key)` - gets the value by `key`, or `None`
+* `iter_from(&self, key: &Key)` - iterates the storage in order of insertion, starting from the position of `key`
+* `iter(&self)` - iterates the storage in order of insertion from the beginning
+* `last(&self)` - returns the last element of the storage if it exists, or `None`
+* `contains(&self, key: &Key)` - checks whether the storage contains `key`
+* `is_empty(&self)` - checks whether the storage is empty
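+
+For instance, the duplicate-key behaviour looks like this (a sketch assuming `String` satisfies the `Key`/`Value` bounds and `storage` was created as in the example above):
+
+```rust
+storage.append("key-1".to_string(), "v1".to_string())?;
+storage.append("key-1".to_string(), "v2".to_string())?;
+
+// the point lookup resolves to the latest value...
+assert_eq!(storage.get(&"key-1".to_string())?, Some("v2".to_string()));
+
+// ...while a full scan still replays both inserts in insertion order:
+// ("key-1", "v1"), then ("key-1", "v2")
+```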
-It offers continuous write at the top of the storage while allowing for concurrent reads.
diff --git a/indexed-log-map/docs/assets/indexed_log_map.jpg b/indexed-log-map/docs/assets/indexed_log_map.jpg
new file mode 100644
index 0000000..369387c
Binary files /dev/null and b/indexed-log-map/docs/assets/indexed_log_map.jpg differ