aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorYuchen Wu <[email protected]>2024-06-24 15:09:45 -0700
committerYuchen Wu <[email protected]>2024-07-12 11:24:29 -0700
commitee7f66082fd1578758c59855b03bda7d90b0a16a (patch)
tree855baae5ca6ae72ad82776ded73dc5d9c9769c12
parent38a9d556b557a10f7815f50fca6a49de146ab5be (diff)
downloadpingora-ee7f66082fd1578758c59855b03bda7d90b0a16a.tar.gz
pingora-ee7f66082fd1578758c59855b03bda7d90b0a16a.zip
Allow to create a new connection when the current one is shutting down
When we see a GOAWAY(NO_ERROR), the connector no longer fail the entire request. Now the connector creates a new connection instead.
-rw-r--r--.bleep2
-rw-r--r--pingora-core/src/connectors/http/v2.rs50
2 files changed, 36 insertions, 16 deletions
diff --git a/.bleep b/.bleep
index cee92a7..68106ae 100644
--- a/.bleep
+++ b/.bleep
@@ -1 +1 @@
-361d88592075f7f98f581b139d0349f1b70190a2 \ No newline at end of file
+9ec5f295aba1ec889914afb8c3cbb44724a516f1 \ No newline at end of file
diff --git a/pingora-core/src/connectors/http/v2.rs b/pingora-core/src/connectors/http/v2.rs
index a082943..ce9213b 100644
--- a/pingora-core/src/connectors/http/v2.rs
+++ b/pingora-core/src/connectors/http/v2.rs
@@ -21,7 +21,7 @@ use crate::upstreams::peer::{Peer, ALPN};
use bytes::Bytes;
use h2::client::SendRequest;
-use log::{debug, warn};
+use log::debug;
use parking_lot::{Mutex, RwLock};
use pingora_error::{Error, ErrorType::*, OrErr, Result};
use pingora_pool::{ConnectionMeta, ConnectionPool, PoolNode};
@@ -52,6 +52,8 @@ pub(crate) struct ConnectionRefInner {
max_streams: usize,
// how many concurrent streams already active
current_streams: AtomicUsize,
+ // The connection is gracefully shutting down, no more stream is allowed
+ shutting_down: AtomicBool,
// because `SendRequest` doesn't actually have access to the underlying Stream,
// we log info about timing and tcp info here.
pub(crate) digest: Digest,
@@ -78,12 +80,14 @@ impl ConnectionRef {
id,
max_streams,
current_streams: AtomicUsize::new(0),
+ shutting_down: false.into(),
digest,
release_lock: Arc::new(Mutex::new(())),
}))
}
pub fn more_streams_allowed(&self) -> bool {
- self.0.max_streams > self.0.current_streams.load(Ordering::Relaxed)
+ !self.is_shutting_down()
+ && self.0.max_streams > self.0.current_streams.load(Ordering::Relaxed)
}
pub fn is_idle(&self) -> bool {
@@ -114,6 +118,12 @@ impl ConnectionRef {
*self.0.closed.borrow()
}
+ // different from is_closed, existing streams can still be processed but can no longer create
+ // new stream.
+ pub fn is_shutting_down(&self) -> bool {
+ self.0.shutting_down.load(Ordering::Relaxed)
+ }
+
// spawn a stream if more stream is allowed, otherwise return Ok(None)
pub async fn spawn_stream(&self) -> Result<Option<Http2Session>> {
// Atomically check if the current_stream is over the limit
@@ -124,13 +134,28 @@ impl ConnectionRef {
self.0.current_streams.fetch_sub(1, Ordering::SeqCst);
return Ok(None);
}
- let send_req = self.0.connection_stub.new_stream().await.map_err(|e| {
- // fail to create the stream, reset the counter
- self.0.current_streams.fetch_sub(1, Ordering::SeqCst);
- e
- })?;
- Ok(Some(Http2Session::new(send_req, self.clone())))
+ match self.0.connection_stub.new_stream().await {
+ Ok(send_req) => Ok(Some(Http2Session::new(send_req, self.clone()))),
+ Err(e) => {
+ // fail to create the stream, reset the counter
+ self.0.current_streams.fetch_sub(1, Ordering::SeqCst);
+ // Remote sends GOAWAY(NO_ERROR): graceful shutdown: this connection no longer
+ // accepts new streams. We can still try to create new connection.
+ if e.root_cause()
+ .downcast_ref::<Box<h2::Error>>()
+ .map(|e| {
+ e.is_go_away() && e.is_remote() && e.reason() == Some(h2::Reason::NO_ERROR)
+ })
+ .unwrap_or(false)
+ {
+ self.0.shutting_down.store(true, Ordering::Relaxed);
+ Ok(None)
+ } else {
+ Err(e)
+ }
+ }
+ }
}
}
@@ -277,11 +302,6 @@ impl Connector {
.or_else(|| self.idle_pool.get(&reuse_hash));
if let Some(conn) = maybe_conn {
let h2_stream = conn.spawn_stream().await?;
- if h2_stream.is_none() {
- warn!("connection from the pools should have free stream to allocate, current in use {}, max {}",
- conn.0.current_streams.load(Ordering::Relaxed),
- conn.0.max_streams);
- }
if conn.more_streams_allowed() {
self.in_use_pool.insert(reuse_hash, conn);
}
@@ -318,8 +338,8 @@ impl Connector {
// find and remove the conn stored in in_use_pool so that it could be put in the idle pool
// if necessary
let conn = self.in_use_pool.release(reuse_hash, id).unwrap_or(conn);
- if conn.is_closed() {
- // Already dead h2 connection
+ if conn.is_closed() || conn.is_shutting_down() {
+ // should never be put back to the pool
return;
}
if conn.is_idle() {