diff options
author | Yuchen Wu <[email protected]> | 2024-05-13 15:39:21 -0700 |
---|---|---|
committer | Edward Wang <[email protected]> | 2024-05-24 10:00:06 -0700 |
commit | f38f3b9a38cae13e116f868ef23e977252dc24a2 (patch) | |
tree | 8d797e6f631071e48e78d9ca18948c50a7ced945 | |
parent | 34b2a35d7b8cac98fe3ac9f576b7c8255b274283 (diff) | |
download | pingora-f38f3b9a38cae13e116f868ef23e977252dc24a2.tar.gz pingora-f38f3b9a38cae13e116f868ef23e977252dc24a2.zip |
Add request body filter
-rw-r--r-- | .bleep | 2 | ||||
-rw-r--r-- | pingora-proxy/src/proxy_h1.rs | 80 | ||||
-rw-r--r-- | pingora-proxy/src/proxy_h2.rs | 78 | ||||
-rw-r--r-- | pingora-proxy/src/proxy_trait.rs | 24 |
4 files changed, 123 insertions, 61 deletions
@@ -1 +1 @@ -2c9d4c55853235e908a1acd20454ebe7b979d246
\ No newline at end of file +1952c0cbc08e1cef0a5ed280ce55b17e7066e49d
\ No newline at end of file diff --git a/pingora-proxy/src/proxy_h1.rs b/pingora-proxy/src/proxy_h1.rs index c702af3..feac860 100644 --- a/pingora-proxy/src/proxy_h1.rs +++ b/pingora-proxy/src/proxy_h1.rs @@ -223,7 +223,14 @@ impl<SV> HttpProxy<SV> { .reserve() .await .or_err(InternalError, "reserving body pipe")?; - send_body_to_pipe(buffer, downstream_state.is_done(), send_permit).await; + self.send_body_to_pipe( + session, + buffer, + downstream_state.is_done(), + send_permit, + ctx, + ) + .await?; } let mut response_state = ResponseStateMachine::new(); @@ -288,12 +295,15 @@ impl<SV> HttpProxy<SV> { response_state.maybe_set_upstream_done(true); } // TODO: consider just drain this if serve_from_cache is set - let request_done = send_body_to_pipe( + let is_body_done = session.is_body_done(); + let request_done = self.send_body_to_pipe( + session, body, - session.is_body_done(), + is_body_done, send_permit.unwrap(), // safe because we checked is_ok() + ctx, ) - .await; + .await?; downstream_state.maybe_finished(request_done); }, @@ -520,30 +530,48 @@ impl<SV> HttpProxy<SV> { HttpTask::Failed(_) => Ok(task), // Do nothing just pass the error down } } -} -// TODO:: use this function to replace send_body_to2 -pub(crate) async fn send_body_to_pipe( - data: Option<Bytes>, - end_of_body: bool, - tx: mpsc::Permit<'_, HttpTask>, -) -> bool { - match data { - Some(data) => { - debug!("Read {} bytes body from downstream", data.len()); - if data.is_empty() && !end_of_body { - /* it is normal to get 0 bytes because of multi-chunk - * don't write 0 bytes to downstream since it will be - * misread as the terminating chunk */ - return false; - } - tx.send(HttpTask::Body(Some(data), end_of_body)); - end_of_body - } - None => { - tx.send(HttpTask::Body(None, true)); - true + // TODO:: use this function to replace send_body_to2 + async fn send_body_to_pipe( + &self, + session: &mut Session, + mut data: Option<Bytes>, + end_of_body: bool, + tx: mpsc::Permit<'_, HttpTask>, + ctx: &mut SV::CTX, + ) -> Result<bool> + where + SV: ProxyHttp + Send + Sync, + SV::CTX: Send + Sync, + { + // None: end of body + // this var is to signal if downstream finish sending the body, which shouldn't be + // affected by the request_body_filter + let end_of_body = end_of_body || data.is_none(); + + self.inner + .request_body_filter(session, &mut data, end_of_body, ctx) + .await?; + + // the flag to signal to upstream + let upstream_end_of_body = end_of_body || data.is_none(); + + /* It is normal to get 0 bytes because of multi-chunk or request_body_filter decides not to + * output anything yet. + * Don't write 0 bytes to the network since it will be + * treated as the terminating chunk */ + if !upstream_end_of_body && data.as_ref().map_or(false, |d| d.is_empty()) { + return Ok(false); } + + debug!( + "Read {} bytes body from downstream", + data.as_ref().map_or(-1, |d| d.len() as isize) + ); + + tx.send(HttpTask::Body(data, upstream_end_of_body)); + + Ok(end_of_body) } } diff --git a/pingora-proxy/src/proxy_h2.rs b/pingora-proxy/src/proxy_h2.rs index 8f0dffa..534564f 100644 --- a/pingora-proxy/src/proxy_h2.rs +++ b/pingora-proxy/src/proxy_h2.rs @@ -223,7 +223,14 @@ impl<SV> HttpProxy<SV> { // retry, send buffer if it exists if let Some(buffer) = session.as_mut().get_retry_buffer() { - send_body_to2(Ok(Some(buffer)), downstream_state.is_done(), client_body)?; + self.send_body_to2( + session, + Some(buffer), + downstream_state.is_done(), + client_body, + ctx, + ) + .await?; } let mut response_state = ResponseStateMachine::new(); @@ -260,7 +267,10 @@ impl<SV> HttpProxy<SV> { } } }; - let request_done = send_body_to2(Ok(body), session.is_body_done(), client_body)?; + let is_body_done = session.is_body_done(); + let request_done = + self.send_body_to2(session, body, is_body_done, client_body, ctx) + .await?; downstream_state.maybe_finished(request_done); }, @@ -497,38 +507,40 @@ impl<SV> HttpProxy<SV> { HttpTask::Failed(_) => Ok(task), // Do nothing just pass the error down } } -} -pub(crate) fn send_body_to2( - data: Result<Option<Bytes>>, - end_of_body: bool, - client_body: &mut h2::SendStream<bytes::Bytes>, -) -> Result<bool> { - match data { - Ok(res) => match res { - Some(data) => { - let data_len = data.len(); - debug!( - "Read {} bytes body from downstream, body end: {}", - data_len, end_of_body - ); - if data_len == 0 && !end_of_body { - /* it is normal to get 0 bytes because of multi-chunk parsing */ - return Ok(false); - } - write_body(client_body, data, end_of_body).map_err(|e| e.into_up())?; - debug!("Write {} bytes body to h2 upstream", data_len); - Ok(end_of_body) - } - None => { - debug!("Read downstream body done"); - /* send a standalone END_STREAM flag */ - write_body(client_body, Bytes::new(), true).map_err(|e| e.into_up())?; - debug!("Write END_STREAM to h2 upstream"); - Ok(true) - } - }, - Err(e) => e.into_down().into_err(), + async fn send_body_to2( + &self, + session: &mut Session, + mut data: Option<Bytes>, + end_of_body: bool, + client_body: &mut h2::SendStream<bytes::Bytes>, + ctx: &mut SV::CTX, + ) -> Result<bool> + where + SV: ProxyHttp + Send + Sync, + SV::CTX: Send + Sync, + { + self.inner + .request_body_filter(session, &mut data, end_of_body, ctx) + .await?; + + /* it is normal to get 0 bytes because of multi-chunk parsing or request_body_filter. + * Although there is no harm writing empty byte to h2, unlike h1, we ignore it + * for consistency */ + if !end_of_body && data.as_ref().map_or(false, |d| d.is_empty()) { + return Ok(false); + } + + if let Some(data) = data { + debug!("Write {} bytes body to h2 upstream", data.len()); + write_body(client_body, data, end_of_body).map_err(|e| e.into_up())?; + } else { + debug!("Read downstream body done"); + /* send a standalone END_STREAM flag */ + write_body(client_body, Bytes::new(), true).map_err(|e| e.into_up())?; + } + + Ok(end_of_body) } } diff --git a/pingora-proxy/src/proxy_trait.rs b/pingora-proxy/src/proxy_trait.rs index 0158681..adeddd8 100644 --- a/pingora-proxy/src/proxy_trait.rs +++ b/pingora-proxy/src/proxy_trait.rs @@ -14,6 +14,7 @@ use super::*; use pingora_cache::{key::HashBinary, CacheKey, CacheMeta, RespCacheable, RespCacheable::*}; +use std::time::Duration; /// The interface to control the HTTP proxy /// @@ -55,6 +56,27 @@ pub trait ProxyHttp { Ok(false) } + /// Handle the incoming request body. + /// + /// This function will be called every time a piece of request body is received. The `body` is + /// **not the entire request body**. + /// + /// The async nature of this function allows to throttle the upload speed and/or executing + /// heavy computation logic such as WAF rules on offloaded threads without blocking the threads + /// who process the requests themselves. + async fn request_body_filter( + &self, + _session: &mut Session, + _body: &mut Option<Bytes>, + _end_of_stream: bool, + _ctx: &mut Self::CTX, + ) -> Result<()> + where + Self::CTX: Send + Sync, + { + Ok(()) + } + /// This filter decides if the request is cacheable and what cache backend to use /// /// The caller can interact with `Session.cache` to enable caching. @@ -238,7 +260,7 @@ pub trait ProxyHttp { _body: &mut Option<Bytes>, _end_of_stream: bool, _ctx: &mut Self::CTX, - ) -> Result<Option<std::time::Duration>> + ) -> Result<Option<Duration>> where Self::CTX: Send + Sync, { |