aboutsummaryrefslogtreecommitdiffhomepage
path: root/libs/hbb_common/src/fs.rs
diff options
context:
space:
mode:
Diffstat (limited to 'libs/hbb_common/src/fs.rs')
-rw-r--r--libs/hbb_common/src/fs.rs339
1 files changed, 311 insertions, 28 deletions
diff --git a/libs/hbb_common/src/fs.rs b/libs/hbb_common/src/fs.rs
index a5bc6ad..570fad8 100644
--- a/libs/hbb_common/src/fs.rs
+++ b/libs/hbb_common/src/fs.rs
@@ -1,13 +1,17 @@
-use crate::{bail, message_proto::*, ResultType};
+#[cfg(windows)]
+use std::os::windows::prelude::*;
use std::path::{Path, PathBuf};
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
+
+use serde_derive::{Deserialize, Serialize};
+use tokio::{fs::File, io::*};
+
+use crate::{bail, get_version_number, message_proto::*, ResultType, Stream};
// https://doc.rust-lang.org/std/os/windows/fs/trait.MetadataExt.html
use crate::{
compress::{compress, decompress},
config::{Config, COMPRESS_LEVEL},
};
-#[cfg(windows)]
-use std::os::windows::prelude::*;
-use tokio::{fs::File, io::*};
pub fn read_dir(path: &PathBuf, include_hidden: bool) -> ResultType<FileDirectory> {
let mut dir = FileDirectory {
@@ -184,16 +188,63 @@ pub fn get_recursive_files(path: &str, include_hidden: bool) -> ResultType<Vec<F
read_dir_recursive(&get_path(path), &get_path(""), include_hidden)
}
+#[inline]
+pub fn is_file_exists(file_path: &str) -> bool {
+ return Path::new(file_path).exists();
+}
+
+#[inline]
+pub fn can_enable_overwrite_detection(version: i64) -> bool {
+ version >= get_version_number("1.1.10")
+}
+
#[derive(Default)]
pub struct TransferJob {
- id: i32,
- path: PathBuf,
- files: Vec<FileEntry>,
- file_num: i32,
+ pub id: i32,
+ pub remote: String,
+ pub path: PathBuf,
+ pub show_hidden: bool,
+ pub is_remote: bool,
+ pub is_last_job: bool,
+ pub file_num: i32,
+ pub files: Vec<FileEntry>,
+
file: Option<File>,
total_size: u64,
finished_size: u64,
transferred: u64,
+ enable_overwrite_detection: bool,
+ file_confirmed: bool,
+ // indicating the last file is skipped
+ file_skipped: bool,
+ file_is_waiting: bool,
+ default_overwrite_strategy: Option<bool>,
+}
+
+#[derive(Debug, Default, Serialize, Deserialize, Clone)]
+pub struct TransferJobMeta {
+ #[serde(default)]
+ pub id: i32,
+ #[serde(default)]
+ pub remote: String,
+ #[serde(default)]
+ pub to: String,
+ #[serde(default)]
+ pub show_hidden: bool,
+ #[serde(default)]
+ pub file_num: i32,
+ #[serde(default)]
+ pub is_remote: bool,
+}
+
+#[derive(Debug, Default, Serialize, Deserialize, Clone)]
+pub struct RemoveJobMeta {
+ #[serde(default)]
+ pub path: String,
+ #[serde(default)]
+ pub is_remote: bool,
+ #[serde(default)]
+ pub no_confirm: bool,
}
#[inline]
@@ -219,25 +270,54 @@ fn is_compressed_file(name: &str) -> bool {
}
impl TransferJob {
- pub fn new_write(id: i32, path: String, files: Vec<FileEntry>) -> Self {
+ pub fn new_write(
+ id: i32,
+ remote: String,
+ path: String,
+ file_num: i32,
+ show_hidden: bool,
+ is_remote: bool,
+ files: Vec<FileEntry>,
+ enable_overwrite_detection: bool,
+ ) -> Self {
+ log::info!("new write {}", path);
let total_size = files.iter().map(|x| x.size as u64).sum();
Self {
id,
+ remote,
path: get_path(&path),
+ file_num,
+ show_hidden,
+ is_remote,
files,
total_size,
+ enable_overwrite_detection,
..Default::default()
}
}
- pub fn new_read(id: i32, path: String, include_hidden: bool) -> ResultType<Self> {
- let files = get_recursive_files(&path, include_hidden)?;
+ pub fn new_read(
+ id: i32,
+ remote: String,
+ path: String,
+ file_num: i32,
+ show_hidden: bool,
+ is_remote: bool,
+ enable_overwrite_detection: bool,
+ ) -> ResultType<Self> {
+ log::info!("new read {}", path);
+ let files = get_recursive_files(&path, show_hidden)?;
let total_size = files.iter().map(|x| x.size as u64).sum();
Ok(Self {
id,
+ remote,
path: get_path(&path),
+ file_num,
+ show_hidden,
+ is_remote,
files,
total_size,
+ enable_overwrite_detection,
..Default::default()
})
}
@@ -302,7 +382,7 @@ impl TransferJob {
}
}
- pub async fn write(&mut self, block: FileTransferBlock, raw: Option<&[u8]>) -> ResultType<()> {
+ pub async fn write(&mut self, block: FileTransferBlock) -> ResultType<()> {
if block.id != self.id {
bail!("Wrong id");
}
@@ -324,25 +404,20 @@ impl TransferJob {
let path = format!("{}.download", get_string(&path));
self.file = Some(File::create(&path).await?);
}
- let data = if let Some(data) = raw {
- data
- } else {
- &block.data
- };
if block.compressed {
- let tmp = decompress(data);
+ let tmp = decompress(&block.data);
self.file.as_mut().unwrap().write_all(&tmp).await?;
self.finished_size += tmp.len() as u64;
} else {
- self.file.as_mut().unwrap().write_all(data).await?;
- self.finished_size += data.len() as u64;
+ self.file.as_mut().unwrap().write_all(&block.data).await?;
+ self.finished_size += block.data.len() as u64;
}
- self.transferred += data.len() as u64;
+ self.transferred += block.data.len() as u64;
Ok(())
}
#[inline]
- fn join(&self, name: &str) -> PathBuf {
+ pub fn join(&self, name: &str) -> PathBuf {
if name.is_empty() {
self.path.clone()
} else {
@@ -350,7 +425,7 @@ impl TransferJob {
}
}
- pub async fn read(&mut self) -> ResultType<Option<FileTransferBlock>> {
+ pub async fn read(&mut self, stream: &mut Stream) -> ResultType<Option<FileTransferBlock>> {
let file_num = self.file_num as usize;
if file_num >= self.files.len() {
self.file.take();
@@ -361,13 +436,26 @@ impl TransferJob {
match File::open(self.join(&name)).await {
Ok(file) => {
self.file = Some(file);
+ self.file_confirmed = false;
+ self.file_is_waiting = false;
}
Err(err) => {
self.file_num += 1;
+ self.file_confirmed = false;
+ self.file_is_waiting = false;
return Err(err.into());
}
}
}
+ if self.enable_overwrite_detection {
+ if !self.file_confirmed() {
+ if !self.file_is_waiting() {
+ self.send_current_digest(stream).await?;
+ self.set_file_is_waiting(true);
+ }
+ return Ok(None);
+ }
+ }
const BUF_SIZE: usize = 128 * 1024;
let mut buf: Vec<u8> = Vec::with_capacity(BUF_SIZE);
unsafe {
@@ -380,6 +468,8 @@ impl TransferJob {
Err(err) => {
self.file_num += 1;
self.file = None;
+ self.file_confirmed = false;
+ self.file_is_waiting = false;
return Err(err.into());
}
Ok(n) => {
@@ -394,6 +484,8 @@ impl TransferJob {
if offset == 0 {
self.file_num += 1;
self.file = None;
+ self.file_confirmed = false;
+ self.file_is_waiting = false;
} else {
self.finished_size += offset as u64;
if !is_compressed_file(name) {
@@ -413,6 +505,139 @@ impl TransferJob {
..Default::default()
}))
}
+
+ async fn send_current_digest(&mut self, stream: &mut Stream) -> ResultType<()> {
+ let mut msg = Message::new();
+ let mut resp = FileResponse::new();
+ let meta = self.file.as_ref().unwrap().metadata().await?;
+ let last_modified = meta
+ .modified()?
+ .duration_since(SystemTime::UNIX_EPOCH)?
+ .as_secs();
+ resp.set_digest(FileTransferDigest {
+ id: self.id,
+ file_num: self.file_num,
+ last_modified,
+ file_size: meta.len(),
+ ..Default::default()
+ });
+ msg.set_file_response(resp);
+ stream.send(&msg).await?;
+ log::info!(
+ "id: {}, file_num:{}, digest message is sent. waiting for confirm. msg: {:?}",
+ self.id,
+ self.file_num,
+ msg
+ );
+ Ok(())
+ }
+
+ pub fn set_overwrite_strategy(&mut self, overwrite_strategy: Option<bool>) {
+ self.default_overwrite_strategy = overwrite_strategy;
+ }
+
+ pub fn default_overwrite_strategy(&self) -> Option<bool> {
+ self.default_overwrite_strategy
+ }
+
+ pub fn set_file_confirmed(&mut self, file_confirmed: bool) {
+ log::info!("id: {}, file_confirmed: {}", self.id, file_confirmed);
+ self.file_confirmed = file_confirmed;
+ self.file_skipped = false;
+ }
+
+ pub fn set_file_is_waiting(&mut self, file_is_waiting: bool) {
+ self.file_is_waiting = file_is_waiting;
+ }
+
+ #[inline]
+ pub fn file_is_waiting(&self) -> bool {
+ self.file_is_waiting
+ }
+
+ #[inline]
+ pub fn file_confirmed(&self) -> bool {
+ self.file_confirmed
+ }
+
+ /// Indicating whether the last file is skipped
+ #[inline]
+ pub fn file_skipped(&self) -> bool {
+ self.file_skipped
+ }
+
+ /// Indicating whether the whole task is skipped
+ #[inline]
+ pub fn job_skipped(&self) -> bool {
+ self.file_skipped() && self.files.len() == 1
+ }
+
+ /// Check whether the job is completed after `read` returns `None`
+ /// This is a helper function which gives additional lifecycle when the job reads `None`.
+ /// If returns `true`, it means we can delete the job automatically. `False` otherwise.
+ ///
+ /// [`Note`]
+ /// Conditions:
+ /// 1. Files are not waiting for confirmation by peers.
+ #[inline]
+ pub fn job_completed(&self) -> bool {
+ // has no error, Condition 2
+ if !self.enable_overwrite_detection || (!self.file_confirmed && !self.file_is_waiting) {
+ return true;
+ }
+ return false;
+ }
+
+ /// Get job error message, useful for getting status when job had finished
+ pub fn job_error(&self) -> Option<String> {
+ if self.job_skipped() {
+ return Some("skipped".to_string());
+ }
+ None
+ }
+
+ pub fn set_file_skipped(&mut self) -> bool {
+ log::debug!("skip file {} in job {}", self.file_num, self.id);
+ self.file.take();
+ self.set_file_confirmed(false);
+ self.set_file_is_waiting(false);
+ self.file_num += 1;
+ self.file_skipped = true;
+ true
+ }
+
+ pub fn confirm(&mut self, r: &FileTransferSendConfirmRequest) -> bool {
+ if self.file_num() != r.file_num {
+ log::info!("file num truncated, ignoring");
+ } else {
+ match r.union {
+ Some(file_transfer_send_confirm_request::Union::Skip(s)) => {
+ if s {
+ self.set_file_skipped();
+ } else {
+ self.set_file_confirmed(true);
+ }
+ }
+ Some(file_transfer_send_confirm_request::Union::OffsetBlk(_offset)) => {
+ self.set_file_confirmed(true);
+ }
+ _ => {}
+ }
+ }
+ true
+ }
+
+ #[inline]
+ pub fn gen_meta(&self) -> TransferJobMeta {
+ TransferJobMeta {
+ id: self.id,
+ remote: self.remote.to_string(),
+ to: self.path.to_string_lossy().to_string(),
+ file_num: self.file_num,
+ show_hidden: self.show_hidden,
+ is_remote: self.is_remote,
+ }
+ }
}
#[inline]
@@ -453,12 +678,22 @@ pub fn new_block(block: FileTransferBlock) -> Message {
}
#[inline]
-pub fn new_receive(id: i32, path: String, files: Vec<FileEntry>) -> Message {
+pub fn new_send_confirm(r: FileTransferSendConfirmRequest) -> Message {
+ let mut msg_out = Message::new();
+ let mut action = FileAction::new();
+ action.set_send_confirm(r);
+ msg_out.set_file_action(action);
+ msg_out
+}
+
+#[inline]
+pub fn new_receive(id: i32, path: String, file_num: i32, files: Vec<FileEntry>) -> Message {
let mut action = FileAction::new();
action.set_receive(FileTransferReceiveRequest {
id,
path,
files: files.into(),
+ file_num,
..Default::default()
});
let mut msg_out = Message::new();
@@ -467,12 +702,14 @@ pub fn new_receive(id: i32, path: String, files: Vec<FileEntry>) -> Message {
}
#[inline]
-pub fn new_send(id: i32, path: String, include_hidden: bool) -> Message {
+pub fn new_send(id: i32, path: String, file_num: i32, include_hidden: bool) -> Message {
+ log::info!("new send: {},id : {}", path, id);
let mut action = FileAction::new();
action.set_send(FileTransferSendRequest {
id,
path,
include_hidden,
+ file_num,
..Default::default()
});
let mut msg_out = Message::new();
@@ -509,7 +746,10 @@ pub async fn handle_read_jobs(
) -> ResultType<()> {
let mut finished = Vec::new();
for job in jobs.iter_mut() {
- match job.read().await {
+ if job.is_last_job {
+ continue;
+ }
+ match job.read(stream).await {
Err(err) => {
stream
.send(&new_error(job.id(), err, job.file_num()))
@@ -519,8 +759,19 @@ pub async fn handle_read_jobs(
stream.send(&new_block(block)).await?;
}
Ok(None) => {
- finished.push(job.id());
- stream.send(&new_done(job.id(), job.file_num())).await?;
+ if job.job_completed() {
+ finished.push(job.id());
+ let err = job.job_error();
+ if err.is_some() {
+ stream
+ .send(&new_error(job.id(), err.unwrap(), job.file_num()))
+ .await?;
+ } else {
+ stream.send(&new_done(job.id(), job.file_num())).await?;
+ }
+ } else {
+ // waiting confirmation.
+ }
}
}
}
@@ -565,3 +816,35 @@ pub fn transform_windows_path(entries: &mut Vec<FileEntry>) {
entry.name = entry.name.replace("\\", "/");
}
}
+
+pub enum DigestCheckResult {
+ IsSame,
+ NeedConfirm(FileTransferDigest),
+ NoSuchFile,
+}
+
+#[inline]
+pub fn is_write_need_confirmation(
+ file_path: &str,
+ digest: &FileTransferDigest,
+) -> ResultType<DigestCheckResult> {
+ let path = Path::new(file_path);
+ if path.exists() && path.is_file() {
+ let metadata = std::fs::metadata(path)?;
+ let modified_time = metadata.modified()?;
+ let remote_mt = Duration::from_secs(digest.last_modified);
+ let local_mt = modified_time.duration_since(UNIX_EPOCH)?;
+ if remote_mt == local_mt && digest.file_size == metadata.len() {
+ return Ok(DigestCheckResult::IsSame);
+ }
+ Ok(DigestCheckResult::NeedConfirm(FileTransferDigest {
+ id: digest.id,
+ file_num: digest.file_num,
+ last_modified: local_mt.as_secs(),
+ file_size: metadata.len(),
+ ..Default::default()
+ }))
+ } else {
+ Ok(DigestCheckResult::NoSuchFile)
+ }
+} \ No newline at end of file