diff options
author | open-trade <[email protected]> | 2020-05-11 15:47:13 +0800 |
---|---|---|
committer | open-trade <[email protected]> | 2020-05-11 15:47:13 +0800 |
commit | 4e07acdb6cd096af344cefc4a037ed8c69fd9189 (patch) | |
tree | 86f1e97edb182f3886fe1f05f2c67f749cea7c45 | |
parent | 0034fd695b15cc3754eefa694dc39a381dae0002 (diff) | |
download | rustdesk-server-4e07acdb6cd096af344cefc4a037ed8c69fd9189.tar.gz rustdesk-server-4e07acdb6cd096af344cefc4a037ed8c69fd9189.zip |
handle tcp punch response via tcp connection
-rw-r--r-- | src/rendezvous_server.rs | 92 |
1 files changed, 70 insertions, 22 deletions
diff --git a/src/rendezvous_server.rs b/src/rendezvous_server.rs index 591693d..a60f16c 100644 --- a/src/rendezvous_server.rs +++ b/src/rendezvous_server.rs @@ -59,18 +59,30 @@ impl RendezvousServer { let tcp_punch = rs.tcp_punch.clone(); tcp_punch.lock().unwrap().insert(addr, a); let tx = tx.clone(); + let mut rs = Self { + peer_map: PeerMap::new(), + tcp_punch: tcp_punch, + }; tokio::spawn(async move { while let Some(Ok(bytes)) = b.next().await { if let Ok(msg_in) = parse_from_bytes::<RendezvousMessage>(&bytes) { match msg_in.union { Some(rendezvous_message::Union::punch_hole_request(ph)) => { - tx.send((addr, ph.id)).ok(); + allow_err!(tx.send((addr, ph.id))); + } + Some(rendezvous_message::Union::punch_hole_sent(phs)) => { + allow_err!(rs.handle_hole_sent(&phs, addr, None).await); + break; + } + Some(rendezvous_message::Union::local_addr(la)) => { + allow_err!(rs.handle_local_addr(&la, addr, None).await); + break; } _ => {} } } } - tcp_punch.lock().unwrap().remove(&addr); + rs.tcp_punch.lock().unwrap().remove(&addr); log::debug!("Tcp connection from {:?} closed", addr); }); } @@ -106,28 +118,10 @@ impl RendezvousServer { self.handle_punch_hole_request(addr, &ph.id, socket).await?; } Some(rendezvous_message::Union::punch_hole_sent(phs)) => { - // punch hole sent from B, tell A that B is ready to be connected - let addr_a = AddrMangle::decode(&phs.socket_addr); - log::debug!("Punch hole response to {:?} from {:?}", &addr_a, &addr); - let mut msg_out = RendezvousMessage::new(); - msg_out.set_punch_hole_response(PunchHoleResponse { - socket_addr: AddrMangle::encode(addr), - ..Default::default() - }); - socket.send(&msg_out, addr_a).await?; - self.send_to_tcp(&msg_out, addr_a).await?; + self.handle_hole_sent(&phs, addr, Some(socket)).await?; } Some(rendezvous_message::Union::local_addr(la)) => { - // forward local addrs of B to A - let addr_a = AddrMangle::decode(&la.socket_addr); - log::debug!("Local addrs response to {:?} from {:?}", &addr_a, &addr); - let mut msg_out = RendezvousMessage::new(); - msg_out.set_punch_hole_response(PunchHoleResponse { - socket_addr: la.local_addr, - ..Default::default() - }); - socket.send(&msg_out, addr_a).await?; - self.send_to_tcp(&msg_out, addr_a).await?; + self.handle_local_addr(&la, addr, Some(socket)).await?; } _ => {} } @@ -135,6 +129,60 @@ impl RendezvousServer { Ok(()) } + async fn handle_hole_sent<'a>( + &mut self, + phs: &PunchHoleSent, + addr: SocketAddr, + socket: Option<&'a mut FramedSocket>, + ) -> ResultType<()> { + // punch hole sent from B, tell A that B is ready to be connected + let addr_a = AddrMangle::decode(&phs.socket_addr); + log::debug!( + "{} punch hole response to {:?} from {:?}", + if socket.is_none() { "TCP" } else { "UDP" }, + &addr_a, + &addr + ); + let mut msg_out = RendezvousMessage::new(); + msg_out.set_punch_hole_response(PunchHoleResponse { + socket_addr: AddrMangle::encode(addr), + ..Default::default() + }); + if let Some(socket) = socket { + socket.send(&msg_out, addr_a).await?; + } else { + self.send_to_tcp(&msg_out, addr_a).await?; + } + Ok(()) + } + + async fn handle_local_addr<'a>( + &mut self, + la: &LocalAddr, + addr: SocketAddr, + socket: Option<&'a mut FramedSocket>, + ) -> ResultType<()> { + // forward local addrs of B to A + let addr_a = AddrMangle::decode(&la.socket_addr); + log::debug!( + "{} local addrs response to {:?} from {:?}", + if socket.is_none() { "TCP" } else { "UDP" }, + &addr_a, + &addr + ); + let mut msg_out = RendezvousMessage::new(); + msg_out.set_punch_hole_response(PunchHoleResponse { + socket_addr: la.local_addr.clone(), + ..Default::default() + }); + if let Some(socket) = socket { + socket.send(&msg_out, addr_a).await?; + } else { + self.send_to_tcp(&msg_out, addr_a).await?; + } + Ok(()) + } + async fn handle_punch_hole_request( &mut self, addr: SocketAddr, |