diff options
author | Yuchen Wu <[email protected]> | 2024-02-27 20:25:44 -0800 |
---|---|---|
committer | Yuchen Wu <[email protected]> | 2024-02-27 20:25:44 -0800 |
commit | 8797329225018c4d0ab990166dd020338ae292dc (patch) | |
tree | 1e8d0bf6f3c27e987559f52319d91ff75e4da5cb /pingora-cache/src | |
parent | 0bca116c1027a878469b72352e1e9e3916e85dde (diff) | |
download | pingora-8797329225018c4d0ab990166dd020338ae292dc.tar.gz pingora-8797329225018c4d0ab990166dd020338ae292dc.zip |
Release Pingora version 0.1.0 (tag: v0.1.0)
Co-authored-by: Andrew Hauck <[email protected]>
Co-authored-by: Edward Wang <[email protected]>
Diffstat (limited to 'pingora-cache/src')
-rw-r--r-- | pingora-cache/src/cache_control.rs | 839 | ||||
-rw-r--r-- | pingora-cache/src/eviction/lru.rs | 431 | ||||
-rw-r--r-- | pingora-cache/src/eviction/mod.rs | 89 | ||||
-rw-r--r-- | pingora-cache/src/eviction/simple_lru.rs | 445 | ||||
-rw-r--r-- | pingora-cache/src/filters.rs | 673 | ||||
-rw-r--r-- | pingora-cache/src/hashtable.rs | 112 | ||||
-rw-r--r-- | pingora-cache/src/key.rs | 302 | ||||
-rw-r--r-- | pingora-cache/src/lib.rs | 1093 | ||||
-rw-r--r-- | pingora-cache/src/lock.rs | 336 | ||||
-rw-r--r-- | pingora-cache/src/max_file_size.rs | 75 | ||||
-rw-r--r-- | pingora-cache/src/memory.rs | 510 | ||||
-rw-r--r-- | pingora-cache/src/meta.rs | 608 | ||||
-rw-r--r-- | pingora-cache/src/predictor.rs | 228 | ||||
-rw-r--r-- | pingora-cache/src/put.rs | 754 | ||||
-rw-r--r-- | pingora-cache/src/storage.rs | 122 | ||||
-rw-r--r-- | pingora-cache/src/trace.rs | 98 | ||||
-rw-r--r-- | pingora-cache/src/variance.rs | 120 |
17 files changed, 6835 insertions, 0 deletions
diff --git a/pingora-cache/src/cache_control.rs b/pingora-cache/src/cache_control.rs new file mode 100644 index 0000000..6686c3e --- /dev/null +++ b/pingora-cache/src/cache_control.rs @@ -0,0 +1,839 @@ +// Copyright 2024 Cloudflare, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Functions and utilities to help parse Cache-Control headers + +use super::*; + +use http::header::HeaderName; +use http::HeaderValue; +use indexmap::IndexMap; +use once_cell::sync::Lazy; +use pingora_error::{Error, ErrorType, Result}; +use pingora_http::ResponseHeader; +use regex::bytes::Regex; +use std::num::IntErrorKind; +use std::slice; +use std::str; + +/// The max delta-second per [RFC 7234](https://datatracker.ietf.org/doc/html/rfc7234#section-1.2.1) +// "If a cache receives a delta-seconds +// value greater than the greatest integer it can represent, or if any +// of its subsequent calculations overflows, the cache MUST consider the +// value to be either 2147483648 (2^31) or the greatest positive integer +// it can conveniently represent." +pub const DELTA_SECONDS_OVERFLOW_VALUE: u32 = 2147483648; + +/// Cache control directive key type +pub type DirectiveKey = String; + +/// Cache control directive value type +#[derive(Debug)] +pub struct DirectiveValue(pub Vec<u8>); + +impl AsRef<[u8]> for DirectiveValue { + fn as_ref(&self) -> &[u8] { + &self.0 + } +} + +impl DirectiveValue { + /// A [DirectiveValue] without quotes (`"`). 
+ pub fn parse_as_bytes(&self) -> &[u8] { + self.0 + .strip_prefix(&[b'"']) + .and_then(|bytes| bytes.strip_suffix(&[b'"'])) + .unwrap_or(&self.0[..]) + } + + /// A [DirectiveValue] without quotes (`"`) as `str`. + pub fn parse_as_str(&self) -> Result<&str> { + str::from_utf8(self.parse_as_bytes()).or_else(|e| { + Error::e_because(ErrorType::InternalError, "could not parse value as utf8", e) + }) + } + + /// Parse the [DirectiveValue] as delta seconds + /// + /// `"`s are ignored. The value is capped to [DELTA_SECONDS_OVERFLOW_VALUE]. + pub fn parse_as_delta_seconds(&self) -> Result<u32> { + match self.parse_as_str()?.parse::<u32>() { + Ok(value) => Ok(value), + Err(e) => { + // delta-seconds expect to handle positive overflow gracefully + if e.kind() == &IntErrorKind::PosOverflow { + Ok(DELTA_SECONDS_OVERFLOW_VALUE) + } else { + Error::e_because(ErrorType::InternalError, "could not parse value as u32", e) + } + } + } + } +} + +/// An ordered map to store cache control key value pairs. +pub type DirectiveMap = IndexMap<DirectiveKey, Option<DirectiveValue>>; + +/// Parsed Cache-Control directives +#[derive(Debug)] +pub struct CacheControl { + /// The parsed directives + pub directives: DirectiveMap, +} + +/// Cacheability calculated from cache control. 
+#[derive(Debug, PartialEq, Eq)] +pub enum Cacheable { + /// Cacheable + Yes, + /// Not cacheable + No, + /// No directive found for explicit cacheability + Default, +} + +/// An iter over all the cache control directives +pub struct ListValueIter<'a>(slice::Split<'a, u8, fn(&u8) -> bool>); + +impl<'a> ListValueIter<'a> { + pub fn from(value: &'a DirectiveValue) -> Self { + ListValueIter(value.parse_as_bytes().split(|byte| byte == &b',')) + } +} + +// https://datatracker.ietf.org/doc/html/rfc7230#section-3.2.3 +// optional whitespace OWS = *(SP / HTAB); SP = 0x20, HTAB = 0x09 +fn trim_ows(bytes: &[u8]) -> &[u8] { + fn not_ows(b: &u8) -> bool { + b != &b'\x20' && b != &b'\x09' + } + // find first non-OWS char from front (head) and from end (tail) + let head = bytes.iter().position(not_ows).unwrap_or(0); + let tail = bytes + .iter() + .rposition(not_ows) + .map(|rpos| rpos + 1) + .unwrap_or(head); + &bytes[head..tail] +} + +impl<'a> Iterator for ListValueIter<'a> { + type Item = &'a [u8]; + + fn next(&mut self) -> Option<Self::Item> { + Some(trim_ows(self.0.next()?)) + } +} + +/* + Originally from https://github.com/hapijs/wreck: + Cache-Control = 1#cache-directive + cache-directive = token [ "=" ( token / quoted-string ) ] + token = [^\x00-\x20\(\)<>@\,;\:\\"\/\[\]\?\=\{\}\x7F]+ + quoted-string = "(?:[^"\\]|\\.)*" +*/ +static RE_CACHE_DIRECTIVE: Lazy<Regex> = + // unicode support disabled, allow ; or , delimiter | capture groups: 1: directive = 2: token OR quoted-string + Lazy::new(|| { + Regex::new(r#"(?-u)(?:^|(?:\s*[,;]\s*))([^\x00-\x20\(\)<>@,;:\\"/\[\]\?=\{\}\x7F]+)(?:=((?:[^\x00-\x20\(\)<>@,;:\\"/\[\]\?=\{\}\x7F]+|(?:"(?:[^"\\]|\\.)*"))))?"#).unwrap() + }); + +impl CacheControl { + // Our parsing strategy is more permissive than the RFC in a few ways: + // - Allows semicolons as delimiters (in addition to commas). + // - Allows octets outside of visible ASCII in tokens. 
+ // - Doesn't require no-value for "boolean directives," such as must-revalidate + // - Allows quoted-string format for numeric values. + fn from_headers(headers: http::header::GetAll<HeaderValue>) -> Option<Self> { + let mut directives = IndexMap::new(); + // should iterate in header line insertion order + for line in headers { + for captures in RE_CACHE_DIRECTIVE.captures_iter(line.as_bytes()) { + // directive key + // header values don't have to be utf-8, but we store keys as strings for case-insensitive hashing + let key = captures.get(1).and_then(|cap| { + str::from_utf8(cap.as_bytes()) + .ok() + .map(|token| token.to_lowercase()) + }); + if key.is_none() { + continue; + } + // directive value + // match token or quoted-string + let value = captures + .get(2) + .map(|cap| DirectiveValue(cap.as_bytes().to_vec())); + directives.insert(key.unwrap(), value); + } + } + Some(CacheControl { directives }) + } + + /// Parse from the given header name in `headers` + pub fn from_headers_named(header_name: &str, headers: &http::HeaderMap) -> Option<Self> { + if !headers.contains_key(header_name) { + return None; + } + + Self::from_headers(headers.get_all(header_name)) + } + + /// Parse from the given header name in the [ReqHeader] + pub fn from_req_headers_named(header_name: &str, req_header: &ReqHeader) -> Option<Self> { + Self::from_headers_named(header_name, &req_header.headers) + } + + /// Parse `Cache-Control` header name from the [ReqHeader] + pub fn from_req_headers(req_header: &ReqHeader) -> Option<Self> { + Self::from_req_headers_named("cache-control", req_header) + } + + /// Parse from the given header name in the [RespHeader] + pub fn from_resp_headers_named(header_name: &str, resp_header: &RespHeader) -> Option<Self> { + Self::from_headers_named(header_name, &resp_header.headers) + } + + /// Parse `Cache-Control` header name from the [RespHeader] + pub fn from_resp_headers(resp_header: &RespHeader) -> Option<Self> { + 
Self::from_resp_headers_named("cache-control", resp_header) + } + + /// Whether the given directive is in the cache control. + pub fn has_key(&self, key: &str) -> bool { + self.directives.contains_key(key) + } + + /// Whether the `public` directive is in the cache control. + pub fn public(&self) -> bool { + self.has_key("public") + } + + /// Whether the given directive exists and it has no value. + fn has_key_without_value(&self, key: &str) -> bool { + matches!(self.directives.get(key), Some(None)) + } + + /// Whether the standalone `private` exists in the cache control + // RFC 7234: using the #field-name versions of `private` + // means a shared cache "MUST NOT store the specified field-name(s), + // whereas it MAY store the remainder of the response." + // It must be a boolean form (no value) to apply to the whole response. + // https://datatracker.ietf.org/doc/html/rfc7234#section-5.2.2.6 + pub fn private(&self) -> bool { + self.has_key_without_value("private") + } + + fn get_field_names(&self, key: &str) -> Option<ListValueIter> { + if let Some(Some(value)) = self.directives.get(key) { + Some(ListValueIter::from(value)) + } else { + None + } + } + + /// Get the values of `private=` + pub fn private_field_names(&self) -> Option<ListValueIter> { + self.get_field_names("private") + } + + /// Whether the standalone `no-cache` exists in the cache control + pub fn no_cache(&self) -> bool { + self.has_key_without_value("no-cache") + } + + /// Get the values of `no-cache=` + pub fn no_cache_field_names(&self) -> Option<ListValueIter> { + self.get_field_names("no-cache") + } + + /// Whether `no-store` exists. 
+ pub fn no_store(&self) -> bool { + self.has_key("no-store") + } + + fn parse_delta_seconds(&self, key: &str) -> Result<Option<u32>> { + if let Some(Some(dir_value)) = self.directives.get(key) { + Ok(Some(dir_value.parse_as_delta_seconds()?)) + } else { + Ok(None) + } + } + + /// Return the `max-age` seconds + pub fn max_age(&self) -> Result<Option<u32>> { + self.parse_delta_seconds("max-age") + } + + /// Return the `s-maxage` seconds + pub fn s_maxage(&self) -> Result<Option<u32>> { + self.parse_delta_seconds("s-maxage") + } + + /// Return the `stale-while-revalidate` seconds + pub fn stale_while_revalidate(&self) -> Result<Option<u32>> { + self.parse_delta_seconds("stale-while-revalidate") + } + + /// Return the `stale-if-error` seconds + pub fn stale_if_error(&self) -> Result<Option<u32>> { + self.parse_delta_seconds("stale-if-error") + } + + /// Whether `must-revalidate` exists. + pub fn must_revalidate(&self) -> bool { + self.has_key("must-revalidate") + } + + /// Whether `proxy-revalidate` exists. + pub fn proxy_revalidate(&self) -> bool { + self.has_key("proxy-revalidate") + } + + /// Whether `only-if-cached` exists. 
+ pub fn only_if_cached(&self) -> bool { + self.has_key("only-if-cached") + } +} + +impl InterpretCacheControl for CacheControl { + fn is_cacheable(&self) -> Cacheable { + if self.no_store() || self.private() { + return Cacheable::No; + } + if self.has_key("s-maxage") || self.has_key("max-age") || self.public() { + return Cacheable::Yes; + } + Cacheable::Default + } + + fn allow_caching_authorized_req(&self) -> bool { + // RFC 7234 https://datatracker.ietf.org/doc/html/rfc7234#section-3 + // "MUST NOT" store requests with Authorization header + // unless response contains one of these directives + self.must_revalidate() || self.public() || self.has_key("s-maxage") + } + + fn fresh_sec(&self) -> Option<u32> { + if self.no_cache() { + // always treated as stale + return Some(0); + } + match self.s_maxage() { + Ok(Some(seconds)) => Some(seconds), + // s-maxage not present + Ok(None) => match self.max_age() { + Ok(Some(seconds)) => Some(seconds), + _ => None, + }, + _ => None, + } + } + + fn serve_stale_while_revalidate_sec(&self) -> Option<u32> { + // RFC 7234: these directives forbid serving stale. + // https://datatracker.ietf.org/doc/html/rfc7234#section-4.2.4 + if self.must_revalidate() || self.proxy_revalidate() || self.has_key("s-maxage") { + return Some(0); + } + self.stale_while_revalidate().unwrap_or(None) + } + + fn serve_stale_if_error_sec(&self) -> Option<u32> { + if self.must_revalidate() || self.proxy_revalidate() || self.has_key("s-maxage") { + return Some(0); + } + self.stale_if_error().unwrap_or(None) + } + + // Strip header names listed in `private` or `no-cache` directives from a response. 
+ fn strip_private_headers(&self, resp_header: &mut ResponseHeader) { + fn strip_listed_headers(resp: &mut ResponseHeader, field_names: ListValueIter) { + for name in field_names { + if let Ok(header) = HeaderName::from_bytes(name) { + resp.remove_header(&header); + } + } + } + + if let Some(headers) = self.private_field_names() { + strip_listed_headers(resp_header, headers); + } + // We interpret `no-cache` the same way as `private`, + // though technically it has a less restrictive requirement + // ("MUST NOT be sent in the response to a subsequent request + // without successful revalidation with the origin server"). + // https://datatracker.ietf.org/doc/html/rfc7234#section-5.2.2.2 + if let Some(headers) = self.no_cache_field_names() { + strip_listed_headers(resp_header, headers); + } + } +} + +/// `InterpretCacheControl` provides a meaningful interface to the parsed `CacheControl`. +/// These functions actually interpret the parsed cache-control directives to return +/// the freshness or other cache meta values that cache-control is signaling. +/// +/// By default `CacheControl` implements an RFC-7234 compliant reading that assumes it is being +/// used with a shared (proxy) cache. +pub trait InterpretCacheControl { + /// Does cache-control specify this response is cacheable? + /// + /// Note that an RFC-7234 compliant cacheability check must also + /// check if the request contained the Authorization header and + /// `allow_caching_authorized_req`. + fn is_cacheable(&self) -> Cacheable; + + /// Does this cache-control allow caching a response to + /// a request with the Authorization header? + fn allow_caching_authorized_req(&self) -> bool; + + /// Returns freshness ttl specified in cache-control + /// + /// - `Some(_)` indicates cache-control specifies a valid ttl. Some(0) = always stale. + /// - `None` means cache-control did not specify a valid ttl. 
+ fn fresh_sec(&self) -> Option<u32>; + + /// Returns stale-while-revalidate ttl, + /// + /// The result should consider all the relevant cache directives, not just SWR header itself. + /// + /// Some(0) means serving such stale is disallowed by directive like `must-revalidate` + /// or `stale-while-revalidate=0`. + /// + /// `None` indicates no SWR ttl was specified. + fn serve_stale_while_revalidate_sec(&self) -> Option<u32>; + + /// Returns stale-if-error ttl, + /// + /// The result should consider all the relevant cache directives, not just SIE header itself. + /// + /// Some(0) means serving such stale is disallowed by directive like `must-revalidate` + /// or `stale-if-error=0`. + /// + /// `None` indicates no SIE ttl was specified. + fn serve_stale_if_error_sec(&self) -> Option<u32>; + + /// Strip header names listed in `private` or `no-cache` directives from a response, + /// usually prior to storing that response in cache. + fn strip_private_headers(&self, resp_header: &mut ResponseHeader); +} + +#[cfg(test)] +mod tests { + use super::*; + use http::header::CACHE_CONTROL; + use http::HeaderValue; + use http::{request, response}; + + fn build_response(cc_key: HeaderName, cc_value: &str) -> response::Parts { + let (parts, _) = response::Builder::new() + .header(cc_key, cc_value) + .body(()) + .unwrap() + .into_parts(); + parts + } + + #[test] + fn test_simple_cache_control() { + let resp = build_response(CACHE_CONTROL, "public, max-age=10000"); + let cc = CacheControl::from_resp_headers(&resp).unwrap(); + assert!(cc.public()); + assert_eq!(cc.max_age().unwrap().unwrap(), 10000); + } + + #[test] + fn test_private_cache_control() { + let resp = build_response(CACHE_CONTROL, "private"); + let cc = CacheControl::from_resp_headers(&resp).unwrap(); + + assert!(cc.private()); + assert!(cc.max_age().unwrap().is_none()); + } + + #[test] + fn test_directives_across_header_lines() { + let (parts, _) = response::Builder::new() + .header(CACHE_CONTROL, "public,") +
.header("cache-Control", "max-age=10000") + .body(()) + .unwrap() + .into_parts(); + let cc = CacheControl::from_resp_headers(&parts).unwrap(); + + assert!(cc.public()); + assert_eq!(cc.max_age().unwrap().unwrap(), 10000); + } + + #[test] + fn test_recognizes_semicolons_as_delimiters() { + let resp = build_response(CACHE_CONTROL, "public; max-age=0"); + let cc = CacheControl::from_resp_headers(&resp).unwrap(); + + assert!(cc.public()); + assert_eq!(cc.max_age().unwrap().unwrap(), 0); + } + + #[test] + fn test_unknown_directives() { + let resp = build_response(CACHE_CONTROL, "public,random1=random2, rand3=\"\""); + let cc = CacheControl::from_resp_headers(&resp).unwrap(); + let mut directive_iter = cc.directives.iter(); + + let first = directive_iter.next().unwrap(); + assert_eq!(first.0, &"public"); + assert!(first.1.is_none()); + + let second = directive_iter.next().unwrap(); + assert_eq!(second.0, &"random1"); + assert_eq!(second.1.as_ref().unwrap().0, "random2".as_bytes()); + + let third = directive_iter.next().unwrap(); + assert_eq!(third.0, &"rand3"); + assert_eq!(third.1.as_ref().unwrap().0, "\"\"".as_bytes()); + + assert!(directive_iter.next().is_none()); + } + + #[test] + fn test_case_insensitive_directive_keys() { + let resp = build_response( + CACHE_CONTROL, + "Public=\"something\", mAx-AGe=\"10000\", foo=cRaZyCaSe, bAr=\"inQuotes\"", + ); + let cc = CacheControl::from_resp_headers(&resp).unwrap(); + + assert!(cc.public()); + assert_eq!(cc.max_age().unwrap().unwrap(), 10000); + + let mut directive_iter = cc.directives.iter(); + let first = directive_iter.next().unwrap(); + assert_eq!(first.0, &"public"); + assert_eq!(first.1.as_ref().unwrap().0, "\"something\"".as_bytes()); + + let second = directive_iter.next().unwrap(); + assert_eq!(second.0, &"max-age"); + assert_eq!(second.1.as_ref().unwrap().0, "\"10000\"".as_bytes()); + + // values are still stored with casing + let third = directive_iter.next().unwrap(); + assert_eq!(third.0, &"foo"); + 
assert_eq!(third.1.as_ref().unwrap().0, "cRaZyCaSe".as_bytes()); + + let fourth = directive_iter.next().unwrap(); + assert_eq!(fourth.0, &"bar"); + assert_eq!(fourth.1.as_ref().unwrap().0, "\"inQuotes\"".as_bytes()); + + assert!(directive_iter.next().is_none()); + } + + #[test] + fn test_non_ascii() { + let resp = build_response(CACHE_CONTROL, "püblic=💖, max-age=\"💯\""); + let cc = CacheControl::from_resp_headers(&resp).unwrap(); + + // Not considered valid registered directive keys / values + assert!(!cc.public()); + assert_eq!( + cc.max_age().unwrap_err().context.unwrap().to_string(), + "could not parse value as u32" + ); + + let mut directive_iter = cc.directives.iter(); + let first = directive_iter.next().unwrap(); + assert_eq!(first.0, &"püblic"); + assert_eq!(first.1.as_ref().unwrap().0, "💖".as_bytes()); + + let second = directive_iter.next().unwrap(); + assert_eq!(second.0, &"max-age"); + assert_eq!(second.1.as_ref().unwrap().0, "\"💯\"".as_bytes()); + + assert!(directive_iter.next().is_none()); + } + + #[test] + fn test_non_utf8_key() { + let mut resp = response::Builder::new().body(()).unwrap(); + resp.headers_mut().insert( + CACHE_CONTROL, + HeaderValue::from_bytes(b"bar\xFF=\"baz\", a=b").unwrap(), + ); + let (parts, _) = resp.into_parts(); + let cc = CacheControl::from_resp_headers(&parts).unwrap(); + + // invalid bytes for key + let mut directive_iter = cc.directives.iter(); + let first = directive_iter.next().unwrap(); + assert_eq!(first.0, &"a"); + assert_eq!(first.1.as_ref().unwrap().0, "b".as_bytes()); + + assert!(directive_iter.next().is_none()); + } + + #[test] + fn test_non_utf8_value() { + // RFC 7230: 0xFF is part of obs-text and is officially considered a valid octet in quoted-strings + let mut resp = response::Builder::new().body(()).unwrap(); + resp.headers_mut().insert( + CACHE_CONTROL, + HeaderValue::from_bytes(b"max-age=ba\xFFr, bar=\"baz\xFF\", a=b").unwrap(), + ); + let (parts, _) = resp.into_parts(); + let cc = 
CacheControl::from_resp_headers(&parts).unwrap(); + + assert_eq!( + cc.max_age().unwrap_err().context.unwrap().to_string(), + "could not parse value as utf8" + ); + + let mut directive_iter = cc.directives.iter(); + + let first = directive_iter.next().unwrap(); + assert_eq!(first.0, &"max-age"); + assert_eq!(first.1.as_ref().unwrap().0, b"ba\xFFr"); + + let second = directive_iter.next().unwrap(); + assert_eq!(second.0, &"bar"); + assert_eq!(second.1.as_ref().unwrap().0, b"\"baz\xFF\""); + + let third = directive_iter.next().unwrap(); + assert_eq!(third.0, &"a"); + assert_eq!(third.1.as_ref().unwrap().0, "b".as_bytes()); + + assert!(directive_iter.next().is_none()); + } + + #[test] + fn test_age_overflow() { + let resp = build_response( + CACHE_CONTROL, + "max-age=-99999999999999999999999999, s-maxage=99999999999999999999999999", + ); + let cc = CacheControl::from_resp_headers(&resp).unwrap(); + + assert_eq!( + cc.s_maxage().unwrap().unwrap(), + DELTA_SECONDS_OVERFLOW_VALUE + ); + // negative ages still result in errors even with overflow handling + assert_eq!( + cc.max_age().unwrap_err().context.unwrap().to_string(), + "could not parse value as u32" + ); + } + + #[test] + fn test_fresh_sec() { + let resp = build_response(CACHE_CONTROL, ""); + let cc = CacheControl::from_resp_headers(&resp).unwrap(); + assert!(cc.fresh_sec().is_none()); + + let resp = build_response(CACHE_CONTROL, "max-age=12345"); + let cc = CacheControl::from_resp_headers(&resp).unwrap(); + assert_eq!(cc.fresh_sec().unwrap(), 12345); + + let resp = build_response(CACHE_CONTROL, "max-age=99999,s-maxage=123"); + let cc = CacheControl::from_resp_headers(&resp).unwrap(); + // prefer s-maxage over max-age + assert_eq!(cc.fresh_sec().unwrap(), 123); + } + + #[test] + fn test_cacheability() { + let resp = build_response(CACHE_CONTROL, ""); + let cc = CacheControl::from_resp_headers(&resp).unwrap(); + assert_eq!(cc.is_cacheable(), Cacheable::Default); + + // uncacheable + let resp = 
build_response(CACHE_CONTROL, "private, max-age=12345"); + let cc = CacheControl::from_resp_headers(&resp).unwrap(); + assert_eq!(cc.is_cacheable(), Cacheable::No); + + let resp = build_response(CACHE_CONTROL, "no-store, max-age=12345"); + let cc = CacheControl::from_resp_headers(&resp).unwrap(); + assert_eq!(cc.is_cacheable(), Cacheable::No); + + // cacheable + let resp = build_response(CACHE_CONTROL, "public"); + let cc = CacheControl::from_resp_headers(&resp).unwrap(); + assert_eq!(cc.is_cacheable(), Cacheable::Yes); + + let resp = build_response(CACHE_CONTROL, "max-age=0"); + let cc = CacheControl::from_resp_headers(&resp).unwrap(); + assert_eq!(cc.is_cacheable(), Cacheable::Yes); + } + + #[test] + fn test_no_cache() { + let resp = build_response(CACHE_CONTROL, "no-cache, max-age=12345"); + let cc = CacheControl::from_resp_headers(&resp).unwrap(); + assert_eq!(cc.is_cacheable(), Cacheable::Yes); + assert_eq!(cc.fresh_sec().unwrap(), 0); + } + + #[test] + fn test_no_cache_field_names() { + let resp = build_response(CACHE_CONTROL, "no-cache=\"set-cookie\", max-age=12345"); + let cc = CacheControl::from_resp_headers(&resp).unwrap(); + assert!(!cc.private()); + assert_eq!(cc.is_cacheable(), Cacheable::Yes); + assert_eq!(cc.fresh_sec().unwrap(), 12345); + let mut field_names = cc.no_cache_field_names().unwrap(); + assert_eq!( + str::from_utf8(field_names.next().unwrap()).unwrap(), + "set-cookie" + ); + assert!(field_names.next().is_none()); + + let mut resp = response::Builder::new().body(()).unwrap(); + resp.headers_mut().insert( + CACHE_CONTROL, + HeaderValue::from_bytes( + b"private=\"\", no-cache=\"a\xFF, set-cookie, Baz\x09 , c,d ,, \"", + ) + .unwrap(), + ); + let (parts, _) = resp.into_parts(); + let cc = CacheControl::from_resp_headers(&parts).unwrap(); + let mut field_names = cc.private_field_names().unwrap(); + assert_eq!(str::from_utf8(field_names.next().unwrap()).unwrap(), ""); + assert!(field_names.next().is_none()); + let mut field_names = 
cc.no_cache_field_names().unwrap(); + assert!(str::from_utf8(field_names.next().unwrap()).is_err()); + assert_eq!( + str::from_utf8(field_names.next().unwrap()).unwrap(), + "set-cookie" + ); + assert_eq!(str::from_utf8(field_names.next().unwrap()).unwrap(), "Baz"); + assert_eq!(str::from_utf8(field_names.next().unwrap()).unwrap(), "c"); + assert_eq!(str::from_utf8(field_names.next().unwrap()).unwrap(), "d"); + assert_eq!(str::from_utf8(field_names.next().unwrap()).unwrap(), ""); + assert_eq!(str::from_utf8(field_names.next().unwrap()).unwrap(), ""); + assert!(field_names.next().is_none()); + } + + #[test] + fn test_strip_private_headers() { + let mut resp = ResponseHeader::build(200, None).unwrap(); + resp.append_header( + CACHE_CONTROL, + "no-cache=\"x-private-header\", max-age=12345", + ) + .unwrap(); + resp.append_header("X-Private-Header", "dropped").unwrap(); + + let cc = CacheControl::from_resp_headers(&resp).unwrap(); + cc.strip_private_headers(&mut resp); + assert!(!resp.headers.contains_key("X-Private-Header")); + } + + #[test] + fn test_stale_while_revalidate() { + let resp = build_response(CACHE_CONTROL, "max-age=12345, stale-while-revalidate=5"); + let cc = CacheControl::from_resp_headers(&resp).unwrap(); + assert_eq!(cc.stale_while_revalidate().unwrap().unwrap(), 5); + assert_eq!(cc.serve_stale_while_revalidate_sec().unwrap(), 5); + assert!(cc.serve_stale_if_error_sec().is_none()); + } + + #[test] + fn test_stale_if_error() { + let resp = build_response(CACHE_CONTROL, "max-age=12345, stale-if-error=3600"); + let cc = CacheControl::from_resp_headers(&resp).unwrap(); + assert_eq!(cc.stale_if_error().unwrap().unwrap(), 3600); + assert_eq!(cc.serve_stale_if_error_sec().unwrap(), 3600); + assert!(cc.serve_stale_while_revalidate_sec().is_none()); + } + + #[test] + fn test_must_revalidate() { + let resp = build_response( + CACHE_CONTROL, + "max-age=12345, stale-while-revalidate=60, stale-if-error=30, must-revalidate", + ); + let cc = 
CacheControl::from_resp_headers(&resp).unwrap(); + assert!(cc.must_revalidate()); + assert_eq!(cc.stale_while_revalidate().unwrap().unwrap(), 60); + assert_eq!(cc.stale_if_error().unwrap().unwrap(), 30); + assert_eq!(cc.serve_stale_while_revalidate_sec().unwrap(), 0); + assert_eq!(cc.serve_stale_if_error_sec().unwrap(), 0); + } + + #[test] + fn test_proxy_revalidate() { + let resp = build_response( + CACHE_CONTROL, + "max-age=12345, stale-while-revalidate=60, stale-if-error=30, proxy-revalidate", + ); + let cc = CacheControl::from_resp_headers(&resp).unwrap(); + assert!(cc.proxy_revalidate()); + assert_eq!(cc.stale_while_revalidate().unwrap().unwrap(), 60); + assert_eq!(cc.stale_if_error().unwrap().unwrap(), 30); + assert_eq!(cc.serve_stale_while_revalidate_sec().unwrap(), 0); + assert_eq!(cc.serve_stale_if_error_sec().unwrap(), 0); + } + + #[test] + fn test_s_maxage_stale() { + let resp = build_response( + CACHE_CONTROL, + "s-maxage=0, stale-while-revalidate=60, stale-if-error=30", + ); + let cc = CacheControl::from_resp_headers(&resp).unwrap(); + assert_eq!(cc.stale_while_revalidate().unwrap().unwrap(), 60); + assert_eq!(cc.stale_if_error().unwrap().unwrap(), 30); + assert_eq!(cc.serve_stale_while_revalidate_sec().unwrap(), 0); + assert_eq!(cc.serve_stale_if_error_sec().unwrap(), 0); + } + + #[test] + fn test_authorized_request() { + let resp = build_response(CACHE_CONTROL, "max-age=10"); + let cc = CacheControl::from_resp_headers(&resp).unwrap(); + assert!(!cc.allow_caching_authorized_req()); + + let resp = build_response(CACHE_CONTROL, "s-maxage=10"); + let cc = CacheControl::from_resp_headers(&resp).unwrap(); + assert!(cc.allow_caching_authorized_req()); + + let resp = build_response(CACHE_CONTROL, "public"); + let cc = CacheControl::from_resp_headers(&resp).unwrap(); + assert!(cc.allow_caching_authorized_req()); + + let resp = build_response(CACHE_CONTROL, "must-revalidate, max-age=0"); + let cc = CacheControl::from_resp_headers(&resp).unwrap(); + 
assert!(cc.allow_caching_authorized_req()); + + let resp = build_response(CACHE_CONTROL, ""); + let cc = CacheControl::from_resp_headers(&resp).unwrap(); + assert!(!cc.allow_caching_authorized_req()); + } + + fn build_request(cc_key: HeaderName, cc_value: &str) -> request::Parts { + let (parts, _) = request::Builder::new() + .header(cc_key, cc_value) + .body(()) + .unwrap() + .into_parts(); + parts + } + + #[test] + fn test_request_only_if_cached() { + let req = build_request(CACHE_CONTROL, "only-if-cached=1"); + let cc = CacheControl::from_req_headers(&req).unwrap(); + assert!(cc.only_if_cached()) + } +} diff --git a/pingora-cache/src/eviction/lru.rs b/pingora-cache/src/eviction/lru.rs new file mode 100644 index 0000000..47c88a6 --- /dev/null +++ b/pingora-cache/src/eviction/lru.rs @@ -0,0 +1,431 @@ +// Copyright 2024 Cloudflare, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! A shared LRU cache manager + +use super::EvictionManager; +use crate::key::CompactCacheKey; + +use async_trait::async_trait; +use pingora_error::{BError, ErrorType::*, OrErr, Result}; +use pingora_lru::Lru; +use serde::de::SeqAccess; +use serde::{Deserialize, Serialize}; +use std::fs::File; +use std::hash::{Hash, Hasher}; +use std::io::prelude::*; +use std::path::Path; +use std::time::SystemTime; + +/// A shared LRU cache manager designed to manage a large volume of assets. +/// +/// - Space optimized in-memory LRU (see [pingora_lru]). 
+/// - Instead of a single giant LRU, this struct shards the assets into `N` independent LRUs. +/// This allows [EvictionManager::save()] not to lock the entire cache manager while performing +/// serialization. +pub struct Manager<const N: usize>(Lru<CompactCacheKey, N>); + +#[derive(Debug, Serialize, Deserialize)] +struct SerdeHelperNode(CompactCacheKey, usize); + +impl<const N: usize> Manager<N> { + /// Create a [Manager] with the given size limit and estimated per shard capacity. + /// + /// The `capacity` is for preallocating to avoid reallocation cost when the LRU grows. + pub fn with_capacity(limit: usize, capacity: usize) -> Self { + Manager(Lru::with_capacity(limit, capacity)) + } + + /// Serialize the given shard + pub fn serialize_shard(&self, shard: usize) -> Result<Vec<u8>> { + use rmp_serde::encode::Serializer; + use serde::ser::SerializeSeq; + use serde::ser::Serializer as _; + + assert!(shard < N); + + // NOTE: This could use a lot of memory to buffer the serialized data in memory + // NOTE: This for loop could lock the LRU for too long + let mut nodes = Vec::with_capacity(self.0.shard_len(shard)); + self.0.iter_for_each(shard, |(node, size)| { + nodes.push(SerdeHelperNode(node.clone(), size)); + }); + let mut ser = Serializer::new(vec![]); + let mut seq = ser + .serialize_seq(Some(self.0.shard_len(shard))) + .or_err(InternalError, "fail to serialize node")?; + for node in nodes { + seq.serialize_element(&node).unwrap(); // write to vec, safe + } + + seq.end().or_err(InternalError, "when serializing LRU")?; + Ok(ser.into_inner()) + } + + /// Deserialize a shard + /// + /// Shard number is not needed because the key itself will hash to the correct shard.
+ pub fn deserialize_shard(&self, buf: &[u8]) -> Result<()> { + use rmp_serde::decode::Deserializer; + use serde::de::Deserializer as _; + + let mut de = Deserializer::new(buf); + let visitor = InsertToManager { lru: self }; + de.deserialize_seq(visitor) + .or_err(InternalError, "when deserializing LRU")?; + Ok(()) + } +} + +struct InsertToManager<'a, const N: usize> { + lru: &'a Manager<N>, +} + +impl<'de, 'a, const N: usize> serde::de::Visitor<'de> for InsertToManager<'a, N> { + type Value = (); + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("array of lru nodes") + } + + fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error> + where + A: SeqAccess<'de>, + { + while let Some(node) = seq.next_element::<SerdeHelperNode>()? { + let key = u64key(&node.0); + self.lru.0.insert_tail(key, node.0, node.1); // insert in the back + } + Ok(()) + } +} + +#[inline] +fn u64key(key: &CompactCacheKey) -> u64 { + // note that std hash is not uniform, I'm not sure if ahash is also the case + let mut hasher = ahash::AHasher::default(); + key.hash(&mut hasher); + hasher.finish() +} + +const FILE_NAME: &str = "lru.data"; + +#[inline] +fn err_str_path(s: &str, path: &Path) -> String { + format!("{s} {}", path.display()) +} + +#[async_trait] +impl<const N: usize> EvictionManager for Manager<N> { + fn total_size(&self) -> usize { + self.0.weight() + } + fn total_items(&self) -> usize { + self.0.len() + } + fn evicted_size(&self) -> usize { + self.0.evicted_weight() + } + fn evicted_items(&self) -> usize { + self.0.evicted_len() + } + + fn admit( + &self, + item: CompactCacheKey, + size: usize, + _fresh_until: SystemTime, + ) -> Vec<CompactCacheKey> { + let key = u64key(&item); + self.0.admit(key, item, size); + self.0 + .evict_to_limit() + .into_iter() + .map(|(key, _weight)| key) + .collect() + } + + fn remove(&self, item: &CompactCacheKey) { + let key = u64key(item); + self.0.remove(key); + } + + fn access(&self, 
item: &CompactCacheKey, size: usize, _fresh_until: SystemTime) -> bool { + let key = u64key(item); + if !self.0.promote(key) { + self.0.admit(key, item.clone(), size); + false + } else { + true + } + } + + fn peek(&self, item: &CompactCacheKey) -> bool { + let key = u64key(item); + self.0.peek(key) + } + + async fn save(&self, dir_path: &str) -> Result<()> { + let dir_path_str = dir_path.to_owned(); + + tokio::task::spawn_blocking(move || { + let dir_path = Path::new(&dir_path_str); + std::fs::create_dir_all(dir_path) + .or_err_with(InternalError, || err_str_path("fail to create", dir_path)) + }) + .await + .or_err(InternalError, "async blocking IO failure")??; + + for i in 0..N { + let data = self.serialize_shard(i)?; + let dir_path = dir_path.to_owned(); + tokio::task::spawn_blocking(move || { + let file_path = Path::new(&dir_path).join(format!("{}.{i}", FILE_NAME)); + let mut file = File::create(&file_path) + .or_err_with(InternalError, || err_str_path("fail to create", &file_path))?; + file.write_all(&data).or_err_with(InternalError, || { + err_str_path("fail to write to", &file_path) + }) + }) + .await + .or_err(InternalError, "async blocking IO failure")??; + } + Ok(()) + } + + async fn load(&self, dir_path: &str) -> Result<()> { + // TODO: check the saved shards so that we load all the save files + for i in 0..N { + let dir_path = dir_path.to_owned(); + + let data = tokio::task::spawn_blocking(move || { + let file_path = Path::new(&dir_path).join(format!("{}.{i}", FILE_NAME)); + let mut file = File::open(&file_path) + .or_err_with(InternalError, || err_str_path("fail to open", &file_path))?; + let mut buffer = Vec::with_capacity(8192); + file.read_to_end(&mut buffer) + .or_err_with(InternalError, || { + err_str_path("fail to write to", &file_path) + })?; + Ok::<Vec<u8>, BError>(buffer) + }) + .await + .or_err(InternalError, "async blocking IO failure")??; + self.deserialize_shard(&data)?; + } + + Ok(()) + } +} + +#[cfg(test)] +mod test { + use super::*; + 
    #[test]
    fn test_admission() {
        let lru = Manager::<1>::with_capacity(4, 10);
        let key1 = CacheKey::new("", "a", "1").to_compact();
        let until = SystemTime::now(); // unused value as a placeholder
        let v = lru.admit(key1.clone(), 1, until);
        assert_eq!(v.len(), 0);
        let key2 = CacheKey::new("", "b", "1").to_compact();
        let v = lru.admit(key2.clone(), 2, until);
        assert_eq!(v.len(), 0);
        let key3 = CacheKey::new("", "c", "1").to_compact();
        let v = lru.admit(key3, 1, until);
        assert_eq!(v.len(), 0);

        // lru is full (4) now

        let key4 = CacheKey::new("", "d", "1").to_compact();
        let v = lru.admit(key4, 2, until);
        // need to reduce used by at least 2, both key1 and key2 are evicted to make room for 3
        assert_eq!(v.len(), 2);
        assert_eq!(v[0], key1);
        assert_eq!(v[1], key2);
    }
CacheKey::new("", "b", "1").to_compact(); + let v = lru.admit(key2.clone(), 2, until); + assert_eq!(v.len(), 0); + let key3 = CacheKey::new("", "c", "1").to_compact(); + let v = lru.admit(key3, 1, until); + assert_eq!(v.len(), 0); + + // lru is full (4) now + // remove key1 + lru.remove(&key1); + + // key2 is the least recently used one now + let key4 = CacheKey::new("", "d", "1").to_compact(); + let v = lru.admit(key4, 2, until); + assert_eq!(v.len(), 1); + assert_eq!(v[0], key2); + } + + #[test] + fn test_access_add() { + let lru = Manager::<1>::with_capacity(4, 10); + let until = SystemTime::now(); // unused value as a placeholder + + let key1 = CacheKey::new("", "a", "1").to_compact(); + lru.access(&key1, 1, until); + let key2 = CacheKey::new("", "b", "1").to_compact(); + lru.access(&key2, 2, until); + let key3 = CacheKey::new("", "c", "1").to_compact(); + lru.access(&key3, 2, until); + + let key4 = CacheKey::new("", "d", "1").to_compact(); + let v = lru.admit(key4, 2, until); + // need to reduce used by at least 2, both key1 and key2 are evicted to make room for 3 + assert_eq!(v.len(), 2); + assert_eq!(v[0], key1); + assert_eq!(v[1], key2); + } + + #[test] + fn test_admit_update() { + let lru = Manager::<1>::with_capacity(4, 10); + let key1 = CacheKey::new("", "a", "1").to_compact(); + let until = SystemTime::now(); // unused value as a placeholder + let v = lru.admit(key1.clone(), 1, until); + assert_eq!(v.len(), 0); + let key2 = CacheKey::new("", "b", "1").to_compact(); + let v = lru.admit(key2.clone(), 2, until); + assert_eq!(v.len(), 0); + let key3 = CacheKey::new("", "c", "1").to_compact(); + let v = lru.admit(key3, 1, until); + assert_eq!(v.len(), 0); + + // lru is full (4) now + // update key2 to reduce its size by 1 + let v = lru.admit(key2, 1, until); + assert_eq!(v.len(), 0); + + // lru is not full anymore + let key4 = CacheKey::new("", "d", "1").to_compact(); + let v = lru.admit(key4.clone(), 1, until); + assert_eq!(v.len(), 0); + + // make key4 
larger + let v = lru.admit(key4, 2, until); + // need to evict now + assert_eq!(v.len(), 1); + assert_eq!(v[0], key1); + } + + #[test] + fn test_peek() { + let lru = Manager::<1>::with_capacity(4, 10); + let until = SystemTime::now(); // unused value as a placeholder + + let key1 = CacheKey::new("", "a", "1").to_compact(); + lru.access(&key1, 1, until); + let key2 = CacheKey::new("", "b", "1").to_compact(); + lru.access(&key2, 2, until); + assert!(lru.peek(&key1)); + assert!(lru.peek(&key2)); + } + + #[test] + fn test_serde() { + let lru = Manager::<1>::with_capacity(4, 10); + let key1 = CacheKey::new("", "a", "1").to_compact(); + let until = SystemTime::now(); // unused value as a placeholder + let v = lru.admit(key1.clone(), 1, until); + assert_eq!(v.len(), 0); + let key2 = CacheKey::new("", "b", "1").to_compact(); + let v = lru.admit(key2.clone(), 2, until); + assert_eq!(v.len(), 0); + let key3 = CacheKey::new("", "c", "1").to_compact(); + let v = lru.admit(key3, 1, until); + assert_eq!(v.len(), 0); + + // lru is full (4) now + // make key1 most recently used + lru.access(&key1, 1, until); + assert_eq!(v.len(), 0); + + // load lru2 with lru's data + let ser = lru.serialize_shard(0).unwrap(); + let lru2 = Manager::<1>::with_capacity(4, 10); + lru2.deserialize_shard(&ser).unwrap(); + + let key4 = CacheKey::new("", "d", "1").to_compact(); + let v = lru2.admit(key4, 2, until); + assert_eq!(v.len(), 1); + assert_eq!(v[0], key2); + } + + #[tokio::test] + async fn test_save_to_disk() { + let until = SystemTime::now(); // unused value as a placeholder + let lru = Manager::<2>::with_capacity(10, 10); + + lru.admit(CacheKey::new("", "a", "1").to_compact(), 1, until); + lru.admit(CacheKey::new("", "b", "1").to_compact(), 2, until); + lru.admit(CacheKey::new("", "c", "1").to_compact(), 1, until); + lru.admit(CacheKey::new("", "d", "1").to_compact(), 1, until); + lru.admit(CacheKey::new("", "e", "1").to_compact(), 2, until); + lru.admit(CacheKey::new("", "f", 
"1").to_compact(), 1, until); + + // load lru2 with lru's data + lru.save("/tmp/test_lru_save").await.unwrap(); + let lru2 = Manager::<2>::with_capacity(4, 10); + lru2.load("/tmp/test_lru_save").await.unwrap(); + + let ser0 = lru.serialize_shard(0).unwrap(); + let ser1 = lru.serialize_shard(1).unwrap(); + + assert_eq!(ser0, lru2.serialize_shard(0).unwrap()); + assert_eq!(ser1, lru2.serialize_shard(1).unwrap()); + } +} diff --git a/pingora-cache/src/eviction/mod.rs b/pingora-cache/src/eviction/mod.rs new file mode 100644 index 0000000..7c9d60b --- /dev/null +++ b/pingora-cache/src/eviction/mod.rs @@ -0,0 +1,89 @@ +// Copyright 2024 Cloudflare, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Cache eviction module + +use crate::key::CompactCacheKey; + +use async_trait::async_trait; +use pingora_error::Result; +use std::time::SystemTime; + +pub mod lru; +pub mod simple_lru; + +/// The trait that a cache eviction algorithm needs to implement +/// +/// NOTE: these trait methods require &self not &mut self, which means concurrency should +/// be handled the implementations internally. 
#[async_trait]
pub trait EvictionManager {
    /// Total size of the cache in bytes tracked by this eviction manager
    fn total_size(&self) -> usize;
    /// Number of assets tracked by this eviction manager
    fn total_items(&self) -> usize;
    /// Number of bytes that are already evicted
    ///
    /// The accumulated number is returned to play well with Prometheus counter metric type.
    fn evicted_size(&self) -> usize;
    /// Number of assets that are already evicted
    ///
    /// The accumulated number is returned to play well with Prometheus counter metric type.
    fn evicted_items(&self) -> usize;

    /// Admit an item
    ///
    /// Return one or more items to evict. The sizes of these items are deducted
    /// from the total size already. The caller needs to make sure that these assets are actually
    /// removed from the storage.
    ///
    /// If the item is already admitted, A. update its freshness; B. if the new size is larger than the
    /// existing one, Some(_) might be returned for the caller to evict.
    fn admit(
        &self,
        item: CompactCacheKey,
        size: usize,
        fresh_until: SystemTime,
    ) -> Vec<CompactCacheKey>;

    /// Remove an item from the eviction manager.
    ///
    /// The size of the item will be deducted.
    fn remove(&self, item: &CompactCacheKey);

    /// Access an item that should already be in cache.
    ///
    /// If the item is not tracked by this [EvictionManager], track it but no eviction will happen.
    ///
    /// This call is used for asking the eviction manager to track the assets that are already
    /// admitted in the cache storage system.
    fn access(&self, item: &CompactCacheKey, size: usize, fresh_until: SystemTime) -> bool;

    /// Peek into the manager to see if the item is already tracked by the system
    ///
    /// This function should have no side-effect on the asset itself. For example, for LRU, this
    /// method shouldn't change the popularity of the asset being peeked.
    fn peek(&self, item: &CompactCacheKey) -> bool;

    /// Serialize to save the state of this eviction manager to disk
    ///
    /// This function is for preserving the eviction manager's state across server restarts.
    ///
    /// `dir_path` defines the directory on disk that the data should use.
    // dir_path is &str not AsRef<Path> so that trait objects can be used
    async fn save(&self, dir_path: &str) -> Result<()>;

    /// The counterpart of [Self::save()].
    async fn load(&self, dir_path: &str) -> Result<()>;
}
A simple LRU cache manager built on top of the `lru` crate + +use super::EvictionManager; +use crate::key::CompactCacheKey; + +use async_trait::async_trait; +use lru::LruCache; +use parking_lot::RwLock; +use pingora_error::{BError, ErrorType::*, OrErr, Result}; +use serde::de::SeqAccess; +use serde::{Deserialize, Serialize}; +use std::collections::hash_map::DefaultHasher; +use std::fs::File; +use std::hash::{Hash, Hasher}; +use std::io::prelude::*; +use std::path::Path; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::time::SystemTime; + +#[derive(Debug, Deserialize, Serialize)] +struct Node { + key: CompactCacheKey, + size: usize, +} + +/// A simple LRU eviction manager +/// +/// The implementation is not optimized. All operation require global locks. +pub struct Manager { + lru: RwLock<LruCache<u64, Node>>, + limit: usize, + used: AtomicUsize, + items: AtomicUsize, + evicted_size: AtomicUsize, + evicted_items: AtomicUsize, +} + +impl Manager { + /// Create a new [Manager] with the given total size limit `limit`. 
    /// Create a new [Manager] with the given total size limit `limit` (in bytes).
    pub fn new(limit: usize) -> Self {
        Manager {
            // unbounded: the size limit is enforced by evict(), not by entry count
            lru: RwLock::new(LruCache::unbounded()),
            limit,
            used: AtomicUsize::new(0),
            items: AtomicUsize::new(0),
            evicted_size: AtomicUsize::new(0),
            evicted_items: AtomicUsize::new(0),
        }
    }

    /// Insert (or replace) a node keyed by `hash_key` and update the usage counters.
    ///
    /// When `reverse` is true, a newly inserted node is demoted to the LRU end
    /// (used when restoring serialized state so order is preserved).
    fn insert(&self, hash_key: u64, node: CompactCacheKey, size: usize, reverse: bool) {
        use std::cmp::Ordering::*;
        let node = Node { key: node, size };
        let old = {
            let mut lru = self.lru.write();
            let old = lru.push(hash_key, node);
            if reverse && old.is_none() {
                lru.demote(&hash_key);
            }
            old
        };
        if let Some(old) = old {
            // replacing a node, just need to update used size
            match size.cmp(&old.1.size) {
                Greater => self.used.fetch_add(size - old.1.size, Ordering::Relaxed),
                Less => self.used.fetch_sub(old.1.size - size, Ordering::Relaxed),
                Equal => 0, // same size, update nothing, use 0 to match other arms' type
            };
        } else {
            self.used.fetch_add(size, Ordering::Relaxed);
            self.items.fetch_add(1, Ordering::Relaxed);
        }
    }

    // evict items until the used capacity is below limit
    // Returns the evicted keys so the caller can purge them from storage.
    fn evict(&self) -> Vec<CompactCacheKey> {
        if self.used.load(Ordering::Relaxed) <= self.limit {
            return vec![];
        }
        let mut to_evict = Vec::with_capacity(1); // we will at least pop 1 item
        while self.used.load(Ordering::Relaxed) > self.limit {
            if let Some((_, node)) = self.lru.write().pop_lru() {
                self.used.fetch_sub(node.size, Ordering::Relaxed);
                self.items.fetch_sub(1, Ordering::Relaxed);
                self.evicted_size.fetch_add(node.size, Ordering::Relaxed);
                self.evicted_items.fetch_add(1, Ordering::Relaxed);
                to_evict.push(node.key);
            } else {
                // lru empty
                return to_evict;
            }
        }
        to_evict
    }
serialized data in memory + let mut ser = Serializer::new(vec![]); + // NOTE: This long for loop could lock the LRU for too long + let lru = self.lru.read(); + let mut seq = ser + .serialize_seq(Some(lru.len())) + .or_err(InternalError, "fail to serialize node")?; + for item in lru.iter() { + seq.serialize_element(item.1).unwrap(); // write to vec, safe + } + seq.end().or_err(InternalError, "when serializing LRU")?; + Ok(ser.into_inner()) + } + + fn deserialize(&self, buf: &[u8]) -> Result<()> { + use rmp_serde::decode::Deserializer; + use serde::de::Deserializer as _; + let mut de = Deserializer::new(buf); + let visitor = InsertToManager { lru: self }; + de.deserialize_seq(visitor) + .or_err(InternalError, "when deserializing LRU")?; + Ok(()) + } +} + +struct InsertToManager<'a> { + lru: &'a Manager, +} + +impl<'de, 'a> serde::de::Visitor<'de> for InsertToManager<'a> { + type Value = (); + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("array of lru nodes") + } + + fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error> + where + A: SeqAccess<'de>, + { + while let Some(node) = seq.next_element::<Node>()? 
{ + let key = u64key(&node.key); + self.lru.insert(key, node.key, node.size, true); // insert in the back + } + Ok(()) + } +} + +#[inline] +fn u64key(key: &CompactCacheKey) -> u64 { + let mut hasher = DefaultHasher::new(); + key.hash(&mut hasher); + hasher.finish() +} + +const FILE_NAME: &str = "simple_lru.data"; + +#[async_trait] +impl EvictionManager for Manager { + fn total_size(&self) -> usize { + self.used.load(Ordering::Relaxed) + } + fn total_items(&self) -> usize { + self.items.load(Ordering::Relaxed) + } + fn evicted_size(&self) -> usize { + self.evicted_size.load(Ordering::Relaxed) + } + fn evicted_items(&self) -> usize { + self.evicted_items.load(Ordering::Relaxed) + } + + fn admit( + &self, + item: CompactCacheKey, + size: usize, + _fresh_until: SystemTime, + ) -> Vec<CompactCacheKey> { + let key = u64key(&item); + self.insert(key, item, size, false); + self.evict() + } + + fn remove(&self, item: &CompactCacheKey) { + let key = u64key(item); + let node = self.lru.write().pop(&key); + if let Some(n) = node { + self.used.fetch_sub(n.size, Ordering::Relaxed); + self.items.fetch_sub(1, Ordering::Relaxed); + } + } + + fn access(&self, item: &CompactCacheKey, size: usize, _fresh_until: SystemTime) -> bool { + let key = u64key(item); + if self.lru.write().get(&key).is_none() { + self.insert(key, item.clone(), size, false); + false + } else { + true + } + } + + fn peek(&self, item: &CompactCacheKey) -> bool { + let key = u64key(item); + self.lru.read().peek(&key).is_some() + } + + async fn save(&self, dir_path: &str) -> Result<()> { + let data = self.serialize()?; + let dir_path = dir_path.to_owned(); + tokio::task::spawn_blocking(move || { + let dir_path = Path::new(&dir_path); + std::fs::create_dir_all(dir_path).or_err(InternalError, "fail to create {dir_path}")?; + let file_path = dir_path.join(FILE_NAME); + let mut file = + File::create(file_path).or_err(InternalError, "fail to create {file_path}")?; + file.write_all(&data) + .or_err(InternalError, "fail to 
write to {file_path}") + }) + .await + .or_err(InternalError, "async blocking IO failure")? + } + + async fn load(&self, dir_path: &str) -> Result<()> { + let dir_path = dir_path.to_owned(); + let data = tokio::task::spawn_blocking(move || { + let file_path = Path::new(&dir_path).join(FILE_NAME); + let mut file = + File::open(file_path).or_err(InternalError, "fail to open {file_path}")?; + let mut buffer = Vec::with_capacity(8192); + file.read_to_end(&mut buffer) + .or_err(InternalError, "fail to write to {file_path}")?; + Ok::<Vec<u8>, BError>(buffer) + }) + .await + .or_err(InternalError, "async blocking IO failure")??; + self.deserialize(&data) + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::CacheKey; + + #[test] + fn test_admission() { + let lru = Manager::new(4); + let key1 = CacheKey::new("", "a", "1").to_compact(); + let until = SystemTime::now(); // unused value as a placeholder + let v = lru.admit(key1.clone(), 1, until); + assert_eq!(v.len(), 0); + let key2 = CacheKey::new("", "b", "1").to_compact(); + let v = lru.admit(key2.clone(), 2, until); + assert_eq!(v.len(), 0); + let key3 = CacheKey::new("", "c", "1").to_compact(); + let v = lru.admit(key3, 1, until); + assert_eq!(v.len(), 0); + + // lru si full (4) now + + let key4 = CacheKey::new("", "d", "1").to_compact(); + let v = lru.admit(key4, 2, until); + // need to reduce used by at least 2, both key1 and key2 are evicted to make room for 3 + assert_eq!(v.len(), 2); + assert_eq!(v[0], key1); + assert_eq!(v[1], key2); + } + + #[test] + fn test_access() { + let lru = Manager::new(4); + let key1 = CacheKey::new("", "a", "1").to_compact(); + let until = SystemTime::now(); // unused value as a placeholder + let v = lru.admit(key1.clone(), 1, until); + assert_eq!(v.len(), 0); + let key2 = CacheKey::new("", "b", "1").to_compact(); + let v = lru.admit(key2.clone(), 2, until); + assert_eq!(v.len(), 0); + let key3 = CacheKey::new("", "c", "1").to_compact(); + let v = lru.admit(key3, 1, until); + 
assert_eq!(v.len(), 0); + + // lru is full (4) now + // make key1 most recently used + lru.access(&key1, 1, until); + assert_eq!(v.len(), 0); + + let key4 = CacheKey::new("", "d", "1").to_compact(); + let v = lru.admit(key4, 2, until); + assert_eq!(v.len(), 1); + assert_eq!(v[0], key2); + } + + #[test] + fn test_remove() { + let lru = Manager::new(4); + let key1 = CacheKey::new("", "a", "1").to_compact(); + let until = SystemTime::now(); // unused value as a placeholder + let v = lru.admit(key1.clone(), 1, until); + assert_eq!(v.len(), 0); + let key2 = CacheKey::new("", "b", "1").to_compact(); + let v = lru.admit(key2.clone(), 2, until); + assert_eq!(v.len(), 0); + let key3 = CacheKey::new("", "c", "1").to_compact(); + let v = lru.admit(key3, 1, until); + assert_eq!(v.len(), 0); + + // lru is full (4) now + // remove key1 + lru.remove(&key1); + + // key2 is the least recently used one now + let key4 = CacheKey::new("", "d", "1").to_compact(); + let v = lru.admit(key4, 2, until); + assert_eq!(v.len(), 1); + assert_eq!(v[0], key2); + } + + #[test] + fn test_access_add() { + let lru = Manager::new(4); + let until = SystemTime::now(); // unused value as a placeholder + + let key1 = CacheKey::new("", "a", "1").to_compact(); + lru.access(&key1, 1, until); + let key2 = CacheKey::new("", "b", "1").to_compact(); + lru.access(&key2, 2, until); + let key3 = CacheKey::new("", "c", "1").to_compact(); + lru.access(&key3, 2, until); + + let key4 = CacheKey::new("", "d", "1").to_compact(); + let v = lru.admit(key4, 2, until); + // need to reduce used by at least 2, both key1 and key2 are evicted to make room for 3 + assert_eq!(v.len(), 2); + assert_eq!(v[0], key1); + assert_eq!(v[1], key2); + } + + #[test] + fn test_admit_update() { + let lru = Manager::new(4); + let key1 = CacheKey::new("", "a", "1").to_compact(); + let until = SystemTime::now(); // unused value as a placeholder + let v = lru.admit(key1.clone(), 1, until); + assert_eq!(v.len(), 0); + let key2 = CacheKey::new("", 
"b", "1").to_compact(); + let v = lru.admit(key2.clone(), 2, until); + assert_eq!(v.len(), 0); + let key3 = CacheKey::new("", "c", "1").to_compact(); + let v = lru.admit(key3, 1, until); + assert_eq!(v.len(), 0); + + // lru is full (4) now + // update key2 to reduce its size by 1 + let v = lru.admit(key2, 1, until); + assert_eq!(v.len(), 0); + + // lru is not full anymore + let key4 = CacheKey::new("", "d", "1").to_compact(); + let v = lru.admit(key4.clone(), 1, until); + assert_eq!(v.len(), 0); + + // make key4 larger + let v = lru.admit(key4, 2, until); + // need to evict now + assert_eq!(v.len(), 1); + assert_eq!(v[0], key1); + } + + #[test] + fn test_serde() { + let lru = Manager::new(4); + let key1 = CacheKey::new("", "a", "1").to_compact(); + let until = SystemTime::now(); // unused value as a placeholder + let v = lru.admit(key1.clone(), 1, until); + assert_eq!(v.len(), 0); + let key2 = CacheKey::new("", "b", "1").to_compact(); + let v = lru.admit(key2.clone(), 2, until); + assert_eq!(v.len(), 0); + let key3 = CacheKey::new("", "c", "1").to_compact(); + let v = lru.admit(key3, 1, until); + assert_eq!(v.len(), 0); + + // lru is full (4) now + // make key1 most recently used + lru.access(&key1, 1, until); + assert_eq!(v.len(), 0); + + // load lru2 with lru's data + let ser = lru.serialize().unwrap(); + let lru2 = Manager::new(4); + lru2.deserialize(&ser).unwrap(); + + let key4 = CacheKey::new("", "d", "1").to_compact(); + let v = lru2.admit(key4, 2, until); + assert_eq!(v.len(), 1); + assert_eq!(v[0], key2); + } + + #[tokio::test] + async fn test_save_to_disk() { + let lru = Manager::new(4); + let key1 = CacheKey::new("", "a", "1").to_compact(); + let until = SystemTime::now(); // unused value as a placeholder + let v = lru.admit(key1.clone(), 1, until); + assert_eq!(v.len(), 0); + let key2 = CacheKey::new("", "b", "1").to_compact(); + let v = lru.admit(key2.clone(), 2, until); + assert_eq!(v.len(), 0); + let key3 = CacheKey::new("", "c", "1").to_compact(); + 
let v = lru.admit(key3, 1, until); + assert_eq!(v.len(), 0); + + // lru is full (4) now + // make key1 most recently used + lru.access(&key1, 1, until); + assert_eq!(v.len(), 0); + + // load lru2 with lru's data + lru.save("/tmp/test_simple_lru_save").await.unwrap(); + let lru2 = Manager::new(4); + lru2.load("/tmp/test_simple_lru_save").await.unwrap(); + + let key4 = CacheKey::new("", "d", "1").to_compact(); + let v = lru2.admit(key4, 2, until); + assert_eq!(v.len(), 1); + assert_eq!(v[0], key2); + } +} diff --git a/pingora-cache/src/filters.rs b/pingora-cache/src/filters.rs new file mode 100644 index 0000000..8293cb5 --- /dev/null +++ b/pingora-cache/src/filters.rs @@ -0,0 +1,673 @@ +// Copyright 2024 Cloudflare, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Utility functions to help process HTTP headers for caching + +use super::*; +use crate::cache_control::{CacheControl, Cacheable, InterpretCacheControl}; +use crate::{RespCacheable, RespCacheable::*}; + +use http::{header, HeaderValue}; +use httpdate::HttpDate; +use log::warn; +use pingora_http::{RequestHeader, ResponseHeader}; + +/// Decide if the request can be cacheable +pub fn request_cacheable(req_header: &ReqHeader) -> bool { + // TODO: the check is incomplete + matches!(req_header.method, Method::GET | Method::HEAD) +} + +/// Decide if the response is cacheable. +/// +/// `cache_control` is the parsed [CacheControl] from the response header. 
/// Decide if the response is cacheable.
///
/// `cache_control` is the parsed [CacheControl] from the response header. It is a standalone
/// argument so that the caller has the flexibility to use, change or ignore it.
// TODO: vary processing
pub fn resp_cacheable(
    cache_control: Option<&CacheControl>,
    resp_header: &ResponseHeader,
    authorization_present: bool,
    defaults: &CacheMetaDefaults,
) -> RespCacheable {
    let now = SystemTime::now();
    let expire_time = calculate_fresh_until(
        now,
        cache_control,
        resp_header,
        authorization_present,
        defaults,
    );
    if let Some(fresh_until) = expire_time {
        let (stale_while_revalidate_sec, stale_if_error_sec) =
            calculate_serve_stale_sec(cache_control, defaults);

        // Cache a copy of the response header with private headers stripped,
        // so they are never served out of cache.
        let mut cloned_header = resp_header.clone();
        if let Some(cc) = cache_control {
            cc.strip_private_headers(&mut cloned_header);
        }
        return Cacheable(CacheMeta::new(
            fresh_until,
            now,
            stale_while_revalidate_sec,
            stale_if_error_sec,
            cloned_header,
        ));
    }
    Uncacheable(NoCacheReason::OriginNotCache)
}
+pub fn calculate_fresh_until( + now: SystemTime, + cache_control: Option<&CacheControl>, + resp_header: &RespHeader, + authorization_present: bool, + defaults: &CacheMetaDefaults, +) -> Option<SystemTime> { + fn freshness_ttl_to_time(now: SystemTime, fresh_sec: u32) -> Option<SystemTime> { + if fresh_sec == 0 { + // ensure that the response is treated as stale + now.checked_sub(Duration::from_secs(1)) + } else { + now.checked_add(Duration::from_secs(fresh_sec.into())) + } + } + + // A request with Authorization is normally not cacheable, unless Cache-Control allows it + if authorization_present { + let uncacheable = cache_control + .as_ref() + .map_or(true, |cc| !cc.allow_caching_authorized_req()); + if uncacheable { + return None; + } + } + + let uncacheable = cache_control + .as_ref() + .map_or(false, |cc| cc.is_cacheable() == Cacheable::No); + if uncacheable { + return None; + } + + // For TTL check cache-control first, then expires header, then defaults + cache_control + .and_then(|cc| { + cc.fresh_sec() + .and_then(|ttl| freshness_ttl_to_time(now, ttl)) + }) + .or_else(|| calculate_expires_header_time(resp_header)) + .or_else(|| { + defaults + .fresh_sec(resp_header.status) + .and_then(|ttl| freshness_ttl_to_time(now, ttl)) + }) +} + +/// Calculate the expire time from the `Expires` header only +pub fn calculate_expires_header_time(resp_header: &RespHeader) -> Option<SystemTime> { + // according to RFC 7234: + // https://datatracker.ietf.org/doc/html/rfc7234#section-4.2.1 + // - treat multiple expires headers as invalid + // https://datatracker.ietf.org/doc/html/rfc7234#section-5.3 + // - "MUST interpret invalid date formats... 
as representing a time in the past" + fn parse_expires_value(expires_value: &HeaderValue) -> Option<SystemTime> { + let expires = expires_value.to_str().ok()?; + Some(SystemTime::from( + expires + .parse::<HttpDate>() + .map_err(|e| warn!("Invalid HttpDate in Expires: {}, error: {}", expires, e)) + .ok()?, + )) + } + + let mut expires_iter = resp_header.headers.get_all("expires").iter(); + let expires_header = expires_iter.next(); + if expires_header.is_none() || expires_iter.next().is_some() { + return None; + } + parse_expires_value(expires_header.unwrap()).or(Some(SystemTime::UNIX_EPOCH)) +} + +/// Calculates stale-while-revalidate and stale-if-error seconds from Cache-Control or the [CacheMetaDefaults]. +pub fn calculate_serve_stale_sec( + cache_control: Option<&impl InterpretCacheControl>, + defaults: &CacheMetaDefaults, +) -> (u32, u32) { + let serve_stale_while_revalidate_sec = cache_control + .and_then(|cc| cc.serve_stale_while_revalidate_sec()) + .unwrap_or_else(|| defaults.serve_stale_while_revalidate_sec()); + let serve_stale_if_error_sec = cache_control + .and_then(|cc| cc.serve_stale_if_error_sec()) + .unwrap_or_else(|| defaults.serve_stale_if_error_sec()); + (serve_stale_while_revalidate_sec, serve_stale_if_error_sec) +} + +/// Filters to run when sending requests to upstream +pub mod upstream { + use super::*; + + /// Adjust the request header for cacheable requests + /// + /// This filter does the following in order to fetch the entire response to cache + /// - Convert HEAD to GET + /// - `If-*` headers are removed + /// - `Range` header is removed + /// + /// When `meta` is set, this function will inject `If-modified-since` according to the `Last-Modified` header + /// and inject `If-none-match` according to `Etag` header + pub fn request_filter(req: &mut RequestHeader, meta: Option<&CacheMeta>) -> Result<()> { + // change HEAD to GET, HEAD itself is not semantically cacheable + if req.method == Method::HEAD { + req.set_method(Method::GET); + } + + 
// remove downstream precondition headers https://datatracker.ietf.org/doc/html/rfc7232#section-3 + // we'd like to cache the 200 not the 304 + req.remove_header(&header::IF_MATCH); + req.remove_header(&header::IF_NONE_MATCH); + req.remove_header(&header::IF_MODIFIED_SINCE); + req.remove_header(&header::IF_UNMODIFIED_SINCE); + // see below range header + req.remove_header(&header::IF_RANGE); + + // remove downstream range header as we'd like to cache the entire response (this might change in the future) + req.remove_header(&header::RANGE); + + // we have a persumably staled response already, add precondition headers for revalidation + if let Some(m) = meta { + // rfc7232: "SHOULD send both validators in cache validation" but + // there have been weird cases that an origin has matching etag but not Last-Modified + if let Some(since) = m.headers().get(&header::LAST_MODIFIED) { + req.insert_header(header::IF_MODIFIED_SINCE, since).unwrap(); + } + if let Some(etag) = m.headers().get(&header::ETAG) { + req.insert_header(header::IF_NONE_MATCH, etag).unwrap(); + } + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use http::header::{HeaderName, CACHE_CONTROL, EXPIRES, SET_COOKIE}; + use http::StatusCode; + use httpdate::fmt_http_date; + + fn init_log() { + let _ = env_logger::builder().is_test(true).try_init(); + } + + const DEFAULTS: CacheMetaDefaults = CacheMetaDefaults::new( + |status| match status { + StatusCode::OK => Some(10), + StatusCode::NOT_FOUND => Some(5), + StatusCode::PARTIAL_CONTENT => None, + _ => Some(1), + }, + 0, + u32::MAX, /* "infinite" stale-if-error */ + ); + + // Cache nothing, by default + const BYPASS_CACHE_DEFAULTS: CacheMetaDefaults = CacheMetaDefaults::new(|_| None, 0, 0); + + fn build_response(status: u16, headers: &[(HeaderName, &str)]) -> ResponseHeader { + let mut header = ResponseHeader::build(status, Some(headers.len())).unwrap(); + for (k, v) in headers { + header.append_header(k.to_string(), *v).unwrap(); + } + header 
+ } + + fn resp_cacheable_wrapper( + resp: &ResponseHeader, + defaults: &CacheMetaDefaults, + authorization_present: bool, + ) -> Option<CacheMeta> { + if let Cacheable(meta) = resp_cacheable( + CacheControl::from_resp_headers(resp).as_ref(), + resp, + authorization_present, + defaults, + ) { + Some(meta) + } else { + None + } + } + + #[test] + fn test_resp_cacheable() { + let meta = resp_cacheable_wrapper( + &build_response(200, &[(CACHE_CONTROL, "max-age=12345")]), + &DEFAULTS, + false, + ); + + let meta = meta.unwrap(); + assert!(meta.is_fresh(SystemTime::now())); + assert!(meta.is_fresh( + SystemTime::now() + .checked_add(Duration::from_secs(12)) + .unwrap() + ),); + assert!(!meta.is_fresh( + SystemTime::now() + .checked_add(Duration::from_secs(12346)) + .unwrap() + )); + } + + #[test] + fn test_resp_uncacheable_directives() { + let meta = resp_cacheable_wrapper( + &build_response(200, &[(CACHE_CONTROL, "private, max-age=12345")]), + &DEFAULTS, + false, + ); + assert!(meta.is_none()); + + let meta = resp_cacheable_wrapper( + &build_response(200, &[(CACHE_CONTROL, "no-store, max-age=12345")]), + &DEFAULTS, + false, + ); + assert!(meta.is_none()); + } + + #[test] + fn test_resp_cache_authorization() { + let meta = resp_cacheable_wrapper(&build_response(200, &[]), &DEFAULTS, true); + assert!(meta.is_none()); + + let meta = resp_cacheable_wrapper( + &build_response(200, &[(CACHE_CONTROL, "max-age=10")]), + &DEFAULTS, + true, + ); + assert!(meta.is_none()); + + let meta = resp_cacheable_wrapper( + &build_response(200, &[(CACHE_CONTROL, "s-maxage=10")]), + &DEFAULTS, + true, + ); + assert!(meta.unwrap().is_fresh(SystemTime::now())); + + let meta = resp_cacheable_wrapper( + &build_response(200, &[(CACHE_CONTROL, "public, max-age=10")]), + &DEFAULTS, + true, + ); + assert!(meta.unwrap().is_fresh(SystemTime::now())); + + let meta = resp_cacheable_wrapper( + &build_response(200, &[(CACHE_CONTROL, "must-revalidate")]), + &DEFAULTS, + true, + ); + 
assert!(meta.unwrap().is_fresh(SystemTime::now())); + } + + #[test] + fn test_resp_zero_max_age() { + let meta = resp_cacheable_wrapper( + &build_response(200, &[(CACHE_CONTROL, "max-age=0, public")]), + &DEFAULTS, + false, + ); + + // cacheable, but needs revalidation + assert!(!meta.unwrap().is_fresh(SystemTime::now())); + } + + #[test] + fn test_resp_expires() { + let five_sec_time = SystemTime::now() + .checked_add(Duration::from_secs(5)) + .unwrap(); + + // future expires is cacheable + let meta = resp_cacheable_wrapper( + &build_response(200, &[(EXPIRES, &fmt_http_date(five_sec_time))]), + &DEFAULTS, + false, + ); + + let meta = meta.unwrap(); + assert!(meta.is_fresh(SystemTime::now())); + assert!(!meta.is_fresh( + SystemTime::now() + .checked_add(Duration::from_secs(6)) + .unwrap() + )); + + // even on default uncacheable statuses + let meta = resp_cacheable_wrapper( + &build_response(206, &[(EXPIRES, &fmt_http_date(five_sec_time))]), + &DEFAULTS, + false, + ); + assert!(meta.is_some()); + } + + #[test] + fn test_resp_past_expires() { + // cacheable, but expired + let meta = resp_cacheable_wrapper( + &build_response(200, &[(EXPIRES, "Fri, 15 May 2015 15:34:21 GMT")]), + &BYPASS_CACHE_DEFAULTS, + false, + ); + assert!(!meta.unwrap().is_fresh(SystemTime::now())); + } + + #[test] + fn test_resp_nonstandard_expires() { + // init log to allow inspecting warnings + init_log(); + + // invalid cases, according to parser + // (but should be stale according to RFC) + let meta = resp_cacheable_wrapper( + &build_response(200, &[(EXPIRES, "Mon, 13 Feb 0002 12:00:00 GMT")]), + &BYPASS_CACHE_DEFAULTS, + false, + ); + assert!(!meta.unwrap().is_fresh(SystemTime::now())); + + let meta = resp_cacheable_wrapper( + &build_response(200, &[(EXPIRES, "Fri, 01 Dec 99999 16:00:00 GMT")]), + &BYPASS_CACHE_DEFAULTS, + false, + ); + assert!(!meta.unwrap().is_fresh(SystemTime::now())); + + let meta = resp_cacheable_wrapper( + &build_response(200, &[(EXPIRES, "0")]), + 
&BYPASS_CACHE_DEFAULTS, + false, + ); + assert!(!meta.unwrap().is_fresh(SystemTime::now())); + } + + #[test] + fn test_resp_multiple_expires() { + let five_sec_time = SystemTime::now() + .checked_add(Duration::from_secs(5)) + .unwrap(); + let ten_sec_time = SystemTime::now() + .checked_add(Duration::from_secs(10)) + .unwrap(); + + // multiple expires = uncacheable + let meta = resp_cacheable_wrapper( + &build_response( + 200, + &[ + (EXPIRES, &fmt_http_date(five_sec_time)), + (EXPIRES, &fmt_http_date(ten_sec_time)), + ], + ), + &BYPASS_CACHE_DEFAULTS, + false, + ); + assert!(meta.is_none()); + + // unless the default is cacheable + let meta = resp_cacheable_wrapper( + &build_response( + 200, + &[ + (EXPIRES, &fmt_http_date(five_sec_time)), + (EXPIRES, &fmt_http_date(ten_sec_time)), + ], + ), + &DEFAULTS, + false, + ); + assert!(meta.is_some()); + } + + #[test] + fn test_resp_cache_control_with_expires() { + let five_sec_time = SystemTime::now() + .checked_add(Duration::from_secs(5)) + .unwrap(); + // cache-control takes precedence over expires + let meta = resp_cacheable_wrapper( + &build_response( + 200, + &[ + (EXPIRES, &fmt_http_date(five_sec_time)), + (CACHE_CONTROL, "max-age=0"), + ], + ), + &DEFAULTS, + false, + ); + assert!(!meta.unwrap().is_fresh(SystemTime::now())); + } + + #[test] + fn test_resp_stale_while_revalidate() { + // respect defaults + let meta = resp_cacheable_wrapper( + &build_response(200, &[(CACHE_CONTROL, "max-age=10")]), + &DEFAULTS, + false, + ); + + let meta = meta.unwrap(); + let eleven_sec_time = SystemTime::now() + .checked_add(Duration::from_secs(11)) + .unwrap(); + assert!(!meta.is_fresh(eleven_sec_time)); + assert!(!meta.serve_stale_while_revalidate(SystemTime::now())); + assert!(!meta.serve_stale_while_revalidate(eleven_sec_time)); + + // override with stale-while-revalidate + let meta = resp_cacheable_wrapper( + &build_response( + 200, + &[(CACHE_CONTROL, "max-age=10, stale-while-revalidate=5")], + ), + &DEFAULTS, + false, + ); + 
+ let meta = meta.unwrap(); + let eleven_sec_time = SystemTime::now() + .checked_add(Duration::from_secs(11)) + .unwrap(); + let sixteen_sec_time = SystemTime::now() + .checked_add(Duration::from_secs(16)) + .unwrap(); + assert!(!meta.is_fresh(eleven_sec_time)); + assert!(meta.serve_stale_while_revalidate(eleven_sec_time)); + assert!(!meta.serve_stale_while_revalidate(sixteen_sec_time)); + } + + #[test] + fn test_resp_stale_if_error() { + // respect defaults + let meta = resp_cacheable_wrapper( + &build_response(200, &[(CACHE_CONTROL, "max-age=10")]), + &DEFAULTS, + false, + ); + + let meta = meta.unwrap(); + let hundred_years_time = SystemTime::now() + .checked_add(Duration::from_secs(86400 * 365 * 100)) + .unwrap(); + assert!(!meta.is_fresh(hundred_years_time)); + assert!(meta.serve_stale_if_error(hundred_years_time)); + + // override with stale-if-error + let meta = resp_cacheable_wrapper( + &build_response( + 200, + &[( + CACHE_CONTROL, + "max-age=10, stale-while-revalidate=5, stale-if-error=60", + )], + ), + &DEFAULTS, + false, + ); + + let meta = meta.unwrap(); + let eleven_sec_time = SystemTime::now() + .checked_add(Duration::from_secs(11)) + .unwrap(); + let seventy_sec_time = SystemTime::now() + .checked_add(Duration::from_secs(70)) + .unwrap(); + assert!(!meta.is_fresh(eleven_sec_time)); + assert!(meta.serve_stale_if_error(SystemTime::now())); + assert!(meta.serve_stale_if_error(eleven_sec_time)); + assert!(!meta.serve_stale_if_error(seventy_sec_time)); + + // never serve stale + let meta = resp_cacheable_wrapper( + &build_response(200, &[(CACHE_CONTROL, "max-age=10, stale-if-error=0")]), + &DEFAULTS, + false, + ); + + let meta = meta.unwrap(); + let eleven_sec_time = SystemTime::now() + .checked_add(Duration::from_secs(11)) + .unwrap(); + assert!(!meta.is_fresh(eleven_sec_time)); + assert!(!meta.serve_stale_if_error(eleven_sec_time)); + } + + #[test] + fn test_resp_status_cache_defaults() { + // 200 response + let meta = 
resp_cacheable_wrapper(&build_response(200, &[]), &DEFAULTS, false); + assert!(meta.is_some()); + + let meta = meta.unwrap(); + assert!(meta.is_fresh( + SystemTime::now() + .checked_add(Duration::from_secs(9)) + .unwrap() + )); + assert!(!meta.is_fresh( + SystemTime::now() + .checked_add(Duration::from_secs(11)) + .unwrap() + )); + + // 404 response, different ttl + let meta = resp_cacheable_wrapper(&build_response(404, &[]), &DEFAULTS, false); + assert!(meta.is_some()); + + let meta = meta.unwrap(); + assert!(meta.is_fresh( + SystemTime::now() + .checked_add(Duration::from_secs(4)) + .unwrap() + )); + assert!(!meta.is_fresh( + SystemTime::now() + .checked_add(Duration::from_secs(6)) + .unwrap() + )); + + // 206 marked uncacheable (no cache TTL) + let meta = resp_cacheable_wrapper(&build_response(206, &[]), &DEFAULTS, false); + assert!(meta.is_none()); + + // default uncacheable status with explicit Cache-Control is cacheable + let meta = resp_cacheable_wrapper( + &build_response(206, &[(CACHE_CONTROL, "public, max-age=10")]), + &DEFAULTS, + false, + ); + assert!(meta.is_some()); + + let meta = meta.unwrap(); + assert!(meta.is_fresh( + SystemTime::now() + .checked_add(Duration::from_secs(9)) + .unwrap() + )); + assert!(!meta.is_fresh( + SystemTime::now() + .checked_add(Duration::from_secs(11)) + .unwrap() + )); + + // 416 matches any status + let meta = resp_cacheable_wrapper(&build_response(416, &[]), &DEFAULTS, false); + assert!(meta.is_some()); + + let meta = meta.unwrap(); + assert!(meta.is_fresh(SystemTime::now())); + assert!(!meta.is_fresh( + SystemTime::now() + .checked_add(Duration::from_secs(2)) + .unwrap() + )); + } + + #[test] + fn test_resp_cache_no_cache_fields() { + // check #field-names are stripped from the cache header + let meta = resp_cacheable_wrapper( + &build_response( + 200, + &[ + (SET_COOKIE, "my-cookie"), + (CACHE_CONTROL, "private=\"something\", max-age=10"), + (HeaderName::from_bytes(b"Something").unwrap(), "foo"), + ], + ), + &DEFAULTS, 
+ false, + ); + let meta = meta.unwrap(); + assert!(meta.headers().contains_key(SET_COOKIE)); + assert!(!meta.headers().contains_key("Something")); + + let meta = resp_cacheable_wrapper( + &build_response( + 200, + &[ + (SET_COOKIE, "my-cookie"), + ( + CACHE_CONTROL, + "max-age=0, no-cache=\"meta1, SeT-Cookie ,meta2\"", + ), + (HeaderName::from_bytes(b"meta1").unwrap(), "foo"), + ], + ), + &DEFAULTS, + false, + ); + let meta = meta.unwrap(); + assert!(!meta.headers().contains_key(SET_COOKIE)); + assert!(!meta.headers().contains_key("meta1")); + } +} diff --git a/pingora-cache/src/hashtable.rs b/pingora-cache/src/hashtable.rs new file mode 100644 index 0000000..a89f9ad --- /dev/null +++ b/pingora-cache/src/hashtable.rs @@ -0,0 +1,112 @@ +// Copyright 2024 Cloudflare, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Concurrent hash tables and LRUs + +use lru::LruCache; +use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard}; +use std::collections::HashMap; + +// There are probably off-the-shelf crates of this, DashMap? 
+/// A hash table that shards to a constant number of tables to reduce lock contention +pub struct ConcurrentHashTable<V, const N: usize> { + tables: [RwLock<HashMap<u128, V>>; N], +} + +#[inline] +fn get_shard(key: u128, n_shards: usize) -> usize { + (key % n_shards as u128) as usize +} + +impl<V, const N: usize> ConcurrentHashTable<V, N> +where + [RwLock<HashMap<u128, V>>; N]: Default, +{ + pub fn new() -> Self { + ConcurrentHashTable { + tables: Default::default(), + } + } + pub fn get(&self, key: u128) -> &RwLock<HashMap<u128, V>> { + &self.tables[get_shard(key, N)] + } + + #[allow(dead_code)] + pub fn read(&self, key: u128) -> RwLockReadGuard<HashMap<u128, V>> { + self.get(key).read() + } + + pub fn write(&self, key: u128) -> RwLockWriteGuard<HashMap<u128, V>> { + self.get(key).write() + } + + // TODO: work out the lifetimes to provide get/set directly +} + +impl<V, const N: usize> Default for ConcurrentHashTable<V, N> +where + [RwLock<HashMap<u128, V>>; N]: Default, +{ + fn default() -> Self { + Self::new() + } +} + +#[doc(hidden)] // not need in public API +pub struct LruShard<V>(RwLock<LruCache<u128, V>>); +impl<V> Default for LruShard<V> { + fn default() -> Self { + // help satisfy default construction of array + LruShard(RwLock::new(LruCache::unbounded())) + } +} + +/// Sharded concurrent data structure for LruCache +pub struct ConcurrentLruCache<V, const N: usize> { + lrus: [LruShard<V>; N], +} + +impl<V, const N: usize> ConcurrentLruCache<V, N> +where + [LruShard<V>; N]: Default, +{ + pub fn new(shard_capacity: usize) -> Self { + use std::num::NonZeroUsize; + // safe, 1 != 0 + const ONE: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(1) }; + let mut cache = ConcurrentLruCache { + lrus: Default::default(), + }; + for lru in &mut cache.lrus { + lru.0 + .write() + .resize(shard_capacity.try_into().unwrap_or(ONE)); + } + cache + } + pub fn get(&self, key: u128) -> &RwLock<LruCache<u128, V>> { + &self.lrus[get_shard(key, N)].0 + } + + 
#[allow(dead_code)] + pub fn read(&self, key: u128) -> RwLockReadGuard<LruCache<u128, V>> { + self.get(key).read() + } + + pub fn write(&self, key: u128) -> RwLockWriteGuard<LruCache<u128, V>> { + self.get(key).write() + } + + // TODO: work out the lifetimes to provide get/set directly +} diff --git a/pingora-cache/src/key.rs b/pingora-cache/src/key.rs new file mode 100644 index 0000000..26e9362 --- /dev/null +++ b/pingora-cache/src/key.rs @@ -0,0 +1,302 @@ +// Copyright 2024 Cloudflare, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Cache key + +use super::*; + +use blake2::{Blake2b, Digest}; +use serde::{Deserialize, Serialize}; + +// 16-byte / 128-bit key: large enough to avoid collision +const KEY_SIZE: usize = 16; + +/// An 128 bit hash binary +pub type HashBinary = [u8; KEY_SIZE]; + +fn hex2str(hex: &[u8]) -> String { + use std::fmt::Write; + let mut s = String::with_capacity(KEY_SIZE * 2); + for c in hex { + write!(s, "{:02x}", c).unwrap(); // safe, just dump hex to string + } + s +} + +/// Decode the hex str into [HashBinary]. +/// +/// Return `None` when the decode fails or the input is not exact 32 (to decode to 16 bytes). 
pub fn str2hex(s: &str) -> Option<HashBinary> {
    // a 16-byte key always serializes to exactly 32 hex characters
    if s.len() != KEY_SIZE * 2 {
        return None;
    }
    let mut output = [0; KEY_SIZE];
    // no need to bubble the error, it should be obvious why the decode fails
    hex::decode_to_slice(s.as_bytes(), &mut output).ok()?;
    Some(output)
}

/// The trait for cache key
pub trait CacheHashKey {
    /// Return the hash of the cache key
    fn primary_bin(&self) -> HashBinary;

    /// Return the variance hash of the cache key.
    ///
    /// `None` if no variance.
    fn variance_bin(&self) -> Option<HashBinary>;

    /// Return the hash including both primary and variance keys
    fn combined_bin(&self) -> HashBinary {
        let key = self.primary_bin();
        if let Some(v) = self.variance_bin() {
            // combine by hashing (primary || variance) so distinct variances of the
            // same primary key map to distinct storage entries
            let mut hasher = Blake2b128::new();
            hasher.update(key);
            hasher.update(v);
            hasher.finalize().into()
        } else {
            // if there is no variance, combined_bin should return the same as primary_bin
            key
        }
    }

    /// An extra tag for identifying users
    ///
    /// For example if the storage backend implements per user quota, this tag can be used.
    fn user_tag(&self) -> &str;

    /// The hex string of [Self::primary_bin()]
    fn primary(&self) -> String {
        hex2str(&self.primary_bin())
    }

    /// The hex string of [Self::variance_bin()]
    ///
    /// `None` if no variance.
    fn variance(&self) -> Option<String> {
        self.variance_bin().as_ref().map(|b| hex2str(&b[..]))
    }

    /// The hex string of [Self::combined_bin()]
    fn combined(&self) -> String {
        hex2str(&self.combined_bin())
    }
}

/// General purpose cache key
#[derive(Debug, Clone)]
pub struct CacheKey {
    // All strings for now, can be more structural as long as it can hash
    namespace: String,
    primary: String,
    // pre-computed variance hash, if any; set via `set_variance_key`
    variance: Option<HashBinary>,
    /// An extra tag for identifying users
    ///
    /// For example if the storage backend implements per user quota, this tag can be used.
+ pub user_tag: String, +} + +impl CacheKey { + /// Set the value of the variance hash + pub fn set_variance_key(&mut self, key: HashBinary) { + self.variance = Some(key) + } + + /// Get the value of the variance hash + pub fn get_variance_key(&self) -> Option<&HashBinary> { + self.variance.as_ref() + } + + /// Removes the variance from this cache key + pub fn remove_variance_key(&mut self) { + self.variance = None + } +} + +/// Storage optimized cache key to keep in memory or in storage +// 16 bytes + 8 bytes (+16 * u8) + user_tag.len() + 16 Bytes (Box<str>) +#[derive(Debug, Deserialize, Serialize, Clone, Hash, PartialEq, Eq)] +pub struct CompactCacheKey { + pub primary: HashBinary, + // save 8 bytes for non-variance but waste 8 bytes for variance vs, store flat 16 bytes + pub variance: Option<Box<HashBinary>>, + pub user_tag: Box<str>, // the len should be small to keep memory usage bounded +} + +impl CacheHashKey for CompactCacheKey { + fn primary_bin(&self) -> HashBinary { + self.primary + } + + fn variance_bin(&self) -> Option<HashBinary> { + self.variance.as_ref().map(|s| *s.as_ref()) + } + + fn user_tag(&self) -> &str { + &self.user_tag + } +} + +/* + * We use blake2 hashing, which is faster and more secure, to replace md5. + * We have not given too much thought on whether non-crypto hash can be safely + * use because hashing performance is not critical. 
+ * Note: we should avoid hashes like ahash which does not have consistent output + * across machines because it is designed purely for in memory hashtable +*/ + +// hash output: we use 128 bits (16 bytes) hash which will map to 32 bytes hex string +pub(crate) type Blake2b128 = Blake2b<blake2::digest::consts::U16>; + +/// helper function: hash str to u8 +pub fn hash_u8(key: &str) -> u8 { + let mut hasher = Blake2b128::new(); + hasher.update(key); + let raw = hasher.finalize(); + raw[0] +} + +/// helper function: hash str to [HashBinary] +pub fn hash_key(key: &str) -> HashBinary { + let mut hasher = Blake2b128::new(); + hasher.update(key); + let raw = hasher.finalize(); + raw.into() +} + +impl CacheKey { + fn primary_hasher(&self) -> Blake2b128 { + let mut hasher = Blake2b128::new(); + hasher.update(&self.namespace); + hasher.update(&self.primary); + hasher + } + + /// Create a default [CacheKey] from a request, which just takes it URI as the primary key. + pub fn default(req_header: &ReqHeader) -> Self { + CacheKey { + namespace: "".into(), + primary: format!("{}", req_header.uri), + variance: None, + user_tag: "".into(), + } + } + + /// Create a new [CacheKey] from the given namespace, primary, and user_tag string. + /// + /// Both `namespace` and `primary` will be used for the primary hash + pub fn new<S1, S2, S3>(namespace: S1, primary: S2, user_tag: S3) -> Self + where + S1: Into<String>, + S2: Into<String>, + S3: Into<String>, + { + CacheKey { + namespace: namespace.into(), + primary: primary.into(), + variance: None, + user_tag: user_tag.into(), + } + } + + /// Return the namespace of this key + pub fn namespace(&self) -> &str { + &self.namespace + } + + /// Return the primary key of this key + pub fn primary_key(&self) -> &str { + &self.primary + } + + /// Convert this key to [CompactCacheKey]. 
+ pub fn to_compact(&self) -> CompactCacheKey { + let primary = self.primary_bin(); + CompactCacheKey { + primary, + variance: self.variance_bin().map(Box::new), + user_tag: self.user_tag.clone().into_boxed_str(), + } + } +} + +impl CacheHashKey for CacheKey { + fn primary_bin(&self) -> HashBinary { + self.primary_hasher().finalize().into() + } + + fn variance_bin(&self) -> Option<HashBinary> { + self.variance + } + + fn user_tag(&self) -> &str { + &self.user_tag + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_cache_key_hash() { + let key = CacheKey { + namespace: "".into(), + primary: "aa".into(), + variance: None, + user_tag: "1".into(), + }; + let hash = key.primary(); + assert_eq!(hash, "ac10f2aef117729f8dad056b3059eb7e"); + assert!(key.variance().is_none()); + assert_eq!(key.combined(), hash); + let compact = key.to_compact(); + assert_eq!(compact.primary(), hash); + assert!(compact.variance().is_none()); + assert_eq!(compact.combined(), hash); + } + + #[test] + fn test_cache_key_vary_hash() { + let key = CacheKey { + namespace: "".into(), + primary: "aa".into(), + variance: Some([0u8; 16]), + user_tag: "1".into(), + }; + let hash = key.primary(); + assert_eq!(hash, "ac10f2aef117729f8dad056b3059eb7e"); + assert_eq!(key.variance().unwrap(), "00000000000000000000000000000000"); + assert_eq!(key.combined(), "004174d3e75a811a5b44c46b3856f3ee"); + let compact = key.to_compact(); + assert_eq!(compact.primary(), "ac10f2aef117729f8dad056b3059eb7e"); + assert_eq!( + compact.variance().unwrap(), + "00000000000000000000000000000000" + ); + assert_eq!(compact.combined(), "004174d3e75a811a5b44c46b3856f3ee"); + } + + #[test] + fn test_hex_str() { + let mut key = [0; KEY_SIZE]; + for (i, v) in key.iter_mut().enumerate() { + // key: [0, 1, 2, .., 15] + *v = i as u8; + } + let hex_str = hex2str(&key); + let key2 = str2hex(&hex_str).unwrap(); + for i in 0..KEY_SIZE { + assert_eq!(key[i], key2[i]); + } + } +} diff --git a/pingora-cache/src/lib.rs 
b/pingora-cache/src/lib.rs new file mode 100644 index 0000000..be352dc --- /dev/null +++ b/pingora-cache/src/lib.rs @@ -0,0 +1,1093 @@ +// Copyright 2024 Cloudflare, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! The HTTP caching layer for proxies. + +#![allow(clippy::new_without_default)] + +use http::{method::Method, request::Parts as ReqHeader, response::Parts as RespHeader}; +use key::{CacheHashKey, HashBinary}; +use lock::WritePermit; +use pingora_error::Result; +use pingora_http::ResponseHeader; +use std::time::{Duration, SystemTime}; +use trace::CacheTraceCTX; + +pub mod cache_control; +pub mod eviction; +pub mod filters; +pub mod hashtable; +pub mod key; +pub mod lock; +pub mod max_file_size; +mod memory; +pub mod meta; +pub mod predictor; +pub mod put; +pub mod storage; +pub mod trace; +mod variance; + +use crate::max_file_size::MaxFileSizeMissHandler; +pub use key::CacheKey; +use lock::{CacheLock, LockStatus, Locked}; +pub use memory::MemCache; +pub use meta::{CacheMeta, CacheMetaDefaults}; +pub use storage::{HitHandler, MissHandler, Storage}; +pub use variance::VarianceBuilder; + +pub mod prelude {} + +/// The state machine for http caching +/// +/// This object is used to handle the state and transitions for HTTP caching through the life of a +/// request. 
+pub struct HttpCache { + phase: CachePhase, + // Box the rest so that a disabled HttpCache struct is small + inner: Option<Box<HttpCacheInner>>, +} + +/// This reflects the phase of HttpCache during the lifetime of a request +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum CachePhase { + /// Cache disabled, with reason (NeverEnabled if never explicitly used) + Disabled(NoCacheReason), + /// Cache enabled but nothing is set yet + Uninit, + /// Cache was enabled, the request decided not to use it + // HttpCache.inner is kept + Bypass, + /// Awaiting cache key to be generated + CacheKey, + /// Cache hit + Hit, + /// No cached asset is found + Miss, + /// A staled (expired) asset is found + Stale, + /// A staled (expired) asset was found, so a fresh one was fetched + Expired, + /// A staled (expired) asset was found, and it was revalidated to be fresh + Revalidated, + /// Revalidated, but deemed uncacheable so we do not freshen it + RevalidatedNoCache(NoCacheReason), +} + +impl CachePhase { + /// Convert [CachePhase] as `str`, for logging and debugging. 
    pub fn as_str(&self) -> &'static str {
        // these strings are used in logs and trace tags; keep them stable
        match self {
            CachePhase::Disabled(_) => "disabled",
            CachePhase::Uninit => "uninitialized",
            CachePhase::Bypass => "bypass",
            CachePhase::CacheKey => "key",
            CachePhase::Hit => "hit",
            CachePhase::Miss => "miss",
            CachePhase::Stale => "stale",
            CachePhase::Expired => "expired",
            CachePhase::Revalidated => "revalidated",
            CachePhase::RevalidatedNoCache(_) => "revalidated-nocache",
        }
    }
}

/// The possible reasons for not caching
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum NoCacheReason {
    /// Caching is not enabled to begin with
    NeverEnabled,
    /// Origin directives indicated this was not cacheable
    OriginNotCache,
    /// Response size was larger than the cache's configured maximum asset size
    ResponseTooLarge,
    /// Due to internal caching storage error
    StorageError,
    /// Due to other type of internal issues
    InternalError,
    /// will be cacheable but skip cache admission now
    ///
    /// This happens when the cache predictor predicted that this request is not cacheable but
    /// the response turns out to be OK to cache. However it might be too large to re-enable caching
    /// for this request.
    Deferred,
    /// The writer of the cache lock sees that the request is not cacheable (Could be OriginNotCache)
    CacheLockGiveUp,
    /// This request waited too long for the writer of the cache lock to finish, so this request will
    /// fetch from the origin without caching
    CacheLockTimeout,
    /// Other custom defined reasons
    Custom(&'static str),
}

impl NoCacheReason {
    /// Convert [NoCacheReason] as `str`, for logging and debugging.
    pub fn as_str(&self) -> &'static str {
        use NoCacheReason::*;
        // variant names double as the log/debug strings; `Custom` carries its own
        match self {
            NeverEnabled => "NeverEnabled",
            OriginNotCache => "OriginNotCache",
            ResponseTooLarge => "ResponseTooLarge",
            StorageError => "StorageError",
            InternalError => "InternalError",
            Deferred => "Deferred",
            CacheLockGiveUp => "CacheLockGiveUp",
            CacheLockTimeout => "CacheLockTimeout",
            Custom(s) => s,
        }
    }
}

/// Response cacheable decision
///
/// Either the response is cacheable with its [CacheMeta], or it is not, with the reason why.
#[derive(Debug)]
pub enum RespCacheable {
    Cacheable(CacheMeta),
    Uncacheable(NoCacheReason),
}

impl RespCacheable {
    /// Whether it is cacheable
    #[inline]
    pub fn is_cacheable(&self) -> bool {
        matches!(*self, Self::Cacheable(_))
    }

    /// Unwrap [RespCacheable] to get the [CacheMeta] stored
    /// # Panic
    /// Panic when this object is not cacheable. Check [Self::is_cacheable()] first.
    pub fn unwrap_meta(self) -> CacheMeta {
        match self {
            Self::Cacheable(meta) => meta,
            Self::Uncacheable(_) => panic!("expected Cacheable value"),
        }
    }
}

/// Freshness state of cache hit asset
///
/// Only [Self::Fresh] allows serving the hit as-is; the other variants require revalidation.
#[derive(Debug, Copy, Clone)]
pub enum HitStatus {
    Expired,
    ForceExpired,
    FailedHitFilter,
    Fresh,
}

impl HitStatus {
    /// For displaying cache hit status
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::Expired => "expired",
            Self::ForceExpired => "force_expired",
            Self::FailedHitFilter => "failed_hit_filter",
            Self::Fresh => "fresh",
        }
    }

    /// Whether cached asset can be served as fresh
    pub fn is_fresh(&self) -> bool {
        // exhaustive on purpose: adding a variant forces a decision here
        match self {
            Self::Expired | Self::ForceExpired | Self::FailedHitFilter => false,
            Self::Fresh => true,
        }
    }
}

// Per-request cache state; boxed inside `HttpCache` so the disabled case stays small.
struct HttpCacheInner {
    pub key: Option<CacheKey>,
    pub meta: Option<CacheMeta>,
    // when set, even if an asset exists, it would only be considered valid after this timestamp
    pub valid_after: Option<SystemTime>,
    // when set, an asset will be rejected from the cache if it exceeds this size in bytes
    pub max_file_size_bytes:
Option<usize>, + pub miss_handler: Option<MissHandler>, + pub body_reader: Option<HitHandler>, + pub storage: &'static (dyn storage::Storage + Sync), // static for now + pub eviction: Option<&'static (dyn eviction::EvictionManager + Sync)>, + pub predictor: Option<&'static (dyn predictor::CacheablePredictor + Sync)>, + pub lock: Option<Locked>, // TODO: these 3 fields should come in 1 sub struct + pub cache_lock: Option<&'static CacheLock>, + pub lock_duration: Option<Duration>, + pub traces: trace::CacheTraceCTX, +} + +impl HttpCache { + /// Create a new [HttpCache]. + /// + /// Caching is not enabled by default. + pub fn new() -> Self { + HttpCache { + phase: CachePhase::Disabled(NoCacheReason::NeverEnabled), + inner: None, + } + } + + /// Whether the cache is enabled + pub fn enabled(&self) -> bool { + !matches!(self.phase, CachePhase::Disabled(_) | CachePhase::Bypass) + } + + /// Whether the cache is being bypassed + pub fn bypassing(&self) -> bool { + matches!(self.phase, CachePhase::Bypass) + } + + /// Return the [CachePhase] + pub fn phase(&self) -> CachePhase { + self.phase + } + + /// Whether anything was fetched from the upstream + /// + /// This essentially checks all possible [CachePhase] who need to contact the upstream server + pub fn upstream_used(&self) -> bool { + use CachePhase::*; + match self.phase { + Disabled(_) | Bypass | Miss | Expired | Revalidated | RevalidatedNoCache(_) => true, + Hit | Stale => false, + Uninit | CacheKey => false, // invalid states for this call, treat them as false to keep it simple + } + } + + /// Check whether the backend storage is the type `T`. 
+ pub fn storage_type_is<T: 'static>(&self) -> bool { + self.inner + .as_ref() + .and_then(|inner| inner.storage.as_any().downcast_ref::<T>()) + .is_some() + } + + /// Disable caching + pub fn disable(&mut self, reason: NoCacheReason) { + use NoCacheReason::*; + match self.phase { + CachePhase::Disabled(_) => { + // replace reason + self.phase = CachePhase::Disabled(reason); + } + _ => { + self.phase = CachePhase::Disabled(reason); + if let Some(inner) = self.inner.as_mut() { + let lock = inner.lock.take(); + if let Some(Locked::Write(_r)) = lock { + let lock_status = match reason { + // let next request try to fetch it + InternalError | StorageError | Deferred => LockStatus::TransientError, + // no need for the lock anymore + OriginNotCache | ResponseTooLarge => LockStatus::GiveUp, + // not sure which LockStatus make sense, we treat it as GiveUp for now + Custom(_) => LockStatus::GiveUp, + // should never happen, NeverEnabled shouldn't hold a lock + NeverEnabled => panic!("NeverEnabled holds a write lock"), + CacheLockGiveUp | CacheLockTimeout => { + panic!("CacheLock* are for cache lock readers only") + } + }; + inner + .cache_lock + .unwrap() + .release(inner.key.as_ref().unwrap(), lock_status); + } + } + // log initial disable reason + self.inner_mut() + .traces + .cache_span + .set_tag(|| trace::Tag::new("disable_reason", reason.as_str())); + self.inner = None; + } + } + } + + /* The following methods panic when they are used in the wrong phase. + * This is better than returning errors as such panics are only caused by coding error, which + * should be fixed right away. Tokio runtime only crashes the current task instead of the whole + * program when these panics happen. */ + + /// Set the cache to bypass + /// + /// # Panic + /// This call is only allowed in [CachePhase::CacheKey] phase (before any cache lookup is performed). + /// Use it in any other phase will lead to panic. 
+ pub fn bypass(&mut self) { + match self.phase { + CachePhase::CacheKey => { + // before cache lookup / found / miss + self.phase = CachePhase::Bypass; + self.inner_mut() + .traces + .cache_span + .set_tag(|| trace::Tag::new("bypassed", true)); + } + _ => panic!("wrong phase to bypass HttpCache {:?}", self.phase), + } + } + + /// Enable the cache + /// + /// - `storage`: the cache storage backend that implements [storage::Storage] + /// - `eviction`: optionally the eviction mananger, without it, nothing will be evicted from the storage + /// - `predictor`: optionally a cache predictor. The cache predictor predicts whether something is likely + /// to be cacheable or not. This is useful because the proxy can apply different types of optimization to + /// cacheable and uncacheable requests. + /// - `cache_lock`: optionally a cache lock which handles concurrent lookups to the same asset. Without it + /// such lookups will all be allowed to fetch the asset independently. + pub fn enable( + &mut self, + storage: &'static (dyn storage::Storage + Sync), + eviction: Option<&'static (dyn eviction::EvictionManager + Sync)>, + predictor: Option<&'static (dyn predictor::CacheablePredictor + Sync)>, + cache_lock: Option<&'static CacheLock>, + ) { + match self.phase { + CachePhase::Disabled(_) => { + self.phase = CachePhase::Uninit; + self.inner = Some(Box::new(HttpCacheInner { + key: None, + meta: None, + valid_after: None, + max_file_size_bytes: None, + miss_handler: None, + body_reader: None, + storage, + eviction, + predictor, + lock: None, + cache_lock, + lock_duration: None, + traces: CacheTraceCTX::new(), + })); + } + _ => panic!("Cannot enable already enabled HttpCache {:?}", self.phase), + } + } + + // Enable distributed tracing + pub fn enable_tracing(&mut self, parent_span: trace::Span) { + if let Some(inner) = self.inner.as_mut() { + inner.traces.enable(parent_span); + } + } + + // Get the cache `miss` tracing span + pub fn get_miss_span(&mut self) -> 
Option<trace::SpanHandle> { + self.inner.as_mut().map(|i| i.traces.get_miss_span()) + } + + // shortcut to access inner, panic if phase is disabled + #[inline] + fn inner_mut(&mut self) -> &mut HttpCacheInner { + self.inner.as_mut().unwrap() + } + + #[inline] + fn inner(&self) -> &HttpCacheInner { + self.inner.as_ref().unwrap() + } + + /// Set the cache key + /// # Panic + /// Cache key is only allowed to be set in its own phase. Set it in other phases will cause panic. + pub fn set_cache_key(&mut self, key: CacheKey) { + match self.phase { + CachePhase::Uninit | CachePhase::CacheKey => { + self.phase = CachePhase::CacheKey; + self.inner_mut().key = Some(key); + } + _ => panic!("wrong phase {:?}", self.phase), + } + } + + /// Return the cache key used for asset lookup + /// # Panic + /// Can only be called after cache key is set and cache is not disabled. Panic otherwise. + pub fn cache_key(&self) -> &CacheKey { + match self.phase { + CachePhase::Disabled(_) | CachePhase::Uninit => panic!("wrong phase {:?}", self.phase), + _ => self.inner().key.as_ref().unwrap(), + } + } + + /// Return the max size allowed to be cached. + pub fn max_file_size_bytes(&self) -> Option<usize> { + match self.phase { + CachePhase::Disabled(_) | CachePhase::Uninit => panic!("wrong phase {:?}", self.phase), + _ => self.inner().max_file_size_bytes, + } + } + + /// Set the maximum response _body_ size in bytes that will be admitted to the cache. + /// + /// Response header size does not contribute to the max file size. + pub fn set_max_file_size_bytes(&mut self, max_file_size_bytes: usize) { + match self.phase { + CachePhase::Disabled(_) => panic!("wrong phase {:?}", self.phase), + _ => { + self.inner_mut().max_file_size_bytes = Some(max_file_size_bytes); + } + } + } + + /// Set that cache is found in cache storage. + /// + /// This function is called after [Self::cache_lookup()] which returns the [CacheMeta] and + /// [HitHandler]. 
+ /// + /// The `hit_status` enum allows the caller to force expire assets. + pub fn cache_found(&mut self, meta: CacheMeta, hit_handler: HitHandler, hit_status: HitStatus) { + match self.phase { + // Stale allowed because of cache lock and then retry + CachePhase::CacheKey | CachePhase::Stale => { + self.phase = if hit_status.is_fresh() { + CachePhase::Hit + } else { + CachePhase::Stale + }; + let phase = self.phase; + let inner = self.inner_mut(); + let key = inner.key.as_ref().unwrap(); + if phase == CachePhase::Stale { + if let Some(lock) = inner.cache_lock.as_ref() { + inner.lock = Some(lock.lock(key)); + } + } + inner.traces.log_meta(&meta); + if let Some(eviction) = inner.eviction { + // TODO: make access() accept CacheKey + let cache_key = key.to_compact(); + // FIXME: get size + eviction.access(&cache_key, 0, meta.0.internal.fresh_until); + } + inner.traces.start_hit_span(phase, hit_status); + inner.meta = Some(meta); + inner.body_reader = Some(hit_handler); + } + _ => panic!("wrong phase {:?}", self.phase), + } + } + + /// Mark `self` to be cache miss. + /// + /// This function is called after [Self::cache_lookup()] finds nothing or the caller decides + /// not to use the assets found. + /// # Panic + /// Panic in other phases. + pub fn cache_miss(&mut self) { + match self.phase { + // from CacheKey: set state to miss during cache lookup + // from Bypass: response became cacheable, set state to miss to cache + CachePhase::CacheKey | CachePhase::Bypass => { + self.phase = CachePhase::Miss; + self.inner_mut().traces.start_miss_span(); + } + _ => panic!("wrong phase {:?}", self.phase), + } + } + + /// Return the [HitHandler] + /// # Panic + /// Call this after [Self::cache_found()], panic in other phases. 
+ pub fn hit_handler(&mut self) -> &mut HitHandler { + match self.phase { + CachePhase::Hit + | CachePhase::Stale + | CachePhase::Revalidated + | CachePhase::RevalidatedNoCache(_) => self.inner_mut().body_reader.as_mut().unwrap(), + _ => panic!("wrong phase {:?}", self.phase), + } + } + + /// Return the body reader during a cache admission(miss/expired) which decouples the downstream + /// read and upstream cache write + pub fn miss_body_reader(&mut self) -> Option<&mut HitHandler> { + match self.phase { + CachePhase::Miss | CachePhase::Expired => { + let inner = self.inner_mut(); + if inner.storage.support_streaming_partial_write() { + inner.body_reader.as_mut() + } else { + // body_reader could be set even when the storage doesn't support streaming + // Expired cache would have the reader set. + None + } + } + _ => None, + } + } + + /// Call this when cache hit is fully read. + /// + /// This call will release resource if any and log the timing in tracing if set. + /// # Panic + /// Panic in phases where there is no cache hit. + pub async fn finish_hit_handler(&mut self) -> Result<()> { + match self.phase { + CachePhase::Hit + | CachePhase::Miss + | CachePhase::Expired + | CachePhase::Stale + | CachePhase::Revalidated + | CachePhase::RevalidatedNoCache(_) => { + let inner = self.inner_mut(); + if inner.body_reader.is_none() { + // already finished, we allow calling this function more than once + return Ok(()); + } + let body_reader = inner.body_reader.take().unwrap(); + let key = inner.key.as_ref().unwrap(); + let result = body_reader + .finish(inner.storage, key, &inner.traces.hit_span.handle()) + .await; + inner.traces.finish_hit_span(); + result + } + _ => panic!("wrong phase {:?}", self.phase), + } + } + + /// Set the [MissHandler] according to cache_key and meta, can only call once + pub async fn set_miss_handler(&mut self) -> Result<()> { + match self.phase { + // set_miss_handler() needs to be called after set_cache_meta() (which change Stale to Expire). 
+ // This is an artificial rule to enforce the state transitions + CachePhase::Miss | CachePhase::Expired => { + let max_file_size_bytes = self.max_file_size_bytes(); + + let inner = self.inner_mut(); + if inner.miss_handler.is_some() { + panic!("write handler is already set") + } + let meta = inner.meta.as_ref().unwrap(); + let key = inner.key.as_ref().unwrap(); + let miss_handler = inner + .storage + .get_miss_handler(key, meta, &inner.traces.get_miss_span()) + .await?; + + inner.miss_handler = if let Some(max_size) = max_file_size_bytes { + Some(Box::new(MaxFileSizeMissHandler::new( + miss_handler, + max_size, + ))) + } else { + Some(miss_handler) + }; + + if inner.storage.support_streaming_partial_write() { + // If reader can access partial write, the cache lock can be released here + // to let readers start reading the body. + let lock = inner.lock.take(); + if let Some(Locked::Write(_r)) = lock { + inner.cache_lock.unwrap().release(key, LockStatus::Done); + } + // Downstream read and upstream write can be decoupled + let body_reader = inner + .storage + .lookup(key, &inner.traces.get_miss_span()) + .await?; + + if let Some((_meta, body_reader)) = body_reader { + inner.body_reader = Some(body_reader); + } else { + // body_reader should exist now because streaming_partial_write is to support it + panic!("unable to get body_reader for {:?}", meta); + } + } + Ok(()) + } + _ => panic!("wrong phase {:?}", self.phase), + } + } + + /// Return the [MissHandler] to write the response body to cache. + /// + /// `None`: the handler has not been set or already finished + pub fn miss_handler(&mut self) -> Option<&mut MissHandler> { + match self.phase { + CachePhase::Miss | CachePhase::Expired => self.inner_mut().miss_handler.as_mut(), + _ => panic!("wrong phase {:?}", self.phase), + } + } + + /// Finish cache admission + /// + /// If [self] is dropped without calling this, the cache admission is considered incomplete and + /// should be cleaned up. 
+ /// + /// This call will also trigger eviction if set. + pub async fn finish_miss_handler(&mut self) -> Result<()> { + match self.phase { + CachePhase::Miss | CachePhase::Expired => { + let inner = self.inner_mut(); + if inner.miss_handler.is_none() { + // already finished, we allow calling this function more than once + return Ok(()); + } + let miss_handler = inner.miss_handler.take().unwrap(); + let size = miss_handler.finish().await?; + let lock = inner.lock.take(); + let key = inner.key.as_ref().unwrap(); + if let Some(Locked::Write(_r)) = lock { + // no need to call r.unlock() because release() will call it + // r is a guard to make sure the lock is unlocked when this request is dropped + inner.cache_lock.unwrap().release(key, LockStatus::Done); + } + if let Some(eviction) = inner.eviction { + let cache_key = key.to_compact(); + let meta = inner.meta.as_ref().unwrap(); + let evicted = eviction.admit(cache_key, size, meta.0.internal.fresh_until); + // TODO: make this async + let span = inner.traces.child("eviction"); + let handle = span.handle(); + for item in evicted { + // TODO: warn/log the error + let _ = inner.storage.purge(&item, &handle).await; + } + } + inner.traces.finish_miss_span(); + Ok(()) + } + _ => panic!("wrong phase {:?}", self.phase), + } + } + + /// Set the [CacheMeta] of the cache + pub fn set_cache_meta(&mut self, meta: CacheMeta) { + match self.phase { + // TODO: store the staled meta somewhere else for future use? + CachePhase::Stale | CachePhase::Miss => { + let inner = self.inner_mut(); + inner.traces.log_meta(&meta); + inner.meta = Some(meta); + } + _ => panic!("wrong phase {:?}", self.phase), + } + if self.phase == CachePhase::Stale { + self.phase = CachePhase::Expired; + } + } + + /// Set the [CacheMeta] of the cache after revalidation. + /// + /// Certain info such as the original cache admission time will be preserved. Others will + /// be replaced by the input `meta`. 
+ pub async fn revalidate_cache_meta(&mut self, mut meta: CacheMeta) -> Result<bool> { + let result = match self.phase { + CachePhase::Stale => { + let inner = self.inner_mut(); + // TODO: we should keep old meta in place, just use new one to update it + // that requires cacheable_filter to take a mut header and just return InternalMeta + + // update new meta with old meta's created time + let created = inner.meta.as_ref().unwrap().0.internal.created; + meta.0.internal.created = created; + // meta.internal.updated was already set to new meta's `created`, + // no need to set `updated` here + + inner.meta.replace(meta); + + let lock = inner.lock.take(); + if let Some(Locked::Write(_r)) = lock { + inner + .cache_lock + .unwrap() + .release(inner.key.as_ref().unwrap(), LockStatus::Done); + } + + let mut span = inner.traces.child("update_meta"); + // TODO: this call can be async + let result = inner + .storage + .update_meta( + inner.key.as_ref().unwrap(), + inner.meta.as_ref().unwrap(), + &span.handle(), + ) + .await; + span.set_tag(|| trace::Tag::new("updated", result.is_ok())); + result + } + _ => panic!("wrong phase {:?}", self.phase), + }; + self.phase = CachePhase::Revalidated; + result + } + + /// After a successful revalidation, update certain headers for the cached asset + /// such as `Etag` with the fresh response header `resp`. + pub fn revalidate_merge_header(&mut self, resp: &RespHeader) -> ResponseHeader { + match self.phase { + CachePhase::Stale => { + /* + * https://datatracker.ietf.org/doc/html/rfc9110#section-15.4.5 + * 304 response MUST generate ... would have been sent in a 200 ... + * - Content-Location, Date, ETag, and Vary + * - Cache-Control and Expires... 
+ */ + let mut old_header = self.inner().meta.as_ref().unwrap().0.header.clone(); + let mut clone_header = |header_name: &'static str| { + // TODO: multiple headers + if let Some(value) = resp.headers.get(header_name) { + old_header.insert_header(header_name, value).unwrap(); + } + }; + clone_header("cache-control"); + clone_header("expires"); + clone_header("cache-tag"); + clone_header("cdn-cache-control"); + clone_header("etag"); + // https://datatracker.ietf.org/doc/html/rfc9111#section-4.3.4 + // "...cache MUST update its header fields with the header fields provided in the 304..." + // But if the Vary header changes, the cached response may no longer match the + // incoming request. + // + // For simplicity, ignore changing Vary in revalidation for now. + // TODO: if we support vary during revalidation, there are a few edge cases to + // consider (what if Vary header appears/disappears/changes)? + // + // clone_header("vary"); + old_header + } + _ => panic!("wrong phase {:?}", self.phase), + } + } + + /// Mark this asset uncacheable after revalidation + pub fn revalidate_uncacheable(&mut self, header: ResponseHeader, reason: NoCacheReason) { + match self.phase { + CachePhase::Stale => { + // replace cache meta header + self.inner_mut().meta.as_mut().unwrap().0.header = header; + } + _ => panic!("wrong phase {:?}", self.phase), + } + self.phase = CachePhase::RevalidatedNoCache(reason); + // TODO: remove this asset from cache once finished? + } + + /// Update the variance of the [CacheMeta]. + /// + /// Note that this process may change the lookup `key`, and eventually (when the asset is + /// written to storage) invalidate other cached variants under the same primary key as the + /// current asset. + pub fn update_variance(&mut self, variance: Option<HashBinary>) { + // If this is a cache miss, we will simply update the variance in the meta. 
+ // + // If this is an expired response, we will have to consider a few cases: + // + // **Case 1**: Variance was absent, but caller sets it now. + // We will just insert it into the meta. The current asset becomes the primary variant. + // Because the current location of the asset is already the primary variant, nothing else + // needs to be done. + // + // **Case 2**: Variance was present, but it changed or was removed. + // We want the current asset to take over the primary slot, in order to invalidate all + // other variants derived under the old Vary. + // + // **Case 3**: Variance did not change. + // Nothing needs to happen. + let inner = match self.phase { + CachePhase::Miss | CachePhase::Expired => self.inner_mut(), + _ => panic!("wrong phase {:?}", self.phase), + }; + + // Update the variance in the meta + if let Some(variance_hash) = variance.as_ref() { + inner + .meta + .as_mut() + .unwrap() + .set_variance_key(*variance_hash); + } else { + inner.meta.as_mut().unwrap().remove_variance(); + } + + // Change the lookup `key` if necessary, in order to admit asset into the primary slot + // instead of the secondary slot. + let key = inner.key.as_ref().unwrap(); + if let Some(old_variance) = key.get_variance_key().as_ref() { + // This is a secondary variant slot. + if Some(*old_variance) != variance.as_ref() { + // This new variance does not match the variance in the cache key we used to look + // up this asset. + // Drop the cache lock to avoid leaving a dangling lock + // (because we locked with the old cache key for the secondary slot) + // TODO: maybe we should try to signal waiting readers to compete for the primary key + // lock instead? we will not be modifying this secondary slot so it's not actually + // ready for readers + if let Some(lock) = inner.cache_lock.as_ref() { + lock.release(key, LockStatus::Done); + } + // Remove the `variance` from the `key`, so that we admit this asset into the + // primary slot. 
(`key` is used to tell storage where to write the data.) + inner.key.as_mut().unwrap().remove_variance_key(); + } + } + } + + /// Return the [CacheMeta] of this asset + /// + /// # Panic + /// Panic in phases which has no cache meta. + pub fn cache_meta(&self) -> &CacheMeta { + match self.phase { + // TODO: allow in Bypass phase? + CachePhase::Stale + | CachePhase::Expired + | CachePhase::Hit + | CachePhase::Revalidated + | CachePhase::RevalidatedNoCache(_) => self.inner().meta.as_ref().unwrap(), + CachePhase::Miss => { + // this is the async body read case, safe because body_reader is only set + // after meta is retrieved + if self.inner().body_reader.is_some() { + self.inner().meta.as_ref().unwrap() + } else { + panic!("wrong phase {:?}", self.phase); + } + } + + _ => panic!("wrong phase {:?}", self.phase), + } + } + + /// Return the [CacheMeta] of this asset if any + /// + /// Different from [Self::cache_meta()], this function is allowed to be called in + /// [CachePhase::Miss] phase where the cache meta maybe set. + /// # Panic + /// Panic in phases that shouldn't have cache meta. + pub fn maybe_cache_meta(&self) -> Option<&CacheMeta> { + match self.phase { + CachePhase::Miss + | CachePhase::Stale + | CachePhase::Expired + | CachePhase::Hit + | CachePhase::Revalidated + | CachePhase::RevalidatedNoCache(_) => self.inner().meta.as_ref(), + _ => panic!("wrong phase {:?}", self.phase), + } + } + + /// Perform the cache lookup from the given cache storage with the given cache key + /// + /// A cache hit will return [CacheMeta] which contains the header and meta info about + /// the cache as well as a [HitHandler] to read the cache hit body. + /// # Panic + /// Panic in other phases. 
+ pub async fn cache_lookup(&mut self) -> Result<Option<(CacheMeta, HitHandler)>> { + match self.phase { + // Stale is allowed here because stale-> cache_lock -> lookup again + CachePhase::CacheKey | CachePhase::Stale => { + let inner = self.inner_mut(); + let mut span = inner.traces.child("lookup"); + let key = inner.key.as_ref().unwrap(); // safe, this phase should have cache key + let result = inner.storage.lookup(key, &span.handle()).await?; + let result = result.and_then(|(meta, header)| { + if let Some(ts) = inner.valid_after { + if meta.created() < ts { + span.set_tag(|| trace::Tag::new("not valid", true)); + return None; + } + } + Some((meta, header)) + }); + if result.is_none() { + if let Some(lock) = inner.cache_lock.as_ref() { + inner.lock = Some(lock.lock(key)); + } + } + span.set_tag(|| trace::Tag::new("found", result.is_some())); + Ok(result) + } + _ => panic!("wrong phase {:?}", self.phase), + } + } + + /// Update variance and see if the meta matches the current variance + /// + /// `cache_lookup() -> compute vary hash -> cache_vary_lookup()` + /// This function allows callers to compute vary based on the initial cache hit. + /// `meta` should be the ones returned from the initial cache_lookup() + /// - return true if the meta is the variance. 
+ /// - return false if the current meta doesn't match the variance, need to cache_lookup() again + pub fn cache_vary_lookup(&mut self, variance: HashBinary, meta: &CacheMeta) -> bool { + match self.phase { + CachePhase::CacheKey => { + let inner = self.inner_mut(); + // make sure that all variance found are fresher than this asset + // this is because when purging all the variance, only the primary slot is deleted + // the created TS of the primary is the tombstone of all the variances + inner.valid_after = Some(meta.created()); + + // update vary + let key = inner.key.as_mut().unwrap(); + // if no variance was previously set, then this is the first cache hit + let is_initial_cache_hit = key.get_variance_key().is_none(); + key.set_variance_key(variance); + let variance_binary = key.variance_bin(); + let matches_variance = meta.variance() == variance_binary; + + // We should remove the variance in the lookup `key` if this is the primary variant + // slot. We know this is the primary variant slot if this is the initial cache hit + // AND the variance in the `key` already matches the `meta`'s.) + // + // For the primary variant slot, the storage backend needs to use the primary key + // for both cache lookup and updating the meta. Otherwise it will look for the + // asset in the wrong location during revalidation. + // + // We can recreate the "full" cache key by using the meta's variance, if needed. + if matches_variance && is_initial_cache_hit { + inner.key.as_mut().unwrap().remove_variance_key(); + } + + matches_variance + } + _ => panic!("wrong phase {:?}", self.phase), + } + } + + /// Whether this request is behind a cache lock in order to wait for another request to read the + /// asset. + pub fn is_cache_locked(&self) -> bool { + matches!(self.inner().lock, Some(Locked::Read(_))) + } + + /// Whether this request is the leader request to fetch the assets for itself and other requests + /// behind the cache lock. 
+ pub fn is_cache_lock_writer(&self) -> bool { + matches!(self.inner().lock, Some(Locked::Write(_))) + } + + /// Take the write lock from this request to transfer it to another one. + /// # Panic + /// Call is_cache_lock_writer() to check first, will panic otherwise. + pub fn take_write_lock(&mut self) -> WritePermit { + let lock = self.inner_mut().lock.take().unwrap(); + match lock { + Locked::Write(w) => w, + Locked::Read(_) => panic!("take_write_lock() called on read lock"), + } + } + + /// Set the write lock, which is usually transferred from [Self::take_write_lock()] + pub fn set_write_lock(&mut self, write_lock: WritePermit) { + self.inner_mut().lock.replace(Locked::Write(write_lock)); + } + + /// Whether this request's cache hit is staled + fn has_staled_asset(&self) -> bool { + self.phase == CachePhase::Stale + } + + /// Whether this asset is staled and stale if error is allowed + pub fn can_serve_stale_error(&self) -> bool { + self.has_staled_asset() && self.cache_meta().serve_stale_if_error(SystemTime::now()) + } + + /// Whether this asset is staled and stale while revalidate is allowed. + pub fn can_serve_stale_updating(&self) -> bool { + self.has_staled_asset() + && self + .cache_meta() + .serve_stale_while_revalidate(SystemTime::now()) + } + + /// Wait for the cache read lock to be unlocked + /// # Panic + /// Check [Self::is_cache_locked()], panic if this request doesn't have a read lock. 
+ pub async fn cache_lock_wait(&mut self) -> LockStatus { + let inner = self.inner_mut(); + let _span = inner.traces.child("cache_lock"); + let lock = inner.lock.take(); // remove the lock from self + if let Some(Locked::Read(r)) = lock { + let now = std::time::Instant::now(); + r.wait().await; + let lock_duration = now.elapsed(); + // it's possible for a request to be locked more than once + inner.lock_duration = Some( + inner + .lock_duration + .map_or(lock_duration, |d| d + lock_duration), + ); + r.lock_status() // TODO: tag the span with lock status + } else { + // should always call is_cache_locked() before this function + panic!("cache_lock_wait on wrong type of lock") + } + } + + /// How long did this request wait behind the read lock + pub fn lock_duration(&self) -> Option<Duration> { + // FIXME: this duration is lost when cache is disabled + self.inner.as_ref().and_then(|i| i.lock_duration) + } + + /// Delete the asset from the cache storage + /// # Panic + /// Need to be called after cache key is set. Panic otherwise. + pub async fn purge(&mut self) -> Result<bool> { + match self.phase { + CachePhase::CacheKey => { + let inner = self.inner_mut(); + let mut span = inner.traces.child("purge"); + let key = inner.key.as_ref().unwrap().to_compact(); + let result = inner.storage.purge(&key, &span.handle()).await; + // FIXME: also need to remove from eviction manager + span.set_tag(|| trace::Tag::new("purged", matches!(result, Ok(true)))); + result + } + _ => panic!("wrong phase {:?}", self.phase), + } + } + + /// Check the cacheable prediction + /// + /// Return true if the predictor is not set + pub fn cacheable_prediction(&self) -> bool { + if let Some(predictor) = self.inner().predictor { + predictor.cacheable_prediction(self.cache_key()) + } else { + true + } + } + + /// Tell the predictor that this response which is previously predicted to be uncacheable + /// is cacheable now. 
+ pub fn response_became_cacheable(&self) { + if let Some(predictor) = self.inner().predictor { + predictor.mark_cacheable(self.cache_key()); + } + } + + /// Tell the predictor that this response is uncacheable so that it will know next time + /// this request arrives. + pub fn response_became_uncacheable(&self, reason: NoCacheReason) { + if let Some(predictor) = self.inner().predictor { + predictor.mark_uncacheable(self.cache_key(), reason); + } + } +} + +/// Set the header compression dictionary that help serialize http header. +/// +/// Return false if it is already set. +pub fn set_compression_dict_path(path: &str) -> bool { + crate::meta::COMPRESSION_DICT_PATH + .set(path.to_string()) + .is_ok() +} diff --git a/pingora-cache/src/lock.rs b/pingora-cache/src/lock.rs new file mode 100644 index 0000000..c5e3c31 --- /dev/null +++ b/pingora-cache/src/lock.rs @@ -0,0 +1,336 @@ +// Copyright 2024 Cloudflare, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! 
Cache lock + +use crate::key::CacheHashKey; + +use crate::hashtable::ConcurrentHashTable; +use pingora_timeout::timeout; +use std::sync::Arc; + +const N_SHARDS: usize = 16; + +/// The global cache locking manager +pub struct CacheLock { + lock_table: ConcurrentHashTable<LockStub, N_SHARDS>, + timeout: Duration, // fixed timeout value for now +} + +/// A struct prepresenting a locked cache access +#[derive(Debug)] +pub enum Locked { + /// The writer is allowed to fetch the asset + Write(WritePermit), + /// The reader waits for the writer to fetch the asset + Read(ReadLock), +} + +impl Locked { + /// Is this a write lock + pub fn is_write(&self) -> bool { + matches!(self, Self::Write(_)) + } +} + +impl CacheLock { + /// Create a new [CacheLock] with the given lock timeout + /// + /// When the timeout is reached, the read locks are automatically unlocked + pub fn new(timeout: Duration) -> Self { + CacheLock { + lock_table: ConcurrentHashTable::new(), + timeout, + } + } + + /// Try to lock a cache fetch + /// + /// Users should call after a cache miss before fetching the asset. + /// The returned [Locked] will tell the caller either to fetch or wait. + pub fn lock<K: CacheHashKey>(&self, key: &K) -> Locked { + let hash = key.combined_bin(); + let key = u128::from_be_bytes(hash); // endianness doesn't matter + let table = self.lock_table.get(key); + if let Some(lock) = table.read().get(&key) { + // already has an ongoing request + if lock.0.lock_status() != LockStatus::Dangling { + return Locked::Read(lock.read_lock()); + } + // Dangling: the previous writer quit without unlocking the lock. Requests should + // compete for the write lock again. 
+ } + + let (permit, stub) = WritePermit::new(self.timeout); + let mut table = table.write(); + // check again in case another request already added it + if let Some(lock) = table.get(&key) { + if lock.0.lock_status() != LockStatus::Dangling { + return Locked::Read(lock.read_lock()); + } + } + table.insert(key, stub); + Locked::Write(permit) + } + + /// Release a lock for the given key + /// + /// When the write lock is dropped without being released, the read lock holders will consider + /// it to be failed so that they will compete for the write lock again. + pub fn release<K: CacheHashKey>(&self, key: &K, reason: LockStatus) { + let hash = key.combined_bin(); + let key = u128::from_be_bytes(hash); // endianness doesn't matter + if let Some(lock) = self.lock_table.write(key).remove(&key) { + // make sure that the caller didn't forget to unlock it + if lock.0.locked() { + lock.0.unlock(reason); + } + } + } +} + +use std::sync::atomic::{AtomicU8, Ordering}; +use std::time::{Duration, Instant}; +use tokio::sync::Semaphore; + +/// Status which the read locks could possibly see. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum LockStatus { + /// Waiting for the writer to populate the asset + Waiting, + /// The writer finishes, readers can start + Done, + /// The writer encountered error, such as network issue. A new writer will be elected. 
+ TransientError, + /// The writer observed that no cache lock is needed (e.g., uncacheable), readers should start + /// to fetch independently without a new writer + GiveUp, + /// The write lock is dropped without being unlocked + Dangling, + /// The lock is held for too long + Timeout, +} + +impl From<LockStatus> for u8 { + fn from(l: LockStatus) -> u8 { + match l { + LockStatus::Waiting => 0, + LockStatus::Done => 1, + LockStatus::TransientError => 2, + LockStatus::GiveUp => 3, + LockStatus::Dangling => 4, + LockStatus::Timeout => 5, + } + } +} + +impl From<u8> for LockStatus { + fn from(v: u8) -> Self { + match v { + 0 => Self::Waiting, + 1 => Self::Done, + 2 => Self::TransientError, + 3 => Self::GiveUp, + 4 => Self::Dangling, + 5 => Self::Timeout, + _ => Self::GiveUp, // placeholder + } + } +} + +#[derive(Debug)] +struct LockCore { + pub lock_start: Instant, + pub timeout: Duration, + pub(super) lock: Semaphore, + // use u8 for Atomic enum + lock_status: AtomicU8, +} + +impl LockCore { + pub fn new_arc(timeout: Duration) -> Arc<Self> { + Arc::new(LockCore { + lock: Semaphore::new(0), + timeout, + lock_start: Instant::now(), + lock_status: AtomicU8::new(LockStatus::Waiting.into()), + }) + } + + fn locked(&self) -> bool { + self.lock.available_permits() == 0 + } + + fn unlock(&self, reason: LockStatus) { + self.lock_status.store(reason.into(), Ordering::SeqCst); + // any small positive number will do, 10 is used for RwLock too + // no need to wake up all at once + self.lock.add_permits(10); + } + + fn lock_status(&self) -> LockStatus { + self.lock_status.load(Ordering::Relaxed).into() + } +} + +// all 3 structs below are just Arc<LockCore> with different interfaces + +/// ReadLock: requests who get it need to wait until it is released +#[derive(Debug)] +pub struct ReadLock(Arc<LockCore>); + +impl ReadLock { + /// Wait for the writer to release the lock + pub async fn wait(&self) { + if !self.locked() || self.expired() { + return; + } + + // TODO: should subtract 
now - start so that the lock don't wait beyond start + timeout + // Also need to be careful not to wake everyone up at the same time + // (maybe not an issue because regular cache lock release behaves that way) + let _ = timeout(self.0.timeout, self.0.lock.acquire()).await; + // permit is returned to Semaphore right away + } + + /// Test if it is still locked + pub fn locked(&self) -> bool { + self.0.locked() + } + + /// Whether the lock is expired, e.g., the writer has been holding the lock for too long + pub fn expired(&self) -> bool { + // NOTE: this whether the lock is currently expired + // not whether it was timed out during wait() + self.0.lock_start.elapsed() >= self.0.timeout + } + + /// The current status of the lock + pub fn lock_status(&self) -> LockStatus { + let status = self.0.lock_status(); + if matches!(status, LockStatus::Waiting) && self.expired() { + LockStatus::Timeout + } else { + status + } + } +} + +/// WritePermit: requires who get it need to populate the cache and then release it +#[derive(Debug)] +pub struct WritePermit(Arc<LockCore>); + +impl WritePermit { + fn new(timeout: Duration) -> (WritePermit, LockStub) { + let lock = LockCore::new_arc(timeout); + let stub = LockStub(lock.clone()); + (WritePermit(lock), stub) + } + + fn unlock(&self, reason: LockStatus) { + self.0.unlock(reason) + } +} + +impl Drop for WritePermit { + fn drop(&mut self) { + // writer exit without properly unlock, let others to compete for the write lock again + if self.0.locked() { + self.unlock(LockStatus::Dangling); + } + } +} + +struct LockStub(Arc<LockCore>); +impl LockStub { + pub fn read_lock(&self) -> ReadLock { + ReadLock(self.0.clone()) + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::CacheKey; + + #[test] + fn test_get_release() { + let cache_lock = CacheLock::new(Duration::from_secs(1000)); + let key1 = CacheKey::new("", "a", "1"); + let locked1 = cache_lock.lock(&key1); + assert!(locked1.is_write()); // write permit + let locked2 = 
cache_lock.lock(&key1); + assert!(!locked2.is_write()); // read lock + cache_lock.release(&key1, LockStatus::Done); + let locked3 = cache_lock.lock(&key1); + assert!(locked3.is_write()); // write permit again + } + + #[tokio::test] + async fn test_lock() { + let cache_lock = CacheLock::new(Duration::from_secs(1000)); + let key1 = CacheKey::new("", "a", "1"); + let permit = match cache_lock.lock(&key1) { + Locked::Write(w) => w, + _ => panic!(), + }; + let lock = match cache_lock.lock(&key1) { + Locked::Read(r) => r, + _ => panic!(), + }; + assert!(lock.locked()); + let handle = tokio::spawn(async move { + lock.wait().await; + assert_eq!(lock.lock_status(), LockStatus::Done); + }); + permit.unlock(LockStatus::Done); + handle.await.unwrap(); // check lock is unlocked and the task is returned + } + + #[tokio::test] + async fn test_lock_timeout() { + let cache_lock = CacheLock::new(Duration::from_secs(1)); + let key1 = CacheKey::new("", "a", "1"); + let permit = match cache_lock.lock(&key1) { + Locked::Write(w) => w, + _ => panic!(), + }; + let lock = match cache_lock.lock(&key1) { + Locked::Read(r) => r, + _ => panic!(), + }; + assert!(lock.locked()); + + let handle = tokio::spawn(async move { + // timed out + lock.wait().await; + assert_eq!(lock.lock_status(), LockStatus::Timeout); + }); + + tokio::time::sleep(Duration::from_secs(2)).await; + + // expired lock + let lock2 = match cache_lock.lock(&key1) { + Locked::Read(r) => r, + _ => panic!(), + }; + assert!(lock2.locked()); + assert_eq!(lock2.lock_status(), LockStatus::Timeout); + lock2.wait().await; + assert_eq!(lock2.lock_status(), LockStatus::Timeout); + + permit.unlock(LockStatus::Done); + handle.await.unwrap(); + } +} diff --git a/pingora-cache/src/max_file_size.rs b/pingora-cache/src/max_file_size.rs new file mode 100644 index 0000000..7d812f2 --- /dev/null +++ b/pingora-cache/src/max_file_size.rs @@ -0,0 +1,75 @@ +// Copyright 2024 Cloudflare, Inc. 
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Set limit on the largest size to cache

use crate::storage::HandleMiss;
use crate::MissHandler;
use async_trait::async_trait;
use bytes::Bytes;
use pingora_error::{Error, ErrorType};

/// [MaxFileSizeMissHandler] wraps a MissHandler to enforce a maximum asset size that should be
/// written to the MissHandler.
///
/// This is used to enforce a maximum cache size for a request when the
/// response size is not known ahead of time (no Content-Length header). When the response size _is_
/// known ahead of time, it should be checked up front (when calculating cacheability) for efficiency.
/// Note: for requests with partial read support (where downstream reads the response from cache as
/// it is filled), this will cause the request as a whole to fail. The response will be remembered
/// as uncacheable, though, so downstream will be able to retry the request, since the cache will be
/// disabled for the retried request.
pub struct MaxFileSizeMissHandler {
    // the wrapped miss handler that actually stores the body
    inner: MissHandler,
    // hard cap on the total number of body bytes accepted
    max_file_size_bytes: usize,
    // running total of bytes accepted so far; compared against the cap on every write
    bytes_written: usize,
}

impl MaxFileSizeMissHandler {
    /// Create a new [MaxFileSizeMissHandler] wrapping the given [MissHandler]
    pub fn new(inner: MissHandler, max_file_size_bytes: usize) -> MaxFileSizeMissHandler {
        MaxFileSizeMissHandler {
            inner,
            max_file_size_bytes,
            bytes_written: 0,
        }
    }
}

/// Error type returned when the limit is reached.
pub const ERR_RESPONSE_TOO_LARGE: ErrorType = ErrorType::Custom("response too large");

#[async_trait]
impl HandleMiss for MaxFileSizeMissHandler {
    async fn write_body(&mut self, data: Bytes, eof: bool) -> pingora_error::Result<()> {
        // fail if writing the body would exceed the max_file_size_bytes
        if self.bytes_written + data.len() > self.max_file_size_bytes {
            return Error::e_explain(
                ERR_RESPONSE_TOO_LARGE,
                format!(
                    "writing data of size {} bytes would exceed max file size of {} bytes",
                    data.len(),
                    self.max_file_size_bytes
                ),
            );
        }

        self.bytes_written += data.len();
        self.inner.write_body(data, eof).await
    }

    async fn finish(self: Box<Self>) -> pingora_error::Result<usize> {
        // no size check needed here: every accepted chunk was validated in write_body()
        self.inner.finish().await
    }
}
diff --git a/pingora-cache/src/memory.rs b/pingora-cache/src/memory.rs
new file mode 100644
index 0000000..679517d
--- /dev/null
+++ b/pingora-cache/src/memory.rs
@@ -0,0 +1,510 @@
// Copyright 2024 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Hash map based in memory cache
//!
//! For testing only, not for production use

//TODO: Mark this module #[test] only

use super::*;
use crate::key::{CacheHashKey, CompactCacheKey};
use crate::storage::{HandleHit, HandleMiss, Storage};
use crate::trace::SpanHandle;

use async_trait::async_trait;
use bytes::Bytes;
use parking_lot::RwLock;
use pingora_error::*;
use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::watch;

// serialized (internal meta, header) pair as produced by CacheMeta::serialize()
type BinaryMeta = (Vec<u8>, Vec<u8>);

// A fully written asset: serialized meta plus the complete body
pub(crate) struct CacheObject {
    pub meta: BinaryMeta,
    pub body: Arc<Vec<u8>>,
}

// An asset that is still being written by a miss handler; concurrent readers
// stream from `body` as it grows, driven by `bytes_written` notifications.
pub(crate) struct TempObject {
    pub meta: BinaryMeta,
    // these are Arc because they need to continue to exist after this TempObject is removed
    pub body: Arc<RwLock<Vec<u8>>>,
    bytes_written: Arc<watch::Sender<PartialState>>, // this should match body.len()
}

impl TempObject {
    fn new(meta: BinaryMeta) -> Self {
        let (tx, _rx) = watch::channel(PartialState::Partial(0));
        TempObject {
            meta,
            body: Arc::new(RwLock::new(Vec::new())),
            bytes_written: Arc::new(tx),
        }
    }
    // this is not at all optimized: the whole body is copied into a new allocation
    fn make_cache_object(&self) -> CacheObject {
        let meta = self.meta.clone();
        let body = Arc::new(self.body.read().clone());
        CacheObject { meta, body }
    }
}

/// Hash map based in memory cache
///
/// For testing only, not for production use.
pub struct MemCache {
    // finished assets, keyed by the combined cache key hash
    pub(crate) cached: Arc<RwLock<HashMap<String, CacheObject>>>,
    // in-flight (still being written) assets, same key space as `cached`
    pub(crate) temp: Arc<RwLock<HashMap<String, TempObject>>>,
}

impl MemCache {
    /// Create a new [MemCache]
    pub fn new() -> Self {
        MemCache {
            cached: Arc::new(RwLock::new(HashMap::new())),
            temp: Arc::new(RwLock::new(HashMap::new())),
        }
    }
}

// Hit handler over either a finished asset (Complete) or an in-flight one (Partial)
pub enum MemHitHandler {
    Complete(CompleteHit),
    Partial(PartialHit),
}

// Progress of a streaming write: how many bytes are readable, and whether more will come
#[derive(Copy, Clone)]
enum PartialState {
    Partial(usize),
    Complete(usize),
}

// Reads a fully cached body, optionally restricted to a [range_start, range_end) window
pub struct CompleteHit {
    body: Arc<Vec<u8>>,
    done: bool,
    range_start: usize,
    range_end: usize,
}

impl CompleteHit {
    // return the (ranged) body exactly once; None after it has been consumed
    fn get(&mut self) -> Option<Bytes> {
        if self.done {
            None
        } else {
            self.done = true;
            Some(Bytes::copy_from_slice(
                &self.body.as_slice()[self.range_start..self.range_end],
            ))
        }
    }

    fn seek(&mut self, start: usize, end: Option<usize>) -> Result<()> {
        if start >= self.body.len() {
            return Error::e_explain(
                ErrorType::InternalError,
                format!("seek start out of range {start} >= {}", self.body.len()),
            );
        }
        self.range_start = start;
        if let Some(end) = end {
            // end over the actual last byte is allowed, we just need to return the actual bytes
            self.range_end = std::cmp::min(self.body.len(), end);
        }
        // seek resets read so that one handler can be used for multiple ranges
        self.done = false;
        Ok(())
    }
}

// Streams an in-flight body: returns new bytes as the writer appends them
pub struct PartialHit {
    body: Arc<RwLock<Vec<u8>>>,
    bytes_written: watch::Receiver<PartialState>,
    bytes_read: usize,
}

impl PartialHit {
    async fn read(&mut self) -> Option<Bytes> {
        loop {
            let bytes_written = *self.bytes_written.borrow_and_update();
            let bytes_end = match bytes_written {
                PartialState::Partial(s) => s,
                PartialState::Complete(c) => {
                    // no more data will arrive
                    if c == self.bytes_read {
                        return None;
                    }
                    c
                }
            };
            assert!(bytes_end >= self.bytes_read);

            // more data available to read
            if bytes_end > self.bytes_read {
                let new_bytes =
                    Bytes::copy_from_slice(&self.body.read()[self.bytes_read..bytes_end]);
                self.bytes_read = bytes_end;
                return Some(new_bytes);
            }

            // wait for more data
            if self.bytes_written.changed().await.is_err() {
                // err: sender dropped, body is finished
                // FIXME: sender could drop because of an error
                return None;
            }
        }
    }
}

#[async_trait]
impl HandleHit for MemHitHandler {
    async fn read_body(&mut self) -> Result<Option<Bytes>> {
        match self {
            Self::Complete(c) => Ok(c.get()),
            Self::Partial(p) => Ok(p.read().await),
        }
    }
    async fn finish(
        self: Box<Self>, // because self is always used as a trait object
        _storage: &'static (dyn storage::Storage + Sync),
        _key: &CacheKey,
        _trace: &SpanHandle,
    ) -> Result<()> {
        Ok(())
    }

    fn can_seek(&self) -> bool {
        match self {
            Self::Complete(_) => true,
            Self::Partial(_) => false, // TODO: support seeking in partial reads
        }
    }

    fn seek(&mut self, start: usize, end: Option<usize>) -> Result<()> {
        match self {
            Self::Complete(c) => c.seek(start, end),
            Self::Partial(_) => Error::e_explain(
                ErrorType::InternalError,
                "seek not supported for partial cache",
            ),
        }
    }

    fn as_any(&self) -> &(dyn Any + Send + Sync) {
        self
    }
}

// Writes an asset into `temp` and, on finish(), promotes it into `cache`
pub struct MemMissHandler {
    body: Arc<RwLock<Vec<u8>>>,
    bytes_written: Arc<watch::Sender<PartialState>>,
    // these are used only in finish() to move data from temp to cache
    key: String,
    cache: Arc<RwLock<HashMap<String, CacheObject>>>,
    temp: Arc<RwLock<HashMap<String, TempObject>>>,
}

#[async_trait]
impl HandleMiss for MemMissHandler {
    async fn write_body(&mut self, data: bytes::Bytes, eof: bool) -> Result<()> {
        let current_bytes = match *self.bytes_written.borrow() {
            PartialState::Partial(p) => p,
            PartialState::Complete(_) => panic!("already EOF"),
        };
        self.body.write().extend_from_slice(&data);
        let written = current_bytes + data.len();
        let new_state = if eof {
            PartialState::Complete(written)
        } else {
            PartialState::Partial(written)
        };
        // send_replace notifies waiting PartialHit readers even with no prior receiver
        self.bytes_written.send_replace(new_state);
        Ok(())
    }

    async fn finish(self: Box<Self>) -> Result<usize> {
        // safe, the temp object is inserted when the miss handler is created
        let cache_object = self.temp.read().get(&self.key).unwrap().make_cache_object();
        let size = cache_object.body.len(); // FIXME: this just body size, also track meta size
        self.cache.write().insert(self.key.clone(), cache_object);
        self.temp.write().remove(&self.key);
        Ok(size)
    }
}

impl Drop for MemMissHandler {
    // an unfinished write (dropped handler) discards its temp object
    fn drop(&mut self) {
        self.temp.write().remove(&self.key);
    }
}

#[async_trait]
impl Storage for MemCache {
    async fn lookup(
        &'static self,
        key: &CacheKey,
        _trace: &SpanHandle,
    ) -> Result<Option<(CacheMeta, HitHandler)>> {
        let hash = key.combined();
        // always prefer partial read otherwise fresh asset will not be visible on expired asset
        // until it is fully updated
        if let Some(temp_obj) = self.temp.read().get(&hash) {
            let meta = CacheMeta::deserialize(&temp_obj.meta.0, &temp_obj.meta.1)?;
            let partial = PartialHit {
                body: temp_obj.body.clone(),
                bytes_written: temp_obj.bytes_written.subscribe(),
                bytes_read: 0,
            };
            let hit_handler = MemHitHandler::Partial(partial);
            Ok(Some((meta, Box::new(hit_handler))))
        } else if let Some(obj) = self.cached.read().get(&hash) {
            let meta = CacheMeta::deserialize(&obj.meta.0, &obj.meta.1)?;
            let hit_handler = CompleteHit {
                body: obj.body.clone(),
                done: false,
                range_start: 0,
                range_end: obj.body.len(),
            };
            let hit_handler = MemHitHandler::Complete(hit_handler);
            Ok(Some((meta, Box::new(hit_handler))))
        } else {
            Ok(None)
        }
    }

    async fn get_miss_handler(
        &'static self,
        key: &CacheKey,
        meta: &CacheMeta,
        _trace: &SpanHandle,
    ) -> Result<MissHandler> {
        // TODO: support multiple concurrent writes or panic if there is already a writer
        let hash = key.combined();
        let meta = meta.serialize()?;
        let temp_obj = TempObject::new(meta);
        let miss_handler = MemMissHandler {
            body: temp_obj.body.clone(),
            bytes_written: temp_obj.bytes_written.clone(),
            key: hash.clone(),
            cache: self.cached.clone(),
            temp: self.temp.clone(),
        };
        self.temp.write().insert(hash, temp_obj);
        Ok(Box::new(miss_handler))
    }

    async fn purge(&'static self, key: &CompactCacheKey, _trace: &SpanHandle) -> Result<bool> {
        // TODO: purge partial

        // This usually purges the primary key because, without a lookup, variance key is usually
        // empty
        let hash = key.combined();
        Ok(self.cached.write().remove(&hash).is_some())
    }

    async fn update_meta(
        &'static self,
        key: &CacheKey,
        meta: &CacheMeta,
        _trace: &SpanHandle,
    ) -> Result<bool> {
        let hash = key.combined();
        if let Some(obj) = self.cached.write().get_mut(&hash) {
            obj.meta = meta.serialize()?;
            Ok(true)
        } else {
            panic!("no meta found")
        }
    }

    fn support_streaming_partial_write(&self) -> bool {
        true
    }

    fn as_any(&self) -> &(dyn Any + Send + Sync) {
        self
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use once_cell::sync::Lazy;
    use rustracing::span::Span;

    // build a minimal cacheable CacheMeta for the tests below
    fn gen_meta() -> CacheMeta {
        let mut header = ResponseHeader::build(200, None).unwrap();
        header.append_header("foo1", "bar1").unwrap();
        header.append_header("foo2", "bar2").unwrap();
        header.append_header("foo3", "bar3").unwrap();
        header.append_header("Server", "Pingora").unwrap();
        let internal = crate::meta::InternalMeta::default();
        CacheMeta(Box::new(crate::meta::CacheMetaInner {
            internal,
            header,
            extensions: http::Extensions::new(),
        }))
    }

    #[tokio::test]
    async fn test_write_then_read() {
        static MEM_CACHE: Lazy<MemCache> = Lazy::new(MemCache::new);
        let span = &Span::inactive().handle();

        let key1 = CacheKey::new("", "a", "1");
        let res = MEM_CACHE.lookup(&key1, span).await.unwrap();
        assert!(res.is_none());

        let cache_meta = gen_meta();

        let mut miss_handler = MEM_CACHE
            .get_miss_handler(&key1, &cache_meta, span)
            .await
            .unwrap();
        miss_handler
            .write_body(b"test1"[..].into(), false)
            .await
            .unwrap();
        miss_handler
            .write_body(b"test2"[..].into(), false)
            .await
            .unwrap();
        miss_handler.finish().await.unwrap();

        let (cache_meta2, mut hit_handler) = MEM_CACHE.lookup(&key1, span).await.unwrap().unwrap();
        assert_eq!(
            cache_meta.0.internal.fresh_until,
            cache_meta2.0.internal.fresh_until
        );

        let data = hit_handler.read_body().await.unwrap().unwrap();
        assert_eq!("test1test2", data);
        let data = hit_handler.read_body().await.unwrap();
        assert!(data.is_none());
    }

    #[tokio::test]
    async fn test_read_range() {
        static MEM_CACHE: Lazy<MemCache> = Lazy::new(MemCache::new);
        let span = &Span::inactive().handle();

        let key1 = CacheKey::new("", "a", "1");
        let res = MEM_CACHE.lookup(&key1, span).await.unwrap();
        assert!(res.is_none());

        let cache_meta = gen_meta();

        let mut miss_handler = MEM_CACHE
            .get_miss_handler(&key1, &cache_meta, span)
            .await
            .unwrap();
        miss_handler
            .write_body(b"test1test2"[..].into(), false)
            .await
            .unwrap();
        miss_handler.finish().await.unwrap();

        let (cache_meta2, mut hit_handler) = MEM_CACHE.lookup(&key1, span).await.unwrap().unwrap();
        assert_eq!(
            cache_meta.0.internal.fresh_until,
            cache_meta2.0.internal.fresh_until
        );

        // out of range
        assert!(hit_handler.seek(10000, None).is_err());

        assert!(hit_handler.seek(5, None).is_ok());
        let data = hit_handler.read_body().await.unwrap().unwrap();
        assert_eq!("test2", data);
        let data = hit_handler.read_body().await.unwrap();
        assert!(data.is_none());

        assert!(hit_handler.seek(4, Some(5)).is_ok());
        let data = hit_handler.read_body().await.unwrap().unwrap();
        assert_eq!("1", data);
        let data = hit_handler.read_body().await.unwrap();
        assert!(data.is_none());
    }

    #[tokio::test]
    async fn test_write_while_read() {
        use futures::FutureExt;

        static MEM_CACHE: Lazy<MemCache> = Lazy::new(MemCache::new);
        let span = &Span::inactive().handle();

        let key1 = CacheKey::new("", "a", "1");
        let res = MEM_CACHE.lookup(&key1, span).await.unwrap();
        assert!(res.is_none());

        let cache_meta = gen_meta();

        let mut miss_handler = MEM_CACHE
            .get_miss_handler(&key1, &cache_meta, span)
            .await
            .unwrap();

        // first reader
        let (cache_meta1, mut hit_handler1) = MEM_CACHE.lookup(&key1, span).await.unwrap().unwrap();
        assert_eq!(
            cache_meta.0.internal.fresh_until,
            cache_meta1.0.internal.fresh_until
        );

        // No body to read
        let res = hit_handler1.read_body().now_or_never();
        assert!(res.is_none());

        miss_handler
            .write_body(b"test1"[..].into(), false)
            .await
            .unwrap();

        let data = hit_handler1.read_body().await.unwrap().unwrap();
        assert_eq!("test1", data);
        let res = hit_handler1.read_body().now_or_never();
        assert!(res.is_none());

        miss_handler
            .write_body(b"test2"[..].into(), false)
            .await
            .unwrap();
        let data = hit_handler1.read_body().await.unwrap().unwrap();
        assert_eq!("test2", data);

        // second reader
        let (cache_meta2, mut hit_handler2) = MEM_CACHE.lookup(&key1, span).await.unwrap().unwrap();
        assert_eq!(
            cache_meta.0.internal.fresh_until,
            cache_meta2.0.internal.fresh_until
        );

        let data = hit_handler2.read_body().await.unwrap().unwrap();
        assert_eq!("test1test2", data);
        let res = hit_handler2.read_body().now_or_never();
        assert!(res.is_none());

        let res = hit_handler1.read_body().now_or_never();
        assert!(res.is_none());

        miss_handler.finish().await.unwrap();

        let data = hit_handler1.read_body().await.unwrap();
        assert!(data.is_none());
        let data = hit_handler2.read_body().await.unwrap();
        assert!(data.is_none());
    }
}
diff --git a/pingora-cache/src/meta.rs b/pingora-cache/src/meta.rs
new file mode 100644
index 0000000..a534dc0
--- /dev/null
+++ b/pingora-cache/src/meta.rs
@@ -0,0 +1,608 @@
// Copyright 2024 Cloudflare, Inc.
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Metadata for caching + +use http::Extensions; +use pingora_error::{Error, ErrorType::*, OrErr, Result}; +use pingora_http::{HMap, ResponseHeader}; +use serde::{Deserialize, Serialize}; +use std::time::{Duration, SystemTime}; + +use crate::key::HashBinary; + +pub(crate) type InternalMeta = internal_meta::InternalMetaLatest; +mod internal_meta { + use super::*; + + pub(crate) type InternalMetaLatest = InternalMetaV2; + + #[derive(Debug, Deserialize, Serialize, Clone)] + pub(crate) struct InternalMetaV0 { + pub(crate) fresh_until: SystemTime, + pub(crate) created: SystemTime, + pub(crate) stale_while_revalidate_sec: u32, + pub(crate) stale_if_error_sec: u32, + // Do not add more field + } + + impl InternalMetaV0 { + #[allow(dead_code)] + fn serialize(&self) -> Result<Vec<u8>> { + rmp_serde::encode::to_vec(self).or_err(InternalError, "failed to encode cache meta") + } + + fn deserialize(buf: &[u8]) -> Result<Self> { + rmp_serde::decode::from_slice(buf) + .or_err(InternalError, "failed to decode cache meta v0") + } + } + + #[derive(Debug, Deserialize, Serialize, Clone)] + pub(crate) struct InternalMetaV1 { + pub(crate) version: u8, + pub(crate) fresh_until: SystemTime, + pub(crate) created: SystemTime, + pub(crate) stale_while_revalidate_sec: u32, + pub(crate) stale_if_error_sec: u32, + // Do not add more field + } + + impl InternalMetaV1 { + #[allow(dead_code)] + pub const VERSION: u8 = 1; + + 
#[allow(dead_code)] + pub fn serialize(&self) -> Result<Vec<u8>> { + assert_eq!(self.version, 1); + rmp_serde::encode::to_vec(self).or_err(InternalError, "failed to encode cache meta") + } + + fn deserialize(buf: &[u8]) -> Result<Self> { + rmp_serde::decode::from_slice(buf) + .or_err(InternalError, "failed to decode cache meta v1") + } + } + + #[derive(Debug, Deserialize, Serialize, Clone)] + pub(crate) struct InternalMetaV2 { + pub(crate) version: u8, + pub(crate) fresh_until: SystemTime, + pub(crate) created: SystemTime, + pub(crate) updated: SystemTime, + pub(crate) stale_while_revalidate_sec: u32, + pub(crate) stale_if_error_sec: u32, + // Only the extended field to be added below. One field at a time. + // 1. serde default in order to accept an older version schema without the field existing + // 2. serde skip_serializing_if in order for software with only an older version of this + // schema to decode it + // After full releases, remove `skip_serializing_if` so that we can add the next extended field. 
+ #[serde(default)] + #[serde(skip_serializing_if = "Option::is_none")] + pub(crate) variance: Option<HashBinary>, + } + + impl Default for InternalMetaV2 { + fn default() -> Self { + let epoch = SystemTime::UNIX_EPOCH; + InternalMetaV2 { + version: InternalMetaV2::VERSION, + fresh_until: epoch, + created: epoch, + updated: epoch, + stale_while_revalidate_sec: 0, + stale_if_error_sec: 0, + variance: None, + } + } + } + + impl InternalMetaV2 { + pub const VERSION: u8 = 2; + + pub fn serialize(&self) -> Result<Vec<u8>> { + assert_eq!(self.version, Self::VERSION); + rmp_serde::encode::to_vec(self).or_err(InternalError, "failed to encode cache meta") + } + + fn deserialize(buf: &[u8]) -> Result<Self> { + rmp_serde::decode::from_slice(buf) + .or_err(InternalError, "failed to decode cache meta v2") + } + } + + impl From<InternalMetaV0> for InternalMetaV2 { + fn from(v0: InternalMetaV0) -> Self { + InternalMetaV2 { + version: InternalMetaV2::VERSION, + fresh_until: v0.fresh_until, + created: v0.created, + updated: v0.created, + stale_while_revalidate_sec: v0.stale_while_revalidate_sec, + stale_if_error_sec: v0.stale_if_error_sec, + ..Default::default() + } + } + } + + impl From<InternalMetaV1> for InternalMetaV2 { + fn from(v1: InternalMetaV1) -> Self { + InternalMetaV2 { + version: InternalMetaV2::VERSION, + fresh_until: v1.fresh_until, + created: v1.created, + updated: v1.created, + stale_while_revalidate_sec: v1.stale_while_revalidate_sec, + stale_if_error_sec: v1.stale_if_error_sec, + ..Default::default() + } + } + } + + // cross version decode + pub(crate) fn deserialize(buf: &[u8]) -> Result<InternalMetaLatest> { + const MIN_SIZE: usize = 10; // a small number to read the first few bytes + if buf.len() < MIN_SIZE { + return Error::e_explain( + InternalError, + format!("Buf too short ({}) to be InternalMeta", buf.len()), + ); + } + let preread_buf = &mut &buf[..MIN_SIZE]; + // the struct is always packed as a fixed size array + match 
rmp::decode::read_array_len(preread_buf) + .or_err(InternalError, "failed to decode cache meta array size")? + { + // v0 has 4 items and no version number + 4 => Ok(InternalMetaV0::deserialize(buf)?.into()), + // other V should has version number encoded + _ => { + // rmp will encode version < 128 into a fixint (one byte), + // so we use read_pfix + let version = rmp::decode::read_pfix(preread_buf) + .or_err(InternalError, "failed to decode meta version")?; + match version { + 1 => Ok(InternalMetaV1::deserialize(buf)?.into()), + 2 => InternalMetaV2::deserialize(buf), + _ => Error::e_explain( + InternalError, + format!("Unknown InternalMeta version {version}"), + ), + } + } + } + } + + #[cfg(test)] + mod tests { + use super::*; + + #[test] + fn test_internal_meta_serde_v0() { + let meta = InternalMetaV0 { + fresh_until: SystemTime::now(), + created: SystemTime::now(), + stale_while_revalidate_sec: 0, + stale_if_error_sec: 0, + }; + let binary = meta.serialize().unwrap(); + let meta2 = InternalMetaV0::deserialize(&binary).unwrap(); + assert_eq!(meta.fresh_until, meta2.fresh_until); + } + + #[test] + fn test_internal_meta_serde_v1() { + let meta = InternalMetaV1 { + version: InternalMetaV1::VERSION, + fresh_until: SystemTime::now(), + created: SystemTime::now(), + stale_while_revalidate_sec: 0, + stale_if_error_sec: 0, + }; + let binary = meta.serialize().unwrap(); + let meta2 = InternalMetaV1::deserialize(&binary).unwrap(); + assert_eq!(meta.fresh_until, meta2.fresh_until); + } + + #[test] + fn test_internal_meta_serde_v2() { + let meta = InternalMetaV2::default(); + let binary = meta.serialize().unwrap(); + let meta2 = InternalMetaV2::deserialize(&binary).unwrap(); + assert_eq!(meta2.version, 2); + assert_eq!(meta.fresh_until, meta2.fresh_until); + assert_eq!(meta.created, meta2.created); + assert_eq!(meta.updated, meta2.updated); + } + + #[test] + fn test_internal_meta_serde_across_versions() { + let meta = InternalMetaV0 { + fresh_until: SystemTime::now(), + 
created: SystemTime::now(), + stale_while_revalidate_sec: 0, + stale_if_error_sec: 0, + }; + let binary = meta.serialize().unwrap(); + let meta2 = deserialize(&binary).unwrap(); + assert_eq!(meta2.version, 2); + assert_eq!(meta.fresh_until, meta2.fresh_until); + + let meta = InternalMetaV1 { + version: 1, + fresh_until: SystemTime::now(), + created: SystemTime::now(), + stale_while_revalidate_sec: 0, + stale_if_error_sec: 0, + }; + let binary = meta.serialize().unwrap(); + let meta2 = deserialize(&binary).unwrap(); + assert_eq!(meta2.version, 2); + assert_eq!(meta.fresh_until, meta2.fresh_until); + // `updated` == `created` when upgrading to v2 + assert_eq!(meta2.created, meta2.updated); + } + + #[test] + fn test_internal_meta_serde_v2_extend_fields() { + // make sure that v2 format is backward compatible + // this is the base version of v2 without any extended fields + #[derive(Deserialize, Serialize)] + pub(crate) struct InternalMetaV2Base { + pub(crate) version: u8, + pub(crate) fresh_until: SystemTime, + pub(crate) created: SystemTime, + pub(crate) updated: SystemTime, + pub(crate) stale_while_revalidate_sec: u32, + pub(crate) stale_if_error_sec: u32, + } + + impl InternalMetaV2Base { + pub const VERSION: u8 = 2; + pub fn serialize(&self) -> Result<Vec<u8>> { + assert!(self.version >= Self::VERSION); + rmp_serde::encode::to_vec(self) + .or_err(InternalError, "failed to encode cache meta") + } + fn deserialize(buf: &[u8]) -> Result<Self> { + rmp_serde::decode::from_slice(buf) + .or_err(InternalError, "failed to decode cache meta v2") + } + } + + // ext V2 to base v2 + let meta = InternalMetaV2::default(); + let binary = meta.serialize().unwrap(); + let meta2 = InternalMetaV2Base::deserialize(&binary).unwrap(); + assert_eq!(meta2.version, 2); + assert_eq!(meta.fresh_until, meta2.fresh_until); + assert_eq!(meta.created, meta2.created); + assert_eq!(meta.updated, meta2.updated); + + // base V2 to ext v2 + let now = SystemTime::now(); + let meta = InternalMetaV2Base 
{ + version: InternalMetaV2::VERSION, + fresh_until: now, + created: now, + updated: now, + stale_while_revalidate_sec: 0, + stale_if_error_sec: 0, + }; + let binary = meta.serialize().unwrap(); + let meta2 = InternalMetaV2::deserialize(&binary).unwrap(); + assert_eq!(meta2.version, 2); + assert_eq!(meta.fresh_until, meta2.fresh_until); + assert_eq!(meta.created, meta2.created); + assert_eq!(meta.updated, meta2.updated); + } + } +} + +#[derive(Debug)] +pub(crate) struct CacheMetaInner { + // http header and Internal meta have different ways of serialization, so keep them separated + pub(crate) internal: InternalMeta, + pub(crate) header: ResponseHeader, + /// An opaque type map to hold extra information for communication between cache backends + /// and users. This field is **not** garanteed be persistently stored in the cache backend. + pub extensions: Extensions, +} + +/// The cacheable response header and cache metadata +#[derive(Debug)] +pub struct CacheMeta(pub(crate) Box<CacheMetaInner>); + +impl CacheMeta { + /// Create a [CacheMeta] from the given metadata and the response header + pub fn new( + fresh_until: SystemTime, + created: SystemTime, + stale_while_revalidate_sec: u32, + stale_if_error_sec: u32, + header: ResponseHeader, + ) -> CacheMeta { + CacheMeta(Box::new(CacheMetaInner { + internal: InternalMeta { + version: InternalMeta::VERSION, + fresh_until, + created, + updated: created, // created == updated for new meta + stale_while_revalidate_sec, + stale_if_error_sec, + ..Default::default() + }, + header, + extensions: Extensions::new(), + })) + } + + /// When the asset was created/admitted to cache + pub fn created(&self) -> SystemTime { + self.0.internal.created + } + + /// The last time the asset was revalidated + /// + /// This value will be the same as [Self::created()] if no revalidation ever happens + pub fn updated(&self) -> SystemTime { + self.0.internal.updated + } + + /// Is the asset still valid + pub fn is_fresh(&self, time: SystemTime) 
-> bool { + // NOTE: HTTP cache time resolution is second + self.0.internal.fresh_until >= time + } + + /// How long (in seconds) the asset should be fresh since its admission/revalidation + /// + /// This is essentially the max-age value (or its equivalence) + pub fn fresh_sec(&self) -> u64 { + // swallow `duration_since` error, assets that are always stale have earlier `fresh_until` than `created` + // practically speaking we can always treat these as 0 ttl + // XXX: return Error if `fresh_until` is much earlier than expected? + self.0 + .internal + .fresh_until + .duration_since(self.0.internal.updated) + .map_or(0, |duration| duration.as_secs()) + } + + /// Until when the asset is considered fresh + pub fn fresh_until(&self) -> SystemTime { + self.0.internal.fresh_until + } + + /// How old the asset is since its admission/revalidation + pub fn age(&self) -> Duration { + SystemTime::now() + .duration_since(self.updated()) + .unwrap_or_default() + } + + /// The stale-while-revalidate limit in seconds + pub fn stale_while_revalidate_sec(&self) -> u32 { + self.0.internal.stale_while_revalidate_sec + } + + /// The stale-if-error limit in seconds + pub fn stale_if_error_sec(&self) -> u32 { + self.0.internal.stale_if_error_sec + } + + /// Can the asset be used to serve stale during revalidation at the given time. + /// + /// NOTE: the serve stale functions do not check !is_fresh(time), + /// i.e. the object is already assumed to be stale. + pub fn serve_stale_while_revalidate(&self, time: SystemTime) -> bool { + self.can_serve_stale(self.0.internal.stale_while_revalidate_sec, time) + } + + /// Can the asset be used to serve stale after error at the given time. + /// + /// NOTE: the serve stale functions do not check !is_fresh(time), + /// i.e. the object is already assumed to be stale. 
+ pub fn serve_stale_if_error(&self, time: SystemTime) -> bool { + self.can_serve_stale(self.0.internal.stale_if_error_sec, time) + } + + /// Disable serve stale for this asset + pub fn disable_serve_stale(&mut self) { + self.0.internal.stale_if_error_sec = 0; + self.0.internal.stale_while_revalidate_sec = 0; + } + + /// Get the variance hash of this asset + pub fn variance(&self) -> Option<HashBinary> { + self.0.internal.variance + } + + /// Set the variance key of this asset + pub fn set_variance_key(&mut self, variance_key: HashBinary) { + self.0.internal.variance = Some(variance_key); + } + + /// Set the variance (hash) of this asset + pub fn set_variance(&mut self, variance: HashBinary) { + self.0.internal.variance = Some(variance) + } + + /// Removes the variance (hash) of this asset + pub fn remove_variance(&mut self) { + self.0.internal.variance = None + } + + /// Get the response header in this asset + pub fn response_header(&self) -> &ResponseHeader { + &self.0.header + } + + /// Modify the header in this asset + pub fn response_header_mut(&mut self) -> &mut ResponseHeader { + &mut self.0.header + } + + /// Expose the extensions to read + pub fn extensions(&self) -> &Extensions { + &self.0.extensions + } + + /// Expose the extensions to modify + pub fn extensions_mut(&mut self) -> &mut Extensions { + &mut self.0.extensions + } + + /// Get a copy of the response header + pub fn response_header_copy(&self) -> ResponseHeader { + self.0.header.clone() + } + + /// get all the headers of this asset + pub fn headers(&self) -> &HMap { + &self.0.header.headers + } + + fn can_serve_stale(&self, serve_stale_sec: u32, time: SystemTime) -> bool { + if serve_stale_sec == 0 { + return false; + } + if let Some(stale_until) = self + .0 + .internal + .fresh_until + .checked_add(Duration::from_secs(serve_stale_sec.into())) + { + stale_until >= time + } else { + // overflowed: treat as infinite ttl + true + } + } + + /// Serialize this object + pub fn serialize(&self) -> 
Result<(Vec<u8>, Vec<u8>)> { + let internal = self.0.internal.serialize()?; + let header = header_serialize(&self.0.header)?; + Ok((internal, header)) + } + + /// Deserialize from the binary format + pub fn deserialize(internal: &[u8], header: &[u8]) -> Result<Self> { + let internal = internal_meta::deserialize(internal)?; + let header = header_deserialize(header)?; + Ok(CacheMeta(Box::new(CacheMetaInner { + internal, + header, + extensions: Extensions::new(), + }))) + } +} + +use http::StatusCode; + +/// The function to generate TTL from the given [StatusCode]. +pub type FreshSecByStatusFn = fn(StatusCode) -> Option<u32>; + +/// The default settings to generate [CacheMeta] +pub struct CacheMetaDefaults { + // if a status code is not included in fresh_sec, it's not considered cacheable by default. + fresh_sec_fn: FreshSecByStatusFn, + stale_while_revalidate_sec: u32, + // TODO: allow "error" condition to be configurable? + stale_if_error_sec: u32, +} + +impl CacheMetaDefaults { + /// Create a new [CacheMetaDefaults] + pub const fn new( + fresh_sec_fn: FreshSecByStatusFn, + stale_while_revalidate_sec: u32, + stale_if_error_sec: u32, + ) -> Self { + CacheMetaDefaults { + fresh_sec_fn, + stale_while_revalidate_sec, + stale_if_error_sec, + } + } + + /// Return the default TTL for the given [StatusCode] + /// + /// `None`: do no cache this code. 
+ pub fn fresh_sec(&self, resp_status: StatusCode) -> Option<u32> { + // safe guard to make sure 304 response to share the same default ttl of 200 + if resp_status == StatusCode::NOT_MODIFIED { + (self.fresh_sec_fn)(StatusCode::OK) + } else { + (self.fresh_sec_fn)(resp_status) + } + } + + /// The default SWR seconds + pub fn serve_stale_while_revalidate_sec(&self) -> u32 { + self.stale_while_revalidate_sec + } + + /// The default SIE seconds + pub fn serve_stale_if_error_sec(&self) -> u32 { + self.stale_if_error_sec + } +} + +use log::warn; +use once_cell::sync::{Lazy, OnceCell}; +use pingora_header_serde::HeaderSerde; +use std::fs::File; +use std::io::Read; + +/* load header compression engine and its' dictionary globally */ +pub(crate) static COMPRESSION_DICT_PATH: OnceCell<String> = OnceCell::new(); + +fn load_file(path: &String) -> Option<Vec<u8>> { + let mut file = File::open(path) + .map_err(|e| { + warn!( + "failed to open header compress dictionary file at {}, {:?}", + path, e + ); + e + }) + .ok()?; + let mut dict = Vec::new(); + file.read_to_end(&mut dict) + .map_err(|e| { + warn!( + "failed to read header compress dictionary file at {}, {:?}", + path, e + ); + e + }) + .ok()?; + + Some(dict) +} + +static HEADER_SERDE: Lazy<HeaderSerde> = Lazy::new(|| { + let dict = COMPRESSION_DICT_PATH.get().and_then(load_file); + HeaderSerde::new(dict) +}); + +pub(crate) fn header_serialize(header: &ResponseHeader) -> Result<Vec<u8>> { + HEADER_SERDE.serialize(header) +} + +pub(crate) fn header_deserialize<T: AsRef<[u8]>>(buf: T) -> Result<ResponseHeader> { + HEADER_SERDE.deserialize(buf.as_ref()) +} diff --git a/pingora-cache/src/predictor.rs b/pingora-cache/src/predictor.rs new file mode 100644 index 0000000..df8f374 --- /dev/null +++ b/pingora-cache/src/predictor.rs @@ -0,0 +1,228 @@ +// Copyright 2024 Cloudflare, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Cacheability Predictor + +use crate::hashtable::{ConcurrentLruCache, LruShard}; + +pub type CustomReasonPredicate = fn(&'static str) -> bool; + +/// Cacheability Predictor +/// +/// Remembers previously uncacheable assets. +/// Allows bypassing cache / cache lock early based on historical precedent. +/// +/// NOTE: to simply avoid caching requests with certain characteristics, +/// add checks in request_cache_filter to avoid enabling cache in the first place. +/// The predictor's bypass mechanism handles cases where the request _looks_ cacheable +/// but its previous responses suggest otherwise. The request _could_ be cacheable in the future. +pub struct Predictor<const N_SHARDS: usize> { + uncacheable_keys: ConcurrentLruCache<(), N_SHARDS>, + skip_custom_reasons_fn: Option<CustomReasonPredicate>, +} + +use crate::{key::CacheHashKey, CacheKey, NoCacheReason}; +use log::debug; + +/// The cache predictor trait. +/// +/// This trait allows user defined predictor to replace [Predictor]. +pub trait CacheablePredictor { + /// Return true if likely cacheable, false if likely not. + fn cacheable_prediction(&self, key: &CacheKey) -> bool; + + /// Mark cacheable to allow next request to cache. + /// Returns false if the key was already marked cacheable. + fn mark_cacheable(&self, key: &CacheKey) -> bool; + + /// Mark uncacheable to actively bypass cache on the next request. + /// May skip marking on certain NoCacheReasons. + /// Returns None if we skipped marking uncacheable. + /// Returns Some(false) if the key was already marked uncacheable. 
+ fn mark_uncacheable(&self, key: &CacheKey, reason: NoCacheReason) -> Option<bool>; +} + +// This particular bit of `where [LruShard...; N]: Default` nonsense arises from +// ConcurrentLruCache needing this trait bound, which in turns arises from the Rust +// compiler not being able to guarantee that all array sizes N implement `Default`. +// See https://github.com/rust-lang/rust/issues/61415 +impl<const N_SHARDS: usize> Predictor<N_SHARDS> +where + [LruShard<()>; N_SHARDS]: Default, +{ + /// Create a new Predictor with `N_SHARDS * shard_capacity` total capacity for + /// uncacheable cache keys. + /// + /// - `shard_capacity`: defines number of keys remembered as uncacheable per LRU shard. + /// - `skip_custom_reasons_fn`: an optional predicate used in `mark_uncacheable` + /// that can customize which `Custom` `NoCacheReason`s ought to be remembered as uncacheable. + /// If the predicate returns true, then the predictor will skip remembering the current + /// cache key as uncacheable (and avoid bypassing cache on the next request). 
+ pub fn new( + shard_capacity: usize, + skip_custom_reasons_fn: Option<CustomReasonPredicate>, + ) -> Predictor<N_SHARDS> { + Predictor { + uncacheable_keys: ConcurrentLruCache::<(), N_SHARDS>::new(shard_capacity), + skip_custom_reasons_fn, + } + } +} + +impl<const N_SHARDS: usize> CacheablePredictor for Predictor<N_SHARDS> +where + [LruShard<()>; N_SHARDS]: Default, +{ + fn cacheable_prediction(&self, key: &CacheKey) -> bool { + // variance key is ignored because this check happens before cache lookup + let hash = key.primary_bin(); + let key = u128::from_be_bytes(hash); // Endianness doesn't matter + + // Note: LRU updated in mark_* functions only, + // as we assume the caller always updates the cacheability of the response later + !self.uncacheable_keys.read(key).contains(&key) + } + + fn mark_cacheable(&self, key: &CacheKey) -> bool { + // variance key is ignored because cacheable_prediction() is called before cache lookup + // where the variance key is unknown + let hash = key.primary_bin(); + let key = u128::from_be_bytes(hash); + + let cache = self.uncacheable_keys.get(key); + if !cache.read().contains(&key) { + // not in uncacheable list, nothing to do + return true; + } + + let mut cache = cache.write(); + cache.pop(&key); + debug!("bypassed request became cacheable"); + false + } + + fn mark_uncacheable(&self, key: &CacheKey, reason: NoCacheReason) -> Option<bool> { + // only mark as uncacheable for the future on certain reasons, + // (e.g. 
InternalErrors) + use NoCacheReason::*; + match reason { + // CacheLockGiveUp: the writer will set OriginNotCache (if applicable) + // readers don't need to do it + NeverEnabled | StorageError | InternalError | Deferred | CacheLockGiveUp + | CacheLockTimeout => { + return None; + } + // Skip certain NoCacheReason::Custom according to user + Custom(reason) if self.skip_custom_reasons_fn.map_or(false, |f| f(reason)) => { + return None; + } + Custom(_) | OriginNotCache | ResponseTooLarge => { /* mark uncacheable for these only */ + } + } + + // variance key is ignored because cacheable_prediction() is called before cache lookup + // where the variance key is unknown + let hash = key.primary_bin(); + let key = u128::from_be_bytes(hash); + + let mut cache = self.uncacheable_keys.get(key).write(); + // put() returns Some(old_value) if the key existed, else None + let new_key = cache.put(key, ()).is_none(); + if new_key { + debug!("request marked uncacheable"); + } + Some(new_key) + } +} + +#[cfg(test)] +mod tests { + use super::*; + #[test] + fn test_mark_cacheability() { + let predictor = Predictor::<1>::new(10, None); + let key = CacheKey::new("a", "b", "c"); + // cacheable if no history + assert!(predictor.cacheable_prediction(&key)); + + // don't remember internal / storage errors + predictor.mark_uncacheable(&key, NoCacheReason::InternalError); + assert!(predictor.cacheable_prediction(&key)); + predictor.mark_uncacheable(&key, NoCacheReason::StorageError); + assert!(predictor.cacheable_prediction(&key)); + + // origin explicitly said uncacheable + predictor.mark_uncacheable(&key, NoCacheReason::OriginNotCache); + assert!(!predictor.cacheable_prediction(&key)); + + // mark cacheable again + predictor.mark_cacheable(&key); + assert!(predictor.cacheable_prediction(&key)); + } + + #[test] + fn test_custom_skip_predicate() { + let predictor = Predictor::<1>::new( + 10, + Some(|custom_reason| matches!(custom_reason, "Skipping")), + ); + let key = CacheKey::new("a", "b", 
"c"); + // cacheable if no history + assert!(predictor.cacheable_prediction(&key)); + + // custom predicate still uses default skip reasons + predictor.mark_uncacheable(&key, NoCacheReason::InternalError); + assert!(predictor.cacheable_prediction(&key)); + + // other custom reasons can still be marked uncacheable + predictor.mark_uncacheable(&key, NoCacheReason::Custom("DontCacheMe")); + assert!(!predictor.cacheable_prediction(&key)); + + let key = CacheKey::new("a", "c", "d"); + assert!(predictor.cacheable_prediction(&key)); + // specific custom reason is skipped + predictor.mark_uncacheable(&key, NoCacheReason::Custom("Skipping")); + assert!(predictor.cacheable_prediction(&key)); + } + + #[test] + fn test_mark_uncacheable_lru() { + let predictor = Predictor::<1>::new(3, None); + let key1 = CacheKey::new("a", "b", "c"); + predictor.mark_uncacheable(&key1, NoCacheReason::OriginNotCache); + assert!(!predictor.cacheable_prediction(&key1)); + + let key2 = CacheKey::new("a", "bc", "c"); + predictor.mark_uncacheable(&key2, NoCacheReason::OriginNotCache); + assert!(!predictor.cacheable_prediction(&key2)); + + let key3 = CacheKey::new("a", "cd", "c"); + predictor.mark_uncacheable(&key3, NoCacheReason::OriginNotCache); + assert!(!predictor.cacheable_prediction(&key3)); + + // promote / reinsert key1 + predictor.mark_uncacheable(&key1, NoCacheReason::OriginNotCache); + + let key4 = CacheKey::new("a", "de", "c"); + predictor.mark_uncacheable(&key4, NoCacheReason::OriginNotCache); + assert!(!predictor.cacheable_prediction(&key4)); + + // key 1 was recently used + assert!(!predictor.cacheable_prediction(&key1)); + // key 2 was evicted + assert!(predictor.cacheable_prediction(&key2)); + assert!(!predictor.cacheable_prediction(&key3)); + assert!(!predictor.cacheable_prediction(&key4)); + } +} diff --git a/pingora-cache/src/put.rs b/pingora-cache/src/put.rs new file mode 100644 index 0000000..c50cc2b --- /dev/null +++ b/pingora-cache/src/put.rs @@ -0,0 +1,754 @@ +// Copyright 
2024 Cloudflare, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Cache Put module + +use crate::*; +use bytes::Bytes; +use http::header; +use pingora_core::protocols::http::{ + v1::common::header_value_content_length, HttpTask, ServerSession, +}; + +/// The interface to define cache put behavior +pub trait CachePut { + /// Return whether to cache the asset according to the given response header. + fn cacheable(&self, response: &ResponseHeader) -> RespCacheable { + let cc = cache_control::CacheControl::from_resp_headers(response); + filters::resp_cacheable(cc.as_ref(), response, false, Self::cache_defaults()) + } + + /// Return the [CacheMetaDefaults] + fn cache_defaults() -> &'static CacheMetaDefaults; +} + +use parse_response::ResponseParse; + +/// The cache put context +pub struct CachePutCtx<C: CachePut> { + cache_put: C, // the user defined cache put behavior + key: CacheKey, + storage: &'static (dyn storage::Storage + Sync), // static for now + eviction: Option<&'static (dyn eviction::EvictionManager + Sync)>, + miss_handler: Option<MissHandler>, + max_file_size_bytes: Option<usize>, + meta: Option<CacheMeta>, + parser: ResponseParse, + // FIXME: cache put doesn't have cache lock but some storage cannot handle concurrent put + // to the same asset. 
+ trace: trace::Span, +} + +impl<C: CachePut> CachePutCtx<C> { + /// Create a new [CachePutCtx] + pub fn new( + cache_put: C, + key: CacheKey, + storage: &'static (dyn storage::Storage + Sync), + eviction: Option<&'static (dyn eviction::EvictionManager + Sync)>, + trace: trace::Span, + ) -> Self { + CachePutCtx { + cache_put, + key, + storage, + eviction, + miss_handler: None, + max_file_size_bytes: None, + meta: None, + parser: ResponseParse::new(), + trace, + } + } + + /// Set the max cacheable size limit + pub fn set_max_file_size_bytes(&mut self, max_file_size_bytes: usize) { + self.max_file_size_bytes = Some(max_file_size_bytes); + } + + async fn put_header(&mut self, meta: CacheMeta) -> Result<()> { + let trace = self.trace.child("cache put header", |o| o.start()).handle(); + let miss_handler = self + .storage + .get_miss_handler(&self.key, &meta, &trace) + .await?; + self.miss_handler = Some( + if let Some(max_file_size_bytes) = self.max_file_size_bytes { + Box::new(MaxFileSizeMissHandler::new( + miss_handler, + max_file_size_bytes, + )) + } else { + miss_handler + }, + ); + self.meta = Some(meta); + Ok(()) + } + + async fn put_body(&mut self, data: Bytes, eof: bool) -> Result<()> { + let miss_handler = self.miss_handler.as_mut().unwrap(); + miss_handler.write_body(data, eof).await + } + + async fn finish(&mut self) -> Result<()> { + let Some(miss_handler) = self.miss_handler.take() else { + // no miss_handler, uncacheable + return Ok(()); + }; + let size = miss_handler.finish().await?; + if let Some(eviction) = self.eviction.as_ref() { + let cache_key = self.key.to_compact(); + let meta = self.meta.as_ref().unwrap(); + let evicted = eviction.admit(cache_key, size, meta.0.internal.fresh_until); + // TODO: make this async + let trace = self + .trace + .child("cache put eviction", |o| o.start()) + .handle(); + for item in evicted { + // TODO: warn/log the error + let _ = self.storage.purge(&item, &trace).await; + } + } + + Ok(()) + } + + async fn 
do_cache_put(&mut self, data: &[u8]) -> Result<Option<NoCacheReason>> { + let tasks = self.parser.inject_data(data)?; + for task in tasks { + match task { + HttpTask::Header(header, _eos) => match self.cache_put.cacheable(&header) { + RespCacheable::Cacheable(meta) => { + if let Some(max_file_size_bytes) = self.max_file_size_bytes { + let content_length_hdr = header.headers.get(header::CONTENT_LENGTH); + if let Some(content_length) = + header_value_content_length(content_length_hdr) + { + if content_length > max_file_size_bytes { + return Ok(Some(NoCacheReason::ResponseTooLarge)); + } + } + } + + self.put_header(meta).await?; + } + RespCacheable::Uncacheable(reason) => { + return Ok(Some(reason)); + } + }, + HttpTask::Body(data, eos) => { + if let Some(data) = data { + self.put_body(data, eos).await?; + } + } + _ => { + panic!("unexpected HttpTask during cache put {task:?}"); + } + } + } + Ok(None) + } + + /// Start the cache put logic for the given request + /// + /// This function will start to read the request body to put into cache. + /// Return: + /// - `Ok(None)` when the payload will be cache. + /// - `Ok(Some(reason))` when the payload is not cacheable + pub async fn cache_put( + &mut self, + session: &mut ServerSession, + ) -> Result<Option<NoCacheReason>> { + let mut no_cache_reason = None; + while let Some(data) = session.read_request_body().await? { + if no_cache_reason.is_some() { + // even uncacheable, the entire body needs to be drains for 1. downstream + // not throwing errors 2. connection reuse + continue; + } + no_cache_reason = self.do_cache_put(&data).await? 
+ } + self.parser.finish()?; + self.finish().await?; + Ok(no_cache_reason) + } +} + +#[cfg(test)] +mod test { + use super::*; + use once_cell::sync::Lazy; + use rustracing::span::Span; + + struct TestCachePut(); + impl CachePut for TestCachePut { + fn cache_defaults() -> &'static CacheMetaDefaults { + const DEFAULT: CacheMetaDefaults = CacheMetaDefaults::new(|_| Some(1), 1, 1); + &DEFAULT + } + } + + type TestCachePutCtx = CachePutCtx<TestCachePut>; + static CACHE_BACKEND: Lazy<MemCache> = Lazy::new(MemCache::new); + + #[tokio::test] + async fn test_cache_put() { + let key = CacheKey::new("", "a", "1"); + let span = Span::inactive(); + let put = TestCachePut(); + let mut ctx = TestCachePutCtx::new(put, key.clone(), &*CACHE_BACKEND, None, span); + let payload = b"HTTP/1.1 200 OK\r\n\ + Date: Thu, 26 Apr 2018 05:42:05 GMT\r\n\ + Content-Type: text/html; charset=utf-8\r\n\ + Connection: keep-alive\r\n\ + X-Frame-Options: SAMEORIGIN\r\n\ + Cache-Control: public, max-age=1\r\n\ + Server: origin-server\r\n\ + Content-Length: 4\r\n\r\nrust"; + // here we skip mocking a real http session for simplicity + let res = ctx.do_cache_put(payload).await.unwrap(); + assert!(res.is_none()); // cacheable + ctx.parser.finish().unwrap(); + ctx.finish().await.unwrap(); + + let span = Span::inactive(); + let (meta, mut hit) = CACHE_BACKEND + .lookup(&key, &span.handle()) + .await + .unwrap() + .unwrap(); + assert_eq!( + meta.headers().get("date").unwrap(), + "Thu, 26 Apr 2018 05:42:05 GMT" + ); + let data = hit.read_body().await.unwrap().unwrap(); + assert_eq!(data, "rust"); + } + + #[tokio::test] + async fn test_cache_put_uncacheable() { + let key = CacheKey::new("", "a", "1"); + let span = Span::inactive(); + let put = TestCachePut(); + let mut ctx = TestCachePutCtx::new(put, key.clone(), &*CACHE_BACKEND, None, span); + let payload = b"HTTP/1.1 200 OK\r\n\ + Date: Thu, 26 Apr 2018 05:42:05 GMT\r\n\ + Content-Type: text/html; charset=utf-8\r\n\ + Connection: keep-alive\r\n\ + 
X-Frame-Options: SAMEORIGIN\r\n\ + Cache-Control: no-store\r\n\ + Server: origin-server\r\n\ + Content-Length: 4\r\n\r\nrust"; + // here we skip mocking a real http session for simplicity + let no_cache = ctx.do_cache_put(payload).await.unwrap().unwrap(); + assert_eq!(no_cache, NoCacheReason::OriginNotCache); + ctx.parser.finish().unwrap(); + ctx.finish().await.unwrap(); + } +} + +// maybe this can simplify some logic in pingora::h1 + +mod parse_response { + use super::*; + use bytes::{Bytes, BytesMut}; + use httparse::Status; + use pingora_error::{ + Error, + ErrorType::{self, *}, + Result, + }; + use pingora_http::ResponseHeader; + + pub const INVALID_CHUNK: ErrorType = ErrorType::new("InvalidChunk"); + pub const INCOMPLETE_BODY: ErrorType = ErrorType::new("IncompleteHttpBody"); + + const MAX_HEADERS: usize = 256; + const INIT_HEADER_BUF_SIZE: usize = 4096; + const CHUNK_DELIMITER_SIZE: usize = 2; // \r\n + + #[derive(Debug, Clone, Copy)] + enum ParseState { + Init, + PartialHeader, + PartialBodyContentLength(usize, usize), + PartialChunkedBody(usize), + PartialHttp10Body(usize), + Done(usize), + Invalid(httparse::Error), + } + + impl ParseState { + fn is_done(&self) -> bool { + matches!(self, Self::Done(_)) + } + fn read_header(&self) -> bool { + matches!(self, Self::Init | Self::PartialHeader) + } + fn read_body(&self) -> bool { + matches!( + self, + Self::PartialBodyContentLength(..) 
+ | Self::PartialChunkedBody(_) + | Self::PartialHttp10Body(_) + ) + } + } + + pub(super) struct ResponseParse { + state: ParseState, + buf: BytesMut, + header_bytes: Bytes, + } + + impl ResponseParse { + pub fn new() -> Self { + ResponseParse { + state: ParseState::Init, + buf: BytesMut::with_capacity(INIT_HEADER_BUF_SIZE), + header_bytes: Bytes::new(), + } + } + + pub fn inject_data(&mut self, data: &[u8]) -> Result<Vec<HttpTask>> { + self.put_data(data); + + let mut tasks = vec![]; + while !self.state.is_done() { + if self.state.read_header() { + let header = self.parse_header()?; + let Some(header) = header else { + break; + }; + tasks.push(HttpTask::Header(Box::new(header), self.state.is_done())); + } else if self.state.read_body() { + let body = self.parse_body()?; + let Some(body) = body else { + break; + }; + tasks.push(HttpTask::Body(Some(body), self.state.is_done())); + } else { + break; + } + } + Ok(tasks) + } + + fn put_data(&mut self, data: &[u8]) { + use ParseState::*; + if matches!(self.state, Done(_) | Invalid(_)) { + panic!("Wrong phase {:?}", self.state); + } + self.buf.extend_from_slice(data); + } + + fn parse_header(&mut self) -> Result<Option<ResponseHeader>> { + let mut headers = [httparse::EMPTY_HEADER; MAX_HEADERS]; + let mut resp = httparse::Response::new(&mut headers); + let mut parser = httparse::ParserConfig::default(); + parser.allow_spaces_after_header_name_in_responses(true); + parser.allow_obsolete_multiline_headers_in_responses(true); + + let res = parser.parse_response(&mut resp, &self.buf); + let res = match res { + Ok(res) => res, + Err(e) => { + self.state = ParseState::Invalid(e); + return Error::e_because( + InvalidHTTPHeader, + format!("buf: {:?}", String::from_utf8_lossy(&self.buf)), + e, + ); + } + }; + + let split_to = match res { + Status::Complete(s) => s, + Status::Partial => { + self.state = ParseState::PartialHeader; + return Ok(None); + } + }; + // safe to unwrap, valid response always has code set. 
+ let mut response = + ResponseHeader::build(resp.code.unwrap(), Some(resp.headers.len())).unwrap(); + for header in resp.headers { + // TODO: consider hold a Bytes and all header values can be Bytes referencing the + // original buffer without reallocation + response.append_header(header.name.to_owned(), header.value.to_owned())?; + } + // TODO: see above, we can make header value `Bytes` referencing header_bytes + let header_bytes = self.buf.split_to(split_to).freeze(); + self.header_bytes = header_bytes; + self.state = body_type(&response); + + Ok(Some(response)) + } + + fn parse_body(&mut self) -> Result<Option<Bytes>> { + use ParseState::*; + if self.buf.is_empty() { + return Ok(None); + } + match self.state { + Init | PartialHeader | Invalid(_) => { + panic!("Wrong phase {:?}", self.state); + } + Done(_) => Ok(None), + PartialBodyContentLength(total, mut seen) => { + let end = if total < self.buf.len() + seen { + // TODO: warn! more data than expected + total - seen + } else { + self.buf.len() + }; + seen += end; + if seen >= total { + self.state = Done(seen); + } else { + self.state = PartialBodyContentLength(total, seen); + } + Ok(Some(self.buf.split_to(end).freeze())) + } + PartialChunkedBody(seen) => { + let parsed = httparse::parse_chunk_size(&self.buf).map_err(|e| { + self.state = Done(seen); + Error::explain(INVALID_CHUNK, format!("Invalid chucked encoding: {e:?}")) + })?; + match parsed { + httparse::Status::Complete((header_len, body_len)) => { + // 4\r\nRust\r\n: header: "4\r\n", body: "Rust", "\r\n" + let total_chunk_size = + header_len + body_len as usize + CHUNK_DELIMITER_SIZE; + if self.buf.len() < total_chunk_size { + // wait for the full chunk tob read + // Note that we have to buffer the entire chunk in this design + Ok(None) + } else { + if body_len == 0 { + self.state = Done(seen); + } else { + self.state = PartialChunkedBody(seen + body_len as usize); + } + let mut chunk_bytes = self.buf.split_to(total_chunk_size); + let mut chunk_body = 
chunk_bytes.split_off(header_len); + chunk_body.truncate(body_len as usize); + // Note that the final 0 sized chunk will return an empty Bytes + // instead of not None + Ok(Some(chunk_body.freeze())) + } + } + httparse::Status::Partial => { + // not even a full chunk, continue waiting for more data + Ok(None) + } + } + } + PartialHttp10Body(seen) => { + self.state = PartialHttp10Body(seen + self.buf.len()); + Ok(Some(self.buf.split().freeze())) + } + } + } + + pub fn finish(&mut self) -> Result<()> { + if let ParseState::PartialHttp10Body(seen) = self.state { + self.state = ParseState::Done(seen); + } + if !self.state.is_done() { + Error::e_explain(INCOMPLETE_BODY, format!("{:?}", self.state)) + } else { + Ok(()) + } + } + } + + fn body_type(resp: &ResponseHeader) -> ParseState { + use http::StatusCode; + + if matches!( + resp.status, + StatusCode::NO_CONTENT | StatusCode::NOT_MODIFIED + ) { + // these status code cannot have body by definition + return ParseState::Done(0); + } + if let Some(encoding) = resp.headers.get(http::header::TRANSFER_ENCODING) { + // TODO: case sensitive? 
+ if encoding.as_bytes() == b"chunked" { + return ParseState::PartialChunkedBody(0); + } + } + if let Some(cl) = resp.headers.get(http::header::CONTENT_LENGTH) { + // ignore invalid header value + if let Some(cl) = std::str::from_utf8(cl.as_bytes()) + .ok() + .and_then(|cl| cl.parse::<usize>().ok()) + { + return if cl == 0 { + ParseState::Done(0) + } else { + ParseState::PartialBodyContentLength(cl, 0) + }; + } + } + ParseState::PartialHttp10Body(0) + } + + #[cfg(test)] + mod test { + use super::*; + + #[test] + fn test_basic_response() { + let input = b"HTTP/1.1 200 OK\r\n\r\n"; + let mut parser = ResponseParse::new(); + let output = parser.inject_data(input).unwrap(); + assert_eq!(output.len(), 1); + let HttpTask::Header(header, eos) = &output[0] else { + panic!("{:?}", output); + }; + assert_eq!(header.status, 200); + assert!(!eos); + + let body = b"abc"; + let output = parser.inject_data(body).unwrap(); + assert_eq!(output.len(), 1); + let HttpTask::Body(data, _eos) = &output[0] else { + panic!("{:?}", output); + }; + assert_eq!(data.as_ref().unwrap(), &body[..]); + parser.finish().unwrap(); + } + + #[test] + fn test_partial_response_headers() { + let input = b"HTTP/1.1 200 OK\r\n"; + let mut parser = ResponseParse::new(); + let output = parser.inject_data(input).unwrap(); + // header is not complete + assert_eq!(output.len(), 0); + + let output = parser + .inject_data("Server: pingora\r\n\r\n".as_bytes()) + .unwrap(); + assert_eq!(output.len(), 1); + let HttpTask::Header(header, eos) = &output[0] else { + panic!("{:?}", output); + }; + assert_eq!(header.status, 200); + assert_eq!(header.headers.get("Server").unwrap(), "pingora"); + assert!(!eos); + } + + #[test] + fn test_invalid_headers() { + let input = b"HTP/1.1 200 OK\r\nServer: pingora\r\n\r\n"; + let mut parser = ResponseParse::new(); + let output = parser.inject_data(input); + // header is not complete + assert!(output.is_err()); + } + + #[test] + fn test_body_content_length() { + let input = b"HTTP/1.1 
200 OK\r\nContent-Length: 6\r\n\r\nabc";
            let mut parser = ResponseParse::new();
            let tasks = parser.inject_data(input).unwrap();

            // First pass yields the response header plus the body bytes seen so far.
            assert_eq!(tasks.len(), 2);
            let HttpTask::Header(header, _eos) = &tasks[0] else {
                panic!("{:?}", tasks);
            };
            assert_eq!(header.status, 200);

            let HttpTask::Body(data, eos) = &tasks[1] else {
                panic!("{:?}", tasks);
            };
            assert_eq!(data.as_ref().unwrap(), "abc");
            assert!(!eos);

            // The remaining 3 bytes complete the 6-byte Content-Length body.
            let tasks = parser.inject_data(b"def").unwrap();
            assert_eq!(tasks.len(), 1);
            let HttpTask::Body(data, eos) = &tasks[0] else {
                panic!("{:?}", tasks);
            };
            assert_eq!(data.as_ref().unwrap(), "def");
            assert!(eos);

            parser.finish().unwrap();
        }

        #[test]
        fn test_body_chunked() {
            // One full chunk arrives with the header; the terminating chunk comes later.
            let input = b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n4\r\nrust\r\n";
            let mut parser = ResponseParse::new();
            let tasks = parser.inject_data(input).unwrap();

            assert_eq!(tasks.len(), 2);
            let HttpTask::Header(header, _eos) = &tasks[0] else {
                panic!("{:?}", tasks);
            };
            assert_eq!(header.status, 200);

            let HttpTask::Body(data, eos) = &tasks[1] else {
                panic!("{:?}", tasks);
            };
            assert_eq!(data.as_ref().unwrap(), "rust");
            assert!(!eos);

            // The zero-sized chunk marks the end of the chunked stream.
            let tasks = parser.inject_data(b"0\r\n\r\n").unwrap();
            assert_eq!(tasks.len(), 1);
            let HttpTask::Body(data, eos) = &tasks[0] else {
                panic!("{:?}", tasks);
            };
            assert_eq!(data.as_ref().unwrap(), "");
            assert!(eos);

            parser.finish().unwrap();
        }

        #[test]
        fn test_body_content_length_early() {
            // Only 3 of the promised 6 body bytes ever arrive.
            let input = b"HTTP/1.1 200 OK\r\nContent-Length: 6\r\n\r\nabc";
            let mut parser = ResponseParse::new();
            let tasks = parser.inject_data(input).unwrap();

            assert_eq!(tasks.len(), 2);
            let HttpTask::Header(header, _eos) = &tasks[0] else {
                panic!("{:?}", tasks);
            };
            assert_eq!(header.status, 200);

            let HttpTask::Body(data, eos) = &tasks[1] else {
                panic!("{:?}", tasks);
            };
            assert_eq!(data.as_ref().unwrap(), "abc");
            assert!(!eos);

            // Finishing with a truncated body is an error.
            parser.finish().unwrap_err();
        }

        #[test]
        fn test_body_content_length_more_data() {
            // 3 bytes arrive although Content-Length only promises 2.
            let input = b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nabc";
            let mut parser = ResponseParse::new();
            let tasks = parser.inject_data(input).unwrap();

            assert_eq!(tasks.len(), 2);
            let HttpTask::Header(header, _eos) = &tasks[0] else {
                panic!("{:?}", tasks);
            };
            assert_eq!(header.status, 200);

            // The body is truncated to the declared length and ends the stream.
            let HttpTask::Body(data, eos) = &tasks[1] else {
                panic!("{:?}", tasks);
            };
            assert_eq!(data.as_ref().unwrap(), "ab");
            assert!(eos);

            // extra data is dropped without error
            parser.finish().unwrap();
        }

        #[test]
        fn test_body_chunked_early() {
            // A chunk is delivered but the terminating 0-chunk never arrives.
            let input = b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n4\r\nrust\r\n";
            let mut parser = ResponseParse::new();
            let tasks = parser.inject_data(input).unwrap();

            assert_eq!(tasks.len(), 2);
            let HttpTask::Header(header, _eos) = &tasks[0] else {
                panic!("{:?}", tasks);
            };
            assert_eq!(header.status, 200);

            let HttpTask::Body(data, eos) = &tasks[1] else {
                panic!("{:?}", tasks);
            };
            assert_eq!(data.as_ref().unwrap(), "rust");
            assert!(!eos);

            // Finishing an unterminated chunked stream is an error.
            parser.finish().unwrap_err();
        }

        #[test]
        fn test_body_chunked_partial_chunk() {
            // The chunk payload is split across two inject_data() calls.
            let input = b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n4\r\nru";
            let mut parser = ResponseParse::new();
            let tasks = parser.inject_data(input).unwrap();

            // No body task is emitted until the whole chunk is available.
            assert_eq!(tasks.len(), 1);
            let HttpTask::Header(header, _eos) = &tasks[0] else {
                panic!("{:?}", tasks);
            };
            assert_eq!(header.status, 200);

            let tasks = parser.inject_data(b"st\r\n").unwrap();
            assert_eq!(tasks.len(), 1);
            let HttpTask::Body(data, eos) = &tasks[0] else {
                panic!("{:?}", tasks);
            };
            assert_eq!(data.as_ref().unwrap(), "rust");
            assert!(!eos);
        }

        #[test]
        fn test_body_chunked_partial_chunk_head() {
            // The split lands inside the chunk-size line ("4\r" / "\n").
            let input = b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n4\r";
            let mut parser = ResponseParse::new();
            let tasks = parser.inject_data(input).unwrap();

            assert_eq!(tasks.len(), 1);
            let HttpTask::Header(header, _eos) = &tasks[0] else {
                panic!("{:?}", tasks);
            };
            assert_eq!(header.status, 200);

            let tasks = parser.inject_data(b"\nrust\r\n").unwrap();
            assert_eq!(tasks.len(), 1);
            let HttpTask::Body(data, eos) = &tasks[0] else {
                panic!("{:?}", tasks);
            };
            assert_eq!(data.as_ref().unwrap(), "rust");
            assert!(!eos);
        }

        #[test]
        fn test_body_chunked_many_chunks() {
            // Two complete chunks in a single buffer produce two body tasks.
            let input =
                b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n4\r\nrust\r\n1\r\ny\r\n";
            let mut parser = ResponseParse::new();
            let tasks = parser.inject_data(input).unwrap();

            assert_eq!(tasks.len(), 3);
            let HttpTask::Header(header, _eos) = &tasks[0] else {
                panic!("{:?}", tasks);
            };
            assert_eq!(header.status, 200);
            let HttpTask::Body(data, eos) = &tasks[1] else {
                panic!("{:?}", tasks);
            };
            assert!(!eos);
            assert_eq!(data.as_ref().unwrap(), "rust");
            let HttpTask::Body(data, eos) = &tasks[2] else {
                panic!("{:?}", tasks);
            };
            assert_eq!(data.as_ref().unwrap(), "y");
            assert!(!eos);
        }
    }
}

// ---- pingora-cache/src/storage.rs ----

// Copyright 2024 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//!
Cache backend storage abstraction

use super::{CacheKey, CacheMeta};
use crate::key::CompactCacheKey;
use crate::trace::SpanHandle;

use async_trait::async_trait;
use pingora_error::Result;
use std::any::Any;

/// Cache storage interface
///
/// Implemented by concrete cache backends (e.g. the in-memory store); all
/// read/write entry points for cached assets go through this trait.
#[async_trait]
pub trait Storage {
    // TODO: shouldn't have to be static

    /// Lookup the storage for the given [CacheKey]
    ///
    /// Returns the stored [CacheMeta] plus a [HitHandler] to stream the body,
    /// or `None` on a cache miss.
    async fn lookup(
        &'static self,
        key: &CacheKey,
        trace: &SpanHandle,
    ) -> Result<Option<(CacheMeta, HitHandler)>>;

    /// Write the given [CacheMeta] to the storage. Return [MissHandler] to write the body later.
    async fn get_miss_handler(
        &'static self,
        key: &CacheKey,
        meta: &CacheMeta,
        trace: &SpanHandle,
    ) -> Result<MissHandler>;

    /// Delete the cached asset for the given key
    ///
    /// [CompactCacheKey] is used here because it is how eviction managers store the keys
    async fn purge(&'static self, key: &CompactCacheKey, trace: &SpanHandle) -> Result<bool>;

    /// Update cache header and metadata for the already stored asset.
    async fn update_meta(
        &'static self,
        key: &CacheKey,
        meta: &CacheMeta,
        trace: &SpanHandle,
    ) -> Result<bool>;

    /// Whether this storage backend supports reading partially written data
    ///
    /// This is to indicate when cache should unlock readers
    fn support_streaming_partial_write(&self) -> bool {
        // conservative default: readers wait for the full write
        false
    }

    /// Helper function to cast the trait object to concrete types
    fn as_any(&self) -> &(dyn Any + Send + Sync + 'static);
}

/// Cache hit handling trait
///
/// Returned by [Storage::lookup]; used to stream the cached body to the client.
#[async_trait]
pub trait HandleHit {
    /// Read cached body
    ///
    /// Return `None` when no more body to read.
    async fn read_body(&mut self) -> Result<Option<bytes::Bytes>>;

    /// Finish the current cache hit
    async fn finish(
        self: Box<Self>, // because self is always used as a trait object
        storage: &'static (dyn Storage + Sync),
        key: &CacheKey,
        trace: &SpanHandle,
    ) -> Result<()>;

    /// Whether this storage allow seeking to a certain range of body
    fn can_seek(&self) -> bool {
        false
    }

    /// Try to seek to a certain range of the body
    ///
    /// `end: None` means to read to the end of the body.
    ///
    /// Backends that return `true` from [Self::can_seek] must override this;
    /// the default panics so a missing override is caught early.
    fn seek(&mut self, _start: usize, _end: Option<usize>) -> Result<()> {
        // to prevent impl can_seek() without impl seek
        todo!("seek() needs to be implemented")
    }
    // TODO: fn is_stream_hit()

    /// Helper function to cast the trait object to concrete types
    fn as_any(&self) -> &(dyn Any + Send + Sync);
}

/// Hit Handler
pub type HitHandler = Box<(dyn HandleHit + Sync + Send)>;

/// Cache miss handling trait
///
/// Returned by [Storage::get_miss_handler]; used to write the body of a new asset.
#[async_trait]
pub trait HandleMiss {
    /// Write the given body to the storage
    async fn write_body(&mut self, data: bytes::Bytes, eof: bool) -> Result<()>;

    /// Finish the cache admission
    ///
    /// When `self` is dropped without calling this function, the storage should consider this write
    /// failed.
    async fn finish(
        self: Box<Self>, // because self is always used as a trait object
    ) -> Result<usize>;
}

/// Miss Handler
pub type MissHandler = Box<(dyn HandleMiss + Sync + Send)>;

// ---- pingora-cache/src/trace.rs ----

// Copyright 2024 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Distributed tracing helpers + +use rustracing_jaeger::span::SpanContextState; +use std::time::SystemTime; + +use crate::{CacheMeta, CachePhase, HitStatus}; + +pub use rustracing::tag::Tag; + +pub type Span = rustracing::span::Span<SpanContextState>; +pub type SpanHandle = rustracing::span::SpanHandle<SpanContextState>; + +#[derive(Debug)] +pub(crate) struct CacheTraceCTX { + // parent span + pub cache_span: Span, + // only spans across multiple calls need to store here + pub miss_span: Span, + pub hit_span: Span, +} + +impl CacheTraceCTX { + pub fn new() -> Self { + CacheTraceCTX { + cache_span: Span::inactive(), + miss_span: Span::inactive(), + hit_span: Span::inactive(), + } + } + + pub fn enable(&mut self, cache_span: Span) { + self.cache_span = cache_span; + } + + #[inline] + pub fn child(&self, name: &'static str) -> Span { + self.cache_span.child(name, |o| o.start()) + } + + pub fn start_miss_span(&mut self) { + self.miss_span = self.child("miss"); + } + + pub fn get_miss_span(&self) -> SpanHandle { + self.miss_span.handle() + } + + pub fn finish_miss_span(&mut self) { + self.miss_span.set_finish_time(SystemTime::now); + } + + pub fn start_hit_span(&mut self, phase: CachePhase, hit_status: HitStatus) { + self.hit_span = self.child("hit"); + self.hit_span.set_tag(|| Tag::new("phase", phase.as_str())); + self.hit_span + .set_tag(|| Tag::new("status", hit_status.as_str())); + } + + pub fn finish_hit_span(&mut self) { + self.hit_span.set_finish_time(SystemTime::now); + } + + pub fn log_meta(&mut self, meta: &CacheMeta) { + fn ts2epoch(ts: 
SystemTime) -> f64 { + ts.duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or_default() // should never overflow but be safe here + .as_secs_f64() + } + let internal = &meta.0.internal; + self.hit_span.set_tags(|| { + [ + Tag::new("created", ts2epoch(internal.created)), + Tag::new("fresh_until", ts2epoch(internal.fresh_until)), + Tag::new("updated", ts2epoch(internal.updated)), + Tag::new("stale_if_error_sec", internal.stale_if_error_sec as i64), + Tag::new( + "stale_while_revalidate_sec", + internal.stale_while_revalidate_sec as i64, + ), + Tag::new("variance", internal.variance.is_some()), + ] + }); + } +} diff --git a/pingora-cache/src/variance.rs b/pingora-cache/src/variance.rs new file mode 100644 index 0000000..cce8160 --- /dev/null +++ b/pingora-cache/src/variance.rs @@ -0,0 +1,120 @@ +use std::{borrow::Cow, collections::BTreeMap}; + +use blake2::Digest; + +use crate::key::{Blake2b128, HashBinary}; + +/// A builder for variance keys, used for distinguishing multiple cached assets +/// at the same URL. This is intended to be easily passed to helper functions, +/// which can each populate a portion of the variance. +pub struct VarianceBuilder<'a> { + values: BTreeMap<Cow<'a, str>, Cow<'a, [u8]>>, +} + +impl<'a> VarianceBuilder<'a> { + /// Create an empty variance key. Has no variance by default - add some variance using + /// [`Self::add_value`]. + pub fn new() -> Self { + VarianceBuilder { + values: BTreeMap::new(), + } + } + + /// Add a byte string to the variance key. Not sensitive to insertion order. + /// `value` is intended to take either `&str` or `&[u8]`. + pub fn add_value(&mut self, name: &'a str, value: &'a (impl AsRef<[u8]> + ?Sized)) { + self.values + .insert(name.into(), Cow::Borrowed(value.as_ref())); + } + + /// Move a byte string to the variance key. Not sensitive to insertion order. Useful when + /// writing helper functions which generate a value then add said value to the VarianceBuilder. 
+ /// Without this, the helper function would have to move the value to the calling function + /// to extend its lifetime to at least match the VarianceBuilder. + pub fn add_owned_value(&mut self, name: &'a str, value: Vec<u8>) { + self.values.insert(name.into(), Cow::Owned(value)); + } + + /// Check whether this variance key actually has variance, or just refers to the root asset + pub fn has_variance(&self) -> bool { + !self.values.is_empty() + } + + /// Hash this variance key. Returns [`None`] if [`Self::has_variance`] is false. + pub fn finalize(self) -> Option<HashBinary> { + const SALT: &[u8; 1] = &[0u8; 1]; + if self.has_variance() { + let mut hash = Blake2b128::new(); + for (name, value) in self.values.iter() { + hash.update(name.as_bytes()); + hash.update(SALT); + hash.update(value); + hash.update(SALT); + } + Some(hash.finalize().into()) + } else { + None + } + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_basic() { + let key_empty = VarianceBuilder::new().finalize(); + assert_eq!(None, key_empty); + + let mut key_value = VarianceBuilder::new(); + key_value.add_value("a", "a"); + let key_value = key_value.finalize(); + + let mut key_owned_value = VarianceBuilder::new(); + key_owned_value.add_owned_value("a", "a".as_bytes().to_vec()); + let key_owned_value = key_owned_value.finalize(); + + assert_ne!(key_empty, key_value); + assert_ne!(key_empty, key_owned_value); + assert_eq!(key_value, key_owned_value); + } + + #[test] + fn test_value_ordering() { + let mut key_abc = VarianceBuilder::new(); + key_abc.add_value("a", "a"); + key_abc.add_value("b", "b"); + key_abc.add_value("c", "c"); + let key_abc = key_abc.finalize().unwrap(); + + let mut key_bac = VarianceBuilder::new(); + key_bac.add_value("b", "b"); + key_bac.add_value("a", "a"); + key_bac.add_value("c", "c"); + let key_bac = key_bac.finalize().unwrap(); + + let mut key_cba = VarianceBuilder::new(); + key_cba.add_value("c", "c"); + key_cba.add_value("b", "b"); + 
key_cba.add_value("a", "a"); + let key_cba = key_cba.finalize().unwrap(); + + assert_eq!(key_abc, key_bac); + assert_eq!(key_abc, key_cba); + } + + #[test] + fn test_value_overriding() { + let mut key_a = VarianceBuilder::new(); + key_a.add_value("a", "a"); + let key_a = key_a.finalize().unwrap(); + + let mut key_b = VarianceBuilder::new(); + key_b.add_value("a", "b"); + key_b.add_value("a", "a"); + let key_b = key_b.finalize().unwrap(); + + assert_eq!(key_a, key_b); + } +} |