aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorfufesou <[email protected]>2022-07-26 21:54:18 +0800
committerfufesou <[email protected]>2022-07-26 23:03:30 +0800
commita4940f4634ab43568dedecacee2f31b5d2dc3907 (patch)
tree7e48e6f3ed6bc6d9a73758b2393a61b9536cc50d /src
parent8c477c8cd04a12d02fc69849a3a337c011179fda (diff)
downloadrustdesk-server-a4940f4634ab43568dedecacee2f31b5d2dc3907.tar.gz
rustdesk-server-a4940f4634ab43568dedecacee2f31b5d2dc3907.zip
peer_online_state: serve online state
Signed-off-by: fufesou <[email protected]>
Diffstat (limited to 'src')
-rw-r--r--src/rendezvous_server.rs57
1 files changed, 49 insertions, 8 deletions
diff --git a/src/rendezvous_server.rs b/src/rendezvous_server.rs
index 60f95d9..9650602 100644
--- a/src/rendezvous_server.rs
+++ b/src/rendezvous_server.rs
@@ -757,6 +757,40 @@ impl RendezvousServer {
}
#[inline]
+ async fn handle_online_request(
+ &mut self,
+ stream: &mut FramedStream,
+ peers: Vec<String>,
+ ) -> ResultType<()> {
+ let mut onlines = Vec::new();
+ for peer_id in &peers {
+ if let Some(peer) = self.pm.get_in_memory(&peer_id).await {
+ let (elapsed, _) = {
+ let r = peer.read().await;
+ (r.last_reg_time.elapsed().as_millis() as i32, r.socket_addr)
+ };
+ if elapsed < REG_TIMEOUT {
+ onlines.push(peer_id.clone());
+ }
+ }
+ }
+ let offlines = peers
+ .into_iter()
+ .filter(|p| onlines.iter().position(|r| r == p).is_none())
+ .collect::<Vec<_>>();
+
+ let mut msg_out = RendezvousMessage::new();
+ msg_out.set_online_response(OnlineResponse {
+ onlines,
+ offlines,
+ ..Default::default()
+ });
+ stream.send(&msg_out).await?;
+
+ Ok(())
+ }
+
+ #[inline]
async fn send_to_tcp(&mut self, msg: RendezvousMessage, addr: SocketAddr) {
let mut tcp = self.tcp_punch.lock().await.remove(&addr);
tokio::spawn(async move {
@@ -1014,8 +1048,8 @@ impl RendezvousServer {
}
async fn handle_listener2(&self, stream: TcpStream, addr: SocketAddr) {
+ let mut rs = self.clone();
if addr.ip().to_string() == "127.0.0.1" {
- let rs = self.clone();
tokio::spawn(async move {
let mut stream = stream;
let mut buffer = [0; 64];
@@ -1033,13 +1067,20 @@ impl RendezvousServer {
let mut stream = stream;
if let Some(Ok(bytes)) = stream.next_timeout(30_000).await {
if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) {
- if let Some(rendezvous_message::Union::TestNatRequest(_)) = msg_in.union {
- let mut msg_out = RendezvousMessage::new();
- msg_out.set_test_nat_response(TestNatResponse {
- port: addr.port() as _,
- ..Default::default()
- });
- stream.send(&msg_out).await.ok();
+ match msg_in.union {
+ Some(rendezvous_message::Union::TestNatRequest(_)) => {
+ let mut msg_out = RendezvousMessage::new();
+ msg_out.set_test_nat_response(TestNatResponse {
+ port: addr.port() as _,
+ ..Default::default()
+ });
+ stream.send(&msg_out).await.ok();
+ }
+ Some(rendezvous_message::Union::OnlineRequest(or)) => {
+ allow_err!(rs.handle_online_request(&mut stream, or.peers).await);
+ }
+ _ => {
+ }
}
}
}