author     Kevin Guthrie <[email protected]>  2024-11-18 10:43:52 -0500
committer  Yuchen Wu <[email protected]>      2024-12-13 17:27:40 -0800
commit     9850e92e33af870a0761c2473673f9a0ada61b0b (patch)
tree       9f663f0694653c2764e1dc1d0d7fce99b34fd65b
parent     a8a6e77eef2c0f4d2a45f00c5b0e316dd373f2f2 (diff)
download   pingora-9850e92e33af870a0761c2473673f9a0ada61b0b.tar.gz
           pingora-9850e92e33af870a0761c2473673f9a0ada61b0b.zip
Add the ability to trigger forced-miss behavior from the `cache_hit_filter` function
-rw-r--r--  .bleep                                       2
-rw-r--r--  pingora-cache/src/lib.rs                   114
-rw-r--r--  pingora-proxy/src/proxy_cache.rs           155
-rw-r--r--  pingora-proxy/src/proxy_trait.rs            20
-rw-r--r--  pingora-proxy/tests/test_upstream.rs        37
-rw-r--r--  pingora-proxy/tests/utils/server_utils.rs   13
6 files changed, 216 insertions(+), 125 deletions(-)
diff --git a/.bleep b/.bleep
index 22e32a6..dd7f31a 100644
--- a/.bleep
+++ b/.bleep
@@ -1 +1 @@
-7dbf3a97f9e59d8ad1d8e5d199cae6ee49869b9c \ No newline at end of file
+f664d443d0b1b709e2276e8ea74cc1c26659a372 \ No newline at end of file
diff --git a/pingora-cache/src/lib.rs b/pingora-cache/src/lib.rs
index 6ac49a5..c65f602 100644
--- a/pingora-cache/src/lib.rs
+++ b/pingora-cache/src/lib.rs
@@ -23,6 +23,7 @@ use pingora_error::Result;
use pingora_http::ResponseHeader;
use rustracing::tag::Tag;
use std::time::{Duration, Instant, SystemTime};
+use strum::IntoStaticStr;
use trace::CacheTraceCTX;
pub mod cache_control;
@@ -210,34 +211,59 @@ impl RespCacheable {
}
}
+/// Indicates which level of purge logic to apply to an asset, i.e. whether the
+/// purged file should be revalidated or re-retrieved altogether
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum ForcedInvalidationKind {
+ /// Indicates the asset should be considered stale and revalidated
+ ForceExpired,
+
+ /// Indicates the asset should be considered absent and treated like a miss
+ /// instead of a hit
+ ForceMiss,
+}
+
/// Freshness state of cache hit asset
///
///
-#[derive(Debug, Copy, Clone)]
+#[derive(Debug, Copy, Clone, IntoStaticStr, PartialEq, Eq)]
+#[strum(serialize_all = "snake_case")]
pub enum HitStatus {
+ /// The asset's freshness directives indicate it has expired
Expired,
+
+ /// The asset was marked as expired, and should be treated as stale
ForceExpired,
+
+ /// The asset was marked as absent, and should be treated as a miss
+ ForceMiss,
+
+ /// An error occurred while processing the asset, so it should be treated as
+ /// a miss
FailedHitFilter,
+
+ /// The asset is not expired
Fresh,
}
impl HitStatus {
/// For displaying cache hit status
pub fn as_str(&self) -> &'static str {
- match self {
- Self::Expired => "expired",
- Self::ForceExpired => "force_expired",
- Self::FailedHitFilter => "failed_hit_filter",
- Self::Fresh => "fresh",
- }
+ self.into()
}
/// Whether cached asset can be served as fresh
pub fn is_fresh(&self) -> bool {
- match self {
- Self::Expired | Self::ForceExpired | Self::FailedHitFilter => false,
- Self::Fresh => true,
- }
+ *self == HitStatus::Fresh
+ }
+
+ /// Check whether the hit status should be treated as a miss. A forced miss
+ /// is obviously treated as a miss. A hit-filter failure is treated as a
+ /// miss because we can't use the asset as an actual hit. If we treat it as
+ /// expired, we still might not be able to use it even if revalidation
+ /// succeeds.
+ pub fn is_treated_as_miss(self) -> bool {
+ matches!(self, HitStatus::ForceMiss | HitStatus::FailedHitFilter)
}
}
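
For reference, the assertions below sketch how the reworked HitStatus helpers classify each variant. This is a hypothetical standalone snippet, not part of the patch; it only assumes a dependency on pingora-cache with this change applied.

    use pingora_cache::HitStatus;

    fn main() {
        // Only Fresh can be served as-is.
        assert!(HitStatus::Fresh.is_fresh());
        assert!(!HitStatus::ForceExpired.is_fresh());

        // ForceMiss and FailedHitFilter are both handled as cache misses.
        assert!(HitStatus::ForceMiss.is_treated_as_miss());
        assert!(HitStatus::FailedHitFilter.is_treated_as_miss());
        assert!(!HitStatus::Expired.is_treated_as_miss());

        // as_str() now goes through strum's IntoStaticStr with snake_case names.
        assert_eq!(HitStatus::ForceMiss.as_str(), "force_miss");
    }
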
@@ -496,34 +522,46 @@ impl HttpCache {
///
/// The `hit_status` enum allows the caller to force expire assets.
pub fn cache_found(&mut self, meta: CacheMeta, hit_handler: HitHandler, hit_status: HitStatus) {
- match self.phase {
- // Stale allowed because of cache lock and then retry
- CachePhase::CacheKey | CachePhase::Stale => {
- self.phase = if hit_status.is_fresh() {
- CachePhase::Hit
- } else {
- CachePhase::Stale
- };
- let phase = self.phase;
- let inner = self.inner_mut();
- let key = inner.key.as_ref().unwrap();
- if phase == CachePhase::Stale {
- if let Some(lock) = inner.cache_lock.as_ref() {
- inner.lock = Some(lock.lock(key));
- }
- }
- inner.traces.start_hit_span(phase, hit_status);
- inner.traces.log_meta_in_hit_span(&meta);
- if let Some(eviction) = inner.eviction {
- // TODO: make access() accept CacheKey
- let cache_key = key.to_compact();
- // FIXME: get size
- eviction.access(&cache_key, 0, meta.0.internal.fresh_until);
- }
- inner.meta = Some(meta);
- inner.body_reader = Some(hit_handler);
+ // Stale allowed because of cache lock and then retry
+ if !matches!(self.phase, CachePhase::CacheKey | CachePhase::Stale) {
+ panic!("wrong phase {:?}", self.phase)
+ }
+
+ self.phase = match hit_status {
+ HitStatus::Fresh => CachePhase::Hit,
+ HitStatus::Expired | HitStatus::ForceExpired => CachePhase::Stale,
+ HitStatus::FailedHitFilter | HitStatus::ForceMiss => self.phase,
+ };
+
+ let phase = self.phase;
+ let inner = self.inner_mut();
+
+ let key = inner.key.as_ref().unwrap();
+
+ // The cache lock might not be set for a stale hit or a hit treated as a
+ // miss, so we need to initialize it here
+ if phase == CachePhase::Stale || hit_status.is_treated_as_miss() {
+ if let Some(lock) = inner.cache_lock.as_ref() {
+ inner.lock = Some(lock.lock(key));
}
- _ => panic!("wrong phase {:?}", self.phase),
+ }
+
+ if hit_status.is_treated_as_miss() {
+ // Clear the body and meta for hits that are treated as misses
+ inner.body_reader = None;
+ inner.meta = None;
+ } else {
+ // Set the metadata appropriately for legit hits
+ inner.traces.start_hit_span(phase, hit_status);
+ inner.traces.log_meta_in_hit_span(&meta);
+ if let Some(eviction) = inner.eviction {
+ // TODO: make access() accept CacheKey
+ let cache_key = key.to_compact();
+ // FIXME(PINGORA-1914): get size
+ eviction.access(&cache_key, 0, meta.0.internal.fresh_until);
+ }
+ inner.meta = Some(meta);
+ inner.body_reader = Some(hit_handler);
}
}
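
The phase selection at the top of the new cache_found() boils down to a small mapping. A minimal sketch, assuming CachePhase and HitStatus as exported by pingora-cache; the next_phase helper itself is hypothetical and not part of the crate.

    use pingora_cache::{CachePhase, HitStatus};

    // Hypothetical helper mirroring the match at the top of cache_found():
    // given the phase at lookup time (CacheKey or Stale) and the hit status,
    // return the phase the cache moves to.
    fn next_phase(current: CachePhase, hit_status: HitStatus) -> CachePhase {
        match hit_status {
            HitStatus::Fresh => CachePhase::Hit,
            HitStatus::Expired | HitStatus::ForceExpired => CachePhase::Stale,
            // Hits treated as misses keep the current phase; cache_found() then
            // drops the meta and body reader so the request proceeds as a miss.
            HitStatus::FailedHitFilter | HitStatus::ForceMiss => current,
        }
    }

    fn main() {
        // A forced miss leaves the phase where it was (CacheKey here).
        assert!(matches!(
            next_phase(CachePhase::CacheKey, HitStatus::ForceMiss),
            CachePhase::CacheKey
        ));
    }
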
diff --git a/pingora-proxy/src/proxy_cache.rs b/pingora-proxy/src/proxy_cache.rs
index c720575..454416e 100644
--- a/pingora-proxy/src/proxy_cache.rs
+++ b/pingora-proxy/src/proxy_cache.rs
@@ -17,7 +17,7 @@ use http::{Method, StatusCode};
use pingora_cache::key::CacheHashKey;
use pingora_cache::lock::LockStatus;
use pingora_cache::max_file_size::ERR_RESPONSE_TOO_LARGE;
-use pingora_cache::{HitStatus, RespCacheable::*};
+use pingora_cache::{ForcedInvalidationKind, HitStatus, RespCacheable::*};
use pingora_core::protocols::http::conditional_filter::to_304;
use pingora_core::protocols::http::v1::common::header_value_content_length;
use pingora_core::ErrorType;
@@ -79,6 +79,7 @@ impl<SV> HttpProxy<SV> {
// for cache lock, TODO: cap the max number of loops
match session.cache.cache_lookup().await {
Ok(res) => {
+ let mut hit_status_opt = None;
if let Some((mut meta, handler)) = res {
// Vary logic
// Because this branch can be called multiple times in a loop, and we only
@@ -112,10 +113,8 @@ impl<SV> HttpProxy<SV> {
// hit
// TODO: maybe round and/or cache now()
let hit_status = if meta.is_fresh(std::time::SystemTime::now()) {
- // check if we should force expire
- // (this is a soft purge which tries to revalidate,
+ // check if we should force expire or miss (force expire is a soft purge that tries to revalidate,
// vs. hard purge which forces miss)
- // TODO: allow hard purge
match self.inner.cache_hit_filter(session, &meta, ctx).await {
Err(e) => {
error!(
@@ -125,86 +124,26 @@ impl<SV> HttpProxy<SV> {
// this return value will cause us to fetch from upstream
HitStatus::FailedHitFilter
}
- Ok(expired) => {
+ Ok(None) => HitStatus::Fresh,
+ Ok(Some(ForcedInvalidationKind::ForceExpired)) => {
// force expired asset should not be served as stale
// because force expire is usually to remove data
- if expired {
- meta.disable_serve_stale();
- HitStatus::ForceExpired
- } else {
- HitStatus::Fresh
- }
+ meta.disable_serve_stale();
+ HitStatus::ForceExpired
}
+ Ok(Some(ForcedInvalidationKind::ForceMiss)) => HitStatus::ForceMiss,
}
} else {
HitStatus::Expired
};
+
+ hit_status_opt = Some(hit_status);
+
// init cache for hit / stale
session.cache.cache_found(meta, handler, hit_status);
+ }
- if !hit_status.is_fresh() {
- // expired or force expired asset
- if session.cache.is_cache_locked() {
- // first if this is the sub request for the background cache update
- if let Some(write_lock) = session
- .subrequest_ctx
- .as_mut()
- .and_then(|ctx| ctx.write_lock.take())
- {
- // Put the write lock in the request
- session.cache.set_write_lock(write_lock);
- session.cache.tag_as_subrequest();
- // and then let it go to upstream
- break None;
- }
- let will_serve_stale = session.cache.can_serve_stale_updating()
- && self.inner.should_serve_stale(session, ctx, None);
- if !will_serve_stale {
- let lock_status = session.cache.cache_lock_wait().await;
- if self.handle_lock_status(session, ctx, lock_status) {
- continue;
- } else {
- break None;
- }
- }
- // else continue to serve stale
- session.cache.set_stale_updating();
- } else if session.cache.is_cache_lock_writer() {
- // stale while revalidate logic for the writer
- let will_serve_stale = session.cache.can_serve_stale_updating()
- && self.inner.should_serve_stale(session, ctx, None);
- if will_serve_stale {
- // create a background thread to do the actual update
- let subrequest =
- Box::new(crate::subrequest::create_dummy_session(session));
- let new_app = self.clone(); // Clone the Arc
- let sub_req_ctx = Box::new(SubReqCtx {
- write_lock: Some(session.cache.take_write_lock()),
- });
- tokio::spawn(async move {
- new_app.process_subrequest(subrequest, sub_req_ctx).await;
- });
- // continue to serve stale for this request
- session.cache.set_stale_updating();
- } else {
- // return to fetch from upstream
- break None;
- }
- } else {
- // return to fetch from upstream
- break None;
- }
- }
- let (reuse, err) = self.proxy_cache_hit(session, ctx).await;
- if let Some(e) = err.as_ref() {
- error!(
- "Fail to serve cache: {e}, {}",
- self.inner.request_summary(session, ctx)
- );
- }
- // responses is served from cache, exit
- break Some((reuse, err));
- } else {
+ if hit_status_opt.map_or(true, HitStatus::is_treated_as_miss) {
// cache miss
if session.cache.is_cache_locked() {
// Another request is filling the cache; try waiting til that's done and retry.
@@ -219,6 +158,74 @@ impl<SV> HttpProxy<SV> {
break None;
}
}
+
+ // Safe because a `None` hit status would have broken out
+ // in the block above
+ let hit_status = hit_status_opt.expect("None case handled as miss");
+
+ if !hit_status.is_fresh() {
+ // expired or force expired asset
+ if session.cache.is_cache_locked() {
+ // first if this is the sub request for the background cache update
+ if let Some(write_lock) = session
+ .subrequest_ctx
+ .as_mut()
+ .and_then(|ctx| ctx.write_lock.take())
+ {
+ // Put the write lock in the request
+ session.cache.set_write_lock(write_lock);
+ session.cache.tag_as_subrequest();
+ // and then let it go to upstream
+ break None;
+ }
+ let will_serve_stale = session.cache.can_serve_stale_updating()
+ && self.inner.should_serve_stale(session, ctx, None);
+ if !will_serve_stale {
+ let lock_status = session.cache.cache_lock_wait().await;
+ if self.handle_lock_status(session, ctx, lock_status) {
+ continue;
+ } else {
+ break None;
+ }
+ }
+ // else continue to serve stale
+ session.cache.set_stale_updating();
+ } else if session.cache.is_cache_lock_writer() {
+ // stale while revalidate logic for the writer
+ let will_serve_stale = session.cache.can_serve_stale_updating()
+ && self.inner.should_serve_stale(session, ctx, None);
+ if will_serve_stale {
+ // create a background thread to do the actual update
+ let subrequest =
+ Box::new(crate::subrequest::create_dummy_session(session));
+ let new_app = self.clone(); // Clone the Arc
+ let sub_req_ctx = Box::new(SubReqCtx {
+ write_lock: Some(session.cache.take_write_lock()),
+ });
+ tokio::spawn(async move {
+ new_app.process_subrequest(subrequest, sub_req_ctx).await;
+ });
+ // continue to serve stale for this request
+ session.cache.set_stale_updating();
+ } else {
+ // return to fetch from upstream
+ break None;
+ }
+ } else {
+ // return to fetch from upstream
+ break None;
+ }
+ }
+
+ let (reuse, err) = self.proxy_cache_hit(session, ctx).await;
+ if let Some(e) = err.as_ref() {
+ error!(
+ "Fail to serve cache: {e}, {}",
+ self.inner.request_summary(session, ctx)
+ );
+ }
+ // response is served from cache, exit
+ break Some((reuse, err));
}
Err(e) => {
// Allow cache miss to fill cache even if cache lookup errors
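
Taken together, the refactored lookup loop now makes a three-way decision once cache_found() has run. Below is a condensed sketch of that decision, assuming the HitStatus helpers added in pingora-cache; the NextStep enum and classify function are illustrative only and not part of pingora-proxy.

    use pingora_cache::HitStatus;

    // Illustrative only: which path the rewritten lookup loop takes.
    #[derive(Debug, PartialEq, Eq)]
    enum NextStep {
        // Lookup miss, ForceMiss, or failed hit filter: take the cache-miss
        // path (possibly waiting on the cache lock) and fetch from upstream.
        HandleAsMiss,
        // Expired or force-expired asset: run the stale / cache-lock logic,
        // possibly serving stale while a writer revalidates.
        HandleAsStale,
        // Fresh hit: serve directly from cache via proxy_cache_hit().
        ServeHit,
    }

    fn classify(hit_status: Option<HitStatus>) -> NextStep {
        match hit_status {
            None => NextStep::HandleAsMiss,
            Some(s) if s.is_treated_as_miss() => NextStep::HandleAsMiss,
            Some(s) if !s.is_fresh() => NextStep::HandleAsStale,
            Some(_) => NextStep::ServeHit,
        }
    }

    fn main() {
        assert_eq!(classify(None), NextStep::HandleAsMiss);
        assert_eq!(classify(Some(HitStatus::ForceMiss)), NextStep::HandleAsMiss);
        assert_eq!(classify(Some(HitStatus::Expired)), NextStep::HandleAsStale);
        assert_eq!(classify(Some(HitStatus::Fresh)), NextStep::ServeHit);
    }
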
diff --git a/pingora-proxy/src/proxy_trait.rs b/pingora-proxy/src/proxy_trait.rs
index 6970ec0..00e54f4 100644
--- a/pingora-proxy/src/proxy_trait.rs
+++ b/pingora-proxy/src/proxy_trait.rs
@@ -13,7 +13,11 @@
// limitations under the License.
use super::*;
-use pingora_cache::{key::HashBinary, CacheKey, CacheMeta, RespCacheable, RespCacheable::*};
+use pingora_cache::{
+ key::HashBinary,
+ CacheKey, CacheMeta, ForcedInvalidationKind,
+ RespCacheable::{self, *},
+};
use std::time::Duration;
/// The interface to control the HTTP proxy
@@ -129,21 +133,23 @@ pub trait ProxyHttp {
session.cache.cache_miss();
}
- /// This filter is called after a successful cache lookup and before the cache asset is ready to
- /// be used.
+ /// This filter is called after a successful cache lookup and before the
+ /// cache asset is ready to be used.
///
- /// This filter allow the user to log or force expire the asset.
- // flex purge, other filtering, returns whether asset is should be force expired or not
+ /// This filter allows the user to log or force invalidate the asset.
+ ///
+ /// The returned value indicates whether forced invalidation should be
+ /// applied, and of which kind. Returning `None` means no forced invalidation.
async fn cache_hit_filter(
&self,
_session: &Session,
_meta: &CacheMeta,
_ctx: &mut Self::CTX,
- ) -> Result<bool>
+ ) -> Result<Option<ForcedInvalidationKind>>
where
Self::CTX: Send + Sync,
{
- Ok(false)
+ Ok(None)
}
/// Decide if a request should continue to upstream after not being served from cache.
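
To illustrate the new signature end to end, here is a minimal sketch of a ProxyHttp implementation overriding cache_hit_filter. The proxy type and the x-hard-purge / x-soft-purge control headers are made-up names (the test proxy below uses x-force-miss and x-force-expire); only APIs that appear elsewhere in this patch or in the pingora quickstart are assumed.

    use async_trait::async_trait;
    use pingora_cache::{CacheMeta, ForcedInvalidationKind};
    use pingora_core::upstreams::peer::HttpPeer;
    use pingora_error::Result;
    use pingora_proxy::{ProxyHttp, Session};

    pub struct MyCachingProxy;

    #[async_trait]
    impl ProxyHttp for MyCachingProxy {
        type CTX = ();
        fn new_ctx(&self) -> Self::CTX {}

        async fn upstream_peer(
            &self,
            _session: &mut Session,
            _ctx: &mut Self::CTX,
        ) -> Result<Box<HttpPeer>> {
            // Placeholder upstream for the sketch.
            Ok(Box::new(HttpPeer::new(
                ("1.1.1.1", 443),
                true,
                "one.one.one.one".to_string(),
            )))
        }

        async fn cache_hit_filter(
            &self,
            session: &Session,
            _meta: &CacheMeta,
            _ctx: &mut Self::CTX,
        ) -> Result<Option<ForcedInvalidationKind>> {
            // Hypothetical control headers: a "hard purge" drops the hit
            // entirely (treated as a miss), a "soft purge" marks it expired
            // so it is revalidated or re-fetched.
            if !session.get_header_bytes("x-hard-purge").is_empty() {
                return Ok(Some(ForcedInvalidationKind::ForceMiss));
            }
            if !session.get_header_bytes("x-soft-purge").is_empty() {
                return Ok(Some(ForcedInvalidationKind::ForceExpired));
            }
            Ok(None)
        }
    }

The test proxy in server_utils.rs below wires the same hook to its own x-force-miss and x-force-expire headers, which is what the new test_force_miss test exercises.
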
diff --git a/pingora-proxy/tests/test_upstream.rs b/pingora-proxy/tests/test_upstream.rs
index b2d9575..bbac874 100644
--- a/pingora-proxy/tests/test_upstream.rs
+++ b/pingora-proxy/tests/test_upstream.rs
@@ -388,6 +388,43 @@ mod test_cache {
}
#[tokio::test]
+ async fn test_force_miss() {
+ init();
+ let url = "http://127.0.0.1:6148/unique/test_froce_miss/revalidate_now";
+
+ let res = reqwest::get(url).await.unwrap();
+ assert_eq!(res.status(), StatusCode::OK);
+ let headers = res.headers();
+ let cache_miss_epoch = headers["x-epoch"].to_str().unwrap().parse::<f64>().unwrap();
+ assert_eq!(headers["x-cache-status"], "miss");
+ assert_eq!(headers["x-upstream-status"], "200");
+ assert_eq!(res.text().await.unwrap(), "hello world");
+
+ let res = reqwest::get(url).await.unwrap();
+ assert_eq!(res.status(), StatusCode::OK);
+ let headers = res.headers();
+ let cache_hit_epoch = headers["x-epoch"].to_str().unwrap().parse::<f64>().unwrap();
+ assert_eq!(headers["x-cache-status"], "hit");
+ assert!(headers.get("x-upstream-status").is_none());
+ assert_eq!(res.text().await.unwrap(), "hello world");
+
+ assert_eq!(cache_miss_epoch, cache_hit_epoch);
+
+ let res = reqwest::Client::new()
+ .get(url)
+ .header("x-force-miss", "1")
+ .send()
+ .await
+ .unwrap();
+
+ assert_eq!(res.status(), StatusCode::OK);
+ let headers = res.headers();
+ assert_eq!(headers["x-cache-status"], "miss");
+ assert_eq!(headers["x-upstream-status"], "200");
+ assert_eq!(res.text().await.unwrap(), "hello world");
+ }
+
+ #[tokio::test]
async fn test_cache_downstream_revalidation_etag() {
init();
let url = "http://127.0.0.1:6148/unique/test_downstream_revalidation_etag/revalidate_now";
diff --git a/pingora-proxy/tests/utils/server_utils.rs b/pingora-proxy/tests/utils/server_utils.rs
index f762f33..0c5a4e5 100644
--- a/pingora-proxy/tests/utils/server_utils.rs
+++ b/pingora-proxy/tests/utils/server_utils.rs
@@ -21,12 +21,12 @@ use http::HeaderValue;
use once_cell::sync::Lazy;
use pingora_cache::cache_control::CacheControl;
use pingora_cache::key::HashBinary;
-use pingora_cache::VarianceBuilder;
use pingora_cache::{
eviction::simple_lru::Manager, filters::resp_cacheable, lock::CacheLock, predictor::Predictor,
set_compression_dict_path, CacheMeta, CacheMetaDefaults, CachePhase, MemCache, NoCacheReason,
RespCacheable,
};
+use pingora_cache::{ForcedInvalidationKind, PurgeType, VarianceBuilder};
use pingora_core::apps::{HttpServerApp, HttpServerOptions};
use pingora_core::modules::http::compression::ResponseCompression;
use pingora_core::protocols::{l4::socket::SocketAddr, Digest};
@@ -415,12 +415,15 @@ impl ProxyHttp for ExampleProxyCache {
session: &Session,
_meta: &CacheMeta,
_ctx: &mut Self::CTX,
- ) -> Result<bool> {
- // allow test header to control force expiry
+ ) -> Result<Option<ForcedInvalidationKind>> {
+ // allow test header to control force expiry/miss
+ if session.get_header_bytes("x-force-miss") != b"" {
+ return Ok(Some(ForcedInvalidationKind::ForceMiss));
+ }
if session.get_header_bytes("x-force-expire") != b"" {
- return Ok(true);
+ return Ok(Some(ForcedInvalidationKind::ForceExpired));
}
- Ok(false)
+ Ok(None)
}
fn cache_vary_filter(