diff options
author | Yuchen Wu <[email protected]> | 2024-02-27 20:25:44 -0800 |
---|---|---|
committer | Yuchen Wu <[email protected]> | 2024-02-27 20:25:44 -0800 |
commit | 8797329225018c4d0ab990166dd020338ae292dc (patch) | |
tree | 1e8d0bf6f3c27e987559f52319d91ff75e4da5cb /pingora-memory-cache/src/lib.rs | |
parent | 0bca116c1027a878469b72352e1e9e3916e85dde (diff) | |
download | pingora-8797329225018c4d0ab990166dd020338ae292dc.tar.gz pingora-8797329225018c4d0ab990166dd020338ae292dc.zip |
Release Pingora version 0.1.0 (tag: v0.1.0)
Co-authored-by: Andrew Hauck <[email protected]>
Co-authored-by: Edward Wang <[email protected]>
Diffstat (limited to 'pingora-memory-cache/src/lib.rs')
-rw-r--r-- | pingora-memory-cache/src/lib.rs | 249 |
1 files changed, 249 insertions, 0 deletions
diff --git a/pingora-memory-cache/src/lib.rs b/pingora-memory-cache/src/lib.rs new file mode 100644 index 0000000..f5c037c --- /dev/null +++ b/pingora-memory-cache/src/lib.rs @@ -0,0 +1,249 @@ +// Copyright 2024 Cloudflare, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use ahash::RandomState; +use std::hash::Hash; +use std::marker::PhantomData; +use std::time::{Duration, Instant}; + +use tinyufo::TinyUfo; + +mod read_through; +pub use read_through::{Lookup, MultiLookup, RTCache}; + +#[derive(Debug, PartialEq, Eq)] +/// [CacheStatus] indicates the response type for a query. +pub enum CacheStatus { + /// The key was found in cache + Hit, + /// The key was not found. + Miss, + /// The key was found but it was expired. + Expired, + /// The key was not initially found but was found after awaiting a lock. + LockHit, +} + +impl CacheStatus { + /// Return the string representation for [CacheStatus]. 
+ pub fn as_str(&self) -> &str { + match self { + Self::Hit => "hit", + Self::Miss => "miss", + Self::Expired => "expired", + Self::LockHit => "lock_hit", + } + } +} + +#[derive(Debug, Clone)] +struct Node<T: Clone> { + pub value: T, + expire_on: Option<Instant>, +} + +impl<T: Clone> Node<T> { + fn new(value: T, ttl: Option<Duration>) -> Self { + let expire_on = match ttl { + Some(t) => Instant::now().checked_add(t), + None => None, + }; + Node { value, expire_on } + } + + fn will_expire_at(&self, time: &Instant) -> bool { + match self.expire_on.as_ref() { + Some(t) => t <= time, + None => false, + } + } + + fn is_expired(&self) -> bool { + self.will_expire_at(&Instant::now()) + } +} + +/// A high performant in-memory cache with S3-FIFO + TinyLFU +pub struct MemoryCache<K: Hash, T: Clone> { + store: TinyUfo<u64, Node<T>>, + _key_type: PhantomData<K>, + pub(crate) hasher: RandomState, +} + +impl<K: Hash, T: Clone + Send + Sync> MemoryCache<K, T> { + /// Create a new [MemoryCache] with the given size. + pub fn new(size: usize) -> Self { + MemoryCache { + store: TinyUfo::new(size, size), + _key_type: PhantomData, + hasher: RandomState::new(), + } + } + + /// Fetch the key and return its value in addition to a [CacheStatus]. + pub fn get(&self, key: &K) -> (Option<T>, CacheStatus) { + let hashed_key = self.hasher.hash_one(key); + + if let Some(n) = self.store.get(&hashed_key) { + if !n.is_expired() { + (Some(n.value), CacheStatus::Hit) + } else { + // TODO: consider returning the staled value + (None, CacheStatus::Expired) + } + } else { + (None, CacheStatus::Miss) + } + } + + /// Insert a key and value pair with an optional TTL into the cache. + /// + /// An item with zero TTL of zero not inserted. 
+ pub fn put(&self, key: &K, value: T, ttl: Option<Duration>) { + if let Some(t) = ttl { + if t.is_zero() { + return; + } + } + let hashed_key = self.hasher.hash_one(key); + let node = Node::new(value, ttl); + // weight is always 1 for now + self.store.put(hashed_key, node, 1); + } + + pub(crate) fn force_put(&self, key: &K, value: T, ttl: Option<Duration>) { + if let Some(t) = ttl { + if t.is_zero() { + return; + } + } + let hashed_key = self.hasher.hash_one(key); + let node = Node::new(value, ttl); + // weight is always 1 for now + self.store.force_put(hashed_key, node, 1); + } + + /// This is equivalent to [MemoryCache::get] but for an arbitrary amount of keys. + pub fn multi_get<'a, I>(&self, keys: I) -> Vec<(Option<T>, CacheStatus)> + where + I: Iterator<Item = &'a K>, + K: 'a, + { + let mut resp = Vec::with_capacity(keys.size_hint().0); + for key in keys { + resp.push(self.get(key)); + } + resp + } + + /// Same as [MemoryCache::multi_get] but returns the keys that are missing from the cache. 
+ pub fn multi_get_with_miss<'a, I>(&self, keys: I) -> (Vec<(Option<T>, CacheStatus)>, Vec<&'a K>) + where + I: Iterator<Item = &'a K>, + K: 'a, + { + let mut resp = Vec::with_capacity(keys.size_hint().0); + let mut missed = Vec::with_capacity(keys.size_hint().0 / 2); + for key in keys { + let (lookup, cache_status) = self.get(key); + if lookup.is_none() { + missed.push(key); + } + resp.push((lookup, cache_status)); + } + (resp, missed) + } + + // TODO: evict expired first +} + +#[cfg(test)] +mod tests { + use super::*; + use std::thread::sleep; + + #[test] + fn test_get() { + let cache: MemoryCache<i32, ()> = MemoryCache::new(10); + let (res, hit) = cache.get(&1); + assert_eq!(res, None); + assert_eq!(hit, CacheStatus::Miss); + } + + #[test] + fn test_put_get() { + let cache: MemoryCache<i32, i32> = MemoryCache::new(10); + let (res, hit) = cache.get(&1); + assert_eq!(res, None); + assert_eq!(hit, CacheStatus::Miss); + cache.put(&1, 2, None); + let (res, hit) = cache.get(&1); + assert_eq!(res.unwrap(), 2); + assert_eq!(hit, CacheStatus::Hit); + } + + #[test] + fn test_get_expired() { + let cache: MemoryCache<i32, i32> = MemoryCache::new(10); + let (res, hit) = cache.get(&1); + assert_eq!(res, None); + assert_eq!(hit, CacheStatus::Miss); + cache.put(&1, 2, Some(Duration::from_secs(1))); + sleep(Duration::from_millis(1100)); + let (res, hit) = cache.get(&1); + assert_eq!(res, None); + assert_eq!(hit, CacheStatus::Expired); + } + + #[test] + fn test_eviction() { + let cache: MemoryCache<i32, i32> = MemoryCache::new(2); + cache.put(&1, 2, None); + cache.put(&2, 4, None); + cache.put(&3, 6, None); + let (res, hit) = cache.get(&1); + assert_eq!(res, None); + assert_eq!(hit, CacheStatus::Miss); + let (res, hit) = cache.get(&2); + assert_eq!(res.unwrap(), 4); + assert_eq!(hit, CacheStatus::Hit); + let (res, hit) = cache.get(&3); + assert_eq!(res.unwrap(), 6); + assert_eq!(hit, CacheStatus::Hit); + } + + #[test] + fn test_multi_get() { + let cache: MemoryCache<i32, i32> = 
MemoryCache::new(10); + cache.put(&2, -2, None); + let keys: Vec<i32> = vec![1, 2, 3]; + let resp = cache.multi_get(keys.iter()); + assert_eq!(resp[0].0, None); + assert_eq!(resp[0].1, CacheStatus::Miss); + assert_eq!(resp[1].0.unwrap(), -2); + assert_eq!(resp[1].1, CacheStatus::Hit); + assert_eq!(resp[2].0, None); + assert_eq!(resp[2].1, CacheStatus::Miss); + + let (resp, missed) = cache.multi_get_with_miss(keys.iter()); + assert_eq!(resp[0].0, None); + assert_eq!(resp[0].1, CacheStatus::Miss); + assert_eq!(resp[1].0.unwrap(), -2); + assert_eq!(resp[1].1, CacheStatus::Hit); + assert_eq!(resp[2].0, None); + assert_eq!(resp[2].1, CacheStatus::Miss); + assert_eq!(missed[0], &1); + assert_eq!(missed[1], &3); + } +} |