diff options
author | ewang <[email protected]> | 2024-03-05 16:56:37 -0800 |
---|---|---|
committer | Edward Wang <[email protected]> | 2024-03-15 14:37:56 -0700 |
commit | 20fd391f3e78c9349149f11fae94da9e4657478e (patch) | |
tree | 6608f5320d7844b4a6d9d9f3f6d6b838253d3296 | |
parent | 3e09114c4d1fd5ccae8ef0526c72d232ef1fdc58 (diff) | |
download | pingora-20fd391f3e78c9349149f11fae94da9e4657478e.tar.gz pingora-20fd391f3e78c9349149f11fae94da9e4657478e.zip |
Add server_addr and client_addr to Session
-rw-r--r-- | .bleep | 2 | ||||
-rw-r--r-- | pingora-core/src/apps/mod.rs | 14 | ||||
-rw-r--r-- | pingora-core/src/connectors/http/v1.rs | 5 | ||||
-rw-r--r-- | pingora-core/src/connectors/http/v2.rs | 1 | ||||
-rw-r--r-- | pingora-core/src/connectors/l4.rs | 13 | ||||
-rw-r--r-- | pingora-core/src/protocols/digest.rs | 45 | ||||
-rw-r--r-- | pingora-core/src/protocols/http/client.rs | 20 | ||||
-rw-r--r-- | pingora-core/src/protocols/http/server.rs | 18 | ||||
-rw-r--r-- | pingora-core/src/protocols/http/v1/client.rs | 19 | ||||
-rw-r--r-- | pingora-core/src/protocols/http/v1/server.rs | 34 | ||||
-rw-r--r-- | pingora-core/src/protocols/http/v2/client.rs | 20 | ||||
-rw-r--r-- | pingora-core/src/protocols/http/v2/server.rs | 37 | ||||
-rw-r--r-- | pingora-core/src/protocols/l4/listener.rs | 31 | ||||
-rw-r--r-- | pingora-core/src/protocols/l4/socket.rs | 64 | ||||
-rw-r--r-- | pingora-core/src/protocols/l4/stream.rs | 38 | ||||
-rw-r--r-- | pingora-core/src/protocols/mod.rs | 22 | ||||
-rw-r--r-- | pingora-core/src/protocols/ssl/client.rs | 16 | ||||
-rw-r--r-- | pingora-proxy/src/subrequest.rs | 10 | ||||
-rw-r--r-- | pingora-proxy/tests/test_basic.rs | 95 | ||||
-rw-r--r-- | pingora-proxy/tests/utils/server_utils.rs | 107 |
20 files changed, 568 insertions, 43 deletions
@@ -1 +1 @@ -90d84b32f4528ede68b8351c896a101af788113d
\ No newline at end of file +c16c9e8bfd9334b77a6a7c1123954f41037c06c3
\ No newline at end of file diff --git a/pingora-core/src/apps/mod.rs b/pingora-core/src/apps/mod.rs index db8f11b..6a436c7 100644 --- a/pingora-core/src/apps/mod.rs +++ b/pingora-core/src/apps/mod.rs @@ -24,6 +24,7 @@ use std::sync::Arc; use crate::protocols::http::v2::server; use crate::protocols::http::ServerSession; +use crate::protocols::Digest; use crate::protocols::Stream; use crate::protocols::ALPN; @@ -91,6 +92,15 @@ where ) -> Option<Stream> { match stream.selected_alpn_proto() { Some(ALPN::H2) => { + // create a shared connection digest + let digest = Arc::new(Digest { + ssl_digest: stream.get_ssl_digest(), + // TODO: log h2 handshake time + timing_digest: stream.get_timing_digest(), + proxy_digest: stream.get_proxy_digest(), + socket_digest: stream.get_socket_digest(), + }); + let h2_options = self.h2_options(); let h2_conn = server::handshake(stream, h2_options).await; let mut h2_conn = match h2_conn { @@ -100,10 +110,12 @@ where } Ok(c) => c, }; + loop { // this loop ends when the client decides to close the h2 conn // TODO: add a timeout? 
- let h2_stream = server::HttpSession::from_h2_conn(&mut h2_conn).await; + let h2_stream = + server::HttpSession::from_h2_conn(&mut h2_conn, digest.clone()).await; let h2_stream = match h2_stream { Err(e) => { // It is common for client to just disconnect TCP without properly diff --git a/pingora-core/src/connectors/http/v1.rs b/pingora-core/src/connectors/http/v1.rs index 513fed1..7958a09 100644 --- a/pingora-core/src/connectors/http/v1.rs +++ b/pingora-core/src/connectors/http/v1.rs @@ -65,6 +65,7 @@ impl Connector { #[cfg(test)] mod tests { use super::*; + use crate::protocols::l4::socket::SocketAddr; use crate::upstreams::peer::HttpPeer; use pingora_http::RequestHeader; @@ -85,6 +86,8 @@ mod tests { let peer = HttpPeer::new(("1.1.1.1", 80), false, "".into()); // make a new connection to 1.1.1.1 let (http, reused) = connector.get_http_session(&peer).await.unwrap(); + let server_addr = http.server_addr().unwrap(); + assert_eq!(*server_addr, "1.1.1.1:80".parse::<SocketAddr>().unwrap()); assert!(!reused); // this http is not even used, so not be able to reuse @@ -104,6 +107,8 @@ mod tests { let peer = HttpPeer::new(("1.1.1.1", 443), true, "one.one.one.one".into()); // make a new connection to https://1.1.1.1 let (http, reused) = connector.get_http_session(&peer).await.unwrap(); + let server_addr = http.server_addr().unwrap(); + assert_eq!(*server_addr, "1.1.1.1:443".parse::<SocketAddr>().unwrap()); assert!(!reused); // this http is not even used, so not be able to reuse diff --git a/pingora-core/src/connectors/http/v2.rs b/pingora-core/src/connectors/http/v2.rs index 389bd4e..6f26b46 100644 --- a/pingora-core/src/connectors/http/v2.rs +++ b/pingora-core/src/connectors/http/v2.rs @@ -369,6 +369,7 @@ async fn handshake( // TODO: log h2 handshake time timing_digest: stream.get_timing_digest(), proxy_digest: stream.get_proxy_digest(), + socket_digest: stream.get_socket_digest(), }; // TODO: make these configurable let (send_req, connection) = Builder::new() diff --git 
a/pingora-core/src/connectors/l4.rs b/pingora-core/src/connectors/l4.rs index 6f0f5fd..3db2771 100644 --- a/pingora-core/src/connectors/l4.rs +++ b/pingora-core/src/connectors/l4.rs @@ -16,10 +16,12 @@ use log::debug; use pingora_error::{Context, Error, ErrorType::*, OrErr, Result}; use rand::seq::SliceRandom; use std::net::SocketAddr as InetSocketAddr; +use std::os::unix::io::AsRawFd; use crate::protocols::l4::ext::{connect as tcp_connect, connect_uds, set_tcp_keepalive}; use crate::protocols::l4::socket::SocketAddr; use crate::protocols::l4::stream::Stream; +use crate::protocols::{GetSocketDigest, SocketDigest}; use crate::upstreams::peer::Peer; /// Establish a connection (l4) to the given peer using its settings and an optional bind address. @@ -32,7 +34,8 @@ where .await .err_context(|| format!("Fail to establish CONNECT proxy: {}", peer)); } - let mut stream: Stream = match peer.address() { + let peer_addr = peer.address(); + let mut stream: Stream = match peer_addr { SocketAddr::Inet(addr) => { let connect_future = tcp_connect(addr, bind_to.as_ref()); let conn_res = match peer.connection_timeout() { @@ -97,6 +100,14 @@ where } stream.set_nodelay()?; + + let digest = SocketDigest::from_raw_fd(stream.as_raw_fd()); + digest + .peer_addr + .set(Some(peer_addr.clone())) + .expect("newly created OnceCell must be empty"); + stream.set_socket_digest(digest); + Ok(stream) } diff --git a/pingora-core/src/protocols/digest.rs b/pingora-core/src/protocols/digest.rs index 13ce35c..594dbba 100644 --- a/pingora-core/src/protocols/digest.rs +++ b/pingora-core/src/protocols/digest.rs @@ -17,11 +17,14 @@ use std::sync::Arc; use std::time::SystemTime; +use once_cell::sync::OnceCell; + +use super::l4::socket::SocketAddr; use super::raw_connect::ProxyDigest; use super::ssl::digest::SslDigest; /// The information can be extracted from a connection -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Default)] pub struct Digest { /// Information regarding the TLS of this connection if 
any pub ssl_digest: Option<Arc<SslDigest>>, @@ -29,6 +32,8 @@ pub struct Digest { pub timing_digest: Vec<Option<TimingDigest>>, /// information regarding the CONNECT proxy this connection uses. pub proxy_digest: Option<Arc<ProxyDigest>>, + /// Information about underlying socket/fd of this connection + pub socket_digest: Option<Arc<SocketDigest>>, } /// The interface to return protocol related information @@ -53,6 +58,38 @@ impl Default for TimingDigest { } } +#[derive(Debug)] +/// The interface to return socket-related information +pub struct SocketDigest { + raw_fd: std::os::unix::io::RawFd, + /// Remote socket address + pub peer_addr: OnceCell<Option<SocketAddr>>, + /// Local socket address + pub local_addr: OnceCell<Option<SocketAddr>>, +} + +impl SocketDigest { + pub fn from_raw_fd(raw_fd: std::os::unix::io::RawFd) -> SocketDigest { + SocketDigest { + raw_fd, + peer_addr: OnceCell::new(), + local_addr: OnceCell::new(), + } + } + + pub fn peer_addr(&self) -> Option<&SocketAddr> { + self.peer_addr + .get_or_init(|| SocketAddr::from_raw_fd(self.raw_fd, true)) + .as_ref() + } + + pub fn local_addr(&self) -> Option<&SocketAddr> { + self.local_addr + .get_or_init(|| SocketAddr::from_raw_fd(self.raw_fd, false)) + .as_ref() + } +} + /// The interface to return timing information pub trait GetTimingDigest { /// Return the timing for each layer from the lowest layer to upper @@ -64,3 +101,9 @@ pub trait GetProxyDigest { fn get_proxy_digest(&self) -> Option<Arc<ProxyDigest>>; fn set_proxy_digest(&mut self, _digest: ProxyDigest) {} } + +/// The interface to set or return socket information +pub trait GetSocketDigest { + fn get_socket_digest(&self) -> Option<Arc<SocketDigest>>; + fn set_socket_digest(&mut self, _socket_digest: SocketDigest) {} +} diff --git a/pingora-core/src/protocols/http/client.rs b/pingora-core/src/protocols/http/client.rs index 0fe6c90..83a7618 100644 --- a/pingora-core/src/protocols/http/client.rs +++ b/pingora-core/src/protocols/http/client.rs @@ 
-19,7 +19,7 @@ use std::time::Duration; use super::v1::client::HttpSession as Http1Session; use super::v2::client::Http2Session; -use crate::protocols::Digest; +use crate::protocols::{Digest, SocketAddr}; /// A type for Http client session. It can be either an Http1 connection or an Http2 stream. pub enum HttpSession { @@ -151,11 +151,27 @@ impl HttpSession { /// Return the [Digest] of the connection /// /// For reused connection, the timing in the digest will reflect its initial handshakes - /// The caller should check if the connection is reused to avoid misuse the timing field + /// The caller should check if the connection is reused to avoid misuse of the timing field pub fn digest(&self) -> Option<&Digest> { match self { Self::H1(s) => Some(s.digest()), Self::H2(s) => s.digest(), } } + + /// Return the server (peer) address of the connection. + pub fn server_addr(&self) -> Option<&SocketAddr> { + match self { + Self::H1(s) => s.server_addr(), + Self::H2(s) => s.server_addr(), + } + } + + /// Return the client (local) address of the connection. + pub fn client_addr(&self) -> Option<&SocketAddr> { + match self { + Self::H1(s) => s.client_addr(), + Self::H2(s) => s.client_addr(), + } + } } diff --git a/pingora-core/src/protocols/http/server.rs b/pingora-core/src/protocols/http/server.rs index c4a0b07..6edea75 100644 --- a/pingora-core/src/protocols/http/server.rs +++ b/pingora-core/src/protocols/http/server.rs @@ -18,7 +18,7 @@ use super::error_resp; use super::v1::server::HttpSession as SessionV1; use super::v2::server::HttpSession as SessionV2; use super::HttpTask; -use crate::protocols::Stream; +use crate::protocols::{SocketAddr, Stream}; use bytes::Bytes; use http::header::AsHeaderName; use http::HeaderValue; @@ -330,4 +330,20 @@ impl Session { Self::H2(s) => s.body_bytes_sent(), } } + + /// Return the client (peer) address of the connection.
+ pub fn client_addr(&self) -> Option<&SocketAddr> { + match self { + Self::H1(s) => s.client_addr(), + Self::H2(s) => s.client_addr(), + } + } + + /// Return the server (local) address of the connection. + pub fn server_addr(&self) -> Option<&SocketAddr> { + match self { + Self::H1(s) => s.server_addr(), + Self::H2(s) => s.server_addr(), + } + } } diff --git a/pingora-core/src/protocols/http/v1/client.rs b/pingora-core/src/protocols/http/v1/client.rs index 13e0482..7881e59 100644 --- a/pingora-core/src/protocols/http/v1/client.rs +++ b/pingora-core/src/protocols/http/v1/client.rs @@ -28,7 +28,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt}; use super::body::{BodyReader, BodyWriter}; use super::common::*; use crate::protocols::http::HttpTask; -use crate::protocols::{Digest, Stream, UniqueID}; +use crate::protocols::{Digest, SocketAddr, Stream, UniqueID}; use crate::utils::{BufRef, KVRef}; /// The HTTP 1.x client session @@ -65,6 +65,7 @@ impl HttpSession { ssl_digest: stream.get_ssl_digest(), timing_digest: stream.get_timing_digest(), proxy_digest: stream.get_proxy_digest(), + socket_digest: stream.get_socket_digest(), }); HttpSession { underlying_stream: stream, @@ -601,6 +602,22 @@ impl HttpSession { pub fn digest(&self) -> &Digest { &self.digest } + + /// Return the server (peer) address recorded in the connection digest. + pub fn server_addr(&self) -> Option<&SocketAddr> { + self.digest() + .socket_digest + .as_ref() + .map(|d| d.peer_addr())? + } + + /// Return the client (local) address recorded in the connection digest. + pub fn client_addr(&self) -> Option<&SocketAddr> { + self.digest() + .socket_digest + .as_ref() + .map(|d| d.local_addr())? 
+ } } #[inline] diff --git a/pingora-core/src/protocols/http/v1/server.rs b/pingora-core/src/protocols/http/v1/server.rs index c1b2d75..0b19970 100644 --- a/pingora-core/src/protocols/http/v1/server.rs +++ b/pingora-core/src/protocols/http/v1/server.rs @@ -31,7 +31,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt}; use super::body::{BodyReader, BodyWriter}; use super::common::*; use crate::protocols::http::{body_buffer::FixedBuffer, date, error_resp, HttpTask}; -use crate::protocols::Stream; +use crate::protocols::{Digest, SocketAddr, Stream}; use crate::utils::{BufRef, KVRef}; /// The HTTP 1.x server session @@ -68,6 +68,8 @@ pub struct HttpSession { /// Whether this session is an upgraded session. This flag is calculated when sending the /// response header to the client. upgraded: bool, + /// Digest to track underlying connection metrics + digest: Box<Digest>, } impl HttpSession { @@ -75,6 +77,14 @@ impl HttpSession { /// The created session needs to call [`Self::read_request()`] first before performing /// any other operations. pub fn new(underlying_stream: Stream) -> Self { + // TODO: maybe we should put digest in the connection itself + let digest = Box::new(Digest { + ssl_digest: underlying_stream.get_ssl_digest(), + timing_digest: underlying_stream.get_timing_digest(), + proxy_digest: underlying_stream.get_proxy_digest(), + socket_digest: underlying_stream.get_socket_digest(), + }); + HttpSession { underlying_stream, buf: Bytes::new(), // zero size, with be replaced by parsed header later @@ -92,6 +102,7 @@ impl HttpSession { body_bytes_sent: 0, retry_buffer: None, upgraded: false, + digest, } } @@ -751,6 +762,27 @@ impl HttpSession { } } + /// Return the [Digest] of the connection. + pub fn digest(&self) -> &Digest { + &self.digest + } + + /// Return the client (peer) address of the underlying connection. + pub fn client_addr(&self) -> Option<&SocketAddr> { + self.digest() + .socket_digest + .as_ref() + .map(|d| d.peer_addr())?
+ } + + /// Return the server (local) address of the underlying connection. + pub fn server_addr(&self) -> Option<&SocketAddr> { + self.digest() + .socket_digest + .as_ref() + .map(|d| d.local_addr())? + } + /// Consume `self`, if the connection can be reused, the underlying stream will be returned /// to be fed to the next [`Self::new()`]. The next session can just call [`Self::read_request()`]. /// If the connection cannot be reused, the underlying stream will be closed and `None` will be diff --git a/pingora-core/src/protocols/http/v2/client.rs b/pingora-core/src/protocols/http/v2/client.rs index 48551ec..15f86d0 100644 --- a/pingora-core/src/protocols/http/v2/client.rs +++ b/pingora-core/src/protocols/http/v2/client.rs @@ -30,7 +30,7 @@ use tokio::io::{AsyncRead, AsyncWrite}; use tokio::sync::watch; use crate::connectors::http::v2::ConnectionRef; -use crate::protocols::Digest; +use crate::protocols::{Digest, SocketAddr}; pub const PING_TIMEDOUT: ErrorType = ErrorType::new("PingTimedout"); @@ -310,6 +310,24 @@ impl Http2Session { Some(self.conn.digest()) } + /// Return the server (peer) address recorded in the connection digest. + pub fn server_addr(&self) -> Option<&SocketAddr> { + self.conn + .digest() + .socket_digest + .as_ref() + .map(|d| d.peer_addr())? + } + + /// Return the client (local) address recorded in the connection digest. + pub fn client_addr(&self) -> Option<&SocketAddr> { + self.conn + .digest() + .socket_digest + .as_ref() + .map(|d| d.local_addr())?
+ } + /// the FD of the underlying connection pub fn fd(&self) -> i32 { self.conn.id() diff --git a/pingora-core/src/protocols/http/v2/server.rs b/pingora-core/src/protocols/http/v2/server.rs index 6ec75e8..a811769 100644 --- a/pingora-core/src/protocols/http/v2/server.rs +++ b/pingora-core/src/protocols/http/v2/server.rs @@ -23,12 +23,13 @@ use http::header::HeaderName; use http::{header, Response}; use log::{debug, warn}; use pingora_http::{RequestHeader, ResponseHeader}; +use std::sync::Arc; use crate::protocols::http::body_buffer::FixedBuffer; use crate::protocols::http::date::get_cached_date; use crate::protocols::http::v1::client::http_req_header_to_wire; use crate::protocols::http::HttpTask; -use crate::protocols::Stream; +use crate::protocols::{Digest, SocketAddr, Stream}; use crate::{Error, ErrorType, OrErr, Result}; const BODY_BUF_LIMIT: usize = 1024 * 64; @@ -95,6 +96,8 @@ pub struct HttpSession { body_sent: usize, // buffered request body for retry logic retry_buffer: Option<FixedBuffer>, + // digest to record underlying connection info + digest: Arc<Digest>, } impl HttpSession { @@ -102,11 +105,19 @@ impl HttpSession { /// This function returns a new HTTP/2 session when the provided HTTP/2 connection, `conn`, /// establishes a new HTTP/2 stream to this server. /// + /// A [`Digest`] from the IO stream is also stored in the resulting session, since the + /// session doesn't have access to the underlying stream (and the stream itself isn't + /// accessible from the `h2::server::Connection`). + /// /// Note: in order to handle all **existing** and new HTTP/2 sessions, the server must call /// this function in a loop until the client decides to close the connection. /// /// `None` will be returned when the connection is closing so that the loop can exit. 
- pub async fn from_h2_conn(conn: &mut H2Connection<Stream>) -> Result<Option<Self>> { + /// + pub async fn from_h2_conn( + conn: &mut H2Connection<Stream>, + digest: Arc<Digest>, + ) -> Result<Option<Self>> { // NOTE: conn.accept().await is what drives the entire connection. let res = conn.accept().await.transpose().or_err( ErrorType::H2Error, @@ -125,6 +136,7 @@ impl HttpSession { body_read: 0, body_sent: 0, retry_buffer: None, + digest, } })) } @@ -405,6 +417,21 @@ impl HttpSession { pub fn body_bytes_sent(&self) -> usize { self.body_sent } + + /// Return the [Digest] of the connection. + pub fn digest(&self) -> Option<&Digest> { + Some(&self.digest) + } + + /// Return the server (local) address recorded in the connection digest. + pub fn server_addr(&self) -> Option<&SocketAddr> { + self.digest.socket_digest.as_ref().map(|d| d.local_addr())? + } + + /// Return the client (peer) address recorded in the connection digest. + pub fn client_addr(&self) -> Option<&SocketAddr> { + self.digest.socket_digest.as_ref().map(|d| d.peer_addr())? 
+ } } #[cfg(test)] @@ -444,8 +471,12 @@ mod test { }); let mut connection = handshake(Box::new(server), None).await.unwrap(); + let digest = Arc::new(Digest::default()); - while let Some(mut http) = HttpSession::from_h2_conn(&mut connection).await.unwrap() { + while let Some(mut http) = HttpSession::from_h2_conn(&mut connection, digest.clone()) + .await + .unwrap() + { tokio::spawn(async move { let req = http.req_header(); assert_eq!(req.method, Method::GET); diff --git a/pingora-core/src/protocols/l4/listener.rs b/pingora-core/src/protocols/l4/listener.rs index 6473fb4..29cc9e9 100644 --- a/pingora-core/src/protocols/l4/listener.rs +++ b/pingora-core/src/protocols/l4/listener.rs @@ -18,6 +18,7 @@ use std::io; use std::os::unix::io::AsRawFd; use tokio::net::{TcpListener, UnixListener}; +use crate::protocols::digest::{GetSocketDigest, SocketDigest}; use crate::protocols::l4::stream::Stream; /// The type for generic listener for both TCP and Unix domain socket @@ -40,7 +41,7 @@ impl From<UnixListener> for Listener { } impl AsRawFd for Listener { - fn as_raw_fd(&self) -> std::os::unix::prelude::RawFd { + fn as_raw_fd(&self) -> std::os::unix::io::RawFd { match &self { Self::Tcp(l) => l.as_raw_fd(), Self::Unix(l) => l.as_raw_fd(), @@ -52,8 +53,32 @@ impl Listener { /// Accept a connection from the listening endpoint pub async fn accept(&self) -> io::Result<Stream> { match &self { - Self::Tcp(l) => l.accept().await.map(|(stream, _)| stream.into()), - Self::Unix(l) => l.accept().await.map(|(stream, _)| stream.into()), + Self::Tcp(l) => l.accept().await.map(|(stream, peer_addr)| { + let mut s: Stream = stream.into(); + let digest = SocketDigest::from_raw_fd(s.as_raw_fd()); + digest + .peer_addr + .set(Some(peer_addr.into())) + .expect("newly created OnceCell must be empty"); + s.set_socket_digest(digest); + // TODO: if listening on a specific bind address, we could save + // an extra syscall looking up the local_addr later if we can pass + // and init it in the socket 
digest here + s + }), + Self::Unix(l) => l.accept().await.map(|(stream, peer_addr)| { + let mut s: Stream = stream.into(); + let digest = SocketDigest::from_raw_fd(s.as_raw_fd()); + // note: if unnamed/abstract UDS, it will be `None` + // (see TryFrom<tokio::net::unix::SocketAddr>) + let addr = peer_addr.try_into().ok(); + digest + .peer_addr + .set(addr) + .expect("newly created OnceCell must be empty"); + s.set_socket_digest(digest); + s + }), } } } diff --git a/pingora-core/src/protocols/l4/socket.rs b/pingora-core/src/protocols/l4/socket.rs index 02eab36..186fbec 100644 --- a/pingora-core/src/protocols/l4/socket.rs +++ b/pingora-core/src/protocols/l4/socket.rs @@ -15,10 +15,12 @@ //! Generic socket type use crate::{Error, OrErr}; +use nix::sys::socket::{getpeername, getsockname, SockaddrStorage}; use std::cmp::Ordering; use std::hash::{Hash, Hasher}; use std::net::SocketAddr as StdSockAddr; use std::os::unix::net::SocketAddr as StdUnixSockAddr; +use tokio::net::unix::SocketAddr as TokioUnixSockAddr; /// [`SocketAddr`] is a storage type that contains either a Internet (IP address) /// socket address or a Unix domain socket address. @@ -53,6 +55,40 @@ impl SocketAddr { addr.set_port(port) } } + + fn from_sockaddr_storage(sock: &SockaddrStorage) -> Option<SocketAddr> { + if let Some(v4) = sock.as_sockaddr_in() { + return Some(SocketAddr::Inet(StdSockAddr::V4( + std::net::SocketAddrV4::new(v4.ip().into(), v4.port()), + ))); + } else if let Some(v6) = sock.as_sockaddr_in6() { + return Some(SocketAddr::Inet(StdSockAddr::V6( + std::net::SocketAddrV6::new(v6.ip(), v6.port(), v6.flowinfo(), v6.scope_id()), + ))); + } + + // TODO: don't set abstract / unnamed for now, + // for parity with how we treat these types in TryFrom<TokioUnixSockAddr> + Some(SocketAddr::Unix( + sock.as_unix_addr() + .map(|addr| addr.path().map(StdUnixSockAddr::from_pathname))?? 
+ .ok()?, + )) + } + + pub fn from_raw_fd(fd: std::os::unix::io::RawFd, peer_addr: bool) -> Option<SocketAddr> { + let sockaddr_storage = if peer_addr { + getpeername(fd) + } else { + getsockname(fd) + }; + match sockaddr_storage { + Ok(sockaddr) => Self::from_sockaddr_storage(&sockaddr), + // could be errors such as EBADF, i.e. fd is no longer a valid socket + // fail open in this case + Err(_e) => None, + } + } } impl std::fmt::Display for SocketAddr { @@ -167,6 +203,34 @@ impl std::net::ToSocketAddrs for SocketAddr { } } +impl From<StdSockAddr> for SocketAddr { + fn from(sockaddr: StdSockAddr) -> Self { + SocketAddr::Inet(sockaddr) + } +} + +impl From<StdUnixSockAddr> for SocketAddr { + fn from(sockaddr: StdUnixSockAddr) -> Self { + SocketAddr::Unix(sockaddr) + } +} + +// TODO: ideally mio/tokio will start using the std version of the unix `SocketAddr` +// so we can avoid a fallible conversion +// https://github.com/tokio-rs/mio/issues/1527 +impl TryFrom<TokioUnixSockAddr> for SocketAddr { + type Error = String; + + fn try_from(value: TokioUnixSockAddr) -> Result<Self, Self::Error> { + if let Some(Ok(addr)) = value.as_pathname().map(StdUnixSockAddr::from_pathname) { + Ok(addr.into()) + } else { + // may be unnamed/abstract UDS + Err(format!("could not convert {value:?} to SocketAddr")) + } + } +} + #[cfg(test)] mod test { use super::*; diff --git a/pingora-core/src/protocols/l4/stream.rs b/pingora-core/src/protocols/l4/stream.rs index ebe4f67..e2efdbf 100644 --- a/pingora-core/src/protocols/l4/stream.rs +++ b/pingora-core/src/protocols/l4/stream.rs @@ -27,7 +27,10 @@ use tokio::io::{self, AsyncRead, AsyncWrite, AsyncWriteExt, BufStream, ReadBuf}; use tokio::net::{TcpStream, UnixStream}; use crate::protocols::raw_connect::ProxyDigest; -use crate::protocols::{GetProxyDigest, GetTimingDigest, Shutdown, Ssl, TimingDigest, UniqueID}; +use crate::protocols::{ + GetProxyDigest, GetSocketDigest, GetTimingDigest, Shutdown, SocketDigest, Ssl, TimingDigest, + UniqueID, +}; 
use crate::upstreams::peer::Tracer; #[derive(Debug)] @@ -105,6 +108,15 @@ impl AsyncWrite for RawStream { } } +impl AsRawFd for RawStream { + fn as_raw_fd(&self) -> std::os::unix::io::RawFd { + match self { + RawStream::Tcp(s) => s.as_raw_fd(), + RawStream::Unix(s) => s.as_raw_fd(), + } + } +} + // Large read buffering helps reducing syscalls with little trade-off // Ssl layer always does "small" reads in 16k (TLS record size) so L4 read buffer helps a lot. const BUF_READ_SIZE: usize = 64 * 1024; @@ -123,6 +135,7 @@ pub struct Stream { stream: BufStream<RawStream>, buffer_write: bool, proxy_digest: Option<Arc<ProxyDigest>>, + socket_digest: Option<Arc<SocketDigest>>, /// When this connection is established pub established_ts: SystemTime, /// The distributed tracing object for this stream @@ -147,6 +160,7 @@ impl From<TcpStream> for Stream { buffer_write: true, established_ts: SystemTime::now(), proxy_digest: None, + socket_digest: None, tracer: None, } } @@ -159,17 +173,21 @@ impl From<UnixStream> for Stream { buffer_write: true, established_ts: SystemTime::now(), proxy_digest: None, + socket_digest: None, tracer: None, } } } +impl AsRawFd for Stream { + fn as_raw_fd(&self) -> std::os::unix::io::RawFd { + self.stream.get_ref().as_raw_fd() + } +} + impl UniqueID for Stream { fn id(&self) -> i32 { - match &self.stream.get_ref() { - RawStream::Tcp(s) => s.as_raw_fd(), - RawStream::Unix(s) => s.as_raw_fd(), - } + self.as_raw_fd() } } @@ -204,6 +222,16 @@ impl GetProxyDigest for Stream { } } +impl GetSocketDigest for Stream { + fn get_socket_digest(&self) -> Option<Arc<SocketDigest>> { + self.socket_digest.clone() + } + + fn set_socket_digest(&mut self, socket_digest: SocketDigest) { + self.socket_digest = Some(Arc::new(socket_digest)) + } +} + impl Drop for Stream { fn drop(&mut self) { if let Some(t) = self.tracer.as_ref() { diff --git a/pingora-core/src/protocols/mod.rs b/pingora-core/src/protocols/mod.rs index 4df6da8..6b7a357 100644 --- 
a/pingora-core/src/protocols/mod.rs +++ b/pingora-core/src/protocols/mod.rs @@ -20,7 +20,10 @@ pub mod l4; pub mod raw_connect; pub mod ssl; -pub use digest::{Digest, GetProxyDigest, GetTimingDigest, ProtoDigest, TimingDigest}; +pub use digest::{ + Digest, GetProxyDigest, GetSocketDigest, GetTimingDigest, ProtoDigest, SocketDigest, + TimingDigest, +}; pub use ssl::ALPN; use async_trait::async_trait; @@ -71,6 +74,7 @@ pub trait IO: + Ssl + GetTimingDigest + GetProxyDigest + + GetSocketDigest + Unpin + Debug + Send @@ -90,6 +94,7 @@ impl< + Ssl + GetTimingDigest + GetProxyDigest + + GetSocketDigest + Unpin + Debug + Send @@ -134,6 +139,11 @@ mod ext_io_impl { None } } + impl GetSocketDigest for Mock { + fn get_socket_digest(&self) -> Option<Arc<SocketDigest>> { + None + } + } use std::io::Cursor; @@ -157,6 +167,11 @@ mod ext_io_impl { None } } + impl<T> GetSocketDigest for Cursor<T> { + fn get_socket_digest(&self) -> Option<Arc<SocketDigest>> { + None + } + } use tokio::io::DuplexStream; @@ -180,6 +195,11 @@ mod ext_io_impl { None } } + impl GetSocketDigest for DuplexStream { + fn get_socket_digest(&self) -> Option<Arc<SocketDigest>> { + None + } + } } pub(crate) trait ConnFdReusable { diff --git a/pingora-core/src/protocols/ssl/client.rs b/pingora-core/src/protocols/ssl/client.rs index 7ed683f..07811dd 100644 --- a/pingora-core/src/protocols/ssl/client.rs +++ b/pingora-core/src/protocols/ssl/client.rs @@ -16,7 +16,9 @@ use super::SslStream; use crate::protocols::raw_connect::ProxyDigest; -use crate::protocols::{GetProxyDigest, GetTimingDigest, TimingDigest, IO}; +use crate::protocols::{ + GetProxyDigest, GetSocketDigest, GetTimingDigest, SocketDigest, TimingDigest, IO, +}; use crate::tls::{ssl, ssl::ConnectConfiguration, ssl_sys::X509_V_ERR_INVALID_CALL}; use pingora_error::{Error, ErrorType::*, OrErr, Result}; @@ -90,3 +92,15 @@ where self.get_ref().get_proxy_digest() } } + +impl<S> GetSocketDigest for SslStream<S> +where + S: GetSocketDigest, +{ + fn 
get_socket_digest(&self) -> Option<Arc<SocketDigest>> { + self.get_ref().get_socket_digest() + } + fn set_socket_digest(&mut self, socket_digest: SocketDigest) { + self.get_mut().set_socket_digest(socket_digest) + } +} diff --git a/pingora-proxy/src/subrequest.rs b/pingora-proxy/src/subrequest.rs index 9490a40..e75dcc6 100644 --- a/pingora-proxy/src/subrequest.rs +++ b/pingora-proxy/src/subrequest.rs @@ -17,7 +17,9 @@ use core::pin::Pin; use core::task::{Context, Poll}; use pingora_cache::lock::WritePermit; use pingora_core::protocols::raw_connect::ProxyDigest; -use pingora_core::protocols::{GetProxyDigest, GetTimingDigest, Ssl, TimingDigest, UniqueID}; +use pingora_core::protocols::{ + GetProxyDigest, GetSocketDigest, GetTimingDigest, SocketDigest, Ssl, TimingDigest, UniqueID, +}; use std::io::Cursor; use std::sync::Arc; use tokio::io::{AsyncRead, AsyncWrite, Error, ReadBuf}; @@ -85,6 +87,12 @@ impl GetProxyDigest for DummyIO { } } +impl GetSocketDigest for DummyIO { + fn get_socket_digest(&self) -> Option<Arc<SocketDigest>> { + None + } +} + #[async_trait] impl pingora_core::protocols::Shutdown for DummyIO { async fn shutdown(&mut self) -> () {} diff --git a/pingora-proxy/tests/test_basic.rs b/pingora-proxy/tests/test_basic.rs index a4730bf..4f11c2b 100644 --- a/pingora-proxy/tests/test_basic.rs +++ b/pingora-proxy/tests/test_basic.rs @@ -20,6 +20,10 @@ use reqwest::{header, StatusCode}; use utils::server_utils::init; +fn is_specified_port(port: u16) -> bool { + (1..65535).contains(&port) +} + #[tokio::test] async fn test_origin_alive() { init(); @@ -36,8 +40,27 @@ async fn test_simple_proxy() { init(); let res = reqwest::get("http://127.0.0.1:6147").await.unwrap(); assert_eq!(res.status(), StatusCode::OK); + let headers = res.headers(); assert_eq!(headers[header::CONTENT_LENGTH], "13"); + assert_eq!(headers["x-server-addr"], "127.0.0.1:6147"); + let sockaddr = headers["x-client-addr"] + .to_str() + .unwrap() + .parse::<std::net::SocketAddr>() + .unwrap(); + 
assert_eq!(sockaddr.ip().to_string(), "127.0.0.1"); + assert!(is_specified_port(sockaddr.port())); + + assert_eq!(headers["x-upstream-server-addr"], "127.0.0.1:8000"); + let sockaddr = headers["x-upstream-client-addr"] + .to_str() + .unwrap() + .parse::<std::net::SocketAddr>() + .unwrap(); + assert_eq!(sockaddr.ip().to_string(), "127.0.0.2"); + assert!(is_specified_port(sockaddr.port())); + let body = res.text().await.unwrap(); assert_eq!(body, "Hello World!\n"); } @@ -53,8 +76,28 @@ async fn test_h2_to_h1() { let res = client.get("https://127.0.0.1:6150").send().await.unwrap(); assert_eq!(res.status(), reqwest::StatusCode::OK); assert_eq!(res.version(), reqwest::Version::HTTP_2); + let headers = res.headers(); assert_eq!(headers[header::CONTENT_LENGTH], "13"); + assert_eq!(headers["x-server-addr"], "127.0.0.1:6150"); + + let sockaddr = headers["x-client-addr"] + .to_str() + .unwrap() + .parse::<std::net::SocketAddr>() + .unwrap(); + assert_eq!(sockaddr.ip().to_string(), "127.0.0.1"); + assert!(is_specified_port(sockaddr.port())); + + assert_eq!(headers["x-upstream-server-addr"], "127.0.0.1:8443"); + let sockaddr = headers["x-upstream-client-addr"] + .to_str() + .unwrap() + .parse::<std::net::SocketAddr>() + .unwrap(); + assert_eq!(sockaddr.ip().to_string(), "127.0.0.2"); + assert!(is_specified_port(sockaddr.port())); + let body = res.text().await.unwrap(); assert_eq!(body, "Hello World!\n"); } @@ -75,8 +118,27 @@ async fn test_h2_to_h2() { .unwrap(); assert_eq!(res.status(), reqwest::StatusCode::OK); assert_eq!(res.version(), reqwest::Version::HTTP_2); + let headers = res.headers(); assert_eq!(headers[header::CONTENT_LENGTH], "13"); + assert_eq!(headers["x-server-addr"], "127.0.0.1:6150"); + let sockaddr = headers["x-client-addr"] + .to_str() + .unwrap() + .parse::<std::net::SocketAddr>() + .unwrap(); + assert_eq!(sockaddr.ip().to_string(), "127.0.0.1"); + assert!(is_specified_port(sockaddr.port())); + + assert_eq!(headers["x-upstream-server-addr"], 
"127.0.0.1:8443"); + let sockaddr = headers["x-upstream-client-addr"] + .to_str() + .unwrap() + .parse::<std::net::SocketAddr>() + .unwrap(); + assert_eq!(sockaddr.ip().to_string(), "127.0.0.2"); + assert!(is_specified_port(sockaddr.port())); + let body = res.text().await.unwrap(); assert_eq!(body, "Hello World!\n"); } @@ -159,7 +221,21 @@ async fn test_simple_proxy_uds() { assert_eq!(res.status(), reqwest::StatusCode::OK); let (resp, body) = res.into_parts(); - assert_eq!(resp.headers[header::CONTENT_LENGTH], "13"); + + let headers = &resp.headers; + assert_eq!(headers[header::CONTENT_LENGTH], "13"); + assert_eq!(headers["x-server-addr"], "/tmp/pingora_proxy.sock"); + assert_eq!(headers["x-client-addr"], "unset"); // unnamed UDS + + assert_eq!(headers["x-upstream-server-addr"], "127.0.0.1:8000"); + let sockaddr = headers["x-upstream-client-addr"] + .to_str() + .unwrap() + .parse::<std::net::SocketAddr>() + .unwrap(); + assert_eq!(sockaddr.ip().to_string(), "127.0.0.2"); + assert!(is_specified_port(sockaddr.port())); + let body = hyper::body::to_bytes(body).await.unwrap(); assert_eq!(body.as_ref(), b"Hello World!\n"); } @@ -168,15 +244,30 @@ async fn test_simple_proxy_uds() { async fn test_simple_proxy_uds_peer() { init(); let client = reqwest::Client::new(); + let res = client .get("http://127.0.0.1:6147") .header("x-uds-peer", "1") // force upstream peer to be UDS .send() .await .unwrap(); + assert_eq!(res.status(), StatusCode::OK); - let headers = res.headers(); + + let headers = &res.headers(); assert_eq!(headers[header::CONTENT_LENGTH], "13"); + assert_eq!(headers["x-server-addr"], "127.0.0.1:6147"); + let sockaddr = headers["x-client-addr"] + .to_str() + .unwrap() + .parse::<std::net::SocketAddr>() + .unwrap(); + assert_eq!(sockaddr.ip().to_string(), "127.0.0.1"); + assert!(is_specified_port(sockaddr.port())); + + assert_eq!(headers["x-upstream-client-addr"], "unset"); // unnamed UDS + assert_eq!(headers["x-upstream-server-addr"], "/tmp/nginx-test.sock"); + 
let body = res.text().await.unwrap(); assert_eq!(body, "Hello World!\n"); } diff --git a/pingora-proxy/tests/utils/server_utils.rs b/pingora-proxy/tests/utils/server_utils.rs index 32f3275..50fc804 100644 --- a/pingora-proxy/tests/utils/server_utils.rs +++ b/pingora-proxy/tests/utils/server_utils.rs @@ -23,7 +23,7 @@ use pingora_cache::{ set_compression_dict_path, CacheMeta, CacheMetaDefaults, CachePhase, MemCache, NoCacheReason, RespCacheable, }; -use pingora_core::protocols::Digest; +use pingora_core::protocols::{l4::socket::SocketAddr, Digest}; use pingora_core::server::configuration::Opt; use pingora_core::services::Service; use pingora_core::upstreams::peer::HttpPeer; @@ -38,15 +38,72 @@ use structopt::StructOpt; pub struct ExampleProxyHttps {} #[allow(clippy::upper_case_acronyms)] +#[derive(Default)] pub struct CTX { conn_reused: bool, + upstream_client_addr: Option<SocketAddr>, + upstream_server_addr: Option<SocketAddr>, +} + +// Common logic for both ProxyHttp(s) types +fn connected_to_upstream_common( + reused: bool, + digest: Option<&Digest>, + ctx: &mut CTX, +) -> Result<()> { + ctx.conn_reused = reused; + let socket_digest = digest + .expect("upstream connector digest should be set for HTTP sessions") + .socket_digest + .as_ref() + .expect("socket digest should be set for HTTP sessions"); + ctx.upstream_client_addr = socket_digest.local_addr().cloned(); + ctx.upstream_server_addr = socket_digest.peer_addr().cloned(); + + Ok(()) +} + +fn response_filter_common( + session: &mut Session, + response: &mut ResponseHeader, + ctx: &mut CTX, +) -> Result<()> { + if ctx.conn_reused { + response.insert_header("x-conn-reuse", "1")?; + } + + let client_addr = session.client_addr(); + let server_addr = session.server_addr(); + response.insert_header( + "x-client-addr", + client_addr.map_or_else(|| "unset".into(), |a| a.to_string()), + )?; + response.insert_header( + "x-server-addr", + server_addr.map_or_else(|| "unset".into(), |a| a.to_string()), + )?; + + 
response.insert_header( + "x-upstream-client-addr", + ctx.upstream_client_addr + .as_ref() + .map_or_else(|| "unset".into(), |a| a.to_string()), + )?; + response.insert_header( + "x-upstream-server-addr", + ctx.upstream_server_addr + .as_ref() + .map_or_else(|| "unset".into(), |a| a.to_string()), + )?; + + Ok(()) } #[async_trait] impl ProxyHttp for ExampleProxyHttps { type CTX = CTX; fn new_ctx(&self) -> Self::CTX { - CTX { conn_reused: false } + CTX::default() } async fn upstream_peer( @@ -101,17 +158,14 @@ impl ProxyHttp for ExampleProxyHttps { async fn response_filter( &self, - _session: &mut Session, + session: &mut Session, upstream_response: &mut ResponseHeader, ctx: &mut Self::CTX, ) -> Result<()> where Self::CTX: Send + Sync, { - if ctx.conn_reused { - upstream_response.insert_header("x-conn-reuse", "1")?; - } - Ok(()) + response_filter_common(session, upstream_response, ctx) } async fn upstream_request_filter( @@ -119,10 +173,7 @@ impl ProxyHttp for ExampleProxyHttps { session: &mut Session, req: &mut RequestHeader, _ctx: &mut Self::CTX, - ) -> Result<()> - where - Self::CTX: Send + Sync, - { + ) -> Result<()> { let host = session.get_header_bytes("host-override"); if host != b"" { req.insert_header("host", host)?; @@ -136,11 +187,10 @@ impl ProxyHttp for ExampleProxyHttps { reused: bool, _peer: &HttpPeer, _fd: std::os::unix::io::RawFd, - _digest: Option<&Digest>, + digest: Option<&Digest>, ctx: &mut CTX, ) -> Result<()> { - ctx.conn_reused = reused; - Ok(()) + connected_to_upstream_common(reused, digest, ctx) } } @@ -148,8 +198,10 @@ pub struct ExampleProxyHttp {} #[async_trait] impl ProxyHttp for ExampleProxyHttp { - type CTX = (); - fn new_ctx(&self) -> Self::CTX {} + type CTX = CTX; + fn new_ctx(&self) -> Self::CTX { + CTX::default() + } async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool> { let req = session.req_header(); @@ -164,6 +216,15 @@ impl ProxyHttp for ExampleProxyHttp { Ok(false) } + async fn 
response_filter( + &self, + session: &mut Session, + upstream_response: &mut ResponseHeader, + ctx: &mut Self::CTX, + ) -> Result<()> { + response_filter_common(session, upstream_response, ctx) + } + async fn upstream_peer( &self, session: &mut Session, @@ -182,12 +243,24 @@ impl ProxyHttp for ExampleProxyHttp { .get("x-port") .map_or("8000", |v| v.to_str().unwrap()); let peer = Box::new(HttpPeer::new( - format!("127.0.0.1:{}", port), + format!("127.0.0.1:{port}"), false, "".to_string(), )); Ok(peer) } + + async fn connected_to_upstream( + &self, + _http_session: &mut Session, + reused: bool, + _peer: &HttpPeer, + _fd: std::os::unix::io::RawFd, + digest: Option<&Digest>, + ctx: &mut CTX, + ) -> Result<()> { + connected_to_upstream_common(reused, digest, ctx) + } } static CACHE_BACKEND: Lazy<MemCache> = Lazy::new(MemCache::new); |