diff options
Diffstat (limited to 'libs/hbb_common/src/fs.rs')
-rw-r--r-- | libs/hbb_common/src/fs.rs | 339 |
1 files changed, 311 insertions, 28 deletions
diff --git a/libs/hbb_common/src/fs.rs b/libs/hbb_common/src/fs.rs index a5bc6ad..570fad8 100644 --- a/libs/hbb_common/src/fs.rs +++ b/libs/hbb_common/src/fs.rs @@ -1,13 +1,17 @@ -use crate::{bail, message_proto::*, ResultType}; +#[cfg(windows)] +use std::os::windows::prelude::*; use std::path::{Path, PathBuf}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +use serde_derive::{Deserialize, Serialize}; +use tokio::{fs::File, io::*}; + +use crate::{bail, get_version_number, message_proto::*, ResultType, Stream}; // https://doc.rust-lang.org/std/os/windows/fs/trait.MetadataExt.html use crate::{ compress::{compress, decompress}, config::{Config, COMPRESS_LEVEL}, }; -#[cfg(windows)] -use std::os::windows::prelude::*; -use tokio::{fs::File, io::*}; pub fn read_dir(path: &PathBuf, include_hidden: bool) -> ResultType<FileDirectory> { let mut dir = FileDirectory { @@ -184,16 +188,63 @@ pub fn get_recursive_files(path: &str, include_hidden: bool) -> ResultType<Vec<F read_dir_recursive(&get_path(path), &get_path(""), include_hidden) } +#[inline] +pub fn is_file_exists(file_path: &str) -> bool { + return Path::new(file_path).exists(); +} + +#[inline] +pub fn can_enable_overwrite_detection(version: i64) -> bool { + version >= get_version_number("1.1.10") +} + #[derive(Default)] pub struct TransferJob { - id: i32, - path: PathBuf, - files: Vec<FileEntry>, - file_num: i32, + pub id: i32, + pub remote: String, + pub path: PathBuf, + pub show_hidden: bool, + pub is_remote: bool, + pub is_last_job: bool, + pub file_num: i32, + pub files: Vec<FileEntry>, + file: Option<File>, total_size: u64, finished_size: u64, transferred: u64, + enable_overwrite_detection: bool, + file_confirmed: bool, + // indicating the last file is skipped + file_skipped: bool, + file_is_waiting: bool, + default_overwrite_strategy: Option<bool>, +} + +#[derive(Debug, Default, Serialize, Deserialize, Clone)] +pub struct TransferJobMeta { + #[serde(default)] + pub id: i32, + #[serde(default)] + pub remote: String, + #[serde(default)] + pub to: String, + #[serde(default)] + pub show_hidden: bool, + #[serde(default)] + pub file_num: i32, + #[serde(default)] + pub is_remote: bool, +} + +#[derive(Debug, Default, Serialize, Deserialize, Clone)] +pub struct RemoveJobMeta { + #[serde(default)] + pub path: String, + #[serde(default)] + pub is_remote: bool, + #[serde(default)] + pub no_confirm: bool, } #[inline] @@ -219,25 +270,54 @@ fn is_compressed_file(name: &str) -> bool { } impl TransferJob { - pub fn new_write(id: i32, path: String, files: Vec<FileEntry>) -> Self { + pub fn new_write( + id: i32, + remote: String, + path: String, + file_num: i32, + show_hidden: bool, + is_remote: bool, + files: Vec<FileEntry>, + enable_overwrite_detection: bool, + ) -> Self { + log::info!("new write {}", path); let total_size = files.iter().map(|x| x.size as u64).sum(); Self { id, + remote, path: get_path(&path), + file_num, + show_hidden, + is_remote, files, total_size, + enable_overwrite_detection, ..Default::default() } } - pub fn new_read(id: i32, path: String, include_hidden: bool) -> ResultType<Self> { - let files = get_recursive_files(&path, include_hidden)?; + pub fn new_read( + id: i32, + remote: String, + path: String, + file_num: i32, + show_hidden: bool, + is_remote: bool, + enable_overwrite_detection: bool, + ) -> ResultType<Self> { + log::info!("new read {}", path); + let files = get_recursive_files(&path, show_hidden)?; let total_size = files.iter().map(|x| x.size as u64).sum(); Ok(Self { id, + remote, path: get_path(&path), + file_num, + show_hidden, + is_remote, files, total_size, + enable_overwrite_detection, ..Default::default() }) } @@ -302,7 +382,7 @@ impl TransferJob { } } - pub async fn write(&mut self, block: FileTransferBlock, raw: Option<&[u8]>) -> ResultType<()> { + pub async fn write(&mut self, block: FileTransferBlock) -> ResultType<()> { if block.id != self.id { bail!("Wrong id"); } @@ -324,25 +404,20 @@ impl TransferJob { let path = format!("{}.download", get_string(&path)); self.file = Some(File::create(&path).await?); } - let data = if let Some(data) = raw { - data - } else { - &block.data - }; if block.compressed { - let tmp = decompress(data); + let tmp = decompress(&block.data); self.file.as_mut().unwrap().write_all(&tmp).await?; self.finished_size += tmp.len() as u64; } else { - self.file.as_mut().unwrap().write_all(data).await?; - self.finished_size += data.len() as u64; + self.file.as_mut().unwrap().write_all(&block.data).await?; + self.finished_size += block.data.len() as u64; } - self.transferred += data.len() as u64; + self.transferred += block.data.len() as u64; Ok(()) } #[inline] - fn join(&self, name: &str) -> PathBuf { + pub fn join(&self, name: &str) -> PathBuf { if name.is_empty() { self.path.clone() } else { @@ -350,7 +425,7 @@ impl TransferJob { } } - pub async fn read(&mut self) -> ResultType<Option<FileTransferBlock>> { + pub async fn read(&mut self, stream: &mut Stream) -> ResultType<Option<FileTransferBlock>> { let file_num = self.file_num as usize; if file_num >= self.files.len() { self.file.take(); @@ -361,13 +436,26 @@ impl TransferJob { match File::open(self.join(&name)).await { Ok(file) => { self.file = Some(file); + self.file_confirmed = false; + self.file_is_waiting = false; } Err(err) => { self.file_num += 1; + self.file_confirmed = false; + self.file_is_waiting = false; return Err(err.into()); } } } + if self.enable_overwrite_detection { + if !self.file_confirmed() { + if !self.file_is_waiting() { + self.send_current_digest(stream).await?; + self.set_file_is_waiting(true); + } + return Ok(None); + } + } const BUF_SIZE: usize = 128 * 1024; let mut buf: Vec<u8> = Vec::with_capacity(BUF_SIZE); unsafe { @@ -380,6 +468,8 @@ impl TransferJob { Err(err) => { self.file_num += 1; self.file = None; + self.file_confirmed = false; + self.file_is_waiting = false; return Err(err.into()); } Ok(n) => { @@ -394,6 +484,8 @@ impl TransferJob { if offset == 0 { self.file_num += 1; self.file = None; + self.file_confirmed = false; + self.file_is_waiting = false; } else { self.finished_size += offset as u64; if !is_compressed_file(name) { @@ -413,6 +505,139 @@ impl TransferJob { ..Default::default() })) } + + async fn send_current_digest(&mut self, stream: &mut Stream) -> ResultType<()> { + let mut msg = Message::new(); + let mut resp = FileResponse::new(); + let meta = self.file.as_ref().unwrap().metadata().await?; + let last_modified = meta + .modified()? + .duration_since(SystemTime::UNIX_EPOCH)? + .as_secs(); + resp.set_digest(FileTransferDigest { + id: self.id, + file_num: self.file_num, + last_modified, + file_size: meta.len(), + ..Default::default() + }); + msg.set_file_response(resp); + stream.send(&msg).await?; + log::info!( + "id: {}, file_num:{}, digest message is sent. waiting for confirm. msg: {:?}", + self.id, + self.file_num, + msg + ); + Ok(()) + } + + pub fn set_overwrite_strategy(&mut self, overwrite_strategy: Option<bool>) { + self.default_overwrite_strategy = overwrite_strategy; + } + + pub fn default_overwrite_strategy(&self) -> Option<bool> { + self.default_overwrite_strategy + } + + pub fn set_file_confirmed(&mut self, file_confirmed: bool) { + log::info!("id: {}, file_confirmed: {}", self.id, file_confirmed); + self.file_confirmed = file_confirmed; + self.file_skipped = false; + } + + pub fn set_file_is_waiting(&mut self, file_is_waiting: bool) { + self.file_is_waiting = file_is_waiting; + } + + #[inline] + pub fn file_is_waiting(&self) -> bool { + self.file_is_waiting + } + + #[inline] + pub fn file_confirmed(&self) -> bool { + self.file_confirmed + } + + /// Indicating whether the last file is skipped + #[inline] + pub fn file_skipped(&self) -> bool { + self.file_skipped + } + + /// Indicating whether the whole task is skipped + #[inline] + pub fn job_skipped(&self) -> bool { + self.file_skipped() && self.files.len() == 1 + } + + /// Check whether the job is completed after `read` returns `None` + /// This is a helper function which gives additional lifecycle when the job reads `None`. + /// If returns `true`, it means we can delete the job automatically. `False` otherwise. + /// + /// [`Note`] + /// Conditions: + /// 1. Files are not waiting for confirmation by peers. + #[inline] + pub fn job_completed(&self) -> bool { + // has no error, Condition 2 + if !self.enable_overwrite_detection || (!self.file_confirmed && !self.file_is_waiting) { + return true; + } + return false; + } + + /// Get job error message, useful for getting status when job had finished + pub fn job_error(&self) -> Option<String> { + if self.job_skipped() { + return Some("skipped".to_string()); + } + None + } + + pub fn set_file_skipped(&mut self) -> bool { + log::debug!("skip file {} in job {}", self.file_num, self.id); + self.file.take(); + self.set_file_confirmed(false); + self.set_file_is_waiting(false); + self.file_num += 1; + self.file_skipped = true; + true + } + + pub fn confirm(&mut self, r: &FileTransferSendConfirmRequest) -> bool { + if self.file_num() != r.file_num { + log::info!("file num truncated, ignoring"); + } else { + match r.union { + Some(file_transfer_send_confirm_request::Union::Skip(s)) => { + if s { + self.set_file_skipped(); + } else { + self.set_file_confirmed(true); + } + } + Some(file_transfer_send_confirm_request::Union::OffsetBlk(_offset)) => { + self.set_file_confirmed(true); + } + _ => {} + } + } + true + } + + #[inline] + pub fn gen_meta(&self) -> TransferJobMeta { + TransferJobMeta { + id: self.id, + remote: self.remote.to_string(), + to: self.path.to_string_lossy().to_string(), + file_num: self.file_num, + show_hidden: self.show_hidden, + is_remote: self.is_remote, + } + } } #[inline] @@ -453,12 +678,22 @@ pub fn new_block(block: FileTransferBlock) -> Message { } #[inline] -pub fn new_receive(id: i32, path: String, files: Vec<FileEntry>) -> Message { +pub fn new_send_confirm(r: FileTransferSendConfirmRequest) -> Message { + let mut msg_out = Message::new(); + let mut action = FileAction::new(); + action.set_send_confirm(r); + msg_out.set_file_action(action); + msg_out +} + +#[inline] +pub fn new_receive(id: i32, path: String, file_num: i32, files: Vec<FileEntry>) -> Message { let mut action = FileAction::new(); action.set_receive(FileTransferReceiveRequest { id, path, files: files.into(), + file_num, ..Default::default() }); let mut msg_out = Message::new(); @@ -467,12 +702,14 @@ pub fn new_receive(id: i32, path: String, files: Vec<FileEntry>) -> Message { } #[inline] -pub fn new_send(id: i32, path: String, include_hidden: bool) -> Message { +pub fn new_send(id: i32, path: String, file_num: i32, include_hidden: bool) -> Message { + log::info!("new send: {},id : {}", path, id); let mut action = FileAction::new(); action.set_send(FileTransferSendRequest { id, path, include_hidden, + file_num, ..Default::default() }); let mut msg_out = Message::new(); @@ -509,7 +746,10 @@ pub async fn handle_read_jobs( ) -> ResultType<()> { let mut finished = Vec::new(); for job in jobs.iter_mut() { - match job.read().await { + if job.is_last_job { + continue; + } + match job.read(stream).await { Err(err) => { stream .send(&new_error(job.id(), err, job.file_num())) @@ -519,8 +759,19 @@ pub async fn handle_read_jobs( stream.send(&new_block(block)).await?; } Ok(None) => { - finished.push(job.id()); - stream.send(&new_done(job.id(), job.file_num())).await?; + if job.job_completed() { + finished.push(job.id()); + let err = job.job_error(); + if err.is_some() { + stream + .send(&new_error(job.id(), err.unwrap(), job.file_num())) + .await?; + } else { + stream.send(&new_done(job.id(), job.file_num())).await?; + } + } else { + // waiting confirmation. + } } } } @@ -565,3 +816,35 @@ pub fn transform_windows_path(entries: &mut Vec<FileEntry>) { entry.name = entry.name.replace("\\", "/"); } } + +pub enum DigestCheckResult { + IsSame, + NeedConfirm(FileTransferDigest), + NoSuchFile, +} + +#[inline] +pub fn is_write_need_confirmation( + file_path: &str, + digest: &FileTransferDigest, +) -> ResultType<DigestCheckResult> { + let path = Path::new(file_path); + if path.exists() && path.is_file() { + let metadata = std::fs::metadata(path)?; + let modified_time = metadata.modified()?; + let remote_mt = Duration::from_secs(digest.last_modified); + let local_mt = modified_time.duration_since(UNIX_EPOCH)?; + if remote_mt == local_mt && digest.file_size == metadata.len() { + return Ok(DigestCheckResult::IsSame); + } + Ok(DigestCheckResult::NeedConfirm(FileTransferDigest { + id: digest.id, + file_num: digest.file_num, + last_modified: local_mt.as_secs(), + file_size: metadata.len(), + ..Default::default() + })) + } else { + Ok(DigestCheckResult::NoSuchFile) + } +}
\ No newline at end of file |