aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorfufesou <[email protected]>2022-07-26 21:54:18 +0800
committerfufesou <[email protected]>2022-07-26 23:03:30 +0800
commita4940f4634ab43568dedecacee2f31b5d2dc3907 (patch)
tree7e48e6f3ed6bc6d9a73758b2393a61b9536cc50d
parent8c477c8cd04a12d02fc69849a3a337c011179fda (diff)
downloadrustdesk-server-a4940f4634ab43568dedecacee2f31b5d2dc3907.tar.gz
rustdesk-server-a4940f4634ab43568dedecacee2f31b5d2dc3907.zip
peer_online_state: serve online state
Signed-off-by: fufesou <[email protected]>
-rw-r--r--.gitignore1
-rw-r--r--libs/hbb_common/protos/rendezvous.proto12
-rw-r--r--src/rendezvous_server.rs57
3 files changed, 62 insertions, 8 deletions
diff --git a/.gitignore b/.gitignore
index 4565963..4c16996 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,3 +5,4 @@ debian-build
debian/.debhelper
debian/debhelper-build-stamp
.DS_Store
+.vscode
diff --git a/libs/hbb_common/protos/rendezvous.proto b/libs/hbb_common/protos/rendezvous.proto
index 2c5f1b3..894eba1 100644
--- a/libs/hbb_common/protos/rendezvous.proto
+++ b/libs/hbb_common/protos/rendezvous.proto
@@ -148,6 +148,16 @@ message PeerDiscovery {
string misc = 7;
}
+message OnlineRequest {
+ string id = 1;
+ repeated string peers = 2;
+}
+
+message OnlineResponse {
+ repeated string onlines = 1;
+ repeated string offlines = 2;
+}
+
message RendezvousMessage {
oneof union {
RegisterPeer register_peer = 6;
@@ -167,5 +177,7 @@ message RendezvousMessage {
TestNatRequest test_nat_request = 20;
TestNatResponse test_nat_response = 21;
PeerDiscovery peer_discovery = 22;
+ OnlineRequest online_request = 23;
+ OnlineResponse online_response = 24;
}
}
diff --git a/src/rendezvous_server.rs b/src/rendezvous_server.rs
index 60f95d9..9650602 100644
--- a/src/rendezvous_server.rs
+++ b/src/rendezvous_server.rs
@@ -757,6 +757,40 @@ impl RendezvousServer {
}
#[inline]
+ async fn handle_online_request(
+ &mut self,
+ stream: &mut FramedStream,
+ peers: Vec<String>,
+ ) -> ResultType<()> {
+ let mut onlines = Vec::new();
+ for peer_id in &peers {
+ if let Some(peer) = self.pm.get_in_memory(&peer_id).await {
+ let (elapsed, _) = {
+ let r = peer.read().await;
+ (r.last_reg_time.elapsed().as_millis() as i32, r.socket_addr)
+ };
+ if elapsed < REG_TIMEOUT {
+ onlines.push(peer_id.clone());
+ }
+ }
+ }
+ let offlines = peers
+ .into_iter()
+ .filter(|p| onlines.iter().position(|r| r == p).is_none())
+ .collect::<Vec<_>>();
+
+ let mut msg_out = RendezvousMessage::new();
+ msg_out.set_online_response(OnlineResponse {
+ onlines,
+ offlines,
+ ..Default::default()
+ });
+ stream.send(&msg_out).await?;
+
+ Ok(())
+ }
+
+ #[inline]
async fn send_to_tcp(&mut self, msg: RendezvousMessage, addr: SocketAddr) {
let mut tcp = self.tcp_punch.lock().await.remove(&addr);
tokio::spawn(async move {
@@ -1014,8 +1048,8 @@ impl RendezvousServer {
}
async fn handle_listener2(&self, stream: TcpStream, addr: SocketAddr) {
+ let mut rs = self.clone();
if addr.ip().to_string() == "127.0.0.1" {
- let rs = self.clone();
tokio::spawn(async move {
let mut stream = stream;
let mut buffer = [0; 64];
@@ -1033,13 +1067,20 @@ impl RendezvousServer {
let mut stream = stream;
if let Some(Ok(bytes)) = stream.next_timeout(30_000).await {
if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) {
- if let Some(rendezvous_message::Union::TestNatRequest(_)) = msg_in.union {
- let mut msg_out = RendezvousMessage::new();
- msg_out.set_test_nat_response(TestNatResponse {
- port: addr.port() as _,
- ..Default::default()
- });
- stream.send(&msg_out).await.ok();
+ match msg_in.union {
+ Some(rendezvous_message::Union::TestNatRequest(_)) => {
+ let mut msg_out = RendezvousMessage::new();
+ msg_out.set_test_nat_response(TestNatResponse {
+ port: addr.port() as _,
+ ..Default::default()
+ });
+ stream.send(&msg_out).await.ok();
+ }
+ Some(rendezvous_message::Union::OnlineRequest(or)) => {
+ allow_err!(rs.handle_online_request(&mut stream, or.peers).await);
+ }
+ _ => {
+ }
}
}
}