use crate::{bail, bytes_codec::BytesCodec, ResultType}; use anyhow::Context as AnyhowCtx; use bytes::{BufMut, Bytes, BytesMut}; use futures::{SinkExt, StreamExt}; use protobuf::Message; use sodiumoxide::crypto::secretbox::{self, Key, Nonce}; use std::{ io::{self, Error, ErrorKind}, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, ops::{Deref, DerefMut}, pin::Pin, task::{Context, Poll}, }; use tokio::{ io::{AsyncRead, AsyncWrite, ReadBuf}, net::{lookup_host, TcpListener, TcpSocket, ToSocketAddrs}, }; use tokio_socks::{tcp::Socks5Stream, IntoTargetAddr, ToProxyAddrs}; use tokio_util::codec::Framed; pub trait TcpStreamTrait: AsyncRead + AsyncWrite + Unpin {} pub struct DynTcpStream(Box); pub struct FramedStream( Framed, SocketAddr, Option<(Key, u64, u64)>, u64, ); impl Deref for FramedStream { type Target = Framed; fn deref(&self) -> &Self::Target { &self.0 } } impl DerefMut for FramedStream { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.0 } } impl Deref for DynTcpStream { type Target = Box; fn deref(&self) -> &Self::Target { &self.0 } } impl DerefMut for DynTcpStream { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.0 } } fn new_socket(addr: std::net::SocketAddr, reuse: bool) -> Result { let socket = match addr { std::net::SocketAddr::V4(..) => TcpSocket::new_v4()?, std::net::SocketAddr::V6(..) => TcpSocket::new_v6()?, }; if reuse { // windows has no reuse_port, but it's reuse_address // almost equals to unix's reuse_port + reuse_address, // though may introduce nondeterministic behavior #[cfg(unix)] socket.set_reuseport(true)?; socket.set_reuseaddr(true)?; } socket.bind(addr)?; Ok(socket) } impl FramedStream { pub async fn new( remote_addr: T, local_addr: Option, ms_timeout: u64, ) -> ResultType { for remote_addr in lookup_host(&remote_addr).await? { let local = if let Some(addr) = local_addr { addr } else { crate::config::Config::get_any_listen_addr(remote_addr.is_ipv4()) }; if let Ok(socket) = new_socket(local, true) { if let Ok(Ok(stream)) = super::timeout(ms_timeout, socket.connect(remote_addr)).await { stream.set_nodelay(true).ok(); let addr = stream.local_addr()?; return Ok(Self( Framed::new(DynTcpStream(Box::new(stream)), BytesCodec::new()), addr, None, 0, )); } } } bail!(format!("Failed to connect to {remote_addr}")); } pub async fn connect<'a, 't, P, T>( proxy: P, target: T, local_addr: Option, username: &'a str, password: &'a str, ms_timeout: u64, ) -> ResultType where P: ToProxyAddrs, T: IntoTargetAddr<'t>, { if let Some(Ok(proxy)) = proxy.to_proxy_addrs().next().await { let local = if let Some(addr) = local_addr { addr } else { crate::config::Config::get_any_listen_addr(proxy.is_ipv4()) }; let stream = super::timeout(ms_timeout, new_socket(local, true)?.connect(proxy)).await??; stream.set_nodelay(true).ok(); let stream = if username.trim().is_empty() { super::timeout( ms_timeout, Socks5Stream::connect_with_socket(stream, target), ) .await?? } else { super::timeout( ms_timeout, Socks5Stream::connect_with_password_and_socket( stream, target, username, password, ), ) .await?? }; let addr = stream.local_addr()?; return Ok(Self( Framed::new(DynTcpStream(Box::new(stream)), BytesCodec::new()), addr, None, 0, )); } bail!("could not resolve to any address"); } pub fn local_addr(&self) -> SocketAddr { self.1 } pub fn set_send_timeout(&mut self, ms: u64) { self.3 = ms; } pub fn from(stream: impl TcpStreamTrait + Send + Sync + 'static, addr: SocketAddr) -> Self { Self( Framed::new(DynTcpStream(Box::new(stream)), BytesCodec::new()), addr, None, 0, ) } pub fn set_raw(&mut self) { self.0.codec_mut().set_raw(); self.2 = None; } pub fn is_secured(&self) -> bool { self.2.is_some() } #[inline] pub async fn send(&mut self, msg: &impl Message) -> ResultType<()> { self.send_raw(msg.write_to_bytes()?).await } #[inline] pub async fn send_raw(&mut self, msg: Vec) -> ResultType<()> { let mut msg = msg; if let Some(key) = self.2.as_mut() { key.1 += 1; let nonce = Self::get_nonce(key.1); msg = secretbox::seal(&msg, &nonce, &key.0); } self.send_bytes(bytes::Bytes::from(msg)).await?; Ok(()) } #[inline] pub async fn send_bytes(&mut self, bytes: Bytes) -> ResultType<()> { if self.3 > 0 { super::timeout(self.3, self.0.send(bytes)).await??; } else { self.0.send(bytes).await?; } Ok(()) } #[inline] pub async fn next(&mut self) -> Option> { let mut res = self.0.next().await; if let Some(key) = self.2.as_mut() { if let Some(Ok(bytes)) = res.as_mut() { key.2 += 1; let nonce = Self::get_nonce(key.2); match secretbox::open(bytes, &nonce, &key.0) { Ok(res) => { bytes.clear(); bytes.put_slice(&res); } Err(()) => { return Some(Err(Error::new(ErrorKind::Other, "decryption error"))); } } } } res } #[inline] pub async fn next_timeout(&mut self, ms: u64) -> Option> { if let Ok(res) = super::timeout(ms, self.next()).await { res } else { None } } pub fn set_key(&mut self, key: Key) { self.2 = Some((key, 0, 0)); } fn get_nonce(seqnum: u64) -> Nonce { let mut nonce = Nonce([0u8; secretbox::NONCEBYTES]); nonce.0[..std::mem::size_of_val(&seqnum)].copy_from_slice(&seqnum.to_le_bytes()); nonce } } const DEFAULT_BACKLOG: u32 = 128; pub async fn new_listener(addr: T, reuse: bool) -> ResultType { if !reuse { Ok(TcpListener::bind(addr).await?) } else { let addr = lookup_host(&addr) .await? .next() .context("could not resolve to any address")?; new_socket(addr, true)? .listen(DEFAULT_BACKLOG) .map_err(anyhow::Error::msg) } } pub async fn listen_any(port: u16, reuse: bool) -> ResultType { if let Ok(mut socket) = TcpSocket::new_v6() { if reuse { // windows has no reuse_port, but it's reuse_address // almost equals to unix's reuse_port + reuse_address, // though may introduce nondeterministic behavior #[cfg(unix)] socket.set_reuseport(true).ok(); socket.set_reuseaddr(true).ok(); } #[cfg(unix)] { use std::os::unix::io::{FromRawFd, IntoRawFd}; let raw_fd = socket.into_raw_fd(); let sock2 = unsafe { socket2::Socket::from_raw_fd(raw_fd) }; sock2.set_only_v6(false).ok(); socket = unsafe { TcpSocket::from_raw_fd(sock2.into_raw_fd()) }; } #[cfg(windows)] { use std::os::windows::prelude::{FromRawSocket, IntoRawSocket}; let raw_socket = socket.into_raw_socket(); let sock2 = unsafe { socket2::Socket::from_raw_socket(raw_socket) }; sock2.set_only_v6(false).ok(); socket = unsafe { TcpSocket::from_raw_socket(sock2.into_raw_socket()) }; } if socket .bind(SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), port)) .is_ok() { if let Ok(l) = socket.listen(DEFAULT_BACKLOG) { return Ok(l); } } } let s = TcpSocket::new_v4()?; s.bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port))?; Ok(s.listen(DEFAULT_BACKLOG)?) } impl Unpin for DynTcpStream {} impl AsyncRead for DynTcpStream { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { AsyncRead::poll_read(Pin::new(&mut self.0), cx, buf) } } impl AsyncWrite for DynTcpStream { fn poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { AsyncWrite::poll_write(Pin::new(&mut self.0), cx, buf) } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { AsyncWrite::poll_flush(Pin::new(&mut self.0), cx) } fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { AsyncWrite::poll_shutdown(Pin::new(&mut self.0), cx) } } impl TcpStreamTrait for R {}