aboutsummaryrefslogtreecommitdiffhomepage
path: root/pingora-proxy
diff options
context:
space:
mode:
authorAndrew Hauck <[email protected]>2024-08-09 16:56:27 -0700
committerYuchen Wu <[email protected]>2024-09-23 10:43:26 -0700
commit9917177c646a0ab58197f15ec57a3bcbe1e0a201 (patch)
tree69ca2826322aa3fafeb0949cc89782620d164550 /pingora-proxy
parent760dda4fa881157e4ef930c2288412126afc320f (diff)
downloadpingora-9917177c646a0ab58197f15ec57a3bcbe1e0a201.tar.gz
pingora-9917177c646a0ab58197f15ec57a3bcbe1e0a201.zip
Add support for gRPC-web module to bridge gRPC-web client requests to gRPC server requests
Diffstat (limited to 'pingora-proxy')
-rw-r--r--pingora-proxy/examples/grpc_web_module.rs90
-rw-r--r--pingora-proxy/src/lib.rs15
-rw-r--r--pingora-proxy/src/proxy_h2.rs25
3 files changed, 117 insertions, 13 deletions
diff --git a/pingora-proxy/examples/grpc_web_module.rs b/pingora-proxy/examples/grpc_web_module.rs
new file mode 100644
index 0000000..ecd11fd
--- /dev/null
+++ b/pingora-proxy/examples/grpc_web_module.rs
@@ -0,0 +1,90 @@
+// Copyright 2024 Cloudflare, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use async_trait::async_trait;
+use clap::Parser;
+
+use pingora_core::server::Server;
+use pingora_core::upstreams::peer::HttpPeer;
+use pingora_core::Result;
+use pingora_core::{
+ modules::http::{
+ grpc_web::{GrpcWeb, GrpcWebBridge},
+ HttpModules,
+ },
+ prelude::Opt,
+};
+use pingora_proxy::{ProxyHttp, Session};
+
+/// This example shows how to use the gRPC-web bridge module
+
+pub struct GrpcWebBridgeProxy;
+
+#[async_trait]
+impl ProxyHttp for GrpcWebBridgeProxy {
+ type CTX = ();
+ fn new_ctx(&self) -> Self::CTX {}
+
+ fn init_downstream_modules(&self, modules: &mut HttpModules) {
+ // Add the gRPC web module
+ modules.add_module(Box::new(GrpcWeb))
+ }
+
+ async fn early_request_filter(
+ &self,
+ session: &mut Session,
+ _ctx: &mut Self::CTX,
+ ) -> Result<()> {
+ let grpc = session
+ .downstream_modules_ctx
+ .get_mut::<GrpcWebBridge>()
+ .expect("GrpcWebBridge module added");
+
+ // initialize gRPC module for this request
+ grpc.init();
+ Ok(())
+ }
+
+ async fn upstream_peer(
+ &self,
+ _session: &mut Session,
+ _ctx: &mut Self::CTX,
+ ) -> Result<Box<HttpPeer>> {
+ // this needs to be your gRPC server
+ let grpc_peer = Box::new(HttpPeer::new(
+ ("1.1.1.1", 443),
+ true,
+ "one.one.one.one".to_string(),
+ ));
+ Ok(grpc_peer)
+ }
+}
+
+// RUST_LOG=INFO cargo run --example grpc_web_module
+
+fn main() {
+ env_logger::init();
+
+ // read command line arguments
+ let opt = Opt::parse();
+ let mut my_server = Server::new(Some(opt)).unwrap();
+ my_server.bootstrap();
+
+ let mut my_proxy =
+ pingora_proxy::http_proxy_service(&my_server.configuration, GrpcWebBridgeProxy);
+ my_proxy.add_tcp("0.0.0.0:6194");
+
+ my_server.add_service(my_proxy);
+ my_server.run_forever();
+}
diff --git a/pingora-proxy/src/lib.rs b/pingora-proxy/src/lib.rs
index 890af98..5621170 100644
--- a/pingora-proxy/src/lib.rs
+++ b/pingora-proxy/src/lib.rs
@@ -405,7 +405,20 @@ impl Session {
self.downstream_modules_ctx
.response_body_filter(data, *end)?;
}
- _ => { /* HttpModules doesn't handle trailer yet */ }
+ HttpTask::Trailer(trailers) => {
+ if let Some(buf) = self
+ .downstream_modules_ctx
+ .response_trailer_filter(trailers)?
+ {
+ // Write the trailers into the body if the filter
+ // returns a buffer.
+ //
+ // Note, this will not work if end of stream has already
+ // been seen or we've written content-length bytes.
+ *task = HttpTask::Body(Some(buf), true);
+ }
+ }
+ _ => { /* Done or Failed */ }
}
}
self.downstream_session.response_duplex_vec(tasks).await
diff --git a/pingora-proxy/src/proxy_h2.rs b/pingora-proxy/src/proxy_h2.rs
index 5216ee9..0750160 100644
--- a/pingora-proxy/src/proxy_h2.rs
+++ b/pingora-proxy/src/proxy_h2.rs
@@ -124,6 +124,9 @@ impl<SV> HttpProxy<SV> {
session.upstream_compression.request_filter(&req);
let body_empty = session.as_mut().is_body_empty();
+ // whether we support sending END_STREAM on HEADERS if body is empty
+ let send_end_stream = req.send_end_stream().expect("req must be h2");
+
let mut req: http::request::Parts = req.into();
// H2 requires authority to be set, so copy that from H1 host if that is set
@@ -133,27 +136,25 @@ impl<SV> HttpProxy<SV> {
}
}
- debug!("Request to h2: {:?}", req);
+ debug!("Request to h2: {req:?}");
- // don't send END_STREAM on HEADERS for no_header_eos
- let send_header_eos = !peer.options.no_header_eos && body_empty;
+ // send END_STREAM on HEADERS
+ let send_header_eos = send_end_stream && body_empty;
+ debug!("send END_STREAM on HEADERS: {send_end_stream}");
let req = Box::new(RequestHeader::from(req));
- match client_session.write_request_header(req, send_header_eos) {
- Ok(v) => v,
- Err(e) => {
- return (false, Some(e.into_up()));
- }
- };
+ if let Err(e) = client_session.write_request_header(req, send_header_eos) {
+ return (false, Some(e.into_up()));
+ }
- // send END_STREAM on empty DATA frame for no_headers_eos
- if peer.options.no_header_eos && body_empty {
+ if !send_end_stream && body_empty {
+ // send END_STREAM on empty DATA frame
match client_session.write_request_body(Bytes::new(), true) {
Ok(()) => debug!("sent empty DATA frame to h2"),
Err(e) => {
return (false, Some(e.into_up()));
}
- };
+ }
}
client_session.read_timeout = peer.options.read_timeout;