aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorvicanso <[email protected]>2024-07-31 12:53:15 +0000
committerAndrew Hauck <[email protected]>2024-08-09 14:30:49 -0700
commit8a0c73f174a27a87c54426a748c4818b10de9425 (patch)
tree7ee86a583a585e88868bd4432b11ddaf729e7393
parent24d722920a537d88e3259f026b232f2f09d1662d (diff)
downloadpingora-8a0c73f174a27a87c54426a748c4818b10de9425.tar.gz
pingora-8a0c73f174a27a87c54426a748c4818b10de9425.zip
Support observe backend health status #225
--- test: add test for upstream health observe --- renamed the function and added doc to make it intelligible --- fix clippy error --- Merge branch 'main' into main --- test: fix test for backend do update Co-authored-by: Tree Xie <[email protected]> Includes-commit: 1421c267563f0161624e7dcc00a8f8d7be86d8b8 Includes-commit: 695d5490142945e11effe6ab87f4785ee0c7e4ab Includes-commit: 6a09b52c5e8b710c9fa4e0ac75420edd73556897 Includes-commit: 72d6ee09ae7488a372180cbf2b57f19d76eb8697 Includes-commit: e6c2af0e77283f73c785e73a702d739f4d5a8da0 Includes-commit: fb62869583667a220fab019bec8d7a80ef3caba2 Replicated-from: https://github.com/cloudflare/pingora/pull/325
-rw-r--r--.bleep2
-rw-r--r--pingora-load-balancing/src/health_check.rs111
-rw-r--r--pingora-load-balancing/src/lib.rs1
3 files changed, 112 insertions, 2 deletions
diff --git a/.bleep b/.bleep
index 4536067..4cef6a0 100644
--- a/.bleep
+++ b/.bleep
@@ -1 +1 @@
-80970257fefa5cff505f63552dd2ae74372d501a \ No newline at end of file
+36269b8823b23381398508138bbab33c03ba7681 \ No newline at end of file
diff --git a/pingora-load-balancing/src/health_check.rs b/pingora-load-balancing/src/health_check.rs
index ba5e6c0..b02776d 100644
--- a/pingora-load-balancing/src/health_check.rs
+++ b/pingora-load-balancing/src/health_check.rs
@@ -24,6 +24,14 @@ use pingora_http::{RequestHeader, ResponseHeader};
use std::sync::Arc;
use std::time::Duration;
+#[async_trait]
+pub trait HealthObserve {
+ /// Observes the health of a [Backend], can be used for monitoring purposes.
+ async fn observe(&self, target: &Backend, healthy: bool);
+}
+/// Provided to a [HealthCheck] to observe changes to [Backend] health.
+pub type HealthObserveCallback = Box<dyn HealthObserve + Send + Sync>;
+
/// [HealthCheck] is the interface to implement health check for backends
#[async_trait]
pub trait HealthCheck {
@@ -31,6 +39,10 @@ pub trait HealthCheck {
///
/// `Ok(())`` if the check passes, otherwise the check fails.
async fn check(&self, target: &Backend) -> Result<()>;
+
+ /// Called when the health changes for a [Backend].
+ async fn health_status_change(&self, _target: &Backend, _healthy: bool) {}
+
/// This function defines how many *consecutive* checks should flip the health of a backend.
///
/// For example: with `success``: `true`: this function should return the
@@ -56,6 +68,8 @@ pub struct TcpHealthCheck {
/// set, it will also try to establish a TLS connection on top of the TCP connection.
pub peer_template: BasicPeer,
connector: TransportConnector,
+ /// A callback that is invoked when the `healthy` status changes for a [Backend].
+ pub health_changed_callback: Option<HealthObserveCallback>,
}
impl Default for TcpHealthCheck {
@@ -67,6 +81,7 @@ impl Default for TcpHealthCheck {
consecutive_failure: 1,
peer_template,
connector: TransportConnector::new(None),
+ health_changed_callback: None,
}
}
}
@@ -110,6 +125,12 @@ impl HealthCheck for TcpHealthCheck {
peer._address = target.addr.clone();
self.connector.get_stream(&peer).await.map(|_| {})
}
+
+ async fn health_status_change(&self, target: &Backend, healthy: bool) {
+ if let Some(callback) = &self.health_changed_callback {
+ callback.observe(target, healthy).await;
+ }
+ }
}
type Validator = Box<dyn Fn(&ResponseHeader) -> Result<()> + Send + Sync>;
@@ -147,6 +168,8 @@ pub struct HttpHealthCheck {
/// Sometimes the health check endpoint lives one a different port than the actual backend.
/// Setting this option allows the health check to perform on the given port of the backend IP.
pub port_override: Option<u16>,
+ /// A callback that is invoked when the `healthy` status changes for a [Backend].
+ pub health_changed_callback: Option<HealthObserveCallback>,
}
impl HttpHealthCheck {
@@ -174,6 +197,7 @@ impl HttpHealthCheck {
req,
validator: None,
port_override: None,
+ health_changed_callback: None,
}
}
@@ -235,6 +259,11 @@ impl HealthCheck for HttpHealthCheck {
Ok(())
}
+ async fn health_status_change(&self, target: &Backend, healthy: bool) {
+ if let Some(callback) = &self.health_changed_callback {
+ callback.observe(target, healthy).await;
+ }
+ }
}
#[derive(Clone)]
@@ -313,8 +342,14 @@ impl Health {
#[cfg(test)]
mod test {
+ use std::{
+ collections::{BTreeSet, HashMap},
+ sync::atomic::{AtomicU16, Ordering},
+ };
+
use super::*;
- use crate::SocketAddr;
+ use crate::{discovery, Backends, SocketAddr};
+ use async_trait::async_trait;
use http::Extensions;
#[tokio::test]
@@ -387,4 +422,78 @@ mod test {
assert!(http_check.check(&backend).await.is_ok());
}
+
+ #[tokio::test]
+ async fn test_health_observe() {
+ struct Observe {
+ unhealthy_count: Arc<AtomicU16>,
+ }
+ #[async_trait]
+ impl HealthObserve for Observe {
+ async fn observe(&self, _target: &Backend, healthy: bool) {
+ if !healthy {
+ self.unhealthy_count.fetch_add(1, Ordering::Relaxed);
+ }
+ }
+ }
+
+ let good_backend = Backend::new("127.0.0.1:79").unwrap();
+ let new_good_backends = || -> (BTreeSet<Backend>, HashMap<u64, bool>) {
+ let mut healthy = HashMap::new();
+ healthy.insert(good_backend.hash_key(), true);
+ let mut backends = BTreeSet::new();
+ backends.extend(vec![good_backend.clone()]);
+ (backends, healthy)
+ };
+ // tcp health check
+ {
+ let unhealthy_count = Arc::new(AtomicU16::new(0));
+ let ob = Observe {
+ unhealthy_count: unhealthy_count.clone(),
+ };
+ let bob = Box::new(ob);
+ let tcp_check = TcpHealthCheck {
+ health_changed_callback: Some(bob),
+ ..Default::default()
+ };
+
+ let discovery = discovery::Static::default();
+ let mut backends = Backends::new(Box::new(discovery));
+ backends.set_health_check(Box::new(tcp_check));
+ let result = new_good_backends();
+ backends.do_update(result.0, result.1, |_backend: Arc<BTreeSet<Backend>>| {});
+ // the backend is ready
+ assert!(backends.ready(&good_backend));
+
+ // run health check
+ backends.run_health_check(false).await;
+ assert!(1 == unhealthy_count.load(Ordering::Relaxed));
+ // backend is unhealthy
+ assert!(!backends.ready(&good_backend));
+ }
+
+ // http health check
+ {
+ let unhealthy_count = Arc::new(AtomicU16::new(0));
+ let ob = Observe {
+ unhealthy_count: unhealthy_count.clone(),
+ };
+ let bob = Box::new(ob);
+
+ let mut https_check = HttpHealthCheck::new("one.one.one.one", true);
+ https_check.health_changed_callback = Some(bob);
+
+ let discovery = discovery::Static::default();
+ let mut backends = Backends::new(Box::new(discovery));
+ backends.set_health_check(Box::new(https_check));
+ let result = new_good_backends();
+ backends.do_update(result.0, result.1, |_backend: Arc<BTreeSet<Backend>>| {});
+ // the backend is ready
+ assert!(backends.ready(&good_backend));
+ // run health check
+ backends.run_health_check(false).await;
+ assert!(1 == unhealthy_count.load(Ordering::Relaxed));
+ assert!(!backends.ready(&good_backend));
+ }
+ }
}
diff --git a/pingora-load-balancing/src/lib.rs b/pingora-load-balancing/src/lib.rs
index 4009dc1..4a7433e 100644
--- a/pingora-load-balancing/src/lib.rs
+++ b/pingora-load-balancing/src/lib.rs
@@ -266,6 +266,7 @@ impl Backends {
let flipped =
h.observe_health(errored.is_none(), check.health_threshold(errored.is_none()));
if flipped {
+ check.health_status_change(backend, errored.is_none()).await;
if let Some(e) = errored {
warn!("{backend:?} becomes unhealthy, {e}");
} else {