Skip to content

Commit

Permalink
Remove the thread pool from future::Cache
Browse files Browse the repository at this point in the history
- Remove `async-io` crate.
- Refactor some internal methods.
  • Loading branch information
tatsuya6502 committed Aug 5, 2023
1 parent 656ea57 commit 58176fd
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 330 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ default = ["sync", "atomic64", "quanta"]
sync = ["_core"]

# Enable this feature to use `moka::future::Cache`.
future = ["_core", "async-io", "async-lock", "async-trait", "futures-util"]
future = ["_core", "async-lock", "async-trait", "futures-util"]

# Enable this feature to activate optional logging from caches.
# Currently cache will emit log only when it encounters a panic in user provided
Expand Down
121 changes: 30 additions & 91 deletions src/future/base_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ use std::{
borrow::Borrow,
collections::hash_map::RandomState,
hash::{BuildHasher, Hash, Hasher},
ptr::NonNull,
sync::{
atomic::{AtomicBool, AtomicU8, Ordering},
Arc,
Expand Down Expand Up @@ -220,57 +219,6 @@ where
.unwrap_or_default() // `false` is the default for `bool` type.
}

// pub(crate) fn get_with_hash<Q>(&self, key: &Q, hash: u64, need_key: bool) -> Option<Entry<K, V>>
// where
// K: Borrow<Q>,
// Q: Hash + Eq + ?Sized,
// {
// // Define a closure to record a read op.
// let record = |op, now| {
// self.record_read_op(op, now)
// .expect("Failed to record a get op");
// };
// let ignore_if = None as Option<&mut fn(&V) -> bool>;
// self.do_get_with_hash(key, hash, record, ignore_if, need_key)
// }

// pub(crate) fn get_with_hash_and_ignore_if<Q, I>(
// &self,
// key: &Q,
// hash: u64,
// ignore_if: Option<&mut I>,
// need_key: bool,
// ) -> Option<Entry<K, V>>
// where
// K: Borrow<Q>,
// Q: Hash + Eq + ?Sized,
// I: FnMut(&V) -> bool,
// {
// // Define a closure to record a read op.
// let record = |op, now| {
// self.record_read_op(op, now)
// .expect("Failed to record a get op");
// };
// self.do_get_with_hash(key, hash, record, ignore_if, need_key)
// }

// pub(crate) async fn get_with_hash_without_recording<Q, I>(
// &self,
// key: &Q,
// hash: u64,
// ignore_if: Option<&mut I>,
// ) -> Option<V>
// where
// K: Borrow<Q>,
// Q: Hash + Eq + ?Sized,
// I: FnMut(&V) -> bool,
// {
// // Define a closure that skips to record a read op.
// let record = |_op, _now| {};
// self.do_get_with_hash(key, hash, record, ignore_if, false)
// .map(Entry::into_value)
// }

pub(crate) async fn get_with_hash<Q, I>(
&self,
key: &Q,
Expand Down Expand Up @@ -875,12 +823,9 @@ impl EntrySizeAndFrequency {
}
}

// Access-Order Queue Node
type AoqNode<K> = NonNull<DeqNode<KeyHashDate<K>>>;

enum AdmissionResult<K> {
Admitted {
victim_khs: SmallVec<[KeyHash<K>; 8]>,
victim_keys: SmallVec<[KeyHash<K>; 8]>,
},
Rejected,
}
Expand Down Expand Up @@ -1618,21 +1563,22 @@ where
// will not be `Send`.
let admission_result = Self::admit(&candidate, &self.cache, deqs, freq);
match admission_result {
AdmissionResult::Admitted {
victim_khs: victims,
} => {
AdmissionResult::Admitted { victim_keys } => {
// Try to remove the victims from the cache (hash map).
for vic_kh in victims {
for victim in victim_keys {
let vic_key = victim.key;
let vic_hash = victim.hash;

// Lock the key for removal if blocking removal notification is enabled.
let kl = self.maybe_key_lock(&vic_kh.key);
let kl = self.maybe_key_lock(&vic_key);
let _klg = if let Some(lock) = &kl {
Some(lock.lock().await)
} else {
None
};

if let Some((vic_key, vic_entry)) =
self.cache.remove_entry(vic_kh.hash, |k| k == &vic_kh.key)
self.cache.remove_entry(vic_hash, |k| k == &vic_key)
{
if eviction_state.is_notifier_enabled() {
eviction_state
Expand All @@ -1646,6 +1592,14 @@ where
vic_entry,
&mut eviction_state.counters,
);
} else {
// Could not remove the victim from the cache. Skip it as its
// ValueEntry might have been invalidated.
if let Some(node) = deqs.probation.peek_front() {
if node.element.key() == &vic_key && node.element.hash() == vic_hash {
deqs.probation.move_front_to_back();
}
}
}
}
// Add the candidate to the deques.
Expand Down Expand Up @@ -1706,11 +1660,12 @@ where
let mut retries = 0;

let mut victims = EntrySizeAndFrequency::default();
let mut victim_nodes: SmallVec<[AoqNode<K>; 8]> = SmallVec::default();
let mut skipped_nodes: SmallVec<[AoqNode<K>; 8]> = SmallVec::default();
let mut victim_keys = SmallVec::default();

let deq = &mut deqs.probation;

// Get first potential victim at the LRU position.
let mut next_victim = deqs.probation.peek_front_ptr();
let mut next_victim = deq.peek_front_ptr();

// Aggregate potential victims.
while victims.policy_weight < candidate.policy_weight {
Expand All @@ -1719,32 +1674,30 @@ where
}
if let Some(victim) = next_victim.take() {
next_victim = DeqNode::next_node_ptr(victim);

let vic_elem = &unsafe { victim.as_ref() }.element;
let key = vic_elem.key();
let hash = vic_elem.hash();

if let Some(vic_entry) = cache.get(vic_elem.hash(), |k| k == vic_elem.key()) {
victims.add_policy_weight(vic_entry.policy_weight());
victims.add_frequency(freq, vic_elem.hash());
victim_nodes.push(victim);
victims.add_frequency(freq, hash);
victim_keys.push(KeyHash::new(Arc::clone(key), hash));
retries = 0;
} else {
// Could not get the victim from the cache (hash map). Skip this node
// as its ValueEntry might have been invalidated.
skipped_nodes.push(victim);

unsafe { deq.move_to_back(victim) };
retries += 1;
if retries > MAX_CONSECUTIVE_RETRIES {
break;
}
}
} else {
// No more potential victims.
break;
}
}

for skipped in skipped_nodes {
// Move the skipped node to the back of the deque.
unsafe { deqs.probation.move_to_back(skipped) };
if retries > MAX_CONSECUTIVE_RETRIES {
break;
}
}

// Admit or reject the candidate.
Expand All @@ -1753,21 +1706,7 @@ where
// See Caffeine's implementation.

if victims.policy_weight >= candidate.policy_weight && candidate.freq > victims.freq {
let mut victim_khs: SmallVec<[KeyHash<K>; 8]> = SmallVec::default();
for victim in victim_nodes {
let vic_elem = &unsafe { victim.as_ref() }.element;
let vic_kh = KeyHash::new(Arc::clone(vic_elem.key()), vic_elem.hash());
victim_khs.push(vic_kh);

// Move the victim to the back of the deque. It will be removed from the
// cache (hash map) later and then removed from the deque. But we are
// still moving it to the back now because the victim's ValueEntry can be
// invalidated by another thread before it is removed by us. In that
// case, we will have to keep the victim in the deque.
unsafe { deqs.probation.move_to_back(victim) }
}

AdmissionResult::Admitted { victim_khs }
AdmissionResult::Admitted { victim_keys }
} else {
AdmissionResult::Rejected
}
Expand Down
Loading

0 comments on commit 58176fd

Please sign in to comment.