aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorNoah Kennedy <[email protected]>2024-11-04 14:47:25 -0600
committerMatthew (mbg) <[email protected]>2024-11-08 10:14:21 -0800
commit3f564af3ae56e898478e13e71d67d095d7f5dbbd (patch)
tree9a7af30350483eef25c83e45a43aa37b3af61861
parent49e8d31206df85d8048574c5dd1838870d443cc0 (diff)
downloadpingora-3f564af3ae56e898478e13e71d67d095d7f5dbbd.tar.gz
pingora-3f564af3ae56e898478e13e71d67d095d7f5dbbd.zip
add builder api for pingora listeners:
This change takes the existing API for ListenerEndpoint and modifies it to use a typical idiomatic builder style. There are additional improvements which I feel can be made here, but I believe that it would be better to do them in subsequent commits in order to keep changes as incremental as possible. In particular, I think that making socket options into sub-builder is probably a good move.
-rw-r--r--.bleep2
-rw-r--r--pingora-core/src/listeners/l4.rs122
-rw-r--r--pingora-core/src/listeners/mod.rs72
-rw-r--r--pingora-core/src/services/listening.rs18
4 files changed, 125 insertions, 89 deletions
diff --git a/.bleep b/.bleep
index cbcf6cc..51c1503 100644
--- a/.bleep
+++ b/.bleep
@@ -1 +1 @@
-396362a56a579d4e83f4022cdaf943c858f52a84 \ No newline at end of file
+a76fd1ca898a9518c6aa0c7c8fa5ed845dd1bcbb \ No newline at end of file
diff --git a/pingora-core/src/listeners/l4.rs b/pingora-core/src/listeners/l4.rs
index 43c4939..c832320 100644
--- a/pingora-core/src/listeners/l4.rs
+++ b/pingora-core/src/listeners/l4.rs
@@ -255,51 +255,71 @@ async fn bind(addr: &ServerAddress) -> Result<Listener> {
pub struct ListenerEndpoint {
listen_addr: ServerAddress,
- listener: Option<Listener>,
+ listener: Listener,
}
-impl ListenerEndpoint {
- pub fn new(listen_addr: ServerAddress) -> Self {
- ListenerEndpoint {
- listen_addr,
- listener: None,
- }
+#[derive(Default)]
+pub struct ListenerEndpointBuilder {
+ listen_addr: Option<ServerAddress>,
+}
+
+impl ListenerEndpointBuilder {
+ pub fn new() -> ListenerEndpointBuilder {
+ Self { listen_addr: None }
}
- pub fn as_str(&self) -> &str {
- self.listen_addr.as_ref()
+ pub fn listen_addr(&mut self, addr: ServerAddress) -> &mut Self {
+ self.listen_addr = Some(addr);
+ self
}
#[cfg(unix)]
- pub async fn listen(&mut self, fds: Option<ListenFds>) -> Result<()> {
- if self.listener.is_some() {
- return Ok(());
- }
+ pub async fn listen(self, fds: Option<ListenFds>) -> Result<ListenerEndpoint> {
+ let listen_addr = self
+ .listen_addr
+ .expect("Tried to listen with no addr specified");
let listener = if let Some(fds_table) = fds {
- let addr = self.listen_addr.as_ref();
+ let addr_str = listen_addr.as_ref();
+
// consider make this mutex std::sync::Mutex or OnceCell
let mut table = fds_table.lock().await;
- if let Some(fd) = table.get(addr.as_ref()) {
- from_raw_fd(&self.listen_addr, *fd)?
+
+ if let Some(fd) = table.get(addr_str) {
+ from_raw_fd(&listen_addr, *fd)?
} else {
// not found
- let listener = bind(&self.listen_addr).await?;
- table.add(addr.to_string(), listener.as_raw_fd());
+ let listener = bind(&listen_addr).await?;
+ table.add(addr_str.to_string(), listener.as_raw_fd());
listener
}
} else {
// not found, no fd table
- bind(&self.listen_addr).await?
+ bind(&listen_addr).await?
};
- self.listener = Some(listener);
- Ok(())
+
+ Ok(ListenerEndpoint {
+ listen_addr,
+ listener,
+ })
}
#[cfg(windows)]
- pub async fn listen(&mut self) -> Result<()> {
- self.listener = Some(bind(&self.listen_addr).await?);
- Ok(())
+ pub async fn listen(self) -> Result<ListenerEndpoint> {
+ Ok(ListenerEndpoint {
+ listen_addr,
+ listener: bind(&listen_addr).await?,
+ })
+ }
+}
+
+impl ListenerEndpoint {
+ pub fn builder() -> ListenerEndpointBuilder {
+ ListenerEndpointBuilder::new()
+ }
+
+ pub fn as_str(&self) -> &str {
+ self.listen_addr.as_ref()
}
fn apply_stream_settings(&self, stream: &mut Stream) -> Result<()> {
@@ -321,11 +341,8 @@ impl ListenerEndpoint {
}
pub async fn accept(&mut self) -> Result<Stream> {
- let Some(listener) = self.listener.as_mut() else {
- // panic otherwise this thing dead loop
- panic!("Need to call listen() first");
- };
- let mut stream = listener
+ let mut stream = self
+ .listener
.accept()
.await
.or_err(AcceptError, "Fail to accept()")?;
@@ -341,14 +358,17 @@ mod test {
#[tokio::test]
async fn test_listen_tcp() {
let addr = "127.0.0.1:7100";
- let mut listener = ListenerEndpoint::new(ServerAddress::Tcp(addr.into(), None));
- listener
- .listen(
- #[cfg(unix)]
- None,
- )
- .await
- .unwrap();
+
+ let mut builder = ListenerEndpoint::builder();
+
+ builder.listen_addr(ServerAddress::Tcp(addr.into(), None));
+
+ #[cfg(unix)]
+ let mut listener = builder.listen(None).await.unwrap();
+
+ #[cfg(windows)]
+ let mut listener = builder.listen().await.unwrap();
+
tokio::spawn(async move {
// just try to accept once
listener.accept().await.unwrap();
@@ -364,14 +384,17 @@ mod test {
ipv6_only: Some(true),
..Default::default()
});
- let mut listener = ListenerEndpoint::new(ServerAddress::Tcp("[::]:7101".into(), sock_opt));
- listener
- .listen(
- #[cfg(unix)]
- None,
- )
- .await
- .unwrap();
+
+ let mut builder = ListenerEndpoint::builder();
+
+ builder.listen_addr(ServerAddress::Tcp("[::]:7101".into(), sock_opt));
+
+ #[cfg(unix)]
+ let mut listener = builder.listen(None).await.unwrap();
+
+ #[cfg(windows)]
+ let mut listener = builder.listen().await.unwrap();
+
tokio::spawn(async move {
// just try to accept twice
listener.accept().await.unwrap();
@@ -389,8 +412,13 @@ mod test {
#[tokio::test]
async fn test_listen_uds() {
let addr = "/tmp/test_listen_uds";
- let mut listener = ListenerEndpoint::new(ServerAddress::Uds(addr.into(), None));
- listener.listen(None).await.unwrap();
+
+ let mut builder = ListenerEndpoint::builder();
+
+ builder.listen_addr(ServerAddress::Uds(addr.into(), None));
+
+ let mut listener = builder.listen(None).await.unwrap();
+
tokio::spawn(async move {
// just try to accept once
listener.accept().await.unwrap();
diff --git a/pingora-core/src/listeners/mod.rs b/pingora-core/src/listeners/mod.rs
index 09dc606..82c31cc 100644
--- a/pingora-core/src/listeners/mod.rs
+++ b/pingora-core/src/listeners/mod.rs
@@ -58,22 +58,30 @@ struct TransportStackBuilder {
}
impl TransportStackBuilder {
- pub fn build(&mut self, #[cfg(unix)] upgrade_listeners: Option<ListenFds>) -> TransportStack {
- TransportStack {
- l4: ListenerEndpoint::new(self.l4.clone()),
+ pub async fn build(
+ &mut self,
+ #[cfg(unix)] upgrade_listeners: Option<ListenFds>,
+ ) -> Result<TransportStack> {
+ let mut builder = ListenerEndpoint::builder();
+
+ builder.listen_addr(self.l4.clone());
+
+ #[cfg(unix)]
+ let l4 = builder.listen(upgrade_listeners).await?;
+
+ #[cfg(windows)]
+ let l4 = builder.listen().await?;
+
+ Ok(TransportStack {
+ l4,
tls: self.tls.take().map(|tls| Arc::new(tls.build())),
- #[cfg(unix)]
- upgrade_listeners,
- }
+ })
}
}
pub(crate) struct TransportStack {
l4: ListenerEndpoint,
tls: Option<Arc<Acceptor>>,
- // listeners sent from the old process for graceful upgrade
- #[cfg(unix)]
- upgrade_listeners: Option<ListenFds>,
}
impl TransportStack {
@@ -81,15 +89,6 @@ impl TransportStack {
self.l4.as_str()
}
- pub async fn listen(&mut self) -> Result<()> {
- self.l4
- .listen(
- #[cfg(unix)]
- self.upgrade_listeners.take(),
- )
- .await
- }
-
pub async fn accept(&mut self) -> Result<UninitializedStream> {
let stream = self.l4.accept().await?;
Ok(UninitializedStream {
@@ -199,19 +198,24 @@ impl Listeners {
self.stacks.push(TransportStackBuilder { l4, tls })
}
- pub(crate) fn build(
+ pub(crate) async fn build(
&mut self,
#[cfg(unix)] upgrade_listeners: Option<ListenFds>,
- ) -> Vec<TransportStack> {
- self.stacks
- .iter_mut()
- .map(|b| {
- b.build(
+ ) -> Result<Vec<TransportStack>> {
+ let mut stacks = Vec::with_capacity(self.stacks.len());
+
+ for b in self.stacks.iter_mut() {
+ let new_stack = b
+ .build(
#[cfg(unix)]
upgrade_listeners.clone(),
)
- })
- .collect()
+ .await?;
+
+ stacks.push(new_stack);
+ }
+
+ Ok(stacks)
}
pub(crate) fn cleanup(&self) {
@@ -234,14 +238,17 @@ mod test {
let mut listeners = Listeners::tcp(addr1);
listeners.add_tcp(addr2);
- let listeners = listeners.build(
- #[cfg(unix)]
- None,
- );
+ let listeners = listeners
+ .build(
+ #[cfg(unix)]
+ None,
+ )
+ .await
+ .unwrap();
+
assert_eq!(listeners.len(), 2);
for mut listener in listeners {
tokio::spawn(async move {
- listener.listen().await.unwrap();
// just try to accept once
let stream = listener.accept().await.unwrap();
stream.handshake().await.unwrap();
@@ -269,11 +276,12 @@ mod test {
#[cfg(unix)]
None,
)
+ .await
+ .unwrap()
.pop()
.unwrap();
tokio::spawn(async move {
- listener.listen().await.unwrap();
// just try to accept once
let stream = listener.accept().await.unwrap();
let mut stream = stream.handshake().await.unwrap();
diff --git a/pingora-core/src/services/listening.rs b/pingora-core/src/services/listening.rs
index 3cdb3b8..e2153dc 100644
--- a/pingora-core/src/services/listening.rs
+++ b/pingora-core/src/services/listening.rs
@@ -140,11 +140,6 @@ impl<A: ServerApp + Send + Sync + 'static> Service<A> {
mut stack: TransportStack,
mut shutdown: ShutdownWatch,
) {
- if let Err(e) = stack.listen().await {
- error!("Listen() failed: {e}");
- return;
- }
-
// the accept loop, until the system is shutting down
loop {
let new_io = tokio::select! { // TODO: consider biased for perf reason?
@@ -211,10 +206,15 @@ impl<A: ServerApp + Send + Sync + 'static> ServiceTrait for Service<A> {
shutdown: ShutdownWatch,
) {
let runtime = current_handle();
- let endpoints = self.listeners.build(
- #[cfg(unix)]
- fds,
- );
+ let endpoints = self
+ .listeners
+ .build(
+ #[cfg(unix)]
+ fds,
+ )
+ .await
+ .expect("Failed to build listeners");
+
let app_logic = self
.app_logic
.take()