about summary refs log tree commit diff homepage
diff options
context:
space:
mode:
-rw-r--r-- .bleep 2
-rw-r--r-- pingora-core/src/protocols/http/v2/client.rs 41
-rw-r--r-- pingora-proxy/src/proxy_h2.rs 44
-rw-r--r-- pingora-proxy/tests/test_upstream.rs 30
-rw-r--r-- pingora-proxy/tests/utils/conf/origin/conf/nginx.conf 9
-rw-r--r-- pingora-proxy/tests/utils/server_utils.rs 9
6 files changed, 117 insertions, 18 deletions
diff --git a/.bleep b/.bleep
index a555de0..3fcdfe3 100644
--- a/.bleep
+++ b/.bleep
@@ -1 +1 @@
-88def7550ecab6e9511d7931cb97f03de54b89a5
\ No newline at end of file
+92f47c7c3d831163573e595b4f99db68670647fa
\ No newline at end of file
diff --git a/pingora-core/src/protocols/http/v2/client.rs b/pingora-core/src/protocols/http/v2/client.rs
index 4d5559a..7cc112d 100644
--- a/pingora-core/src/protocols/http/v2/client.rs
+++ b/pingora-core/src/protocols/http/v2/client.rs
@@ -16,6 +16,7 @@
// TODO: this module needs a refactor
use bytes::Bytes;
+use futures::FutureExt;
use h2::client::{self, ResponseFuture, SendRequest};
use h2::{Reason, RecvStream, SendStream};
use http::HeaderMap;
@@ -194,10 +195,6 @@ impl Http2Session {
return Ok(None);
};
- if body_reader.is_end_stream() {
- return Ok(None);
- }
-
let fut = body_reader.data();
let res = match self.read_timeout {
Some(t) => timeout(t, fut)
@@ -234,6 +231,42 @@ impl Http2Session {
.map_or(false, |reader| reader.is_end_stream())
}
+ /// Check whether the response stream has ended, surfacing any error that ended it.
+ /// Like `response_finished`, but also attempts to poll the h2 stream for errors that may have
+ /// caused the stream to terminate, and returns them as `H2Error`s.
+ ///
+ /// Returns `Ok(false)` when there is no response body reader yet or the reader does not
+ /// report end of stream, and `Ok(true)` when the stream has ended and polling it yields
+ /// no further data.
+ ///
+ /// # Errors
+ /// Returns an `H2Error` when the poll after end of stream yields an error, or when it
+ /// unexpectedly yields more data.
+ ///
+ /// # Panics
+ /// Panics if `data()` is not immediately ready after `is_end_stream()` returned true.
+ pub fn check_response_end_or_error(&mut self) -> Result<bool> {
+ let Some(reader) = self.response_body_reader.as_mut() else {
+ // response is not even read
+ return Ok(false);
+ };
+
+ // not at end of stream yet: nothing to check, the caller keeps reading the body
+ if !reader.is_end_stream() {
+ return Ok(false);
+ }
+
+ // https://github.com/hyperium/h2/issues/806
+ // The fundamental issue is that h2::RecvStream may return `is_end_stream` true
+ // when the stream was naturally closed via END_STREAM /OR/ if there was an error
+ // while reading data frames that forced the closure.
+ // The h2 API as-is makes it difficult to determine which situation is occurring.
+ //
+ // `poll_data` should be returning None after `is_end_stream`, if the stream
+ // is truly expecting no more data to be sent.
+ // https://docs.rs/h2/latest/h2/struct.RecvStream.html#method.is_end_stream
+ // So poll the data once to check this condition. If an error is returned, that indicates
+ // that the stream closed due to an error e.g. h2 protocol error.
+ //
+ // The outer Option is whether the `data()` future was ready on this single poll,
+ // the inner Option is what the stream yielded.
+ match reader.data().now_or_never() {
+ // ready and no more data: the stream ended cleanly
+ Some(None) => Ok(true),
+ Some(Some(Ok(_))) => Error::e_explain(H2Error, "unexpected data after end stream"),
+ Some(Some(Err(e))) => Error::e_because(H2Error, "while checking end stream", e),
+ None => {
+ // RecvStream data() should be ready to poll after the stream ends,
+ // this indicates an unexpected change in the h2 crate
+ panic!("data() not ready after end stream")
+ }
+ }
+ }
+
/// Read the optional trailer headers
pub async fn read_trailers(&mut self) -> Result<Option<HeaderMap>> {
let Some(reader) = self.response_body_reader.as_mut() else {
diff --git a/pingora-proxy/src/proxy_h2.rs b/pingora-proxy/src/proxy_h2.rs
index 53b2c14..fad91c2 100644
--- a/pingora-proxy/src/proxy_h2.rs
+++ b/pingora-proxy/src/proxy_h2.rs
@@ -569,9 +569,21 @@ pub(crate) async fn pipe_2to1_response(
let resp_header = Box::new(client.response_header().expect("just read").clone());
- tx.send(HttpTask::Header(resp_header, client.response_finished()))
- .await
- .or_err(InternalError, "sending h2 headers to pipe")?;
+ match client.check_response_end_or_error() {
+ Ok(eos) => {
+ tx.send(HttpTask::Header(resp_header, eos))
+ .await
+ .or_err(InternalError, "sending h2 headers to pipe")?;
+ }
+ Err(e) => {
+ // If upstream errored, then push error to downstream and then quit
+ // Don't care if send fails (which means downstream already gone)
+ // we were still able to retrieve the headers, so try sending
+ let _ = tx.send(HttpTask::Header(resp_header, false)).await;
+ let _ = tx.send(HttpTask::Failed(e.into_up())).await;
+ return Ok(());
+ }
+ }
while let Some(chunk) = client
.read_response_body()
@@ -583,21 +595,29 @@ pub(crate) async fn pipe_2to1_response(
Ok(d) => d,
Err(e) => {
// Push the error to downstream and then quit
- // Don't care if send fails: downstream already gone
let _ = tx.send(HttpTask::Failed(e.into_up())).await;
// Downstream should consume all remaining data and handle the error
return Ok(());
}
};
- if data.is_empty() && !client.response_finished() {
- /* it is normal to get 0 bytes because of multi-chunk
- * don't write 0 bytes to downstream since it will be
- * misread as the terminating chunk */
- continue;
+ match client.check_response_end_or_error() {
+ Ok(eos) => {
+ if data.is_empty() && !eos {
+ /* it is normal to get 0 bytes because of multi-chunk
+ * don't write 0 bytes to downstream since it will be
+ * misread as the terminating chunk */
+ continue;
+ }
+ tx.send(HttpTask::Body(Some(data), eos))
+ .await
+ .or_err(InternalError, "sending h2 body to pipe")?;
+ }
+ Err(e) => {
+ // Similar to above, push the error to downstream and then quit
+ let _ = tx.send(HttpTask::Failed(e.into_up())).await;
+ return Ok(());
+ }
}
- tx.send(HttpTask::Body(Some(data), client.response_finished()))
- .await
- .or_err(InternalError, "sending h2 body to pipe")?;
}
// attempt to get trailers
diff --git a/pingora-proxy/tests/test_upstream.rs b/pingora-proxy/tests/test_upstream.rs
index 42319a3..d391b9d 100644
--- a/pingora-proxy/tests/test_upstream.rs
+++ b/pingora-proxy/tests/test_upstream.rs
@@ -2047,4 +2047,34 @@ mod test_cache {
assert_eq!(headers["x-cache-status"], "no-cache");
assert_eq!(res.text().await.unwrap(), "hello world");
}
+
+ /// An h2 upstream response whose declared Content-Length is larger than the body it
+ /// sends must not be stored in cache: the follow-up request to the same URL is expected
+ /// to be a cache miss that returns the full body.
+ #[tokio::test]
+ async fn test_cache_h2_premature_end() {
+ init();
+ let url = "http://127.0.0.1:6148/set_content_length/test_cache_h2_premature_end.txt";
+ // try to fill cache
+ // only the request itself is required to complete; the response is not inspected
+ reqwest::Client::new()
+ .get(url)
+ .header("x-lock", "true")
+ .header("x-h2", "true")
+ .header("x-set-content-length", "13") // 2 more than "hello world"
+ .send()
+ .await
+ .unwrap();
+ // h2 protocol error with content length mismatch
+
+ // did not get saved into cache, next request will be cache miss
+ // this time the declared length (11) matches the "hello world" body
+ let res = reqwest::Client::new()
+ .get(url)
+ .header("x-lock", "true")
+ .header("x-h2", "true")
+ .header("x-set-content-length", "11")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), StatusCode::OK);
+ let headers = res.headers();
+ assert_eq!(headers["x-cache-status"], "miss");
+ assert_eq!(res.text().await.unwrap(), "hello world");
+ }
}
diff --git a/pingora-proxy/tests/utils/conf/origin/conf/nginx.conf b/pingora-proxy/tests/utils/conf/origin/conf/nginx.conf
index 2718f88..0818649 100644
--- a/pingora-proxy/tests/utils/conf/origin/conf/nginx.conf
+++ b/pingora-proxy/tests/utils/conf/origin/conf/nginx.conf
@@ -408,6 +408,15 @@ http {
}
}
+ # Test endpoint: always answers 200 with the fixed body "hello world".
+ # When the x-set-content-length request variable is present, its value replaces the
+ # response Content-Length header, so a test can declare a length that does not
+ # match the body actually sent.
+ location /set_content_length {
+ header_filter_by_lua_block {
+ if ngx.var.http_x_set_content_length then
+ ngx.header["Content-Length"] = ngx.var.http_x_set_content_length
+ end
+ }
+ return 200 "hello world";
+ }
+
location /slow_body {
content_by_lua_block {
local sleep_sec = tonumber(ngx.var.http_x_set_sleep) or 1
diff --git a/pingora-proxy/tests/utils/server_utils.rs b/pingora-proxy/tests/utils/server_utils.rs
index 1f19a16..f762f33 100644
--- a/pingora-proxy/tests/utils/server_utils.rs
+++ b/pingora-proxy/tests/utils/server_utils.rs
@@ -359,11 +359,18 @@ impl ProxyHttp for ExampleProxyCache {
.headers
.get("x-port")
.map_or("8000", |v| v.to_str().unwrap());
- let peer = Box::new(HttpPeer::new(
+
+ let mut peer = Box::new(HttpPeer::new(
format!("127.0.0.1:{}", port),
false,
"".to_string(),
));
+
+ if session.get_header_bytes("x-h2") == b"true" {
+ // default is 1, 1
+ peer.options.set_http_version(2, 2);
+ }
+
Ok(peer)
}