From 73ff8d79f70b36483d1d33587cdc9549c8e472bd Mon Sep 17 00:00:00 2001
From: Jeremy Lin
Date: Fri, 2 Apr 2021 20:16:49 -0700
Subject: Add a generic job scheduler

Also rewrite deletion of old sends using the job scheduler.
---
 .env.template         | 13 +++++++++++++
 Cargo.lock            | 33 ++++++++++++++++++++++++++++++++-
 Cargo.toml            | 10 ++++++++++
 src/api/core/mod.rs   |  2 +-
 src/api/core/sends.rs | 24 ++++++++----------------
 src/api/mod.rs        |  2 +-
 src/config.rs         |  8 ++++++++
 src/db/models/send.rs | 22 ++++++++++++++++------
 src/main.rs           | 43 +++++++++++++++++++++++++++++++++++++------
 9 files changed, 126 insertions(+), 31 deletions(-)

diff --git a/.env.template b/.env.template
index a85ce22d..ce571ff6 100644
--- a/.env.template
+++ b/.env.template
@@ -56,6 +56,19 @@
 # WEBSOCKET_ADDRESS=0.0.0.0
 # WEBSOCKET_PORT=3012
 
+## Job scheduler settings
+##
+## Job schedules use a cron-like syntax (as parsed by https://crates.io/crates/cron),
+## and are always in terms of UTC time (regardless of your local time zone settings).
+##
+## How often (in ms) the job scheduler thread checks for jobs that need running.
+## Set to 0 to globally disable scheduled jobs.
+# JOB_POLL_INTERVAL_MS=30000
+##
+## Cron schedule of the job that checks for Sends past their deletion date.
+## Defaults to hourly. Set blank to disable this job.
+# SEND_PURGE_SCHEDULE="0 0 * * * *"
+
 ## Enable extended logging, which shows timestamps and targets in the logs
 # EXTENDED_LOGGING=true
 
diff --git a/Cargo.lock b/Cargo.lock
index e60c8d0d..7a18fadc 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -161,6 +161,7 @@ dependencies = [
  "handlebars",
  "html5ever",
  "idna 0.2.2",
+ "job_scheduler",
  "jsonwebtoken",
  "lettre",
  "libsqlite3-sys",
@@ -401,6 +402,17 @@ dependencies = [
  "cfg-if 1.0.0",
 ]
 
+[[package]]
+name = "cron"
+version = "0.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e009ed0b762cf7a967a34dfdc67d5967d3f828f12901d37081432c3dd1668f8f"
+dependencies = [
+ "chrono",
+ "nom 4.1.1",
+ "once_cell",
+]
+
 [[package]]
 name = "crypto-mac"
 version = "0.3.0"
@@ -1097,6 +1109,16 @@ version = "0.4.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "dd25036021b0de88a0aff6b850051563c6516d0bf53f8638938edbb9de732736"
 
+[[package]]
+name = "job_scheduler"
+version = "1.2.1"
+source = "git+https://github.com/jjlin/job_scheduler?rev=ee023418dbba2bfe1e30a5fd7d937f9e33739806#ee023418dbba2bfe1e30a5fd7d937f9e33739806"
+dependencies = [
+ "chrono",
+ "cron",
+ "uuid",
+]
+
 [[package]]
 name = "js-sys"
 version = "0.3.49"
@@ -1160,7 +1182,7 @@ dependencies = [
  "idna 0.2.2",
  "mime 0.3.16",
  "native-tls",
- "nom",
+ "nom 6.1.2",
  "once_cell",
  "quoted_printable",
  "rand 0.8.3",
@@ -1475,6 +1497,15 @@ version = "0.1.14"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "72ef4a56884ca558e5ddb05a1d1e7e1bfd9a68d9ed024c21704cc98872dae1bb"
 
+[[package]]
+name = "nom"
+version = "4.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9c349f68f25f596b9f44cf0e7c69752a5c633b0550c3ff849518bfba0233774a"
+dependencies = [
+ "memchr",
+]
+
 [[package]]
 name = "nom"
 version = "6.1.2"
diff --git a/Cargo.toml b/Cargo.toml
index 24c24eba..4edffc42 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -73,6 +73,9 @@ chrono = { version = "0.4.19", features = ["serde"] }
 chrono-tz = "0.5.3"
 time = "0.2.26"
 
+# Job scheduler
+job_scheduler = "1.2.1"
+
 # TOTP library
 oath = "0.10.2"
 
@@ -136,3 +139,10 @@ rocket_contrib = { git = 'https://github.com/SergioBenitez/Rocket', rev = '263e3
 
 # For favicon extraction from main website
 data-url = { git = 'https://github.com/servo/rust-url', package="data-url", rev = '540ede02d0771824c0c80ff9f57fe8eff38b1291' }
+
+# The maintainer of the `job_scheduler` crate doesn't seem to have responded
+# to any issues or PRs for almost a year (as of April 2021). This hopefully
+# temporary fork updates Cargo.toml to use more up-to-date dependencies.
+# In particular, `cron` has since implemented parsing of some common syntax
+# that wasn't previously supported (https://github.com/zslayton/cron/pull/64).
+job_scheduler = { git = 'https://github.com/jjlin/job_scheduler', rev = 'ee023418dbba2bfe1e30a5fd7d937f9e33739806' }
diff --git a/src/api/core/mod.rs b/src/api/core/mod.rs
index 36e83f0e..8a7e5f9b 100644
--- a/src/api/core/mod.rs
+++ b/src/api/core/mod.rs
@@ -5,7 +5,7 @@ mod organizations;
 pub mod two_factor;
 mod sends;
 
-pub use sends::start_send_deletion_scheduler;
+pub use sends::purge_sends;
 
 pub fn routes() -> Vec<Route> {
     let mut mod_routes = routes![
diff --git a/src/api/core/sends.rs b/src/api/core/sends.rs
index ec6809a2..3cd568c5 100644
--- a/src/api/core/sends.rs
+++ b/src/api/core/sends.rs
@@ -9,7 +9,7 @@ use serde_json::Value;
 use crate::{
     api::{ApiResult, EmptyResult, JsonResult, JsonUpcase, Notify, UpdateType},
     auth::{Headers, Host},
-    db::{models::*, DbConn},
+    db::{models::*, DbConn, DbPool},
     CONFIG,
 };
 
@@ -27,21 +27,13 @@ pub fn routes() -> Vec<Route> {
     ]
 }
 
-pub fn start_send_deletion_scheduler(pool: crate::db::DbPool) {
-    std::thread::spawn(move || {
-        loop {
-            if let Ok(conn) = pool.get() {
-                info!("Initiating send deletion");
-                for send in Send::find_all(&conn) {
-                    if chrono::Utc::now().naive_utc() >= send.deletion_date {
-                        send.delete(&conn).ok();
-                    }
-                }
-            }
-
-            std::thread::sleep(std::time::Duration::from_secs(3600));
-        }
-    });
-}
+pub fn purge_sends(pool: DbPool) {
+    debug!("Purging sends");
+    if let Ok(conn) = pool.get() {
+        Send::purge(&conn);
+    } else {
+        error!("Failed to get DB connection while purging sends")
+    }
+}
 
 #[derive(Deserialize)]
diff --git a/src/api/mod.rs b/src/api/mod.rs
index 840c65ff..f417751c 100644
--- a/src/api/mod.rs
+++ b/src/api/mod.rs
@@ -10,8 +10,8 @@ use serde_json::Value;
 
 pub use crate::api::{
     admin::routes as admin_routes,
+    core::purge_sends,
     core::routes as core_routes,
-    core::start_send_deletion_scheduler,
     icons::routes as icons_routes,
     identity::routes as identity_routes,
     notifications::routes as notifications_routes,
diff --git a/src/config.rs b/src/config.rs
index 6c41c975..7c3c5461 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -316,6 +316,14 @@ make_config! {
         /// Websocket port
         websocket_port: u16, false, def, 3012;
     },
+    jobs {
+        /// Job scheduler poll interval |> How often the job scheduler thread checks for jobs to run.
+        /// Set to 0 to globally disable scheduled jobs.
+        job_poll_interval_ms: u64, false, def, 30_000;
+        /// Send purge schedule |> Cron schedule of the job that checks for Sends past their deletion date.
+        /// Defaults to hourly. Set blank to disable this job.
+        send_purge_schedule: String, false, def, "0 0 * * * *".to_string();
+    },
 
     /// General settings
     settings {
diff --git a/src/db/models/send.rs b/src/db/models/send.rs
index 0356d818..0644b1e1 100644
--- a/src/db/models/send.rs
+++ b/src/db/models/send.rs
@@ -205,6 +205,13 @@ impl Send {
         }}
     }
 
+    /// Purge all sends that are past their deletion date.
+    pub fn purge(conn: &DbConn) {
+        for send in Self::find_by_past_deletion_date(&conn) {
+            send.delete(&conn).ok();
+        }
+    }
+
     pub fn update_users_revision(&self, conn: &DbConn) {
         match &self.user_uuid {
             Some(user_uuid) => {
@@ -223,12 +230,6 @@ impl Send {
         Ok(())
     }
 
-    pub fn find_all(conn: &DbConn) -> Vec<Self> {
-        db_run! {conn: {
-            sends::table.load::<SendDb>(conn).expect("Error loading sends").from_db()
-        }}
-    }
-
     pub fn find_by_access_id(access_id: &str, conn: &DbConn) -> Option<Self> {
         use data_encoding::BASE64URL_NOPAD;
         use uuid::Uuid;
@@ -271,4 +272,13 @@ impl Send {
                 .load::<SendDb>(conn).expect("Error loading sends").from_db()
         }}
     }
+
+    pub fn find_by_past_deletion_date(conn: &DbConn) -> Vec<Self> {
+        let now = Utc::now().naive_utc();
+        db_run! {conn: {
+            sends::table
+                .filter(sends::deletion_date.lt(now))
+                .load::<SendDb>(conn).expect("Error loading sends").from_db()
+        }}
+    }
 }
diff --git a/src/main.rs b/src/main.rs
index 50975c66..4cdf4ff2 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -16,6 +16,7 @@ extern crate diesel;
 #[macro_use]
 extern crate diesel_migrations;
 
+use job_scheduler::{JobScheduler, Job};
 use std::{
     fs::create_dir_all,
     panic,
@@ -23,6 +24,7 @@ use std::{
     process::{exit, Command},
     str::FromStr,
     thread,
+    time::Duration,
 };
 
 #[macro_use]
@@ -56,7 +58,9 @@ fn main() {
 
     create_icon_cache_folder();
 
-    launch_rocket(extra_debug);
+    let pool = create_db_pool();
+    schedule_jobs(pool.clone());
+    launch_rocket(pool, extra_debug); // Blocks until program termination.
 }
 
 const HELP: &str = "\
@@ -301,17 +305,17 @@ fn check_web_vault() {
     }
 }
 
-fn launch_rocket(extra_debug: bool) {
-    let pool = match util::retry_db(db::DbPool::from_config, CONFIG.db_connection_retries()) {
+fn create_db_pool() -> db::DbPool {
+    match util::retry_db(db::DbPool::from_config, CONFIG.db_connection_retries()) {
         Ok(p) => p,
         Err(e) => {
             error!("Error creating database pool: {:?}", e);
             exit(1);
         }
-    };
-
-    api::start_send_deletion_scheduler(pool.clone());
+    }
+}
 
+fn launch_rocket(pool: db::DbPool, extra_debug: bool) {
     let basepath = &CONFIG.domain_path();
 
     // If adding more paths here, consider also adding them to
@@ -334,3 +338,30 @@ fn launch_rocket(extra_debug: bool) {
     // The launch will restore the original logging level
     error!("Launch error {:#?}", result);
 }
+
+fn schedule_jobs(pool: db::DbPool) {
+    if CONFIG.job_poll_interval_ms() == 0 {
+        info!("Job scheduler disabled.");
+        return;
+    }
+    thread::Builder::new().name("job-scheduler".to_string()).spawn(move || {
+        let mut sched = JobScheduler::new();
+
+        // Purge sends that are past their deletion date.
+        if !CONFIG.send_purge_schedule().is_empty() {
+            sched.add(Job::new(CONFIG.send_purge_schedule().parse().unwrap(), || {
+                api::purge_sends(pool.clone());
+            }));
+        }
+
+        // Periodically check for jobs to run. We probably won't need any
+        // jobs that run more often than once a minute, so a default poll
+        // interval of 30 seconds should be sufficient. Users who want to
+        // schedule jobs to run more frequently for some reason can reduce
+        // the poll interval accordingly.
+        loop {
+            sched.tick();
+            thread::sleep(Duration::from_millis(CONFIG.job_poll_interval_ms()));
+        }
+    }).expect("Error spawning job scheduler thread");
+}
-- 
cgit v1.2.3
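
The scheduler thread added above is driven by just two settings: a six-field cron expression per job and a global poll interval. A minimal standalone sketch of the same pattern, using only the job_scheduler calls that appear in the patch (the hard-coded "0 0 * * * *" schedule and the println! job body are illustrative placeholders, not part of the patch):

use job_scheduler::{Job, JobScheduler};
use std::{thread, time::Duration};

fn main() {
    let mut sched = JobScheduler::new();

    // Six-field cron syntax: "sec min hour day-of-month month day-of-week",
    // interpreted in UTC. "0 0 * * * *" fires at the top of every hour.
    sched.add(Job::new("0 0 * * * *".parse().unwrap(), || {
        println!("hourly job fired");
    }));

    // Same polling pattern as schedule_jobs(): tick() runs any job whose
    // next scheduled time has already passed, then the thread sleeps for
    // the poll interval (30 seconds by default).
    loop {
        sched.tick();
        thread::sleep(Duration::from_millis(30_000));
    }
}

Because tick() only fires jobs whose next scheduled time has passed, the poll interval bounds how late a job can start; 30 seconds is ample for jobs scheduled at minute granularity, and a smaller value only tightens that bound.
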
From d77333576b1268cd24f17348ffe6d72e07855f54 Mon Sep 17 00:00:00 2001
From: Jeremy Lin
Date: Fri, 2 Apr 2021 20:52:15 -0700
Subject: Add support for auto-deleting trashed items

Upstream will soon auto-delete trashed items after 30 days, but some people
use the trash as an archive folder, so to avoid unexpected data loss, this
implementation requires the user to explicitly enable auto-deletion.
---
 .env.template           |  4 ++++
 src/api/core/ciphers.rs | 11 ++++++++++-
 src/api/core/mod.rs     |  1 +
 src/api/mod.rs          |  1 +
 src/config.rs           |  8 ++++++++
 src/db/models/cipher.rs | 24 +++++++++++++++++++++++-
 src/main.rs             |  7 +++++++
 7 files changed, 54 insertions(+), 2 deletions(-)

diff --git a/.env.template b/.env.template
index ce571ff6..e5665296 100644
--- a/.env.template
+++ b/.env.template
@@ -68,6 +68,10 @@
 ## Cron schedule of the job that checks for Sends past their deletion date.
 ## Defaults to hourly. Set blank to disable this job.
 # SEND_PURGE_SCHEDULE="0 0 * * * *"
+##
+## Cron schedule of the job that checks for trashed items to delete permanently.
+## Defaults to daily. Set blank to disable this job.
+# TRASH_PURGE_SCHEDULE="0 0 0 * * *"
 
 ## Enable extended logging, which shows timestamps and targets in the logs
 # EXTENDED_LOGGING=true
diff --git a/src/api/core/ciphers.rs b/src/api/core/ciphers.rs
index 7b0de205..58ae80b1 100644
--- a/src/api/core/ciphers.rs
+++ b/src/api/core/ciphers.rs
@@ -13,7 +13,7 @@ use crate::{
     api::{self, EmptyResult, JsonResult, JsonUpcase, Notify, PasswordData, UpdateType},
     auth::Headers,
     crypto,
-    db::{models::*, DbConn},
+    db::{models::*, DbConn, DbPool},
     CONFIG,
 };
 
@@ -77,6 +77,15 @@ pub fn routes() -> Vec<Route> {
     ]
 }
 
+pub fn purge_trashed_ciphers(pool: DbPool) {
+    debug!("Purging trashed ciphers");
+    if let Ok(conn) = pool.get() {
+        Cipher::purge_trash(&conn);
+    } else {
+        error!("Failed to get DB connection while purging trashed ciphers")
+    }
+}
+
 #[derive(FromForm, Default)]
 struct SyncData {
     #[form(field = "excludeDomains")]
diff --git a/src/api/core/mod.rs b/src/api/core/mod.rs
index 8a7e5f9b..2964d4fb 100644
--- a/src/api/core/mod.rs
+++ b/src/api/core/mod.rs
@@ -5,6 +5,7 @@ mod organizations;
 pub mod two_factor;
 mod sends;
 
+pub use ciphers::purge_trashed_ciphers;
 pub use sends::purge_sends;
 
 pub fn routes() -> Vec<Route> {
diff --git a/src/api/mod.rs b/src/api/mod.rs
index f417751c..2132b30b 100644
--- a/src/api/mod.rs
+++ b/src/api/mod.rs
@@ -11,6 +11,7 @@ use serde_json::Value;
 pub use crate::api::{
     admin::routes as admin_routes,
     core::purge_sends,
+    core::purge_trashed_ciphers,
     core::routes as core_routes,
     icons::routes as icons_routes,
     identity::routes as identity_routes,
diff --git a/src/config.rs b/src/config.rs
index 7c3c5461..bc2f359e 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -323,6 +323,9 @@ make_config! {
         /// Send purge schedule |> Cron schedule of the job that checks for Sends past their deletion date.
         /// Defaults to hourly. Set blank to disable this job.
         send_purge_schedule: String, false, def, "0 0 * * * *".to_string();
+        /// Trash purge schedule |> Cron schedule of the job that checks for trashed items to delete permanently.
+        /// Defaults to daily. Set blank to disable this job.
+        trash_purge_schedule: String, false, def, "0 0 0 * * *".to_string();
     },
 
     /// General settings
@@ -347,6 +350,11 @@ make_config! {
         /// Per-organization attachment limit (KB) |> Limit in kilobytes for an organization attachments, once the limit is exceeded it won't be possible to upload more
         org_attachment_limit: i64, true, option;
 
+        /// Trash auto-delete days |> Number of days to wait before auto-deleting a trashed item.
+        /// If unset, trashed items are not auto-deleted. This setting applies globally, so make
+        /// sure to inform all users of any changes to this setting.
+        trash_auto_delete_days: i64, true, option;
+
         /// Disable icon downloads |> Set to true to disable icon downloading, this would still serve icons from
         /// $ICON_CACHE_FOLDER, but it won't produce any external network request. Needs to set $ICON_CACHE_TTL to 0,
         /// otherwise it will delete them and they won't be downloaded again.
diff --git a/src/db/models/cipher.rs b/src/db/models/cipher.rs
index 365865f8..e4ae04c8 100644
--- a/src/db/models/cipher.rs
+++ b/src/db/models/cipher.rs
@@ -1,6 +1,8 @@
-use chrono::{NaiveDateTime, Utc};
+use chrono::{Duration, NaiveDateTime, Utc};
 use serde_json::Value;
 
+use crate::CONFIG;
+
 use super::{
     Attachment,
     CollectionCipher,
@@ -271,6 +273,17 @@ impl Cipher {
         Ok(())
     }
 
+    /// Purge all ciphers that are old enough to be auto-deleted.
+    pub fn purge_trash(conn: &DbConn) {
+        if let Some(auto_delete_days) = CONFIG.trash_auto_delete_days() {
+            let now = Utc::now().naive_utc();
+            let dt = now - Duration::days(auto_delete_days);
+            for cipher in Self::find_deleted_before(&dt, conn) {
+                cipher.delete(&conn).ok();
+            }
+        }
+    }
+
     pub fn move_to_folder(&self, folder_uuid: Option<String>, user_uuid: &str, conn: &DbConn) -> EmptyResult {
         User::update_uuid_revision(user_uuid, conn);
 
@@ -511,6 +524,15 @@ impl Cipher {
         }}
     }
 
+    /// Find all ciphers that were deleted before the specified datetime.
+    pub fn find_deleted_before(dt: &NaiveDateTime, conn: &DbConn) -> Vec<Self> {
+        db_run! {conn: {
+            ciphers::table
+                .filter(ciphers::deleted_at.lt(dt))
+                .load::<CipherDb>(conn).expect("Error loading ciphers").from_db()
+        }}
+    }
+
     pub fn get_collections(&self, user_id: &str, conn: &DbConn) -> Vec<String> {
         db_run! {conn: {
             ciphers_collections::table
diff --git a/src/main.rs b/src/main.rs
index 4cdf4ff2..d5985bac 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -354,6 +354,13 @@ fn schedule_jobs(pool: db::DbPool) {
             }));
         }
 
+        // Purge trashed items that are old enough to be auto-deleted.
+        if !CONFIG.trash_purge_schedule().is_empty() {
+            sched.add(Job::new(CONFIG.trash_purge_schedule().parse().unwrap(), || {
+                api::purge_trashed_ciphers(pool.clone());
+            }));
+        }
+
         // Periodically check for jobs to run. We probably won't need any
         // jobs that run more often than once a minute, so a default poll
         // interval of 30 seconds should be sufficient. Users who want to
-- 
cgit v1.2.3
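
The purge added in this patch hinges on one cutoff computation: an item is eligible once its deleted_at timestamp is older than now minus the configured number of days, and leaving TRASH_AUTO_DELETE_DAYS unset disables auto-deletion entirely. A small self-contained check of that logic (the should_purge helper is illustrative only; the real code pushes the comparison into the SQL filter via find_deleted_before):

use chrono::{Duration, NaiveDateTime, Utc};

// Mirrors the cutoff arithmetic in Cipher::purge_trash(): with no configured
// value nothing is ever auto-deleted; otherwise anything deleted before
// (now - days) is eligible for permanent deletion.
fn should_purge(deleted_at: NaiveDateTime, auto_delete_days: Option<i64>) -> bool {
    match auto_delete_days {
        Some(days) => deleted_at < Utc::now().naive_utc() - Duration::days(days),
        None => false,
    }
}

fn main() {
    let deleted_31_days_ago = Utc::now().naive_utc() - Duration::days(31);
    assert!(should_purge(deleted_31_days_ago, Some(30)));
    assert!(!should_purge(deleted_31_days_ago, Some(60)));
    assert!(!should_purge(deleted_31_days_ago, None));
    println!("cutoff logic ok");
}
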
From 90e0b7fec6cc025561f9f732fb06d15f72e5c892 Mon Sep 17 00:00:00 2001
From: Jeremy Lin
Date: Mon, 5 Apr 2021 23:12:36 -0700
Subject: Offset scheduled jobs by 5 minutes

This is intended to avoid contention with database backups that many users
probably schedule to start at exactly the top of an hour.
---
 .env.template | 8 ++++----
 src/config.rs | 4 ++--
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/.env.template b/.env.template
index e5665296..e4d0b1e1 100644
--- a/.env.template
+++ b/.env.template
@@ -66,12 +66,12 @@
 # JOB_POLL_INTERVAL_MS=30000
 ##
 ## Cron schedule of the job that checks for Sends past their deletion date.
-## Defaults to hourly. Set blank to disable this job.
-# SEND_PURGE_SCHEDULE="0 0 * * * *"
+## Defaults to hourly (5 minutes after the hour). Set blank to disable this job.
+# SEND_PURGE_SCHEDULE="0 5 * * * *"
 ##
 ## Cron schedule of the job that checks for trashed items to delete permanently.
-## Defaults to daily. Set blank to disable this job.
-# TRASH_PURGE_SCHEDULE="0 0 0 * * *"
+## Defaults to daily (5 minutes after midnight). Set blank to disable this job.
+# TRASH_PURGE_SCHEDULE="0 5 0 * * *"
 
 ## Enable extended logging, which shows timestamps and targets in the logs
 # EXTENDED_LOGGING=true
diff --git a/src/config.rs b/src/config.rs
index bc2f359e..86031c72 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -322,10 +322,10 @@ make_config! {
         job_poll_interval_ms: u64, false, def, 30_000;
         /// Send purge schedule |> Cron schedule of the job that checks for Sends past their deletion date.
         /// Defaults to hourly. Set blank to disable this job.
-        send_purge_schedule: String, false, def, "0 0 * * * *".to_string();
+        send_purge_schedule: String, false, def, "0 5 * * * *".to_string();
         /// Trash purge schedule |> Cron schedule of the job that checks for trashed items to delete permanently.
         /// Defaults to daily. Set blank to disable this job.
-        trash_purge_schedule: String, false, def, "0 0 0 * * *".to_string();
+        trash_purge_schedule: String, false, def, "0 5 0 * * *".to_string();
     },
 
     /// General settings
-- 
cgit v1.2.3
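
A quick way to see the effect of the 5-minute offset is to parse the old and new default schedules with the cron crate (the parser behind job_scheduler) and compare their upcoming fire times. A sketch, assuming cron 0.9 and chrono 0.4 are available as direct dependencies and that the crate's upcoming() iterator is used as documented (it is not exercised directly by these patches):

use std::str::FromStr;

use chrono::Utc;
use cron::Schedule;

fn main() {
    // Old default vs. the new 5-minutes-past-the-hour default from this commit.
    for expr in &["0 0 * * * *", "0 5 * * * *"] {
        let schedule = Schedule::from_str(expr).expect("valid cron expression");
        // upcoming() yields the next fire times in the given time zone
        // (UTC here, matching how the job scheduler interprets schedules).
        for next in schedule.upcoming(Utc).take(2) {
            println!("{} -> {}", expr, next);
        }
    }
}
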