aboutsummaryrefslogtreecommitdiffhomepage
path: root/pingora-proxy
diff options
context:
space:
mode:
authorKevin Guthrie <[email protected]>2024-08-02 16:13:28 -0400
committerAndrew Hauck <[email protected]>2024-08-09 14:30:49 -0700
commit76c4fdacbb0f68dde7b7a60a9e38ef50ee51510b (patch)
treec13dea0b8cf511368851f130d30bf03cffa3241e /pingora-proxy
parentef5ed1af3b74bcf6e07fe65c106acb3fc68c4149 (diff)
downloadpingora-76c4fdacbb0f68dde7b7a60a9e38ef50ee51510b.tar.gz
pingora-76c4fdacbb0f68dde7b7a60a9e38ef50ee51510b.zip
Updating the rate-limiter documentation with a simpler example
Diffstat (limited to 'pingora-proxy')
-rw-r--r--pingora-proxy/Cargo.toml1
-rw-r--r--pingora-proxy/examples/rate_limiter.rs117
2 files changed, 118 insertions, 0 deletions
diff --git a/pingora-proxy/Cargo.toml b/pingora-proxy/Cargo.toml
index 0a52dff..a672867 100644
--- a/pingora-proxy/Cargo.toml
+++ b/pingora-proxy/Cargo.toml
@@ -44,6 +44,7 @@ env_logger = "0.9"
hyperlocal = "0.8"
hyper = "0.14"
tokio-tungstenite = "0.20.1"
+pingora-limits = { version = "0.3.0", path = "../pingora-limits" }
pingora-load-balancing = { version = "0.3.0", path = "../pingora-load-balancing" }
prometheus = "0"
futures-util = "0.3"
diff --git a/pingora-proxy/examples/rate_limiter.rs b/pingora-proxy/examples/rate_limiter.rs
new file mode 100644
index 0000000..d2c8b7e
--- /dev/null
+++ b/pingora-proxy/examples/rate_limiter.rs
@@ -0,0 +1,117 @@
+use async_trait::async_trait;
+use once_cell::sync::Lazy;
+use pingora_core::prelude::*;
+use pingora_http::{RequestHeader, ResponseHeader};
+use pingora_limits::rate::Rate;
+use pingora_load_balancing::prelude::{RoundRobin, TcpHealthCheck};
+use pingora_load_balancing::LoadBalancer;
+use pingora_proxy::{http_proxy_service, ProxyHttp, Session};
+use std::sync::Arc;
+use std::time::Duration;
+
+fn main() {
+ let mut server = Server::new(Some(Opt::default())).unwrap();
+ server.bootstrap();
+ let mut upstreams = LoadBalancer::try_from_iter(["1.1.1.1:443", "1.0.0.1:443"]).unwrap();
+ // Set health check
+ let hc = TcpHealthCheck::new();
+ upstreams.set_health_check(hc);
+ upstreams.health_check_frequency = Some(Duration::from_secs(1));
+ // Set background service
+ let background = background_service("health check", upstreams);
+ let upstreams = background.task();
+ // Set load balancer
+ let mut lb = http_proxy_service(&server.configuration, LB(upstreams));
+ lb.add_tcp("0.0.0.0:6188");
+
+ // let rate = Rate
+ server.add_service(background);
+ server.add_service(lb);
+ server.run_forever();
+}
+
+pub struct LB(Arc<LoadBalancer<RoundRobin>>);
+
+impl LB {
+ pub fn get_request_appid(&self, session: &mut Session) -> Option<String> {
+ match session
+ .req_header()
+ .headers
+ .get("appid")
+ .map(|v| v.to_str())
+ {
+ None => None,
+ Some(v) => match v {
+ Ok(v) => Some(v.to_string()),
+ Err(_) => None,
+ },
+ }
+ }
+}
+
+// Rate limiter
+static RATE_LIMITER: Lazy<Rate> = Lazy::new(|| Rate::new(Duration::from_secs(1)));
+
+// max request per second per client
+static MAX_REQ_PER_SEC: isize = 1;
+
+#[async_trait]
+impl ProxyHttp for LB {
+ type CTX = ();
+
+ fn new_ctx(&self) {}
+
+ async fn upstream_peer(
+ &self,
+ _session: &mut Session,
+ _ctx: &mut Self::CTX,
+ ) -> Result<Box<HttpPeer>> {
+ let upstream = self.0.select(b"", 256).unwrap();
+ // Set SNI
+ let peer = Box::new(HttpPeer::new(upstream, true, "one.one.one.one".to_string()));
+ Ok(peer)
+ }
+
+ async fn upstream_request_filter(
+ &self,
+ _session: &mut Session,
+ upstream_request: &mut RequestHeader,
+ _ctx: &mut Self::CTX,
+ ) -> Result<()>
+ where
+ Self::CTX: Send + Sync,
+ {
+ upstream_request
+ .insert_header("Host", "one.one.one.one")
+ .unwrap();
+ Ok(())
+ }
+
+ async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool>
+ where
+ Self::CTX: Send + Sync,
+ {
+ let appid = match self.get_request_appid(session) {
+ None => return Ok(false), // no client appid found, skip rate limiting
+ Some(addr) => addr,
+ };
+
+ // retrieve the current window requests
+ let curr_window_requests = RATE_LIMITER.observe(&appid, 1);
+ if curr_window_requests > MAX_REQ_PER_SEC {
+ // rate limited, return 429
+ let mut header = ResponseHeader::build(429, None).unwrap();
+ header
+ .insert_header("X-Rate-Limit-Limit", MAX_REQ_PER_SEC.to_string())
+ .unwrap();
+ header.insert_header("X-Rate-Limit-Remaining", "0").unwrap();
+ header.insert_header("X-Rate-Limit-Reset", "1").unwrap();
+ session.set_keepalive(None);
+ session
+ .write_response_header(Box::new(header), true)
+ .await?;
+ return Ok(true);
+ }
+ Ok(false)
+ }
+}