about | summary | refs | log | tree | commit | diff | homepage
diff options
context:
space:
mode:
author: Yuchen Wu <[email protected]>, 2024-05-13 15:39:21 -0700
committer: Edward Wang <[email protected]>, 2024-05-24 10:00:06 -0700
commit: f38f3b9a38cae13e116f868ef23e977252dc24a2 (patch)
tree: 8d797e6f631071e48e78d9ca18948c50a7ced945
parent: 34b2a35d7b8cac98fe3ac9f576b7c8255b274283 (diff)
download: pingora-f38f3b9a38cae13e116f868ef23e977252dc24a2.tar.gz
pingora-f38f3b9a38cae13e116f868ef23e977252dc24a2.zip
Add request body filter
-rw-r--r-- .bleep 2
-rw-r--r-- pingora-proxy/src/proxy_h1.rs 80
-rw-r--r-- pingora-proxy/src/proxy_h2.rs 78
-rw-r--r-- pingora-proxy/src/proxy_trait.rs 24
4 files changed, 123 insertions, 61 deletions
diff --git a/.bleep b/.bleep
index 186f68f..3040aae 100644
--- a/.bleep
+++ b/.bleep
@@ -1 +1 @@
-2c9d4c55853235e908a1acd20454ebe7b979d246
\ No newline at end of file
+1952c0cbc08e1cef0a5ed280ce55b17e7066e49d
\ No newline at end of file
diff --git a/pingora-proxy/src/proxy_h1.rs b/pingora-proxy/src/proxy_h1.rs
index c702af3..feac860 100644
--- a/pingora-proxy/src/proxy_h1.rs
+++ b/pingora-proxy/src/proxy_h1.rs
@@ -223,7 +223,14 @@ impl<SV> HttpProxy<SV> {
.reserve()
.await
.or_err(InternalError, "reserving body pipe")?;
- send_body_to_pipe(buffer, downstream_state.is_done(), send_permit).await;
+ self.send_body_to_pipe(
+ session,
+ buffer,
+ downstream_state.is_done(),
+ send_permit,
+ ctx,
+ )
+ .await?;
}
let mut response_state = ResponseStateMachine::new();
@@ -288,12 +295,15 @@ impl<SV> HttpProxy<SV> {
response_state.maybe_set_upstream_done(true);
}
// TODO: consider just drain this if serve_from_cache is set
- let request_done = send_body_to_pipe(
+ let is_body_done = session.is_body_done();
+ let request_done = self.send_body_to_pipe(
+ session,
body,
- session.is_body_done(),
+ is_body_done,
send_permit.unwrap(), // safe because we checked is_ok()
+ ctx,
)
- .await;
+ .await?;
downstream_state.maybe_finished(request_done);
},
@@ -520,30 +530,48 @@ impl<SV> HttpProxy<SV> {
HttpTask::Failed(_) => Ok(task), // Do nothing just pass the error down
}
}
-}
-// TODO:: use this function to replace send_body_to2
-pub(crate) async fn send_body_to_pipe(
- data: Option<Bytes>,
- end_of_body: bool,
- tx: mpsc::Permit<'_, HttpTask>,
-) -> bool {
- match data {
- Some(data) => {
- debug!("Read {} bytes body from downstream", data.len());
- if data.is_empty() && !end_of_body {
- /* it is normal to get 0 bytes because of multi-chunk
- * don't write 0 bytes to downstream since it will be
- * misread as the terminating chunk */
- return false;
- }
- tx.send(HttpTask::Body(Some(data), end_of_body));
- end_of_body
- }
- None => {
- tx.send(HttpTask::Body(None, true));
- true
+ // TODO:: use this function to replace send_body_to2
+ async fn send_body_to_pipe(
+ &self,
+ session: &mut Session,
+ mut data: Option<Bytes>,
+ end_of_body: bool,
+ tx: mpsc::Permit<'_, HttpTask>,
+ ctx: &mut SV::CTX,
+ ) -> Result<bool>
+ where
+ SV: ProxyHttp + Send + Sync,
+ SV::CTX: Send + Sync,
+ {
+ // None: end of body
+ // this var is to signal if downstream finish sending the body, which shouldn't be
+ // affected by the request_body_filter
+ let end_of_body = end_of_body || data.is_none();
+
+ self.inner
+ .request_body_filter(session, &mut data, end_of_body, ctx)
+ .await?;
+
+ // the flag to signal to upstream
+ let upstream_end_of_body = end_of_body || data.is_none();
+
+ /* It is normal to get 0 bytes because of multi-chunk or request_body_filter decides not to
+ * output anything yet.
+ * Don't write 0 bytes to the network since it will be
+ * treated as the terminating chunk */
+ if !upstream_end_of_body && data.as_ref().map_or(false, |d| d.is_empty()) {
+ return Ok(false);
}
+
+ debug!(
+ "Read {} bytes body from downstream",
+ data.as_ref().map_or(-1, |d| d.len() as isize)
+ );
+
+ tx.send(HttpTask::Body(data, upstream_end_of_body));
+
+ Ok(end_of_body)
}
}
diff --git a/pingora-proxy/src/proxy_h2.rs b/pingora-proxy/src/proxy_h2.rs
index 8f0dffa..534564f 100644
--- a/pingora-proxy/src/proxy_h2.rs
+++ b/pingora-proxy/src/proxy_h2.rs
@@ -223,7 +223,14 @@ impl<SV> HttpProxy<SV> {
// retry, send buffer if it exists
if let Some(buffer) = session.as_mut().get_retry_buffer() {
- send_body_to2(Ok(Some(buffer)), downstream_state.is_done(), client_body)?;
+ self.send_body_to2(
+ session,
+ Some(buffer),
+ downstream_state.is_done(),
+ client_body,
+ ctx,
+ )
+ .await?;
}
let mut response_state = ResponseStateMachine::new();
@@ -260,7 +267,10 @@ impl<SV> HttpProxy<SV> {
}
}
};
- let request_done = send_body_to2(Ok(body), session.is_body_done(), client_body)?;
+ let is_body_done = session.is_body_done();
+ let request_done =
+ self.send_body_to2(session, body, is_body_done, client_body, ctx)
+ .await?;
downstream_state.maybe_finished(request_done);
},
@@ -497,38 +507,40 @@ impl<SV> HttpProxy<SV> {
HttpTask::Failed(_) => Ok(task), // Do nothing just pass the error down
}
}
-}
-pub(crate) fn send_body_to2(
- data: Result<Option<Bytes>>,
- end_of_body: bool,
- client_body: &mut h2::SendStream<bytes::Bytes>,
-) -> Result<bool> {
- match data {
- Ok(res) => match res {
- Some(data) => {
- let data_len = data.len();
- debug!(
- "Read {} bytes body from downstream, body end: {}",
- data_len, end_of_body
- );
- if data_len == 0 && !end_of_body {
- /* it is normal to get 0 bytes because of multi-chunk parsing */
- return Ok(false);
- }
- write_body(client_body, data, end_of_body).map_err(|e| e.into_up())?;
- debug!("Write {} bytes body to h2 upstream", data_len);
- Ok(end_of_body)
- }
- None => {
- debug!("Read downstream body done");
- /* send a standalone END_STREAM flag */
- write_body(client_body, Bytes::new(), true).map_err(|e| e.into_up())?;
- debug!("Write END_STREAM to h2 upstream");
- Ok(true)
- }
- },
- Err(e) => e.into_down().into_err(),
+ async fn send_body_to2(
+ &self,
+ session: &mut Session,
+ mut data: Option<Bytes>,
+ end_of_body: bool,
+ client_body: &mut h2::SendStream<bytes::Bytes>,
+ ctx: &mut SV::CTX,
+ ) -> Result<bool>
+ where
+ SV: ProxyHttp + Send + Sync,
+ SV::CTX: Send + Sync,
+ {
+ self.inner
+ .request_body_filter(session, &mut data, end_of_body, ctx)
+ .await?;
+
+ /* it is normal to get 0 bytes because of multi-chunk parsing or request_body_filter.
+ * Although there is no harm writing empty byte to h2, unlike h1, we ignore it
+ * for consistency */
+ if !end_of_body && data.as_ref().map_or(false, |d| d.is_empty()) {
+ return Ok(false);
+ }
+
+ if let Some(data) = data {
+ debug!("Write {} bytes body to h2 upstream", data.len());
+ write_body(client_body, data, end_of_body).map_err(|e| e.into_up())?;
+ } else {
+ debug!("Read downstream body done");
+ /* send a standalone END_STREAM flag */
+ write_body(client_body, Bytes::new(), true).map_err(|e| e.into_up())?;
+ }
+
+ Ok(end_of_body)
}
}
diff --git a/pingora-proxy/src/proxy_trait.rs b/pingora-proxy/src/proxy_trait.rs
index 0158681..adeddd8 100644
--- a/pingora-proxy/src/proxy_trait.rs
+++ b/pingora-proxy/src/proxy_trait.rs
@@ -14,6 +14,7 @@
use super::*;
use pingora_cache::{key::HashBinary, CacheKey, CacheMeta, RespCacheable, RespCacheable::*};
+use std::time::Duration;
/// The interface to control the HTTP proxy
///
@@ -55,6 +56,27 @@ pub trait ProxyHttp {
Ok(false)
}
+ /// Handle the incoming request body.
+ ///
+ /// This function will be called every time a piece of request body is received. The `body` is
+ /// **not the entire request body**.
+ ///
+ /// The async nature of this function allows to throttle the upload speed and/or executing
+ /// heavy computation logic such as WAF rules on offloaded threads without blocking the threads
+ /// who process the requests themselves.
+ async fn request_body_filter(
+ &self,
+ _session: &mut Session,
+ _body: &mut Option<Bytes>,
+ _end_of_stream: bool,
+ _ctx: &mut Self::CTX,
+ ) -> Result<()>
+ where
+ Self::CTX: Send + Sync,
+ {
+ Ok(())
+ }
+
/// This filter decides if the request is cacheable and what cache backend to use
///
/// The caller can interact with `Session.cache` to enable caching.
@@ -238,7 +260,7 @@ pub trait ProxyHttp {
_body: &mut Option<Bytes>,
_end_of_stream: bool,
_ctx: &mut Self::CTX,
- ) -> Result<Option<std::time::Duration>>
+ ) -> Result<Option<Duration>>
where
Self::CTX: Send + Sync,
{