author     Yuchen Wu <[email protected]>  2024-02-27 20:25:44 -0800
committer  Yuchen Wu <[email protected]>  2024-02-27 20:25:44 -0800
commit     8797329225018c4d0ab990166dd020338ae292dc (patch)
tree       1e8d0bf6f3c27e987559f52319d91ff75e4da5cb /pingora-cache/src
parent     0bca116c1027a878469b72352e1e9e3916e85dde (diff)
Release Pingora version 0.1.0 (tag: v0.1.0)
Co-authored-by: Andrew Hauck <[email protected]>
Co-authored-by: Edward Wang <[email protected]>
Diffstat (limited to 'pingora-cache/src')
-rw-r--r--  pingora-cache/src/cache_control.rs      839
-rw-r--r--  pingora-cache/src/eviction/lru.rs        431
-rw-r--r--  pingora-cache/src/eviction/mod.rs         89
-rw-r--r--  pingora-cache/src/eviction/simple_lru.rs 445
-rw-r--r--  pingora-cache/src/filters.rs             673
-rw-r--r--  pingora-cache/src/hashtable.rs           112
-rw-r--r--  pingora-cache/src/key.rs                 302
-rw-r--r--  pingora-cache/src/lib.rs                1093
-rw-r--r--  pingora-cache/src/lock.rs                336
-rw-r--r--  pingora-cache/src/max_file_size.rs        75
-rw-r--r--  pingora-cache/src/memory.rs              510
-rw-r--r--  pingora-cache/src/meta.rs                608
-rw-r--r--  pingora-cache/src/predictor.rs           228
-rw-r--r--  pingora-cache/src/put.rs                 754
-rw-r--r--  pingora-cache/src/storage.rs             122
-rw-r--r--  pingora-cache/src/trace.rs                98
-rw-r--r--  pingora-cache/src/variance.rs            120
17 files changed, 6835 insertions, 0 deletions
diff --git a/pingora-cache/src/cache_control.rs b/pingora-cache/src/cache_control.rs
new file mode 100644
index 0000000..6686c3e
--- /dev/null
+++ b/pingora-cache/src/cache_control.rs
@@ -0,0 +1,839 @@
+// Copyright 2024 Cloudflare, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Functions and utilities to help parse Cache-Control headers
+
+use super::*;
+
+use http::header::HeaderName;
+use http::HeaderValue;
+use indexmap::IndexMap;
+use once_cell::sync::Lazy;
+use pingora_error::{Error, ErrorType, Result};
+use pingora_http::ResponseHeader;
+use regex::bytes::Regex;
+use std::num::IntErrorKind;
+use std::slice;
+use std::str;
+
+/// The max delta-seconds per [RFC 7234](https://datatracker.ietf.org/doc/html/rfc7234#section-1.2.1)
+// "If a cache receives a delta-seconds
+// value greater than the greatest integer it can represent, or if any
+// of its subsequent calculations overflows, the cache MUST consider the
+// value to be either 2147483648 (2^31) or the greatest positive integer
+// it can conveniently represent."
+pub const DELTA_SECONDS_OVERFLOW_VALUE: u32 = 2147483648;
+
+/// Cache control directive key type
+pub type DirectiveKey = String;
+
+/// Cache control directive value type
+#[derive(Debug)]
+pub struct DirectiveValue(pub Vec<u8>);
+
+impl AsRef<[u8]> for DirectiveValue {
+ fn as_ref(&self) -> &[u8] {
+ &self.0
+ }
+}
+
+impl DirectiveValue {
+ /// A [DirectiveValue] without quotes (`"`).
+ pub fn parse_as_bytes(&self) -> &[u8] {
+ self.0
+ .strip_prefix(&[b'"'])
+ .and_then(|bytes| bytes.strip_suffix(&[b'"']))
+ .unwrap_or(&self.0[..])
+ }
+
+ /// A [DirectiveValue] without quotes (`"`) as `str`.
+ pub fn parse_as_str(&self) -> Result<&str> {
+ str::from_utf8(self.parse_as_bytes()).or_else(|e| {
+ Error::e_because(ErrorType::InternalError, "could not parse value as utf8", e)
+ })
+ }
+
+ /// Parse the [DirectiveValue] as delta seconds
+ ///
+ /// `"`s are ignored. The value is capped to [DELTA_SECONDS_OVERFLOW_VALUE].
+ pub fn parse_as_delta_seconds(&self) -> Result<u32> {
+ match self.parse_as_str()?.parse::<u32>() {
+ Ok(value) => Ok(value),
+ Err(e) => {
+ // delta-seconds expect to handle positive overflow gracefully
+ if e.kind() == &IntErrorKind::PosOverflow {
+ Ok(DELTA_SECONDS_OVERFLOW_VALUE)
+ } else {
+ Error::e_because(ErrorType::InternalError, "could not parse value as u32", e)
+ }
+ }
+ }
+ }
+}
+
+/// An ordered map to store cache control key value pairs.
+pub type DirectiveMap = IndexMap<DirectiveKey, Option<DirectiveValue>>;
+
+/// Parsed Cache-Control directives
+#[derive(Debug)]
+pub struct CacheControl {
+ /// The parsed directives
+ pub directives: DirectiveMap,
+}
+
+/// Cacheability calculated from cache control.
+#[derive(Debug, PartialEq, Eq)]
+pub enum Cacheable {
+ /// Cacheable
+ Yes,
+ /// Not cacheable
+ No,
+ /// No directive found for explicit cacheability
+ Default,
+}
+
+/// An iterator over the items of a comma-separated directive value (e.g. the field names in `no-cache="..."`)
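+///
+/// A short sketch of iterating a quoted list value (the value below is illustrative):
+/// ```
+/// # use pingora_cache::cache_control::{DirectiveValue, ListValueIter};
+/// let value = DirectiveValue(b"\"set-cookie, x-private\"".to_vec());
+/// let names: Vec<&[u8]> = ListValueIter::from(&value).collect();
+/// assert_eq!(names, vec![&b"set-cookie"[..], &b"x-private"[..]]);
+/// ```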
+pub struct ListValueIter<'a>(slice::Split<'a, u8, fn(&u8) -> bool>);
+
+impl<'a> ListValueIter<'a> {
+ pub fn from(value: &'a DirectiveValue) -> Self {
+ ListValueIter(value.parse_as_bytes().split(|byte| byte == &b','))
+ }
+}
+
+// https://datatracker.ietf.org/doc/html/rfc7230#section-3.2.3
+// optional whitespace OWS = *(SP / HTAB); SP = 0x20, HTAB = 0x09
+fn trim_ows(bytes: &[u8]) -> &[u8] {
+ fn not_ows(b: &u8) -> bool {
+ b != &b'\x20' && b != &b'\x09'
+ }
+ // find first non-OWS char from front (head) and from end (tail)
+ let head = bytes.iter().position(not_ows).unwrap_or(0);
+ let tail = bytes
+ .iter()
+ .rposition(not_ows)
+ .map(|rpos| rpos + 1)
+ .unwrap_or(head);
+ &bytes[head..tail]
+}
+
+impl<'a> Iterator for ListValueIter<'a> {
+ type Item = &'a [u8];
+
+ fn next(&mut self) -> Option<Self::Item> {
+ Some(trim_ows(self.0.next()?))
+ }
+}
+
+/*
+ Originally from https://github.com/hapijs/wreck:
+ Cache-Control = 1#cache-directive
+ cache-directive = token [ "=" ( token / quoted-string ) ]
+ token = [^\x00-\x20\(\)<>@\,;\:\\"\/\[\]\?\=\{\}\x7F]+
+ quoted-string = "(?:[^"\\]|\\.)*"
+*/
+static RE_CACHE_DIRECTIVE: Lazy<Regex> =
+ // unicode support disabled, allow ; or , delimiter | capture groups: 1: directive = 2: token OR quoted-string
+ Lazy::new(|| {
+ Regex::new(r#"(?-u)(?:^|(?:\s*[,;]\s*))([^\x00-\x20\(\)<>@,;:\\"/\[\]\?=\{\}\x7F]+)(?:=((?:[^\x00-\x20\(\)<>@,;:\\"/\[\]\?=\{\}\x7F]+|(?:"(?:[^"\\]|\\.)*"))))?"#).unwrap()
+ });
+
+impl CacheControl {
+ // Our parsing strategy is more permissive than the RFC in a few ways:
+ // - Allows semicolons as delimiters (in addition to commas).
+ // - Allows octets outside of visible ASCII in tokens.
+ // - Doesn't require no-value for "boolean directives," such as must-revalidate
+ // - Allows quoted-string format for numeric values.
+ fn from_headers(headers: http::header::GetAll<HeaderValue>) -> Option<Self> {
+ let mut directives = IndexMap::new();
+ // should iterate in header line insertion order
+ for line in headers {
+ for captures in RE_CACHE_DIRECTIVE.captures_iter(line.as_bytes()) {
+ // directive key
+ // header values don't have to be utf-8, but we store keys as strings for case-insensitive hashing
+ let key = captures.get(1).and_then(|cap| {
+ str::from_utf8(cap.as_bytes())
+ .ok()
+ .map(|token| token.to_lowercase())
+ });
+ if key.is_none() {
+ continue;
+ }
+ // directive value
+ // match token or quoted-string
+ let value = captures
+ .get(2)
+ .map(|cap| DirectiveValue(cap.as_bytes().to_vec()));
+ directives.insert(key.unwrap(), value);
+ }
+ }
+ Some(CacheControl { directives })
+ }
+
+ /// Parse from the given header name in `headers`
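+ ///
+ /// A small sketch with a non-default header name (`surrogate-control` here is just an example):
+ /// ```
+ /// # use pingora_cache::cache_control::CacheControl;
+ /// let mut headers = http::HeaderMap::new();
+ /// headers.insert("surrogate-control", "max-age=86400".parse().unwrap());
+ /// let cc = CacheControl::from_headers_named("surrogate-control", &headers).unwrap();
+ /// assert_eq!(cc.max_age().unwrap(), Some(86400));
+ /// ```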
+ pub fn from_headers_named(header_name: &str, headers: &http::HeaderMap) -> Option<Self> {
+ if !headers.contains_key(header_name) {
+ return None;
+ }
+
+ Self::from_headers(headers.get_all(header_name))
+ }
+
+ /// Parse from the given header name in the [ReqHeader]
+ pub fn from_req_headers_named(header_name: &str, req_header: &ReqHeader) -> Option<Self> {
+ Self::from_headers_named(header_name, &req_header.headers)
+ }
+
+ /// Parse `Cache-Control` header name from the [ReqHeader]
+ pub fn from_req_headers(req_header: &ReqHeader) -> Option<Self> {
+ Self::from_req_headers_named("cache-control", req_header)
+ }
+
+ /// Parse from the given header name in the [RespHeader]
+ pub fn from_resp_headers_named(header_name: &str, resp_header: &RespHeader) -> Option<Self> {
+ Self::from_headers_named(header_name, &resp_header.headers)
+ }
+
+ /// Parse `Cache-Control` header name from the [RespHeader]
+ pub fn from_resp_headers(resp_header: &RespHeader) -> Option<Self> {
+ Self::from_resp_headers_named("cache-control", resp_header)
+ }
+
+ /// Whether the given directive is in the cache control.
+ pub fn has_key(&self, key: &str) -> bool {
+ self.directives.contains_key(key)
+ }
+
+ /// Whether the `public` directive is in the cache control.
+ pub fn public(&self) -> bool {
+ self.has_key("public")
+ }
+
+ /// Whether the given directive exists and it has no value.
+ fn has_key_without_value(&self, key: &str) -> bool {
+ matches!(self.directives.get(key), Some(None))
+ }
+
+ /// Whether the standalone `private` exists in the cache control
+ // RFC 7234: using the #field-name versions of `private`
+ // means a shared cache "MUST NOT store the specified field-name(s),
+ // whereas it MAY store the remainder of the response."
+ // It must be a boolean form (no value) to apply to the whole response.
+ // https://datatracker.ietf.org/doc/html/rfc7234#section-5.2.2.6
+ pub fn private(&self) -> bool {
+ self.has_key_without_value("private")
+ }
+
+ fn get_field_names(&self, key: &str) -> Option<ListValueIter> {
+ if let Some(Some(value)) = self.directives.get(key) {
+ Some(ListValueIter::from(value))
+ } else {
+ None
+ }
+ }
+
+ /// Get the values of `private=`
+ pub fn private_field_names(&self) -> Option<ListValueIter> {
+ self.get_field_names("private")
+ }
+
+ /// Whether the standalone `no-cache` exists in the cache control
+ pub fn no_cache(&self) -> bool {
+ self.has_key_without_value("no-cache")
+ }
+
+ /// Get the values of `no-cache=`
+ pub fn no_cache_field_names(&self) -> Option<ListValueIter> {
+ self.get_field_names("no-cache")
+ }
+
+ /// Whether `no-store` exists.
+ pub fn no_store(&self) -> bool {
+ self.has_key("no-store")
+ }
+
+ fn parse_delta_seconds(&self, key: &str) -> Result<Option<u32>> {
+ if let Some(Some(dir_value)) = self.directives.get(key) {
+ Ok(Some(dir_value.parse_as_delta_seconds()?))
+ } else {
+ Ok(None)
+ }
+ }
+
+ /// Return the `max-age` seconds
+ pub fn max_age(&self) -> Result<Option<u32>> {
+ self.parse_delta_seconds("max-age")
+ }
+
+ /// Return the `s-maxage` seconds
+ pub fn s_maxage(&self) -> Result<Option<u32>> {
+ self.parse_delta_seconds("s-maxage")
+ }
+
+ /// Return the `stale-while-revalidate` seconds
+ pub fn stale_while_revalidate(&self) -> Result<Option<u32>> {
+ self.parse_delta_seconds("stale-while-revalidate")
+ }
+
+ /// Return the `stale-if-error` seconds
+ pub fn stale_if_error(&self) -> Result<Option<u32>> {
+ self.parse_delta_seconds("stale-if-error")
+ }
+
+ /// Whether `must-revalidate` exists.
+ pub fn must_revalidate(&self) -> bool {
+ self.has_key("must-revalidate")
+ }
+
+ /// Whether `proxy-revalidate` exists.
+ pub fn proxy_revalidate(&self) -> bool {
+ self.has_key("proxy-revalidate")
+ }
+
+ /// Whether `only-if-cached` exists.
+ pub fn only_if_cached(&self) -> bool {
+ self.has_key("only-if-cached")
+ }
+}
+
+impl InterpretCacheControl for CacheControl {
+ fn is_cacheable(&self) -> Cacheable {
+ if self.no_store() || self.private() {
+ return Cacheable::No;
+ }
+ if self.has_key("s-maxage") || self.has_key("max-age") || self.public() {
+ return Cacheable::Yes;
+ }
+ Cacheable::Default
+ }
+
+ fn allow_caching_authorized_req(&self) -> bool {
+ // RFC 7234 https://datatracker.ietf.org/doc/html/rfc7234#section-3
+ // "MUST NOT" store requests with Authorization header
+ // unless response contains one of these directives
+ self.must_revalidate() || self.public() || self.has_key("s-maxage")
+ }
+
+ fn fresh_sec(&self) -> Option<u32> {
+ if self.no_cache() {
+ // always treated as stale
+ return Some(0);
+ }
+ match self.s_maxage() {
+ Ok(Some(seconds)) => Some(seconds),
+ // s-maxage not present
+ Ok(None) => match self.max_age() {
+ Ok(Some(seconds)) => Some(seconds),
+ _ => None,
+ },
+ _ => None,
+ }
+ }
+
+ fn serve_stale_while_revalidate_sec(&self) -> Option<u32> {
+ // RFC 7234: these directives forbid serving stale.
+ // https://datatracker.ietf.org/doc/html/rfc7234#section-4.2.4
+ if self.must_revalidate() || self.proxy_revalidate() || self.has_key("s-maxage") {
+ return Some(0);
+ }
+ self.stale_while_revalidate().unwrap_or(None)
+ }
+
+ fn serve_stale_if_error_sec(&self) -> Option<u32> {
+ if self.must_revalidate() || self.proxy_revalidate() || self.has_key("s-maxage") {
+ return Some(0);
+ }
+ self.stale_if_error().unwrap_or(None)
+ }
+
+ // Strip header names listed in `private` or `no-cache` directives from a response.
+ fn strip_private_headers(&self, resp_header: &mut ResponseHeader) {
+ fn strip_listed_headers(resp: &mut ResponseHeader, field_names: ListValueIter) {
+ for name in field_names {
+ if let Ok(header) = HeaderName::from_bytes(name) {
+ resp.remove_header(&header);
+ }
+ }
+ }
+
+ if let Some(headers) = self.private_field_names() {
+ strip_listed_headers(resp_header, headers);
+ }
+ // We interpret `no-cache` the same way as `private`,
+ // though technically it has a less restrictive requirement
+ // ("MUST NOT be sent in the response to a subsequent request
+ // without successful revalidation with the origin server").
+ // https://datatracker.ietf.org/doc/html/rfc7234#section-5.2.2.2
+ if let Some(headers) = self.no_cache_field_names() {
+ strip_listed_headers(resp_header, headers);
+ }
+ }
+}
+
+/// `InterpretCacheControl` provides a meaningful interface to the parsed `CacheControl`.
+/// These functions actually interpret the parsed cache-control directives to return
+/// the freshness or other cache meta values that cache-control is signaling.
+///
+/// By default `CacheControl` implements an RFC-7234 compliant reading that assumes it is being
+/// used with a shared (proxy) cache.
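+///
+/// A minimal sketch of interpreting a response's directives (the header value is illustrative):
+/// ```
+/// # use pingora_cache::cache_control::{CacheControl, Cacheable, InterpretCacheControl};
+/// let (parts, _) = http::response::Builder::new()
+///     .header("cache-control", "public, max-age=60")
+///     .body(())
+///     .unwrap()
+///     .into_parts();
+/// let cc = CacheControl::from_resp_headers(&parts).unwrap();
+/// assert_eq!(cc.is_cacheable(), Cacheable::Yes);
+/// assert_eq!(cc.fresh_sec(), Some(60));
+/// ```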
+pub trait InterpretCacheControl {
+ /// Does cache-control specify this response is cacheable?
+ ///
+ /// Note that an RFC-7234 compliant cacheability check must also
+ /// check if the request contained the Authorization header and
+ /// `allow_caching_authorized_req`.
+ fn is_cacheable(&self) -> Cacheable;
+
+ /// Does this cache-control allow caching a response to
+ /// a request with the Authorization header?
+ fn allow_caching_authorized_req(&self) -> bool;
+
+ /// Returns freshness ttl specified in cache-control
+ ///
+ /// - `Some(_)` indicates cache-control specifies a valid ttl. Some(0) = always stale.
+ /// - `None` means cache-control did not specify a valid ttl.
+ fn fresh_sec(&self) -> Option<u32>;
+
+ /// Returns the stale-while-revalidate ttl.
+ ///
+ /// The result should consider all the relevant cache directives, not just the SWR directive itself.
+ ///
+ /// Some(0) means serving such stale is disallowed by a directive like `must-revalidate`
+ /// or `stale-while-revalidate=0`.
+ ///
+ /// `None` indicates no SWR ttl was specified.
+ fn serve_stale_while_revalidate_sec(&self) -> Option<u32>;
+
+ /// Returns the stale-if-error ttl.
+ ///
+ /// The result should consider all the relevant cache directives, not just the SIE directive itself.
+ ///
+ /// Some(0) means serving such stale is disallowed by a directive like `must-revalidate`
+ /// or `stale-if-error=0`.
+ ///
+ /// `None` indicates no SIE ttl was specified.
+ fn serve_stale_if_error_sec(&self) -> Option<u32>;
+
+ /// Strip header names listed in `private` or `no-cache` directives from a response,
+ /// usually prior to storing that response in cache.
+ fn strip_private_headers(&self, resp_header: &mut ResponseHeader);
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use http::header::CACHE_CONTROL;
+ use http::HeaderValue;
+ use http::{request, response};
+
+ fn build_response(cc_key: HeaderName, cc_value: &str) -> response::Parts {
+ let (parts, _) = response::Builder::new()
+ .header(cc_key, cc_value)
+ .body(())
+ .unwrap()
+ .into_parts();
+ parts
+ }
+
+ #[test]
+ fn test_simple_cache_control() {
+ let resp = build_response(CACHE_CONTROL, "public, max-age=10000");
+ let cc = CacheControl::from_resp_headers(&resp).unwrap();
+ assert!(cc.public());
+ assert_eq!(cc.max_age().unwrap().unwrap(), 10000);
+ }
+
+ #[test]
+ fn test_private_cache_control() {
+ let resp = build_response(CACHE_CONTROL, "private");
+ let cc = CacheControl::from_resp_headers(&resp).unwrap();
+
+ assert!(cc.private());
+ assert!(cc.max_age().unwrap().is_none());
+ }
+
+ #[test]
+ fn test_directives_across_header_lines() {
+ let (parts, _) = response::Builder::new()
+ .header(CACHE_CONTROL, "public,")
+ .header("cache-Control", "max-age=10000")
+ .body(())
+ .unwrap()
+ .into_parts();
+ let cc = CacheControl::from_resp_headers(&parts).unwrap();
+
+ assert!(cc.public());
+ assert_eq!(cc.max_age().unwrap().unwrap(), 10000);
+ }
+
+ #[test]
+ fn test_recognizes_semicolons_as_delimiters() {
+ let resp = build_response(CACHE_CONTROL, "public; max-age=0");
+ let cc = CacheControl::from_resp_headers(&resp).unwrap();
+
+ assert!(cc.public());
+ assert_eq!(cc.max_age().unwrap().unwrap(), 0);
+ }
+
+ #[test]
+ fn test_unknown_directives() {
+ let resp = build_response(CACHE_CONTROL, "public,random1=random2, rand3=\"\"");
+ let cc = CacheControl::from_resp_headers(&resp).unwrap();
+ let mut directive_iter = cc.directives.iter();
+
+ let first = directive_iter.next().unwrap();
+ assert_eq!(first.0, &"public");
+ assert!(first.1.is_none());
+
+ let second = directive_iter.next().unwrap();
+ assert_eq!(second.0, &"random1");
+ assert_eq!(second.1.as_ref().unwrap().0, "random2".as_bytes());
+
+ let third = directive_iter.next().unwrap();
+ assert_eq!(third.0, &"rand3");
+ assert_eq!(third.1.as_ref().unwrap().0, "\"\"".as_bytes());
+
+ assert!(directive_iter.next().is_none());
+ }
+
+ #[test]
+ fn test_case_insensitive_directive_keys() {
+ let resp = build_response(
+ CACHE_CONTROL,
+ "Public=\"something\", mAx-AGe=\"10000\", foo=cRaZyCaSe, bAr=\"inQuotes\"",
+ );
+ let cc = CacheControl::from_resp_headers(&resp).unwrap();
+
+ assert!(cc.public());
+ assert_eq!(cc.max_age().unwrap().unwrap(), 10000);
+
+ let mut directive_iter = cc.directives.iter();
+ let first = directive_iter.next().unwrap();
+ assert_eq!(first.0, &"public");
+ assert_eq!(first.1.as_ref().unwrap().0, "\"something\"".as_bytes());
+
+ let second = directive_iter.next().unwrap();
+ assert_eq!(second.0, &"max-age");
+ assert_eq!(second.1.as_ref().unwrap().0, "\"10000\"".as_bytes());
+
+ // values are still stored with casing
+ let third = directive_iter.next().unwrap();
+ assert_eq!(third.0, &"foo");
+ assert_eq!(third.1.as_ref().unwrap().0, "cRaZyCaSe".as_bytes());
+
+ let fourth = directive_iter.next().unwrap();
+ assert_eq!(fourth.0, &"bar");
+ assert_eq!(fourth.1.as_ref().unwrap().0, "\"inQuotes\"".as_bytes());
+
+ assert!(directive_iter.next().is_none());
+ }
+
+ #[test]
+ fn test_non_ascii() {
+ let resp = build_response(CACHE_CONTROL, "püblic=💖, max-age=\"💯\"");
+ let cc = CacheControl::from_resp_headers(&resp).unwrap();
+
+ // Not considered valid registered directive keys / values
+ assert!(!cc.public());
+ assert_eq!(
+ cc.max_age().unwrap_err().context.unwrap().to_string(),
+ "could not parse value as u32"
+ );
+
+ let mut directive_iter = cc.directives.iter();
+ let first = directive_iter.next().unwrap();
+ assert_eq!(first.0, &"püblic");
+ assert_eq!(first.1.as_ref().unwrap().0, "💖".as_bytes());
+
+ let second = directive_iter.next().unwrap();
+ assert_eq!(second.0, &"max-age");
+ assert_eq!(second.1.as_ref().unwrap().0, "\"💯\"".as_bytes());
+
+ assert!(directive_iter.next().is_none());
+ }
+
+ #[test]
+ fn test_non_utf8_key() {
+ let mut resp = response::Builder::new().body(()).unwrap();
+ resp.headers_mut().insert(
+ CACHE_CONTROL,
+ HeaderValue::from_bytes(b"bar\xFF=\"baz\", a=b").unwrap(),
+ );
+ let (parts, _) = resp.into_parts();
+ let cc = CacheControl::from_resp_headers(&parts).unwrap();
+
+ // invalid bytes for key
+ let mut directive_iter = cc.directives.iter();
+ let first = directive_iter.next().unwrap();
+ assert_eq!(first.0, &"a");
+ assert_eq!(first.1.as_ref().unwrap().0, "b".as_bytes());
+
+ assert!(directive_iter.next().is_none());
+ }
+
+ #[test]
+ fn test_non_utf8_value() {
+ // RFC 7230: 0xFF is part of obs-text and is officially considered a valid octet in quoted-strings
+ let mut resp = response::Builder::new().body(()).unwrap();
+ resp.headers_mut().insert(
+ CACHE_CONTROL,
+ HeaderValue::from_bytes(b"max-age=ba\xFFr, bar=\"baz\xFF\", a=b").unwrap(),
+ );
+ let (parts, _) = resp.into_parts();
+ let cc = CacheControl::from_resp_headers(&parts).unwrap();
+
+ assert_eq!(
+ cc.max_age().unwrap_err().context.unwrap().to_string(),
+ "could not parse value as utf8"
+ );
+
+ let mut directive_iter = cc.directives.iter();
+
+ let first = directive_iter.next().unwrap();
+ assert_eq!(first.0, &"max-age");
+ assert_eq!(first.1.as_ref().unwrap().0, b"ba\xFFr");
+
+ let second = directive_iter.next().unwrap();
+ assert_eq!(second.0, &"bar");
+ assert_eq!(second.1.as_ref().unwrap().0, b"\"baz\xFF\"");
+
+ let third = directive_iter.next().unwrap();
+ assert_eq!(third.0, &"a");
+ assert_eq!(third.1.as_ref().unwrap().0, "b".as_bytes());
+
+ assert!(directive_iter.next().is_none());
+ }
+
+ #[test]
+ fn test_age_overflow() {
+ let resp = build_response(
+ CACHE_CONTROL,
+ "max-age=-99999999999999999999999999, s-maxage=99999999999999999999999999",
+ );
+ let cc = CacheControl::from_resp_headers(&resp).unwrap();
+
+ assert_eq!(
+ cc.s_maxage().unwrap().unwrap(),
+ DELTA_SECONDS_OVERFLOW_VALUE
+ );
+ // negative ages still result in errors even with overflow handling
+ assert_eq!(
+ cc.max_age().unwrap_err().context.unwrap().to_string(),
+ "could not parse value as u32"
+ );
+ }
+
+ #[test]
+ fn test_fresh_sec() {
+ let resp = build_response(CACHE_CONTROL, "");
+ let cc = CacheControl::from_resp_headers(&resp).unwrap();
+ assert!(cc.fresh_sec().is_none());
+
+ let resp = build_response(CACHE_CONTROL, "max-age=12345");
+ let cc = CacheControl::from_resp_headers(&resp).unwrap();
+ assert_eq!(cc.fresh_sec().unwrap(), 12345);
+
+ let resp = build_response(CACHE_CONTROL, "max-age=99999,s-maxage=123");
+ let cc = CacheControl::from_resp_headers(&resp).unwrap();
+ // prefer s-maxage over max-age
+ assert_eq!(cc.fresh_sec().unwrap(), 123);
+ }
+
+ #[test]
+ fn test_cacheability() {
+ let resp = build_response(CACHE_CONTROL, "");
+ let cc = CacheControl::from_resp_headers(&resp).unwrap();
+ assert_eq!(cc.is_cacheable(), Cacheable::Default);
+
+ // uncacheable
+ let resp = build_response(CACHE_CONTROL, "private, max-age=12345");
+ let cc = CacheControl::from_resp_headers(&resp).unwrap();
+ assert_eq!(cc.is_cacheable(), Cacheable::No);
+
+ let resp = build_response(CACHE_CONTROL, "no-store, max-age=12345");
+ let cc = CacheControl::from_resp_headers(&resp).unwrap();
+ assert_eq!(cc.is_cacheable(), Cacheable::No);
+
+ // cacheable
+ let resp = build_response(CACHE_CONTROL, "public");
+ let cc = CacheControl::from_resp_headers(&resp).unwrap();
+ assert_eq!(cc.is_cacheable(), Cacheable::Yes);
+
+ let resp = build_response(CACHE_CONTROL, "max-age=0");
+ let cc = CacheControl::from_resp_headers(&resp).unwrap();
+ assert_eq!(cc.is_cacheable(), Cacheable::Yes);
+ }
+
+ #[test]
+ fn test_no_cache() {
+ let resp = build_response(CACHE_CONTROL, "no-cache, max-age=12345");
+ let cc = CacheControl::from_resp_headers(&resp).unwrap();
+ assert_eq!(cc.is_cacheable(), Cacheable::Yes);
+ assert_eq!(cc.fresh_sec().unwrap(), 0);
+ }
+
+ #[test]
+ fn test_no_cache_field_names() {
+ let resp = build_response(CACHE_CONTROL, "no-cache=\"set-cookie\", max-age=12345");
+ let cc = CacheControl::from_resp_headers(&resp).unwrap();
+ assert!(!cc.private());
+ assert_eq!(cc.is_cacheable(), Cacheable::Yes);
+ assert_eq!(cc.fresh_sec().unwrap(), 12345);
+ let mut field_names = cc.no_cache_field_names().unwrap();
+ assert_eq!(
+ str::from_utf8(field_names.next().unwrap()).unwrap(),
+ "set-cookie"
+ );
+ assert!(field_names.next().is_none());
+
+ let mut resp = response::Builder::new().body(()).unwrap();
+ resp.headers_mut().insert(
+ CACHE_CONTROL,
+ HeaderValue::from_bytes(
+ b"private=\"\", no-cache=\"a\xFF, set-cookie, Baz\x09 , c,d ,, \"",
+ )
+ .unwrap(),
+ );
+ let (parts, _) = resp.into_parts();
+ let cc = CacheControl::from_resp_headers(&parts).unwrap();
+ let mut field_names = cc.private_field_names().unwrap();
+ assert_eq!(str::from_utf8(field_names.next().unwrap()).unwrap(), "");
+ assert!(field_names.next().is_none());
+ let mut field_names = cc.no_cache_field_names().unwrap();
+ assert!(str::from_utf8(field_names.next().unwrap()).is_err());
+ assert_eq!(
+ str::from_utf8(field_names.next().unwrap()).unwrap(),
+ "set-cookie"
+ );
+ assert_eq!(str::from_utf8(field_names.next().unwrap()).unwrap(), "Baz");
+ assert_eq!(str::from_utf8(field_names.next().unwrap()).unwrap(), "c");
+ assert_eq!(str::from_utf8(field_names.next().unwrap()).unwrap(), "d");
+ assert_eq!(str::from_utf8(field_names.next().unwrap()).unwrap(), "");
+ assert_eq!(str::from_utf8(field_names.next().unwrap()).unwrap(), "");
+ assert!(field_names.next().is_none());
+ }
+
+ #[test]
+ fn test_strip_private_headers() {
+ let mut resp = ResponseHeader::build(200, None).unwrap();
+ resp.append_header(
+ CACHE_CONTROL,
+ "no-cache=\"x-private-header\", max-age=12345",
+ )
+ .unwrap();
+ resp.append_header("X-Private-Header", "dropped").unwrap();
+
+ let cc = CacheControl::from_resp_headers(&resp).unwrap();
+ cc.strip_private_headers(&mut resp);
+ assert!(!resp.headers.contains_key("X-Private-Header"));
+ }
+
+ #[test]
+ fn test_stale_while_revalidate() {
+ let resp = build_response(CACHE_CONTROL, "max-age=12345, stale-while-revalidate=5");
+ let cc = CacheControl::from_resp_headers(&resp).unwrap();
+ assert_eq!(cc.stale_while_revalidate().unwrap().unwrap(), 5);
+ assert_eq!(cc.serve_stale_while_revalidate_sec().unwrap(), 5);
+ assert!(cc.serve_stale_if_error_sec().is_none());
+ }
+
+ #[test]
+ fn test_stale_if_error() {
+ let resp = build_response(CACHE_CONTROL, "max-age=12345, stale-if-error=3600");
+ let cc = CacheControl::from_resp_headers(&resp).unwrap();
+ assert_eq!(cc.stale_if_error().unwrap().unwrap(), 3600);
+ assert_eq!(cc.serve_stale_if_error_sec().unwrap(), 3600);
+ assert!(cc.serve_stale_while_revalidate_sec().is_none());
+ }
+
+ #[test]
+ fn test_must_revalidate() {
+ let resp = build_response(
+ CACHE_CONTROL,
+ "max-age=12345, stale-while-revalidate=60, stale-if-error=30, must-revalidate",
+ );
+ let cc = CacheControl::from_resp_headers(&resp).unwrap();
+ assert!(cc.must_revalidate());
+ assert_eq!(cc.stale_while_revalidate().unwrap().unwrap(), 60);
+ assert_eq!(cc.stale_if_error().unwrap().unwrap(), 30);
+ assert_eq!(cc.serve_stale_while_revalidate_sec().unwrap(), 0);
+ assert_eq!(cc.serve_stale_if_error_sec().unwrap(), 0);
+ }
+
+ #[test]
+ fn test_proxy_revalidate() {
+ let resp = build_response(
+ CACHE_CONTROL,
+ "max-age=12345, stale-while-revalidate=60, stale-if-error=30, proxy-revalidate",
+ );
+ let cc = CacheControl::from_resp_headers(&resp).unwrap();
+ assert!(cc.proxy_revalidate());
+ assert_eq!(cc.stale_while_revalidate().unwrap().unwrap(), 60);
+ assert_eq!(cc.stale_if_error().unwrap().unwrap(), 30);
+ assert_eq!(cc.serve_stale_while_revalidate_sec().unwrap(), 0);
+ assert_eq!(cc.serve_stale_if_error_sec().unwrap(), 0);
+ }
+
+ #[test]
+ fn test_s_maxage_stale() {
+ let resp = build_response(
+ CACHE_CONTROL,
+ "s-maxage=0, stale-while-revalidate=60, stale-if-error=30",
+ );
+ let cc = CacheControl::from_resp_headers(&resp).unwrap();
+ assert_eq!(cc.stale_while_revalidate().unwrap().unwrap(), 60);
+ assert_eq!(cc.stale_if_error().unwrap().unwrap(), 30);
+ assert_eq!(cc.serve_stale_while_revalidate_sec().unwrap(), 0);
+ assert_eq!(cc.serve_stale_if_error_sec().unwrap(), 0);
+ }
+
+ #[test]
+ fn test_authorized_request() {
+ let resp = build_response(CACHE_CONTROL, "max-age=10");
+ let cc = CacheControl::from_resp_headers(&resp).unwrap();
+ assert!(!cc.allow_caching_authorized_req());
+
+ let resp = build_response(CACHE_CONTROL, "s-maxage=10");
+ let cc = CacheControl::from_resp_headers(&resp).unwrap();
+ assert!(cc.allow_caching_authorized_req());
+
+ let resp = build_response(CACHE_CONTROL, "public");
+ let cc = CacheControl::from_resp_headers(&resp).unwrap();
+ assert!(cc.allow_caching_authorized_req());
+
+ let resp = build_response(CACHE_CONTROL, "must-revalidate, max-age=0");
+ let cc = CacheControl::from_resp_headers(&resp).unwrap();
+ assert!(cc.allow_caching_authorized_req());
+
+ let resp = build_response(CACHE_CONTROL, "");
+ let cc = CacheControl::from_resp_headers(&resp).unwrap();
+ assert!(!cc.allow_caching_authorized_req());
+ }
+
+ fn build_request(cc_key: HeaderName, cc_value: &str) -> request::Parts {
+ let (parts, _) = request::Builder::new()
+ .header(cc_key, cc_value)
+ .body(())
+ .unwrap()
+ .into_parts();
+ parts
+ }
+
+ #[test]
+ fn test_request_only_if_cached() {
+ let req = build_request(CACHE_CONTROL, "only-if-cached=1");
+ let cc = CacheControl::from_req_headers(&req).unwrap();
+ assert!(cc.only_if_cached())
+ }
+}
diff --git a/pingora-cache/src/eviction/lru.rs b/pingora-cache/src/eviction/lru.rs
new file mode 100644
index 0000000..47c88a6
--- /dev/null
+++ b/pingora-cache/src/eviction/lru.rs
@@ -0,0 +1,431 @@
+// Copyright 2024 Cloudflare, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! A shared LRU cache manager
+
+use super::EvictionManager;
+use crate::key::CompactCacheKey;
+
+use async_trait::async_trait;
+use pingora_error::{BError, ErrorType::*, OrErr, Result};
+use pingora_lru::Lru;
+use serde::de::SeqAccess;
+use serde::{Deserialize, Serialize};
+use std::fs::File;
+use std::hash::{Hash, Hasher};
+use std::io::prelude::*;
+use std::path::Path;
+use std::time::SystemTime;
+
+/// A shared LRU cache manager designed to manage a large volume of assets.
+///
+/// - Space optimized in-memory LRU (see [pingora_lru]).
+/// - Instead of a single giant LRU, this struct shards the assets into `N` independent LRUs.
+/// This allows [EvictionManager::save()] not to lock the entire cache manager while performing
+/// serialization.
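+///
+/// A minimal usage sketch (the shard count, limit, and sizes below are illustrative):
+/// ```
+/// use pingora_cache::{eviction::{lru::Manager, EvictionManager}, CacheKey};
+/// use std::time::SystemTime;
+///
+/// let manager = Manager::<16>::with_capacity(150_000, 100);
+/// let key = CacheKey::new("", "a", "1").to_compact();
+/// let victims = manager.admit(key, 1, SystemTime::now());
+/// // any returned victims must also be purged from the underlying storage
+/// assert!(victims.is_empty());
+/// ```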
+pub struct Manager<const N: usize>(Lru<CompactCacheKey, N>);
+
+#[derive(Debug, Serialize, Deserialize)]
+struct SerdeHelperNode(CompactCacheKey, usize);
+
+impl<const N: usize> Manager<N> {
+ /// Create a [Manager] with the given size limit and estimated per shard capacity.
+ ///
+ /// The `capacity` is for preallocating to avoid reallocation cost when the LRU grows.
+ pub fn with_capacity(limit: usize, capacity: usize) -> Self {
+ Manager(Lru::with_capacity(limit, capacity))
+ }
+
+ /// Serialize the given shard
+ pub fn serialize_shard(&self, shard: usize) -> Result<Vec<u8>> {
+ use rmp_serde::encode::Serializer;
+ use serde::ser::SerializeSeq;
+ use serde::ser::Serializer as _;
+
+ assert!(shard < N);
+
+ // NOTE: This could use a lot of memory to buffer the serialized data in memory
+ // NOTE: This for loop could lock the LRU for too long
+ let mut nodes = Vec::with_capacity(self.0.shard_len(shard));
+ self.0.iter_for_each(shard, |(node, size)| {
+ nodes.push(SerdeHelperNode(node.clone(), size));
+ });
+ let mut ser = Serializer::new(vec![]);
+ let mut seq = ser
+ .serialize_seq(Some(self.0.shard_len(shard)))
+ .or_err(InternalError, "fail to serialize node")?;
+ for node in nodes {
+ seq.serialize_element(&node).unwrap(); // write to vec, safe
+ }
+
+ seq.end().or_err(InternalError, "when serializing LRU")?;
+ Ok(ser.into_inner())
+ }
+
+ /// Deserialize a shard
+ ///
+ /// Shard number is not needed because the key itself will hash to the correct shard.
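+ ///
+ /// A sketch of a serialize/deserialize round trip between two managers:
+ /// ```
+ /// # use pingora_cache::{eviction::{lru::Manager, EvictionManager}, CacheKey};
+ /// # use std::time::SystemTime;
+ /// let source = Manager::<1>::with_capacity(10, 10);
+ /// source.admit(CacheKey::new("", "a", "1").to_compact(), 1, SystemTime::now());
+ /// let bytes = source.serialize_shard(0).unwrap();
+ /// let restored = Manager::<1>::with_capacity(10, 10);
+ /// restored.deserialize_shard(&bytes).unwrap();
+ /// assert_eq!(restored.total_items(), 1);
+ /// ```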
+ pub fn deserialize_shard(&self, buf: &[u8]) -> Result<()> {
+ use rmp_serde::decode::Deserializer;
+ use serde::de::Deserializer as _;
+
+ let mut de = Deserializer::new(buf);
+ let visitor = InsertToManager { lru: self };
+ de.deserialize_seq(visitor)
+ .or_err(InternalError, "when deserializing LRU")?;
+ Ok(())
+ }
+}
+
+struct InsertToManager<'a, const N: usize> {
+ lru: &'a Manager<N>,
+}
+
+impl<'de, 'a, const N: usize> serde::de::Visitor<'de> for InsertToManager<'a, N> {
+ type Value = ();
+
+ fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
+ formatter.write_str("array of lru nodes")
+ }
+
+ fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
+ where
+ A: SeqAccess<'de>,
+ {
+ while let Some(node) = seq.next_element::<SerdeHelperNode>()? {
+ let key = u64key(&node.0);
+ self.lru.0.insert_tail(key, node.0, node.1); // insert in the back
+ }
+ Ok(())
+ }
+}
+
+#[inline]
+fn u64key(key: &CompactCacheKey) -> u64 {
+ // note that the std hasher is not guaranteed to be uniform; it is unclear whether ahash is either
+ let mut hasher = ahash::AHasher::default();
+ key.hash(&mut hasher);
+ hasher.finish()
+}
+
+const FILE_NAME: &str = "lru.data";
+
+#[inline]
+fn err_str_path(s: &str, path: &Path) -> String {
+ format!("{s} {}", path.display())
+}
+
+#[async_trait]
+impl<const N: usize> EvictionManager for Manager<N> {
+ fn total_size(&self) -> usize {
+ self.0.weight()
+ }
+ fn total_items(&self) -> usize {
+ self.0.len()
+ }
+ fn evicted_size(&self) -> usize {
+ self.0.evicted_weight()
+ }
+ fn evicted_items(&self) -> usize {
+ self.0.evicted_len()
+ }
+
+ fn admit(
+ &self,
+ item: CompactCacheKey,
+ size: usize,
+ _fresh_until: SystemTime,
+ ) -> Vec<CompactCacheKey> {
+ let key = u64key(&item);
+ self.0.admit(key, item, size);
+ self.0
+ .evict_to_limit()
+ .into_iter()
+ .map(|(key, _weight)| key)
+ .collect()
+ }
+
+ fn remove(&self, item: &CompactCacheKey) {
+ let key = u64key(item);
+ self.0.remove(key);
+ }
+
+ fn access(&self, item: &CompactCacheKey, size: usize, _fresh_until: SystemTime) -> bool {
+ let key = u64key(item);
+ if !self.0.promote(key) {
+ self.0.admit(key, item.clone(), size);
+ false
+ } else {
+ true
+ }
+ }
+
+ fn peek(&self, item: &CompactCacheKey) -> bool {
+ let key = u64key(item);
+ self.0.peek(key)
+ }
+
+ async fn save(&self, dir_path: &str) -> Result<()> {
+ let dir_path_str = dir_path.to_owned();
+
+ tokio::task::spawn_blocking(move || {
+ let dir_path = Path::new(&dir_path_str);
+ std::fs::create_dir_all(dir_path)
+ .or_err_with(InternalError, || err_str_path("fail to create", dir_path))
+ })
+ .await
+ .or_err(InternalError, "async blocking IO failure")??;
+
+ for i in 0..N {
+ let data = self.serialize_shard(i)?;
+ let dir_path = dir_path.to_owned();
+ tokio::task::spawn_blocking(move || {
+ let file_path = Path::new(&dir_path).join(format!("{}.{i}", FILE_NAME));
+ let mut file = File::create(&file_path)
+ .or_err_with(InternalError, || err_str_path("fail to create", &file_path))?;
+ file.write_all(&data).or_err_with(InternalError, || {
+ err_str_path("fail to write to", &file_path)
+ })
+ })
+ .await
+ .or_err(InternalError, "async blocking IO failure")??;
+ }
+ Ok(())
+ }
+
+ async fn load(&self, dir_path: &str) -> Result<()> {
+ // TODO: check the saved shards so that we load all the saved files
+ for i in 0..N {
+ let dir_path = dir_path.to_owned();
+
+ let data = tokio::task::spawn_blocking(move || {
+ let file_path = Path::new(&dir_path).join(format!("{}.{i}", FILE_NAME));
+ let mut file = File::open(&file_path)
+ .or_err_with(InternalError, || err_str_path("fail to open", &file_path))?;
+ let mut buffer = Vec::with_capacity(8192);
+ file.read_to_end(&mut buffer)
+ .or_err_with(InternalError, || {
+ err_str_path("fail to write to", &file_path)
+ })?;
+ Ok::<Vec<u8>, BError>(buffer)
+ })
+ .await
+ .or_err(InternalError, "async blocking IO failure")??;
+ self.deserialize_shard(&data)?;
+ }
+
+ Ok(())
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+ use crate::CacheKey;
+ use EvictionManager;
+
+ // we use shard (N) = 1 for eviction consistency in all tests
+
+ #[test]
+ fn test_admission() {
+ let lru = Manager::<1>::with_capacity(4, 10);
+ let key1 = CacheKey::new("", "a", "1").to_compact();
+ let until = SystemTime::now(); // unused value as a placeholder
+ let v = lru.admit(key1.clone(), 1, until);
+ assert_eq!(v.len(), 0);
+ let key2 = CacheKey::new("", "b", "1").to_compact();
+ let v = lru.admit(key2.clone(), 2, until);
+ assert_eq!(v.len(), 0);
+ let key3 = CacheKey::new("", "c", "1").to_compact();
+ let v = lru.admit(key3, 1, until);
+ assert_eq!(v.len(), 0);
+
+ // lru is full (4) now
+
+ let key4 = CacheKey::new("", "d", "1").to_compact();
+ let v = lru.admit(key4, 2, until);
+ // need to reduce used by at least 2, both key1 and key2 are evicted to make room for 3
+ assert_eq!(v.len(), 2);
+ assert_eq!(v[0], key1);
+ assert_eq!(v[1], key2);
+ }
+
+ #[test]
+ fn test_access() {
+ let lru = Manager::<1>::with_capacity(4, 10);
+ let key1 = CacheKey::new("", "a", "1").to_compact();
+ let until = SystemTime::now(); // unused value as a placeholder
+ let v = lru.admit(key1.clone(), 1, until);
+ assert_eq!(v.len(), 0);
+ let key2 = CacheKey::new("", "b", "1").to_compact();
+ let v = lru.admit(key2.clone(), 2, until);
+ assert_eq!(v.len(), 0);
+ let key3 = CacheKey::new("", "c", "1").to_compact();
+ let v = lru.admit(key3, 1, until);
+ assert_eq!(v.len(), 0);
+
+ // lru is full (4) now
+ // make key1 most recently used
+ lru.access(&key1, 1, until);
+ assert_eq!(v.len(), 0);
+
+ let key4 = CacheKey::new("", "d", "1").to_compact();
+ let v = lru.admit(key4, 2, until);
+ assert_eq!(v.len(), 1);
+ assert_eq!(v[0], key2);
+ }
+
+ #[test]
+ fn test_remove() {
+ let lru = Manager::<1>::with_capacity(4, 10);
+ let key1 = CacheKey::new("", "a", "1").to_compact();
+ let until = SystemTime::now(); // unused value as a placeholder
+ let v = lru.admit(key1.clone(), 1, until);
+ assert_eq!(v.len(), 0);
+ let key2 = CacheKey::new("", "b", "1").to_compact();
+ let v = lru.admit(key2.clone(), 2, until);
+ assert_eq!(v.len(), 0);
+ let key3 = CacheKey::new("", "c", "1").to_compact();
+ let v = lru.admit(key3, 1, until);
+ assert_eq!(v.len(), 0);
+
+ // lru is full (4) now
+ // remove key1
+ lru.remove(&key1);
+
+ // key2 is the least recently used one now
+ let key4 = CacheKey::new("", "d", "1").to_compact();
+ let v = lru.admit(key4, 2, until);
+ assert_eq!(v.len(), 1);
+ assert_eq!(v[0], key2);
+ }
+
+ #[test]
+ fn test_access_add() {
+ let lru = Manager::<1>::with_capacity(4, 10);
+ let until = SystemTime::now(); // unused value as a placeholder
+
+ let key1 = CacheKey::new("", "a", "1").to_compact();
+ lru.access(&key1, 1, until);
+ let key2 = CacheKey::new("", "b", "1").to_compact();
+ lru.access(&key2, 2, until);
+ let key3 = CacheKey::new("", "c", "1").to_compact();
+ lru.access(&key3, 2, until);
+
+ let key4 = CacheKey::new("", "d", "1").to_compact();
+ let v = lru.admit(key4, 2, until);
+ // need to reduce used by at least 2, both key1 and key2 are evicted to make room for 3
+ assert_eq!(v.len(), 2);
+ assert_eq!(v[0], key1);
+ assert_eq!(v[1], key2);
+ }
+
+ #[test]
+ fn test_admit_update() {
+ let lru = Manager::<1>::with_capacity(4, 10);
+ let key1 = CacheKey::new("", "a", "1").to_compact();
+ let until = SystemTime::now(); // unused value as a placeholder
+ let v = lru.admit(key1.clone(), 1, until);
+ assert_eq!(v.len(), 0);
+ let key2 = CacheKey::new("", "b", "1").to_compact();
+ let v = lru.admit(key2.clone(), 2, until);
+ assert_eq!(v.len(), 0);
+ let key3 = CacheKey::new("", "c", "1").to_compact();
+ let v = lru.admit(key3, 1, until);
+ assert_eq!(v.len(), 0);
+
+ // lru is full (4) now
+ // update key2 to reduce its size by 1
+ let v = lru.admit(key2, 1, until);
+ assert_eq!(v.len(), 0);
+
+ // lru is not full anymore
+ let key4 = CacheKey::new("", "d", "1").to_compact();
+ let v = lru.admit(key4.clone(), 1, until);
+ assert_eq!(v.len(), 0);
+
+ // make key4 larger
+ let v = lru.admit(key4, 2, until);
+ // need to evict now
+ assert_eq!(v.len(), 1);
+ assert_eq!(v[0], key1);
+ }
+
+ #[test]
+ fn test_peek() {
+ let lru = Manager::<1>::with_capacity(4, 10);
+ let until = SystemTime::now(); // unused value as a placeholder
+
+ let key1 = CacheKey::new("", "a", "1").to_compact();
+ lru.access(&key1, 1, until);
+ let key2 = CacheKey::new("", "b", "1").to_compact();
+ lru.access(&key2, 2, until);
+ assert!(lru.peek(&key1));
+ assert!(lru.peek(&key2));
+ }
+
+ #[test]
+ fn test_serde() {
+ let lru = Manager::<1>::with_capacity(4, 10);
+ let key1 = CacheKey::new("", "a", "1").to_compact();
+ let until = SystemTime::now(); // unused value as a placeholder
+ let v = lru.admit(key1.clone(), 1, until);
+ assert_eq!(v.len(), 0);
+ let key2 = CacheKey::new("", "b", "1").to_compact();
+ let v = lru.admit(key2.clone(), 2, until);
+ assert_eq!(v.len(), 0);
+ let key3 = CacheKey::new("", "c", "1").to_compact();
+ let v = lru.admit(key3, 1, until);
+ assert_eq!(v.len(), 0);
+
+ // lru is full (4) now
+ // make key1 most recently used
+ lru.access(&key1, 1, until);
+ assert_eq!(v.len(), 0);
+
+ // load lru2 with lru's data
+ let ser = lru.serialize_shard(0).unwrap();
+ let lru2 = Manager::<1>::with_capacity(4, 10);
+ lru2.deserialize_shard(&ser).unwrap();
+
+ let key4 = CacheKey::new("", "d", "1").to_compact();
+ let v = lru2.admit(key4, 2, until);
+ assert_eq!(v.len(), 1);
+ assert_eq!(v[0], key2);
+ }
+
+ #[tokio::test]
+ async fn test_save_to_disk() {
+ let until = SystemTime::now(); // unused value as a placeholder
+ let lru = Manager::<2>::with_capacity(10, 10);
+
+ lru.admit(CacheKey::new("", "a", "1").to_compact(), 1, until);
+ lru.admit(CacheKey::new("", "b", "1").to_compact(), 2, until);
+ lru.admit(CacheKey::new("", "c", "1").to_compact(), 1, until);
+ lru.admit(CacheKey::new("", "d", "1").to_compact(), 1, until);
+ lru.admit(CacheKey::new("", "e", "1").to_compact(), 2, until);
+ lru.admit(CacheKey::new("", "f", "1").to_compact(), 1, until);
+
+ // load lru2 with lru's data
+ lru.save("/tmp/test_lru_save").await.unwrap();
+ let lru2 = Manager::<2>::with_capacity(4, 10);
+ lru2.load("/tmp/test_lru_save").await.unwrap();
+
+ let ser0 = lru.serialize_shard(0).unwrap();
+ let ser1 = lru.serialize_shard(1).unwrap();
+
+ assert_eq!(ser0, lru2.serialize_shard(0).unwrap());
+ assert_eq!(ser1, lru2.serialize_shard(1).unwrap());
+ }
+}
diff --git a/pingora-cache/src/eviction/mod.rs b/pingora-cache/src/eviction/mod.rs
new file mode 100644
index 0000000..7c9d60b
--- /dev/null
+++ b/pingora-cache/src/eviction/mod.rs
@@ -0,0 +1,89 @@
+// Copyright 2024 Cloudflare, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Cache eviction module
+
+use crate::key::CompactCacheKey;
+
+use async_trait::async_trait;
+use pingora_error::Result;
+use std::time::SystemTime;
+
+pub mod lru;
+pub mod simple_lru;
+
+/// The trait that a cache eviction algorithm needs to implement
+///
+/// NOTE: these trait methods require &self not &mut self, which means concurrency should
+/// be handled by the implementations internally.
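+///
+/// A minimal sketch of the tracking contract, using the [lru] implementation (sizes are illustrative):
+/// ```
+/// use pingora_cache::{eviction::{EvictionManager, lru::Manager}, CacheKey};
+/// use std::time::SystemTime;
+///
+/// let manager = Manager::<16>::with_capacity(100, 10);
+/// let key = CacheKey::new("", "a", "1").to_compact();
+/// // access() tracks an unseen asset and reports whether it was already known
+/// assert!(!manager.access(&key, 1, SystemTime::now()));
+/// assert!(manager.peek(&key));
+/// ```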
+#[async_trait]
+pub trait EvictionManager {
+ /// Total size of the cache in bytes tracked by this eviction manager
+ fn total_size(&self) -> usize;
+ /// Number of assets tracked by this eviction manager
+ fn total_items(&self) -> usize;
+ /// Number of bytes that are already evicted
+ ///
+ /// The accumulated number is returned to play well with Prometheus counter metric type.
+ fn evicted_size(&self) -> usize;
+ /// Number of assets that are already evicted
+ ///
+ /// The accumulated number is returned to play well with Prometheus counter metric type.
+ fn evicted_items(&self) -> usize;
+
+ /// Admit an item
+ ///
+ /// Return one or more items to evict. The sizes of these items are deducted
+ /// from the total size already. The caller needs to make sure that these assets are actually
+ /// removed from the storage.
+ ///
+ /// If the item is already admitted: A. its freshness is updated; B. if the new size is larger than
+ /// the existing one, victims may be returned for the caller to evict.
+ fn admit(
+ &self,
+ item: CompactCacheKey,
+ size: usize,
+ fresh_until: SystemTime,
+ ) -> Vec<CompactCacheKey>;
+
+ /// Remove an item from the eviction manager.
+ ///
+ /// The size of the item will be deducted.
+ fn remove(&self, item: &CompactCacheKey);
+
+ /// Access an item that should already be in cache.
+ ///
+ /// If the item is not tracked by this [EvictionManager], track it but no eviction will happen.
+ ///
+ /// This call is used to ask the eviction manager to track assets that are already admitted
+ /// in the cache storage system.
+ fn access(&self, item: &CompactCacheKey, size: usize, fresh_until: SystemTime) -> bool;
+
+ /// Peek into the manager to see if the item is already tracked by the system
+ ///
+ /// This function should have no side-effect on the asset itself. For example, for LRU, this
+ /// method shouldn't change the popularity of the asset being peeked.
+ fn peek(&self, item: &CompactCacheKey) -> bool;
+
+ /// Serialize to save the state of this eviction manager to disk
+ ///
+ /// This function is for preserving the eviction manager's state across server restarts.
+ ///
+ /// `dir_path` defines the directory on disk that the data should use.
+ // dir_path is &str not AsRef<Path> so that trait objects can be used
+ async fn save(&self, dir_path: &str) -> Result<()>;
+
+ /// The counterpart of [Self::save()].
+ async fn load(&self, dir_path: &str) -> Result<()>;
+}
diff --git a/pingora-cache/src/eviction/simple_lru.rs b/pingora-cache/src/eviction/simple_lru.rs
new file mode 100644
index 0000000..73efb85
--- /dev/null
+++ b/pingora-cache/src/eviction/simple_lru.rs
@@ -0,0 +1,445 @@
+// Copyright 2024 Cloudflare, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! A simple LRU cache manager built on top of the `lru` crate
+
+use super::EvictionManager;
+use crate::key::CompactCacheKey;
+
+use async_trait::async_trait;
+use lru::LruCache;
+use parking_lot::RwLock;
+use pingora_error::{BError, ErrorType::*, OrErr, Result};
+use serde::de::SeqAccess;
+use serde::{Deserialize, Serialize};
+use std::collections::hash_map::DefaultHasher;
+use std::fs::File;
+use std::hash::{Hash, Hasher};
+use std::io::prelude::*;
+use std::path::Path;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::time::SystemTime;
+
+#[derive(Debug, Deserialize, Serialize)]
+struct Node {
+ key: CompactCacheKey,
+ size: usize,
+}
+
+/// A simple LRU eviction manager
+///
+/// The implementation is not optimized. All operations require a global lock.
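+///
+/// A minimal usage sketch (the byte limit below is illustrative):
+/// ```
+/// use pingora_cache::{eviction::{simple_lru::Manager, EvictionManager}, CacheKey};
+/// use std::time::SystemTime;
+///
+/// let manager = Manager::new(1024);
+/// let key = CacheKey::new("", "a", "1").to_compact();
+/// let victims = manager.admit(key, 1, SystemTime::now());
+/// assert!(victims.is_empty()); // still under the limit, nothing to evict
+/// ```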
+pub struct Manager {
+ lru: RwLock<LruCache<u64, Node>>,
+ limit: usize,
+ used: AtomicUsize,
+ items: AtomicUsize,
+ evicted_size: AtomicUsize,
+ evicted_items: AtomicUsize,
+}
+
+impl Manager {
+ /// Create a new [Manager] with the given total size limit `limit`.
+ pub fn new(limit: usize) -> Self {
+ Manager {
+ lru: RwLock::new(LruCache::unbounded()),
+ limit,
+ used: AtomicUsize::new(0),
+ items: AtomicUsize::new(0),
+ evicted_size: AtomicUsize::new(0),
+ evicted_items: AtomicUsize::new(0),
+ }
+ }
+
+ fn insert(&self, hash_key: u64, node: CompactCacheKey, size: usize, reverse: bool) {
+ use std::cmp::Ordering::*;
+ let node = Node { key: node, size };
+ let old = {
+ let mut lru = self.lru.write();
+ let old = lru.push(hash_key, node);
+ if reverse && old.is_none() {
+ lru.demote(&hash_key);
+ }
+ old
+ };
+ if let Some(old) = old {
+ // replacing a node, just need to update used size
+ match size.cmp(&old.1.size) {
+ Greater => self.used.fetch_add(size - old.1.size, Ordering::Relaxed),
+ Less => self.used.fetch_sub(old.1.size - size, Ordering::Relaxed),
+ Equal => 0, // same size, update nothing, use 0 to match other arms' type
+ };
+ } else {
+ self.used.fetch_add(size, Ordering::Relaxed);
+ self.items.fetch_add(1, Ordering::Relaxed);
+ }
+ }
+
+ // evict items until the used capacity is below limit
+ fn evict(&self) -> Vec<CompactCacheKey> {
+ if self.used.load(Ordering::Relaxed) <= self.limit {
+ return vec![];
+ }
+ let mut to_evict = Vec::with_capacity(1); // we will at least pop 1 item
+ while self.used.load(Ordering::Relaxed) > self.limit {
+ if let Some((_, node)) = self.lru.write().pop_lru() {
+ self.used.fetch_sub(node.size, Ordering::Relaxed);
+ self.items.fetch_sub(1, Ordering::Relaxed);
+ self.evicted_size.fetch_add(node.size, Ordering::Relaxed);
+ self.evicted_items.fetch_add(1, Ordering::Relaxed);
+ to_evict.push(node.key);
+ } else {
+ // lru empty
+ return to_evict;
+ }
+ }
+ to_evict
+ }
+
+ // This could use a lot of memory to buffer the serialized data in memory and could lock the LRU
+ // for too long
+ fn serialize(&self) -> Result<Vec<u8>> {
+ use rmp_serde::encode::Serializer;
+ use serde::ser::SerializeSeq;
+ use serde::ser::Serializer as _;
+ // NOTE: This could use a lot of memory to buffer the serialized data in memory
+ let mut ser = Serializer::new(vec![]);
+ // NOTE: This long for loop could lock the LRU for too long
+ let lru = self.lru.read();
+ let mut seq = ser
+ .serialize_seq(Some(lru.len()))
+ .or_err(InternalError, "fail to serialize node")?;
+ for item in lru.iter() {
+ seq.serialize_element(item.1).unwrap(); // write to vec, safe
+ }
+ seq.end().or_err(InternalError, "when serializing LRU")?;
+ Ok(ser.into_inner())
+ }
+
+ fn deserialize(&self, buf: &[u8]) -> Result<()> {
+ use rmp_serde::decode::Deserializer;
+ use serde::de::Deserializer as _;
+ let mut de = Deserializer::new(buf);
+ let visitor = InsertToManager { lru: self };
+ de.deserialize_seq(visitor)
+ .or_err(InternalError, "when deserializing LRU")?;
+ Ok(())
+ }
+}
+
+struct InsertToManager<'a> {
+ lru: &'a Manager,
+}
+
+impl<'de, 'a> serde::de::Visitor<'de> for InsertToManager<'a> {
+ type Value = ();
+
+ fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
+ formatter.write_str("array of lru nodes")
+ }
+
+ fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
+ where
+ A: SeqAccess<'de>,
+ {
+ while let Some(node) = seq.next_element::<Node>()? {
+ let key = u64key(&node.key);
+ self.lru.insert(key, node.key, node.size, true); // insert in the back
+ }
+ Ok(())
+ }
+}
+
+#[inline]
+fn u64key(key: &CompactCacheKey) -> u64 {
+ let mut hasher = DefaultHasher::new();
+ key.hash(&mut hasher);
+ hasher.finish()
+}
+
+const FILE_NAME: &str = "simple_lru.data";
+
+#[async_trait]
+impl EvictionManager for Manager {
+ fn total_size(&self) -> usize {
+ self.used.load(Ordering::Relaxed)
+ }
+ fn total_items(&self) -> usize {
+ self.items.load(Ordering::Relaxed)
+ }
+ fn evicted_size(&self) -> usize {
+ self.evicted_size.load(Ordering::Relaxed)
+ }
+ fn evicted_items(&self) -> usize {
+ self.evicted_items.load(Ordering::Relaxed)
+ }
+
+ fn admit(
+ &self,
+ item: CompactCacheKey,
+ size: usize,
+ _fresh_until: SystemTime,
+ ) -> Vec<CompactCacheKey> {
+ let key = u64key(&item);
+ self.insert(key, item, size, false);
+ self.evict()
+ }
+
+ fn remove(&self, item: &CompactCacheKey) {
+ let key = u64key(item);
+ let node = self.lru.write().pop(&key);
+ if let Some(n) = node {
+ self.used.fetch_sub(n.size, Ordering::Relaxed);
+ self.items.fetch_sub(1, Ordering::Relaxed);
+ }
+ }
+
+ fn access(&self, item: &CompactCacheKey, size: usize, _fresh_until: SystemTime) -> bool {
+ let key = u64key(item);
+ if self.lru.write().get(&key).is_none() {
+ self.insert(key, item.clone(), size, false);
+ false
+ } else {
+ true
+ }
+ }
+
+ fn peek(&self, item: &CompactCacheKey) -> bool {
+ let key = u64key(item);
+ self.lru.read().peek(&key).is_some()
+ }
+
+ async fn save(&self, dir_path: &str) -> Result<()> {
+ let data = self.serialize()?;
+ let dir_path = dir_path.to_owned();
+ tokio::task::spawn_blocking(move || {
+ let dir_path = Path::new(&dir_path);
+ std::fs::create_dir_all(dir_path).or_err(InternalError, "fail to create {dir_path}")?;
+ let file_path = dir_path.join(FILE_NAME);
+ let mut file =
+ File::create(file_path).or_err(InternalError, "fail to create {file_path}")?;
+ file.write_all(&data)
+ .or_err(InternalError, "fail to write to {file_path}")
+ })
+ .await
+ .or_err(InternalError, "async blocking IO failure")?
+ }
+
+ async fn load(&self, dir_path: &str) -> Result<()> {
+ let dir_path = dir_path.to_owned();
+ let data = tokio::task::spawn_blocking(move || {
+ let file_path = Path::new(&dir_path).join(FILE_NAME);
+ let mut file =
+ File::open(file_path).or_err(InternalError, "fail to open {file_path}")?;
+ let mut buffer = Vec::with_capacity(8192);
+ file.read_to_end(&mut buffer)
+ .or_err(InternalError, "fail to write to {file_path}")?;
+ Ok::<Vec<u8>, BError>(buffer)
+ })
+ .await
+ .or_err(InternalError, "async blocking IO failure")??;
+ self.deserialize(&data)
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+ use crate::CacheKey;
+
+ #[test]
+ fn test_admission() {
+ let lru = Manager::new(4);
+ let key1 = CacheKey::new("", "a", "1").to_compact();
+ let until = SystemTime::now(); // unused value as a placeholder
+ let v = lru.admit(key1.clone(), 1, until);
+ assert_eq!(v.len(), 0);
+ let key2 = CacheKey::new("", "b", "1").to_compact();
+ let v = lru.admit(key2.clone(), 2, until);
+ assert_eq!(v.len(), 0);
+ let key3 = CacheKey::new("", "c", "1").to_compact();
+ let v = lru.admit(key3, 1, until);
+ assert_eq!(v.len(), 0);
+
+ // lru is full (4) now
+
+ let key4 = CacheKey::new("", "d", "1").to_compact();
+ let v = lru.admit(key4, 2, until);
+ // need to reduce used by at least 2, both key1 and key2 are evicted to make room for 3
+ assert_eq!(v.len(), 2);
+ assert_eq!(v[0], key1);
+ assert_eq!(v[1], key2);
+ }
+
+ #[test]
+ fn test_access() {
+ let lru = Manager::new(4);
+ let key1 = CacheKey::new("", "a", "1").to_compact();
+ let until = SystemTime::now(); // unused value as a placeholder
+ let v = lru.admit(key1.clone(), 1, until);
+ assert_eq!(v.len(), 0);
+ let key2 = CacheKey::new("", "b", "1").to_compact();
+ let v = lru.admit(key2.clone(), 2, until);
+ assert_eq!(v.len(), 0);
+ let key3 = CacheKey::new("", "c", "1").to_compact();
+ let v = lru.admit(key3, 1, until);
+ assert_eq!(v.len(), 0);
+
+ // lru is full (4) now
+ // make key1 most recently used
+ lru.access(&key1, 1, until);
+ assert_eq!(v.len(), 0);
+
+ let key4 = CacheKey::new("", "d", "1").to_compact();
+ let v = lru.admit(key4, 2, until);
+ assert_eq!(v.len(), 1);
+ assert_eq!(v[0], key2);
+ }
+
+ #[test]
+ fn test_remove() {
+ let lru = Manager::new(4);
+ let key1 = CacheKey::new("", "a", "1").to_compact();
+ let until = SystemTime::now(); // unused value as a placeholder
+ let v = lru.admit(key1.clone(), 1, until);
+ assert_eq!(v.len(), 0);
+ let key2 = CacheKey::new("", "b", "1").to_compact();
+ let v = lru.admit(key2.clone(), 2, until);
+ assert_eq!(v.len(), 0);
+ let key3 = CacheKey::new("", "c", "1").to_compact();
+ let v = lru.admit(key3, 1, until);
+ assert_eq!(v.len(), 0);
+
+ // lru is full (4) now
+ // remove key1
+ lru.remove(&key1);
+
+ // key2 is the least recently used one now
+ let key4 = CacheKey::new("", "d", "1").to_compact();
+ let v = lru.admit(key4, 2, until);
+ assert_eq!(v.len(), 1);
+ assert_eq!(v[0], key2);
+ }
+
+ #[test]
+ fn test_access_add() {
+ let lru = Manager::new(4);
+ let until = SystemTime::now(); // unused value as a placeholder
+
+ let key1 = CacheKey::new("", "a", "1").to_compact();
+ lru.access(&key1, 1, until);
+ let key2 = CacheKey::new("", "b", "1").to_compact();
+ lru.access(&key2, 2, until);
+ let key3 = CacheKey::new("", "c", "1").to_compact();
+ lru.access(&key3, 2, until);
+
+ let key4 = CacheKey::new("", "d", "1").to_compact();
+ let v = lru.admit(key4, 2, until);
+ // need to free at least 2: evicting LRU key1 (size 1) alone is not enough, so key2 (size 2) goes too, freeing 3
+ assert_eq!(v.len(), 2);
+ assert_eq!(v[0], key1);
+ assert_eq!(v[1], key2);
+ }
+
+ #[test]
+ fn test_admit_update() {
+ let lru = Manager::new(4);
+ let key1 = CacheKey::new("", "a", "1").to_compact();
+ let until = SystemTime::now(); // unused value as a placeholder
+ let v = lru.admit(key1.clone(), 1, until);
+ assert_eq!(v.len(), 0);
+ let key2 = CacheKey::new("", "b", "1").to_compact();
+ let v = lru.admit(key2.clone(), 2, until);
+ assert_eq!(v.len(), 0);
+ let key3 = CacheKey::new("", "c", "1").to_compact();
+ let v = lru.admit(key3, 1, until);
+ assert_eq!(v.len(), 0);
+
+ // lru is full (4) now
+ // update key2 to reduce its size by 1
+ let v = lru.admit(key2, 1, until);
+ assert_eq!(v.len(), 0);
+
+ // lru is not full anymore
+ let key4 = CacheKey::new("", "d", "1").to_compact();
+ let v = lru.admit(key4.clone(), 1, until);
+ assert_eq!(v.len(), 0);
+
+ // make key4 larger
+ let v = lru.admit(key4, 2, until);
+ // need to evict now
+ assert_eq!(v.len(), 1);
+ assert_eq!(v[0], key1);
+ }
+
+ #[test]
+ fn test_serde() {
+ let lru = Manager::new(4);
+ let key1 = CacheKey::new("", "a", "1").to_compact();
+ let until = SystemTime::now(); // unused value as a placeholder
+ let v = lru.admit(key1.clone(), 1, until);
+ assert_eq!(v.len(), 0);
+ let key2 = CacheKey::new("", "b", "1").to_compact();
+ let v = lru.admit(key2.clone(), 2, until);
+ assert_eq!(v.len(), 0);
+ let key3 = CacheKey::new("", "c", "1").to_compact();
+ let v = lru.admit(key3, 1, until);
+ assert_eq!(v.len(), 0);
+
+ // lru is full (4) now
+ // make key1 most recently used
+ lru.access(&key1, 1, until);
+ assert_eq!(v.len(), 0);
+
+ // load lru2 with lru's data
+ let ser = lru.serialize().unwrap();
+ let lru2 = Manager::new(4);
+ lru2.deserialize(&ser).unwrap();
+
+ let key4 = CacheKey::new("", "d", "1").to_compact();
+ let v = lru2.admit(key4, 2, until);
+ assert_eq!(v.len(), 1);
+ assert_eq!(v[0], key2);
+ }
+
+ #[tokio::test]
+ async fn test_save_to_disk() {
+ let lru = Manager::new(4);
+ let key1 = CacheKey::new("", "a", "1").to_compact();
+ let until = SystemTime::now(); // unused value as a placeholder
+ let v = lru.admit(key1.clone(), 1, until);
+ assert_eq!(v.len(), 0);
+ let key2 = CacheKey::new("", "b", "1").to_compact();
+ let v = lru.admit(key2.clone(), 2, until);
+ assert_eq!(v.len(), 0);
+ let key3 = CacheKey::new("", "c", "1").to_compact();
+ let v = lru.admit(key3, 1, until);
+ assert_eq!(v.len(), 0);
+
+ // lru is full (4) now
+ // make key1 most recently used
+ lru.access(&key1, 1, until);
+ assert_eq!(v.len(), 0);
+
+ // load lru2 with lru's data
+ lru.save("/tmp/test_simple_lru_save").await.unwrap();
+ let lru2 = Manager::new(4);
+ lru2.load("/tmp/test_simple_lru_save").await.unwrap();
+
+ let key4 = CacheKey::new("", "d", "1").to_compact();
+ let v = lru2.admit(key4, 2, until);
+ assert_eq!(v.len(), 1);
+ assert_eq!(v[0], key2);
+ }
+}
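+
+// A minimal persistence sketch; the directory and `capacity` below are
+// placeholder values chosen for illustration, not defaults of this crate:
+//
+// let lru = Manager::new(capacity);
+// let _ = lru.load("/var/cache/pingora-lru").await; // best-effort restore
+// // ... admit() / access() / remove() while serving traffic ...
+// lru.save("/var/cache/pingora-lru").await?; // persist the LRU order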
diff --git a/pingora-cache/src/filters.rs b/pingora-cache/src/filters.rs
new file mode 100644
index 0000000..8293cb5
--- /dev/null
+++ b/pingora-cache/src/filters.rs
@@ -0,0 +1,673 @@
+// Copyright 2024 Cloudflare, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Utility functions to help process HTTP headers for caching
+
+use super::*;
+use crate::cache_control::{CacheControl, Cacheable, InterpretCacheControl};
+use crate::{RespCacheable, RespCacheable::*};
+
+use http::{header, HeaderValue};
+use httpdate::HttpDate;
+use log::warn;
+use pingora_http::{RequestHeader, ResponseHeader};
+
+/// Decide if the request is cacheable
+pub fn request_cacheable(req_header: &ReqHeader) -> bool {
+ // TODO: the check is incomplete
+ matches!(req_header.method, Method::GET | Method::HEAD)
+}
+
+/// Decide if the response is cacheable.
+///
+/// `cache_control` is the parsed [CacheControl] from the response header. It is a standalone
+/// argument so that the caller has the flexibility to use, change, or ignore it.
+// TODO: vary processing
+pub fn resp_cacheable(
+ cache_control: Option<&CacheControl>,
+ resp_header: &ResponseHeader,
+ authorization_present: bool,
+ defaults: &CacheMetaDefaults,
+) -> RespCacheable {
+ let now = SystemTime::now();
+ let expire_time = calculate_fresh_until(
+ now,
+ cache_control,
+ resp_header,
+ authorization_present,
+ defaults,
+ );
+ if let Some(fresh_until) = expire_time {
+ let (stale_while_revalidate_sec, stale_if_error_sec) =
+ calculate_serve_stale_sec(cache_control, defaults);
+
+ let mut cloned_header = resp_header.clone();
+ if let Some(cc) = cache_control {
+ cc.strip_private_headers(&mut cloned_header);
+ }
+ return Cacheable(CacheMeta::new(
+ fresh_until,
+ now,
+ stale_while_revalidate_sec,
+ stale_if_error_sec,
+ cloned_header,
+ ));
+ }
+ Uncacheable(NoCacheReason::OriginNotCache)
+}
+
+/// Calculate the [SystemTime] at which the asset expires
+///
+/// Return None when not cacheable.
+pub fn calculate_fresh_until(
+ now: SystemTime,
+ cache_control: Option<&CacheControl>,
+ resp_header: &RespHeader,
+ authorization_present: bool,
+ defaults: &CacheMetaDefaults,
+) -> Option<SystemTime> {
+ fn freshness_ttl_to_time(now: SystemTime, fresh_sec: u32) -> Option<SystemTime> {
+ if fresh_sec == 0 {
+ // ensure that the response is treated as stale
+ now.checked_sub(Duration::from_secs(1))
+ } else {
+ now.checked_add(Duration::from_secs(fresh_sec.into()))
+ }
+ }
+
+ // A request with Authorization is normally not cacheable, unless Cache-Control allows it
+ if authorization_present {
+ let uncacheable = cache_control
+ .as_ref()
+ .map_or(true, |cc| !cc.allow_caching_authorized_req());
+ if uncacheable {
+ return None;
+ }
+ }
+
+ let uncacheable = cache_control
+ .as_ref()
+ .map_or(false, |cc| cc.is_cacheable() == Cacheable::No);
+ if uncacheable {
+ return None;
+ }
+
+ // For TTL check cache-control first, then expires header, then defaults
+ cache_control
+ .and_then(|cc| {
+ cc.fresh_sec()
+ .and_then(|ttl| freshness_ttl_to_time(now, ttl))
+ })
+ .or_else(|| calculate_expires_header_time(resp_header))
+ .or_else(|| {
+ defaults
+ .fresh_sec(resp_header.status)
+ .and_then(|ttl| freshness_ttl_to_time(now, ttl))
+ })
+}
+
+/// Calculate the expire time from the `Expires` header only
+pub fn calculate_expires_header_time(resp_header: &RespHeader) -> Option<SystemTime> {
+ // according to RFC 7234:
+ // https://datatracker.ietf.org/doc/html/rfc7234#section-4.2.1
+ // - treat multiple expires headers as invalid
+ // https://datatracker.ietf.org/doc/html/rfc7234#section-5.3
+ // - "MUST interpret invalid date formats... as representing a time in the past"
+ fn parse_expires_value(expires_value: &HeaderValue) -> Option<SystemTime> {
+ let expires = expires_value.to_str().ok()?;
+ Some(SystemTime::from(
+ expires
+ .parse::<HttpDate>()
+ .map_err(|e| warn!("Invalid HttpDate in Expires: {}, error: {}", expires, e))
+ .ok()?,
+ ))
+ }
+
+ let mut expires_iter = resp_header.headers.get_all("expires").iter();
+ let expires_header = expires_iter.next();
+ if expires_header.is_none() || expires_iter.next().is_some() {
+ return None;
+ }
+ parse_expires_value(expires_header.unwrap()).or(Some(SystemTime::UNIX_EPOCH))
+}
+
+/// Calculates stale-while-revalidate and stale-if-error seconds from Cache-Control or the [CacheMetaDefaults].
+pub fn calculate_serve_stale_sec(
+ cache_control: Option<&impl InterpretCacheControl>,
+ defaults: &CacheMetaDefaults,
+) -> (u32, u32) {
+ let serve_stale_while_revalidate_sec = cache_control
+ .and_then(|cc| cc.serve_stale_while_revalidate_sec())
+ .unwrap_or_else(|| defaults.serve_stale_while_revalidate_sec());
+ let serve_stale_if_error_sec = cache_control
+ .and_then(|cc| cc.serve_stale_if_error_sec())
+ .unwrap_or_else(|| defaults.serve_stale_if_error_sec());
+ (serve_stale_while_revalidate_sec, serve_stale_if_error_sec)
+}
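+
+// A sketch of how the helpers above compose in a response filter; the inputs
+// (`resp_header`, `has_authorization`, `my_defaults`) are assumed to be
+// supplied by the caller and are not defined in this module:
+//
+// let cc = CacheControl::from_resp_headers(&resp_header);
+// match resp_cacheable(cc.as_ref(), &resp_header, has_authorization, &my_defaults) {
+//     Cacheable(meta) => { /* admit the asset with this CacheMeta */ }
+//     Uncacheable(reason) => { /* skip caching, log the reason */ }
+// }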
+
+/// Filters to run when sending requests to upstream
+pub mod upstream {
+ use super::*;
+
+ /// Adjust the request header for cacheable requests
+ ///
+ /// This filter does the following in order to fetch the entire response to cache
+ /// - Convert HEAD to GET
+ /// - `If-*` headers are removed
+ /// - `Range` header is removed
+ ///
+/// When `meta` is set, this function will inject `If-Modified-Since` according to the `Last-Modified` header
+/// and `If-None-Match` according to the `ETag` header
+ pub fn request_filter(req: &mut RequestHeader, meta: Option<&CacheMeta>) -> Result<()> {
+ // change HEAD to GET, HEAD itself is not semantically cacheable
+ if req.method == Method::HEAD {
+ req.set_method(Method::GET);
+ }
+
+ // remove downstream precondition headers https://datatracker.ietf.org/doc/html/rfc7232#section-3
+ // we'd like to cache the 200 not the 304
+ req.remove_header(&header::IF_MATCH);
+ req.remove_header(&header::IF_NONE_MATCH);
+ req.remove_header(&header::IF_MODIFIED_SINCE);
+ req.remove_header(&header::IF_UNMODIFIED_SINCE);
+ // see below range header
+ req.remove_header(&header::IF_RANGE);
+
+ // remove downstream range header as we'd like to cache the entire response (this might change in the future)
+ req.remove_header(&header::RANGE);
+
+ // we have a presumably stale response already, add precondition headers for revalidation
+ if let Some(m) = meta {
+ // rfc7232: "SHOULD send both validators in cache validation" but
+ // there have been weird cases where an origin has a matching ETag but no Last-Modified
+ if let Some(since) = m.headers().get(&header::LAST_MODIFIED) {
+ req.insert_header(header::IF_MODIFIED_SINCE, since).unwrap();
+ }
+ if let Some(etag) = m.headers().get(&header::ETAG) {
+ req.insert_header(header::IF_NONE_MATCH, etag).unwrap();
+ }
+ }
+
+ Ok(())
+ }
+}
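+
+// A brief usage sketch for the filter above; `upstream_req` and the optional
+// stale `meta` are hypothetical values owned by the proxy:
+//
+// // normalize the request and, when revalidating, add precondition headers
+// upstream::request_filter(&mut upstream_req, stale_meta.as_ref())?;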
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use http::header::{HeaderName, CACHE_CONTROL, EXPIRES, SET_COOKIE};
+ use http::StatusCode;
+ use httpdate::fmt_http_date;
+
+ fn init_log() {
+ let _ = env_logger::builder().is_test(true).try_init();
+ }
+
+ const DEFAULTS: CacheMetaDefaults = CacheMetaDefaults::new(
+ |status| match status {
+ StatusCode::OK => Some(10),
+ StatusCode::NOT_FOUND => Some(5),
+ StatusCode::PARTIAL_CONTENT => None,
+ _ => Some(1),
+ },
+ 0,
+ u32::MAX, /* "infinite" stale-if-error */
+ );
+
+ // Cache nothing, by default
+ const BYPASS_CACHE_DEFAULTS: CacheMetaDefaults = CacheMetaDefaults::new(|_| None, 0, 0);
+
+ fn build_response(status: u16, headers: &[(HeaderName, &str)]) -> ResponseHeader {
+ let mut header = ResponseHeader::build(status, Some(headers.len())).unwrap();
+ for (k, v) in headers {
+ header.append_header(k.to_string(), *v).unwrap();
+ }
+ header
+ }
+
+ fn resp_cacheable_wrapper(
+ resp: &ResponseHeader,
+ defaults: &CacheMetaDefaults,
+ authorization_present: bool,
+ ) -> Option<CacheMeta> {
+ if let Cacheable(meta) = resp_cacheable(
+ CacheControl::from_resp_headers(resp).as_ref(),
+ resp,
+ authorization_present,
+ defaults,
+ ) {
+ Some(meta)
+ } else {
+ None
+ }
+ }
+
+ #[test]
+ fn test_resp_cacheable() {
+ let meta = resp_cacheable_wrapper(
+ &build_response(200, &[(CACHE_CONTROL, "max-age=12345")]),
+ &DEFAULTS,
+ false,
+ );
+
+ let meta = meta.unwrap();
+ assert!(meta.is_fresh(SystemTime::now()));
+ assert!(meta.is_fresh(
+ SystemTime::now()
+ .checked_add(Duration::from_secs(12))
+ .unwrap()
+ ),);
+ assert!(!meta.is_fresh(
+ SystemTime::now()
+ .checked_add(Duration::from_secs(12346))
+ .unwrap()
+ ));
+ }
+
+ #[test]
+ fn test_resp_uncacheable_directives() {
+ let meta = resp_cacheable_wrapper(
+ &build_response(200, &[(CACHE_CONTROL, "private, max-age=12345")]),
+ &DEFAULTS,
+ false,
+ );
+ assert!(meta.is_none());
+
+ let meta = resp_cacheable_wrapper(
+ &build_response(200, &[(CACHE_CONTROL, "no-store, max-age=12345")]),
+ &DEFAULTS,
+ false,
+ );
+ assert!(meta.is_none());
+ }
+
+ #[test]
+ fn test_resp_cache_authorization() {
+ let meta = resp_cacheable_wrapper(&build_response(200, &[]), &DEFAULTS, true);
+ assert!(meta.is_none());
+
+ let meta = resp_cacheable_wrapper(
+ &build_response(200, &[(CACHE_CONTROL, "max-age=10")]),
+ &DEFAULTS,
+ true,
+ );
+ assert!(meta.is_none());
+
+ let meta = resp_cacheable_wrapper(
+ &build_response(200, &[(CACHE_CONTROL, "s-maxage=10")]),
+ &DEFAULTS,
+ true,
+ );
+ assert!(meta.unwrap().is_fresh(SystemTime::now()));
+
+ let meta = resp_cacheable_wrapper(
+ &build_response(200, &[(CACHE_CONTROL, "public, max-age=10")]),
+ &DEFAULTS,
+ true,
+ );
+ assert!(meta.unwrap().is_fresh(SystemTime::now()));
+
+ let meta = resp_cacheable_wrapper(
+ &build_response(200, &[(CACHE_CONTROL, "must-revalidate")]),
+ &DEFAULTS,
+ true,
+ );
+ assert!(meta.unwrap().is_fresh(SystemTime::now()));
+ }
+
+ #[test]
+ fn test_resp_zero_max_age() {
+ let meta = resp_cacheable_wrapper(
+ &build_response(200, &[(CACHE_CONTROL, "max-age=0, public")]),
+ &DEFAULTS,
+ false,
+ );
+
+ // cacheable, but needs revalidation
+ assert!(!meta.unwrap().is_fresh(SystemTime::now()));
+ }
+
+ #[test]
+ fn test_resp_expires() {
+ let five_sec_time = SystemTime::now()
+ .checked_add(Duration::from_secs(5))
+ .unwrap();
+
+ // future expires is cacheable
+ let meta = resp_cacheable_wrapper(
+ &build_response(200, &[(EXPIRES, &fmt_http_date(five_sec_time))]),
+ &DEFAULTS,
+ false,
+ );
+
+ let meta = meta.unwrap();
+ assert!(meta.is_fresh(SystemTime::now()));
+ assert!(!meta.is_fresh(
+ SystemTime::now()
+ .checked_add(Duration::from_secs(6))
+ .unwrap()
+ ));
+
+ // even on default uncacheable statuses
+ let meta = resp_cacheable_wrapper(
+ &build_response(206, &[(EXPIRES, &fmt_http_date(five_sec_time))]),
+ &DEFAULTS,
+ false,
+ );
+ assert!(meta.is_some());
+ }
+
+ #[test]
+ fn test_resp_past_expires() {
+ // cacheable, but expired
+ let meta = resp_cacheable_wrapper(
+ &build_response(200, &[(EXPIRES, "Fri, 15 May 2015 15:34:21 GMT")]),
+ &BYPASS_CACHE_DEFAULTS,
+ false,
+ );
+ assert!(!meta.unwrap().is_fresh(SystemTime::now()));
+ }
+
+ #[test]
+ fn test_resp_nonstandard_expires() {
+ // init log to allow inspecting warnings
+ init_log();
+
+ // invalid cases, according to parser
+ // (but should be stale according to RFC)
+ let meta = resp_cacheable_wrapper(
+ &build_response(200, &[(EXPIRES, "Mon, 13 Feb 0002 12:00:00 GMT")]),
+ &BYPASS_CACHE_DEFAULTS,
+ false,
+ );
+ assert!(!meta.unwrap().is_fresh(SystemTime::now()));
+
+ let meta = resp_cacheable_wrapper(
+ &build_response(200, &[(EXPIRES, "Fri, 01 Dec 99999 16:00:00 GMT")]),
+ &BYPASS_CACHE_DEFAULTS,
+ false,
+ );
+ assert!(!meta.unwrap().is_fresh(SystemTime::now()));
+
+ let meta = resp_cacheable_wrapper(
+ &build_response(200, &[(EXPIRES, "0")]),
+ &BYPASS_CACHE_DEFAULTS,
+ false,
+ );
+ assert!(!meta.unwrap().is_fresh(SystemTime::now()));
+ }
+
+ #[test]
+ fn test_resp_multiple_expires() {
+ let five_sec_time = SystemTime::now()
+ .checked_add(Duration::from_secs(5))
+ .unwrap();
+ let ten_sec_time = SystemTime::now()
+ .checked_add(Duration::from_secs(10))
+ .unwrap();
+
+ // multiple expires = uncacheable
+ let meta = resp_cacheable_wrapper(
+ &build_response(
+ 200,
+ &[
+ (EXPIRES, &fmt_http_date(five_sec_time)),
+ (EXPIRES, &fmt_http_date(ten_sec_time)),
+ ],
+ ),
+ &BYPASS_CACHE_DEFAULTS,
+ false,
+ );
+ assert!(meta.is_none());
+
+ // unless the default is cacheable
+ let meta = resp_cacheable_wrapper(
+ &build_response(
+ 200,
+ &[
+ (EXPIRES, &fmt_http_date(five_sec_time)),
+ (EXPIRES, &fmt_http_date(ten_sec_time)),
+ ],
+ ),
+ &DEFAULTS,
+ false,
+ );
+ assert!(meta.is_some());
+ }
+
+ #[test]
+ fn test_resp_cache_control_with_expires() {
+ let five_sec_time = SystemTime::now()
+ .checked_add(Duration::from_secs(5))
+ .unwrap();
+ // cache-control takes precedence over expires
+ let meta = resp_cacheable_wrapper(
+ &build_response(
+ 200,
+ &[
+ (EXPIRES, &fmt_http_date(five_sec_time)),
+ (CACHE_CONTROL, "max-age=0"),
+ ],
+ ),
+ &DEFAULTS,
+ false,
+ );
+ assert!(!meta.unwrap().is_fresh(SystemTime::now()));
+ }
+
+ #[test]
+ fn test_resp_stale_while_revalidate() {
+ // respect defaults
+ let meta = resp_cacheable_wrapper(
+ &build_response(200, &[(CACHE_CONTROL, "max-age=10")]),
+ &DEFAULTS,
+ false,
+ );
+
+ let meta = meta.unwrap();
+ let eleven_sec_time = SystemTime::now()
+ .checked_add(Duration::from_secs(11))
+ .unwrap();
+ assert!(!meta.is_fresh(eleven_sec_time));
+ assert!(!meta.serve_stale_while_revalidate(SystemTime::now()));
+ assert!(!meta.serve_stale_while_revalidate(eleven_sec_time));
+
+ // override with stale-while-revalidate
+ let meta = resp_cacheable_wrapper(
+ &build_response(
+ 200,
+ &[(CACHE_CONTROL, "max-age=10, stale-while-revalidate=5")],
+ ),
+ &DEFAULTS,
+ false,
+ );
+
+ let meta = meta.unwrap();
+ let eleven_sec_time = SystemTime::now()
+ .checked_add(Duration::from_secs(11))
+ .unwrap();
+ let sixteen_sec_time = SystemTime::now()
+ .checked_add(Duration::from_secs(16))
+ .unwrap();
+ assert!(!meta.is_fresh(eleven_sec_time));
+ assert!(meta.serve_stale_while_revalidate(eleven_sec_time));
+ assert!(!meta.serve_stale_while_revalidate(sixteen_sec_time));
+ }
+
+ #[test]
+ fn test_resp_stale_if_error() {
+ // respect defaults
+ let meta = resp_cacheable_wrapper(
+ &build_response(200, &[(CACHE_CONTROL, "max-age=10")]),
+ &DEFAULTS,
+ false,
+ );
+
+ let meta = meta.unwrap();
+ let hundred_years_time = SystemTime::now()
+ .checked_add(Duration::from_secs(86400 * 365 * 100))
+ .unwrap();
+ assert!(!meta.is_fresh(hundred_years_time));
+ assert!(meta.serve_stale_if_error(hundred_years_time));
+
+ // override with stale-if-error
+ let meta = resp_cacheable_wrapper(
+ &build_response(
+ 200,
+ &[(
+ CACHE_CONTROL,
+ "max-age=10, stale-while-revalidate=5, stale-if-error=60",
+ )],
+ ),
+ &DEFAULTS,
+ false,
+ );
+
+ let meta = meta.unwrap();
+ let eleven_sec_time = SystemTime::now()
+ .checked_add(Duration::from_secs(11))
+ .unwrap();
+ let seventy_sec_time = SystemTime::now()
+ .checked_add(Duration::from_secs(70))
+ .unwrap();
+ assert!(!meta.is_fresh(eleven_sec_time));
+ assert!(meta.serve_stale_if_error(SystemTime::now()));
+ assert!(meta.serve_stale_if_error(eleven_sec_time));
+ assert!(!meta.serve_stale_if_error(seventy_sec_time));
+
+ // never serve stale
+ let meta = resp_cacheable_wrapper(
+ &build_response(200, &[(CACHE_CONTROL, "max-age=10, stale-if-error=0")]),
+ &DEFAULTS,
+ false,
+ );
+
+ let meta = meta.unwrap();
+ let eleven_sec_time = SystemTime::now()
+ .checked_add(Duration::from_secs(11))
+ .unwrap();
+ assert!(!meta.is_fresh(eleven_sec_time));
+ assert!(!meta.serve_stale_if_error(eleven_sec_time));
+ }
+
+ #[test]
+ fn test_resp_status_cache_defaults() {
+ // 200 response
+ let meta = resp_cacheable_wrapper(&build_response(200, &[]), &DEFAULTS, false);
+ assert!(meta.is_some());
+
+ let meta = meta.unwrap();
+ assert!(meta.is_fresh(
+ SystemTime::now()
+ .checked_add(Duration::from_secs(9))
+ .unwrap()
+ ));
+ assert!(!meta.is_fresh(
+ SystemTime::now()
+ .checked_add(Duration::from_secs(11))
+ .unwrap()
+ ));
+
+ // 404 response, different ttl
+ let meta = resp_cacheable_wrapper(&build_response(404, &[]), &DEFAULTS, false);
+ assert!(meta.is_some());
+
+ let meta = meta.unwrap();
+ assert!(meta.is_fresh(
+ SystemTime::now()
+ .checked_add(Duration::from_secs(4))
+ .unwrap()
+ ));
+ assert!(!meta.is_fresh(
+ SystemTime::now()
+ .checked_add(Duration::from_secs(6))
+ .unwrap()
+ ));
+
+ // 206 marked uncacheable (no cache TTL)
+ let meta = resp_cacheable_wrapper(&build_response(206, &[]), &DEFAULTS, false);
+ assert!(meta.is_none());
+
+ // default uncacheable status with explicit Cache-Control is cacheable
+ let meta = resp_cacheable_wrapper(
+ &build_response(206, &[(CACHE_CONTROL, "public, max-age=10")]),
+ &DEFAULTS,
+ false,
+ );
+ assert!(meta.is_some());
+
+ let meta = meta.unwrap();
+ assert!(meta.is_fresh(
+ SystemTime::now()
+ .checked_add(Duration::from_secs(9))
+ .unwrap()
+ ));
+ assert!(!meta.is_fresh(
+ SystemTime::now()
+ .checked_add(Duration::from_secs(11))
+ .unwrap()
+ ));
+
+ // 416 falls through to the catch-all status arm (1 second TTL)
+ let meta = resp_cacheable_wrapper(&build_response(416, &[]), &DEFAULTS, false);
+ assert!(meta.is_some());
+
+ let meta = meta.unwrap();
+ assert!(meta.is_fresh(SystemTime::now()));
+ assert!(!meta.is_fresh(
+ SystemTime::now()
+ .checked_add(Duration::from_secs(2))
+ .unwrap()
+ ));
+ }
+
+ #[test]
+ fn test_resp_cache_no_cache_fields() {
+ // check #field-names are stripped from the cache header
+ let meta = resp_cacheable_wrapper(
+ &build_response(
+ 200,
+ &[
+ (SET_COOKIE, "my-cookie"),
+ (CACHE_CONTROL, "private=\"something\", max-age=10"),
+ (HeaderName::from_bytes(b"Something").unwrap(), "foo"),
+ ],
+ ),
+ &DEFAULTS,
+ false,
+ );
+ let meta = meta.unwrap();
+ assert!(meta.headers().contains_key(SET_COOKIE));
+ assert!(!meta.headers().contains_key("Something"));
+
+ let meta = resp_cacheable_wrapper(
+ &build_response(
+ 200,
+ &[
+ (SET_COOKIE, "my-cookie"),
+ (
+ CACHE_CONTROL,
+ "max-age=0, no-cache=\"meta1, SeT-Cookie ,meta2\"",
+ ),
+ (HeaderName::from_bytes(b"meta1").unwrap(), "foo"),
+ ],
+ ),
+ &DEFAULTS,
+ false,
+ );
+ let meta = meta.unwrap();
+ assert!(!meta.headers().contains_key(SET_COOKIE));
+ assert!(!meta.headers().contains_key("meta1"));
+ }
+}
diff --git a/pingora-cache/src/hashtable.rs b/pingora-cache/src/hashtable.rs
new file mode 100644
index 0000000..a89f9ad
--- /dev/null
+++ b/pingora-cache/src/hashtable.rs
@@ -0,0 +1,112 @@
+// Copyright 2024 Cloudflare, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Concurrent hash tables and LRUs
+
+use lru::LruCache;
+use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
+use std::collections::HashMap;
+
+// There are probably off-the-shelf crates for this, e.g. DashMap?
+/// A hash table that shards to a constant number of tables to reduce lock contention
+pub struct ConcurrentHashTable<V, const N: usize> {
+ tables: [RwLock<HashMap<u128, V>>; N],
+}
+
+#[inline]
+fn get_shard(key: u128, n_shards: usize) -> usize {
+ (key % n_shards as u128) as usize
+}
+
+impl<V, const N: usize> ConcurrentHashTable<V, N>
+where
+ [RwLock<HashMap<u128, V>>; N]: Default,
+{
+ pub fn new() -> Self {
+ ConcurrentHashTable {
+ tables: Default::default(),
+ }
+ }
+ pub fn get(&self, key: u128) -> &RwLock<HashMap<u128, V>> {
+ &self.tables[get_shard(key, N)]
+ }
+
+ #[allow(dead_code)]
+ pub fn read(&self, key: u128) -> RwLockReadGuard<HashMap<u128, V>> {
+ self.get(key).read()
+ }
+
+ pub fn write(&self, key: u128) -> RwLockWriteGuard<HashMap<u128, V>> {
+ self.get(key).write()
+ }
+
+ // TODO: work out the lifetimes to provide get/set directly
+}
+
+impl<V, const N: usize> Default for ConcurrentHashTable<V, N>
+where
+ [RwLock<HashMap<u128, V>>; N]: Default,
+{
+ fn default() -> Self {
+ Self::new()
+ }
+}
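+
+// A minimal usage sketch (4 shards; the const generic fixes the shard count at
+// compile time and `key % N` picks the shard):
+//
+// let table: ConcurrentHashTable<String, 4> = ConcurrentHashTable::new();
+// table.write(42).insert(42, "value".to_string());
+// assert_eq!(table.read(42).get(&42), Some(&"value".to_string()));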
+
+#[doc(hidden)] // not needed in the public API
+pub struct LruShard<V>(RwLock<LruCache<u128, V>>);
+impl<V> Default for LruShard<V> {
+ fn default() -> Self {
+ // help satisfy default construction of array
+ LruShard(RwLock::new(LruCache::unbounded()))
+ }
+}
+
+/// Sharded concurrent data structure for LruCache
+pub struct ConcurrentLruCache<V, const N: usize> {
+ lrus: [LruShard<V>; N],
+}
+
+impl<V, const N: usize> ConcurrentLruCache<V, N>
+where
+ [LruShard<V>; N]: Default,
+{
+ pub fn new(shard_capacity: usize) -> Self {
+ use std::num::NonZeroUsize;
+ // safe, 1 != 0
+ const ONE: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(1) };
+ let mut cache = ConcurrentLruCache {
+ lrus: Default::default(),
+ };
+ for lru in &mut cache.lrus {
+ lru.0
+ .write()
+ .resize(shard_capacity.try_into().unwrap_or(ONE));
+ }
+ cache
+ }
+ pub fn get(&self, key: u128) -> &RwLock<LruCache<u128, V>> {
+ &self.lrus[get_shard(key, N)].0
+ }
+
+ #[allow(dead_code)]
+ pub fn read(&self, key: u128) -> RwLockReadGuard<LruCache<u128, V>> {
+ self.get(key).read()
+ }
+
+ pub fn write(&self, key: u128) -> RwLockWriteGuard<LruCache<u128, V>> {
+ self.get(key).write()
+ }
+
+ // TODO: work out the lifetimes to provide get/set directly
+}
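+
+// Usage mirrors ConcurrentHashTable, except capacity is per shard. A sketch
+// (the `lru` crate's `push` evicts and returns the LRU entry when full):
+//
+// let lru: ConcurrentLruCache<String, 4> = ConcurrentLruCache::new(1024);
+// lru.write(7).push(7, "value".to_string());
+// assert_eq!(lru.read(7).peek(&7), Some(&"value".to_string()));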
diff --git a/pingora-cache/src/key.rs b/pingora-cache/src/key.rs
new file mode 100644
index 0000000..26e9362
--- /dev/null
+++ b/pingora-cache/src/key.rs
@@ -0,0 +1,302 @@
+// Copyright 2024 Cloudflare, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Cache key
+
+use super::*;
+
+use blake2::{Blake2b, Digest};
+use serde::{Deserialize, Serialize};
+
+// 16-byte / 128-bit key: large enough to avoid collisions
+const KEY_SIZE: usize = 16;
+
+/// A 128-bit hash binary
+pub type HashBinary = [u8; KEY_SIZE];
+
+fn hex2str(hex: &[u8]) -> String {
+ use std::fmt::Write;
+ let mut s = String::with_capacity(KEY_SIZE * 2);
+ for c in hex {
+ write!(s, "{:02x}", c).unwrap(); // safe, just dump hex to string
+ }
+ s
+}
+
+/// Decode the hex str into [HashBinary].
+///
+/// Return `None` when the decode fails or the input is not exactly 32 hex characters (which decode to 16 bytes).
+pub fn str2hex(s: &str) -> Option<HashBinary> {
+ if s.len() != KEY_SIZE * 2 {
+ return None;
+ }
+ let mut output = [0; KEY_SIZE];
+ // no need to bubble the error, it should be obvious why the decode fails
+ hex::decode_to_slice(s.as_bytes(), &mut output).ok()?;
+ Some(output)
+}
+
+/// The trait for cache key
+pub trait CacheHashKey {
+ /// Return the hash of the cache key
+ fn primary_bin(&self) -> HashBinary;
+
+ /// Return the variance hash of the cache key.
+ ///
+ /// `None` if no variance.
+ fn variance_bin(&self) -> Option<HashBinary>;
+
+ /// Return the hash including both primary and variance keys
+ fn combined_bin(&self) -> HashBinary {
+ let key = self.primary_bin();
+ if let Some(v) = self.variance_bin() {
+ let mut hasher = Blake2b128::new();
+ hasher.update(key);
+ hasher.update(v);
+ hasher.finalize().into()
+ } else {
+ // if there is no variance, combined_bin should return the same as primary_bin
+ key
+ }
+ }
+
+ /// An extra tag for identifying users
+ ///
+/// For example, if the storage backend implements per-user quotas, this tag can be used.
+ fn user_tag(&self) -> &str;
+
+ /// The hex string of [Self::primary_bin()]
+ fn primary(&self) -> String {
+ hex2str(&self.primary_bin())
+ }
+
+ /// The hex string of [Self::variance_bin()]
+ fn variance(&self) -> Option<String> {
+ self.variance_bin().as_ref().map(|b| hex2str(&b[..]))
+ }
+
+ /// The hex string of [Self::combined_bin()]
+ fn combined(&self) -> String {
+ hex2str(&self.combined_bin())
+ }
+}
+
+/// General purpose cache key
+#[derive(Debug, Clone)]
+pub struct CacheKey {
+ // All strings for now; can be more structural as long as it can be hashed
+ namespace: String,
+ primary: String,
+ variance: Option<HashBinary>,
+ /// An extra tag for identifying users
+ ///
+ /// For example, if the storage backend implements per-user quotas, this tag can be used.
+ pub user_tag: String,
+}
+
+impl CacheKey {
+ /// Set the value of the variance hash
+ pub fn set_variance_key(&mut self, key: HashBinary) {
+ self.variance = Some(key)
+ }
+
+ /// Get the value of the variance hash
+ pub fn get_variance_key(&self) -> Option<&HashBinary> {
+ self.variance.as_ref()
+ }
+
+ /// Removes the variance from this cache key
+ pub fn remove_variance_key(&mut self) {
+ self.variance = None
+ }
+}
+
+/// Storage optimized cache key to keep in memory or in storage
+// 16 bytes + 8 bytes (+ 16 * u8) + user_tag.len() + 16 bytes (Box<str>)
+#[derive(Debug, Deserialize, Serialize, Clone, Hash, PartialEq, Eq)]
+pub struct CompactCacheKey {
+ pub primary: HashBinary,
+ // saves 8 bytes for non-variance keys but wastes 8 bytes for variance keys, vs. storing a flat 16 bytes
+ pub variance: Option<Box<HashBinary>>,
+ pub user_tag: Box<str>, // the len should be small to keep memory usage bounded
+}
+
+impl CacheHashKey for CompactCacheKey {
+ fn primary_bin(&self) -> HashBinary {
+ self.primary
+ }
+
+ fn variance_bin(&self) -> Option<HashBinary> {
+ self.variance.as_ref().map(|s| *s.as_ref())
+ }
+
+ fn user_tag(&self) -> &str {
+ &self.user_tag
+ }
+}
+
+/*
+ * We use blake2 hashing, which is faster and more secure, to replace md5.
+ * We have not given much thought to whether a non-crypto hash could safely be
+ * used here, because hashing performance is not critical.
+ * Note: we should avoid hashes like ahash, which does not produce consistent
+ * output across machines because it is designed purely for in-memory hashtables.
+*/
+
+// hash output: we use 128 bits (16 bytes) hash which will map to 32 bytes hex string
+pub(crate) type Blake2b128 = Blake2b<blake2::digest::consts::U16>;
+
+/// helper function: hash str to u8
+pub fn hash_u8(key: &str) -> u8 {
+ let mut hasher = Blake2b128::new();
+ hasher.update(key);
+ let raw = hasher.finalize();
+ raw[0]
+}
+
+/// helper function: hash str to [HashBinary]
+pub fn hash_key(key: &str) -> HashBinary {
+ let mut hasher = Blake2b128::new();
+ hasher.update(key);
+ let raw = hasher.finalize();
+ raw.into()
+}
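+
+// A quick sketch; Blake2b output is deterministic, so the same input hashes to
+// the same digest on every host (the key string below is arbitrary):
+//
+// let shard: u8 = hash_u8("asset-123"); // e.g. to pick one of 256 shards
+// let bin: HashBinary = hash_key("asset-123"); // the full 16-byte digest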
+
+impl CacheKey {
+ fn primary_hasher(&self) -> Blake2b128 {
+ let mut hasher = Blake2b128::new();
+ hasher.update(&self.namespace);
+ hasher.update(&self.primary);
+ hasher
+ }
+
+ /// Create a default [CacheKey] from a request, which just takes its URI as the primary key.
+ pub fn default(req_header: &ReqHeader) -> Self {
+ CacheKey {
+ namespace: "".into(),
+ primary: format!("{}", req_header.uri),
+ variance: None,
+ user_tag: "".into(),
+ }
+ }
+
+ /// Create a new [CacheKey] from the given namespace, primary, and user_tag string.
+ ///
+ /// Both `namespace` and `primary` will be used for the primary hash
+ pub fn new<S1, S2, S3>(namespace: S1, primary: S2, user_tag: S3) -> Self
+ where
+ S1: Into<String>,
+ S2: Into<String>,
+ S3: Into<String>,
+ {
+ CacheKey {
+ namespace: namespace.into(),
+ primary: primary.into(),
+ variance: None,
+ user_tag: user_tag.into(),
+ }
+ }
+
+ /// Return the namespace of this key
+ pub fn namespace(&self) -> &str {
+ &self.namespace
+ }
+
+ /// Return the primary key of this key
+ pub fn primary_key(&self) -> &str {
+ &self.primary
+ }
+
+ /// Convert this key to [CompactCacheKey].
+ pub fn to_compact(&self) -> CompactCacheKey {
+ let primary = self.primary_bin();
+ CompactCacheKey {
+ primary,
+ variance: self.variance_bin().map(Box::new),
+ user_tag: self.user_tag.clone().into_boxed_str(),
+ }
+ }
+}
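+
+// A short sketch of building and converting keys; all values below are
+// arbitrary placeholders:
+//
+// let key = CacheKey::new("assets", "https://example.com/logo.png", "user1");
+// let hex = key.primary(); // 32-char hex of the 128-bit primary hash
+// let compact = key.to_compact(); // storage-optimized form
+// assert_eq!(compact.primary(), hex); // hashes are preserved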
+
+impl CacheHashKey for CacheKey {
+ fn primary_bin(&self) -> HashBinary {
+ self.primary_hasher().finalize().into()
+ }
+
+ fn variance_bin(&self) -> Option<HashBinary> {
+ self.variance
+ }
+
+ fn user_tag(&self) -> &str {
+ &self.user_tag
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_cache_key_hash() {
+ let key = CacheKey {
+ namespace: "".into(),
+ primary: "aa".into(),
+ variance: None,
+ user_tag: "1".into(),
+ };
+ let hash = key.primary();
+ assert_eq!(hash, "ac10f2aef117729f8dad056b3059eb7e");
+ assert!(key.variance().is_none());
+ assert_eq!(key.combined(), hash);
+ let compact = key.to_compact();
+ assert_eq!(compact.primary(), hash);
+ assert!(compact.variance().is_none());
+ assert_eq!(compact.combined(), hash);
+ }
+
+ #[test]
+ fn test_cache_key_vary_hash() {
+ let key = CacheKey {
+ namespace: "".into(),
+ primary: "aa".into(),
+ variance: Some([0u8; 16]),
+ user_tag: "1".into(),
+ };
+ let hash = key.primary();
+ assert_eq!(hash, "ac10f2aef117729f8dad056b3059eb7e");
+ assert_eq!(key.variance().unwrap(), "00000000000000000000000000000000");
+ assert_eq!(key.combined(), "004174d3e75a811a5b44c46b3856f3ee");
+ let compact = key.to_compact();
+ assert_eq!(compact.primary(), "ac10f2aef117729f8dad056b3059eb7e");
+ assert_eq!(
+ compact.variance().unwrap(),
+ "00000000000000000000000000000000"
+ );
+ assert_eq!(compact.combined(), "004174d3e75a811a5b44c46b3856f3ee");
+ }
+
+ #[test]
+ fn test_hex_str() {
+ let mut key = [0; KEY_SIZE];
+ for (i, v) in key.iter_mut().enumerate() {
+ // key: [0, 1, 2, .., 15]
+ *v = i as u8;
+ }
+ let hex_str = hex2str(&key);
+ let key2 = str2hex(&hex_str).unwrap();
+ for i in 0..KEY_SIZE {
+ assert_eq!(key[i], key2[i]);
+ }
+ }
+}
diff --git a/pingora-cache/src/lib.rs b/pingora-cache/src/lib.rs
new file mode 100644
index 0000000..be352dc
--- /dev/null
+++ b/pingora-cache/src/lib.rs
@@ -0,0 +1,1093 @@
+// Copyright 2024 Cloudflare, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! The HTTP caching layer for proxies.
+
+#![allow(clippy::new_without_default)]
+
+use http::{method::Method, request::Parts as ReqHeader, response::Parts as RespHeader};
+use key::{CacheHashKey, HashBinary};
+use lock::WritePermit;
+use pingora_error::Result;
+use pingora_http::ResponseHeader;
+use std::time::{Duration, SystemTime};
+use trace::CacheTraceCTX;
+
+pub mod cache_control;
+pub mod eviction;
+pub mod filters;
+pub mod hashtable;
+pub mod key;
+pub mod lock;
+pub mod max_file_size;
+mod memory;
+pub mod meta;
+pub mod predictor;
+pub mod put;
+pub mod storage;
+pub mod trace;
+mod variance;
+
+use crate::max_file_size::MaxFileSizeMissHandler;
+pub use key::CacheKey;
+use lock::{CacheLock, LockStatus, Locked};
+pub use memory::MemCache;
+pub use meta::{CacheMeta, CacheMetaDefaults};
+pub use storage::{HitHandler, MissHandler, Storage};
+pub use variance::VarianceBuilder;
+
+pub mod prelude {}
+
+/// The state machine for http caching
+///
+/// This object is used to handle the state and transitions for HTTP caching through the life of a
+/// request.
+pub struct HttpCache {
+ phase: CachePhase,
+ // Box the rest so that a disabled HttpCache struct is small
+ inner: Option<Box<HttpCacheInner>>,
+}
+
+/// This reflects the phase of HttpCache during the lifetime of a request
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+pub enum CachePhase {
+ /// Cache disabled, with reason (NeverEnabled if never explicitly used)
+ Disabled(NoCacheReason),
+ /// Cache enabled but nothing is set yet
+ Uninit,
+ /// Cache was enabled, the request decided not to use it
+ // HttpCache.inner is kept
+ Bypass,
+ /// Awaiting cache key to be generated
+ CacheKey,
+ /// Cache hit
+ Hit,
+ /// No cached asset is found
+ Miss,
+ /// A stale (expired) asset is found
+ Stale,
+ /// A stale (expired) asset was found, so a fresh one was fetched
+ Expired,
+ /// A stale (expired) asset was found, and it was revalidated to be fresh
+ Revalidated,
+ /// Revalidated, but deemed uncacheable so we do not freshen it
+ RevalidatedNoCache(NoCacheReason),
+}
+
+impl CachePhase {
+ /// Convert [CachePhase] as `str`, for logging and debugging.
+ pub fn as_str(&self) -> &'static str {
+ match self {
+ CachePhase::Disabled(_) => "disabled",
+ CachePhase::Uninit => "uninitialized",
+ CachePhase::Bypass => "bypass",
+ CachePhase::CacheKey => "key",
+ CachePhase::Hit => "hit",
+ CachePhase::Miss => "miss",
+ CachePhase::Stale => "stale",
+ CachePhase::Expired => "expired",
+ CachePhase::Revalidated => "revalidated",
+ CachePhase::RevalidatedNoCache(_) => "revalidated-nocache",
+ }
+ }
+}
+
+/// The possible reasons for not caching
+#[derive(Copy, Clone, Debug, PartialEq, Eq)]
+pub enum NoCacheReason {
+ /// Caching is not enabled to begin with
+ NeverEnabled,
+ /// Origin directives indicated this was not cacheable
+ OriginNotCache,
+ /// Response size was larger than the cache's configured maximum asset size
+ ResponseTooLarge,
+ /// Due to internal caching storage error
+ StorageError,
+ /// Due to other type of internal issues
+ InternalError,
+ /// Will be cacheable, but skip cache admission for now
+ ///
+ /// This happens when the cache predictor predicted that this request is not cacheable, but
+ /// the response turns out to be OK to cache. However, it might be too late to enable caching
+ /// for this request.
+ Deferred,
+ /// The writer of the cache lock sees that the request is not cacheable (could be OriginNotCache)
+ CacheLockGiveUp,
+ /// This request waited too long for the writer of the cache lock to finish, so this request will
+ /// fetch from the origin without caching
+ CacheLockTimeout,
+ /// Other custom defined reasons
+ Custom(&'static str),
+}
+
+impl NoCacheReason {
+ /// Convert [NoCacheReason] as `str`, for logging and debugging.
+ pub fn as_str(&self) -> &'static str {
+ use NoCacheReason::*;
+ match self {
+ NeverEnabled => "NeverEnabled",
+ OriginNotCache => "OriginNotCache",
+ ResponseTooLarge => "ResponseTooLarge",
+ StorageError => "StorageError",
+ InternalError => "InternalError",
+ Deferred => "Deferred",
+ CacheLockGiveUp => "CacheLockGiveUp",
+ CacheLockTimeout => "CacheLockTimeout",
+ Custom(s) => s,
+ }
+ }
+}
+
+/// Response cacheable decision
+#[derive(Debug)]
+pub enum RespCacheable {
+ Cacheable(CacheMeta),
+ Uncacheable(NoCacheReason),
+}
+
+impl RespCacheable {
+ /// Whether it is cacheable
+ #[inline]
+ pub fn is_cacheable(&self) -> bool {
+ matches!(*self, Self::Cacheable(_))
+ }
+
+ /// Unwrap [RespCacheable] to get the [CacheMeta] stored
+ /// # Panic
+ /// Panic when this object is not cacheable. Check [Self::is_cacheable()] first.
+ pub fn unwrap_meta(self) -> CacheMeta {
+ match self {
+ Self::Cacheable(meta) => meta,
+ Self::Uncacheable(_) => panic!("expected Cacheable value"),
+ }
+ }
+}
+
+/// Freshness state of a cache hit asset
+#[derive(Debug, Copy, Clone)]
+pub enum HitStatus {
+ Expired,
+ ForceExpired,
+ FailedHitFilter,
+ Fresh,
+}
+
+impl HitStatus {
+ /// For displaying cache hit status
+ pub fn as_str(&self) -> &'static str {
+ match self {
+ Self::Expired => "expired",
+ Self::ForceExpired => "force_expired",
+ Self::FailedHitFilter => "failed_hit_filter",
+ Self::Fresh => "fresh",
+ }
+ }
+
+ /// Whether cached asset can be served as fresh
+ pub fn is_fresh(&self) -> bool {
+ match self {
+ Self::Expired | Self::ForceExpired | Self::FailedHitFilter => false,
+ Self::Fresh => true,
+ }
+ }
+}
+
+struct HttpCacheInner {
+ pub key: Option<CacheKey>,
+ pub meta: Option<CacheMeta>,
+ // when set, even if an asset exists, it would only be considered valid after this timestamp
+ pub valid_after: Option<SystemTime>,
+ // when set, an asset will be rejected from the cache if it exceeds this size in bytes
+ pub max_file_size_bytes: Option<usize>,
+ pub miss_handler: Option<MissHandler>,
+ pub body_reader: Option<HitHandler>,
+ pub storage: &'static (dyn storage::Storage + Sync), // static for now
+ pub eviction: Option<&'static (dyn eviction::EvictionManager + Sync)>,
+ pub predictor: Option<&'static (dyn predictor::CacheablePredictor + Sync)>,
+ pub lock: Option<Locked>, // TODO: these 3 fields should come in 1 sub struct
+ pub cache_lock: Option<&'static CacheLock>,
+ pub lock_duration: Option<Duration>,
+ pub traces: trace::CacheTraceCTX,
+}
+
+impl HttpCache {
+ /// Create a new [HttpCache].
+ ///
+ /// Caching is not enabled by default.
+ pub fn new() -> Self {
+ HttpCache {
+ phase: CachePhase::Disabled(NoCacheReason::NeverEnabled),
+ inner: None,
+ }
+ }
+
+ /// Whether the cache is enabled
+ pub fn enabled(&self) -> bool {
+ !matches!(self.phase, CachePhase::Disabled(_) | CachePhase::Bypass)
+ }
+
+ /// Whether the cache is being bypassed
+ pub fn bypassing(&self) -> bool {
+ matches!(self.phase, CachePhase::Bypass)
+ }
+
+ /// Return the [CachePhase]
+ pub fn phase(&self) -> CachePhase {
+ self.phase
+ }
+
+ /// Whether anything was fetched from the upstream
+ ///
+ /// This essentially checks all the [CachePhase] variants that need to contact the upstream server
+ pub fn upstream_used(&self) -> bool {
+ use CachePhase::*;
+ match self.phase {
+ Disabled(_) | Bypass | Miss | Expired | Revalidated | RevalidatedNoCache(_) => true,
+ Hit | Stale => false,
+ Uninit | CacheKey => false, // invalid states for this call, treat them as false to keep it simple
+ }
+ }
+
+ /// Check whether the backend storage is the type `T`.
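+ ///
+ /// For example, `cache.storage_type_is::<MemCache>()` is true when the
+ /// enabled backend is the in-memory [MemCache].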
+ pub fn storage_type_is<T: 'static>(&self) -> bool {
+ self.inner
+ .as_ref()
+ .and_then(|inner| inner.storage.as_any().downcast_ref::<T>())
+ .is_some()
+ }
+
+ /// Disable caching
+ pub fn disable(&mut self, reason: NoCacheReason) {
+ use NoCacheReason::*;
+ match self.phase {
+ CachePhase::Disabled(_) => {
+ // replace reason
+ self.phase = CachePhase::Disabled(reason);
+ }
+ _ => {
+ self.phase = CachePhase::Disabled(reason);
+ if let Some(inner) = self.inner.as_mut() {
+ let lock = inner.lock.take();
+ if let Some(Locked::Write(_r)) = lock {
+ let lock_status = match reason {
+ // let next request try to fetch it
+ InternalError | StorageError | Deferred => LockStatus::TransientError,
+ // no need for the lock anymore
+ OriginNotCache | ResponseTooLarge => LockStatus::GiveUp,
+ // not sure which LockStatus make sense, we treat it as GiveUp for now
+ Custom(_) => LockStatus::GiveUp,
+ // should never happen, NeverEnabled shouldn't hold a lock
+ NeverEnabled => panic!("NeverEnabled holds a write lock"),
+ CacheLockGiveUp | CacheLockTimeout => {
+ panic!("CacheLock* are for cache lock readers only")
+ }
+ };
+ inner
+ .cache_lock
+ .unwrap()
+ .release(inner.key.as_ref().unwrap(), lock_status);
+ }
+ }
+ // log initial disable reason
+ self.inner_mut()
+ .traces
+ .cache_span
+ .set_tag(|| trace::Tag::new("disable_reason", reason.as_str()));
+ self.inner = None;
+ }
+ }
+ }
+
+ /* The following methods panic when they are used in the wrong phase.
+ * This is better than returning errors as such panics are only caused by coding error, which
+ * should be fixed right away. Tokio runtime only crashes the current task instead of the whole
+ * program when these panics happen. */
+
+ /// Set the cache to bypass
+ ///
+ /// # Panic
+ /// This call is only allowed in the [CachePhase::CacheKey] phase (before any cache lookup is performed).
+ /// Using it in any other phase will lead to a panic.
+ pub fn bypass(&mut self) {
+ match self.phase {
+ CachePhase::CacheKey => {
+ // before cache lookup / found / miss
+ self.phase = CachePhase::Bypass;
+ self.inner_mut()
+ .traces
+ .cache_span
+ .set_tag(|| trace::Tag::new("bypassed", true));
+ }
+ _ => panic!("wrong phase to bypass HttpCache {:?}", self.phase),
+ }
+ }
+
+ /// Enable the cache
+ ///
+ /// - `storage`: the cache storage backend that implements [storage::Storage]
+ /// - `eviction`: optionally the eviction manager; without it, nothing will be evicted from the storage
+ /// - `predictor`: optionally a cache predictor. The cache predictor predicts whether something is likely
+ /// to be cacheable or not. This is useful because the proxy can apply different types of optimization to
+ /// cacheable and uncacheable requests.
+ /// - `cache_lock`: optionally a cache lock which handles concurrent lookups to the same asset. Without it
+ /// such lookups will all be allowed to fetch the asset independently.
+ pub fn enable(
+ &mut self,
+ storage: &'static (dyn storage::Storage + Sync),
+ eviction: Option<&'static (dyn eviction::EvictionManager + Sync)>,
+ predictor: Option<&'static (dyn predictor::CacheablePredictor + Sync)>,
+ cache_lock: Option<&'static CacheLock>,
+ ) {
+ match self.phase {
+ CachePhase::Disabled(_) => {
+ self.phase = CachePhase::Uninit;
+ self.inner = Some(Box::new(HttpCacheInner {
+ key: None,
+ meta: None,
+ valid_after: None,
+ max_file_size_bytes: None,
+ miss_handler: None,
+ body_reader: None,
+ storage,
+ eviction,
+ predictor,
+ lock: None,
+ cache_lock,
+ lock_duration: None,
+ traces: CacheTraceCTX::new(),
+ }));
+ }
+ _ => panic!("Cannot enable already enabled HttpCache {:?}", self.phase),
+ }
+ }
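+
+ // A sketch of a typical setup; `STORAGE` and `EVICTION` stand in for
+ // caller-owned `'static` instances and are not defined in this crate:
+ //
+ // let mut cache = HttpCache::new();
+ // cache.enable(&*STORAGE, Some(&*EVICTION), None, None);
+ // cache.set_cache_key(CacheKey::default(&req_header));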
+
+ // Enable distributed tracing
+ pub fn enable_tracing(&mut self, parent_span: trace::Span) {
+ if let Some(inner) = self.inner.as_mut() {
+ inner.traces.enable(parent_span);
+ }
+ }
+
+ // Get the cache `miss` tracing span
+ pub fn get_miss_span(&mut self) -> Option<trace::SpanHandle> {
+ self.inner.as_mut().map(|i| i.traces.get_miss_span())
+ }
+
+ // shortcut to access inner, panic if phase is disabled
+ #[inline]
+ fn inner_mut(&mut self) -> &mut HttpCacheInner {
+ self.inner.as_mut().unwrap()
+ }
+
+ #[inline]
+ fn inner(&self) -> &HttpCacheInner {
+ self.inner.as_ref().unwrap()
+ }
+
+ /// Set the cache key
+ /// # Panic
+ /// The cache key is only allowed to be set in its own phase. Setting it in other phases will cause a panic.
+ pub fn set_cache_key(&mut self, key: CacheKey) {
+ match self.phase {
+ CachePhase::Uninit | CachePhase::CacheKey => {
+ self.phase = CachePhase::CacheKey;
+ self.inner_mut().key = Some(key);
+ }
+ _ => panic!("wrong phase {:?}", self.phase),
+ }
+ }
+
+ /// Return the cache key used for asset lookup
+ /// # Panic
+ /// Can only be called after cache key is set and cache is not disabled. Panic otherwise.
+ pub fn cache_key(&self) -> &CacheKey {
+ match self.phase {
+ CachePhase::Disabled(_) | CachePhase::Uninit => panic!("wrong phase {:?}", self.phase),
+ _ => self.inner().key.as_ref().unwrap(),
+ }
+ }
+
+ /// Return the max size allowed to be cached.
+ pub fn max_file_size_bytes(&self) -> Option<usize> {
+ match self.phase {
+ CachePhase::Disabled(_) | CachePhase::Uninit => panic!("wrong phase {:?}", self.phase),
+ _ => self.inner().max_file_size_bytes,
+ }
+ }
+
+ /// Set the maximum response _body_ size in bytes that will be admitted to the cache.
+ ///
+ /// Response header size does not contribute to the max file size.
+ pub fn set_max_file_size_bytes(&mut self, max_file_size_bytes: usize) {
+ match self.phase {
+ CachePhase::Disabled(_) => panic!("wrong phase {:?}", self.phase),
+ _ => {
+ self.inner_mut().max_file_size_bytes = Some(max_file_size_bytes);
+ }
+ }
+ }
+
+ /// Set that cache is found in cache storage.
+ ///
+ /// This function is called after [Self::cache_lookup()] which returns the [CacheMeta] and
+ /// [HitHandler].
+ ///
+ /// The `hit_status` enum allows the caller to force expire assets.
+ pub fn cache_found(&mut self, meta: CacheMeta, hit_handler: HitHandler, hit_status: HitStatus) {
+ match self.phase {
+ // Stale allowed because of cache lock and then retry
+ CachePhase::CacheKey | CachePhase::Stale => {
+ self.phase = if hit_status.is_fresh() {
+ CachePhase::Hit
+ } else {
+ CachePhase::Stale
+ };
+ let phase = self.phase;
+ let inner = self.inner_mut();
+ let key = inner.key.as_ref().unwrap();
+ if phase == CachePhase::Stale {
+ if let Some(lock) = inner.cache_lock.as_ref() {
+ inner.lock = Some(lock.lock(key));
+ }
+ }
+ inner.traces.log_meta(&meta);
+ if let Some(eviction) = inner.eviction {
+ // TODO: make access() accept CacheKey
+ let cache_key = key.to_compact();
+ // FIXME: get size
+ eviction.access(&cache_key, 0, meta.0.internal.fresh_until);
+ }
+ inner.traces.start_hit_span(phase, hit_status);
+ inner.meta = Some(meta);
+ inner.body_reader = Some(hit_handler);
+ }
+ _ => panic!("wrong phase {:?}", self.phase),
+ }
+ }
+
+ /// Mark `self` as a cache miss.
+ ///
+ /// This function is called after [Self::cache_lookup()] finds nothing or the caller decides
+ /// not to use the asset found.
+ /// # Panic
+ /// Panic in other phases.
+ pub fn cache_miss(&mut self) {
+ match self.phase {
+ // from CacheKey: set state to miss during cache lookup
+ // from Bypass: response became cacheable, set state to miss to cache
+ CachePhase::CacheKey | CachePhase::Bypass => {
+ self.phase = CachePhase::Miss;
+ self.inner_mut().traces.start_miss_span();
+ }
+ _ => panic!("wrong phase {:?}", self.phase),
+ }
+ }
+
+ /// Return the [HitHandler]
+ /// # Panic
+ /// Call this after [Self::cache_found()], panic in other phases.
+ pub fn hit_handler(&mut self) -> &mut HitHandler {
+ match self.phase {
+ CachePhase::Hit
+ | CachePhase::Stale
+ | CachePhase::Revalidated
+ | CachePhase::RevalidatedNoCache(_) => self.inner_mut().body_reader.as_mut().unwrap(),
+ _ => panic!("wrong phase {:?}", self.phase),
+ }
+ }
+
+ /// Return the body reader during a cache admission (miss/expired), which decouples the
+ /// downstream read and upstream cache write
+ pub fn miss_body_reader(&mut self) -> Option<&mut HitHandler> {
+ match self.phase {
+ CachePhase::Miss | CachePhase::Expired => {
+ let inner = self.inner_mut();
+ if inner.storage.support_streaming_partial_write() {
+ inner.body_reader.as_mut()
+ } else {
+ // body_reader could be set even when the storage doesn't support streaming
+ // partial writes; an expired cache hit would have the reader set.
+ None
+ }
+ }
+ _ => None,
+ }
+ }
+
+ /// Call this when cache hit is fully read.
+ ///
+ /// This call will release resource if any and log the timing in tracing if set.
+ /// # Panic
+ /// Panic in phases where there is no cache hit.
+ pub async fn finish_hit_handler(&mut self) -> Result<()> {
+ match self.phase {
+ CachePhase::Hit
+ | CachePhase::Miss
+ | CachePhase::Expired
+ | CachePhase::Stale
+ | CachePhase::Revalidated
+ | CachePhase::RevalidatedNoCache(_) => {
+ let inner = self.inner_mut();
+ if inner.body_reader.is_none() {
+ // already finished, we allow calling this function more than once
+ return Ok(());
+ }
+ let body_reader = inner.body_reader.take().unwrap();
+ let key = inner.key.as_ref().unwrap();
+ let result = body_reader
+ .finish(inner.storage, key, &inner.traces.hit_span.handle())
+ .await;
+ inner.traces.finish_hit_span();
+ result
+ }
+ _ => panic!("wrong phase {:?}", self.phase),
+ }
+ }
+
+ /// Set the [MissHandler] according to cache_key and meta, can only call once
+ pub async fn set_miss_handler(&mut self) -> Result<()> {
+ match self.phase {
+ // set_miss_handler() needs to be called after set_cache_meta() (which changes Stale to Expired).
+ // This is an artificial rule to enforce the state transitions
+ CachePhase::Miss | CachePhase::Expired => {
+ let max_file_size_bytes = self.max_file_size_bytes();
+
+ let inner = self.inner_mut();
+ if inner.miss_handler.is_some() {
+ panic!("write handler is already set")
+ }
+ let meta = inner.meta.as_ref().unwrap();
+ let key = inner.key.as_ref().unwrap();
+ let miss_handler = inner
+ .storage
+ .get_miss_handler(key, meta, &inner.traces.get_miss_span())
+ .await?;
+
+ inner.miss_handler = if let Some(max_size) = max_file_size_bytes {
+ Some(Box::new(MaxFileSizeMissHandler::new(
+ miss_handler,
+ max_size,
+ )))
+ } else {
+ Some(miss_handler)
+ };
+
+ if inner.storage.support_streaming_partial_write() {
+ // If reader can access partial write, the cache lock can be released here
+ // to let readers start reading the body.
+ let lock = inner.lock.take();
+ if let Some(Locked::Write(_r)) = lock {
+ inner.cache_lock.unwrap().release(key, LockStatus::Done);
+ }
+ // Downstream read and upstream write can be decoupled
+ let body_reader = inner
+ .storage
+ .lookup(key, &inner.traces.get_miss_span())
+ .await?;
+
+ if let Some((_meta, body_reader)) = body_reader {
+ inner.body_reader = Some(body_reader);
+ } else {
+ // body_reader should exist now, since streaming partial write support is what enables it
+ panic!("unable to get body_reader for {:?}", meta);
+ }
+ }
+ Ok(())
+ }
+ _ => panic!("wrong phase {:?}", self.phase),
+ }
+ }
+
+ /// Return the [MissHandler] to write the response body to cache.
+ ///
+ /// `None`: the handler has not been set or already finished
+ pub fn miss_handler(&mut self) -> Option<&mut MissHandler> {
+ match self.phase {
+ CachePhase::Miss | CachePhase::Expired => self.inner_mut().miss_handler.as_mut(),
+ _ => panic!("wrong phase {:?}", self.phase),
+ }
+ }
+
+ /// Finish cache admission
+ ///
+ /// If [self] is dropped without calling this, the cache admission is considered incomplete and
+ /// should be cleaned up.
+ ///
+ /// This call will also trigger eviction if set.
+ pub async fn finish_miss_handler(&mut self) -> Result<()> {
+ match self.phase {
+ CachePhase::Miss | CachePhase::Expired => {
+ let inner = self.inner_mut();
+ if inner.miss_handler.is_none() {
+ // already finished, we allow calling this function more than once
+ return Ok(());
+ }
+ let miss_handler = inner.miss_handler.take().unwrap();
+ let size = miss_handler.finish().await?;
+ let lock = inner.lock.take();
+ let key = inner.key.as_ref().unwrap();
+ if let Some(Locked::Write(_r)) = lock {
+ // no need to call r.unlock() because release() will call it
+ // r is a guard to make sure the lock is unlocked when this request is dropped
+ inner.cache_lock.unwrap().release(key, LockStatus::Done);
+ }
+ if let Some(eviction) = inner.eviction {
+ let cache_key = key.to_compact();
+ let meta = inner.meta.as_ref().unwrap();
+ let evicted = eviction.admit(cache_key, size, meta.0.internal.fresh_until);
+ // TODO: make this async
+ let span = inner.traces.child("eviction");
+ let handle = span.handle();
+ for item in evicted {
+ // TODO: warn/log the error
+ let _ = inner.storage.purge(&item, &handle).await;
+ }
+ }
+ inner.traces.finish_miss_span();
+ Ok(())
+ }
+ _ => panic!("wrong phase {:?}", self.phase),
+ }
+ }
+
+ /// Set the [CacheMeta] of the cache
+ pub fn set_cache_meta(&mut self, meta: CacheMeta) {
+ match self.phase {
+ // TODO: store the staled meta somewhere else for future use?
+ CachePhase::Stale | CachePhase::Miss => {
+ let inner = self.inner_mut();
+ inner.traces.log_meta(&meta);
+ inner.meta = Some(meta);
+ }
+ _ => panic!("wrong phase {:?}", self.phase),
+ }
+ if self.phase == CachePhase::Stale {
+ self.phase = CachePhase::Expired;
+ }
+ }
+
+ /// Set the [CacheMeta] of the cache after revalidation.
+ ///
+ /// Certain info such as the original cache admission time will be preserved. Others will
+ /// be replaced by the input `meta`.
+ pub async fn revalidate_cache_meta(&mut self, mut meta: CacheMeta) -> Result<bool> {
+ let result = match self.phase {
+ CachePhase::Stale => {
+ let inner = self.inner_mut();
+ // TODO: we should keep old meta in place, just use new one to update it
+ // that requires cacheable_filter to take a mut header and just return InternalMeta
+
+ // update new meta with old meta's created time
+ let created = inner.meta.as_ref().unwrap().0.internal.created;
+ meta.0.internal.created = created;
+ // meta.internal.updated was already set to new meta's `created`,
+ // no need to set `updated` here
+
+ inner.meta.replace(meta);
+
+ let lock = inner.lock.take();
+ if let Some(Locked::Write(_r)) = lock {
+ inner
+ .cache_lock
+ .unwrap()
+ .release(inner.key.as_ref().unwrap(), LockStatus::Done);
+ }
+
+ let mut span = inner.traces.child("update_meta");
+ // TODO: this call can be async
+ let result = inner
+ .storage
+ .update_meta(
+ inner.key.as_ref().unwrap(),
+ inner.meta.as_ref().unwrap(),
+ &span.handle(),
+ )
+ .await;
+ span.set_tag(|| trace::Tag::new("updated", result.is_ok()));
+ result
+ }
+ _ => panic!("wrong phase {:?}", self.phase),
+ };
+ self.phase = CachePhase::Revalidated;
+ result
+ }
+
+ /// After a successful revalidation, update certain headers for the cached asset
+ /// such as `ETag` with the fresh response header `resp`.
+ pub fn revalidate_merge_header(&mut self, resp: &RespHeader) -> ResponseHeader {
+ match self.phase {
+ CachePhase::Stale => {
+ /*
+ * https://datatracker.ietf.org/doc/html/rfc9110#section-15.4.5
+ * 304 response MUST generate ... would have been sent in a 200 ...
+ * - Content-Location, Date, ETag, and Vary
+ * - Cache-Control and Expires...
+ */
+ let mut old_header = self.inner().meta.as_ref().unwrap().0.header.clone();
+ let mut clone_header = |header_name: &'static str| {
+ // TODO: multiple headers
+ if let Some(value) = resp.headers.get(header_name) {
+ old_header.insert_header(header_name, value).unwrap();
+ }
+ };
+ clone_header("cache-control");
+ clone_header("expires");
+ clone_header("cache-tag");
+ clone_header("cdn-cache-control");
+ clone_header("etag");
+ // https://datatracker.ietf.org/doc/html/rfc9111#section-4.3.4
+ // "...cache MUST update its header fields with the header fields provided in the 304..."
+ // But if the Vary header changes, the cached response may no longer match the
+ // incoming request.
+ //
+ // For simplicity, ignore changing Vary in revalidation for now.
+ // TODO: if we support vary during revalidation, there are a few edge cases to
+ // consider (what if Vary header appears/disappears/changes)?
+ //
+ // clone_header("vary");
+ old_header
+ }
+ _ => panic!("wrong phase {:?}", self.phase),
+ }
+ }
+
+ /// Mark this asset uncacheable after revalidation
+ pub fn revalidate_uncacheable(&mut self, header: ResponseHeader, reason: NoCacheReason) {
+ match self.phase {
+ CachePhase::Stale => {
+ // replace cache meta header
+ self.inner_mut().meta.as_mut().unwrap().0.header = header;
+ }
+ _ => panic!("wrong phase {:?}", self.phase),
+ }
+ self.phase = CachePhase::RevalidatedNoCache(reason);
+ // TODO: remove this asset from cache once finished?
+ }
+
+ /// Update the variance of the [CacheMeta].
+ ///
+ /// Note that this process may change the lookup `key`, and eventually (when the asset is
+ /// written to storage) invalidate other cached variants under the same primary key as the
+ /// current asset.
+ pub fn update_variance(&mut self, variance: Option<HashBinary>) {
+ // If this is a cache miss, we will simply update the variance in the meta.
+ //
+ // If this is an expired response, we will have to consider a few cases:
+ //
+ // **Case 1**: Variance was absent, but caller sets it now.
+ // We will just insert it into the meta. The current asset becomes the primary variant.
+ // Because the current location of the asset is already the primary variant, nothing else
+ // needs to be done.
+ //
+ // **Case 2**: Variance was present, but it changed or was removed.
+ // We want the current asset to take over the primary slot, in order to invalidate all
+ // other variants derived under the old Vary.
+ //
+ // **Case 3**: Variance did not change.
+ // Nothing needs to happen.
+ let inner = match self.phase {
+ CachePhase::Miss | CachePhase::Expired => self.inner_mut(),
+ _ => panic!("wrong phase {:?}", self.phase),
+ };
+
+ // Update the variance in the meta
+ if let Some(variance_hash) = variance.as_ref() {
+ inner
+ .meta
+ .as_mut()
+ .unwrap()
+ .set_variance_key(*variance_hash);
+ } else {
+ inner.meta.as_mut().unwrap().remove_variance();
+ }
+
+ // Change the lookup `key` if necessary, in order to admit asset into the primary slot
+ // instead of the secondary slot.
+ let key = inner.key.as_ref().unwrap();
+ if let Some(old_variance) = key.get_variance_key().as_ref() {
+ // This is a secondary variant slot.
+ if Some(*old_variance) != variance.as_ref() {
+ // This new variance does not match the variance in the cache key we used to look
+ // up this asset.
+ // Drop the cache lock to avoid leaving a dangling lock
+ // (because we locked with the old cache key for the secondary slot)
+ // TODO: maybe we should try to signal waiting readers to compete for the primary key
+ // lock instead? we will not be modifying this secondary slot so it's not actually
+ // ready for readers
+ if let Some(lock) = inner.cache_lock.as_ref() {
+ lock.release(key, LockStatus::Done);
+ }
+ // Remove the `variance` from the `key`, so that we admit this asset into the
+ // primary slot. (`key` is used to tell storage where to write the data.)
+ inner.key.as_mut().unwrap().remove_variance_key();
+ }
+ }
+ }
+
+ /// Return the [CacheMeta] of this asset
+ ///
+ /// # Panic
+ /// Panics in phases that have no cache meta.
+ pub fn cache_meta(&self) -> &CacheMeta {
+ match self.phase {
+ // TODO: allow in Bypass phase?
+ CachePhase::Stale
+ | CachePhase::Expired
+ | CachePhase::Hit
+ | CachePhase::Revalidated
+ | CachePhase::RevalidatedNoCache(_) => self.inner().meta.as_ref().unwrap(),
+ CachePhase::Miss => {
+ // this is the async body read case, safe because body_reader is only set
+ // after meta is retrieved
+ if self.inner().body_reader.is_some() {
+ self.inner().meta.as_ref().unwrap()
+ } else {
+ panic!("wrong phase {:?}", self.phase);
+ }
+ }
+
+ _ => panic!("wrong phase {:?}", self.phase),
+ }
+ }
+
+ /// Return the [CacheMeta] of this asset if any
+ ///
+ /// Different from [Self::cache_meta()], this function is also allowed to be called in the
+ /// [CachePhase::Miss] phase, where the cache meta may be set.
+ /// # Panic
+ /// Panics in phases that shouldn't have cache meta.
+ pub fn maybe_cache_meta(&self) -> Option<&CacheMeta> {
+ match self.phase {
+ CachePhase::Miss
+ | CachePhase::Stale
+ | CachePhase::Expired
+ | CachePhase::Hit
+ | CachePhase::Revalidated
+ | CachePhase::RevalidatedNoCache(_) => self.inner().meta.as_ref(),
+ _ => panic!("wrong phase {:?}", self.phase),
+ }
+ }
+
+ /// Perform the cache lookup from the given cache storage with the given cache key
+ ///
+ /// A cache hit will return [CacheMeta] which contains the header and meta info about
+ /// the cache as well as a [HitHandler] to read the cache hit body.
+ /// # Panic
+ /// Panics in other phases.
+ pub async fn cache_lookup(&mut self) -> Result<Option<(CacheMeta, HitHandler)>> {
+ match self.phase {
+ // Stale is allowed here because stale -> cache_lock -> lookup again
+ CachePhase::CacheKey | CachePhase::Stale => {
+ let inner = self.inner_mut();
+ let mut span = inner.traces.child("lookup");
+ let key = inner.key.as_ref().unwrap(); // safe, this phase should have cache key
+ let result = inner.storage.lookup(key, &span.handle()).await?;
+ let result = result.and_then(|(meta, header)| {
+ if let Some(ts) = inner.valid_after {
+ if meta.created() < ts {
+ span.set_tag(|| trace::Tag::new("not valid", true));
+ return None;
+ }
+ }
+ Some((meta, header))
+ });
+ if result.is_none() {
+ if let Some(lock) = inner.cache_lock.as_ref() {
+ inner.lock = Some(lock.lock(key));
+ }
+ }
+ span.set_tag(|| trace::Tag::new("found", result.is_some()));
+ Ok(result)
+ }
+ _ => panic!("wrong phase {:?}", self.phase),
+ }
+ }
+
+ /// Update variance and see if the meta matches the current variance
+ ///
+ /// `cache_lookup() -> compute vary hash -> cache_vary_lookup()`
+ /// This function allows callers to compute the variance based on the initial cache hit.
+ /// `meta` should be the one returned from the initial cache_lookup().
+ /// - return true if the meta matches the variance.
+ /// - return false if the current meta doesn't match the variance; the caller needs to cache_lookup() again
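+ ///
+ /// A sketch of the expected flow (`compute_variance` is a hypothetical helper):
+ /// ```ignore
+ /// if let Some((meta, hit_handler)) = cache.cache_lookup().await? {
+ ///     if let Some(variance) = compute_variance(&meta, &req) {
+ ///         if !cache.cache_vary_lookup(variance, &meta) {
+ ///             // variant mismatch: the lookup key was updated, look up again
+ ///             let result = cache.cache_lookup().await?;
+ ///         }
+ ///     }
+ /// }
+ /// ```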
+ pub fn cache_vary_lookup(&mut self, variance: HashBinary, meta: &CacheMeta) -> bool {
+ match self.phase {
+ CachePhase::CacheKey => {
+ let inner = self.inner_mut();
+ // make sure that all variants found are fresher than this asset
+ // this is because when purging all the variants, only the primary slot is deleted
+ // the created TS of the primary is the tombstone of all the variants
+ inner.valid_after = Some(meta.created());
+
+ // update vary
+ let key = inner.key.as_mut().unwrap();
+ // if no variance was previously set, then this is the first cache hit
+ let is_initial_cache_hit = key.get_variance_key().is_none();
+ key.set_variance_key(variance);
+ let variance_binary = key.variance_bin();
+ let matches_variance = meta.variance() == variance_binary;
+
+ // We should remove the variance in the lookup `key` if this is the primary variant
+ // slot. We know this is the primary variant slot if this is the initial cache hit
+ // AND the variance in the `key` already matches the `meta`'s.
+ //
+ // For the primary variant slot, the storage backend needs to use the primary key
+ // for both cache lookup and updating the meta. Otherwise it will look for the
+ // asset in the wrong location during revalidation.
+ //
+ // We can recreate the "full" cache key by using the meta's variance, if needed.
+ if matches_variance && is_initial_cache_hit {
+ inner.key.as_mut().unwrap().remove_variance_key();
+ }
+
+ matches_variance
+ }
+ _ => panic!("wrong phase {:?}", self.phase),
+ }
+ }
+
+ /// Whether this request is behind a cache lock in order to wait for another request to read the
+ /// asset.
+ pub fn is_cache_locked(&self) -> bool {
+ matches!(self.inner().lock, Some(Locked::Read(_)))
+ }
+
+ /// Whether this request is the leader request to fetch the assets for itself and other requests
+ /// behind the cache lock.
+ pub fn is_cache_lock_writer(&self) -> bool {
+ matches!(self.inner().lock, Some(Locked::Write(_)))
+ }
+
+ /// Take the write lock from this request to transfer it to another one.
+ /// # Panic
+ /// Call is_cache_lock_writer() to check first; this will panic otherwise.
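+ ///
+ /// A sketch of transferring the permit between two requests (names are illustrative):
+ /// ```ignore
+ /// let permit = session_a.take_write_lock();
+ /// session_b.set_write_lock(permit);
+ /// ```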
+ pub fn take_write_lock(&mut self) -> WritePermit {
+ let lock = self.inner_mut().lock.take().unwrap();
+ match lock {
+ Locked::Write(w) => w,
+ Locked::Read(_) => panic!("take_write_lock() called on read lock"),
+ }
+ }
+
+ /// Set the write lock, which is usually transferred from [Self::take_write_lock()]
+ pub fn set_write_lock(&mut self, write_lock: WritePermit) {
+ self.inner_mut().lock.replace(Locked::Write(write_lock));
+ }
+
+ /// Whether this request's cache hit is stale
+ fn has_staled_asset(&self) -> bool {
+ self.phase == CachePhase::Stale
+ }
+
+ /// Whether this asset is stale and serving stale-if-error is allowed
+ pub fn can_serve_stale_error(&self) -> bool {
+ self.has_staled_asset() && self.cache_meta().serve_stale_if_error(SystemTime::now())
+ }
+
+ /// Whether this asset is stale and serving stale-while-revalidate is allowed.
+ pub fn can_serve_stale_updating(&self) -> bool {
+ self.has_staled_asset()
+ && self
+ .cache_meta()
+ .serve_stale_while_revalidate(SystemTime::now())
+ }
+
+ /// Wait for the cache read lock to be unlocked
+ /// # Panic
+ /// Check [Self::is_cache_locked()] first; panics if this request doesn't have a read lock.
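+ ///
+ /// A minimal sketch of the caller side:
+ /// ```ignore
+ /// if cache.is_cache_locked() {
+ ///     let status = cache.cache_lock_wait().await;
+ ///     // on LockStatus::Done, retry the cache lookup
+ /// }
+ /// ```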
+ pub async fn cache_lock_wait(&mut self) -> LockStatus {
+ let inner = self.inner_mut();
+ let _span = inner.traces.child("cache_lock");
+ let lock = inner.lock.take(); // remove the lock from self
+ if let Some(Locked::Read(r)) = lock {
+ let now = std::time::Instant::now();
+ r.wait().await;
+ let lock_duration = now.elapsed();
+ // it's possible for a request to be locked more than once
+ inner.lock_duration = Some(
+ inner
+ .lock_duration
+ .map_or(lock_duration, |d| d + lock_duration),
+ );
+ r.lock_status() // TODO: tag the span with lock status
+ } else {
+ // should always call is_cache_locked() before this function
+ panic!("cache_lock_wait on wrong type of lock")
+ }
+ }
+
+ /// How long did this request wait behind the read lock
+ pub fn lock_duration(&self) -> Option<Duration> {
+ // FIXME: this duration is lost when cache is disabled
+ self.inner.as_ref().and_then(|i| i.lock_duration)
+ }
+
+ /// Delete the asset from the cache storage
+ /// # Panic
+ /// Needs to be called after the cache key is set. Panics otherwise.
+ pub async fn purge(&mut self) -> Result<bool> {
+ match self.phase {
+ CachePhase::CacheKey => {
+ let inner = self.inner_mut();
+ let mut span = inner.traces.child("purge");
+ let key = inner.key.as_ref().unwrap().to_compact();
+ let result = inner.storage.purge(&key, &span.handle()).await;
+ // FIXME: also need to remove from eviction manager
+ span.set_tag(|| trace::Tag::new("purged", matches!(result, Ok(true))));
+ result
+ }
+ _ => panic!("wrong phase {:?}", self.phase),
+ }
+ }
+
+ /// Check the cacheable prediction
+ ///
+ /// Return true if the predictor is not set
+ pub fn cacheable_prediction(&self) -> bool {
+ if let Some(predictor) = self.inner().predictor {
+ predictor.cacheable_prediction(self.cache_key())
+ } else {
+ true
+ }
+ }
+
+ /// Tell the predictor that this response, which was previously predicted to be
+ /// uncacheable, is cacheable now.
+ pub fn response_became_cacheable(&self) {
+ if let Some(predictor) = self.inner().predictor {
+ predictor.mark_cacheable(self.cache_key());
+ }
+ }
+
+ /// Tell the predictor that this response is uncacheable so that it will know next time
+ /// this request arrives.
+ pub fn response_became_uncacheable(&self, reason: NoCacheReason) {
+ if let Some(predictor) = self.inner().predictor {
+ predictor.mark_uncacheable(self.cache_key(), reason);
+ }
+ }
+}
+
+/// Set the header compression dictionary that helps serialize HTTP headers.
+///
+/// Return false if it is already set.
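+///
+/// Usage sketch (the path is hypothetical):
+/// ```ignore
+/// assert!(set_compression_dict_path("/etc/pingora/header_dict"));
+/// // subsequent calls are no-ops and return false
+/// assert!(!set_compression_dict_path("/etc/pingora/header_dict2"));
+/// ```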
+pub fn set_compression_dict_path(path: &str) -> bool {
+ crate::meta::COMPRESSION_DICT_PATH
+ .set(path.to_string())
+ .is_ok()
+}
diff --git a/pingora-cache/src/lock.rs b/pingora-cache/src/lock.rs
new file mode 100644
index 0000000..c5e3c31
--- /dev/null
+++ b/pingora-cache/src/lock.rs
@@ -0,0 +1,336 @@
+// Copyright 2024 Cloudflare, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Cache lock
+
+use crate::key::CacheHashKey;
+
+use crate::hashtable::ConcurrentHashTable;
+use pingora_timeout::timeout;
+use std::sync::Arc;
+
+const N_SHARDS: usize = 16;
+
+/// The global cache locking manager
+pub struct CacheLock {
+ lock_table: ConcurrentHashTable<LockStub, N_SHARDS>,
+ timeout: Duration, // fixed timeout value for now
+}
+
+/// A struct representing a locked cache access
+#[derive(Debug)]
+pub enum Locked {
+ /// The writer is allowed to fetch the asset
+ Write(WritePermit),
+ /// The reader waits for the writer to fetch the asset
+ Read(ReadLock),
+}
+
+impl Locked {
+ /// Is this a write lock
+ pub fn is_write(&self) -> bool {
+ matches!(self, Self::Write(_))
+ }
+}
+
+impl CacheLock {
+ /// Create a new [CacheLock] with the given lock timeout
+ ///
+ /// When the timeout is reached, the read locks are automatically unlocked
+ pub fn new(timeout: Duration) -> Self {
+ CacheLock {
+ lock_table: ConcurrentHashTable::new(),
+ timeout,
+ }
+ }
+
+ /// Try to lock a cache fetch
+ ///
+ /// Users should call this after a cache miss, before fetching the asset.
+ /// The returned [Locked] will tell the caller either to fetch or wait.
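+ ///
+ /// A sketch of the expected call pattern (the key is illustrative):
+ /// ```ignore
+ /// let manager = CacheLock::new(Duration::from_secs(5));
+ /// match manager.lock(&key) {
+ ///     Locked::Write(permit) => { /* fetch the asset, then release() */ }
+ ///     Locked::Read(reader) => { /* reader.wait().await, then look up the cache again */ }
+ /// }
+ /// ```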
+ pub fn lock<K: CacheHashKey>(&self, key: &K) -> Locked {
+ let hash = key.combined_bin();
+ let key = u128::from_be_bytes(hash); // endianness doesn't matter
+ let table = self.lock_table.get(key);
+ if let Some(lock) = table.read().get(&key) {
+ // already has an ongoing request
+ if lock.0.lock_status() != LockStatus::Dangling {
+ return Locked::Read(lock.read_lock());
+ }
+ // Dangling: the previous writer quit without unlocking the lock. Requests should
+ // compete for the write lock again.
+ }
+
+ let (permit, stub) = WritePermit::new(self.timeout);
+ let mut table = table.write();
+ // check again in case another request already added it
+ if let Some(lock) = table.get(&key) {
+ if lock.0.lock_status() != LockStatus::Dangling {
+ return Locked::Read(lock.read_lock());
+ }
+ }
+ table.insert(key, stub);
+ Locked::Write(permit)
+ }
+
+ /// Release a lock for the given key
+ ///
+ /// When the write lock is dropped without being released, the read lock holders will consider
+ /// it failed and compete for the write lock again.
+ pub fn release<K: CacheHashKey>(&self, key: &K, reason: LockStatus) {
+ let hash = key.combined_bin();
+ let key = u128::from_be_bytes(hash); // endianness doesn't matter
+ if let Some(lock) = self.lock_table.write(key).remove(&key) {
+ // make sure that the caller didn't forget to unlock it
+ if lock.0.locked() {
+ lock.0.unlock(reason);
+ }
+ }
+ }
+}
+
+use std::sync::atomic::{AtomicU8, Ordering};
+use std::time::{Duration, Instant};
+use tokio::sync::Semaphore;
+
+/// Status which the read locks could possibly see.
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
+pub enum LockStatus {
+ /// Waiting for the writer to populate the asset
+ Waiting,
+ /// The writer finishes, readers can start
+ Done,
+ /// The writer encountered an error, such as a network issue. A new writer will be elected.
+ TransientError,
+ /// The writer observed that no cache lock is needed (e.g., uncacheable); readers should
+ /// start to fetch independently without a new writer
+ GiveUp,
+ /// The write lock is dropped without being unlocked
+ Dangling,
+ /// The lock is held for too long
+ Timeout,
+}
+
+impl From<LockStatus> for u8 {
+ fn from(l: LockStatus) -> u8 {
+ match l {
+ LockStatus::Waiting => 0,
+ LockStatus::Done => 1,
+ LockStatus::TransientError => 2,
+ LockStatus::GiveUp => 3,
+ LockStatus::Dangling => 4,
+ LockStatus::Timeout => 5,
+ }
+ }
+}
+
+impl From<u8> for LockStatus {
+ fn from(v: u8) -> Self {
+ match v {
+ 0 => Self::Waiting,
+ 1 => Self::Done,
+ 2 => Self::TransientError,
+ 3 => Self::GiveUp,
+ 4 => Self::Dangling,
+ 5 => Self::Timeout,
+ _ => Self::GiveUp, // placeholder
+ }
+ }
+}
+
+#[derive(Debug)]
+struct LockCore {
+ pub lock_start: Instant,
+ pub timeout: Duration,
+ pub(super) lock: Semaphore,
+ // use u8 for Atomic enum
+ lock_status: AtomicU8,
+}
+
+impl LockCore {
+ pub fn new_arc(timeout: Duration) -> Arc<Self> {
+ Arc::new(LockCore {
+ lock: Semaphore::new(0),
+ timeout,
+ lock_start: Instant::now(),
+ lock_status: AtomicU8::new(LockStatus::Waiting.into()),
+ })
+ }
+
+ fn locked(&self) -> bool {
+ self.lock.available_permits() == 0
+ }
+
+ fn unlock(&self, reason: LockStatus) {
+ self.lock_status.store(reason.into(), Ordering::SeqCst);
+ // any small positive number will do, 10 is used for RwLock too
+ // no need to wake up all at once
+ self.lock.add_permits(10);
+ }
+
+ fn lock_status(&self) -> LockStatus {
+ self.lock_status.load(Ordering::Relaxed).into()
+ }
+}
+
+// all 3 structs below are just Arc<LockCore> with different interfaces
+
+/// ReadLock: requests that acquire it need to wait until it is released
+#[derive(Debug)]
+pub struct ReadLock(Arc<LockCore>);
+
+impl ReadLock {
+ /// Wait for the writer to release the lock
+ pub async fn wait(&self) {
+ if !self.locked() || self.expired() {
+ return;
+ }
+
+ // TODO: should subtract now - start so that the lock doesn't wait beyond start + timeout
+ // Also need to be careful not to wake everyone up at the same time
+ // (maybe not an issue because regular cache lock release behaves that way)
+ let _ = timeout(self.0.timeout, self.0.lock.acquire()).await;
+ // permit is returned to Semaphore right away
+ }
+
+ /// Test if it is still locked
+ pub fn locked(&self) -> bool {
+ self.0.locked()
+ }
+
+ /// Whether the lock is expired, e.g., the writer has been holding the lock for too long
+ pub fn expired(&self) -> bool {
+ // NOTE: this is whether the lock is currently expired,
+ // not whether it timed out during wait()
+ self.0.lock_start.elapsed() >= self.0.timeout
+ }
+
+ /// The current status of the lock
+ pub fn lock_status(&self) -> LockStatus {
+ let status = self.0.lock_status();
+ if matches!(status, LockStatus::Waiting) && self.expired() {
+ LockStatus::Timeout
+ } else {
+ status
+ }
+ }
+}
+
+/// WritePermit: requests that acquire it need to populate the cache and then release it
+#[derive(Debug)]
+pub struct WritePermit(Arc<LockCore>);
+
+impl WritePermit {
+ fn new(timeout: Duration) -> (WritePermit, LockStub) {
+ let lock = LockCore::new_arc(timeout);
+ let stub = LockStub(lock.clone());
+ (WritePermit(lock), stub)
+ }
+
+ fn unlock(&self, reason: LockStatus) {
+ self.0.unlock(reason)
+ }
+}
+
+impl Drop for WritePermit {
+ fn drop(&mut self) {
+ // writer exited without properly unlocking; let others compete for the write lock again
+ if self.0.locked() {
+ self.unlock(LockStatus::Dangling);
+ }
+ }
+}
+
+struct LockStub(Arc<LockCore>);
+impl LockStub {
+ pub fn read_lock(&self) -> ReadLock {
+ ReadLock(self.0.clone())
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+ use crate::CacheKey;
+
+ #[test]
+ fn test_get_release() {
+ let cache_lock = CacheLock::new(Duration::from_secs(1000));
+ let key1 = CacheKey::new("", "a", "1");
+ let locked1 = cache_lock.lock(&key1);
+ assert!(locked1.is_write()); // write permit
+ let locked2 = cache_lock.lock(&key1);
+ assert!(!locked2.is_write()); // read lock
+ cache_lock.release(&key1, LockStatus::Done);
+ let locked3 = cache_lock.lock(&key1);
+ assert!(locked3.is_write()); // write permit again
+ }
+
+ #[tokio::test]
+ async fn test_lock() {
+ let cache_lock = CacheLock::new(Duration::from_secs(1000));
+ let key1 = CacheKey::new("", "a", "1");
+ let permit = match cache_lock.lock(&key1) {
+ Locked::Write(w) => w,
+ _ => panic!(),
+ };
+ let lock = match cache_lock.lock(&key1) {
+ Locked::Read(r) => r,
+ _ => panic!(),
+ };
+ assert!(lock.locked());
+ let handle = tokio::spawn(async move {
+ lock.wait().await;
+ assert_eq!(lock.lock_status(), LockStatus::Done);
+ });
+ permit.unlock(LockStatus::Done);
+ handle.await.unwrap(); // check lock is unlocked and the task is returned
+ }
+
+ #[tokio::test]
+ async fn test_lock_timeout() {
+ let cache_lock = CacheLock::new(Duration::from_secs(1));
+ let key1 = CacheKey::new("", "a", "1");
+ let permit = match cache_lock.lock(&key1) {
+ Locked::Write(w) => w,
+ _ => panic!(),
+ };
+ let lock = match cache_lock.lock(&key1) {
+ Locked::Read(r) => r,
+ _ => panic!(),
+ };
+ assert!(lock.locked());
+
+ let handle = tokio::spawn(async move {
+ // timed out
+ lock.wait().await;
+ assert_eq!(lock.lock_status(), LockStatus::Timeout);
+ });
+
+ tokio::time::sleep(Duration::from_secs(2)).await;
+
+ // expired lock
+ let lock2 = match cache_lock.lock(&key1) {
+ Locked::Read(r) => r,
+ _ => panic!(),
+ };
+ assert!(lock2.locked());
+ assert_eq!(lock2.lock_status(), LockStatus::Timeout);
+ lock2.wait().await;
+ assert_eq!(lock2.lock_status(), LockStatus::Timeout);
+
+ permit.unlock(LockStatus::Done);
+ handle.await.unwrap();
+ }
+}
diff --git a/pingora-cache/src/max_file_size.rs b/pingora-cache/src/max_file_size.rs
new file mode 100644
index 0000000..7d812f2
--- /dev/null
+++ b/pingora-cache/src/max_file_size.rs
@@ -0,0 +1,75 @@
+// Copyright 2024 Cloudflare, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Set limit on the largest size to cache
+
+use crate::storage::HandleMiss;
+use crate::MissHandler;
+use async_trait::async_trait;
+use bytes::Bytes;
+use pingora_error::{Error, ErrorType};
+
+/// [MaxFileSizeMissHandler] wraps a MissHandler to enforce a maximum asset size that should be
+/// written to the MissHandler.
+///
+/// This is used to enforce a maximum cache size for a request when the
+/// response size is not known ahead of time (no Content-Length header). When the response size _is_
+/// known ahead of time, it should be checked up front (when calculating cacheability) for efficiency.
+/// Note: for requests with partial read support (where downstream reads the response from cache as
+/// it is filled), this will cause the request as a whole to fail. The response will be remembered
+/// as uncacheable, though, so downstream will be able to retry the request, since the cache will be
+/// disabled for the retried request.
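+///
+/// Construction sketch (the inner handler would come from a storage backend; the names are
+/// illustrative):
+/// ```ignore
+/// let inner = storage.get_miss_handler(&key, &meta, &span).await?;
+/// let limited: MissHandler =
+///     Box::new(MaxFileSizeMissHandler::new(inner, 1024 * 1024)); // cap at 1 MiB
+/// ```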
+pub struct MaxFileSizeMissHandler {
+ inner: MissHandler,
+ max_file_size_bytes: usize,
+ bytes_written: usize,
+}
+
+impl MaxFileSizeMissHandler {
+ /// Create a new [MaxFileSizeMissHandler] wrapping the given [MissHandler]
+ pub fn new(inner: MissHandler, max_file_size_bytes: usize) -> MaxFileSizeMissHandler {
+ MaxFileSizeMissHandler {
+ inner,
+ max_file_size_bytes,
+ bytes_written: 0,
+ }
+ }
+}
+
+/// Error type returned when the limit is reached.
+pub const ERR_RESPONSE_TOO_LARGE: ErrorType = ErrorType::Custom("response too large");
+
+#[async_trait]
+impl HandleMiss for MaxFileSizeMissHandler {
+ async fn write_body(&mut self, data: Bytes, eof: bool) -> pingora_error::Result<()> {
+ // fail if writing the body would exceed the max_file_size_bytes
+ if self.bytes_written + data.len() > self.max_file_size_bytes {
+ return Error::e_explain(
+ ERR_RESPONSE_TOO_LARGE,
+ format!(
+ "writing data of size {} bytes would exceed max file size of {} bytes",
+ data.len(),
+ self.max_file_size_bytes
+ ),
+ );
+ }
+
+ self.bytes_written += data.len();
+ self.inner.write_body(data, eof).await
+ }
+
+ async fn finish(self: Box<Self>) -> pingora_error::Result<usize> {
+ self.inner.finish().await
+ }
+}
diff --git a/pingora-cache/src/memory.rs b/pingora-cache/src/memory.rs
new file mode 100644
index 0000000..679517d
--- /dev/null
+++ b/pingora-cache/src/memory.rs
@@ -0,0 +1,510 @@
+// Copyright 2024 Cloudflare, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Hash map based in memory cache
+//!
+//! For testing only, not for production use
+
+//TODO: Mark this module #[test] only
+
+use super::*;
+use crate::key::{CacheHashKey, CompactCacheKey};
+use crate::storage::{HandleHit, HandleMiss, Storage};
+use crate::trace::SpanHandle;
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use parking_lot::RwLock;
+use pingora_error::*;
+use std::any::Any;
+use std::collections::HashMap;
+use std::sync::Arc;
+use tokio::sync::watch;
+
+type BinaryMeta = (Vec<u8>, Vec<u8>);
+
+pub(crate) struct CacheObject {
+ pub meta: BinaryMeta,
+ pub body: Arc<Vec<u8>>,
+}
+
+pub(crate) struct TempObject {
+ pub meta: BinaryMeta,
+ // these are Arc because they need to continue to exist after this TempObject is removed
+ pub body: Arc<RwLock<Vec<u8>>>,
+ bytes_written: Arc<watch::Sender<PartialState>>, // this should match body.len()
+}
+
+impl TempObject {
+ fn new(meta: BinaryMeta) -> Self {
+ let (tx, _rx) = watch::channel(PartialState::Partial(0));
+ TempObject {
+ meta,
+ body: Arc::new(RwLock::new(Vec::new())),
+ bytes_written: Arc::new(tx),
+ }
+ }
+ // this is not at all optimized
+ fn make_cache_object(&self) -> CacheObject {
+ let meta = self.meta.clone();
+ let body = Arc::new(self.body.read().clone());
+ CacheObject { meta, body }
+ }
+}
+
+/// Hash map based in memory cache
+///
+/// For testing only, not for production use.
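+///
+/// A typical test setup (a sketch; [Storage] methods take `&'static self`, hence the static):
+/// ```ignore
+/// static MEM_CACHE: Lazy<MemCache> = Lazy::new(MemCache::new);
+/// let hit = MEM_CACHE.lookup(&key, &span).await?;
+/// ```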
+pub struct MemCache {
+ pub(crate) cached: Arc<RwLock<HashMap<String, CacheObject>>>,
+ pub(crate) temp: Arc<RwLock<HashMap<String, TempObject>>>,
+}
+
+impl MemCache {
+ /// Create a new [MemCache]
+ pub fn new() -> Self {
+ MemCache {
+ cached: Arc::new(RwLock::new(HashMap::new())),
+ temp: Arc::new(RwLock::new(HashMap::new())),
+ }
+ }
+}
+
+pub enum MemHitHandler {
+ Complete(CompleteHit),
+ Partial(PartialHit),
+}
+
+#[derive(Copy, Clone)]
+enum PartialState {
+ Partial(usize),
+ Complete(usize),
+}
+
+pub struct CompleteHit {
+ body: Arc<Vec<u8>>,
+ done: bool,
+ range_start: usize,
+ range_end: usize,
+}
+
+impl CompleteHit {
+ fn get(&mut self) -> Option<Bytes> {
+ if self.done {
+ None
+ } else {
+ self.done = true;
+ Some(Bytes::copy_from_slice(
+ &self.body.as_slice()[self.range_start..self.range_end],
+ ))
+ }
+ }
+
+ fn seek(&mut self, start: usize, end: Option<usize>) -> Result<()> {
+ if start >= self.body.len() {
+ return Error::e_explain(
+ ErrorType::InternalError,
+ format!("seek start out of range {start} >= {}", self.body.len()),
+ );
+ }
+ self.range_start = start;
+ if let Some(end) = end {
+ // end over the actual last byte is allowed, we just need to return the actual bytes
+ self.range_end = std::cmp::min(self.body.len(), end);
+ }
+ // seek resets read so that one handler can be used for multiple ranges
+ self.done = false;
+ Ok(())
+ }
+}
+
+pub struct PartialHit {
+ body: Arc<RwLock<Vec<u8>>>,
+ bytes_written: watch::Receiver<PartialState>,
+ bytes_read: usize,
+}
+
+impl PartialHit {
+ async fn read(&mut self) -> Option<Bytes> {
+ loop {
+ let bytes_written = *self.bytes_written.borrow_and_update();
+ let bytes_end = match bytes_written {
+ PartialState::Partial(s) => s,
+ PartialState::Complete(c) => {
+ // no more data will arrive
+ if c == self.bytes_read {
+ return None;
+ }
+ c
+ }
+ };
+ assert!(bytes_end >= self.bytes_read);
+
+ // more data available to read
+ if bytes_end > self.bytes_read {
+ let new_bytes =
+ Bytes::copy_from_slice(&self.body.read()[self.bytes_read..bytes_end]);
+ self.bytes_read = bytes_end;
+ return Some(new_bytes);
+ }
+
+ // wait for more data
+ if self.bytes_written.changed().await.is_err() {
+ // err: sender dropped, body is finished
+ // FIXME: sender could drop because of an error
+ return None;
+ }
+ }
+ }
+}
+
+#[async_trait]
+impl HandleHit for MemHitHandler {
+ async fn read_body(&mut self) -> Result<Option<Bytes>> {
+ match self {
+ Self::Complete(c) => Ok(c.get()),
+ Self::Partial(p) => Ok(p.read().await),
+ }
+ }
+ async fn finish(
+ self: Box<Self>, // because self is always used as a trait object
+ _storage: &'static (dyn storage::Storage + Sync),
+ _key: &CacheKey,
+ _trace: &SpanHandle,
+ ) -> Result<()> {
+ Ok(())
+ }
+
+ fn can_seek(&self) -> bool {
+ match self {
+ Self::Complete(_) => true,
+ Self::Partial(_) => false, // TODO: support seeking in partial reads
+ }
+ }
+
+ fn seek(&mut self, start: usize, end: Option<usize>) -> Result<()> {
+ match self {
+ Self::Complete(c) => c.seek(start, end),
+ Self::Partial(_) => Error::e_explain(
+ ErrorType::InternalError,
+ "seek not supported for partial cache",
+ ),
+ }
+ }
+
+ fn as_any(&self) -> &(dyn Any + Send + Sync) {
+ self
+ }
+}
+
+pub struct MemMissHandler {
+ body: Arc<RwLock<Vec<u8>>>,
+ bytes_written: Arc<watch::Sender<PartialState>>,
+ // these are used only in finish() to move data from temp to cache
+ key: String,
+ cache: Arc<RwLock<HashMap<String, CacheObject>>>,
+ temp: Arc<RwLock<HashMap<String, TempObject>>>,
+}
+
+#[async_trait]
+impl HandleMiss for MemMissHandler {
+ async fn write_body(&mut self, data: bytes::Bytes, eof: bool) -> Result<()> {
+ let current_bytes = match *self.bytes_written.borrow() {
+ PartialState::Partial(p) => p,
+ PartialState::Complete(_) => panic!("already EOF"),
+ };
+ self.body.write().extend_from_slice(&data);
+ let written = current_bytes + data.len();
+ let new_state = if eof {
+ PartialState::Complete(written)
+ } else {
+ PartialState::Partial(written)
+ };
+ self.bytes_written.send_replace(new_state);
+ Ok(())
+ }
+
+ async fn finish(self: Box<Self>) -> Result<usize> {
+ // safe, the temp object is inserted when the miss handler is created
+ let cache_object = self.temp.read().get(&self.key).unwrap().make_cache_object();
+ let size = cache_object.body.len(); // FIXME: this is just the body size; should also track the meta size
+ self.cache.write().insert(self.key.clone(), cache_object);
+ self.temp.write().remove(&self.key);
+ Ok(size)
+ }
+}
+
+impl Drop for MemMissHandler {
+ fn drop(&mut self) {
+ self.temp.write().remove(&self.key);
+ }
+}
+
+#[async_trait]
+impl Storage for MemCache {
+ async fn lookup(
+ &'static self,
+ key: &CacheKey,
+ _trace: &SpanHandle,
+ ) -> Result<Option<(CacheMeta, HitHandler)>> {
+ let hash = key.combined();
+ // always prefer the partial read; otherwise a fresh asset will not be visible over an
+ // expired asset until it is fully updated
+ if let Some(temp_obj) = self.temp.read().get(&hash) {
+ let meta = CacheMeta::deserialize(&temp_obj.meta.0, &temp_obj.meta.1)?;
+ let partial = PartialHit {
+ body: temp_obj.body.clone(),
+ bytes_written: temp_obj.bytes_written.subscribe(),
+ bytes_read: 0,
+ };
+ let hit_handler = MemHitHandler::Partial(partial);
+ Ok(Some((meta, Box::new(hit_handler))))
+ } else if let Some(obj) = self.cached.read().get(&hash) {
+ let meta = CacheMeta::deserialize(&obj.meta.0, &obj.meta.1)?;
+ let hit_handler = CompleteHit {
+ body: obj.body.clone(),
+ done: false,
+ range_start: 0,
+ range_end: obj.body.len(),
+ };
+ let hit_handler = MemHitHandler::Complete(hit_handler);
+ Ok(Some((meta, Box::new(hit_handler))))
+ } else {
+ Ok(None)
+ }
+ }
+
+ async fn get_miss_handler(
+ &'static self,
+ key: &CacheKey,
+ meta: &CacheMeta,
+ _trace: &SpanHandle,
+ ) -> Result<MissHandler> {
+ // TODO: support multiple concurrent writes, or panic if there is already a writer
+ let hash = key.combined();
+ let meta = meta.serialize()?;
+ let temp_obj = TempObject::new(meta);
+ let miss_handler = MemMissHandler {
+ body: temp_obj.body.clone(),
+ bytes_written: temp_obj.bytes_written.clone(),
+ key: hash.clone(),
+ cache: self.cached.clone(),
+ temp: self.temp.clone(),
+ };
+ self.temp.write().insert(hash, temp_obj);
+ Ok(Box::new(miss_handler))
+ }
+
+ async fn purge(&'static self, key: &CompactCacheKey, _trace: &SpanHandle) -> Result<bool> {
+ // TODO: purge partial
+
+ // This usually purges the primary key because, without a lookup, variance key is usually
+ // empty
+ let hash = key.combined();
+ Ok(self.cached.write().remove(&hash).is_some())
+ }
+
+ async fn update_meta(
+ &'static self,
+ key: &CacheKey,
+ meta: &CacheMeta,
+ _trace: &SpanHandle,
+ ) -> Result<bool> {
+ let hash = key.combined();
+ if let Some(obj) = self.cached.write().get_mut(&hash) {
+ obj.meta = meta.serialize()?;
+ Ok(true)
+ } else {
+ panic!("no meta found")
+ }
+ }
+
+ fn support_streaming_partial_write(&self) -> bool {
+ true
+ }
+
+ fn as_any(&self) -> &(dyn Any + Send + Sync) {
+ self
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+ use once_cell::sync::Lazy;
+ use rustracing::span::Span;
+
+ fn gen_meta() -> CacheMeta {
+ let mut header = ResponseHeader::build(200, None).unwrap();
+ header.append_header("foo1", "bar1").unwrap();
+ header.append_header("foo2", "bar2").unwrap();
+ header.append_header("foo3", "bar3").unwrap();
+ header.append_header("Server", "Pingora").unwrap();
+ let internal = crate::meta::InternalMeta::default();
+ CacheMeta(Box::new(crate::meta::CacheMetaInner {
+ internal,
+ header,
+ extensions: http::Extensions::new(),
+ }))
+ }
+
+ #[tokio::test]
+ async fn test_write_then_read() {
+ static MEM_CACHE: Lazy<MemCache> = Lazy::new(MemCache::new);
+ let span = &Span::inactive().handle();
+
+ let key1 = CacheKey::new("", "a", "1");
+ let res = MEM_CACHE.lookup(&key1, span).await.unwrap();
+ assert!(res.is_none());
+
+ let cache_meta = gen_meta();
+
+ let mut miss_handler = MEM_CACHE
+ .get_miss_handler(&key1, &cache_meta, span)
+ .await
+ .unwrap();
+ miss_handler
+ .write_body(b"test1"[..].into(), false)
+ .await
+ .unwrap();
+ miss_handler
+ .write_body(b"test2"[..].into(), false)
+ .await
+ .unwrap();
+ miss_handler.finish().await.unwrap();
+
+ let (cache_meta2, mut hit_handler) = MEM_CACHE.lookup(&key1, span).await.unwrap().unwrap();
+ assert_eq!(
+ cache_meta.0.internal.fresh_until,
+ cache_meta2.0.internal.fresh_until
+ );
+
+ let data = hit_handler.read_body().await.unwrap().unwrap();
+ assert_eq!("test1test2", data);
+ let data = hit_handler.read_body().await.unwrap();
+ assert!(data.is_none());
+ }
+
+ #[tokio::test]
+ async fn test_read_range() {
+ static MEM_CACHE: Lazy<MemCache> = Lazy::new(MemCache::new);
+ let span = &Span::inactive().handle();
+
+ let key1 = CacheKey::new("", "a", "1");
+ let res = MEM_CACHE.lookup(&key1, span).await.unwrap();
+ assert!(res.is_none());
+
+ let cache_meta = gen_meta();
+
+ let mut miss_handler = MEM_CACHE
+ .get_miss_handler(&key1, &cache_meta, span)
+ .await
+ .unwrap();
+ miss_handler
+ .write_body(b"test1test2"[..].into(), false)
+ .await
+ .unwrap();
+ miss_handler.finish().await.unwrap();
+
+ let (cache_meta2, mut hit_handler) = MEM_CACHE.lookup(&key1, span).await.unwrap().unwrap();
+ assert_eq!(
+ cache_meta.0.internal.fresh_until,
+ cache_meta2.0.internal.fresh_until
+ );
+
+ // out of range
+ assert!(hit_handler.seek(10000, None).is_err());
+
+ assert!(hit_handler.seek(5, None).is_ok());
+ let data = hit_handler.read_body().await.unwrap().unwrap();
+ assert_eq!("test2", data);
+ let data = hit_handler.read_body().await.unwrap();
+ assert!(data.is_none());
+
+ assert!(hit_handler.seek(4, Some(5)).is_ok());
+ let data = hit_handler.read_body().await.unwrap().unwrap();
+ assert_eq!("1", data);
+ let data = hit_handler.read_body().await.unwrap();
+ assert!(data.is_none());
+ }
+
+ #[tokio::test]
+ async fn test_write_while_read() {
+ use futures::FutureExt;
+
+ static MEM_CACHE: Lazy<MemCache> = Lazy::new(MemCache::new);
+ let span = &Span::inactive().handle();
+
+ let key1 = CacheKey::new("", "a", "1");
+ let res = MEM_CACHE.lookup(&key1, span).await.unwrap();
+ assert!(res.is_none());
+
+ let cache_meta = gen_meta();
+
+ let mut miss_handler = MEM_CACHE
+ .get_miss_handler(&key1, &cache_meta, span)
+ .await
+ .unwrap();
+
+ // first reader
+ let (cache_meta1, mut hit_handler1) = MEM_CACHE.lookup(&key1, span).await.unwrap().unwrap();
+ assert_eq!(
+ cache_meta.0.internal.fresh_until,
+ cache_meta1.0.internal.fresh_until
+ );
+
+ // No body to read
+ let res = hit_handler1.read_body().now_or_never();
+ assert!(res.is_none());
+
+ miss_handler
+ .write_body(b"test1"[..].into(), false)
+ .await
+ .unwrap();
+
+ let data = hit_handler1.read_body().await.unwrap().unwrap();
+ assert_eq!("test1", data);
+ let res = hit_handler1.read_body().now_or_never();
+ assert!(res.is_none());
+
+ miss_handler
+ .write_body(b"test2"[..].into(), false)
+ .await
+ .unwrap();
+ let data = hit_handler1.read_body().await.unwrap().unwrap();
+ assert_eq!("test2", data);
+
+ // second reader
+ let (cache_meta2, mut hit_handler2) = MEM_CACHE.lookup(&key1, span).await.unwrap().unwrap();
+ assert_eq!(
+ cache_meta.0.internal.fresh_until,
+ cache_meta2.0.internal.fresh_until
+ );
+
+ let data = hit_handler2.read_body().await.unwrap().unwrap();
+ assert_eq!("test1test2", data);
+ let res = hit_handler2.read_body().now_or_never();
+ assert!(res.is_none());
+
+ let res = hit_handler1.read_body().now_or_never();
+ assert!(res.is_none());
+
+ miss_handler.finish().await.unwrap();
+
+ let data = hit_handler1.read_body().await.unwrap();
+ assert!(data.is_none());
+ let data = hit_handler2.read_body().await.unwrap();
+ assert!(data.is_none());
+ }
+}
diff --git a/pingora-cache/src/meta.rs b/pingora-cache/src/meta.rs
new file mode 100644
index 0000000..a534dc0
--- /dev/null
+++ b/pingora-cache/src/meta.rs
@@ -0,0 +1,608 @@
+// Copyright 2024 Cloudflare, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Metadata for caching
+
+use http::Extensions;
+use pingora_error::{Error, ErrorType::*, OrErr, Result};
+use pingora_http::{HMap, ResponseHeader};
+use serde::{Deserialize, Serialize};
+use std::time::{Duration, SystemTime};
+
+use crate::key::HashBinary;
+
+pub(crate) type InternalMeta = internal_meta::InternalMetaLatest;
+mod internal_meta {
+ use super::*;
+
+ pub(crate) type InternalMetaLatest = InternalMetaV2;
+
+ #[derive(Debug, Deserialize, Serialize, Clone)]
+ pub(crate) struct InternalMetaV0 {
+ pub(crate) fresh_until: SystemTime,
+ pub(crate) created: SystemTime,
+ pub(crate) stale_while_revalidate_sec: u32,
+ pub(crate) stale_if_error_sec: u32,
+ // Do not add more field
+ }
+
+ impl InternalMetaV0 {
+ #[allow(dead_code)]
+ fn serialize(&self) -> Result<Vec<u8>> {
+ rmp_serde::encode::to_vec(self).or_err(InternalError, "failed to encode cache meta")
+ }
+
+ fn deserialize(buf: &[u8]) -> Result<Self> {
+ rmp_serde::decode::from_slice(buf)
+ .or_err(InternalError, "failed to decode cache meta v0")
+ }
+ }
+
+ #[derive(Debug, Deserialize, Serialize, Clone)]
+ pub(crate) struct InternalMetaV1 {
+ pub(crate) version: u8,
+ pub(crate) fresh_until: SystemTime,
+ pub(crate) created: SystemTime,
+ pub(crate) stale_while_revalidate_sec: u32,
+ pub(crate) stale_if_error_sec: u32,
+ // Do not add more field
+ }
+
+ impl InternalMetaV1 {
+ #[allow(dead_code)]
+ pub const VERSION: u8 = 1;
+
+ #[allow(dead_code)]
+ pub fn serialize(&self) -> Result<Vec<u8>> {
+ assert_eq!(self.version, 1);
+ rmp_serde::encode::to_vec(self).or_err(InternalError, "failed to encode cache meta")
+ }
+
+ fn deserialize(buf: &[u8]) -> Result<Self> {
+ rmp_serde::decode::from_slice(buf)
+ .or_err(InternalError, "failed to decode cache meta v1")
+ }
+ }
+
+ #[derive(Debug, Deserialize, Serialize, Clone)]
+ pub(crate) struct InternalMetaV2 {
+ pub(crate) version: u8,
+ pub(crate) fresh_until: SystemTime,
+ pub(crate) created: SystemTime,
+ pub(crate) updated: SystemTime,
+ pub(crate) stale_while_revalidate_sec: u32,
+ pub(crate) stale_if_error_sec: u32,
+ // Only extended fields are to be added below, one field at a time.
+ // 1. serde default, in order to accept an older version of the schema without the field
+ // 2. serde skip_serializing_if, in order for software with only an older version of this
+ // schema to decode it
+ // After a full release, remove `skip_serializing_if` so that the next extended field can be added.
+ #[serde(default)]
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub(crate) variance: Option<HashBinary>,
+ }
+
+ impl Default for InternalMetaV2 {
+ fn default() -> Self {
+ let epoch = SystemTime::UNIX_EPOCH;
+ InternalMetaV2 {
+ version: InternalMetaV2::VERSION,
+ fresh_until: epoch,
+ created: epoch,
+ updated: epoch,
+ stale_while_revalidate_sec: 0,
+ stale_if_error_sec: 0,
+ variance: None,
+ }
+ }
+ }
+
+ impl InternalMetaV2 {
+ pub const VERSION: u8 = 2;
+
+ pub fn serialize(&self) -> Result<Vec<u8>> {
+ assert_eq!(self.version, Self::VERSION);
+ rmp_serde::encode::to_vec(self).or_err(InternalError, "failed to encode cache meta")
+ }
+
+ fn deserialize(buf: &[u8]) -> Result<Self> {
+ rmp_serde::decode::from_slice(buf)
+ .or_err(InternalError, "failed to decode cache meta v2")
+ }
+ }
+
+ impl From<InternalMetaV0> for InternalMetaV2 {
+ fn from(v0: InternalMetaV0) -> Self {
+ InternalMetaV2 {
+ version: InternalMetaV2::VERSION,
+ fresh_until: v0.fresh_until,
+ created: v0.created,
+ updated: v0.created,
+ stale_while_revalidate_sec: v0.stale_while_revalidate_sec,
+ stale_if_error_sec: v0.stale_if_error_sec,
+ ..Default::default()
+ }
+ }
+ }
+
+ impl From<InternalMetaV1> for InternalMetaV2 {
+ fn from(v1: InternalMetaV1) -> Self {
+ InternalMetaV2 {
+ version: InternalMetaV2::VERSION,
+ fresh_until: v1.fresh_until,
+ created: v1.created,
+ updated: v1.created,
+ stale_while_revalidate_sec: v1.stale_while_revalidate_sec,
+ stale_if_error_sec: v1.stale_if_error_sec,
+ ..Default::default()
+ }
+ }
+ }
+
+ // cross version decode
+ pub(crate) fn deserialize(buf: &[u8]) -> Result<InternalMetaLatest> {
+ const MIN_SIZE: usize = 10; // a small number to read the first few bytes
+ if buf.len() < MIN_SIZE {
+ return Error::e_explain(
+ InternalError,
+ format!("Buf too short ({}) to be InternalMeta", buf.len()),
+ );
+ }
+ let preread_buf = &mut &buf[..MIN_SIZE];
+ // the struct is always packed as a fixed size array
+ match rmp::decode::read_array_len(preread_buf)
+ .or_err(InternalError, "failed to decode cache meta array size")?
+ {
+ // v0 has 4 items and no version number
+ 4 => Ok(InternalMetaV0::deserialize(buf)?.into()),
+ // other versions should have the version number encoded
+ _ => {
+ // rmp will encode version < 128 into a fixint (one byte),
+ // so we use read_pfix
+ let version = rmp::decode::read_pfix(preread_buf)
+ .or_err(InternalError, "failed to decode meta version")?;
+ match version {
+ 1 => Ok(InternalMetaV1::deserialize(buf)?.into()),
+ 2 => InternalMetaV2::deserialize(buf),
+ _ => Error::e_explain(
+ InternalError,
+ format!("Unknown InternalMeta version {version}"),
+ ),
+ }
+ }
+ }
+ }
+
+ #[cfg(test)]
+ mod tests {
+ use super::*;
+
+ #[test]
+ fn test_internal_meta_serde_v0() {
+ let meta = InternalMetaV0 {
+ fresh_until: SystemTime::now(),
+ created: SystemTime::now(),
+ stale_while_revalidate_sec: 0,
+ stale_if_error_sec: 0,
+ };
+ let binary = meta.serialize().unwrap();
+ let meta2 = InternalMetaV0::deserialize(&binary).unwrap();
+ assert_eq!(meta.fresh_until, meta2.fresh_until);
+ }
+
+ #[test]
+ fn test_internal_meta_serde_v1() {
+ let meta = InternalMetaV1 {
+ version: InternalMetaV1::VERSION,
+ fresh_until: SystemTime::now(),
+ created: SystemTime::now(),
+ stale_while_revalidate_sec: 0,
+ stale_if_error_sec: 0,
+ };
+ let binary = meta.serialize().unwrap();
+ let meta2 = InternalMetaV1::deserialize(&binary).unwrap();
+ assert_eq!(meta.fresh_until, meta2.fresh_until);
+ }
+
+ #[test]
+ fn test_internal_meta_serde_v2() {
+ let meta = InternalMetaV2::default();
+ let binary = meta.serialize().unwrap();
+ let meta2 = InternalMetaV2::deserialize(&binary).unwrap();
+ assert_eq!(meta2.version, 2);
+ assert_eq!(meta.fresh_until, meta2.fresh_until);
+ assert_eq!(meta.created, meta2.created);
+ assert_eq!(meta.updated, meta2.updated);
+ }
+
+ #[test]
+ fn test_internal_meta_serde_across_versions() {
+ let meta = InternalMetaV0 {
+ fresh_until: SystemTime::now(),
+ created: SystemTime::now(),
+ stale_while_revalidate_sec: 0,
+ stale_if_error_sec: 0,
+ };
+ let binary = meta.serialize().unwrap();
+ let meta2 = deserialize(&binary).unwrap();
+ assert_eq!(meta2.version, 2);
+ assert_eq!(meta.fresh_until, meta2.fresh_until);
+
+ let meta = InternalMetaV1 {
+ version: 1,
+ fresh_until: SystemTime::now(),
+ created: SystemTime::now(),
+ stale_while_revalidate_sec: 0,
+ stale_if_error_sec: 0,
+ };
+ let binary = meta.serialize().unwrap();
+ let meta2 = deserialize(&binary).unwrap();
+ assert_eq!(meta2.version, 2);
+ assert_eq!(meta.fresh_until, meta2.fresh_until);
+ // `updated` == `created` when upgrading to v2
+ assert_eq!(meta2.created, meta2.updated);
+ }
+
+ #[test]
+ fn test_internal_meta_serde_v2_extend_fields() {
+ // make sure that v2 format is backward compatible
+ // this is the base version of v2 without any extended fields
+ #[derive(Deserialize, Serialize)]
+ pub(crate) struct InternalMetaV2Base {
+ pub(crate) version: u8,
+ pub(crate) fresh_until: SystemTime,
+ pub(crate) created: SystemTime,
+ pub(crate) updated: SystemTime,
+ pub(crate) stale_while_revalidate_sec: u32,
+ pub(crate) stale_if_error_sec: u32,
+ }
+
+ impl InternalMetaV2Base {
+ pub const VERSION: u8 = 2;
+ pub fn serialize(&self) -> Result<Vec<u8>> {
+ assert!(self.version >= Self::VERSION);
+ rmp_serde::encode::to_vec(self)
+ .or_err(InternalError, "failed to encode cache meta")
+ }
+ fn deserialize(buf: &[u8]) -> Result<Self> {
+ rmp_serde::decode::from_slice(buf)
+ .or_err(InternalError, "failed to decode cache meta v2")
+ }
+ }
+
+ // ext V2 to base v2
+ let meta = InternalMetaV2::default();
+ let binary = meta.serialize().unwrap();
+ let meta2 = InternalMetaV2Base::deserialize(&binary).unwrap();
+ assert_eq!(meta2.version, 2);
+ assert_eq!(meta.fresh_until, meta2.fresh_until);
+ assert_eq!(meta.created, meta2.created);
+ assert_eq!(meta.updated, meta2.updated);
+
+ // base V2 to ext v2
+ let now = SystemTime::now();
+ let meta = InternalMetaV2Base {
+ version: InternalMetaV2::VERSION,
+ fresh_until: now,
+ created: now,
+ updated: now,
+ stale_while_revalidate_sec: 0,
+ stale_if_error_sec: 0,
+ };
+ let binary = meta.serialize().unwrap();
+ let meta2 = InternalMetaV2::deserialize(&binary).unwrap();
+ assert_eq!(meta2.version, 2);
+ assert_eq!(meta.fresh_until, meta2.fresh_until);
+ assert_eq!(meta.created, meta2.created);
+ assert_eq!(meta.updated, meta2.updated);
+ }
+ }
+}
+
+#[derive(Debug)]
+pub(crate) struct CacheMetaInner {
+ // http header and Internal meta have different ways of serialization, so keep them separated
+ pub(crate) internal: InternalMeta,
+ pub(crate) header: ResponseHeader,
+ /// An opaque type map to hold extra information for communication between cache backends
+ /// and users. This field is **not** guaranteed to be persistently stored in the cache backend.
+ pub extensions: Extensions,
+}
+
+/// The cacheable response header and cache metadata
+#[derive(Debug)]
+pub struct CacheMeta(pub(crate) Box<CacheMetaInner>);
+
+impl CacheMeta {
+ /// Create a [CacheMeta] from the given metadata and the response header
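+ ///
+ /// A construction sketch (the values are illustrative):
+ /// ```ignore
+ /// let now = SystemTime::now();
+ /// let header = ResponseHeader::build(200, None).unwrap();
+ /// let meta = CacheMeta::new(now + Duration::from_secs(3600), now, 10, 10, header);
+ /// assert!(meta.is_fresh(now));
+ /// ```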
+ pub fn new(
+ fresh_until: SystemTime,
+ created: SystemTime,
+ stale_while_revalidate_sec: u32,
+ stale_if_error_sec: u32,
+ header: ResponseHeader,
+ ) -> CacheMeta {
+ CacheMeta(Box::new(CacheMetaInner {
+ internal: InternalMeta {
+ version: InternalMeta::VERSION,
+ fresh_until,
+ created,
+ updated: created, // created == updated for new meta
+ stale_while_revalidate_sec,
+ stale_if_error_sec,
+ ..Default::default()
+ },
+ header,
+ extensions: Extensions::new(),
+ }))
+ }
+
+ /// When the asset was created/admitted to cache
+ pub fn created(&self) -> SystemTime {
+ self.0.internal.created
+ }
+
+ /// The last time the asset was revalidated
+ ///
+ /// This value will be the same as [Self::created()] if no revalidation ever happens
+ pub fn updated(&self) -> SystemTime {
+ self.0.internal.updated
+ }
+
+ /// Is the asset still valid
+ pub fn is_fresh(&self, time: SystemTime) -> bool {
+ // NOTE: HTTP cache time resolution is second
+ self.0.internal.fresh_until >= time
+ }
+
+ /// How long (in seconds) the asset should be fresh since its admission/revalidation
+ ///
+ /// This is essentially the max-age value (or its equivalent)
+ pub fn fresh_sec(&self) -> u64 {
+ // swallow `duration_since` error, assets that are always stale have earlier `fresh_until` than `created`
+ // practically speaking we can always treat these as 0 ttl
+ // XXX: return Error if `fresh_until` is much earlier than expected?
+ self.0
+ .internal
+ .fresh_until
+ .duration_since(self.0.internal.updated)
+ .map_or(0, |duration| duration.as_secs())
+ }
+
+ /// Until when the asset is considered fresh
+ pub fn fresh_until(&self) -> SystemTime {
+ self.0.internal.fresh_until
+ }
+
+ /// How old the asset is since its admission/revalidation
+ pub fn age(&self) -> Duration {
+ SystemTime::now()
+ .duration_since(self.updated())
+ .unwrap_or_default()
+ }
+
+ /// The stale-while-revalidate limit in seconds
+ pub fn stale_while_revalidate_sec(&self) -> u32 {
+ self.0.internal.stale_while_revalidate_sec
+ }
+
+ /// The stale-if-error limit in seconds
+ pub fn stale_if_error_sec(&self) -> u32 {
+ self.0.internal.stale_if_error_sec
+ }
+
+ /// Can the asset be used to serve stale during revalidation at the given time.
+ ///
+ /// NOTE: the serve stale functions do not check !is_fresh(time),
+ /// i.e. the object is already assumed to be stale.
+ pub fn serve_stale_while_revalidate(&self, time: SystemTime) -> bool {
+ self.can_serve_stale(self.0.internal.stale_while_revalidate_sec, time)
+ }
+
+ /// Can the asset be used to serve stale after error at the given time.
+ ///
+ /// NOTE: the serve stale functions do not check !is_fresh(time),
+ /// i.e. the object is already assumed to be stale.
+ pub fn serve_stale_if_error(&self, time: SystemTime) -> bool {
+ self.can_serve_stale(self.0.internal.stale_if_error_sec, time)
+ }
+
+ /// Disable serve stale for this asset
+ pub fn disable_serve_stale(&mut self) {
+ self.0.internal.stale_if_error_sec = 0;
+ self.0.internal.stale_while_revalidate_sec = 0;
+ }
+
+ /// Get the variance hash of this asset
+ pub fn variance(&self) -> Option<HashBinary> {
+ self.0.internal.variance
+ }
+
+ /// Set the variance key of this asset
+ pub fn set_variance_key(&mut self, variance_key: HashBinary) {
+ self.0.internal.variance = Some(variance_key);
+ }
+
+ /// Set the variance (hash) of this asset
+ pub fn set_variance(&mut self, variance: HashBinary) {
+ self.0.internal.variance = Some(variance)
+ }
+
+ /// Removes the variance (hash) of this asset
+ pub fn remove_variance(&mut self) {
+ self.0.internal.variance = None
+ }
+
+ /// Get the response header in this asset
+ pub fn response_header(&self) -> &ResponseHeader {
+ &self.0.header
+ }
+
+ /// Modify the header in this asset
+ pub fn response_header_mut(&mut self) -> &mut ResponseHeader {
+ &mut self.0.header
+ }
+
+ /// Expose the extensions to read
+ pub fn extensions(&self) -> &Extensions {
+ &self.0.extensions
+ }
+
+ /// Expose the extensions to modify
+ pub fn extensions_mut(&mut self) -> &mut Extensions {
+ &mut self.0.extensions
+ }
+
+ /// Get a copy of the response header
+ pub fn response_header_copy(&self) -> ResponseHeader {
+ self.0.header.clone()
+ }
+
+ /// get all the headers of this asset
+ pub fn headers(&self) -> &HMap {
+ &self.0.header.headers
+ }
+
+ fn can_serve_stale(&self, serve_stale_sec: u32, time: SystemTime) -> bool {
+ if serve_stale_sec == 0 {
+ return false;
+ }
+ if let Some(stale_until) = self
+ .0
+ .internal
+ .fresh_until
+ .checked_add(Duration::from_secs(serve_stale_sec.into()))
+ {
+ stale_until >= time
+ } else {
+ // overflowed: treat as infinite ttl
+ true
+ }
+ }
+
+ /// Serialize this object
+ pub fn serialize(&self) -> Result<(Vec<u8>, Vec<u8>)> {
+ let internal = self.0.internal.serialize()?;
+ let header = header_serialize(&self.0.header)?;
+ Ok((internal, header))
+ }
+
+ /// Deserialize from the binary format
+ pub fn deserialize(internal: &[u8], header: &[u8]) -> Result<Self> {
+ let internal = internal_meta::deserialize(internal)?;
+ let header = header_deserialize(header)?;
+ Ok(CacheMeta(Box::new(CacheMetaInner {
+ internal,
+ header,
+ extensions: Extensions::new(),
+ })))
+ }
+}
+
+use http::StatusCode;
+
+/// The function to generate TTL from the given [StatusCode].
+pub type FreshSecByStatusFn = fn(StatusCode) -> Option<u32>;
+
+/// The default settings to generate [CacheMeta]
+pub struct CacheMetaDefaults {
+ // if a status code is not included in fresh_sec, it's not considered cacheable by default.
+ fresh_sec_fn: FreshSecByStatusFn,
+ stale_while_revalidate_sec: u32,
+ // TODO: allow "error" condition to be configurable?
+ stale_if_error_sec: u32,
+}
+
+impl CacheMetaDefaults {
+ /// Create a new [CacheMetaDefaults]
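+ ///
+ /// For example, defaults that cache only 200s for an hour (a sketch):
+ /// ```ignore
+ /// const DEFAULTS: CacheMetaDefaults = CacheMetaDefaults::new(
+ ///     |status| if status == StatusCode::OK { Some(3600) } else { None },
+ ///     10, // stale-while-revalidate seconds
+ ///     10, // stale-if-error seconds
+ /// );
+ /// ```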
+ pub const fn new(
+ fresh_sec_fn: FreshSecByStatusFn,
+ stale_while_revalidate_sec: u32,
+ stale_if_error_sec: u32,
+ ) -> Self {
+ CacheMetaDefaults {
+ fresh_sec_fn,
+ stale_while_revalidate_sec,
+ stale_if_error_sec,
+ }
+ }
+
+ /// Return the default TTL for the given [StatusCode]
+ ///
+ /// `None`: do no cache this code.
+ pub fn fresh_sec(&self, resp_status: StatusCode) -> Option<u32> {
+ // safeguard to make sure a 304 response shares the same default TTL as a 200
+ if resp_status == StatusCode::NOT_MODIFIED {
+ (self.fresh_sec_fn)(StatusCode::OK)
+ } else {
+ (self.fresh_sec_fn)(resp_status)
+ }
+ }
+
+ /// The default SWR seconds
+ pub fn serve_stale_while_revalidate_sec(&self) -> u32 {
+ self.stale_while_revalidate_sec
+ }
+
+ /// The default SIE seconds
+ pub fn serve_stale_if_error_sec(&self) -> u32 {
+ self.stale_if_error_sec
+ }
+}
+
+use log::warn;
+use once_cell::sync::{Lazy, OnceCell};
+use pingora_header_serde::HeaderSerde;
+use std::fs::File;
+use std::io::Read;
+
+/* load header compression engine and its dictionary globally */
+pub(crate) static COMPRESSION_DICT_PATH: OnceCell<String> = OnceCell::new();
+
+fn load_file(path: &String) -> Option<Vec<u8>> {
+ let mut file = File::open(path)
+ .map_err(|e| {
+ warn!(
+ "failed to open header compress dictionary file at {}, {:?}",
+ path, e
+ );
+ e
+ })
+ .ok()?;
+ let mut dict = Vec::new();
+ file.read_to_end(&mut dict)
+ .map_err(|e| {
+ warn!(
+ "failed to read header compress dictionary file at {}, {:?}",
+ path, e
+ );
+ e
+ })
+ .ok()?;
+
+ Some(dict)
+}
+
+static HEADER_SERDE: Lazy<HeaderSerde> = Lazy::new(|| {
+ let dict = COMPRESSION_DICT_PATH.get().and_then(load_file);
+ HeaderSerde::new(dict)
+});
+
+pub(crate) fn header_serialize(header: &ResponseHeader) -> Result<Vec<u8>> {
+ HEADER_SERDE.serialize(header)
+}
+
+pub(crate) fn header_deserialize<T: AsRef<[u8]>>(buf: T) -> Result<ResponseHeader> {
+ HEADER_SERDE.deserialize(buf.as_ref())
+}
diff --git a/pingora-cache/src/predictor.rs b/pingora-cache/src/predictor.rs
new file mode 100644
index 0000000..df8f374
--- /dev/null
+++ b/pingora-cache/src/predictor.rs
@@ -0,0 +1,228 @@
+// Copyright 2024 Cloudflare, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Cacheability Predictor
+
+use crate::hashtable::{ConcurrentLruCache, LruShard};
+
+pub type CustomReasonPredicate = fn(&'static str) -> bool;
+
+/// Cacheability Predictor
+///
+/// Remembers previously uncacheable assets.
+/// Allows bypassing cache / cache lock early based on historical precedent.
+///
+/// NOTE: to simply avoid caching requests with certain characteristics,
+/// add checks in request_cache_filter to avoid enabling cache in the first place.
+/// The predictor's bypass mechanism handles cases where the request _looks_ cacheable
+/// but its previous responses suggest otherwise. The request _could_ be cacheable in the future.
+pub struct Predictor<const N_SHARDS: usize> {
+ uncacheable_keys: ConcurrentLruCache<(), N_SHARDS>,
+ skip_custom_reasons_fn: Option<CustomReasonPredicate>,
+}
+
+use crate::{key::CacheHashKey, CacheKey, NoCacheReason};
+use log::debug;
+
+/// The cache predictor trait.
+///
+/// This trait allows user defined predictor to replace [Predictor].
+pub trait CacheablePredictor {
+ /// Return true if likely cacheable, false if likely not.
+ fn cacheable_prediction(&self, key: &CacheKey) -> bool;
+
+ /// Mark cacheable to allow the next request to use the cache.
+ /// Returns true if the key was not marked uncacheable (nothing to clear),
+ /// false if an uncacheable marking was just removed.
+ fn mark_cacheable(&self, key: &CacheKey) -> bool;
+
+ /// Mark uncacheable to actively bypass cache on the next request.
+ /// May skip marking on certain NoCacheReasons.
+ /// Returns None if we skipped marking uncacheable.
+ /// Returns Some(false) if the key was already marked uncacheable.
+ fn mark_uncacheable(&self, key: &CacheKey, reason: NoCacheReason) -> Option<bool>;
+}
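+
+// A minimal sketch of a custom implementor (hypothetical): a predictor that never
+// remembers anything and always predicts cacheable, e.g. to disable prediction
+// while keeping the same interface.
+//
+//     struct AlwaysCacheable;
+//     impl CacheablePredictor for AlwaysCacheable {
+//         fn cacheable_prediction(&self, _key: &CacheKey) -> bool {
+//             true
+//         }
+//         fn mark_cacheable(&self, _key: &CacheKey) -> bool {
+//             true // nothing is ever stored, so there is nothing to clear
+//         }
+//         fn mark_uncacheable(&self, _key: &CacheKey, _reason: NoCacheReason) -> Option<bool> {
+//             None // skip remembering every reason
+//         }
+//     }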
+
+// This particular bit of `where [LruShard...; N]: Default` nonsense arises from
+// ConcurrentLruCache needing this trait bound, which in turn arises from the Rust
+// compiler not being able to guarantee that all array sizes N implement `Default`.
+// See https://github.com/rust-lang/rust/issues/61415
+impl<const N_SHARDS: usize> Predictor<N_SHARDS>
+where
+ [LruShard<()>; N_SHARDS]: Default,
+{
+ /// Create a new Predictor with `N_SHARDS * shard_capacity` total capacity for
+ /// uncacheable cache keys.
+ ///
+ /// - `shard_capacity`: defines number of keys remembered as uncacheable per LRU shard.
+ /// - `skip_custom_reasons_fn`: an optional predicate used in `mark_uncacheable`
+ /// that can customize which `Custom` `NoCacheReason`s ought to be remembered as uncacheable.
+ /// If the predicate returns true, then the predictor will skip remembering the current
+ /// cache key as uncacheable (and avoid bypassing cache on the next request).
+ pub fn new(
+ shard_capacity: usize,
+ skip_custom_reasons_fn: Option<CustomReasonPredicate>,
+ ) -> Predictor<N_SHARDS> {
+ Predictor {
+ uncacheable_keys: ConcurrentLruCache::<(), N_SHARDS>::new(shard_capacity),
+ skip_custom_reasons_fn,
+ }
+ }
+}
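+
+// A construction sketch (hypothetical sizing): 32 shards x 512 keys remembers up to
+// 16384 uncacheable keys, and skips remembering the custom reason "ephemeral".
+// `once_cell::sync::Lazy` is one way to keep it in a `static`.
+//
+//     static PREDICTOR: Lazy<Predictor<32>> =
+//         Lazy::new(|| Predictor::new(512, Some(|reason| reason == "ephemeral")));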
+
+impl<const N_SHARDS: usize> CacheablePredictor for Predictor<N_SHARDS>
+where
+ [LruShard<()>; N_SHARDS]: Default,
+{
+ fn cacheable_prediction(&self, key: &CacheKey) -> bool {
+ // variance key is ignored because this check happens before cache lookup
+ let hash = key.primary_bin();
+ let key = u128::from_be_bytes(hash); // Endianness doesn't matter
+
+ // Note: the LRU is only updated in the mark_* functions,
+ // as we assume the caller always updates the cacheability of the response later
+ !self.uncacheable_keys.read(key).contains(&key)
+ }
+
+ fn mark_cacheable(&self, key: &CacheKey) -> bool {
+ // variance key is ignored because cacheable_prediction() is called before cache lookup
+ // where the variance key is unknown
+ let hash = key.primary_bin();
+ let key = u128::from_be_bytes(hash);
+
+ let cache = self.uncacheable_keys.get(key);
+ if !cache.read().contains(&key) {
+ // not in uncacheable list, nothing to do
+ return true;
+ }
+
+ let mut cache = cache.write();
+ cache.pop(&key);
+ debug!("bypassed request became cacheable");
+ false
+ }
+
+ fn mark_uncacheable(&self, key: &CacheKey, reason: NoCacheReason) -> Option<bool> {
+ // only mark as uncacheable for the future on certain reasons,
+ // (e.g. InternalErrors)
+ use NoCacheReason::*;
+ match reason {
+ // CacheLockGiveUp: the writer will set OriginNotCache (if applicable)
+ // readers don't need to do it
+ NeverEnabled | StorageError | InternalError | Deferred | CacheLockGiveUp
+ | CacheLockTimeout => {
+ return None;
+ }
+ // Skip certain NoCacheReason::Custom according to user
+ Custom(reason) if self.skip_custom_reasons_fn.map_or(false, |f| f(reason)) => {
+ return None;
+ }
+ Custom(_) | OriginNotCache | ResponseTooLarge => { /* mark uncacheable for these only */
+ }
+ }
+
+ // variance key is ignored because cacheable_prediction() is called before cache lookup
+ // where the variance key is unknown
+ let hash = key.primary_bin();
+ let key = u128::from_be_bytes(hash);
+
+ let mut cache = self.uncacheable_keys.get(key).write();
+ // put() returns Some(old_value) if the key existed, else None
+ let new_key = cache.put(key, ()).is_none();
+ if new_key {
+ debug!("request marked uncacheable");
+ }
+ Some(new_key)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ #[test]
+ fn test_mark_cacheability() {
+ let predictor = Predictor::<1>::new(10, None);
+ let key = CacheKey::new("a", "b", "c");
+ // cacheable if no history
+ assert!(predictor.cacheable_prediction(&key));
+
+ // don't remember internal / storage errors
+ predictor.mark_uncacheable(&key, NoCacheReason::InternalError);
+ assert!(predictor.cacheable_prediction(&key));
+ predictor.mark_uncacheable(&key, NoCacheReason::StorageError);
+ assert!(predictor.cacheable_prediction(&key));
+
+ // origin explicitly said uncacheable
+ predictor.mark_uncacheable(&key, NoCacheReason::OriginNotCache);
+ assert!(!predictor.cacheable_prediction(&key));
+
+ // mark cacheable again
+ predictor.mark_cacheable(&key);
+ assert!(predictor.cacheable_prediction(&key));
+ }
+
+ #[test]
+ fn test_custom_skip_predicate() {
+ let predictor = Predictor::<1>::new(
+ 10,
+ Some(|custom_reason| matches!(custom_reason, "Skipping")),
+ );
+ let key = CacheKey::new("a", "b", "c");
+ // cacheable if no history
+ assert!(predictor.cacheable_prediction(&key));
+
+ // custom predicate still uses default skip reasons
+ predictor.mark_uncacheable(&key, NoCacheReason::InternalError);
+ assert!(predictor.cacheable_prediction(&key));
+
+ // other custom reasons can still be marked uncacheable
+ predictor.mark_uncacheable(&key, NoCacheReason::Custom("DontCacheMe"));
+ assert!(!predictor.cacheable_prediction(&key));
+
+ let key = CacheKey::new("a", "c", "d");
+ assert!(predictor.cacheable_prediction(&key));
+ // specific custom reason is skipped
+ predictor.mark_uncacheable(&key, NoCacheReason::Custom("Skipping"));
+ assert!(predictor.cacheable_prediction(&key));
+ }
+
+ #[test]
+ fn test_mark_uncacheable_lru() {
+ let predictor = Predictor::<1>::new(3, None);
+ let key1 = CacheKey::new("a", "b", "c");
+ predictor.mark_uncacheable(&key1, NoCacheReason::OriginNotCache);
+ assert!(!predictor.cacheable_prediction(&key1));
+
+ let key2 = CacheKey::new("a", "bc", "c");
+ predictor.mark_uncacheable(&key2, NoCacheReason::OriginNotCache);
+ assert!(!predictor.cacheable_prediction(&key2));
+
+ let key3 = CacheKey::new("a", "cd", "c");
+ predictor.mark_uncacheable(&key3, NoCacheReason::OriginNotCache);
+ assert!(!predictor.cacheable_prediction(&key3));
+
+ // promote / reinsert key1
+ predictor.mark_uncacheable(&key1, NoCacheReason::OriginNotCache);
+
+ let key4 = CacheKey::new("a", "de", "c");
+ predictor.mark_uncacheable(&key4, NoCacheReason::OriginNotCache);
+ assert!(!predictor.cacheable_prediction(&key4));
+
+ // key 1 was recently used
+ assert!(!predictor.cacheable_prediction(&key1));
+ // key 2 was evicted
+ assert!(predictor.cacheable_prediction(&key2));
+ assert!(!predictor.cacheable_prediction(&key3));
+ assert!(!predictor.cacheable_prediction(&key4));
+ }
+}
diff --git a/pingora-cache/src/put.rs b/pingora-cache/src/put.rs
new file mode 100644
index 0000000..c50cc2b
--- /dev/null
+++ b/pingora-cache/src/put.rs
@@ -0,0 +1,754 @@
+// Copyright 2024 Cloudflare, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Cache Put module
+
+use crate::*;
+use bytes::Bytes;
+use http::header;
+use pingora_core::protocols::http::{
+ v1::common::header_value_content_length, HttpTask, ServerSession,
+};
+
+/// The interface to define cache put behavior
+pub trait CachePut {
+ /// Return whether to cache the asset according to the given response header.
+ fn cacheable(&self, response: &ResponseHeader) -> RespCacheable {
+ let cc = cache_control::CacheControl::from_resp_headers(response);
+ filters::resp_cacheable(cc.as_ref(), response, false, Self::cache_defaults())
+ }
+
+ /// Return the [CacheMetaDefaults]
+ fn cache_defaults() -> &'static CacheMetaDefaults;
+}
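+
+// A minimal implementor sketch (hypothetical policy): the provided default
+// `cacheable()` above is kept, so only the meta defaults need to be supplied.
+//
+//     struct MyCachePut;
+//     impl CachePut for MyCachePut {
+//         fn cache_defaults() -> &'static CacheMetaDefaults {
+//             // cache everything for 60s, with 10s SWR / SIE windows
+//             const DEFAULTS: CacheMetaDefaults = CacheMetaDefaults::new(|_| Some(60), 10, 10);
+//             &DEFAULTS
+//         }
+//     }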
+
+use parse_response::ResponseParse;
+
+/// The cache put context
+pub struct CachePutCtx<C: CachePut> {
+ cache_put: C, // the user defined cache put behavior
+ key: CacheKey,
+ storage: &'static (dyn storage::Storage + Sync), // static for now
+ eviction: Option<&'static (dyn eviction::EvictionManager + Sync)>,
+ miss_handler: Option<MissHandler>,
+ max_file_size_bytes: Option<usize>,
+ meta: Option<CacheMeta>,
+ parser: ResponseParse,
+ // FIXME: cache put doesn't have cache lock but some storage cannot handle concurrent put
+ // to the same asset.
+ trace: trace::Span,
+}
+
+impl<C: CachePut> CachePutCtx<C> {
+ /// Create a new [CachePutCtx]
+ pub fn new(
+ cache_put: C,
+ key: CacheKey,
+ storage: &'static (dyn storage::Storage + Sync),
+ eviction: Option<&'static (dyn eviction::EvictionManager + Sync)>,
+ trace: trace::Span,
+ ) -> Self {
+ CachePutCtx {
+ cache_put,
+ key,
+ storage,
+ eviction,
+ miss_handler: None,
+ max_file_size_bytes: None,
+ meta: None,
+ parser: ResponseParse::new(),
+ trace,
+ }
+ }
+
+ /// Set the max cacheable size limit
+ pub fn set_max_file_size_bytes(&mut self, max_file_size_bytes: usize) {
+ self.max_file_size_bytes = Some(max_file_size_bytes);
+ }
+
+ async fn put_header(&mut self, meta: CacheMeta) -> Result<()> {
+ let trace = self.trace.child("cache put header", |o| o.start()).handle();
+ let miss_handler = self
+ .storage
+ .get_miss_handler(&self.key, &meta, &trace)
+ .await?;
+ self.miss_handler = Some(
+ if let Some(max_file_size_bytes) = self.max_file_size_bytes {
+ Box::new(MaxFileSizeMissHandler::new(
+ miss_handler,
+ max_file_size_bytes,
+ ))
+ } else {
+ miss_handler
+ },
+ );
+ self.meta = Some(meta);
+ Ok(())
+ }
+
+ async fn put_body(&mut self, data: Bytes, eof: bool) -> Result<()> {
+ let miss_handler = self.miss_handler.as_mut().unwrap();
+ miss_handler.write_body(data, eof).await
+ }
+
+ async fn finish(&mut self) -> Result<()> {
+ let Some(miss_handler) = self.miss_handler.take() else {
+ // no miss_handler, uncacheable
+ return Ok(());
+ };
+ let size = miss_handler.finish().await?;
+ if let Some(eviction) = self.eviction.as_ref() {
+ let cache_key = self.key.to_compact();
+ let meta = self.meta.as_ref().unwrap();
+ let evicted = eviction.admit(cache_key, size, meta.0.internal.fresh_until);
+ // TODO: make this async
+ let trace = self
+ .trace
+ .child("cache put eviction", |o| o.start())
+ .handle();
+ for item in evicted {
+ // TODO: warn/log the error
+ let _ = self.storage.purge(&item, &trace).await;
+ }
+ }
+
+ Ok(())
+ }
+
+ async fn do_cache_put(&mut self, data: &[u8]) -> Result<Option<NoCacheReason>> {
+ let tasks = self.parser.inject_data(data)?;
+ for task in tasks {
+ match task {
+ HttpTask::Header(header, _eos) => match self.cache_put.cacheable(&header) {
+ RespCacheable::Cacheable(meta) => {
+ if let Some(max_file_size_bytes) = self.max_file_size_bytes {
+ let content_length_hdr = header.headers.get(header::CONTENT_LENGTH);
+ if let Some(content_length) =
+ header_value_content_length(content_length_hdr)
+ {
+ if content_length > max_file_size_bytes {
+ return Ok(Some(NoCacheReason::ResponseTooLarge));
+ }
+ }
+ }
+
+ self.put_header(meta).await?;
+ }
+ RespCacheable::Uncacheable(reason) => {
+ return Ok(Some(reason));
+ }
+ },
+ HttpTask::Body(data, eos) => {
+ if let Some(data) = data {
+ self.put_body(data, eos).await?;
+ }
+ }
+ _ => {
+ panic!("unexpected HttpTask during cache put {task:?}");
+ }
+ }
+ }
+ Ok(None)
+ }
+
+ /// Start the cache put logic for the given request
+ ///
+ /// This function starts reading the request body to put into the cache.
+ /// Return:
+ /// - `Ok(None)` when the payload will be cached.
+ /// - `Ok(Some(reason))` when the payload is not cacheable.
+ pub async fn cache_put(
+ &mut self,
+ session: &mut ServerSession,
+ ) -> Result<Option<NoCacheReason>> {
+ let mut no_cache_reason = None;
+ while let Some(data) = session.read_request_body().await? {
+ if no_cache_reason.is_some() {
+ // even when uncacheable, the entire body needs to be drained so that
+ // 1. downstream doesn't throw errors and 2. the connection can be reused
+ continue;
+ }
+ no_cache_reason = self.do_cache_put(&data).await?
+ }
+ self.parser.finish()?;
+ self.finish().await?;
+ Ok(no_cache_reason)
+ }
+}
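+
+// An end-to-end sketch (hypothetical wiring, assuming `MyCachePut` from above, a
+// `'static` storage backend, and a `session` whose request body carries the HTTP
+// response to be cached):
+//
+//     let mut ctx = CachePutCtx::new(MyCachePut, key, storage, None, Span::inactive());
+//     ctx.set_max_file_size_bytes(10 * 1024 * 1024);
+//     match ctx.cache_put(session).await? {
+//         None => { /* admitted to cache */ }
+//         Some(reason) => { /* not cached; inspect `reason` */ }
+//     }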
+
+#[cfg(test)]
+mod test {
+ use super::*;
+ use once_cell::sync::Lazy;
+ use rustracing::span::Span;
+
+ struct TestCachePut();
+ impl CachePut for TestCachePut {
+ fn cache_defaults() -> &'static CacheMetaDefaults {
+ const DEFAULT: CacheMetaDefaults = CacheMetaDefaults::new(|_| Some(1), 1, 1);
+ &DEFAULT
+ }
+ }
+
+ type TestCachePutCtx = CachePutCtx<TestCachePut>;
+ static CACHE_BACKEND: Lazy<MemCache> = Lazy::new(MemCache::new);
+
+ #[tokio::test]
+ async fn test_cache_put() {
+ let key = CacheKey::new("", "a", "1");
+ let span = Span::inactive();
+ let put = TestCachePut();
+ let mut ctx = TestCachePutCtx::new(put, key.clone(), &*CACHE_BACKEND, None, span);
+ let payload = b"HTTP/1.1 200 OK\r\n\
+ Date: Thu, 26 Apr 2018 05:42:05 GMT\r\n\
+ Content-Type: text/html; charset=utf-8\r\n\
+ Connection: keep-alive\r\n\
+ X-Frame-Options: SAMEORIGIN\r\n\
+ Cache-Control: public, max-age=1\r\n\
+ Server: origin-server\r\n\
+ Content-Length: 4\r\n\r\nrust";
+ // here we skip mocking a real http session for simplicity
+ let res = ctx.do_cache_put(payload).await.unwrap();
+ assert!(res.is_none()); // cacheable
+ ctx.parser.finish().unwrap();
+ ctx.finish().await.unwrap();
+
+ let span = Span::inactive();
+ let (meta, mut hit) = CACHE_BACKEND
+ .lookup(&key, &span.handle())
+ .await
+ .unwrap()
+ .unwrap();
+ assert_eq!(
+ meta.headers().get("date").unwrap(),
+ "Thu, 26 Apr 2018 05:42:05 GMT"
+ );
+ let data = hit.read_body().await.unwrap().unwrap();
+ assert_eq!(data, "rust");
+ }
+
+ #[tokio::test]
+ async fn test_cache_put_uncacheable() {
+ let key = CacheKey::new("", "a", "1");
+ let span = Span::inactive();
+ let put = TestCachePut();
+ let mut ctx = TestCachePutCtx::new(put, key.clone(), &*CACHE_BACKEND, None, span);
+ let payload = b"HTTP/1.1 200 OK\r\n\
+ Date: Thu, 26 Apr 2018 05:42:05 GMT\r\n\
+ Content-Type: text/html; charset=utf-8\r\n\
+ Connection: keep-alive\r\n\
+ X-Frame-Options: SAMEORIGIN\r\n\
+ Cache-Control: no-store\r\n\
+ Server: origin-server\r\n\
+ Content-Length: 4\r\n\r\nrust";
+ // here we skip mocking a real http session for simplicity
+ let no_cache = ctx.do_cache_put(payload).await.unwrap().unwrap();
+ assert_eq!(no_cache, NoCacheReason::OriginNotCache);
+ ctx.parser.finish().unwrap();
+ ctx.finish().await.unwrap();
+ }
+}
+
+// maybe this can simplify some logic in pingora::h1
+
+mod parse_response {
+ use super::*;
+ use bytes::{Bytes, BytesMut};
+ use httparse::Status;
+ use pingora_error::{
+ Error,
+ ErrorType::{self, *},
+ Result,
+ };
+ use pingora_http::ResponseHeader;
+
+ pub const INVALID_CHUNK: ErrorType = ErrorType::new("InvalidChunk");
+ pub const INCOMPLETE_BODY: ErrorType = ErrorType::new("IncompleteHttpBody");
+
+ const MAX_HEADERS: usize = 256;
+ const INIT_HEADER_BUF_SIZE: usize = 4096;
+ const CHUNK_DELIMITER_SIZE: usize = 2; // \r\n
+
+ #[derive(Debug, Clone, Copy)]
+ enum ParseState {
+ Init,
+ PartialHeader,
+ PartialBodyContentLength(usize, usize),
+ PartialChunkedBody(usize),
+ PartialHttp10Body(usize),
+ Done(usize),
+ Invalid(httparse::Error),
+ }
+
+ impl ParseState {
+ fn is_done(&self) -> bool {
+ matches!(self, Self::Done(_))
+ }
+ fn read_header(&self) -> bool {
+ matches!(self, Self::Init | Self::PartialHeader)
+ }
+ fn read_body(&self) -> bool {
+ matches!(
+ self,
+ Self::PartialBodyContentLength(..)
+ | Self::PartialChunkedBody(_)
+ | Self::PartialHttp10Body(_)
+ )
+ }
+ }
+
+ pub(super) struct ResponseParse {
+ state: ParseState,
+ buf: BytesMut,
+ header_bytes: Bytes,
+ }
+
+ impl ResponseParse {
+ pub fn new() -> Self {
+ ResponseParse {
+ state: ParseState::Init,
+ buf: BytesMut::with_capacity(INIT_HEADER_BUF_SIZE),
+ header_bytes: Bytes::new(),
+ }
+ }
+
+ pub fn inject_data(&mut self, data: &[u8]) -> Result<Vec<HttpTask>> {
+ self.put_data(data);
+
+ let mut tasks = vec![];
+ while !self.state.is_done() {
+ if self.state.read_header() {
+ let header = self.parse_header()?;
+ let Some(header) = header else {
+ break;
+ };
+ tasks.push(HttpTask::Header(Box::new(header), self.state.is_done()));
+ } else if self.state.read_body() {
+ let body = self.parse_body()?;
+ let Some(body) = body else {
+ break;
+ };
+ tasks.push(HttpTask::Body(Some(body), self.state.is_done()));
+ } else {
+ break;
+ }
+ }
+ Ok(tasks)
+ }
+
+ fn put_data(&mut self, data: &[u8]) {
+ use ParseState::*;
+ if matches!(self.state, Done(_) | Invalid(_)) {
+ panic!("Wrong phase {:?}", self.state);
+ }
+ self.buf.extend_from_slice(data);
+ }
+
+ fn parse_header(&mut self) -> Result<Option<ResponseHeader>> {
+ let mut headers = [httparse::EMPTY_HEADER; MAX_HEADERS];
+ let mut resp = httparse::Response::new(&mut headers);
+ let mut parser = httparse::ParserConfig::default();
+ parser.allow_spaces_after_header_name_in_responses(true);
+ parser.allow_obsolete_multiline_headers_in_responses(true);
+
+ let res = parser.parse_response(&mut resp, &self.buf);
+ let res = match res {
+ Ok(res) => res,
+ Err(e) => {
+ self.state = ParseState::Invalid(e);
+ return Error::e_because(
+ InvalidHTTPHeader,
+ format!("buf: {:?}", String::from_utf8_lossy(&self.buf)),
+ e,
+ );
+ }
+ };
+
+ let split_to = match res {
+ Status::Complete(s) => s,
+ Status::Partial => {
+ self.state = ParseState::PartialHeader;
+ return Ok(None);
+ }
+ };
+ // safe to unwrap, valid response always has code set.
+ let mut response =
+ ResponseHeader::build(resp.code.unwrap(), Some(resp.headers.len())).unwrap();
+ for header in resp.headers {
+ // TODO: consider hold a Bytes and all header values can be Bytes referencing the
+ // original buffer without reallocation
+ response.append_header(header.name.to_owned(), header.value.to_owned())?;
+ }
+ // TODO: see above, we can make header value `Bytes` referencing header_bytes
+ let header_bytes = self.buf.split_to(split_to).freeze();
+ self.header_bytes = header_bytes;
+ self.state = body_type(&response);
+
+ Ok(Some(response))
+ }
+
+ fn parse_body(&mut self) -> Result<Option<Bytes>> {
+ use ParseState::*;
+ if self.buf.is_empty() {
+ return Ok(None);
+ }
+ match self.state {
+ Init | PartialHeader | Invalid(_) => {
+ panic!("Wrong phase {:?}", self.state);
+ }
+ Done(_) => Ok(None),
+ PartialBodyContentLength(total, mut seen) => {
+ let end = if total < self.buf.len() + seen {
+ // TODO: warn! more data than expected
+ total - seen
+ } else {
+ self.buf.len()
+ };
+ seen += end;
+ if seen >= total {
+ self.state = Done(seen);
+ } else {
+ self.state = PartialBodyContentLength(total, seen);
+ }
+ Ok(Some(self.buf.split_to(end).freeze()))
+ }
+ PartialChunkedBody(seen) => {
+ let parsed = httparse::parse_chunk_size(&self.buf).map_err(|e| {
+ self.state = Done(seen);
+ Error::explain(INVALID_CHUNK, format!("Invalid chunked encoding: {e:?}"))
+ })?;
+ match parsed {
+ httparse::Status::Complete((header_len, body_len)) => {
+ // e.g. "4\r\nRust\r\n": chunk header "4\r\n", body "Rust", trailing delimiter "\r\n"
+ let total_chunk_size =
+ header_len + body_len as usize + CHUNK_DELIMITER_SIZE;
+ if self.buf.len() < total_chunk_size {
+ // wait for the full chunk to be read
+ // Note that we have to buffer the entire chunk in this design
+ Ok(None)
+ } else {
+ if body_len == 0 {
+ self.state = Done(seen);
+ } else {
+ self.state = PartialChunkedBody(seen + body_len as usize);
+ }
+ let mut chunk_bytes = self.buf.split_to(total_chunk_size);
+ let mut chunk_body = chunk_bytes.split_off(header_len);
+ chunk_body.truncate(body_len as usize);
+ // Note that the final zero-sized chunk will return an empty Bytes
+ // instead of None
+ Ok(Some(chunk_body.freeze()))
+ }
+ }
+ httparse::Status::Partial => {
+ // not even a full chunk, continue waiting for more data
+ Ok(None)
+ }
+ }
+ }
+ PartialHttp10Body(seen) => {
+ self.state = PartialHttp10Body(seen + self.buf.len());
+ Ok(Some(self.buf.split().freeze()))
+ }
+ }
+ }
+
+ pub fn finish(&mut self) -> Result<()> {
+ if let ParseState::PartialHttp10Body(seen) = self.state {
+ self.state = ParseState::Done(seen);
+ }
+ if !self.state.is_done() {
+ Error::e_explain(INCOMPLETE_BODY, format!("{:?}", self.state))
+ } else {
+ Ok(())
+ }
+ }
+ }
+
+ fn body_type(resp: &ResponseHeader) -> ParseState {
+ use http::StatusCode;
+
+ if matches!(
+ resp.status,
+ StatusCode::NO_CONTENT | StatusCode::NOT_MODIFIED
+ ) {
+ // these status codes cannot have a body by definition
+ return ParseState::Done(0);
+ }
+ if let Some(encoding) = resp.headers.get(http::header::TRANSFER_ENCODING) {
+ // TODO: case sensitive?
+ if encoding.as_bytes() == b"chunked" {
+ return ParseState::PartialChunkedBody(0);
+ }
+ }
+ if let Some(cl) = resp.headers.get(http::header::CONTENT_LENGTH) {
+ // ignore invalid header value
+ if let Some(cl) = std::str::from_utf8(cl.as_bytes())
+ .ok()
+ .and_then(|cl| cl.parse::<usize>().ok())
+ {
+ return if cl == 0 {
+ ParseState::Done(0)
+ } else {
+ ParseState::PartialBodyContentLength(cl, 0)
+ };
+ }
+ }
+ ParseState::PartialHttp10Body(0)
+ }
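+
+ // For example, per the rules above: a `Transfer-Encoding: chunked` response starts
+ // in `PartialChunkedBody(0)`, a `Content-Length: 6` response starts in
+ // `PartialBodyContentLength(6, 0)`, and a 204/304 goes straight to `Done(0)`.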
+
+ #[cfg(test)]
+ mod test {
+ use super::*;
+
+ #[test]
+ fn test_basic_response() {
+ let input = b"HTTP/1.1 200 OK\r\n\r\n";
+ let mut parser = ResponseParse::new();
+ let output = parser.inject_data(input).unwrap();
+ assert_eq!(output.len(), 1);
+ let HttpTask::Header(header, eos) = &output[0] else {
+ panic!("{:?}", output);
+ };
+ assert_eq!(header.status, 200);
+ assert!(!eos);
+
+ let body = b"abc";
+ let output = parser.inject_data(body).unwrap();
+ assert_eq!(output.len(), 1);
+ let HttpTask::Body(data, _eos) = &output[0] else {
+ panic!("{:?}", output);
+ };
+ assert_eq!(data.as_ref().unwrap(), &body[..]);
+ parser.finish().unwrap();
+ }
+
+ #[test]
+ fn test_partial_response_headers() {
+ let input = b"HTTP/1.1 200 OK\r\n";
+ let mut parser = ResponseParse::new();
+ let output = parser.inject_data(input).unwrap();
+ // header is not complete
+ assert_eq!(output.len(), 0);
+
+ let output = parser
+ .inject_data("Server: pingora\r\n\r\n".as_bytes())
+ .unwrap();
+ assert_eq!(output.len(), 1);
+ let HttpTask::Header(header, eos) = &output[0] else {
+ panic!("{:?}", output);
+ };
+ assert_eq!(header.status, 200);
+ assert_eq!(header.headers.get("Server").unwrap(), "pingora");
+ assert!(!eos);
+ }
+
+ #[test]
+ fn test_invalid_headers() {
+ let input = b"HTP/1.1 200 OK\r\nServer: pingora\r\n\r\n";
+ let mut parser = ResponseParse::new();
+ let output = parser.inject_data(input);
+ // the header is invalid, so parsing errors out
+ assert!(output.is_err());
+ }
+
+ #[test]
+ fn test_body_content_length() {
+ let input = b"HTTP/1.1 200 OK\r\nContent-Length: 6\r\n\r\nabc";
+ let mut parser = ResponseParse::new();
+ let output = parser.inject_data(input).unwrap();
+
+ assert_eq!(output.len(), 2);
+ let HttpTask::Header(header, _eos) = &output[0] else {
+ panic!("{:?}", output);
+ };
+ assert_eq!(header.status, 200);
+
+ let HttpTask::Body(data, eos) = &output[1] else {
+ panic!("{:?}", output);
+ };
+ assert_eq!(data.as_ref().unwrap(), "abc");
+ assert!(!eos);
+
+ let output = parser.inject_data(b"def").unwrap();
+ assert_eq!(output.len(), 1);
+ let HttpTask::Body(data, eos) = &output[0] else {
+ panic!("{:?}", output);
+ };
+ assert_eq!(data.as_ref().unwrap(), "def");
+ assert!(eos);
+
+ parser.finish().unwrap();
+ }
+
+ #[test]
+ fn test_body_chunked() {
+ let input = b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n4\r\nrust\r\n";
+ let mut parser = ResponseParse::new();
+ let output = parser.inject_data(input).unwrap();
+
+ assert_eq!(output.len(), 2);
+ let HttpTask::Header(header, _eos) = &output[0] else {
+ panic!("{:?}", output);
+ };
+ assert_eq!(header.status, 200);
+
+ let HttpTask::Body(data, eos) = &output[1] else {
+ panic!("{:?}", output);
+ };
+ assert_eq!(data.as_ref().unwrap(), "rust");
+ assert!(!eos);
+
+ let output = parser.inject_data(b"0\r\n\r\n").unwrap();
+ assert_eq!(output.len(), 1);
+ let HttpTask::Body(data, eos) = &output[0] else {
+ panic!("{:?}", output);
+ };
+ assert_eq!(data.as_ref().unwrap(), "");
+ assert!(eos);
+
+ parser.finish().unwrap();
+ }
+
+ #[test]
+ fn test_body_content_length_early() {
+ let input = b"HTTP/1.1 200 OK\r\nContent-Length: 6\r\n\r\nabc";
+ let mut parser = ResponseParse::new();
+ let output = parser.inject_data(input).unwrap();
+
+ assert_eq!(output.len(), 2);
+ let HttpTask::Header(header, _eos) = &output[0] else {
+ panic!("{:?}", output);
+ };
+ assert_eq!(header.status, 200);
+
+ let HttpTask::Body(data, eos) = &output[1] else {
+ panic!("{:?}", output);
+ };
+ assert_eq!(data.as_ref().unwrap(), "abc");
+ assert!(!eos);
+
+ parser.finish().unwrap_err();
+ }
+
+ #[test]
+ fn test_body_content_length_more_data() {
+ let input = b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nabc";
+ let mut parser = ResponseParse::new();
+ let output = parser.inject_data(input).unwrap();
+
+ assert_eq!(output.len(), 2);
+ let HttpTask::Header(header, _eos) = &output[0] else {
+ panic!("{:?}", output);
+ };
+ assert_eq!(header.status, 200);
+
+ let HttpTask::Body(data, eos) = &output[1] else {
+ panic!("{:?}", output);
+ };
+ assert_eq!(data.as_ref().unwrap(), "ab");
+ assert!(eos);
+
+ // extra data is dropped without error
+ parser.finish().unwrap();
+ }
+
+ #[test]
+ fn test_body_chunked_early() {
+ let input = b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n4\r\nrust\r\n";
+ let mut parser = ResponseParse::new();
+ let output = parser.inject_data(input).unwrap();
+
+ assert_eq!(output.len(), 2);
+ let HttpTask::Header(header, _eos) = &output[0] else {
+ panic!("{:?}", output);
+ };
+ assert_eq!(header.status, 200);
+
+ let HttpTask::Body(data, eos) = &output[1] else {
+ panic!("{:?}", output);
+ };
+ assert_eq!(data.as_ref().unwrap(), "rust");
+ assert!(!eos);
+
+ parser.finish().unwrap_err();
+ }
+
+ #[test]
+ fn test_body_chunked_partial_chunk() {
+ let input = b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n4\r\nru";
+ let mut parser = ResponseParse::new();
+ let output = parser.inject_data(input).unwrap();
+
+ assert_eq!(output.len(), 1);
+ let HttpTask::Header(header, _eos) = &output[0] else {
+ panic!("{:?}", output);
+ };
+ assert_eq!(header.status, 200);
+
+ let output = parser.inject_data(b"st\r\n").unwrap();
+ assert_eq!(output.len(), 1);
+ let HttpTask::Body(data, eos) = &output[0] else {
+ panic!("{:?}", output);
+ };
+ assert_eq!(data.as_ref().unwrap(), "rust");
+ assert!(!eos);
+ }
+
+ #[test]
+ fn test_body_chunked_partial_chunk_head() {
+ let input = b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n4\r";
+ let mut parser = ResponseParse::new();
+ let output = parser.inject_data(input).unwrap();
+
+ assert_eq!(output.len(), 1);
+ let HttpTask::Header(header, _eos) = &output[0] else {
+ panic!("{:?}", output);
+ };
+ assert_eq!(header.status, 200);
+
+ let output = parser.inject_data(b"\nrust\r\n").unwrap();
+ assert_eq!(output.len(), 1);
+ let HttpTask::Body(data, eos) = &output[0] else {
+ panic!("{:?}", output);
+ };
+ assert_eq!(data.as_ref().unwrap(), "rust");
+ assert!(!eos);
+ }
+
+ #[test]
+ fn test_body_chunked_many_chunks() {
+ let input =
+ b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n4\r\nrust\r\n1\r\ny\r\n";
+ let mut parser = ResponseParse::new();
+ let output = parser.inject_data(input).unwrap();
+
+ assert_eq!(output.len(), 3);
+ let HttpTask::Header(header, _eos) = &output[0] else {
+ panic!("{:?}", output);
+ };
+ assert_eq!(header.status, 200);
+ let HttpTask::Body(data, eos) = &output[1] else {
+ panic!("{:?}", output);
+ };
+ assert!(!eos);
+ assert_eq!(data.as_ref().unwrap(), "rust");
+ let HttpTask::Body(data, eos) = &output[2] else {
+ panic!("{:?}", output);
+ };
+ assert_eq!(data.as_ref().unwrap(), "y");
+ assert!(!eos);
+ }
+ }
+}
diff --git a/pingora-cache/src/storage.rs b/pingora-cache/src/storage.rs
new file mode 100644
index 0000000..c6365c7
--- /dev/null
+++ b/pingora-cache/src/storage.rs
@@ -0,0 +1,122 @@
+// Copyright 2024 Cloudflare, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Cache backend storage abstraction
+
+use super::{CacheKey, CacheMeta};
+use crate::key::CompactCacheKey;
+use crate::trace::SpanHandle;
+
+use async_trait::async_trait;
+use pingora_error::Result;
+use std::any::Any;
+
+/// Cache storage interface
+#[async_trait]
+pub trait Storage {
+ // TODO: shouldn't have to be static
+
+ /// Lookup the storage for the given [CacheKey]
+ async fn lookup(
+ &'static self,
+ key: &CacheKey,
+ trace: &SpanHandle,
+ ) -> Result<Option<(CacheMeta, HitHandler)>>;
+
+ /// Write the given [CacheMeta] to the storage. Return [MissHandler] to write the body later.
+ async fn get_miss_handler(
+ &'static self,
+ key: &CacheKey,
+ meta: &CacheMeta,
+ trace: &SpanHandle,
+ ) -> Result<MissHandler>;
+
+ /// Delete the cached asset for the given key
+ ///
+ /// [CompactCacheKey] is used here because it is how eviction managers store the keys
+ async fn purge(&'static self, key: &CompactCacheKey, trace: &SpanHandle) -> Result<bool>;
+
+ /// Update cache header and metadata for the already stored asset.
+ async fn update_meta(
+ &'static self,
+ key: &CacheKey,
+ meta: &CacheMeta,
+ trace: &SpanHandle,
+ ) -> Result<bool>;
+
+ /// Whether this storage backend supports reading partially written data
+ ///
+ /// This is to indicate when cache should unlock readers
+ fn support_streaming_partial_write(&self) -> bool {
+ false
+ }
+
+ /// Helper function to cast the trait object to concrete types
+ fn as_any(&self) -> &(dyn Any + Send + Sync + 'static);
+}
+
+/// Cache hit handling trait
+#[async_trait]
+pub trait HandleHit {
+ /// Read cached body
+ ///
+ /// Return `None` when no more body to read.
+ async fn read_body(&mut self) -> Result<Option<bytes::Bytes>>;
+
+ /// Finish the current cache hit
+ async fn finish(
+ self: Box<Self>, // because self is always used as a trait object
+ storage: &'static (dyn Storage + Sync),
+ key: &CacheKey,
+ trace: &SpanHandle,
+ ) -> Result<()>;
+
+ /// Whether this storage allow seeking to a certain range of body
+ fn can_seek(&self) -> bool {
+ false
+ }
+
+ /// Try to seek to a certain range of the body
+ ///
+ /// `end: None` means to read to the end of the body.
+ fn seek(&mut self, _start: usize, _end: Option<usize>) -> Result<()> {
+ // to prevent impl can_seek() without impl seek
+ todo!("seek() needs to be implemented")
+ }
+ // TODO: fn is_stream_hit()
+
+ /// Helper function to cast the trait object to concrete types
+ fn as_any(&self) -> &(dyn Any + Send + Sync);
+}
+
+/// Hit Handler
+pub type HitHandler = Box<(dyn HandleHit + Sync + Send)>;
+
+/// Cache miss handling trait
+#[async_trait]
+pub trait HandleMiss {
+ /// Write the given body to the storage
+ async fn write_body(&mut self, data: bytes::Bytes, eof: bool) -> Result<()>;
+
+ /// Finish the cache admission
+ ///
+ /// When `self` is dropped without calling this function, the storage should consider this write
+ /// failed.
+ async fn finish(
+ self: Box<Self>, // because self is always used as a trait object
+ ) -> Result<usize>;
+}
+
+/// Miss Handler
+pub type MissHandler = Box<(dyn HandleMiss + Sync + Send)>;
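+
+// A write-path sketch (hypothetical flow, assuming a `storage: &'static (dyn Storage + Sync)`,
+// a prepared `key`, `meta`, and a `trace` span handle): body chunks stream through the
+// miss handler, and only `finish()` commits the admission.
+//
+//     let mut miss = storage.get_miss_handler(&key, &meta, &trace).await?;
+//     miss.write_body(bytes::Bytes::from_static(b"hello "), false).await?;
+//     miss.write_body(bytes::Bytes::from_static(b"world"), true).await?; // eof
+//     let size_admitted = miss.finish().await?;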
diff --git a/pingora-cache/src/trace.rs b/pingora-cache/src/trace.rs
new file mode 100644
index 0000000..c385aea
--- /dev/null
+++ b/pingora-cache/src/trace.rs
@@ -0,0 +1,98 @@
+// Copyright 2024 Cloudflare, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Distributed tracing helpers
+
+use rustracing_jaeger::span::SpanContextState;
+use std::time::SystemTime;
+
+use crate::{CacheMeta, CachePhase, HitStatus};
+
+pub use rustracing::tag::Tag;
+
+pub type Span = rustracing::span::Span<SpanContextState>;
+pub type SpanHandle = rustracing::span::SpanHandle<SpanContextState>;
+
+#[derive(Debug)]
+pub(crate) struct CacheTraceCTX {
+ // parent span
+ pub cache_span: Span,
+ // only spans across multiple calls need to store here
+ pub miss_span: Span,
+ pub hit_span: Span,
+}
+
+impl CacheTraceCTX {
+ pub fn new() -> Self {
+ CacheTraceCTX {
+ cache_span: Span::inactive(),
+ miss_span: Span::inactive(),
+ hit_span: Span::inactive(),
+ }
+ }
+
+ pub fn enable(&mut self, cache_span: Span) {
+ self.cache_span = cache_span;
+ }
+
+ #[inline]
+ pub fn child(&self, name: &'static str) -> Span {
+ self.cache_span.child(name, |o| o.start())
+ }
+
+ pub fn start_miss_span(&mut self) {
+ self.miss_span = self.child("miss");
+ }
+
+ pub fn get_miss_span(&self) -> SpanHandle {
+ self.miss_span.handle()
+ }
+
+ pub fn finish_miss_span(&mut self) {
+ self.miss_span.set_finish_time(SystemTime::now);
+ }
+
+ pub fn start_hit_span(&mut self, phase: CachePhase, hit_status: HitStatus) {
+ self.hit_span = self.child("hit");
+ self.hit_span.set_tag(|| Tag::new("phase", phase.as_str()));
+ self.hit_span
+ .set_tag(|| Tag::new("status", hit_status.as_str()));
+ }
+
+ pub fn finish_hit_span(&mut self) {
+ self.hit_span.set_finish_time(SystemTime::now);
+ }
+
+ pub fn log_meta(&mut self, meta: &CacheMeta) {
+ fn ts2epoch(ts: SystemTime) -> f64 {
+ ts.duration_since(SystemTime::UNIX_EPOCH)
+ .unwrap_or_default() // should never overflow but be safe here
+ .as_secs_f64()
+ }
+ let internal = &meta.0.internal;
+ self.hit_span.set_tags(|| {
+ [
+ Tag::new("created", ts2epoch(internal.created)),
+ Tag::new("fresh_until", ts2epoch(internal.fresh_until)),
+ Tag::new("updated", ts2epoch(internal.updated)),
+ Tag::new("stale_if_error_sec", internal.stale_if_error_sec as i64),
+ Tag::new(
+ "stale_while_revalidate_sec",
+ internal.stale_while_revalidate_sec as i64,
+ ),
+ Tag::new("variance", internal.variance.is_some()),
+ ]
+ });
+ }
+}
diff --git a/pingora-cache/src/variance.rs b/pingora-cache/src/variance.rs
new file mode 100644
index 0000000..cce8160
--- /dev/null
+++ b/pingora-cache/src/variance.rs
@@ -0,0 +1,120 @@
+use std::{borrow::Cow, collections::BTreeMap};
+
+use blake2::Digest;
+
+use crate::key::{Blake2b128, HashBinary};
+
+/// A builder for variance keys, used for distinguishing multiple cached assets
+/// at the same URL. This is intended to be easily passed to helper functions,
+/// which can each populate a portion of the variance.
+pub struct VarianceBuilder<'a> {
+ values: BTreeMap<Cow<'a, str>, Cow<'a, [u8]>>,
+}
+
+impl<'a> VarianceBuilder<'a> {
+ /// Create an empty variance key. Has no variance by default - add some variance using
+ /// [`Self::add_value`].
+ pub fn new() -> Self {
+ VarianceBuilder {
+ values: BTreeMap::new(),
+ }
+ }
+
+ /// Add a byte string to the variance key. Not sensitive to insertion order.
+ /// `value` is intended to take either `&str` or `&[u8]`.
+ pub fn add_value(&mut self, name: &'a str, value: &'a (impl AsRef<[u8]> + ?Sized)) {
+ self.values
+ .insert(name.into(), Cow::Borrowed(value.as_ref()));
+ }
+
+ /// Move a byte string to the variance key. Not sensitive to insertion order. Useful when
+ /// writing helper functions which generate a value then add said value to the VarianceBuilder.
+ /// Without this, the helper function would have to move the value to the calling function
+ /// to extend its lifetime to at least match the VarianceBuilder.
+ pub fn add_owned_value(&mut self, name: &'a str, value: Vec<u8>) {
+ self.values.insert(name.into(), Cow::Owned(value));
+ }
+
+ /// Check whether this variance key actually has variance, or just refers to the root asset
+ pub fn has_variance(&self) -> bool {
+ !self.values.is_empty()
+ }
+
+ /// Hash this variance key. Returns [`None`] if [`Self::has_variance`] is false.
+ pub fn finalize(self) -> Option<HashBinary> {
+ const SALT: &[u8; 1] = &[0u8; 1];
+ if self.has_variance() {
+ let mut hash = Blake2b128::new();
+ for (name, value) in self.values.iter() {
+ hash.update(name.as_bytes());
+ hash.update(SALT);
+ hash.update(value);
+ hash.update(SALT);
+ }
+ Some(hash.finalize().into())
+ } else {
+ None
+ }
+ }
+}
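+
+// A usage sketch (hypothetical helper `classify_client`): varying on a normalized
+// `Accept-Encoding` plus a client class computed elsewhere.
+//
+//     let mut variance = VarianceBuilder::new();
+//     variance.add_value("accept_encoding", "gzip");
+//     variance.add_owned_value("client_class", classify_client(&req).into_bytes());
+//     let variance_key: Option<HashBinary> = variance.finalize();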
+
+#[cfg(test)]
+mod test {
+ use super::*;
+
+ #[test]
+ fn test_basic() {
+ let key_empty = VarianceBuilder::new().finalize();
+ assert_eq!(None, key_empty);
+
+ let mut key_value = VarianceBuilder::new();
+ key_value.add_value("a", "a");
+ let key_value = key_value.finalize();
+
+ let mut key_owned_value = VarianceBuilder::new();
+ key_owned_value.add_owned_value("a", "a".as_bytes().to_vec());
+ let key_owned_value = key_owned_value.finalize();
+
+ assert_ne!(key_empty, key_value);
+ assert_ne!(key_empty, key_owned_value);
+ assert_eq!(key_value, key_owned_value);
+ }
+
+ #[test]
+ fn test_value_ordering() {
+ let mut key_abc = VarianceBuilder::new();
+ key_abc.add_value("a", "a");
+ key_abc.add_value("b", "b");
+ key_abc.add_value("c", "c");
+ let key_abc = key_abc.finalize().unwrap();
+
+ let mut key_bac = VarianceBuilder::new();
+ key_bac.add_value("b", "b");
+ key_bac.add_value("a", "a");
+ key_bac.add_value("c", "c");
+ let key_bac = key_bac.finalize().unwrap();
+
+ let mut key_cba = VarianceBuilder::new();
+ key_cba.add_value("c", "c");
+ key_cba.add_value("b", "b");
+ key_cba.add_value("a", "a");
+ let key_cba = key_cba.finalize().unwrap();
+
+ assert_eq!(key_abc, key_bac);
+ assert_eq!(key_abc, key_cba);
+ }
+
+ #[test]
+ fn test_value_overriding() {
+ let mut key_a = VarianceBuilder::new();
+ key_a.add_value("a", "a");
+ let key_a = key_a.finalize().unwrap();
+
+ let mut key_b = VarianceBuilder::new();
+ key_b.add_value("a", "b");
+ key_b.add_value("a", "a");
+ let key_b = key_b.finalize().unwrap();
+
+ assert_eq!(key_a, key_b);
+ }
+}