diff options
author | Yuchen Wu <[email protected]> | 2024-06-24 15:09:45 -0700 |
---|---|---|
committer | Yuchen Wu <[email protected]> | 2024-07-12 11:24:29 -0700 |
commit | ee7f66082fd1578758c59855b03bda7d90b0a16a (patch) | |
tree | 855baae5ca6ae72ad82776ded73dc5d9c9769c12 | |
parent | 38a9d556b557a10f7815f50fca6a49de146ab5be (diff) | |
download | pingora-ee7f66082fd1578758c59855b03bda7d90b0a16a.tar.gz pingora-ee7f66082fd1578758c59855b03bda7d90b0a16a.zip |
Allow to create a new connection when the current one is shutting down
When we see a GOAWAY(NO_ERROR), the connector no longer fail the entire request.
Now the connector creates a new connection instead.
-rw-r--r-- | .bleep | 2 | ||||
-rw-r--r-- | pingora-core/src/connectors/http/v2.rs | 50 |
2 files changed, 36 insertions, 16 deletions
@@ -1 +1 @@ -361d88592075f7f98f581b139d0349f1b70190a2
\ No newline at end of file +9ec5f295aba1ec889914afb8c3cbb44724a516f1
\ No newline at end of file diff --git a/pingora-core/src/connectors/http/v2.rs b/pingora-core/src/connectors/http/v2.rs index a082943..ce9213b 100644 --- a/pingora-core/src/connectors/http/v2.rs +++ b/pingora-core/src/connectors/http/v2.rs @@ -21,7 +21,7 @@ use crate::upstreams::peer::{Peer, ALPN}; use bytes::Bytes; use h2::client::SendRequest; -use log::{debug, warn}; +use log::debug; use parking_lot::{Mutex, RwLock}; use pingora_error::{Error, ErrorType::*, OrErr, Result}; use pingora_pool::{ConnectionMeta, ConnectionPool, PoolNode}; @@ -52,6 +52,8 @@ pub(crate) struct ConnectionRefInner { max_streams: usize, // how many concurrent streams already active current_streams: AtomicUsize, + // The connection is gracefully shutting down, no more stream is allowed + shutting_down: AtomicBool, // because `SendRequest` doesn't actually have access to the underlying Stream, // we log info about timing and tcp info here. pub(crate) digest: Digest, @@ -78,12 +80,14 @@ impl ConnectionRef { id, max_streams, current_streams: AtomicUsize::new(0), + shutting_down: false.into(), digest, release_lock: Arc::new(Mutex::new(())), })) } pub fn more_streams_allowed(&self) -> bool { - self.0.max_streams > self.0.current_streams.load(Ordering::Relaxed) + !self.is_shutting_down() + && self.0.max_streams > self.0.current_streams.load(Ordering::Relaxed) } pub fn is_idle(&self) -> bool { @@ -114,6 +118,12 @@ impl ConnectionRef { *self.0.closed.borrow() } + // different from is_closed, existing streams can still be processed but can no longer create + // new stream. + pub fn is_shutting_down(&self) -> bool { + self.0.shutting_down.load(Ordering::Relaxed) + } + // spawn a stream if more stream is allowed, otherwise return Ok(None) pub async fn spawn_stream(&self) -> Result<Option<Http2Session>> { // Atomically check if the current_stream is over the limit @@ -124,13 +134,28 @@ impl ConnectionRef { self.0.current_streams.fetch_sub(1, Ordering::SeqCst); return Ok(None); } - let send_req = self.0.connection_stub.new_stream().await.map_err(|e| { - // fail to create the stream, reset the counter - self.0.current_streams.fetch_sub(1, Ordering::SeqCst); - e - })?; - Ok(Some(Http2Session::new(send_req, self.clone()))) + match self.0.connection_stub.new_stream().await { + Ok(send_req) => Ok(Some(Http2Session::new(send_req, self.clone()))), + Err(e) => { + // fail to create the stream, reset the counter + self.0.current_streams.fetch_sub(1, Ordering::SeqCst); + // Remote sends GOAWAY(NO_ERROR): graceful shutdown: this connection no longer + // accepts new streams. We can still try to create new connection. + if e.root_cause() + .downcast_ref::<Box<h2::Error>>() + .map(|e| { + e.is_go_away() && e.is_remote() && e.reason() == Some(h2::Reason::NO_ERROR) + }) + .unwrap_or(false) + { + self.0.shutting_down.store(true, Ordering::Relaxed); + Ok(None) + } else { + Err(e) + } + } + } } } @@ -277,11 +302,6 @@ impl Connector { .or_else(|| self.idle_pool.get(&reuse_hash)); if let Some(conn) = maybe_conn { let h2_stream = conn.spawn_stream().await?; - if h2_stream.is_none() { - warn!("connection from the pools should have free stream to allocate, current in use {}, max {}", - conn.0.current_streams.load(Ordering::Relaxed), - conn.0.max_streams); - } if conn.more_streams_allowed() { self.in_use_pool.insert(reuse_hash, conn); } @@ -318,8 +338,8 @@ impl Connector { // find and remove the conn stored in in_use_pool so that it could be put in the idle pool // if necessary let conn = self.in_use_pool.release(reuse_hash, id).unwrap_or(conn); - if conn.is_closed() { - // Already dead h2 connection + if conn.is_closed() || conn.is_shutting_down() { + // should never be put back to the pool return; } if conn.is_idle() { |