author     Yuchen Wu <[email protected]>  2024-02-27 20:25:44 -0800
committer  Yuchen Wu <[email protected]>  2024-02-27 20:25:44 -0800
commit     8797329225018c4d0ab990166dd020338ae292dc (patch)
tree       1e8d0bf6f3c27e987559f52319d91ff75e4da5cb /pingora-cache/src/lib.rs
parent     0bca116c1027a878469b72352e1e9e3916e85dde (diff)
download   pingora-8797329225018c4d0ab990166dd020338ae292dc.tar.gz
           pingora-8797329225018c4d0ab990166dd020338ae292dc.zip
Release Pingora version 0.1.0 (tag: v0.1.0)
Co-authored-by: Andrew Hauck <[email protected]>
Co-authored-by: Edward Wang <[email protected]>
Diffstat (limited to 'pingora-cache/src/lib.rs')
-rw-r--r--  pingora-cache/src/lib.rs  1093
1 file changed, 1093 insertions(+), 0 deletions(-)
diff --git a/pingora-cache/src/lib.rs b/pingora-cache/src/lib.rs
new file mode 100644
index 0000000..be352dc
--- /dev/null
+++ b/pingora-cache/src/lib.rs
@@ -0,0 +1,1093 @@
+// Copyright 2024 Cloudflare, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! The HTTP caching layer for proxies.
+
+#![allow(clippy::new_without_default)]
+
+use http::{method::Method, request::Parts as ReqHeader, response::Parts as RespHeader};
+use key::{CacheHashKey, HashBinary};
+use lock::WritePermit;
+use pingora_error::Result;
+use pingora_http::ResponseHeader;
+use std::time::{Duration, SystemTime};
+use trace::CacheTraceCTX;
+
+pub mod cache_control;
+pub mod eviction;
+pub mod filters;
+pub mod hashtable;
+pub mod key;
+pub mod lock;
+pub mod max_file_size;
+mod memory;
+pub mod meta;
+pub mod predictor;
+pub mod put;
+pub mod storage;
+pub mod trace;
+mod variance;
+
+use crate::max_file_size::MaxFileSizeMissHandler;
+pub use key::CacheKey;
+use lock::{CacheLock, LockStatus, Locked};
+pub use memory::MemCache;
+pub use meta::{CacheMeta, CacheMetaDefaults};
+pub use storage::{HitHandler, MissHandler, Storage};
+pub use variance::VarianceBuilder;
+
+pub mod prelude {}
+
+/// The state machine for HTTP caching
+///
+/// This object is used to handle the state and transitions for HTTP caching through the life of a
+/// request.
+pub struct HttpCache {
+    phase: CachePhase,
+    // Box the rest so that a disabled HttpCache struct is small
+    inner: Option<Box<HttpCacheInner>>,
+}
+
+/// This reflects the phase of HttpCache during the lifetime of a request
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+pub enum CachePhase {
+    /// Cache disabled, with reason (NeverEnabled if never explicitly used)
+    Disabled(NoCacheReason),
+    /// Cache enabled but nothing is set yet
+    Uninit,
+    /// Cache was enabled, but the request decided not to use it
+    // HttpCache.inner is kept
+    Bypass,
+    /// Awaiting the cache key to be generated
+    CacheKey,
+    /// Cache hit
+    Hit,
+    /// No cached asset is found
+    Miss,
+    /// A stale (expired) asset is found
+    Stale,
+    /// A stale (expired) asset was found, so a fresh one was fetched
+    Expired,
+    /// A stale (expired) asset was found, and it was revalidated to be fresh
+    Revalidated,
+    /// Revalidated, but deemed uncacheable, so we do not freshen it
+    RevalidatedNoCache(NoCacheReason),
+}
+
+impl CachePhase {
+    /// Convert [CachePhase] to a `str`, for logging and debugging.
+    pub fn as_str(&self) -> &'static str {
+        match self {
+            CachePhase::Disabled(_) => "disabled",
+            CachePhase::Uninit => "uninitialized",
+            CachePhase::Bypass => "bypass",
+            CachePhase::CacheKey => "key",
+            CachePhase::Hit => "hit",
+            CachePhase::Miss => "miss",
+            CachePhase::Stale => "stale",
+            CachePhase::Expired => "expired",
+            CachePhase::Revalidated => "revalidated",
+            CachePhase::RevalidatedNoCache(_) => "revalidated-nocache",
+        }
+    }
+}
+
+/// The possible reasons for not caching
+#[derive(Copy, Clone, Debug, PartialEq, Eq)]
+pub enum NoCacheReason {
+    /// Caching is not enabled to begin with
+    NeverEnabled,
+    /// Origin directives indicated this was not cacheable
+    OriginNotCache,
+    /// Response size was larger than the cache's configured maximum asset size
+    ResponseTooLarge,
+    /// Due to an internal caching storage error
+    StorageError,
+    /// Due to other types of internal issues
+    InternalError,
+    /// Will be cacheable but skip cache admission now
+    ///
+    /// This happens when the cache predictor predicted that this request is not cacheable, but
+    /// the response turns out to be OK to cache. However, it might be too late to enable caching
+    /// for this request.
+    Deferred,
+    /// The writer of the cache lock sees that the request is not cacheable (could be OriginNotCache)
+    CacheLockGiveUp,
+    /// This request waited too long for the writer of the cache lock to finish, so this request will
+    /// fetch from the origin without caching
+    CacheLockTimeout,
+    /// Other custom defined reasons
+    Custom(&'static str),
+}
+
+impl NoCacheReason {
+    /// Convert [NoCacheReason] to a `str`, for logging and debugging.
+    pub fn as_str(&self) -> &'static str {
+        use NoCacheReason::*;
+        match self {
+            NeverEnabled => "NeverEnabled",
+            OriginNotCache => "OriginNotCache",
+            ResponseTooLarge => "ResponseTooLarge",
+            StorageError => "StorageError",
+            InternalError => "InternalError",
+            Deferred => "Deferred",
+            CacheLockGiveUp => "CacheLockGiveUp",
+            CacheLockTimeout => "CacheLockTimeout",
+            Custom(s) => s,
+        }
+    }
+}
+
+/// Response cacheable decision
+#[derive(Debug)]
+pub enum RespCacheable {
+    Cacheable(CacheMeta),
+    Uncacheable(NoCacheReason),
+}
+
+impl RespCacheable {
+    /// Whether it is cacheable
+    #[inline]
+    pub fn is_cacheable(&self) -> bool {
+        matches!(*self, Self::Cacheable(_))
+    }
+
+    /// Unwrap [RespCacheable] to get the [CacheMeta] stored
+    /// # Panic
+    /// Panics when this object is not cacheable. Check [Self::is_cacheable()] first.
+    pub fn unwrap_meta(self) -> CacheMeta {
+        match self {
+            Self::Cacheable(meta) => meta,
+            Self::Uncacheable(_) => panic!("expected Cacheable value"),
+        }
+    }
+}
+
+/// Freshness state of a cache hit asset
+#[derive(Debug, Copy, Clone)]
+pub enum HitStatus {
+    Expired,
+    ForceExpired,
+    FailedHitFilter,
+    Fresh,
+}
+
+impl HitStatus {
+    /// For displaying cache hit status
+    pub fn as_str(&self) -> &'static str {
+        match self {
+            Self::Expired => "expired",
+            Self::ForceExpired => "force_expired",
+            Self::FailedHitFilter => "failed_hit_filter",
+            Self::Fresh => "fresh",
+        }
+    }
+
+    /// Whether the cached asset can be served as fresh
+    pub fn is_fresh(&self) -> bool {
+        match self {
+            Self::Expired | Self::ForceExpired | Self::FailedHitFilter => false,
+            Self::Fresh => true,
+        }
+    }
+}
+
+struct HttpCacheInner {
+    pub key: Option<CacheKey>,
+    pub meta: Option<CacheMeta>,
+    // when set, even if an asset exists, it would only be considered valid after this timestamp
+    pub valid_after: Option<SystemTime>,
+    // when set, an asset will be rejected from the cache if it exceeds this size in bytes
+    pub max_file_size_bytes: Option<usize>,
+    pub miss_handler: Option<MissHandler>,
+    pub body_reader: Option<HitHandler>,
+    pub storage: &'static (dyn storage::Storage + Sync), // static for now
+    pub eviction: Option<&'static (dyn eviction::EvictionManager + Sync)>,
+    pub predictor: Option<&'static (dyn predictor::CacheablePredictor + Sync)>,
+    pub lock: Option<Locked>, // TODO: these 3 fields should come in 1 sub struct
+    pub cache_lock: Option<&'static CacheLock>,
+    pub lock_duration: Option<Duration>,
+    pub traces: trace::CacheTraceCTX,
+}
+
+impl HttpCache {
+    /// Create a new [HttpCache].
+    ///
+    /// Caching is not enabled by default.
+    pub fn new() -> Self {
+        HttpCache {
+            phase: CachePhase::Disabled(NoCacheReason::NeverEnabled),
+            inner: None,
+        }
+    }
+
+    /// Whether the cache is enabled
+    pub fn enabled(&self) -> bool {
+        !matches!(self.phase, CachePhase::Disabled(_) | CachePhase::Bypass)
+    }
+
+    /// Whether the cache is being bypassed
+    pub fn bypassing(&self) -> bool {
+        matches!(self.phase, CachePhase::Bypass)
+    }
+
+    /// Return the [CachePhase]
+    pub fn phase(&self) -> CachePhase {
+        self.phase
+    }
+
+    /// Whether anything was fetched from the upstream
+    ///
+    /// This essentially checks all possible [CachePhase]s that need to contact the upstream server
+    pub fn upstream_used(&self) -> bool {
+        use CachePhase::*;
+        match self.phase {
+            Disabled(_) | Bypass | Miss | Expired | Revalidated | RevalidatedNoCache(_) => true,
+            Hit | Stale => false,
+            Uninit | CacheKey => false, // invalid states for this call, treat them as false to keep it simple
+        }
+    }
+
+    /// Check whether the backend storage is the type `T`.
+    pub fn storage_type_is<T: 'static>(&self) -> bool {
+        self.inner
+            .as_ref()
+            .and_then(|inner| inner.storage.as_any().downcast_ref::<T>())
+            .is_some()
+    }
+
+    /// Disable caching
+    pub fn disable(&mut self, reason: NoCacheReason) {
+        use NoCacheReason::*;
+        match self.phase {
+            CachePhase::Disabled(_) => {
+                // replace reason
+                self.phase = CachePhase::Disabled(reason);
+            }
+            _ => {
+                self.phase = CachePhase::Disabled(reason);
+                if let Some(inner) = self.inner.as_mut() {
+                    let lock = inner.lock.take();
+                    if let Some(Locked::Write(_r)) = lock {
+                        let lock_status = match reason {
+                            // let the next request try to fetch it
+                            InternalError | StorageError | Deferred => LockStatus::TransientError,
+                            // no need for the lock anymore
+                            OriginNotCache | ResponseTooLarge => LockStatus::GiveUp,
+                            // not sure which LockStatus makes sense, we treat it as GiveUp for now
+                            Custom(_) => LockStatus::GiveUp,
+                            // should never happen, NeverEnabled shouldn't hold a lock
+                            NeverEnabled => panic!("NeverEnabled holds a write lock"),
+                            CacheLockGiveUp | CacheLockTimeout => {
+                                panic!("CacheLock* are for cache lock readers only")
+                            }
+                        };
+                        inner
+                            .cache_lock
+                            .unwrap()
+                            .release(inner.key.as_ref().unwrap(), lock_status);
+                    }
+                }
+                // log the initial disable reason
+                self.inner_mut()
+                    .traces
+                    .cache_span
+                    .set_tag(|| trace::Tag::new("disable_reason", reason.as_str()));
+                self.inner = None;
+            }
+        }
+    }
+
+    /* The following methods panic when they are used in the wrong phase.
+     * This is better than returning errors, as such panics are only caused by coding errors, which
+     * should be fixed right away. The Tokio runtime only crashes the current task instead of the
+     * whole program when these panics happen. */
+
+    /// Set the cache to bypass
+    ///
+    /// # Panic
+    /// This call is only allowed in the [CachePhase::CacheKey] phase (before any cache lookup is
+    /// performed). Using it in any other phase will lead to a panic.
+    pub fn bypass(&mut self) {
+        match self.phase {
+            CachePhase::CacheKey => {
+                // before cache lookup / found / miss
+                self.phase = CachePhase::Bypass;
+                self.inner_mut()
+                    .traces
+                    .cache_span
+                    .set_tag(|| trace::Tag::new("bypassed", true));
+            }
+            _ => panic!("wrong phase to bypass HttpCache {:?}", self.phase),
+        }
+    }
+
+    /// Enable the cache
+    ///
+    /// - `storage`: the cache storage backend that implements [storage::Storage]
+    /// - `eviction`: optionally, the eviction manager; without it, nothing will be evicted from the storage
+    /// - `predictor`: optionally, a cache predictor. The cache predictor predicts whether something is likely
+    /// to be cacheable or not. This is useful because the proxy can apply different types of optimization to
+    /// cacheable and uncacheable requests.
+    /// - `cache_lock`: optionally, a cache lock which handles concurrent lookups to the same asset. Without it
+    /// such lookups will all be allowed to fetch the asset independently.
+    pub fn enable(
+        &mut self,
+        storage: &'static (dyn storage::Storage + Sync),
+        eviction: Option<&'static (dyn eviction::EvictionManager + Sync)>,
+        predictor: Option<&'static (dyn predictor::CacheablePredictor + Sync)>,
+        cache_lock: Option<&'static CacheLock>,
+    ) {
+        match self.phase {
+            CachePhase::Disabled(_) => {
+                self.phase = CachePhase::Uninit;
+                self.inner = Some(Box::new(HttpCacheInner {
+                    key: None,
+                    meta: None,
+                    valid_after: None,
+                    max_file_size_bytes: None,
+                    miss_handler: None,
+                    body_reader: None,
+                    storage,
+                    eviction,
+                    predictor,
+                    lock: None,
+                    cache_lock,
+                    lock_duration: None,
+                    traces: CacheTraceCTX::new(),
+                }));
+            }
+            _ => panic!("Cannot enable already enabled HttpCache {:?}", self.phase),
+        }
+    }
+
+    // Enable distributed tracing
+    pub fn enable_tracing(&mut self, parent_span: trace::Span) {
+        if let Some(inner) = self.inner.as_mut() {
+            inner.traces.enable(parent_span);
+        }
+    }
+
+    // Get the cache `miss` tracing span
+    pub fn get_miss_span(&mut self) -> Option<trace::SpanHandle> {
+        self.inner.as_mut().map(|i| i.traces.get_miss_span())
+    }
+
+    // shortcut to access inner, panic if phase is disabled
+    #[inline]
+    fn inner_mut(&mut self) -> &mut HttpCacheInner {
+        self.inner.as_mut().unwrap()
+    }
+
+    #[inline]
+    fn inner(&self) -> &HttpCacheInner {
+        self.inner.as_ref().unwrap()
+    }
+
+    /// Set the cache key
+    /// # Panic
+    /// The cache key is only allowed to be set in its own phase. Setting it in other phases will
+    /// cause a panic.
+    pub fn set_cache_key(&mut self, key: CacheKey) {
+        match self.phase {
+            CachePhase::Uninit | CachePhase::CacheKey => {
+                self.phase = CachePhase::CacheKey;
+                self.inner_mut().key = Some(key);
+            }
+            _ => panic!("wrong phase {:?}", self.phase),
+        }
+    }
+
+    /// Return the cache key used for asset lookup
+    /// # Panic
+    /// Can only be called after the cache key is set and the cache is not disabled. Panics otherwise.
+    pub fn cache_key(&self) -> &CacheKey {
+        match self.phase {
+            CachePhase::Disabled(_) | CachePhase::Uninit => panic!("wrong phase {:?}", self.phase),
+            _ => self.inner().key.as_ref().unwrap(),
+        }
+    }
+
+    /// Return the max size allowed to be cached.
+    pub fn max_file_size_bytes(&self) -> Option<usize> {
+        match self.phase {
+            CachePhase::Disabled(_) | CachePhase::Uninit => panic!("wrong phase {:?}", self.phase),
+            _ => self.inner().max_file_size_bytes,
+        }
+    }
+
+    /// Set the maximum response _body_ size in bytes that will be admitted to the cache.
+    ///
+    /// Response header size does not contribute to the max file size.
+    pub fn set_max_file_size_bytes(&mut self, max_file_size_bytes: usize) {
+        match self.phase {
+            CachePhase::Disabled(_) => panic!("wrong phase {:?}", self.phase),
+            _ => {
+                self.inner_mut().max_file_size_bytes = Some(max_file_size_bytes);
+            }
+        }
+    }
+
+    /// Set that the asset is found in cache storage.
+    ///
+    /// This function is called after [Self::cache_lookup()], which returns the [CacheMeta] and
+    /// [HitHandler].
+    ///
+    /// The `hit_status` enum allows the caller to force expire assets.
+    pub fn cache_found(&mut self, meta: CacheMeta, hit_handler: HitHandler, hit_status: HitStatus) {
+        match self.phase {
+            // Stale is allowed because of cache lock and then retry
+            CachePhase::CacheKey | CachePhase::Stale => {
+                self.phase = if hit_status.is_fresh() {
+                    CachePhase::Hit
+                } else {
+                    CachePhase::Stale
+                };
+                let phase = self.phase;
+                let inner = self.inner_mut();
+                let key = inner.key.as_ref().unwrap();
+                if phase == CachePhase::Stale {
+                    if let Some(lock) = inner.cache_lock.as_ref() {
+                        inner.lock = Some(lock.lock(key));
+                    }
+                }
+                inner.traces.log_meta(&meta);
+                if let Some(eviction) = inner.eviction {
+                    // TODO: make access() accept CacheKey
+                    let cache_key = key.to_compact();
+                    // FIXME: get size
+                    eviction.access(&cache_key, 0, meta.0.internal.fresh_until);
+                }
+                inner.traces.start_hit_span(phase, hit_status);
+                inner.meta = Some(meta);
+                inner.body_reader = Some(hit_handler);
+            }
+            _ => panic!("wrong phase {:?}", self.phase),
+        }
+    }
+
+    /// Mark `self` as a cache miss.
+    ///
+    /// This function is called after [Self::cache_lookup()] finds nothing or the caller decides
+    /// not to use the assets found.
+    /// # Panic
+    /// Panics in other phases.
+    pub fn cache_miss(&mut self) {
+        match self.phase {
+            // from CacheKey: set state to miss during cache lookup
+            // from Bypass: response became cacheable, set state to miss to cache
+            CachePhase::CacheKey | CachePhase::Bypass => {
+                self.phase = CachePhase::Miss;
+                self.inner_mut().traces.start_miss_span();
+            }
+            _ => panic!("wrong phase {:?}", self.phase),
+        }
+    }
+
+    /// Return the [HitHandler]
+    /// # Panic
+    /// Call this after [Self::cache_found()]; panics in other phases.
+    pub fn hit_handler(&mut self) -> &mut HitHandler {
+        match self.phase {
+            CachePhase::Hit
+            | CachePhase::Stale
+            | CachePhase::Revalidated
+            | CachePhase::RevalidatedNoCache(_) => self.inner_mut().body_reader.as_mut().unwrap(),
+            _ => panic!("wrong phase {:?}", self.phase),
+        }
+    }
+
+    /// Return the body reader during a cache admission (miss/expired) which decouples the downstream
+    /// read and upstream cache write
+    pub fn miss_body_reader(&mut self) -> Option<&mut HitHandler> {
+        match self.phase {
+            CachePhase::Miss | CachePhase::Expired => {
+                let inner = self.inner_mut();
+                if inner.storage.support_streaming_partial_write() {
+                    inner.body_reader.as_mut()
+                } else {
+                    // body_reader could be set even when the storage doesn't support streaming;
+                    // an expired cache hit would have the reader set.
+                    None
+                }
+            }
+            _ => None,
+        }
+    }
+
+    /// Call this when the cache hit is fully read.
+    ///
+    /// This call will release resources, if any, and log the timing in tracing if set.
+    /// # Panic
+    /// Panics in phases where there is no cache hit.
+    pub async fn finish_hit_handler(&mut self) -> Result<()> {
+        match self.phase {
+            CachePhase::Hit
+            | CachePhase::Miss
+            | CachePhase::Expired
+            | CachePhase::Stale
+            | CachePhase::Revalidated
+            | CachePhase::RevalidatedNoCache(_) => {
+                let inner = self.inner_mut();
+                if inner.body_reader.is_none() {
+                    // already finished, we allow calling this function more than once
+                    return Ok(());
+                }
+                let body_reader = inner.body_reader.take().unwrap();
+                let key = inner.key.as_ref().unwrap();
+                let result = body_reader
+                    .finish(inner.storage, key, &inner.traces.hit_span.handle())
+                    .await;
+                inner.traces.finish_hit_span();
+                result
+            }
+            _ => panic!("wrong phase {:?}", self.phase),
+        }
+    }
+
+    /// Set the [MissHandler] according to cache_key and meta; can only be called once
+    pub async fn set_miss_handler(&mut self) -> Result<()> {
+        match self.phase {
+            // set_miss_handler() needs to be called after set_cache_meta() (which changes Stale to Expired).
+            // This is an artificial rule to enforce the state transitions.
+            CachePhase::Miss | CachePhase::Expired => {
+                let max_file_size_bytes = self.max_file_size_bytes();
+
+                let inner = self.inner_mut();
+                if inner.miss_handler.is_some() {
+                    panic!("write handler is already set")
+                }
+                let meta = inner.meta.as_ref().unwrap();
+                let key = inner.key.as_ref().unwrap();
+                let miss_handler = inner
+                    .storage
+                    .get_miss_handler(key, meta, &inner.traces.get_miss_span())
+                    .await?;
+
+                inner.miss_handler = if let Some(max_size) = max_file_size_bytes {
+                    Some(Box::new(MaxFileSizeMissHandler::new(
+                        miss_handler,
+                        max_size,
+                    )))
+                } else {
+                    Some(miss_handler)
+                };
+
+                if inner.storage.support_streaming_partial_write() {
+                    // If readers can access a partial write, the cache lock can be released here
+                    // to let readers start reading the body.
+                    let lock = inner.lock.take();
+                    if let Some(Locked::Write(_r)) = lock {
+                        inner.cache_lock.unwrap().release(key, LockStatus::Done);
+                    }
+                    // Downstream read and upstream write can be decoupled
+                    let body_reader = inner
+                        .storage
+                        .lookup(key, &inner.traces.get_miss_span())
+                        .await?;
+
+                    if let Some((_meta, body_reader)) = body_reader {
+                        inner.body_reader = Some(body_reader);
+                    } else {
+                        // body_reader should exist now because streaming_partial_write exists to support it
+                        panic!("unable to get body_reader for {:?}", meta);
+                    }
+                }
+                Ok(())
+            }
+            _ => panic!("wrong phase {:?}", self.phase),
+        }
+    }
+
+    /// Return the [MissHandler] to write the response body to cache.
+    ///
+    /// `None`: the handler has not been set or has already finished
+    pub fn miss_handler(&mut self) -> Option<&mut MissHandler> {
+        match self.phase {
+            CachePhase::Miss | CachePhase::Expired => self.inner_mut().miss_handler.as_mut(),
+            _ => panic!("wrong phase {:?}", self.phase),
+        }
+    }
+
+    /// Finish the cache admission
+    ///
+    /// If [self] is dropped without calling this, the cache admission is considered incomplete and
+    /// should be cleaned up.
+    ///
+    /// This call will also trigger eviction if set.
+    pub async fn finish_miss_handler(&mut self) -> Result<()> {
+        match self.phase {
+            CachePhase::Miss | CachePhase::Expired => {
+                let inner = self.inner_mut();
+                if inner.miss_handler.is_none() {
+                    // already finished, we allow calling this function more than once
+                    return Ok(());
+                }
+                let miss_handler = inner.miss_handler.take().unwrap();
+                let size = miss_handler.finish().await?;
+                let lock = inner.lock.take();
+                let key = inner.key.as_ref().unwrap();
+                if let Some(Locked::Write(_r)) = lock {
+                    // no need to call r.unlock() because release() will call it;
+                    // r is a guard to make sure the lock is unlocked when this request is dropped
+                    inner.cache_lock.unwrap().release(key, LockStatus::Done);
+                }
+                if let Some(eviction) = inner.eviction {
+                    let cache_key = key.to_compact();
+                    let meta = inner.meta.as_ref().unwrap();
+                    let evicted = eviction.admit(cache_key, size, meta.0.internal.fresh_until);
+                    // TODO: make this async
+                    let span = inner.traces.child("eviction");
+                    let handle = span.handle();
+                    for item in evicted {
+                        // TODO: warn/log the error
+                        let _ = inner.storage.purge(&item, &handle).await;
+                    }
+                }
+                inner.traces.finish_miss_span();
+                Ok(())
+            }
+            _ => panic!("wrong phase {:?}", self.phase),
+        }
+    }
+
+    /// Set the [CacheMeta] of the cache
+    pub fn set_cache_meta(&mut self, meta: CacheMeta) {
+        match self.phase {
+            // TODO: store the stale meta somewhere else for future use?
+            CachePhase::Stale | CachePhase::Miss => {
+                let inner = self.inner_mut();
+                inner.traces.log_meta(&meta);
+                inner.meta = Some(meta);
+            }
+            _ => panic!("wrong phase {:?}", self.phase),
+        }
+        if self.phase == CachePhase::Stale {
+            self.phase = CachePhase::Expired;
+        }
+    }
+
+    /// Set the [CacheMeta] of the cache after revalidation.
+    ///
+    /// Certain info, such as the original cache admission time, will be preserved. Others will
+    /// be replaced by the input `meta`.
+    pub async fn revalidate_cache_meta(&mut self, mut meta: CacheMeta) -> Result<bool> {
+        let result = match self.phase {
+            CachePhase::Stale => {
+                let inner = self.inner_mut();
+                // TODO: we should keep the old meta in place and just use the new one to update it;
+                // that requires cacheable_filter to take a mut header and just return InternalMeta
+
+                // update the new meta with the old meta's created time
+                let created = inner.meta.as_ref().unwrap().0.internal.created;
+                meta.0.internal.created = created;
+                // meta.internal.updated was already set to the new meta's `created`,
+                // no need to set `updated` here
+
+                inner.meta.replace(meta);
+
+                let lock = inner.lock.take();
+                if let Some(Locked::Write(_r)) = lock {
+                    inner
+                        .cache_lock
+                        .unwrap()
+                        .release(inner.key.as_ref().unwrap(), LockStatus::Done);
+                }
+
+                let mut span = inner.traces.child("update_meta");
+                // TODO: this call can be async
+                let result = inner
+                    .storage
+                    .update_meta(
+                        inner.key.as_ref().unwrap(),
+                        inner.meta.as_ref().unwrap(),
+                        &span.handle(),
+                    )
+                    .await;
+                span.set_tag(|| trace::Tag::new("updated", result.is_ok()));
+                result
+            }
+            _ => panic!("wrong phase {:?}", self.phase),
+        };
+        self.phase = CachePhase::Revalidated;
+        result
+    }
+
+    /// After a successful revalidation, update certain headers for the cached asset
+    /// such as `Etag` with the fresh response header `resp`.
+    pub fn revalidate_merge_header(&mut self, resp: &RespHeader) -> ResponseHeader {
+        match self.phase {
+            CachePhase::Stale => {
+                /*
+                 * https://datatracker.ietf.org/doc/html/rfc9110#section-15.4.5
+                 * 304 response MUST generate ... would have been sent in a 200 ...
+                 * - Content-Location, Date, ETag, and Vary
+                 * - Cache-Control and Expires...
+                 */
+                let mut old_header = self.inner().meta.as_ref().unwrap().0.header.clone();
+                let mut clone_header = |header_name: &'static str| {
+                    // TODO: multiple headers
+                    if let Some(value) = resp.headers.get(header_name) {
+                        old_header.insert_header(header_name, value).unwrap();
+                    }
+                };
+                clone_header("cache-control");
+                clone_header("expires");
+                clone_header("cache-tag");
+                clone_header("cdn-cache-control");
+                clone_header("etag");
+                // https://datatracker.ietf.org/doc/html/rfc9111#section-4.3.4
+                // "...cache MUST update its header fields with the header fields provided in the 304..."
+                // But if the Vary header changes, the cached response may no longer match the
+                // incoming request.
+                //
+                // For simplicity, ignore changing Vary in revalidation for now.
+                // TODO: if we support vary during revalidation, there are a few edge cases to
+                // consider (what if the Vary header appears/disappears/changes)?
+                //
+                // clone_header("vary");
+                old_header
+            }
+            _ => panic!("wrong phase {:?}", self.phase),
+        }
+    }
+
+    /// Mark this asset uncacheable after revalidation
+    pub fn revalidate_uncacheable(&mut self, header: ResponseHeader, reason: NoCacheReason) {
+        match self.phase {
+            CachePhase::Stale => {
+                // replace cache meta header
+                self.inner_mut().meta.as_mut().unwrap().0.header = header;
+            }
+            _ => panic!("wrong phase {:?}", self.phase),
+        }
+        self.phase = CachePhase::RevalidatedNoCache(reason);
+        // TODO: remove this asset from cache once finished?
+    }
+
+    /// Update the variance of the [CacheMeta].
+    ///
+    /// Note that this process may change the lookup `key`, and eventually (when the asset is
+    /// written to storage) invalidate other cached variants under the same primary key as the
+    /// current asset.
+    pub fn update_variance(&mut self, variance: Option<HashBinary>) {
+        // If this is a cache miss, we will simply update the variance in the meta.
+        //
+        // If this is an expired response, we will have to consider a few cases:
+        //
+        // **Case 1**: Variance was absent, but the caller sets it now.
+        // We will just insert it into the meta. The current asset becomes the primary variant.
+        // Because the current location of the asset is already the primary variant, nothing else
+        // needs to be done.
+        //
+        // **Case 2**: Variance was present, but it changed or was removed.
+        // We want the current asset to take over the primary slot, in order to invalidate all
+        // other variants derived under the old Vary.
+        //
+        // **Case 3**: Variance did not change.
+        // Nothing needs to happen.
+        let inner = match self.phase {
+            CachePhase::Miss | CachePhase::Expired => self.inner_mut(),
+            _ => panic!("wrong phase {:?}", self.phase),
+        };
+
+        // Update the variance in the meta
+        if let Some(variance_hash) = variance.as_ref() {
+            inner
+                .meta
+                .as_mut()
+                .unwrap()
+                .set_variance_key(*variance_hash);
+        } else {
+            inner.meta.as_mut().unwrap().remove_variance();
+        }
+
+        // Change the lookup `key` if necessary, in order to admit the asset into the primary slot
+        // instead of the secondary slot.
+        let key = inner.key.as_ref().unwrap();
+        if let Some(old_variance) = key.get_variance_key().as_ref() {
+            // This is a secondary variant slot.
+            if Some(*old_variance) != variance.as_ref() {
+                // This new variance does not match the variance in the cache key we used to look
+                // up this asset.
+                // Drop the cache lock to avoid leaving a dangling lock
+                // (because we locked with the old cache key for the secondary slot).
+                // TODO: maybe we should try to signal waiting readers to compete for the primary key
+                // lock instead? We will not be modifying this secondary slot, so it's not actually
+                // ready for readers.
+                if let Some(lock) = inner.cache_lock.as_ref() {
+                    lock.release(key, LockStatus::Done);
+                }
+                // Remove the `variance` from the `key` so that we admit this asset into the
+                // primary slot. (`key` is used to tell storage where to write the data.)
+                inner.key.as_mut().unwrap().remove_variance_key();
+            }
+        }
+    }
+
+    /// Return the [CacheMeta] of this asset
+    ///
+    /// # Panic
+    /// Panics in phases that have no cache meta.
+    pub fn cache_meta(&self) -> &CacheMeta {
+        match self.phase {
+            // TODO: allow in Bypass phase?
+            CachePhase::Stale
+            | CachePhase::Expired
+            | CachePhase::Hit
+            | CachePhase::Revalidated
+            | CachePhase::RevalidatedNoCache(_) => self.inner().meta.as_ref().unwrap(),
+            CachePhase::Miss => {
+                // this is the async body read case, safe because body_reader is only set
+                // after the meta is retrieved
+                if self.inner().body_reader.is_some() {
+                    self.inner().meta.as_ref().unwrap()
+                } else {
+                    panic!("wrong phase {:?}", self.phase);
+                }
+            }
+            _ => panic!("wrong phase {:?}", self.phase),
+        }
+    }
+
+    /// Return the [CacheMeta] of this asset, if any
+    ///
+    /// Different from [Self::cache_meta()], this function is allowed to be called in the
+    /// [CachePhase::Miss] phase, where the cache meta may be set.
+    /// # Panic
+    /// Panics in phases that shouldn't have cache meta.
+    pub fn maybe_cache_meta(&self) -> Option<&CacheMeta> {
+        match self.phase {
+            CachePhase::Miss
+            | CachePhase::Stale
+            | CachePhase::Expired
+            | CachePhase::Hit
+            | CachePhase::Revalidated
+            | CachePhase::RevalidatedNoCache(_) => self.inner().meta.as_ref(),
+            _ => panic!("wrong phase {:?}", self.phase),
+        }
+    }
+
+    /// Perform the cache lookup from the given cache storage with the given cache key
+    ///
+    /// A cache hit will return [CacheMeta], which contains the header and meta info about
+    /// the cache, as well as a [HitHandler] to read the cache hit body.
+    /// # Panic
+    /// Panics in other phases.
+    pub async fn cache_lookup(&mut self) -> Result<Option<(CacheMeta, HitHandler)>> {
+        match self.phase {
+            // Stale is allowed here because stale -> cache_lock -> lookup again
+            CachePhase::CacheKey | CachePhase::Stale => {
+                let inner = self.inner_mut();
+                let mut span = inner.traces.child("lookup");
+                let key = inner.key.as_ref().unwrap(); // safe, this phase should have the cache key
+                let result = inner.storage.lookup(key, &span.handle()).await?;
+                let result = result.and_then(|(meta, header)| {
+                    if let Some(ts) = inner.valid_after {
+                        if meta.created() < ts {
+                            span.set_tag(|| trace::Tag::new("not valid", true));
+                            return None;
+                        }
+                    }
+                    Some((meta, header))
+                });
+                if result.is_none() {
+                    if let Some(lock) = inner.cache_lock.as_ref() {
+                        inner.lock = Some(lock.lock(key));
+                    }
+                }
+                span.set_tag(|| trace::Tag::new("found", result.is_some()));
+                Ok(result)
+            }
+            _ => panic!("wrong phase {:?}", self.phase),
+        }
+    }
+
+    /// Update the variance and see if the meta matches the current variance
+    ///
+    /// `cache_lookup() -> compute vary hash -> cache_vary_lookup()`
+    /// This function allows callers to compute the vary based on the initial cache hit.
+    /// `meta` should be the one returned from the initial cache_lookup().
+    /// - return true if the meta is the variance.
+    /// - return false if the current meta doesn't match the variance; need to cache_lookup() again
+    pub fn cache_vary_lookup(&mut self, variance: HashBinary, meta: &CacheMeta) -> bool {
+        match self.phase {
+            CachePhase::CacheKey => {
+                let inner = self.inner_mut();
+                // make sure that all variances found are fresher than this asset;
+                // this is because when purging all the variances, only the primary slot is deleted:
+                // the created TS of the primary is the tombstone of all the variances
+                inner.valid_after = Some(meta.created());
+
+                // update vary
+                let key = inner.key.as_mut().unwrap();
+                // if no variance was previously set, then this is the first cache hit
+                let is_initial_cache_hit = key.get_variance_key().is_none();
+                key.set_variance_key(variance);
+                let variance_binary = key.variance_bin();
+                let matches_variance = meta.variance() == variance_binary;
+
+                // We should remove the variance in the lookup `key` if this is the primary variant
+                // slot. We know this is the primary variant slot if this is the initial cache hit
+                // AND the variance in the `key` already matches the `meta`'s.
+                //
+                // For the primary variant slot, the storage backend needs to use the primary key
+                // for both cache lookup and updating the meta. Otherwise it will look for the
+                // asset in the wrong location during revalidation.
+                //
+                // We can recreate the "full" cache key by using the meta's variance, if needed.
+                if matches_variance && is_initial_cache_hit {
+                    inner.key.as_mut().unwrap().remove_variance_key();
+                }
+
+                matches_variance
+            }
+            _ => panic!("wrong phase {:?}", self.phase),
+        }
+    }
+
+    /// Whether this request is behind a cache lock in order to wait for another request to read the
+    /// asset.
+    pub fn is_cache_locked(&self) -> bool {
+        matches!(self.inner().lock, Some(Locked::Read(_)))
+    }
+
+    /// Whether this request is the leader request that fetches the asset for itself and other
+    /// requests behind the cache lock.
+    pub fn is_cache_lock_writer(&self) -> bool {
+        matches!(self.inner().lock, Some(Locked::Write(_)))
+    }
+
+    /// Take the write lock from this request to transfer it to another one.
+    /// # Panic
+    /// Call is_cache_lock_writer() to check first; panics otherwise.
+    pub fn take_write_lock(&mut self) -> WritePermit {
+        let lock = self.inner_mut().lock.take().unwrap();
+        match lock {
+            Locked::Write(w) => w,
+            Locked::Read(_) => panic!("take_write_lock() called on read lock"),
+        }
+    }
+
+    /// Set the write lock, which is usually transferred from [Self::take_write_lock()]
+    pub fn set_write_lock(&mut self, write_lock: WritePermit) {
+        self.inner_mut().lock.replace(Locked::Write(write_lock));
+    }
+
+    /// Whether this request's cache hit is stale
+    fn has_staled_asset(&self) -> bool {
+        self.phase == CachePhase::Stale
+    }
+
+    /// Whether this asset is stale and stale-if-error is allowed
+    pub fn can_serve_stale_error(&self) -> bool {
+        self.has_staled_asset() && self.cache_meta().serve_stale_if_error(SystemTime::now())
+    }
+
+    /// Whether this asset is stale and stale-while-revalidate is allowed
+    pub fn can_serve_stale_updating(&self) -> bool {
+        self.has_staled_asset()
+            && self
+                .cache_meta()
+                .serve_stale_while_revalidate(SystemTime::now())
+    }
+
+    /// Wait for the cache read lock to be unlocked
+    /// # Panic
+    /// Check [Self::is_cache_locked()]; panics if this request doesn't have a read lock.
+    pub async fn cache_lock_wait(&mut self) -> LockStatus {
+        let inner = self.inner_mut();
+        let _span = inner.traces.child("cache_lock");
+        let lock = inner.lock.take(); // remove the lock from self
+        if let Some(Locked::Read(r)) = lock {
+            let now = std::time::Instant::now();
+            r.wait().await;
+            let lock_duration = now.elapsed();
+            // it's possible for a request to be locked more than once
+            inner.lock_duration = Some(
+                inner
+                    .lock_duration
+                    .map_or(lock_duration, |d| d + lock_duration),
+            );
+            r.lock_status() // TODO: tag the span with the lock status
+        } else {
+            // should always call is_cache_locked() before this function
+            panic!("cache_lock_wait on wrong type of lock")
+        }
+    }
+
+    /// How long this request waited behind the read lock
+    pub fn lock_duration(&self) -> Option<Duration> {
+        // FIXME: this duration is lost when cache is disabled
+        self.inner.as_ref().and_then(|i| i.lock_duration)
+    }
+
+    /// Delete the asset from the cache storage
+    /// # Panic
+    /// Needs to be called after the cache key is set. Panics otherwise.
+    pub async fn purge(&mut self) -> Result<bool> {
+        match self.phase {
+            CachePhase::CacheKey => {
+                let inner = self.inner_mut();
+                let mut span = inner.traces.child("purge");
+                let key = inner.key.as_ref().unwrap().to_compact();
+                let result = inner.storage.purge(&key, &span.handle()).await;
+                // FIXME: also need to remove from the eviction manager
+                span.set_tag(|| trace::Tag::new("purged", matches!(result, Ok(true))));
+                result
+            }
+            _ => panic!("wrong phase {:?}", self.phase),
+        }
+    }
+
+    /// Check the cacheable prediction
+    ///
+    /// Returns true if the predictor is not set
+    pub fn cacheable_prediction(&self) -> bool {
+        if let Some(predictor) = self.inner().predictor {
+            predictor.cacheable_prediction(self.cache_key())
+        } else {
+            true
+        }
+    }
+
+    /// Tell the predictor that this response, which was previously predicted to be uncacheable,
+    /// is cacheable now.
+    pub fn response_became_cacheable(&self) {
+        if let Some(predictor) = self.inner().predictor {
+            predictor.mark_cacheable(self.cache_key());
+        }
+    }
+
+    /// Tell the predictor that this response is uncacheable so that it will know next time
+    /// this request arrives.
+    pub fn response_became_uncacheable(&self, reason: NoCacheReason) {
+        if let Some(predictor) = self.inner().predictor {
+            predictor.mark_uncacheable(self.cache_key(), reason);
+        }
+    }
+}
+
+/// Set the header compression dictionary that helps serialize the http header.
+///
+/// Returns false if it is already set.
+pub fn set_compression_dict_path(path: &str) -> bool {
+    crate::meta::COMPRESSION_DICT_PATH
+        .set(path.to_string())
+        .is_ok()
+}
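
To make the state machine above concrete, here is a minimal sketch of the lookup path: enable the cache, set a key, look the asset up, and register a hit or a miss. It assumes the `once_cell` crate for the `'static` storage reference that `enable()` requires, and the function name `serve` plus the key values are hypothetical; a real proxy would also run a freshness check on `meta` before choosing a `HitStatus`.

    use once_cell::sync::Lazy;
    use pingora_cache::{CacheKey, HitStatus, HttpCache, MemCache};

    // enable() takes `&'static (dyn Storage + Sync)`, so the backend must outlive the program.
    static STORAGE: Lazy<MemCache> = Lazy::new(MemCache::new);

    async fn serve() -> pingora_error::Result<()> {
        let mut cache = HttpCache::new();
        assert!(!cache.enabled()); // caching is disabled by default

        cache.enable(&*STORAGE, None, None, None); // no eviction, predictor, or cache lock
        cache.set_cache_key(CacheKey::new("ns", "/some/asset", "user1"));

        match cache.cache_lookup().await? {
            Some((meta, hit_handler)) => {
                // Assume the freshness check passed; mark this request as a hit.
                cache.cache_found(meta, hit_handler, HitStatus::Fresh);
            }
            None => cache.cache_miss(),
        }
        Ok(())
    }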
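
The admission side follows the ordering the panics enforce: `cache_miss()` (or a stale hit) first, then `set_cache_meta()`, then `set_miss_handler()`, then body writes, then `finish_miss_handler()`. The sketch below assumes a `CacheMeta::new(fresh_until, created, stale_while_revalidate_sec, stale_if_error_sec, header)` constructor and a `write_body(data, eof)` method on the miss handler; the function name `admit` and the 60-second freshness window are illustrative only.

    use bytes::Bytes;
    use pingora_cache::{CacheMeta, HttpCache};
    use pingora_http::ResponseHeader;
    use std::time::{Duration, SystemTime};

    // Assumes `cache` is already in the Miss (or Expired) phase.
    async fn admit(cache: &mut HttpCache, body: Bytes) -> pingora_error::Result<()> {
        let now = SystemTime::now();
        let header = ResponseHeader::build(200, None)?;
        // Hypothetical policy: fresh for 60s, no stale-while-revalidate / stale-if-error.
        let meta = CacheMeta::new(now + Duration::from_secs(60), now, 0, 0, header);
        cache.set_cache_meta(meta);

        cache.set_miss_handler().await?;
        if let Some(miss_handler) = cache.miss_handler() {
            miss_handler.write_body(body, true).await?; // true: end of body
        }
        // Releases the write lock (if held) and triggers eviction if configured.
        cache.finish_miss_handler().await?;
        Ok(())
    }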
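
Finally, a hedged sketch of the reader side of the cache lock. It assumes the cache was enabled with a `CacheLock` and that a lookup just returned `None`, which is the point where `cache_lookup()` takes the lock: the leader holds the write permit and fetches from the origin, while waiters hold read locks. The function name `wait_for_leader` and the policy in the non-`Done` arms are assumptions, not the library's prescribed handling.

    use pingora_cache::lock::LockStatus;
    use pingora_cache::{HttpCache, NoCacheReason};

    async fn wait_for_leader(cache: &mut HttpCache) -> pingora_error::Result<()> {
        if cache.is_cache_locked() {
            match cache.cache_lock_wait().await {
                // The leader finished writing; look the asset up again.
                LockStatus::Done => {
                    let _ = cache.cache_lookup().await?;
                }
                // Waited too long; fetch from the origin without caching.
                LockStatus::Timeout => cache.disable(NoCacheReason::CacheLockTimeout),
                // The leader gave up or hit an error; here we also fall back to the origin.
                _ => cache.disable(NoCacheReason::CacheLockGiveUp),
            }
        } else if cache.is_cache_lock_writer() {
            // This request is the leader: fetch from the origin and admit via the miss path.
        }
        Ok(())
    }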