// Copyright 2024 Cloudflare, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #[cfg(feature = "any_tls")] use super::cert; use async_trait::async_trait; use clap::Parser; use http::header::VARY; use http::HeaderValue; use once_cell::sync::Lazy; use pingora_cache::cache_control::CacheControl; use pingora_cache::key::HashBinary; use pingora_cache::VarianceBuilder; use pingora_cache::{ eviction::simple_lru::Manager, filters::resp_cacheable, lock::CacheLock, predictor::Predictor, set_compression_dict_path, CacheMeta, CacheMetaDefaults, CachePhase, MemCache, NoCacheReason, RespCacheable, }; use pingora_core::apps::{HttpServerApp, HttpServerOptions}; use pingora_core::modules::http::compression::ResponseCompression; use pingora_core::protocols::{l4::socket::SocketAddr, Digest}; use pingora_core::server::configuration::Opt; use pingora_core::services::Service; use pingora_core::upstreams::peer::HttpPeer; use pingora_core::utils::tls::CertKey; use pingora_error::{Error, ErrorSource, Result}; use pingora_http::{RequestHeader, ResponseHeader}; use pingora_proxy::{ProxyHttp, Session}; use std::collections::{HashMap, HashSet}; use std::sync::Arc; use std::thread; use std::time::Duration; pub struct ExampleProxyHttps {} #[allow(clippy::upper_case_acronyms)] #[derive(Default)] pub struct CTX { conn_reused: bool, upstream_client_addr: Option, upstream_server_addr: Option, } // Common logic for both ProxyHttp(s) types fn connected_to_upstream_common( reused: bool, digest: Option<&Digest>, ctx: &mut CTX, ) -> Result<()> { ctx.conn_reused = reused; let socket_digest = digest .expect("upstream connector digest should be set for HTTP sessions") .socket_digest .as_ref() .expect("socket digest should be set for HTTP sessions"); ctx.upstream_client_addr = socket_digest.local_addr().cloned(); ctx.upstream_server_addr = socket_digest.peer_addr().cloned(); Ok(()) } fn response_filter_common( session: &mut Session, response: &mut ResponseHeader, ctx: &mut CTX, ) -> Result<()> { if ctx.conn_reused { response.insert_header("x-conn-reuse", "1")?; } let client_addr = session.client_addr(); let server_addr = session.server_addr(); response.insert_header( "x-client-addr", client_addr.map_or_else(|| "unset".into(), |a| a.to_string()), )?; response.insert_header( "x-server-addr", server_addr.map_or_else(|| "unset".into(), |a| a.to_string()), )?; response.insert_header( "x-upstream-client-addr", ctx.upstream_client_addr .as_ref() .map_or_else(|| "unset".into(), |a| a.to_string()), )?; response.insert_header( "x-upstream-server-addr", ctx.upstream_server_addr .as_ref() .map_or_else(|| "unset".into(), |a| a.to_string()), )?; Ok(()) } #[async_trait] #[cfg(feature = "any_tls")] impl ProxyHttp for ExampleProxyHttps { type CTX = CTX; fn new_ctx(&self) -> Self::CTX { CTX::default() } async fn upstream_peer( &self, session: &mut Session, _ctx: &mut Self::CTX, ) -> Result> { let session = session.as_downstream(); let req = session.req_header(); let port = req .headers .get("x-port") .map_or("8443", |v| v.to_str().unwrap()); let sni = req.headers.get("sni").map_or("", |v| v.to_str().unwrap()); let alt = req.headers.get("alt").map_or("", |v| v.to_str().unwrap()); let client_cert = session.get_header_bytes("client_cert"); let mut peer = Box::new(HttpPeer::new( format!("127.0.0.1:{port}"), true, sni.to_string(), )); peer.options.alternative_cn = Some(alt.to_string()); let verify = session.get_header_bytes("verify") == b"1"; peer.options.verify_cert = verify; let verify_host = session.get_header_bytes("verify_host") == b"1"; peer.options.verify_hostname = verify_host; if matches!(client_cert, b"1" | b"2") { let (mut certs, key) = if client_cert == b"1" { (vec![cert::LEAF_CERT.clone()], cert::LEAF_KEY.clone()) } else { (vec![cert::LEAF2_CERT.clone()], cert::LEAF2_KEY.clone()) }; if session.get_header_bytes("client_intermediate") == b"1" { certs.push(cert::INTERMEDIATE_CERT.clone()); } peer.client_cert_key = Some(Arc::new(CertKey::new(certs, key))); } if session.get_header_bytes("x-h2") == b"true" { // default is 1, 1 peer.options.set_http_version(2, 2); } Ok(peer) } async fn response_filter( &self, session: &mut Session, upstream_response: &mut ResponseHeader, ctx: &mut Self::CTX, ) -> Result<()> where Self::CTX: Send + Sync, { response_filter_common(session, upstream_response, ctx) } async fn upstream_request_filter( &self, session: &mut Session, req: &mut RequestHeader, _ctx: &mut Self::CTX, ) -> Result<()> { let host = session.get_header_bytes("host-override"); if host != b"" { req.insert_header("host", host)?; } Ok(()) } async fn connected_to_upstream( &self, _http_session: &mut Session, reused: bool, _peer: &HttpPeer, #[cfg(unix)] _fd: std::os::unix::io::RawFd, #[cfg(windows)] _sock: std::os::windows::io::RawSocket, digest: Option<&Digest>, ctx: &mut CTX, ) -> Result<()> { connected_to_upstream_common(reused, digest, ctx) } } pub struct ExampleProxyHttp {} #[async_trait] impl ProxyHttp for ExampleProxyHttp { type CTX = CTX; fn new_ctx(&self) -> Self::CTX { CTX::default() } async fn early_request_filter( &self, session: &mut Session, _ctx: &mut Self::CTX, ) -> Result<()> { let req = session.req_header(); let downstream_compression = req.headers.get("x-downstream-compression").is_some(); if downstream_compression { session .downstream_modules_ctx .get_mut::() .unwrap() .adjust_level(6); } else { // enable upstream compression for all requests by default session.upstream_compression.adjust_level(6); } Ok(()) } async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result { let req = session.req_header(); let write_timeout = req .headers .get("x-write-timeout") .and_then(|v| v.to_str().ok().and_then(|v| v.parse().ok())); let min_rate = req .headers .get("x-min-rate") .and_then(|v| v.to_str().ok().and_then(|v| v.parse().ok())); let downstream_compression = req.headers.get("x-downstream-compression").is_some(); if !downstream_compression { // enable upstream compression for all requests by default session.upstream_compression.adjust_level(6); // also disable downstream compression in order to test the upstream one session .downstream_modules_ctx .get_mut::() .unwrap() .adjust_level(0); } if let Some(min_rate) = min_rate { session.set_min_send_rate(min_rate); } if let Some(write_timeout) = write_timeout { session.set_write_timeout(Duration::from_secs(write_timeout)); } Ok(false) } async fn response_filter( &self, session: &mut Session, upstream_response: &mut ResponseHeader, ctx: &mut Self::CTX, ) -> Result<()> { response_filter_common(session, upstream_response, ctx) } async fn upstream_peer( &self, session: &mut Session, _ctx: &mut Self::CTX, ) -> Result> { let req = session.req_header(); #[cfg(unix)] if req.headers.contains_key("x-uds-peer") { return Ok(Box::new(HttpPeer::new_uds( "/tmp/pingora_nginx_test.sock", false, "".to_string(), )?)); } let port = req .headers .get("x-port") .map_or("8000", |v| v.to_str().unwrap()); let mut peer = Box::new(HttpPeer::new( format!("127.0.0.1:{port}"), false, "".to_string(), )); if session.get_header_bytes("x-h2") == b"true" { // default is 1, 1 peer.options.set_http_version(2, 2); } Ok(peer) } async fn connected_to_upstream( &self, _http_session: &mut Session, reused: bool, _peer: &HttpPeer, #[cfg(unix)] _fd: std::os::unix::io::RawFd, #[cfg(windows)] _sock: std::os::windows::io::RawSocket, digest: Option<&Digest>, ctx: &mut CTX, ) -> Result<()> { connected_to_upstream_common(reused, digest, ctx) } } static CACHE_BACKEND: Lazy = Lazy::new(MemCache::new); const CACHE_DEFAULT: CacheMetaDefaults = CacheMetaDefaults::new(|_| Some(1), 1, 1); static CACHE_PREDICTOR: Lazy> = Lazy::new(|| Predictor::new(5, None)); static EVICTION_MANAGER: Lazy = Lazy::new(|| Manager::new(8192)); // 8192 bytes static CACHE_LOCK: Lazy = Lazy::new(|| CacheLock::new(std::time::Duration::from_secs(2))); // Example of how one might restrict which fields can be varied on. static CACHE_VARY_ALLOWED_HEADERS: Lazy>> = Lazy::new(|| Some(vec!["accept", "accept-encoding"].into_iter().collect())); // #[allow(clippy::upper_case_acronyms)] pub struct CacheCTX { upstream_status: Option, } pub struct ExampleProxyCache {} #[async_trait] impl ProxyHttp for ExampleProxyCache { type CTX = CacheCTX; fn new_ctx(&self) -> Self::CTX { CacheCTX { upstream_status: None, } } async fn upstream_peer( &self, session: &mut Session, _ctx: &mut Self::CTX, ) -> Result> { let req = session.req_header(); let port = req .headers .get("x-port") .map_or("8000", |v| v.to_str().unwrap()); let mut peer = Box::new(HttpPeer::new( format!("127.0.0.1:{}", port), false, "".to_string(), )); if session.get_header_bytes("x-h2") == b"true" { // default is 1, 1 peer.options.set_http_version(2, 2); } Ok(peer) } fn request_cache_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<()> { // TODO: only allow GET & HEAD if session.get_header_bytes("x-bypass-cache") != b"" { return Ok(()); } // turn on eviction only for some requests to avoid interference across tests let eviction = session.req_header().headers.get("x-eviction").map(|_| { &*EVICTION_MANAGER as &'static (dyn pingora_cache::eviction::EvictionManager + Sync) }); let lock = session .req_header() .headers .get("x-lock") .map(|_| &*CACHE_LOCK); session .cache .enable(&*CACHE_BACKEND, eviction, Some(&*CACHE_PREDICTOR), lock); if let Some(max_file_size_hdr) = session .req_header() .headers .get("x-cache-max-file-size-bytes") { let bytes = max_file_size_hdr .to_str() .unwrap() .parse::() .unwrap(); session.cache.set_max_file_size_bytes(bytes); } Ok(()) } async fn cache_hit_filter( &self, session: &Session, _meta: &CacheMeta, _ctx: &mut Self::CTX, ) -> Result { // allow test header to control force expiry if session.get_header_bytes("x-force-expire") != b"" { return Ok(true); } Ok(false) } fn cache_vary_filter( &self, meta: &CacheMeta, _ctx: &mut Self::CTX, req: &RequestHeader, ) -> Option { let mut key = VarianceBuilder::new(); // Vary per header from origin. Target headers are de-duplicated by key logic. let vary_headers_lowercased: Vec = meta .headers() .get_all(VARY) .iter() // Filter out any unparseable vary headers. .flat_map(|vary_header| vary_header.to_str().ok()) .flat_map(|vary_header| vary_header.split(',')) .map(|s| s.trim().to_lowercase()) .filter(|header_name| { // Filter only for allowed headers, if restricted. CACHE_VARY_ALLOWED_HEADERS .as_ref() .map(|al| al.contains(header_name.as_str())) .unwrap_or(true) }) .collect(); vary_headers_lowercased.iter().for_each(|header_name| { // Add this header and value to be considered in the variance key. key.add_value( header_name, req.headers .get(header_name) .map(|v| v.as_bytes()) .unwrap_or(&[]), ); }); key.finalize() } fn response_cache_filter( &self, _session: &Session, resp: &ResponseHeader, _ctx: &mut Self::CTX, ) -> Result { let cc = CacheControl::from_resp_headers(resp); Ok(resp_cacheable( cc.as_ref(), resp.clone(), false, &CACHE_DEFAULT, )) } fn upstream_response_filter( &self, _session: &mut Session, upstream_response: &mut ResponseHeader, ctx: &mut Self::CTX, ) where Self::CTX: Send + Sync, { ctx.upstream_status = Some(upstream_response.status.into()); } async fn response_filter( &self, session: &mut Session, upstream_response: &mut ResponseHeader, ctx: &mut Self::CTX, ) -> Result<()> where Self::CTX: Send + Sync, { if session.cache.enabled() { match session.cache.phase() { CachePhase::Hit => upstream_response.insert_header("x-cache-status", "hit")?, CachePhase::Miss => upstream_response.insert_header("x-cache-status", "miss")?, CachePhase::Stale => upstream_response.insert_header("x-cache-status", "stale")?, CachePhase::StaleUpdating => { upstream_response.insert_header("x-cache-status", "stale-updating")? } CachePhase::Expired => { upstream_response.insert_header("x-cache-status", "expired")? } CachePhase::Revalidated | CachePhase::RevalidatedNoCache(_) => { upstream_response.insert_header("x-cache-status", "revalidated")? } _ => upstream_response.insert_header("x-cache-status", "invalid")?, } } else { match session.cache.phase() { CachePhase::Disabled(NoCacheReason::Deferred) => { upstream_response.insert_header("x-cache-status", "deferred")?; } _ => upstream_response.insert_header("x-cache-status", "no-cache")?, } } if let Some(d) = session.cache.lock_duration() { upstream_response.insert_header("x-cache-lock-time-ms", format!("{}", d.as_millis()))? } if let Some(up_stat) = ctx.upstream_status { upstream_response.insert_header("x-upstream-status", up_stat.to_string())?; } Ok(()) } fn should_serve_stale( &self, _session: &mut Session, _ctx: &mut Self::CTX, error: Option<&Error>, // None when it is called during stale while revalidate ) -> bool { // enable serve stale while updating error.map_or(true, |e| e.esource() == &ErrorSource::Upstream) } fn is_purge(&self, session: &Session, _ctx: &Self::CTX) -> bool { session.req_header().method == "PURGE" } } fn test_main() { env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); let opts: Vec = vec![ "pingora-proxy".into(), "-c".into(), "tests/pingora_conf.yaml".into(), ]; let mut my_server = pingora_core::server::Server::new(Some(Opt::parse_from(opts))).unwrap(); my_server.bootstrap(); let mut proxy_service_http = pingora_proxy::http_proxy_service(&my_server.configuration, ExampleProxyHttp {}); proxy_service_http.add_tcp("0.0.0.0:6147"); #[cfg(unix)] proxy_service_http.add_uds("/tmp/pingora_proxy.sock", None); let mut proxy_service_h2c = pingora_proxy::http_proxy_service(&my_server.configuration, ExampleProxyHttp {}); let http_logic = proxy_service_h2c.app_logic_mut().unwrap(); let mut http_server_options = HttpServerOptions::default(); http_server_options.h2c = true; http_logic.server_options = Some(http_server_options); proxy_service_h2c.add_tcp("0.0.0.0:6146"); let mut proxy_service_https_opt: Option> = None; #[cfg(feature = "any_tls")] { let mut proxy_service_https = pingora_proxy::http_proxy_service(&my_server.configuration, ExampleProxyHttps {}); proxy_service_https.add_tcp("0.0.0.0:6149"); let cert_path = format!("{}/tests/keys/server.crt", env!("CARGO_MANIFEST_DIR")); let key_path = format!("{}/tests/keys/key.pem", env!("CARGO_MANIFEST_DIR")); let mut tls_settings = pingora_core::listeners::tls::TlsSettings::intermediate(&cert_path, &key_path).unwrap(); tls_settings.enable_h2(); proxy_service_https.add_tls_with_settings("0.0.0.0:6150", None, tls_settings); proxy_service_https_opt = Some(Box::new(proxy_service_https)) } let mut proxy_service_cache = pingora_proxy::http_proxy_service(&my_server.configuration, ExampleProxyCache {}); proxy_service_cache.add_tcp("0.0.0.0:6148"); let mut services: Vec> = vec![ Box::new(proxy_service_h2c), Box::new(proxy_service_http), Box::new(proxy_service_cache), ]; if let Some(proxy_service_https) = proxy_service_https_opt { services.push(proxy_service_https) } set_compression_dict_path("tests/headers.dict"); my_server.add_services(services); my_server.run_forever(); } pub struct Server { pub handle: thread::JoinHandle<()>, } impl Server { pub fn start() -> Self { let server_handle = thread::spawn(|| { test_main(); }); Server { handle: server_handle, } } } // FIXME: this still allows multiple servers to spawn across integration tests pub static TEST_SERVER: Lazy = Lazy::new(Server::start); use super::mock_origin::MOCK_ORIGIN; pub fn init() { let _ = *TEST_SERVER; let _ = *MOCK_ORIGIN; }