diff options
Diffstat (limited to 'libs/hbb_common/src/tcp.rs')
-rw-r--r-- | libs/hbb_common/src/tcp.rs | 144 |
1 files changed, 91 insertions, 53 deletions
diff --git a/libs/hbb_common/src/tcp.rs b/libs/hbb_common/src/tcp.rs index 7966920..a1322fc 100644 --- a/libs/hbb_common/src/tcp.rs +++ b/libs/hbb_common/src/tcp.rs @@ -5,7 +5,7 @@ use protobuf::Message; use sodiumoxide::crypto::secretbox::{self, Key, Nonce}; use std::{ io::{self, Error, ErrorKind}, - net::SocketAddr, + net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, ops::{Deref, DerefMut}, pin::Pin, task::{Context, Poll}, @@ -73,73 +73,79 @@ fn new_socket(addr: std::net::SocketAddr, reuse: bool) -> Result<TcpSocket, std: } impl FramedStream { - pub async fn new<T1: ToSocketAddrs, T2: ToSocketAddrs>( - remote_addr: T1, - local_addr: T2, + pub async fn new<T: ToSocketAddrs + std::fmt::Display>( + remote_addr: T, + local_addr: Option<SocketAddr>, ms_timeout: u64, ) -> ResultType<Self> { - for local_addr in lookup_host(&local_addr).await? { - for remote_addr in lookup_host(&remote_addr).await? { - let stream = super::timeout( - ms_timeout, - new_socket(local_addr, true)?.connect(remote_addr), - ) - .await??; - stream.set_nodelay(true).ok(); - let addr = stream.local_addr()?; - return Ok(Self( - Framed::new(DynTcpStream(Box::new(stream)), BytesCodec::new()), - addr, - None, - 0, - )); + for remote_addr in lookup_host(&remote_addr).await? { + let local = if let Some(addr) = local_addr { + addr + } else { + crate::config::Config::get_any_listen_addr(remote_addr.is_ipv4()) + }; + if let Ok(socket) = new_socket(local, true) { + if let Ok(Ok(stream)) = + super::timeout(ms_timeout, socket.connect(remote_addr)).await + { + stream.set_nodelay(true).ok(); + let addr = stream.local_addr()?; + return Ok(Self( + Framed::new(DynTcpStream(Box::new(stream)), BytesCodec::new()), + addr, + None, + 0, + )); + } } } - bail!("could not resolve to any address"); + bail!(format!("Failed to connect to {}", remote_addr)); } - pub async fn connect<'a, 't, P, T1, T2>( + pub async fn connect<'a, 't, P, T>( proxy: P, - target: T1, - local: T2, + target: T, + local_addr: Option<SocketAddr>, username: &'a str, password: &'a str, ms_timeout: u64, ) -> ResultType<Self> where P: ToProxyAddrs, - T1: IntoTargetAddr<'t>, - T2: ToSocketAddrs, + T: IntoTargetAddr<'t>, { - if let Some(local) = lookup_host(&local).await?.next() { - if let Some(proxy) = proxy.to_proxy_addrs().next().await { - let stream = - super::timeout(ms_timeout, new_socket(local, true)?.connect(proxy?)).await??; - stream.set_nodelay(true).ok(); - let stream = if username.trim().is_empty() { - super::timeout( - ms_timeout, - Socks5Stream::connect_with_socket(stream, target), - ) - .await?? - } else { - super::timeout( - ms_timeout, - Socks5Stream::connect_with_password_and_socket( - stream, target, username, password, - ), - ) - .await?? - }; - let addr = stream.local_addr()?; - return Ok(Self( - Framed::new(DynTcpStream(Box::new(stream)), BytesCodec::new()), - addr, - None, - 0, - )); + if let Some(Ok(proxy)) = proxy.to_proxy_addrs().next().await { + let local = if let Some(addr) = local_addr { + addr + } else { + crate::config::Config::get_any_listen_addr(proxy.is_ipv4()) + }; + let stream = + super::timeout(ms_timeout, new_socket(local, true)?.connect(proxy)).await??; + stream.set_nodelay(true).ok(); + let stream = if username.trim().is_empty() { + super::timeout( + ms_timeout, + Socks5Stream::connect_with_socket(stream, target), + ) + .await?? + } else { + super::timeout( + ms_timeout, + Socks5Stream::connect_with_password_and_socket( + stream, target, username, password, + ), + ) + .await?? }; - }; + let addr = stream.local_addr()?; + return Ok(Self( + Framed::new(DynTcpStream(Box::new(stream)), BytesCodec::new()), + addr, + None, + 0, + )); + } bail!("could not resolve to any address"); } @@ -252,6 +258,38 @@ pub async fn new_listener<T: ToSocketAddrs>(addr: T, reuse: bool) -> ResultType< } } +pub async fn listen_any(port: u16) -> ResultType<TcpListener> { + if let Ok(mut socket) = TcpSocket::new_v6() { + #[cfg(unix)] + { + use std::os::unix::io::{FromRawFd, IntoRawFd}; + let raw_fd = socket.into_raw_fd(); + let sock2 = unsafe { socket2::Socket::from_raw_fd(raw_fd) }; + sock2.set_only_v6(false).ok(); + socket = unsafe { TcpSocket::from_raw_fd(sock2.into_raw_fd()) }; + } + #[cfg(windows)] + { + use std::os::windows::prelude::{FromRawSocket, IntoRawSocket}; + let raw_socket = socket.into_raw_socket(); + let sock2 = unsafe { socket2::Socket::from_raw_socket(raw_socket) }; + sock2.set_only_v6(false).ok(); + socket = unsafe { TcpSocket::from_raw_socket(sock2.into_raw_socket()) }; + } + if socket + .bind(SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), port)) + .is_ok() + { + if let Ok(l) = socket.listen(DEFAULT_BACKLOG) { + return Ok(l); + } + } + } + let s = TcpSocket::new_v4()?; + s.bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port))?; + Ok(s.listen(DEFAULT_BACKLOG)?) +} + impl Unpin for DynTcpStream {} impl AsyncRead for DynTcpStream { |