diff options
Diffstat (limited to 'tinyufo/src/lib.rs')
-rw-r--r-- | tinyufo/src/lib.rs | 267 |
1 files changed, 168 insertions, 99 deletions
diff --git a/tinyufo/src/lib.rs b/tinyufo/src/lib.rs index 001f4e3..015afe0 100644 --- a/tinyufo/src/lib.rs +++ b/tinyufo/src/lib.rs @@ -20,14 +20,16 @@ use ahash::RandomState; use crossbeam_queue::SegQueue; -use flurry::HashMap; use std::marker::PhantomData; use std::sync::atomic::AtomicUsize; use std::sync::atomic::{ AtomicBool, AtomicU8, Ordering::{Acquire, Relaxed, SeqCst}, }; +mod buckets; mod estimation; + +use buckets::Buckets; use estimation::TinyLfu; use std::hash::Hash; @@ -64,20 +66,20 @@ const USES_CAP: u8 = 3; struct Uses(AtomicU8); impl Uses { - pub fn inc_uses(&self) { + pub fn inc_uses(&self) -> u8 { loop { let uses = self.uses(); if uses >= USES_CAP { - return; + return uses; } if let Err(new) = self.0.compare_exchange(uses, uses + 1, Acquire, Relaxed) { // someone else beat us to it if new >= USES_CAP { // already above cap - return; + return new; } // else, try again } else { - return; + return uses + 1; } } } @@ -126,17 +128,6 @@ struct Bucket<T> { data: T, } -impl<T: Clone> Bucket<T> { - fn update_bucket(&self, main_queue: bool, data: T, weight: Weight) -> Self { - Self { - uses: Uses(self.uses.uses().into()), - queue: Location(main_queue.into()), - weight, - data, - } - } -} - const SMALL_QUEUE_PERCENTAGE: f32 = 0.1; struct FiFoQueues<T> { @@ -154,9 +145,7 @@ struct FiFoQueues<T> { _t: PhantomData<T>, } -type Buckets<T> = HashMap<Key, Bucket<T>, RandomState>; - -impl<T: Clone + Send + Sync> FiFoQueues<T> { +impl<T: Clone + Send + Sync + 'static> FiFoQueues<T> { fn admit( &self, key: Key, @@ -174,9 +163,29 @@ impl<T: Clone + Send + Sync> FiFoQueues<T> { assert!(weight > 0); let new_bucket = { - let pinned_buckets = buckets.pin(); - let bucket = pinned_buckets.get(&key); - let Some(bucket) = bucket else { + let Some((uses, queue, weight)) = buckets.get_map(&key, |bucket| { + // the item exists, in case weight changes + let old_weight = bucket.weight; + let uses = bucket.uses.inc_uses(); + + fn update_atomic(weight: &AtomicUsize, old: u16, new: u16) { + if old == new { + return; + } + if old > new { + weight.fetch_sub((old - new) as usize, SeqCst); + } else { + weight.fetch_add((new - old) as usize, SeqCst); + } + } + let queue = bucket.queue.is_main(); + if queue == MAIN { + update_atomic(&self.main_weight, old_weight, weight); + } else { + update_atomic(&self.small_weight, old_weight, weight); + } + (uses, queue, weight) + }) else { let mut evicted = self.evict_to_limit(weight, buckets); // TODO: figure out the right way to compare frequencies of different weights across // many evicted assets. For now TinyLFU is only used when only evicting 1 item. @@ -204,7 +213,7 @@ impl<T: Clone + Send + Sync> FiFoQueues<T> { uses: Default::default(), // 0 data, }; - let old = pinned_buckets.insert(key, bucket); + let old = buckets.insert(key, bucket); if old.is_none() { // Always push key first before updating weight // If doing the other order, another concurrent thread might not @@ -215,32 +224,16 @@ impl<T: Clone + Send + Sync> FiFoQueues<T> { // TODO: compare old.weight and update accordingly return evicted; }; - - // the item exists, in case weight changes - let old_weight = bucket.weight; - bucket.uses.inc_uses(); - - fn update_atomic(weight: &AtomicUsize, old: u16, new: u16) { - if old == new { - return; - } - if old > new { - weight.fetch_sub((old - new) as usize, SeqCst); - } else { - weight.fetch_add((new - old) as usize, SeqCst); - } - } - if bucket.queue.is_main() { - update_atomic(&self.main_weight, old_weight, weight); - bucket.update_bucket(MAIN, data, weight) - } else { - update_atomic(&self.small_weight, old_weight, weight); - bucket.update_bucket(SMALL, data, weight) + Bucket { + queue: Location(queue.into()), + weight, + uses: Uses(uses.into()), + data, } }; // replace the existing one - buckets.pin().insert(key, new_bucket); + buckets.insert(key, new_bucket); // NOTE: there is a chance that the item itself is evicted if it happens to be the one selected // by the algorithm. We could avoid this by checking if the item is in the returned evicted items, @@ -295,61 +288,67 @@ impl<T: Clone + Send + Sync> FiFoQueues<T> { // empty queue, this is caught between another pop() and fetch_sub() return None; }; - let pinned_buckets = buckets.pin(); - let maybe_bucket = pinned_buckets.get(&to_evict); - - let Some(bucket) = maybe_bucket.as_ref() else { - //key in queue but not bucket, shouldn't happen, but ignore - continue; - }; - - let weight = bucket.weight; - self.small_weight.fetch_sub(weight as usize, SeqCst); - if bucket.uses.uses() > 1 { - // move to main - bucket.queue.move_to_main(); - self.main.push(to_evict); - self.main_weight.fetch_add(weight as usize, SeqCst); - // continue until find one to evict - continue; + let v = buckets + .get_map(&to_evict, |bucket| { + let weight = bucket.weight; + self.small_weight.fetch_sub(weight as usize, SeqCst); + + if bucket.uses.uses() > 1 { + // move to main + bucket.queue.move_to_main(); + self.main.push(to_evict); + self.main_weight.fetch_add(weight as usize, SeqCst); + // continue until find one to evict + None + } else { + let data = bucket.data.clone(); + let weight = bucket.weight; + buckets.remove(&to_evict); + Some(KV { + key: to_evict, + data, + weight, + }) + } + }) + .flatten(); + if v.is_some() { + // found the one to evict, break + return v; } - // move to ghost - - let data = bucket.data.clone(); - let weight = bucket.weight; - pinned_buckets.remove(&to_evict); - return Some(KV { - key: to_evict, - data, - weight, - }); } } fn evict_one_from_main(&self, buckets: &Buckets<T>) -> Option<KV<T>> { loop { let to_evict = self.main.pop()?; - let buckets = buckets.pin(); - let maybe_bucket = buckets.get(&to_evict); - if let Some(bucket) = maybe_bucket.as_ref() { - if bucket.uses.decr_uses() > 0 { - // put it back - self.main.push(to_evict); - // continue the loop - } else { - // evict - let weight = bucket.weight; - self.main_weight.fetch_sub(weight as usize, SeqCst); - let data = bucket.data.clone(); - buckets.remove(&to_evict); - return Some(KV { - key: to_evict, - data, - weight, - }); - } - } // else: key in queue but not bucket, shouldn't happen + + if let Some(v) = buckets + .get_map(&to_evict, |bucket| { + if bucket.uses.decr_uses() > 0 { + // put it back + self.main.push(to_evict); + // continue the loop + None + } else { + // evict + let weight = bucket.weight; + self.main_weight.fetch_sub(weight as usize, SeqCst); + let data = bucket.data.clone(); + buckets.remove(&to_evict); + Some(KV { + key: to_evict, + data, + weight, + }) + } + }) + .flatten() + { + // found the one to evict, break + return Some(v); + } } } } @@ -357,12 +356,11 @@ impl<T: Clone + Send + Sync> FiFoQueues<T> { /// [TinyUfo] cache pub struct TinyUfo<K, T> { queues: FiFoQueues<T>, - buckets: HashMap<Key, Bucket<T>, RandomState>, + buckets: Buckets<T>, random_status: RandomState, _k: PhantomData<K>, } - -impl<K: Hash, T: Clone + Send + Sync> TinyUfo<K, T> { +impl<K: Hash, T: Clone + Send + Sync + 'static> TinyUfo<K, T> { /// Create a new TinyUfo cache with the given weight limit and the given /// size limit of the ghost queue. pub fn new(total_weight_limit: usize, estimated_size: usize) -> Self { @@ -377,7 +375,29 @@ impl<K: Hash, T: Clone + Send + Sync> TinyUfo<K, T> { }; TinyUfo { queues, - buckets: HashMap::with_capacity_and_hasher(estimated_size, RandomState::new()), + buckets: Buckets::new_fast(estimated_size), + random_status: RandomState::new(), + _k: PhantomData, + } + } + + /// Create a new TinyUfo cache but with more memory efficient data structures. + /// The trade-off is that the the get() is slower by a constant factor. + /// The cache hit ratio could be higher as this type of TinyUFO allows to store + /// more assets with the same memory. + pub fn new_compact(total_weight_limit: usize, estimated_size: usize) -> Self { + let queues = FiFoQueues { + small: SegQueue::new(), + small_weight: 0.into(), + main: SegQueue::new(), + main_weight: 0.into(), + total_weight_limit, + estimator: TinyLfu::new_compact(estimated_size), + _t: PhantomData, + }; + TinyUfo { + queues, + buckets: Buckets::new_compact(estimated_size, 32), random_status: RandomState::new(), _k: PhantomData, } @@ -390,8 +410,7 @@ impl<K: Hash, T: Clone + Send + Sync> TinyUfo<K, T> { /// Return Some(T) if the key exists pub fn get(&self, key: &K) -> Option<T> { let key = self.random_status.hash_one(key); - let buckets = self.buckets.pin(); - buckets.get(&key).map(|p| { + self.buckets.get_map(&key, |p| { p.uses.inc_uses(); p.data.clone() }) @@ -427,7 +446,7 @@ impl<K: Hash, T: Clone + Send + Sync> TinyUfo<K, T> { #[cfg(test)] fn peek_queue(&self, key: K) -> Option<bool> { let key = self.random_status.hash_one(&key); - self.buckets.pin().get(&key).map(|p| p.queue.value()) + self.buckets.get_queue(&key) } } @@ -627,4 +646,54 @@ mod tests { assert_eq!(cache.peek_queue(3), Some(MAIN)); assert_eq!(cache.peek_queue(4), None); } + + #[test] + fn test_evict_from_small_compact() { + let cache = TinyUfo::new(5, 5); + + cache.put(1, 1, 1); + cache.put(2, 2, 2); + cache.put(3, 3, 2); + // cache full now + + assert_eq!(cache.peek_queue(1), Some(SMALL)); + assert_eq!(cache.peek_queue(2), Some(SMALL)); + assert_eq!(cache.peek_queue(3), Some(SMALL)); + + let evicted = cache.put(4, 4, 3); + assert_eq!(evicted.len(), 2); + assert_eq!(evicted[0].data, 1); + assert_eq!(evicted[1].data, 2); + + assert_eq!(cache.peek_queue(1), None); + assert_eq!(cache.peek_queue(2), None); + assert_eq!(cache.peek_queue(3), Some(SMALL)); + } + + #[test] + fn test_evict_from_small_to_main_compact() { + let cache = TinyUfo::new(5, 5); + + cache.put(1, 1, 1); + cache.put(2, 2, 2); + cache.put(3, 3, 2); + // cache full now + + cache.get(&1); + cache.get(&1); // 1 will be moved to main during next eviction + + assert_eq!(cache.peek_queue(1), Some(SMALL)); + assert_eq!(cache.peek_queue(2), Some(SMALL)); + assert_eq!(cache.peek_queue(3), Some(SMALL)); + + let evicted = cache.put(4, 4, 1); + assert_eq!(evicted.len(), 1); + assert_eq!(evicted[0].data, 2); + + assert_eq!(cache.peek_queue(1), Some(MAIN)); + // 2 is evicted because 1 is in main + assert_eq!(cache.peek_queue(2), None); + assert_eq!(cache.peek_queue(3), Some(SMALL)); + assert_eq!(cache.peek_queue(4), Some(SMALL)); + } } |