aboutsummaryrefslogtreecommitdiffhomepage
path: root/tinyufo/src
diff options
context:
space:
mode:
Diffstat (limited to 'tinyufo/src')
-rw-r--r--tinyufo/src/buckets.rs174
-rw-r--r--tinyufo/src/estimation.rs14
-rw-r--r--tinyufo/src/lib.rs267
3 files changed, 356 insertions, 99 deletions
diff --git a/tinyufo/src/buckets.rs b/tinyufo/src/buckets.rs
new file mode 100644
index 0000000..4aa627d
--- /dev/null
+++ b/tinyufo/src/buckets.rs
@@ -0,0 +1,174 @@
+// Copyright 2024 Cloudflare, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Concurrent storage backend
+
+use super::{Bucket, Key};
+use ahash::RandomState;
+use crossbeam_skiplist::{map::Entry, SkipMap};
+use flurry::HashMap;
+
+/// N-shard skip list. Memory efficient, constant time lookup on average, but a bit slower
+/// than hash map
+pub struct Compact<T>(Box<[SkipMap<Key, Bucket<T>>]>);
+
+impl<T: Send + 'static> Compact<T> {
+ /// Create a new [Compact]
+ pub fn new(total_items: usize, items_per_shard: usize) -> Self {
+ assert!(items_per_shard > 0);
+
+ let shards = std::cmp::max(total_items / items_per_shard, 1);
+ let mut shard_array = vec![];
+ for _ in 0..shards {
+ shard_array.push(SkipMap::new());
+ }
+ Self(shard_array.into_boxed_slice())
+ }
+
+ pub fn get(&self, key: &Key) -> Option<Entry<Key, Bucket<T>>> {
+ let shard = *key as usize % self.0.len();
+ self.0[shard].get(key)
+ }
+
+ pub fn get_map<V, F: FnOnce(Entry<Key, Bucket<T>>) -> V>(&self, key: &Key, f: F) -> Option<V> {
+ let v = self.get(key);
+ v.map(f)
+ }
+
+ fn insert(&self, key: Key, value: Bucket<T>) -> Option<()> {
+ let shard = key as usize % self.0.len();
+ let removed = self.0[shard].remove(&key);
+ self.0[shard].insert(key, value);
+ removed.map(|_| ())
+ }
+
+ fn remove(&self, key: &Key) {
+ let shard = *key as usize % self.0.len();
+ (&self.0)[shard].remove(key);
+ }
+}
+
+// Concurrent hash map, fast but uses more memory
+pub struct Fast<T>(HashMap<Key, Bucket<T>, RandomState>);
+
+impl<T: Send + Sync> Fast<T> {
+ pub fn new(total_items: usize) -> Self {
+ Self(HashMap::with_capacity_and_hasher(
+ total_items,
+ RandomState::new(),
+ ))
+ }
+
+ pub fn get_map<V, F: FnOnce(&Bucket<T>) -> V>(&self, key: &Key, f: F) -> Option<V> {
+ let pinned = self.0.pin();
+ let v = pinned.get(key);
+ v.map(f)
+ }
+
+ fn insert(&self, key: Key, value: Bucket<T>) -> Option<()> {
+ let pinned = self.0.pin();
+ pinned.insert(key, value).map(|_| ())
+ }
+
+ fn remove(&self, key: &Key) {
+ let pinned = self.0.pin();
+ pinned.remove(key);
+ }
+}
+
+pub enum Buckets<T> {
+ Fast(Box<Fast<T>>),
+ Compact(Compact<T>),
+}
+
+impl<T: Send + Sync + 'static> Buckets<T> {
+ pub fn new_fast(items: usize) -> Self {
+ Self::Fast(Box::new(Fast::new(items)))
+ }
+
+ pub fn new_compact(items: usize, items_per_shard: usize) -> Self {
+ Self::Compact(Compact::new(items, items_per_shard))
+ }
+
+ pub fn insert(&self, key: Key, value: Bucket<T>) -> Option<()> {
+ match self {
+ Self::Compact(c) => c.insert(key, value),
+ Self::Fast(f) => f.insert(key, value),
+ }
+ }
+
+ pub fn remove(&self, key: &Key) {
+ match self {
+ Self::Compact(c) => c.remove(key),
+ Self::Fast(f) => f.remove(key),
+ }
+ }
+
+ pub fn get_map<V, F: FnOnce(&Bucket<T>) -> V>(&self, key: &Key, f: F) -> Option<V> {
+ match self {
+ Self::Compact(c) => c.get_map(key, |v| f(v.value())),
+ Self::Fast(c) => c.get_map(key, f),
+ }
+ }
+
+ #[cfg(test)]
+ pub fn get_queue(&self, key: &Key) -> Option<bool> {
+ self.get_map(key, |v| v.queue.is_main())
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_fast() {
+ let fast = Buckets::new_fast(10);
+
+ assert!(fast.get_map(&1, |_| ()).is_none());
+
+ let bucket = Bucket {
+ queue: crate::Location::new_small(),
+ weight: 1,
+ uses: Default::default(),
+ data: 1,
+ };
+ fast.insert(1, bucket);
+
+ assert_eq!(fast.get_map(&1, |v| v.data), Some(1));
+
+ fast.remove(&1);
+ assert!(fast.get_map(&1, |_| ()).is_none());
+ }
+
+ #[test]
+ fn test_compact() {
+ let compact = Buckets::new_compact(10, 2);
+
+ assert!(compact.get_map(&1, |_| ()).is_none());
+
+ let bucket = Bucket {
+ queue: crate::Location::new_small(),
+ weight: 1,
+ uses: Default::default(),
+ data: 1,
+ };
+ compact.insert(1, bucket);
+
+ assert_eq!(compact.get_map(&1, |v| v.data), Some(1));
+
+ compact.remove(&1);
+ assert!(compact.get_map(&1, |_| ()).is_none());
+ }
+}
diff --git a/tinyufo/src/estimation.rs b/tinyufo/src/estimation.rs
index 19d84d4..18c2d4f 100644
--- a/tinyufo/src/estimation.rs
+++ b/tinyufo/src/estimation.rs
@@ -39,6 +39,11 @@ impl Estimator {
Self::new(hashes, slots)
}
+ fn compact(items: usize) -> Self {
+ let (slots, hashes) = Self::optimal_paras(items / 100);
+ Self::new(hashes, slots)
+ }
+
/// Create a new `Estimator` with the given amount of hashes and columns (slots).
pub fn new(hashes: usize, slots: usize) -> Self {
let mut estimator = Vec::with_capacity(hashes);
@@ -147,6 +152,15 @@ impl TinyLfu {
window_limit: cache_size * 8,
}
}
+
+ pub fn new_compact(cache_size: usize) -> Self {
+ Self {
+ estimator: Estimator::compact(cache_size),
+ window_counter: Default::default(),
+ // 8x: just a heuristic to balance the memory usage and accuracy
+ window_limit: cache_size * 8,
+ }
+ }
}
#[cfg(test)]
diff --git a/tinyufo/src/lib.rs b/tinyufo/src/lib.rs
index 001f4e3..015afe0 100644
--- a/tinyufo/src/lib.rs
+++ b/tinyufo/src/lib.rs
@@ -20,14 +20,16 @@
use ahash::RandomState;
use crossbeam_queue::SegQueue;
-use flurry::HashMap;
use std::marker::PhantomData;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::{
AtomicBool, AtomicU8,
Ordering::{Acquire, Relaxed, SeqCst},
};
+mod buckets;
mod estimation;
+
+use buckets::Buckets;
use estimation::TinyLfu;
use std::hash::Hash;
@@ -64,20 +66,20 @@ const USES_CAP: u8 = 3;
struct Uses(AtomicU8);
impl Uses {
- pub fn inc_uses(&self) {
+ pub fn inc_uses(&self) -> u8 {
loop {
let uses = self.uses();
if uses >= USES_CAP {
- return;
+ return uses;
}
if let Err(new) = self.0.compare_exchange(uses, uses + 1, Acquire, Relaxed) {
// someone else beat us to it
if new >= USES_CAP {
// already above cap
- return;
+ return new;
} // else, try again
} else {
- return;
+ return uses + 1;
}
}
}
@@ -126,17 +128,6 @@ struct Bucket<T> {
data: T,
}
-impl<T: Clone> Bucket<T> {
- fn update_bucket(&self, main_queue: bool, data: T, weight: Weight) -> Self {
- Self {
- uses: Uses(self.uses.uses().into()),
- queue: Location(main_queue.into()),
- weight,
- data,
- }
- }
-}
-
const SMALL_QUEUE_PERCENTAGE: f32 = 0.1;
struct FiFoQueues<T> {
@@ -154,9 +145,7 @@ struct FiFoQueues<T> {
_t: PhantomData<T>,
}
-type Buckets<T> = HashMap<Key, Bucket<T>, RandomState>;
-
-impl<T: Clone + Send + Sync> FiFoQueues<T> {
+impl<T: Clone + Send + Sync + 'static> FiFoQueues<T> {
fn admit(
&self,
key: Key,
@@ -174,9 +163,29 @@ impl<T: Clone + Send + Sync> FiFoQueues<T> {
assert!(weight > 0);
let new_bucket = {
- let pinned_buckets = buckets.pin();
- let bucket = pinned_buckets.get(&key);
- let Some(bucket) = bucket else {
+ let Some((uses, queue, weight)) = buckets.get_map(&key, |bucket| {
+ // the item exists, in case weight changes
+ let old_weight = bucket.weight;
+ let uses = bucket.uses.inc_uses();
+
+ fn update_atomic(weight: &AtomicUsize, old: u16, new: u16) {
+ if old == new {
+ return;
+ }
+ if old > new {
+ weight.fetch_sub((old - new) as usize, SeqCst);
+ } else {
+ weight.fetch_add((new - old) as usize, SeqCst);
+ }
+ }
+ let queue = bucket.queue.is_main();
+ if queue == MAIN {
+ update_atomic(&self.main_weight, old_weight, weight);
+ } else {
+ update_atomic(&self.small_weight, old_weight, weight);
+ }
+ (uses, queue, weight)
+ }) else {
let mut evicted = self.evict_to_limit(weight, buckets);
// TODO: figure out the right way to compare frequencies of different weights across
// many evicted assets. For now TinyLFU is only used when only evicting 1 item.
@@ -204,7 +213,7 @@ impl<T: Clone + Send + Sync> FiFoQueues<T> {
uses: Default::default(), // 0
data,
};
- let old = pinned_buckets.insert(key, bucket);
+ let old = buckets.insert(key, bucket);
if old.is_none() {
// Always push key first before updating weight
// If doing the other order, another concurrent thread might not
@@ -215,32 +224,16 @@ impl<T: Clone + Send + Sync> FiFoQueues<T> {
// TODO: compare old.weight and update accordingly
return evicted;
};
-
- // the item exists, in case weight changes
- let old_weight = bucket.weight;
- bucket.uses.inc_uses();
-
- fn update_atomic(weight: &AtomicUsize, old: u16, new: u16) {
- if old == new {
- return;
- }
- if old > new {
- weight.fetch_sub((old - new) as usize, SeqCst);
- } else {
- weight.fetch_add((new - old) as usize, SeqCst);
- }
- }
- if bucket.queue.is_main() {
- update_atomic(&self.main_weight, old_weight, weight);
- bucket.update_bucket(MAIN, data, weight)
- } else {
- update_atomic(&self.small_weight, old_weight, weight);
- bucket.update_bucket(SMALL, data, weight)
+ Bucket {
+ queue: Location(queue.into()),
+ weight,
+ uses: Uses(uses.into()),
+ data,
}
};
// replace the existing one
- buckets.pin().insert(key, new_bucket);
+ buckets.insert(key, new_bucket);
// NOTE: there is a chance that the item itself is evicted if it happens to be the one selected
// by the algorithm. We could avoid this by checking if the item is in the returned evicted items,
@@ -295,61 +288,67 @@ impl<T: Clone + Send + Sync> FiFoQueues<T> {
// empty queue, this is caught between another pop() and fetch_sub()
return None;
};
- let pinned_buckets = buckets.pin();
- let maybe_bucket = pinned_buckets.get(&to_evict);
-
- let Some(bucket) = maybe_bucket.as_ref() else {
- //key in queue but not bucket, shouldn't happen, but ignore
- continue;
- };
-
- let weight = bucket.weight;
- self.small_weight.fetch_sub(weight as usize, SeqCst);
- if bucket.uses.uses() > 1 {
- // move to main
- bucket.queue.move_to_main();
- self.main.push(to_evict);
- self.main_weight.fetch_add(weight as usize, SeqCst);
- // continue until find one to evict
- continue;
+ let v = buckets
+ .get_map(&to_evict, |bucket| {
+ let weight = bucket.weight;
+ self.small_weight.fetch_sub(weight as usize, SeqCst);
+
+ if bucket.uses.uses() > 1 {
+ // move to main
+ bucket.queue.move_to_main();
+ self.main.push(to_evict);
+ self.main_weight.fetch_add(weight as usize, SeqCst);
+ // continue until find one to evict
+ None
+ } else {
+ let data = bucket.data.clone();
+ let weight = bucket.weight;
+ buckets.remove(&to_evict);
+ Some(KV {
+ key: to_evict,
+ data,
+ weight,
+ })
+ }
+ })
+ .flatten();
+ if v.is_some() {
+ // found the one to evict, break
+ return v;
}
- // move to ghost
-
- let data = bucket.data.clone();
- let weight = bucket.weight;
- pinned_buckets.remove(&to_evict);
- return Some(KV {
- key: to_evict,
- data,
- weight,
- });
}
}
fn evict_one_from_main(&self, buckets: &Buckets<T>) -> Option<KV<T>> {
loop {
let to_evict = self.main.pop()?;
- let buckets = buckets.pin();
- let maybe_bucket = buckets.get(&to_evict);
- if let Some(bucket) = maybe_bucket.as_ref() {
- if bucket.uses.decr_uses() > 0 {
- // put it back
- self.main.push(to_evict);
- // continue the loop
- } else {
- // evict
- let weight = bucket.weight;
- self.main_weight.fetch_sub(weight as usize, SeqCst);
- let data = bucket.data.clone();
- buckets.remove(&to_evict);
- return Some(KV {
- key: to_evict,
- data,
- weight,
- });
- }
- } // else: key in queue but not bucket, shouldn't happen
+
+ if let Some(v) = buckets
+ .get_map(&to_evict, |bucket| {
+ if bucket.uses.decr_uses() > 0 {
+ // put it back
+ self.main.push(to_evict);
+ // continue the loop
+ None
+ } else {
+ // evict
+ let weight = bucket.weight;
+ self.main_weight.fetch_sub(weight as usize, SeqCst);
+ let data = bucket.data.clone();
+ buckets.remove(&to_evict);
+ Some(KV {
+ key: to_evict,
+ data,
+ weight,
+ })
+ }
+ })
+ .flatten()
+ {
+ // found the one to evict, break
+ return Some(v);
+ }
}
}
}
@@ -357,12 +356,11 @@ impl<T: Clone + Send + Sync> FiFoQueues<T> {
/// [TinyUfo] cache
pub struct TinyUfo<K, T> {
queues: FiFoQueues<T>,
- buckets: HashMap<Key, Bucket<T>, RandomState>,
+ buckets: Buckets<T>,
random_status: RandomState,
_k: PhantomData<K>,
}
-
-impl<K: Hash, T: Clone + Send + Sync> TinyUfo<K, T> {
+impl<K: Hash, T: Clone + Send + Sync + 'static> TinyUfo<K, T> {
/// Create a new TinyUfo cache with the given weight limit and the given
/// size limit of the ghost queue.
pub fn new(total_weight_limit: usize, estimated_size: usize) -> Self {
@@ -377,7 +375,29 @@ impl<K: Hash, T: Clone + Send + Sync> TinyUfo<K, T> {
};
TinyUfo {
queues,
- buckets: HashMap::with_capacity_and_hasher(estimated_size, RandomState::new()),
+ buckets: Buckets::new_fast(estimated_size),
+ random_status: RandomState::new(),
+ _k: PhantomData,
+ }
+ }
+
+ /// Create a new TinyUfo cache but with more memory efficient data structures.
+ /// The trade-off is that the get() is slower by a constant factor.
+ /// The cache hit ratio could be higher as this type of TinyUFO allows storing
+ /// more assets with the same memory.
+ pub fn new_compact(total_weight_limit: usize, estimated_size: usize) -> Self {
+ let queues = FiFoQueues {
+ small: SegQueue::new(),
+ small_weight: 0.into(),
+ main: SegQueue::new(),
+ main_weight: 0.into(),
+ total_weight_limit,
+ estimator: TinyLfu::new_compact(estimated_size),
+ _t: PhantomData,
+ };
+ TinyUfo {
+ queues,
+ buckets: Buckets::new_compact(estimated_size, 32),
random_status: RandomState::new(),
_k: PhantomData,
}
@@ -390,8 +410,7 @@ impl<K: Hash, T: Clone + Send + Sync> TinyUfo<K, T> {
/// Return Some(T) if the key exists
pub fn get(&self, key: &K) -> Option<T> {
let key = self.random_status.hash_one(key);
- let buckets = self.buckets.pin();
- buckets.get(&key).map(|p| {
+ self.buckets.get_map(&key, |p| {
p.uses.inc_uses();
p.data.clone()
})
@@ -427,7 +446,7 @@ impl<K: Hash, T: Clone + Send + Sync> TinyUfo<K, T> {
#[cfg(test)]
fn peek_queue(&self, key: K) -> Option<bool> {
let key = self.random_status.hash_one(&key);
- self.buckets.pin().get(&key).map(|p| p.queue.value())
+ self.buckets.get_queue(&key)
}
}
@@ -627,4 +646,54 @@ mod tests {
assert_eq!(cache.peek_queue(3), Some(MAIN));
assert_eq!(cache.peek_queue(4), None);
}
+
+ #[test]
+ fn test_evict_from_small_compact() {
+ let cache = TinyUfo::new(5, 5);
+
+ cache.put(1, 1, 1);
+ cache.put(2, 2, 2);
+ cache.put(3, 3, 2);
+ // cache full now
+
+ assert_eq!(cache.peek_queue(1), Some(SMALL));
+ assert_eq!(cache.peek_queue(2), Some(SMALL));
+ assert_eq!(cache.peek_queue(3), Some(SMALL));
+
+ let evicted = cache.put(4, 4, 3);
+ assert_eq!(evicted.len(), 2);
+ assert_eq!(evicted[0].data, 1);
+ assert_eq!(evicted[1].data, 2);
+
+ assert_eq!(cache.peek_queue(1), None);
+ assert_eq!(cache.peek_queue(2), None);
+ assert_eq!(cache.peek_queue(3), Some(SMALL));
+ }
+
+ #[test]
+ fn test_evict_from_small_to_main_compact() {
+ let cache = TinyUfo::new(5, 5);
+
+ cache.put(1, 1, 1);
+ cache.put(2, 2, 2);
+ cache.put(3, 3, 2);
+ // cache full now
+
+ cache.get(&1);
+ cache.get(&1); // 1 will be moved to main during next eviction
+
+ assert_eq!(cache.peek_queue(1), Some(SMALL));
+ assert_eq!(cache.peek_queue(2), Some(SMALL));
+ assert_eq!(cache.peek_queue(3), Some(SMALL));
+
+ let evicted = cache.put(4, 4, 1);
+ assert_eq!(evicted.len(), 1);
+ assert_eq!(evicted[0].data, 2);
+
+ assert_eq!(cache.peek_queue(1), Some(MAIN));
+ // 2 is evicted because 1 is in main
+ assert_eq!(cache.peek_queue(2), None);
+ assert_eq!(cache.peek_queue(3), Some(SMALL));
+ assert_eq!(cache.peek_queue(4), Some(SMALL));
+ }
}