diff options
author | fufesou <[email protected]> | 2022-07-26 21:54:18 +0800 |
---|---|---|
committer | fufesou <[email protected]> | 2022-07-26 23:03:30 +0800 |
commit | a4940f4634ab43568dedecacee2f31b5d2dc3907 (patch) | |
tree | 7e48e6f3ed6bc6d9a73758b2393a61b9536cc50d /src | |
parent | 8c477c8cd04a12d02fc69849a3a337c011179fda (diff) | |
download | rustdesk-server-a4940f4634ab43568dedecacee2f31b5d2dc3907.tar.gz rustdesk-server-a4940f4634ab43568dedecacee2f31b5d2dc3907.zip |
peer_online_state: serve online state
Signed-off-by: fufesou <[email protected]>
Diffstat (limited to 'src')
-rw-r--r-- | src/rendezvous_server.rs | 57 |
1 files changed, 49 insertions, 8 deletions
diff --git a/src/rendezvous_server.rs b/src/rendezvous_server.rs index 60f95d9..9650602 100644 --- a/src/rendezvous_server.rs +++ b/src/rendezvous_server.rs @@ -757,6 +757,40 @@ impl RendezvousServer { } #[inline] + async fn handle_online_request( + &mut self, + stream: &mut FramedStream, + peers: Vec<String>, + ) -> ResultType<()> { + let mut onlines = Vec::new(); + for peer_id in &peers { + if let Some(peer) = self.pm.get_in_memory(&peer_id).await { + let (elapsed, _) = { + let r = peer.read().await; + (r.last_reg_time.elapsed().as_millis() as i32, r.socket_addr) + }; + if elapsed < REG_TIMEOUT { + onlines.push(peer_id.clone()); + } + } + } + let offlines = peers + .into_iter() + .filter(|p| onlines.iter().position(|r| r == p).is_none()) + .collect::<Vec<_>>(); + + let mut msg_out = RendezvousMessage::new(); + msg_out.set_online_response(OnlineResponse { + onlines, + offlines, + ..Default::default() + }); + stream.send(&msg_out).await?; + + Ok(()) + } + + #[inline] async fn send_to_tcp(&mut self, msg: RendezvousMessage, addr: SocketAddr) { let mut tcp = self.tcp_punch.lock().await.remove(&addr); tokio::spawn(async move { @@ -1014,8 +1048,8 @@ impl RendezvousServer { } async fn handle_listener2(&self, stream: TcpStream, addr: SocketAddr) { + let mut rs = self.clone(); if addr.ip().to_string() == "127.0.0.1" { - let rs = self.clone(); tokio::spawn(async move { let mut stream = stream; let mut buffer = [0; 64]; @@ -1033,13 +1067,20 @@ impl RendezvousServer { let mut stream = stream; if let Some(Ok(bytes)) = stream.next_timeout(30_000).await { if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) { - if let Some(rendezvous_message::Union::TestNatRequest(_)) = msg_in.union { - let mut msg_out = RendezvousMessage::new(); - msg_out.set_test_nat_response(TestNatResponse { - port: addr.port() as _, - ..Default::default() - }); - stream.send(&msg_out).await.ok(); + match msg_in.union { + Some(rendezvous_message::Union::TestNatRequest(_)) => { + let mut msg_out = RendezvousMessage::new(); + msg_out.set_test_nat_response(TestNatResponse { + port: addr.port() as _, + ..Default::default() + }); + stream.send(&msg_out).await.ok(); + } + Some(rendezvous_message::Union::OnlineRequest(or)) => { + allow_err!(rs.handle_online_request(&mut stream, or.peers).await); + } + _ => { + } } } } |