aboutsummaryrefslogtreecommitdiffhomepage
path: root/pingora-proxy
diff options
context:
space:
mode:
authorewang <[email protected]>2024-03-05 16:56:37 -0800
committerEdward Wang <[email protected]>2024-03-15 14:37:56 -0700
commit20fd391f3e78c9349149f11fae94da9e4657478e (patch)
tree6608f5320d7844b4a6d9d9f3f6d6b838253d3296 /pingora-proxy
parent3e09114c4d1fd5ccae8ef0526c72d232ef1fdc58 (diff)
downloadpingora-20fd391f3e78c9349149f11fae94da9e4657478e.tar.gz
pingora-20fd391f3e78c9349149f11fae94da9e4657478e.zip
Add server_addr and client_addr to Session
Diffstat (limited to 'pingora-proxy')
-rw-r--r--pingora-proxy/src/subrequest.rs10
-rw-r--r--pingora-proxy/tests/test_basic.rs95
-rw-r--r--pingora-proxy/tests/utils/server_utils.rs107
3 files changed, 192 insertions, 20 deletions
diff --git a/pingora-proxy/src/subrequest.rs b/pingora-proxy/src/subrequest.rs
index 9490a40..e75dcc6 100644
--- a/pingora-proxy/src/subrequest.rs
+++ b/pingora-proxy/src/subrequest.rs
@@ -17,7 +17,9 @@ use core::pin::Pin;
use core::task::{Context, Poll};
use pingora_cache::lock::WritePermit;
use pingora_core::protocols::raw_connect::ProxyDigest;
-use pingora_core::protocols::{GetProxyDigest, GetTimingDigest, Ssl, TimingDigest, UniqueID};
+use pingora_core::protocols::{
+ GetProxyDigest, GetSocketDigest, GetTimingDigest, SocketDigest, Ssl, TimingDigest, UniqueID,
+};
use std::io::Cursor;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite, Error, ReadBuf};
@@ -85,6 +87,12 @@ impl GetProxyDigest for DummyIO {
}
}
+impl GetSocketDigest for DummyIO {
+ fn get_socket_digest(&self) -> Option<Arc<SocketDigest>> {
+ None
+ }
+}
+
#[async_trait]
impl pingora_core::protocols::Shutdown for DummyIO {
async fn shutdown(&mut self) -> () {}
diff --git a/pingora-proxy/tests/test_basic.rs b/pingora-proxy/tests/test_basic.rs
index a4730bf..4f11c2b 100644
--- a/pingora-proxy/tests/test_basic.rs
+++ b/pingora-proxy/tests/test_basic.rs
@@ -20,6 +20,10 @@ use reqwest::{header, StatusCode};
use utils::server_utils::init;
+fn is_specified_port(port: u16) -> bool {
+ (1..65535).contains(&port)
+}
+
#[tokio::test]
async fn test_origin_alive() {
init();
@@ -36,8 +40,27 @@ async fn test_simple_proxy() {
init();
let res = reqwest::get("http://127.0.0.1:6147").await.unwrap();
assert_eq!(res.status(), StatusCode::OK);
+
let headers = res.headers();
assert_eq!(headers[header::CONTENT_LENGTH], "13");
+ assert_eq!(headers["x-server-addr"], "127.0.0.1:6147");
+ let sockaddr = headers["x-client-addr"]
+ .to_str()
+ .unwrap()
+ .parse::<std::net::SocketAddr>()
+ .unwrap();
+ assert_eq!(sockaddr.ip().to_string(), "127.0.0.1");
+ assert!(is_specified_port(sockaddr.port()));
+
+ assert_eq!(headers["x-upstream-server-addr"], "127.0.0.1:8000");
+ let sockaddr = headers["x-upstream-client-addr"]
+ .to_str()
+ .unwrap()
+ .parse::<std::net::SocketAddr>()
+ .unwrap();
+ assert_eq!(sockaddr.ip().to_string(), "127.0.0.2");
+ assert!(is_specified_port(sockaddr.port()));
+
let body = res.text().await.unwrap();
assert_eq!(body, "Hello World!\n");
}
@@ -53,8 +76,28 @@ async fn test_h2_to_h1() {
let res = client.get("https://127.0.0.1:6150").send().await.unwrap();
assert_eq!(res.status(), reqwest::StatusCode::OK);
assert_eq!(res.version(), reqwest::Version::HTTP_2);
+
let headers = res.headers();
assert_eq!(headers[header::CONTENT_LENGTH], "13");
+ assert_eq!(headers["x-server-addr"], "127.0.0.1:6150");
+
+ let sockaddr = headers["x-client-addr"]
+ .to_str()
+ .unwrap()
+ .parse::<std::net::SocketAddr>()
+ .unwrap();
+ assert_eq!(sockaddr.ip().to_string(), "127.0.0.1");
+ assert!(is_specified_port(sockaddr.port()));
+
+ assert_eq!(headers["x-upstream-server-addr"], "127.0.0.1:8443");
+ let sockaddr = headers["x-upstream-client-addr"]
+ .to_str()
+ .unwrap()
+ .parse::<std::net::SocketAddr>()
+ .unwrap();
+ assert_eq!(sockaddr.ip().to_string(), "127.0.0.2");
+ assert!(is_specified_port(sockaddr.port()));
+
let body = res.text().await.unwrap();
assert_eq!(body, "Hello World!\n");
}
@@ -75,8 +118,27 @@ async fn test_h2_to_h2() {
.unwrap();
assert_eq!(res.status(), reqwest::StatusCode::OK);
assert_eq!(res.version(), reqwest::Version::HTTP_2);
+
let headers = res.headers();
assert_eq!(headers[header::CONTENT_LENGTH], "13");
+ assert_eq!(headers["x-server-addr"], "127.0.0.1:6150");
+ let sockaddr = headers["x-client-addr"]
+ .to_str()
+ .unwrap()
+ .parse::<std::net::SocketAddr>()
+ .unwrap();
+ assert_eq!(sockaddr.ip().to_string(), "127.0.0.1");
+ assert!(is_specified_port(sockaddr.port()));
+
+ assert_eq!(headers["x-upstream-server-addr"], "127.0.0.1:8443");
+ let sockaddr = headers["x-upstream-client-addr"]
+ .to_str()
+ .unwrap()
+ .parse::<std::net::SocketAddr>()
+ .unwrap();
+ assert_eq!(sockaddr.ip().to_string(), "127.0.0.2");
+ assert!(is_specified_port(sockaddr.port()));
+
let body = res.text().await.unwrap();
assert_eq!(body, "Hello World!\n");
}
@@ -159,7 +221,21 @@ async fn test_simple_proxy_uds() {
assert_eq!(res.status(), reqwest::StatusCode::OK);
let (resp, body) = res.into_parts();
- assert_eq!(resp.headers[header::CONTENT_LENGTH], "13");
+
+ let headers = &resp.headers;
+ assert_eq!(headers[header::CONTENT_LENGTH], "13");
+ assert_eq!(headers["x-server-addr"], "/tmp/pingora_proxy.sock");
+ assert_eq!(headers["x-client-addr"], "unset"); // unnamed UDS
+
+ assert_eq!(headers["x-upstream-server-addr"], "127.0.0.1:8000");
+ let sockaddr = headers["x-upstream-client-addr"]
+ .to_str()
+ .unwrap()
+ .parse::<std::net::SocketAddr>()
+ .unwrap();
+ assert_eq!(sockaddr.ip().to_string(), "127.0.0.2");
+ assert!(is_specified_port(sockaddr.port()));
+
let body = hyper::body::to_bytes(body).await.unwrap();
assert_eq!(body.as_ref(), b"Hello World!\n");
}
@@ -168,15 +244,30 @@ async fn test_simple_proxy_uds() {
async fn test_simple_proxy_uds_peer() {
init();
let client = reqwest::Client::new();
+
let res = client
.get("http://127.0.0.1:6147")
.header("x-uds-peer", "1") // force upstream peer to be UDS
.send()
.await
.unwrap();
+
assert_eq!(res.status(), StatusCode::OK);
- let headers = res.headers();
+
+ let headers = &res.headers();
assert_eq!(headers[header::CONTENT_LENGTH], "13");
+ assert_eq!(headers["x-server-addr"], "127.0.0.1:6147");
+ let sockaddr = headers["x-client-addr"]
+ .to_str()
+ .unwrap()
+ .parse::<std::net::SocketAddr>()
+ .unwrap();
+ assert_eq!(sockaddr.ip().to_string(), "127.0.0.1");
+ assert!(is_specified_port(sockaddr.port()));
+
+ assert_eq!(headers["x-upstream-client-addr"], "unset"); // unnamed UDS
+ assert_eq!(headers["x-upstream-server-addr"], "/tmp/nginx-test.sock");
+
let body = res.text().await.unwrap();
assert_eq!(body, "Hello World!\n");
}
diff --git a/pingora-proxy/tests/utils/server_utils.rs b/pingora-proxy/tests/utils/server_utils.rs
index 32f3275..50fc804 100644
--- a/pingora-proxy/tests/utils/server_utils.rs
+++ b/pingora-proxy/tests/utils/server_utils.rs
@@ -23,7 +23,7 @@ use pingora_cache::{
set_compression_dict_path, CacheMeta, CacheMetaDefaults, CachePhase, MemCache, NoCacheReason,
RespCacheable,
};
-use pingora_core::protocols::Digest;
+use pingora_core::protocols::{l4::socket::SocketAddr, Digest};
use pingora_core::server::configuration::Opt;
use pingora_core::services::Service;
use pingora_core::upstreams::peer::HttpPeer;
@@ -38,15 +38,72 @@ use structopt::StructOpt;
pub struct ExampleProxyHttps {}
#[allow(clippy::upper_case_acronyms)]
+#[derive(Default)]
pub struct CTX {
conn_reused: bool,
+ upstream_client_addr: Option<SocketAddr>,
+ upstream_server_addr: Option<SocketAddr>,
+}
+
+// Common logic for both ProxyHttp(s) types
+fn connected_to_upstream_common(
+ reused: bool,
+ digest: Option<&Digest>,
+ ctx: &mut CTX,
+) -> Result<()> {
+ ctx.conn_reused = reused;
+ let socket_digest = digest
+ .expect("upstream connector digest should be set for HTTP sessions")
+ .socket_digest
+ .as_ref()
+ .expect("socket digest should be set for HTTP sessions");
+ ctx.upstream_client_addr = socket_digest.local_addr().cloned();
+ ctx.upstream_server_addr = socket_digest.peer_addr().cloned();
+
+ Ok(())
+}
+
+fn response_filter_common(
+ session: &mut Session,
+ response: &mut ResponseHeader,
+ ctx: &mut CTX,
+) -> Result<()> {
+ if ctx.conn_reused {
+ response.insert_header("x-conn-reuse", "1")?;
+ }
+
+ let client_addr = session.client_addr();
+ let server_addr = session.server_addr();
+ response.insert_header(
+ "x-client-addr",
+ client_addr.map_or_else(|| "unset".into(), |a| a.to_string()),
+ )?;
+ response.insert_header(
+ "x-server-addr",
+ server_addr.map_or_else(|| "unset".into(), |a| a.to_string()),
+ )?;
+
+ response.insert_header(
+ "x-upstream-client-addr",
+ ctx.upstream_client_addr
+ .as_ref()
+ .map_or_else(|| "unset".into(), |a| a.to_string()),
+ )?;
+ response.insert_header(
+ "x-upstream-server-addr",
+ ctx.upstream_server_addr
+ .as_ref()
+ .map_or_else(|| "unset".into(), |a| a.to_string()),
+ )?;
+
+ Ok(())
}
#[async_trait]
impl ProxyHttp for ExampleProxyHttps {
type CTX = CTX;
fn new_ctx(&self) -> Self::CTX {
- CTX { conn_reused: false }
+ CTX::default()
}
async fn upstream_peer(
@@ -101,17 +158,14 @@ impl ProxyHttp for ExampleProxyHttps {
async fn response_filter(
&self,
- _session: &mut Session,
+ session: &mut Session,
upstream_response: &mut ResponseHeader,
ctx: &mut Self::CTX,
) -> Result<()>
where
Self::CTX: Send + Sync,
{
- if ctx.conn_reused {
- upstream_response.insert_header("x-conn-reuse", "1")?;
- }
- Ok(())
+ response_filter_common(session, upstream_response, ctx)
}
async fn upstream_request_filter(
@@ -119,10 +173,7 @@ impl ProxyHttp for ExampleProxyHttps {
session: &mut Session,
req: &mut RequestHeader,
_ctx: &mut Self::CTX,
- ) -> Result<()>
- where
- Self::CTX: Send + Sync,
- {
+ ) -> Result<()> {
let host = session.get_header_bytes("host-override");
if host != b"" {
req.insert_header("host", host)?;
@@ -136,11 +187,10 @@ impl ProxyHttp for ExampleProxyHttps {
reused: bool,
_peer: &HttpPeer,
_fd: std::os::unix::io::RawFd,
- _digest: Option<&Digest>,
+ digest: Option<&Digest>,
ctx: &mut CTX,
) -> Result<()> {
- ctx.conn_reused = reused;
- Ok(())
+ connected_to_upstream_common(reused, digest, ctx)
}
}
@@ -148,8 +198,10 @@ pub struct ExampleProxyHttp {}
#[async_trait]
impl ProxyHttp for ExampleProxyHttp {
- type CTX = ();
- fn new_ctx(&self) -> Self::CTX {}
+ type CTX = CTX;
+ fn new_ctx(&self) -> Self::CTX {
+ CTX::default()
+ }
async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool> {
let req = session.req_header();
@@ -164,6 +216,15 @@ impl ProxyHttp for ExampleProxyHttp {
Ok(false)
}
+ async fn response_filter(
+ &self,
+ session: &mut Session,
+ upstream_response: &mut ResponseHeader,
+ ctx: &mut Self::CTX,
+ ) -> Result<()> {
+ response_filter_common(session, upstream_response, ctx)
+ }
+
async fn upstream_peer(
&self,
session: &mut Session,
@@ -182,12 +243,24 @@ impl ProxyHttp for ExampleProxyHttp {
.get("x-port")
.map_or("8000", |v| v.to_str().unwrap());
let peer = Box::new(HttpPeer::new(
- format!("127.0.0.1:{}", port),
+ format!("127.0.0.1:{port}"),
false,
"".to_string(),
));
Ok(peer)
}
+
+ async fn connected_to_upstream(
+ &self,
+ _http_session: &mut Session,
+ reused: bool,
+ _peer: &HttpPeer,
+ _fd: std::os::unix::io::RawFd,
+ digest: Option<&Digest>,
+ ctx: &mut CTX,
+ ) -> Result<()> {
+ connected_to_upstream_common(reused, digest, ctx)
+ }
}
static CACHE_BACKEND: Lazy<MemCache> = Lazy::new(MemCache::new);