diff options
author | Yuchen Wu <[email protected]> | 2023-05-08 10:10:40 -0700 |
---|---|---|
committer | Yuchen Wu <[email protected]> | 2023-05-08 10:33:45 -0700 |
commit | 0bca116c1027a878469b72352e1e9e3916e85dde (patch) | |
tree | 9d4474666644fc24798513f850f2f17bc04885a8 /pingora-limits | |
download | pingora-0bca116c1027a878469b72352e1e9e3916e85dde.tar.gz pingora-0bca116c1027a878469b72352e1e9e3916e85dde.zip |
Pingora-limits initial commit
Diffstat (limited to 'pingora-limits')
-rw-r--r-- | pingora-limits/Cargo.toml | 26 | ||||
-rw-r--r-- | pingora-limits/LICENSE | 202 | ||||
-rw-r--r-- | pingora-limits/benches/benchmark.rs | 207 | ||||
-rw-r--r-- | pingora-limits/src/estimator.rs | 131 | ||||
-rw-r--r-- | pingora-limits/src/inflight.rs | 116 | ||||
-rw-r--r-- | pingora-limits/src/lib.rs | 34 | ||||
-rw-r--r-- | pingora-limits/src/rate.rs | 165 |
7 files changed, 881 insertions, 0 deletions
diff --git a/pingora-limits/Cargo.toml b/pingora-limits/Cargo.toml new file mode 100644 index 0000000..88ee11d --- /dev/null +++ b/pingora-limits/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "pingora-limits" +version = "0.1.0" +authors = ["Yuchen Wu <[email protected]>"] +license = "Apache-2.0" +description = "A library for rate limiting and event frequency estimation" +edition = "2018" + +[lib] +name = "pingora_limits" +path = "src/lib.rs" + +[dependencies] +ahash = "0" + +[dev-dependencies] +rand = "0" +dashmap = "5" +dhat = "0" + +[[bench]] +name = "benchmark" +harness = false + +[features] +dhat-heap = [] # for benchmark only diff --git a/pingora-limits/LICENSE b/pingora-limits/LICENSE new file mode 100644 index 0000000..d645695 --- /dev/null +++ b/pingora-limits/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/pingora-limits/benches/benchmark.rs b/pingora-limits/benches/benchmark.rs new file mode 100644 index 0000000..bf84a10 --- /dev/null +++ b/pingora-limits/benches/benchmark.rs @@ -0,0 +1,207 @@ +// Copyright 2023 Cloudflare, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#[cfg(feature = "dhat-heap")] +#[global_allocator] +static ALLOC: dhat::Alloc = dhat::Alloc; + +use ahash::RandomState; +use dashmap::DashMap; +use pingora_limits::estimator::Estimator; +use rand::distributions::Uniform; +use rand::{thread_rng, Rng}; +use std::collections::HashMap; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::sync::Mutex; +use std::thread; +use std::time::Instant; + +trait Counter { + fn incr(&self, key: u32, value: usize); + fn name() -> &'static str; +} + +#[derive(Default)] +struct NaiveCounter(Mutex<HashMap<u32, usize>>); +impl Counter for NaiveCounter { + fn incr(&self, key: u32, value: usize) { + let mut map = self.0.lock().unwrap(); + if let Some(v) = map.get_mut(&key) { + *v += value; + } else { + map.insert(key, value); + } + } + + fn name() -> &'static str { + "Naive Counter" + } +} + +#[derive(Default)] +struct OptimizedCounter(DashMap<u32, AtomicUsize, RandomState>); +impl Counter for OptimizedCounter { + fn incr(&self, key: u32, value: usize) { + if let Some(v) = self.0.get(&key) { + v.fetch_add(value, Ordering::Relaxed); + return; + } + self.0.insert(key, AtomicUsize::new(value)); + } + + fn name() -> &'static str { + "Optimized Counter" + } +} + +impl Counter for Estimator { + fn incr(&self, key: u32, value: usize) { + self.incr(key, value as isize); + } + + fn name() -> &'static str { + "Pingora Estimator" + } +} + +fn run_bench<T: Counter>( + counter: &T, + samples: usize, + distribution: &Uniform<u32>, + test_name: &str, +) { + let mut rng = thread_rng(); + let before = Instant::now(); + for _ in 0..samples { + let event: u32 = rng.sample(distribution); + counter.incr(event, 1); + } + let elapsed = before.elapsed(); + println!( + "{} {test_name} {:?} total, {:?} avg per operation", + T::name(), + elapsed, + elapsed / samples as u32 + ); +} + +fn run_threaded_bench<T: Counter + Send + Sync + 'static>( + threads: usize, + counter: Arc<T>, + samples: usize, + distribution: &Uniform<u32>, +) { + let mut handlers = vec![]; + for i in 0..threads { + let est = counter.clone(); + let dist = *distribution; + let handler = thread::spawn(move || { + run_bench(est.as_ref(), samples, &dist, &format!("thread#{i}")); + }); + handlers.push(handler); + } + for thread in handlers { + thread.join().unwrap(); + } +} + +/* +Pingora Estimator single thread 1.042849543s total, 10ns avg per operation +Naive Counter single thread 5.12641496s total, 51ns avg per operation +Optimized Counter single thread 4.302553352s total, 43ns avg per operation +Pingora Estimator thread#7 2.654667606s total, 212ns avg per operation +Pingora Estimator thread#2 2.65651993s total, 212ns avg per operation +Pingora Estimator thread#4 2.658225266s total, 212ns avg per operation +Pingora Estimator thread#0 2.660603361s total, 212ns avg per operation +Pingora Estimator thread#1 2.66139014s total, 212ns avg per operation +Pingora Estimator thread#6 2.663498849s total, 213ns avg per operation +Pingora Estimator thread#5 2.663344276s total, 213ns avg per operation +Pingora Estimator thread#3 2.664652951s total, 213ns avg per operation +Naive Counter thread#7 18.795881242s total, 1.503µs avg per operation +Naive Counter thread#1 18.805652672s total, 1.504µs avg per operation +Naive Counter thread#6 18.818084416s total, 1.505µs avg per operation +Naive Counter thread#4 18.832778982s total, 1.506µs avg per operation +Naive Counter thread#3 18.833952715s total, 1.506µs avg per operation +Naive Counter thread#2 18.837975133s total, 1.507µs avg per operation +Naive Counter thread#0 18.8397464s total, 1.507µs avg per operation +Naive Counter thread#5 18.842616299s total, 1.507µs avg per operation +Optimized Counter thread#4 2.650860314s total, 212ns avg per operation +Optimized Counter thread#0 2.651867013s total, 212ns avg per operation +Optimized Counter thread#2 2.656473381s total, 212ns avg per operation +Optimized Counter thread#5 2.657715876s total, 212ns avg per operation +Optimized Counter thread#1 2.658275111s total, 212ns avg per operation +Optimized Counter thread#7 2.658770751s total, 212ns avg per operation +Optimized Counter thread#6 2.659831251s total, 212ns avg per operation +Optimized Counter thread#3 2.664375398s total, 213ns avg per operation +*/ + +/* cargo bench --features dhat-heap for memory info + +Pingora Estimator single thread 1.066846098s total, 10ns avg per operation +dhat: Total: 26,184 bytes in 9 blocks +dhat: At t-gmax: 26,184 bytes in 9 blocks +dhat: At t-end: 1,464 bytes in 5 blocks +dhat: The data has been saved to dhat-heap.json, and is viewable with dhat/dh_view.html +Naive Counter single thread 5.429089242s total, 54ns avg per operation +dhat: Total: 71,303,260 bytes in 20 blocks +dhat: At t-gmax: 53,477,392 bytes in 2 blocks +dhat: At t-end: 0 bytes in 0 blocks +dhat: The data has been saved to dhat-heap.json, and is viewable with dhat/dh_view.html +Optimized Counter single thread 4.361720355s total, 43ns avg per operation +dhat: Total: 71,307,722 bytes in 491 blocks +dhat: At t-gmax: 36,211,208 bytes in 34 blocks +dhat: At t-end: 0 bytes in 0 blocks +dhat: The data has been saved to dhat-heap.json, and is viewable with dhat/dh_view.html +*/ + +fn main() { + const SAMPLES: usize = 100_000_000; + const THREADS: usize = 8; + const ITEMS: u32 = 1_000_000; + const SAMPLES_PER_THREAD: usize = SAMPLES / THREADS; + let distribution = Uniform::new(0, ITEMS); + + // single thread + { + #[cfg(feature = "dhat-heap")] + let _profiler = dhat::Profiler::new_heap(); + let pingora_est = Estimator::new(3, 1024); + run_bench(&pingora_est, SAMPLES, &distribution, "single thread"); + } + + { + #[cfg(feature = "dhat-heap")] + let _profiler = dhat::Profiler::new_heap(); + let naive: NaiveCounter = Default::default(); + run_bench(&naive, SAMPLES, &distribution, "single thread"); + } + + { + #[cfg(feature = "dhat-heap")] + let _profiler = dhat::Profiler::new_heap(); + let optimized: OptimizedCounter = Default::default(); + run_bench(&optimized, SAMPLES, &distribution, "single thread"); + } + + // multithread + let pingora_est = Arc::new(Estimator::new(3, 1024)); + run_threaded_bench(THREADS, pingora_est, SAMPLES_PER_THREAD, &distribution); + + let naive: Arc<NaiveCounter> = Default::default(); + run_threaded_bench(THREADS, naive, SAMPLES_PER_THREAD, &distribution); + + let optimized: Arc<OptimizedCounter> = Default::default(); + run_threaded_bench(THREADS, optimized, SAMPLES_PER_THREAD, &distribution); +} diff --git a/pingora-limits/src/estimator.rs b/pingora-limits/src/estimator.rs new file mode 100644 index 0000000..45f1592 --- /dev/null +++ b/pingora-limits/src/estimator.rs @@ -0,0 +1,131 @@ +// Copyright 2023 Cloudflare, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! The estimator module contains a Count-Min Sketch type to help estimate the frequency of an item. + +use crate::hash; +use crate::RandomState; +use std::hash::Hash; +use std::sync::atomic::{AtomicIsize, Ordering}; + +/// An implementation of a lock-free count–min sketch estimator. See the [wikipedia] page for more +/// information. +/// +/// [wikipedia]: https://en.wikipedia.org/wiki/Count%E2%80%93min_sketch +pub struct Estimator { + estimator: Box<[(Box<[AtomicIsize]>, RandomState)]>, +} + +impl Estimator { + /// Create a new `Estimator` with the given amount of hashes and columns (slots). + pub fn new(hashes: usize, slots: usize) -> Self { + let mut estimator = Vec::with_capacity(hashes); + for _ in 0..hashes { + let mut slot = Vec::with_capacity(slots); + for _ in 0..slots { + slot.push(AtomicIsize::new(0)); + } + estimator.push((slot.into_boxed_slice(), RandomState::new())); + } + + Estimator { + estimator: estimator.into_boxed_slice(), + } + } + + /// Increment `key` by the value given. Return the new estimated value as a result. + /// Note: overflow can happen. When some of the internal counters overflow, a negative number + /// will be returned. It is up to the caller to catch and handle this case. + pub fn incr<T: Hash>(&self, key: T, value: isize) -> isize { + let mut min = isize::MAX; + for (slot, hasher) in self.estimator.iter() { + let hash = hash(&key, hasher) as usize; + let counter = &slot[hash % slot.len()]; + // Overflow is allowed for simplicity + let current = counter.fetch_add(value, Ordering::Relaxed); + min = std::cmp::min(min, current + value); + } + min + } + + /// Decrement `key` by the value given. + pub fn decr<T: Hash>(&self, key: T, value: isize) { + for (slot, hasher) in self.estimator.iter() { + let hash = hash(&key, hasher) as usize; + let counter = &slot[hash % slot.len()]; + counter.fetch_sub(value, Ordering::Relaxed); + } + } + + /// Get the estimated frequency of `key`. + pub fn get<T: Hash>(&self, key: T) -> isize { + let mut min = isize::MAX; + for (slot, hasher) in self.estimator.iter() { + let hash = hash(&key, hasher) as usize; + let counter = &slot[hash % slot.len()]; + let current = counter.load(Ordering::Relaxed); + min = std::cmp::min(min, current); + } + min + } + + /// Reset all values inside this `Estimator`. + pub fn reset(&self) { + for (slot, _) in self.estimator.iter() { + for counter in slot.iter() { + counter.store(0, Ordering::Relaxed); + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn incr() { + let est = Estimator::new(8, 8); + let v = est.incr("a", 1); + assert_eq!(v, 1); + let v = est.incr("b", 1); + assert_eq!(v, 1); + let v = est.incr("a", 2); + assert_eq!(v, 3); + let v = est.incr("b", 2); + assert_eq!(v, 3); + } + + #[test] + fn desc() { + let est = Estimator::new(8, 8); + est.incr("a", 3); + est.incr("b", 3); + est.decr("a", 1); + est.decr("b", 1); + assert_eq!(est.get("a"), 2); + assert_eq!(est.get("b"), 2); + } + + #[test] + fn get() { + let est = Estimator::new(8, 8); + est.incr("a", 1); + est.incr("a", 2); + est.incr("b", 1); + est.incr("b", 2); + assert_eq!(est.get("a"), 3); + assert_eq!(est.get("b"), 3); + } +} diff --git a/pingora-limits/src/inflight.rs b/pingora-limits/src/inflight.rs new file mode 100644 index 0000000..68d78d2 --- /dev/null +++ b/pingora-limits/src/inflight.rs @@ -0,0 +1,116 @@ +// Copyright 2023 Cloudflare, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! The inflight module defines the [Inflight] type which estimates the count of events occurring +//! at any point in time. + +use crate::estimator::Estimator; +use crate::{hash, RandomState}; +use std::hash::Hash; +use std::sync::Arc; + +/// An `Inflight` type tracks the frequency of actions that are actively occurring. When the value +/// is dropped from scope, the count will automatically decrease. +pub struct Inflight { + estimator: Arc<Estimator>, + hasher: RandomState, +} + +// fixed parameters for simplicity: hashes: h, slots: n +// Time complexity for a lookup operation is O(h). Space complexity is O(h*n) +// False positive ratio is 1/(n^h) +// We choose a small h and a large n to keep lookup cheap and FP ratio low +const HASHES: usize = 4; +const SLOTS: usize = 8192; + +impl Inflight { + /// Create a new `Inflight`. + pub fn new() -> Self { + Inflight { + estimator: Arc::new(Estimator::new(HASHES, SLOTS)), + hasher: RandomState::new(), + } + } + + /// Increment `key` by the value given. The return value is a tuple of a [Guard] and the + /// estimated count. + pub fn incr<T: Hash>(&self, key: T, value: isize) -> (Guard, isize) { + let guard = Guard { + estimator: self.estimator.clone(), + id: hash(key, &self.hasher), + value, + }; + let estimation = guard.incr(); + (guard, estimation) + } +} + +/// A `Guard` is returned when an `Inflight` key is incremented via [Inflight::incr]. +pub struct Guard { + estimator: Arc<Estimator>, + // store the hash instead of the actual key to save space + id: u64, + value: isize, +} + +impl Guard { + /// Increment the key's value that the `Guard` was created from. + pub fn incr(&self) -> isize { + self.estimator.incr(self.id, self.value) + } + + /// Get the estimated count of the key that the `Guard` was created from. + pub fn get(&self) -> isize { + self.estimator.get(self.id) + } +} + +impl Drop for Guard { + fn drop(&mut self) { + self.estimator.decr(self.id, self.value) + } +} + +impl std::fmt::Debug for Guard { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Guard") + .field("id", &self.id) + .field("value", &self.value) + // no need to dump shared estimator + .finish() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn inflight_count() { + let inflight = Inflight::new(); + let (g1, v) = inflight.incr("a", 1); + assert_eq!(v, 1); + let (g2, v) = inflight.incr("a", 2); + assert_eq!(v, 3); + + drop(g1); + + assert_eq!(g2.get(), 2); + + drop(g2); + + let (_, v) = inflight.incr("a", 1); + assert_eq!(v, 1); + } +} diff --git a/pingora-limits/src/lib.rs b/pingora-limits/src/lib.rs new file mode 100644 index 0000000..5a396e0 --- /dev/null +++ b/pingora-limits/src/lib.rs @@ -0,0 +1,34 @@ +// Copyright 2023 Cloudflare, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! The pingora_limits crate contains modules that can help introduce things like rate limiting or +//! thread-safe event count estimation. + +#![warn(clippy::all)] +#![allow(clippy::new_without_default)] +#![allow(clippy::type_complexity)] + +pub mod estimator; +pub mod inflight; +pub mod rate; + +use ahash::RandomState; +use std::hash::{BuildHasher, Hash, Hasher}; + +#[inline] +fn hash<T: Hash>(key: T, hasher: &RandomState) -> u64 { + let mut hasher = hasher.build_hasher(); + key.hash(&mut hasher); + hasher.finish() +} diff --git a/pingora-limits/src/rate.rs b/pingora-limits/src/rate.rs new file mode 100644 index 0000000..dd05cec --- /dev/null +++ b/pingora-limits/src/rate.rs @@ -0,0 +1,165 @@ +// Copyright 2023 Cloudflare, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! The rate module defines the [Rate] type that helps estimate the occurrence of events over a +//! period of time. + +use crate::estimator::Estimator; +use std::hash::Hash; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::time::Instant; + +/// A stable rate estimator that reports the rate of events in the past `interval` time. +/// It returns the average rate between `interval` * 2 and `interval` while collecting the events +/// happening between `interval` and now. +/// +/// This estimator ignores events that happen less than once per `interval` time. +pub struct Rate { + // 2 slots so that we use one to collect the current events and the other to report rate + red_slot: Estimator, + blue_slot: Estimator, + red_or_blue: AtomicBool, // true: current slot is red, otherwise blue + start: Instant, + // Use u64 below instead of Instant because we want atomic operation + reset_interval_ms: u64, // the time interval to reset `current` and move it to `previous` + last_reset_time: AtomicU64, // the timestamp in ms since `start` +} + +// see inflight module for the meaning for these numbers +const HASHES: usize = 4; +const SLOTS: usize = 1024; // This value can be lower if interval is short (key cardinality is low) + +impl Rate { + /// Create a new `Rate` with the given interval. + pub fn new(interval: std::time::Duration) -> Self { + Rate { + red_slot: Estimator::new(HASHES, SLOTS), + blue_slot: Estimator::new(HASHES, SLOTS), + red_or_blue: AtomicBool::new(true), + start: Instant::now(), + reset_interval_ms: interval.as_millis() as u64, // should be small not to overflow + last_reset_time: AtomicU64::new(0), + } + } + + fn current(&self, red_or_blue: bool) -> &Estimator { + if red_or_blue { + &self.red_slot + } else { + &self.blue_slot + } + } + + fn previous(&self, red_or_blue: bool) -> &Estimator { + if red_or_blue { + &self.blue_slot + } else { + &self.red_slot + } + } + + fn red_or_blue(&self) -> bool { + self.red_or_blue.load(Ordering::SeqCst) + } + + /// Return the per second rate estimation. + pub fn rate<T: Hash>(&self, key: &T) -> f64 { + let past_ms = self.maybe_reset(); + if past_ms >= self.reset_interval_ms * 2 { + // already missed 2 intervals, no data, just report 0 as a short cut + return 0f64; + } + + self.previous(self.red_or_blue()).get(key) as f64 / self.reset_interval_ms as f64 * 1000.0 + } + + /// Report new events and return number of events seen so far in the current interval. + pub fn observe<T: Hash>(&self, key: &T, events: isize) -> isize { + self.maybe_reset(); + self.current(self.red_or_blue()).incr(key, events) + } + + // reset if needed, return the time since last reset for other fn to use + fn maybe_reset(&self) -> u64 { + // should be short enough not to overflow + let now = Instant::now().duration_since(self.start).as_millis() as u64; + let last_reset = self.last_reset_time.load(Ordering::SeqCst); + let past_ms = now - last_reset; + + if past_ms < self.reset_interval_ms { + // no need to reset + return past_ms; + } + let red_or_blue = self.red_or_blue(); + match self.last_reset_time.compare_exchange( + last_reset, + now, + Ordering::SeqCst, + Ordering::Acquire, + ) { + Ok(_) => { + // first clear the previous slot + self.previous(red_or_blue).reset(); + // then flip the flag to tell others to use the reset slot + self.red_or_blue.store(!red_or_blue, Ordering::SeqCst); + // if current time is beyond 2 intervals, the data stored in the previous slot + // is also stale, we should clear that too + if now - last_reset >= self.reset_interval_ms * 2 { + // Note that this is the previous one now because we just flipped self.red_or_blue + self.current(red_or_blue).reset(); + } + } + Err(new) => { + // another thread beats us to it + assert!(new >= now - 1000); // double check that the new timestamp looks right + } + } + + past_ms + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::thread::sleep; + use std::time::Duration; + + #[test] + fn test_observe_rate() { + let r = Rate::new(Duration::from_secs(1)); + let key = 1; + + // second: 0 + let observed = r.observe(&key, 3); + assert_eq!(observed, 3); + let observed = r.observe(&key, 2); + assert_eq!(observed, 5); + assert_eq!(r.rate(&key), 0f64); // no estimation yet because the interval has not passed + + // second: 1 + sleep(Duration::from_secs(1)); + let observed = r.observe(&key, 4); + assert_eq!(observed, 4); + assert_eq!(r.rate(&key), 5f64); // 5 rps + + // second: 2 + sleep(Duration::from_secs(1)); + assert_eq!(r.rate(&key), 4f64); + + // second: 3 + sleep(Duration::from_secs(1)); + assert_eq!(r.rate(&key), 0f64); // no event observed in the past 2 second + } +} |