diff options
Diffstat (limited to 'pingora-memory-cache')
-rw-r--r-- | pingora-memory-cache/Cargo.toml | 27 | ||||
-rw-r--r-- | pingora-memory-cache/LICENSE | 202 | ||||
-rw-r--r-- | pingora-memory-cache/src/lib.rs | 249 | ||||
-rw-r--r-- | pingora-memory-cache/src/read_through.rs | 689 |
4 files changed, 1167 insertions, 0 deletions
diff --git a/pingora-memory-cache/Cargo.toml b/pingora-memory-cache/Cargo.toml new file mode 100644 index 0000000..d51268b --- /dev/null +++ b/pingora-memory-cache/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "pingora-memory-cache" +version = "0.1.0" +authors = ["Yuchen Wu <[email protected]>"] +license = "Apache-2.0" +edition = "2021" +repository = "https://github.com/cloudflare/pingora" +categories = ["algorithms", "caching"] +keywords = ["async", "cache", "pingora"] +description = """ +An async in-memory cache with cache stampede protection. +""" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[lib] +name = "pingora_memory_cache" +path = "src/lib.rs" + +[dependencies] +TinyUFO = { version = "0.1.0", path = "../tinyufo" } +ahash = { workspace = true } +tokio = { workspace = true, features = ["sync"] } +async-trait = { workspace = true } +pingora-error = { version = "0.1.0", path = "../pingora-error" } +log = { workspace = true } +parking_lot = "0" +pingora-timeout = { version = "0.1.0", path = "../pingora-timeout" } diff --git a/pingora-memory-cache/LICENSE b/pingora-memory-cache/LICENSE new file mode 100644 index 0000000..d645695 --- /dev/null +++ b/pingora-memory-cache/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/pingora-memory-cache/src/lib.rs b/pingora-memory-cache/src/lib.rs new file mode 100644 index 0000000..f5c037c --- /dev/null +++ b/pingora-memory-cache/src/lib.rs @@ -0,0 +1,249 @@ +// Copyright 2024 Cloudflare, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use ahash::RandomState; +use std::hash::Hash; +use std::marker::PhantomData; +use std::time::{Duration, Instant}; + +use tinyufo::TinyUfo; + +mod read_through; +pub use read_through::{Lookup, MultiLookup, RTCache}; + +#[derive(Debug, PartialEq, Eq)] +/// [CacheStatus] indicates the response type for a query. +pub enum CacheStatus { + /// The key was found in cache + Hit, + /// The key was not found. + Miss, + /// The key was found but it was expired. + Expired, + /// The key was not initially found but was found after awaiting a lock. + LockHit, +} + +impl CacheStatus { + /// Return the string representation for [CacheStatus]. + pub fn as_str(&self) -> &str { + match self { + Self::Hit => "hit", + Self::Miss => "miss", + Self::Expired => "expired", + Self::LockHit => "lock_hit", + } + } +} + +#[derive(Debug, Clone)] +struct Node<T: Clone> { + pub value: T, + expire_on: Option<Instant>, +} + +impl<T: Clone> Node<T> { + fn new(value: T, ttl: Option<Duration>) -> Self { + let expire_on = match ttl { + Some(t) => Instant::now().checked_add(t), + None => None, + }; + Node { value, expire_on } + } + + fn will_expire_at(&self, time: &Instant) -> bool { + match self.expire_on.as_ref() { + Some(t) => t <= time, + None => false, + } + } + + fn is_expired(&self) -> bool { + self.will_expire_at(&Instant::now()) + } +} + +/// A high performant in-memory cache with S3-FIFO + TinyLFU +pub struct MemoryCache<K: Hash, T: Clone> { + store: TinyUfo<u64, Node<T>>, + _key_type: PhantomData<K>, + pub(crate) hasher: RandomState, +} + +impl<K: Hash, T: Clone + Send + Sync> MemoryCache<K, T> { + /// Create a new [MemoryCache] with the given size. + pub fn new(size: usize) -> Self { + MemoryCache { + store: TinyUfo::new(size, size), + _key_type: PhantomData, + hasher: RandomState::new(), + } + } + + /// Fetch the key and return its value in addition to a [CacheStatus]. + pub fn get(&self, key: &K) -> (Option<T>, CacheStatus) { + let hashed_key = self.hasher.hash_one(key); + + if let Some(n) = self.store.get(&hashed_key) { + if !n.is_expired() { + (Some(n.value), CacheStatus::Hit) + } else { + // TODO: consider returning the staled value + (None, CacheStatus::Expired) + } + } else { + (None, CacheStatus::Miss) + } + } + + /// Insert a key and value pair with an optional TTL into the cache. + /// + /// An item with zero TTL of zero not inserted. + pub fn put(&self, key: &K, value: T, ttl: Option<Duration>) { + if let Some(t) = ttl { + if t.is_zero() { + return; + } + } + let hashed_key = self.hasher.hash_one(key); + let node = Node::new(value, ttl); + // weight is always 1 for now + self.store.put(hashed_key, node, 1); + } + + pub(crate) fn force_put(&self, key: &K, value: T, ttl: Option<Duration>) { + if let Some(t) = ttl { + if t.is_zero() { + return; + } + } + let hashed_key = self.hasher.hash_one(key); + let node = Node::new(value, ttl); + // weight is always 1 for now + self.store.force_put(hashed_key, node, 1); + } + + /// This is equivalent to [MemoryCache::get] but for an arbitrary amount of keys. + pub fn multi_get<'a, I>(&self, keys: I) -> Vec<(Option<T>, CacheStatus)> + where + I: Iterator<Item = &'a K>, + K: 'a, + { + let mut resp = Vec::with_capacity(keys.size_hint().0); + for key in keys { + resp.push(self.get(key)); + } + resp + } + + /// Same as [MemoryCache::multi_get] but returns the keys that are missing from the cache. + pub fn multi_get_with_miss<'a, I>(&self, keys: I) -> (Vec<(Option<T>, CacheStatus)>, Vec<&'a K>) + where + I: Iterator<Item = &'a K>, + K: 'a, + { + let mut resp = Vec::with_capacity(keys.size_hint().0); + let mut missed = Vec::with_capacity(keys.size_hint().0 / 2); + for key in keys { + let (lookup, cache_status) = self.get(key); + if lookup.is_none() { + missed.push(key); + } + resp.push((lookup, cache_status)); + } + (resp, missed) + } + + // TODO: evict expired first +} + +#[cfg(test)] +mod tests { + use super::*; + use std::thread::sleep; + + #[test] + fn test_get() { + let cache: MemoryCache<i32, ()> = MemoryCache::new(10); + let (res, hit) = cache.get(&1); + assert_eq!(res, None); + assert_eq!(hit, CacheStatus::Miss); + } + + #[test] + fn test_put_get() { + let cache: MemoryCache<i32, i32> = MemoryCache::new(10); + let (res, hit) = cache.get(&1); + assert_eq!(res, None); + assert_eq!(hit, CacheStatus::Miss); + cache.put(&1, 2, None); + let (res, hit) = cache.get(&1); + assert_eq!(res.unwrap(), 2); + assert_eq!(hit, CacheStatus::Hit); + } + + #[test] + fn test_get_expired() { + let cache: MemoryCache<i32, i32> = MemoryCache::new(10); + let (res, hit) = cache.get(&1); + assert_eq!(res, None); + assert_eq!(hit, CacheStatus::Miss); + cache.put(&1, 2, Some(Duration::from_secs(1))); + sleep(Duration::from_millis(1100)); + let (res, hit) = cache.get(&1); + assert_eq!(res, None); + assert_eq!(hit, CacheStatus::Expired); + } + + #[test] + fn test_eviction() { + let cache: MemoryCache<i32, i32> = MemoryCache::new(2); + cache.put(&1, 2, None); + cache.put(&2, 4, None); + cache.put(&3, 6, None); + let (res, hit) = cache.get(&1); + assert_eq!(res, None); + assert_eq!(hit, CacheStatus::Miss); + let (res, hit) = cache.get(&2); + assert_eq!(res.unwrap(), 4); + assert_eq!(hit, CacheStatus::Hit); + let (res, hit) = cache.get(&3); + assert_eq!(res.unwrap(), 6); + assert_eq!(hit, CacheStatus::Hit); + } + + #[test] + fn test_multi_get() { + let cache: MemoryCache<i32, i32> = MemoryCache::new(10); + cache.put(&2, -2, None); + let keys: Vec<i32> = vec![1, 2, 3]; + let resp = cache.multi_get(keys.iter()); + assert_eq!(resp[0].0, None); + assert_eq!(resp[0].1, CacheStatus::Miss); + assert_eq!(resp[1].0.unwrap(), -2); + assert_eq!(resp[1].1, CacheStatus::Hit); + assert_eq!(resp[2].0, None); + assert_eq!(resp[2].1, CacheStatus::Miss); + + let (resp, missed) = cache.multi_get_with_miss(keys.iter()); + assert_eq!(resp[0].0, None); + assert_eq!(resp[0].1, CacheStatus::Miss); + assert_eq!(resp[1].0.unwrap(), -2); + assert_eq!(resp[1].1, CacheStatus::Hit); + assert_eq!(resp[2].0, None); + assert_eq!(resp[2].1, CacheStatus::Miss); + assert_eq!(missed[0], &1); + assert_eq!(missed[1], &3); + } +} diff --git a/pingora-memory-cache/src/read_through.rs b/pingora-memory-cache/src/read_through.rs new file mode 100644 index 0000000..05a8d89 --- /dev/null +++ b/pingora-memory-cache/src/read_through.rs @@ -0,0 +1,689 @@ +// Copyright 2024 Cloudflare, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! An async read through cache where cache miss are populated via the provided +//! async callback. + +use super::{CacheStatus, MemoryCache}; + +use async_trait::async_trait; +use log::warn; +use parking_lot::RwLock; +use pingora_error::{Error, ErrorTrait}; +use std::collections::HashMap; +use std::hash::Hash; +use std::marker::PhantomData; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use tokio::sync::Semaphore; + +struct CacheLock { + pub lock_start: Instant, + pub lock: Semaphore, +} + +impl CacheLock { + pub fn new_arc() -> Arc<Self> { + Arc::new(CacheLock { + lock: Semaphore::new(0), + lock_start: Instant::now(), + }) + } + + pub fn too_old(&self, age: Option<&Duration>) -> bool { + match age { + Some(t) => Instant::now() - self.lock_start > *t, + None => false, + } + } +} + +#[async_trait] +/// [Lookup] defines the caching behavior that the implementor needs. The `extra` field can be used +/// to define any additional metadata that the implementor uses to determine cache eligibility. +/// +/// # Examples +/// +/// ```ignore +/// use pingora_error::{ErrorTrait, Result}; +/// use std::time::Duration; +/// +/// struct MyLookup; +/// +/// impl Lookup<usize, usize, ()> for MyLookup { +/// async fn lookup( +/// &self, +/// _key: &usize, +/// extra: Option<&()>, +/// ) -> Result<(usize, Option<Duration>), Box<dyn ErrorTrait + Send + Sync>> { +/// // Define your business logic here. +/// Ok(1, None) +/// } +/// } +/// ``` +pub trait Lookup<K, T, S> { + /// Return a value and an optional TTL for the given key. + async fn lookup( + key: &K, + extra: Option<&S>, + ) -> Result<(T, Option<Duration>), Box<dyn ErrorTrait + Send + Sync>> + where + K: 'async_trait, + S: 'async_trait; +} + +#[async_trait] +/// [MultiLookup] is similar to [Lookup]. Implement this trait if the system being queried support +/// looking up multiple keys in a single API call. +pub trait MultiLookup<K, T, S> { + /// Like [Lookup::lookup] but for an arbitrary amount of keys. + async fn multi_lookup( + keys: &[&K], + extra: Option<&S>, + ) -> Result<Vec<(T, Option<Duration>)>, Box<dyn ErrorTrait + Send + Sync>> + where + K: 'async_trait, + S: 'async_trait; +} + +const LOOKUP_ERR_MSG: &str = "RTCache: lookup error"; + +/// A read-through in-memory cache on top of [MemoryCache] +/// +/// Instead of providing a `put` function, [RTCache] requires a type which implements [Lookup] to +/// be automatically called during cache miss to populate the cache. This is useful when trying to +/// cache queries to external system such as DNS or databases. +/// +/// Lookup coalescing is provided so that multiple concurrent lookups for the same key results +/// only in one lookup callback. +pub struct RTCache<K, T, CB, S> +where + K: Hash + Send, + T: Clone + Send, +{ + inner: MemoryCache<K, T>, + _callback: PhantomData<CB>, + lockers: RwLock<HashMap<u64, Arc<CacheLock>>>, + lock_age: Option<Duration>, + lock_timeout: Option<Duration>, + phantom: PhantomData<S>, +} + +impl<K, T, CB, S> RTCache<K, T, CB, S> +where + K: Hash + Send, + T: Clone + Send + Sync, +{ + /// Create a new [RTCache] of given size. `lock_age` defines how long a lock is valid for. + /// `lock_timeout` is used to stop a lookup from holding on to the key for too long. + pub fn new(size: usize, lock_age: Option<Duration>, lock_timeout: Option<Duration>) -> Self { + RTCache { + inner: MemoryCache::new(size), + lockers: RwLock::new(HashMap::new()), + _callback: PhantomData, + lock_age, + lock_timeout, + phantom: PhantomData, + } + } +} + +impl<K, T, CB, S> RTCache<K, T, CB, S> +where + K: Hash + Send, + T: Clone + Send + Sync, + CB: Lookup<K, T, S>, +{ + /// Query the cache for a given value. If it exists and no TTL is configured initially, it will + /// use the `ttl` value given. + pub async fn get( + &self, + key: &K, + ttl: Option<Duration>, + extra: Option<&S>, + ) -> (Result<T, Box<Error>>, CacheStatus) { + let (result, cache_state) = self.inner.get(key); + if let Some(result) = result { + /* cache hit */ + return (Ok(result), cache_state); + } + + let hashed_key = self.inner.hasher.hash_one(key); + + /* Cache miss, try to lock the lookup. Check if there is already a lookup */ + let my_lock = { + let lockers = self.lockers.read(); + /* clone the Arc */ + lockers.get(&hashed_key).cloned() + }; // read lock dropped + + /* try insert a cache lock into locker */ + let (my_write, my_read) = match my_lock { + // TODO: use a union + Some(lock) => { + /* There is an ongoing lookup to the same key */ + if lock.too_old(self.lock_age.as_ref()) { + (None, None) + } else { + (None, Some(lock)) + } + } + None => { + let mut lockers = self.lockers.write(); + match lockers.get(&hashed_key) { + Some(lock) => { + /* another lookup to the same key got the write lock to locker first */ + if lock.too_old(self.lock_age.as_ref()) { + (None, None) + } else { + (None, Some(lock.clone())) + } + } + None => { + let new_lock = CacheLock::new_arc(); + let new_lock2 = new_lock.clone(); + lockers.insert(hashed_key, new_lock2); + (Some(new_lock), None) + } + } // write lock dropped + } + }; + + if my_read.is_some() { + /* another task will do the lookup */ + + let my_lock = my_read.unwrap(); + /* if available_permits > 0, writer is done */ + if my_lock.lock.available_permits() == 0 { + /* block here to wait for writer to finish lookup */ + let lock_fut = my_lock.lock.acquire(); + let timed_out = match self.lock_timeout { + Some(t) => pingora_timeout::timeout(t, lock_fut).await.is_err(), + None => { + let _ = lock_fut.await; + false + } + }; + if timed_out { + let value = CB::lookup(key, extra).await; + return match value { + Ok((v, _ttl)) => (Ok(v), cache_state), + Err(e) => { + let mut err = Error::new_str(LOOKUP_ERR_MSG); + err.set_cause(e); + (Err(err), cache_state) + } + }; + } + } // permit returned here + + let (result, cache_state) = self.inner.get(key); + if let Some(result) = result { + /* cache lock hit, slow as a miss */ + (Ok(result), CacheStatus::LockHit) + } else { + /* probably error happen during the actual lookup */ + warn!( + "RTCache: no result after read lock, cache status: {:?}", + cache_state + ); + match CB::lookup(key, extra).await { + Ok((v, new_ttl)) => { + self.inner.force_put(key, v.clone(), new_ttl.or(ttl)); + (Ok(v), cache_state) + } + Err(e) => { + let mut err = Error::new_str(LOOKUP_ERR_MSG); + err.set_cause(e); + (Err(err), cache_state) + } + } + } + } else { + /* this one will do the look up, either because it gets the write lock or the read + * lock age is reached */ + let value = CB::lookup(key, extra).await; + let ret = match value { + Ok((v, new_ttl)) => { + /* Don't put() if lock ago too old, to avoid too many concurrent writes */ + if my_write.is_some() { + self.inner.force_put(key, v.clone(), new_ttl.or(ttl)); + } + (Ok(v), cache_state) // the original cache_state: Miss or Expired + } + Err(e) => { + let mut err = Error::new_str(LOOKUP_ERR_MSG); + err.set_cause(e); + (Err(err), cache_state) + } + }; + if my_write.is_some() { + /* add permit so that reader can start. Any number of permits will do, + * since readers will return permits right away. */ + my_write.unwrap().lock.add_permits(10); + + { + // remove the lock from locker + let mut lockers = self.lockers.write(); + lockers.remove(&hashed_key); + } // write lock dropped here + } + + ret + } + } +} + +impl<K, T, CB, S> RTCache<K, T, CB, S> +where + K: Hash + Send, + T: Clone + Send + Sync, + CB: MultiLookup<K, T, S>, +{ + /// Same behavior as [RTCache::get] but for an arbitrary amount of keys. + /// + /// If there are keys that are missing from cache, `multi_lookup` is invoked to populate the + /// cache before returning the final results. This is useful if your type supports batch + /// queries. + /// + /// To avoid dead lock for the same key across concurrent `multi_get` calls, + /// this function does not provide lookup coalescing. + pub async fn multi_get<'a, I>( + &self, + keys: I, + ttl: Option<Duration>, + extra: Option<&S>, + ) -> Result<Vec<(T, CacheStatus)>, Box<Error>> + where + I: Iterator<Item = &'a K>, + K: 'a, + { + let size = keys.size_hint().0; + let (hits, misses) = self.inner.multi_get_with_miss(keys); + let mut final_results = Vec::with_capacity(size); + let miss_results = if !misses.is_empty() { + match CB::multi_lookup(&misses, extra).await { + Ok(miss_results) => { + // assert! here to prevent index panic when building results, + // final_results has full list of misses but miss_results might not + assert!( + miss_results.len() == misses.len(), + "multi_lookup() failed to return the matching number of results" + ); + /* put the misses into cache */ + for item in misses.iter().zip(miss_results.iter()) { + self.inner + .force_put(item.0, (item.1).0.clone(), (item.1).1.or(ttl)); + } + miss_results + } + Err(e) => { + /* NOTE: we give up the hits when encounter lookup error */ + let mut err = Error::new_str(LOOKUP_ERR_MSG); + err.set_cause(e); + return Err(err); + } + } + } else { + vec![] // to make the rest code simple, allocating one unused empty vec should be fine + }; + /* fill in final_result */ + let mut n_miss = 0; + for item in hits { + match item.0 { + Some(v) => final_results.push((v, item.1)), + None => { + final_results // miss_results.len() === #None in result (asserted above) + .push((miss_results[n_miss].0.clone(), CacheStatus::Miss)); + n_miss += 1; + } + } + } + Ok(final_results) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use atomic::AtomicI32; + use std::sync::atomic; + + #[derive(Clone, Debug)] + struct ExtraOpt { + error: bool, + empty: bool, + delay_for: Option<Duration>, + used: Arc<AtomicI32>, + } + + struct TestCB(); + + #[async_trait] + impl Lookup<i32, i32, ExtraOpt> for TestCB { + async fn lookup( + _key: &i32, + extra: Option<&ExtraOpt>, + ) -> Result<(i32, Option<Duration>), Box<dyn ErrorTrait + Send + Sync>> { + // this function returns #lookup_times + let mut used = 0; + if let Some(e) = extra { + used = e.used.fetch_add(1, atomic::Ordering::Relaxed) + 1; + if e.error { + return Err(Error::new_str("test error")); + } + if let Some(delay_for) = e.delay_for { + tokio::time::sleep(delay_for).await; + } + } + Ok((used, None)) + } + } + + #[async_trait] + impl MultiLookup<i32, i32, ExtraOpt> for TestCB { + async fn multi_lookup( + keys: &[&i32], + extra: Option<&ExtraOpt>, + ) -> Result<Vec<(i32, Option<Duration>)>, Box<dyn ErrorTrait + Send + Sync>> { + let mut resp = vec![]; + if let Some(extra) = extra { + if extra.empty { + return Ok(resp); + } + } + for key in keys { + resp.push((**key, None)); + } + Ok(resp) + } + } + + #[tokio::test] + async fn test_basic_get() { + let cache: RTCache<i32, i32, TestCB, ExtraOpt> = RTCache::new(10, None, None); + let opt = Some(ExtraOpt { + error: false, + empty: false, + delay_for: None, + used: Arc::new(AtomicI32::new(0)), + }); + let (res, hit) = cache.get(&1, None, opt.as_ref()).await; + assert_eq!(res.unwrap(), 1); + assert_eq!(hit, CacheStatus::Miss); + let (res, hit) = cache.get(&1, None, opt.as_ref()).await; + assert_eq!(res.unwrap(), 1); + assert_eq!(hit, CacheStatus::Hit); + } + + #[tokio::test] + async fn test_basic_get_error() { + let cache: RTCache<i32, i32, TestCB, ExtraOpt> = RTCache::new(10, None, None); + let opt1 = Some(ExtraOpt { + error: true, + empty: false, + delay_for: None, + used: Arc::new(AtomicI32::new(0)), + }); + let (res, hit) = cache.get(&-1, None, opt1.as_ref()).await; + assert!(res.is_err()); + assert_eq!(hit, CacheStatus::Miss); + } + + #[tokio::test] + async fn test_concurrent_get() { + let cache: RTCache<i32, i32, TestCB, ExtraOpt> = RTCache::new(10, None, None); + let cache = Arc::new(cache); + let opt = Some(ExtraOpt { + error: false, + empty: false, + delay_for: None, + used: Arc::new(AtomicI32::new(0)), + }); + let cache_c = cache.clone(); + let opt1 = opt.clone(); + // concurrent gets, only 1 will call the callback + let t1 = tokio::spawn(async move { + let (res, _hit) = cache_c.get(&1, None, opt1.as_ref()).await; + res.unwrap() + }); + let cache_c = cache.clone(); + let opt2 = opt.clone(); + let t2 = tokio::spawn(async move { + let (res, _hit) = cache_c.get(&1, None, opt2.as_ref()).await; + res.unwrap() + }); + let opt3 = opt.clone(); + let cache_c = cache.clone(); + let t3 = tokio::spawn(async move { + let (res, _hit) = cache_c.get(&1, None, opt3.as_ref()).await; + res.unwrap() + }); + let (r1, r2, r3) = tokio::join!(t1, t2, t3); + assert_eq!(r1.unwrap(), 1); + assert_eq!(r2.unwrap(), 1); + assert_eq!(r3.unwrap(), 1); + } + + #[tokio::test] + async fn test_concurrent_get_error() { + let cache: RTCache<i32, i32, TestCB, ExtraOpt> = RTCache::new(10, None, None); + let cache = Arc::new(cache); + let cache_c = cache.clone(); + let opt1 = Some(ExtraOpt { + error: true, + empty: false, + delay_for: None, + used: Arc::new(AtomicI32::new(0)), + }); + let opt2 = opt1.clone(); + let opt3 = opt1.clone(); + // concurrent gets, only 1 will call the callback + let t1 = tokio::spawn(async move { + let (res, _hit) = cache_c.get(&-1, None, opt1.as_ref()).await; + res.is_err() + }); + let cache_c = cache.clone(); + let t2 = tokio::spawn(async move { + let (res, _hit) = cache_c.get(&-1, None, opt2.as_ref()).await; + res.is_err() + }); + let cache_c = cache.clone(); + let t3 = tokio::spawn(async move { + let (res, _hit) = cache_c.get(&-1, None, opt3.as_ref()).await; + res.is_err() + }); + let (r1, r2, r3) = tokio::join!(t1, t2, t3); + assert!(r1.unwrap()); + assert!(r2.unwrap()); + assert!(r3.unwrap()); + } + + #[tokio::test] + async fn test_concurrent_get_different_value() { + let cache: RTCache<i32, i32, TestCB, ExtraOpt> = RTCache::new(10, None, None); + let cache = Arc::new(cache); + let opt1 = Some(ExtraOpt { + error: false, + empty: false, + delay_for: None, + used: Arc::new(AtomicI32::new(0)), + }); + let opt2 = opt1.clone(); + let opt3 = opt1.clone(); + let cache_c = cache.clone(); + // concurrent gets to different keys, no locks, all will call the cb + let t1 = tokio::spawn(async move { + let (res, _hit) = cache_c.get(&1, None, opt1.as_ref()).await; + res.unwrap() + }); + let cache_c = cache.clone(); + let t2 = tokio::spawn(async move { + let (res, _hit) = cache_c.get(&3, None, opt2.as_ref()).await; + res.unwrap() + }); + let cache_c = cache.clone(); + let t3 = tokio::spawn(async move { + let (res, _hit) = cache_c.get(&5, None, opt3.as_ref()).await; + res.unwrap() + }); + let (r1, r2, r3) = tokio::join!(t1, t2, t3); + // 1 lookup + 2 lookups + 3 lookups, order not matter + assert_eq!(r1.unwrap() + r2.unwrap() + r3.unwrap(), 6); + } + + #[tokio::test] + async fn test_get_lock_age() { + // 1 sec lock age + let cache: RTCache<i32, i32, TestCB, ExtraOpt> = + RTCache::new(10, Some(Duration::from_secs(1)), None); + let cache = Arc::new(cache); + let counter = Arc::new(AtomicI32::new(0)); + let opt1 = Some(ExtraOpt { + error: false, + empty: false, + delay_for: Some(Duration::from_secs(2)), + used: counter.clone(), + }); + + let opt2 = Some(ExtraOpt { + error: false, + empty: false, + delay_for: None, + used: counter.clone(), + }); + let opt3 = opt2.clone(); + let cache_c = cache.clone(); + // t1 will be delay for 2 sec + let t1 = tokio::spawn(async move { + let (res, _hit) = cache_c.get(&1, None, opt1.as_ref()).await; + res.unwrap() + }); + // start t2 and t3 1.5 seconds later, since lock age is 1 sec, there will be no lock + tokio::time::sleep(Duration::from_secs_f32(1.5)).await; + let cache_c = cache.clone(); + let t2 = tokio::spawn(async move { + let (res, _hit) = cache_c.get(&1, None, opt2.as_ref()).await; + res.unwrap() + }); + let cache_c = cache.clone(); + let t3 = tokio::spawn(async move { + let (res, _hit) = cache_c.get(&1, None, opt3.as_ref()).await; + res.unwrap() + }); + let (r1, r2, r3) = tokio::join!(t1, t2, t3); + // 1 lookup + 2 lookups + 3 lookups, order not matter + assert_eq!(r1.unwrap() + r2.unwrap() + r3.unwrap(), 6); + } + + #[tokio::test] + async fn test_get_lock_timeout() { + // 1 sec lock timeout + let cache: RTCache<i32, i32, TestCB, ExtraOpt> = + RTCache::new(10, None, Some(Duration::from_secs(1))); + let cache = Arc::new(cache); + let counter = Arc::new(AtomicI32::new(0)); + let opt1 = Some(ExtraOpt { + error: false, + empty: false, + delay_for: Some(Duration::from_secs(2)), + used: counter.clone(), + }); + let opt2 = Some(ExtraOpt { + error: false, + empty: false, + delay_for: None, + used: counter.clone(), + }); + let opt3 = opt2.clone(); + let cache_c = cache.clone(); + // t1 will be delay for 2 sec + let t1 = tokio::spawn(async move { + let (res, _hit) = cache_c.get(&1, None, opt1.as_ref()).await; + res.unwrap() + }); + // since lock timeout is 1 sec, t2 and t3 will do their own lookup after 1 sec + let cache_c = cache.clone(); + let t2 = tokio::spawn(async move { + let (res, _hit) = cache_c.get(&1, None, opt2.as_ref()).await; + res.unwrap() + }); + let cache_c = cache.clone(); + let t3 = tokio::spawn(async move { + let (res, _hit) = cache_c.get(&1, None, opt3.as_ref()).await; + res.unwrap() + }); + let (r1, r2, r3) = tokio::join!(t1, t2, t3); + // 1 lookup + 2 lookups + 3 lookups, order not matter + assert_eq!(r1.unwrap() + r2.unwrap() + r3.unwrap(), 6); + } + + #[tokio::test] + async fn test_multi_get() { + let cache: RTCache<i32, i32, TestCB, ExtraOpt> = RTCache::new(10, None, None); + let counter = Arc::new(AtomicI32::new(0)); + let opt1 = Some(ExtraOpt { + error: false, + empty: false, + delay_for: Some(Duration::from_secs(2)), + used: counter.clone(), + }); + // make 1 a hit first + let (res, hit) = cache.get(&1, None, opt1.as_ref()).await; + assert_eq!(res.unwrap(), 1); + assert_eq!(hit, CacheStatus::Miss); + let (res, hit) = cache.get(&1, None, opt1.as_ref()).await; + assert_eq!(res.unwrap(), 1); + assert_eq!(hit, CacheStatus::Hit); + // 1 hit 2 miss 3 miss + let resp = cache + .multi_get([1, 2, 3].iter(), None, opt1.as_ref()) + .await + .unwrap(); + assert_eq!(resp[0].0, 1); + assert_eq!(resp[0].1, CacheStatus::Hit); + assert_eq!(resp[1].0, 2); + assert_eq!(resp[1].1, CacheStatus::Miss); + assert_eq!(resp[2].0, 3); + assert_eq!(resp[2].1, CacheStatus::Miss); + // all hit after a fetch + let resp = cache + .multi_get([1, 2, 3].iter(), None, opt1.as_ref()) + .await + .unwrap(); + assert_eq!(resp[0].0, 1); + assert_eq!(resp[0].1, CacheStatus::Hit); + assert_eq!(resp[1].0, 2); + assert_eq!(resp[1].1, CacheStatus::Hit); + assert_eq!(resp[2].0, 3); + assert_eq!(resp[2].1, CacheStatus::Hit); + } + + #[tokio::test] + #[should_panic(expected = "multi_lookup() failed to return the matching number of results")] + async fn test_inconsistent_miss_results() { + // force empty result + let opt1 = Some(ExtraOpt { + error: false, + empty: true, + delay_for: None, + used: Arc::new(AtomicI32::new(0)), + }); + let cache: RTCache<i32, i32, TestCB, ExtraOpt> = RTCache::new(10, None, None); + cache + .multi_get([4, 5, 6].iter(), None, opt1.as_ref()) + .await + .unwrap(); + } +} |