diff options
-rw-r--r-- | .bleep | 2 | ||||
-rw-r--r-- | pingora-proxy/src/lib.rs | 1 | ||||
-rw-r--r-- | pingora-proxy/src/proxy_cache.rs | 6 | ||||
-rw-r--r-- | pingora-proxy/src/proxy_purge.rs | 110 | ||||
-rw-r--r-- | pingora-proxy/src/proxy_trait.rs | 16 |
5 files changed, 93 insertions, 42 deletions
@@ -1 +1 @@ -64c5e3fa4a538348e25a1ac5fe18245fc6d1eefc
\ No newline at end of file +b10bac2a769c0b0307110f10280c9518705512dd
\ No newline at end of file diff --git a/pingora-proxy/src/lib.rs b/pingora-proxy/src/lib.rs index 40783a0..7880476 100644 --- a/pingora-proxy/src/lib.rs +++ b/pingora-proxy/src/lib.rs @@ -79,6 +79,7 @@ mod subrequest; use subrequest::Ctx as SubReqCtx; +pub use proxy_purge::PurgeStatus; pub use proxy_trait::ProxyHttp; pub mod prelude { diff --git a/pingora-proxy/src/proxy_cache.rs b/pingora-proxy/src/proxy_cache.rs index f7e35b1..00b61da 100644 --- a/pingora-proxy/src/proxy_cache.rs +++ b/pingora-proxy/src/proxy_cache.rs @@ -62,11 +62,7 @@ impl<SV> HttpProxy<SV> { // cache purge logic: PURGE short-circuits rest of request if self.inner.is_purge(session, ctx) { - if session.cache.enabled() { - return self.proxy_purge(session, ctx).await; - } else { - return Some(proxy_purge::write_no_purge_response(session).await); - } + return self.proxy_purge(session, ctx).await; } // bypass cache lookup if we predict to be uncacheable diff --git a/pingora-proxy/src/proxy_purge.rs b/pingora-proxy/src/proxy_purge.rs index 73e27a0..1bfce7c 100644 --- a/pingora-proxy/src/proxy_purge.rs +++ b/pingora-proxy/src/proxy_purge.rs @@ -13,6 +13,33 @@ // limitations under the License. use super::*; +use pingora_core::protocols::http::error_resp; +use std::borrow::Cow; + +#[derive(Debug)] +pub enum PurgeStatus { + /// Cache was not enabled, purge ineffectual. + NoCache, + /// Asset was found in cache (and presumably purged or being purged). + Found, + /// Asset was not found in cache. + NotFound, + /// Cache returned a purge error. + /// Contains causing error in case it should affect the downstream response. + Error(Box<Error>), +} + +// Return a canned response to a purge request, based on whether the cache had the asset or not +// (or otherwise returned an error). +fn purge_response(purge_status: &PurgeStatus) -> Cow<'static, ResponseHeader> { + let resp = match purge_status { + PurgeStatus::NoCache => &*NOT_PURGEABLE, + PurgeStatus::Found => &*OK, + PurgeStatus::NotFound => &*NOT_FOUND, + PurgeStatus::Error(ref _e) => &*INTERNAL_ERROR, + }; + Cow::Borrowed(resp) +} fn gen_purge_response(code: u16) -> ResponseHeader { let mut resp = ResponseHeader::build(code, Some(3)).unwrap(); @@ -25,27 +52,12 @@ fn gen_purge_response(code: u16) -> ResponseHeader { resp } -async fn write_purge_response( - session: &mut Session, - resp: &ResponseHeader, -) -> (bool, Option<Box<Error>>) { - match session.as_mut().write_response_header_ref(resp).await { - Ok(_) => (true, None), - // dirty, not reusable - Err(e) => (false, Some(e.into_down())), - } -} - -/// Write a response for a rejected cache purge requests -pub async fn write_no_purge_response(session: &mut Session) -> (bool, Option<Box<Error>>) { - // TODO: log send error - write_purge_response(session, &NOT_PURGEABLE).await -} - static OK: Lazy<ResponseHeader> = Lazy::new(|| gen_purge_response(200)); static NOT_FOUND: Lazy<ResponseHeader> = Lazy::new(|| gen_purge_response(404)); // for when purge is sent to uncacheable assets static NOT_PURGEABLE: Lazy<ResponseHeader> = Lazy::new(|| gen_purge_response(405)); +// on cache storage or proxy error +static INTERNAL_ERROR: Lazy<ResponseHeader> = Lazy::new(|| error_resp::gen_error_response(500)); impl<SV> HttpProxy<SV> { pub(crate) async fn proxy_purge( @@ -57,31 +69,57 @@ impl<SV> HttpProxy<SV> { SV: ProxyHttp + Send + Sync, SV::CTX: Send + Sync, { - match session.cache.purge().await { - Ok(found) => { - // canned PURGE response based on whether we found the asset or not - let resp = if found { &*OK } else { &*NOT_FOUND }; - let (reuse, err) = write_purge_response(session, resp).await; - if let Some(e) = err.as_ref() { - error!( - "Failed to send purge response: {}, {}", - e, + let purge_status = if session.cache.enabled() { + match session.cache.purge().await { + Ok(found) => { + if found { + PurgeStatus::Found + } else { + PurgeStatus::NotFound + } + } + Err(e) => { + session.cache.disable(NoCacheReason::StorageError); + warn!( + "Fail to purge cache: {e}, {}", self.inner.request_summary(session, ctx) - ) + ); + PurgeStatus::Error(e) } - Some((reuse, err)) } + } else { + // cache was not enabled + PurgeStatus::NoCache + }; + + let mut purge_resp = purge_response(&purge_status); + if let Err(e) = + self.inner + .purge_response_filter(session, ctx, purge_status, &mut purge_resp) + { + error!( + "Failed purge response filter: {e}, {}", + self.inner.request_summary(session, ctx) + ); + purge_resp = Cow::Borrowed(&*INTERNAL_ERROR) + } + + let write_result = match purge_resp { + Cow::Borrowed(r) => session.as_mut().write_response_header_ref(r).await, + Cow::Owned(r) => session.as_mut().write_response_header(Box::new(r)).await, + }; + let (reuse, err) = match write_result { + Ok(_) => (true, None), + // dirty, not reusable Err(e) => { - session.cache.disable(NoCacheReason::StorageError); - warn!( - "Fail to purge cache: {}, {}", - e, + let e = e.into_down(); + error!( + "Failed to send purge response: {e}, {}", self.inner.request_summary(session, ctx) ); - session.downstream_session.respond_error(500).await; - // still reusable - Some((true, Some(e))) + (false, Some(e)) } - } + }; + Some((reuse, err)) } } diff --git a/pingora-proxy/src/proxy_trait.rs b/pingora-proxy/src/proxy_trait.rs index 888854c..0158681 100644 --- a/pingora-proxy/src/proxy_trait.rs +++ b/pingora-proxy/src/proxy_trait.rs @@ -400,4 +400,20 @@ pub trait ProxyHttp { fn is_purge(&self, _session: &Session, _ctx: &Self::CTX) -> bool { false } + + /// This filter is called after the proxy cache generates the downstream response to the purge + /// request (to invalidate or delete from the HTTP cache), based on the purge status, which + /// indicates whether the request succeeded or failed. + /// + /// The filter allows the user to modify or replace the generated downstream response. + /// If the filter returns `Err`, the proxy will instead send a 500 response. + fn purge_response_filter( + &self, + _session: &Session, + _ctx: &mut Self::CTX, + _purge_status: PurgeStatus, + _purge_response: &mut std::borrow::Cow<'static, ResponseHeader>, + ) -> Result<()> { + Ok(()) + } } |