diff options
author | open-trade <[email protected]> | 2020-05-14 09:18:07 +0000 |
---|---|---|
committer | open-trade <[email protected]> | 2020-05-14 09:18:07 +0000 |
commit | b06f9d22aca9e51a4c29c74f89ac164cd2b8b896 (patch) | |
tree | 837b4082355b4038556c34e77b5299fae1b62d06 /src | |
parent | 47e36017832bba93430d0d5148c035256b3016be (diff) | |
download | rustdesk-server-b06f9d22aca9e51a4c29c74f89ac164cd2b8b896.tar.gz rustdesk-server-b06f9d22aca9e51a4c29c74f89ac164cd2b8b896.zip |
prepare sled
Diffstat (limited to 'src')
-rw-r--r-- | src/rendezvous_server.rs | 53 |
1 files changed, 37 insertions, 16 deletions
diff --git a/src/rendezvous_server.rs b/src/rendezvous_server.rs index a139b21..cb0e9e5 100644 --- a/src/rendezvous_server.rs +++ b/src/rendezvous_server.rs @@ -23,35 +23,57 @@ use std::{ time::Instant, }; -pub struct Peer { +struct Peer { socket_addr: SocketAddr, last_reg_time: Instant, } -type PeerMap = HashMap<String, Peer>; +struct PeerMap { + map: HashMap<String, Peer>, + db: sled::Db, +} + +impl PeerMap { + fn new() -> ResultType<Self> { + Ok(Self { + map: HashMap::new(), + db: sled::open("./sled.db")?, + }) + } + + fn insert(&mut self, key: String, peer: Peer) { + self.map.insert(key, peer); + } + + fn get(&self, key: &str) -> Option<&Peer> { + self.map.get(key) + } +} + const REG_TIMEOUT: i32 = 30_000; +type Sink = SplitSink<Framed<TcpStream, BytesCodec>, Bytes>; +#[derive(Clone)] pub struct RendezvousServer { - peer_map: PeerMap, - tcp_punch: Arc<Mutex<HashMap<SocketAddr, SplitSink<Framed<TcpStream, BytesCodec>, Bytes>>>>, + tcp_punch: Arc<Mutex<HashMap<SocketAddr, Sink>>>, } impl RendezvousServer { pub async fn start(addr: &str) -> ResultType<()> { + let mut pm = PeerMap::new()?; let mut socket = FramedSocket::new(addr).await?; let mut rs = Self { - peer_map: PeerMap::new(), tcp_punch: Arc::new(Mutex::new(HashMap::new())), }; let (tx, mut rx) = mpsc::unbounded_channel::<(SocketAddr, String)>(); - let mut listener = new_listener(addr, true).await.unwrap(); + let mut listener = new_listener(addr, true).await?; loop { tokio::select! { Some((addr, id)) = rx.recv() => { - allow_err!(rs.handle_punch_hole_request(addr, &id, &mut socket, true).await); + allow_err!(rs.handle_punch_hole_request(addr, &id, &mut socket, true, &pm).await); } Some(Ok((bytes, addr))) = socket.next() => { - allow_err!(rs.handle_msg(&bytes, addr, &mut socket).await); + allow_err!(rs.handle_msg(&bytes, addr, &mut socket, &mut pm).await); } Ok((stream, addr)) = listener.accept() => { log::debug!("Tcp connection from {:?}", addr); @@ -59,10 +81,7 @@ impl RendezvousServer { let tcp_punch = rs.tcp_punch.clone(); tcp_punch.lock().unwrap().insert(addr, a); let tx = tx.clone(); - let mut rs = Self { - peer_map: PeerMap::new(), - tcp_punch: tcp_punch, - }; + let mut rs = rs.clone(); tokio::spawn(async move { while let Some(Ok(bytes)) = b.next().await { if let Ok(msg_in) = parse_from_bytes::<RendezvousMessage>(&bytes) { @@ -90,11 +109,12 @@ impl RendezvousServer { } } - pub async fn handle_msg( + async fn handle_msg( &mut self, bytes: &BytesMut, addr: SocketAddr, socket: &mut FramedSocket, + pm: &mut PeerMap, ) -> ResultType<()> { if let Ok(msg_in) = parse_from_bytes::<RendezvousMessage>(&bytes) { match msg_in.union { @@ -102,7 +122,7 @@ impl RendezvousServer { // B registered if rp.id.len() > 0 { log::debug!("New peer registered: {:?} {:?}", &rp.id, &addr); - self.peer_map.insert( + pm.insert( rp.id, Peer { socket_addr: addr, @@ -115,7 +135,7 @@ impl RendezvousServer { } } Some(rendezvous_message::Union::punch_hole_request(ph)) => { - self.handle_punch_hole_request(addr, &ph.id, socket, false) + self.handle_punch_hole_request(addr, &ph.id, socket, false, &pm) .await?; } Some(rendezvous_message::Union::punch_hole_sent(phs)) => { @@ -190,13 +210,14 @@ impl RendezvousServer { id: &str, socket: &mut FramedSocket, is_tcp: bool, + pm: &PeerMap, ) -> ResultType<()> { // punch hole request from A, forward to B, // check if in same intranet first, // fetch local addrs if in same intranet. // because punch hole won't work if in the same intranet, // all routers will drop such self-connections. - if let Some(peer) = self.peer_map.get(id) { + if let Some(peer) = pm.get(id) { if peer.last_reg_time.elapsed().as_millis() as i32 >= REG_TIMEOUT { let mut msg_out = RendezvousMessage::new(); msg_out.set_punch_hole_response(PunchHoleResponse { |