diff --git a/Cargo.toml b/Cargo.toml index e3b97b3..bab01a3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "kip_db" -version = "0.1.2-alpha.14" +version = "0.1.2-alpha.15" edition = "2021" authors = ["Kould "] description = "轻量级、异步 基于LSM Leveled Compaction K-V数据库" diff --git a/src/kernel/lsm/mvcc.rs b/src/kernel/lsm/mvcc.rs index 9dbe584..dd8a40e 100644 --- a/src/kernel/lsm/mvcc.rs +++ b/src/kernel/lsm/mvcc.rs @@ -19,8 +19,10 @@ use std::sync::Arc; use tokio::sync::mpsc::error::TrySendError; use tokio::sync::mpsc::Sender; -type MapIter<'a> = - Map, fn((&Bytes, &KeyValue)) -> KeyValue>; +type MapIter<'a> = Map< + skiplist::skipmap::Iter<'a, Bytes, Option>, + fn((&Bytes, &Option)) -> KeyValue, +>; unsafe impl Send for BufPtr {} unsafe impl Sync for BufPtr {} @@ -32,7 +34,7 @@ pub struct Transaction { pub(crate) compactor_tx: Sender, pub(crate) version: Arc, - pub(crate) writer_buf: SkipMap, + pub(crate) writer_buf: SkipMap>, pub(crate) seq_id: i64, } @@ -42,7 +44,7 @@ impl Transaction { /// 此处不需要等待压缩,因为在Transaction存活时不会触发Compaction #[inline] pub fn get(&self, key: &[u8]) -> Result> { - if let Some((_, value)) = self.writer_buf.get(key) { + if let Some(value) = self.writer_buf.get(key) { return Ok(value.clone()); } @@ -58,18 +60,15 @@ impl Transaction { } #[inline] - pub fn set(&mut self, key: &[u8], value: Bytes) { - let bytes = Bytes::copy_from_slice(key); - - let _ignore = self.writer_buf.insert(bytes.clone(), (bytes, Some(value))); + pub fn set(&mut self, key: Bytes, value: Bytes) { + let _ignore = self.writer_buf.insert(key, Some(value)); } #[inline] pub fn remove(&mut self, key: &[u8]) -> Result<()> { let _ = self.get(key)?.ok_or(KernelError::KeyNotFound)?; - let bytes = Bytes::copy_from_slice(key); - let _ignore = self.writer_buf.insert(bytes.clone(), (bytes, None)); + let _ignore = self.writer_buf.insert(bytes, None); Ok(()) } @@ -80,7 +79,7 @@ impl Transaction { let batch_data = self .writer_buf .iter() - .map(|(_, item)| item.clone()) + .map(|(key, value)| (key.clone(), value.clone())) .collect_vec(); if mem_table.insert_batch_data(batch_data, Sequence::create())? { @@ -111,7 +110,7 @@ impl Transaction { min.map(Bytes::copy_from_slice).as_ref(), max.map(Bytes::copy_from_slice).as_ref(), ) - .map(|(_, value)| (value.clone())) + .map(|(key, value)| (key.clone(), value.clone())) } fn mem_table(&self) -> &MemTable {