aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorewang <[email protected]>2024-03-05 16:56:37 -0800
committerEdward Wang <[email protected]>2024-03-15 14:37:56 -0700
commit20fd391f3e78c9349149f11fae94da9e4657478e (patch)
tree6608f5320d7844b4a6d9d9f3f6d6b838253d3296
parent3e09114c4d1fd5ccae8ef0526c72d232ef1fdc58 (diff)
downloadpingora-20fd391f3e78c9349149f11fae94da9e4657478e.tar.gz
pingora-20fd391f3e78c9349149f11fae94da9e4657478e.zip
Add server_addr and client_addr to Session
-rw-r--r--.bleep2
-rw-r--r--pingora-core/src/apps/mod.rs14
-rw-r--r--pingora-core/src/connectors/http/v1.rs5
-rw-r--r--pingora-core/src/connectors/http/v2.rs1
-rw-r--r--pingora-core/src/connectors/l4.rs13
-rw-r--r--pingora-core/src/protocols/digest.rs45
-rw-r--r--pingora-core/src/protocols/http/client.rs20
-rw-r--r--pingora-core/src/protocols/http/server.rs18
-rw-r--r--pingora-core/src/protocols/http/v1/client.rs19
-rw-r--r--pingora-core/src/protocols/http/v1/server.rs34
-rw-r--r--pingora-core/src/protocols/http/v2/client.rs20
-rw-r--r--pingora-core/src/protocols/http/v2/server.rs37
-rw-r--r--pingora-core/src/protocols/l4/listener.rs31
-rw-r--r--pingora-core/src/protocols/l4/socket.rs64
-rw-r--r--pingora-core/src/protocols/l4/stream.rs38
-rw-r--r--pingora-core/src/protocols/mod.rs22
-rw-r--r--pingora-core/src/protocols/ssl/client.rs16
-rw-r--r--pingora-proxy/src/subrequest.rs10
-rw-r--r--pingora-proxy/tests/test_basic.rs95
-rw-r--r--pingora-proxy/tests/utils/server_utils.rs107
20 files changed, 568 insertions, 43 deletions
diff --git a/.bleep b/.bleep
index 66c4bb2..1ac1f3c 100644
--- a/.bleep
+++ b/.bleep
@@ -1 +1 @@
-90d84b32f4528ede68b8351c896a101af788113d \ No newline at end of file
+c16c9e8bfd9334b77a6a7c1123954f41037c06c3 \ No newline at end of file
diff --git a/pingora-core/src/apps/mod.rs b/pingora-core/src/apps/mod.rs
index db8f11b..6a436c7 100644
--- a/pingora-core/src/apps/mod.rs
+++ b/pingora-core/src/apps/mod.rs
@@ -24,6 +24,7 @@ use std::sync::Arc;
use crate::protocols::http::v2::server;
use crate::protocols::http::ServerSession;
+use crate::protocols::Digest;
use crate::protocols::Stream;
use crate::protocols::ALPN;
@@ -91,6 +92,15 @@ where
) -> Option<Stream> {
match stream.selected_alpn_proto() {
Some(ALPN::H2) => {
+ // create a shared connection digest
+ let digest = Arc::new(Digest {
+ ssl_digest: stream.get_ssl_digest(),
+ // TODO: log h2 handshake time
+ timing_digest: stream.get_timing_digest(),
+ proxy_digest: stream.get_proxy_digest(),
+ socket_digest: stream.get_socket_digest(),
+ });
+
let h2_options = self.h2_options();
let h2_conn = server::handshake(stream, h2_options).await;
let mut h2_conn = match h2_conn {
@@ -100,10 +110,12 @@ where
}
Ok(c) => c,
};
+
loop {
// this loop ends when the client decides to close the h2 conn
// TODO: add a timeout?
- let h2_stream = server::HttpSession::from_h2_conn(&mut h2_conn).await;
+ let h2_stream =
+ server::HttpSession::from_h2_conn(&mut h2_conn, digest.clone()).await;
let h2_stream = match h2_stream {
Err(e) => {
// It is common for client to just disconnect TCP without properly
diff --git a/pingora-core/src/connectors/http/v1.rs b/pingora-core/src/connectors/http/v1.rs
index 513fed1..7958a09 100644
--- a/pingora-core/src/connectors/http/v1.rs
+++ b/pingora-core/src/connectors/http/v1.rs
@@ -65,6 +65,7 @@ impl Connector {
#[cfg(test)]
mod tests {
use super::*;
+ use crate::protocols::l4::socket::SocketAddr;
use crate::upstreams::peer::HttpPeer;
use pingora_http::RequestHeader;
@@ -85,6 +86,8 @@ mod tests {
let peer = HttpPeer::new(("1.1.1.1", 80), false, "".into());
// make a new connection to 1.1.1.1
let (http, reused) = connector.get_http_session(&peer).await.unwrap();
+ let server_addr = http.server_addr().unwrap();
+ assert_eq!(*server_addr, "1.1.1.1:80".parse::<SocketAddr>().unwrap());
assert!(!reused);
// this http is not even used, so not be able to reuse
@@ -104,6 +107,8 @@ mod tests {
let peer = HttpPeer::new(("1.1.1.1", 443), true, "one.one.one.one".into());
// make a new connection to https://1.1.1.1
let (http, reused) = connector.get_http_session(&peer).await.unwrap();
+ let server_addr = http.server_addr().unwrap();
+ assert_eq!(*server_addr, "1.1.1.1:443".parse::<SocketAddr>().unwrap());
assert!(!reused);
// this http is not even used, so not be able to reuse
diff --git a/pingora-core/src/connectors/http/v2.rs b/pingora-core/src/connectors/http/v2.rs
index 389bd4e..6f26b46 100644
--- a/pingora-core/src/connectors/http/v2.rs
+++ b/pingora-core/src/connectors/http/v2.rs
@@ -369,6 +369,7 @@ async fn handshake(
// TODO: log h2 handshake time
timing_digest: stream.get_timing_digest(),
proxy_digest: stream.get_proxy_digest(),
+ socket_digest: stream.get_socket_digest(),
};
// TODO: make these configurable
let (send_req, connection) = Builder::new()
diff --git a/pingora-core/src/connectors/l4.rs b/pingora-core/src/connectors/l4.rs
index 6f0f5fd..3db2771 100644
--- a/pingora-core/src/connectors/l4.rs
+++ b/pingora-core/src/connectors/l4.rs
@@ -16,10 +16,12 @@ use log::debug;
use pingora_error::{Context, Error, ErrorType::*, OrErr, Result};
use rand::seq::SliceRandom;
use std::net::SocketAddr as InetSocketAddr;
+use std::os::unix::io::AsRawFd;
use crate::protocols::l4::ext::{connect as tcp_connect, connect_uds, set_tcp_keepalive};
use crate::protocols::l4::socket::SocketAddr;
use crate::protocols::l4::stream::Stream;
+use crate::protocols::{GetSocketDigest, SocketDigest};
use crate::upstreams::peer::Peer;
/// Establish a connection (l4) to the given peer using its settings and an optional bind address.
@@ -32,7 +34,8 @@ where
.await
.err_context(|| format!("Fail to establish CONNECT proxy: {}", peer));
}
- let mut stream: Stream = match peer.address() {
+ let peer_addr = peer.address();
+ let mut stream: Stream = match peer_addr {
SocketAddr::Inet(addr) => {
let connect_future = tcp_connect(addr, bind_to.as_ref());
let conn_res = match peer.connection_timeout() {
@@ -97,6 +100,14 @@ where
}
stream.set_nodelay()?;
+
+ let digest = SocketDigest::from_raw_fd(stream.as_raw_fd());
+ digest
+ .peer_addr
+ .set(Some(peer_addr.clone()))
+ .expect("newly created OnceCell must be empty");
+ stream.set_socket_digest(digest);
+
Ok(stream)
}
diff --git a/pingora-core/src/protocols/digest.rs b/pingora-core/src/protocols/digest.rs
index 13ce35c..594dbba 100644
--- a/pingora-core/src/protocols/digest.rs
+++ b/pingora-core/src/protocols/digest.rs
@@ -17,11 +17,14 @@
use std::sync::Arc;
use std::time::SystemTime;
+use once_cell::sync::OnceCell;
+
+use super::l4::socket::SocketAddr;
use super::raw_connect::ProxyDigest;
use super::ssl::digest::SslDigest;
/// The information can be extracted from a connection
-#[derive(Clone, Debug)]
+#[derive(Clone, Debug, Default)]
pub struct Digest {
/// Information regarding the TLS of this connection if any
pub ssl_digest: Option<Arc<SslDigest>>,
@@ -29,6 +32,8 @@ pub struct Digest {
pub timing_digest: Vec<Option<TimingDigest>>,
/// information regarding the CONNECT proxy this connection uses.
pub proxy_digest: Option<Arc<ProxyDigest>>,
+ /// Information about underlying socket/fd of this connection
+ pub socket_digest: Option<Arc<SocketDigest>>,
}
/// The interface to return protocol related information
@@ -53,6 +58,38 @@ impl Default for TimingDigest {
}
}
+#[derive(Debug)]
+/// The interface to return socket-related information
+pub struct SocketDigest {
+ raw_fd: std::os::unix::io::RawFd,
+ /// Remote socket address
+ pub peer_addr: OnceCell<Option<SocketAddr>>,
+ /// Local socket address
+ pub local_addr: OnceCell<Option<SocketAddr>>,
+}
+
+impl SocketDigest {
+ pub fn from_raw_fd(raw_fd: std::os::unix::io::RawFd) -> SocketDigest {
+ SocketDigest {
+ raw_fd,
+ peer_addr: OnceCell::new(),
+ local_addr: OnceCell::new(),
+ }
+ }
+
+ pub fn peer_addr(&self) -> Option<&SocketAddr> {
+ self.peer_addr
+ .get_or_init(|| SocketAddr::from_raw_fd(self.raw_fd, true))
+ .as_ref()
+ }
+
+ pub fn local_addr(&self) -> Option<&SocketAddr> {
+ self.local_addr
+ .get_or_init(|| SocketAddr::from_raw_fd(self.raw_fd, false))
+ .as_ref()
+ }
+}
+
/// The interface to return timing information
pub trait GetTimingDigest {
/// Return the timing for each layer from the lowest layer to upper
@@ -64,3 +101,9 @@ pub trait GetProxyDigest {
fn get_proxy_digest(&self) -> Option<Arc<ProxyDigest>>;
fn set_proxy_digest(&mut self, _digest: ProxyDigest) {}
}
+
+/// The interface to set or return socket information
+pub trait GetSocketDigest {
+ fn get_socket_digest(&self) -> Option<Arc<SocketDigest>>;
+ fn set_socket_digest(&mut self, _socket_digest: SocketDigest) {}
+}
diff --git a/pingora-core/src/protocols/http/client.rs b/pingora-core/src/protocols/http/client.rs
index 0fe6c90..83a7618 100644
--- a/pingora-core/src/protocols/http/client.rs
+++ b/pingora-core/src/protocols/http/client.rs
@@ -19,7 +19,7 @@ use std::time::Duration;
use super::v1::client::HttpSession as Http1Session;
use super::v2::client::Http2Session;
-use crate::protocols::Digest;
+use crate::protocols::{Digest, SocketAddr};
/// A type for Http client session. It can be either an Http1 connection or an Http2 stream.
pub enum HttpSession {
@@ -151,11 +151,27 @@ impl HttpSession {
/// Return the [Digest] of the connection
///
/// For reused connection, the timing in the digest will reflect its initial handshakes
- /// The caller should check if the connection is reused to avoid misuse the timing field
+ /// The caller should check if the connection is reused to avoid misuse of the timing field
pub fn digest(&self) -> Option<&Digest> {
match self {
Self::H1(s) => Some(s.digest()),
Self::H2(s) => s.digest(),
}
}
+
+ /// Return the server (peer) address of the connection.
+ pub fn server_addr(&self) -> Option<&SocketAddr> {
+ match self {
+ Self::H1(s) => s.server_addr(),
+ Self::H2(s) => s.server_addr(),
+ }
+ }
+
+ /// Return the client (local) address of the connection.
+ pub fn client_addr(&self) -> Option<&SocketAddr> {
+ match self {
+ Self::H1(s) => s.client_addr(),
+ Self::H2(s) => s.client_addr(),
+ }
+ }
}
diff --git a/pingora-core/src/protocols/http/server.rs b/pingora-core/src/protocols/http/server.rs
index c4a0b07..6edea75 100644
--- a/pingora-core/src/protocols/http/server.rs
+++ b/pingora-core/src/protocols/http/server.rs
@@ -18,7 +18,7 @@ use super::error_resp;
use super::v1::server::HttpSession as SessionV1;
use super::v2::server::HttpSession as SessionV2;
use super::HttpTask;
-use crate::protocols::Stream;
+use crate::protocols::{SocketAddr, Stream};
use bytes::Bytes;
use http::header::AsHeaderName;
use http::HeaderValue;
@@ -330,4 +330,20 @@ impl Session {
Self::H2(s) => s.body_bytes_sent(),
}
}
+
+ /// Return the client (peer) address of the connnection.
+ pub fn client_addr(&self) -> Option<&SocketAddr> {
+ match self {
+ Self::H1(s) => s.client_addr(),
+ Self::H2(s) => s.client_addr(),
+ }
+ }
+
+ /// Return the server (local) address of the connection.
+ pub fn server_addr(&self) -> Option<&SocketAddr> {
+ match self {
+ Self::H1(s) => s.server_addr(),
+ Self::H2(s) => s.server_addr(),
+ }
+ }
}
diff --git a/pingora-core/src/protocols/http/v1/client.rs b/pingora-core/src/protocols/http/v1/client.rs
index 13e0482..7881e59 100644
--- a/pingora-core/src/protocols/http/v1/client.rs
+++ b/pingora-core/src/protocols/http/v1/client.rs
@@ -28,7 +28,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt};
use super::body::{BodyReader, BodyWriter};
use super::common::*;
use crate::protocols::http::HttpTask;
-use crate::protocols::{Digest, Stream, UniqueID};
+use crate::protocols::{Digest, SocketAddr, Stream, UniqueID};
use crate::utils::{BufRef, KVRef};
/// The HTTP 1.x client session
@@ -65,6 +65,7 @@ impl HttpSession {
ssl_digest: stream.get_ssl_digest(),
timing_digest: stream.get_timing_digest(),
proxy_digest: stream.get_proxy_digest(),
+ socket_digest: stream.get_socket_digest(),
});
HttpSession {
underlying_stream: stream,
@@ -601,6 +602,22 @@ impl HttpSession {
pub fn digest(&self) -> &Digest {
&self.digest
}
+
+ /// Return the server (peer) address recorded in the connection digest.
+ pub fn server_addr(&self) -> Option<&SocketAddr> {
+ self.digest()
+ .socket_digest
+ .as_ref()
+ .map(|d| d.peer_addr())?
+ }
+
+ /// Return the client (local) address recorded in the connection digest.
+ pub fn client_addr(&self) -> Option<&SocketAddr> {
+ self.digest()
+ .socket_digest
+ .as_ref()
+ .map(|d| d.local_addr())?
+ }
}
#[inline]
diff --git a/pingora-core/src/protocols/http/v1/server.rs b/pingora-core/src/protocols/http/v1/server.rs
index c1b2d75..0b19970 100644
--- a/pingora-core/src/protocols/http/v1/server.rs
+++ b/pingora-core/src/protocols/http/v1/server.rs
@@ -31,7 +31,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt};
use super::body::{BodyReader, BodyWriter};
use super::common::*;
use crate::protocols::http::{body_buffer::FixedBuffer, date, error_resp, HttpTask};
-use crate::protocols::Stream;
+use crate::protocols::{Digest, SocketAddr, Stream};
use crate::utils::{BufRef, KVRef};
/// The HTTP 1.x server session
@@ -68,6 +68,8 @@ pub struct HttpSession {
/// Whether this session is an upgraded session. This flag is calculated when sending the
/// response header to the client.
upgraded: bool,
+ /// Digest to track underlying connection metrics
+ digest: Box<Digest>,
}
impl HttpSession {
@@ -75,6 +77,14 @@ impl HttpSession {
/// The created session needs to call [`Self::read_request()`] first before performing
/// any other operations.
pub fn new(underlying_stream: Stream) -> Self {
+ // TODO: maybe we should put digest in the connection itself
+ let digest = Box::new(Digest {
+ ssl_digest: underlying_stream.get_ssl_digest(),
+ timing_digest: underlying_stream.get_timing_digest(),
+ proxy_digest: underlying_stream.get_proxy_digest(),
+ socket_digest: underlying_stream.get_socket_digest(),
+ });
+
HttpSession {
underlying_stream,
buf: Bytes::new(), // zero size, with be replaced by parsed header later
@@ -92,6 +102,7 @@ impl HttpSession {
body_bytes_sent: 0,
retry_buffer: None,
upgraded: false,
+ digest,
}
}
@@ -751,6 +762,27 @@ impl HttpSession {
}
}
+ /// Return the [Digest] of the connection.
+ pub fn digest(&self) -> &Digest {
+ &self.digest
+ }
+
+ /// Return the client (peer) address of the underlying connnection.
+ pub fn client_addr(&self) -> Option<&SocketAddr> {
+ self.digest()
+ .socket_digest
+ .as_ref()
+ .map(|d| d.peer_addr())?
+ }
+
+ /// Return the server (local) address of the underlying connnection.
+ pub fn server_addr(&self) -> Option<&SocketAddr> {
+ self.digest()
+ .socket_digest
+ .as_ref()
+ .map(|d| d.local_addr())?
+ }
+
/// Consume `self`, if the connection can be reused, the underlying stream will be returned
/// to be fed to the next [`Self::new()`]. The next session can just call [`Self::read_request()`].
/// If the connection cannot be reused, the underlying stream will be closed and `None` will be
diff --git a/pingora-core/src/protocols/http/v2/client.rs b/pingora-core/src/protocols/http/v2/client.rs
index 48551ec..15f86d0 100644
--- a/pingora-core/src/protocols/http/v2/client.rs
+++ b/pingora-core/src/protocols/http/v2/client.rs
@@ -30,7 +30,7 @@ use tokio::io::{AsyncRead, AsyncWrite};
use tokio::sync::watch;
use crate::connectors::http::v2::ConnectionRef;
-use crate::protocols::Digest;
+use crate::protocols::{Digest, SocketAddr};
pub const PING_TIMEDOUT: ErrorType = ErrorType::new("PingTimedout");
@@ -310,6 +310,24 @@ impl Http2Session {
Some(self.conn.digest())
}
+ /// Return the server (peer) address recorded in the connection digest.
+ pub fn server_addr(&self) -> Option<&SocketAddr> {
+ self.conn
+ .digest()
+ .socket_digest
+ .as_ref()
+ .map(|d| d.peer_addr())?
+ }
+
+ /// Return the client (local) address recorded in the connection digest.
+ pub fn client_addr(&self) -> Option<&SocketAddr> {
+ self.conn
+ .digest()
+ .socket_digest
+ .as_ref()
+ .map(|d| d.local_addr())?
+ }
+
/// the FD of the underlying connection
pub fn fd(&self) -> i32 {
self.conn.id()
diff --git a/pingora-core/src/protocols/http/v2/server.rs b/pingora-core/src/protocols/http/v2/server.rs
index 6ec75e8..a811769 100644
--- a/pingora-core/src/protocols/http/v2/server.rs
+++ b/pingora-core/src/protocols/http/v2/server.rs
@@ -23,12 +23,13 @@ use http::header::HeaderName;
use http::{header, Response};
use log::{debug, warn};
use pingora_http::{RequestHeader, ResponseHeader};
+use std::sync::Arc;
use crate::protocols::http::body_buffer::FixedBuffer;
use crate::protocols::http::date::get_cached_date;
use crate::protocols::http::v1::client::http_req_header_to_wire;
use crate::protocols::http::HttpTask;
-use crate::protocols::Stream;
+use crate::protocols::{Digest, SocketAddr, Stream};
use crate::{Error, ErrorType, OrErr, Result};
const BODY_BUF_LIMIT: usize = 1024 * 64;
@@ -95,6 +96,8 @@ pub struct HttpSession {
body_sent: usize,
// buffered request body for retry logic
retry_buffer: Option<FixedBuffer>,
+ // digest to record underlying connection info
+ digest: Arc<Digest>,
}
impl HttpSession {
@@ -102,11 +105,19 @@ impl HttpSession {
/// This function returns a new HTTP/2 session when the provided HTTP/2 connection, `conn`,
/// establishes a new HTTP/2 stream to this server.
///
+ /// A [`Digest`] from the IO stream is also stored in the resulting session, since the
+ /// session doesn't have access to the underlying stream (and the stream itself isn't
+ /// accessible from the `h2::server::Connection`).
+ ///
/// Note: in order to handle all **existing** and new HTTP/2 sessions, the server must call
/// this function in a loop until the client decides to close the connection.
///
/// `None` will be returned when the connection is closing so that the loop can exit.
- pub async fn from_h2_conn(conn: &mut H2Connection<Stream>) -> Result<Option<Self>> {
+ ///
+ pub async fn from_h2_conn(
+ conn: &mut H2Connection<Stream>,
+ digest: Arc<Digest>,
+ ) -> Result<Option<Self>> {
// NOTE: conn.accept().await is what drives the entire connection.
let res = conn.accept().await.transpose().or_err(
ErrorType::H2Error,
@@ -125,6 +136,7 @@ impl HttpSession {
body_read: 0,
body_sent: 0,
retry_buffer: None,
+ digest,
}
}))
}
@@ -405,6 +417,21 @@ impl HttpSession {
pub fn body_bytes_sent(&self) -> usize {
self.body_sent
}
+
+ /// Return the [Digest] of the connection.
+ pub fn digest(&self) -> Option<&Digest> {
+ Some(&self.digest)
+ }
+
+ /// Return the server (local) address recorded in the connection digest.
+ pub fn server_addr(&self) -> Option<&SocketAddr> {
+ self.digest.socket_digest.as_ref().map(|d| d.local_addr())?
+ }
+
+ /// Return the client (peer) address recorded in the connection digest.
+ pub fn client_addr(&self) -> Option<&SocketAddr> {
+ self.digest.socket_digest.as_ref().map(|d| d.peer_addr())?
+ }
}
#[cfg(test)]
@@ -444,8 +471,12 @@ mod test {
});
let mut connection = handshake(Box::new(server), None).await.unwrap();
+ let digest = Arc::new(Digest::default());
- while let Some(mut http) = HttpSession::from_h2_conn(&mut connection).await.unwrap() {
+ while let Some(mut http) = HttpSession::from_h2_conn(&mut connection, digest.clone())
+ .await
+ .unwrap()
+ {
tokio::spawn(async move {
let req = http.req_header();
assert_eq!(req.method, Method::GET);
diff --git a/pingora-core/src/protocols/l4/listener.rs b/pingora-core/src/protocols/l4/listener.rs
index 6473fb4..29cc9e9 100644
--- a/pingora-core/src/protocols/l4/listener.rs
+++ b/pingora-core/src/protocols/l4/listener.rs
@@ -18,6 +18,7 @@ use std::io;
use std::os::unix::io::AsRawFd;
use tokio::net::{TcpListener, UnixListener};
+use crate::protocols::digest::{GetSocketDigest, SocketDigest};
use crate::protocols::l4::stream::Stream;
/// The type for generic listener for both TCP and Unix domain socket
@@ -40,7 +41,7 @@ impl From<UnixListener> for Listener {
}
impl AsRawFd for Listener {
- fn as_raw_fd(&self) -> std::os::unix::prelude::RawFd {
+ fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
match &self {
Self::Tcp(l) => l.as_raw_fd(),
Self::Unix(l) => l.as_raw_fd(),
@@ -52,8 +53,32 @@ impl Listener {
/// Accept a connection from the listening endpoint
pub async fn accept(&self) -> io::Result<Stream> {
match &self {
- Self::Tcp(l) => l.accept().await.map(|(stream, _)| stream.into()),
- Self::Unix(l) => l.accept().await.map(|(stream, _)| stream.into()),
+ Self::Tcp(l) => l.accept().await.map(|(stream, peer_addr)| {
+ let mut s: Stream = stream.into();
+ let digest = SocketDigest::from_raw_fd(s.as_raw_fd());
+ digest
+ .peer_addr
+ .set(Some(peer_addr.into()))
+ .expect("newly created OnceCell must be empty");
+ s.set_socket_digest(digest);
+ // TODO: if listening on a specific bind address, we could save
+ // an extra syscall looking up the local_addr later if we can pass
+ // and init it in the socket digest here
+ s
+ }),
+ Self::Unix(l) => l.accept().await.map(|(stream, peer_addr)| {
+ let mut s: Stream = stream.into();
+ let digest = SocketDigest::from_raw_fd(s.as_raw_fd());
+ // note: if unnamed/abstract UDS, it will be `None`
+ // (see TryFrom<tokio::net::unix::SocketAddr>)
+ let addr = peer_addr.try_into().ok();
+ digest
+ .peer_addr
+ .set(addr)
+ .expect("newly created OnceCell must be empty");
+ s.set_socket_digest(digest);
+ s
+ }),
}
}
}
diff --git a/pingora-core/src/protocols/l4/socket.rs b/pingora-core/src/protocols/l4/socket.rs
index 02eab36..186fbec 100644
--- a/pingora-core/src/protocols/l4/socket.rs
+++ b/pingora-core/src/protocols/l4/socket.rs
@@ -15,10 +15,12 @@
//! Generic socket type
use crate::{Error, OrErr};
+use nix::sys::socket::{getpeername, getsockname, SockaddrStorage};
use std::cmp::Ordering;
use std::hash::{Hash, Hasher};
use std::net::SocketAddr as StdSockAddr;
use std::os::unix::net::SocketAddr as StdUnixSockAddr;
+use tokio::net::unix::SocketAddr as TokioUnixSockAddr;
/// [`SocketAddr`] is a storage type that contains either a Internet (IP address)
/// socket address or a Unix domain socket address.
@@ -53,6 +55,40 @@ impl SocketAddr {
addr.set_port(port)
}
}
+
+ fn from_sockaddr_storage(sock: &SockaddrStorage) -> Option<SocketAddr> {
+ if let Some(v4) = sock.as_sockaddr_in() {
+ return Some(SocketAddr::Inet(StdSockAddr::V4(
+ std::net::SocketAddrV4::new(v4.ip().into(), v4.port()),
+ )));
+ } else if let Some(v6) = sock.as_sockaddr_in6() {
+ return Some(SocketAddr::Inet(StdSockAddr::V6(
+ std::net::SocketAddrV6::new(v6.ip(), v6.port(), v6.flowinfo(), v6.scope_id()),
+ )));
+ }
+
+ // TODO: don't set abstract / unnamed for now,
+ // for parity with how we treat these types in TryFrom<TokioUnixSockAddr>
+ Some(SocketAddr::Unix(
+ sock.as_unix_addr()
+ .map(|addr| addr.path().map(StdUnixSockAddr::from_pathname))??
+ .ok()?,
+ ))
+ }
+
+ pub fn from_raw_fd(fd: std::os::unix::io::RawFd, peer_addr: bool) -> Option<SocketAddr> {
+ let sockaddr_storage = if peer_addr {
+ getpeername(fd)
+ } else {
+ getsockname(fd)
+ };
+ match sockaddr_storage {
+ Ok(sockaddr) => Self::from_sockaddr_storage(&sockaddr),
+ // could be errors such as EBADF, i.e. fd is no longer a valid socket
+ // fail open in this case
+ Err(_e) => None,
+ }
+ }
}
impl std::fmt::Display for SocketAddr {
@@ -167,6 +203,34 @@ impl std::net::ToSocketAddrs for SocketAddr {
}
}
+impl From<StdSockAddr> for SocketAddr {
+ fn from(sockaddr: StdSockAddr) -> Self {
+ SocketAddr::Inet(sockaddr)
+ }
+}
+
+impl From<StdUnixSockAddr> for SocketAddr {
+ fn from(sockaddr: StdUnixSockAddr) -> Self {
+ SocketAddr::Unix(sockaddr)
+ }
+}
+
+// TODO: ideally mio/tokio will start using the std version of the unix `SocketAddr`
+// so we can avoid a fallible conversion
+// https://github.com/tokio-rs/mio/issues/1527
+impl TryFrom<TokioUnixSockAddr> for SocketAddr {
+ type Error = String;
+
+ fn try_from(value: TokioUnixSockAddr) -> Result<Self, Self::Error> {
+ if let Some(Ok(addr)) = value.as_pathname().map(StdUnixSockAddr::from_pathname) {
+ Ok(addr.into())
+ } else {
+ // may be unnamed/abstract UDS
+ Err(format!("could not convert {value:?} to SocketAddr"))
+ }
+ }
+}
+
#[cfg(test)]
mod test {
use super::*;
diff --git a/pingora-core/src/protocols/l4/stream.rs b/pingora-core/src/protocols/l4/stream.rs
index ebe4f67..e2efdbf 100644
--- a/pingora-core/src/protocols/l4/stream.rs
+++ b/pingora-core/src/protocols/l4/stream.rs
@@ -27,7 +27,10 @@ use tokio::io::{self, AsyncRead, AsyncWrite, AsyncWriteExt, BufStream, ReadBuf};
use tokio::net::{TcpStream, UnixStream};
use crate::protocols::raw_connect::ProxyDigest;
-use crate::protocols::{GetProxyDigest, GetTimingDigest, Shutdown, Ssl, TimingDigest, UniqueID};
+use crate::protocols::{
+ GetProxyDigest, GetSocketDigest, GetTimingDigest, Shutdown, SocketDigest, Ssl, TimingDigest,
+ UniqueID,
+};
use crate::upstreams::peer::Tracer;
#[derive(Debug)]
@@ -105,6 +108,15 @@ impl AsyncWrite for RawStream {
}
}
+impl AsRawFd for RawStream {
+ fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
+ match self {
+ RawStream::Tcp(s) => s.as_raw_fd(),
+ RawStream::Unix(s) => s.as_raw_fd(),
+ }
+ }
+}
+
// Large read buffering helps reducing syscalls with little trade-off
// Ssl layer always does "small" reads in 16k (TLS record size) so L4 read buffer helps a lot.
const BUF_READ_SIZE: usize = 64 * 1024;
@@ -123,6 +135,7 @@ pub struct Stream {
stream: BufStream<RawStream>,
buffer_write: bool,
proxy_digest: Option<Arc<ProxyDigest>>,
+ socket_digest: Option<Arc<SocketDigest>>,
/// When this connection is established
pub established_ts: SystemTime,
/// The distributed tracing object for this stream
@@ -147,6 +160,7 @@ impl From<TcpStream> for Stream {
buffer_write: true,
established_ts: SystemTime::now(),
proxy_digest: None,
+ socket_digest: None,
tracer: None,
}
}
@@ -159,17 +173,21 @@ impl From<UnixStream> for Stream {
buffer_write: true,
established_ts: SystemTime::now(),
proxy_digest: None,
+ socket_digest: None,
tracer: None,
}
}
}
+impl AsRawFd for Stream {
+ fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
+ self.stream.get_ref().as_raw_fd()
+ }
+}
+
impl UniqueID for Stream {
fn id(&self) -> i32 {
- match &self.stream.get_ref() {
- RawStream::Tcp(s) => s.as_raw_fd(),
- RawStream::Unix(s) => s.as_raw_fd(),
- }
+ self.as_raw_fd()
}
}
@@ -204,6 +222,16 @@ impl GetProxyDigest for Stream {
}
}
+impl GetSocketDigest for Stream {
+ fn get_socket_digest(&self) -> Option<Arc<SocketDigest>> {
+ self.socket_digest.clone()
+ }
+
+ fn set_socket_digest(&mut self, socket_digest: SocketDigest) {
+ self.socket_digest = Some(Arc::new(socket_digest))
+ }
+}
+
impl Drop for Stream {
fn drop(&mut self) {
if let Some(t) = self.tracer.as_ref() {
diff --git a/pingora-core/src/protocols/mod.rs b/pingora-core/src/protocols/mod.rs
index 4df6da8..6b7a357 100644
--- a/pingora-core/src/protocols/mod.rs
+++ b/pingora-core/src/protocols/mod.rs
@@ -20,7 +20,10 @@ pub mod l4;
pub mod raw_connect;
pub mod ssl;
-pub use digest::{Digest, GetProxyDigest, GetTimingDigest, ProtoDigest, TimingDigest};
+pub use digest::{
+ Digest, GetProxyDigest, GetSocketDigest, GetTimingDigest, ProtoDigest, SocketDigest,
+ TimingDigest,
+};
pub use ssl::ALPN;
use async_trait::async_trait;
@@ -71,6 +74,7 @@ pub trait IO:
+ Ssl
+ GetTimingDigest
+ GetProxyDigest
+ + GetSocketDigest
+ Unpin
+ Debug
+ Send
@@ -90,6 +94,7 @@ impl<
+ Ssl
+ GetTimingDigest
+ GetProxyDigest
+ + GetSocketDigest
+ Unpin
+ Debug
+ Send
@@ -134,6 +139,11 @@ mod ext_io_impl {
None
}
}
+ impl GetSocketDigest for Mock {
+ fn get_socket_digest(&self) -> Option<Arc<SocketDigest>> {
+ None
+ }
+ }
use std::io::Cursor;
@@ -157,6 +167,11 @@ mod ext_io_impl {
None
}
}
+ impl<T> GetSocketDigest for Cursor<T> {
+ fn get_socket_digest(&self) -> Option<Arc<SocketDigest>> {
+ None
+ }
+ }
use tokio::io::DuplexStream;
@@ -180,6 +195,11 @@ mod ext_io_impl {
None
}
}
+ impl GetSocketDigest for DuplexStream {
+ fn get_socket_digest(&self) -> Option<Arc<SocketDigest>> {
+ None
+ }
+ }
}
pub(crate) trait ConnFdReusable {
diff --git a/pingora-core/src/protocols/ssl/client.rs b/pingora-core/src/protocols/ssl/client.rs
index 7ed683f..07811dd 100644
--- a/pingora-core/src/protocols/ssl/client.rs
+++ b/pingora-core/src/protocols/ssl/client.rs
@@ -16,7 +16,9 @@
use super::SslStream;
use crate::protocols::raw_connect::ProxyDigest;
-use crate::protocols::{GetProxyDigest, GetTimingDigest, TimingDigest, IO};
+use crate::protocols::{
+ GetProxyDigest, GetSocketDigest, GetTimingDigest, SocketDigest, TimingDigest, IO,
+};
use crate::tls::{ssl, ssl::ConnectConfiguration, ssl_sys::X509_V_ERR_INVALID_CALL};
use pingora_error::{Error, ErrorType::*, OrErr, Result};
@@ -90,3 +92,15 @@ where
self.get_ref().get_proxy_digest()
}
}
+
+impl<S> GetSocketDigest for SslStream<S>
+where
+ S: GetSocketDigest,
+{
+ fn get_socket_digest(&self) -> Option<Arc<SocketDigest>> {
+ self.get_ref().get_socket_digest()
+ }
+ fn set_socket_digest(&mut self, socket_digest: SocketDigest) {
+ self.get_mut().set_socket_digest(socket_digest)
+ }
+}
diff --git a/pingora-proxy/src/subrequest.rs b/pingora-proxy/src/subrequest.rs
index 9490a40..e75dcc6 100644
--- a/pingora-proxy/src/subrequest.rs
+++ b/pingora-proxy/src/subrequest.rs
@@ -17,7 +17,9 @@ use core::pin::Pin;
use core::task::{Context, Poll};
use pingora_cache::lock::WritePermit;
use pingora_core::protocols::raw_connect::ProxyDigest;
-use pingora_core::protocols::{GetProxyDigest, GetTimingDigest, Ssl, TimingDigest, UniqueID};
+use pingora_core::protocols::{
+ GetProxyDigest, GetSocketDigest, GetTimingDigest, SocketDigest, Ssl, TimingDigest, UniqueID,
+};
use std::io::Cursor;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite, Error, ReadBuf};
@@ -85,6 +87,12 @@ impl GetProxyDigest for DummyIO {
}
}
+impl GetSocketDigest for DummyIO {
+ fn get_socket_digest(&self) -> Option<Arc<SocketDigest>> {
+ None
+ }
+}
+
#[async_trait]
impl pingora_core::protocols::Shutdown for DummyIO {
async fn shutdown(&mut self) -> () {}
diff --git a/pingora-proxy/tests/test_basic.rs b/pingora-proxy/tests/test_basic.rs
index a4730bf..4f11c2b 100644
--- a/pingora-proxy/tests/test_basic.rs
+++ b/pingora-proxy/tests/test_basic.rs
@@ -20,6 +20,10 @@ use reqwest::{header, StatusCode};
use utils::server_utils::init;
+fn is_specified_port(port: u16) -> bool {
+ (1..65535).contains(&port)
+}
+
#[tokio::test]
async fn test_origin_alive() {
init();
@@ -36,8 +40,27 @@ async fn test_simple_proxy() {
init();
let res = reqwest::get("http://127.0.0.1:6147").await.unwrap();
assert_eq!(res.status(), StatusCode::OK);
+
let headers = res.headers();
assert_eq!(headers[header::CONTENT_LENGTH], "13");
+ assert_eq!(headers["x-server-addr"], "127.0.0.1:6147");
+ let sockaddr = headers["x-client-addr"]
+ .to_str()
+ .unwrap()
+ .parse::<std::net::SocketAddr>()
+ .unwrap();
+ assert_eq!(sockaddr.ip().to_string(), "127.0.0.1");
+ assert!(is_specified_port(sockaddr.port()));
+
+ assert_eq!(headers["x-upstream-server-addr"], "127.0.0.1:8000");
+ let sockaddr = headers["x-upstream-client-addr"]
+ .to_str()
+ .unwrap()
+ .parse::<std::net::SocketAddr>()
+ .unwrap();
+ assert_eq!(sockaddr.ip().to_string(), "127.0.0.2");
+ assert!(is_specified_port(sockaddr.port()));
+
let body = res.text().await.unwrap();
assert_eq!(body, "Hello World!\n");
}
@@ -53,8 +76,28 @@ async fn test_h2_to_h1() {
let res = client.get("https://127.0.0.1:6150").send().await.unwrap();
assert_eq!(res.status(), reqwest::StatusCode::OK);
assert_eq!(res.version(), reqwest::Version::HTTP_2);
+
let headers = res.headers();
assert_eq!(headers[header::CONTENT_LENGTH], "13");
+ assert_eq!(headers["x-server-addr"], "127.0.0.1:6150");
+
+ let sockaddr = headers["x-client-addr"]
+ .to_str()
+ .unwrap()
+ .parse::<std::net::SocketAddr>()
+ .unwrap();
+ assert_eq!(sockaddr.ip().to_string(), "127.0.0.1");
+ assert!(is_specified_port(sockaddr.port()));
+
+ assert_eq!(headers["x-upstream-server-addr"], "127.0.0.1:8443");
+ let sockaddr = headers["x-upstream-client-addr"]
+ .to_str()
+ .unwrap()
+ .parse::<std::net::SocketAddr>()
+ .unwrap();
+ assert_eq!(sockaddr.ip().to_string(), "127.0.0.2");
+ assert!(is_specified_port(sockaddr.port()));
+
let body = res.text().await.unwrap();
assert_eq!(body, "Hello World!\n");
}
@@ -75,8 +118,27 @@ async fn test_h2_to_h2() {
.unwrap();
assert_eq!(res.status(), reqwest::StatusCode::OK);
assert_eq!(res.version(), reqwest::Version::HTTP_2);
+
let headers = res.headers();
assert_eq!(headers[header::CONTENT_LENGTH], "13");
+ assert_eq!(headers["x-server-addr"], "127.0.0.1:6150");
+ let sockaddr = headers["x-client-addr"]
+ .to_str()
+ .unwrap()
+ .parse::<std::net::SocketAddr>()
+ .unwrap();
+ assert_eq!(sockaddr.ip().to_string(), "127.0.0.1");
+ assert!(is_specified_port(sockaddr.port()));
+
+ assert_eq!(headers["x-upstream-server-addr"], "127.0.0.1:8443");
+ let sockaddr = headers["x-upstream-client-addr"]
+ .to_str()
+ .unwrap()
+ .parse::<std::net::SocketAddr>()
+ .unwrap();
+ assert_eq!(sockaddr.ip().to_string(), "127.0.0.2");
+ assert!(is_specified_port(sockaddr.port()));
+
let body = res.text().await.unwrap();
assert_eq!(body, "Hello World!\n");
}
@@ -159,7 +221,21 @@ async fn test_simple_proxy_uds() {
assert_eq!(res.status(), reqwest::StatusCode::OK);
let (resp, body) = res.into_parts();
- assert_eq!(resp.headers[header::CONTENT_LENGTH], "13");
+
+ let headers = &resp.headers;
+ assert_eq!(headers[header::CONTENT_LENGTH], "13");
+ assert_eq!(headers["x-server-addr"], "/tmp/pingora_proxy.sock");
+ assert_eq!(headers["x-client-addr"], "unset"); // unnamed UDS
+
+ assert_eq!(headers["x-upstream-server-addr"], "127.0.0.1:8000");
+ let sockaddr = headers["x-upstream-client-addr"]
+ .to_str()
+ .unwrap()
+ .parse::<std::net::SocketAddr>()
+ .unwrap();
+ assert_eq!(sockaddr.ip().to_string(), "127.0.0.2");
+ assert!(is_specified_port(sockaddr.port()));
+
let body = hyper::body::to_bytes(body).await.unwrap();
assert_eq!(body.as_ref(), b"Hello World!\n");
}
@@ -168,15 +244,30 @@ async fn test_simple_proxy_uds() {
async fn test_simple_proxy_uds_peer() {
init();
let client = reqwest::Client::new();
+
let res = client
.get("http://127.0.0.1:6147")
.header("x-uds-peer", "1") // force upstream peer to be UDS
.send()
.await
.unwrap();
+
assert_eq!(res.status(), StatusCode::OK);
- let headers = res.headers();
+
+ let headers = &res.headers();
assert_eq!(headers[header::CONTENT_LENGTH], "13");
+ assert_eq!(headers["x-server-addr"], "127.0.0.1:6147");
+ let sockaddr = headers["x-client-addr"]
+ .to_str()
+ .unwrap()
+ .parse::<std::net::SocketAddr>()
+ .unwrap();
+ assert_eq!(sockaddr.ip().to_string(), "127.0.0.1");
+ assert!(is_specified_port(sockaddr.port()));
+
+ assert_eq!(headers["x-upstream-client-addr"], "unset"); // unnamed UDS
+ assert_eq!(headers["x-upstream-server-addr"], "/tmp/nginx-test.sock");
+
let body = res.text().await.unwrap();
assert_eq!(body, "Hello World!\n");
}
diff --git a/pingora-proxy/tests/utils/server_utils.rs b/pingora-proxy/tests/utils/server_utils.rs
index 32f3275..50fc804 100644
--- a/pingora-proxy/tests/utils/server_utils.rs
+++ b/pingora-proxy/tests/utils/server_utils.rs
@@ -23,7 +23,7 @@ use pingora_cache::{
set_compression_dict_path, CacheMeta, CacheMetaDefaults, CachePhase, MemCache, NoCacheReason,
RespCacheable,
};
-use pingora_core::protocols::Digest;
+use pingora_core::protocols::{l4::socket::SocketAddr, Digest};
use pingora_core::server::configuration::Opt;
use pingora_core::services::Service;
use pingora_core::upstreams::peer::HttpPeer;
@@ -38,15 +38,72 @@ use structopt::StructOpt;
pub struct ExampleProxyHttps {}
#[allow(clippy::upper_case_acronyms)]
+#[derive(Default)]
pub struct CTX {
conn_reused: bool,
+ upstream_client_addr: Option<SocketAddr>,
+ upstream_server_addr: Option<SocketAddr>,
+}
+
+// Common logic for both ProxyHttp(s) types
+fn connected_to_upstream_common(
+ reused: bool,
+ digest: Option<&Digest>,
+ ctx: &mut CTX,
+) -> Result<()> {
+ ctx.conn_reused = reused;
+ let socket_digest = digest
+ .expect("upstream connector digest should be set for HTTP sessions")
+ .socket_digest
+ .as_ref()
+ .expect("socket digest should be set for HTTP sessions");
+ ctx.upstream_client_addr = socket_digest.local_addr().cloned();
+ ctx.upstream_server_addr = socket_digest.peer_addr().cloned();
+
+ Ok(())
+}
+
+fn response_filter_common(
+ session: &mut Session,
+ response: &mut ResponseHeader,
+ ctx: &mut CTX,
+) -> Result<()> {
+ if ctx.conn_reused {
+ response.insert_header("x-conn-reuse", "1")?;
+ }
+
+ let client_addr = session.client_addr();
+ let server_addr = session.server_addr();
+ response.insert_header(
+ "x-client-addr",
+ client_addr.map_or_else(|| "unset".into(), |a| a.to_string()),
+ )?;
+ response.insert_header(
+ "x-server-addr",
+ server_addr.map_or_else(|| "unset".into(), |a| a.to_string()),
+ )?;
+
+ response.insert_header(
+ "x-upstream-client-addr",
+ ctx.upstream_client_addr
+ .as_ref()
+ .map_or_else(|| "unset".into(), |a| a.to_string()),
+ )?;
+ response.insert_header(
+ "x-upstream-server-addr",
+ ctx.upstream_server_addr
+ .as_ref()
+ .map_or_else(|| "unset".into(), |a| a.to_string()),
+ )?;
+
+ Ok(())
}
#[async_trait]
impl ProxyHttp for ExampleProxyHttps {
type CTX = CTX;
fn new_ctx(&self) -> Self::CTX {
- CTX { conn_reused: false }
+ CTX::default()
}
async fn upstream_peer(
@@ -101,17 +158,14 @@ impl ProxyHttp for ExampleProxyHttps {
async fn response_filter(
&self,
- _session: &mut Session,
+ session: &mut Session,
upstream_response: &mut ResponseHeader,
ctx: &mut Self::CTX,
) -> Result<()>
where
Self::CTX: Send + Sync,
{
- if ctx.conn_reused {
- upstream_response.insert_header("x-conn-reuse", "1")?;
- }
- Ok(())
+ response_filter_common(session, upstream_response, ctx)
}
async fn upstream_request_filter(
@@ -119,10 +173,7 @@ impl ProxyHttp for ExampleProxyHttps {
session: &mut Session,
req: &mut RequestHeader,
_ctx: &mut Self::CTX,
- ) -> Result<()>
- where
- Self::CTX: Send + Sync,
- {
+ ) -> Result<()> {
let host = session.get_header_bytes("host-override");
if host != b"" {
req.insert_header("host", host)?;
@@ -136,11 +187,10 @@ impl ProxyHttp for ExampleProxyHttps {
reused: bool,
_peer: &HttpPeer,
_fd: std::os::unix::io::RawFd,
- _digest: Option<&Digest>,
+ digest: Option<&Digest>,
ctx: &mut CTX,
) -> Result<()> {
- ctx.conn_reused = reused;
- Ok(())
+ connected_to_upstream_common(reused, digest, ctx)
}
}
@@ -148,8 +198,10 @@ pub struct ExampleProxyHttp {}
#[async_trait]
impl ProxyHttp for ExampleProxyHttp {
- type CTX = ();
- fn new_ctx(&self) -> Self::CTX {}
+ type CTX = CTX;
+ fn new_ctx(&self) -> Self::CTX {
+ CTX::default()
+ }
async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool> {
let req = session.req_header();
@@ -164,6 +216,15 @@ impl ProxyHttp for ExampleProxyHttp {
Ok(false)
}
+ async fn response_filter(
+ &self,
+ session: &mut Session,
+ upstream_response: &mut ResponseHeader,
+ ctx: &mut Self::CTX,
+ ) -> Result<()> {
+ response_filter_common(session, upstream_response, ctx)
+ }
+
async fn upstream_peer(
&self,
session: &mut Session,
@@ -182,12 +243,24 @@ impl ProxyHttp for ExampleProxyHttp {
.get("x-port")
.map_or("8000", |v| v.to_str().unwrap());
let peer = Box::new(HttpPeer::new(
- format!("127.0.0.1:{}", port),
+ format!("127.0.0.1:{port}"),
false,
"".to_string(),
));
Ok(peer)
}
+
+ async fn connected_to_upstream(
+ &self,
+ _http_session: &mut Session,
+ reused: bool,
+ _peer: &HttpPeer,
+ _fd: std::os::unix::io::RawFd,
+ digest: Option<&Digest>,
+ ctx: &mut CTX,
+ ) -> Result<()> {
+ connected_to_upstream_common(reused, digest, ctx)
+ }
}
static CACHE_BACKEND: Lazy<MemCache> = Lazy::new(MemCache::new);