Diffstat (limited to 'tinyufo/src')
-rw-r--r-- | tinyufo/src/buckets.rs    | 174
-rw-r--r-- | tinyufo/src/estimation.rs |  14
-rw-r--r-- | tinyufo/src/lib.rs        | 267
3 files changed, 356 insertions, 99 deletions
diff --git a/tinyufo/src/buckets.rs b/tinyufo/src/buckets.rs
new file mode 100644
index 0000000..4aa627d
--- /dev/null
+++ b/tinyufo/src/buckets.rs
@@ -0,0 +1,174 @@
+// Copyright 2024 Cloudflare, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Concurrent storage backend
+
+use super::{Bucket, Key};
+use ahash::RandomState;
+use crossbeam_skiplist::{map::Entry, SkipMap};
+use flurry::HashMap;
+
+/// N-shard skip list. Memory efficient, constant time lookup on average, but a bit slower
+/// than hash map
+pub struct Compact<T>(Box<[SkipMap<Key, Bucket<T>>]>);
+
+impl<T: Send + 'static> Compact<T> {
+    /// Create a new [Compact]
+    pub fn new(total_items: usize, items_per_shard: usize) -> Self {
+        assert!(items_per_shard > 0);
+
+        let shards = std::cmp::max(total_items / items_per_shard, 1);
+        let mut shard_array = vec![];
+        for _ in 0..shards {
+            shard_array.push(SkipMap::new());
+        }
+        Self(shard_array.into_boxed_slice())
+    }
+
+    pub fn get(&self, key: &Key) -> Option<Entry<Key, Bucket<T>>> {
+        let shard = *key as usize % self.0.len();
+        self.0[shard].get(key)
+    }
+
+    pub fn get_map<V, F: FnOnce(Entry<Key, Bucket<T>>) -> V>(&self, key: &Key, f: F) -> Option<V> {
+        let v = self.get(key);
+        v.map(f)
+    }
+
+    fn insert(&self, key: Key, value: Bucket<T>) -> Option<()> {
+        let shard = key as usize % self.0.len();
+        let removed = self.0[shard].remove(&key);
+        self.0[shard].insert(key, value);
+        removed.map(|_| ())
+    }
+
+    fn remove(&self, key: &Key) {
+        let shard = *key as usize % self.0.len();
+        (&self.0)[shard].remove(key);
+    }
+}
+
+// Concurrent hash map, fast but use more memory
+pub struct Fast<T>(HashMap<Key, Bucket<T>, RandomState>);
+
+impl<T: Send + Sync> Fast<T> {
+    pub fn new(total_items: usize) -> Self {
+        Self(HashMap::with_capacity_and_hasher(
+            total_items,
+            RandomState::new(),
+        ))
+    }
+
+    pub fn get_map<V, F: FnOnce(&Bucket<T>) -> V>(&self, key: &Key, f: F) -> Option<V> {
+        let pinned = self.0.pin();
+        let v = pinned.get(key);
+        v.map(f)
+    }
+
+    fn insert(&self, key: Key, value: Bucket<T>) -> Option<()> {
+        let pinned = self.0.pin();
+        pinned.insert(key, value).map(|_| ())
+    }
+
+    fn remove(&self, key: &Key) {
+        let pinned = self.0.pin();
+        pinned.remove(key);
+    }
+}
+
+pub enum Buckets<T> {
+    Fast(Box<Fast<T>>),
+    Compact(Compact<T>),
+}
+
+impl<T: Send + Sync + 'static> Buckets<T> {
+    pub fn new_fast(items: usize) -> Self {
+        Self::Fast(Box::new(Fast::new(items)))
+    }
+
+    pub fn new_compact(items: usize, items_per_shard: usize) -> Self {
+        Self::Compact(Compact::new(items, items_per_shard))
+    }
+
+    pub fn insert(&self, key: Key, value: Bucket<T>) -> Option<()> {
+        match self {
+            Self::Compact(c) => c.insert(key, value),
+            Self::Fast(f) => f.insert(key, value),
+        }
+    }
+
+    pub fn remove(&self, key: &Key) {
+        match self {
+            Self::Compact(c) => c.remove(key),
+            Self::Fast(f) => f.remove(key),
+        }
+    }
+
+    pub fn get_map<V, F: FnOnce(&Bucket<T>) -> V>(&self, key: &Key, f: F) -> Option<V> {
+        match self {
+            Self::Compact(c) => c.get_map(key, |v| f(v.value())),
+            Self::Fast(c) => c.get_map(key, f),
+        }
+    }
+
+    #[cfg(test)]
+    pub fn get_queue(&self, key: &Key) -> Option<bool> {
+        self.get_map(key, |v| v.queue.is_main())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_fast() {
+        let fast = Buckets::new_fast(10);
+
+        assert!(fast.get_map(&1, |_| ()).is_none());
+
+        let bucket = Bucket {
+            queue: crate::Location::new_small(),
+            weight: 1,
+            uses: Default::default(),
+            data: 1,
+        };
+        fast.insert(1, bucket);
+
+        assert_eq!(fast.get_map(&1, |v| v.data), Some(1));
+
+        fast.remove(&1);
+        assert!(fast.get_map(&1, |_| ()).is_none());
+    }
+
+    #[test]
+    fn test_compact() {
+        let compact = Buckets::new_compact(10, 2);
+
+        assert!(compact.get_map(&1, |_| ()).is_none());
+
+        let bucket = Bucket {
+            queue: crate::Location::new_small(),
+            weight: 1,
+            uses: Default::default(),
+            data: 1,
+        };
+        compact.insert(1, bucket);
+
+        assert_eq!(compact.get_map(&1, |v| v.data), Some(1));
+
+        compact.remove(&1);
+        assert!(compact.get_map(&1, |_| ()).is_none());
+    }
+}
diff --git a/tinyufo/src/estimation.rs b/tinyufo/src/estimation.rs
index 19d84d4..18c2d4f 100644
--- a/tinyufo/src/estimation.rs
+++ b/tinyufo/src/estimation.rs
@@ -39,6 +39,11 @@ impl Estimator {
         Self::new(hashes, slots)
     }
 
+    fn compact(items: usize) -> Self {
+        let (slots, hashes) = Self::optimal_paras(items / 100);
+        Self::new(hashes, slots)
+    }
+
     /// Create a new `Estimator` with the given amount of hashes and columns (slots).
     pub fn new(hashes: usize, slots: usize) -> Self {
         let mut estimator = Vec::with_capacity(hashes);
@@ -147,6 +152,15 @@ impl TinyLfu {
             window_limit: cache_size * 8,
         }
     }
+
+    pub fn new_compact(cache_size: usize) -> Self {
+        Self {
+            estimator: Estimator::compact(cache_size),
+            window_counter: Default::default(),
+            // 8x: just a heuristic to balance the memory usage and accuracy
+            window_limit: cache_size * 8,
+        }
+    }
 }
 
 #[cfg(test)]
diff --git a/tinyufo/src/lib.rs b/tinyufo/src/lib.rs
index 001f4e3..015afe0 100644
--- a/tinyufo/src/lib.rs
+++ b/tinyufo/src/lib.rs
@@ -20,14 +20,16 @@
 use ahash::RandomState;
 use crossbeam_queue::SegQueue;
-use flurry::HashMap;
 use std::marker::PhantomData;
 use std::sync::atomic::AtomicUsize;
 use std::sync::atomic::{
     AtomicBool, AtomicU8,
     Ordering::{Acquire, Relaxed, SeqCst},
 };
 
+mod buckets;
 mod estimation;
+
+use buckets::Buckets;
 use estimation::TinyLfu;
 use std::hash::Hash;
 
@@ -64,20 +66,20 @@ const USES_CAP: u8 = 3;
 struct Uses(AtomicU8);
 
 impl Uses {
-    pub fn inc_uses(&self) {
+    pub fn inc_uses(&self) -> u8 {
         loop {
             let uses = self.uses();
             if uses >= USES_CAP {
-                return;
+                return uses;
             }
             if let Err(new) = self.0.compare_exchange(uses, uses + 1, Acquire, Relaxed) {
                 // someone else beat us to it
                 if new >= USES_CAP {
                     // already above cap
-                    return;
+                    return new;
                 }
                 // else, try again
             } else {
-                return;
+                return uses + 1;
             }
         }
     }
@@ -126,17 +128,6 @@ struct Bucket<T> {
     data: T,
 }
 
-impl<T: Clone> Bucket<T> {
-    fn update_bucket(&self, main_queue: bool, data: T, weight: Weight) -> Self {
-        Self {
-            uses: Uses(self.uses.uses().into()),
-            queue: Location(main_queue.into()),
-            weight,
-            data,
-        }
-    }
-}
-
 const SMALL_QUEUE_PERCENTAGE: f32 = 0.1;
 
 struct FiFoQueues<T> {
@@ -154,9 +145,7 @@ struct FiFoQueues<T> {
     _t: PhantomData<T>,
 }
 
-type Buckets<T> = HashMap<Key, Bucket<T>, RandomState>;
-
-impl<T: Clone + Send + Sync> FiFoQueues<T> {
+impl<T: Clone + Send + Sync + 'static> FiFoQueues<T> {
     fn admit(
         &self,
         key: Key,
@@ -174,9 +163,29 @@ impl<T: Clone + Send + Sync> FiFoQueues<T> {
 
         assert!(weight > 0);
         let new_bucket = {
-            let pinned_buckets = buckets.pin();
-            let bucket = pinned_buckets.get(&key);
-            let Some(bucket) = bucket else {
+            let Some((uses, queue, weight)) = buckets.get_map(&key, |bucket| {
+                // the item exists, in case weight changes
+                let old_weight = bucket.weight;
+                let uses = bucket.uses.inc_uses();
+
+                fn update_atomic(weight: &AtomicUsize, old: u16, new: u16) {
+                    if old == new {
+                        return;
+                    }
+                    if old > new {
+                        weight.fetch_sub((old - new) as usize, SeqCst);
+                    } else {
+                        weight.fetch_add((new - old) as usize, SeqCst);
+                    }
+                }
+                let queue = bucket.queue.is_main();
+                if queue == MAIN {
+                    update_atomic(&self.main_weight, old_weight, weight);
+                } else {
+                    update_atomic(&self.small_weight, old_weight, weight);
+                }
+                (uses, queue, weight)
+            }) else {
                 let mut evicted = self.evict_to_limit(weight, buckets);
                 // TODO: figure out the right way to compare frequencies of different weights across
                 // many evicted assets. For now TinyLFU is only used when only evicting 1 item.
@@ -204,7 +213,7 @@ impl<T: Clone + Send + Sync> FiFoQueues<T> {
                     uses: Default::default(), // 0
                     data,
                 };
-                let old = pinned_buckets.insert(key, bucket);
+                let old = buckets.insert(key, bucket);
                 if old.is_none() {
                     // Always push key first before updating weight
                     // If doing the other order, another concurrent thread might not
@@ -215,32 +224,16 @@ impl<T: Clone + Send + Sync> FiFoQueues<T> {
                 // TODO: compare old.weight and update accordingly
                 return evicted;
             };
-
-            // the item exists, in case weight changes
-            let old_weight = bucket.weight;
-            bucket.uses.inc_uses();
-
-            fn update_atomic(weight: &AtomicUsize, old: u16, new: u16) {
-                if old == new {
-                    return;
-                }
-                if old > new {
-                    weight.fetch_sub((old - new) as usize, SeqCst);
-                } else {
-                    weight.fetch_add((new - old) as usize, SeqCst);
-                }
-            }
-            if bucket.queue.is_main() {
-                update_atomic(&self.main_weight, old_weight, weight);
-                bucket.update_bucket(MAIN, data, weight)
-            } else {
-                update_atomic(&self.small_weight, old_weight, weight);
-                bucket.update_bucket(SMALL, data, weight)
+            Bucket {
+                queue: Location(queue.into()),
+                weight,
+                uses: Uses(uses.into()),
+                data,
             }
         };
 
         // replace the existing one
-        buckets.pin().insert(key, new_bucket);
+        buckets.insert(key, new_bucket);
 
         // NOTE: there is a chance that the item itself is evicted if it happens to be the one selected
         // by the algorithm. We could avoid this by checking if the item is in the returned evicted items,
@@ -295,61 +288,67 @@ impl<T: Clone + Send + Sync> FiFoQueues<T> {
                 // empty queue, this is caught between another pop() and fetch_sub()
                 return None;
             };
 
-            let pinned_buckets = buckets.pin();
-            let maybe_bucket = pinned_buckets.get(&to_evict);
-
-            let Some(bucket) = maybe_bucket.as_ref() else {
-                //key in queue but not bucket, shouldn't happen, but ignore
-                continue;
-            };
-
-            let weight = bucket.weight;
-            self.small_weight.fetch_sub(weight as usize, SeqCst);
-            if bucket.uses.uses() > 1 {
-                // move to main
-                bucket.queue.move_to_main();
-                self.main.push(to_evict);
-                self.main_weight.fetch_add(weight as usize, SeqCst);
-                // continue until find one to evict
-                continue;
+            let v = buckets
+                .get_map(&to_evict, |bucket| {
+                    let weight = bucket.weight;
+                    self.small_weight.fetch_sub(weight as usize, SeqCst);
+
+                    if bucket.uses.uses() > 1 {
+                        // move to main
+                        bucket.queue.move_to_main();
+                        self.main.push(to_evict);
+                        self.main_weight.fetch_add(weight as usize, SeqCst);
+                        // continue until find one to evict
+                        None
+                    } else {
+                        let data = bucket.data.clone();
+                        let weight = bucket.weight;
+                        buckets.remove(&to_evict);
+                        Some(KV {
+                            key: to_evict,
+                            data,
+                            weight,
+                        })
+                    }
+                })
+                .flatten();
+            if v.is_some() {
+                // found the one to evict, break
+                return v;
             }
-            // move to ghost
-
-            let data = bucket.data.clone();
-            let weight = bucket.weight;
-            pinned_buckets.remove(&to_evict);
-            return Some(KV {
-                key: to_evict,
-                data,
-                weight,
-            });
         }
     }
 
     fn evict_one_from_main(&self, buckets: &Buckets<T>) -> Option<KV<T>> {
         loop {
             let to_evict = self.main.pop()?;
-            let buckets = buckets.pin();
-            let maybe_bucket = buckets.get(&to_evict);
-            if let Some(bucket) = maybe_bucket.as_ref() {
-                if bucket.uses.decr_uses() > 0 {
-                    // put it back
-                    self.main.push(to_evict);
-                    // continue the loop
-                } else {
-                    // evict
-                    let weight = bucket.weight;
-                    self.main_weight.fetch_sub(weight as usize, SeqCst);
-                    let data = bucket.data.clone();
-                    buckets.remove(&to_evict);
-                    return Some(KV {
-                        key: to_evict,
-                        data,
-                        weight,
-                    });
-                }
-            } // else: key in queue but not bucket, shouldn't happen
+
+            if let Some(v) = buckets
+                .get_map(&to_evict, |bucket| {
+                    if bucket.uses.decr_uses() > 0 {
+                        // put it back
+                        self.main.push(to_evict);
+                        // continue the loop
+                        None
+                    } else {
+                        // evict
+                        let weight = bucket.weight;
+                        self.main_weight.fetch_sub(weight as usize, SeqCst);
+                        let data = bucket.data.clone();
+                        buckets.remove(&to_evict);
+                        Some(KV {
+                            key: to_evict,
+                            data,
+                            weight,
+                        })
+                    }
+                })
+                .flatten()
+            {
+                // found the one to evict, break
+                return Some(v);
+            }
         }
     }
 }
@@ -357,12 +356,11 @@ impl<T: Clone + Send + Sync> FiFoQueues<T> {
 /// [TinyUfo] cache
 pub struct TinyUfo<K, T> {
     queues: FiFoQueues<T>,
-    buckets: HashMap<Key, Bucket<T>, RandomState>,
+    buckets: Buckets<T>,
     random_status: RandomState,
     _k: PhantomData<K>,
 }
-
-impl<K: Hash, T: Clone + Send + Sync> TinyUfo<K, T> {
+impl<K: Hash, T: Clone + Send + Sync + 'static> TinyUfo<K, T> {
     /// Create a new TinyUfo cache with the given weight limit and the given
     /// size limit of the ghost queue.
     pub fn new(total_weight_limit: usize, estimated_size: usize) -> Self {
@@ -377,7 +375,29 @@ impl<K: Hash, T: Clone + Send + Sync> TinyUfo<K, T> {
         };
         TinyUfo {
             queues,
-            buckets: HashMap::with_capacity_and_hasher(estimated_size, RandomState::new()),
+            buckets: Buckets::new_fast(estimated_size),
             random_status: RandomState::new(),
             _k: PhantomData,
         }
     }
+
+    /// Create a new TinyUfo cache but with more memory efficient data structures.
+    /// The trade-off is that the the get() is slower by a constant factor.
+    /// The cache hit ratio could be higher as this type of TinyUFO allows to store
+    /// more assets with the same memory.
+    pub fn new_compact(total_weight_limit: usize, estimated_size: usize) -> Self {
+        let queues = FiFoQueues {
+            small: SegQueue::new(),
+            small_weight: 0.into(),
+            main: SegQueue::new(),
+            main_weight: 0.into(),
+            total_weight_limit,
+            estimator: TinyLfu::new_compact(estimated_size),
+            _t: PhantomData,
+        };
+        TinyUfo {
+            queues,
+            buckets: Buckets::new_compact(estimated_size, 32),
+            random_status: RandomState::new(),
+            _k: PhantomData,
+        }
+    }
@@ -390,8 +410,7 @@ impl<K: Hash, T: Clone + Send + Sync> TinyUfo<K, T> {
     /// Return Some(T) if the key exists
     pub fn get(&self, key: &K) -> Option<T> {
         let key = self.random_status.hash_one(key);
-        let buckets = self.buckets.pin();
-        buckets.get(&key).map(|p| {
+        self.buckets.get_map(&key, |p| {
             p.uses.inc_uses();
             p.data.clone()
         })
@@ -427,7 +446,7 @@ impl<K: Hash, T: Clone + Send + Sync> TinyUfo<K, T> {
     #[cfg(test)]
     fn peek_queue(&self, key: K) -> Option<bool> {
         let key = self.random_status.hash_one(&key);
-        self.buckets.pin().get(&key).map(|p| p.queue.value())
+        self.buckets.get_queue(&key)
     }
 }
 
@@ -627,4 +646,54 @@ mod tests {
         assert_eq!(cache.peek_queue(3), Some(MAIN));
         assert_eq!(cache.peek_queue(4), None);
     }
+
+    #[test]
+    fn test_evict_from_small_compact() {
+        let cache = TinyUfo::new(5, 5);
+
+        cache.put(1, 1, 1);
+        cache.put(2, 2, 2);
+        cache.put(3, 3, 2);
+        // cache full now
+
+        assert_eq!(cache.peek_queue(1), Some(SMALL));
+        assert_eq!(cache.peek_queue(2), Some(SMALL));
+        assert_eq!(cache.peek_queue(3), Some(SMALL));
+
+        let evicted = cache.put(4, 4, 3);
+        assert_eq!(evicted.len(), 2);
+        assert_eq!(evicted[0].data, 1);
+        assert_eq!(evicted[1].data, 2);
+
+        assert_eq!(cache.peek_queue(1), None);
+        assert_eq!(cache.peek_queue(2), None);
+        assert_eq!(cache.peek_queue(3), Some(SMALL));
+    }
+
+    #[test]
+    fn test_evict_from_small_to_main_compact() {
+        let cache = TinyUfo::new(5, 5);
+
+        cache.put(1, 1, 1);
+        cache.put(2, 2, 2);
+        cache.put(3, 3, 2);
+        // cache full now
+
+        cache.get(&1);
+        cache.get(&1); // 1 will be moved to main during next eviction
+
+        assert_eq!(cache.peek_queue(1), Some(SMALL));
+        assert_eq!(cache.peek_queue(2), Some(SMALL));
+        assert_eq!(cache.peek_queue(3), Some(SMALL));
+
+        let evicted = cache.put(4, 4, 1);
+        assert_eq!(evicted.len(), 1);
+        assert_eq!(evicted[0].data, 2);
+
+        assert_eq!(cache.peek_queue(1), Some(MAIN));
+        // 2 is evicted because 1 is in main
+        assert_eq!(cache.peek_queue(2), None);
+        assert_eq!(cache.peek_queue(3), Some(SMALL));
+        assert_eq!(cache.peek_queue(4), Some(SMALL));
+    }
 }
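Usage sketch (not part of the commit above): the constructor and method signatures follow the lib.rs changes in this diff, but the crate import path, key/value types, and the capacity/weight numbers are illustrative assumptions only.

    // Assumes the crate is imported as `tinyufo`; adjust the path to match the package name.
    use tinyufo::TinyUfo;

    fn main() {
        // Default storage backend (TinyUfo::new): flurry-based hash map, fastest lookups.
        let fast: TinyUfo<u64, u8> = TinyUfo::new(100, 100);
        fast.put(1, 1, 1);
        assert_eq!(fast.get(&1), Some(1));

        // Compact storage backend (TinyUfo::new_compact): sharded skip lists, lower memory,
        // with get() slower by a constant factor per the doc comment in the diff.
        let compact: TinyUfo<u64, u8> = TinyUfo::new_compact(100, 100);
        compact.put(2, 2, 1);
        assert_eq!(compact.get(&2), Some(2));
    }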