diff options
author | Yuchen Wu <[email protected]> | 2024-02-27 20:25:44 -0800 |
---|---|---|
committer | Yuchen Wu <[email protected]> | 2024-02-27 20:25:44 -0800 |
commit | 8797329225018c4d0ab990166dd020338ae292dc (patch) | |
tree | 1e8d0bf6f3c27e987559f52319d91ff75e4da5cb /pingora-timeout/src | |
parent | 0bca116c1027a878469b72352e1e9e3916e85dde (diff) | |
download | pingora-8797329225018c4d0ab990166dd020338ae292dc.tar.gz pingora-8797329225018c4d0ab990166dd020338ae292dc.zip |
Release Pingora version 0.1.0v0.1.0
Co-authored-by: Andrew Hauck <[email protected]>
Co-authored-by: Edward Wang <[email protected]>
Diffstat (limited to 'pingora-timeout/src')
-rw-r--r-- | pingora-timeout/src/fast_timeout.rs | 132 | ||||
-rw-r--r-- | pingora-timeout/src/lib.rs | 175 | ||||
-rw-r--r-- | pingora-timeout/src/timer.rs | 328 |
3 files changed, 635 insertions, 0 deletions
diff --git a/pingora-timeout/src/fast_timeout.rs b/pingora-timeout/src/fast_timeout.rs new file mode 100644 index 0000000..5fa7a3d --- /dev/null +++ b/pingora-timeout/src/fast_timeout.rs @@ -0,0 +1,132 @@ +// Copyright 2024 Cloudflare, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! The fast and more complicated version of pingora-timeout +//! +//! The following optimizations are applied +//! - The timeouts lazily initialize their timer when the Future is pending for the first time. +//! - There is no global lock for creating and cancelling timeouts. +//! - Timeout timers are rounded to the next 10ms tick and timers are shared across all timeouts with the same deadline. +//! +//! In order for this to work, a standalone thread is created to arm the timers, which has its +//! overheads. As a general rule, the benefits of this doesn't outweight the overhead unless +//! there are more than about 100/s timeout() calls in the system. Use regular tokio timeout or +//! [super::tokio_timeout] in the low usage case. + +use super::timer::*; +use super::*; +use once_cell::sync::Lazy; +use std::sync::Arc; + +static TIMER_MANAGER: Lazy<Arc<TimerManager>> = Lazy::new(|| { + let tm = Arc::new(TimerManager::new()); + check_clock_thread(&tm); + tm +}); + +fn check_clock_thread(tm: &Arc<TimerManager>) { + if tm.should_i_start_clock() { + std::thread::Builder::new() + .name("Timer thread".into()) + .spawn(|| TIMER_MANAGER.clock_thread()) + .unwrap(); + } +} + +/// The timeout generated by [fast_timeout()]. +/// +/// Users don't need to interact with this object. +pub struct FastTimeout(Duration); + +impl ToTimeout for FastTimeout { + fn timeout(&self) -> BoxFuture<'static, ()> { + Box::pin(TIMER_MANAGER.register_timer(self.0).poll()) + } + + fn create(d: Duration) -> Self { + FastTimeout(d) + } +} + +/// Similar to [tokio::time::timeout] but more efficient. +pub fn fast_timeout<T>(duration: Duration, future: T) -> Timeout<T, FastTimeout> +where + T: Future, +{ + check_clock_thread(&TIMER_MANAGER); + Timeout::new_with_delay(future, duration) +} + +/// Similar to [tokio::time::sleep] but more efficient. +pub async fn fast_sleep(duration: Duration) { + check_clock_thread(&TIMER_MANAGER); + TIMER_MANAGER.register_timer(duration).poll().await +} + +/// Pause the timer for fork() +/// +/// Because RwLock across fork() is undefined behavior, this function makes sure that no one +/// holds any locks. +/// +/// This function should be called right before fork(). +pub fn pause_for_fork() { + TIMER_MANAGER.pause_for_fork(); +} + +/// Unpause the timer after fork() +/// +/// This function should be called right after fork(). +pub fn unpause() { + TIMER_MANAGER.unpause(); +} + +#[cfg(test)] +mod tests { + use super::*; + use std::time::Duration; + + #[tokio::test] + async fn test_timeout() { + let fut = tokio_sleep(Duration::from_secs(1000)); + let to = fast_timeout(Duration::from_secs(1), fut); + assert!(to.await.is_err()) + } + + #[tokio::test] + async fn test_instantly_return() { + let fut = async { 1 }; + let to = fast_timeout(Duration::from_secs(1), fut); + assert_eq!(to.await.unwrap(), 1) + } + + #[tokio::test] + async fn test_delayed_return() { + let fut = async { + tokio_sleep(Duration::from_secs(1)).await; + 1 + }; + let to = fast_timeout(Duration::from_secs(1000), fut); + assert_eq!(to.await.unwrap(), 1) + } + + #[tokio::test] + async fn test_sleep() { + let fut = async { + fast_sleep(Duration::from_secs(1)).await; + 1 + }; + let to = fast_timeout(Duration::from_secs(1000), fut); + assert_eq!(to.await.unwrap(), 1) + } +} diff --git a/pingora-timeout/src/lib.rs b/pingora-timeout/src/lib.rs new file mode 100644 index 0000000..f3a33dd --- /dev/null +++ b/pingora-timeout/src/lib.rs @@ -0,0 +1,175 @@ +// Copyright 2024 Cloudflare, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#![warn(clippy::all)] + +//! A drop-in replacement of [tokio::time::timeout] which is much more efficient. +//! +//! Similar to [tokio::time::timeout] but more efficient on busy concurrent IOs where timeouts are +//! created and canceled very frequently. +//! +//! This crate provides the following optimizations +//! - The timeouts lazily initializes their timer when the Future is pending for the first time. +//! - There is no global lock for creating and cancelling timeouts. +//! - Timeout timers are rounded to the next 10ms tick and timers are shared across all timeouts with the same deadline. +//! +//! Benchmark: +//! +//! 438.302µs total, 4ns avg per iteration +//! +//! v.s. Tokio timeout(): +//! +//! 10.716192ms total, 107ns avg per iteration +//! + +pub mod fast_timeout; +pub mod timer; + +pub use fast_timeout::fast_sleep as sleep; +pub use fast_timeout::fast_timeout as timeout; + +use futures::future::BoxFuture; +use pin_project_lite::pin_project; +use std::future::Future; +use std::pin::Pin; +use std::task::{self, Poll}; +use tokio::time::{sleep as tokio_sleep, Duration}; + +/// The interface to start a timeout +/// +/// Users don't need to interact with this trait +pub trait ToTimeout { + fn timeout(&self) -> BoxFuture<'static, ()>; + fn create(d: Duration) -> Self; +} + +/// The timeout generated by [tokio_timeout()]. +/// +/// Users don't need to interact with this object. +pub struct TokioTimeout(Duration); + +impl ToTimeout for TokioTimeout { + fn timeout(&self) -> BoxFuture<'static, ()> { + Box::pin(tokio_sleep(self.0)) + } + + fn create(d: Duration) -> Self { + TokioTimeout(d) + } +} + +/// The error type returned when the timeout is reached. +#[derive(Debug)] +pub struct Elapsed; + +impl std::fmt::Display for Elapsed { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Timeout Elapsed") + } +} + +impl std::error::Error for Elapsed {} + +/// The [tokio::time::timeout] with just lazy timer initialization. +/// +/// The timer is created the first time the `future` is pending. This avoids unnecessary timer +/// creation and cancellation on busy IOs with a good chance to be already ready (e.g., reading +/// data from TCP where the recv buffer already has a lot data to read right away). +pub fn tokio_timeout<T>(duration: Duration, future: T) -> Timeout<T, TokioTimeout> +where + T: Future, +{ + Timeout::<T, TokioTimeout>::new_with_delay(future, duration) +} + +pin_project! { + /// The timeout future returned by the timeout functions + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct Timeout<T, F> { + #[pin] + value: T, + #[pin] + delay: Option<BoxFuture<'static, ()>>, + callback: F, // callback to create the timer + } +} + +impl<T, F> Timeout<T, F> +where + F: ToTimeout, +{ + pub(crate) fn new_with_delay(value: T, d: Duration) -> Timeout<T, F> { + Timeout { + value, + delay: None, + callback: F::create(d), + } + } +} + +impl<T, F> Future for Timeout<T, F> +where + T: Future, + F: ToTimeout, +{ + type Output = Result<T::Output, Elapsed>; + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { + let mut me = self.project(); + + // First, try polling the future + if let Poll::Ready(v) = me.value.poll(cx) { + return Poll::Ready(Ok(v)); + } + + let delay = me + .delay + .get_or_insert_with(|| Box::pin(me.callback.timeout())); + + match delay.as_mut().poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(()) => Poll::Ready(Err(Elapsed {})), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::time::Duration; + + #[tokio::test] + async fn test_timeout() { + let fut = tokio_sleep(Duration::from_secs(1000)); + let to = timeout(Duration::from_secs(1), fut); + assert!(to.await.is_err()) + } + + #[tokio::test] + async fn test_instantly_return() { + let fut = async { 1 }; + let to = timeout(Duration::from_secs(1), fut); + assert_eq!(to.await.unwrap(), 1) + } + + #[tokio::test] + async fn test_delayed_return() { + let fut = async { + tokio_sleep(Duration::from_secs(1)).await; + 1 + }; + let to = timeout(Duration::from_secs(1000), fut); + assert_eq!(to.await.unwrap(), 1) + } +} diff --git a/pingora-timeout/src/timer.rs b/pingora-timeout/src/timer.rs new file mode 100644 index 0000000..6e25c24 --- /dev/null +++ b/pingora-timeout/src/timer.rs @@ -0,0 +1,328 @@ +// Copyright 2024 Cloudflare, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Lightweight timer for systems with high rate of operations with timeout +//! associated with them +//! +//! Users don't need to interact with this module. +//! +//! The idea is to bucket timers into finite time slots so that operations that +//! start and end quickly don't have to create their own timers all the time +//! +//! Benchmark: +//! - create 7.809622ms total, 78ns avg per iteration +//! - drop: 1.348552ms total, 13ns avg per iteration +//! +//! tokio timer: +//! - create 34.317439ms total, 343ns avg per iteration +//! - drop: 10.694154ms total, 106ns avg per iteration + +use parking_lot::RwLock; +use std::collections::BTreeMap; +use std::sync::atomic::{AtomicBool, AtomicI64, Ordering}; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use thread_local::ThreadLocal; +use tokio::sync::Notify; + +const RESOLUTION_MS: u64 = 10; +const RESOLUTION_DURATION: Duration = Duration::from_millis(RESOLUTION_MS); + +// round to the NEXT timestamp based on the resolution +#[inline] +fn round_to(raw: u128, resolution: u128) -> u128 { + raw - 1 + resolution - (raw - 1) % resolution +} +// millisecond resolution as most +#[derive(PartialEq, PartialOrd, Eq, Ord, Clone, Copy, Debug)] +struct Time(u128); + +impl From<u128> for Time { + fn from(raw_ms: u128) -> Self { + Time(round_to(raw_ms, RESOLUTION_MS as u128)) + } +} + +impl From<Duration> for Time { + fn from(d: Duration) -> Self { + Time(round_to(d.as_millis(), RESOLUTION_MS as u128)) + } +} + +impl Time { + pub fn not_after(&self, ts: u128) -> bool { + self.0 <= ts + } +} + +/// the stub for waiting for a timer to be expired. +pub struct TimerStub(Arc<Notify>, Arc<AtomicBool>); + +impl TimerStub { + /// Wait for the timer to expire. + pub async fn poll(self) { + if self.1.load(Ordering::SeqCst) { + return; + } + self.0.notified().await; + } +} + +struct Timer(Arc<Notify>, Arc<AtomicBool>); + +impl Timer { + pub fn new() -> Self { + Timer(Arc::new(Notify::new()), Arc::new(AtomicBool::new(false))) + } + + pub fn fire(&self) { + self.1.store(true, Ordering::SeqCst); + self.0.notify_waiters(); + } + + pub fn subscribe(&self) -> TimerStub { + TimerStub(self.0.clone(), self.1.clone()) + } +} + +/// The object that holds all the timers registered to it. +pub struct TimerManager { + // each thread insert into its local timer tree to avoid lock contention + timers: ThreadLocal<RwLock<BTreeMap<Time, Timer>>>, + zero: Instant, // the reference zero point of Timestamp + // Start a new clock thread if this is -1 or staled. The clock thread should keep updating this + clock_watchdog: AtomicI64, + paused: AtomicBool, +} + +// Consider the clock thread is dead after it fails to update the thread in DELAYS_SEC +const DELAYS_SEC: i64 = 2; // TODO: make sure this value is larger than RESOLUTION_DURATION + +impl Default for TimerManager { + fn default() -> Self { + TimerManager { + timers: ThreadLocal::new(), + zero: Instant::now(), + clock_watchdog: AtomicI64::new(-DELAYS_SEC), + paused: AtomicBool::new(false), + } + } +} + +impl TimerManager { + /// Create a new [TimerManager] + pub fn new() -> Self { + Self::default() + } + + // this thread sleep a resolution time and fire all Timers that a due to fire + pub(crate) fn clock_thread(&self) { + loop { + std::thread::sleep(RESOLUTION_DURATION); + let now = Instant::now() - self.zero; + self.clock_watchdog + .store(now.as_secs() as i64, Ordering::Relaxed); + if self.is_paused_for_fork() { + // just stop acquiring the locks, waiting for fork to happen + continue; + } + let now = now.as_millis(); + // iterate through the timer tree for all threads + for thread_timer in self.timers.iter() { + let mut timers = thread_timer.write(); + // Fire all timers until now + loop { + let key_to_remove = timers.iter().next().and_then(|(k, _)| { + if k.not_after(now) { + Some(*k) + } else { + None + } + }); + if let Some(k) = key_to_remove { + let timer = timers.remove(&k); + // safe to unwrap, the key is from iter().next() + timer.unwrap().fire(); + } else { + break; + } + } + // write lock drops here + } + } + } + + // False if the clock is already started + // If true, the caller must start the clock thread next + pub(crate) fn should_i_start_clock(&self) -> bool { + let Err(prev) = self.is_clock_running() else { + return false; + }; + let now = Instant::now().duration_since(self.zero).as_secs() as i64; + let res = + self.clock_watchdog + .compare_exchange(prev, now, Ordering::SeqCst, Ordering::SeqCst); + res.is_ok() + } + + // Ok(()) if clock is running (watch dog is within DELAYS_SEC of now) + // Err(time) if watch do stopped at `time` + pub(crate) fn is_clock_running(&self) -> Result<(), i64> { + let now = Instant::now().duration_since(self.zero).as_secs() as i64; + let prev = self.clock_watchdog.load(Ordering::SeqCst); + if now < prev + DELAYS_SEC { + Ok(()) + } else { + Err(prev) + } + } + + /// Register a timer. + /// + /// When the timer expires, the [TimerStub] will be notified. + pub fn register_timer(&self, duration: Duration) -> TimerStub { + if self.is_paused_for_fork() { + // Return a dummy TimerStub that will trigger right away. + // This is fine assuming pause_for_fork() is called right before fork(). + // The only possible register_timer() is from another thread which will + // be entirely lost after fork() + // TODO: buffer these register calls instead (without a lock) + let timer = Timer::new(); + timer.fire(); + return timer.subscribe(); + } + let now: Time = (Instant::now() + duration - self.zero).into(); + { + let timers = self.timers.get_or(|| RwLock::new(BTreeMap::new())).read(); + if let Some(t) = timers.get(&now) { + return t.subscribe(); + } + } // drop read lock + + let timer = Timer::new(); + let mut timers = self.timers.get_or(|| RwLock::new(BTreeMap::new())).write(); + // Usually we check if another thread has insert the same node before we get the write lock, + // but because only this thread will insert anything to its local timers tree, there + // is no possible race that can happen. The only other thread is the clock thread who + // only removes timer from the tree + let stub = timer.subscribe(); + timers.insert(now, timer); + stub + } + + fn is_paused_for_fork(&self) -> bool { + self.paused.load(Ordering::SeqCst) + } + + /// Pause the timer for fork() + /// + /// Because RwLock across fork() is undefined behavior, this function makes sure that no one + /// holds any locks. + /// + /// This function should be called right before fork(). + pub fn pause_for_fork(&self) { + self.paused.store(true, Ordering::SeqCst); + // wait for everything to get out of their locks + std::thread::sleep(RESOLUTION_DURATION * 2); + } + + /// Unpause the timer after fork() + /// + /// This function should be called right after fork(). + pub fn unpause(&self) { + self.paused.store(false, Ordering::SeqCst) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::Arc; + + #[test] + fn test_round() { + assert_eq!(round_to(30, 10), 30); + assert_eq!(round_to(31, 10), 40); + assert_eq!(round_to(29, 10), 30); + } + + #[test] + fn test_time() { + let t: Time = 128.into(); // t will round to 130 + assert_eq!(t, Duration::from_millis(130).into()); + assert!(!t.not_after(128)); + assert!(!t.not_after(129)); + assert!(t.not_after(130)); + assert!(t.not_after(131)); + } + + #[tokio::test] + async fn test_timer_manager() { + let tm_a = Arc::new(TimerManager::new()); + let tm = tm_a.clone(); + std::thread::spawn(move || tm_a.clock_thread()); + + let now = Instant::now(); + let t1 = tm.register_timer(Duration::from_secs(1)); + let t2 = tm.register_timer(Duration::from_secs(1)); + t1.poll().await; + assert_eq!(now.elapsed().as_secs(), 1); + let now = Instant::now(); + t2.poll().await; + // t2 fired along t1 so no extra wait time + assert_eq!(now.elapsed().as_secs(), 0); + } + + #[test] + fn test_timer_manager_start_check() { + let tm = Arc::new(TimerManager::new()); + assert!(tm.should_i_start_clock()); + assert!(!tm.should_i_start_clock()); + assert!(tm.is_clock_running().is_ok()); + } + + #[test] + fn test_timer_manager_watchdog() { + let tm = Arc::new(TimerManager::new()); + assert!(tm.should_i_start_clock()); + assert!(!tm.should_i_start_clock()); + + // we don't actually start the clock thread, sleep for the watchdog to expire + std::thread::sleep(Duration::from_secs(DELAYS_SEC as u64 + 1)); + assert!(tm.is_clock_running().is_err()); + assert!(tm.should_i_start_clock()); + } + + #[tokio::test] + async fn test_timer_manager_pause() { + let tm_a = Arc::new(TimerManager::new()); + let tm = tm_a.clone(); + std::thread::spawn(move || tm_a.clock_thread()); + + let now = Instant::now(); + let t1 = tm.register_timer(Duration::from_secs(2)); + tm.pause_for_fork(); + // no actual fork happen, we just test that pause and unpause work + + // any timer in this critical section is timed out right away + let t2 = tm.register_timer(Duration::from_secs(2)); + t2.poll().await; + assert_eq!(now.elapsed().as_secs(), 0); + + std::thread::sleep(Duration::from_secs(1)); + tm.unpause(); + t1.poll().await; + assert_eq!(now.elapsed().as_secs(), 2); + } +} |