From 0021d41522a464cd91e893911930260bb4d0f115 Mon Sep 17 00:00:00 2001 From: Kevin Guthrie Date: Fri, 13 Sep 2024 14:28:08 -0400 Subject: Add function for custom rate measurements --- .bleep | 2 +- pingora-limits/Cargo.toml | 1 + pingora-limits/src/rate.rs | 153 ++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 154 insertions(+), 2 deletions(-) diff --git a/.bleep b/.bleep index fc84a1c..79df49e 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ -e278ef01df84b21d283864c494b51614a12171de \ No newline at end of file +5a6061f33868fbf1f246f048941964592bc73d85 \ No newline at end of file diff --git a/pingora-limits/Cargo.toml b/pingora-limits/Cargo.toml index 5bb1ef2..0df5c23 100644 --- a/pingora-limits/Cargo.toml +++ b/pingora-limits/Cargo.toml @@ -21,6 +21,7 @@ ahash = { workspace = true } rand = "0" dashmap = "5" dhat = "0" +float-cmp = "0.9.0" [[bench]] name = "benchmark" diff --git a/pingora-limits/src/rate.rs b/pingora-limits/src/rate.rs index 33b0700..fee5d11 100644 --- a/pingora-limits/src/rate.rs +++ b/pingora-limits/src/rate.rs @@ -18,7 +18,25 @@ use crate::estimator::Estimator; use std::hash::Hash; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; -use std::time::Instant; +use std::time::{Duration, Instant}; + +/// Input struct to custom functions for calculating rate. Includes the counts +/// from the current interval, previous interval, the configured duration of an +/// interval, and the fraction into the current interval that the sample was +/// taken. +/// +/// Ex. If the interval to the Rate instance is `10s`, and the rate calculation +/// is taken at 2 seconds after the start of the current interval, then the +/// fraction of the current interval returned in this struct will be `0.2` +/// meaning 20% of the current interval has elapsed +#[non_exhaustive] +#[derive(Debug, Clone, Copy, PartialEq, PartialOrd)] +pub struct RateComponents { + pub prev_samples: isize, + pub curr_samples: isize, + pub interval: Duration, + pub current_interval_fraction: f64, +} /// A stable rate estimator that reports the rate of events in the past `interval` time. /// It returns the average rate between `interval` * 2 and `interval` while collecting the events @@ -34,6 +52,7 @@ pub struct Rate { // Use u64 below instead of Instant because we want atomic operation reset_interval_ms: u64, // the time interval to reset `current` and move it to `previous` last_reset_time: AtomicU64, // the timestamp in ms since `start` + interval: Duration, } // see inflight module for the meaning for these numbers @@ -60,6 +79,7 @@ impl Rate { start: Instant::now(), reset_interval_ms: interval.as_millis() as u64, // should be small not to overflow last_reset_time: AtomicU64::new(0), + interval, } } @@ -138,10 +158,53 @@ impl Rate { past_ms } + + /// Get the current rate as calculated with the given closure. This closure + /// will take an argument containing all the accessible information about + /// the rate from this object and allow the caller to make their own + /// estimation of rate based on: + /// + /// 1. The accumulated samples in the current interval (in progress) + /// 2. The accumulated samples in the previous interval (completed) + /// 3. The size of the interval + /// 4. Elapsed fraction of current interval for this sample (0..1) + /// + pub fn rate_with(&self, key: &K, mut rate_calc_fn: F) -> T + where + F: FnMut(RateComponents) -> T, + K: Hash, + { + let past_ms = self.maybe_reset(); + + let (prev_samples, curr_samples) = if past_ms >= self.reset_interval_ms * 2 { + // already missed 2 intervals, no data, just report 0 as a short cut + (0, 0) + } else if past_ms >= self.reset_interval_ms { + (self.previous(self.red_or_blue()).get(key), 0) + } else { + let (prev_est, curr_est) = if self.red_or_blue() { + (&self.blue_slot, &self.red_slot) + } else { + (&self.red_slot, &self.blue_slot) + }; + + (prev_est.get(key), curr_est.get(key)) + }; + + rate_calc_fn(RateComponents { + interval: self.interval, + prev_samples, + curr_samples, + current_interval_fraction: (past_ms % self.reset_interval_ms) as f64 + / self.reset_interval_ms as f64, + }) + } } #[cfg(test)] mod tests { + use float_cmp::assert_approx_eq; + use super::*; use std::thread::sleep; use std::time::Duration; @@ -172,4 +235,92 @@ mod tests { sleep(Duration::from_secs(1)); assert_eq!(r.rate(&key), 0f64); // no event observed in the past 2 seconds } + + /// Assertion that 2 numbers are close within a generous margin. These + /// tests are doing a lot of literal sleeping, so the measured results + /// can't be accurate or consistent. This function does an assert with a + /// generous tolerance + fn assert_eq_ish(left: f64, right: f64) { + assert_approx_eq!(f64, left, right, epsilon = 0.15) + } + + #[test] + fn test_observe_rate_custom_90_10() { + let r = Rate::new(Duration::from_secs(1)); + let key = 1; + + let rate_90_10_fn = |rate_info: RateComponents| { + let prev = rate_info.prev_samples as f64; + let curr = rate_info.curr_samples as f64; + (prev * 0.1 + curr * 0.9) / rate_info.interval.as_secs_f64() + }; + + // second: 0 + let observed = r.observe(&key, 3); + assert_eq!(observed, 3); + let observed = r.observe(&key, 2); + assert_eq!(observed, 5); + assert_eq!(r.rate_with(&key, rate_90_10_fn), 5. * 0.9); + + // second: 1 + sleep(Duration::from_secs(1)); + let observed = r.observe(&key, 4); + assert_eq!(observed, 4); + assert_eq!(r.rate_with(&key, rate_90_10_fn), 5. * 0.1 + 4. * 0.9); + + // second: 2 + sleep(Duration::from_secs(1)); + assert_eq!(r.rate_with(&key, rate_90_10_fn), 4. * 0.1); + + // second: 3 + sleep(Duration::from_secs(1)); + assert_eq!(r.rate_with(&key, rate_90_10_fn), 0f64); + } + + // this is the function described in this post + // https://blog.cloudflare.com/counting-things-a-lot-of-different-things/ + #[test] + fn test_observe_rate_custom_proportional() { + let r = Rate::new(Duration::from_secs(1)); + let key = 1; + + let rate_prop_fn = |rate_info: RateComponents| { + let prev = rate_info.prev_samples as f64; + let curr = rate_info.curr_samples as f64; + let interval_secs = rate_info.interval.as_secs_f64(); + let interval_fraction = rate_info.current_interval_fraction; + + let weighted_count = prev * (1. - interval_fraction) + curr * interval_fraction; + weighted_count / interval_secs + }; + + // second: 0 + let observed = r.observe(&key, 3); + assert_eq!(observed, 3); + let observed = r.observe(&key, 2); + assert_eq!(observed, 5); + assert_eq_ish(r.rate_with(&key, rate_prop_fn), 0.); + + // second 0.5 + sleep(Duration::from_secs_f64(0.5)); + assert_eq_ish(r.rate_with(&key, rate_prop_fn), 5. * 0.5); + + // second: 1 + sleep(Duration::from_secs_f64(0.5)); + let observed = r.observe(&key, 4); + assert_eq!(observed, 4); + assert_eq_ish(r.rate_with(&key, rate_prop_fn), 5.); + + // second 1.75 + sleep(Duration::from_secs_f64(0.75)); + assert_eq_ish(r.rate_with(&key, rate_prop_fn), 5. * 0.25 + 4. * 0.75); + + // second: 2 + sleep(Duration::from_secs_f64(0.25)); + assert_eq_ish(r.rate_with(&key, rate_prop_fn), 4.); + + // second: 3 + sleep(Duration::from_secs(1)); + assert_eq!(r.rate_with(&key, rate_prop_fn), 0f64); + } } -- cgit v1.2.3