aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorYuchen Wu <[email protected]>2024-07-18 16:50:30 -0700
committerYuchen Wu <[email protected]>2024-07-26 13:35:13 -0700
commit7c122e7f36de5c946ac960a1691c5dd41f26e6e6 (patch)
treea56e90c19fd2f95943a8fbeb8d5556fadeab6fcb
parent60787db4a15c088556eb0feab9fb89727db3a668 (diff)
downloadpingora-7c122e7f36de5c946ac960a1691c5dd41f26e6e6.tar.gz
pingora-7c122e7f36de5c946ac960a1691c5dd41f26e6e6.zip
Add the support for custom L4 connector
This allows user defined L4 connect() to be used so that they can customize the connect behavior such as changing socket options and simulating errors in tests.
-rw-r--r--.bleep2
-rw-r--r--pingora-core/src/connectors/l4.rs160
-rw-r--r--pingora-core/src/connectors/mod.rs1
-rw-r--r--pingora-core/src/upstreams/peer.rs4
4 files changed, 104 insertions, 63 deletions
diff --git a/.bleep b/.bleep
index f00c19a..311a546 100644
--- a/.bleep
+++ b/.bleep
@@ -1 +1 @@
-26e2e108b43f8e1739801106e958f50892bd55cd \ No newline at end of file
+78a170341a0fb030b8bcb2afe84afb268cdc5b2d \ No newline at end of file
diff --git a/pingora-core/src/connectors/l4.rs b/pingora-core/src/connectors/l4.rs
index 449ea4c..d226a8b 100644
--- a/pingora-core/src/connectors/l4.rs
+++ b/pingora-core/src/connectors/l4.rs
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+use async_trait::async_trait;
use log::debug;
use pingora_error::{Context, Error, ErrorType::*, OrErr, Result};
use rand::seq::SliceRandom;
@@ -26,6 +27,12 @@ use crate::protocols::l4::stream::Stream;
use crate::protocols::{GetSocketDigest, SocketDigest};
use crate::upstreams::peer::Peer;
+/// The interface to establish a L4 connection
+#[async_trait]
+pub trait Connect: std::fmt::Debug {
+ async fn connect(&self, addr: &SocketAddr) -> Result<Stream>;
+}
+
/// Establish a connection (l4) to the given peer using its settings and an optional bind address.
pub async fn connect<P>(peer: &P, bind_to: Option<InetSocketAddr>) -> Result<Stream>
where
@@ -37,72 +44,78 @@ where
.err_context(|| format!("Fail to establish CONNECT proxy: {}", peer));
}
let peer_addr = peer.address();
- let mut stream: Stream = match peer_addr {
- SocketAddr::Inet(addr) => {
- let connect_future = tcp_connect(addr, bind_to.as_ref(), |socket| {
- if peer.tcp_fast_open() {
- set_tcp_fastopen_connect(socket.as_raw_fd())?;
- }
- if let Some(recv_buf) = peer.tcp_recv_buf() {
- debug!("Setting recv buf size");
- set_recv_buf(socket.as_raw_fd(), recv_buf)?;
- }
- if let Some(dscp) = peer.dscp() {
- debug!("Setting dscp");
- set_dscp(socket.as_raw_fd(), dscp)?;
- }
- Ok(())
- });
- let conn_res = match peer.connection_timeout() {
- Some(t) => pingora_timeout::timeout(t, connect_future)
- .await
- .explain_err(ConnectTimedout, |_| {
- format!("timeout {t:?} connecting to server {peer}")
- })?,
- None => connect_future.await,
- };
- match conn_res {
- Ok(socket) => {
- debug!("connected to new server: {}", peer.address());
- Ok(socket.into())
- }
- Err(e) => {
- let c = format!("Fail to connect to {peer}");
- match e.etype() {
- SocketError | BindError => Error::e_because(InternalError, c, e),
- _ => Err(e.more_context(c)),
+ let mut stream: Stream =
+ if let Some(custom_l4) = peer.get_peer_options().and_then(|o| o.custom_l4.as_ref()) {
+ custom_l4.connect(peer_addr).await?
+ } else {
+ match peer_addr {
+ SocketAddr::Inet(addr) => {
+ let connect_future = tcp_connect(addr, bind_to.as_ref(), |socket| {
+ if peer.tcp_fast_open() {
+ set_tcp_fastopen_connect(socket.as_raw_fd())?;
+ }
+ if let Some(recv_buf) = peer.tcp_recv_buf() {
+ debug!("Setting recv buf size");
+ set_recv_buf(socket.as_raw_fd(), recv_buf)?;
+ }
+ if let Some(dscp) = peer.dscp() {
+ debug!("Setting dscp");
+ set_dscp(socket.as_raw_fd(), dscp)?;
+ }
+ Ok(())
+ });
+ let conn_res = match peer.connection_timeout() {
+ Some(t) => pingora_timeout::timeout(t, connect_future)
+ .await
+ .explain_err(ConnectTimedout, |_| {
+ format!("timeout {t:?} connecting to server {peer}")
+ })?,
+ None => connect_future.await,
+ };
+ match conn_res {
+ Ok(socket) => {
+ debug!("connected to new server: {}", peer.address());
+ Ok(socket.into())
+ }
+ Err(e) => {
+ let c = format!("Fail to connect to {peer}");
+ match e.etype() {
+ SocketError | BindError => Error::e_because(InternalError, c, e),
+ _ => Err(e.more_context(c)),
+ }
+ }
}
}
- }
- }
- SocketAddr::Unix(addr) => {
- let connect_future = connect_uds(
- addr.as_pathname()
- .expect("non-pathname unix sockets not supported as peer"),
- );
- let conn_res = match peer.connection_timeout() {
- Some(t) => pingora_timeout::timeout(t, connect_future)
- .await
- .explain_err(ConnectTimedout, |_| {
- format!("timeout {t:?} connecting to server {peer}")
- })?,
- None => connect_future.await,
- };
- match conn_res {
- Ok(socket) => {
- debug!("connected to new server: {}", peer.address());
- Ok(socket.into())
- }
- Err(e) => {
- let c = format!("Fail to connect to {peer}");
- match e.etype() {
- SocketError | BindError => Error::e_because(InternalError, c, e),
- _ => Err(e.more_context(c)),
+ SocketAddr::Unix(addr) => {
+ let connect_future = connect_uds(
+ addr.as_pathname()
+ .expect("non-pathname unix sockets not supported as peer"),
+ );
+ let conn_res = match peer.connection_timeout() {
+ Some(t) => pingora_timeout::timeout(t, connect_future)
+ .await
+ .explain_err(ConnectTimedout, |_| {
+ format!("timeout {t:?} connecting to server {peer}")
+ })?,
+ None => connect_future.await,
+ };
+ match conn_res {
+ Ok(socket) => {
+ debug!("connected to new server: {}", peer.address());
+ Ok(socket.into())
+ }
+ Err(e) => {
+ let c = format!("Fail to connect to {peer}");
+ match e.etype() {
+ SocketError | BindError => Error::e_because(InternalError, c, e),
+ _ => Err(e.more_context(c)),
+ }
+ }
}
}
- }
- }
- }?;
+ }?
+ };
+
let tracer = peer.get_tracer();
if let Some(t) = tracer {
t.0.on_connected();
@@ -250,6 +263,29 @@ mod tests {
}
#[tokio::test]
+ async fn test_custom_connect() {
+ #[derive(Debug)]
+ struct MyL4;
+ #[async_trait]
+ impl Connect for MyL4 {
+ async fn connect(&self, _addr: &SocketAddr) -> Result<Stream> {
+ tokio::net::TcpStream::connect("1.1.1.1:80")
+ .await
+ .map(|s| s.into())
+ .or_fail()
+ }
+ }
+ // :79 shouldn't be able to be connected to
+ let mut peer = BasicPeer::new("1.1.1.1:79");
+ peer.options.custom_l4 = Some(std::sync::Arc::new(MyL4 {}));
+
+ let new_session = connect(&peer, None).await;
+
+ // but MyL4 connects to :80 instead
+ assert!(new_session.is_ok());
+ }
+
+ #[tokio::test]
async fn test_connect_proxy_fail() {
let mut peer = HttpPeer::new("1.1.1.1:80".to_string(), false, "".to_string());
let mut path = PathBuf::new();
diff --git a/pingora-core/src/connectors/mod.rs b/pingora-core/src/connectors/mod.rs
index 9bfd91b..d13f3a9 100644
--- a/pingora-core/src/connectors/mod.rs
+++ b/pingora-core/src/connectors/mod.rs
@@ -25,6 +25,7 @@ use crate::tls::ssl::SslConnector;
use crate::upstreams::peer::{Peer, ALPN};
use l4::connect as l4_connect;
+pub use l4::Connect as L4Connect;
use log::{debug, error, warn};
use offload::OffloadRuntime;
use parking_lot::RwLock;
diff --git a/pingora-core/src/upstreams/peer.rs b/pingora-core/src/upstreams/peer.rs
index d0c8125..c4a23a8 100644
--- a/pingora-core/src/upstreams/peer.rs
+++ b/pingora-core/src/upstreams/peer.rs
@@ -29,6 +29,7 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
+use crate::connectors::L4Connect;
use crate::protocols::l4::socket::SocketAddr;
use crate::protocols::ConnFdReusable;
use crate::protocols::TcpKeepalive;
@@ -322,6 +323,8 @@ pub struct PeerOptions {
pub tcp_fast_open: bool,
// use Arc because Clone is required but not allowed in trait object
pub tracer: Option<Tracer>,
+ // A custom L4 connector to use to establish new L4 connections
+ pub custom_l4: Option<Arc<dyn L4Connect + Send + Sync>>,
}
impl PeerOptions {
@@ -350,6 +353,7 @@ impl PeerOptions {
second_keyshare: true, // default true and noop when not using PQ curves
tcp_fast_open: false,
tracer: None,
+ custom_l4: None,
}
}