author    Kevin Guthrie <[email protected]>  2024-08-02 16:13:28 -0400
committer Kevin Guthrie <[email protected]>  2024-08-03 00:08:29 +0000
commit    b2e0b29493d27a529337b6e515a192d29bd07156 (patch)
tree      c13dea0b8cf511368851f130d30bf03cffa3241e
parent    654a32f48ea4eda9c5c2a4aa4038c0c37ea21b46 (diff)
Updating the rate-limiter documentation with a simpler example
-rw-r--r--  .bleep                                    2
-rw-r--r--  docs/user_guide/index.md                  2
-rw-r--r--  docs/user_guide/rate_limiter.md         167
-rw-r--r--  docs/user_guide/ratelimiter.md          116
-rw-r--r--  pingora-proxy/Cargo.toml                  1
-rw-r--r--  pingora-proxy/examples/rate_limiter.rs  117
6 files changed, 287 insertions(+), 118 deletions(-)
diff --git a/.bleep b/.bleep
index 04ccaaf..87e4334 100644
--- a/.bleep
+++ b/.bleep
@@ -1 +1 @@
-9ec558778457c28b1ffbb1d7b4f4f5ab370b97a5
\ No newline at end of file
+c02dc12f1f7caf9aa90a13112321cf5c3d2f6b6d
\ No newline at end of file
diff --git a/docs/user_guide/index.md b/docs/user_guide/index.md
index d3f8309..cb57eb6 100644
--- a/docs/user_guide/index.md
+++ b/docs/user_guide/index.md
@@ -20,7 +20,7 @@ In this guide, we will cover the most used features, operations and settings of
* [Examples: take control of the request](modify_filter.md)
* [Connection pooling and reuse](pooling.md)
* [Handling failures and failover](failover.md)
-* [RateLimiter quickstart](ratelimiter.md)
+* [RateLimiter quickstart](rate_limiter.md)
## Advanced topics (WIP)
* [Pingora internals](internals.md)
diff --git a/docs/user_guide/rate_limiter.md b/docs/user_guide/rate_limiter.md
new file mode 100644
index 0000000..fe337a1
--- /dev/null
+++ b/docs/user_guide/rate_limiter.md
@@ -0,0 +1,167 @@
+# **RateLimiter quickstart**
+Pingora provides the `pingora-limits` crate, a simple and easy-to-use rate limiter for your application. Below is an example of how you can use [`Rate`](https://docs.rs/pingora-limits/latest/pingora_limits/rate/struct.Rate.html) to restrict the rate at which requests can be made on a per-app basis (determined by a request header).
+
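+As a minimal sketch of the underlying primitive (assuming the `pingora-limits` 0.3 API and a hypothetical `"app-1"` key), `Rate` counts events per key over a sliding window, and `observe` reports how many events the key has seen in the current window:
+
+```rust
+use pingora_limits::rate::Rate;
+use std::time::Duration;
+
+fn main() {
+    // One-second sliding window; counts are tracked separately per key.
+    let limiter = Rate::new(Duration::from_secs(1));
+
+    // observe() records 1 event for this key and returns the (estimated) number
+    // of events seen for the key in the current window, including this one.
+    let first = limiter.observe(&"app-1", 1); // typically 1
+    let second = limiter.observe(&"app-1", 1); // typically 2, if still in the same window
+    println!("seen in current window: {first}, then {second}");
+}
+```
+
+The full proxy example below makes the same call inside `request_filter` and rejects a request once the count exceeds a threshold.
+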
+## Steps
+1. Add the following dependencies to your `Cargo.toml`:
+ ```toml
+ async-trait = "0.1"
+ pingora = { version = "0.3", features = [ "lb" ] }
+ pingora-limits = "0.3.0"
+ once_cell = "1.19.0"
+ ```
+2. Declare a global `Rate` limiter used to track the request rate of each client. In this example, clients are identified by the `appid` request header.
+3. Override the `request_filter` method in the `ProxyHttp` trait to implement rate limiting.
+ 1. Retrieve the client `appid` from the request header.
+ 2. Report the request to the rate limiter via `observe` and retrieve the number of requests this client has made in the current window.
+ 3. If the current window's request count exceeds the limit, return 429 and set the rate-limiting response headers.
+ 4. If the request is not rate limited, return `Ok(false)` to continue processing the request.
+
+## Example
+```rust
+use async_trait::async_trait;
+use once_cell::sync::Lazy;
+use pingora::http::ResponseHeader;
+use pingora::prelude::*;
+use pingora_limits::rate::Rate;
+use std::sync::Arc;
+use std::time::Duration;
+
+fn main() {
+ let mut server = Server::new(Some(Opt::default())).unwrap();
+ server.bootstrap();
+ let mut upstreams = LoadBalancer::try_from_iter(["1.1.1.1:443", "1.0.0.1:443"]).unwrap();
+ // Set health check
+ let hc = TcpHealthCheck::new();
+ upstreams.set_health_check(hc);
+ upstreams.health_check_frequency = Some(Duration::from_secs(1));
+ // Set background service
+ let background = background_service("health check", upstreams);
+ let upstreams = background.task();
+ // Set load balancer
+ let mut lb = http_proxy_service(&server.configuration, LB(upstreams));
+ lb.add_tcp("0.0.0.0:6188");
+
+ // let rate = Rate
+ server.add_service(background);
+ server.add_service(lb);
+ server.run_forever();
+}
+
+pub struct LB(Arc<LoadBalancer<RoundRobin>>);
+
+impl LB {
+ pub fn get_request_appid(&self, session: &mut Session) -> Option<String> {
+ match session
+ .req_header()
+ .headers
+ .get("appid")
+ .map(|v| v.to_str())
+ {
+ None => None,
+ Some(v) => match v {
+ Ok(v) => Some(v.to_string()),
+ Err(_) => None,
+ },
+ }
+ }
+}
+
+// Rate limiter
+static RATE_LIMITER: Lazy<Rate> = Lazy::new(|| Rate::new(Duration::from_secs(1)));
+
+// max requests per second per client
+static MAX_REQ_PER_SEC: isize = 1;
+
+#[async_trait]
+impl ProxyHttp for LB {
+ type CTX = ();
+
+ fn new_ctx(&self) {}
+
+ async fn upstream_peer(
+ &self,
+ _session: &mut Session,
+ _ctx: &mut Self::CTX,
+ ) -> Result<Box<HttpPeer>> {
+ let upstream = self.0.select(b"", 256).unwrap();
+ // Set SNI
+ let peer = Box::new(HttpPeer::new(upstream, true, "one.one.one.one".to_string()));
+ Ok(peer)
+ }
+
+ async fn upstream_request_filter(
+ &self,
+ _session: &mut Session,
+ upstream_request: &mut RequestHeader,
+ _ctx: &mut Self::CTX,
+ ) -> Result<()>
+ where
+ Self::CTX: Send + Sync,
+ {
+ upstream_request
+ .insert_header("Host", "one.one.one.one")
+ .unwrap();
+ Ok(())
+ }
+
+ async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool>
+ where
+ Self::CTX: Send + Sync,
+ {
+ let appid = match self.get_request_appid(session) {
+ None => return Ok(false), // no client appid found, skip rate limiting
+ Some(addr) => addr,
+ };
+
+ // retrieve the current window requests
+ let curr_window_requests = RATE_LIMITER.observe(&appid, 1);
+ if curr_window_requests > MAX_REQ_PER_SEC {
+ // rate limited, return 429
+ let mut header = ResponseHeader::build(429, None).unwrap();
+ header
+ .insert_header("X-Rate-Limit-Limit", MAX_REQ_PER_SEC.to_string())
+ .unwrap();
+ header.insert_header("X-Rate-Limit-Remaining", "0").unwrap();
+ header.insert_header("X-Rate-Limit-Reset", "1").unwrap();
+ session.set_keepalive(None);
+ session
+ .write_response_header(Box::new(header), true)
+ .await?;
+ return Ok(true);
+ }
+ Ok(false)
+ }
+}
+```
+
+## Testing
+To use the example above:
+
+1. Run your program with `cargo run`.
+2. Verify the program is working by running `curl localhost:6188 -H "appid:1" -v` a few times.
+ - The first request should succeed, and any further requests that arrive within the same one-second window should fail with:
+ ```
+ * Trying 127.0.0.1:6188...
+ * Connected to localhost (127.0.0.1) port 6188 (#0)
+ > GET / HTTP/1.1
+ > Host: localhost:6188
+ > User-Agent: curl/7.88.1
+ > Accept: */*
+ > appid:1
+ >
+ < HTTP/1.1 429 Too Many Requests
+ < X-Rate-Limit-Limit: 1
+ < X-Rate-Limit-Remaining: 0
+ < X-Rate-Limit-Reset: 1
+ < Date: Sun, 14 Jul 2024 20:29:02 GMT
+ < Connection: close
+ <
+ * Closing connection 0
+ ```
+
+## Complete Example
+You can run the pre-made example code in the [`pingora-proxy` examples folder](https://github.com/cloudflare/pingora/tree/main/pingora-proxy/examples/rate_limiter.rs) with
+
+```
+cargo run --example rate_limiter
+```
\ No newline at end of file
diff --git a/docs/user_guide/ratelimiter.md b/docs/user_guide/ratelimiter.md
deleted file mode 100644
index 3bb36c3..0000000
--- a/docs/user_guide/ratelimiter.md
+++ /dev/null
@@ -1,116 +0,0 @@
-# **RateLimiter quickstart**
-Pingora provides a crate `pingora-limits` which provides a simple and easy to use rate limiter for your application.
-
-## Steps
-1. Add the following dependencies to your `Cargo.toml`:
- ```toml
- pingora-limits = "0.1.0"
- lazy_static = "1.4.0"
- ```
-2. Declare a global rate limiter map to store the rate limiter for each client. In this example, we use `appid`.
-3. Override the `request_filter` method in the `ProxyHttp` trait to implement rate limiting.
- 1. Retrieve the client appid from header.
- 2. Retrieve the current window requests from the rate limiter map. If there is no rate limiter for the client, create a new one and insert it into the map.
- 3. If the current window requests exceed the limit, return 429 and set RateLimiter associated headers.
- 4. If the request is not rate limited, return `Ok(false)` to continue the request.
-
-## Example
-```rust
-use std::collections::HashMap;
-use std::sync::{Arc, Mutex};
-use std::time::Duration;
-use async_trait::async_trait;
-use lazy_static::lazy_static;
-use pingora::http::ResponseHeader;
-use pingora::prelude::*;
-use pingora_limits::rate::Rate;
-
-fn main() {
- let mut server = Server::new(Some(Opt::default())).unwrap();
- server.bootstrap();
- let mut upstreams = LoadBalancer::try_from_iter(["1.1.1.1:443", "1.0.0.1:443"]).unwrap();
- // Set health check
- let hc = TcpHealthCheck::new();
- upstreams.set_health_check(hc);
- upstreams.health_check_frequency = Some(Duration::from_secs(1));
- // Set background service
- let background = background_service("health check", upstreams);
- let upstreams = background.task();
- // Set load balancer
- let mut lb = http_proxy_service(&server.configuration, LB(upstreams));
- lb.add_tcp("0.0.0.0:6188");
-
- // let rate = Rate
- server.add_service(background);
- server.add_service(lb);
- server.run_forever();
-}
-
-pub struct LB(Arc<LoadBalancer<RoundRobin>>);
-
-impl LB {
- pub fn get_request_appid(&self, session: &mut Session) -> Option<String> {
- match session.req_header().headers.get("appid").map(|v| v.to_str()) {
- None => None,
- Some(v) => match v {
- Ok(v) => Some(v.to_string()),
- Err(_) => None
- }
- }
- }
-}
-
-
-// global limiter
-lazy_static! {
- static ref RATE_LIMITER_MAP: Arc<Mutex<HashMap<String, Rate>>> = {
- Arc::new(Mutex::new(HashMap::new()))
- };
-}
-// max request per second per client
-static MAX_REQ_PER_SEC: isize = 1;
-
-#[async_trait]
-impl ProxyHttp for LB {
- type CTX = ();
-
- fn new_ctx(&self) -> Self::CTX {
- ()
- }
- async fn upstream_peer(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> Result<Box<HttpPeer>> {
- let upstream = self.0
- .select(b"", 256)
- .unwrap();
- // Set SNI
- let peer = Box::new(HttpPeer::new(upstream, true, "one.one.one.one".to_string()));
- Ok(peer)
- }
- async fn upstream_request_filter(&self, _session: &mut Session, upstream_request: &mut RequestHeader, _ctx: &mut Self::CTX) -> Result<()> where Self::CTX: Send + Sync {
- upstream_request.insert_header("Host", "one.one.one.one").unwrap();
- Ok(())
- }
- async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool> where Self::CTX: Send + Sync {
- let appid = match self.get_request_appid(session) {
- None => return Ok(false), // no client appid found, skip rate limiting
- Some(addr) => addr
- };
-
- // retrieve the current window requests
- let curr_window_requests = {
- let mut rate_limiter_map = RATE_LIMITER_MAP.lock().unwrap();
- let rate_limiter = rate_limiter_map.entry(appid.clone()).insert_or_with(|| Rate::new(Duration::from_secs(1)));
- rate_limiter.observe(&appid, 1)
- };
- if curr_window_requests > MAX_REQ_PER_SEC { // rate limited, return 429
- let mut header = ResponseHeader::build(429, None).unwrap();
- header.insert_header("X-Rate-Limit-Limit", MAX_REQ_PER_SEC.to_string()).unwrap();
- header.insert_header("X-Rate-Limit-Remaining", "0").unwrap();
- header.insert_header("X-Rate-Limit-Reset", "1").unwrap();
- session.set_keepalive(None);
- session.write_response_header(Box::new(header)).await?;
- return Ok(true);
- }
- Ok(false)
- }
-}
-```
diff --git a/pingora-proxy/Cargo.toml b/pingora-proxy/Cargo.toml
index 0a52dff..a672867 100644
--- a/pingora-proxy/Cargo.toml
+++ b/pingora-proxy/Cargo.toml
@@ -44,6 +44,7 @@ env_logger = "0.9"
hyperlocal = "0.8"
hyper = "0.14"
tokio-tungstenite = "0.20.1"
+pingora-limits = { version = "0.3.0", path = "../pingora-limits" }
pingora-load-balancing = { version = "0.3.0", path = "../pingora-load-balancing" }
prometheus = "0"
futures-util = "0.3"
diff --git a/pingora-proxy/examples/rate_limiter.rs b/pingora-proxy/examples/rate_limiter.rs
new file mode 100644
index 0000000..d2c8b7e
--- /dev/null
+++ b/pingora-proxy/examples/rate_limiter.rs
@@ -0,0 +1,117 @@
+use async_trait::async_trait;
+use once_cell::sync::Lazy;
+use pingora_core::prelude::*;
+use pingora_http::{RequestHeader, ResponseHeader};
+use pingora_limits::rate::Rate;
+use pingora_load_balancing::prelude::{RoundRobin, TcpHealthCheck};
+use pingora_load_balancing::LoadBalancer;
+use pingora_proxy::{http_proxy_service, ProxyHttp, Session};
+use std::sync::Arc;
+use std::time::Duration;
+
+fn main() {
+ let mut server = Server::new(Some(Opt::default())).unwrap();
+ server.bootstrap();
+ let mut upstreams = LoadBalancer::try_from_iter(["1.1.1.1:443", "1.0.0.1:443"]).unwrap();
+ // Set health check
+ let hc = TcpHealthCheck::new();
+ upstreams.set_health_check(hc);
+ upstreams.health_check_frequency = Some(Duration::from_secs(1));
+ // Set background service
+ let background = background_service("health check", upstreams);
+ let upstreams = background.task();
+ // Set load balancer
+ let mut lb = http_proxy_service(&server.configuration, LB(upstreams));
+ lb.add_tcp("0.0.0.0:6188");
+
+ // let rate = Rate
+ server.add_service(background);
+ server.add_service(lb);
+ server.run_forever();
+}
+
+pub struct LB(Arc<LoadBalancer<RoundRobin>>);
+
+impl LB {
+ pub fn get_request_appid(&self, session: &mut Session) -> Option<String> {
+ match session
+ .req_header()
+ .headers
+ .get("appid")
+ .map(|v| v.to_str())
+ {
+ None => None,
+ Some(v) => match v {
+ Ok(v) => Some(v.to_string()),
+ Err(_) => None,
+ },
+ }
+ }
+}
+
+// Rate limiter
+static RATE_LIMITER: Lazy<Rate> = Lazy::new(|| Rate::new(Duration::from_secs(1)));
+
+// max requests per second per client
+static MAX_REQ_PER_SEC: isize = 1;
+
+#[async_trait]
+impl ProxyHttp for LB {
+ type CTX = ();
+
+ fn new_ctx(&self) {}
+
+ async fn upstream_peer(
+ &self,
+ _session: &mut Session,
+ _ctx: &mut Self::CTX,
+ ) -> Result<Box<HttpPeer>> {
+ let upstream = self.0.select(b"", 256).unwrap();
+ // Set SNI
+ let peer = Box::new(HttpPeer::new(upstream, true, "one.one.one.one".to_string()));
+ Ok(peer)
+ }
+
+ async fn upstream_request_filter(
+ &self,
+ _session: &mut Session,
+ upstream_request: &mut RequestHeader,
+ _ctx: &mut Self::CTX,
+ ) -> Result<()>
+ where
+ Self::CTX: Send + Sync,
+ {
+ upstream_request
+ .insert_header("Host", "one.one.one.one")
+ .unwrap();
+ Ok(())
+ }
+
+ async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool>
+ where
+ Self::CTX: Send + Sync,
+ {
+ let appid = match self.get_request_appid(session) {
+ None => return Ok(false), // no client appid found, skip rate limiting
+ Some(addr) => addr,
+ };
+
+ // retrieve the current window requests
+ let curr_window_requests = RATE_LIMITER.observe(&appid, 1);
+ if curr_window_requests > MAX_REQ_PER_SEC {
+ // rate limited, return 429
+ let mut header = ResponseHeader::build(429, None).unwrap();
+ header
+ .insert_header("X-Rate-Limit-Limit", MAX_REQ_PER_SEC.to_string())
+ .unwrap();
+ header.insert_header("X-Rate-Limit-Remaining", "0").unwrap();
+ header.insert_header("X-Rate-Limit-Reset", "1").unwrap();
+ session.set_keepalive(None);
+ session
+ .write_response_header(Box::new(header), true)
+ .await?;
+ return Ok(true);
+ }
+ Ok(false)
+ }
+}