diff options
Diffstat (limited to 'pingora-proxy')
-rw-r--r-- | pingora-proxy/examples/grpc_web_module.rs | 90 | ||||
-rw-r--r-- | pingora-proxy/src/lib.rs | 15 | ||||
-rw-r--r-- | pingora-proxy/src/proxy_h2.rs | 25 |
3 files changed, 117 insertions, 13 deletions
diff --git a/pingora-proxy/examples/grpc_web_module.rs b/pingora-proxy/examples/grpc_web_module.rs new file mode 100644 index 0000000..ecd11fd --- /dev/null +++ b/pingora-proxy/examples/grpc_web_module.rs @@ -0,0 +1,90 @@ +// Copyright 2024 Cloudflare, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use async_trait::async_trait; +use clap::Parser; + +use pingora_core::server::Server; +use pingora_core::upstreams::peer::HttpPeer; +use pingora_core::Result; +use pingora_core::{ + modules::http::{ + grpc_web::{GrpcWeb, GrpcWebBridge}, + HttpModules, + }, + prelude::Opt, +}; +use pingora_proxy::{ProxyHttp, Session}; + +/// This example shows how to use the gRPC-web bridge module + +pub struct GrpcWebBridgeProxy; + +#[async_trait] +impl ProxyHttp for GrpcWebBridgeProxy { + type CTX = (); + fn new_ctx(&self) -> Self::CTX {} + + fn init_downstream_modules(&self, modules: &mut HttpModules) { + // Add the gRPC web module + modules.add_module(Box::new(GrpcWeb)) + } + + async fn early_request_filter( + &self, + session: &mut Session, + _ctx: &mut Self::CTX, + ) -> Result<()> { + let grpc = session + .downstream_modules_ctx + .get_mut::<GrpcWebBridge>() + .expect("GrpcWebBridge module added"); + + // initialize gRPC module for this request + grpc.init(); + Ok(()) + } + + async fn upstream_peer( + &self, + _session: &mut Session, + _ctx: &mut Self::CTX, + ) -> Result<Box<HttpPeer>> { + // this needs to be your gRPC server + let grpc_peer = Box::new(HttpPeer::new( + ("1.1.1.1", 443), + true, + "one.one.one.one".to_string(), + )); + Ok(grpc_peer) + } +} + +// RUST_LOG=INFO cargo run --example grpc_web_module + +fn main() { + env_logger::init(); + + // read command line arguments + let opt = Opt::parse(); + let mut my_server = Server::new(Some(opt)).unwrap(); + my_server.bootstrap(); + + let mut my_proxy = + pingora_proxy::http_proxy_service(&my_server.configuration, GrpcWebBridgeProxy); + my_proxy.add_tcp("0.0.0.0:6194"); + + my_server.add_service(my_proxy); + my_server.run_forever(); +} diff --git a/pingora-proxy/src/lib.rs b/pingora-proxy/src/lib.rs index 890af98..5621170 100644 --- a/pingora-proxy/src/lib.rs +++ b/pingora-proxy/src/lib.rs @@ -405,7 +405,20 @@ impl Session { self.downstream_modules_ctx .response_body_filter(data, *end)?; } - _ => { /* HttpModules doesn't handle trailer yet */ } + HttpTask::Trailer(trailers) => { + if let Some(buf) = self + .downstream_modules_ctx + .response_trailer_filter(trailers)? + { + // Write the trailers into the body if the filter + // returns a buffer. + // + // Note, this will not work if end of stream has already + // been seen or we've written content-length bytes. + *task = HttpTask::Body(Some(buf), true); + } + } + _ => { /* Done or Failed */ } } } self.downstream_session.response_duplex_vec(tasks).await diff --git a/pingora-proxy/src/proxy_h2.rs b/pingora-proxy/src/proxy_h2.rs index 5216ee9..0750160 100644 --- a/pingora-proxy/src/proxy_h2.rs +++ b/pingora-proxy/src/proxy_h2.rs @@ -124,6 +124,9 @@ impl<SV> HttpProxy<SV> { session.upstream_compression.request_filter(&req); let body_empty = session.as_mut().is_body_empty(); + // whether we support sending END_STREAM on HEADERS if body is empty + let send_end_stream = req.send_end_stream().expect("req must be h2"); + let mut req: http::request::Parts = req.into(); // H2 requires authority to be set, so copy that from H1 host if that is set @@ -133,27 +136,25 @@ impl<SV> HttpProxy<SV> { } } - debug!("Request to h2: {:?}", req); + debug!("Request to h2: {req:?}"); - // don't send END_STREAM on HEADERS for no_header_eos - let send_header_eos = !peer.options.no_header_eos && body_empty; + // send END_STREAM on HEADERS + let send_header_eos = send_end_stream && body_empty; + debug!("send END_STREAM on HEADERS: {send_end_stream}"); let req = Box::new(RequestHeader::from(req)); - match client_session.write_request_header(req, send_header_eos) { - Ok(v) => v, - Err(e) => { - return (false, Some(e.into_up())); - } - }; + if let Err(e) = client_session.write_request_header(req, send_header_eos) { + return (false, Some(e.into_up())); + } - // send END_STREAM on empty DATA frame for no_headers_eos - if peer.options.no_header_eos && body_empty { + if !send_end_stream && body_empty { + // send END_STREAM on empty DATA frame match client_session.write_request_body(Bytes::new(), true) { Ok(()) => debug!("sent empty DATA frame to h2"), Err(e) => { return (false, Some(e.into_up())); } - }; + } } client_session.read_timeout = peer.options.read_timeout; |