diff options
-rw-r--r-- | .bleep | 2 | ||||
-rw-r--r-- | pingora-proxy/Cargo.toml | 3 | ||||
-rw-r--r-- | pingora-proxy/examples/modify_response.rs | 140 | ||||
-rw-r--r-- | pingora-proxy/src/proxy_h1.rs | 7 | ||||
-rw-r--r-- | pingora-proxy/src/proxy_h2.rs | 7 | ||||
-rw-r--r-- | pingora-proxy/src/proxy_trait.rs | 5 |
6 files changed, 157 insertions, 7 deletions
@@ -1 +1 @@ -69cdf488ece81b5a77ba555b98509d231c48af96
\ No newline at end of file +e09bb86c57cba6f6090b3cc9e0d417359c48036e
\ No newline at end of file diff --git a/pingora-proxy/Cargo.toml b/pingora-proxy/Cargo.toml index d76ab48..2e25246 100644 --- a/pingora-proxy/Cargo.toml +++ b/pingora-proxy/Cargo.toml @@ -47,3 +47,6 @@ tokio-tungstenite = "0.20.1" pingora-load-balancing = { version = "0.1.0", path = "../pingora-load-balancing" } prometheus = "0" futures-util = "0.3" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +serde_yaml = "0.8"
\ No newline at end of file diff --git a/pingora-proxy/examples/modify_response.rs b/pingora-proxy/examples/modify_response.rs new file mode 100644 index 0000000..5166dc6 --- /dev/null +++ b/pingora-proxy/examples/modify_response.rs @@ -0,0 +1,140 @@ +// Copyright 2024 Cloudflare, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use async_trait::async_trait; +use bytes::Bytes; +use serde::{Deserialize, Serialize}; +use std::net::ToSocketAddrs; +use structopt::StructOpt; + +use pingora_core::server::configuration::Opt; +use pingora_core::server::Server; +use pingora_core::upstreams::peer::HttpPeer; +use pingora_core::Result; +use pingora_http::ResponseHeader; +use pingora_proxy::{ProxyHttp, Session}; + +const HOST: &str = "ip.jsontest.com"; + +#[derive(Serialize, Deserialize)] +pub struct Resp { + ip: String, +} + +pub struct Json2Yaml { + addr: std::net::SocketAddr, +} + +pub struct MyCtx { + buffer: Vec<u8>, +} + +#[async_trait] +impl ProxyHttp for Json2Yaml { + type CTX = MyCtx; + fn new_ctx(&self) -> Self::CTX { + MyCtx { buffer: vec![] } + } + + async fn upstream_peer( + &self, + _session: &mut Session, + _ctx: &mut Self::CTX, + ) -> Result<Box<HttpPeer>> { + let peer = Box::new(HttpPeer::new(self.addr, false, HOST.to_owned())); + Ok(peer) + } + + async fn upstream_request_filter( + &self, + _session: &mut Session, + upstream_request: &mut pingora_http::RequestHeader, + _ctx: &mut Self::CTX, + ) -> Result<()> { + upstream_request + .insert_header("Host", HOST.to_owned()) + .unwrap(); + Ok(()) + } + + async fn response_filter( + &self, + _session: &mut Session, + upstream_response: &mut ResponseHeader, + _ctx: &mut Self::CTX, + ) -> Result<()> + where + Self::CTX: Send + Sync, + { + // Remove content-length because the size of the new body is unknown + upstream_response.remove_header("Content-Length"); + upstream_response + .insert_header("Transfer-Encoding", "Chunked") + .unwrap(); + Ok(()) + } + + fn response_body_filter( + &self, + _session: &mut Session, + body: &mut Option<Bytes>, + end_of_stream: bool, + ctx: &mut Self::CTX, + ) -> Result<Option<std::time::Duration>> + where + Self::CTX: Send + Sync, + { + // buffer the data + if let Some(b) = body { + ctx.buffer.extend(&b[..]); + // drop the body + b.clear(); + } + if end_of_stream { + // This is the last chunk, we can process the data now + let json_body: Resp = serde_json::de::from_slice(&ctx.buffer).unwrap(); + let yaml_body = serde_yaml::to_string(&json_body).unwrap(); + *body = Some(Bytes::copy_from_slice(yaml_body.as_bytes())); + } + + Ok(None) + } +} + +// RUST_LOG=INFO cargo run --example modify_response +// curl 127.0.0.1:6191 +fn main() { + env_logger::init(); + + let opt = Opt::from_args(); + let mut my_server = Server::new(Some(opt)).unwrap(); + my_server.bootstrap(); + + let mut my_proxy = pingora_proxy::http_proxy_service( + &my_server.configuration, + Json2Yaml { + // hardcode the IP of ip.jsontest.com for now + addr: ("142.251.2.121", 80) + .to_socket_addrs() + .unwrap() + .next() + .unwrap(), + }, + ); + + my_proxy.add_tcp("127.0.0.1:6191"); + + my_server.add_service(my_proxy); + my_server.run_forever(); +} diff --git a/pingora-proxy/src/proxy_h1.rs b/pingora-proxy/src/proxy_h1.rs index f50d310..5e653e2 100644 --- a/pingora-proxy/src/proxy_h1.rs +++ b/pingora-proxy/src/proxy_h1.rs @@ -504,8 +504,11 @@ impl<SV> HttpProxy<SV> { } } HttpTask::Body(data, end) => { - let data = range_body_filter.filter_body(data); - if let Some(duration) = self.inner.response_body_filter(session, &data, ctx)? { + let mut data = range_body_filter.filter_body(data); + if let Some(duration) = self + .inner + .response_body_filter(session, &mut data, end, ctx)? + { trace!("delaying response for {:?}", duration); time::sleep(duration).await; } diff --git a/pingora-proxy/src/proxy_h2.rs b/pingora-proxy/src/proxy_h2.rs index 87bb895..b1a2ca8 100644 --- a/pingora-proxy/src/proxy_h2.rs +++ b/pingora-proxy/src/proxy_h2.rs @@ -451,8 +451,11 @@ impl<SV> HttpProxy<SV> { Ok(HttpTask::Header(header, eos)) } HttpTask::Body(data, eos) => { - let data = range_body_filter.filter_body(data); - if let Some(duration) = self.inner.response_body_filter(session, &data, ctx)? { + let mut data = range_body_filter.filter_body(data); + if let Some(duration) = self + .inner + .response_body_filter(session, &mut data, eos, ctx)? + { trace!("delaying response for {:?}", duration); time::sleep(duration).await; } diff --git a/pingora-proxy/src/proxy_trait.rs b/pingora-proxy/src/proxy_trait.rs index 8293a24..b98fff0 100644 --- a/pingora-proxy/src/proxy_trait.rs +++ b/pingora-proxy/src/proxy_trait.rs @@ -192,7 +192,7 @@ pub trait ProxyHttp { fn upstream_response_body_filter( &self, _session: &mut Session, - _body: &Option<Bytes>, + _body: &mut Option<Bytes>, _end_of_stream: bool, _ctx: &mut Self::CTX, ) { @@ -202,7 +202,8 @@ pub trait ProxyHttp { fn response_body_filter( &self, _session: &mut Session, - _body: &Option<Bytes>, + _body: &mut Option<Bytes>, + _end_of_stream: bool, _ctx: &mut Self::CTX, ) -> Result<Option<std::time::Duration>> where |