author     Kevin Guthrie <[email protected]>    2024-11-18 10:43:52 -0500
committer  Yuchen Wu <[email protected]>        2024-12-13 17:27:40 -0800
commit     9850e92e33af870a0761c2473673f9a0ada61b0b
tree       9f663f0694653c2764e1dc1d0d7fce99b34fd65b
parent     a8a6e77eef2c0f4d2a45f00c5b0e316dd373f2f2
Add the ability to trigger forced-miss behavior from the `cache_hit_filter` function
-rw-r--r--  .bleep                                      |   2
-rw-r--r--  pingora-cache/src/lib.rs                    | 114
-rw-r--r--  pingora-proxy/src/proxy_cache.rs            | 155
-rw-r--r--  pingora-proxy/src/proxy_trait.rs            |  20
-rw-r--r--  pingora-proxy/tests/test_upstream.rs        |  37
-rw-r--r--  pingora-proxy/tests/utils/server_utils.rs   |  13

6 files changed, 216 insertions, 125 deletions
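
Usage sketch (not part of the patch below): with this change, a downstream `ProxyHttp` implementation can return the new `ForcedInvalidationKind` from `cache_hit_filter` to force either a revalidation or a full re-fetch of a cached asset. The `x-purge-soft`/`x-purge-hard` header names and the surrounding impl block are hypothetical; only the method signature and the enum come from this commit.

    // Hypothetical cache_hit_filter override inside an `impl ProxyHttp for MyProxy`
    // block (MyProxy and the x-purge-* headers are illustrative only).
    async fn cache_hit_filter(
        &self,
        session: &Session,
        _meta: &CacheMeta,
        _ctx: &mut Self::CTX,
    ) -> Result<Option<ForcedInvalidationKind>> {
        // Hard purge: treat the stored asset as absent and fetch from upstream again.
        if session.get_header_bytes("x-purge-hard") != b"" {
            return Ok(Some(ForcedInvalidationKind::ForceMiss));
        }
        // Soft purge: mark the asset expired so it is revalidated instead.
        if session.get_header_bytes("x-purge-soft") != b"" {
            return Ok(Some(ForcedInvalidationKind::ForceExpired));
        }
        // No forced invalidation: the hit is served according to its freshness.
        Ok(None)
    }

The test proxy in pingora-proxy/tests/utils/server_utils.rs below does the same thing with an `x-force-miss` test header.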
diff --git a/.bleep b/.bleep
@@ -1 +1 @@
-7dbf3a97f9e59d8ad1d8e5d199cae6ee49869b9c
\ No newline at end of file
+f664d443d0b1b709e2276e8ea74cc1c26659a372
\ No newline at end of file
diff --git a/pingora-cache/src/lib.rs b/pingora-cache/src/lib.rs
index 6ac49a5..c65f602 100644
--- a/pingora-cache/src/lib.rs
+++ b/pingora-cache/src/lib.rs
@@ -23,6 +23,7 @@ use pingora_error::Result;
 use pingora_http::ResponseHeader;
 use rustracing::tag::Tag;
 use std::time::{Duration, Instant, SystemTime};
+use strum::IntoStaticStr;
 use trace::CacheTraceCTX;
 
 pub mod cache_control;
@@ -210,34 +211,59 @@ impl RespCacheable {
     }
 }
 
+/// Indicators of which level of purge logic to apply to an asset. As in should
+/// the purged file be revalidated or re-retrieved altogether
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum ForcedInvalidationKind {
+    /// Indicates the asset should be considered stale and revalidated
+    ForceExpired,
+
+    /// Indicates the asset should be considered absent and treated like a miss
+    /// instead of a hit
+    ForceMiss,
+}
+
 /// Freshness state of cache hit asset
 ///
 ///
-#[derive(Debug, Copy, Clone)]
+#[derive(Debug, Copy, Clone, IntoStaticStr, PartialEq, Eq)]
+#[strum(serialize_all = "snake_case")]
 pub enum HitStatus {
+    /// The asset's freshness directives indicate it has expired
     Expired,
+
+    /// The asset was marked as expired, and should be treated as stale
     ForceExpired,
+
+    /// The asset was marked as absent, and should be treated as a miss
+    ForceMiss,
+
+    /// An error occurred while processing the asset, so it should be treated as
+    /// a miss
     FailedHitFilter,
+
+    /// The asset is not expired
     Fresh,
 }
 
 impl HitStatus {
     /// For displaying cache hit status
     pub fn as_str(&self) -> &'static str {
-        match self {
-            Self::Expired => "expired",
-            Self::ForceExpired => "force_expired",
-            Self::FailedHitFilter => "failed_hit_filter",
-            Self::Fresh => "fresh",
-        }
+        self.into()
     }
 
     /// Whether cached asset can be served as fresh
     pub fn is_fresh(&self) -> bool {
-        match self {
-            Self::Expired | Self::ForceExpired | Self::FailedHitFilter => false,
-            Self::Fresh => true,
-        }
+        *self == HitStatus::Fresh
+    }
+
+    /// Check whether the hit status should be treated as a miss. A forced miss
+    /// is obviously treated as a miss. A hit-filter failure is treated as a
+    /// miss because we can't use the asset as an actual hit. If we treat it as
+    /// expired, we still might not be able to use it even if revalidation
+    /// succeeds.
+    pub fn is_treated_as_miss(self) -> bool {
+        matches!(self, HitStatus::ForceMiss | HitStatus::FailedHitFilter)
     }
 }
 
@@ -496,34 +522,46 @@ impl HttpCache {
     ///
     /// The `hit_status` enum allows the caller to force expire assets.
     pub fn cache_found(&mut self, meta: CacheMeta, hit_handler: HitHandler, hit_status: HitStatus) {
-        match self.phase {
-            // Stale allowed because of cache lock and then retry
-            CachePhase::CacheKey | CachePhase::Stale => {
-                self.phase = if hit_status.is_fresh() {
-                    CachePhase::Hit
-                } else {
-                    CachePhase::Stale
-                };
-                let phase = self.phase;
-                let inner = self.inner_mut();
-                let key = inner.key.as_ref().unwrap();
-                if phase == CachePhase::Stale {
-                    if let Some(lock) = inner.cache_lock.as_ref() {
-                        inner.lock = Some(lock.lock(key));
-                    }
-                }
-                inner.traces.start_hit_span(phase, hit_status);
-                inner.traces.log_meta_in_hit_span(&meta);
-                if let Some(eviction) = inner.eviction {
-                    // TODO: make access() accept CacheKey
-                    let cache_key = key.to_compact();
-                    // FIXME: get size
-                    eviction.access(&cache_key, 0, meta.0.internal.fresh_until);
-                }
-                inner.meta = Some(meta);
-                inner.body_reader = Some(hit_handler);
+        // Stale allowed because of cache lock and then retry
+        if !matches!(self.phase, CachePhase::CacheKey | CachePhase::Stale) {
+            panic!("wrong phase {:?}", self.phase)
+        }
+
+        self.phase = match hit_status {
+            HitStatus::Fresh => CachePhase::Hit,
+            HitStatus::Expired | HitStatus::ForceExpired => CachePhase::Stale,
+            HitStatus::FailedHitFilter | HitStatus::ForceMiss => self.phase,
+        };
+
+        let phase = self.phase;
+        let inner = self.inner_mut();
+
+        let key = inner.key.as_ref().unwrap();
+
+        // The cache lock might not be set for stale hit or hits treated as
+        // misses, so we need to initialize it here
+        if phase == CachePhase::Stale || hit_status.is_treated_as_miss() {
+            if let Some(lock) = inner.cache_lock.as_ref() {
+                inner.lock = Some(lock.lock(key));
             }
-            _ => panic!("wrong phase {:?}", self.phase),
+        }
+
+        if hit_status.is_treated_as_miss() {
+            // Clear the body and meta for hits that are treated as misses
+            inner.body_reader = None;
+            inner.meta = None;
+        } else {
+            // Set the metadata appropriately for legit hits
+            inner.traces.start_hit_span(phase, hit_status);
+            inner.traces.log_meta_in_hit_span(&meta);
+            if let Some(eviction) = inner.eviction {
+                // TODO: make access() accept CacheKey
+                let cache_key = key.to_compact();
+                // FIXME(PINGORA-1914): get size
+                eviction.access(&cache_key, 0, meta.0.internal.fresh_until);
+            }
+            inner.meta = Some(meta);
+            inner.body_reader = Some(hit_handler);
         }
     }
diff --git a/pingora-proxy/src/proxy_cache.rs b/pingora-proxy/src/proxy_cache.rs
index c720575..454416e 100644
--- a/pingora-proxy/src/proxy_cache.rs
+++ b/pingora-proxy/src/proxy_cache.rs
@@ -17,7 +17,7 @@ use http::{Method, StatusCode};
 use pingora_cache::key::CacheHashKey;
 use pingora_cache::lock::LockStatus;
 use pingora_cache::max_file_size::ERR_RESPONSE_TOO_LARGE;
-use pingora_cache::{HitStatus, RespCacheable::*};
+use pingora_cache::{ForcedInvalidationKind, HitStatus, RespCacheable::*};
 use pingora_core::protocols::http::conditional_filter::to_304;
 use pingora_core::protocols::http::v1::common::header_value_content_length;
 use pingora_core::ErrorType;
@@ -79,6 +79,7 @@ impl<SV> HttpProxy<SV> {
             // for cache lock, TODO: cap the max number of loops
             match session.cache.cache_lookup().await {
                 Ok(res) => {
+                    let mut hit_status_opt = None;
                     if let Some((mut meta, handler)) = res {
                         // Vary logic
                         // Because this branch can be called multiple times in a loop, and we only
@@ -112,10 +113,8 @@ impl<SV> HttpProxy<SV> {
                         // hit
                         // TODO: maybe round and/or cache now()
                         let hit_status = if meta.is_fresh(std::time::SystemTime::now()) {
-                            // check if we should force expire
-                            // (this is a soft purge which tries to revalidate,
+                            // check if we should force expire or miss
                             // vs. hard purge which forces miss)
-                            // TODO: allow hard purge
                             match self.inner.cache_hit_filter(session, &meta, ctx).await {
                                 Err(e) => {
                                     error!(
@@ -125,86 +124,26 @@ impl<SV> HttpProxy<SV> {
                                     // this return value will cause us to fetch from upstream
                                     HitStatus::FailedHitFilter
                                 }
-                                Ok(expired) => {
+                                Ok(None) => HitStatus::Fresh,
+                                Ok(Some(ForcedInvalidationKind::ForceExpired)) => {
                                     // force expired asset should not be serve as stale
                                     // because force expire is usually to remove data
-                                    if expired {
-                                        meta.disable_serve_stale();
-                                        HitStatus::ForceExpired
-                                    } else {
-                                        HitStatus::Fresh
-                                    }
+                                    meta.disable_serve_stale();
+                                    HitStatus::ForceExpired
                                 }
+                                Ok(Some(ForcedInvalidationKind::ForceMiss)) => HitStatus::ForceMiss,
                             }
                         } else {
                             HitStatus::Expired
                         };
+
+                        hit_status_opt = Some(hit_status);
+
                         // init cache for hit / stale
                         session.cache.cache_found(meta, handler, hit_status);
+                    }
 
-                        if !hit_status.is_fresh() {
-                            // expired or force expired asset
-                            if session.cache.is_cache_locked() {
-                                // first if this is the sub request for the background cache update
-                                if let Some(write_lock) = session
-                                    .subrequest_ctx
-                                    .as_mut()
-                                    .and_then(|ctx| ctx.write_lock.take())
-                                {
-                                    // Put the write lock in the request
-                                    session.cache.set_write_lock(write_lock);
-                                    session.cache.tag_as_subrequest();
-                                    // and then let it go to upstream
-                                    break None;
-                                }
-                                let will_serve_stale = session.cache.can_serve_stale_updating()
-                                    && self.inner.should_serve_stale(session, ctx, None);
-                                if !will_serve_stale {
-                                    let lock_status = session.cache.cache_lock_wait().await;
-                                    if self.handle_lock_status(session, ctx, lock_status) {
-                                        continue;
-                                    } else {
-                                        break None;
-                                    }
-                                }
-                                // else continue to serve stale
-                                session.cache.set_stale_updating();
-                            } else if session.cache.is_cache_lock_writer() {
-                                // stale while revalidate logic for the writer
-                                let will_serve_stale = session.cache.can_serve_stale_updating()
-                                    && self.inner.should_serve_stale(session, ctx, None);
-                                if will_serve_stale {
-                                    // create a background thread to do the actual update
-                                    let subrequest =
-                                        Box::new(crate::subrequest::create_dummy_session(session));
-                                    let new_app = self.clone(); // Clone the Arc
-                                    let sub_req_ctx = Box::new(SubReqCtx {
-                                        write_lock: Some(session.cache.take_write_lock()),
-                                    });
-                                    tokio::spawn(async move {
-                                        new_app.process_subrequest(subrequest, sub_req_ctx).await;
-                                    });
-                                    // continue to serve stale for this request
-                                    session.cache.set_stale_updating();
-                                } else {
-                                    // return to fetch from upstream
-                                    break None;
-                                }
-                            } else {
-                                // return to fetch from upstream
-                                break None;
-                            }
-                        }
-                        let (reuse, err) = self.proxy_cache_hit(session, ctx).await;
-                        if let Some(e) = err.as_ref() {
-                            error!(
-                                "Fail to serve cache: {e}, {}",
-                                self.inner.request_summary(session, ctx)
-                            );
-                        }
-                        // responses is served from cache, exit
-                        break Some((reuse, err));
-                    } else {
+                    if hit_status_opt.map_or(true, HitStatus::is_treated_as_miss) {
                         // cache miss
                         if session.cache.is_cache_locked() {
                             // Another request is filling the cache; try waiting til that's done and retry.
@@ -219,6 +158,74 @@ impl<SV> HttpProxy<SV> {
                                 break None;
                             }
                         }
+
+                    // Safe because an empty hit status would have broken out
+                    // in the block above
+                    let hit_status = hit_status_opt.expect("None case handled as miss");
+
+                    if !hit_status.is_fresh() {
+                        // expired or force expired asset
+                        if session.cache.is_cache_locked() {
+                            // first if this is the sub request for the background cache update
+                            if let Some(write_lock) = session
+                                .subrequest_ctx
+                                .as_mut()
+                                .and_then(|ctx| ctx.write_lock.take())
+                            {
+                                // Put the write lock in the request
+                                session.cache.set_write_lock(write_lock);
+                                session.cache.tag_as_subrequest();
+                                // and then let it go to upstream
+                                break None;
+                            }
+                            let will_serve_stale = session.cache.can_serve_stale_updating()
+                                && self.inner.should_serve_stale(session, ctx, None);
+                            if !will_serve_stale {
+                                let lock_status = session.cache.cache_lock_wait().await;
+                                if self.handle_lock_status(session, ctx, lock_status) {
+                                    continue;
+                                } else {
+                                    break None;
+                                }
+                            }
+                            // else continue to serve stale
+                            session.cache.set_stale_updating();
+                        } else if session.cache.is_cache_lock_writer() {
+                            // stale while revalidate logic for the writer
+                            let will_serve_stale = session.cache.can_serve_stale_updating()
+                                && self.inner.should_serve_stale(session, ctx, None);
+                            if will_serve_stale {
+                                // create a background thread to do the actual update
+                                let subrequest =
+                                    Box::new(crate::subrequest::create_dummy_session(session));
+                                let new_app = self.clone(); // Clone the Arc
+                                let sub_req_ctx = Box::new(SubReqCtx {
+                                    write_lock: Some(session.cache.take_write_lock()),
+                                });
+                                tokio::spawn(async move {
+                                    new_app.process_subrequest(subrequest, sub_req_ctx).await;
+                                });
+                                // continue to serve stale for this request
+                                session.cache.set_stale_updating();
+                            } else {
+                                // return to fetch from upstream
+                                break None;
+                            }
+                        } else {
+                            // return to fetch from upstream
+                            break None;
+                        }
+                    }
+
+                    let (reuse, err) = self.proxy_cache_hit(session, ctx).await;
+                    if let Some(e) = err.as_ref() {
+                        error!(
+                            "Fail to serve cache: {e}, {}",
+                            self.inner.request_summary(session, ctx)
+                        );
+                    }
+                    // responses is served from cache, exit
+                    break Some((reuse, err));
                 }
                 Err(e) => {
                     // Allow cache miss to fill cache even if cache lookup errors
diff --git a/pingora-proxy/src/proxy_trait.rs b/pingora-proxy/src/proxy_trait.rs
index 6970ec0..00e54f4 100644
--- a/pingora-proxy/src/proxy_trait.rs
+++ b/pingora-proxy/src/proxy_trait.rs
@@ -13,7 +13,11 @@
 // limitations under the License.
 
 use super::*;
-use pingora_cache::{key::HashBinary, CacheKey, CacheMeta, RespCacheable, RespCacheable::*};
+use pingora_cache::{
+    key::HashBinary,
+    CacheKey, CacheMeta, ForcedInvalidationKind,
+    RespCacheable::{self, *},
+};
 use std::time::Duration;
 
 /// The interface to control the HTTP proxy
@@ -129,21 +133,23 @@ pub trait ProxyHttp {
         session.cache.cache_miss();
     }
 
-    /// This filter is called after a successful cache lookup and before the cache asset is ready to
-    /// be used.
+    /// This filter is called after a successful cache lookup and before the
+    /// cache asset is ready to be used.
     ///
-    /// This filter allow the user to log or force expire the asset.
-    // flex purge, other filtering, returns whether asset is should be force expired or not
+    /// This filter allows the user to log or force invalidate the asset.
+    ///
+    /// The value returned indicates if the force invalidation should be used,
+    /// and which kind. Returning `None` indicates no forced invalidation
     async fn cache_hit_filter(
        &self,
        _session: &Session,
        _meta: &CacheMeta,
        _ctx: &mut Self::CTX,
-    ) -> Result<bool>
+    ) -> Result<Option<ForcedInvalidationKind>>
     where
         Self::CTX: Send + Sync,
     {
-        Ok(false)
+        Ok(None)
     }
 
     /// Decide if a request should continue to upstream after not being served from cache.
diff --git a/pingora-proxy/tests/test_upstream.rs b/pingora-proxy/tests/test_upstream.rs
index b2d9575..bbac874 100644
--- a/pingora-proxy/tests/test_upstream.rs
+++ b/pingora-proxy/tests/test_upstream.rs
@@ -388,6 +388,43 @@ mod test_cache {
     }
 
     #[tokio::test]
+    async fn test_force_miss() {
+        init();
+        let url = "http://127.0.0.1:6148/unique/test_froce_miss/revalidate_now";
+
+        let res = reqwest::get(url).await.unwrap();
+        assert_eq!(res.status(), StatusCode::OK);
+        let headers = res.headers();
+        let cache_miss_epoch = headers["x-epoch"].to_str().unwrap().parse::<f64>().unwrap();
+        assert_eq!(headers["x-cache-status"], "miss");
+        assert_eq!(headers["x-upstream-status"], "200");
+        assert_eq!(res.text().await.unwrap(), "hello world");
+
+        let res = reqwest::get(url).await.unwrap();
+        assert_eq!(res.status(), StatusCode::OK);
+        let headers = res.headers();
+        let cache_hit_epoch = headers["x-epoch"].to_str().unwrap().parse::<f64>().unwrap();
+        assert_eq!(headers["x-cache-status"], "hit");
+        assert!(headers.get("x-upstream-status").is_none());
+        assert_eq!(res.text().await.unwrap(), "hello world");
+
+        assert_eq!(cache_miss_epoch, cache_hit_epoch);
+
+        let res = reqwest::Client::new()
+            .get(url)
+            .header("x-force-miss", "1")
+            .send()
+            .await
+            .unwrap();
+
+        assert_eq!(res.status(), StatusCode::OK);
+        let headers = res.headers();
+        assert_eq!(headers["x-cache-status"], "miss");
+        assert_eq!(headers["x-upstream-status"], "200");
+        assert_eq!(res.text().await.unwrap(), "hello world");
+    }
+
+    #[tokio::test]
     async fn test_cache_downstream_revalidation_etag() {
         init();
         let url = "http://127.0.0.1:6148/unique/test_downstream_revalidation_etag/revalidate_now";
diff --git a/pingora-proxy/tests/utils/server_utils.rs b/pingora-proxy/tests/utils/server_utils.rs
index f762f33..0c5a4e5 100644
--- a/pingora-proxy/tests/utils/server_utils.rs
+++ b/pingora-proxy/tests/utils/server_utils.rs
@@ -21,12 +21,12 @@ use http::HeaderValue;
 use once_cell::sync::Lazy;
 use pingora_cache::cache_control::CacheControl;
 use pingora_cache::key::HashBinary;
-use pingora_cache::VarianceBuilder;
 use pingora_cache::{
     eviction::simple_lru::Manager, filters::resp_cacheable, lock::CacheLock, predictor::Predictor,
     set_compression_dict_path, CacheMeta, CacheMetaDefaults, CachePhase, MemCache, NoCacheReason,
     RespCacheable,
 };
+use pingora_cache::{ForcedInvalidationKind, PurgeType, VarianceBuilder};
 use pingora_core::apps::{HttpServerApp, HttpServerOptions};
 use pingora_core::modules::http::compression::ResponseCompression;
 use pingora_core::protocols::{l4::socket::SocketAddr, Digest};
@@ -415,12 +415,15 @@ impl ProxyHttp for ExampleProxyCache {
         session: &Session,
         _meta: &CacheMeta,
         _ctx: &mut Self::CTX,
-    ) -> Result<bool> {
-        // allow test header to control force expiry
+    ) -> Result<Option<ForcedInvalidationKind>> {
+        // allow test header to control force expiry/miss
+        if session.get_header_bytes("x-force-miss") != b"" {
+            return Ok(Some(ForcedInvalidationKind::ForceMiss));
+        }
         if session.get_header_bytes("x-force-expire") != b"" {
-            return Ok(true);
+            return Ok(Some(ForcedInvalidationKind::ForceExpired));
         }
-        Ok(false)
+        Ok(None)
     }
 
     fn cache_vary_filter(
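
For reference, a small self-contained sketch (not part of the commit) of how the reworked `HitStatus` helpers behave after this change; it assumes only the `pingora-cache` crate as a dependency:

    use pingora_cache::HitStatus;

    fn describe(status: HitStatus) {
        println!(
            "{}: fresh={}, treated_as_miss={}",
            // as_str() now goes through strum's IntoStaticStr with
            // serialize_all = "snake_case", so it yields names like
            // "force_miss" and "failed_hit_filter" without a hand-written match.
            status.as_str(),
            // Only Fresh counts as a fresh hit.
            status.is_fresh(),
            // ForceMiss and FailedHitFilter are handled as cache misses;
            // Expired and ForceExpired become stale hits instead.
            status.is_treated_as_miss(),
        );
    }

    fn main() {
        describe(HitStatus::Fresh);        // fresh=true,  treated_as_miss=false
        describe(HitStatus::ForceExpired); // fresh=false, treated_as_miss=false
        describe(HitStatus::ForceMiss);    // fresh=false, treated_as_miss=true
    }

This mirrors the dispatch in `cache_found` above: `Fresh` becomes a hit, `Expired`/`ForceExpired` become stale, and the two miss-like statuses leave the phase untouched while clearing the stored meta and body reader.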