aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorrustdesk <[email protected]>2022-07-22 00:28:10 +0800
committerrustdesk <[email protected]>2022-07-22 00:28:10 +0800
commit51d8cd80c1e0b0c1cfb08b139b7e92866389cb48 (patch)
tree0990bde9bfd4def3dde5ecaa12a95f1d7e7f7bd0 /src
parent2d385d88d3769267b9a1011cb877be072202b623 (diff)
downloadrustdesk-server-51d8cd80c1e0b0c1cfb08b139b7e92866389cb48.tar.gz
rustdesk-server-51d8cd80c1e0b0c1cfb08b139b7e92866389cb48.zip
protbuf 3.1 with_bytes
Diffstat (limited to 'src')
-rw-r--r--src/common.rs3
-rw-r--r--src/database.rs22
-rw-r--r--src/peer.rs23
-rw-r--r--src/relay_server.rs2
-rw-r--r--src/rendezvous_server.rs87
5 files changed, 64 insertions, 73 deletions
diff --git a/src/common.rs b/src/common.rs
index 74b9e78..253d3d1 100644
--- a/src/common.rs
+++ b/src/common.rs
@@ -3,10 +3,9 @@ use hbb_common::{anyhow::Context, log, ResultType};
use ini::Ini;
use sodiumoxide::crypto::sign;
use std::{
- collections::HashMap,
io::prelude::*,
io::Read,
- net::{IpAddr, SocketAddr},
+ net::SocketAddr,
time::{Instant, SystemTime},
};
diff --git a/src/database.rs b/src/database.rs
index cf64faf..41ad5e3 100644
--- a/src/database.rs
+++ b/src/database.rs
@@ -8,9 +8,7 @@ use std::{ops::DerefMut, str::FromStr};
//use sqlx::postgres::PgPoolOptions;
//use sqlx::mysql::MySqlPoolOptions;
-pub(crate) type DB = sqlx::Sqlite;
pub(crate) type MapValue = serde_json::map::Map<String, Value>;
-pub(crate) type MapStr = std::collections::HashMap<String, String>;
type Pool = deadpool::managed::Pool<DbPool>;
pub struct DbPool {
@@ -107,13 +105,6 @@ impl Database {
.await?)
}
- pub async fn get_peer_id(&self, guid: &[u8]) -> ResultType<Option<String>> {
- Ok(sqlx::query!("select id from peer where guid = ?", guid)
- .fetch_optional(self.pool.get().await?.deref_mut())
- .await?
- .map(|x| x.id))
- }
-
#[inline]
pub async fn get_conn(&self) -> ResultType<deadpool::managed::Object<DbPool>> {
Ok(self.pool.get().await?)
@@ -135,8 +126,8 @@ impl Database {
pub async fn insert_peer(
&self,
id: &str,
- uuid: &Vec<u8>,
- pk: &Vec<u8>,
+ uuid: &[u8],
+ pk: &[u8],
info: &str,
) -> ResultType<Vec<u8>> {
let guid = uuid::Uuid::new_v4().as_bytes().to_vec();
@@ -157,7 +148,7 @@ impl Database {
&self,
guid: &Vec<u8>,
id: &str,
- pk: &Vec<u8>,
+ pk: &[u8],
info: &str,
) -> ResultType<()> {
sqlx::query!(
@@ -209,13 +200,6 @@ mod tests {
}
}
-#[inline]
-pub fn guid2str(guid: &Vec<u8>) -> String {
- let mut bytes = [0u8; 16];
- bytes[..].copy_from_slice(&guid);
- uuid::Uuid::from_bytes(bytes).to_string()
-}
-
pub(crate) fn get_str(v: &Value) -> Option<&str> {
match v {
Value::String(v) => {
diff --git a/src/peer.rs b/src/peer.rs
index 72999b7..49d440d 100644
--- a/src/peer.rs
+++ b/src/peer.rs
@@ -4,6 +4,7 @@ use hbb_common::{
log,
rendezvous_proto::*,
tokio::sync::{Mutex, RwLock},
+ bytes::Bytes,
ResultType,
};
use serde_derive::{Deserialize, Serialize};
@@ -25,13 +26,12 @@ pub(crate) struct PeerInfo {
pub(crate) ip: String,
}
-#[derive(Clone, Debug)]
pub(crate) struct Peer {
pub(crate) socket_addr: SocketAddr,
pub(crate) last_reg_time: Instant,
pub(crate) guid: Vec<u8>,
- pub(crate) uuid: Vec<u8>,
- pub(crate) pk: Vec<u8>,
+ pub(crate) uuid: Bytes,
+ pub(crate) pk: Bytes,
pub(crate) user: Option<Vec<u8>>,
pub(crate) info: PeerInfo,
pub(crate) disabled: bool,
@@ -44,8 +44,8 @@ impl Default for Peer {
socket_addr: "0.0.0.0:0".parse().unwrap(),
last_reg_time: get_expired_time(),
guid: Vec::new(),
- uuid: Vec::new(),
- pk: Vec::new(),
+ uuid: Bytes::new(),
+ pk: Bytes::new(),
info: Default::default(),
user: None,
disabled: false,
@@ -93,8 +93,8 @@ impl PeerMap {
id: String,
peer: LockPeer,
addr: SocketAddr,
- uuid: Vec<u8>,
- pk: Vec<u8>,
+ uuid: Bytes,
+ pk: Bytes,
ip: String,
) -> register_pk_response::Result {
log::info!("update_pk {} {:?} {:?} {:?}", id, addr, uuid, pk);
@@ -139,8 +139,8 @@ impl PeerMap {
if let Ok(Some(v)) = self.db.get_peer(id).await {
let peer = Peer {
guid: v.guid,
- uuid: v.uuid,
- pk: v.pk,
+ uuid: v.uuid.into(),
+ pk: v.pk.into(),
user: v.user,
info: serde_json::from_str::<PeerInfo>(&v.info).unwrap_or_default(),
disabled: v.status == Some(0),
@@ -177,9 +177,4 @@ impl PeerMap {
pub(crate) async fn is_in_memory(&self, id: &str) -> bool {
self.map.read().await.contains_key(id)
}
-
- #[inline]
- pub(crate) async fn remove(&self, id: &str) {
- self.map.write().await.remove(id);
- }
}
diff --git a/src/relay_server.rs b/src/relay_server.rs
index b6c677c..7b198d6 100644
--- a/src/relay_server.rs
+++ b/src/relay_server.rs
@@ -420,7 +420,7 @@ async fn make_pair_(stream: impl StreamTrait, addr: SocketAddr, key: &str, limit
let mut stream = stream;
if let Ok(Some(Ok(bytes))) = timeout(30_000, stream.recv()).await {
if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) {
- if let Some(rendezvous_message::Union::request_relay(rf)) = msg_in.union {
+ if let Some(rendezvous_message::Union::RequestRelay(rf)) = msg_in.union {
if !key.is_empty() && rf.licence_key != key {
return;
}
diff --git a/src/rendezvous_server.rs b/src/rendezvous_server.rs
index 08c6c85..60f95d9 100644
--- a/src/rendezvous_server.rs
+++ b/src/rendezvous_server.rs
@@ -89,7 +89,12 @@ enum LoopFailure {
impl RendezvousServer {
#[tokio::main(flavor = "multi_thread")]
- pub async fn start(port: i32, serial: i32, key: &str, rmem: usize) -> ResultType<()> {
+ pub async fn start(
+ port: i32,
+ serial: i32,
+ key: &str,
+ rmem: usize,
+ ) -> ResultType<()> {
let (key, sk) = Self::get_server_sk(key);
let addr = format!("0.0.0.0:{}", port);
let addr2 = format!("0.0.0.0:{}", port - 1);
@@ -138,7 +143,6 @@ impl RendezvousServer {
log::info!("local-ip: {:?}", rs.inner.local_ip);
std::env::set_var("PORT_FOR_API", port.to_string());
rs.parse_relay_servers(&get_arg("relay-servers"));
- let pm = rs.pm.clone();
let mut listener = new_listener(&addr, false).await?;
let mut listener2 = new_listener(&addr2, false).await?;
let mut listener3 = new_listener(&addr3, false).await?;
@@ -299,7 +303,7 @@ impl RendezvousServer {
) -> ResultType<()> {
if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) {
match msg_in.union {
- Some(rendezvous_message::Union::register_peer(rp)) => {
+ Some(rendezvous_message::Union::RegisterPeer(rp)) => {
// B registered
if rp.id.len() > 0 {
log::trace!("New peer registered: {:?} {:?}", &rp.id, &addr);
@@ -315,7 +319,7 @@ impl RendezvousServer {
}
}
}
- Some(rendezvous_message::Union::register_pk(rk)) => {
+ Some(rendezvous_message::Union::RegisterPk(rk)) => {
if rk.uuid.is_empty() || rk.pk.is_empty() {
return Ok(());
}
@@ -402,7 +406,7 @@ impl RendezvousServer {
});
socket.send(&msg_out, addr).await?
}
- Some(rendezvous_message::Union::punch_hole_request(ph)) => {
+ Some(rendezvous_message::Union::PunchHoleRequest(ph)) => {
if self.pm.is_in_memory(&ph.id).await {
self.handle_udp_punch_hole_request(addr, ph, key).await?;
} else {
@@ -414,13 +418,13 @@ impl RendezvousServer {
});
}
}
- Some(rendezvous_message::Union::punch_hole_sent(phs)) => {
+ Some(rendezvous_message::Union::PunchHoleSent(phs)) => {
self.handle_hole_sent(phs, addr, Some(socket)).await?;
}
- Some(rendezvous_message::Union::local_addr(la)) => {
+ Some(rendezvous_message::Union::LocalAddr(la)) => {
self.handle_local_addr(la, addr, Some(socket)).await?;
}
- Some(rendezvous_message::Union::configure_update(mut cu)) => {
+ Some(rendezvous_message::Union::ConfigureUpdate(mut cu)) => {
if addr.ip() == ADDR_127 && cu.serial > self.inner.serial {
let mut inner: Inner = (*self.inner).clone();
inner.serial = cu.serial;
@@ -441,7 +445,7 @@ impl RendezvousServer {
);
}
}
- Some(rendezvous_message::Union::software_update(su)) => {
+ Some(rendezvous_message::Union::SoftwareUpdate(su)) => {
if !self.inner.version.is_empty() && su.url != self.inner.version {
let mut msg_out = RendezvousMessage::new();
msg_out.set_software_update(SoftwareUpdate {
@@ -468,7 +472,7 @@ impl RendezvousServer {
) -> bool {
if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) {
match msg_in.union {
- Some(rendezvous_message::Union::punch_hole_request(ph)) => {
+ Some(rendezvous_message::Union::PunchHoleRequest(ph)) => {
// there maybe several attempt, so sink can be none
if let Some(sink) = sink.take() {
self.tcp_punch.lock().await.insert(addr, sink);
@@ -476,24 +480,24 @@ impl RendezvousServer {
allow_err!(self.handle_tcp_punch_hole_request(addr, ph, &key, ws).await);
return true;
}
- Some(rendezvous_message::Union::request_relay(mut rf)) => {
+ Some(rendezvous_message::Union::RequestRelay(mut rf)) => {
// there maybe several attempt, so sink can be none
if let Some(sink) = sink.take() {
self.tcp_punch.lock().await.insert(addr, sink);
}
if let Some(peer) = self.pm.get_in_memory(&rf.id).await {
let mut msg_out = RendezvousMessage::new();
- rf.socket_addr = AddrMangle::encode(addr);
+ rf.socket_addr = AddrMangle::encode(addr).into();
msg_out.set_request_relay(rf);
let peer_addr = peer.read().await.socket_addr;
self.tx.send(Data::Msg(msg_out, peer_addr)).ok();
}
return true;
}
- Some(rendezvous_message::Union::relay_response(mut rr)) => {
+ Some(rendezvous_message::Union::RelayResponse(mut rr)) => {
let addr_b = AddrMangle::decode(&rr.socket_addr);
rr.socket_addr = Default::default();
- let id = rr.get_id();
+ let id = rr.id();
if !id.is_empty() {
let pk = self.get_pk(&rr.version, id.to_owned()).await;
rr.set_pk(pk);
@@ -510,13 +514,13 @@ impl RendezvousServer {
msg_out.set_relay_response(rr);
allow_err!(self.send_to_tcp_sync(msg_out, addr_b).await);
}
- Some(rendezvous_message::Union::punch_hole_sent(phs)) => {
+ Some(rendezvous_message::Union::PunchHoleSent(phs)) => {
allow_err!(self.handle_hole_sent(phs, addr, None).await);
}
- Some(rendezvous_message::Union::local_addr(la)) => {
+ Some(rendezvous_message::Union::LocalAddr(la)) => {
allow_err!(self.handle_local_addr(la, addr, None).await);
}
- Some(rendezvous_message::Union::test_nat_request(tar)) => {
+ Some(rendezvous_message::Union::TestNatRequest(tar)) => {
let mut msg_out = RendezvousMessage::new();
let mut res = TestNatResponse {
port: addr.port() as _,
@@ -531,7 +535,7 @@ impl RendezvousServer {
msg_out.set_test_nat_response(res);
Self::send_to_sink(sink, msg_out).await;
}
- Some(rendezvous_message::Union::register_pk(_rk)) => {
+ Some(rendezvous_message::Union::RegisterPk(_)) => {
let res = register_pk_response::Result::NOT_SUPPORT;
let mut msg_out = RendezvousMessage::new();
msg_out.set_register_pk_response(RegisterPkResponse {
@@ -607,7 +611,7 @@ impl RendezvousServer {
);
let mut msg_out = RendezvousMessage::new();
let mut p = PunchHoleResponse {
- socket_addr: AddrMangle::encode(addr),
+ socket_addr: AddrMangle::encode(addr).into(),
pk: self.get_pk(&phs.version, phs.id).await,
relay_server: phs.relay_server.clone(),
..Default::default()
@@ -714,7 +718,7 @@ impl RendezvousServer {
_ => false,
},
};
- let socket_addr = AddrMangle::encode(addr);
+ let socket_addr = AddrMangle::encode(addr).into();
if same_intranet {
log::debug!(
"Fetch local addr {:?} {:?} request from {:?}",
@@ -858,7 +862,7 @@ impl RendezvousServer {
self.relay_servers = self.relay_servers0.clone();
}
- fn get_relay_server(&self, pa: IpAddr, pb: IpAddr) -> String {
+ fn get_relay_server(&self, _pa: IpAddr, _pb: IpAddr) -> String {
if self.relay_servers.is_empty() {
return "".to_owned();
} else if self.relay_servers.len() == 1 {
@@ -1029,7 +1033,7 @@ impl RendezvousServer {
let mut stream = stream;
if let Some(Ok(bytes)) = stream.next_timeout(30_000).await {
if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) {
- if let Some(rendezvous_message::Union::test_nat_request(_)) = msg_in.union {
+ if let Some(rendezvous_message::Union::TestNatRequest(_)) = msg_in.union {
let mut msg_out = RendezvousMessage::new();
msg_out.set_test_nat_response(TestNatResponse {
port: addr.port() as _,
@@ -1042,12 +1046,21 @@ impl RendezvousServer {
});
}
- async fn handle_listener(&self, stream: TcpStream, addr: SocketAddr, key: &str, ws: bool) {
+ async fn handle_listener(
+ &self,
+ stream: TcpStream,
+ addr: SocketAddr,
+ key: &str,
+ ws: bool,
+ ) {
log::debug!("Tcp connection from {:?}, ws: {}", addr, ws);
let mut rs = self.clone();
let key = key.to_owned();
tokio::spawn(async move {
- allow_err!(rs.handle_listener_inner(stream, addr, &key, ws).await);
+ allow_err!(
+ rs.handle_listener_inner(stream, addr, &key, ws)
+ .await
+ );
});
}
@@ -1067,7 +1080,10 @@ impl RendezvousServer {
while let Ok(Some(Ok(msg))) = timeout(30_000, b.next()).await {
match msg {
tungstenite::Message::Binary(bytes) => {
- if !self.handle_tcp(&bytes, &mut sink, addr, key, ws).await {
+ if !self
+ .handle_tcp(&bytes, &mut sink, addr, key, ws)
+ .await
+ {
break;
}
}
@@ -1078,7 +1094,10 @@ impl RendezvousServer {
let (a, mut b) = Framed::new(stream, BytesCodec::new()).split();
sink = Some(Sink::TcpStream(a));
while let Ok(Some(Ok(bytes))) = timeout(30_000, b.next()).await {
- if !self.handle_tcp(&bytes, &mut sink, addr, key, ws).await {
+ if !self
+ .handle_tcp(&bytes, &mut sink, addr, key, ws)
+ .await
+ {
break;
}
}
@@ -1091,13 +1110,13 @@ impl RendezvousServer {
}
#[inline]
- async fn get_pk(&mut self, version: &str, id: String) -> Vec<u8> {
+ async fn get_pk(&mut self, version: &str, id: String) -> Bytes {
if version.is_empty() || self.inner.sk.is_none() {
- Vec::new()
+ Bytes::new()
} else {
match self.pm.get(&id).await {
Some(peer) => {
- let pk = peer.read().await.pk.clone();
+ let pk = peer.read().await.pk.clone().into();
sign::sign(
&hbb_common::message_proto::IdPk {
id,
@@ -1108,8 +1127,9 @@ impl RendezvousServer {
.unwrap_or_default(),
&self.inner.sk.as_ref().unwrap(),
)
+ .into()
}
- _ => Vec::new(),
+ _ => Bytes::new(),
}
}
}
@@ -1214,13 +1234,6 @@ async fn test_hbbs(addr: SocketAddr) -> ResultType<()> {
}
#[inline]
-fn distance(a: &(i32, i32), b: &(i32, i32)) -> i32 {
- let dx = a.0 - b.0;
- let dy = a.1 - b.1;
- dx * dx + dy * dy
-}
-
-#[inline]
async fn send_rk_res(
socket: &mut FramedSocket,
addr: SocketAddr,