aboutsummaryrefslogtreecommitdiffhomepage
path: root/pingora-timeout/src
diff options
context:
space:
mode:
authorYuchen Wu <[email protected]>2024-02-27 20:25:44 -0800
committerYuchen Wu <[email protected]>2024-02-27 20:25:44 -0800
commit8797329225018c4d0ab990166dd020338ae292dc (patch)
tree1e8d0bf6f3c27e987559f52319d91ff75e4da5cb /pingora-timeout/src
parent0bca116c1027a878469b72352e1e9e3916e85dde (diff)
downloadpingora-8797329225018c4d0ab990166dd020338ae292dc.tar.gz
pingora-8797329225018c4d0ab990166dd020338ae292dc.zip
Release Pingora version 0.1.0 (tag: v0.1.0)
Co-authored-by: Andrew Hauck <[email protected]> Co-authored-by: Edward Wang <[email protected]>
Diffstat (limited to 'pingora-timeout/src')
-rw-r--r--pingora-timeout/src/fast_timeout.rs132
-rw-r--r--pingora-timeout/src/lib.rs175
-rw-r--r--pingora-timeout/src/timer.rs328
3 files changed, 635 insertions, 0 deletions
diff --git a/pingora-timeout/src/fast_timeout.rs b/pingora-timeout/src/fast_timeout.rs
new file mode 100644
index 0000000..5fa7a3d
--- /dev/null
+++ b/pingora-timeout/src/fast_timeout.rs
@@ -0,0 +1,132 @@
+// Copyright 2024 Cloudflare, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! The fast and more complicated version of pingora-timeout
+//!
+//! The following optimizations are applied
+//! - The timeouts lazily initialize their timer when the Future is pending for the first time.
+//! - There is no global lock for creating and cancelling timeouts.
+//! - Timeout timers are rounded to the next 10ms tick and timers are shared across all timeouts with the same deadline.
+//!
+//! In order for this to work, a standalone thread is created to arm the timers, which has its
+//! own overhead. As a general rule, the benefits of this don't outweigh the overhead unless
+//! there are more than about 100 timeout() calls per second in the system. Use the regular
+//! tokio timeout or [super::tokio_timeout] in the low usage case.
+
+use super::timer::*;
+use super::*;
+use once_cell::sync::Lazy;
+use std::sync::Arc;
+
+// The timer manager shared by every fast timeout and fast sleep created through this module.
+// It is built lazily on first access; the initializer also calls check_clock_thread() so that
+// a clock thread is requested as soon as the manager exists.
+static TIMER_MANAGER: Lazy<Arc<TimerManager>> = Lazy::new(|| {
+ let tm = Arc::new(TimerManager::new());
+ check_clock_thread(&tm);
+ tm
+});
+
+/// Spawn the clock thread if `tm` reports that one has to be started.
+///
+/// The spawned thread is named "Timer thread" and runs the clock loop of the global
+/// `TIMER_MANAGER`. Panics if the thread cannot be spawned.
+fn check_clock_thread(tm: &Arc<TimerManager>) {
+    if !tm.should_i_start_clock() {
+        // the clock is considered running (or someone else is starting it), nothing to do
+        return;
+    }
+    let builder = std::thread::Builder::new().name("Timer thread".into());
+    builder.spawn(|| TIMER_MANAGER.clock_thread()).unwrap();
+}
+
+/// The timeout generated by [fast_timeout()].
+///
+/// It only stores the timeout duration. Users don't need to interact with this object.
+pub struct FastTimeout(Duration);
+
+impl ToTimeout for FastTimeout {
+    /// Register the stored duration with the global timer manager and return a boxed future
+    /// that waits for that timer.
+    fn timeout(&self) -> BoxFuture<'static, ()> {
+        let stub = TIMER_MANAGER.register_timer(self.0);
+        Box::pin(stub.poll())
+    }
+
+    /// Remember the duration `d`; no timer is registered at this point.
+    fn create(d: Duration) -> Self {
+        Self(d)
+    }
+}
+
+/// Similar to [tokio::time::timeout] but more efficient.
+///
+/// Checks whether the clock thread needs to be started, then wraps `future` in a [Timeout]
+/// that is limited to `duration`.
+pub fn fast_timeout<T>(duration: Duration, future: T) -> Timeout<T, FastTimeout>
+where
+    T: Future,
+{
+    check_clock_thread(&TIMER_MANAGER);
+    Timeout::<T, FastTimeout>::new_with_delay(future, duration)
+}
+
+/// Similar to [tokio::time::sleep] but more efficient.
+///
+/// Registers a timer for `duration` with the global timer manager and waits for it to fire.
+pub async fn fast_sleep(duration: Duration) {
+    check_clock_thread(&TIMER_MANAGER);
+    let stub = TIMER_MANAGER.register_timer(duration);
+    stub.poll().await
+}
+
+/// Pause the timer for fork()
+///
+/// Because RwLock across fork() is undefined behavior, this function makes sure that no one
+/// holds any locks.
+///
+/// This function should be called right before fork(). It forwards to
+/// [TimerManager::pause_for_fork] on the global timer manager.
+pub fn pause_for_fork() {
+ TIMER_MANAGER.pause_for_fork();
+}
+
+/// Unpause the timer after fork()
+///
+/// This function should be called right after fork(). It forwards to
+/// [TimerManager::unpause] on the global timer manager.
+pub fn unpause() {
+ TIMER_MANAGER.unpause();
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::time::Duration;
+
+ // The inner future sleeps far longer than the timeout, so the timeout has to win.
+ #[tokio::test]
+ async fn test_timeout() {
+ let fut = tokio_sleep(Duration::from_secs(1000));
+ let to = fast_timeout(Duration::from_secs(1), fut);
+ assert!(to.await.is_err())
+ }
+
+ // A future that is ready on its first poll returns its value.
+ #[tokio::test]
+ async fn test_instantly_return() {
+ let fut = async { 1 };
+ let to = fast_timeout(Duration::from_secs(1), fut);
+ assert_eq!(to.await.unwrap(), 1)
+ }
+
+ // The inner future finishes after one second, well before the 1000 second timeout.
+ #[tokio::test]
+ async fn test_delayed_return() {
+ let fut = async {
+ tokio_sleep(Duration::from_secs(1)).await;
+ 1
+ };
+ let to = fast_timeout(Duration::from_secs(1000), fut);
+ assert_eq!(to.await.unwrap(), 1)
+ }
+
+ // Same as test_delayed_return, but the inner future waits with fast_sleep().
+ #[tokio::test]
+ async fn test_sleep() {
+ let fut = async {
+ fast_sleep(Duration::from_secs(1)).await;
+ 1
+ };
+ let to = fast_timeout(Duration::from_secs(1000), fut);
+ assert_eq!(to.await.unwrap(), 1)
+ }
+}
diff --git a/pingora-timeout/src/lib.rs b/pingora-timeout/src/lib.rs
new file mode 100644
index 0000000..f3a33dd
--- /dev/null
+++ b/pingora-timeout/src/lib.rs
@@ -0,0 +1,175 @@
+// Copyright 2024 Cloudflare, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#![warn(clippy::all)]
+
+//! A drop-in replacement of [tokio::time::timeout] which is much more efficient.
+//!
+//! Similar to [tokio::time::timeout] but more efficient on busy concurrent IOs where timeouts are
+//! created and canceled very frequently.
+//!
+//! This crate provides the following optimizations
+//! - The timeouts lazily initialize their timer when the Future is pending for the first time.
+//! - There is no global lock for creating and cancelling timeouts.
+//! - Timeout timers are rounded to the next 10ms tick and timers are shared across all timeouts with the same deadline.
+//!
+//! Benchmark:
+//!
+//! 438.302µs total, 4ns avg per iteration
+//!
+//! v.s. Tokio timeout():
+//!
+//! 10.716192ms total, 107ns avg per iteration
+//!
+
+pub mod fast_timeout;
+pub mod timer;
+
+pub use fast_timeout::fast_sleep as sleep;
+pub use fast_timeout::fast_timeout as timeout;
+
+use futures::future::BoxFuture;
+use pin_project_lite::pin_project;
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{self, Poll};
+use tokio::time::{sleep as tokio_sleep, Duration};
+
+/// The interface to start a timeout
+///
+/// Users don't need to interact with this trait
+pub trait ToTimeout {
+ /// Create the future that completes when the timeout expires.
+ fn timeout(&self) -> BoxFuture<'static, ()>;
+ /// Build the timeout object from the timeout duration `d`.
+ fn create(d: Duration) -> Self;
+}
+
+/// The timeout generated by [tokio_timeout()].
+///
+/// It only stores the timeout duration. Users don't need to interact with this object.
+pub struct TokioTimeout(Duration);
+
+impl ToTimeout for TokioTimeout {
+    /// Return a boxed tokio sleep for the stored duration.
+    fn timeout(&self) -> BoxFuture<'static, ()> {
+        let sleep = tokio_sleep(self.0);
+        Box::pin(sleep)
+    }
+
+    /// Remember the duration `d`; the sleep itself is only created by `timeout()`.
+    fn create(d: Duration) -> Self {
+        Self(d)
+    }
+}
+
+/// The error type returned when the timeout is reached.
+#[derive(Debug)]
+pub struct Elapsed;
+
+impl std::fmt::Display for Elapsed {
+    /// Write the fixed message "Timeout Elapsed".
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.write_str("Timeout Elapsed")
+    }
+}
+
+impl std::error::Error for Elapsed {}
+
+/// The [tokio::time::timeout] with just lazy timer initialization.
+///
+/// The timer is only created the first time `future` is found pending. Busy IOs that are
+/// likely to be ready right away (e.g., reading from a TCP socket whose recv buffer already
+/// holds plenty of data) therefore skip the cost of creating and cancelling a timer.
+pub fn tokio_timeout<T>(duration: Duration, future: T) -> Timeout<T, TokioTimeout>
+where
+    T: Future,
+{
+    // the concrete `Timeout<T, TokioTimeout>` is inferred from the return type
+    Timeout::new_with_delay(future, duration)
+}
+
+pin_project! {
+ /// The timeout future returned by the timeout functions
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct Timeout<T, F> {
+ // the wrapped future whose completion is being raced against the timer
+ #[pin]
+ value: T,
+ // the timer future; None until `value` is found pending for the first time
+ #[pin]
+ delay: Option<BoxFuture<'static, ()>>,
+ callback: F, // callback to create the timer
+ }
+}
+
+impl<T, F> Timeout<T, F>
+where
+    F: ToTimeout,
+{
+    /// Wrap `value` with a timeout of `d`.
+    ///
+    /// No timer is created here: only the `F` object that can create one later is built.
+    pub(crate) fn new_with_delay(value: T, d: Duration) -> Timeout<T, F> {
+        let callback = F::create(d);
+        Self {
+            value,
+            delay: None,
+            callback,
+        }
+    }
+}
+
+impl<T, F> Future for Timeout<T, F>
+where
+    T: Future,
+    F: ToTimeout,
+{
+    type Output = Result<T::Output, Elapsed>;
+
+    /// Poll the wrapped future first and only then the timer.
+    ///
+    /// Returns `Ok(value)` as soon as the wrapped future is ready. Otherwise the timer is
+    /// created on demand and `Err(Elapsed)` is returned once it completes.
+    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+        let mut me = self.project();
+
+        // First, try polling the future
+        if let Poll::Ready(v) = me.value.poll(cx) {
+            return Poll::Ready(Ok(v));
+        }
+
+        // The future is pending: create the timer if this is the first time.
+        // `timeout()` already hands back a boxed and pinned future, so it is stored as is;
+        // wrapping it in another `Box::pin` would only add an allocation and an indirection.
+        let delay = me.delay.get_or_insert_with(|| me.callback.timeout());
+
+        match delay.as_mut().poll(cx) {
+            Poll::Pending => Poll::Pending,
+            Poll::Ready(()) => Poll::Ready(Err(Elapsed {})),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::time::Duration;
+
+ // The inner future sleeps far longer than the timeout, so the timeout has to win.
+ #[tokio::test]
+ async fn test_timeout() {
+ let fut = tokio_sleep(Duration::from_secs(1000));
+ let to = timeout(Duration::from_secs(1), fut);
+ assert!(to.await.is_err())
+ }
+
+ // A future that is ready on its first poll returns its value.
+ #[tokio::test]
+ async fn test_instantly_return() {
+ let fut = async { 1 };
+ let to = timeout(Duration::from_secs(1), fut);
+ assert_eq!(to.await.unwrap(), 1)
+ }
+
+ // The inner future finishes after one second, well before the 1000 second timeout.
+ #[tokio::test]
+ async fn test_delayed_return() {
+ let fut = async {
+ tokio_sleep(Duration::from_secs(1)).await;
+ 1
+ };
+ let to = timeout(Duration::from_secs(1000), fut);
+ assert_eq!(to.await.unwrap(), 1)
+ }
+}
diff --git a/pingora-timeout/src/timer.rs b/pingora-timeout/src/timer.rs
new file mode 100644
index 0000000..6e25c24
--- /dev/null
+++ b/pingora-timeout/src/timer.rs
@@ -0,0 +1,328 @@
+// Copyright 2024 Cloudflare, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Lightweight timer for systems with high rate of operations with timeout
+//! associated with them
+//!
+//! Users don't need to interact with this module.
+//!
+//! The idea is to bucket timers into finite time slots so that operations that
+//! start and end quickly don't have to create their own timers all the time
+//!
+//! Benchmark:
+//! - create 7.809622ms total, 78ns avg per iteration
+//! - drop: 1.348552ms total, 13ns avg per iteration
+//!
+//! tokio timer:
+//! - create 34.317439ms total, 343ns avg per iteration
+//! - drop: 10.694154ms total, 106ns avg per iteration
+
+use parking_lot::RwLock;
+use std::collections::BTreeMap;
+use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+use thread_local::ThreadLocal;
+use tokio::sync::Notify;
+
+// Width of one timer slot in milliseconds: timestamps are rounded up to a multiple of this
+const RESOLUTION_MS: u64 = 10;
+// The same resolution as a Duration; the clock thread sleeps this long between two ticks
+const RESOLUTION_DURATION: Duration = Duration::from_millis(RESOLUTION_MS);
+
+// round to the NEXT timestamp based on the resolution
+// A `raw` value that is already a multiple of `resolution` (0 included) is returned as is.
+// `resolution` must not be 0.
+#[inline]
+fn round_to(raw: u128, resolution: u128) -> u128 {
+    // Work from the remainder instead of `raw - 1`, which underflows when `raw` is 0.
+    match raw % resolution {
+        0 => raw,
+        rem => raw + (resolution - rem),
+    }
+}
+// A timestamp in milliseconds (millisecond resolution at most), always rounded up to the next
+// RESOLUTION_MS tick when built through the `From` conversions below
+#[derive(PartialEq, PartialOrd, Eq, Ord, Clone, Copy, Debug)]
+struct Time(u128);
+
+impl From<u128> for Time {
+    fn from(raw_ms: u128) -> Self {
+        let tick = RESOLUTION_MS as u128;
+        Self(round_to(raw_ms, tick))
+    }
+}
+
+impl From<Duration> for Time {
+    fn from(d: Duration) -> Self {
+        // same rounding as the raw millisecond conversion
+        Self::from(d.as_millis())
+    }
+}
+
+impl Time {
+    // true when this timestamp is at or before `ts` (in milliseconds)
+    pub fn not_after(&self, ts: u128) -> bool {
+        ts >= self.0
+    }
+}
+
+/// the stub for waiting for a timer to be expired.
+///
+/// It shares the notifier and the "already fired" flag with the timer it was created from.
+pub struct TimerStub(Arc<Notify>, Arc<AtomicBool>);
+
+impl TimerStub {
+    /// Wait for the timer to expire.
+    ///
+    /// Returns right away if the timer has already fired.
+    pub async fn poll(self) {
+        // Create the `Notified` future BEFORE checking the flag. tokio guarantees that a
+        // `Notified` receives `notify_waiters()` wakeups from the moment it is created, so a
+        // timer firing between the flag check and the await below is no longer missed.
+        let notified = self.0.notified();
+        if self.1.load(Ordering::SeqCst) {
+            return;
+        }
+        notified.await;
+    }
+}
+
+// A timer shared by every waiter with the same rounded deadline: the notifier wakes the
+// waiters and the flag records that the timer has already fired
+struct Timer(Arc<Notify>, Arc<AtomicBool>);
+
+impl Timer {
+    // a fresh timer that has not fired yet
+    pub fn new() -> Self {
+        let notify = Arc::new(Notify::new());
+        let fired = Arc::new(AtomicBool::new(false));
+        Self(notify, fired)
+    }
+
+    // mark the timer as fired first, then wake every waiter currently registered
+    pub fn fire(&self) {
+        let Timer(notify, fired) = self;
+        fired.store(true, Ordering::SeqCst);
+        notify.notify_waiters();
+    }
+
+    // hand out a stub sharing this timer's notifier and flag
+    pub fn subscribe(&self) -> TimerStub {
+        TimerStub(Arc::clone(&self.0), Arc::clone(&self.1))
+    }
+}
+
+/// The object that holds all the timers registered to it.
+pub struct TimerManager {
+ // each thread inserts into its local timer tree to avoid lock contention
+ timers: ThreadLocal<RwLock<BTreeMap<Time, Timer>>>,
+ zero: Instant, // the reference zero point of Timestamp
+ // Seconds since `zero` at which the clock thread last reported in (initially -DELAYS_SEC).
+ // Start a new clock thread if this is stale. The clock thread should keep updating this
+ clock_watchdog: AtomicI64,
+ // true while paused for fork(): the clock thread stops taking locks and timers registered
+ // in the meantime fire right away
+ paused: AtomicBool,
+}
+
+// Consider the clock thread dead after it fails to update the watchdog for DELAYS_SEC seconds
+const DELAYS_SEC: i64 = 2; // TODO: make sure this value is larger than RESOLUTION_DURATION
+
+impl Default for TimerManager {
+    /// An empty manager whose time reference is "now" and which is not paused.
+    ///
+    /// The watchdog starts at `-DELAYS_SEC`, so the clock is reported as not running until
+    /// someone takes over starting it.
+    fn default() -> Self {
+        Self {
+            timers: ThreadLocal::new(),
+            zero: Instant::now(),
+            clock_watchdog: AtomicI64::new(-DELAYS_SEC),
+            paused: AtomicBool::new(false),
+        }
+    }
+}
+
+impl TimerManager {
+ /// Create a new [TimerManager]
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ // this thread sleeps for one resolution period and fires all Timers that are due to fire
+ // The loop never exits, so this function does not return.
+ pub(crate) fn clock_thread(&self) {
+ loop {
+ std::thread::sleep(RESOLUTION_DURATION);
+ let now = Instant::now() - self.zero;
+ // report in: the watchdog holds the seconds elapsed since `zero`
+ self.clock_watchdog
+ .store(now.as_secs() as i64, Ordering::Relaxed);
+ if self.is_paused_for_fork() {
+ // just stop acquiring the locks, waiting for fork to happen
+ continue;
+ }
+ let now = now.as_millis();
+ // iterate through the timer tree for all threads
+ for thread_timer in self.timers.iter() {
+ let mut timers = thread_timer.write();
+ // Fire all timers until now
+ loop {
+ // the BTreeMap iterates in key order, so the first entry is the earliest timer
+ let key_to_remove = timers.iter().next().and_then(|(k, _)| {
+ if k.not_after(now) {
+ Some(*k)
+ } else {
+ None
+ }
+ });
+ if let Some(k) = key_to_remove {
+ let timer = timers.remove(&k);
+ // safe to unwrap, the key is from iter().next()
+ timer.unwrap().fire();
+ } else {
+ break;
+ }
+ }
+ // write lock drops here
+ }
+ }
+ }
+
+ // False if the clock is already started
+ // If true, the caller must start the clock thread next
+ pub(crate) fn should_i_start_clock(&self) -> bool {
+ let Err(prev) = self.is_clock_running() else {
+ return false;
+ };
+ let now = Instant::now().duration_since(self.zero).as_secs() as i64;
+ // only the caller that swaps the stale value `prev` for `now` gets to start the clock
+ let res =
+ self.clock_watchdog
+ .compare_exchange(prev, now, Ordering::SeqCst, Ordering::SeqCst);
+ res.is_ok()
+ }
+
+ // Ok(()) if clock is running (watch dog is within DELAYS_SEC of now)
+ // Err(time) if the watchdog stopped at `time`
+ pub(crate) fn is_clock_running(&self) -> Result<(), i64> {
+ let now = Instant::now().duration_since(self.zero).as_secs() as i64;
+ let prev = self.clock_watchdog.load(Ordering::SeqCst);
+ if now < prev + DELAYS_SEC {
+ Ok(())
+ } else {
+ Err(prev)
+ }
+ }
+
+ /// Register a timer.
+ ///
+ /// When the timer expires, the [TimerStub] will be notified.
+ pub fn register_timer(&self, duration: Duration) -> TimerStub {
+ if self.is_paused_for_fork() {
+ // Return a dummy TimerStub that will trigger right away.
+ // This is fine assuming pause_for_fork() is called right before fork().
+ // The only possible register_timer() is from another thread which will
+ // be entirely lost after fork()
+ // TODO: buffer these register calls instead (without a lock)
+ let timer = Timer::new();
+ timer.fire();
+ return timer.subscribe();
+ }
+ // NOTE: despite its name, `now` is the deadline (time since `zero`) rounded up to a tick
+ let now: Time = (Instant::now() + duration - self.zero).into();
+ {
+ // reuse the timer if this thread already has one for the same rounded deadline
+ let timers = self.timers.get_or(|| RwLock::new(BTreeMap::new())).read();
+ if let Some(t) = timers.get(&now) {
+ return t.subscribe();
+ }
+ } // drop read lock
+
+ let timer = Timer::new();
+ let mut timers = self.timers.get_or(|| RwLock::new(BTreeMap::new())).write();
+ // Usually we check if another thread has inserted the same node before we get the write lock,
+ // but because only this thread will insert anything to its local timers tree, there
+ // is no possible race that can happen. The only other thread is the clock thread who
+ // only removes timer from the tree
+ let stub = timer.subscribe();
+ timers.insert(now, timer);
+ stub
+ }
+
+ fn is_paused_for_fork(&self) -> bool {
+ self.paused.load(Ordering::SeqCst)
+ }
+
+ /// Pause the timer for fork()
+ ///
+ /// Because RwLock across fork() is undefined behavior, this function makes sure that no one
+ /// holds any locks.
+ ///
+ /// This function should be called right before fork().
+ pub fn pause_for_fork(&self) {
+ self.paused.store(true, Ordering::SeqCst);
+ // wait for everything to get out of their locks
+ std::thread::sleep(RESOLUTION_DURATION * 2);
+ }
+
+ /// Unpause the timer after fork()
+ ///
+ /// This function should be called right after fork().
+ pub fn unpause(&self) {
+ self.paused.store(false, Ordering::SeqCst)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::sync::Arc;
+
+ // values on a tick stay put, everything else moves up to the next tick
+ #[test]
+ fn test_round() {
+ assert_eq!(round_to(30, 10), 30);
+ assert_eq!(round_to(31, 10), 40);
+ assert_eq!(round_to(29, 10), 30);
+ }
+
+ #[test]
+ fn test_time() {
+ let t: Time = 128.into(); // t will round to 130
+ assert_eq!(t, Duration::from_millis(130).into());
+ assert!(!t.not_after(128));
+ assert!(!t.not_after(129));
+ assert!(t.not_after(130));
+ assert!(t.not_after(131));
+ }
+
+ // two timers registered for the same duration are expected to fire together
+ #[tokio::test]
+ async fn test_timer_manager() {
+ let tm_a = Arc::new(TimerManager::new());
+ let tm = tm_a.clone();
+ std::thread::spawn(move || tm_a.clock_thread());
+
+ let now = Instant::now();
+ let t1 = tm.register_timer(Duration::from_secs(1));
+ let t2 = tm.register_timer(Duration::from_secs(1));
+ t1.poll().await;
+ assert_eq!(now.elapsed().as_secs(), 1);
+ let now = Instant::now();
+ t2.poll().await;
+ // t2 fired along t1 so no extra wait time
+ assert_eq!(now.elapsed().as_secs(), 0);
+ }
+
+ // only the first caller is told to start the clock; after that it counts as running
+ #[test]
+ fn test_timer_manager_start_check() {
+ let tm = Arc::new(TimerManager::new());
+ assert!(tm.should_i_start_clock());
+ assert!(!tm.should_i_start_clock());
+ assert!(tm.is_clock_running().is_ok());
+ }
+
+ #[test]
+ fn test_timer_manager_watchdog() {
+ let tm = Arc::new(TimerManager::new());
+ assert!(tm.should_i_start_clock());
+ assert!(!tm.should_i_start_clock());
+
+ // we don't actually start the clock thread, sleep for the watchdog to expire
+ std::thread::sleep(Duration::from_secs(DELAYS_SEC as u64 + 1));
+ assert!(tm.is_clock_running().is_err());
+ assert!(tm.should_i_start_clock());
+ }
+
+ #[tokio::test]
+ async fn test_timer_manager_pause() {
+ let tm_a = Arc::new(TimerManager::new());
+ let tm = tm_a.clone();
+ std::thread::spawn(move || tm_a.clock_thread());
+
+ let now = Instant::now();
+ let t1 = tm.register_timer(Duration::from_secs(2));
+ tm.pause_for_fork();
+ // no actual fork happen, we just test that pause and unpause work
+
+ // any timer in this critical section is timed out right away
+ let t2 = tm.register_timer(Duration::from_secs(2));
+ t2.poll().await;
+ assert_eq!(now.elapsed().as_secs(), 0);
+
+ // t1 was registered before the pause and still fires once the timer is unpaused
+ std::thread::sleep(Duration::from_secs(1));
+ tm.unpause();
+ t1.poll().await;
+ assert_eq!(now.elapsed().as_secs(), 2);
+ }
+}