aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authoropen-trade <[email protected]>2020-05-14 09:18:07 +0000
committeropen-trade <[email protected]>2020-05-14 09:18:07 +0000
commitb06f9d22aca9e51a4c29c74f89ac164cd2b8b896 (patch)
tree837b4082355b4038556c34e77b5299fae1b62d06 /src
parent47e36017832bba93430d0d5148c035256b3016be (diff)
downloadrustdesk-server-b06f9d22aca9e51a4c29c74f89ac164cd2b8b896.tar.gz
rustdesk-server-b06f9d22aca9e51a4c29c74f89ac164cd2b8b896.zip
prepare sled
Diffstat (limited to 'src')
-rw-r--r--src/rendezvous_server.rs53
1 files changed, 37 insertions, 16 deletions
diff --git a/src/rendezvous_server.rs b/src/rendezvous_server.rs
index a139b21..cb0e9e5 100644
--- a/src/rendezvous_server.rs
+++ b/src/rendezvous_server.rs
@@ -23,35 +23,57 @@ use std::{
time::Instant,
};
-pub struct Peer {
+struct Peer {
socket_addr: SocketAddr,
last_reg_time: Instant,
}
-type PeerMap = HashMap<String, Peer>;
+struct PeerMap {
+ map: HashMap<String, Peer>,
+ db: sled::Db,
+}
+
+impl PeerMap {
+ fn new() -> ResultType<Self> {
+ Ok(Self {
+ map: HashMap::new(),
+ db: sled::open("./sled.db")?,
+ })
+ }
+
+ fn insert(&mut self, key: String, peer: Peer) {
+ self.map.insert(key, peer);
+ }
+
+ fn get(&self, key: &str) -> Option<&Peer> {
+ self.map.get(key)
+ }
+}
+
const REG_TIMEOUT: i32 = 30_000;
+type Sink = SplitSink<Framed<TcpStream, BytesCodec>, Bytes>;
+#[derive(Clone)]
pub struct RendezvousServer {
- peer_map: PeerMap,
- tcp_punch: Arc<Mutex<HashMap<SocketAddr, SplitSink<Framed<TcpStream, BytesCodec>, Bytes>>>>,
+ tcp_punch: Arc<Mutex<HashMap<SocketAddr, Sink>>>,
}
impl RendezvousServer {
pub async fn start(addr: &str) -> ResultType<()> {
+ let mut pm = PeerMap::new()?;
let mut socket = FramedSocket::new(addr).await?;
let mut rs = Self {
- peer_map: PeerMap::new(),
tcp_punch: Arc::new(Mutex::new(HashMap::new())),
};
let (tx, mut rx) = mpsc::unbounded_channel::<(SocketAddr, String)>();
- let mut listener = new_listener(addr, true).await.unwrap();
+ let mut listener = new_listener(addr, true).await?;
loop {
tokio::select! {
Some((addr, id)) = rx.recv() => {
- allow_err!(rs.handle_punch_hole_request(addr, &id, &mut socket, true).await);
+ allow_err!(rs.handle_punch_hole_request(addr, &id, &mut socket, true, &pm).await);
}
Some(Ok((bytes, addr))) = socket.next() => {
- allow_err!(rs.handle_msg(&bytes, addr, &mut socket).await);
+ allow_err!(rs.handle_msg(&bytes, addr, &mut socket, &mut pm).await);
}
Ok((stream, addr)) = listener.accept() => {
log::debug!("Tcp connection from {:?}", addr);
@@ -59,10 +81,7 @@ impl RendezvousServer {
let tcp_punch = rs.tcp_punch.clone();
tcp_punch.lock().unwrap().insert(addr, a);
let tx = tx.clone();
- let mut rs = Self {
- peer_map: PeerMap::new(),
- tcp_punch: tcp_punch,
- };
+ let mut rs = rs.clone();
tokio::spawn(async move {
while let Some(Ok(bytes)) = b.next().await {
if let Ok(msg_in) = parse_from_bytes::<RendezvousMessage>(&bytes) {
@@ -90,11 +109,12 @@ impl RendezvousServer {
}
}
- pub async fn handle_msg(
+ async fn handle_msg(
&mut self,
bytes: &BytesMut,
addr: SocketAddr,
socket: &mut FramedSocket,
+ pm: &mut PeerMap,
) -> ResultType<()> {
if let Ok(msg_in) = parse_from_bytes::<RendezvousMessage>(&bytes) {
match msg_in.union {
@@ -102,7 +122,7 @@ impl RendezvousServer {
// B registered
if rp.id.len() > 0 {
log::debug!("New peer registered: {:?} {:?}", &rp.id, &addr);
- self.peer_map.insert(
+ pm.insert(
rp.id,
Peer {
socket_addr: addr,
@@ -115,7 +135,7 @@ impl RendezvousServer {
}
}
Some(rendezvous_message::Union::punch_hole_request(ph)) => {
- self.handle_punch_hole_request(addr, &ph.id, socket, false)
+ self.handle_punch_hole_request(addr, &ph.id, socket, false, &pm)
.await?;
}
Some(rendezvous_message::Union::punch_hole_sent(phs)) => {
@@ -190,13 +210,14 @@ impl RendezvousServer {
id: &str,
socket: &mut FramedSocket,
is_tcp: bool,
+ pm: &PeerMap,
) -> ResultType<()> {
// punch hole request from A, forward to B,
// check if in same intranet first,
// fetch local addrs if in same intranet.
// because punch hole won't work if in the same intranet,
// all routers will drop such self-connections.
- if let Some(peer) = self.peer_map.get(id) {
+ if let Some(peer) = pm.get(id) {
if peer.last_reg_time.elapsed().as_millis() as i32 >= REG_TIMEOUT {
let mut msg_out = RendezvousMessage::new();
msg_out.set_punch_hole_response(PunchHoleResponse {