about | summary | refs | log | tree | commit | diff | homepage
path: root/pingora-limits
diff options
context:
space:
mode:
author: Kevin Guthrie <[email protected]>, 2024-09-13 14:28:08 -0400
committer: Kevin Guthrie <[email protected]>, 2024-10-07 14:23:08 -0400
commit: 0021d41522a464cd91e893911930260bb4d0f115 (patch)
tree: 0754dde6af9e61c97cbe0e73af02c6a29e83623b /pingora-limits
parent: 5759d38624a43ed591bd53e9c0c56a38eb003eaa (diff)
download: pingora-0021d41522a464cd91e893911930260bb4d0f115.tar.gz
pingora-0021d41522a464cd91e893911930260bb4d0f115.zip
Add function for custom rate measurements
Diffstat (limited to 'pingora-limits')
-rw-r--r-- pingora-limits/Cargo.toml 1
-rw-r--r-- pingora-limits/src/rate.rs 153
2 files changed, 153 insertions, 1 deletion
diff --git a/pingora-limits/Cargo.toml b/pingora-limits/Cargo.toml
index 5bb1ef2..0df5c23 100644
--- a/pingora-limits/Cargo.toml
+++ b/pingora-limits/Cargo.toml
@@ -21,6 +21,7 @@ ahash = { workspace = true }
rand = "0"
dashmap = "5"
dhat = "0"
+float-cmp = "0.9.0"
[[bench]]
name = "benchmark"
diff --git a/pingora-limits/src/rate.rs b/pingora-limits/src/rate.rs
index 33b0700..fee5d11 100644
--- a/pingora-limits/src/rate.rs
+++ b/pingora-limits/src/rate.rs
@@ -18,7 +18,25 @@
use crate::estimator::Estimator;
use std::hash::Hash;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
-use std::time::Instant;
+use std::time::{Duration, Instant};
+
+/// Input to the custom rate closures accepted by `Rate::rate_with`. Carries
+/// the sample counts of the previous (`prev_samples`) and current
+/// (`curr_samples`) intervals, the configured `interval` duration, and
+/// `current_interval_fraction`: how far into the current interval the sample
+/// was taken.
+///
+/// Ex. If the interval of the `Rate` instance is `10s` and the rate calculation
+/// is taken 2 seconds after the start of the current interval, then
+/// `current_interval_fraction` is `0.2`, meaning 20% of that interval elapsed.
+#[non_exhaustive]
+#[derive(Debug, Clone, Copy, PartialEq, PartialOrd)]
+pub struct RateComponents {
+ pub prev_samples: isize,
+ pub curr_samples: isize,
+ pub interval: Duration,
+ pub current_interval_fraction: f64,
+}
/// A stable rate estimator that reports the rate of events in the past `interval` time.
/// It returns the average rate between `interval` * 2 and `interval` while collecting the events
@@ -34,6 +52,7 @@ pub struct Rate {
// Use u64 below instead of Instant because we want atomic operation
reset_interval_ms: u64, // the time interval to reset `current` and move it to `previous`
last_reset_time: AtomicU64, // the timestamp in ms since `start`
+ interval: Duration,
}
// see inflight module for the meaning for these numbers
@@ -60,6 +79,7 @@ impl Rate {
start: Instant::now(),
reset_interval_ms: interval.as_millis() as u64, // should be small not to overflow
last_reset_time: AtomicU64::new(0),
+ interval,
}
}
@@ -138,10 +158,53 @@ impl Rate {
past_ms
}
+
+ /// Get the current rate as calculated with the given closure.
+ ///
+ /// The closure is called exactly once with a [`RateComponents`] holding
+ /// everything this object exposes about `key`, so the caller can make their
+ /// own estimation of rate based on:
+ ///
+ /// 1. The accumulated samples in the current interval (in progress)
+ /// 2. The accumulated samples in the previous interval (completed)
+ /// 3. The size of the interval
+ /// 4. Elapsed fraction of current interval for this sample (0..1)
+ ///
+ /// Both sample counts are reported as 0 once `maybe_reset` reports that two
+ /// or more intervals have elapsed. An interval shorter than one millisecond
+ /// has a zero `reset_interval_ms`; the elapsed fraction is then reported as
+ /// `0.0` instead of panicking on a remainder by zero.
+ pub fn rate_with<F, T, K>(&self, key: &K, mut rate_calc_fn: F) -> T
+ where
+     F: FnMut(RateComponents) -> T,
+     K: Hash,
+ {
+     let interval_ms = self.reset_interval_ms;
+     let past_ms = self.maybe_reset();
+
+     // `saturating_mul` keeps the threshold well-defined for very large intervals
+     let (prev_samples, curr_samples) = if past_ms >= interval_ms.saturating_mul(2) {
+         // already missed 2 intervals, no data, just report 0 as a short cut
+         (0, 0)
+     } else if past_ms >= interval_ms {
+         // NOTE(review): presumably `maybe_reset` has just rotated the slots
+         // here, so only the previous slot is read and the current count is
+         // taken as 0 -- confirm against `maybe_reset`.
+         (self.previous(self.red_or_blue()).get(key), 0)
+     } else {
+         let (prev_est, curr_est) = if self.red_or_blue() {
+             (&self.blue_slot, &self.red_slot)
+         } else {
+             (&self.red_slot, &self.blue_slot)
+         };
+
+         (prev_est.get(key), curr_est.get(key))
+     };
+
+     // Integer `%` by zero panics, so a sub-millisecond interval (which
+     // truncates to 0 ms) must not reach the remainder below.
+     let current_interval_fraction = if interval_ms == 0 {
+         0.0
+     } else {
+         (past_ms % interval_ms) as f64 / interval_ms as f64
+     };
+
+     rate_calc_fn(RateComponents {
+         interval: self.interval,
+         prev_samples,
+         curr_samples,
+         current_interval_fraction,
+     })
+ }
}
#[cfg(test)]
mod tests {
+ use float_cmp::assert_approx_eq;
+
use super::*;
use std::thread::sleep;
use std::time::Duration;
@@ -172,4 +235,92 @@ mod tests {
sleep(Duration::from_secs(1));
assert_eq!(r.rate(&key), 0f64); // no event observed in the past 2 seconds
}
+
+ /// Asserts that `left` and `right` are approximately equal, using
+ /// `float_cmp` with a generous fixed epsilon. The tests in this module
+ /// sleep for real, so the measured rates are neither exact nor repeatable
+ /// and cannot be compared strictly.
+ fn assert_eq_ish(left: f64, right: f64) {
+     const TOLERANCE: f64 = 0.15;
+     assert_approx_eq!(f64, left, right, epsilon = TOLERANCE);
+ }
+
+ #[test]
+ fn test_observe_rate_custom_90_10() {
+     /// Weighs the current interval at 90% and the previous one at 10%,
+     /// then divides by the interval length in seconds.
+     fn weighted_90_10(components: RateComponents) -> f64 {
+         let RateComponents {
+             prev_samples,
+             curr_samples,
+             interval,
+             ..
+         } = components;
+         (prev_samples as f64 * 0.1 + curr_samples as f64 * 0.9) / interval.as_secs_f64()
+     }
+
+     let one_second = Duration::from_secs(1);
+     let limiter = Rate::new(one_second);
+     let key = 1;
+
+     // second: 0 -- only the current interval has samples
+     assert_eq!(limiter.observe(&key, 3), 3);
+     assert_eq!(limiter.observe(&key, 2), 5);
+     assert_eq!(limiter.rate_with(&key, weighted_90_10), 5. * 0.9);
+
+     // second: 1 -- the 5 samples are now in the previous interval
+     sleep(one_second);
+     assert_eq!(limiter.observe(&key, 4), 4);
+     assert_eq!(
+         limiter.rate_with(&key, weighted_90_10),
+         5. * 0.1 + 4. * 0.9
+     );
+
+     // second: 2 -- nothing new observed, the 4 samples are the previous interval
+     sleep(one_second);
+     assert_eq!(limiter.rate_with(&key, weighted_90_10), 4. * 0.1);
+
+     // second: 3 -- no samples left in either interval
+     sleep(one_second);
+     assert_eq!(limiter.rate_with(&key, weighted_90_10), 0f64);
+ }
+
+ // this is the function described in this post
+ // https://blog.cloudflare.com/counting-things-a-lot-of-different-things/
+ #[test]
+ fn test_observe_rate_custom_proportional() {
+     /// Blends the previous and current counts by the elapsed fraction of the
+     /// current interval, then divides by the interval length in seconds.
+     fn proportional(components: RateComponents) -> f64 {
+         let elapsed = components.current_interval_fraction;
+         let from_prev = components.prev_samples as f64 * (1. - elapsed);
+         let from_curr = components.curr_samples as f64 * elapsed;
+         (from_prev + from_curr) / components.interval.as_secs_f64()
+     }
+
+     // real sleeping: results are only checked approximately below
+     let nap = |secs: f64| sleep(Duration::from_secs_f64(secs));
+
+     let limiter = Rate::new(Duration::from_secs(1));
+     let key = 1;
+
+     // second: 0
+     assert_eq!(limiter.observe(&key, 3), 3);
+     assert_eq!(limiter.observe(&key, 2), 5);
+     assert_eq_ish(limiter.rate_with(&key, proportional), 0.);
+
+     // second 0.5
+     nap(0.5);
+     assert_eq_ish(limiter.rate_with(&key, proportional), 5. * 0.5);
+
+     // second: 1
+     nap(0.5);
+     assert_eq!(limiter.observe(&key, 4), 4);
+     assert_eq_ish(limiter.rate_with(&key, proportional), 5.);
+
+     // second 1.75
+     nap(0.75);
+     assert_eq_ish(
+         limiter.rate_with(&key, proportional),
+         5. * 0.25 + 4. * 0.75,
+     );
+
+     // second: 2
+     nap(0.25);
+     assert_eq_ish(limiter.rate_with(&key, proportional), 4.);
+
+     // second: 3
+     nap(1.);
+     assert_eq!(limiter.rate_with(&key, proportional), 0f64);
+ }
}