aboutsummaryrefslogtreecommitdiffhomepage
path: root/libs/hbb_common/src/tcp.rs
diff options
context:
space:
mode:
Diffstat (limited to 'libs/hbb_common/src/tcp.rs')
-rw-r--r--libs/hbb_common/src/tcp.rs144
1 files changed, 91 insertions, 53 deletions
diff --git a/libs/hbb_common/src/tcp.rs b/libs/hbb_common/src/tcp.rs
index 7966920..a1322fc 100644
--- a/libs/hbb_common/src/tcp.rs
+++ b/libs/hbb_common/src/tcp.rs
@@ -5,7 +5,7 @@ use protobuf::Message;
use sodiumoxide::crypto::secretbox::{self, Key, Nonce};
use std::{
io::{self, Error, ErrorKind},
- net::SocketAddr,
+ net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
ops::{Deref, DerefMut},
pin::Pin,
task::{Context, Poll},
@@ -73,73 +73,79 @@ fn new_socket(addr: std::net::SocketAddr, reuse: bool) -> Result<TcpSocket, std:
}
impl FramedStream {
- pub async fn new<T1: ToSocketAddrs, T2: ToSocketAddrs>(
- remote_addr: T1,
- local_addr: T2,
+ pub async fn new<T: ToSocketAddrs + std::fmt::Display>(
+ remote_addr: T,
+ local_addr: Option<SocketAddr>,
ms_timeout: u64,
) -> ResultType<Self> {
- for local_addr in lookup_host(&local_addr).await? {
- for remote_addr in lookup_host(&remote_addr).await? {
- let stream = super::timeout(
- ms_timeout,
- new_socket(local_addr, true)?.connect(remote_addr),
- )
- .await??;
- stream.set_nodelay(true).ok();
- let addr = stream.local_addr()?;
- return Ok(Self(
- Framed::new(DynTcpStream(Box::new(stream)), BytesCodec::new()),
- addr,
- None,
- 0,
- ));
+ for remote_addr in lookup_host(&remote_addr).await? {
+ let local = if let Some(addr) = local_addr {
+ addr
+ } else {
+ crate::config::Config::get_any_listen_addr(remote_addr.is_ipv4())
+ };
+ if let Ok(socket) = new_socket(local, true) {
+ if let Ok(Ok(stream)) =
+ super::timeout(ms_timeout, socket.connect(remote_addr)).await
+ {
+ stream.set_nodelay(true).ok();
+ let addr = stream.local_addr()?;
+ return Ok(Self(
+ Framed::new(DynTcpStream(Box::new(stream)), BytesCodec::new()),
+ addr,
+ None,
+ 0,
+ ));
+ }
}
}
- bail!("could not resolve to any address");
+ bail!(format!("Failed to connect to {}", remote_addr));
}
- pub async fn connect<'a, 't, P, T1, T2>(
+ pub async fn connect<'a, 't, P, T>(
proxy: P,
- target: T1,
- local: T2,
+ target: T,
+ local_addr: Option<SocketAddr>,
username: &'a str,
password: &'a str,
ms_timeout: u64,
) -> ResultType<Self>
where
P: ToProxyAddrs,
- T1: IntoTargetAddr<'t>,
- T2: ToSocketAddrs,
+ T: IntoTargetAddr<'t>,
{
- if let Some(local) = lookup_host(&local).await?.next() {
- if let Some(proxy) = proxy.to_proxy_addrs().next().await {
- let stream =
- super::timeout(ms_timeout, new_socket(local, true)?.connect(proxy?)).await??;
- stream.set_nodelay(true).ok();
- let stream = if username.trim().is_empty() {
- super::timeout(
- ms_timeout,
- Socks5Stream::connect_with_socket(stream, target),
- )
- .await??
- } else {
- super::timeout(
- ms_timeout,
- Socks5Stream::connect_with_password_and_socket(
- stream, target, username, password,
- ),
- )
- .await??
- };
- let addr = stream.local_addr()?;
- return Ok(Self(
- Framed::new(DynTcpStream(Box::new(stream)), BytesCodec::new()),
- addr,
- None,
- 0,
- ));
+ if let Some(Ok(proxy)) = proxy.to_proxy_addrs().next().await {
+ let local = if let Some(addr) = local_addr {
+ addr
+ } else {
+ crate::config::Config::get_any_listen_addr(proxy.is_ipv4())
+ };
+ let stream =
+ super::timeout(ms_timeout, new_socket(local, true)?.connect(proxy)).await??;
+ stream.set_nodelay(true).ok();
+ let stream = if username.trim().is_empty() {
+ super::timeout(
+ ms_timeout,
+ Socks5Stream::connect_with_socket(stream, target),
+ )
+ .await??
+ } else {
+ super::timeout(
+ ms_timeout,
+ Socks5Stream::connect_with_password_and_socket(
+ stream, target, username, password,
+ ),
+ )
+ .await??
};
- };
+ let addr = stream.local_addr()?;
+ return Ok(Self(
+ Framed::new(DynTcpStream(Box::new(stream)), BytesCodec::new()),
+ addr,
+ None,
+ 0,
+ ));
+ }
bail!("could not resolve to any address");
}
@@ -252,6 +258,38 @@ pub async fn new_listener<T: ToSocketAddrs>(addr: T, reuse: bool) -> ResultType<
}
}
+pub async fn listen_any(port: u16) -> ResultType<TcpListener> {
+ if let Ok(mut socket) = TcpSocket::new_v6() {
+ #[cfg(unix)]
+ {
+ use std::os::unix::io::{FromRawFd, IntoRawFd};
+ let raw_fd = socket.into_raw_fd();
+ let sock2 = unsafe { socket2::Socket::from_raw_fd(raw_fd) };
+ sock2.set_only_v6(false).ok();
+ socket = unsafe { TcpSocket::from_raw_fd(sock2.into_raw_fd()) };
+ }
+ #[cfg(windows)]
+ {
+ use std::os::windows::prelude::{FromRawSocket, IntoRawSocket};
+ let raw_socket = socket.into_raw_socket();
+ let sock2 = unsafe { socket2::Socket::from_raw_socket(raw_socket) };
+ sock2.set_only_v6(false).ok();
+ socket = unsafe { TcpSocket::from_raw_socket(sock2.into_raw_socket()) };
+ }
+ if socket
+ .bind(SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), port))
+ .is_ok()
+ {
+ if let Ok(l) = socket.listen(DEFAULT_BACKLOG) {
+ return Ok(l);
+ }
+ }
+ }
+ let s = TcpSocket::new_v4()?;
+ s.bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port))?;
+ Ok(s.listen(DEFAULT_BACKLOG)?)
+}
+
impl Unpin for DynTcpStream {}
impl AsyncRead for DynTcpStream {