diff options
-rw-r--r-- | .bleep | 2 | ||||
-rw-r--r-- | pingora-core/src/protocols/http/v2/client.rs | 41 | ||||
-rw-r--r-- | pingora-proxy/src/proxy_h2.rs | 44 | ||||
-rw-r--r-- | pingora-proxy/tests/test_upstream.rs | 30 | ||||
-rw-r--r-- | pingora-proxy/tests/utils/conf/origin/conf/nginx.conf | 9 | ||||
-rw-r--r-- | pingora-proxy/tests/utils/server_utils.rs | 9 |
6 files changed, 117 insertions, 18 deletions
@@ -1 +1 @@ -88def7550ecab6e9511d7931cb97f03de54b89a5
\ No newline at end of file +92f47c7c3d831163573e595b4f99db68670647fa
\ No newline at end of file diff --git a/pingora-core/src/protocols/http/v2/client.rs b/pingora-core/src/protocols/http/v2/client.rs index 4d5559a..7cc112d 100644 --- a/pingora-core/src/protocols/http/v2/client.rs +++ b/pingora-core/src/protocols/http/v2/client.rs @@ -16,6 +16,7 @@ // TODO: this module needs a refactor use bytes::Bytes; +use futures::FutureExt; use h2::client::{self, ResponseFuture, SendRequest}; use h2::{Reason, RecvStream, SendStream}; use http::HeaderMap; @@ -194,10 +195,6 @@ impl Http2Session { return Ok(None); }; - if body_reader.is_end_stream() { - return Ok(None); - } - let fut = body_reader.data(); let res = match self.read_timeout { Some(t) => timeout(t, fut) @@ -234,6 +231,42 @@ impl Http2Session { .map_or(false, |reader| reader.is_end_stream()) } + /// Check whether stream finished with error. + /// Like `response_finished`, but also attempts to poll the h2 stream for errors that may have + /// caused the stream to terminate, and returns them as `H2Error`s. + pub fn check_response_end_or_error(&mut self) -> Result<bool> { + let Some(reader) = self.response_body_reader.as_mut() else { + // response is not even read + return Ok(false); + }; + + if !reader.is_end_stream() { + return Ok(false); + } + + // https://github.com/hyperium/h2/issues/806 + // The fundamental issue is that h2::RecvStream may return `is_end_stream` true + // when the stream was naturally closed via END_STREAM /OR/ if there was an error + // while reading data frames that forced the closure. + // The h2 API as-is makes it difficult to determine which situation is occurring. + // + // `poll_data` should be returning None after `is_end_stream`, if the stream + // is truly expecting no more data to be sent. + // https://docs.rs/h2/latest/h2/struct.RecvStream.html#method.is_end_stream + // So poll the data once to check this condition. If an error is returned, that indicates + // that the stream closed due to an error e.g. h2 protocol error. + match reader.data().now_or_never() { + Some(None) => Ok(true), + Some(Some(Ok(_))) => Error::e_explain(H2Error, "unexpected data after end stream"), + Some(Some(Err(e))) => Error::e_because(H2Error, "while checking end stream", e), + None => { + // RecvStream data() should be ready to poll after the stream ends, + // this indicates an unexpected change in the h2 crate + panic!("data() not ready after end stream") + } + } + } + /// Read the optional trailer headers pub async fn read_trailers(&mut self) -> Result<Option<HeaderMap>> { let Some(reader) = self.response_body_reader.as_mut() else { diff --git a/pingora-proxy/src/proxy_h2.rs b/pingora-proxy/src/proxy_h2.rs index 53b2c14..fad91c2 100644 --- a/pingora-proxy/src/proxy_h2.rs +++ b/pingora-proxy/src/proxy_h2.rs @@ -569,9 +569,21 @@ pub(crate) async fn pipe_2to1_response( let resp_header = Box::new(client.response_header().expect("just read").clone()); - tx.send(HttpTask::Header(resp_header, client.response_finished())) - .await - .or_err(InternalError, "sending h2 headers to pipe")?; + match client.check_response_end_or_error() { + Ok(eos) => { + tx.send(HttpTask::Header(resp_header, eos)) + .await + .or_err(InternalError, "sending h2 headers to pipe")?; + } + Err(e) => { + // If upstream errored, then push error to downstream and then quit + // Don't care if send fails (which means downstream already gone) + // we were still able to retrieve the headers, so try sending + let _ = tx.send(HttpTask::Header(resp_header, false)).await; + let _ = tx.send(HttpTask::Failed(e.into_up())).await; + return Ok(()); + } + } while let Some(chunk) = client .read_response_body() @@ -583,21 +595,29 @@ pub(crate) async fn pipe_2to1_response( Ok(d) => d, Err(e) => { // Push the error to downstream and then quit - // Don't care if send fails: downstream already gone let _ = tx.send(HttpTask::Failed(e.into_up())).await; // Downstream should consume all remaining data and handle the error return Ok(()); } }; - if data.is_empty() && !client.response_finished() { - /* it is normal to get 0 bytes because of multi-chunk - * don't write 0 bytes to downstream since it will be - * misread as the terminating chunk */ - continue; + match client.check_response_end_or_error() { + Ok(eos) => { + if data.is_empty() && !eos { + /* it is normal to get 0 bytes because of multi-chunk + * don't write 0 bytes to downstream since it will be + * misread as the terminating chunk */ + continue; + } + tx.send(HttpTask::Body(Some(data), eos)) + .await + .or_err(InternalError, "sending h2 body to pipe")?; + } + Err(e) => { + // Similar to above, push the error to downstream and then quit + let _ = tx.send(HttpTask::Failed(e.into_up())).await; + return Ok(()); + } } - tx.send(HttpTask::Body(Some(data), client.response_finished())) - .await - .or_err(InternalError, "sending h2 body to pipe")?; } // attempt to get trailers diff --git a/pingora-proxy/tests/test_upstream.rs b/pingora-proxy/tests/test_upstream.rs index 42319a3..d391b9d 100644 --- a/pingora-proxy/tests/test_upstream.rs +++ b/pingora-proxy/tests/test_upstream.rs @@ -2047,4 +2047,34 @@ mod test_cache { assert_eq!(headers["x-cache-status"], "no-cache"); assert_eq!(res.text().await.unwrap(), "hello world"); } + + #[tokio::test] + async fn test_cache_h2_premature_end() { + init(); + let url = "http://127.0.0.1:6148/set_content_length/test_cache_h2_premature_end.txt"; + // try to fill cache + reqwest::Client::new() + .get(url) + .header("x-lock", "true") + .header("x-h2", "true") + .header("x-set-content-length", "13") // 2 more than "hello world" + .send() + .await + .unwrap(); + // h2 protocol error with content length mismatch + + // did not get saved into cache, next request will be cache miss + let res = reqwest::Client::new() + .get(url) + .header("x-lock", "true") + .header("x-h2", "true") + .header("x-set-content-length", "11") + .send() + .await + .unwrap(); + assert_eq!(res.status(), StatusCode::OK); + let headers = res.headers(); + assert_eq!(headers["x-cache-status"], "miss"); + assert_eq!(res.text().await.unwrap(), "hello world"); + } } diff --git a/pingora-proxy/tests/utils/conf/origin/conf/nginx.conf b/pingora-proxy/tests/utils/conf/origin/conf/nginx.conf index 2718f88..0818649 100644 --- a/pingora-proxy/tests/utils/conf/origin/conf/nginx.conf +++ b/pingora-proxy/tests/utils/conf/origin/conf/nginx.conf @@ -408,6 +408,15 @@ http { } } + location /set_content_length { + header_filter_by_lua_block { + if ngx.var.http_x_set_content_length then + ngx.header["Content-Length"] = ngx.var.http_x_set_content_length + end + } + return 200 "hello world"; + } + location /slow_body { content_by_lua_block { local sleep_sec = tonumber(ngx.var.http_x_set_sleep) or 1 diff --git a/pingora-proxy/tests/utils/server_utils.rs b/pingora-proxy/tests/utils/server_utils.rs index 1f19a16..f762f33 100644 --- a/pingora-proxy/tests/utils/server_utils.rs +++ b/pingora-proxy/tests/utils/server_utils.rs @@ -359,11 +359,18 @@ impl ProxyHttp for ExampleProxyCache { .headers .get("x-port") .map_or("8000", |v| v.to_str().unwrap()); - let peer = Box::new(HttpPeer::new( + + let mut peer = Box::new(HttpPeer::new( format!("127.0.0.1:{}", port), false, "".to_string(), )); + + if session.get_header_bytes("x-h2") == b"true" { + // default is 1, 1 + peer.options.set_http_version(2, 2); + } + Ok(peer) } |