// Copyright 2024 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use super::*;
use crate::proxy_cache::{range_filter::RangeBodyFilter, ServeFromCache};
use crate::proxy_common::*;

impl<SV> HttpProxy<SV> {
    pub(crate) async fn proxy_1to1(
        &self,
        session: &mut Session,
        client_session: &mut HttpSessionV1,
        peer: &HttpPeer,
        ctx: &mut SV::CTX,
    ) -> (bool, bool, Option<Box<Error>>)
    where
        SV: ProxyHttp + Send + Sync,
        SV::CTX: Send + Sync,
    {
        client_session.read_timeout = peer.options.read_timeout;
        client_session.write_timeout = peer.options.write_timeout;

        // phase 2: send the request to upstream
        let mut req = session.req_header().clone();

        // Convert HTTP/2 headers to H1
        if req.version == Version::HTTP_2 {
            req.set_version(Version::HTTP_11);
            // if the client has a body but no content length, add chunked encoding
            // https://datatracker.ietf.org/doc/html/rfc9112#name-message-body
            // "The presence of a message body in a request is signaled by a Content-Length or Transfer-Encoding header field."
            if !session.is_body_empty() && session.get_header(header::CONTENT_LENGTH).is_none() {
                req.insert_header(header::TRANSFER_ENCODING, "chunked")
                    .unwrap();
            }
            if session.get_header(header::HOST).is_none() {
                // H2 is required to set :authority, but not necessarily the Host header.
                // Most H1 servers expect a Host header, so convert :authority into one.
                let host = req.uri.authority().map_or("", |a| a.as_str()).to_owned();
                req.insert_header(header::HOST, host).unwrap();
            }
            // TODO: Add keepalive header for connection reuse, but this is not required per RFC
        }

        if session.cache.enabled() {
            if let Err(e) = pingora_cache::filters::upstream::request_filter(
                &mut req,
                session.cache.maybe_cache_meta(),
            ) {
                session.cache.disable(NoCacheReason::InternalError);
                warn!("cache upstream filter error {}, disabling cache", e);
            }
        }

        match self
            .inner
            .upstream_request_filter(session, &mut req, ctx)
            .await
        {
            Ok(_) => { /* continue */ }
            Err(e) => {
                return (false, true, Some(e));
            }
        }

        session.upstream_compression.request_filter(&req);

        debug!("Sending header to upstream {:?}", req);

        match client_session.write_request_header(Box::new(req)).await {
            Ok(_) => { /* continue */ }
            Err(e) => {
                return (false, false, Some(e.into_up()));
            }
        }

        let (tx_upstream, rx_upstream) = mpsc::channel::<HttpTask>(TASK_BUFFER_SIZE);
        let (tx_downstream, rx_downstream) = mpsc::channel::<HttpTask>(TASK_BUFFER_SIZE);

        session.as_mut().enable_retry_buffering();

        // start bi-directional streaming
        let ret = tokio::try_join!(
            self.proxy_handle_downstream(session, tx_downstream, rx_upstream, ctx),
            self.proxy_handle_upstream(client_session, tx_upstream, rx_downstream),
        );

        match ret {
            Ok((downstream_can_reuse, _upstream)) => (downstream_can_reuse, true, None),
            Err(e) => (false, false, Some(e)),
        }
    }
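    // The H2 -> H1 conversion above is the only place request headers are rewritten
    // for protocol reasons. A minimal standalone sketch of the same steps, assuming
    // a hand-built request (`RequestHeader::build` is the pingora-http constructor;
    // the method, path, and host value here are made up for illustration):
    //
    //     let mut req = RequestHeader::build("POST", b"/upload", None)?;
    //     req.set_version(Version::HTTP_11);
    //     // body present but no Content-Length: signal it via chunked encoding
    //     req.insert_header(header::TRANSFER_ENCODING, "chunked")?;
    //     // H1 servers expect a Host header; derive it from the H2 :authority
    //     req.insert_header(header::HOST, "origin.example.com")?;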
    pub(crate) async fn proxy_to_h1_upstream(
        &self,
        session: &mut Session,
        client_session: &mut HttpSessionV1,
        reused: bool,
        peer: &HttpPeer,
        ctx: &mut SV::CTX,
    ) -> (bool, bool, Option<Box<Error>>)
    // (reuse_server, reuse_client, error)
    where
        SV: ProxyHttp + Send + Sync,
        SV::CTX: Send + Sync,
    {
        #[cfg(windows)]
        let raw = client_session.id() as std::os::windows::io::RawSocket;
        #[cfg(unix)]
        let raw = client_session.id();

        if let Err(e) = self
            .inner
            .connected_to_upstream(
                session,
                reused,
                peer,
                raw,
                Some(client_session.digest()),
                ctx,
            )
            .await
        {
            return (false, false, Some(e));
        }

        let (server_session_reuse, client_session_reuse, error) =
            self.proxy_1to1(session, client_session, peer, ctx).await;

        (server_session_reuse, client_session_reuse, error)
    }

    async fn proxy_handle_upstream(
        &self,
        client_session: &mut HttpSessionV1,
        tx: mpsc::Sender<HttpTask>,
        mut rx: mpsc::Receiver<HttpTask>,
    ) -> Result<()>
    where
        SV: ProxyHttp + Send + Sync,
        SV::CTX: Send + Sync,
    {
        let mut request_done = false;
        let mut response_done = false;

        /* duplex mode: wait for either side to complete */
        while !request_done || !response_done {
            tokio::select! {
                res = client_session.read_response_task(), if !response_done => {
                    match res {
                        Ok(task) => {
                            response_done = task.is_end();
                            let result = tx.send(task)
                                .await.or_err(
                                    InternalError,
                                    "Failed to send upstream header to pipe");
                            // If the request is upgraded, the downstream pipe can exit early
                            // when the downstream connection is closed.
                            // In that case, this function should ignore that the pipe is closed,
                            // so that it can read the remaining events from rx, including the
                            // closure, and then exit.
                            if result.is_err() && !client_session.is_upgrade_req() {
                                return result;
                            }
                        },
                        Err(e) => {
                            // Push the error to downstream and then quit.
                            // Don't care if the send fails: downstream is already gone.
                            let _ = tx.send(HttpTask::Failed(e.into_up())).await;
                            // Downstream should consume all remaining data and handle the error
                            return Ok(())
                        }
                    }
                },

                body = rx.recv(), if !request_done => {
                    request_done = send_body_to1(client_session, body).await?;
                    // An upgraded request is terminated when either side is done
                    if request_done && client_session.is_upgrade_req() {
                        response_done = true;
                    }
                },

                else => {
                    // this shouldn't be reached as the while loop would already exit
                    break;
                }
            }
        }

        Ok(())
    }
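    // proxy_handle_upstream() above leans on `tokio::select!` branch preconditions
    // (the `, if !response_done` guards) to stop polling a side once it is finished.
    // A minimal standalone sketch of the same pattern, with hypothetical channel
    // names, not tied to the types used here:
    //
    //     loop {
    //         tokio::select! {
    //             task = from_origin.recv(), if !response_done => {
    //                 // forward the response task; update response_done
    //             }
    //             body = from_client.recv(), if !request_done => {
    //                 // forward the request body; update request_done
    //             }
    //             else => break, // every branch disabled: both sides are done
    //         }
    //     }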
    // TODO: use this function to replace bidirection_1to2()
    // returns whether this server (downstream) session can be reused
    async fn proxy_handle_downstream(
        &self,
        session: &mut Session,
        tx: mpsc::Sender<HttpTask>,
        mut rx: mpsc::Receiver<HttpTask>,
        ctx: &mut SV::CTX,
    ) -> Result<bool>
    where
        SV: ProxyHttp + Send + Sync,
        SV::CTX: Send + Sync,
    {
        let mut downstream_state = DownstreamStateMachine::new(session.as_mut().is_body_done());

        let buffer = session.as_ref().get_retry_buffer();

        // retry: send the buffer if it exists or the body is empty
        if buffer.is_some() || session.as_mut().is_body_empty() {
            let send_permit = tx
                .reserve()
                .await
                .or_err(InternalError, "reserving body pipe")?;
            self.send_body_to_pipe(
                session,
                buffer,
                downstream_state.is_done(),
                send_permit,
                ctx,
            )
            .await?;
        }

        let mut response_state = ResponseStateMachine::new();

        // these two below can be wrapped into an internal ctx
        // use cache when upstream revalidates (or TODO: error)
        let mut serve_from_cache = proxy_cache::ServeFromCache::new();
        let mut range_body_filter = proxy_cache::range_filter::RangeBodyFilter::new();

        /* Duplex mode without caching:
         * Read the body from downstream while reading the response from upstream.
         * If the response is done, only read the body from downstream.
         * If the request is done, read the response from upstream while idling downstream (to close quickly).
         * If both are done, quit the loop.
         *
         * With caching but without partial read support:
         * Similar to above; cache admission writes happen when the data is written to downstream.
         *
         * With caching + partial read support:
         * A. Read the upstream response and write it to cache.
         * B. Read data from cache and send it to downstream.
         * If B fails (usually downstream closed), continue A.
         * If A fails, exit with error.
         * If both are done, quit the loop.
         * Usually there is no request body to read for a cacheable request.
         */
        while !downstream_state.is_done() || !response_state.is_done() {
            // reserve tx capacity ahead of time to avoid deadlock, see below

            let send_permit = tx
                .try_reserve()
                .or_err(InternalError, "try_reserve() body pipe for upstream");

            tokio::select! {
                // Only try to send to the pipe if there is capacity, to avoid deadlock.
                // Otherwise deadlock could happen if both upstream and downstream are blocked
                // on sending to their corresponding pipes which are both full.
                body = session.downstream_session.read_body_or_idle(downstream_state.is_done()),
                    if downstream_state.can_poll() && send_permit.is_ok() => {

                    debug!("downstream event");
                    let body = match body {
                        Ok(b) => b,
                        Err(e) => {
                            if serve_from_cache.is_miss() {
                                // ignore the downstream error so that upstream can continue to write to cache
                                downstream_state.to_errored();
                                warn!(
                                    "Downstream Error ignored during caching: {}, {}",
                                    e,
                                    self.inner.request_summary(session, ctx)
                                );
                                continue;
                            } else {
                                return Err(e.into_down());
                            }
                        }
                    };
                    // If the request is websocket, a `None` body means the request is closed.
                    // Set the response to be done as well so that the request completes normally.
                    if body.is_none() && session.is_upgrade_req() {
                        response_state.maybe_set_upstream_done(true);
                    }
                    // TODO: consider just draining this if serve_from_cache is set
                    let is_body_done = session.is_body_done();
                    let request_done = self.send_body_to_pipe(
                        session,
                        body,
                        is_body_done,
                        send_permit.unwrap(), // safe because we checked is_ok()
                        ctx,
                    )
                    .await?;
                    downstream_state.maybe_finished(request_done);
                },

                _ = tx.reserve(), if downstream_state.is_reading() && send_permit.is_err() => {
                    // If tx is closed, the upstream has already finished its job.
                    downstream_state.maybe_finished(tx.is_closed());
                    debug!("waiting for permit {send_permit:?}, upstream closed {}", tx.is_closed());
                    /* No permit: wait on more capacity to avoid starving.
                     * Otherwise this select would only block on rx, which might send no data
                     * before the entire body is uploaded.
                     * Once more capacity arrives we just loop back. */
                },

                task = rx.recv(), if !response_state.upstream_done() => {
                    debug!("upstream event: {:?}", task);
                    if let Some(t) = task {
                        if serve_from_cache.should_discard_upstream() {
                            // just drain; do we need to do anything else?
                            continue;
                        }
                        // pull as many tasks as we can
                        let mut tasks = Vec::with_capacity(TASK_BUFFER_SIZE);
                        tasks.push(t);
                        while let Some(maybe_task) = rx.recv().now_or_never() {
                            debug!("upstream event now: {:?}", maybe_task);
                            if let Some(t) = maybe_task {
                                tasks.push(t);
                            } else {
                                break; // upstream closed
                            }
                        }

                        /* run filters before sending to downstream */
                        let mut filtered_tasks = Vec::with_capacity(TASK_BUFFER_SIZE);
                        for mut t in tasks {
                            if self.revalidate_or_stale(session, &mut t, ctx).await {
                                serve_from_cache.enable();
                                response_state.enable_cached_response();
                                // skip downstream filtering entirely as the 304 will not be sent
                                break;
                            }
                            session.upstream_compression.response_filter(&mut t);
                            let task = self.h1_response_filter(session, t, ctx,
                                &mut serve_from_cache,
                                &mut range_body_filter, false).await?;
                            if serve_from_cache.is_miss_header() {
                                response_state.enable_cached_response();
                            }
                            // Check the error and abort here;
                            // otherwise the error is surfaced via write_response_tasks().
                            if !serve_from_cache.should_send_to_downstream() {
                                if let HttpTask::Failed(e) = task {
                                    return Err(e);
                                }
                            }
                            filtered_tasks.push(task);
                        }

                        if !serve_from_cache.should_send_to_downstream() {
                            // TODO: need to derive response_done from filtered_tasks in case downstream failed already
                            continue;
                        }

                        // send to downstream
                        let response_done = session.write_response_tasks(filtered_tasks).await?;
                        response_state.maybe_set_upstream_done(response_done);
                        // an unsuccessful upgrade response may force the request done
                        downstream_state.maybe_finished(session.is_body_done());
                    } else {
                        debug!("empty upstream event");
                        response_state.maybe_set_upstream_done(true);
                    }
                },

                task = serve_from_cache.next_http_task(&mut session.cache),
                    if !response_state.cached_done() && !downstream_state.is_errored() && serve_from_cache.is_on() => {

                    let task = self.h1_response_filter(session, task?, ctx,
                        &mut serve_from_cache,
                        &mut range_body_filter, true).await?;
                    debug!("serve_from_cache task {task:?}");

                    match session.write_response_tasks(vec![task]).await {
                        Ok(b) => response_state.maybe_set_cache_done(b),
                        Err(e) => if serve_from_cache.is_miss() {
                            // give up writing to downstream but wait for the upstream cache write to finish
                            downstream_state.to_errored();
                            response_state.maybe_set_cache_done(true);
                            warn!(
                                "Downstream Error ignored during caching: {}, {}",
                                e,
                                self.inner.request_summary(session, ctx)
                            );
                            continue;
                        } else {
                            return Err(e);
                        }
                    }
                    if response_state.cached_done() {
                        if let Err(e) = session.cache.finish_hit_handler().await {
                            warn!("Error during finish_hit_handler: {}", e);
                        }
                    }
                }

                else => {
                    break;
                }
            }
        }

        let mut reuse_downstream = !downstream_state.is_errored();
        if reuse_downstream {
            match session.as_mut().finish_body().await {
                Ok(_) => {
                    debug!("finished sending body to downstream");
                }
                Err(e) => {
                    error!("Error finish sending body to downstream: {}", e);
                    reuse_downstream = false;
                }
            }
        }
        Ok(reuse_downstream)
    }
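    // The try_reserve()/reserve() split in proxy_handle_downstream() above is what
    // prevents a deadlock when both pipes are full: downstream data is only read
    // once a send permit is already in hand. A minimal sketch of the idiom with
    // hypothetical names, using tokio's mpsc permit API:
    //
    //     let permit = tx.try_reserve();          // non-blocking capacity check
    //     tokio::select! {
    //         body = read_body(), if permit.is_ok() => {
    //             // cannot block here: capacity was reserved before reading
    //             permit.unwrap().send(body);
    //         }
    //         _ = tx.reserve(), if permit.is_err() => {
    //             // no capacity: park until the receiver drains the pipe
    //         }
    //     }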
    async fn h1_response_filter(
        &self,
        session: &mut Session,
        mut task: HttpTask,
        ctx: &mut SV::CTX,
        serve_from_cache: &mut ServeFromCache,
        range_body_filter: &mut RangeBodyFilter,
        from_cache: bool, // whether the task is already from cache
    ) -> Result<HttpTask>
    where
        SV: ProxyHttp + Send + Sync,
        SV::CTX: Send + Sync,
    {
        // skip caching if already served from cache
        if !from_cache {
            self.upstream_filter(session, &mut task, ctx)?;

            // Cache the original response before any downstream transformation.
            // Requests that bypassed cache still need to run the filters to see if the
            // response has become cacheable.
            if session.cache.enabled() || session.cache.bypassing() {
                if let Err(e) = self
                    .cache_http_task(session, &task, ctx, serve_from_cache)
                    .await
                {
                    session.cache.disable(NoCacheReason::StorageError);
                    if serve_from_cache.is_miss_body() {
                        // If the response stream is caching the body during a miss but the
                        // write fails, it has to give up the entire request.
                        return Err(e);
                    } else {
                        // otherwise, continue processing the response
                        warn!(
                            "Fail to cache response: {}, {}",
                            e,
                            self.inner.request_summary(session, ctx)
                        );
                    }
                }
            }

            if !serve_from_cache.should_send_to_downstream() {
                return Ok(task);
            }
        } // else: cached/local response, no need to trigger upstream filters and caching

        match task {
            HttpTask::Header(mut header, end) => {
                /* Downstream revalidation/range handling, only needed when cache is on;
                 * otherwise the origin will handle it. */
                // TODO: if cache is disabled during the response phase, we should still run this filter
                if session.cache.enabled() {
                    self.downstream_response_conditional_filter(
                        serve_from_cache,
                        session,
                        &mut header,
                        ctx,
                    );
                    if !session.ignore_downstream_range {
                        let range_type = self.inner
                            .range_header_filter(session.req_header(), &mut header, ctx);
                        range_body_filter.set(range_type);
                    }
                }

                /* Convert an HTTP/1.0 style response to chunked encoding so that we don't
                 * have to close the downstream connection. */
                // These status codes / methods cannot have a body, so there is no need to add chunked encoding.
                let no_body = session.req_header().method == http::method::Method::HEAD
                    || matches!(header.status.as_u16(), 204 | 304);
                if !no_body
                    && !header.status.is_informational()
                    && header
                        .headers
                        .get(http::header::TRANSFER_ENCODING)
                        .is_none()
                    && header.headers.get(http::header::CONTENT_LENGTH).is_none()
                    && !end
                {
                    header.insert_header(http::header::TRANSFER_ENCODING, "chunked")?;
                }

                match self.inner.response_filter(session, &mut header, ctx).await {
                    Ok(_) => Ok(HttpTask::Header(header, end)),
                    Err(e) => Err(e),
                }
            }
            HttpTask::Body(data, end) => {
                let mut data = range_body_filter.filter_body(data);
                if let Some(duration) = self
                    .inner
                    .response_body_filter(session, &mut data, end, ctx)?
                {
                    trace!("delaying response for {:?}", duration);
                    time::sleep(duration).await;
                }
                Ok(HttpTask::Body(data, end))
            }
            HttpTask::Trailer(h) => Ok(HttpTask::Trailer(h)), // TODO: support trailers for h1
            HttpTask::Done => Ok(task),
            HttpTask::Failed(_) => Ok(task), // do nothing, just pass the error down
        }
    }
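    // The chunked-encoding upgrade in h1_response_filter() above means a response
    // header section like
    //
    //     HTTP/1.0 200 OK
    //     Server: legacy
    //
    // goes out to the client as
    //
    //     HTTP/1.1 200 OK
    //     Server: legacy
    //     Transfer-Encoding: chunked
    //
    // so the end of the body no longer has to be signaled by closing the
    // connection, and the downstream connection stays reusable. (Illustrative
    // wire format only; the header values are made up.)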
    // TODO: use this function to replace send_body_to2
    async fn send_body_to_pipe(
        &self,
        session: &mut Session,
        mut data: Option<Bytes>,
        end_of_body: bool,
        tx: mpsc::Permit<'_, HttpTask>,
        ctx: &mut SV::CTX,
    ) -> Result<bool>
    where
        SV: ProxyHttp + Send + Sync,
        SV::CTX: Send + Sync,
    {
        // None means the end of the body.
        // This var signals whether downstream has finished sending the body; it shouldn't
        // be affected by the request_body_filter.
        let end_of_body = end_of_body || data.is_none();

        session
            .downstream_modules_ctx
            .request_body_filter(&mut data, end_of_body)
            .await?;

        self.inner
            .request_body_filter(session, &mut data, end_of_body, ctx)
            .await?;

        // the flag to signal to upstream
        let upstream_end_of_body = end_of_body || data.is_none();

        /* It is normal to get 0 bytes, either from multi-chunk parsing or because a
         * request_body_filter decided not to output anything yet.
         * Don't write 0 bytes to the network since they would be
         * treated as the terminating chunk. */
        if !upstream_end_of_body && data.as_ref().map_or(false, |d| d.is_empty()) {
            return Ok(false);
        }

        debug!(
            "Read {} bytes body from downstream",
            data.as_ref().map_or(-1, |d| d.len() as isize)
        );

        tx.send(HttpTask::Body(data, upstream_end_of_body));

        Ok(end_of_body)
    }
}
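// In HTTP/1.1 chunked encoding, a zero-length chunk terminates the message, which
// is why send_body_to_pipe() above refuses to forward empty, non-final buffers. A
// minimal illustration of the wire impact (made-up payload):
//
//     4\r\n
//     Body\r\n
//     0\r\n
//     \r\n      <- the zero-length chunk ends the message
//
// Forwarding an intermediate empty buffer verbatim would truncate the request body
// at that point.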
pub(crate) async fn send_body_to1(
    client_session: &mut HttpSessionV1,
    recv_task: Option<HttpTask>,
) -> Result<bool> {
    let body_done;

    if let Some(task) = recv_task {
        match task {
            HttpTask::Body(data, end) => {
                body_done = end;
                if let Some(d) = data {
                    let m = client_session.write_body(&d).await;
                    match m {
                        Ok(m) => match m {
                            Some(n) => {
                                debug!("Write {} bytes body to upstream", n);
                            }
                            None => {
                                warn!("Upstream body is already finished. Nothing to write");
                            }
                        },
                        Err(e) => {
                            return e.into_up().into_err();
                        }
                    }
                }
            }
            _ => {
                // should never happen: the sender only sends body tasks
                warn!("Unexpected task sent to upstream");
                body_done = true;
            }
        }
    } else {
        // sender dropped
        body_done = true;
    }

    if body_done {
        match client_session.finish_body().await {
            Ok(_) => {
                debug!("finish sending body to upstream");
                Ok(true)
            }
            Err(e) => e.into_up().into_err(),
        }
    } else {
        Ok(false)
    }
}
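// A small sanity sketch (added for illustration; not part of the upstream test
// suite) of the "no body" rule h1_response_filter() applies before adding chunked
// encoding: HEAD requests and 204/304 responses never carry a message body.
#[cfg(test)]
mod no_body_rule_sketch {
    use http::{Method, StatusCode};

    // Mirrors the `no_body` expression in h1_response_filter().
    fn response_forbids_body(method: &Method, status: StatusCode) -> bool {
        *method == Method::HEAD || matches!(status.as_u16(), 204 | 304)
    }

    #[test]
    fn head_204_and_304_forbid_a_body() {
        assert!(response_forbids_body(&Method::HEAD, StatusCode::OK));
        assert!(response_forbids_body(&Method::GET, StatusCode::NO_CONTENT));
        assert!(response_forbids_body(&Method::GET, StatusCode::NOT_MODIFIED));
        assert!(!response_forbids_body(&Method::GET, StatusCode::OK));
    }
}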