diff options
author | Andrew Hauck <[email protected]> | 2024-06-13 15:54:44 -0700 |
---|---|---|
committer | Edward Wang <[email protected]> | 2024-06-28 12:34:25 -0700 |
commit | fbf3a9574930077ed01f955e9e8df2ec74a51215 (patch) | |
tree | 0c83473fb4e05cdae3debfbaef3591d63ac2df73 | |
parent | 6e83d51ab12249b07cac60ecfd79d20bca5f49b5 (diff) | |
download | pingora-fbf3a9574930077ed01f955e9e8df2ec74a51215.tar.gz pingora-fbf3a9574930077ed01f955e9e8df2ec74a51215.zip |
Add a write timeout to write body buf and an option to set a minimum send rate
-rw-r--r-- | .bleep | 2 | ||||
-rw-r--r-- | pingora-core/src/protocols/http/client.rs | 4 | ||||
-rw-r--r-- | pingora-core/src/protocols/http/server.rs | 30 | ||||
-rw-r--r-- | pingora-core/src/protocols/http/v1/server.rs | 116 | ||||
-rw-r--r-- | pingora-proxy/tests/test_upstream.rs | 54 | ||||
-rw-r--r-- | pingora-proxy/tests/utils/conf/origin/conf/nginx.conf | 9 | ||||
-rw-r--r-- | pingora-proxy/tests/utils/server_utils.rs | 19 |
7 files changed, 230 insertions, 4 deletions
@@ -1 +1 @@ -1fe0ed665dfcf6222a4d08f6120172be64d27eb9
\ No newline at end of file +19506db280fe5f52641e5c1ffd48e4b62f536f18
\ No newline at end of file diff --git a/pingora-core/src/protocols/http/client.rs b/pingora-core/src/protocols/http/client.rs index 5c44b6f..ed73152 100644 --- a/pingora-core/src/protocols/http/client.rs +++ b/pingora-core/src/protocols/http/client.rs @@ -89,7 +89,9 @@ impl HttpSession { /// Set the write timeout for writing header and body. /// - /// The timeout is per write operation, not on the overall time writing the entire request + /// The timeout is per write operation, not on the overall time writing the entire request. + /// + /// This is a noop for h2. pub fn set_write_timeout(&mut self, timeout: Duration) { match self { HttpSession::H1(h1) => h1.write_timeout = Some(timeout), diff --git a/pingora-core/src/protocols/http/server.rs b/pingora-core/src/protocols/http/server.rs index d4113b0..78c1856 100644 --- a/pingora-core/src/protocols/http/server.rs +++ b/pingora-core/src/protocols/http/server.rs @@ -25,6 +25,7 @@ use http::{header::AsHeaderName, HeaderMap}; use log::error; use pingora_error::Result; use pingora_http::{RequestHeader, ResponseHeader}; +use std::time::Duration; /// HTTP server session object for both HTTP/1.x and HTTP/2 pub enum Session { @@ -188,6 +189,35 @@ impl Session { } } + /// Sets the downstream write timeout. This will trigger if we're unable + /// to write to the stream after `duration`. If a `min_send_rate` is + /// configured then the `min_send_rate` calculated timeout has higher priority. + /// + /// This is a noop for h2. + pub fn set_write_timeout(&mut self, timeout: Duration) { + match self { + Self::H1(s) => s.set_write_timeout(timeout), + Self::H2(_) => {} + } + } + + /// Sets the minimum downstream send rate in bytes per second. This + /// is used to calculate a write timeout in seconds based on the size + /// of the buffer being written. If a `min_send_rate` is configured it + /// has higher priority over a set `write_timeout`. The minimum send + /// rate must be greater than zero. + /// + /// Calculated write timeout is guaranteed to be at least 1s if `min_send_rate` + /// is greater than zero, a send rate of zero is a noop. + /// + /// This is a noop for h2. + pub fn set_min_send_rate(&mut self, rate: usize) { + match self { + Self::H1(s) => s.set_min_send_rate(rate), + Self::H2(_) => {} + } + } + /// Return a digest of the request including the method, path and Host header // TODO: make this use a `Formatter` pub fn request_summary(&self) -> String { diff --git a/pingora-core/src/protocols/http/v1/server.rs b/pingora-core/src/protocols/http/v1/server.rs index 909381c..f545b6b 100644 --- a/pingora-core/src/protocols/http/v1/server.rs +++ b/pingora-core/src/protocols/http/v1/server.rs @@ -72,6 +72,8 @@ pub struct HttpSession { upgraded: bool, /// Digest to track underlying connection metrics digest: Box<Digest>, + /// Minimum send rate to the client + min_send_rate: Option<usize>, } impl HttpSession { @@ -106,6 +108,7 @@ impl HttpSession { retry_buffer: None, upgraded: false, digest, + min_send_rate: None, } } @@ -511,6 +514,18 @@ impl HttpSession { is_buf_keepalive(self.get_header(header::CONNECTION)) } + // calculate write timeout from min_send_rate if set, otherwise return write_timeout + fn write_timeout(&self, buf_len: usize) -> Option<Duration> { + let Some(min_send_rate) = self.min_send_rate.filter(|r| *r > 0) else { + return self.write_timeout; + }; + + // min timeout is 1s + let ms = (buf_len.max(min_send_rate) as f64 / min_send_rate as f64) * 1000.0; + // truncates unrealistically large values (we'll be out of memory before this happens) + Some(Duration::from_millis(ms as u64)) + } + /// Apply keepalive settings according to the client /// For HTTP 1.1, assume keepalive as long as there is no `Connection: Close` request header. /// For HTTP 1.0, only keepalive if there is an explicit header `Connection: keep-alive`. @@ -579,7 +594,7 @@ impl HttpSession { /// to be written, e.g., writing more bytes than what the `Content-Length` header suggests pub async fn write_body(&mut self, buf: &[u8]) -> Result<Option<usize>> { // TODO: check if the response header is written - match self.write_timeout { + match self.write_timeout(buf.len()) { Some(t) => match timeout(t, self.do_write_body(buf)).await { Ok(res) => res, Err(_) => Error::e_explain(WriteTimedout, format!("writing body, timeout: {t:?}")), @@ -588,7 +603,7 @@ impl HttpSession { } } - async fn write_body_buf(&mut self) -> Result<Option<usize>> { + async fn do_write_body_buf(&mut self) -> Result<Option<usize>> { // Don't flush empty chunks, they are considered end of body for chunks if self.body_write_buf.is_empty() { return Ok(None); @@ -609,6 +624,16 @@ impl HttpSession { written } + async fn write_body_buf(&mut self) -> Result<Option<usize>> { + match self.write_timeout(self.body_write_buf.len()) { + Some(t) => match timeout(t, self.do_write_body_buf()).await { + Ok(res) => res, + Err(_) => Error::e_explain(WriteTimedout, format!("writing body, timeout: {t:?}")), + }, + None => self.do_write_body_buf().await, + } + } + fn maybe_force_close_body_reader(&mut self) { if self.upgraded && !self.body_reader.body_done() { // response is done, reset the request body to close @@ -778,6 +803,27 @@ impl HttpSession { } } + /// Sets the downstream write timeout. This will trigger if we're unable + /// to write to the stream after `duration`. If a `min_send_rate` is + /// configured then the `min_send_rate` calculated timeout has higher priority. + pub fn set_write_timeout(&mut self, timeout: Duration) { + self.write_timeout = Some(timeout); + } + + /// Sets the minimum downstream send rate in bytes per second. This + /// is used to calculate a write timeout in seconds based on the size + /// of the buffer being written. If a `min_send_rate` is configured it + /// has higher priority over a set `write_timeout`. The minimum send + /// rate must be greater than zero. + /// + /// Calculated write timeout is guaranteed to be at least 1s if `min_send_rate` + /// is greater than zero, a send rate of zero is a noop. + pub fn set_min_send_rate(&mut self, min_send_rate: usize) { + if min_send_rate > 0 { + self.min_send_rate = Some(min_send_rate); + } + } + /// Return the [Digest] of the connection. pub fn digest(&self) -> &Digest { &self.digest @@ -1584,6 +1630,30 @@ mod tests_stream { } #[tokio::test] + #[should_panic(expected = "There is still data left to write.")] + async fn test_write_body_buf_write_timeout() { + let wire1 = b"HTTP/1.1 200 OK\r\nContent-Length: 3\r\n\r\n"; + let wire2 = b"abc"; + let mock_io = Builder::new() + .write(wire1) + .wait(Duration::from_millis(500)) + .write(wire2) + .build(); + let mut http_stream = HttpSession::new(Box::new(mock_io)); + http_stream.write_timeout = Some(Duration::from_millis(100)); + let mut new_response = ResponseHeader::build(StatusCode::OK, None).unwrap(); + new_response.append_header("Content-Length", "3").unwrap(); + http_stream.update_resp_headers = false; + http_stream + .write_response_header_ref(&new_response) + .await + .unwrap(); + http_stream.body_write_buf = BytesMut::from(&b"abc"[..]); + let res = http_stream.write_body_buf().await; + assert_eq!(res.unwrap_err().etype(), &WriteTimedout); + } + + #[tokio::test] async fn test_write_continue_resp() { let wire = b"HTTP/1.1 100 Continue\r\n\r\n"; let mock_io = Builder::new().write(wire).build(); @@ -1610,6 +1680,48 @@ mod tests_stream { response.set_version(http::Version::HTTP_11); assert!(!is_upgrade_resp(&response)); } + + #[test] + fn test_get_write_timeout() { + let mut http_stream = HttpSession::new(Box::new(Builder::new().build())); + let expected = Duration::from_secs(5); + + http_stream.set_write_timeout(expected); + assert_eq!(Some(expected), http_stream.write_timeout(50)); + } + + #[test] + fn test_get_write_timeout_none() { + let http_stream = HttpSession::new(Box::new(Builder::new().build())); + assert!(http_stream.write_timeout(50).is_none()); + } + + #[test] + fn test_get_write_timeout_min_send_rate_zero_noop() { + let mut http_stream = HttpSession::new(Box::new(Builder::new().build())); + http_stream.set_min_send_rate(0); + assert!(http_stream.write_timeout(50).is_none()); + } + + #[test] + fn test_get_write_timeout_min_send_rate_overrides_write_timeout() { + let mut http_stream = HttpSession::new(Box::new(Builder::new().build())); + let expected = Duration::from_millis(29800); + + http_stream.set_write_timeout(Duration::from_secs(60)); + http_stream.set_min_send_rate(5000); + + assert_eq!(Some(expected), http_stream.write_timeout(149000)); + } + + #[test] + fn test_get_write_timeout_min_send_rate_max_zero_buf() { + let mut http_stream = HttpSession::new(Box::new(Builder::new().build())); + let expected = Duration::from_secs(1); + + http_stream.set_min_send_rate(1); + assert_eq!(Some(expected), http_stream.write_timeout(0)); + } } #[cfg(test)] diff --git a/pingora-proxy/tests/test_upstream.rs b/pingora-proxy/tests/test_upstream.rs index ad7aaaf..2600832 100644 --- a/pingora-proxy/tests/test_upstream.rs +++ b/pingora-proxy/tests/test_upstream.rs @@ -133,6 +133,60 @@ async fn test_ws_server_ends_conn() { assert!(ws_stream.next().await.is_none()); } +#[tokio::test] +async fn test_download_timeout() { + init(); + use hyper::body::HttpBody; + use tokio::time::sleep; + + let client = hyper::Client::new(); + let uri: hyper::Uri = "http://127.0.0.1:6147/download/".parse().unwrap(); + let req = hyper::Request::builder() + .uri(uri) + .header("x-write-timeout", "1") + .body(hyper::Body::empty()) + .unwrap(); + let mut res = client.request(req).await.unwrap(); + assert_eq!(res.status(), StatusCode::OK); + + let mut err = false; + sleep(Duration::from_secs(2)).await; + while let Some(chunk) = res.body_mut().data().await { + if chunk.is_err() { + err = true; + } + } + assert!(err); +} + +#[tokio::test] +async fn test_download_timeout_min_rate() { + init(); + use hyper::body::HttpBody; + use tokio::time::sleep; + + let client = hyper::Client::new(); + let uri: hyper::Uri = "http://127.0.0.1:6147/download/".parse().unwrap(); + let req = hyper::Request::builder() + .uri(uri) + .header("x-write-timeout", "1") + .header("x-min-rate", "10000") + .body(hyper::Body::empty()) + .unwrap(); + let mut res = client.request(req).await.unwrap(); + assert_eq!(res.status(), StatusCode::OK); + + let mut err = false; + sleep(Duration::from_secs(2)).await; + while let Some(chunk) = res.body_mut().data().await { + if chunk.is_err() { + err = true; + } + } + // no error as write timeout is overridden by min rate + assert!(!err); +} + mod test_cache { use super::*; use std::str::FromStr; diff --git a/pingora-proxy/tests/utils/conf/origin/conf/nginx.conf b/pingora-proxy/tests/utils/conf/origin/conf/nginx.conf index 07e57e8..a41a743 100644 --- a/pingora-proxy/tests/utils/conf/origin/conf/nginx.conf +++ b/pingora-proxy/tests/utils/conf/origin/conf/nginx.conf @@ -296,6 +296,15 @@ http { } } + location /download/ { + content_by_lua_block { + ngx.req.read_body() + local body = string.rep("A", 4194304) + ngx.header["Content-Length"] = #body + ngx.print(body) + } + } + location /tls_verify { keepalive_timeout 0; return 200; diff --git a/pingora-proxy/tests/utils/server_utils.rs b/pingora-proxy/tests/utils/server_utils.rs index 7b20630..ec1a962 100644 --- a/pingora-proxy/tests/utils/server_utils.rs +++ b/pingora-proxy/tests/utils/server_utils.rs @@ -39,6 +39,7 @@ use pingora_proxy::{ProxyHttp, Session}; use std::collections::{HashMap, HashSet}; use std::sync::Arc; use std::thread; +use std::time::Duration; pub struct ExampleProxyHttps {} @@ -230,6 +231,17 @@ impl ProxyHttp for ExampleProxyHttp { async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool> { let req = session.req_header(); + + let write_timeout = req + .headers + .get("x-write-timeout") + .and_then(|v| v.to_str().ok().and_then(|v| v.parse().ok())); + + let min_rate = req + .headers + .get("x-min-rate") + .and_then(|v| v.to_str().ok().and_then(|v| v.parse().ok())); + let downstream_compression = req.headers.get("x-downstream-compression").is_some(); if !downstream_compression { // enable upstream compression for all requests by default @@ -242,6 +254,13 @@ impl ProxyHttp for ExampleProxyHttp { .adjust_level(0); } + if let Some(min_rate) = min_rate { + session.set_min_send_rate(min_rate); + } + if let Some(write_timeout) = write_timeout { + session.set_write_timeout(Duration::from_secs(write_timeout)); + } + Ok(false) } |