aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorAndrew Hauck <[email protected]>2024-06-13 15:54:44 -0700
committerEdward Wang <[email protected]>2024-06-28 12:34:25 -0700
commitfbf3a9574930077ed01f955e9e8df2ec74a51215 (patch)
tree0c83473fb4e05cdae3debfbaef3591d63ac2df73
parent6e83d51ab12249b07cac60ecfd79d20bca5f49b5 (diff)
downloadpingora-fbf3a9574930077ed01f955e9e8df2ec74a51215.tar.gz
pingora-fbf3a9574930077ed01f955e9e8df2ec74a51215.zip
Add a write timeout to write body buf and an option to set a minimum send rate
-rw-r--r--.bleep2
-rw-r--r--pingora-core/src/protocols/http/client.rs4
-rw-r--r--pingora-core/src/protocols/http/server.rs30
-rw-r--r--pingora-core/src/protocols/http/v1/server.rs116
-rw-r--r--pingora-proxy/tests/test_upstream.rs54
-rw-r--r--pingora-proxy/tests/utils/conf/origin/conf/nginx.conf9
-rw-r--r--pingora-proxy/tests/utils/server_utils.rs19
7 files changed, 230 insertions, 4 deletions
diff --git a/.bleep b/.bleep
index df99f64..dd860b4 100644
--- a/.bleep
+++ b/.bleep
@@ -1 +1 @@
-1fe0ed665dfcf6222a4d08f6120172be64d27eb9 \ No newline at end of file
+19506db280fe5f52641e5c1ffd48e4b62f536f18 \ No newline at end of file
diff --git a/pingora-core/src/protocols/http/client.rs b/pingora-core/src/protocols/http/client.rs
index 5c44b6f..ed73152 100644
--- a/pingora-core/src/protocols/http/client.rs
+++ b/pingora-core/src/protocols/http/client.rs
@@ -89,7 +89,9 @@ impl HttpSession {
/// Set the write timeout for writing header and body.
///
- /// The timeout is per write operation, not on the overall time writing the entire request
+ /// The timeout is per write operation, not on the overall time writing the entire request.
+ ///
+ /// This is a noop for h2.
pub fn set_write_timeout(&mut self, timeout: Duration) {
match self {
HttpSession::H1(h1) => h1.write_timeout = Some(timeout),
diff --git a/pingora-core/src/protocols/http/server.rs b/pingora-core/src/protocols/http/server.rs
index d4113b0..78c1856 100644
--- a/pingora-core/src/protocols/http/server.rs
+++ b/pingora-core/src/protocols/http/server.rs
@@ -25,6 +25,7 @@ use http::{header::AsHeaderName, HeaderMap};
use log::error;
use pingora_error::Result;
use pingora_http::{RequestHeader, ResponseHeader};
+use std::time::Duration;
/// HTTP server session object for both HTTP/1.x and HTTP/2
pub enum Session {
@@ -188,6 +189,35 @@ impl Session {
}
}
+ /// Sets the downstream write timeout. This will trigger if we're unable
+ /// to write to the stream after `duration`. If a `min_send_rate` is
+ /// configured then the `min_send_rate` calculated timeout has higher priority.
+ ///
+ /// This is a noop for h2.
+ pub fn set_write_timeout(&mut self, timeout: Duration) {
+ match self {
+ Self::H1(s) => s.set_write_timeout(timeout),
+ Self::H2(_) => {}
+ }
+ }
+
+ /// Sets the minimum downstream send rate in bytes per second. This
+ /// is used to calculate a write timeout in seconds based on the size
+ /// of the buffer being written. If a `min_send_rate` is configured it
+ /// has higher priority over a set `write_timeout`. The minimum send
+ /// rate must be greater than zero.
+ ///
+ /// Calculated write timeout is guaranteed to be at least 1s if `min_send_rate`
+ /// is greater than zero, a send rate of zero is a noop.
+ ///
+ /// This is a noop for h2.
+ pub fn set_min_send_rate(&mut self, rate: usize) {
+ match self {
+ Self::H1(s) => s.set_min_send_rate(rate),
+ Self::H2(_) => {}
+ }
+ }
+
/// Return a digest of the request including the method, path and Host header
// TODO: make this use a `Formatter`
pub fn request_summary(&self) -> String {
diff --git a/pingora-core/src/protocols/http/v1/server.rs b/pingora-core/src/protocols/http/v1/server.rs
index 909381c..f545b6b 100644
--- a/pingora-core/src/protocols/http/v1/server.rs
+++ b/pingora-core/src/protocols/http/v1/server.rs
@@ -72,6 +72,8 @@ pub struct HttpSession {
upgraded: bool,
/// Digest to track underlying connection metrics
digest: Box<Digest>,
+ /// Minimum send rate to the client
+ min_send_rate: Option<usize>,
}
impl HttpSession {
@@ -106,6 +108,7 @@ impl HttpSession {
retry_buffer: None,
upgraded: false,
digest,
+ min_send_rate: None,
}
}
@@ -511,6 +514,18 @@ impl HttpSession {
is_buf_keepalive(self.get_header(header::CONNECTION))
}
+ // calculate write timeout from min_send_rate if set, otherwise return write_timeout
+ fn write_timeout(&self, buf_len: usize) -> Option<Duration> {
+ let Some(min_send_rate) = self.min_send_rate.filter(|r| *r > 0) else {
+ return self.write_timeout;
+ };
+
+ // min timeout is 1s
+ let ms = (buf_len.max(min_send_rate) as f64 / min_send_rate as f64) * 1000.0;
+ // truncates unrealistically large values (we'll be out of memory before this happens)
+ Some(Duration::from_millis(ms as u64))
+ }
+
/// Apply keepalive settings according to the client
/// For HTTP 1.1, assume keepalive as long as there is no `Connection: Close` request header.
/// For HTTP 1.0, only keepalive if there is an explicit header `Connection: keep-alive`.
@@ -579,7 +594,7 @@ impl HttpSession {
/// to be written, e.g., writing more bytes than what the `Content-Length` header suggests
pub async fn write_body(&mut self, buf: &[u8]) -> Result<Option<usize>> {
// TODO: check if the response header is written
- match self.write_timeout {
+ match self.write_timeout(buf.len()) {
Some(t) => match timeout(t, self.do_write_body(buf)).await {
Ok(res) => res,
Err(_) => Error::e_explain(WriteTimedout, format!("writing body, timeout: {t:?}")),
@@ -588,7 +603,7 @@ impl HttpSession {
}
}
- async fn write_body_buf(&mut self) -> Result<Option<usize>> {
+ async fn do_write_body_buf(&mut self) -> Result<Option<usize>> {
// Don't flush empty chunks, they are considered end of body for chunks
if self.body_write_buf.is_empty() {
return Ok(None);
@@ -609,6 +624,16 @@ impl HttpSession {
written
}
+ async fn write_body_buf(&mut self) -> Result<Option<usize>> {
+ match self.write_timeout(self.body_write_buf.len()) {
+ Some(t) => match timeout(t, self.do_write_body_buf()).await {
+ Ok(res) => res,
+ Err(_) => Error::e_explain(WriteTimedout, format!("writing body, timeout: {t:?}")),
+ },
+ None => self.do_write_body_buf().await,
+ }
+ }
+
fn maybe_force_close_body_reader(&mut self) {
if self.upgraded && !self.body_reader.body_done() {
// response is done, reset the request body to close
@@ -778,6 +803,27 @@ impl HttpSession {
}
}
+ /// Sets the downstream write timeout. This will trigger if we're unable
+ /// to write to the stream after `duration`. If a `min_send_rate` is
+ /// configured then the `min_send_rate` calculated timeout has higher priority.
+ pub fn set_write_timeout(&mut self, timeout: Duration) {
+ self.write_timeout = Some(timeout);
+ }
+
+ /// Sets the minimum downstream send rate in bytes per second. This
+ /// is used to calculate a write timeout in seconds based on the size
+ /// of the buffer being written. If a `min_send_rate` is configured it
+ /// has higher priority over a set `write_timeout`. The minimum send
+ /// rate must be greater than zero.
+ ///
+ /// Calculated write timeout is guaranteed to be at least 1s if `min_send_rate`
+ /// is greater than zero, a send rate of zero is a noop.
+ pub fn set_min_send_rate(&mut self, min_send_rate: usize) {
+ if min_send_rate > 0 {
+ self.min_send_rate = Some(min_send_rate);
+ }
+ }
+
/// Return the [Digest] of the connection.
pub fn digest(&self) -> &Digest {
&self.digest
@@ -1584,6 +1630,30 @@ mod tests_stream {
}
#[tokio::test]
+ #[should_panic(expected = "There is still data left to write.")]
+ async fn test_write_body_buf_write_timeout() {
+ let wire1 = b"HTTP/1.1 200 OK\r\nContent-Length: 3\r\n\r\n";
+ let wire2 = b"abc";
+ let mock_io = Builder::new()
+ .write(wire1)
+ .wait(Duration::from_millis(500))
+ .write(wire2)
+ .build();
+ let mut http_stream = HttpSession::new(Box::new(mock_io));
+ http_stream.write_timeout = Some(Duration::from_millis(100));
+ let mut new_response = ResponseHeader::build(StatusCode::OK, None).unwrap();
+ new_response.append_header("Content-Length", "3").unwrap();
+ http_stream.update_resp_headers = false;
+ http_stream
+ .write_response_header_ref(&new_response)
+ .await
+ .unwrap();
+ http_stream.body_write_buf = BytesMut::from(&b"abc"[..]);
+ let res = http_stream.write_body_buf().await;
+ assert_eq!(res.unwrap_err().etype(), &WriteTimedout);
+ }
+
+ #[tokio::test]
async fn test_write_continue_resp() {
let wire = b"HTTP/1.1 100 Continue\r\n\r\n";
let mock_io = Builder::new().write(wire).build();
@@ -1610,6 +1680,48 @@ mod tests_stream {
response.set_version(http::Version::HTTP_11);
assert!(!is_upgrade_resp(&response));
}
+
+ #[test]
+ fn test_get_write_timeout() {
+ let mut http_stream = HttpSession::new(Box::new(Builder::new().build()));
+ let expected = Duration::from_secs(5);
+
+ http_stream.set_write_timeout(expected);
+ assert_eq!(Some(expected), http_stream.write_timeout(50));
+ }
+
+ #[test]
+ fn test_get_write_timeout_none() {
+ let http_stream = HttpSession::new(Box::new(Builder::new().build()));
+ assert!(http_stream.write_timeout(50).is_none());
+ }
+
+ #[test]
+ fn test_get_write_timeout_min_send_rate_zero_noop() {
+ let mut http_stream = HttpSession::new(Box::new(Builder::new().build()));
+ http_stream.set_min_send_rate(0);
+ assert!(http_stream.write_timeout(50).is_none());
+ }
+
+ #[test]
+ fn test_get_write_timeout_min_send_rate_overrides_write_timeout() {
+ let mut http_stream = HttpSession::new(Box::new(Builder::new().build()));
+ let expected = Duration::from_millis(29800);
+
+ http_stream.set_write_timeout(Duration::from_secs(60));
+ http_stream.set_min_send_rate(5000);
+
+ assert_eq!(Some(expected), http_stream.write_timeout(149000));
+ }
+
+ #[test]
+ fn test_get_write_timeout_min_send_rate_max_zero_buf() {
+ let mut http_stream = HttpSession::new(Box::new(Builder::new().build()));
+ let expected = Duration::from_secs(1);
+
+ http_stream.set_min_send_rate(1);
+ assert_eq!(Some(expected), http_stream.write_timeout(0));
+ }
}
#[cfg(test)]
diff --git a/pingora-proxy/tests/test_upstream.rs b/pingora-proxy/tests/test_upstream.rs
index ad7aaaf..2600832 100644
--- a/pingora-proxy/tests/test_upstream.rs
+++ b/pingora-proxy/tests/test_upstream.rs
@@ -133,6 +133,60 @@ async fn test_ws_server_ends_conn() {
assert!(ws_stream.next().await.is_none());
}
+#[tokio::test]
+async fn test_download_timeout() {
+ init();
+ use hyper::body::HttpBody;
+ use tokio::time::sleep;
+
+ let client = hyper::Client::new();
+ let uri: hyper::Uri = "http://127.0.0.1:6147/download/".parse().unwrap();
+ let req = hyper::Request::builder()
+ .uri(uri)
+ .header("x-write-timeout", "1")
+ .body(hyper::Body::empty())
+ .unwrap();
+ let mut res = client.request(req).await.unwrap();
+ assert_eq!(res.status(), StatusCode::OK);
+
+ let mut err = false;
+ sleep(Duration::from_secs(2)).await;
+ while let Some(chunk) = res.body_mut().data().await {
+ if chunk.is_err() {
+ err = true;
+ }
+ }
+ assert!(err);
+}
+
+#[tokio::test]
+async fn test_download_timeout_min_rate() {
+ init();
+ use hyper::body::HttpBody;
+ use tokio::time::sleep;
+
+ let client = hyper::Client::new();
+ let uri: hyper::Uri = "http://127.0.0.1:6147/download/".parse().unwrap();
+ let req = hyper::Request::builder()
+ .uri(uri)
+ .header("x-write-timeout", "1")
+ .header("x-min-rate", "10000")
+ .body(hyper::Body::empty())
+ .unwrap();
+ let mut res = client.request(req).await.unwrap();
+ assert_eq!(res.status(), StatusCode::OK);
+
+ let mut err = false;
+ sleep(Duration::from_secs(2)).await;
+ while let Some(chunk) = res.body_mut().data().await {
+ if chunk.is_err() {
+ err = true;
+ }
+ }
+ // no error as write timeout is overridden by min rate
+ assert!(!err);
+}
+
mod test_cache {
use super::*;
use std::str::FromStr;
diff --git a/pingora-proxy/tests/utils/conf/origin/conf/nginx.conf b/pingora-proxy/tests/utils/conf/origin/conf/nginx.conf
index 07e57e8..a41a743 100644
--- a/pingora-proxy/tests/utils/conf/origin/conf/nginx.conf
+++ b/pingora-proxy/tests/utils/conf/origin/conf/nginx.conf
@@ -296,6 +296,15 @@ http {
}
}
+ location /download/ {
+ content_by_lua_block {
+ ngx.req.read_body()
+ local body = string.rep("A", 4194304)
+ ngx.header["Content-Length"] = #body
+ ngx.print(body)
+ }
+ }
+
location /tls_verify {
keepalive_timeout 0;
return 200;
diff --git a/pingora-proxy/tests/utils/server_utils.rs b/pingora-proxy/tests/utils/server_utils.rs
index 7b20630..ec1a962 100644
--- a/pingora-proxy/tests/utils/server_utils.rs
+++ b/pingora-proxy/tests/utils/server_utils.rs
@@ -39,6 +39,7 @@ use pingora_proxy::{ProxyHttp, Session};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::thread;
+use std::time::Duration;
pub struct ExampleProxyHttps {}
@@ -230,6 +231,17 @@ impl ProxyHttp for ExampleProxyHttp {
async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool> {
let req = session.req_header();
+
+ let write_timeout = req
+ .headers
+ .get("x-write-timeout")
+ .and_then(|v| v.to_str().ok().and_then(|v| v.parse().ok()));
+
+ let min_rate = req
+ .headers
+ .get("x-min-rate")
+ .and_then(|v| v.to_str().ok().and_then(|v| v.parse().ok()));
+
let downstream_compression = req.headers.get("x-downstream-compression").is_some();
if !downstream_compression {
// enable upstream compression for all requests by default
@@ -242,6 +254,13 @@ impl ProxyHttp for ExampleProxyHttp {
.adjust_level(0);
}
+ if let Some(min_rate) = min_rate {
+ session.set_min_send_rate(min_rate);
+ }
+ if let Some(write_timeout) = write_timeout {
+ session.set_write_timeout(Duration::from_secs(write_timeout));
+ }
+
Ok(false)
}