From 4e4eb5a362fe8dc5f8bff8bc4deabf549c7f8ac5 Mon Sep 17 00:00:00 2001 From: Edward Wang Date: Wed, 30 Oct 2024 17:03:59 -0700 Subject: Check h2 stream end for error Certain APIs consuming HTTPTasks want to know if the response has finished. However, in cases such as protocol errors, the h2 RecvStream will return `is_end_stream`, which may be associated with an HTTPTask that was not actually the end of stream. This affects systems such as caching, which want to know when the response has properly finished in order to finish a cache miss. This change attempts to check whether the stream has ended due to an h2 error or not, so that a successful or Failed HTTPTask is forwarded as appropriate. --- .bleep | 2 +- pingora-core/src/protocols/http/v2/client.rs | 41 ++++++++++++++++++-- pingora-proxy/src/proxy_h2.rs | 44 ++++++++++++++++------ pingora-proxy/tests/test_upstream.rs | 30 +++++++++++++++ .../tests/utils/conf/origin/conf/nginx.conf | 9 +++++ pingora-proxy/tests/utils/server_utils.rs | 9 ++++- 6 files changed, 117 insertions(+), 18 deletions(-) diff --git a/.bleep b/.bleep index a555de0..3fcdfe3 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ -88def7550ecab6e9511d7931cb97f03de54b89a5 \ No newline at end of file +92f47c7c3d831163573e595b4f99db68670647fa \ No newline at end of file diff --git a/pingora-core/src/protocols/http/v2/client.rs b/pingora-core/src/protocols/http/v2/client.rs index 4d5559a..7cc112d 100644 --- a/pingora-core/src/protocols/http/v2/client.rs +++ b/pingora-core/src/protocols/http/v2/client.rs @@ -16,6 +16,7 @@ // TODO: this module needs a refactor use bytes::Bytes; +use futures::FutureExt; use h2::client::{self, ResponseFuture, SendRequest}; use h2::{Reason, RecvStream, SendStream}; use http::HeaderMap; @@ -194,10 +195,6 @@ impl Http2Session { return Ok(None); }; - if body_reader.is_end_stream() { - return Ok(None); - } - let fut = body_reader.data(); let res = match self.read_timeout { Some(t) => timeout(t, fut) @@ -234,6 +231,42 @@ impl Http2Session { .map_or(false, 
|reader| reader.is_end_stream()) } + /// Check whether stream finished with error. + /// Like `response_finished`, but also attempts to poll the h2 stream for errors that may have + /// caused the stream to terminate, and returns them as `H2Error`s. + pub fn check_response_end_or_error(&mut self) -> Result { + let Some(reader) = self.response_body_reader.as_mut() else { + // response is not even read + return Ok(false); + }; + + if !reader.is_end_stream() { + return Ok(false); + } + + // https://github.com/hyperium/h2/issues/806 + // The fundamental issue is that h2::RecvStream may return `is_end_stream` true + // when the stream was naturally closed via END_STREAM /OR/ if there was an error + // while reading data frames that forced the closure. + // The h2 API as-is makes it difficult to determine which situation is occurring. + // + // `poll_data` should be returning None after `is_end_stream`, if the stream + // is truly expecting no more data to be sent. + // https://docs.rs/h2/latest/h2/struct.RecvStream.html#method.is_end_stream + // So poll the data once to check this condition. If an error is returned, that indicates + // that the stream closed due to an error e.g. h2 protocol error. 
+ match reader.data().now_or_never() { + Some(None) => Ok(true), + Some(Some(Ok(_))) => Error::e_explain(H2Error, "unexpected data after end stream"), + Some(Some(Err(e))) => Error::e_because(H2Error, "while checking end stream", e), + None => { + // RecvStream data() should be ready to poll after the stream ends, + // this indicates an unexpected change in the h2 crate + panic!("data() not ready after end stream") + } + } + } + /// Read the optional trailer headers pub async fn read_trailers(&mut self) -> Result> { let Some(reader) = self.response_body_reader.as_mut() else { diff --git a/pingora-proxy/src/proxy_h2.rs b/pingora-proxy/src/proxy_h2.rs index 53b2c14..fad91c2 100644 --- a/pingora-proxy/src/proxy_h2.rs +++ b/pingora-proxy/src/proxy_h2.rs @@ -569,9 +569,21 @@ pub(crate) async fn pipe_2to1_response( let resp_header = Box::new(client.response_header().expect("just read").clone()); - tx.send(HttpTask::Header(resp_header, client.response_finished())) - .await - .or_err(InternalError, "sending h2 headers to pipe")?; + match client.check_response_end_or_error() { + Ok(eos) => { + tx.send(HttpTask::Header(resp_header, eos)) + .await + .or_err(InternalError, "sending h2 headers to pipe")?; + } + Err(e) => { + // If upstream errored, then push error to downstream and then quit + // Don't care if send fails (which means downstream already gone) + // we were still able to retrieve the headers, so try sending + let _ = tx.send(HttpTask::Header(resp_header, false)).await; + let _ = tx.send(HttpTask::Failed(e.into_up())).await; + return Ok(()); + } + } while let Some(chunk) = client .read_response_body() @@ -583,21 +595,29 @@ pub(crate) async fn pipe_2to1_response( Ok(d) => d, Err(e) => { // Push the error to downstream and then quit - // Don't care if send fails: downstream already gone let _ = tx.send(HttpTask::Failed(e.into_up())).await; // Downstream should consume all remaining data and handle the error return Ok(()); } }; - if data.is_empty() && 
!client.response_finished() { - /* it is normal to get 0 bytes because of multi-chunk - * don't write 0 bytes to downstream since it will be - * misread as the terminating chunk */ - continue; + match client.check_response_end_or_error() { + Ok(eos) => { + if data.is_empty() && !eos { + /* it is normal to get 0 bytes because of multi-chunk + * don't write 0 bytes to downstream since it will be + * misread as the terminating chunk */ + continue; + } + tx.send(HttpTask::Body(Some(data), eos)) + .await + .or_err(InternalError, "sending h2 body to pipe")?; + } + Err(e) => { + // Similar to above, push the error to downstream and then quit + let _ = tx.send(HttpTask::Failed(e.into_up())).await; + return Ok(()); + } } - tx.send(HttpTask::Body(Some(data), client.response_finished())) - .await - .or_err(InternalError, "sending h2 body to pipe")?; } // attempt to get trailers diff --git a/pingora-proxy/tests/test_upstream.rs b/pingora-proxy/tests/test_upstream.rs index 42319a3..d391b9d 100644 --- a/pingora-proxy/tests/test_upstream.rs +++ b/pingora-proxy/tests/test_upstream.rs @@ -2047,4 +2047,34 @@ mod test_cache { assert_eq!(headers["x-cache-status"], "no-cache"); assert_eq!(res.text().await.unwrap(), "hello world"); } + + #[tokio::test] + async fn test_cache_h2_premature_end() { + init(); + let url = "http://127.0.0.1:6148/set_content_length/test_cache_h2_premature_end.txt"; + // try to fill cache + reqwest::Client::new() + .get(url) + .header("x-lock", "true") + .header("x-h2", "true") + .header("x-set-content-length", "13") // 2 more than "hello world" + .send() + .await + .unwrap(); + // h2 protocol error with content length mismatch + + // did not get saved into cache, next request will be cache miss + let res = reqwest::Client::new() + .get(url) + .header("x-lock", "true") + .header("x-h2", "true") + .header("x-set-content-length", "11") + .send() + .await + .unwrap(); + assert_eq!(res.status(), StatusCode::OK); + let headers = res.headers(); + 
assert_eq!(headers["x-cache-status"], "miss"); + assert_eq!(res.text().await.unwrap(), "hello world"); + } } diff --git a/pingora-proxy/tests/utils/conf/origin/conf/nginx.conf b/pingora-proxy/tests/utils/conf/origin/conf/nginx.conf index 2718f88..0818649 100644 --- a/pingora-proxy/tests/utils/conf/origin/conf/nginx.conf +++ b/pingora-proxy/tests/utils/conf/origin/conf/nginx.conf @@ -408,6 +408,15 @@ http { } } + location /set_content_length { + header_filter_by_lua_block { + if ngx.var.http_x_set_content_length then + ngx.header["Content-Length"] = ngx.var.http_x_set_content_length + end + } + return 200 "hello world"; + } + location /slow_body { content_by_lua_block { local sleep_sec = tonumber(ngx.var.http_x_set_sleep) or 1 diff --git a/pingora-proxy/tests/utils/server_utils.rs b/pingora-proxy/tests/utils/server_utils.rs index 1f19a16..f762f33 100644 --- a/pingora-proxy/tests/utils/server_utils.rs +++ b/pingora-proxy/tests/utils/server_utils.rs @@ -359,11 +359,18 @@ impl ProxyHttp for ExampleProxyCache { .headers .get("x-port") .map_or("8000", |v| v.to_str().unwrap()); - let peer = Box::new(HttpPeer::new( + + let mut peer = Box::new(HttpPeer::new( format!("127.0.0.1:{}", port), false, "".to_string(), )); + + if session.get_header_bytes("x-h2") == b"true" { + // default is 1, 1 + peer.options.set_http_version(2, 2); + } + Ok(peer) } -- cgit v1.2.3