diff options
author | ewang <[email protected]> | 2024-03-05 16:56:37 -0800 |
---|---|---|
committer | Edward Wang <[email protected]> | 2024-03-15 14:37:56 -0700 |
commit | 20fd391f3e78c9349149f11fae94da9e4657478e (patch) | |
tree | 6608f5320d7844b4a6d9d9f3f6d6b838253d3296 /pingora-proxy | |
parent | 3e09114c4d1fd5ccae8ef0526c72d232ef1fdc58 (diff) | |
download | pingora-20fd391f3e78c9349149f11fae94da9e4657478e.tar.gz pingora-20fd391f3e78c9349149f11fae94da9e4657478e.zip |
Add server_addr and client_addr to Session
Diffstat (limited to 'pingora-proxy')
-rw-r--r-- | pingora-proxy/src/subrequest.rs | 10 | ||||
-rw-r--r-- | pingora-proxy/tests/test_basic.rs | 95 | ||||
-rw-r--r-- | pingora-proxy/tests/utils/server_utils.rs | 107 |
3 files changed, 192 insertions, 20 deletions
diff --git a/pingora-proxy/src/subrequest.rs b/pingora-proxy/src/subrequest.rs index 9490a40..e75dcc6 100644 --- a/pingora-proxy/src/subrequest.rs +++ b/pingora-proxy/src/subrequest.rs @@ -17,7 +17,9 @@ use core::pin::Pin; use core::task::{Context, Poll}; use pingora_cache::lock::WritePermit; use pingora_core::protocols::raw_connect::ProxyDigest; -use pingora_core::protocols::{GetProxyDigest, GetTimingDigest, Ssl, TimingDigest, UniqueID}; +use pingora_core::protocols::{ + GetProxyDigest, GetSocketDigest, GetTimingDigest, SocketDigest, Ssl, TimingDigest, UniqueID, +}; use std::io::Cursor; use std::sync::Arc; use tokio::io::{AsyncRead, AsyncWrite, Error, ReadBuf}; @@ -85,6 +87,12 @@ impl GetProxyDigest for DummyIO { } } +impl GetSocketDigest for DummyIO { + fn get_socket_digest(&self) -> Option<Arc<SocketDigest>> { + None + } +} + #[async_trait] impl pingora_core::protocols::Shutdown for DummyIO { async fn shutdown(&mut self) -> () {} diff --git a/pingora-proxy/tests/test_basic.rs b/pingora-proxy/tests/test_basic.rs index a4730bf..4f11c2b 100644 --- a/pingora-proxy/tests/test_basic.rs +++ b/pingora-proxy/tests/test_basic.rs @@ -20,6 +20,10 @@ use reqwest::{header, StatusCode}; use utils::server_utils::init; +fn is_specified_port(port: u16) -> bool { + (1..65535).contains(&port) +} + #[tokio::test] async fn test_origin_alive() { init(); @@ -36,8 +40,27 @@ async fn test_simple_proxy() { init(); let res = reqwest::get("http://127.0.0.1:6147").await.unwrap(); assert_eq!(res.status(), StatusCode::OK); + let headers = res.headers(); assert_eq!(headers[header::CONTENT_LENGTH], "13"); + assert_eq!(headers["x-server-addr"], "127.0.0.1:6147"); + let sockaddr = headers["x-client-addr"] + .to_str() + .unwrap() + .parse::<std::net::SocketAddr>() + .unwrap(); + assert_eq!(sockaddr.ip().to_string(), "127.0.0.1"); + assert!(is_specified_port(sockaddr.port())); + + assert_eq!(headers["x-upstream-server-addr"], "127.0.0.1:8000"); + let sockaddr = headers["x-upstream-client-addr"] + .to_str() + .unwrap() + .parse::<std::net::SocketAddr>() + .unwrap(); + assert_eq!(sockaddr.ip().to_string(), "127.0.0.2"); + assert!(is_specified_port(sockaddr.port())); + let body = res.text().await.unwrap(); assert_eq!(body, "Hello World!\n"); } @@ -53,8 +76,28 @@ async fn test_h2_to_h1() { let res = client.get("https://127.0.0.1:6150").send().await.unwrap(); assert_eq!(res.status(), reqwest::StatusCode::OK); assert_eq!(res.version(), reqwest::Version::HTTP_2); + let headers = res.headers(); assert_eq!(headers[header::CONTENT_LENGTH], "13"); + assert_eq!(headers["x-server-addr"], "127.0.0.1:6150"); + + let sockaddr = headers["x-client-addr"] + .to_str() + .unwrap() + .parse::<std::net::SocketAddr>() + .unwrap(); + assert_eq!(sockaddr.ip().to_string(), "127.0.0.1"); + assert!(is_specified_port(sockaddr.port())); + + assert_eq!(headers["x-upstream-server-addr"], "127.0.0.1:8443"); + let sockaddr = headers["x-upstream-client-addr"] + .to_str() + .unwrap() + .parse::<std::net::SocketAddr>() + .unwrap(); + assert_eq!(sockaddr.ip().to_string(), "127.0.0.2"); + assert!(is_specified_port(sockaddr.port())); + let body = res.text().await.unwrap(); assert_eq!(body, "Hello World!\n"); } @@ -75,8 +118,27 @@ async fn test_h2_to_h2() { .unwrap(); assert_eq!(res.status(), reqwest::StatusCode::OK); assert_eq!(res.version(), reqwest::Version::HTTP_2); + let headers = res.headers(); assert_eq!(headers[header::CONTENT_LENGTH], "13"); + assert_eq!(headers["x-server-addr"], "127.0.0.1:6150"); + let sockaddr = headers["x-client-addr"] + .to_str() + .unwrap() + .parse::<std::net::SocketAddr>() + .unwrap(); + assert_eq!(sockaddr.ip().to_string(), "127.0.0.1"); + assert!(is_specified_port(sockaddr.port())); + + assert_eq!(headers["x-upstream-server-addr"], "127.0.0.1:8443"); + let sockaddr = headers["x-upstream-client-addr"] + .to_str() + .unwrap() + .parse::<std::net::SocketAddr>() + .unwrap(); + assert_eq!(sockaddr.ip().to_string(), "127.0.0.2"); + assert!(is_specified_port(sockaddr.port())); + let body = res.text().await.unwrap(); assert_eq!(body, "Hello World!\n"); } @@ -159,7 +221,21 @@ async fn test_simple_proxy_uds() { assert_eq!(res.status(), reqwest::StatusCode::OK); let (resp, body) = res.into_parts(); - assert_eq!(resp.headers[header::CONTENT_LENGTH], "13"); + + let headers = &resp.headers; + assert_eq!(headers[header::CONTENT_LENGTH], "13"); + assert_eq!(headers["x-server-addr"], "/tmp/pingora_proxy.sock"); + assert_eq!(headers["x-client-addr"], "unset"); // unnamed UDS + + assert_eq!(headers["x-upstream-server-addr"], "127.0.0.1:8000"); + let sockaddr = headers["x-upstream-client-addr"] + .to_str() + .unwrap() + .parse::<std::net::SocketAddr>() + .unwrap(); + assert_eq!(sockaddr.ip().to_string(), "127.0.0.2"); + assert!(is_specified_port(sockaddr.port())); + let body = hyper::body::to_bytes(body).await.unwrap(); assert_eq!(body.as_ref(), b"Hello World!\n"); } @@ -168,15 +244,30 @@ async fn test_simple_proxy_uds() { async fn test_simple_proxy_uds_peer() { init(); let client = reqwest::Client::new(); + let res = client .get("http://127.0.0.1:6147") .header("x-uds-peer", "1") // force upstream peer to be UDS .send() .await .unwrap(); + assert_eq!(res.status(), StatusCode::OK); - let headers = res.headers(); + + let headers = &res.headers(); assert_eq!(headers[header::CONTENT_LENGTH], "13"); + assert_eq!(headers["x-server-addr"], "127.0.0.1:6147"); + let sockaddr = headers["x-client-addr"] + .to_str() + .unwrap() + .parse::<std::net::SocketAddr>() + .unwrap(); + assert_eq!(sockaddr.ip().to_string(), "127.0.0.1"); + assert!(is_specified_port(sockaddr.port())); + + assert_eq!(headers["x-upstream-client-addr"], "unset"); // unnamed UDS + assert_eq!(headers["x-upstream-server-addr"], "/tmp/nginx-test.sock"); + let body = res.text().await.unwrap(); assert_eq!(body, "Hello World!\n"); } diff --git a/pingora-proxy/tests/utils/server_utils.rs b/pingora-proxy/tests/utils/server_utils.rs index 32f3275..50fc804 100644 --- a/pingora-proxy/tests/utils/server_utils.rs +++ b/pingora-proxy/tests/utils/server_utils.rs @@ -23,7 +23,7 @@ use pingora_cache::{ set_compression_dict_path, CacheMeta, CacheMetaDefaults, CachePhase, MemCache, NoCacheReason, RespCacheable, }; -use pingora_core::protocols::Digest; +use pingora_core::protocols::{l4::socket::SocketAddr, Digest}; use pingora_core::server::configuration::Opt; use pingora_core::services::Service; use pingora_core::upstreams::peer::HttpPeer; @@ -38,15 +38,72 @@ use structopt::StructOpt; pub struct ExampleProxyHttps {} #[allow(clippy::upper_case_acronyms)] +#[derive(Default)] pub struct CTX { conn_reused: bool, + upstream_client_addr: Option<SocketAddr>, + upstream_server_addr: Option<SocketAddr>, +} + +// Common logic for both ProxyHttp(s) types +fn connected_to_upstream_common( + reused: bool, + digest: Option<&Digest>, + ctx: &mut CTX, +) -> Result<()> { + ctx.conn_reused = reused; + let socket_digest = digest + .expect("upstream connector digest should be set for HTTP sessions") + .socket_digest + .as_ref() + .expect("socket digest should be set for HTTP sessions"); + ctx.upstream_client_addr = socket_digest.local_addr().cloned(); + ctx.upstream_server_addr = socket_digest.peer_addr().cloned(); + + Ok(()) +} + +fn response_filter_common( + session: &mut Session, + response: &mut ResponseHeader, + ctx: &mut CTX, +) -> Result<()> { + if ctx.conn_reused { + response.insert_header("x-conn-reuse", "1")?; + } + + let client_addr = session.client_addr(); + let server_addr = session.server_addr(); + response.insert_header( + "x-client-addr", + client_addr.map_or_else(|| "unset".into(), |a| a.to_string()), + )?; + response.insert_header( + "x-server-addr", + server_addr.map_or_else(|| "unset".into(), |a| a.to_string()), + )?; + + response.insert_header( + "x-upstream-client-addr", + ctx.upstream_client_addr + .as_ref() + .map_or_else(|| "unset".into(), |a| a.to_string()), + )?; + response.insert_header( + "x-upstream-server-addr", + ctx.upstream_server_addr + .as_ref() + .map_or_else(|| "unset".into(), |a| a.to_string()), + )?; + + Ok(()) } #[async_trait] impl ProxyHttp for ExampleProxyHttps { type CTX = CTX; fn new_ctx(&self) -> Self::CTX { - CTX { conn_reused: false } + CTX::default() } async fn upstream_peer( @@ -101,17 +158,14 @@ impl ProxyHttp for ExampleProxyHttps { async fn response_filter( &self, - _session: &mut Session, + session: &mut Session, upstream_response: &mut ResponseHeader, ctx: &mut Self::CTX, ) -> Result<()> where Self::CTX: Send + Sync, { - if ctx.conn_reused { - upstream_response.insert_header("x-conn-reuse", "1")?; - } - Ok(()) + response_filter_common(session, upstream_response, ctx) } async fn upstream_request_filter( @@ -119,10 +173,7 @@ impl ProxyHttp for ExampleProxyHttps { session: &mut Session, req: &mut RequestHeader, _ctx: &mut Self::CTX, - ) -> Result<()> - where - Self::CTX: Send + Sync, - { + ) -> Result<()> { let host = session.get_header_bytes("host-override"); if host != b"" { req.insert_header("host", host)?; @@ -136,11 +187,10 @@ impl ProxyHttp for ExampleProxyHttps { reused: bool, _peer: &HttpPeer, _fd: std::os::unix::io::RawFd, - _digest: Option<&Digest>, + digest: Option<&Digest>, ctx: &mut CTX, ) -> Result<()> { - ctx.conn_reused = reused; - Ok(()) + connected_to_upstream_common(reused, digest, ctx) } } @@ -148,8 +198,10 @@ pub struct ExampleProxyHttp {} #[async_trait] impl ProxyHttp for ExampleProxyHttp { - type CTX = (); - fn new_ctx(&self) -> Self::CTX {} + type CTX = CTX; + fn new_ctx(&self) -> Self::CTX { + CTX::default() + } async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool> { let req = session.req_header(); @@ -164,6 +216,15 @@ impl ProxyHttp for ExampleProxyHttp { Ok(false) } + async fn response_filter( + &self, + session: &mut Session, + upstream_response: &mut ResponseHeader, + ctx: &mut Self::CTX, + ) -> Result<()> { + response_filter_common(session, upstream_response, ctx) + } + async fn upstream_peer( &self, session: &mut Session, @@ -182,12 +243,24 @@ impl ProxyHttp for ExampleProxyHttp { .get("x-port") .map_or("8000", |v| v.to_str().unwrap()); let peer = Box::new(HttpPeer::new( - format!("127.0.0.1:{}", port), + format!("127.0.0.1:{port}"), false, "".to_string(), )); Ok(peer) } + + async fn connected_to_upstream( + &self, + _http_session: &mut Session, + reused: bool, + _peer: &HttpPeer, + _fd: std::os::unix::io::RawFd, + digest: Option<&Digest>, + ctx: &mut CTX, + ) -> Result<()> { + connected_to_upstream_common(reused, digest, ctx) + } } static CACHE_BACKEND: Lazy<MemCache> = Lazy::new(MemCache::new); |