aboutsummaryrefslogtreecommitdiffhomepage
path: root/pingora-memory-cache
diff options
context:
space:
mode:
Diffstat (limited to 'pingora-memory-cache')
-rw-r--r--pingora-memory-cache/Cargo.toml27
-rw-r--r--pingora-memory-cache/LICENSE202
-rw-r--r--pingora-memory-cache/src/lib.rs249
-rw-r--r--pingora-memory-cache/src/read_through.rs689
4 files changed, 1167 insertions, 0 deletions
diff --git a/pingora-memory-cache/Cargo.toml b/pingora-memory-cache/Cargo.toml
new file mode 100644
index 0000000..d51268b
--- /dev/null
+++ b/pingora-memory-cache/Cargo.toml
@@ -0,0 +1,27 @@
+[package]
+name = "pingora-memory-cache"
+version = "0.1.0"
+authors = ["Yuchen Wu <[email protected]>"]
+license = "Apache-2.0"
+edition = "2021"
+repository = "https://github.com/cloudflare/pingora"
+categories = ["algorithms", "caching"]
+keywords = ["async", "cache", "pingora"]
+description = """
+An async in-memory cache with cache stampede protection.
+"""
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+[lib]
+name = "pingora_memory_cache"
+path = "src/lib.rs"
+
+[dependencies]
+TinyUFO = { version = "0.1.0", path = "../tinyufo" }
+ahash = { workspace = true }
+tokio = { workspace = true, features = ["sync"] }
+async-trait = { workspace = true }
+pingora-error = { version = "0.1.0", path = "../pingora-error" }
+log = { workspace = true }
+parking_lot = "0"
+pingora-timeout = { version = "0.1.0", path = "../pingora-timeout" }
diff --git a/pingora-memory-cache/LICENSE b/pingora-memory-cache/LICENSE
new file mode 100644
index 0000000..d645695
--- /dev/null
+++ b/pingora-memory-cache/LICENSE
@@ -0,0 +1,202 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git a/pingora-memory-cache/src/lib.rs b/pingora-memory-cache/src/lib.rs
new file mode 100644
index 0000000..f5c037c
--- /dev/null
+++ b/pingora-memory-cache/src/lib.rs
@@ -0,0 +1,249 @@
+// Copyright 2024 Cloudflare, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use ahash::RandomState;
+use std::hash::Hash;
+use std::marker::PhantomData;
+use std::time::{Duration, Instant};
+
+use tinyufo::TinyUfo;
+
+mod read_through;
+pub use read_through::{Lookup, MultiLookup, RTCache};
+
+#[derive(Debug, PartialEq, Eq)]
+/// [CacheStatus] indicates the response type for a query.
+pub enum CacheStatus {
+ /// The key was found in cache
+ Hit,
+ /// The key was not found.
+ Miss,
+ /// The key was found but it was expired.
+ Expired,
+ /// The key was not initially found but was found after awaiting a lock.
+ LockHit,
+}
+
+impl CacheStatus {
+ /// Return the string representation for [CacheStatus].
+ pub fn as_str(&self) -> &str {
+ match self {
+ Self::Hit => "hit",
+ Self::Miss => "miss",
+ Self::Expired => "expired",
+ Self::LockHit => "lock_hit",
+ }
+ }
+}
+
+#[derive(Debug, Clone)]
+struct Node<T: Clone> {
+ pub value: T,
+ expire_on: Option<Instant>,
+}
+
+impl<T: Clone> Node<T> {
+ fn new(value: T, ttl: Option<Duration>) -> Self {
+ let expire_on = match ttl {
+ Some(t) => Instant::now().checked_add(t),
+ None => None,
+ };
+ Node { value, expire_on }
+ }
+
+ fn will_expire_at(&self, time: &Instant) -> bool {
+ match self.expire_on.as_ref() {
+ Some(t) => t <= time,
+ None => false,
+ }
+ }
+
+ fn is_expired(&self) -> bool {
+ self.will_expire_at(&Instant::now())
+ }
+}
+
+/// A high performant in-memory cache with S3-FIFO + TinyLFU
+pub struct MemoryCache<K: Hash, T: Clone> {
+ store: TinyUfo<u64, Node<T>>,
+ _key_type: PhantomData<K>,
+ pub(crate) hasher: RandomState,
+}
+
+impl<K: Hash, T: Clone + Send + Sync> MemoryCache<K, T> {
+ /// Create a new [MemoryCache] with the given size.
+ pub fn new(size: usize) -> Self {
+ MemoryCache {
+ store: TinyUfo::new(size, size),
+ _key_type: PhantomData,
+ hasher: RandomState::new(),
+ }
+ }
+
+ /// Fetch the key and return its value in addition to a [CacheStatus].
+ pub fn get(&self, key: &K) -> (Option<T>, CacheStatus) {
+ let hashed_key = self.hasher.hash_one(key);
+
+ if let Some(n) = self.store.get(&hashed_key) {
+ if !n.is_expired() {
+ (Some(n.value), CacheStatus::Hit)
+ } else {
+ // TODO: consider returning the staled value
+ (None, CacheStatus::Expired)
+ }
+ } else {
+ (None, CacheStatus::Miss)
+ }
+ }
+
+ /// Insert a key and value pair with an optional TTL into the cache.
+ ///
+ /// An item with zero TTL of zero not inserted.
+ pub fn put(&self, key: &K, value: T, ttl: Option<Duration>) {
+ if let Some(t) = ttl {
+ if t.is_zero() {
+ return;
+ }
+ }
+ let hashed_key = self.hasher.hash_one(key);
+ let node = Node::new(value, ttl);
+ // weight is always 1 for now
+ self.store.put(hashed_key, node, 1);
+ }
+
+ pub(crate) fn force_put(&self, key: &K, value: T, ttl: Option<Duration>) {
+ if let Some(t) = ttl {
+ if t.is_zero() {
+ return;
+ }
+ }
+ let hashed_key = self.hasher.hash_one(key);
+ let node = Node::new(value, ttl);
+ // weight is always 1 for now
+ self.store.force_put(hashed_key, node, 1);
+ }
+
+ /// This is equivalent to [MemoryCache::get] but for an arbitrary amount of keys.
+ pub fn multi_get<'a, I>(&self, keys: I) -> Vec<(Option<T>, CacheStatus)>
+ where
+ I: Iterator<Item = &'a K>,
+ K: 'a,
+ {
+ let mut resp = Vec::with_capacity(keys.size_hint().0);
+ for key in keys {
+ resp.push(self.get(key));
+ }
+ resp
+ }
+
+ /// Same as [MemoryCache::multi_get] but returns the keys that are missing from the cache.
+ pub fn multi_get_with_miss<'a, I>(&self, keys: I) -> (Vec<(Option<T>, CacheStatus)>, Vec<&'a K>)
+ where
+ I: Iterator<Item = &'a K>,
+ K: 'a,
+ {
+ let mut resp = Vec::with_capacity(keys.size_hint().0);
+ let mut missed = Vec::with_capacity(keys.size_hint().0 / 2);
+ for key in keys {
+ let (lookup, cache_status) = self.get(key);
+ if lookup.is_none() {
+ missed.push(key);
+ }
+ resp.push((lookup, cache_status));
+ }
+ (resp, missed)
+ }
+
+ // TODO: evict expired first
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::thread::sleep;
+
+ #[test]
+ fn test_get() {
+ let cache: MemoryCache<i32, ()> = MemoryCache::new(10);
+ let (res, hit) = cache.get(&1);
+ assert_eq!(res, None);
+ assert_eq!(hit, CacheStatus::Miss);
+ }
+
+ #[test]
+ fn test_put_get() {
+ let cache: MemoryCache<i32, i32> = MemoryCache::new(10);
+ let (res, hit) = cache.get(&1);
+ assert_eq!(res, None);
+ assert_eq!(hit, CacheStatus::Miss);
+ cache.put(&1, 2, None);
+ let (res, hit) = cache.get(&1);
+ assert_eq!(res.unwrap(), 2);
+ assert_eq!(hit, CacheStatus::Hit);
+ }
+
+ #[test]
+ fn test_get_expired() {
+ let cache: MemoryCache<i32, i32> = MemoryCache::new(10);
+ let (res, hit) = cache.get(&1);
+ assert_eq!(res, None);
+ assert_eq!(hit, CacheStatus::Miss);
+ cache.put(&1, 2, Some(Duration::from_secs(1)));
+ sleep(Duration::from_millis(1100));
+ let (res, hit) = cache.get(&1);
+ assert_eq!(res, None);
+ assert_eq!(hit, CacheStatus::Expired);
+ }
+
+ #[test]
+ fn test_eviction() {
+ let cache: MemoryCache<i32, i32> = MemoryCache::new(2);
+ cache.put(&1, 2, None);
+ cache.put(&2, 4, None);
+ cache.put(&3, 6, None);
+ let (res, hit) = cache.get(&1);
+ assert_eq!(res, None);
+ assert_eq!(hit, CacheStatus::Miss);
+ let (res, hit) = cache.get(&2);
+ assert_eq!(res.unwrap(), 4);
+ assert_eq!(hit, CacheStatus::Hit);
+ let (res, hit) = cache.get(&3);
+ assert_eq!(res.unwrap(), 6);
+ assert_eq!(hit, CacheStatus::Hit);
+ }
+
+ #[test]
+ fn test_multi_get() {
+ let cache: MemoryCache<i32, i32> = MemoryCache::new(10);
+ cache.put(&2, -2, None);
+ let keys: Vec<i32> = vec![1, 2, 3];
+ let resp = cache.multi_get(keys.iter());
+ assert_eq!(resp[0].0, None);
+ assert_eq!(resp[0].1, CacheStatus::Miss);
+ assert_eq!(resp[1].0.unwrap(), -2);
+ assert_eq!(resp[1].1, CacheStatus::Hit);
+ assert_eq!(resp[2].0, None);
+ assert_eq!(resp[2].1, CacheStatus::Miss);
+
+ let (resp, missed) = cache.multi_get_with_miss(keys.iter());
+ assert_eq!(resp[0].0, None);
+ assert_eq!(resp[0].1, CacheStatus::Miss);
+ assert_eq!(resp[1].0.unwrap(), -2);
+ assert_eq!(resp[1].1, CacheStatus::Hit);
+ assert_eq!(resp[2].0, None);
+ assert_eq!(resp[2].1, CacheStatus::Miss);
+ assert_eq!(missed[0], &1);
+ assert_eq!(missed[1], &3);
+ }
+}
diff --git a/pingora-memory-cache/src/read_through.rs b/pingora-memory-cache/src/read_through.rs
new file mode 100644
index 0000000..05a8d89
--- /dev/null
+++ b/pingora-memory-cache/src/read_through.rs
@@ -0,0 +1,689 @@
+// Copyright 2024 Cloudflare, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! An async read through cache where cache miss are populated via the provided
+//! async callback.
+
+use super::{CacheStatus, MemoryCache};
+
+use async_trait::async_trait;
+use log::warn;
+use parking_lot::RwLock;
+use pingora_error::{Error, ErrorTrait};
+use std::collections::HashMap;
+use std::hash::Hash;
+use std::marker::PhantomData;
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+use tokio::sync::Semaphore;
+
+struct CacheLock {
+ pub lock_start: Instant,
+ pub lock: Semaphore,
+}
+
+impl CacheLock {
+ pub fn new_arc() -> Arc<Self> {
+ Arc::new(CacheLock {
+ lock: Semaphore::new(0),
+ lock_start: Instant::now(),
+ })
+ }
+
+ pub fn too_old(&self, age: Option<&Duration>) -> bool {
+ match age {
+ Some(t) => Instant::now() - self.lock_start > *t,
+ None => false,
+ }
+ }
+}
+
+#[async_trait]
+/// [Lookup] defines the caching behavior that the implementor needs. The `extra` field can be used
+/// to define any additional metadata that the implementor uses to determine cache eligibility.
+///
+/// # Examples
+///
+/// ```ignore
+/// use pingora_error::{ErrorTrait, Result};
+/// use std::time::Duration;
+///
+/// struct MyLookup;
+///
+/// impl Lookup<usize, usize, ()> for MyLookup {
+/// async fn lookup(
+/// &self,
+/// _key: &usize,
+/// extra: Option<&()>,
+/// ) -> Result<(usize, Option<Duration>), Box<dyn ErrorTrait + Send + Sync>> {
+/// // Define your business logic here.
+/// Ok(1, None)
+/// }
+/// }
+/// ```
+pub trait Lookup<K, T, S> {
+ /// Return a value and an optional TTL for the given key.
+ async fn lookup(
+ key: &K,
+ extra: Option<&S>,
+ ) -> Result<(T, Option<Duration>), Box<dyn ErrorTrait + Send + Sync>>
+ where
+ K: 'async_trait,
+ S: 'async_trait;
+}
+
+#[async_trait]
+/// [MultiLookup] is similar to [Lookup]. Implement this trait if the system being queried support
+/// looking up multiple keys in a single API call.
+pub trait MultiLookup<K, T, S> {
+ /// Like [Lookup::lookup] but for an arbitrary amount of keys.
+ async fn multi_lookup(
+ keys: &[&K],
+ extra: Option<&S>,
+ ) -> Result<Vec<(T, Option<Duration>)>, Box<dyn ErrorTrait + Send + Sync>>
+ where
+ K: 'async_trait,
+ S: 'async_trait;
+}
+
+const LOOKUP_ERR_MSG: &str = "RTCache: lookup error";
+
+/// A read-through in-memory cache on top of [MemoryCache]
+///
+/// Instead of providing a `put` function, [RTCache] requires a type which implements [Lookup] to
+/// be automatically called during cache miss to populate the cache. This is useful when trying to
+/// cache queries to external system such as DNS or databases.
+///
+/// Lookup coalescing is provided so that multiple concurrent lookups for the same key results
+/// only in one lookup callback.
+pub struct RTCache<K, T, CB, S>
+where
+ K: Hash + Send,
+ T: Clone + Send,
+{
+ inner: MemoryCache<K, T>,
+ _callback: PhantomData<CB>,
+ lockers: RwLock<HashMap<u64, Arc<CacheLock>>>,
+ lock_age: Option<Duration>,
+ lock_timeout: Option<Duration>,
+ phantom: PhantomData<S>,
+}
+
+impl<K, T, CB, S> RTCache<K, T, CB, S>
+where
+ K: Hash + Send,
+ T: Clone + Send + Sync,
+{
+ /// Create a new [RTCache] of given size. `lock_age` defines how long a lock is valid for.
+ /// `lock_timeout` is used to stop a lookup from holding on to the key for too long.
+ pub fn new(size: usize, lock_age: Option<Duration>, lock_timeout: Option<Duration>) -> Self {
+ RTCache {
+ inner: MemoryCache::new(size),
+ lockers: RwLock::new(HashMap::new()),
+ _callback: PhantomData,
+ lock_age,
+ lock_timeout,
+ phantom: PhantomData,
+ }
+ }
+}
+
+impl<K, T, CB, S> RTCache<K, T, CB, S>
+where
+ K: Hash + Send,
+ T: Clone + Send + Sync,
+ CB: Lookup<K, T, S>,
+{
+ /// Query the cache for a given value. If it exists and no TTL is configured initially, it will
+ /// use the `ttl` value given.
+ pub async fn get(
+ &self,
+ key: &K,
+ ttl: Option<Duration>,
+ extra: Option<&S>,
+ ) -> (Result<T, Box<Error>>, CacheStatus) {
+ let (result, cache_state) = self.inner.get(key);
+ if let Some(result) = result {
+ /* cache hit */
+ return (Ok(result), cache_state);
+ }
+
+ let hashed_key = self.inner.hasher.hash_one(key);
+
+ /* Cache miss, try to lock the lookup. Check if there is already a lookup */
+ let my_lock = {
+ let lockers = self.lockers.read();
+ /* clone the Arc */
+ lockers.get(&hashed_key).cloned()
+ }; // read lock dropped
+
+ /* try insert a cache lock into locker */
+ let (my_write, my_read) = match my_lock {
+ // TODO: use a union
+ Some(lock) => {
+ /* There is an ongoing lookup to the same key */
+ if lock.too_old(self.lock_age.as_ref()) {
+ (None, None)
+ } else {
+ (None, Some(lock))
+ }
+ }
+ None => {
+ let mut lockers = self.lockers.write();
+ match lockers.get(&hashed_key) {
+ Some(lock) => {
+ /* another lookup to the same key got the write lock to locker first */
+ if lock.too_old(self.lock_age.as_ref()) {
+ (None, None)
+ } else {
+ (None, Some(lock.clone()))
+ }
+ }
+ None => {
+ let new_lock = CacheLock::new_arc();
+ let new_lock2 = new_lock.clone();
+ lockers.insert(hashed_key, new_lock2);
+ (Some(new_lock), None)
+ }
+ } // write lock dropped
+ }
+ };
+
+ if my_read.is_some() {
+ /* another task will do the lookup */
+
+ let my_lock = my_read.unwrap();
+ /* if available_permits > 0, writer is done */
+ if my_lock.lock.available_permits() == 0 {
+ /* block here to wait for writer to finish lookup */
+ let lock_fut = my_lock.lock.acquire();
+ let timed_out = match self.lock_timeout {
+ Some(t) => pingora_timeout::timeout(t, lock_fut).await.is_err(),
+ None => {
+ let _ = lock_fut.await;
+ false
+ }
+ };
+ if timed_out {
+ let value = CB::lookup(key, extra).await;
+ return match value {
+ Ok((v, _ttl)) => (Ok(v), cache_state),
+ Err(e) => {
+ let mut err = Error::new_str(LOOKUP_ERR_MSG);
+ err.set_cause(e);
+ (Err(err), cache_state)
+ }
+ };
+ }
+ } // permit returned here
+
+ let (result, cache_state) = self.inner.get(key);
+ if let Some(result) = result {
+ /* cache lock hit, slow as a miss */
+ (Ok(result), CacheStatus::LockHit)
+ } else {
+ /* probably error happen during the actual lookup */
+ warn!(
+ "RTCache: no result after read lock, cache status: {:?}",
+ cache_state
+ );
+ match CB::lookup(key, extra).await {
+ Ok((v, new_ttl)) => {
+ self.inner.force_put(key, v.clone(), new_ttl.or(ttl));
+ (Ok(v), cache_state)
+ }
+ Err(e) => {
+ let mut err = Error::new_str(LOOKUP_ERR_MSG);
+ err.set_cause(e);
+ (Err(err), cache_state)
+ }
+ }
+ }
+ } else {
+ /* this one will do the look up, either because it gets the write lock or the read
+ * lock age is reached */
+ let value = CB::lookup(key, extra).await;
+ let ret = match value {
+ Ok((v, new_ttl)) => {
+ /* Don't put() if lock ago too old, to avoid too many concurrent writes */
+ if my_write.is_some() {
+ self.inner.force_put(key, v.clone(), new_ttl.or(ttl));
+ }
+ (Ok(v), cache_state) // the original cache_state: Miss or Expired
+ }
+ Err(e) => {
+ let mut err = Error::new_str(LOOKUP_ERR_MSG);
+ err.set_cause(e);
+ (Err(err), cache_state)
+ }
+ };
+ if my_write.is_some() {
+ /* add permit so that reader can start. Any number of permits will do,
+ * since readers will return permits right away. */
+ my_write.unwrap().lock.add_permits(10);
+
+ {
+ // remove the lock from locker
+ let mut lockers = self.lockers.write();
+ lockers.remove(&hashed_key);
+ } // write lock dropped here
+ }
+
+ ret
+ }
+ }
+}
+
+impl<K, T, CB, S> RTCache<K, T, CB, S>
+where
+ K: Hash + Send,
+ T: Clone + Send + Sync,
+ CB: MultiLookup<K, T, S>,
+{
+ /// Same behavior as [RTCache::get] but for an arbitrary amount of keys.
+ ///
+ /// If there are keys that are missing from cache, `multi_lookup` is invoked to populate the
+ /// cache before returning the final results. This is useful if your type supports batch
+ /// queries.
+ ///
+ /// To avoid dead lock for the same key across concurrent `multi_get` calls,
+ /// this function does not provide lookup coalescing.
+ pub async fn multi_get<'a, I>(
+ &self,
+ keys: I,
+ ttl: Option<Duration>,
+ extra: Option<&S>,
+ ) -> Result<Vec<(T, CacheStatus)>, Box<Error>>
+ where
+ I: Iterator<Item = &'a K>,
+ K: 'a,
+ {
+ let size = keys.size_hint().0;
+ let (hits, misses) = self.inner.multi_get_with_miss(keys);
+ let mut final_results = Vec::with_capacity(size);
+ let miss_results = if !misses.is_empty() {
+ match CB::multi_lookup(&misses, extra).await {
+ Ok(miss_results) => {
+ // assert! here to prevent index panic when building results,
+ // final_results has full list of misses but miss_results might not
+ assert!(
+ miss_results.len() == misses.len(),
+ "multi_lookup() failed to return the matching number of results"
+ );
+ /* put the misses into cache */
+ for item in misses.iter().zip(miss_results.iter()) {
+ self.inner
+ .force_put(item.0, (item.1).0.clone(), (item.1).1.or(ttl));
+ }
+ miss_results
+ }
+ Err(e) => {
+ /* NOTE: we give up the hits when encounter lookup error */
+ let mut err = Error::new_str(LOOKUP_ERR_MSG);
+ err.set_cause(e);
+ return Err(err);
+ }
+ }
+ } else {
+ vec![] // to make the rest code simple, allocating one unused empty vec should be fine
+ };
+ /* fill in final_result */
+ let mut n_miss = 0;
+ for item in hits {
+ match item.0 {
+ Some(v) => final_results.push((v, item.1)),
+ None => {
+ final_results // miss_results.len() === #None in result (asserted above)
+ .push((miss_results[n_miss].0.clone(), CacheStatus::Miss));
+ n_miss += 1;
+ }
+ }
+ }
+ Ok(final_results)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use atomic::AtomicI32;
+ use std::sync::atomic;
+
+ #[derive(Clone, Debug)]
+ struct ExtraOpt {
+ error: bool,
+ empty: bool,
+ delay_for: Option<Duration>,
+ used: Arc<AtomicI32>,
+ }
+
+ struct TestCB();
+
+ #[async_trait]
+ impl Lookup<i32, i32, ExtraOpt> for TestCB {
+ async fn lookup(
+ _key: &i32,
+ extra: Option<&ExtraOpt>,
+ ) -> Result<(i32, Option<Duration>), Box<dyn ErrorTrait + Send + Sync>> {
+ // this function returns #lookup_times
+ let mut used = 0;
+ if let Some(e) = extra {
+ used = e.used.fetch_add(1, atomic::Ordering::Relaxed) + 1;
+ if e.error {
+ return Err(Error::new_str("test error"));
+ }
+ if let Some(delay_for) = e.delay_for {
+ tokio::time::sleep(delay_for).await;
+ }
+ }
+ Ok((used, None))
+ }
+ }
+
+ #[async_trait]
+ impl MultiLookup<i32, i32, ExtraOpt> for TestCB {
+ async fn multi_lookup(
+ keys: &[&i32],
+ extra: Option<&ExtraOpt>,
+ ) -> Result<Vec<(i32, Option<Duration>)>, Box<dyn ErrorTrait + Send + Sync>> {
+ let mut resp = vec![];
+ if let Some(extra) = extra {
+ if extra.empty {
+ return Ok(resp);
+ }
+ }
+ for key in keys {
+ resp.push((**key, None));
+ }
+ Ok(resp)
+ }
+ }
+
+ #[tokio::test]
+ async fn test_basic_get() {
+ let cache: RTCache<i32, i32, TestCB, ExtraOpt> = RTCache::new(10, None, None);
+ let opt = Some(ExtraOpt {
+ error: false,
+ empty: false,
+ delay_for: None,
+ used: Arc::new(AtomicI32::new(0)),
+ });
+ let (res, hit) = cache.get(&1, None, opt.as_ref()).await;
+ assert_eq!(res.unwrap(), 1);
+ assert_eq!(hit, CacheStatus::Miss);
+ let (res, hit) = cache.get(&1, None, opt.as_ref()).await;
+ assert_eq!(res.unwrap(), 1);
+ assert_eq!(hit, CacheStatus::Hit);
+ }
+
+ #[tokio::test]
+ async fn test_basic_get_error() {
+ let cache: RTCache<i32, i32, TestCB, ExtraOpt> = RTCache::new(10, None, None);
+ let opt1 = Some(ExtraOpt {
+ error: true,
+ empty: false,
+ delay_for: None,
+ used: Arc::new(AtomicI32::new(0)),
+ });
+ let (res, hit) = cache.get(&-1, None, opt1.as_ref()).await;
+ assert!(res.is_err());
+ assert_eq!(hit, CacheStatus::Miss);
+ }
+
+ #[tokio::test]
+ async fn test_concurrent_get() {
+ let cache: RTCache<i32, i32, TestCB, ExtraOpt> = RTCache::new(10, None, None);
+ let cache = Arc::new(cache);
+ let opt = Some(ExtraOpt {
+ error: false,
+ empty: false,
+ delay_for: None,
+ used: Arc::new(AtomicI32::new(0)),
+ });
+ let cache_c = cache.clone();
+ let opt1 = opt.clone();
+ // concurrent gets, only 1 will call the callback
+ let t1 = tokio::spawn(async move {
+ let (res, _hit) = cache_c.get(&1, None, opt1.as_ref()).await;
+ res.unwrap()
+ });
+ let cache_c = cache.clone();
+ let opt2 = opt.clone();
+ let t2 = tokio::spawn(async move {
+ let (res, _hit) = cache_c.get(&1, None, opt2.as_ref()).await;
+ res.unwrap()
+ });
+ let opt3 = opt.clone();
+ let cache_c = cache.clone();
+ let t3 = tokio::spawn(async move {
+ let (res, _hit) = cache_c.get(&1, None, opt3.as_ref()).await;
+ res.unwrap()
+ });
+ let (r1, r2, r3) = tokio::join!(t1, t2, t3);
+ assert_eq!(r1.unwrap(), 1);
+ assert_eq!(r2.unwrap(), 1);
+ assert_eq!(r3.unwrap(), 1);
+ }
+
+ #[tokio::test]
+ async fn test_concurrent_get_error() {
+ let cache: RTCache<i32, i32, TestCB, ExtraOpt> = RTCache::new(10, None, None);
+ let cache = Arc::new(cache);
+ let cache_c = cache.clone();
+ let opt1 = Some(ExtraOpt {
+ error: true,
+ empty: false,
+ delay_for: None,
+ used: Arc::new(AtomicI32::new(0)),
+ });
+ let opt2 = opt1.clone();
+ let opt3 = opt1.clone();
+ // concurrent gets, only 1 will call the callback
+ let t1 = tokio::spawn(async move {
+ let (res, _hit) = cache_c.get(&-1, None, opt1.as_ref()).await;
+ res.is_err()
+ });
+ let cache_c = cache.clone();
+ let t2 = tokio::spawn(async move {
+ let (res, _hit) = cache_c.get(&-1, None, opt2.as_ref()).await;
+ res.is_err()
+ });
+ let cache_c = cache.clone();
+ let t3 = tokio::spawn(async move {
+ let (res, _hit) = cache_c.get(&-1, None, opt3.as_ref()).await;
+ res.is_err()
+ });
+ let (r1, r2, r3) = tokio::join!(t1, t2, t3);
+ assert!(r1.unwrap());
+ assert!(r2.unwrap());
+ assert!(r3.unwrap());
+ }
+
+ #[tokio::test]
+ async fn test_concurrent_get_different_value() {
+ let cache: RTCache<i32, i32, TestCB, ExtraOpt> = RTCache::new(10, None, None);
+ let cache = Arc::new(cache);
+ let opt1 = Some(ExtraOpt {
+ error: false,
+ empty: false,
+ delay_for: None,
+ used: Arc::new(AtomicI32::new(0)),
+ });
+ let opt2 = opt1.clone();
+ let opt3 = opt1.clone();
+ let cache_c = cache.clone();
+ // concurrent gets to different keys, no locks, all will call the cb
+ let t1 = tokio::spawn(async move {
+ let (res, _hit) = cache_c.get(&1, None, opt1.as_ref()).await;
+ res.unwrap()
+ });
+ let cache_c = cache.clone();
+ let t2 = tokio::spawn(async move {
+ let (res, _hit) = cache_c.get(&3, None, opt2.as_ref()).await;
+ res.unwrap()
+ });
+ let cache_c = cache.clone();
+ let t3 = tokio::spawn(async move {
+ let (res, _hit) = cache_c.get(&5, None, opt3.as_ref()).await;
+ res.unwrap()
+ });
+ let (r1, r2, r3) = tokio::join!(t1, t2, t3);
+ // 1 lookup + 2 lookups + 3 lookups, order not matter
+ assert_eq!(r1.unwrap() + r2.unwrap() + r3.unwrap(), 6);
+ }
+
+ #[tokio::test]
+ async fn test_get_lock_age() {
+ // 1 sec lock age
+ let cache: RTCache<i32, i32, TestCB, ExtraOpt> =
+ RTCache::new(10, Some(Duration::from_secs(1)), None);
+ let cache = Arc::new(cache);
+ let counter = Arc::new(AtomicI32::new(0));
+ let opt1 = Some(ExtraOpt {
+ error: false,
+ empty: false,
+ delay_for: Some(Duration::from_secs(2)),
+ used: counter.clone(),
+ });
+
+ let opt2 = Some(ExtraOpt {
+ error: false,
+ empty: false,
+ delay_for: None,
+ used: counter.clone(),
+ });
+ let opt3 = opt2.clone();
+ let cache_c = cache.clone();
+ // t1 will be delay for 2 sec
+ let t1 = tokio::spawn(async move {
+ let (res, _hit) = cache_c.get(&1, None, opt1.as_ref()).await;
+ res.unwrap()
+ });
+ // start t2 and t3 1.5 seconds later, since lock age is 1 sec, there will be no lock
+ tokio::time::sleep(Duration::from_secs_f32(1.5)).await;
+ let cache_c = cache.clone();
+ let t2 = tokio::spawn(async move {
+ let (res, _hit) = cache_c.get(&1, None, opt2.as_ref()).await;
+ res.unwrap()
+ });
+ let cache_c = cache.clone();
+ let t3 = tokio::spawn(async move {
+ let (res, _hit) = cache_c.get(&1, None, opt3.as_ref()).await;
+ res.unwrap()
+ });
+ let (r1, r2, r3) = tokio::join!(t1, t2, t3);
+ // 1 lookup + 2 lookups + 3 lookups, order not matter
+ assert_eq!(r1.unwrap() + r2.unwrap() + r3.unwrap(), 6);
+ }
+
+ #[tokio::test]
+ async fn test_get_lock_timeout() {
+ // 1 sec lock timeout
+ let cache: RTCache<i32, i32, TestCB, ExtraOpt> =
+ RTCache::new(10, None, Some(Duration::from_secs(1)));
+ let cache = Arc::new(cache);
+ let counter = Arc::new(AtomicI32::new(0));
+ let opt1 = Some(ExtraOpt {
+ error: false,
+ empty: false,
+ delay_for: Some(Duration::from_secs(2)),
+ used: counter.clone(),
+ });
+ let opt2 = Some(ExtraOpt {
+ error: false,
+ empty: false,
+ delay_for: None,
+ used: counter.clone(),
+ });
+ let opt3 = opt2.clone();
+ let cache_c = cache.clone();
+ // t1 will be delay for 2 sec
+ let t1 = tokio::spawn(async move {
+ let (res, _hit) = cache_c.get(&1, None, opt1.as_ref()).await;
+ res.unwrap()
+ });
+ // since lock timeout is 1 sec, t2 and t3 will do their own lookup after 1 sec
+ let cache_c = cache.clone();
+ let t2 = tokio::spawn(async move {
+ let (res, _hit) = cache_c.get(&1, None, opt2.as_ref()).await;
+ res.unwrap()
+ });
+ let cache_c = cache.clone();
+ let t3 = tokio::spawn(async move {
+ let (res, _hit) = cache_c.get(&1, None, opt3.as_ref()).await;
+ res.unwrap()
+ });
+ let (r1, r2, r3) = tokio::join!(t1, t2, t3);
+ // 1 lookup + 2 lookups + 3 lookups, order not matter
+ assert_eq!(r1.unwrap() + r2.unwrap() + r3.unwrap(), 6);
+ }
+
+ #[tokio::test]
+ async fn test_multi_get() {
+ let cache: RTCache<i32, i32, TestCB, ExtraOpt> = RTCache::new(10, None, None);
+ let counter = Arc::new(AtomicI32::new(0));
+ let opt1 = Some(ExtraOpt {
+ error: false,
+ empty: false,
+ delay_for: Some(Duration::from_secs(2)),
+ used: counter.clone(),
+ });
+ // make 1 a hit first
+ let (res, hit) = cache.get(&1, None, opt1.as_ref()).await;
+ assert_eq!(res.unwrap(), 1);
+ assert_eq!(hit, CacheStatus::Miss);
+ let (res, hit) = cache.get(&1, None, opt1.as_ref()).await;
+ assert_eq!(res.unwrap(), 1);
+ assert_eq!(hit, CacheStatus::Hit);
+ // 1 hit 2 miss 3 miss
+ let resp = cache
+ .multi_get([1, 2, 3].iter(), None, opt1.as_ref())
+ .await
+ .unwrap();
+ assert_eq!(resp[0].0, 1);
+ assert_eq!(resp[0].1, CacheStatus::Hit);
+ assert_eq!(resp[1].0, 2);
+ assert_eq!(resp[1].1, CacheStatus::Miss);
+ assert_eq!(resp[2].0, 3);
+ assert_eq!(resp[2].1, CacheStatus::Miss);
+ // all hit after a fetch
+ let resp = cache
+ .multi_get([1, 2, 3].iter(), None, opt1.as_ref())
+ .await
+ .unwrap();
+ assert_eq!(resp[0].0, 1);
+ assert_eq!(resp[0].1, CacheStatus::Hit);
+ assert_eq!(resp[1].0, 2);
+ assert_eq!(resp[1].1, CacheStatus::Hit);
+ assert_eq!(resp[2].0, 3);
+ assert_eq!(resp[2].1, CacheStatus::Hit);
+ }
+
+ #[tokio::test]
+ #[should_panic(expected = "multi_lookup() failed to return the matching number of results")]
+ async fn test_inconsistent_miss_results() {
+ // force empty result
+ let opt1 = Some(ExtraOpt {
+ error: false,
+ empty: true,
+ delay_for: None,
+ used: Arc::new(AtomicI32::new(0)),
+ });
+ let cache: RTCache<i32, i32, TestCB, ExtraOpt> = RTCache::new(10, None, None);
+ cache
+ .multi_get([4, 5, 6].iter(), None, opt1.as_ref())
+ .await
+ .unwrap();
+ }
+}