diff options
author | Daniel Dao <[email protected]> | 2024-09-09 11:49:55 +0100 |
---|---|---|
committer | Matthew Gumport <[email protected]> | 2024-09-23 21:23:52 +0000 |
commit | ebca6767829366f33564b447d6789c3927308c82 (patch) | |
tree | f27bb09e79678292d945a012f4743216e3cd16ce | |
parent | 7e0368df949bace91fc5518570ade42bcdd90636 (diff) | |
download | pingora-bleeper-kevinbartlett-5bbb21bd37.tar.gz pingora-bleeper-kevinbartlett-5bbb21bd37.zip |
Stream able to configure rx timestamp readingbleeper-kevinbartlett-5bbb21bd37
This allows us to optionally enable rx timestamp reading logic per
stream by calling `set_rx_timestamp`. When rx timestamp is disabled, the
standard battle-tested logic is used.
-rw-r--r-- | .bleep | 2 | ||||
-rw-r--r-- | pingora-core/src/protocols/l4/stream.rs | 50 |
2 files changed, 49 insertions, 3 deletions
@@ -1 +1 @@ -2b28af8029c2e74b642c3a7445dfd6768eda1b24
\ No newline at end of file +5bbb21bd377e352872ab767af7583e4d8e9022f8
\ No newline at end of file diff --git a/pingora-core/src/protocols/l4/stream.rs b/pingora-core/src/protocols/l4/stream.rs index 37f7486..8c5f3cf 100644 --- a/pingora-core/src/protocols/l4/stream.rs +++ b/pingora-core/src/protocols/l4/stream.rs @@ -125,6 +125,8 @@ struct RawStreamWrapper { pub(crate) stream: RawStream, /// store the last rx timestamp of the stream. pub(crate) rx_ts: Option<SystemTime>, + /// enable reading rx timestamp + pub(crate) enable_rx_ts: bool, #[cfg(target_os = "linux")] /// This can be reused across multiple recvmsg calls. The cmsg buffer may /// come from old sockets created by older version of pingora and so, @@ -137,10 +139,15 @@ impl RawStreamWrapper { RawStreamWrapper { stream, rx_ts: None, + enable_rx_ts: false, #[cfg(target_os = "linux")] reusable_cmsg_space: nix::cmsg_space!(nix::sys::time::TimeSpec), } } + + pub fn enable_rx_ts(&mut self, enable_rx_ts: bool) { + self.enable_rx_ts = enable_rx_ts; + } } impl AsyncRead for RawStreamWrapper { @@ -169,6 +176,18 @@ impl AsyncRead for RawStreamWrapper { use futures::ready; use nix::sys::socket::{recvmsg, ControlMessageOwned, MsgFlags, SockaddrStorage}; + // if we do not need rx timestamp, then use the standard path + if !self.enable_rx_ts { + // Safety: Basic enum pin projection + unsafe { + let rs_wrapper = Pin::get_unchecked_mut(self); + match &mut rs_wrapper.stream { + RawStream::Tcp(s) => return Pin::new_unchecked(s).poll_read(cx, buf), + RawStream::Unix(s) => return Pin::new_unchecked(s).poll_read(cx, buf), + } + } + } + // Safety: Basic pin projection to get mutable stream let rs_wrapper = unsafe { Pin::get_unchecked_mut(self) }; match &mut rs_wrapper.stream { @@ -331,9 +350,11 @@ impl Stream { if let RawStream::Tcp(s) = &self.stream.get_mut().stream { let timestamp_options = TimestampingFlag::SOF_TIMESTAMPING_RX_SOFTWARE | TimestampingFlag::SOF_TIMESTAMPING_SOFTWARE; - return setsockopt(s.as_raw_fd(), sockopt::Timestamping, ×tamp_options) - .or_err(InternalError, "failed to set SOF_TIMESTAMPING_RX_SOFTWARE"); + setsockopt(s.as_raw_fd(), sockopt::Timestamping, ×tamp_options) + .or_err(InternalError, "failed to set SOF_TIMESTAMPING_RX_SOFTWARE")?; + self.stream.get_mut().enable_rx_ts(true); } + Ok(()) } @@ -755,4 +776,29 @@ mod tests { assert_eq!(n, message.len()); assert!(stream.rx_ts.is_some()); } + + #[cfg(target_os = "linux")] + #[tokio::test] + async fn test_rx_timestamp_standard_path() { + let message = "hello world".as_bytes(); + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let notify = Arc::new(Notify::new()); + let notify2 = notify.clone(); + + tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + notify2.notified().await; + stream.write_all(message).await.unwrap(); + }); + + let mut stream: Stream = TcpStream::connect(addr).await.unwrap().into(); + std::thread::sleep(Duration::from_micros(100)); + notify.notify_one(); + + let mut buffer = vec![0u8; message.len()]; + let n = stream.read(buffer.as_mut_slice()).await.unwrap(); + assert_eq!(n, message.len()); + assert!(stream.rx_ts.is_none()); + } } |