aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorAndrew Hauck <[email protected]>2024-08-09 16:56:27 -0700
committerYuchen Wu <[email protected]>2024-09-23 10:43:26 -0700
commit9917177c646a0ab58197f15ec57a3bcbe1e0a201 (patch)
tree69ca2826322aa3fafeb0949cc89782620d164550
parent760dda4fa881157e4ef930c2288412126afc320f (diff)
downloadpingora-9917177c646a0ab58197f15ec57a3bcbe1e0a201.tar.gz
pingora-9917177c646a0ab58197f15ec57a3bcbe1e0a201.zip
Add support for gRPC-web module to bridge gRPC-web client requests to gRPC server requests
-rw-r--r--.bleep2
-rw-r--r--pingora-core/src/modules/http/grpc_web.rs80
-rw-r--r--pingora-core/src/modules/http/mod.rs30
-rw-r--r--pingora-core/src/protocols/http/bridge/grpc_web.rs341
-rw-r--r--pingora-core/src/protocols/http/bridge/mod.rs15
-rw-r--r--pingora-core/src/protocols/http/mod.rs1
-rw-r--r--pingora-core/src/upstreams/peer.rs5
-rw-r--r--pingora-http/src/lib.rs38
-rw-r--r--pingora-proxy/examples/grpc_web_module.rs90
-rw-r--r--pingora-proxy/src/lib.rs15
-rw-r--r--pingora-proxy/src/proxy_h2.rs25
11 files changed, 623 insertions, 19 deletions
diff --git a/.bleep b/.bleep
index 3fdcf46..c2d9bce 100644
--- a/.bleep
+++ b/.bleep
@@ -1 +1 @@
-9b92c0fed7b703c61415414c69ff196e9deb11eb \ No newline at end of file
+761f676b044dcf0d34205f96921e3385ffac7810 \ No newline at end of file
diff --git a/pingora-core/src/modules/http/grpc_web.rs b/pingora-core/src/modules/http/grpc_web.rs
new file mode 100644
index 0000000..85b6ea6
--- /dev/null
+++ b/pingora-core/src/modules/http/grpc_web.rs
@@ -0,0 +1,80 @@
+// Copyright 2024 Cloudflare, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use super::*;
+use crate::protocols::http::bridge::grpc_web::GrpcWebCtx;
+use std::ops::{Deref, DerefMut};
+
+/// gRPC-web bridge module, this will convert
+/// HTTP/1.1 gRPC-web requests to H2 gRPC requests
+#[derive(Default)]
+pub struct GrpcWebBridge(GrpcWebCtx);
+
+impl Deref for GrpcWebBridge {
+ type Target = GrpcWebCtx;
+
+ fn deref(&self) -> &Self::Target {
+ &self.0
+ }
+}
+
+impl DerefMut for GrpcWebBridge {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.0
+ }
+}
+
+#[async_trait]
+impl HttpModule for GrpcWebBridge {
+ fn as_any(&self) -> &dyn std::any::Any {
+ self
+ }
+
+ fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
+ self
+ }
+
+ async fn request_header_filter(&mut self, req: &mut RequestHeader) -> Result<()> {
+ self.0.request_header_filter(req);
+ Ok(())
+ }
+
+ async fn response_header_filter(
+ &mut self,
+ resp: &mut ResponseHeader,
+ _end_of_stream: bool,
+ ) -> Result<()> {
+ self.0.response_header_filter(resp);
+ Ok(())
+ }
+
+ fn response_trailer_filter(
+ &mut self,
+ trailers: &mut Option<Box<HeaderMap>>,
+ ) -> Result<Option<Bytes>> {
+ if let Some(trailers) = trailers {
+ return self.0.response_trailer_filter(trailers);
+ }
+ Ok(None)
+ }
+}
+
+/// The builder for gRPC-web bridge module
+pub struct GrpcWeb;
+
+impl HttpModuleBuilder for GrpcWeb {
+ fn init(&self) -> Module {
+ Box::new(GrpcWebBridge::default())
+ }
+}
diff --git a/pingora-core/src/modules/http/mod.rs b/pingora-core/src/modules/http/mod.rs
index 02a3e0c..f4b17ad 100644
--- a/pingora-core/src/modules/http/mod.rs
+++ b/pingora-core/src/modules/http/mod.rs
@@ -19,9 +19,11 @@
//! See the [ResponseCompression] module for an example of how to implement a basic module.
pub mod compression;
+pub mod grpc_web;
use async_trait::async_trait;
use bytes::Bytes;
+use http::HeaderMap;
use once_cell::sync::OnceCell;
use pingora_error::Result;
use pingora_http::{RequestHeader, ResponseHeader};
@@ -61,6 +63,13 @@ pub trait HttpModule {
Ok(())
}
+ fn response_trailer_filter(
+ &mut self,
+ _trailers: &mut Option<Box<HeaderMap>>,
+ ) -> Result<Option<Bytes>> {
+ Ok(None)
+ }
+
fn as_any(&self) -> &dyn Any;
fn as_any_mut(&mut self) -> &mut dyn Any;
}
@@ -226,6 +235,27 @@ impl HttpModuleCtx {
}
Ok(())
}
+
+ /// Run the `response_trailer_filter` for all the modules according to their orders.
+ ///
+ /// Returns an `Option<Bytes>` which can be used to write response trailers into
+ /// the response body. Note, if multiple modules attempt to write trailers into
+ /// the body the last one will be used.
+ ///
+ /// Implementors that intend to write trailers into the body need to ensure their filter
+ /// is using an encoding that supports this.
+ pub fn response_trailer_filter(
+ &mut self,
+ trailers: &mut Option<Box<HeaderMap>>,
+ ) -> Result<Option<Bytes>> {
+ let mut encoded = None;
+ for filter in self.module_ctx.iter_mut() {
+ if let Some(buf) = filter.response_trailer_filter(trailers)? {
+ encoded = Some(buf);
+ }
+ }
+ Ok(encoded)
+ }
}
#[cfg(test)]
diff --git a/pingora-core/src/protocols/http/bridge/grpc_web.rs b/pingora-core/src/protocols/http/bridge/grpc_web.rs
new file mode 100644
index 0000000..b5737e8
--- /dev/null
+++ b/pingora-core/src/protocols/http/bridge/grpc_web.rs
@@ -0,0 +1,341 @@
+// Copyright 2024 Cloudflare, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use bytes::{BufMut, Bytes, BytesMut};
+use http::{
+ header::{CONTENT_LENGTH, CONTENT_TYPE, TRANSFER_ENCODING},
+ HeaderMap,
+};
+use pingora_error::{ErrorType::ReadError, OrErr, Result};
+use pingora_http::{RequestHeader, ResponseHeader};
+
+/// Used for bridging gRPC to gRPC-web and vice-versa.
+/// See gRPC-web [spec](https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-WEB.md) and
+/// gRPC h2 [spec](https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md) for more details.
+#[derive(Default, PartialEq, Debug)]
+pub enum GrpcWebCtx {
+ #[default]
+ Disabled,
+ Init,
+ Upgrade,
+ Trailers,
+ Done,
+}
+
+const GRPC: &str = "application/grpc";
+const GRPC_WEB: &str = "application/grpc-web";
+
+impl GrpcWebCtx {
+ pub fn init(&mut self) {
+ *self = Self::Init;
+ }
+
+ /// gRPC-web request is fed into this filter, if the module is initialized
+ /// we attempt to convert it to a gRPC request
+ pub fn request_header_filter(&mut self, req: &mut RequestHeader) {
+ if *self != Self::Init {
+ // not enabled
+ return;
+ }
+
+ let content_type = req
+ .headers
+ .get(CONTENT_TYPE)
+ .and_then(|v| v.to_str().ok())
+ .unwrap_or_default();
+
+ // check we have a valid grpc-web prefix
+ if !(content_type.len() >= GRPC_WEB.len()
+ && content_type[..GRPC_WEB.len()].eq_ignore_ascii_case(GRPC_WEB))
+ {
+ // not gRPC-web
+ return;
+ }
+
+ // change content type to grpc
+ let ct = content_type.to_lowercase().replace(GRPC_WEB, GRPC);
+ req.insert_header(CONTENT_TYPE, ct).expect("insert header");
+
+ // The 'te' request header is used to detect incompatible proxies
+ // which are supposed to remove 'te' if it is unsupported.
+ // This header is required by gRPC over h2 protocol.
+ // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
+ req.insert_header("te", "trailers").expect("insert header");
+
+ // For gRPC requests, EOS (end-of-stream) is indicated by the presence of the
+ // END_STREAM flag on the last received DATA frame.
+ // In scenarios where the Request stream needs to be closed
+ // but no data remains to be sent implementations
+ // MUST send an empty DATA frame with this flag set.
+ req.set_send_end_stream(false);
+
+ *self = Self::Upgrade
+ }
+
+ /// gRPC response is fed into this filter, if the module is in the bridge state
+ /// attempt to convert the response it to a gRPC-web response
+ pub fn response_header_filter(&mut self, resp: &mut ResponseHeader) {
+ if *self != Self::Upgrade {
+ // not an upgrade
+ return;
+ }
+
+ if resp.status.is_informational() {
+ // proxy informational statuses through
+ return;
+ }
+
+ let content_type = resp
+ .headers
+ .get(CONTENT_TYPE)
+ .and_then(|v| v.to_str().ok())
+ .unwrap_or_default();
+
+ // upstream h2, no reason to normalize case
+ if !content_type.starts_with(GRPC) {
+ // not gRPC
+ *self = Self::Disabled;
+ return;
+ }
+
+ // change content type to gRPC-web
+ let ct = content_type.replace(GRPC, GRPC_WEB);
+ resp.insert_header(CONTENT_TYPE, ct).expect("insert header");
+
+ // always use chunked for gRPC-web
+ resp.remove_header(&CONTENT_LENGTH);
+ resp.insert_header(TRANSFER_ENCODING, "chunked")
+ .expect("insert header");
+
+ *self = Self::Trailers
+ }
+
+ /// Used to convert gRPC trailers into gRPC-web trailers, note
+ /// gRPC-web trailers are encoded into the response body so we return
+ /// the encoded bytes here.
+ pub fn response_trailer_filter(
+ &mut self,
+ resp_trailers: &mut HeaderMap,
+ ) -> Result<Option<Bytes>> {
+ /* Trailer header frame and trailer headers
+ 0 - - 1 - - 2 - - 3 - - 4 - - 5 - - 6 - - 7 - - 8
+ | Ind | Length | Headers | <- trailer header indicator, length of headers
+ | Headers | <- rest is headers
+ | Headers |
+ */
+ // TODO compressed trailer?
+ // grpc-web trailers frame head
+ const GRPC_WEB_TRAILER: u8 = 0x80;
+
+ // number of bytes in trailer header
+ const GRPC_TRAILER_HEADER_LEN: usize = 5;
+
+ // just some estimate
+ const DEFAULT_TRAILER_BUFFER_SIZE: usize = 256;
+
+ if *self != Self::Trailers {
+ // not an upgrade
+ *self = Self::Disabled;
+ return Ok(None);
+ }
+
+ // trailers are expected to arrive all at once encoded into a single trailers frame
+ // trailers in frame are separated by CRLFs
+ let mut buf = BytesMut::with_capacity(DEFAULT_TRAILER_BUFFER_SIZE);
+ let mut trailers = buf.split_off(GRPC_TRAILER_HEADER_LEN);
+
+ // iterate the key/value pairs and encode them into the tmp buffer
+ for (key, value) in resp_trailers.iter() {
+ // encode header
+ trailers.put_slice(key.as_ref());
+ trailers.put_slice(b":");
+
+ // encode value
+ trailers.put_slice(value.as_ref());
+
+ // encode header separator
+ trailers.put_slice(b"\r\n");
+ }
+
+ // ensure trailer length within u32
+ let len = trailers.len().try_into().or_err_with(ReadError, || {
+ format!("invalid gRPC trailer length: {}", trailers.len())
+ })?;
+ buf.put_u8(GRPC_WEB_TRAILER);
+ buf.put_u32(len);
+ buf.unsplit(trailers);
+
+ *self = Self::Done;
+ Ok(Some(buf.freeze()))
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use http::{request::Request, response::Response, Version};
+
+ #[test]
+ fn non_grpc_web_request_ignored() {
+ let request = Request::get("https://pingora.dev/")
+ .header(CONTENT_TYPE, "application/grpc-we")
+ .version(Version::HTTP_2) // only set this to verify send_end_stream is configured
+ .body(())
+ .unwrap();
+ let mut request = request.into_parts().0.into();
+
+ let mut filter = GrpcWebCtx::default();
+ filter.init();
+ filter.request_header_filter(&mut request);
+ assert_eq!(filter, GrpcWebCtx::Init);
+
+ let headers = &request.headers;
+ assert_eq!(headers.get("te"), None);
+ assert_eq!(headers.get("application/grpc"), None);
+ assert_eq!(request.send_end_stream(), Some(true));
+ }
+
+ #[test]
+ fn grpc_web_request_module_disabled_ignored() {
+ let request = Request::get("https://pingora.dev/")
+ .header(CONTENT_TYPE, "application/grpc-web")
+ .version(Version::HTTP_2) // only set this to verify send_end_stream is configured
+ .body(())
+ .unwrap();
+ let mut request = request.into_parts().0.into();
+
+ // do not init
+ let mut filter = GrpcWebCtx::default();
+ filter.request_header_filter(&mut request);
+ assert_eq!(filter, GrpcWebCtx::Disabled);
+
+ let headers = &request.headers;
+ assert_eq!(headers.get("te"), None);
+ assert_eq!(headers.get(CONTENT_TYPE).unwrap(), "application/grpc-web");
+ assert_eq!(request.send_end_stream(), Some(true));
+ }
+
+ #[test]
+ fn grpc_web_request_upgrade() {
+ let request = Request::get("https://pingora.org/")
+ .header(CONTENT_TYPE, "application/gRPC-web+thrift")
+ .version(Version::HTTP_2) // only set this to verify send_end_stream is configured
+ .body(())
+ .unwrap();
+ let mut request = request.into_parts().0.into();
+
+ let mut filter = GrpcWebCtx::default();
+ filter.init();
+ filter.request_header_filter(&mut request);
+ assert_eq!(filter, GrpcWebCtx::Upgrade);
+
+ let headers = &request.headers;
+ assert_eq!(headers.get("te").unwrap(), "trailers");
+ assert_eq!(
+ headers.get(CONTENT_TYPE).unwrap(),
+ "application/grpc+thrift"
+ );
+ assert_eq!(request.send_end_stream(), Some(false));
+ }
+
+ #[test]
+ fn non_grpc_response_ignored() {
+ let response = Response::builder()
+ .header(CONTENT_TYPE, "text/html")
+ .header(CONTENT_LENGTH, "10")
+ .body(())
+ .unwrap();
+ let mut response = response.into_parts().0.into();
+
+ let mut filter = GrpcWebCtx::Upgrade;
+ filter.response_header_filter(&mut response);
+ assert_eq!(filter, GrpcWebCtx::Disabled);
+
+ let headers = &response.headers;
+ assert_eq!(headers.get(CONTENT_TYPE).unwrap(), "text/html");
+ assert_eq!(headers.get(CONTENT_LENGTH).unwrap(), "10");
+ }
+
+ #[test]
+ fn grpc_response_module_disabled_ignored() {
+ let response = Response::builder()
+ .header(CONTENT_TYPE, "application/grpc")
+ .body(())
+ .unwrap();
+ let mut response = response.into_parts().0.into();
+
+ let mut filter = GrpcWebCtx::default();
+ filter.response_header_filter(&mut response);
+ assert_eq!(filter, GrpcWebCtx::Disabled);
+
+ let headers = &response.headers;
+ assert_eq!(headers.get(CONTENT_TYPE).unwrap(), "application/grpc");
+ }
+
+ #[test]
+ fn grpc_response_upgrade() {
+ let response = Response::builder()
+ .header(CONTENT_TYPE, "application/grpc+proto")
+ .header(CONTENT_LENGTH, "0")
+ .body(())
+ .unwrap();
+ let mut response = response.into_parts().0.into();
+
+ let mut filter = GrpcWebCtx::Upgrade;
+ filter.response_header_filter(&mut response);
+ assert_eq!(filter, GrpcWebCtx::Trailers);
+
+ let headers = &response.headers;
+ assert_eq!(
+ headers.get(CONTENT_TYPE).unwrap(),
+ "application/grpc-web+proto"
+ );
+ assert_eq!(headers.get(TRANSFER_ENCODING).unwrap(), "chunked");
+ assert!(headers.get(CONTENT_LENGTH).is_none());
+ }
+
+ #[test]
+ fn grpc_response_informational_proxied() {
+ let response = Response::builder().status(100).body(()).unwrap();
+ let mut response = response.into_parts().0.into();
+
+ let mut filter = GrpcWebCtx::Upgrade;
+ filter.response_header_filter(&mut response);
+ assert_eq!(filter, GrpcWebCtx::Upgrade); // still upgrade
+ }
+
+ #[test]
+ fn grpc_response_trailer_headers_convert_to_byte_buf() {
+ let mut response = Response::builder()
+ .header("grpc-status", "0")
+ .header("grpc-message", "OK")
+ .body(())
+ .unwrap();
+ let response = response.headers_mut();
+
+ let mut filter = GrpcWebCtx::Trailers;
+ let buf = filter.response_trailer_filter(response).unwrap().unwrap();
+ assert_eq!(filter, GrpcWebCtx::Done);
+
+ let expected = b"grpc-status:0\r\ngrpc-message:OK\r\n";
+ let expected_len: u32 = expected.len() as u32; // 32 bytes
+
+ // assert the length prefix message frame
+ // [1 byte (header)| 4 byte (length) | 15 byte (grpc-status:0\r\n) | 17 bytes (grpc-message:OK\r\n)]
+ assert_eq!(0x80, buf[0]); // frame should start with trailer header
+ assert_eq!(expected_len.to_be_bytes(), buf[1..5]); // next 4 bytes length of trailer
+ assert_eq!(expected[..15], buf[5..20]); // grpc-status:0\r\n (15 bytes)
+ assert_eq!(expected[15..], buf[20..]); // grpc-message:OK\r\n (17 bytes)
+ }
+}
diff --git a/pingora-core/src/protocols/http/bridge/mod.rs b/pingora-core/src/protocols/http/bridge/mod.rs
new file mode 100644
index 0000000..f40d5f5
--- /dev/null
+++ b/pingora-core/src/protocols/http/bridge/mod.rs
@@ -0,0 +1,15 @@
+// Copyright 2024 Cloudflare, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+pub mod grpc_web;
diff --git a/pingora-core/src/protocols/http/mod.rs b/pingora-core/src/protocols/http/mod.rs
index a2d9540..94814eb 100644
--- a/pingora-core/src/protocols/http/mod.rs
+++ b/pingora-core/src/protocols/http/mod.rs
@@ -15,6 +15,7 @@
//! HTTP/1.x and HTTP/2 implementation APIs
mod body_buffer;
+pub mod bridge;
pub mod client;
pub mod compression;
pub mod conditional_filter;
diff --git a/pingora-core/src/upstreams/peer.rs b/pingora-core/src/upstreams/peer.rs
index 78eb762..7b80857 100644
--- a/pingora-core/src/upstreams/peer.rs
+++ b/pingora-core/src/upstreams/peer.rs
@@ -309,7 +309,6 @@ pub struct PeerOptions {
pub tcp_keepalive: Option<TcpKeepalive>,
pub tcp_recv_buf: Option<usize>,
pub dscp: Option<u8>,
- pub no_header_eos: bool,
pub h2_ping_interval: Option<Duration>,
// how many concurrent h2 stream are allowed in the same connection
pub max_h2_streams: usize,
@@ -345,7 +344,6 @@ impl PeerOptions {
tcp_keepalive: None,
tcp_recv_buf: None,
dscp: None,
- no_header_eos: false,
h2_ping_interval: None,
max_h2_streams: 1,
extra_proxy_headers: BTreeMap::new(),
@@ -397,9 +395,6 @@ impl Display for PeerOptions {
if let Some(tcp_keepalive) = &self.tcp_keepalive {
write!(f, "tcp_keepalive: {},", tcp_keepalive)?;
}
- if self.no_header_eos {
- write!(f, "no_header_eos: true,")?;
- }
if let Some(h2_ping_interval) = self.h2_ping_interval {
write!(f, "h2_ping_interval: {:?},", h2_ping_interval)?;
}
diff --git a/pingora-http/src/lib.rs b/pingora-http/src/lib.rs
index 71f81ee..d57998d 100644
--- a/pingora-http/src/lib.rs
+++ b/pingora-http/src/lib.rs
@@ -69,6 +69,8 @@ pub struct RequestHeader {
header_name_map: Option<CaseMap>,
// store the raw path bytes only if it is invalid utf-8
raw_path_fallback: Vec<u8>, // can also be Box<[u8]>
+ // whether we send END_STREAM with HEADERS for h2 requests
+ send_end_stream: bool,
}
impl AsRef<ReqParts> for RequestHeader {
@@ -93,6 +95,7 @@ impl RequestHeader {
base,
header_name_map: None,
raw_path_fallback: vec![],
+ send_end_stream: true,
}
}
@@ -211,6 +214,20 @@ impl RequestHeader {
self.base.uri = uri;
}
+ /// Set whether we send an END_STREAM on H2 request HEADERS if body is empty.
+ pub fn set_send_end_stream(&mut self, send_end_stream: bool) {
+ self.send_end_stream = send_end_stream;
+ }
+
+ /// Returns if we support sending an END_STREAM on H2 request HEADERS if body is empty,
+ /// returns None if not H2.
+ pub fn send_end_stream(&self) -> Option<bool> {
+ if self.base.version != Version::HTTP_2 {
+ return None;
+ }
+ Some(self.send_end_stream)
+ }
+
/// Return the request path in its raw format
///
/// Non-UTF8 is supported.
@@ -256,6 +273,7 @@ impl Clone for RequestHeader {
base: self.as_owned_parts(),
header_name_map: self.header_name_map.clone(),
raw_path_fallback: self.raw_path_fallback.clone(),
+ send_end_stream: self.send_end_stream,
}
}
}
@@ -268,6 +286,7 @@ impl From<ReqParts> for RequestHeader {
header_name_map: None,
// no illegal path
raw_path_fallback: vec![],
+ send_end_stream: true,
}
}
}
@@ -716,4 +735,23 @@ mod tests {
let reason = resp.get_reason_phrase().unwrap();
assert_eq!(reason, "OK");
}
+
+ #[test]
+ fn set_test_send_end_stream() {
+ let mut req = RequestHeader::build("GET", b"/", None).unwrap();
+ req.set_send_end_stream(true);
+
+ // None for requests that are not h2
+ assert!(req.send_end_stream().is_none());
+
+ let mut req = RequestHeader::build("GET", b"/", None).unwrap();
+ req.set_version(Version::HTTP_2);
+
+ // Some(true) by default for h2
+ assert!(req.send_end_stream().unwrap());
+
+ req.set_send_end_stream(false);
+ // Some(false)
+ assert!(!req.send_end_stream().unwrap());
+ }
}
diff --git a/pingora-proxy/examples/grpc_web_module.rs b/pingora-proxy/examples/grpc_web_module.rs
new file mode 100644
index 0000000..ecd11fd
--- /dev/null
+++ b/pingora-proxy/examples/grpc_web_module.rs
@@ -0,0 +1,90 @@
+// Copyright 2024 Cloudflare, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use async_trait::async_trait;
+use clap::Parser;
+
+use pingora_core::server::Server;
+use pingora_core::upstreams::peer::HttpPeer;
+use pingora_core::Result;
+use pingora_core::{
+ modules::http::{
+ grpc_web::{GrpcWeb, GrpcWebBridge},
+ HttpModules,
+ },
+ prelude::Opt,
+};
+use pingora_proxy::{ProxyHttp, Session};
+
+/// This example shows how to use the gRPC-web bridge module
+
+pub struct GrpcWebBridgeProxy;
+
+#[async_trait]
+impl ProxyHttp for GrpcWebBridgeProxy {
+ type CTX = ();
+ fn new_ctx(&self) -> Self::CTX {}
+
+ fn init_downstream_modules(&self, modules: &mut HttpModules) {
+ // Add the gRPC web module
+ modules.add_module(Box::new(GrpcWeb))
+ }
+
+ async fn early_request_filter(
+ &self,
+ session: &mut Session,
+ _ctx: &mut Self::CTX,
+ ) -> Result<()> {
+ let grpc = session
+ .downstream_modules_ctx
+ .get_mut::<GrpcWebBridge>()
+ .expect("GrpcWebBridge module added");
+
+ // initialize gRPC module for this request
+ grpc.init();
+ Ok(())
+ }
+
+ async fn upstream_peer(
+ &self,
+ _session: &mut Session,
+ _ctx: &mut Self::CTX,
+ ) -> Result<Box<HttpPeer>> {
+ // this needs to be your gRPC server
+ let grpc_peer = Box::new(HttpPeer::new(
+ ("1.1.1.1", 443),
+ true,
+ "one.one.one.one".to_string(),
+ ));
+ Ok(grpc_peer)
+ }
+}
+
+// RUST_LOG=INFO cargo run --example grpc_web_module
+
+fn main() {
+ env_logger::init();
+
+ // read command line arguments
+ let opt = Opt::parse();
+ let mut my_server = Server::new(Some(opt)).unwrap();
+ my_server.bootstrap();
+
+ let mut my_proxy =
+ pingora_proxy::http_proxy_service(&my_server.configuration, GrpcWebBridgeProxy);
+ my_proxy.add_tcp("0.0.0.0:6194");
+
+ my_server.add_service(my_proxy);
+ my_server.run_forever();
+}
diff --git a/pingora-proxy/src/lib.rs b/pingora-proxy/src/lib.rs
index 890af98..5621170 100644
--- a/pingora-proxy/src/lib.rs
+++ b/pingora-proxy/src/lib.rs
@@ -405,7 +405,20 @@ impl Session {
self.downstream_modules_ctx
.response_body_filter(data, *end)?;
}
- _ => { /* HttpModules doesn't handle trailer yet */ }
+ HttpTask::Trailer(trailers) => {
+ if let Some(buf) = self
+ .downstream_modules_ctx
+ .response_trailer_filter(trailers)?
+ {
+ // Write the trailers into the body if the filter
+ // returns a buffer.
+ //
+ // Note, this will not work if end of stream has already
+ // been seen or we've written content-length bytes.
+ *task = HttpTask::Body(Some(buf), true);
+ }
+ }
+ _ => { /* Done or Failed */ }
}
}
self.downstream_session.response_duplex_vec(tasks).await
diff --git a/pingora-proxy/src/proxy_h2.rs b/pingora-proxy/src/proxy_h2.rs
index 5216ee9..0750160 100644
--- a/pingora-proxy/src/proxy_h2.rs
+++ b/pingora-proxy/src/proxy_h2.rs
@@ -124,6 +124,9 @@ impl<SV> HttpProxy<SV> {
session.upstream_compression.request_filter(&req);
let body_empty = session.as_mut().is_body_empty();
+ // whether we support sending END_STREAM on HEADERS if body is empty
+ let send_end_stream = req.send_end_stream().expect("req must be h2");
+
let mut req: http::request::Parts = req.into();
// H2 requires authority to be set, so copy that from H1 host if that is set
@@ -133,27 +136,25 @@ impl<SV> HttpProxy<SV> {
}
}
- debug!("Request to h2: {:?}", req);
+ debug!("Request to h2: {req:?}");
- // don't send END_STREAM on HEADERS for no_header_eos
- let send_header_eos = !peer.options.no_header_eos && body_empty;
+ // send END_STREAM on HEADERS
+ let send_header_eos = send_end_stream && body_empty;
+ debug!("send END_STREAM on HEADERS: {send_end_stream}");
let req = Box::new(RequestHeader::from(req));
- match client_session.write_request_header(req, send_header_eos) {
- Ok(v) => v,
- Err(e) => {
- return (false, Some(e.into_up()));
- }
- };
+ if let Err(e) = client_session.write_request_header(req, send_header_eos) {
+ return (false, Some(e.into_up()));
+ }
- // send END_STREAM on empty DATA frame for no_headers_eos
- if peer.options.no_header_eos && body_empty {
+ if !send_end_stream && body_empty {
+ // send END_STREAM on empty DATA frame
match client_session.write_request_body(Bytes::new(), true) {
Ok(()) => debug!("sent empty DATA frame to h2"),
Err(e) => {
return (false, Some(e.into_up()));
}
- };
+ }
}
client_session.read_timeout = peer.options.read_timeout;