Skip to content
This repository has been archived by the owner on Aug 4, 2024. It is now read-only.

Commit

Permalink
feat: implement BloomFilter to replace external dependencies (#59)
Browse files Browse the repository at this point in the history
* feat: implement `BloomFilter` to replace external dependencies

* code fmt

* code fmt
  • Loading branch information
KKould authored Dec 9, 2023
1 parent c481591 commit 3f5e551
Show file tree
Hide file tree
Showing 6 changed files with 330 additions and 11 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ tracing-subscriber = "0.3"
lz4 = "1.23.1"
integer-encoding = "3.0.4"
clap = { version = "4.4.6", features = ["derive"] }
growable-bloom-filter = "2.0.1"
itertools = "0.10.3"
chrono = "0.4.19"
parking_lot = "0.12.1"
Expand Down
38 changes: 33 additions & 5 deletions src/kernel/lsm/table/ss_table/block.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
use crate::kernel::lsm::storage::Config;
use crate::kernel::utils::bloom_filter::BloomFilter;
use crate::kernel::utils::lru_cache::ShardingLruCache;
use crate::kernel::KernelResult;
use crate::KernelError;
use bytes::{Buf, BufMut, Bytes};
use futures::future;
use growable_bloom_filter::GrowableBloom;
use integer_encoding::{FixedInt, VarIntReader, VarIntWriter};
use itertools::Itertools;
use lz4::Decoder;
use serde::{Deserialize, Serialize};
use std::cmp::min;
use std::collections::Bound;
use std::io::{Cursor, Read, Write};
Expand Down Expand Up @@ -195,14 +194,43 @@ pub(crate) enum CompressType {
LZ4,
}

#[derive(Serialize, Deserialize, Debug)]
#[derive(Debug)]
pub(crate) struct MetaBlock {
pub(crate) filter: GrowableBloom,
pub(crate) filter: BloomFilter<[u8]>,
pub(crate) len: usize,
pub(crate) index_restart_interval: usize,
pub(crate) data_restart_interval: usize,
}

impl MetaBlock {
pub(crate) fn to_raw(&self) -> Vec<u8> {
let mut bytes = u32::encode_fixed_vec(self.len as u32);

bytes.append(&mut u32::encode_fixed_vec(
self.index_restart_interval as u32,
));
bytes.append(&mut u32::encode_fixed_vec(
self.data_restart_interval as u32,
));
bytes.append(&mut self.filter.to_raw());
bytes
}

pub(crate) fn from_raw(bytes: &[u8]) -> Self {
let len = u32::decode_fixed(&bytes[0..4]) as usize;
let index_restart_interval = u32::decode_fixed(&bytes[4..8]) as usize;
let data_restart_interval = u32::decode_fixed(&bytes[8..12]) as usize;
let filter = BloomFilter::from_raw(&bytes[12..]);

Self {
filter,
len,
index_restart_interval,
data_restart_interval,
}
}
}

/// Block SSTable最小的存储单位
///
/// 分为DataBlock和IndexBlock
Expand Down Expand Up @@ -614,7 +642,7 @@ where
}

/// 批量以restart_interval进行shared_len的获取
fn sharding_shared_len<T>(vec_kv: &Vec<KeyValue<T>>, restart_interval: usize) -> Vec<usize>
fn sharding_shared_len<T>(vec_kv: &[KeyValue<T>], restart_interval: usize) -> Vec<usize>
where
T: BlockItem,
{
Expand Down
11 changes: 6 additions & 5 deletions src/kernel/lsm/table/ss_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ use crate::kernel::lsm::table::ss_table::block::{
use crate::kernel::lsm::table::ss_table::footer::{Footer, TABLE_FOOTER_SIZE};
use crate::kernel::lsm::table::ss_table::iter::SSTableIter;
use crate::kernel::lsm::table::Table;
use crate::kernel::utils::bloom_filter::BloomFilter;
use crate::kernel::KernelResult;
use crate::KernelError;
use bytes::Bytes;
use growable_bloom_filter::GrowableBloom;
use core::slice::SlicePattern;
use itertools::Itertools;
use parking_lot::Mutex;
use std::io::SeekFrom;
Expand Down Expand Up @@ -53,7 +54,7 @@ impl SSTable {
let len = vec_data.len();
let data_restart_interval = config.data_restart_interval;
let index_restart_interval = config.index_restart_interval;
let mut filter = GrowableBloom::new(config.desired_error_prob, len);
let mut filter = BloomFilter::new(len, config.desired_error_prob);

let mut builder = BlockBuilder::new(
BlockOptions::from(config)
Expand All @@ -63,7 +64,7 @@ impl SSTable {
);
for data in vec_data {
let (key, value) = data;
let _ = filter.insert(&key);
filter.insert(key.as_slice());
builder.add((key, Value::from(value)));
}
let meta = MetaBlock {
Expand All @@ -73,7 +74,7 @@ impl SSTable {
data_restart_interval,
};
let (data_bytes, index_bytes) = builder.build().await?;
let meta_bytes = bincode::serialize(&meta)?;
let meta_bytes = meta.to_raw();
let footer = Footer {
level: level as u8,
index_offset: data_bytes.len() as u32,
Expand Down Expand Up @@ -133,7 +134,7 @@ impl SSTable {
let _ = reader.seek(SeekFrom::Start(*meta_offset as u64))?;
let _ = reader.read(&mut buf)?;

let meta = bincode::deserialize(&buf)?;
let meta = MetaBlock::from_raw(&buf);
let reader = Mutex::new(reader);
Ok(SSTable {
footer,
Expand Down
Loading

0 comments on commit 3f5e551

Please sign in to comment.