diff options
author | fufesou <[email protected]> | 2022-07-26 21:54:18 +0800 |
---|---|---|
committer | fufesou <[email protected]> | 2022-07-26 23:03:30 +0800 |
commit | a4940f4634ab43568dedecacee2f31b5d2dc3907 (patch) | |
tree | 7e48e6f3ed6bc6d9a73758b2393a61b9536cc50d | |
parent | 8c477c8cd04a12d02fc69849a3a337c011179fda (diff) | |
download | rustdesk-server-a4940f4634ab43568dedecacee2f31b5d2dc3907.tar.gz rustdesk-server-a4940f4634ab43568dedecacee2f31b5d2dc3907.zip |
peer_online_state: serve online state
Signed-off-by: fufesou <[email protected]>
-rw-r--r-- | .gitignore | 1 | ||||
-rw-r--r-- | libs/hbb_common/protos/rendezvous.proto | 12 | ||||
-rw-r--r-- | src/rendezvous_server.rs | 57 |
3 files changed, 62 insertions, 8 deletions
@@ -5,3 +5,4 @@ debian-build debian/.debhelper debian/debhelper-build-stamp .DS_Store +.vscode diff --git a/libs/hbb_common/protos/rendezvous.proto b/libs/hbb_common/protos/rendezvous.proto index 2c5f1b3..894eba1 100644 --- a/libs/hbb_common/protos/rendezvous.proto +++ b/libs/hbb_common/protos/rendezvous.proto @@ -148,6 +148,16 @@ message PeerDiscovery { string misc = 7; } +message OnlineRequest { + string id = 1; + repeated string peers = 2; +} + +message OnlineResponse { + repeated string onlines = 1; + repeated string offlines = 2; +} + message RendezvousMessage { oneof union { RegisterPeer register_peer = 6; @@ -167,5 +177,7 @@ message RendezvousMessage { TestNatRequest test_nat_request = 20; TestNatResponse test_nat_response = 21; PeerDiscovery peer_discovery = 22; + OnlineRequest online_request = 23; + OnlineResponse online_response = 24; } } diff --git a/src/rendezvous_server.rs b/src/rendezvous_server.rs index 60f95d9..9650602 100644 --- a/src/rendezvous_server.rs +++ b/src/rendezvous_server.rs @@ -757,6 +757,40 @@ impl RendezvousServer { } #[inline] + async fn handle_online_request( + &mut self, + stream: &mut FramedStream, + peers: Vec<String>, + ) -> ResultType<()> { + let mut onlines = Vec::new(); + for peer_id in &peers { + if let Some(peer) = self.pm.get_in_memory(&peer_id).await { + let (elapsed, _) = { + let r = peer.read().await; + (r.last_reg_time.elapsed().as_millis() as i32, r.socket_addr) + }; + if elapsed < REG_TIMEOUT { + onlines.push(peer_id.clone()); + } + } + } + let offlines = peers + .into_iter() + .filter(|p| onlines.iter().position(|r| r == p).is_none()) + .collect::<Vec<_>>(); + + let mut msg_out = RendezvousMessage::new(); + msg_out.set_online_response(OnlineResponse { + onlines, + offlines, + ..Default::default() + }); + stream.send(&msg_out).await?; + + Ok(()) + } + + #[inline] async fn send_to_tcp(&mut self, msg: RendezvousMessage, addr: SocketAddr) { let mut tcp = self.tcp_punch.lock().await.remove(&addr); tokio::spawn(async move { @@ -1014,8 +1048,8 @@ impl RendezvousServer { } async fn handle_listener2(&self, stream: TcpStream, addr: SocketAddr) { + let mut rs = self.clone(); if addr.ip().to_string() == "127.0.0.1" { - let rs = self.clone(); tokio::spawn(async move { let mut stream = stream; let mut buffer = [0; 64]; @@ -1033,13 +1067,20 @@ impl RendezvousServer { let mut stream = stream; if let Some(Ok(bytes)) = stream.next_timeout(30_000).await { if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) { - if let Some(rendezvous_message::Union::TestNatRequest(_)) = msg_in.union { - let mut msg_out = RendezvousMessage::new(); - msg_out.set_test_nat_response(TestNatResponse { - port: addr.port() as _, - ..Default::default() - }); - stream.send(&msg_out).await.ok(); + match msg_in.union { + Some(rendezvous_message::Union::TestNatRequest(_)) => { + let mut msg_out = RendezvousMessage::new(); + msg_out.set_test_nat_response(TestNatResponse { + port: addr.port() as _, + ..Default::default() + }); + stream.send(&msg_out).await.ok(); + } + Some(rendezvous_message::Union::OnlineRequest(or)) => { + allow_err!(rs.handle_online_request(&mut stream, or.peers).await); + } + _ => { + } } } } |