diff options
Diffstat (limited to 'pingora-proxy/src')
-rw-r--r-- | pingora-proxy/src/proxy_cache.rs | 155 | ||||
-rw-r--r-- | pingora-proxy/src/proxy_trait.rs | 20 |
2 files changed, 94 insertions, 81 deletions
diff --git a/pingora-proxy/src/proxy_cache.rs b/pingora-proxy/src/proxy_cache.rs index c720575..454416e 100644 --- a/pingora-proxy/src/proxy_cache.rs +++ b/pingora-proxy/src/proxy_cache.rs @@ -17,7 +17,7 @@ use http::{Method, StatusCode}; use pingora_cache::key::CacheHashKey; use pingora_cache::lock::LockStatus; use pingora_cache::max_file_size::ERR_RESPONSE_TOO_LARGE; -use pingora_cache::{HitStatus, RespCacheable::*}; +use pingora_cache::{ForcedInvalidationKind, HitStatus, RespCacheable::*}; use pingora_core::protocols::http::conditional_filter::to_304; use pingora_core::protocols::http::v1::common::header_value_content_length; use pingora_core::ErrorType; @@ -79,6 +79,7 @@ impl<SV> HttpProxy<SV> { // for cache lock, TODO: cap the max number of loops match session.cache.cache_lookup().await { Ok(res) => { + let mut hit_status_opt = None; if let Some((mut meta, handler)) = res { // Vary logic // Because this branch can be called multiple times in a loop, and we only @@ -112,10 +113,8 @@ impl<SV> HttpProxy<SV> { // hit // TODO: maybe round and/or cache now() let hit_status = if meta.is_fresh(std::time::SystemTime::now()) { - // check if we should force expire - // (this is a soft purge which tries to revalidate, + // check if we should force expire or miss // vs. hard purge which forces miss) - // TODO: allow hard purge match self.inner.cache_hit_filter(session, &meta, ctx).await { Err(e) => { error!( @@ -125,86 +124,26 @@ impl<SV> HttpProxy<SV> { // this return value will cause us to fetch from upstream HitStatus::FailedHitFilter } - Ok(expired) => { + Ok(None) => HitStatus::Fresh, + Ok(Some(ForcedInvalidationKind::ForceExpired)) => { // force expired asset should not be serve as stale // because force expire is usually to remove data - if expired { - meta.disable_serve_stale(); - HitStatus::ForceExpired - } else { - HitStatus::Fresh - } + meta.disable_serve_stale(); + HitStatus::ForceExpired } + Ok(Some(ForcedInvalidationKind::ForceMiss)) => HitStatus::ForceMiss, } } else { HitStatus::Expired }; + + hit_status_opt = Some(hit_status); + // init cache for hit / stale session.cache.cache_found(meta, handler, hit_status); + } - if !hit_status.is_fresh() { - // expired or force expired asset - if session.cache.is_cache_locked() { - // first if this is the sub request for the background cache update - if let Some(write_lock) = session - .subrequest_ctx - .as_mut() - .and_then(|ctx| ctx.write_lock.take()) - { - // Put the write lock in the request - session.cache.set_write_lock(write_lock); - session.cache.tag_as_subrequest(); - // and then let it go to upstream - break None; - } - let will_serve_stale = session.cache.can_serve_stale_updating() - && self.inner.should_serve_stale(session, ctx, None); - if !will_serve_stale { - let lock_status = session.cache.cache_lock_wait().await; - if self.handle_lock_status(session, ctx, lock_status) { - continue; - } else { - break None; - } - } - // else continue to serve stale - session.cache.set_stale_updating(); - } else if session.cache.is_cache_lock_writer() { - // stale while revalidate logic for the writer - let will_serve_stale = session.cache.can_serve_stale_updating() - && self.inner.should_serve_stale(session, ctx, None); - if will_serve_stale { - // create a background thread to do the actual update - let subrequest = - Box::new(crate::subrequest::create_dummy_session(session)); - let new_app = self.clone(); // Clone the Arc - let sub_req_ctx = Box::new(SubReqCtx { - write_lock: Some(session.cache.take_write_lock()), - }); - tokio::spawn(async move { - new_app.process_subrequest(subrequest, sub_req_ctx).await; - }); - // continue to serve stale for this request - session.cache.set_stale_updating(); - } else { - // return to fetch from upstream - break None; - } - } else { - // return to fetch from upstream - break None; - } - } - let (reuse, err) = self.proxy_cache_hit(session, ctx).await; - if let Some(e) = err.as_ref() { - error!( - "Fail to serve cache: {e}, {}", - self.inner.request_summary(session, ctx) - ); - } - // responses is served from cache, exit - break Some((reuse, err)); - } else { + if hit_status_opt.map_or(true, HitStatus::is_treated_as_miss) { // cache miss if session.cache.is_cache_locked() { // Another request is filling the cache; try waiting til that's done and retry. @@ -219,6 +158,74 @@ impl<SV> HttpProxy<SV> { break None; } } + + // Safe because an empty hit status would have broken out + // in the block above + let hit_status = hit_status_opt.expect("None case handled as miss"); + + if !hit_status.is_fresh() { + // expired or force expired asset + if session.cache.is_cache_locked() { + // first if this is the sub request for the background cache update + if let Some(write_lock) = session + .subrequest_ctx + .as_mut() + .and_then(|ctx| ctx.write_lock.take()) + { + // Put the write lock in the request + session.cache.set_write_lock(write_lock); + session.cache.tag_as_subrequest(); + // and then let it go to upstream + break None; + } + let will_serve_stale = session.cache.can_serve_stale_updating() + && self.inner.should_serve_stale(session, ctx, None); + if !will_serve_stale { + let lock_status = session.cache.cache_lock_wait().await; + if self.handle_lock_status(session, ctx, lock_status) { + continue; + } else { + break None; + } + } + // else continue to serve stale + session.cache.set_stale_updating(); + } else if session.cache.is_cache_lock_writer() { + // stale while revalidate logic for the writer + let will_serve_stale = session.cache.can_serve_stale_updating() + && self.inner.should_serve_stale(session, ctx, None); + if will_serve_stale { + // create a background thread to do the actual update + let subrequest = + Box::new(crate::subrequest::create_dummy_session(session)); + let new_app = self.clone(); // Clone the Arc + let sub_req_ctx = Box::new(SubReqCtx { + write_lock: Some(session.cache.take_write_lock()), + }); + tokio::spawn(async move { + new_app.process_subrequest(subrequest, sub_req_ctx).await; + }); + // continue to serve stale for this request + session.cache.set_stale_updating(); + } else { + // return to fetch from upstream + break None; + } + } else { + // return to fetch from upstream + break None; + } + } + + let (reuse, err) = self.proxy_cache_hit(session, ctx).await; + if let Some(e) = err.as_ref() { + error!( + "Fail to serve cache: {e}, {}", + self.inner.request_summary(session, ctx) + ); + } + // responses is served from cache, exit + break Some((reuse, err)); } Err(e) => { // Allow cache miss to fill cache even if cache lookup errors diff --git a/pingora-proxy/src/proxy_trait.rs b/pingora-proxy/src/proxy_trait.rs index 6970ec0..00e54f4 100644 --- a/pingora-proxy/src/proxy_trait.rs +++ b/pingora-proxy/src/proxy_trait.rs @@ -13,7 +13,11 @@ // limitations under the License. use super::*; -use pingora_cache::{key::HashBinary, CacheKey, CacheMeta, RespCacheable, RespCacheable::*}; +use pingora_cache::{ + key::HashBinary, + CacheKey, CacheMeta, ForcedInvalidationKind, + RespCacheable::{self, *}, +}; use std::time::Duration; /// The interface to control the HTTP proxy @@ -129,21 +133,23 @@ pub trait ProxyHttp { session.cache.cache_miss(); } - /// This filter is called after a successful cache lookup and before the cache asset is ready to - /// be used. + /// This filter is called after a successful cache lookup and before the + /// cache asset is ready to be used. /// - /// This filter allow the user to log or force expire the asset. - // flex purge, other filtering, returns whether asset is should be force expired or not + /// This filter allows the user to log or force invalidate the asset. + /// + /// The value returned indicates if the force invalidation should be used, + /// and which kind. Returning `None` indicates no forced invalidation async fn cache_hit_filter( &self, _session: &Session, _meta: &CacheMeta, _ctx: &mut Self::CTX, - ) -> Result<bool> + ) -> Result<Option<ForcedInvalidationKind>> where Self::CTX: Send + Sync, { - Ok(false) + Ok(None) } /// Decide if a request should continue to upstream after not being served from cache. |