aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--.bleep2
-rw-r--r--pingora-core/src/protocols/l4/stream.rs50
2 files changed, 49 insertions, 3 deletions
diff --git a/.bleep b/.bleep
index a5d99c6..f8542c1 100644
--- a/.bleep
+++ b/.bleep
@@ -1 +1 @@
-2b28af8029c2e74b642c3a7445dfd6768eda1b24 \ No newline at end of file
+5bbb21bd377e352872ab767af7583e4d8e9022f8 \ No newline at end of file
diff --git a/pingora-core/src/protocols/l4/stream.rs b/pingora-core/src/protocols/l4/stream.rs
index 37f7486..8c5f3cf 100644
--- a/pingora-core/src/protocols/l4/stream.rs
+++ b/pingora-core/src/protocols/l4/stream.rs
@@ -125,6 +125,8 @@ struct RawStreamWrapper {
pub(crate) stream: RawStream,
/// store the last rx timestamp of the stream.
pub(crate) rx_ts: Option<SystemTime>,
+ /// enable reading rx timestamp
+ pub(crate) enable_rx_ts: bool,
#[cfg(target_os = "linux")]
/// This can be reused across multiple recvmsg calls. The cmsg buffer may
/// come from old sockets created by older version of pingora and so,
@@ -137,10 +139,15 @@ impl RawStreamWrapper {
RawStreamWrapper {
stream,
rx_ts: None,
+ enable_rx_ts: false,
#[cfg(target_os = "linux")]
reusable_cmsg_space: nix::cmsg_space!(nix::sys::time::TimeSpec),
}
}
+
+ pub fn enable_rx_ts(&mut self, enable_rx_ts: bool) {
+ self.enable_rx_ts = enable_rx_ts;
+ }
}
impl AsyncRead for RawStreamWrapper {
@@ -169,6 +176,18 @@ impl AsyncRead for RawStreamWrapper {
use futures::ready;
use nix::sys::socket::{recvmsg, ControlMessageOwned, MsgFlags, SockaddrStorage};
+ // if we do not need rx timestamp, then use the standard path
+ if !self.enable_rx_ts {
+ // Safety: Basic enum pin projection
+ unsafe {
+ let rs_wrapper = Pin::get_unchecked_mut(self);
+ match &mut rs_wrapper.stream {
+ RawStream::Tcp(s) => return Pin::new_unchecked(s).poll_read(cx, buf),
+ RawStream::Unix(s) => return Pin::new_unchecked(s).poll_read(cx, buf),
+ }
+ }
+ }
+
// Safety: Basic pin projection to get mutable stream
let rs_wrapper = unsafe { Pin::get_unchecked_mut(self) };
match &mut rs_wrapper.stream {
@@ -331,9 +350,11 @@ impl Stream {
if let RawStream::Tcp(s) = &self.stream.get_mut().stream {
let timestamp_options = TimestampingFlag::SOF_TIMESTAMPING_RX_SOFTWARE
| TimestampingFlag::SOF_TIMESTAMPING_SOFTWARE;
- return setsockopt(s.as_raw_fd(), sockopt::Timestamping, &timestamp_options)
- .or_err(InternalError, "failed to set SOF_TIMESTAMPING_RX_SOFTWARE");
+ setsockopt(s.as_raw_fd(), sockopt::Timestamping, &timestamp_options)
+ .or_err(InternalError, "failed to set SOF_TIMESTAMPING_RX_SOFTWARE")?;
+ self.stream.get_mut().enable_rx_ts(true);
}
+
Ok(())
}
@@ -755,4 +776,29 @@ mod tests {
assert_eq!(n, message.len());
assert!(stream.rx_ts.is_some());
}
+
+ #[cfg(target_os = "linux")]
+ #[tokio::test]
+ async fn test_rx_timestamp_standard_path() {
+ let message = "hello world".as_bytes();
+ let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+ let addr = listener.local_addr().unwrap();
+ let notify = Arc::new(Notify::new());
+ let notify2 = notify.clone();
+
+ tokio::spawn(async move {
+ let (mut stream, _) = listener.accept().await.unwrap();
+ notify2.notified().await;
+ stream.write_all(message).await.unwrap();
+ });
+
+ let mut stream: Stream = TcpStream::connect(addr).await.unwrap().into();
+ std::thread::sleep(Duration::from_micros(100));
+ notify.notify_one();
+
+ let mut buffer = vec![0u8; message.len()];
+ let n = stream.read(buffer.as_mut_slice()).await.unwrap();
+ assert_eq!(n, message.len());
+ assert!(stream.rx_ts.is_none());
+ }
}