From 512b1196fd13e429d14bf70f019aacf89b6ff840 Mon Sep 17 00:00:00 2001 From: Artur Jamro Date: Tue, 17 Sep 2019 16:38:21 -0700 Subject: [PATCH] Cache worker tests --- wasmtime-environ/Cargo.toml | 1 + wasmtime-environ/src/cache.rs | 149 ++-- wasmtime-environ/src/cache/config.rs | 3 +- wasmtime-environ/src/cache/config/tests.rs | 3 +- wasmtime-environ/src/cache/tests.rs | 100 ++- wasmtime-environ/src/cache/worker.rs | 127 ++- wasmtime-environ/src/cache/worker/tests.rs | 758 ++++++++++++++++++ .../cache/worker/tests/system_time_stub.rs | 29 + 8 files changed, 1039 insertions(+), 131 deletions(-) create mode 100644 wasmtime-environ/src/cache/worker/tests.rs create mode 100644 wasmtime-environ/src/cache/worker/tests/system_time_stub.rs diff --git a/wasmtime-environ/Cargo.toml b/wasmtime-environ/Cargo.toml index 91db75a978..fda6ca1b4d 100644 --- a/wasmtime-environ/Cargo.toml +++ b/wasmtime-environ/Cargo.toml @@ -45,6 +45,7 @@ target-lexicon = { version = "0.8.1", default-features = false } pretty_env_logger = "0.3.0" rand = { version = "0.7.0", features = ["small_rng"] } cranelift-codegen = { version = "0.44.0", features = ["enable-serde", "all-arch"] } +filetime = "0.2.7" [features] default = ["std"] diff --git a/wasmtime-environ/src/cache.rs b/wasmtime-environ/src/cache.rs index ad8ec302af..9c982102f1 100644 --- a/wasmtime-environ/src/cache.rs +++ b/wasmtime-environ/src/cache.rs @@ -15,12 +15,13 @@ use std::io::Write; use std::path::{Path, PathBuf}; use std::string::{String, ToString}; +#[macro_use] // for tests mod config; mod worker; use config::{cache_config, CacheConfig}; pub use config::{create_new_config, init}; -use worker::worker; +use worker::{worker, Worker}; lazy_static! { static ref SELF_MTIME: String = { @@ -48,9 +49,12 @@ lazy_static! { }; } -pub struct ModuleCacheEntry<'config> { - mod_cache_path: Option, +pub struct ModuleCacheEntry<'config, 'worker>(Option>); + +struct ModuleCacheEntryInner<'config, 'worker> { + mod_cache_path: PathBuf, cache_config: &'config CacheConfig, + worker: &'worker Worker, } #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] @@ -72,7 +76,7 @@ type ModuleCacheDataTupleType = ( struct Sha256Hasher(Sha256); -impl<'config> ModuleCacheEntry<'config> { +impl<'config, 'worker> ModuleCacheEntry<'config, 'worker> { pub fn new<'data>( module: &Module, function_body_inputs: &PrimaryMap>, @@ -80,87 +84,104 @@ impl<'config> ModuleCacheEntry<'config> { compiler_name: &str, generate_debug_info: bool, ) -> Self { - Self::new_with_config( - module, - function_body_inputs, - isa, - compiler_name, - generate_debug_info, - cache_config(), - ) + let cache_config = cache_config(); + if cache_config.enabled() { + Self(Some(ModuleCacheEntryInner::new( + module, + function_body_inputs, + isa, + compiler_name, + generate_debug_info, + cache_config, + worker(), + ))) + } else { + Self(None) + } + } + + #[cfg(test)] + fn from_inner<'data>(inner: ModuleCacheEntryInner<'config, 'worker>) -> Self { + Self(Some(inner)) + } + + pub fn get_data(&self) -> Option { + if let Some(inner) = &self.0 { + inner.get_data().map(|val| { + inner.worker.on_cache_get_async(&inner.mod_cache_path); // call on success + val + }) + } else { + None + } } - fn new_with_config<'data>( + pub fn update_data(&self, data: &ModuleCacheData) { + if let Some(inner) = &self.0 { + inner.update_data(data).map(|val| { + inner.worker.on_cache_update_async(&inner.mod_cache_path); // call on success + val + }); + } + } +} + +impl<'config, 'worker> ModuleCacheEntryInner<'config, 'worker> { + fn new<'data>( module: &Module, function_body_inputs: &PrimaryMap>, isa: &dyn isa::TargetIsa, compiler_name: &str, generate_debug_info: bool, cache_config: &'config CacheConfig, + worker: &'worker Worker, ) -> Self { - let mod_cache_path = if cache_config.enabled() { - let hash = Sha256Hasher::digest(module, function_body_inputs); - let compiler_dir = if cfg!(debug_assertions) { - format!( - "{comp_name}-{comp_ver}-{comp_mtime}", - comp_name = compiler_name, - comp_ver = env!("GIT_REV"), - comp_mtime = *SELF_MTIME, - ) - } else { - format!( - "{comp_name}-{comp_ver}", - comp_name = compiler_name, - comp_ver = env!("GIT_REV"), - ) - }; - let mod_filename = format!( - "mod-{mod_hash}{mod_dbg}", - mod_hash = base64::encode_config(&hash, base64::URL_SAFE_NO_PAD), // standard encoding uses '/' which can't be used for filename - mod_dbg = if generate_debug_info { ".d" } else { "" }, - ); - Some( - cache_config - .directory() - .join(isa.triple().to_string()) - .join(compiler_dir) - .join(mod_filename), + let hash = Sha256Hasher::digest(module, function_body_inputs); + let compiler_dir = if cfg!(debug_assertions) { + format!( + "{comp_name}-{comp_ver}-{comp_mtime}", + comp_name = compiler_name, + comp_ver = env!("GIT_REV"), + comp_mtime = *SELF_MTIME, ) } else { - None + format!( + "{comp_name}-{comp_ver}", + comp_name = compiler_name, + comp_ver = env!("GIT_REV"), + ) }; + let mod_filename = format!( + "mod-{mod_hash}{mod_dbg}", + mod_hash = base64::encode_config(&hash, base64::URL_SAFE_NO_PAD), // standard encoding uses '/' which can't be used for filename + mod_dbg = if generate_debug_info { ".d" } else { "" }, + ); + let mod_cache_path = cache_config + .directory() + .join(isa.triple().to_string()) + .join(compiler_dir) + .join(mod_filename); Self { mod_cache_path, cache_config, + worker, } } - pub fn get_data(&self) -> Option { - let path = self.mod_cache_path.as_ref()?; - trace!("get_data() for path: {}", path.display()); - let compressed_cache_bytes = fs::read(path).ok()?; + fn get_data(&self) -> Option { + trace!("get_data() for path: {}", self.mod_cache_path.display()); + let compressed_cache_bytes = fs::read(&self.mod_cache_path).ok()?; let cache_bytes = zstd::decode_all(&compressed_cache_bytes[..]) .map_err(|err| warn!("Failed to decompress cached code: {}", err)) .ok()?; - let ret = bincode::deserialize(&cache_bytes[..]) + bincode::deserialize(&cache_bytes[..]) .map_err(|err| warn!("Failed to deserialize cached code: {}", err)) - .ok()?; - - worker().on_cache_get_async(path); // call on success - Some(ret) - } - - pub fn update_data(&self, data: &ModuleCacheData) { - if self.update_data_impl(data).is_some() { - let path = self.mod_cache_path.as_ref().unwrap(); - worker().on_cache_update_async(path); // call on success - } + .ok() } - fn update_data_impl(&self, data: &ModuleCacheData) -> Option<()> { - let path = self.mod_cache_path.as_ref()?; - trace!("update_data() for path: {}", path.display()); + fn update_data(&self, data: &ModuleCacheData) -> Option<()> { + trace!("update_data() for path: {}", self.mod_cache_path.display()); let serialized_data = bincode::serialize(&data) .map_err(|err| warn!("Failed to serialize cached code: {}", err)) .ok()?; @@ -173,17 +194,17 @@ impl<'config> ModuleCacheEntry<'config> { // Optimize syscalls: first, try writing to disk. It should succeed in most cases. // Otherwise, try creating the cache directory and retry writing to the file. - if fs_write_atomic(path, "mod", &compressed_data) { + if fs_write_atomic(&self.mod_cache_path, "mod", &compressed_data) { return Some(()); } debug!( "Attempting to create the cache directory, because \ failed to write cached code to disk, path: {}", - path.display(), + self.mod_cache_path.display(), ); - let cache_dir = path.parent().unwrap(); + let cache_dir = self.mod_cache_path.parent().unwrap(); fs::create_dir_all(cache_dir) .map_err(|err| { warn!( @@ -194,7 +215,7 @@ impl<'config> ModuleCacheEntry<'config> { }) .ok()?; - if fs_write_atomic(path, "mod", &compressed_data) { + if fs_write_atomic(&self.mod_cache_path, "mod", &compressed_data) { Some(()) } else { None diff --git a/wasmtime-environ/src/cache/config.rs b/wasmtime-environ/src/cache/config.rs index db07b3f250..4777a51d58 100644 --- a/wasmtime-environ/src/cache/config.rs +++ b/wasmtime-environ/src/cache/config.rs @@ -621,4 +621,5 @@ impl CacheConfig { } #[cfg(test)] -mod tests; +#[macro_use] +pub mod tests; diff --git a/wasmtime-environ/src/cache/config/tests.rs b/wasmtime-environ/src/cache/config/tests.rs index 0ed60a5452..7a9094a2ad 100644 --- a/wasmtime-environ/src/cache/config/tests.rs +++ b/wasmtime-environ/src/cache/config/tests.rs @@ -8,7 +8,8 @@ use tempfile::{self, TempDir}; // that's why these function and macro always use custom cache directory // note: tempdir removes directory when being dropped, so we need to return it to the caller, // so the paths are valid -fn test_prolog() -> (TempDir, PathBuf, PathBuf) { +pub fn test_prolog() -> (TempDir, PathBuf, PathBuf) { + let _ = pretty_env_logger::try_init(); let temp_dir = tempfile::tempdir().expect("Can't create temporary directory"); let cache_dir = temp_dir.path().join("cache-dir"); let config_path = temp_dir.path().join("cache-config.toml"); diff --git a/wasmtime-environ/src/cache/tests.rs b/wasmtime-environ/src/cache/tests.rs index 74b68a8c09..a427e846aa 100644 --- a/wasmtime-environ/src/cache/tests.rs +++ b/wasmtime-environ/src/cache/tests.rs @@ -1,3 +1,4 @@ +use super::config::tests::test_prolog; use super::*; use crate::address_map::{FunctionAddressMap, InstructionAddressMap}; use crate::compilation::{CodeAndJTOffsets, Relocation, RelocationTarget}; @@ -14,22 +15,17 @@ use std::fs; use std::str::FromStr; use std::vec::Vec; use target_lexicon::triple; -use tempfile; // Since cache system is a global thing, each test needs to be run in seperate process. // So, init() tests are run as integration tests. // However, caching is a private thing, an implementation detail, and needs to be tested // from the inside of the module. +// We test init() in exactly one test, rest of the tests doesn't rely on it. #[test] -fn test_write_read_cache() { - pretty_env_logger::init(); - let dir = tempfile::tempdir().expect("Can't create temporary directory"); - - let cache_dir = dir.path().join("cache-dir"); - let baseline_compression_level = 5; - - let config_path = dir.path().join("cache-config.toml"); +fn test_cache_init() { + let (_tempdir, cache_dir, config_path) = test_prolog(); + let baseline_compression_level = 4; let config_content = format!( "[cache]\n\ enabled = true\n\ @@ -42,6 +38,8 @@ fn test_write_read_cache() { let errors = init(true, Some(&config_path), None); assert!(errors.is_empty()); + + // test if we can use config let cache_config = cache_config(); assert!(cache_config.enabled()); // assumption: config init creates cache directory and returns canonicalized path @@ -54,6 +52,31 @@ fn test_write_read_cache() { baseline_compression_level ); + // test if we can use worker + let worker = worker(); + worker.on_cache_update_async(config_path); +} + +#[test] +fn test_write_read_cache() { + let (_tempdir, cache_dir, config_path) = test_prolog(); + let cache_config = load_config!( + config_path, + "[cache]\n\ + enabled = true\n\ + directory = {cache_dir}\n\ + baseline-compression-level = 3\n", + cache_dir + ); + assert!(cache_config.enabled()); + let worker = Worker::start_new(&cache_config, None); + + // assumption: config load creates cache directory and returns canonicalized path + assert_eq!( + *cache_config.directory(), + fs::canonicalize(cache_dir).unwrap() + ); + let mut rng = SmallRng::from_seed([ 0x42, 0x04, 0xF3, 0x44, 0x11, 0x22, 0x33, 0x44, 0x67, 0x68, 0xFF, 0x00, 0x44, 0x23, 0x7F, 0x96, @@ -72,27 +95,59 @@ fn test_write_read_cache() { let compiler1 = "test-1"; let compiler2 = "test-2"; - let entry1 = ModuleCacheEntry::new(&module1, &function_body_inputs1, &*isa1, compiler1, false); - assert!(entry1.mod_cache_path().is_some()); + let entry1 = ModuleCacheEntry::from_inner(ModuleCacheEntryInner::new( + &module1, + &function_body_inputs1, + &*isa1, + compiler1, + false, + &cache_config, + &worker, + )); + assert!(entry1.0.is_some()); assert!(entry1.get_data().is_none()); let data1 = new_module_cache_data(&mut rng); entry1.update_data(&data1); assert_eq!(entry1.get_data().expect("Cache should be available"), data1); - let entry2 = ModuleCacheEntry::new(&module2, &function_body_inputs1, &*isa1, compiler1, false); + let entry2 = ModuleCacheEntry::from_inner(ModuleCacheEntryInner::new( + &module2, + &function_body_inputs1, + &*isa1, + compiler1, + false, + &cache_config, + &worker, + )); let data2 = new_module_cache_data(&mut rng); entry2.update_data(&data2); assert_eq!(entry1.get_data().expect("Cache should be available"), data1); assert_eq!(entry2.get_data().expect("Cache should be available"), data2); - let entry3 = ModuleCacheEntry::new(&module1, &function_body_inputs2, &*isa1, compiler1, false); + let entry3 = ModuleCacheEntry::from_inner(ModuleCacheEntryInner::new( + &module1, + &function_body_inputs2, + &*isa1, + compiler1, + false, + &cache_config, + &worker, + )); let data3 = new_module_cache_data(&mut rng); entry3.update_data(&data3); assert_eq!(entry1.get_data().expect("Cache should be available"), data1); assert_eq!(entry2.get_data().expect("Cache should be available"), data2); assert_eq!(entry3.get_data().expect("Cache should be available"), data3); - let entry4 = ModuleCacheEntry::new(&module1, &function_body_inputs1, &*isa2, compiler1, false); + let entry4 = ModuleCacheEntry::from_inner(ModuleCacheEntryInner::new( + &module1, + &function_body_inputs1, + &*isa2, + compiler1, + false, + &cache_config, + &worker, + )); let data4 = new_module_cache_data(&mut rng); entry4.update_data(&data4); assert_eq!(entry1.get_data().expect("Cache should be available"), data1); @@ -100,7 +155,15 @@ fn test_write_read_cache() { assert_eq!(entry3.get_data().expect("Cache should be available"), data3); assert_eq!(entry4.get_data().expect("Cache should be available"), data4); - let entry5 = ModuleCacheEntry::new(&module1, &function_body_inputs1, &*isa1, compiler2, false); + let entry5 = ModuleCacheEntry::from_inner(ModuleCacheEntryInner::new( + &module1, + &function_body_inputs1, + &*isa1, + compiler2, + false, + &cache_config, + &worker, + )); let data5 = new_module_cache_data(&mut rng); entry5.update_data(&data5); assert_eq!(entry1.get_data().expect("Cache should be available"), data1); @@ -186,7 +249,6 @@ fn new_function_body_inputs<'data>( } fn new_module_cache_data(rng: &mut impl Rng) -> ModuleCacheData { - // WARNING: if method changed, update PartialEq impls below, too! let funcs = (0..rng.gen_range(0, 10)) .map(|i| { let mut sm = SecondaryMap::new(); // doesn't implement from iterator @@ -276,9 +338,3 @@ fn new_module_cache_data(rng: &mut impl Rng) -> ModuleCacheData { stack_slots, )) } - -impl ModuleCacheEntry<'_> { - pub fn mod_cache_path(&self) -> &Option { - &self.mod_cache_path - } -} diff --git a/wasmtime-environ/src/cache/worker.rs b/wasmtime-environ/src/cache/worker.rs index d418c04296..5a512dc4ca 100644 --- a/wasmtime-environ/src/cache/worker.rs +++ b/wasmtime-environ/src/cache/worker.rs @@ -6,8 +6,6 @@ //! Background tasks can be CPU intensive, but the worker thread has low priority. use super::{cache_config, fs_write_atomic, CacheConfig}; -#[cfg(test)] -use core::borrow::Borrow; use log::{debug, info, trace, warn}; use serde::{Deserialize, Serialize}; use spin::Once; @@ -19,31 +17,34 @@ use std::path::{Path, PathBuf}; use std::sync::atomic::{self, AtomicBool}; use std::sync::mpsc::{sync_channel, Receiver, SyncSender}; #[cfg(test)] -use std::sync::{atomic::AtomicU32, Arc}; -use std::thread::{self}; +use std::sync::{Arc, Condvar, Mutex}; +use std::thread; use std::time::Duration; +#[cfg(not(test))] use std::time::SystemTime; use std::vec::Vec; +#[cfg(test)] +use tests::system_time_stub::SystemTimeStub as SystemTime; pub(super) struct Worker { sender: SyncSender, #[cfg(test)] - stats: Arc, + stats: Arc<(Mutex, Condvar)>, } struct WorkerThread { receiver: Receiver, cache_config: CacheConfig, #[cfg(test)] - stats: Arc, + stats: Arc<(Mutex, Condvar)>, } #[cfg(test)] #[derive(Default)] struct WorkerStats { - dropped: AtomicU32, - sent: AtomicU32, - handled: AtomicU32, + dropped: u32, + sent: u32, + handled: u32, } static WORKER: Once = Once::new(); @@ -87,7 +88,7 @@ impl Worker { let (tx, rx) = sync_channel(queue_size); #[cfg(test)] - let stats = Arc::new(WorkerStats::default()); + let stats = Arc::new((Mutex::new(WorkerStats::default()), Condvar::new())); let worker_thread = WorkerThread { receiver: rx, @@ -101,7 +102,7 @@ impl Worker { // non-tests binary has only a static worker, so Rust doesn't drop it thread::spawn(move || worker_thread.run(init_file_per_thread_logger)); - Worker { + Self { sender: tx, #[cfg(test)] stats: stats, @@ -121,11 +122,15 @@ impl Worker { #[inline] fn send_cache_event(&self, event: CacheEvent) { #[cfg(test)] - let stats: &WorkerStats = self.stats.borrow(); + let mut stats = self + .stats + .0 + .lock() + .expect("Failed to acquire worker stats lock"); match self.sender.try_send(event.clone()) { Ok(()) => { #[cfg(test)] - stats.sent.fetch_add(1, atomic::Ordering::SeqCst); + let _ = stats.sent += 1; } Err(err) => { info!( @@ -135,24 +140,30 @@ impl Worker { ); #[cfg(test)] - stats.dropped.fetch_add(1, atomic::Ordering::SeqCst); + let _ = stats.dropped += 1; } } } - #[allow(dead_code)] // todo for worker tests #[cfg(test)] pub(super) fn events_dropped(&self) -> u32 { - let stats: &WorkerStats = self.stats.borrow(); - stats.dropped.load(atomic::Ordering::SeqCst) + let stats = self + .stats + .0 + .lock() + .expect("Failed to acquire worker stats lock"); + stats.dropped } - // todo wait_for_* instead? - #[allow(dead_code)] // todo for worker tests #[cfg(test)] - pub(super) fn all_events_handled(&self) -> bool { - let stats: &WorkerStats = self.stats.borrow(); - stats.sent.load(atomic::Ordering::SeqCst) == stats.handled.load(atomic::Ordering::SeqCst) + pub(super) fn wait_for_all_events_handled(&self) { + let (stats, condvar) = &*self.stats; + let mut stats = stats.lock().expect("Failed to acquire worker stats lock"); + while stats.handled != stats.sent { + stats = condvar + .wait(stats) + .expect("Failed to reacquire worker stats lock"); + } } } @@ -186,6 +197,7 @@ enum CacheEntry { impl WorkerThread { fn run(self, init_file_per_thread_logger: Option<&'static str>) { + #[cfg(not(test))] // We want to test the worker without relying on init() being called assert!(INIT_CALLED.load(atomic::Ordering::SeqCst)); if let Some(prefix) = init_file_per_thread_logger { @@ -197,7 +209,7 @@ impl WorkerThread { Self::lower_thread_priority(); #[cfg(test)] - let stats: &WorkerStats = self.stats.borrow(); + let (stats, condvar) = &*self.stats; for event in self.receiver.iter() { match event { @@ -206,7 +218,11 @@ impl WorkerThread { } #[cfg(test)] - stats.handled.fetch_add(1, atomic::Ordering::SeqCst); + { + let mut stats = stats.lock().expect("Failed to acquire worker stats lock"); + stats.handled += 1; + condvar.notify_all(); + } } // The receiver can stop iteration iff the channel has hung up. @@ -422,13 +438,29 @@ impl WorkerThread { trace!("Trying to clean up cache"); let mut cache_index = self.list_cache_contents(); + let future_tolerance = SystemTime::now() + .checked_add( + self.cache_config + .allowed_clock_drift_for_files_from_future(), + ) + .expect("Brace your cache, the next Big Bang is coming (time overflow)"); cache_index.sort_unstable_by(|lhs, rhs| { // sort by age use CacheEntry::*; match (lhs, rhs) { (Recognized { mtime: lhs_mt, .. }, Recognized { mtime: rhs_mt, .. }) => { - rhs_mt.cmp(lhs_mt) - } // later == younger + match (*lhs_mt > future_tolerance, *rhs_mt > future_tolerance) { + // later == younger + (false, false) => rhs_mt.cmp(lhs_mt), + // files from far future are treated as oldest recognized files + // we want to delete them, so the cache keeps track of recent files + // however, we don't delete them uncodintionally, + // because .stats file can be overwritten with a meaningful mtime + (true, false) => cmp::Ordering::Greater, + (false, true) => cmp::Ordering::Less, + (true, true) => cmp::Ordering::Equal, + } + } // unrecognized is kind of infinity (Recognized { .. }, Unrecognized { .. }) => cmp::Ordering::Less, (Unrecognized { .. }, Recognized { .. }) => cmp::Ordering::Greater, @@ -467,12 +499,12 @@ impl WorkerThread { total_size += size; if start_delete_idx_if_deleting_recognized_items.is_none() { - if total_size >= tsl_if_deleting || (idx + 1) as u64 >= fcl_if_deleting { + if total_size > tsl_if_deleting || (idx + 1) as u64 > fcl_if_deleting { start_delete_idx_if_deleting_recognized_items = Some(idx); } } - if total_size >= total_size_limit || (idx + 1) as u64 >= file_count_limit { + if total_size > total_size_limit || (idx + 1) as u64 > file_count_limit { start_delete_idx = start_delete_idx_if_deleting_recognized_items; break; } @@ -593,20 +625,28 @@ impl WorkerThread { add_unrecognized!(file: path); } (2, false) => { - // assumption: only mod cache (no ext), .stats & .wip-* files let ext = path.extension(); if ext.is_none() || ext == Some(OsStr::new("stats")) { + // mod or stats file cache_files.insert(path, entry); } else { - // assume it's .wip file (lock) - if is_fs_lock_expired( - Some(&entry), - &path, - cache_config.optimizing_compression_task_timeout(), - cache_config.allowed_clock_drift_for_files_from_future(), - ) { + let recognized = if let Some(ext_str) = ext.unwrap().to_str() { + // check if valid lock + ext_str.starts_with("wip-") + && !is_fs_lock_expired( + Some(&entry), + &path, + cache_config.optimizing_compression_task_timeout(), + cache_config.allowed_clock_drift_for_files_from_future(), + ) + } else { + // if it's None, i.e. not valid UTF-8 string, then that's not our lock for sure + false + }; + + if !recognized { add_unrecognized!(file: path); - } // else: skip active lock + } } } (_, is_dir) => add_unrecognized!(is_dir, path), @@ -661,7 +701,7 @@ impl WorkerThread { ); vec.push(CacheEntry::Recognized { path: mod_path.to_path_buf(), - mtime: stats_mtime, + mtime: stats_mtime.into(), // .into() called for the SystemTimeStub if cfg(test) size: mod_metadata.len(), }) } @@ -677,7 +717,7 @@ impl WorkerThread { ); vec.push(CacheEntry::Recognized { path: mod_path.to_path_buf(), - mtime: mod_mtime, + mtime: mod_mtime.into(), // .into() called for the SystemTimeStub if cfg(test) size: mod_metadata.len(), }) } @@ -833,8 +873,7 @@ fn is_fs_lock_expired( allowed_future_drift: Duration, ) -> bool { let mtime = match entry - .map(|e| e.metadata()) - .unwrap_or_else(|| path.metadata()) + .map_or_else(|| path.metadata(), |e| e.metadata()) .and_then(|metadata| metadata.modified()) { Ok(mt) => mt, @@ -848,7 +887,8 @@ fn is_fs_lock_expired( } }; - match mtime.elapsed() { + // DON'T use: mtime.elapsed() -- we must call SystemTime directly for the tests to be deterministic + match SystemTime::now().duration_since(mtime) { Ok(elapsed) => elapsed >= threshold, Err(err) => { trace!( @@ -864,4 +904,5 @@ fn is_fs_lock_expired( } } -// todo tests +#[cfg(test)] +mod tests; diff --git a/wasmtime-environ/src/cache/worker/tests.rs b/wasmtime-environ/src/cache/worker/tests.rs new file mode 100644 index 0000000000..f1323fd7ef --- /dev/null +++ b/wasmtime-environ/src/cache/worker/tests.rs @@ -0,0 +1,758 @@ +use super::*; +use crate::cache::config::tests::test_prolog; +use std::iter::repeat; +use std::process; +// load_config! comes from crate::cache(::config::tests); + +// when doing anything with the tests, make sure they are DETERMINISTIC +// -- the result shouldn't rely on system time! +pub mod system_time_stub; + +#[test] +fn test_on_get_create_stats_file() { + let (_tempdir, cache_dir, config_path) = test_prolog(); + let cache_config = load_config!( + config_path, + "[cache]\n\ + enabled = true\n\ + directory = {cache_dir}", + cache_dir + ); + assert!(cache_config.enabled()); + let worker = Worker::start_new(&cache_config, None); + + let mod_file = cache_dir.join("some-mod"); + worker.on_cache_get_async(mod_file); + worker.wait_for_all_events_handled(); + assert_eq!(worker.events_dropped(), 0); + + let stats_file = cache_dir.join("some-mod.stats"); + let stats = read_stats_file(&stats_file).expect("Failed to read stats file"); + assert_eq!(stats.usages, 1); + assert_eq!( + stats.compression_level, + cache_config.baseline_compression_level() + ); +} + +#[test] +fn test_on_get_update_usage_counter() { + let (_tempdir, cache_dir, config_path) = test_prolog(); + let cache_config = load_config!( + config_path, + "[cache]\n\ + enabled = true\n\ + directory = {cache_dir}\n\ + worker-event-queue-size = '16'", + cache_dir + ); + assert!(cache_config.enabled()); + let worker = Worker::start_new(&cache_config, None); + + let mod_file = cache_dir.join("some-mod"); + let stats_file = cache_dir.join("some-mod.stats"); + let default_stats = ModuleCacheStatistics::default(&cache_config); + assert!(write_stats_file(&stats_file, &default_stats)); + + let mut usages = 0; + for times_used in &[4, 7, 2] { + for _ in 0..*times_used { + worker.on_cache_get_async(mod_file.clone()); + usages += 1; + } + + worker.wait_for_all_events_handled(); + assert_eq!(worker.events_dropped(), 0); + + let stats = read_stats_file(&stats_file).expect("Failed to read stats file"); + assert_eq!(stats.usages, usages); + } +} + +#[test] +fn test_on_get_recompress_no_mod_file() { + let (_tempdir, cache_dir, config_path) = test_prolog(); + let cache_config = load_config!( + config_path, + "[cache]\n\ + enabled = true\n\ + directory = {cache_dir}\n\ + worker-event-queue-size = '16'\n\ + baseline-compression-level = 3\n\ + optimized-compression-level = 7\n\ + optimized-compression-usage-counter-threshold = '256'", + cache_dir + ); + assert!(cache_config.enabled()); + let worker = Worker::start_new(&cache_config, None); + + let mod_file = cache_dir.join("some-mod"); + let stats_file = cache_dir.join("some-mod.stats"); + let mut start_stats = ModuleCacheStatistics::default(&cache_config); + start_stats.usages = 250; + assert!(write_stats_file(&stats_file, &start_stats)); + + let mut usages = start_stats.usages; + for times_used in &[4, 7, 2] { + for _ in 0..*times_used { + worker.on_cache_get_async(mod_file.clone()); + usages += 1; + } + + worker.wait_for_all_events_handled(); + assert_eq!(worker.events_dropped(), 0); + + let stats = read_stats_file(&stats_file).expect("Failed to read stats file"); + assert_eq!(stats.usages, usages); + assert_eq!( + stats.compression_level, + cache_config.baseline_compression_level() + ); + } +} + +#[test] +fn test_on_get_recompress_with_mod_file() { + let (_tempdir, cache_dir, config_path) = test_prolog(); + let cache_config = load_config!( + config_path, + "[cache]\n\ + enabled = true\n\ + directory = {cache_dir}\n\ + worker-event-queue-size = '16'\n\ + baseline-compression-level = 3\n\ + optimized-compression-level = 7\n\ + optimized-compression-usage-counter-threshold = '256'", + cache_dir + ); + assert!(cache_config.enabled()); + let worker = Worker::start_new(&cache_config, None); + + let mod_file = cache_dir.join("some-mod"); + let mod_data = "some test data to be compressed"; + let data = zstd::encode_all( + mod_data.as_bytes(), + cache_config.baseline_compression_level(), + ) + .expect("Failed to compress sample mod file"); + fs::write(&mod_file, &data).expect("Failed to write sample mod file"); + + let stats_file = cache_dir.join("some-mod.stats"); + let mut start_stats = ModuleCacheStatistics::default(&cache_config); + start_stats.usages = 250; + assert!(write_stats_file(&stats_file, &start_stats)); + + // scenarios: + // 1. Shouldn't be recompressed + // 2. Should be recompressed + // 3. After lowering compression level, should be recompressed + let scenarios = [(4, false), (7, true), (2, false)]; + + let mut usages = start_stats.usages; + assert!(usages < cache_config.optimized_compression_usage_counter_threshold()); + let mut tested_higher_opt_compr_lvl = false; + for (times_used, lower_compr_lvl) in &scenarios { + for _ in 0..*times_used { + worker.on_cache_get_async(mod_file.clone()); + usages += 1; + } + + worker.wait_for_all_events_handled(); + assert_eq!(worker.events_dropped(), 0); + + let mut stats = read_stats_file(&stats_file).expect("Failed to read stats file"); + assert_eq!(stats.usages, usages); + assert_eq!( + stats.compression_level, + if usages < cache_config.optimized_compression_usage_counter_threshold() { + cache_config.baseline_compression_level() + } else { + cache_config.optimized_compression_level() + } + ); + let compressed_data = fs::read(&mod_file).expect("Failed to read mod file"); + let decoded_data = + zstd::decode_all(&compressed_data[..]).expect("Failed to decompress mod file"); + assert_eq!(decoded_data, mod_data.as_bytes()); + + if *lower_compr_lvl { + assert!(usages >= cache_config.optimized_compression_usage_counter_threshold()); + tested_higher_opt_compr_lvl = true; + stats.compression_level -= 1; + assert!(write_stats_file(&stats_file, &stats)); + } + } + assert!(usages >= cache_config.optimized_compression_usage_counter_threshold()); + assert!(tested_higher_opt_compr_lvl); +} + +#[test] +fn test_on_get_recompress_lock() { + let (_tempdir, cache_dir, config_path) = test_prolog(); + let cache_config = load_config!( + config_path, + "[cache]\n\ + enabled = true\n\ + directory = {cache_dir}\n\ + worker-event-queue-size = '16'\n\ + baseline-compression-level = 3\n\ + optimized-compression-level = 7\n\ + optimized-compression-usage-counter-threshold = '256'\n\ + optimizing-compression-task-timeout = '30m'\n\ + allowed-clock-drift-for-files-from-future = '1d'", + cache_dir + ); + assert!(cache_config.enabled()); + let worker = Worker::start_new(&cache_config, None); + + let mod_file = cache_dir.join("some-mod"); + let mod_data = "some test data to be compressed"; + let data = zstd::encode_all( + mod_data.as_bytes(), + cache_config.baseline_compression_level(), + ) + .expect("Failed to compress sample mod file"); + fs::write(&mod_file, &data).expect("Failed to write sample mod file"); + + let stats_file = cache_dir.join("some-mod.stats"); + let mut start_stats = ModuleCacheStatistics::default(&cache_config); + start_stats.usages = 255; + + let lock_file = cache_dir.join("some-mod.wip-lock"); + + let scenarios = [ + // valid lock + (true, "past", Duration::from_secs(30 * 60 - 1)), + // valid future lock + (true, "future", Duration::from_secs(24 * 60 * 60)), + // expired lock + (false, "past", Duration::from_secs(30 * 60)), + // expired future lock + (false, "future", Duration::from_secs(24 * 60 * 60 + 1)), + ]; + + for (lock_valid, duration_sign, duration) in &scenarios { + assert!(write_stats_file(&stats_file, &start_stats)); // restore usage & compression level + create_file_with_mtime(&lock_file, "", duration_sign, &duration); + + worker.on_cache_get_async(mod_file.clone()); + worker.wait_for_all_events_handled(); + assert_eq!(worker.events_dropped(), 0); + + let stats = read_stats_file(&stats_file).expect("Failed to read stats file"); + assert_eq!(stats.usages, start_stats.usages + 1); + assert_eq!( + stats.compression_level, + if *lock_valid { + cache_config.baseline_compression_level() + } else { + cache_config.optimized_compression_level() + } + ); + let compressed_data = fs::read(&mod_file).expect("Failed to read mod file"); + let decoded_data = + zstd::decode_all(&compressed_data[..]).expect("Failed to decompress mod file"); + assert_eq!(decoded_data, mod_data.as_bytes()); + } +} + +#[test] +fn test_on_update_fresh_stats_file() { + let (_tempdir, cache_dir, config_path) = test_prolog(); + let cache_config = load_config!( + config_path, + "[cache]\n\ + enabled = true\n\ + directory = {cache_dir}\n\ + worker-event-queue-size = '16'\n\ + baseline-compression-level = 3\n\ + optimized-compression-level = 7\n\ + cleanup-interval = '1h'", + cache_dir + ); + assert!(cache_config.enabled()); + let worker = Worker::start_new(&cache_config, None); + + let mod_file = cache_dir.join("some-mod"); + let stats_file = cache_dir.join("some-mod.stats"); + let cleanup_certificate = cache_dir.join(".cleanup.wip-done"); + create_file_with_mtime(&cleanup_certificate, "", "future", &Duration::from_secs(0)); + // the below created by the worker if it cleans up + let worker_lock_file = cache_dir.join(format!(".cleanup.wip-{}", process::id())); + + // scenarios: + // 1. Create new stats file + // 2. Overwrite existing file + for update_file in &[true, false] { + worker.on_cache_update_async(mod_file.clone()); + worker.wait_for_all_events_handled(); + assert_eq!(worker.events_dropped(), 0); + + let mut stats = read_stats_file(&stats_file).expect("Failed to read stats file"); + assert_eq!(stats.usages, 1); + assert_eq!( + stats.compression_level, + cache_config.baseline_compression_level() + ); + + if *update_file { + stats.usages += 42; + stats.compression_level += 1; + assert!(write_stats_file(&stats_file, &stats)); + } + + assert!(!worker_lock_file.exists()); + } +} + +#[test] +fn test_on_update_cleanup_limits_trash_locks() { + let (_tempdir, cache_dir, config_path) = test_prolog(); + let cache_config = load_config!( + config_path, + "[cache]\n\ + enabled = true\n\ + directory = {cache_dir}\n\ + worker-event-queue-size = '16'\n\ + cleanup-interval = '30m'\n\ + optimizing-compression-task-timeout = '30m'\n\ + allowed-clock-drift-for-files-from-future = '1d'\n\ + file-count-soft-limit = '5'\n\ + files-total-size-soft-limit = '30K'\n\ + file-count-limit-percent-if-deleting = '70%'\n\ + files-total-size-limit-percent-if-deleting = '70%' + ", + cache_dir + ); + assert!(cache_config.enabled()); + let worker = Worker::start_new(&cache_config, None); + let content_1k = "a".repeat(1_000); + let content_10k = "a".repeat(10_000); + + let mods_files_dir = cache_dir.join("target-triple").join("compiler-version"); + let mod_with_stats = mods_files_dir.join("mod-with-stats"); + let trash_dirs = [ + mods_files_dir.join("trash"), + mods_files_dir.join("trash").join("trash"), + ]; + let trash_files = [ + cache_dir.join("trash-file"), + cache_dir.join("trash-file.wip-lock"), + cache_dir.join("target-triple").join("trash.txt"), + cache_dir.join("target-triple").join("trash.txt.wip-lock"), + mods_files_dir.join("trash.ogg"), + mods_files_dir.join("trash").join("trash.doc"), + mods_files_dir.join("trash").join("trash.doc.wip-lock"), + mods_files_dir.join("trash").join("trash").join("trash.xls"), + mods_files_dir + .join("trash") + .join("trash") + .join("trash.xls.wip-lock"), + ]; + let mod_locks = [ + // valid lock + ( + mods_files_dir.join("mod0.wip-lock"), + true, + "past", + Duration::from_secs(30 * 60 - 1), + ), + // valid future lock + ( + mods_files_dir.join("mod1.wip-lock"), + true, + "future", + Duration::from_secs(24 * 60 * 60), + ), + // expired lock + ( + mods_files_dir.join("mod2.wip-lock"), + false, + "past", + Duration::from_secs(30 * 60), + ), + // expired future lock + ( + mods_files_dir.join("mod3.wip-lock"), + false, + "future", + Duration::from_secs(24 * 60 * 60 + 1), + ), + ]; + // the below created by the worker if it cleans up + let worker_lock_file = cache_dir.join(format!(".cleanup.wip-{}", process::id())); + + let scenarios = [ + // Close to limits, but not reached, only trash deleted + (2, 2, 4), + // File count limit exceeded + (1, 10, 3), + // Total size limit exceeded + (4, 0, 2), + // Both limits exceeded + (3, 5, 3), + ]; + + for (files_10k, files_1k, remaining_files) in &scenarios { + let mut secs_ago = 100; + + for d in &trash_dirs { + fs::create_dir_all(d).expect("Failed to create directories"); + } + for f in &trash_files { + create_file_with_mtime(f, "", "past", &Duration::from_secs(0)); + } + for (f, _, sign, duration) in &mod_locks { + create_file_with_mtime(f, "", sign, &duration); + } + + let mut mods_paths = vec![]; + for content in repeat(&content_10k) + .take(*files_10k) + .chain(repeat(&content_1k).take(*files_1k)) + { + mods_paths.push(mods_files_dir.join(format!("test-mod-{}", mods_paths.len()))); + create_file_with_mtime( + mods_paths.last().unwrap(), + content, + "past", + &Duration::from_secs(secs_ago), + ); + assert!(secs_ago > 0); + secs_ago -= 1; + } + + // creating .stats file updates mtime what affects test results + // so we use a separate nonexistent module here (orphaned .stats will be removed anyway) + worker.on_cache_update_async(mod_with_stats.clone()); + worker.wait_for_all_events_handled(); + assert_eq!(worker.events_dropped(), 0); + + for ent in trash_dirs.iter().chain(trash_files.iter()) { + assert!(!ent.exists()); + } + for (f, valid, ..) in &mod_locks { + assert_eq!(f.exists(), *valid); + } + for (idx, path) in mods_paths.iter().enumerate() { + let should_exist = idx >= mods_paths.len() - *remaining_files; + assert_eq!(path.exists(), should_exist); + if should_exist { + // cleanup before next iteration + fs::remove_file(path).expect("Failed to remove a file"); + } + } + fs::remove_file(&worker_lock_file).expect("Failed to remove lock file"); + } +} + +#[test] +fn test_on_update_cleanup_lru_policy() { + let (_tempdir, cache_dir, config_path) = test_prolog(); + let cache_config = load_config!( + config_path, + "[cache]\n\ + enabled = true\n\ + directory = {cache_dir}\n\ + worker-event-queue-size = '16'\n\ + file-count-soft-limit = '5'\n\ + files-total-size-soft-limit = '30K'\n\ + file-count-limit-percent-if-deleting = '80%'\n\ + files-total-size-limit-percent-if-deleting = '70%'", + cache_dir + ); + assert!(cache_config.enabled()); + let worker = Worker::start_new(&cache_config, None); + let content_1k = "a".repeat(1_000); + let content_5k = "a".repeat(5_000); + let content_10k = "a".repeat(10_000); + + let mods_files_dir = cache_dir.join("target-triple").join("compiler-version"); + fs::create_dir_all(&mods_files_dir).expect("Failed to create directories"); + let nonexistent_mod_file = cache_dir.join("nonexistent-mod"); + let orphaned_stats_file = cache_dir.join("orphaned-mod.stats"); + let worker_lock_file = cache_dir.join(format!(".cleanup.wip-{}", process::id())); + + // content, how long ago created, how long ago stats created (if created), should be alive + let scenarios = [ + &[ + (&content_10k, 29, None, false), + (&content_10k, 28, None, false), + (&content_10k, 27, None, false), + (&content_1k, 26, None, true), + (&content_10k, 25, None, true), + (&content_1k, 24, None, true), + ], + &[ + (&content_10k, 29, None, false), + (&content_10k, 28, None, false), + (&content_10k, 27, None, true), + (&content_1k, 26, None, true), + (&content_5k, 25, None, true), + (&content_1k, 24, None, true), + ], + &[ + (&content_10k, 29, Some(19), true), + (&content_10k, 28, None, false), + (&content_10k, 27, None, false), + (&content_1k, 26, Some(18), true), + (&content_5k, 25, None, true), + (&content_1k, 24, None, true), + ], + &[ + (&content_10k, 29, Some(19), true), + (&content_10k, 28, Some(18), true), + (&content_10k, 27, None, false), + (&content_1k, 26, Some(17), true), + (&content_5k, 25, None, false), + (&content_1k, 24, None, false), + ], + &[ + (&content_10k, 29, Some(19), true), + (&content_10k, 28, None, false), + (&content_1k, 27, None, false), + (&content_5k, 26, Some(18), true), + (&content_1k, 25, None, false), + (&content_10k, 24, None, false), + ], + ]; + + for mods in &scenarios { + let filenames = (0..mods.len()) + .map(|i| { + ( + mods_files_dir.join(format!("mod-{}", i)), + mods_files_dir.join(format!("mod-{}.stats", i)), + ) + }) + .collect::>(); + + for ((content, mod_secs_ago, create_stats, _), (mod_filename, stats_filename)) in + mods.iter().zip(filenames.iter()) + { + create_file_with_mtime( + mod_filename, + content, + "past", + &Duration::from_secs(*mod_secs_ago), + ); + if let Some(stats_secs_ago) = create_stats { + create_file_with_mtime( + stats_filename, + "cleanup doesn't care", + "past", + &Duration::from_secs(*stats_secs_ago), + ); + } + } + create_file_with_mtime( + &orphaned_stats_file, + "cleanup doesn't care", + "past", + &Duration::from_secs(0), + ); + + worker.on_cache_update_async(nonexistent_mod_file.clone()); + worker.wait_for_all_events_handled(); + assert_eq!(worker.events_dropped(), 0); + + assert!(!orphaned_stats_file.exists()); + for ((_, _, create_stats, alive), (mod_filename, stats_filename)) in + mods.iter().zip(filenames.iter()) + { + assert_eq!(mod_filename.exists(), *alive); + assert_eq!(stats_filename.exists(), *alive && create_stats.is_some()); + + // cleanup for next iteration + if *alive { + fs::remove_file(&mod_filename).expect("Failed to remove a file"); + if create_stats.is_some() { + fs::remove_file(&stats_filename).expect("Failed to remove a file"); + } + } + } + + fs::remove_file(&worker_lock_file).expect("Failed to remove lock file"); + } +} + +// clock drift should be applied to mod cache & stats, too +// however, postpone deleting files to as late as possible +#[test] +fn test_on_update_cleanup_future_files() { + let (_tempdir, cache_dir, config_path) = test_prolog(); + let cache_config = load_config!( + config_path, + "[cache]\n\ + enabled = true\n\ + directory = {cache_dir}\n\ + worker-event-queue-size = '16'\n\ + allowed-clock-drift-for-files-from-future = '1d'\n\ + file-count-soft-limit = '3'\n\ + files-total-size-soft-limit = '1M'\n\ + file-count-limit-percent-if-deleting = '70%'\n\ + files-total-size-limit-percent-if-deleting = '70%'", + cache_dir + ); + assert!(cache_config.enabled()); + let worker = Worker::start_new(&cache_config, None); + let content_1k = "a".repeat(1_000); + + let mods_files_dir = cache_dir.join("target-triple").join("compiler-version"); + fs::create_dir_all(&mods_files_dir).expect("Failed to create directories"); + let nonexistent_mod_file = cache_dir.join("nonexistent-mod"); + // the below created by the worker if it cleans up + let worker_lock_file = cache_dir.join(format!(".cleanup.wip-{}", process::id())); + + let scenarios: [&[_]; 5] = [ + // NOT cleaning up, everythings ok + &[ + (Duration::from_secs(0), None, true), + (Duration::from_secs(24 * 60 * 60), None, true), + ], + // NOT cleaning up, everythings ok + &[ + (Duration::from_secs(0), None, true), + (Duration::from_secs(24 * 60 * 60 + 1), None, true), + ], + // cleaning up, removing files from oldest + &[ + (Duration::from_secs(0), None, false), + (Duration::from_secs(24 * 60 * 60), None, true), + (Duration::from_secs(1), None, false), + (Duration::from_secs(2), None, true), + ], + // cleaning up, removing files from oldest; deleting file from far future + &[ + (Duration::from_secs(0), None, false), + (Duration::from_secs(1), None, true), + (Duration::from_secs(24 * 60 * 60 + 1), None, false), + (Duration::from_secs(2), None, true), + ], + // cleaning up, removing files from oldest; file from far future should have .stats from +-now => it's a legitimate file + &[ + (Duration::from_secs(0), None, false), + (Duration::from_secs(1), None, false), + ( + Duration::from_secs(24 * 60 * 60 + 1), + Some(Duration::from_secs(3)), + true, + ), + (Duration::from_secs(2), None, true), + ], + ]; + + for mods in &scenarios { + let filenames = (0..mods.len()) + .map(|i| { + ( + mods_files_dir.join(format!("mod-{}", i)), + mods_files_dir.join(format!("mod-{}.stats", i)), + ) + }) + .collect::>(); + + for ((duration, opt_stats_duration, _), (mod_filename, stats_filename)) in + mods.iter().zip(filenames.iter()) + { + create_file_with_mtime(mod_filename, &content_1k, "future", duration); + if let Some(stats_duration) = opt_stats_duration { + create_file_with_mtime(stats_filename, "", "future", stats_duration); + } + } + + worker.on_cache_update_async(nonexistent_mod_file.clone()); + worker.wait_for_all_events_handled(); + assert_eq!(worker.events_dropped(), 0); + + for ((_, opt_stats_duration, alive), (mod_filename, stats_filename)) in + mods.iter().zip(filenames.iter()) + { + assert_eq!(mod_filename.exists(), *alive); + assert_eq!( + stats_filename.exists(), + *alive && opt_stats_duration.is_some() + ); + if *alive { + fs::remove_file(mod_filename).expect("Failed to remove a file"); + if opt_stats_duration.is_some() { + fs::remove_file(stats_filename).expect("Failed to remove a file"); + } + } + } + + fs::remove_file(&worker_lock_file).expect("Failed to remove lock file"); + } +} + +// this tests if worker triggered cleanup or not when some cleanup lock/certificate was out there +#[test] +fn test_on_update_cleanup_self_lock() { + let (_tempdir, cache_dir, config_path) = test_prolog(); + let cache_config = load_config!( + config_path, + "[cache]\n\ + enabled = true\n\ + directory = {cache_dir}\n\ + worker-event-queue-size = '16'\n\ + cleanup-interval = '30m'\n\ + allowed-clock-drift-for-files-from-future = '1d'", + cache_dir + ); + assert!(cache_config.enabled()); + let worker = Worker::start_new(&cache_config, None); + + let mod_file = cache_dir.join("some-mod"); + let trash_file = cache_dir.join("trash-file.txt"); + + let lock_file = cache_dir.join(".cleanup.wip-lock"); + // the below created by the worker if it cleans up + let worker_lock_file = cache_dir.join(format!(".cleanup.wip-{}", process::id())); + + let scenarios = [ + // valid lock + (true, "past", Duration::from_secs(30 * 60 - 1)), + // valid future lock + (true, "future", Duration::from_secs(24 * 60 * 60)), + // expired lock + (false, "past", Duration::from_secs(30 * 60)), + // expired future lock + (false, "future", Duration::from_secs(24 * 60 * 60 + 1)), + ]; + + for (lock_valid, duration_sign, duration) in &scenarios { + create_file_with_mtime( + &trash_file, + "with trash content", + "future", + &Duration::from_secs(0), + ); + create_file_with_mtime(&lock_file, "", duration_sign, &duration); + + worker.on_cache_update_async(mod_file.clone()); + worker.wait_for_all_events_handled(); + assert_eq!(worker.events_dropped(), 0); + + assert_eq!(trash_file.exists(), *lock_valid); + assert_eq!(lock_file.exists(), *lock_valid); + if *lock_valid { + assert!(!worker_lock_file.exists()); + } else { + fs::remove_file(&worker_lock_file).expect("Failed to remove lock file"); + } + } +} + +fn create_file_with_mtime(filename: &Path, contents: &str, offset_sign: &str, offset: &Duration) { + fs::write(filename, contents).expect("Failed to create a file"); + let mtime = match offset_sign { + "past" => system_time_stub::NOW + .checked_sub(*offset) + .expect("Failed to calculate new mtime"), + "future" => system_time_stub::NOW + .checked_add(*offset) + .expect("Failed to calculate new mtime"), + _ => unreachable!(), + }; + filetime::set_file_mtime(filename, mtime.into()).expect("Failed to set mtime"); +} diff --git a/wasmtime-environ/src/cache/worker/tests/system_time_stub.rs b/wasmtime-environ/src/cache/worker/tests/system_time_stub.rs new file mode 100644 index 0000000000..5e457d63fc --- /dev/null +++ b/wasmtime-environ/src/cache/worker/tests/system_time_stub.rs @@ -0,0 +1,29 @@ +use lazy_static::lazy_static; +use std::time::{Duration, SystemTime, SystemTimeError}; + +lazy_static! { + pub static ref NOW: SystemTime = SystemTime::now(); // no need for RefCell and set_now() for now +} + +#[derive(PartialOrd, PartialEq, Ord, Eq)] +pub struct SystemTimeStub(SystemTime); + +impl SystemTimeStub { + pub fn now() -> Self { + Self(*NOW) + } + + pub fn checked_add(&self, duration: Duration) -> Option { + self.0.checked_add(duration).map(|t| t.into()) + } + + pub fn duration_since(&self, earlier: SystemTime) -> Result { + self.0.duration_since(earlier) + } +} + +impl From for SystemTimeStub { + fn from(time: SystemTime) -> Self { + Self(time) + } +}