diff options
-rw-r--r-- | .bleep | 2 | ||||
-rw-r--r-- | docs/user_guide/index.md | 2 | ||||
-rw-r--r-- | docs/user_guide/rate_limiter.md | 167 | ||||
-rw-r--r-- | docs/user_guide/ratelimiter.md | 116 | ||||
-rw-r--r-- | pingora-proxy/Cargo.toml | 1 | ||||
-rw-r--r-- | pingora-proxy/examples/rate_limiter.rs | 117 |
6 files changed, 287 insertions, 118 deletions
@@ -1 +1 @@ -9ec558778457c28b1ffbb1d7b4f4f5ab370b97a5
\ No newline at end of file +c02dc12f1f7caf9aa90a13112321cf5c3d2f6b6d
\ No newline at end of file diff --git a/docs/user_guide/index.md b/docs/user_guide/index.md index d3f8309..cb57eb6 100644 --- a/docs/user_guide/index.md +++ b/docs/user_guide/index.md @@ -20,7 +20,7 @@ In this guide, we will cover the most used features, operations and settings of * [Examples: take control of the request](modify_filter.md) * [Connection pooling and reuse](pooling.md) * [Handling failures and failover](failover.md) -* [RateLimiter quickstart](ratelimiter.md) +* [RateLimiter quickstart](rate_limiter.md) ## Advanced topics (WIP) * [Pingora internals](internals.md) diff --git a/docs/user_guide/rate_limiter.md b/docs/user_guide/rate_limiter.md new file mode 100644 index 0000000..fe337a1 --- /dev/null +++ b/docs/user_guide/rate_limiter.md @@ -0,0 +1,167 @@ +# **RateLimiter quickstart** +Pingora provides a crate `pingora-limits` which provides a simple and easy to use rate limiter for your application. Below is an example of how you can use [`Rate`](https://docs.rs/pingora-limits/latest/pingora_limits/rate/struct.Rate.html) to create an application that uses multiple limiters to restrict the rate at which requests can be made on a per-app basis (determined by a request header). + +## Steps +1. Add the following dependencies to your `Cargo.toml`: + ```toml + async-trait="0.1" + pingora = { version = "0.3", features = [ "lb" ] } + pingora-limits = "0.3.0" + once_cell = "1.19.0" + ``` +2. Declare a global rate limiter map to store the rate limiter for each client. In this example, we use `appid`. +3. Override the `request_filter` method in the `ProxyHttp` trait to implement rate limiting. + 1. Retrieve the client appid from header. + 2. Retrieve the current window requests from the rate limiter map. If there is no rate limiter for the client, create a new one and insert it into the map. + 3. If the current window requests exceed the limit, return 429 and set RateLimiter associated headers. + 4. If the request is not rate limited, return `Ok(false)` to continue the request. + +## Example +```rust +use async_trait::async_trait; +use once_cell::sync::Lazy; +use pingora::http::ResponseHeader; +use pingora::prelude::*; +use pingora_limits::rate::Rate; +use std::sync::Arc; +use std::time::Duration; + +fn main() { + let mut server = Server::new(Some(Opt::default())).unwrap(); + server.bootstrap(); + let mut upstreams = LoadBalancer::try_from_iter(["1.1.1.1:443", "1.0.0.1:443"]).unwrap(); + // Set health check + let hc = TcpHealthCheck::new(); + upstreams.set_health_check(hc); + upstreams.health_check_frequency = Some(Duration::from_secs(1)); + // Set background service + let background = background_service("health check", upstreams); + let upstreams = background.task(); + // Set load balancer + let mut lb = http_proxy_service(&server.configuration, LB(upstreams)); + lb.add_tcp("0.0.0.0:6188"); + + // let rate = Rate + server.add_service(background); + server.add_service(lb); + server.run_forever(); +} + +pub struct LB(Arc<LoadBalancer<RoundRobin>>); + +impl LB { + pub fn get_request_appid(&self, session: &mut Session) -> Option<String> { + match session + .req_header() + .headers + .get("appid") + .map(|v| v.to_str()) + { + None => None, + Some(v) => match v { + Ok(v) => Some(v.to_string()), + Err(_) => None, + }, + } + } +} + +// Rate limiter +static RATE_LIMITER: Lazy<Rate> = Lazy::new(|| Rate::new(Duration::from_secs(1))); + +// max request per second per client +static MAX_REQ_PER_SEC: isize = 1; + +#[async_trait] +impl ProxyHttp for LB { + type CTX = (); + + fn new_ctx(&self) {} + + async fn upstream_peer( + &self, + _session: &mut Session, + _ctx: &mut Self::CTX, + ) -> Result<Box<HttpPeer>> { + let upstream = self.0.select(b"", 256).unwrap(); + // Set SNI + let peer = Box::new(HttpPeer::new(upstream, true, "one.one.one.one".to_string())); + Ok(peer) + } + + async fn upstream_request_filter( + &self, + _session: &mut Session, + upstream_request: &mut RequestHeader, + _ctx: &mut Self::CTX, + ) -> Result<()> + where + Self::CTX: Send + Sync, + { + upstream_request + .insert_header("Host", "one.one.one.one") + .unwrap(); + Ok(()) + } + + async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool> + where + Self::CTX: Send + Sync, + { + let appid = match self.get_request_appid(session) { + None => return Ok(false), // no client appid found, skip rate limiting + Some(addr) => addr, + }; + + // retrieve the current window requests + let curr_window_requests = RATE_LIMITER.observe(&appid, 1); + if curr_window_requests > MAX_REQ_PER_SEC { + // rate limited, return 429 + let mut header = ResponseHeader::build(429, None).unwrap(); + header + .insert_header("X-Rate-Limit-Limit", MAX_REQ_PER_SEC.to_string()) + .unwrap(); + header.insert_header("X-Rate-Limit-Remaining", "0").unwrap(); + header.insert_header("X-Rate-Limit-Reset", "1").unwrap(); + session.set_keepalive(None); + session + .write_response_header(Box::new(header), true) + .await?; + return Ok(true); + } + Ok(false) + } +} +``` + +## Testing +To use the example above, + +1. Run your program with `cargo run`. +2. Verify the program is working with a few executions of ` curl localhost:6188 -H "appid:1" -v` + - The first request should work and any later requests that arrive within 1s of a previous request should fail with: + ``` + * Trying 127.0.0.1:6188... + * Connected to localhost (127.0.0.1) port 6188 (#0) + > GET / HTTP/1.1 + > Host: localhost:6188 + > User-Agent: curl/7.88.1 + > Accept: */* + > appid:1 + > + < HTTP/1.1 429 Too Many Requests + < X-Rate-Limit-Limit: 1 + < X-Rate-Limit-Remaining: 0 + < X-Rate-Limit-Reset: 1 + < Date: Sun, 14 Jul 2024 20:29:02 GMT + < Connection: close + < + * Closing connection 0 + ``` + +## Complete Example +You can run the pre-made example code in the [`pingora-proxy` examples folder](https://github.com/cloudflare/pingora/tree/main/pingora-proxy/examples/rate_limiter.rs) with + +``` +cargo run --example rate_limiter +```
\ No newline at end of file diff --git a/docs/user_guide/ratelimiter.md b/docs/user_guide/ratelimiter.md deleted file mode 100644 index 3bb36c3..0000000 --- a/docs/user_guide/ratelimiter.md +++ /dev/null @@ -1,116 +0,0 @@ -# **RateLimiter quickstart** -Pingora provides a crate `pingora-limits` which provides a simple and easy to use rate limiter for your application. - -## Steps -1. Add the following dependencies to your `Cargo.toml`: - ```toml - pingora-limits = "0.1.0" - lazy_static = "1.4.0" - ``` -2. Declare a global rate limiter map to store the rate limiter for each client. In this example, we use `appid`. -3. Override the `request_filter` method in the `ProxyHttp` trait to implement rate limiting. - 1. Retrieve the client appid from header. - 2. Retrieve the current window requests from the rate limiter map. If there is no rate limiter for the client, create a new one and insert it into the map. - 3. If the current window requests exceed the limit, return 429 and set RateLimiter associated headers. - 4. If the request is not rate limited, return `Ok(false)` to continue the request. - -## Example -```rust -use std::collections::HashMap; -use std::sync::{Arc, Mutex}; -use std::time::Duration; -use async_trait::async_trait; -use lazy_static::lazy_static; -use pingora::http::ResponseHeader; -use pingora::prelude::*; -use pingora_limits::rate::Rate; - -fn main() { - let mut server = Server::new(Some(Opt::default())).unwrap(); - server.bootstrap(); - let mut upstreams = LoadBalancer::try_from_iter(["1.1.1.1:443", "1.0.0.1:443"]).unwrap(); - // Set health check - let hc = TcpHealthCheck::new(); - upstreams.set_health_check(hc); - upstreams.health_check_frequency = Some(Duration::from_secs(1)); - // Set background service - let background = background_service("health check", upstreams); - let upstreams = background.task(); - // Set load balancer - let mut lb = http_proxy_service(&server.configuration, LB(upstreams)); - lb.add_tcp("0.0.0.0:6188"); - - // let rate = Rate - server.add_service(background); - server.add_service(lb); - server.run_forever(); -} - -pub struct LB(Arc<LoadBalancer<RoundRobin>>); - -impl LB { - pub fn get_request_appid(&self, session: &mut Session) -> Option<String> { - match session.req_header().headers.get("appid").map(|v| v.to_str()) { - None => None, - Some(v) => match v { - Ok(v) => Some(v.to_string()), - Err(_) => None - } - } - } -} - - -// global limiter -lazy_static! { - static ref RATE_LIMITER_MAP: Arc<Mutex<HashMap<String, Rate>>> = { - Arc::new(Mutex::new(HashMap::new())) - }; -} -// max request per second per client -static MAX_REQ_PER_SEC: isize = 1; - -#[async_trait] -impl ProxyHttp for LB { - type CTX = (); - - fn new_ctx(&self) -> Self::CTX { - () - } - async fn upstream_peer(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> Result<Box<HttpPeer>> { - let upstream = self.0 - .select(b"", 256) - .unwrap(); - // Set SNI - let peer = Box::new(HttpPeer::new(upstream, true, "one.one.one.one".to_string())); - Ok(peer) - } - async fn upstream_request_filter(&self, _session: &mut Session, upstream_request: &mut RequestHeader, _ctx: &mut Self::CTX) -> Result<()> where Self::CTX: Send + Sync { - upstream_request.insert_header("Host", "one.one.one.one").unwrap(); - Ok(()) - } - async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool> where Self::CTX: Send + Sync { - let appid = match self.get_request_appid(session) { - None => return Ok(false), // no client appid found, skip rate limiting - Some(addr) => addr - }; - - // retrieve the current window requests - let curr_window_requests = { - let mut rate_limiter_map = RATE_LIMITER_MAP.lock().unwrap(); - let rate_limiter = rate_limiter_map.entry(appid.clone()).insert_or_with(|| Rate::new(Duration::from_secs(1))); - rate_limiter.observe(&appid, 1) - }; - if curr_window_requests > MAX_REQ_PER_SEC { // rate limited, return 429 - let mut header = ResponseHeader::build(429, None).unwrap(); - header.insert_header("X-Rate-Limit-Limit", MAX_REQ_PER_SEC.to_string()).unwrap(); - header.insert_header("X-Rate-Limit-Remaining", "0").unwrap(); - header.insert_header("X-Rate-Limit-Reset", "1").unwrap(); - session.set_keepalive(None); - session.write_response_header(Box::new(header)).await?; - return Ok(true); - } - Ok(false) - } -} -``` diff --git a/pingora-proxy/Cargo.toml b/pingora-proxy/Cargo.toml index 0a52dff..a672867 100644 --- a/pingora-proxy/Cargo.toml +++ b/pingora-proxy/Cargo.toml @@ -44,6 +44,7 @@ env_logger = "0.9" hyperlocal = "0.8" hyper = "0.14" tokio-tungstenite = "0.20.1" +pingora-limits = { version = "0.3.0", path = "../pingora-limits" } pingora-load-balancing = { version = "0.3.0", path = "../pingora-load-balancing" } prometheus = "0" futures-util = "0.3" diff --git a/pingora-proxy/examples/rate_limiter.rs b/pingora-proxy/examples/rate_limiter.rs new file mode 100644 index 0000000..d2c8b7e --- /dev/null +++ b/pingora-proxy/examples/rate_limiter.rs @@ -0,0 +1,117 @@ +use async_trait::async_trait; +use once_cell::sync::Lazy; +use pingora_core::prelude::*; +use pingora_http::{RequestHeader, ResponseHeader}; +use pingora_limits::rate::Rate; +use pingora_load_balancing::prelude::{RoundRobin, TcpHealthCheck}; +use pingora_load_balancing::LoadBalancer; +use pingora_proxy::{http_proxy_service, ProxyHttp, Session}; +use std::sync::Arc; +use std::time::Duration; + +fn main() { + let mut server = Server::new(Some(Opt::default())).unwrap(); + server.bootstrap(); + let mut upstreams = LoadBalancer::try_from_iter(["1.1.1.1:443", "1.0.0.1:443"]).unwrap(); + // Set health check + let hc = TcpHealthCheck::new(); + upstreams.set_health_check(hc); + upstreams.health_check_frequency = Some(Duration::from_secs(1)); + // Set background service + let background = background_service("health check", upstreams); + let upstreams = background.task(); + // Set load balancer + let mut lb = http_proxy_service(&server.configuration, LB(upstreams)); + lb.add_tcp("0.0.0.0:6188"); + + // let rate = Rate + server.add_service(background); + server.add_service(lb); + server.run_forever(); +} + +pub struct LB(Arc<LoadBalancer<RoundRobin>>); + +impl LB { + pub fn get_request_appid(&self, session: &mut Session) -> Option<String> { + match session + .req_header() + .headers + .get("appid") + .map(|v| v.to_str()) + { + None => None, + Some(v) => match v { + Ok(v) => Some(v.to_string()), + Err(_) => None, + }, + } + } +} + +// Rate limiter +static RATE_LIMITER: Lazy<Rate> = Lazy::new(|| Rate::new(Duration::from_secs(1))); + +// max request per second per client +static MAX_REQ_PER_SEC: isize = 1; + +#[async_trait] +impl ProxyHttp for LB { + type CTX = (); + + fn new_ctx(&self) {} + + async fn upstream_peer( + &self, + _session: &mut Session, + _ctx: &mut Self::CTX, + ) -> Result<Box<HttpPeer>> { + let upstream = self.0.select(b"", 256).unwrap(); + // Set SNI + let peer = Box::new(HttpPeer::new(upstream, true, "one.one.one.one".to_string())); + Ok(peer) + } + + async fn upstream_request_filter( + &self, + _session: &mut Session, + upstream_request: &mut RequestHeader, + _ctx: &mut Self::CTX, + ) -> Result<()> + where + Self::CTX: Send + Sync, + { + upstream_request + .insert_header("Host", "one.one.one.one") + .unwrap(); + Ok(()) + } + + async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool> + where + Self::CTX: Send + Sync, + { + let appid = match self.get_request_appid(session) { + None => return Ok(false), // no client appid found, skip rate limiting + Some(addr) => addr, + }; + + // retrieve the current window requests + let curr_window_requests = RATE_LIMITER.observe(&appid, 1); + if curr_window_requests > MAX_REQ_PER_SEC { + // rate limited, return 429 + let mut header = ResponseHeader::build(429, None).unwrap(); + header + .insert_header("X-Rate-Limit-Limit", MAX_REQ_PER_SEC.to_string()) + .unwrap(); + header.insert_header("X-Rate-Limit-Remaining", "0").unwrap(); + header.insert_header("X-Rate-Limit-Reset", "1").unwrap(); + session.set_keepalive(None); + session + .write_response_header(Box::new(header), true) + .await?; + return Ok(true); + } + Ok(false) + } +} |