aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--.bleep2
-rw-r--r--pingora-core/src/connectors/l4.rs160
-rw-r--r--pingora-core/src/connectors/mod.rs1
-rw-r--r--pingora-core/src/upstreams/peer.rs4
4 files changed, 104 insertions, 63 deletions
diff --git a/.bleep b/.bleep
index f00c19a..311a546 100644
--- a/.bleep
+++ b/.bleep
@@ -1 +1 @@
-26e2e108b43f8e1739801106e958f50892bd55cd \ No newline at end of file
+78a170341a0fb030b8bcb2afe84afb268cdc5b2d \ No newline at end of file
diff --git a/pingora-core/src/connectors/l4.rs b/pingora-core/src/connectors/l4.rs
index 449ea4c..d226a8b 100644
--- a/pingora-core/src/connectors/l4.rs
+++ b/pingora-core/src/connectors/l4.rs
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+use async_trait::async_trait;
use log::debug;
use pingora_error::{Context, Error, ErrorType::*, OrErr, Result};
use rand::seq::SliceRandom;
@@ -26,6 +27,12 @@ use crate::protocols::l4::stream::Stream;
use crate::protocols::{GetSocketDigest, SocketDigest};
use crate::upstreams::peer::Peer;
+/// The interface to establish a L4 connection
+#[async_trait]
+pub trait Connect: std::fmt::Debug {
+ async fn connect(&self, addr: &SocketAddr) -> Result<Stream>;
+}
+
/// Establish a connection (l4) to the given peer using its settings and an optional bind address.
pub async fn connect<P>(peer: &P, bind_to: Option<InetSocketAddr>) -> Result<Stream>
where
@@ -37,72 +44,78 @@ where
.err_context(|| format!("Fail to establish CONNECT proxy: {}", peer));
}
let peer_addr = peer.address();
- let mut stream: Stream = match peer_addr {
- SocketAddr::Inet(addr) => {
- let connect_future = tcp_connect(addr, bind_to.as_ref(), |socket| {
- if peer.tcp_fast_open() {
- set_tcp_fastopen_connect(socket.as_raw_fd())?;
- }
- if let Some(recv_buf) = peer.tcp_recv_buf() {
- debug!("Setting recv buf size");
- set_recv_buf(socket.as_raw_fd(), recv_buf)?;
- }
- if let Some(dscp) = peer.dscp() {
- debug!("Setting dscp");
- set_dscp(socket.as_raw_fd(), dscp)?;
- }
- Ok(())
- });
- let conn_res = match peer.connection_timeout() {
- Some(t) => pingora_timeout::timeout(t, connect_future)
- .await
- .explain_err(ConnectTimedout, |_| {
- format!("timeout {t:?} connecting to server {peer}")
- })?,
- None => connect_future.await,
- };
- match conn_res {
- Ok(socket) => {
- debug!("connected to new server: {}", peer.address());
- Ok(socket.into())
- }
- Err(e) => {
- let c = format!("Fail to connect to {peer}");
- match e.etype() {
- SocketError | BindError => Error::e_because(InternalError, c, e),
- _ => Err(e.more_context(c)),
+ let mut stream: Stream =
+ if let Some(custom_l4) = peer.get_peer_options().and_then(|o| o.custom_l4.as_ref()) {
+ custom_l4.connect(peer_addr).await?
+ } else {
+ match peer_addr {
+ SocketAddr::Inet(addr) => {
+ let connect_future = tcp_connect(addr, bind_to.as_ref(), |socket| {
+ if peer.tcp_fast_open() {
+ set_tcp_fastopen_connect(socket.as_raw_fd())?;
+ }
+ if let Some(recv_buf) = peer.tcp_recv_buf() {
+ debug!("Setting recv buf size");
+ set_recv_buf(socket.as_raw_fd(), recv_buf)?;
+ }
+ if let Some(dscp) = peer.dscp() {
+ debug!("Setting dscp");
+ set_dscp(socket.as_raw_fd(), dscp)?;
+ }
+ Ok(())
+ });
+ let conn_res = match peer.connection_timeout() {
+ Some(t) => pingora_timeout::timeout(t, connect_future)
+ .await
+ .explain_err(ConnectTimedout, |_| {
+ format!("timeout {t:?} connecting to server {peer}")
+ })?,
+ None => connect_future.await,
+ };
+ match conn_res {
+ Ok(socket) => {
+ debug!("connected to new server: {}", peer.address());
+ Ok(socket.into())
+ }
+ Err(e) => {
+ let c = format!("Fail to connect to {peer}");
+ match e.etype() {
+ SocketError | BindError => Error::e_because(InternalError, c, e),
+ _ => Err(e.more_context(c)),
+ }
+ }
}
}
- }
- }
- SocketAddr::Unix(addr) => {
- let connect_future = connect_uds(
- addr.as_pathname()
- .expect("non-pathname unix sockets not supported as peer"),
- );
- let conn_res = match peer.connection_timeout() {
- Some(t) => pingora_timeout::timeout(t, connect_future)
- .await
- .explain_err(ConnectTimedout, |_| {
- format!("timeout {t:?} connecting to server {peer}")
- })?,
- None => connect_future.await,
- };
- match conn_res {
- Ok(socket) => {
- debug!("connected to new server: {}", peer.address());
- Ok(socket.into())
- }
- Err(e) => {
- let c = format!("Fail to connect to {peer}");
- match e.etype() {
- SocketError | BindError => Error::e_because(InternalError, c, e),
- _ => Err(e.more_context(c)),
+ SocketAddr::Unix(addr) => {
+ let connect_future = connect_uds(
+ addr.as_pathname()
+ .expect("non-pathname unix sockets not supported as peer"),
+ );
+ let conn_res = match peer.connection_timeout() {
+ Some(t) => pingora_timeout::timeout(t, connect_future)
+ .await
+ .explain_err(ConnectTimedout, |_| {
+ format!("timeout {t:?} connecting to server {peer}")
+ })?,
+ None => connect_future.await,
+ };
+ match conn_res {
+ Ok(socket) => {
+ debug!("connected to new server: {}", peer.address());
+ Ok(socket.into())
+ }
+ Err(e) => {
+ let c = format!("Fail to connect to {peer}");
+ match e.etype() {
+ SocketError | BindError => Error::e_because(InternalError, c, e),
+ _ => Err(e.more_context(c)),
+ }
+ }
}
}
- }
- }
- }?;
+ }?
+ };
+
let tracer = peer.get_tracer();
if let Some(t) = tracer {
t.0.on_connected();
@@ -250,6 +263,29 @@ mod tests {
}
#[tokio::test]
+ async fn test_custom_connect() {
+ #[derive(Debug)]
+ struct MyL4;
+ #[async_trait]
+ impl Connect for MyL4 {
+ async fn connect(&self, _addr: &SocketAddr) -> Result<Stream> {
+ tokio::net::TcpStream::connect("1.1.1.1:80")
+ .await
+ .map(|s| s.into())
+ .or_fail()
+ }
+ }
+ // :79 shouldn't be able to be connected to
+ let mut peer = BasicPeer::new("1.1.1.1:79");
+ peer.options.custom_l4 = Some(std::sync::Arc::new(MyL4 {}));
+
+ let new_session = connect(&peer, None).await;
+
+ // but MyL4 connects to :80 instead
+ assert!(new_session.is_ok());
+ }
+
+ #[tokio::test]
async fn test_connect_proxy_fail() {
let mut peer = HttpPeer::new("1.1.1.1:80".to_string(), false, "".to_string());
let mut path = PathBuf::new();
diff --git a/pingora-core/src/connectors/mod.rs b/pingora-core/src/connectors/mod.rs
index 9bfd91b..d13f3a9 100644
--- a/pingora-core/src/connectors/mod.rs
+++ b/pingora-core/src/connectors/mod.rs
@@ -25,6 +25,7 @@ use crate::tls::ssl::SslConnector;
use crate::upstreams::peer::{Peer, ALPN};
use l4::connect as l4_connect;
+pub use l4::Connect as L4Connect;
use log::{debug, error, warn};
use offload::OffloadRuntime;
use parking_lot::RwLock;
diff --git a/pingora-core/src/upstreams/peer.rs b/pingora-core/src/upstreams/peer.rs
index d0c8125..c4a23a8 100644
--- a/pingora-core/src/upstreams/peer.rs
+++ b/pingora-core/src/upstreams/peer.rs
@@ -29,6 +29,7 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
+use crate::connectors::L4Connect;
use crate::protocols::l4::socket::SocketAddr;
use crate::protocols::ConnFdReusable;
use crate::protocols::TcpKeepalive;
@@ -322,6 +323,8 @@ pub struct PeerOptions {
pub tcp_fast_open: bool,
// use Arc because Clone is required but not allowed in trait object
pub tracer: Option<Tracer>,
+ // A custom L4 connector to use to establish new L4 connections
+ pub custom_l4: Option<Arc<dyn L4Connect + Send + Sync>>,
}
impl PeerOptions {
@@ -350,6 +353,7 @@ impl PeerOptions {
second_keyshare: true, // default true and noop when not using PQ curves
tcp_fast_open: false,
tracer: None,
+ custom_l4: None,
}
}