diff options
author | Noah Kennedy <[email protected]> | 2024-11-04 14:47:25 -0600 |
---|---|---|
committer | Noah Kennedy <[email protected]> | 2024-11-04 15:55:45 -0600 |
commit | ee42db5e5d7fe2569111528a3c4e5fba240caf38 (patch) | |
tree | 9a7af30350483eef25c83e45a43aa37b3af61861 | |
parent | 4ea803251dd8e210ce87c81f41b75b7b0282c076 (diff) | |
download | pingora-ee42db5e5d7fe2569111528a3c4e5fba240caf38.tar.gz pingora-ee42db5e5d7fe2569111528a3c4e5fba240caf38.zip |
add builder api for pingora listeners:
This change takes the existing API for ListenerEndpoint and modifies it to use a typical idiomatic builder style.
There are additional improvements which I feel can be made here, but I believe that it would be better to do them in subsequent commits in order to keep changes as incremental as possible.
In particular, I think that making socket options into sub-builder is probably a good move.
-rw-r--r-- | .bleep | 2 | ||||
-rw-r--r-- | pingora-core/src/listeners/l4.rs | 122 | ||||
-rw-r--r-- | pingora-core/src/listeners/mod.rs | 72 | ||||
-rw-r--r-- | pingora-core/src/services/listening.rs | 18 |
4 files changed, 125 insertions, 89 deletions
@@ -1 +1 @@ -396362a56a579d4e83f4022cdaf943c858f52a84
\ No newline at end of file +a76fd1ca898a9518c6aa0c7c8fa5ed845dd1bcbb
\ No newline at end of file diff --git a/pingora-core/src/listeners/l4.rs b/pingora-core/src/listeners/l4.rs index 43c4939..c832320 100644 --- a/pingora-core/src/listeners/l4.rs +++ b/pingora-core/src/listeners/l4.rs @@ -255,51 +255,71 @@ async fn bind(addr: &ServerAddress) -> Result<Listener> { pub struct ListenerEndpoint { listen_addr: ServerAddress, - listener: Option<Listener>, + listener: Listener, } -impl ListenerEndpoint { - pub fn new(listen_addr: ServerAddress) -> Self { - ListenerEndpoint { - listen_addr, - listener: None, - } +#[derive(Default)] +pub struct ListenerEndpointBuilder { + listen_addr: Option<ServerAddress>, +} + +impl ListenerEndpointBuilder { + pub fn new() -> ListenerEndpointBuilder { + Self { listen_addr: None } } - pub fn as_str(&self) -> &str { - self.listen_addr.as_ref() + pub fn listen_addr(&mut self, addr: ServerAddress) -> &mut Self { + self.listen_addr = Some(addr); + self } #[cfg(unix)] - pub async fn listen(&mut self, fds: Option<ListenFds>) -> Result<()> { - if self.listener.is_some() { - return Ok(()); - } + pub async fn listen(self, fds: Option<ListenFds>) -> Result<ListenerEndpoint> { + let listen_addr = self + .listen_addr + .expect("Tried to listen with no addr specified"); let listener = if let Some(fds_table) = fds { - let addr = self.listen_addr.as_ref(); + let addr_str = listen_addr.as_ref(); + // consider make this mutex std::sync::Mutex or OnceCell let mut table = fds_table.lock().await; - if let Some(fd) = table.get(addr.as_ref()) { - from_raw_fd(&self.listen_addr, *fd)? + + if let Some(fd) = table.get(addr_str) { + from_raw_fd(&listen_addr, *fd)? } else { // not found - let listener = bind(&self.listen_addr).await?; - table.add(addr.to_string(), listener.as_raw_fd()); + let listener = bind(&listen_addr).await?; + table.add(addr_str.to_string(), listener.as_raw_fd()); listener } } else { // not found, no fd table - bind(&self.listen_addr).await? + bind(&listen_addr).await? }; - self.listener = Some(listener); - Ok(()) + + Ok(ListenerEndpoint { + listen_addr, + listener, + }) } #[cfg(windows)] - pub async fn listen(&mut self) -> Result<()> { - self.listener = Some(bind(&self.listen_addr).await?); - Ok(()) + pub async fn listen(self) -> Result<ListenerEndpoint> { + Ok(ListenerEndpoint { + listen_addr, + listener: bind(&listen_addr).await?, + }) + } +} + +impl ListenerEndpoint { + pub fn builder() -> ListenerEndpointBuilder { + ListenerEndpointBuilder::new() + } + + pub fn as_str(&self) -> &str { + self.listen_addr.as_ref() } fn apply_stream_settings(&self, stream: &mut Stream) -> Result<()> { @@ -321,11 +341,8 @@ impl ListenerEndpoint { } pub async fn accept(&mut self) -> Result<Stream> { - let Some(listener) = self.listener.as_mut() else { - // panic otherwise this thing dead loop - panic!("Need to call listen() first"); - }; - let mut stream = listener + let mut stream = self + .listener .accept() .await .or_err(AcceptError, "Fail to accept()")?; @@ -341,14 +358,17 @@ mod test { #[tokio::test] async fn test_listen_tcp() { let addr = "127.0.0.1:7100"; - let mut listener = ListenerEndpoint::new(ServerAddress::Tcp(addr.into(), None)); - listener - .listen( - #[cfg(unix)] - None, - ) - .await - .unwrap(); + + let mut builder = ListenerEndpoint::builder(); + + builder.listen_addr(ServerAddress::Tcp(addr.into(), None)); + + #[cfg(unix)] + let mut listener = builder.listen(None).await.unwrap(); + + #[cfg(windows)] + let mut listener = builder.listen().await.unwrap(); + tokio::spawn(async move { // just try to accept once listener.accept().await.unwrap(); @@ -364,14 +384,17 @@ mod test { ipv6_only: Some(true), ..Default::default() }); - let mut listener = ListenerEndpoint::new(ServerAddress::Tcp("[::]:7101".into(), sock_opt)); - listener - .listen( - #[cfg(unix)] - None, - ) - .await - .unwrap(); + + let mut builder = ListenerEndpoint::builder(); + + builder.listen_addr(ServerAddress::Tcp("[::]:7101".into(), sock_opt)); + + #[cfg(unix)] + let mut listener = builder.listen(None).await.unwrap(); + + #[cfg(windows)] + let mut listener = builder.listen().await.unwrap(); + tokio::spawn(async move { // just try to accept twice listener.accept().await.unwrap(); @@ -389,8 +412,13 @@ mod test { #[tokio::test] async fn test_listen_uds() { let addr = "/tmp/test_listen_uds"; - let mut listener = ListenerEndpoint::new(ServerAddress::Uds(addr.into(), None)); - listener.listen(None).await.unwrap(); + + let mut builder = ListenerEndpoint::builder(); + + builder.listen_addr(ServerAddress::Uds(addr.into(), None)); + + let mut listener = builder.listen(None).await.unwrap(); + tokio::spawn(async move { // just try to accept once listener.accept().await.unwrap(); diff --git a/pingora-core/src/listeners/mod.rs b/pingora-core/src/listeners/mod.rs index 09dc606..82c31cc 100644 --- a/pingora-core/src/listeners/mod.rs +++ b/pingora-core/src/listeners/mod.rs @@ -58,22 +58,30 @@ struct TransportStackBuilder { } impl TransportStackBuilder { - pub fn build(&mut self, #[cfg(unix)] upgrade_listeners: Option<ListenFds>) -> TransportStack { - TransportStack { - l4: ListenerEndpoint::new(self.l4.clone()), + pub async fn build( + &mut self, + #[cfg(unix)] upgrade_listeners: Option<ListenFds>, + ) -> Result<TransportStack> { + let mut builder = ListenerEndpoint::builder(); + + builder.listen_addr(self.l4.clone()); + + #[cfg(unix)] + let l4 = builder.listen(upgrade_listeners).await?; + + #[cfg(windows)] + let l4 = builder.listen().await?; + + Ok(TransportStack { + l4, tls: self.tls.take().map(|tls| Arc::new(tls.build())), - #[cfg(unix)] - upgrade_listeners, - } + }) } } pub(crate) struct TransportStack { l4: ListenerEndpoint, tls: Option<Arc<Acceptor>>, - // listeners sent from the old process for graceful upgrade - #[cfg(unix)] - upgrade_listeners: Option<ListenFds>, } impl TransportStack { @@ -81,15 +89,6 @@ impl TransportStack { self.l4.as_str() } - pub async fn listen(&mut self) -> Result<()> { - self.l4 - .listen( - #[cfg(unix)] - self.upgrade_listeners.take(), - ) - .await - } - pub async fn accept(&mut self) -> Result<UninitializedStream> { let stream = self.l4.accept().await?; Ok(UninitializedStream { @@ -199,19 +198,24 @@ impl Listeners { self.stacks.push(TransportStackBuilder { l4, tls }) } - pub(crate) fn build( + pub(crate) async fn build( &mut self, #[cfg(unix)] upgrade_listeners: Option<ListenFds>, - ) -> Vec<TransportStack> { - self.stacks - .iter_mut() - .map(|b| { - b.build( + ) -> Result<Vec<TransportStack>> { + let mut stacks = Vec::with_capacity(self.stacks.len()); + + for b in self.stacks.iter_mut() { + let new_stack = b + .build( #[cfg(unix)] upgrade_listeners.clone(), ) - }) - .collect() + .await?; + + stacks.push(new_stack); + } + + Ok(stacks) } pub(crate) fn cleanup(&self) { @@ -234,14 +238,17 @@ mod test { let mut listeners = Listeners::tcp(addr1); listeners.add_tcp(addr2); - let listeners = listeners.build( - #[cfg(unix)] - None, - ); + let listeners = listeners + .build( + #[cfg(unix)] + None, + ) + .await + .unwrap(); + assert_eq!(listeners.len(), 2); for mut listener in listeners { tokio::spawn(async move { - listener.listen().await.unwrap(); // just try to accept once let stream = listener.accept().await.unwrap(); stream.handshake().await.unwrap(); @@ -269,11 +276,12 @@ mod test { #[cfg(unix)] None, ) + .await + .unwrap() .pop() .unwrap(); tokio::spawn(async move { - listener.listen().await.unwrap(); // just try to accept once let stream = listener.accept().await.unwrap(); let mut stream = stream.handshake().await.unwrap(); diff --git a/pingora-core/src/services/listening.rs b/pingora-core/src/services/listening.rs index 3cdb3b8..e2153dc 100644 --- a/pingora-core/src/services/listening.rs +++ b/pingora-core/src/services/listening.rs @@ -140,11 +140,6 @@ impl<A: ServerApp + Send + Sync + 'static> Service<A> { mut stack: TransportStack, mut shutdown: ShutdownWatch, ) { - if let Err(e) = stack.listen().await { - error!("Listen() failed: {e}"); - return; - } - // the accept loop, until the system is shutting down loop { let new_io = tokio::select! { // TODO: consider biased for perf reason? @@ -211,10 +206,15 @@ impl<A: ServerApp + Send + Sync + 'static> ServiceTrait for Service<A> { shutdown: ShutdownWatch, ) { let runtime = current_handle(); - let endpoints = self.listeners.build( - #[cfg(unix)] - fds, - ); + let endpoints = self + .listeners + .build( + #[cfg(unix)] + fds, + ) + .await + .expect("Failed to build listeners"); + let app_logic = self .app_logic .take() |