aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorDaniel Dao <[email protected]>2024-09-03 15:31:39 +0100
committerKevin Guthrie <[email protected]>2024-10-07 14:23:08 -0400
commitd811795938cee5a6eb7cd46399cef17210a0d0c5 (patch)
tree8ee066c16f860083fb100791fe08190689787a56
parentcc96400232af789b1796b917438816bb7110c9f8 (diff)
downloadpingora-d811795938cee5a6eb7cd46399cef17210a0d0c5.tar.gz
pingora-d811795938cee5a6eb7cd46399cef17210a0d0c5.zip
support retrieving rx timestamp for TcpStream
Documentation: https://docs.kernel.org/networking/timestamping.html This modifies Stream type to be able to setup rx timestamp on its socket, and use `recvmsg` to read the returned timestamp. The rx timestamp of the last recvmsg is stored as `rx_ts` In the context of tcp stream, if the returned byte range spans multiple timestamps, the last one is returned.
-rw-r--r--.bleep2
-rw-r--r--pingora-core/src/protocols/l4/stream.rs242
2 files changed, 236 insertions, 8 deletions
diff --git a/.bleep b/.bleep
index 321c159..a5d99c6 100644
--- a/.bleep
+++ b/.bleep
@@ -1 +1 @@
-7e74e88f3e50bfdb38eeea244f3100d8477db7e4 \ No newline at end of file
+2b28af8029c2e74b642c3a7445dfd6768eda1b24 \ No newline at end of file
diff --git a/pingora-core/src/protocols/l4/stream.rs b/pingora-core/src/protocols/l4/stream.rs
index edcb188..37f7486 100644
--- a/pingora-core/src/protocols/l4/stream.rs
+++ b/pingora-core/src/protocols/l4/stream.rs
@@ -17,13 +17,15 @@
use async_trait::async_trait;
use futures::FutureExt;
use log::{debug, error};
+
use pingora_error::{ErrorType::*, OrErr, Result};
+use std::io::IoSliceMut;
use std::os::unix::io::AsRawFd;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant, SystemTime};
-use tokio::io::{self, AsyncRead, AsyncWrite, AsyncWriteExt, BufStream, ReadBuf};
+use tokio::io::{self, AsyncRead, AsyncWrite, AsyncWriteExt, BufStream, Interest, ReadBuf};
use tokio::net::{TcpStream, UnixStream};
use crate::protocols::l4::ext::{set_tcp_keepalive, TcpKeepalive};
@@ -118,6 +120,162 @@ impl AsRawFd for RawStream {
}
}
+#[derive(Debug)]
+struct RawStreamWrapper {
+ pub(crate) stream: RawStream,
+ /// store the last rx timestamp of the stream.
+ pub(crate) rx_ts: Option<SystemTime>,
+ #[cfg(target_os = "linux")]
+ /// This can be reused across multiple recvmsg calls. The cmsg buffer may
+ /// come from old sockets created by older version of pingora and so,
+ /// this vector can only grow.
+ reusable_cmsg_space: Vec<u8>,
+}
+
+impl RawStreamWrapper {
+ pub fn new(stream: RawStream) -> Self {
+ RawStreamWrapper {
+ stream,
+ rx_ts: None,
+ #[cfg(target_os = "linux")]
+ reusable_cmsg_space: nix::cmsg_space!(nix::sys::time::TimeSpec),
+ }
+ }
+}
+
+impl AsyncRead for RawStreamWrapper {
+ #[cfg(not(target_os = "linux"))]
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<()>> {
+ // Safety: Basic enum pin projection
+ unsafe {
+ let rs_wrapper = Pin::get_unchecked_mut(self);
+ match &mut rs_wrapper.stream {
+ RawStream::Tcp(s) => Pin::new_unchecked(s).poll_read(cx, buf),
+ RawStream::Unix(s) => Pin::new_unchecked(s).poll_read(cx, buf),
+ }
+ }
+ }
+
+ #[cfg(target_os = "linux")]
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<()>> {
+ use futures::ready;
+ use nix::sys::socket::{recvmsg, ControlMessageOwned, MsgFlags, SockaddrStorage};
+
+ // Safety: Basic pin projection to get mutable stream
+ let rs_wrapper = unsafe { Pin::get_unchecked_mut(self) };
+ match &mut rs_wrapper.stream {
+ RawStream::Tcp(s) => {
+ loop {
+ ready!(s.poll_read_ready(cx))?;
+ // Safety: maybe uninitialized bytes will only be passed to recvmsg
+ let b = unsafe {
+ &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>]
+ as *mut [u8])
+ };
+ let mut iov = [IoSliceMut::new(b)];
+ rs_wrapper.reusable_cmsg_space.clear();
+
+ match s.try_io(Interest::READABLE, || {
+ recvmsg::<SockaddrStorage>(
+ s.as_raw_fd(),
+ &mut iov,
+ Some(&mut rs_wrapper.reusable_cmsg_space),
+ MsgFlags::empty(),
+ )
+ .map_err(|errno| errno.into())
+ }) {
+ Ok(r) => {
+ if let Some(ControlMessageOwned::ScmTimestampsns(rtime)) = r
+ .cmsgs()
+ .find(|i| matches!(i, ControlMessageOwned::ScmTimestampsns(_)))
+ {
+ // The returned timestamp is a real (i.e. not monotonic) timestamp
+ // https://docs.kernel.org/networking/timestamping.html
+ rs_wrapper.rx_ts =
+ SystemTime::UNIX_EPOCH.checked_add(rtime.system.into());
+ }
+ // Safety: We trust `recvmsg` to have filled up `r.bytes` bytes in the buffer.
+ unsafe {
+ buf.assume_init(r.bytes);
+ }
+ buf.advance(r.bytes);
+ return Poll::Ready(Ok(()));
+ }
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
+ Err(e) => return Poll::Ready(Err(e)),
+ }
+ }
+ }
+ // Unix RX timestamp only works with datagram for now, so we do not care about it
+ RawStream::Unix(s) => unsafe { Pin::new_unchecked(s).poll_read(cx, buf) },
+ }
+ }
+}
+
+impl AsyncWrite for RawStreamWrapper {
+ fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
+ // Safety: Basic enum pin projection
+ unsafe {
+ match &mut Pin::get_unchecked_mut(self).stream {
+ RawStream::Tcp(s) => Pin::new_unchecked(s).poll_write(cx, buf),
+ RawStream::Unix(s) => Pin::new_unchecked(s).poll_write(cx, buf),
+ }
+ }
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
+ // Safety: Basic enum pin projection
+ unsafe {
+ match &mut Pin::get_unchecked_mut(self).stream {
+ RawStream::Tcp(s) => Pin::new_unchecked(s).poll_flush(cx),
+ RawStream::Unix(s) => Pin::new_unchecked(s).poll_flush(cx),
+ }
+ }
+ }
+
+ fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
+ // Safety: Basic enum pin projection
+ unsafe {
+ match &mut Pin::get_unchecked_mut(self).stream {
+ RawStream::Tcp(s) => Pin::new_unchecked(s).poll_shutdown(cx),
+ RawStream::Unix(s) => Pin::new_unchecked(s).poll_shutdown(cx),
+ }
+ }
+ }
+
+ fn poll_write_vectored(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ bufs: &[std::io::IoSlice<'_>],
+ ) -> Poll<io::Result<usize>> {
+ // Safety: Basic enum pin projection
+ unsafe {
+ match &mut Pin::get_unchecked_mut(self).stream {
+ RawStream::Tcp(s) => Pin::new_unchecked(s).poll_write_vectored(cx, bufs),
+ RawStream::Unix(s) => Pin::new_unchecked(s).poll_write_vectored(cx, bufs),
+ }
+ }
+ }
+
+ fn is_write_vectored(&self) -> bool {
+ self.stream.is_write_vectored()
+ }
+}
+
+impl AsRawFd for RawStreamWrapper {
+ fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
+ self.stream.as_raw_fd()
+ }
+}
+
// Large read buffering helps reducing syscalls with little trade-off
// Ssl layer always does "small" reads in 16k (TLS record size) so L4 read buffer helps a lot.
const BUF_READ_SIZE: usize = 64 * 1024;
@@ -133,7 +291,7 @@ const BUF_WRITE_SIZE: usize = 1460;
/// A concrete type for transport layer connection + extra fields for logging
#[derive(Debug)]
pub struct Stream {
- stream: BufStream<RawStream>,
+ stream: BufStream<RawStreamWrapper>,
buffer_write: bool,
proxy_digest: Option<Arc<ProxyDigest>>,
socket_digest: Option<Arc<SocketDigest>>,
@@ -143,12 +301,14 @@ pub struct Stream {
pub tracer: Option<Tracer>,
read_pending_time: AccumulatedDuration,
write_pending_time: AccumulatedDuration,
+ /// Last rx timestamp associated with the last recvmsg call.
+ pub rx_ts: Option<SystemTime>,
}
impl Stream {
/// set TCP nodelay for this connection if `self` is TCP
pub fn set_nodelay(&mut self) -> Result<()> {
- if let RawStream::Tcp(s) = &self.stream.get_ref() {
+ if let RawStream::Tcp(s) = &self.stream.get_mut().stream {
s.set_nodelay(true)
.or_err(ConnectError, "failed to set_nodelay")?;
}
@@ -157,18 +317,40 @@ impl Stream {
/// set TCP keepalive settings for this connection if `self` is TCP
pub fn set_keepalive(&mut self, ka: &TcpKeepalive) -> Result<()> {
- if let RawStream::Tcp(s) = &self.stream.get_ref() {
+ if let RawStream::Tcp(s) = &self.stream.get_mut().stream {
debug!("Setting tcp keepalive");
set_tcp_keepalive(s, ka)?;
}
Ok(())
}
+
+ #[cfg(target_os = "linux")]
+ pub fn set_rx_timestamp(&mut self) -> Result<()> {
+ use nix::sys::socket::{setsockopt, sockopt, TimestampingFlag};
+
+ if let RawStream::Tcp(s) = &self.stream.get_mut().stream {
+ let timestamp_options = TimestampingFlag::SOF_TIMESTAMPING_RX_SOFTWARE
+ | TimestampingFlag::SOF_TIMESTAMPING_SOFTWARE;
+ return setsockopt(s.as_raw_fd(), sockopt::Timestamping, &timestamp_options)
+ .or_err(InternalError, "failed to set SOF_TIMESTAMPING_RX_SOFTWARE");
+ }
+ Ok(())
+ }
+
+ #[cfg(not(target_os = "linux"))]
+ pub fn set_rx_timestamp(&mut self) -> io::Result<()> {
+ Ok(())
+ }
}
impl From<TcpStream> for Stream {
fn from(s: TcpStream) -> Self {
Stream {
- stream: BufStream::with_capacity(BUF_READ_SIZE, BUF_WRITE_SIZE, RawStream::Tcp(s)),
+ stream: BufStream::with_capacity(
+ BUF_READ_SIZE,
+ BUF_WRITE_SIZE,
+ RawStreamWrapper::new(RawStream::Tcp(s)),
+ ),
buffer_write: true,
established_ts: SystemTime::now(),
proxy_digest: None,
@@ -176,6 +358,7 @@ impl From<TcpStream> for Stream {
tracer: None,
read_pending_time: AccumulatedDuration::new(),
write_pending_time: AccumulatedDuration::new(),
+ rx_ts: None,
}
}
}
@@ -183,7 +366,11 @@ impl From<TcpStream> for Stream {
impl From<UnixStream> for Stream {
fn from(s: UnixStream) -> Self {
Stream {
- stream: BufStream::with_capacity(BUF_READ_SIZE, BUF_WRITE_SIZE, RawStream::Unix(s)),
+ stream: BufStream::with_capacity(
+ BUF_READ_SIZE,
+ BUF_WRITE_SIZE,
+ RawStreamWrapper::new(RawStream::Unix(s)),
+ ),
buffer_write: true,
established_ts: SystemTime::now(),
proxy_digest: None,
@@ -191,6 +378,7 @@ impl From<UnixStream> for Stream {
tracer: None,
read_pending_time: AccumulatedDuration::new(),
write_pending_time: AccumulatedDuration::new(),
+ rx_ts: None,
}
}
}
@@ -262,7 +450,7 @@ impl Drop for Stream {
t.0.on_disconnected();
}
/* use nodelay/local_addr function to detect socket status */
- let ret = match &self.stream.get_ref() {
+ let ret = match &self.stream.get_ref().stream {
RawStream::Tcp(s) => s.nodelay().err(),
RawStream::Unix(s) => s.local_addr().err(),
};
@@ -298,6 +486,7 @@ impl AsyncRead for Stream {
) -> Poll<io::Result<()>> {
let result = Pin::new(&mut self.stream).poll_read(cx, buf);
self.read_pending_time.poll_time(&result);
+ self.rx_ts = self.stream.get_ref().rx_ts;
result
}
}
@@ -528,3 +717,42 @@ impl AccumulatedDuration {
}
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::sync::Arc;
+ use tokio::io::AsyncReadExt;
+ use tokio::io::AsyncWriteExt;
+ use tokio::net::TcpListener;
+ use tokio::sync::Notify;
+
+ #[cfg(target_os = "linux")]
+ #[tokio::test]
+ async fn test_rx_timestamp() {
+ let message = "hello world".as_bytes();
+ let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+ let addr = listener.local_addr().unwrap();
+ let notify = Arc::new(Notify::new());
+ let notify2 = notify.clone();
+
+ tokio::spawn(async move {
+ let (mut stream, _) = listener.accept().await.unwrap();
+ notify2.notified().await;
+ stream.write_all(message).await.unwrap();
+ });
+
+ let mut stream: Stream = TcpStream::connect(addr).await.unwrap().into();
+ stream.set_rx_timestamp().unwrap();
+ // Receive the message
+ // setsockopt for SO_TIMESTAMPING is asynchronous so sleep a little bit
+ // to let kernel do the work
+ std::thread::sleep(Duration::from_micros(100));
+ notify.notify_one();
+
+ let mut buffer = vec![0u8; message.len()];
+ let n = stream.read(buffer.as_mut_slice()).await.unwrap();
+ assert_eq!(n, message.len());
+ assert!(stream.rx_ts.is_some());
+ }
+}