diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..917098a --- /dev/null +++ b/.env.example @@ -0,0 +1,32 @@ +# Directory paths +INPUT_DIR=/home/user/livestreams +OUTPUT_DIR=/media/archive/livestreams + +# Program naming +DIVINE_WORSHIP_NAME=Divine Worship Service - RTSDA +AFTERNOON_PROGRAM_NAME=Afternoon Program - RTSDA + +# FFmpeg Configuration +FFMPEG_BINARY=ffmpeg +VIDEO_CODEC=av1_qsv +HW_ACCEL=qsv +HW_DEVICE=qsv=hw +FFMPEG_PRESET=4 +VIDEO_BITRATE=6M +MAX_BITRATE=12M +BUFFER_SIZE=24M +AUDIO_CODEC=copy +AUDIO_BITRATE=192k + +# File Stability Settings +STABILITY_CHECK_INTERVAL=2 +STABILITY_REQUIRED_CHECKS=15 +MAX_STABILITY_WAIT_HOURS=4 + +# Directory Settings +CREATE_YEAR_MONTH_DIRS=true +PRESERVE_ORIGINAL_FILES=true + +# NFO Settings +SHOW_TITLE=LiveStreams +CREATE_NFO_FILES=true \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index fde4950..1d348bf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,15 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" +[[package]] +name = "aho-corasick" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e60d3430d3a69478ad0993f19238d2df97c507009a52b3c10addcd7f6bcb916" +dependencies = [ + "memchr", +] + [[package]] name = "android-tzdata" version = "0.1.1" @@ -38,6 +47,17 @@ version = "1.0.95" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34ac096ce696dc2fcabef30516bb13c0a68a11d30131d3df6f04711467681b04" +[[package]] +name = "async-trait" +version = "0.1.89" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "autocfg" version = "1.4.0" @@ -127,12 +147,53 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-deque" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" +[[package]] +name = "either" +version = "1.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" + +[[package]] +name = "errno" +version = "0.3.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "778e2ac28f6c47af28e4907f13ffd1e1ddbd400980a9abd7c8df189bf578a5ad" +dependencies = [ + "libc", + "windows-sys 0.59.0", +] + +[[package]] +name = "fastrand" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" + [[package]] name = "filetime" version = "0.2.25" @@ -154,6 +215,18 @@ dependencies = [ "libc", ] +[[package]] +name = "getrandom" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4" +dependencies = [ + "cfg-if", + "libc", + "r-efi", + "wasi 0.14.4+wasi-0.2.4", +] + [[package]] name = "gimli" version = "0.31.1" @@ -233,6 +306,12 @@ dependencies = [ "libc", ] +[[package]] +name = "lazy_static" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" + [[package]] name = "libc" version = "0.2.169" @@ -250,14 +329,24 @@ dependencies = [ "redox_syscall", ] +[[package]] +name = "linux-raw-sys" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd945864f07fe9f5371a27ad7b52a172b4b499999f1d97574c9fa68373937e12" + [[package]] name = "livestream_archiver" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", "chrono", - "notify", + "tempfile", "tokio", + "tracing", + "tracing-subscriber", + "video_processing", ] [[package]] @@ -276,6 +365,15 @@ version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" +[[package]] +name = "matchers" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9" +dependencies = [ + "regex-automata", +] + [[package]] name = "memchr" version = "2.7.4" @@ -299,7 +397,7 @@ checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" dependencies = [ "libc", "log", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", "windows-sys 0.48.0", ] @@ -310,7 +408,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd" dependencies = [ "libc", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", "windows-sys 0.52.0", ] @@ -333,6 +431,24 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "ntapi" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8a3895c6391c39d7fe7ebc444a87eb2991b2a0bc718fdabd071eec617fc68e4" +dependencies = [ + "winapi", +] + +[[package]] +name = "nu-ansi-term" +version = "0.50.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4a28e057d01f97e61255210fcff094d74ed0466038633e95017f5beb68e4399" +dependencies = [ + "windows-sys 0.52.0", +] + [[package]] name = "num-traits" version = "0.2.19" @@ -404,6 +520,32 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "r-efi" +version = "5.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" + +[[package]] +name = "rayon" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "368f01d005bf8fd9b1206fb6fa653e6c4a81ceb1466406b81792d87c5677a58f" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22e18b0f0062d30d4230b2e85ff77fdfe4326feb054b9783a3460d8435c8ab91" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", +] + [[package]] name = "redox_syscall" version = "0.5.8" @@ -413,12 +555,54 @@ dependencies = [ "bitflags 2.6.0", ] +[[package]] +name = "regex" +version = "1.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23d7fd106d8c02486a8d64e778353d1cffe08ce79ac2e82f540c86d0facf6912" +dependencies = [ + "aho-corasick", + "memchr", + "regex-automata", + "regex-syntax", +] + +[[package]] +name = "regex-automata" +version = "0.4.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b9458fa0bfeeac22b5ca447c63aaf45f28439a709ccd244698632f9aa6394d6" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "caf4aa5b0f434c91fe5c7f1ecb6a5ece2130b02ad2a590589dda5146df959001" + [[package]] name = "rustc-demangle" version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" +[[package]] +name = "rustix" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11181fbabf243db407ef8df94a6ce0b2f9a733bd8be4ad02b4eda9602296cac8" +dependencies = [ + "bitflags 2.6.0", + "errno", + "libc", + "linux-raw-sys", + "windows-sys 0.59.0", +] + [[package]] name = "same-file" version = "1.0.6" @@ -434,6 +618,15 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "shlex" version = "1.3.0" @@ -476,6 +669,43 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sysinfo" +version = "0.30.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a5b4ddaee55fb2bea2bf0e5000747e5f5c0de765e5a5ff87f4cd106439f4bb3" +dependencies = [ + "cfg-if", + "core-foundation-sys", + "libc", + "ntapi", + "once_cell", + "rayon", + "windows", +] + +[[package]] +name = "tempfile" +version = "3.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15b61f8f20e3a6f7e0649d825294eaf317edce30f82cf6026e7e4cb9222a7d1e" +dependencies = [ + "fastrand", + "getrandom", + "once_cell", + "rustix", + "windows-sys 0.59.0", +] + +[[package]] +name = "thread_local" +version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185" +dependencies = [ + "cfg-if", +] + [[package]] name = "tokio" version = "1.42.0" @@ -505,12 +735,94 @@ dependencies = [ "syn", ] +[[package]] +name = "tracing" +version = "0.1.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" +dependencies = [ + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81383ab64e72a7a8b8e13130c49e3dab29def6d0c7d76a03087b3cf71c5c6903" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing-core" +version = "0.1.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9d12581f227e93f094d3af2ae690a574abb8a2b9b7a96e7cfe9647b2b617678" +dependencies = [ + "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2054a14f5307d601f88daf0553e1cbf472acc4f2c51afab632431cdcd72124d5" +dependencies = [ + "matchers", + "nu-ansi-term", + "once_cell", + "regex-automata", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", +] + [[package]] name = "unicode-ident" version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "adb9e6ca4f869e1180728b7950e35922a7fc6397f7b641499e8f3ef06e50dc83" +[[package]] +name = "valuable" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" + +[[package]] +name = "video_processing" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "chrono", + "notify", + "regex", + "sysinfo", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "walkdir" version = "2.5.0" @@ -527,6 +839,15 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "wasi" +version = "0.14.4+wasi-0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88a5f4a424faf49c3c2c344f166f0662341d470ea185e939657aaff130f0ec4a" +dependencies = [ + "wit-bindgen", +] + [[package]] name = "wasm-bindgen" version = "0.2.99" @@ -581,6 +902,22 @@ version = "0.2.99" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "943aab3fdaaa029a6e0271b35ea10b72b943135afe9bffca82384098ad0e06a6" +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + [[package]] name = "winapi-util" version = "0.1.9" @@ -590,6 +927,22 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + +[[package]] +name = "windows" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be" +dependencies = [ + "windows-core", + "windows-targets 0.52.6", +] + [[package]] name = "windows-core" version = "0.52.0" @@ -746,3 +1099,9 @@ name = "windows_x86_64_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" + +[[package]] +name = "wit-bindgen" +version = "0.45.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c573471f125075647d03df72e026074b7203790d41351cd6edc96f46bcccd36" diff --git a/Cargo.toml b/Cargo.toml index 9e22573..12573db 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,13 +1,27 @@ [package] name = "livestream_archiver" version = "0.1.0" -edition = "2024" +edition = "2021" + +[lib] +name = "livestream_archiver" +path = "src/lib.rs" + +[[bin]] +name = "livestream_archiver" +path = "src/main.rs" [dependencies] -tokio = { version = "1.36", features = ["full"] } +tokio = { version = "1.36", features = ["full", "signal"] } anyhow = "1.0" -notify = "6.1" chrono = "0.4" +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } +async-trait = "0.1" +video_processing = { git = "https://git.rockvilletollandsda.church/RTSDA/video-processing-support.git", branch = "main" } + +[dev-dependencies] +tempfile = "3.0" # We don't need regex or other conversion-related deps diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..2c5c090 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,3 @@ +pub mod services; + +pub use services::livestream_archiver::LivestreamArchiver; \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 7f6c969..9208823 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,133 +1,131 @@ use std::path::PathBuf; use std::env; +use std::sync::Arc; use anyhow::Result; -use notify::{Watcher, RecursiveMode, Event, EventKind}; -use tokio::sync::mpsc; -use std::collections::HashSet; -use std::sync::{Arc, Mutex}; +use tracing::{info, error, warn}; +use video_processing::{ + VideoProcessingConfig, StabilityTracker, SystemMonitor, ShutdownHandler, + EventDrivenFileWatcher, FileProcessingService +}; mod services; use services::livestream_archiver::LivestreamArchiver; #[tokio::main] async fn main() -> Result<()> { + // Initialize logging + tracing_subscriber::fmt() + .with_env_filter( + env::var("RUST_LOG") + .unwrap_or_else(|_| "livestream_archiver=info,video_processing=info".to_string()) + ) + .init(); + + info!("Starting Livestream Archiver v0.1.0"); + let watch_path = PathBuf::from( env::var("INPUT_DIR").unwrap_or_else(|_| { - eprintln!("INPUT_DIR not set, using default: /home/user/livestreams"); - "/home/user/livestreams".to_string() + warn!("INPUT_DIR not set, using production default: /home/rockvilleav/.rtsda/livestreams"); + "/home/rockvilleav/.rtsda/livestreams".to_string() }) ); let output_path = PathBuf::from( env::var("OUTPUT_DIR").unwrap_or_else(|_| { - eprintln!("OUTPUT_DIR not set, using default: /media/archive/livestreams"); - "/media/archive/livestreams".to_string() + warn!("OUTPUT_DIR not set, using production default: /media/archive/jellyfin/livestreams"); + "/media/archive/jellyfin/livestreams".to_string() }) ); // Ensure directories exist if !watch_path.exists() { + info!("Creating watch directory: {}", watch_path.display()); std::fs::create_dir_all(&watch_path)?; } if !output_path.exists() { + info!("Creating output directory: {}", output_path.display()); std::fs::create_dir_all(&output_path)?; } - println!("Starting livestream archiver service..."); - println!("Watching directory: {}", watch_path.display()); - println!("Output directory: {}", output_path.display()); + info!("Watch directory: {}", watch_path.display()); + info!("Output directory: {}", output_path.display()); - let archiver = LivestreamArchiver::new(output_path.clone()); - let processed_files = Arc::new(Mutex::new(HashSet::new())); + // Initialize components + let config = VideoProcessingConfig::from_env(); + let archiver = Arc::new(LivestreamArchiver::new(output_path.clone())); + let stability_tracker = Arc::new(StabilityTracker::new(config)); + let system_monitor = SystemMonitor::new(5); // 5GB minimum free space + let (shutdown_handler, mut shutdown_rx) = ShutdownHandler::new(); - // Process existing files first - println!("Checking for existing files..."); - if let Ok(entries) = std::fs::read_dir(&watch_path) { - for entry in entries { - if let Ok(entry) = entry { - let path = entry.path(); - // Only process .mp4 files - if path.extension().and_then(|ext| ext.to_str()) == Some("mp4") { - // Extract date from filename to check if output exists - if let Some(filename) = path.file_name().and_then(|f| f.to_str()) { - if let Ok(date) = archiver.extract_date_from_filename(filename).await { - // Check if either Divine Worship or Afternoon Program exists for this date - let year_dir = archiver.get_output_path().join(date.format("%Y").to_string()); - let month_dir = year_dir.join(format!("{}-{}", - date.format("%m"), - date.format("%B") - )); - - let divine_worship_file = month_dir.join(format!( - "Divine Worship Service - RTSDA | {}.mp4", - date.format("%B %d %Y") - )); - let afternoon_program_file = month_dir.join(format!( - "Afternoon Program - RTSDA | {}.mp4", - date.format("%B %d %Y") - )); - - if !divine_worship_file.exists() && !afternoon_program_file.exists() { - println!("Found unprocessed file: {}", path.display()); - if let Err(e) = archiver.process_file(path).await { - eprintln!("Error processing existing file: {}", e); - } - } else { - println!("Skipping already processed file: {}", path.display()); - } - } - } - } - } - } + // Check initial disk space + if !system_monitor.check_disk_space(&output_path)? { + error!("Insufficient disk space to start processing"); + return Ok(()); } - // Set up file watcher for new files - let (tx, mut rx) = mpsc::channel(100); - - let mut watcher = notify::recommended_watcher(move |res: Result| { - let tx = tx.clone(); - match res { - Ok(event) => { - println!("Received event: {:?}", event); - if let Err(e) = tx.blocking_send(event) { - eprintln!("Error sending event: {}", e); - } + // Set up file processing service + let file_service = Arc::new(FileProcessingService::new( + Arc::clone(&archiver), + Arc::clone(&stability_tracker), + )); + + // Process existing files + if let Err(e) = file_service.process_existing_files(&watch_path).await { + error!("Error processing existing files: {}", e); + } + + // Set up file watcher + let mut file_watcher = EventDrivenFileWatcher::new(watch_path.clone())?; + + // Start shutdown signal handler + let shutdown_task = tokio::spawn(async move { + shutdown_handler.wait_for_shutdown_signal().await; + }); + + info!("Livestream archiver is running. Press Ctrl+C to stop."); + + // Main event loop + loop { + tokio::select! { + // Handle shutdown signal + _ = shutdown_rx.recv() => { + info!("Shutdown signal received, stopping..."); + break; } - Err(e) => eprintln!("Watch error: {}", e), - } - })?; - - watcher.watch(&watch_path, RecursiveMode::NonRecursive)?; - - while let Some(event) = rx.recv().await { - println!("Processing event: {:?}", event); - - match event.kind { - EventKind::Create(_) | EventKind::Modify(_) => { - for path in event.paths { - if let Ok(canonical_path) = std::fs::canonicalize(&path) { - let path_str = canonical_path.to_string_lossy().to_string(); - let mut processed = processed_files.lock().unwrap(); + + // Handle file events + event = file_watcher.next_event() => { + if let Some(event) = event { + if EventDrivenFileWatcher::should_process_event(&event) { + let mp4_paths = EventDrivenFileWatcher::extract_mp4_paths(&event); - if !processed.contains(&path_str) { - println!("Processing file: {}", path_str); - if let Err(e) = archiver.process_file(path).await { - eprintln!("Error processing file: {}", e); - } else { - processed.insert(path_str); - if processed.len() > 1000 { - processed.clear(); - } + for path in mp4_paths { + // Check disk space before processing + if !system_monitor.check_disk_space(&output_path)? { + warn!("Insufficient disk space, skipping file: {}", path.display()); + continue; } - } else { - println!("Skipping already processed file: {}", path_str); + + info!("New MP4 file detected: {}", path.display()); + + let service = Arc::clone(&file_service); + tokio::spawn(async move { + if let Err(e) = service.handle_file_event(path.clone()).await { + error!("Failed to process file {}: {}", path.display(), e); + } + }); } } + } else { + warn!("File watcher channel closed, exiting"); + break; } - }, - _ => println!("Ignoring event: {:?}", event), + } } } + // Wait for shutdown to complete + shutdown_task.await?; + info!("Livestream archiver stopped"); + Ok(()) } diff --git a/src/services/livestream_archiver.rs b/src/services/livestream_archiver.rs index df2cd57..374a8db 100644 --- a/src/services/livestream_archiver.rs +++ b/src/services/livestream_archiver.rs @@ -1,81 +1,78 @@ use std::path::PathBuf; +use std::env; use anyhow::{Result, anyhow}; use chrono::NaiveDateTime; -use tokio::process::Command; -use tokio::time::Duration; +use video_processing::{ + VideoProcessingConfig, StabilityTracker, VideoConverter, NfoGenerator, FileProcessor +}; pub struct LivestreamArchiver { output_path: PathBuf, + config: VideoProcessingConfig, + stability_tracker: StabilityTracker, + video_converter: VideoConverter, + nfo_generator: NfoGenerator, + divine_worship_name: String, + afternoon_program_name: String, } impl LivestreamArchiver { pub fn new(output_path: PathBuf) -> Self { + let config = VideoProcessingConfig::from_env(); + let stability_tracker = StabilityTracker::new(config.clone()); + let video_converter = VideoConverter::new(config.clone()); + let nfo_generator = NfoGenerator::new(config.clone()); + + let divine_worship_name = env::var("DIVINE_WORSHIP_NAME") + .unwrap_or_else(|_| "Divine Worship Service - RTSDA".to_string()); + let afternoon_program_name = env::var("AFTERNOON_PROGRAM_NAME") + .unwrap_or_else(|_| "Afternoon Program - RTSDA".to_string()); + LivestreamArchiver { output_path, + config, + stability_tracker, + video_converter, + nfo_generator, + divine_worship_name, + afternoon_program_name, } } pub fn get_output_path(&self) -> &PathBuf { &self.output_path } - - async fn wait_for_file_ready(&self, path: &PathBuf) -> Result<()> { - println!("Waiting for file to be ready: {}", path.display()); + pub fn on_file_event(&self, path: &PathBuf) -> Result { + self.stability_tracker.on_file_event(path) + } + + pub fn remove_file_tracker(&self, path: &PathBuf) { + self.stability_tracker.remove_file_tracker(path); + } + + pub fn get_tracked_files(&self) -> Vec { + self.stability_tracker.get_tracked_files() + } + + pub fn check_if_file_already_processed(&self, _filename: &str, date: &chrono::NaiveDateTime) -> bool { + let year_dir = self.output_path.join(date.format("%Y").to_string()); + let month_dir = year_dir.join(format!("{}-{}", + date.format("%m"), + date.format("%B") + )); - // Initial delay - let OBS get started - tokio::time::sleep(Duration::from_secs(10)).await; + let divine_worship_file = month_dir.join(format!( + "{} ({}).mp4", + self.divine_worship_name, + date.format("%B %d %Y") + )); + let afternoon_program_file = month_dir.join(format!( + "{} ({}).mp4", + self.afternoon_program_name, + date.format("%B %d %Y") + )); - let mut last_size = 0; - let mut stable_count = 0; - let mut last_modified = std::time::SystemTime::now(); - let required_stable_checks = 15; // Must be stable for 30 seconds - - // Check for up to 4 hours (14400 seconds / 2 second interval = 7200 iterations) - for i in 0..7200 { - match tokio::fs::metadata(path).await { - Ok(metadata) => { - let current_size = metadata.len(); - let current_modified = metadata.modified()?; - - println!("Check {}: Size = {} bytes, Last Modified: {:?}", i, current_size, current_modified); - - if current_size > 0 { - if current_size == last_size { - // Also check if file hasn't been modified recently - if current_modified == last_modified { - stable_count += 1; - println!("Size and modification time stable for {} checks", stable_count); - - if stable_count >= required_stable_checks { - println!("File appears complete - size and modification time stable for 30 seconds"); - // Extra 30 second buffer after stability to be sure - tokio::time::sleep(Duration::from_secs(30)).await; - return Ok(()); - } - } else { - println!("File still being modified"); - stable_count = 0; - } - } else { - println!("Size changed: {} -> {}", last_size, current_size); - stable_count = 0; - } - - last_size = current_size; - last_modified = current_modified; - } - }, - Err(e) => { - println!("Error checking file: {}", e); - return Err(anyhow!("Failed to check file metadata: {}", e)); - } - } - tokio::time::sleep(Duration::from_secs(2)).await; - } - - // If we reach here, it timed out after 4 hours - something is wrong - println!("Timeout after 4 hours - file is still being written?"); - Err(anyhow!("Timeout after 4 hours waiting for file to stabilize")) + divine_worship_file.exists() || afternoon_program_file.exists() } pub async fn extract_date_from_filename(&self, filename: &str) -> Result { @@ -95,10 +92,7 @@ impl LivestreamArchiver { return Err(anyhow!("Ignoring non-MP4 file")); } - println!("Processing livestream recording: {}", path.display()); - - // Wait for file to be fully copied - self.wait_for_file_ready(&path).await?; + println!("Processing stable livestream recording: {}", path.display()); // Get the filename let filename = path.file_name() @@ -121,106 +115,87 @@ impl LivestreamArchiver { // Check for existing files let divine_worship_file = month_dir.join(format!( - "Divine Worship Service - RTSDA | {}.mp4", + "{} ({}).mp4", + self.divine_worship_name, date.format("%B %d %Y") )); let afternoon_program_file = month_dir.join(format!( - "Afternoon Program - RTSDA | {}.mp4", + "{} ({}).mp4", + self.afternoon_program_name, date.format("%B %d %Y") )); // Determine which filename to use let (base_filename, nfo_title, nfo_tag) = if !divine_worship_file.exists() { ( - format!("Divine Worship Service - RTSDA | {}", date.format("%B %d %Y")), - format!("Divine Worship Service - RTSDA | {}", date.format("%B %-d %Y")), + format!("{} ({})", self.divine_worship_name, date.format("%B %d %Y")), + format!("{} ({})", self.divine_worship_name, date.format("%B %-d %Y")), "Divine Worship Service" ) } else if !afternoon_program_file.exists() { ( - format!("Afternoon Program - RTSDA | {}", date.format("%B %d %Y")), - format!("Afternoon Program - RTSDA | {}", date.format("%B %-d %Y")), + format!("{} ({})", self.afternoon_program_name, date.format("%B %d %Y")), + format!("{} ({})", self.afternoon_program_name, date.format("%B %-d %Y")), "Afternoon Program" ) } else { // Both exist, add suffix to Afternoon Program let mut suffix = 1; let mut test_file = month_dir.join(format!( - "Afternoon Program - RTSDA | {} ({}).mp4", + "{} ({}) ({}).mp4", + self.afternoon_program_name, date.format("%B %d %Y"), suffix )); while test_file.exists() { suffix += 1; test_file = month_dir.join(format!( - "Afternoon Program - RTSDA | {} ({}).mp4", + "{} ({}) ({}).mp4", + self.afternoon_program_name, date.format("%B %d %Y"), suffix )); } ( - format!("Afternoon Program - RTSDA | {} ({})", date.format("%B %d %Y"), suffix), - format!("Afternoon Program - RTSDA | {} ({})", date.format("%B %-d %Y"), suffix), + format!("{} ({}) ({})", self.afternoon_program_name, date.format("%B %d %Y"), suffix), + format!("{} ({}) ({})", self.afternoon_program_name, date.format("%B %-d %Y"), suffix), "Afternoon Program" ) }; let output_file = month_dir.join(format!("{}.mp4", base_filename)); - println!("Converting to AV1 and saving to: {}", output_file.display()); + // Convert video using shared crate + self.video_converter.convert_video(&path, &output_file).await?; - // Build ffmpeg command for AV1 conversion using QSV - let status = Command::new("ffmpeg") - .arg("-init_hw_device").arg("qsv=hw") - .arg("-filter_hw_device").arg("hw") - .arg("-hwaccel").arg("qsv") - .arg("-hwaccel_output_format").arg("qsv") - .arg("-i").arg(&path) - .arg("-c:v").arg("av1_qsv") - .arg("-preset").arg("4") - .arg("-b:v").arg("6M") - .arg("-maxrate").arg("12M") - .arg("-bufsize").arg("24M") - .arg("-c:a").arg("copy") - .arg("-n") // Never overwrite existing files - .arg(&output_file) - .status() - .await?; - - if !status.success() { - return Err(anyhow!("FFmpeg conversion failed")); - } - - // Create NFO file - println!("Creating NFO file..."); - let nfo_content = format!(r#" - - {} - LiveStreams - {} - {} - {} - {} - {} - {} -"#, - nfo_title, - date.format("%Y").to_string(), - date.format("%m%d").to_string(), - date.format("%Y-%m-%d"), - date.format("%Y"), - date.format("%m%d"), - nfo_tag - ); - - let nfo_path = output_file.with_extension("nfo"); - tokio::fs::write(nfo_path, nfo_content).await?; + // Create NFO file using shared crate + let date_only = date.date(); + self.nfo_generator.create_nfo_file(&output_file, &nfo_title, &date_only, Some(nfo_tag)).await?; println!("Successfully converted {} to AV1 and created NFO", path.display()); + // Clean up the file tracker since we're done with this file + self.remove_file_tracker(&path); + // Don't delete original file println!("Original file preserved at: {}", path.display()); Ok(()) } } + +#[async_trait::async_trait] +impl FileProcessor for LivestreamArchiver { + async fn process_file(&self, path: PathBuf) -> Result<()> { + self.process_file(path).await + } + + async fn should_skip_existing_file(&self, path: &PathBuf) -> bool { + if let Some(filename) = path.file_name().and_then(|f| f.to_str()) { + if let Ok(date) = self.extract_date_from_filename(filename).await { + return self.check_if_file_already_processed(filename, &date); + } + } + false + } +} diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs new file mode 100644 index 0000000..890f9ed --- /dev/null +++ b/tests/integration_tests.rs @@ -0,0 +1,172 @@ +use std::env; +use std::fs; +use std::path::PathBuf; +use tempfile::TempDir; +use chrono::{NaiveDateTime, Datelike, Timelike}; +use livestream_archiver::LivestreamArchiver; + +#[tokio::test] +async fn test_livestream_archiver_creation() { + let temp_output = TempDir::new().unwrap(); + + // Test that we can create a new archiver + let archiver = LivestreamArchiver::new(temp_output.path().to_path_buf()); + + assert_eq!(archiver.get_output_path(), temp_output.path()); +} + +#[tokio::test] +async fn test_date_extraction_from_filename() { + let temp_output = TempDir::new().unwrap(); + let archiver = LivestreamArchiver::new(temp_output.path().to_path_buf()); + + // Test valid filename + let result = archiver.extract_date_from_filename("2024-12-25_14-30-45.mp4").await; + assert!(result.is_ok()); + + let date = result.unwrap(); + assert_eq!(date.year(), 2024); + assert_eq!(date.month(), 12); + assert_eq!(date.day(), 25); + assert_eq!(date.hour(), 14); + assert_eq!(date.minute(), 30); + assert_eq!(date.second(), 45); + + // Test invalid filename + let result = archiver.extract_date_from_filename("invalid-filename.mp4").await; + assert!(result.is_err()); + + // Test non-mp4 file + let result = archiver.extract_date_from_filename("2024-12-25_14-30-45.avi").await; + assert!(result.is_err()); +} + +#[tokio::test] +async fn test_file_already_processed_detection() { + let temp_output = TempDir::new().unwrap(); + + // Set up environment variables for program names + env::set_var("DIVINE_WORSHIP_NAME", "Test Divine Worship"); + env::set_var("AFTERNOON_PROGRAM_NAME", "Test Afternoon Program"); + + let archiver = LivestreamArchiver::new(temp_output.path().to_path_buf()); + + let test_date = NaiveDateTime::parse_from_str("2024-12-25_14-30-45", "%Y-%m-%d_%H-%M-%S").unwrap(); + + // Initially no files should exist + assert!(!archiver.check_if_file_already_processed("2024-12-25_14-30-45.mp4", &test_date)); + + // Create the expected output directory structure + let year_dir = temp_output.path().join("2024"); + let month_dir = year_dir.join("12-December"); + fs::create_dir_all(&month_dir).unwrap(); + + // Create a divine worship file + let divine_worship_file = month_dir.join("Test Divine Worship (December 25 2024).mp4"); + fs::write(&divine_worship_file, "test content").unwrap(); + + // Now it should detect the file as processed + assert!(archiver.check_if_file_already_processed("2024-12-25_14-30-45.mp4", &test_date)); + + // Clean up environment variables + env::remove_var("DIVINE_WORSHIP_NAME"); + env::remove_var("AFTERNOON_PROGRAM_NAME"); +} + +#[tokio::test] +async fn test_environment_variable_configuration() { + // Test with custom environment variables + env::set_var("DIVINE_WORSHIP_NAME", "Custom Divine Service"); + env::set_var("AFTERNOON_PROGRAM_NAME", "Custom Afternoon Show"); + env::set_var("SHOW_TITLE", "Custom Livestreams"); + + let temp_output = TempDir::new().unwrap(); + let archiver = LivestreamArchiver::new(temp_output.path().to_path_buf()); + + // These are internal fields, so we test them indirectly through the file processing logic + let test_date = NaiveDateTime::parse_from_str("2024-12-25_14-30-45", "%Y-%m-%d_%H-%M-%S").unwrap(); + + // The archiver should use the custom names when checking for existing files + assert!(!archiver.check_if_file_already_processed("test.mp4", &test_date)); + + // Create expected directory structure + let year_dir = temp_output.path().join("2024"); + let month_dir = year_dir.join("12-December"); + fs::create_dir_all(&month_dir).unwrap(); + + // Create file with custom name + let custom_file = month_dir.join("Custom Divine Service (December 25 2024).mp4"); + fs::write(&custom_file, "test content").unwrap(); + + // Should now detect as processed + assert!(archiver.check_if_file_already_processed("test.mp4", &test_date)); + + // Clean up + env::remove_var("DIVINE_WORSHIP_NAME"); + env::remove_var("AFTERNOON_PROGRAM_NAME"); + env::remove_var("SHOW_TITLE"); +} + +#[tokio::test] +async fn test_stability_tracking_integration() { + let temp_output = TempDir::new().unwrap(); + let archiver = LivestreamArchiver::new(temp_output.path().to_path_buf()); + + let temp_input = TempDir::new().unwrap(); + let test_file = temp_input.path().join("2024-12-25_14-30-45.mp4"); + + // Create initial file + fs::write(&test_file, "initial content").unwrap(); + + // Test that stability tracking starts + let result = archiver.on_file_event(&test_file); + assert!(result.is_ok()); + assert!(!result.unwrap(), "New file should not be immediately stable"); + + // File should be tracked + let tracked_files = archiver.get_tracked_files(); + assert_eq!(tracked_files.len(), 1); + assert_eq!(tracked_files[0], test_file); + + // Remove tracking + archiver.remove_file_tracker(&test_file); + assert_eq!(archiver.get_tracked_files().len(), 0); +} + +#[tokio::test] +async fn test_directory_structure_creation() { + let temp_output = TempDir::new().unwrap(); + + // Test date parsing and directory structure + let test_cases = vec![ + ("2024-01-15_10-30-00.mp4", "2024", "01-January"), + ("2024-12-31_23-59-59.mp4", "2024", "12-December"), + ("2023-07-04_12-00-00.mp4", "2023", "07-July"), + ]; + + for (filename, expected_year, expected_month) in test_cases { + let archiver = LivestreamArchiver::new(temp_output.path().to_path_buf()); + + let date = archiver.extract_date_from_filename(filename).await.unwrap(); + + // The directory structure should match what the archiver expects + let year_dir = temp_output.path().join(expected_year); + let month_dir = year_dir.join(expected_month); + + // Create the directory structure as the archiver would + fs::create_dir_all(&month_dir).unwrap(); + + // Verify structure exists + assert!(year_dir.exists()); + assert!(month_dir.exists()); + + // Test file detection logic + assert!(!archiver.check_if_file_already_processed(filename, &date)); + } +} + +// Helper function to create mock video files for testing +fn create_mock_mp4_file(path: &PathBuf, size_mb: usize) -> std::io::Result<()> { + let content = vec![0u8; size_mb * 1024 * 1024]; // Create file of specified size in MB + fs::write(path, content) +} \ No newline at end of file