aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorAndrew Hauck <[email protected]>2024-07-20 14:50:00 -0700
committerAndrew Hauck <[email protected]>2024-07-22 17:40:46 +0000
commit79e42d953231e88a59be96079deae80d3a8da341 (patch)
treeb9ccdd118f8600a1143b074bf95fd419da466c58
parent1e75e18ed9d2cffb0a2429aa68fdf3d54da8f566 (diff)
downloadpingora-79e42d953231e88a59be96079deae80d3a8da341.tar.gz
pingora-79e42d953231e88a59be96079deae80d3a8da341.zip
Add callback function to Backends update() to address atomicity issue when building selector
-rw-r--r--.bleep2
-rw-r--r--pingora-load-balancing/src/lib.rs119
2 files changed, 100 insertions, 21 deletions
diff --git a/.bleep b/.bleep
index 311a546..750970d 100644
--- a/.bleep
+++ b/.bleep
@@ -1 +1 @@
-78a170341a0fb030b8bcb2afe84afb268cdc5b2d \ No newline at end of file
+9fdf48d67b78675c989f51ec18829a81fe6976ef \ No newline at end of file
diff --git a/pingora-load-balancing/src/lib.rs b/pingora-load-balancing/src/lib.rs
index c950c85..07062df 100644
--- a/pingora-load-balancing/src/lib.rs
+++ b/pingora-load-balancing/src/lib.rs
@@ -130,8 +130,17 @@ impl Backends {
self.health_check = Some(hc.into())
}
- /// Return true when the new is different from the current set of backends
- fn do_update(&self, new_backends: BTreeSet<Backend>, enablement: HashMap<u64, bool>) -> bool {
+ /// Updates backends when the new is different from the current set,
+ /// the callback will be invoked when the new set of backend is different
+ /// from the current one so that the caller can update the selector accordingly.
+ fn do_update<F>(
+ &self,
+ new_backends: BTreeSet<Backend>,
+ enablement: HashMap<u64, bool>,
+ callback: F,
+ ) where
+ F: Fn(Arc<BTreeSet<Backend>>),
+ {
if (**self.backends.load()) != new_backends {
let old_health = self.health.load();
let mut health = HashMap::with_capacity(new_backends.len());
@@ -147,10 +156,14 @@ impl Backends {
health.insert(hash_key, backend_health);
}
- // TODO: put backend and health under 1 ArcSwap so that this update is atomic
- self.backends.store(Arc::new(new_backends));
+ // TODO: put this all under 1 ArcSwap so the update is atomic
+ // It's important the `callback()` executes first since computing selector backends might
+ // be expensive. For example, if a caller checks `backends` to see if any are available
+ // they may encounter false positives if the selector isn't ready yet.
+ let new_backends = Arc::new(new_backends);
+ callback(new_backends.clone());
+ self.backends.store(new_backends);
self.health.store(Arc::new(health));
- true
} else {
// no backend change, just check enablement
for (hash_key, backend_enabled) in enablement.iter() {
@@ -160,7 +173,6 @@ impl Backends {
backend_health.enable(*backend_enabled);
}
}
- false
}
}
@@ -199,12 +211,15 @@ impl Backends {
/// Call the service discovery method to update the collection of backends.
///
- /// Return `true` when the new collection is different from the current set of backends.
- /// This return value is useful to tell the caller when to rebuild things that are expensive to
- /// update, such as consistent hashing rings.
- pub async fn update(&self) -> Result<bool> {
+ /// The callback will be invoked when the new set of backend is different
+ /// from the current one so that the caller can update the selector accordingly.
+ pub async fn update<F>(&self, callback: F) -> Result<()>
+ where
+ F: Fn(Arc<BTreeSet<Backend>>),
+ {
let (new_backends, enablement) = self.discovery.discover().await?;
- Ok(self.do_update(new_backends, enablement))
+ self.do_update(new_backends, enablement, callback);
+ Ok(())
}
/// Run health check on all backends if it is set.
@@ -320,11 +335,9 @@ where
/// This function will be called every `update_frequency` if this [LoadBalancer] instance
/// is running as a background service.
pub async fn update(&self) -> Result<()> {
- if self.backends.update().await? {
- self.selector
- .store(Arc::new(S::build(&self.backends.get_backend())))
- }
- Ok(())
+ self.backends
+ .update(|backends| self.selector.store(Arc::new(S::build(&backends))))
+ .await
}
/// Return the first healthy [Backend] according to the selection algorithm and the
@@ -378,6 +391,8 @@ where
#[cfg(test)]
mod test {
+ use std::sync::atomic::{AtomicBool, Ordering::Relaxed};
+
use super::*;
use async_trait::async_trait;
@@ -408,10 +423,20 @@ mod test {
backends.set_health_check(check);
// true: new backend discovered
- assert!(backends.update().await.unwrap());
+ let updated = AtomicBool::new(false);
+ backends
+ .update(|_| updated.store(true, Relaxed))
+ .await
+ .unwrap();
+ assert!(updated.load(Relaxed));
// false: no new backend discovered
- assert!(!backends.update().await.unwrap());
+ let updated = AtomicBool::new(false);
+ backends
+ .update(|_| updated.store(true, Relaxed))
+ .await
+ .unwrap();
+ assert!(!updated.load(Relaxed));
backends.run_health_check(false).await;
@@ -449,7 +474,14 @@ mod test {
let discovery = TestDiscovery(discovery);
let backends = Backends::new(Box::new(discovery));
- assert!(backends.update().await.unwrap());
+
+ // true: new backend discovered
+ let updated = AtomicBool::new(false);
+ backends
+ .update(|_| updated.store(true, Relaxed))
+ .await
+ .unwrap();
+ assert!(updated.load(Relaxed));
let backend = backends.get_backend();
assert!(backend.contains(&good1));
@@ -476,7 +508,12 @@ mod test {
backends.set_health_check(check);
// true: new backend discovered
- assert!(backends.update().await.unwrap());
+ let updated = AtomicBool::new(false);
+ backends
+ .update(|_| updated.store(true, Relaxed))
+ .await
+ .unwrap();
+ assert!(updated.load(Relaxed));
backends.run_health_check(true).await;
@@ -484,4 +521,46 @@ mod test {
assert!(backends.ready(&good2));
assert!(!backends.ready(&bad));
}
+
+ mod thread_safety {
+ use super::*;
+
+ struct MockDiscovery {
+ expected: usize,
+ }
+ #[async_trait]
+ impl ServiceDiscovery for MockDiscovery {
+ async fn discover(&self) -> Result<(BTreeSet<Backend>, HashMap<u64, bool>)> {
+ let mut d = BTreeSet::new();
+ let mut m = HashMap::with_capacity(self.expected);
+ for i in 0..self.expected {
+ let b = Backend::new(&format!("1.1.1.1:{i}")).unwrap();
+ m.insert(i as u64, true);
+ d.insert(b);
+ }
+ Ok((d, m))
+ }
+ }
+
+ #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+ async fn test_consistency() {
+ let expected = 3000;
+ let discovery = MockDiscovery { expected };
+ let lb = Arc::new(LoadBalancer::<selection::Consistent>::from_backends(
+ Backends::new(Box::new(discovery)),
+ ));
+ let lb2 = lb.clone();
+
+ tokio::spawn(async move {
+ assert!(lb2.update().await.is_ok());
+ });
+ let mut backend_count = 0;
+ while backend_count == 0 {
+ let backends = lb.backends();
+ backend_count = backends.backends.load_full().len();
+ }
+ assert_eq!(backend_count, expected);
+ assert!(lb.select_with(b"test", 1, |_, _| true).is_some());
+ }
+ }
}