aboutsummaryrefslogtreecommitdiffhomepage
path: root/pingora-limits
diff options
context:
space:
mode:
authorYuchen Wu <[email protected]>2023-05-08 10:10:40 -0700
committerYuchen Wu <[email protected]>2023-05-08 10:33:45 -0700
commit0bca116c1027a878469b72352e1e9e3916e85dde (patch)
tree9d4474666644fc24798513f850f2f17bc04885a8 /pingora-limits
downloadpingora-0bca116c1027a878469b72352e1e9e3916e85dde.tar.gz
pingora-0bca116c1027a878469b72352e1e9e3916e85dde.zip
Pingora-limits initial commit
Diffstat (limited to 'pingora-limits')
-rw-r--r--pingora-limits/Cargo.toml26
-rw-r--r--pingora-limits/LICENSE202
-rw-r--r--pingora-limits/benches/benchmark.rs207
-rw-r--r--pingora-limits/src/estimator.rs131
-rw-r--r--pingora-limits/src/inflight.rs116
-rw-r--r--pingora-limits/src/lib.rs34
-rw-r--r--pingora-limits/src/rate.rs165
7 files changed, 881 insertions, 0 deletions
diff --git a/pingora-limits/Cargo.toml b/pingora-limits/Cargo.toml
new file mode 100644
index 0000000..88ee11d
--- /dev/null
+++ b/pingora-limits/Cargo.toml
@@ -0,0 +1,26 @@
+[package]
+name = "pingora-limits"
+version = "0.1.0"
+authors = ["Yuchen Wu <[email protected]>"]
+license = "Apache-2.0"
+description = "A library for rate limiting and event frequency estimation"
+edition = "2018"
+
+[lib]
+name = "pingora_limits"
+path = "src/lib.rs"
+
+[dependencies]
+ahash = "0"
+
+[dev-dependencies]
+rand = "0"
+dashmap = "5"
+dhat = "0"
+
+[[bench]]
+name = "benchmark"
+harness = false
+
+[features]
+dhat-heap = [] # for benchmark only
diff --git a/pingora-limits/LICENSE b/pingora-limits/LICENSE
new file mode 100644
index 0000000..d645695
--- /dev/null
+++ b/pingora-limits/LICENSE
@@ -0,0 +1,202 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git a/pingora-limits/benches/benchmark.rs b/pingora-limits/benches/benchmark.rs
new file mode 100644
index 0000000..bf84a10
--- /dev/null
+++ b/pingora-limits/benches/benchmark.rs
@@ -0,0 +1,207 @@
+// Copyright 2023 Cloudflare, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#[cfg(feature = "dhat-heap")]
+#[global_allocator]
+static ALLOC: dhat::Alloc = dhat::Alloc;
+
+use ahash::RandomState;
+use dashmap::DashMap;
+use pingora_limits::estimator::Estimator;
+use rand::distributions::Uniform;
+use rand::{thread_rng, Rng};
+use std::collections::HashMap;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
+use std::sync::Mutex;
+use std::thread;
+use std::time::Instant;
+
+/// Common interface implemented by every counter compared in this benchmark.
+trait Counter {
+ /// Add `value` to the count stored for `key`.
+ fn incr(&self, key: u32, value: usize);
+ /// Name of the implementation; `run_bench` prints it at the start of each result line.
+ fn name() -> &'static str;
+}
+
+/// Baseline implementation: a plain `HashMap` behind a single `Mutex`.
+#[derive(Default)]
+struct NaiveCounter(Mutex<HashMap<u32, usize>>);
+impl Counter for NaiveCounter {
+    /// Lock the whole map, then either bump the existing entry or create it.
+    fn incr(&self, key: u32, value: usize) {
+        let mut counts = self.0.lock().unwrap();
+        match counts.get_mut(&key) {
+            Some(total) => *total += value,
+            None => {
+                counts.insert(key, value);
+            }
+        }
+    }
+
+    fn name() -> &'static str {
+        "Naive Counter"
+    }
+}
+
+/// Concurrent map of atomic counters: existing keys are bumped without replacing the entry.
+#[derive(Default)]
+struct OptimizedCounter(DashMap<u32, AtomicUsize, RandomState>);
+impl Counter for OptimizedCounter {
+    /// Bump the counter for `key` if it exists, otherwise insert a fresh one.
+    // NOTE(review): the lookup and the insert are two separate steps, so two threads that miss
+    // on the same new key can presumably both insert — confirm this is acceptable for the
+    // benchmark.
+    fn incr(&self, key: u32, value: usize) {
+        match self.0.get(&key) {
+            Some(existing) => {
+                existing.fetch_add(value, Ordering::Relaxed);
+            }
+            None => {
+                self.0.insert(key, AtomicUsize::new(value));
+            }
+        }
+    }
+
+    fn name() -> &'static str {
+        "Optimized Counter"
+    }
+}
+
+/// Adapter so the Pingora `Estimator` can be driven through the benchmark's `Counter` trait.
+impl Counter for Estimator {
+    fn incr(&self, key: u32, value: usize) {
+        // the estimator counts with `isize`; the estimate it returns is not needed here
+        let delta = value as isize;
+        self.incr(key, delta);
+    }
+
+    fn name() -> &'static str {
+        "Pingora Estimator"
+    }
+}
+
+/// Feed `samples` keys drawn from `distribution` into `counter`, incrementing each by one, and
+/// print the total and per-operation wall-clock time, labelled with `test_name`.
+fn run_bench<T: Counter>(
+    counter: &T,
+    samples: usize,
+    distribution: &Uniform<u32>,
+    test_name: &str,
+) {
+    let mut rng = thread_rng();
+    let started = Instant::now();
+    (0..samples).for_each(|_| {
+        let event: u32 = rng.sample(distribution);
+        counter.incr(event, 1);
+    });
+    let total = started.elapsed();
+    // `Duration` divides by `u32`, hence the cast of the sample count
+    let per_operation = total / samples as u32;
+    println!(
+        "{} {test_name} {:?} total, {:?} avg per operation",
+        T::name(),
+        total,
+        per_operation
+    );
+}
+
+/// Run `run_bench` on `threads` threads at once, all sharing the same `counter`, and wait for
+/// every thread to finish. Each thread processes `samples` events and labels its output
+/// `thread#<index>`.
+fn run_threaded_bench<T: Counter + Send + Sync + 'static>(
+    threads: usize,
+    counter: Arc<T>,
+    samples: usize,
+    distribution: &Uniform<u32>,
+) {
+    // spawn every worker first so they all run concurrently, then join them
+    let workers: Vec<_> = (0..threads)
+        .map(|i| {
+            let shared = counter.clone();
+            let dist = *distribution;
+            thread::spawn(move || {
+                run_bench(shared.as_ref(), samples, &dist, &format!("thread#{i}"));
+            })
+        })
+        .collect();
+    for worker in workers {
+        worker.join().unwrap();
+    }
+}
+
+/*
+Pingora Estimator single thread 1.042849543s total, 10ns avg per operation
+Naive Counter single thread 5.12641496s total, 51ns avg per operation
+Optimized Counter single thread 4.302553352s total, 43ns avg per operation
+Pingora Estimator thread#7 2.654667606s total, 212ns avg per operation
+Pingora Estimator thread#2 2.65651993s total, 212ns avg per operation
+Pingora Estimator thread#4 2.658225266s total, 212ns avg per operation
+Pingora Estimator thread#0 2.660603361s total, 212ns avg per operation
+Pingora Estimator thread#1 2.66139014s total, 212ns avg per operation
+Pingora Estimator thread#6 2.663498849s total, 213ns avg per operation
+Pingora Estimator thread#5 2.663344276s total, 213ns avg per operation
+Pingora Estimator thread#3 2.664652951s total, 213ns avg per operation
+Naive Counter thread#7 18.795881242s total, 1.503µs avg per operation
+Naive Counter thread#1 18.805652672s total, 1.504µs avg per operation
+Naive Counter thread#6 18.818084416s total, 1.505µs avg per operation
+Naive Counter thread#4 18.832778982s total, 1.506µs avg per operation
+Naive Counter thread#3 18.833952715s total, 1.506µs avg per operation
+Naive Counter thread#2 18.837975133s total, 1.507µs avg per operation
+Naive Counter thread#0 18.8397464s total, 1.507µs avg per operation
+Naive Counter thread#5 18.842616299s total, 1.507µs avg per operation
+Optimized Counter thread#4 2.650860314s total, 212ns avg per operation
+Optimized Counter thread#0 2.651867013s total, 212ns avg per operation
+Optimized Counter thread#2 2.656473381s total, 212ns avg per operation
+Optimized Counter thread#5 2.657715876s total, 212ns avg per operation
+Optimized Counter thread#1 2.658275111s total, 212ns avg per operation
+Optimized Counter thread#7 2.658770751s total, 212ns avg per operation
+Optimized Counter thread#6 2.659831251s total, 212ns avg per operation
+Optimized Counter thread#3 2.664375398s total, 213ns avg per operation
+*/
+
+/* cargo bench --features dhat-heap for memory info
+
+Pingora Estimator single thread 1.066846098s total, 10ns avg per operation
+dhat: Total: 26,184 bytes in 9 blocks
+dhat: At t-gmax: 26,184 bytes in 9 blocks
+dhat: At t-end: 1,464 bytes in 5 blocks
+dhat: The data has been saved to dhat-heap.json, and is viewable with dhat/dh_view.html
+Naive Counter single thread 5.429089242s total, 54ns avg per operation
+dhat: Total: 71,303,260 bytes in 20 blocks
+dhat: At t-gmax: 53,477,392 bytes in 2 blocks
+dhat: At t-end: 0 bytes in 0 blocks
+dhat: The data has been saved to dhat-heap.json, and is viewable with dhat/dh_view.html
+Optimized Counter single thread 4.361720355s total, 43ns avg per operation
+dhat: Total: 71,307,722 bytes in 491 blocks
+dhat: At t-gmax: 36,211,208 bytes in 34 blocks
+dhat: At t-end: 0 bytes in 0 blocks
+dhat: The data has been saved to dhat-heap.json, and is viewable with dhat/dh_view.html
+*/
+
+/// Benchmark entry point: runs each counter single-threaded, then again shared across
+/// `THREADS` threads.
+fn main() {
+    const SAMPLES: usize = 100_000_000;
+    const THREADS: usize = 8;
+    const ITEMS: u32 = 1_000_000;
+    const SAMPLES_PER_THREAD: usize = SAMPLES / THREADS;
+    let keys = Uniform::new(0, ITEMS);
+    let label = "single thread";
+
+    // Single-threaded runs. Each lives in its own scope so that, when the `dhat-heap` feature is
+    // enabled, every run gets its own heap profiler.
+    {
+        #[cfg(feature = "dhat-heap")]
+        let _profiler = dhat::Profiler::new_heap();
+        let estimator = Estimator::new(3, 1024);
+        run_bench(&estimator, SAMPLES, &keys, label);
+    }
+
+    {
+        #[cfg(feature = "dhat-heap")]
+        let _profiler = dhat::Profiler::new_heap();
+        let naive = NaiveCounter::default();
+        run_bench(&naive, SAMPLES, &keys, label);
+    }
+
+    {
+        #[cfg(feature = "dhat-heap")]
+        let _profiler = dhat::Profiler::new_heap();
+        let optimized = OptimizedCounter::default();
+        run_bench(&optimized, SAMPLES, &keys, label);
+    }
+
+    // Multi-threaded runs: the threads share one counter and split the samples between them.
+    let estimator = Arc::new(Estimator::new(3, 1024));
+    run_threaded_bench(THREADS, estimator, SAMPLES_PER_THREAD, &keys);
+
+    let naive = Arc::new(NaiveCounter::default());
+    run_threaded_bench(THREADS, naive, SAMPLES_PER_THREAD, &keys);
+
+    let optimized = Arc::new(OptimizedCounter::default());
+    run_threaded_bench(THREADS, optimized, SAMPLES_PER_THREAD, &keys);
+}
diff --git a/pingora-limits/src/estimator.rs b/pingora-limits/src/estimator.rs
new file mode 100644
index 0000000..45f1592
--- /dev/null
+++ b/pingora-limits/src/estimator.rs
@@ -0,0 +1,131 @@
+// Copyright 2023 Cloudflare, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! The estimator module contains a Count-Min Sketch type to help estimate the frequency of an item.
+
+use crate::hash;
+use crate::RandomState;
+use std::hash::Hash;
+use std::sync::atomic::{AtomicIsize, Ordering};
+
+/// An implementation of a lock-free count–min sketch estimator. See the [wikipedia] page for more
+/// information.
+///
+/// [wikipedia]: https://en.wikipedia.org/wiki/Count%E2%80%93min_sketch
+pub struct Estimator {
+ // One entry per hash function: a row of counters paired with the hasher that picks the
+ // counter a key maps to within that row.
+ estimator: Box<[(Box<[AtomicIsize]>, RandomState)]>,
+}
+
+impl Estimator {
+    /// Create a new `Estimator` with the given amount of hashes and columns (slots).
+    ///
+    /// Every counter starts at zero and each row of `slots` counters gets its own
+    /// `RandomState`.
+    ///
+    /// `slots` must be non-zero: `incr`, `decr` and `get` pick a counter with `hash % slots`
+    /// and therefore panic when a row is empty. With `hashes == 0` there are no rows and
+    /// `incr`/`get` return `isize::MAX`.
+    pub fn new(hashes: usize, slots: usize) -> Self {
+        let estimator = (0..hashes)
+            .map(|_| {
+                let row: Box<[AtomicIsize]> = (0..slots).map(|_| AtomicIsize::new(0)).collect();
+                (row, RandomState::new())
+            })
+            .collect();
+
+        Estimator { estimator }
+    }
+
+    /// Pick the counter that `key` maps to within one row.
+    fn counter<'a, T: Hash>(
+        slot: &'a [AtomicIsize],
+        hasher: &RandomState,
+        key: &T,
+    ) -> &'a AtomicIsize {
+        let index = hash(key, hasher) as usize % slot.len();
+        &slot[index]
+    }
+
+    /// Increment `key` by the value given. Return the new estimated value as a result.
+    /// Note: overflow can happen. When some of the internal counters overflow, a negative number
+    /// will be returned. It is up to the caller to catch and handle this case.
+    pub fn incr<T: Hash>(&self, key: T, value: isize) -> isize {
+        self.estimator
+            .iter()
+            .map(|(slot, hasher)| {
+                let counter = Self::counter(slot, hasher, &key);
+                // Overflow is allowed for simplicity. `fetch_add` wraps around, so the new value
+                // is recomputed with wrapping arithmetic too; a plain `+` would panic in debug
+                // builds instead of returning the documented negative number.
+                counter
+                    .fetch_add(value, Ordering::Relaxed)
+                    .wrapping_add(value)
+            })
+            .min()
+            .unwrap_or(isize::MAX)
+    }
+
+    /// Decrement `key` by the value given.
+    pub fn decr<T: Hash>(&self, key: T, value: isize) {
+        for (slot, hasher) in self.estimator.iter() {
+            Self::counter(slot, hasher, &key).fetch_sub(value, Ordering::Relaxed);
+        }
+    }
+
+    /// Get the estimated frequency of `key`.
+    ///
+    /// The estimate is the smallest of the counters `key` maps to, one per row.
+    pub fn get<T: Hash>(&self, key: T) -> isize {
+        self.estimator
+            .iter()
+            .map(|(slot, hasher)| Self::counter(slot, hasher, &key).load(Ordering::Relaxed))
+            .min()
+            .unwrap_or(isize::MAX)
+    }
+
+    /// Reset all values inside this `Estimator`.
+    ///
+    /// Counters are zeroed one at a time, so concurrent callers may observe a partially reset
+    /// estimator.
+    pub fn reset(&self) {
+        for (slot, _) in self.estimator.iter() {
+            for counter in slot.iter() {
+                counter.store(0, Ordering::Relaxed);
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    // `incr` returns the running estimate for the key it was called with.
+    #[test]
+    fn incr() {
+        let est = Estimator::new(8, 8);
+        let v = est.incr("a", 1);
+        assert_eq!(v, 1);
+        let v = est.incr("b", 1);
+        assert_eq!(v, 1);
+        let v = est.incr("a", 2);
+        assert_eq!(v, 3);
+        let v = est.incr("b", 2);
+        assert_eq!(v, 3);
+    }
+
+    // `decr` takes back part of an earlier increment.
+    #[test]
+    fn decr() {
+        let est = Estimator::new(8, 8);
+        est.incr("a", 3);
+        est.incr("b", 3);
+        est.decr("a", 1);
+        est.decr("b", 1);
+        assert_eq!(est.get("a"), 2);
+        assert_eq!(est.get("b"), 2);
+    }
+
+    // `get` reports the sum of all increments made for a key.
+    #[test]
+    fn get() {
+        let est = Estimator::new(8, 8);
+        est.incr("a", 1);
+        est.incr("a", 2);
+        est.incr("b", 1);
+        est.incr("b", 2);
+        assert_eq!(est.get("a"), 3);
+        assert_eq!(est.get("b"), 3);
+    }
+}
diff --git a/pingora-limits/src/inflight.rs b/pingora-limits/src/inflight.rs
new file mode 100644
index 0000000..68d78d2
--- /dev/null
+++ b/pingora-limits/src/inflight.rs
@@ -0,0 +1,116 @@
+// Copyright 2023 Cloudflare, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! The inflight module defines the [Inflight] type which estimates the count of events occurring
+//! at any point in time.
+
+use crate::estimator::Estimator;
+use crate::{hash, RandomState};
+use std::hash::Hash;
+use std::sync::Arc;
+
+/// An `Inflight` type tracks the frequency of actions that are actively occurring. When the
+/// [Guard] returned by [Inflight::incr] is dropped from scope, the count will automatically
+/// decrease.
+pub struct Inflight {
+ // shared with every `Guard` handed out by `incr`, so a guard can undo its increment on drop
+ estimator: Arc<Estimator>,
+ // reduces a caller-supplied key to the `u64` id that is stored in the `Guard`
+ hasher: RandomState,
+}
+
+// Fixed parameters for simplicity: hashes: h, slots: n.
+// Time complexity for a lookup operation is O(h). Space complexity is O(h*n).
+// False positive ratio is 1/(n^h).
+// We choose a small h and a large n to keep lookups cheap and the FP ratio low.
+const HASHES: usize = 4;
+const SLOTS: usize = 8192;
+
+impl Inflight {
+    /// Create a new `Inflight` backed by a fresh estimator of `HASHES` x `SLOTS` counters.
+    pub fn new() -> Self {
+        let estimator = Arc::new(Estimator::new(HASHES, SLOTS));
+        let hasher = RandomState::new();
+        Inflight { estimator, hasher }
+    }
+
+    /// Increment `key` by the value given. The return value is a tuple of a [Guard] and the
+    /// estimated count.
+    ///
+    /// The guard subtracts `value` again when it is dropped, so keep it alive for as long as the
+    /// action is in flight.
+    pub fn incr<T: Hash>(&self, key: T, value: isize) -> (Guard, isize) {
+        let estimator = self.estimator.clone();
+        // the guard only remembers the hash of the key, not the key itself
+        let id = hash(key, &self.hasher);
+        let guard = Guard {
+            estimator,
+            id,
+            value,
+        };
+        let estimation = guard.incr();
+        (guard, estimation)
+    }
+}
+
+/// A `Guard` is returned when an `Inflight` key is incremented via [Inflight::incr].
+///
+/// Dropping the guard subtracts `value` from the key's count again.
+pub struct Guard {
+ // the estimator shared with the `Inflight` that created this guard
+ estimator: Arc<Estimator>,
+ // store the hash instead of the actual key to save space
+ id: u64,
+ // the amount added by `incr` and subtracted again on drop
+ value: isize,
+}
+
+impl Guard {
+    /// Increment the key's value that the `Guard` was created from.
+    ///
+    /// Returns the new estimated count. Every call adds the guard's `value` again, while
+    /// dropping the guard subtracts it only once.
+    pub fn incr(&self) -> isize {
+        let Guard {
+            estimator,
+            id,
+            value,
+        } = self;
+        estimator.incr(*id, *value)
+    }
+
+    /// Get the estimated count of the key that the `Guard` was created from.
+    pub fn get(&self) -> isize {
+        let key = self.id;
+        self.estimator.get(key)
+    }
+}
+
+impl Drop for Guard {
+    /// Take this guard's `value` back out of the shared estimator.
+    fn drop(&mut self) {
+        let (id, value) = (self.id, self.value);
+        self.estimator.decr(id, value);
+    }
+}
+
+impl std::fmt::Debug for Guard {
+    /// Print only `id` and `value`.
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let mut out = f.debug_struct("Guard");
+        out.field("id", &self.id);
+        out.field("value", &self.value);
+        // no need to dump shared estimator
+        out.finish()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ // Counts rise while guards are alive and fall again as each guard is dropped.
+ #[test]
+ fn inflight_count() {
+ let inflight = Inflight::new();
+ let (g1, v) = inflight.incr("a", 1);
+ assert_eq!(v, 1);
+ let (g2, v) = inflight.incr("a", 2);
+ assert_eq!(v, 3);
+
+ // dropping a guard removes only that guard's contribution
+ drop(g1);
+
+ assert_eq!(g2.get(), 2);
+
+ drop(g2);
+
+ // the guard is not kept here; `v` is the estimate taken when the increment happened
+ let (_, v) = inflight.incr("a", 1);
+ assert_eq!(v, 1);
+ }
+}
diff --git a/pingora-limits/src/lib.rs b/pingora-limits/src/lib.rs
new file mode 100644
index 0000000..5a396e0
--- /dev/null
+++ b/pingora-limits/src/lib.rs
@@ -0,0 +1,34 @@
+// Copyright 2023 Cloudflare, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! The pingora_limits crate contains modules that can help introduce things like rate limiting or
+//! thread-safe event count estimation.
+
+#![warn(clippy::all)]
+#![allow(clippy::new_without_default)]
+#![allow(clippy::type_complexity)]
+
+pub mod estimator;
+pub mod inflight;
+pub mod rate;
+
+use ahash::RandomState;
+use std::hash::{BuildHasher, Hash, Hasher};
+
+/// Hash `key` with a hasher built from `hasher` and return the 64-bit result.
+#[inline]
+fn hash<T: Hash>(key: T, hasher: &RandomState) -> u64 {
+    let mut state = hasher.build_hasher();
+    key.hash(&mut state);
+    state.finish()
+}
diff --git a/pingora-limits/src/rate.rs b/pingora-limits/src/rate.rs
new file mode 100644
index 0000000..dd05cec
--- /dev/null
+++ b/pingora-limits/src/rate.rs
@@ -0,0 +1,165 @@
+// Copyright 2023 Cloudflare, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! The rate module defines the [Rate] type that helps estimate the occurrence of events over a
+//! period of time.
+
+use crate::estimator::Estimator;
+use std::hash::Hash;
+use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
+use std::time::Instant;
+
+/// A stable rate estimator that reports the rate of events in the past `interval` time.
+/// It returns the average rate between `interval` * 2 and `interval` while collecting the events
+/// happening between `interval` and now.
+///
+/// This estimator ignores events that happen less than once per `interval` time.
+pub struct Rate {
+ // 2 slots so that we use one to collect the current events and the other to report rate
+ red_slot: Estimator,
+ blue_slot: Estimator,
+ red_or_blue: AtomicBool, // true: current slot is red, otherwise blue
+ // taken when the `Rate` is created; all timestamps below are measured from it
+ start: Instant,
+ // Use u64 below instead of Instant because we want atomic operations
+ reset_interval_ms: u64, // the time interval to reset `current` and move it to `previous`
+ last_reset_time: AtomicU64, // the timestamp in ms since `start`
+}
+
+// see the inflight module for the meaning of these numbers
+const HASHES: usize = 4;
+const SLOTS: usize = 1024; // This value can be lower if interval is short (key cardinality is low)
+
+impl Rate {
+    /// Create a new `Rate` with the given interval.
+    ///
+    /// The interval is kept in whole milliseconds, so any sub-millisecond part is dropped.
+    pub fn new(interval: std::time::Duration) -> Self {
+        Rate {
+            red_slot: Estimator::new(HASHES, SLOTS),
+            blue_slot: Estimator::new(HASHES, SLOTS),
+            red_or_blue: AtomicBool::new(true),
+            start: Instant::now(),
+            reset_interval_ms: interval.as_millis() as u64, // should be small not to overflow
+            last_reset_time: AtomicU64::new(0),
+        }
+    }
+
+    /// The slot that collects events, given the value of the `red_or_blue` flag.
+    fn current(&self, red_or_blue: bool) -> &Estimator {
+        if red_or_blue {
+            &self.red_slot
+        } else {
+            &self.blue_slot
+        }
+    }
+
+    /// The slot that rates are reported from, given the value of the `red_or_blue` flag.
+    fn previous(&self, red_or_blue: bool) -> &Estimator {
+        if red_or_blue {
+            &self.blue_slot
+        } else {
+            &self.red_slot
+        }
+    }
+
+    /// Read the flag that says which slot is the current one (`true`: red).
+    fn red_or_blue(&self) -> bool {
+        self.red_or_blue.load(Ordering::SeqCst)
+    }
+
+    /// Return the per second rate estimation.
+    ///
+    /// The value is the count recorded for `key` in the previous interval, scaled to one second.
+    pub fn rate<T: Hash>(&self, key: &T) -> f64 {
+        let past_ms = self.maybe_reset();
+        // saturate so that a very large interval cannot overflow the comparison
+        if past_ms >= self.reset_interval_ms.saturating_mul(2) {
+            // already missed 2 intervals, no data, just report 0 as a short cut
+            return 0f64;
+        }
+
+        self.previous(self.red_or_blue()).get(key) as f64 / self.reset_interval_ms as f64 * 1000.0
+    }
+
+    /// Report new events and return number of events seen so far in the current interval.
+    pub fn observe<T: Hash>(&self, key: &T, events: isize) -> isize {
+        self.maybe_reset();
+        self.current(self.red_or_blue()).incr(key, events)
+    }
+
+    // reset if needed, return the time since last reset for other fn to use
+    fn maybe_reset(&self) -> u64 {
+        // should be short enough not to overflow
+        let now = Instant::now().duration_since(self.start).as_millis() as u64;
+        let last_reset = self.last_reset_time.load(Ordering::SeqCst);
+        // `now` is sampled before `last_reset_time` is loaded, so another thread may already
+        // have published a slightly later reset timestamp. Saturate instead of underflowing.
+        let past_ms = now.saturating_sub(last_reset);
+
+        if past_ms < self.reset_interval_ms {
+            // no need to reset
+            return past_ms;
+        }
+        let red_or_blue = self.red_or_blue();
+        match self.last_reset_time.compare_exchange(
+            last_reset,
+            now,
+            Ordering::SeqCst,
+            Ordering::Acquire,
+        ) {
+            Ok(_) => {
+                // first clear the previous slot
+                self.previous(red_or_blue).reset();
+                // then flip the flag to tell others to use the reset slot
+                self.red_or_blue.store(!red_or_blue, Ordering::SeqCst);
+                // if current time is beyond 2 intervals, the data stored in the previous slot
+                // is also stale, we should clear that too
+                if past_ms >= self.reset_interval_ms.saturating_mul(2) {
+                    // Note that this is the previous one now because we just flipped self.red_or_blue
+                    self.current(red_or_blue).reset();
+                }
+            }
+            Err(new) => {
+                // another thread beats us to it
+                // double check that the new timestamp looks right; the lower bound saturates at
+                // zero so the check cannot underflow during the first second after `start`
+                assert!(new >= now.saturating_sub(1000));
+            }
+        }
+
+        past_ms
+    }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::thread::sleep;
+ use std::time::Duration;
+
+ #[test]
+ fn test_observe_rate() {
+ let r = Rate::new(Duration::from_secs(1));
+ let key = 1;
+
+ // second: 0
+ let observed = r.observe(&key, 3);
+ assert_eq!(observed, 3);
+ let observed = r.observe(&key, 2);
+ assert_eq!(observed, 5);
+ assert_eq!(r.rate(&key), 0f64); // no estimation yet because the interval has not passed
+
+ // second: 1
+ sleep(Duration::from_secs(1));
+ let observed = r.observe(&key, 4);
+ assert_eq!(observed, 4);
+ assert_eq!(r.rate(&key), 5f64); // 5 rps
+
+ // second: 2
+ sleep(Duration::from_secs(1));
+ assert_eq!(r.rate(&key), 4f64);
+
+ // second: 3
+ sleep(Duration::from_secs(1));
+ assert_eq!(r.rate(&key), 0f64); // no events observed in the past 2 seconds
+ }
+}