aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorfufesou <[email protected]>2022-07-27 00:35:40 +0800
committerfufesou <[email protected]>2022-07-27 00:35:40 +0800
commitf7fc45a3d2280a9026858a8be4157eb3a1b301c8 (patch)
treee4f25ecbbc740575c32d893ccf74f91fabcd5a28 /src
parenta4940f4634ab43568dedecacee2f31b5d2dc3907 (diff)
downloadrustdesk-server-f7fc45a3d2280a9026858a8be4157eb3a1b301c8.tar.gz
rustdesk-server-f7fc45a3d2280a9026858a8be4157eb3a1b301c8.zip
peer_online_state: response online state bits
Signed-off-by: fufesou <[email protected]>
Diffstat (limited to 'src')
-rw-r--r--src/rendezvous_server.rs50
1 files changed, 14 insertions, 36 deletions
diff --git a/src/rendezvous_server.rs b/src/rendezvous_server.rs
index 9650602..b93e04f 100644
--- a/src/rendezvous_server.rs
+++ b/src/rendezvous_server.rs
@@ -89,12 +89,7 @@ enum LoopFailure {
impl RendezvousServer {
#[tokio::main(flavor = "multi_thread")]
- pub async fn start(
- port: i32,
- serial: i32,
- key: &str,
- rmem: usize,
- ) -> ResultType<()> {
+ pub async fn start(port: i32, serial: i32, key: &str, rmem: usize) -> ResultType<()> {
let (key, sk) = Self::get_server_sk(key);
let addr = format!("0.0.0.0:{}", port);
let addr2 = format!("0.0.0.0:{}", port - 1);
@@ -762,27 +757,26 @@ impl RendezvousServer {
stream: &mut FramedStream,
peers: Vec<String>,
) -> ResultType<()> {
- let mut onlines = Vec::new();
- for peer_id in &peers {
+ let mut states = BytesMut::zeroed((peers.len() + 7) / 8);
+ for i in 0..peers.len() {
+ let peer_id = &peers[i];
+ // bytes index from left to right
+ let states_idx = i / 8;
+ let bit_idx = 7 - i % 8;
if let Some(peer) = self.pm.get_in_memory(&peer_id).await {
let (elapsed, _) = {
let r = peer.read().await;
(r.last_reg_time.elapsed().as_millis() as i32, r.socket_addr)
};
if elapsed < REG_TIMEOUT {
- onlines.push(peer_id.clone());
+ states[states_idx] |= 0x01 << bit_idx;
}
}
}
- let offlines = peers
- .into_iter()
- .filter(|p| onlines.iter().position(|r| r == p).is_none())
- .collect::<Vec<_>>();
let mut msg_out = RendezvousMessage::new();
msg_out.set_online_response(OnlineResponse {
- onlines,
- offlines,
+ states: states.into(),
..Default::default()
});
stream.send(&msg_out).await?;
@@ -1079,29 +1073,19 @@ impl RendezvousServer {
Some(rendezvous_message::Union::OnlineRequest(or)) => {
allow_err!(rs.handle_online_request(&mut stream, or.peers).await);
}
- _ => {
- }
+ _ => {}
}
}
}
});
}
- async fn handle_listener(
- &self,
- stream: TcpStream,
- addr: SocketAddr,
- key: &str,
- ws: bool,
- ) {
+ async fn handle_listener(&self, stream: TcpStream, addr: SocketAddr, key: &str, ws: bool) {
log::debug!("Tcp connection from {:?}, ws: {}", addr, ws);
let mut rs = self.clone();
let key = key.to_owned();
tokio::spawn(async move {
- allow_err!(
- rs.handle_listener_inner(stream, addr, &key, ws)
- .await
- );
+ allow_err!(rs.handle_listener_inner(stream, addr, &key, ws).await);
});
}
@@ -1121,10 +1105,7 @@ impl RendezvousServer {
while let Ok(Some(Ok(msg))) = timeout(30_000, b.next()).await {
match msg {
tungstenite::Message::Binary(bytes) => {
- if !self
- .handle_tcp(&bytes, &mut sink, addr, key, ws)
- .await
- {
+ if !self.handle_tcp(&bytes, &mut sink, addr, key, ws).await {
break;
}
}
@@ -1135,10 +1116,7 @@ impl RendezvousServer {
let (a, mut b) = Framed::new(stream, BytesCodec::new()).split();
sink = Some(Sink::TcpStream(a));
while let Ok(Some(Ok(bytes))) = timeout(30_000, b.next()).await {
- if !self
- .handle_tcp(&bytes, &mut sink, addr, key, ws)
- .await
- {
+ if !self.handle_tcp(&bytes, &mut sink, addr, key, ws).await {
break;
}
}