path: root/src
diff options
authoropen-trade <[email protected]>2020-03-06 17:18:22 +0800
committeropen-trade <[email protected]>2020-03-06 17:18:22 +0800
commit1695b5e3574b578aae0d75189e4395c631ca3b8a (patch)
tree95023624e8963efb1ab41b5baee91161e6baaf7a /src
parentd188a5f5ea0b6498eecd6a9c8239cb2d55341f81 (diff)
Diffstat (limited to 'src')
4 files changed, 411 insertions, 3 deletions
diff --git a/src/lib.rs b/src/lib.rs
new file mode 100644
index 0000000..6cb9769
--- /dev/null
+++ b/src/lib.rs
@@ -0,0 +1,5 @@
+mod rendezvous_server;
+pub use rendezvous_server::*;
+#[path = "./protos/message.rs"]
+mod message_proto;
+pub use message_proto::*; \ No newline at end of file
diff --git a/src/main.rs b/src/main.rs
index e7a11a9..d1aad28 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,3 +1,12 @@
-fn main() {
- println!("Hello, world!");
+// https://tools.ietf.org/rfc/rfc5128.txt
+// https://blog.csdn.net/bytxl/article/details/44344855
+use hbbs::*;
+use std::error::Error;
+async fn main() -> Result<(), Box<dyn Error>> {
+ env_logger::init();
+ RendezvousServer::start("").await?;
+ Ok(())
+} \ No newline at end of file
diff --git a/src/protos/message.rs b/src/protos/message.rs
new file mode 100644
index 0000000..a40385f
--- /dev/null
+++ b/src/protos/message.rs
@@ -0,0 +1,301 @@
+// This file is generated by rust-protobuf 2.10.2. Do not edit
+// @generated
+// https://github.com/rust-lang/rust-clippy/issues/702
+#![cfg_attr(rustfmt, rustfmt_skip)]
+//! Generated file from `message.proto`
+use protobuf::Message as Message_imported_for_functions;
+use protobuf::ProtobufEnum as ProtobufEnum_imported_for_functions;
+/// Generated files are compatible only with the same version
+/// of protobuf runtime.
+// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_10_2;
+pub struct Message {
+ // message fields
+ pub addr: ::std::string::String,
+ // message oneof groups
+ pub union: ::std::option::Option<Message_oneof_union>,
+ // special fields
+ pub unknown_fields: ::protobuf::UnknownFields,
+ pub cached_size: ::protobuf::CachedSize,
+impl<'a> ::std::default::Default for &'a Message {
+ fn default() -> &'a Message {
+ <Message as ::protobuf::Message>::default_instance()
+ }
+pub enum Message_oneof_union {
+ socket_addr(::std::vec::Vec<u8>),
+impl Message {
+ pub fn new() -> Message {
+ ::std::default::Default::default()
+ }
+ // string addr = 1;
+ pub fn get_addr(&self) -> &str {
+ &self.addr
+ }
+ pub fn clear_addr(&mut self) {
+ self.addr.clear();
+ }
+ // Param is passed by value, moved
+ pub fn set_addr(&mut self, v: ::std::string::String) {
+ self.addr = v;
+ }
+ // Mutable pointer to the field.
+ // If field is not initialized, it is initialized with default value first.
+ pub fn mut_addr(&mut self) -> &mut ::std::string::String {
+ &mut self.addr
+ }
+ // Take field
+ pub fn take_addr(&mut self) -> ::std::string::String {
+ ::std::mem::replace(&mut self.addr, ::std::string::String::new())
+ }
+ // bytes socket_addr = 6;
+ pub fn get_socket_addr(&self) -> &[u8] {
+ match self.union {
+ ::std::option::Option::Some(Message_oneof_union::socket_addr(ref v)) => v,
+ _ => &[],
+ }
+ }
+ pub fn clear_socket_addr(&mut self) {
+ self.union = ::std::option::Option::None;
+ }
+ pub fn has_socket_addr(&self) -> bool {
+ match self.union {
+ ::std::option::Option::Some(Message_oneof_union::socket_addr(..)) => true,
+ _ => false,
+ }
+ }
+ // Param is passed by value, moved
+ pub fn set_socket_addr(&mut self, v: ::std::vec::Vec<u8>) {
+ self.union = ::std::option::Option::Some(Message_oneof_union::socket_addr(v))
+ }
+ // Mutable pointer to the field.
+ pub fn mut_socket_addr(&mut self) -> &mut ::std::vec::Vec<u8> {
+ if let ::std::option::Option::Some(Message_oneof_union::socket_addr(_)) = self.union {
+ } else {
+ self.union = ::std::option::Option::Some(Message_oneof_union::socket_addr(::std::vec::Vec::new()));
+ }
+ match self.union {
+ ::std::option::Option::Some(Message_oneof_union::socket_addr(ref mut v)) => v,
+ _ => panic!(),
+ }
+ }
+ // Take field
+ pub fn take_socket_addr(&mut self) -> ::std::vec::Vec<u8> {
+ if self.has_socket_addr() {
+ match self.union.take() {
+ ::std::option::Option::Some(Message_oneof_union::socket_addr(v)) => v,
+ _ => panic!(),
+ }
+ } else {
+ ::std::vec::Vec::new()
+ }
+ }
+impl ::protobuf::Message for Message {
+ fn is_initialized(&self) -> bool {
+ true
+ }
+ fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> {
+ while !is.eof()? {
+ let (field_number, wire_type) = is.read_tag_unpack()?;
+ match field_number {
+ 1 => {
+ ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.addr)?;
+ },
+ 6 => {
+ if wire_type != ::protobuf::wire_format::WireTypeLengthDelimited {
+ return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type));
+ }
+ self.union = ::std::option::Option::Some(Message_oneof_union::socket_addr(is.read_bytes()?));
+ },
+ _ => {
+ ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?;
+ },
+ };
+ }
+ ::std::result::Result::Ok(())
+ }
+ // Compute sizes of nested messages
+ #[allow(unused_variables)]
+ fn compute_size(&self) -> u32 {
+ let mut my_size = 0;
+ if !self.addr.is_empty() {
+ my_size += ::protobuf::rt::string_size(1, &self.addr);
+ }
+ if let ::std::option::Option::Some(ref v) = self.union {
+ match v {
+ &Message_oneof_union::socket_addr(ref v) => {
+ my_size += ::protobuf::rt::bytes_size(6, &v);
+ },
+ };
+ }
+ my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields());
+ self.cached_size.set(my_size);
+ my_size
+ }
+ fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> {
+ if !self.addr.is_empty() {
+ os.write_string(1, &self.addr)?;
+ }
+ if let ::std::option::Option::Some(ref v) = self.union {
+ match v {
+ &Message_oneof_union::socket_addr(ref v) => {
+ os.write_bytes(6, v)?;
+ },
+ };
+ }
+ os.write_unknown_fields(self.get_unknown_fields())?;
+ ::std::result::Result::Ok(())
+ }
+ fn get_cached_size(&self) -> u32 {
+ self.cached_size.get()
+ }
+ fn get_unknown_fields(&self) -> &::protobuf::UnknownFields {
+ &self.unknown_fields
+ }
+ fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields {
+ &mut self.unknown_fields
+ }
+ fn as_any(&self) -> &dyn (::std::any::Any) {
+ self as &dyn (::std::any::Any)
+ }
+ fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) {
+ self as &mut dyn (::std::any::Any)
+ }
+ fn into_any(self: Box<Self>) -> ::std::boxed::Box<dyn (::std::any::Any)> {
+ self
+ }
+ fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor {
+ Self::descriptor_static()
+ }
+ fn new() -> Message {
+ Message::new()
+ }
+ fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor {
+ static mut descriptor: ::protobuf::lazy::Lazy<::protobuf::reflect::MessageDescriptor> = ::protobuf::lazy::Lazy {
+ lock: ::protobuf::lazy::ONCE_INIT,
+ ptr: 0 as *const ::protobuf::reflect::MessageDescriptor,
+ };
+ unsafe {
+ descriptor.get(|| {
+ let mut fields = ::std::vec::Vec::new();
+ fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>(
+ "addr",
+ |m: &Message| { &m.addr },
+ |m: &mut Message| { &mut m.addr },
+ ));
+ fields.push(::protobuf::reflect::accessor::make_singular_bytes_accessor::<_>(
+ "socket_addr",
+ Message::has_socket_addr,
+ Message::get_socket_addr,
+ ));
+ ::protobuf::reflect::MessageDescriptor::new::<Message>(
+ "Message",
+ fields,
+ file_descriptor_proto()
+ )
+ })
+ }
+ }
+ fn default_instance() -> &'static Message {
+ static mut instance: ::protobuf::lazy::Lazy<Message> = ::protobuf::lazy::Lazy {
+ lock: ::protobuf::lazy::ONCE_INIT,
+ ptr: 0 as *const Message,
+ };
+ unsafe {
+ instance.get(Message::new)
+ }
+ }
+impl ::protobuf::Clear for Message {
+ fn clear(&mut self) {
+ self.addr.clear();
+ self.union = ::std::option::Option::None;
+ self.unknown_fields.clear();
+ }
+impl ::std::fmt::Debug for Message {
+ fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
+ ::protobuf::text_format::fmt(self, f)
+ }
+impl ::protobuf::reflect::ProtobufValue for Message {
+ fn as_ref(&self) -> ::protobuf::reflect::ProtobufValueRef {
+ ::protobuf::reflect::ProtobufValueRef::Message(self)
+ }
+static file_descriptor_proto_data: &'static [u8] = b"\
+ \n\rmessage.proto\x12\x03hbb\"=\n\x07Message\x12\x0e\n\x04addr\x18\x01\
+ \x20\x01(\tB\0\x12\x17\n\x0bsocket_addr\x18\x06\x20\x01(\x0cH\0B\0B\x07\
+ \n\x05union:\0B\0b\x06proto3\
+static mut file_descriptor_proto_lazy: ::protobuf::lazy::Lazy<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::lazy::Lazy {
+ lock: ::protobuf::lazy::ONCE_INIT,
+ ptr: 0 as *const ::protobuf::descriptor::FileDescriptorProto,
+fn parse_descriptor_proto() -> ::protobuf::descriptor::FileDescriptorProto {
+ ::protobuf::parse_from_bytes(file_descriptor_proto_data).unwrap()
+pub fn file_descriptor_proto() -> &'static ::protobuf::descriptor::FileDescriptorProto {
+ unsafe {
+ file_descriptor_proto_lazy.get(|| {
+ parse_descriptor_proto()
+ })
+ }
diff --git a/src/rendezvous_server.rs b/src/rendezvous_server.rs
new file mode 100644
index 0000000..1b5d3d8
--- /dev/null
+++ b/src/rendezvous_server.rs
@@ -0,0 +1,93 @@
+use super::message_proto::Message;
+use bytes::Bytes;
+use futures::{FutureExt, SinkExt};
+use protobuf::{parse_from_bytes, Message as _};
+use std::{
+ collections::HashMap,
+ error::Error,
+ net::{Ipv4Addr, SocketAddr, SocketAddrV4},
+ time::{SystemTime, UNIX_EPOCH},
+use tokio::net::UdpSocket;
+use tokio::stream::StreamExt;
+use tokio_util::{codec::BytesCodec, udp::UdpFramed};
+/// Certain router and firewalls scan the packet and if they
+/// find an IP address belonging to their pool that they use to do the NAT mapping/translation, so here we mangle the ip address
+pub struct V4AddrMangle(Vec<u8>);
+impl V4AddrMangle {
+ pub fn encode(addr: SocketAddrV4) -> Self {
+ let tm = (SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .unwrap()
+ .as_micros() as u32) as u128;
+ let ip = u32::from_ne_bytes(addr.ip().octets()) as u128;
+ let port = addr.port() as u128;
+ let v = ((ip + tm) << 49) | (tm << 17) | (port + (tm & 0xFFFF));
+ let bytes = v.to_ne_bytes();
+ let mut n_padding = 0;
+ for i in bytes.iter().rev() {
+ if i == &0u8 {
+ n_padding += 1;
+ } else {
+ break;
+ }
+ }
+ Self(bytes[..(16 - n_padding)].to_vec())
+ }
+ pub fn decode(&self) -> SocketAddrV4 {
+ let mut padded = [0u8; 16];
+ padded[..self.0.len()].copy_from_slice(&self.0);
+ let number = u128::from_ne_bytes(padded);
+ let tm = (number >> 17) & (u32::max_value() as u128);
+ let ip = (((number >> 49) - tm) as u32).to_ne_bytes();
+ let port = (number & 0xFFFFFF) - (tm & 0xFFFF);
+ SocketAddrV4::new(Ipv4Addr::new(ip[0], ip[1], ip[2], ip[3]), port as u16)
+ }
+pub struct Peer {
+ socket_addr: SocketAddr,
+type PeerMap = HashMap<String, Peer>;
+pub struct RendezvousServer {
+ peer_map: PeerMap,
+impl RendezvousServer {
+ pub async fn start(addr: &str) -> Result<Self, Box<dyn Error>> {
+ let socket = UdpSocket::bind(addr).await?;
+ let mut socket = UdpFramed::new(socket, BytesCodec::new());
+ let rs = Self {
+ peer_map: PeerMap::new(),
+ };
+ while let Some(Ok((bytes, addr))) = socket.next().await {
+ if let SocketAddr::V4(addr_v4) = addr {
+ if let Ok(msg_in) = parse_from_bytes::<Message>(&bytes) {
+ let msg_out = Message::new();
+ socket
+ .send((Bytes::from(msg_out.write_to_bytes().unwrap()), addr))
+ .await?;
+ }
+ }
+ }
+ Ok(rs)
+ }
+mod tests {
+ use super::*;
+ #[test]
+ fn test_mangle() {
+ let addr = SocketAddrV4::new(Ipv4Addr::new(192, 168, 16, 32), 21116);
+ assert_eq!(addr, V4AddrMangle::encode(addr).decode());
+ println!("{:?}", V4AddrMangle::encode(addr).0);
+ }