Diffstat (limited to 'pingora-proxy/src')
-rw-r--r--  pingora-proxy/src/proxy_cache.rs  155
-rw-r--r--  pingora-proxy/src/proxy_trait.rs   20
2 files changed, 94 insertions, 81 deletions
diff --git a/pingora-proxy/src/proxy_cache.rs b/pingora-proxy/src/proxy_cache.rs
index c720575..454416e 100644
--- a/pingora-proxy/src/proxy_cache.rs
+++ b/pingora-proxy/src/proxy_cache.rs
@@ -17,7 +17,7 @@ use http::{Method, StatusCode};
use pingora_cache::key::CacheHashKey;
use pingora_cache::lock::LockStatus;
use pingora_cache::max_file_size::ERR_RESPONSE_TOO_LARGE;
-use pingora_cache::{HitStatus, RespCacheable::*};
+use pingora_cache::{ForcedInvalidationKind, HitStatus, RespCacheable::*};
use pingora_core::protocols::http::conditional_filter::to_304;
use pingora_core::protocols::http::v1::common::header_value_content_length;
use pingora_core::ErrorType;
@@ -79,6 +79,7 @@ impl<SV> HttpProxy<SV> {
// for cache lock, TODO: cap the max number of loops
match session.cache.cache_lookup().await {
Ok(res) => {
+ let mut hit_status_opt = None;
if let Some((mut meta, handler)) = res {
// Vary logic
// Because this branch can be called multiple times in a loop, and we only
@@ -112,10 +113,8 @@ impl<SV> HttpProxy<SV> {
// hit
// TODO: maybe round and/or cache now()
let hit_status = if meta.is_fresh(std::time::SystemTime::now()) {
- // check if we should force expire
- // (this is a soft purge which tries to revalidate,
+ // check if we should force expire or miss
// vs. hard purge which forces miss)
- // TODO: allow hard purge
match self.inner.cache_hit_filter(session, &meta, ctx).await {
Err(e) => {
error!(
@@ -125,86 +124,26 @@ impl<SV> HttpProxy<SV> {
// this return value will cause us to fetch from upstream
HitStatus::FailedHitFilter
}
- Ok(expired) => {
+ Ok(None) => HitStatus::Fresh,
+ Ok(Some(ForcedInvalidationKind::ForceExpired)) => {
// force expired asset should not be served as stale
// because force expire is usually to remove data
- if expired {
- meta.disable_serve_stale();
- HitStatus::ForceExpired
- } else {
- HitStatus::Fresh
- }
+ meta.disable_serve_stale();
+ HitStatus::ForceExpired
}
+ Ok(Some(ForcedInvalidationKind::ForceMiss)) => HitStatus::ForceMiss,
}
} else {
HitStatus::Expired
};
+
+ hit_status_opt = Some(hit_status);
+
// init cache for hit / stale
session.cache.cache_found(meta, handler, hit_status);
+ }
- if !hit_status.is_fresh() {
- // expired or force expired asset
- if session.cache.is_cache_locked() {
- // first if this is the sub request for the background cache update
- if let Some(write_lock) = session
- .subrequest_ctx
- .as_mut()
- .and_then(|ctx| ctx.write_lock.take())
- {
- // Put the write lock in the request
- session.cache.set_write_lock(write_lock);
- session.cache.tag_as_subrequest();
- // and then let it go to upstream
- break None;
- }
- let will_serve_stale = session.cache.can_serve_stale_updating()
- && self.inner.should_serve_stale(session, ctx, None);
- if !will_serve_stale {
- let lock_status = session.cache.cache_lock_wait().await;
- if self.handle_lock_status(session, ctx, lock_status) {
- continue;
- } else {
- break None;
- }
- }
- // else continue to serve stale
- session.cache.set_stale_updating();
- } else if session.cache.is_cache_lock_writer() {
- // stale while revalidate logic for the writer
- let will_serve_stale = session.cache.can_serve_stale_updating()
- && self.inner.should_serve_stale(session, ctx, None);
- if will_serve_stale {
- // create a background thread to do the actual update
- let subrequest =
- Box::new(crate::subrequest::create_dummy_session(session));
- let new_app = self.clone(); // Clone the Arc
- let sub_req_ctx = Box::new(SubReqCtx {
- write_lock: Some(session.cache.take_write_lock()),
- });
- tokio::spawn(async move {
- new_app.process_subrequest(subrequest, sub_req_ctx).await;
- });
- // continue to serve stale for this request
- session.cache.set_stale_updating();
- } else {
- // return to fetch from upstream
- break None;
- }
- } else {
- // return to fetch from upstream
- break None;
- }
- }
- let (reuse, err) = self.proxy_cache_hit(session, ctx).await;
- if let Some(e) = err.as_ref() {
- error!(
- "Fail to serve cache: {e}, {}",
- self.inner.request_summary(session, ctx)
- );
- }
- // response is served from cache, exit
- break Some((reuse, err));
- } else {
+ if hit_status_opt.map_or(true, HitStatus::is_treated_as_miss) {
// cache miss
if session.cache.is_cache_locked() {
// Another request is filling the cache; try waiting til that's done and retry.
@@ -219,6 +158,74 @@ impl<SV> HttpProxy<SV> {
break None;
}
}
+
+ // Safe because an empty hit status would have broken out
+ // in the block above
+ let hit_status = hit_status_opt.expect("None case handled as miss");
+
+ if !hit_status.is_fresh() {
+ // expired or force expired asset
+ if session.cache.is_cache_locked() {
+ // first if this is the sub request for the background cache update
+ if let Some(write_lock) = session
+ .subrequest_ctx
+ .as_mut()
+ .and_then(|ctx| ctx.write_lock.take())
+ {
+ // Put the write lock in the request
+ session.cache.set_write_lock(write_lock);
+ session.cache.tag_as_subrequest();
+ // and then let it go to upstream
+ break None;
+ }
+ let will_serve_stale = session.cache.can_serve_stale_updating()
+ && self.inner.should_serve_stale(session, ctx, None);
+ if !will_serve_stale {
+ let lock_status = session.cache.cache_lock_wait().await;
+ if self.handle_lock_status(session, ctx, lock_status) {
+ continue;
+ } else {
+ break None;
+ }
+ }
+ // else continue to serve stale
+ session.cache.set_stale_updating();
+ } else if session.cache.is_cache_lock_writer() {
+ // stale while revalidate logic for the writer
+ let will_serve_stale = session.cache.can_serve_stale_updating()
+ && self.inner.should_serve_stale(session, ctx, None);
+ if will_serve_stale {
+ // create a background thread to do the actual update
+ let subrequest =
+ Box::new(crate::subrequest::create_dummy_session(session));
+ let new_app = self.clone(); // Clone the Arc
+ let sub_req_ctx = Box::new(SubReqCtx {
+ write_lock: Some(session.cache.take_write_lock()),
+ });
+ tokio::spawn(async move {
+ new_app.process_subrequest(subrequest, sub_req_ctx).await;
+ });
+ // continue to serve stale for this request
+ session.cache.set_stale_updating();
+ } else {
+ // return to fetch from upstream
+ break None;
+ }
+ } else {
+ // return to fetch from upstream
+ break None;
+ }
+ }
+
+ let (reuse, err) = self.proxy_cache_hit(session, ctx).await;
+ if let Some(e) = err.as_ref() {
+ error!(
+ "Fail to serve cache: {e}, {}",
+ self.inner.request_summary(session, ctx)
+ );
+ }
+ // response is served from cache, exit
+ break Some((reuse, err));
}
Err(e) => {
// Allow cache miss to fill cache even if cache lookup errors
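The net effect of the proxy_cache.rs rewrite above is that the hit status is computed first, stashed in hit_status_opt, and only then routed: a forced miss (or no lookup result at all) falls through to the existing cache-miss path, while everything else continues to the hit/stale-serving path. Below is a minimal, self-contained sketch of that routing only. HitState is a stand-in for pingora_cache::HitStatus, and its is_treated_as_miss is assumed here to be true only for ForceMiss; the real type and method may differ.

// Stand-in for pingora_cache::HitStatus (an assumption for illustration);
// only the routing logic from the diff above is mirrored.
#[derive(Debug, Clone, Copy)]
enum HitState {
    Fresh,
    Expired,
    ForceExpired,
    ForceMiss,
    FailedHitFilter,
}

impl HitState {
    // Assumed semantics: only a forced miss skips the found asset entirely.
    fn is_treated_as_miss(self) -> bool {
        matches!(self, HitState::ForceMiss)
    }

    fn is_fresh(self) -> bool {
        matches!(self, HitState::Fresh)
    }
}

fn route(hit: Option<HitState>) -> &'static str {
    // Mirrors `hit_status_opt.map_or(true, HitStatus::is_treated_as_miss)`:
    // no lookup result or a forced miss both take the cache-miss path.
    if hit.map_or(true, HitState::is_treated_as_miss) {
        return "cache miss: fetch from upstream (may fill the cache)";
    }
    let hit = hit.expect("None handled as miss above");
    if hit.is_fresh() {
        "serve the fresh asset from cache"
    } else {
        // Expired, ForceExpired or FailedHitFilter: revalidate upstream,
        // possibly serving stale while the update happens.
        "revalidate upstream (or serve stale while updating)"
    }
}

fn main() {
    for hit in [
        None,
        Some(HitState::Fresh),
        Some(HitState::Expired),
        Some(HitState::ForceExpired),
        Some(HitState::ForceMiss),
        Some(HitState::FailedHitFilter),
    ] {
        println!("{hit:?} -> {}", route(hit));
    }
}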
diff --git a/pingora-proxy/src/proxy_trait.rs b/pingora-proxy/src/proxy_trait.rs
index 6970ec0..00e54f4 100644
--- a/pingora-proxy/src/proxy_trait.rs
+++ b/pingora-proxy/src/proxy_trait.rs
@@ -13,7 +13,11 @@
// limitations under the License.
use super::*;
-use pingora_cache::{key::HashBinary, CacheKey, CacheMeta, RespCacheable, RespCacheable::*};
+use pingora_cache::{
+ key::HashBinary,
+ CacheKey, CacheMeta, ForcedInvalidationKind,
+ RespCacheable::{self, *},
+};
use std::time::Duration;
/// The interface to control the HTTP proxy
@@ -129,21 +133,23 @@ pub trait ProxyHttp {
session.cache.cache_miss();
}
- /// This filter is called after a successful cache lookup and before the cache asset is ready to
- /// be used.
+ /// This filter is called after a successful cache lookup and before the
+ /// cache asset is ready to be used.
///
- /// This filter allow the user to log or force expire the asset.
- // flex purge, other filtering, returns whether asset is should be force expired or not
+ /// This filter allows the user to log or force invalidate the asset.
+ ///
+ /// The value returned indicates if the force invalidation should be used,
+ /// and which kind. Returning `None` indicates no forced invalidation
async fn cache_hit_filter(
&self,
_session: &Session,
_meta: &CacheMeta,
_ctx: &mut Self::CTX,
- ) -> Result<bool>
+ ) -> Result<Option<ForcedInvalidationKind>>
where
Self::CTX: Send + Sync,
{
- Ok(false)
+ Ok(None)
}
/// Decide if a request should continue to upstream after not being served from cache.
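For illustration, an override of the new cache_hit_filter signature might look like the sketch below, shown as the method body inside a hypothetical #[async_trait] impl ProxyHttp block. The x-force-miss and x-force-expire request headers are made up for this example; only the filter signature and the ForcedInvalidationKind variants come from the diff above.

// Hypothetical cache_hit_filter override; the header names are illustrative only.
async fn cache_hit_filter(
    &self,
    session: &Session,
    _meta: &CacheMeta,
    _ctx: &mut Self::CTX,
) -> Result<Option<ForcedInvalidationKind>>
where
    Self::CTX: Send + Sync,
{
    let headers = &session.req_header().headers;
    if headers.contains_key("x-force-miss") {
        // hard purge: behave as if the asset was never in cache
        return Ok(Some(ForcedInvalidationKind::ForceMiss));
    }
    if headers.contains_key("x-force-expire") {
        // soft purge: treat the asset as expired and revalidate upstream
        return Ok(Some(ForcedInvalidationKind::ForceExpired));
    }
    // no forced invalidation; normal freshness rules apply
    Ok(None)
}

As the proxy_cache.rs change shows, ForceExpired keeps the previous soft-purge behavior (serve-stale is disabled and the proxy revalidates upstream), while ForceMiss makes the lookup take the cache-miss path as if nothing had been found.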