Major refactoring: Event-driven livestream archiver

- Complete removal of polling loops for better performance
- Event-driven file processing with stability tracking
- Proper structured logging with tracing
- Graceful shutdown handling (SIGTERM/SIGINT)
- Disk space monitoring (5GB minimum threshold)
- Production-ready paths for RTSDA deployment
- Shared video_processing crate integration
- Filename compatibility improvements (| → () separator)
- Environment-based configuration
- Comprehensive error handling and logging

Performance improvements:
- No more wasteful 1-2 second polling
- Only processes files when filesystem events occur
- Proper resource management and cleanup

Production features:
- Default paths: /home/rockvilleav/.rtsda/livestreams → /media/archive/jellyfin/livestreams
- Configurable via INPUT_DIR and OUTPUT_DIR environment variables
- Structured logging configurable via RUST_LOG
- Hardware-accelerated video conversion (Intel QSV)
- NFO file generation for Jellyfin compatibility
This commit is contained in:
Benjamin Slingo 2025-09-06 18:27:59 -04:00
parent a7206fa325
commit 8c84b69028
7 changed files with 770 additions and 217 deletions

32
.env.example Normal file
View file

@ -0,0 +1,32 @@
# Directory paths
INPUT_DIR=/home/user/livestreams
OUTPUT_DIR=/media/archive/livestreams
# Program naming
DIVINE_WORSHIP_NAME=Divine Worship Service - RTSDA
AFTERNOON_PROGRAM_NAME=Afternoon Program - RTSDA
# FFmpeg Configuration
FFMPEG_BINARY=ffmpeg
VIDEO_CODEC=av1_qsv
HW_ACCEL=qsv
HW_DEVICE=qsv=hw
FFMPEG_PRESET=4
VIDEO_BITRATE=6M
MAX_BITRATE=12M
BUFFER_SIZE=24M
AUDIO_CODEC=copy
AUDIO_BITRATE=192k
# File Stability Settings
STABILITY_CHECK_INTERVAL=2
STABILITY_REQUIRED_CHECKS=15
MAX_STABILITY_WAIT_HOURS=4
# Directory Settings
CREATE_YEAR_MONTH_DIRS=true
PRESERVE_ORIGINAL_FILES=true
# NFO Settings
SHOW_TITLE=LiveStreams
CREATE_NFO_FILES=true

365
Cargo.lock generated
View file

@ -17,6 +17,15 @@ version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627"
[[package]]
name = "aho-corasick"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e60d3430d3a69478ad0993f19238d2df97c507009a52b3c10addcd7f6bcb916"
dependencies = [
"memchr",
]
[[package]] [[package]]
name = "android-tzdata" name = "android-tzdata"
version = "0.1.1" version = "0.1.1"
@ -38,6 +47,17 @@ version = "1.0.95"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34ac096ce696dc2fcabef30516bb13c0a68a11d30131d3df6f04711467681b04" checksum = "34ac096ce696dc2fcabef30516bb13c0a68a11d30131d3df6f04711467681b04"
[[package]]
name = "async-trait"
version = "0.1.89"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "autocfg" name = "autocfg"
version = "1.4.0" version = "1.4.0"
@ -127,12 +147,53 @@ dependencies = [
"crossbeam-utils", "crossbeam-utils",
] ]
[[package]]
name = "crossbeam-deque"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51"
dependencies = [
"crossbeam-epoch",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-epoch"
version = "0.9.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e"
dependencies = [
"crossbeam-utils",
]
[[package]] [[package]]
name = "crossbeam-utils" name = "crossbeam-utils"
version = "0.8.21" version = "0.8.21"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28"
[[package]]
name = "either"
version = "1.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719"
[[package]]
name = "errno"
version = "0.3.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "778e2ac28f6c47af28e4907f13ffd1e1ddbd400980a9abd7c8df189bf578a5ad"
dependencies = [
"libc",
"windows-sys 0.59.0",
]
[[package]]
name = "fastrand"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
[[package]] [[package]]
name = "filetime" name = "filetime"
version = "0.2.25" version = "0.2.25"
@ -154,6 +215,18 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "getrandom"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4"
dependencies = [
"cfg-if",
"libc",
"r-efi",
"wasi 0.14.4+wasi-0.2.4",
]
[[package]] [[package]]
name = "gimli" name = "gimli"
version = "0.31.1" version = "0.31.1"
@ -233,6 +306,12 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "lazy_static"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe"
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.169" version = "0.2.169"
@ -250,14 +329,24 @@ dependencies = [
"redox_syscall", "redox_syscall",
] ]
[[package]]
name = "linux-raw-sys"
version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd945864f07fe9f5371a27ad7b52a172b4b499999f1d97574c9fa68373937e12"
[[package]] [[package]]
name = "livestream_archiver" name = "livestream_archiver"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-trait",
"chrono", "chrono",
"notify", "tempfile",
"tokio", "tokio",
"tracing",
"tracing-subscriber",
"video_processing",
] ]
[[package]] [[package]]
@ -276,6 +365,15 @@ version = "0.4.22"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
[[package]]
name = "matchers"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9"
dependencies = [
"regex-automata",
]
[[package]] [[package]]
name = "memchr" name = "memchr"
version = "2.7.4" version = "2.7.4"
@ -299,7 +397,7 @@ checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c"
dependencies = [ dependencies = [
"libc", "libc",
"log", "log",
"wasi", "wasi 0.11.0+wasi-snapshot-preview1",
"windows-sys 0.48.0", "windows-sys 0.48.0",
] ]
@ -310,7 +408,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd" checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd"
dependencies = [ dependencies = [
"libc", "libc",
"wasi", "wasi 0.11.0+wasi-snapshot-preview1",
"windows-sys 0.52.0", "windows-sys 0.52.0",
] ]
@ -333,6 +431,24 @@ dependencies = [
"windows-sys 0.48.0", "windows-sys 0.48.0",
] ]
[[package]]
name = "ntapi"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8a3895c6391c39d7fe7ebc444a87eb2991b2a0bc718fdabd071eec617fc68e4"
dependencies = [
"winapi",
]
[[package]]
name = "nu-ansi-term"
version = "0.50.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4a28e057d01f97e61255210fcff094d74ed0466038633e95017f5beb68e4399"
dependencies = [
"windows-sys 0.52.0",
]
[[package]] [[package]]
name = "num-traits" name = "num-traits"
version = "0.2.19" version = "0.2.19"
@ -404,6 +520,32 @@ dependencies = [
"proc-macro2", "proc-macro2",
] ]
[[package]]
name = "r-efi"
version = "5.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f"
[[package]]
name = "rayon"
version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "368f01d005bf8fd9b1206fb6fa653e6c4a81ceb1466406b81792d87c5677a58f"
dependencies = [
"either",
"rayon-core",
]
[[package]]
name = "rayon-core"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22e18b0f0062d30d4230b2e85ff77fdfe4326feb054b9783a3460d8435c8ab91"
dependencies = [
"crossbeam-deque",
"crossbeam-utils",
]
[[package]] [[package]]
name = "redox_syscall" name = "redox_syscall"
version = "0.5.8" version = "0.5.8"
@ -413,12 +555,54 @@ dependencies = [
"bitflags 2.6.0", "bitflags 2.6.0",
] ]
[[package]]
name = "regex"
version = "1.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23d7fd106d8c02486a8d64e778353d1cffe08ce79ac2e82f540c86d0facf6912"
dependencies = [
"aho-corasick",
"memchr",
"regex-automata",
"regex-syntax",
]
[[package]]
name = "regex-automata"
version = "0.4.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6b9458fa0bfeeac22b5ca447c63aaf45f28439a709ccd244698632f9aa6394d6"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax",
]
[[package]]
name = "regex-syntax"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "caf4aa5b0f434c91fe5c7f1ecb6a5ece2130b02ad2a590589dda5146df959001"
[[package]] [[package]]
name = "rustc-demangle" name = "rustc-demangle"
version = "0.1.24" version = "0.1.24"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f"
[[package]]
name = "rustix"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "11181fbabf243db407ef8df94a6ce0b2f9a733bd8be4ad02b4eda9602296cac8"
dependencies = [
"bitflags 2.6.0",
"errno",
"libc",
"linux-raw-sys",
"windows-sys 0.59.0",
]
[[package]] [[package]]
name = "same-file" name = "same-file"
version = "1.0.6" version = "1.0.6"
@ -434,6 +618,15 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "sharded-slab"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6"
dependencies = [
"lazy_static",
]
[[package]] [[package]]
name = "shlex" name = "shlex"
version = "1.3.0" version = "1.3.0"
@ -476,6 +669,43 @@ dependencies = [
"unicode-ident", "unicode-ident",
] ]
[[package]]
name = "sysinfo"
version = "0.30.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0a5b4ddaee55fb2bea2bf0e5000747e5f5c0de765e5a5ff87f4cd106439f4bb3"
dependencies = [
"cfg-if",
"core-foundation-sys",
"libc",
"ntapi",
"once_cell",
"rayon",
"windows",
]
[[package]]
name = "tempfile"
version = "3.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15b61f8f20e3a6f7e0649d825294eaf317edce30f82cf6026e7e4cb9222a7d1e"
dependencies = [
"fastrand",
"getrandom",
"once_cell",
"rustix",
"windows-sys 0.59.0",
]
[[package]]
name = "thread_local"
version = "1.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185"
dependencies = [
"cfg-if",
]
[[package]] [[package]]
name = "tokio" name = "tokio"
version = "1.42.0" version = "1.42.0"
@ -505,12 +735,94 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "tracing"
version = "0.1.41"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0"
dependencies = [
"pin-project-lite",
"tracing-attributes",
"tracing-core",
]
[[package]]
name = "tracing-attributes"
version = "0.1.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81383ab64e72a7a8b8e13130c49e3dab29def6d0c7d76a03087b3cf71c5c6903"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "tracing-core"
version = "0.1.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9d12581f227e93f094d3af2ae690a574abb8a2b9b7a96e7cfe9647b2b617678"
dependencies = [
"once_cell",
"valuable",
]
[[package]]
name = "tracing-log"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3"
dependencies = [
"log",
"once_cell",
"tracing-core",
]
[[package]]
name = "tracing-subscriber"
version = "0.3.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2054a14f5307d601f88daf0553e1cbf472acc4f2c51afab632431cdcd72124d5"
dependencies = [
"matchers",
"nu-ansi-term",
"once_cell",
"regex-automata",
"sharded-slab",
"smallvec",
"thread_local",
"tracing",
"tracing-core",
"tracing-log",
]
[[package]] [[package]]
name = "unicode-ident" name = "unicode-ident"
version = "1.0.14" version = "1.0.14"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adb9e6ca4f869e1180728b7950e35922a7fc6397f7b641499e8f3ef06e50dc83" checksum = "adb9e6ca4f869e1180728b7950e35922a7fc6397f7b641499e8f3ef06e50dc83"
[[package]]
name = "valuable"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65"
[[package]]
name = "video_processing"
version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"chrono",
"notify",
"regex",
"sysinfo",
"tokio",
"tracing",
"tracing-subscriber",
]
[[package]] [[package]]
name = "walkdir" name = "walkdir"
version = "2.5.0" version = "2.5.0"
@ -527,6 +839,15 @@ version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "wasi"
version = "0.14.4+wasi-0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "88a5f4a424faf49c3c2c344f166f0662341d470ea185e939657aaff130f0ec4a"
dependencies = [
"wit-bindgen",
]
[[package]] [[package]]
name = "wasm-bindgen" name = "wasm-bindgen"
version = "0.2.99" version = "0.2.99"
@ -581,6 +902,22 @@ version = "0.2.99"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "943aab3fdaaa029a6e0271b35ea10b72b943135afe9bffca82384098ad0e06a6" checksum = "943aab3fdaaa029a6e0271b35ea10b72b943135afe9bffca82384098ad0e06a6"
[[package]]
name = "winapi"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419"
dependencies = [
"winapi-i686-pc-windows-gnu",
"winapi-x86_64-pc-windows-gnu",
]
[[package]]
name = "winapi-i686-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]] [[package]]
name = "winapi-util" name = "winapi-util"
version = "0.1.9" version = "0.1.9"
@ -590,6 +927,22 @@ dependencies = [
"windows-sys 0.59.0", "windows-sys 0.59.0",
] ]
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be"
dependencies = [
"windows-core",
"windows-targets 0.52.6",
]
[[package]] [[package]]
name = "windows-core" name = "windows-core"
version = "0.52.0" version = "0.52.0"
@ -746,3 +1099,9 @@ name = "windows_x86_64_msvc"
version = "0.52.6" version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec"
[[package]]
name = "wit-bindgen"
version = "0.45.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c573471f125075647d03df72e026074b7203790d41351cd6edc96f46bcccd36"

View file

@ -1,13 +1,27 @@
[package] [package]
name = "livestream_archiver" name = "livestream_archiver"
version = "0.1.0" version = "0.1.0"
edition = "2024" edition = "2021"
[lib]
name = "livestream_archiver"
path = "src/lib.rs"
[[bin]]
name = "livestream_archiver"
path = "src/main.rs"
[dependencies] [dependencies]
tokio = { version = "1.36", features = ["full"] } tokio = { version = "1.36", features = ["full", "signal"] }
anyhow = "1.0" anyhow = "1.0"
notify = "6.1"
chrono = "0.4" chrono = "0.4"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
async-trait = "0.1"
video_processing = { git = "https://git.rockvilletollandsda.church/RTSDA/video-processing-support.git", branch = "main" }
[dev-dependencies]
tempfile = "3.0"
# We don't need regex or other conversion-related deps # We don't need regex or other conversion-related deps

3
src/lib.rs Normal file
View file

@ -0,0 +1,3 @@
pub mod services;
pub use services::livestream_archiver::LivestreamArchiver;

View file

@ -1,133 +1,131 @@
use std::path::PathBuf; use std::path::PathBuf;
use std::env; use std::env;
use std::sync::Arc;
use anyhow::Result; use anyhow::Result;
use notify::{Watcher, RecursiveMode, Event, EventKind}; use tracing::{info, error, warn};
use tokio::sync::mpsc; use video_processing::{
use std::collections::HashSet; VideoProcessingConfig, StabilityTracker, SystemMonitor, ShutdownHandler,
use std::sync::{Arc, Mutex}; EventDrivenFileWatcher, FileProcessingService
};
mod services; mod services;
use services::livestream_archiver::LivestreamArchiver; use services::livestream_archiver::LivestreamArchiver;
#[tokio::main] #[tokio::main]
async fn main() -> Result<()> { async fn main() -> Result<()> {
// Initialize logging
tracing_subscriber::fmt()
.with_env_filter(
env::var("RUST_LOG")
.unwrap_or_else(|_| "livestream_archiver=info,video_processing=info".to_string())
)
.init();
info!("Starting Livestream Archiver v0.1.0");
let watch_path = PathBuf::from( let watch_path = PathBuf::from(
env::var("INPUT_DIR").unwrap_or_else(|_| { env::var("INPUT_DIR").unwrap_or_else(|_| {
eprintln!("INPUT_DIR not set, using default: /home/user/livestreams"); warn!("INPUT_DIR not set, using production default: /home/rockvilleav/.rtsda/livestreams");
"/home/user/livestreams".to_string() "/home/rockvilleav/.rtsda/livestreams".to_string()
}) })
); );
let output_path = PathBuf::from( let output_path = PathBuf::from(
env::var("OUTPUT_DIR").unwrap_or_else(|_| { env::var("OUTPUT_DIR").unwrap_or_else(|_| {
eprintln!("OUTPUT_DIR not set, using default: /media/archive/livestreams"); warn!("OUTPUT_DIR not set, using production default: /media/archive/jellyfin/livestreams");
"/media/archive/livestreams".to_string() "/media/archive/jellyfin/livestreams".to_string()
}) })
); );
// Ensure directories exist // Ensure directories exist
if !watch_path.exists() { if !watch_path.exists() {
info!("Creating watch directory: {}", watch_path.display());
std::fs::create_dir_all(&watch_path)?; std::fs::create_dir_all(&watch_path)?;
} }
if !output_path.exists() { if !output_path.exists() {
info!("Creating output directory: {}", output_path.display());
std::fs::create_dir_all(&output_path)?; std::fs::create_dir_all(&output_path)?;
} }
println!("Starting livestream archiver service..."); info!("Watch directory: {}", watch_path.display());
println!("Watching directory: {}", watch_path.display()); info!("Output directory: {}", output_path.display());
println!("Output directory: {}", output_path.display());
let archiver = LivestreamArchiver::new(output_path.clone()); // Initialize components
let processed_files = Arc::new(Mutex::new(HashSet::new())); let config = VideoProcessingConfig::from_env();
let archiver = Arc::new(LivestreamArchiver::new(output_path.clone()));
let stability_tracker = Arc::new(StabilityTracker::new(config));
let system_monitor = SystemMonitor::new(5); // 5GB minimum free space
let (shutdown_handler, mut shutdown_rx) = ShutdownHandler::new();
// Process existing files first // Check initial disk space
println!("Checking for existing files..."); if !system_monitor.check_disk_space(&output_path)? {
if let Ok(entries) = std::fs::read_dir(&watch_path) { error!("Insufficient disk space to start processing");
for entry in entries { return Ok(());
if let Ok(entry) = entry { }
let path = entry.path();
// Only process .mp4 files
if path.extension().and_then(|ext| ext.to_str()) == Some("mp4") {
// Extract date from filename to check if output exists
if let Some(filename) = path.file_name().and_then(|f| f.to_str()) {
if let Ok(date) = archiver.extract_date_from_filename(filename).await {
// Check if either Divine Worship or Afternoon Program exists for this date
let year_dir = archiver.get_output_path().join(date.format("%Y").to_string());
let month_dir = year_dir.join(format!("{}-{}",
date.format("%m"),
date.format("%B")
));
let divine_worship_file = month_dir.join(format!( // Set up file processing service
"Divine Worship Service - RTSDA | {}.mp4", let file_service = Arc::new(FileProcessingService::new(
date.format("%B %d %Y") Arc::clone(&archiver),
)); Arc::clone(&stability_tracker),
let afternoon_program_file = month_dir.join(format!( ));
"Afternoon Program - RTSDA | {}.mp4",
date.format("%B %d %Y")
));
if !divine_worship_file.exists() && !afternoon_program_file.exists() { // Process existing files
println!("Found unprocessed file: {}", path.display()); if let Err(e) = file_service.process_existing_files(&watch_path).await {
if let Err(e) = archiver.process_file(path).await { error!("Error processing existing files: {}", e);
eprintln!("Error processing existing file: {}", e); }
}
} else { // Set up file watcher
println!("Skipping already processed file: {}", path.display()); let mut file_watcher = EventDrivenFileWatcher::new(watch_path.clone())?;
// Start shutdown signal handler
let shutdown_task = tokio::spawn(async move {
shutdown_handler.wait_for_shutdown_signal().await;
});
info!("Livestream archiver is running. Press Ctrl+C to stop.");
// Main event loop
loop {
tokio::select! {
// Handle shutdown signal
_ = shutdown_rx.recv() => {
info!("Shutdown signal received, stopping...");
break;
}
// Handle file events
event = file_watcher.next_event() => {
if let Some(event) = event {
if EventDrivenFileWatcher::should_process_event(&event) {
let mp4_paths = EventDrivenFileWatcher::extract_mp4_paths(&event);
for path in mp4_paths {
// Check disk space before processing
if !system_monitor.check_disk_space(&output_path)? {
warn!("Insufficient disk space, skipping file: {}", path.display());
continue;
} }
info!("New MP4 file detected: {}", path.display());
let service = Arc::clone(&file_service);
tokio::spawn(async move {
if let Err(e) = service.handle_file_event(path.clone()).await {
error!("Failed to process file {}: {}", path.display(), e);
}
});
} }
} }
} else {
warn!("File watcher channel closed, exiting");
break;
} }
} }
} }
} }
// Set up file watcher for new files // Wait for shutdown to complete
let (tx, mut rx) = mpsc::channel(100); shutdown_task.await?;
info!("Livestream archiver stopped");
let mut watcher = notify::recommended_watcher(move |res: Result<Event, notify::Error>| {
let tx = tx.clone();
match res {
Ok(event) => {
println!("Received event: {:?}", event);
if let Err(e) = tx.blocking_send(event) {
eprintln!("Error sending event: {}", e);
}
}
Err(e) => eprintln!("Watch error: {}", e),
}
})?;
watcher.watch(&watch_path, RecursiveMode::NonRecursive)?;
while let Some(event) = rx.recv().await {
println!("Processing event: {:?}", event);
match event.kind {
EventKind::Create(_) | EventKind::Modify(_) => {
for path in event.paths {
if let Ok(canonical_path) = std::fs::canonicalize(&path) {
let path_str = canonical_path.to_string_lossy().to_string();
let mut processed = processed_files.lock().unwrap();
if !processed.contains(&path_str) {
println!("Processing file: {}", path_str);
if let Err(e) = archiver.process_file(path).await {
eprintln!("Error processing file: {}", e);
} else {
processed.insert(path_str);
if processed.len() > 1000 {
processed.clear();
}
}
} else {
println!("Skipping already processed file: {}", path_str);
}
}
}
},
_ => println!("Ignoring event: {:?}", event),
}
}
Ok(()) Ok(())
} }

View file

@ -1,81 +1,78 @@
use std::path::PathBuf; use std::path::PathBuf;
use std::env;
use anyhow::{Result, anyhow}; use anyhow::{Result, anyhow};
use chrono::NaiveDateTime; use chrono::NaiveDateTime;
use tokio::process::Command; use video_processing::{
use tokio::time::Duration; VideoProcessingConfig, StabilityTracker, VideoConverter, NfoGenerator, FileProcessor
};
pub struct LivestreamArchiver { pub struct LivestreamArchiver {
output_path: PathBuf, output_path: PathBuf,
config: VideoProcessingConfig,
stability_tracker: StabilityTracker,
video_converter: VideoConverter,
nfo_generator: NfoGenerator,
divine_worship_name: String,
afternoon_program_name: String,
} }
impl LivestreamArchiver { impl LivestreamArchiver {
pub fn new(output_path: PathBuf) -> Self { pub fn new(output_path: PathBuf) -> Self {
let config = VideoProcessingConfig::from_env();
let stability_tracker = StabilityTracker::new(config.clone());
let video_converter = VideoConverter::new(config.clone());
let nfo_generator = NfoGenerator::new(config.clone());
let divine_worship_name = env::var("DIVINE_WORSHIP_NAME")
.unwrap_or_else(|_| "Divine Worship Service - RTSDA".to_string());
let afternoon_program_name = env::var("AFTERNOON_PROGRAM_NAME")
.unwrap_or_else(|_| "Afternoon Program - RTSDA".to_string());
LivestreamArchiver { LivestreamArchiver {
output_path, output_path,
config,
stability_tracker,
video_converter,
nfo_generator,
divine_worship_name,
afternoon_program_name,
} }
} }
pub fn get_output_path(&self) -> &PathBuf { pub fn get_output_path(&self) -> &PathBuf {
&self.output_path &self.output_path
} }
pub fn on_file_event(&self, path: &PathBuf) -> Result<bool> {
self.stability_tracker.on_file_event(path)
}
async fn wait_for_file_ready(&self, path: &PathBuf) -> Result<()> { pub fn remove_file_tracker(&self, path: &PathBuf) {
println!("Waiting for file to be ready: {}", path.display()); self.stability_tracker.remove_file_tracker(path);
}
// Initial delay - let OBS get started pub fn get_tracked_files(&self) -> Vec<PathBuf> {
tokio::time::sleep(Duration::from_secs(10)).await; self.stability_tracker.get_tracked_files()
}
let mut last_size = 0; pub fn check_if_file_already_processed(&self, _filename: &str, date: &chrono::NaiveDateTime) -> bool {
let mut stable_count = 0; let year_dir = self.output_path.join(date.format("%Y").to_string());
let mut last_modified = std::time::SystemTime::now(); let month_dir = year_dir.join(format!("{}-{}",
let required_stable_checks = 15; // Must be stable for 30 seconds date.format("%m"),
date.format("%B")
));
// Check for up to 4 hours (14400 seconds / 2 second interval = 7200 iterations) let divine_worship_file = month_dir.join(format!(
for i in 0..7200 { "{} ({}).mp4",
match tokio::fs::metadata(path).await { self.divine_worship_name,
Ok(metadata) => { date.format("%B %d %Y")
let current_size = metadata.len(); ));
let current_modified = metadata.modified()?; let afternoon_program_file = month_dir.join(format!(
"{} ({}).mp4",
self.afternoon_program_name,
date.format("%B %d %Y")
));
println!("Check {}: Size = {} bytes, Last Modified: {:?}", i, current_size, current_modified); divine_worship_file.exists() || afternoon_program_file.exists()
if current_size > 0 {
if current_size == last_size {
// Also check if file hasn't been modified recently
if current_modified == last_modified {
stable_count += 1;
println!("Size and modification time stable for {} checks", stable_count);
if stable_count >= required_stable_checks {
println!("File appears complete - size and modification time stable for 30 seconds");
// Extra 30 second buffer after stability to be sure
tokio::time::sleep(Duration::from_secs(30)).await;
return Ok(());
}
} else {
println!("File still being modified");
stable_count = 0;
}
} else {
println!("Size changed: {} -> {}", last_size, current_size);
stable_count = 0;
}
last_size = current_size;
last_modified = current_modified;
}
},
Err(e) => {
println!("Error checking file: {}", e);
return Err(anyhow!("Failed to check file metadata: {}", e));
}
}
tokio::time::sleep(Duration::from_secs(2)).await;
}
// If we reach here, it timed out after 4 hours - something is wrong
println!("Timeout after 4 hours - file is still being written?");
Err(anyhow!("Timeout after 4 hours waiting for file to stabilize"))
} }
pub async fn extract_date_from_filename(&self, filename: &str) -> Result<NaiveDateTime> { pub async fn extract_date_from_filename(&self, filename: &str) -> Result<NaiveDateTime> {
@ -95,10 +92,7 @@ impl LivestreamArchiver {
return Err(anyhow!("Ignoring non-MP4 file")); return Err(anyhow!("Ignoring non-MP4 file"));
} }
println!("Processing livestream recording: {}", path.display()); println!("Processing stable livestream recording: {}", path.display());
// Wait for file to be fully copied
self.wait_for_file_ready(&path).await?;
// Get the filename // Get the filename
let filename = path.file_name() let filename = path.file_name()
@ -121,106 +115,87 @@ impl LivestreamArchiver {
// Check for existing files // Check for existing files
let divine_worship_file = month_dir.join(format!( let divine_worship_file = month_dir.join(format!(
"Divine Worship Service - RTSDA | {}.mp4", "{} ({}).mp4",
self.divine_worship_name,
date.format("%B %d %Y") date.format("%B %d %Y")
)); ));
let afternoon_program_file = month_dir.join(format!( let afternoon_program_file = month_dir.join(format!(
"Afternoon Program - RTSDA | {}.mp4", "{} ({}).mp4",
self.afternoon_program_name,
date.format("%B %d %Y") date.format("%B %d %Y")
)); ));
// Determine which filename to use // Determine which filename to use
let (base_filename, nfo_title, nfo_tag) = if !divine_worship_file.exists() { let (base_filename, nfo_title, nfo_tag) = if !divine_worship_file.exists() {
( (
format!("Divine Worship Service - RTSDA | {}", date.format("%B %d %Y")), format!("{} ({})", self.divine_worship_name, date.format("%B %d %Y")),
format!("Divine Worship Service - RTSDA | {}", date.format("%B %-d %Y")), format!("{} ({})", self.divine_worship_name, date.format("%B %-d %Y")),
"Divine Worship Service" "Divine Worship Service"
) )
} else if !afternoon_program_file.exists() { } else if !afternoon_program_file.exists() {
( (
format!("Afternoon Program - RTSDA | {}", date.format("%B %d %Y")), format!("{} ({})", self.afternoon_program_name, date.format("%B %d %Y")),
format!("Afternoon Program - RTSDA | {}", date.format("%B %-d %Y")), format!("{} ({})", self.afternoon_program_name, date.format("%B %-d %Y")),
"Afternoon Program" "Afternoon Program"
) )
} else { } else {
// Both exist, add suffix to Afternoon Program // Both exist, add suffix to Afternoon Program
let mut suffix = 1; let mut suffix = 1;
let mut test_file = month_dir.join(format!( let mut test_file = month_dir.join(format!(
"Afternoon Program - RTSDA | {} ({}).mp4", "{} ({}) ({}).mp4",
self.afternoon_program_name,
date.format("%B %d %Y"), date.format("%B %d %Y"),
suffix suffix
)); ));
while test_file.exists() { while test_file.exists() {
suffix += 1; suffix += 1;
test_file = month_dir.join(format!( test_file = month_dir.join(format!(
"Afternoon Program - RTSDA | {} ({}).mp4", "{} ({}) ({}).mp4",
self.afternoon_program_name,
date.format("%B %d %Y"), date.format("%B %d %Y"),
suffix suffix
)); ));
} }
( (
format!("Afternoon Program - RTSDA | {} ({})", date.format("%B %d %Y"), suffix), format!("{} ({}) ({})", self.afternoon_program_name, date.format("%B %d %Y"), suffix),
format!("Afternoon Program - RTSDA | {} ({})", date.format("%B %-d %Y"), suffix), format!("{} ({}) ({})", self.afternoon_program_name, date.format("%B %-d %Y"), suffix),
"Afternoon Program" "Afternoon Program"
) )
}; };
let output_file = month_dir.join(format!("{}.mp4", base_filename)); let output_file = month_dir.join(format!("{}.mp4", base_filename));
println!("Converting to AV1 and saving to: {}", output_file.display()); // Convert video using shared crate
self.video_converter.convert_video(&path, &output_file).await?;
// Build ffmpeg command for AV1 conversion using QSV // Create NFO file using shared crate
let status = Command::new("ffmpeg") let date_only = date.date();
.arg("-init_hw_device").arg("qsv=hw") self.nfo_generator.create_nfo_file(&output_file, &nfo_title, &date_only, Some(nfo_tag)).await?;
.arg("-filter_hw_device").arg("hw")
.arg("-hwaccel").arg("qsv")
.arg("-hwaccel_output_format").arg("qsv")
.arg("-i").arg(&path)
.arg("-c:v").arg("av1_qsv")
.arg("-preset").arg("4")
.arg("-b:v").arg("6M")
.arg("-maxrate").arg("12M")
.arg("-bufsize").arg("24M")
.arg("-c:a").arg("copy")
.arg("-n") // Never overwrite existing files
.arg(&output_file)
.status()
.await?;
if !status.success() {
return Err(anyhow!("FFmpeg conversion failed"));
}
// Create NFO file
println!("Creating NFO file...");
let nfo_content = format!(r#"<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<episodedetails>
<title>{}</title>
<showtitle>LiveStreams</showtitle>
<season>{}</season>
<episode>{}</episode>
<aired>{}</aired>
<displayseason>{}</displayseason>
<displayepisode>{}</displayepisode>
<tag>{}</tag>
</episodedetails>"#,
nfo_title,
date.format("%Y").to_string(),
date.format("%m%d").to_string(),
date.format("%Y-%m-%d"),
date.format("%Y"),
date.format("%m%d"),
nfo_tag
);
let nfo_path = output_file.with_extension("nfo");
tokio::fs::write(nfo_path, nfo_content).await?;
println!("Successfully converted {} to AV1 and created NFO", path.display()); println!("Successfully converted {} to AV1 and created NFO", path.display());
// Clean up the file tracker since we're done with this file
self.remove_file_tracker(&path);
// Don't delete original file // Don't delete original file
println!("Original file preserved at: {}", path.display()); println!("Original file preserved at: {}", path.display());
Ok(()) Ok(())
} }
} }
#[async_trait::async_trait]
impl FileProcessor for LivestreamArchiver {
async fn process_file(&self, path: PathBuf) -> Result<()> {
self.process_file(path).await
}
async fn should_skip_existing_file(&self, path: &PathBuf) -> bool {
if let Some(filename) = path.file_name().and_then(|f| f.to_str()) {
if let Ok(date) = self.extract_date_from_filename(filename).await {
return self.check_if_file_already_processed(filename, &date);
}
}
false
}
}

172
tests/integration_tests.rs Normal file
View file

@ -0,0 +1,172 @@
use std::env;
use std::fs;
use std::path::PathBuf;
use tempfile::TempDir;
use chrono::{NaiveDateTime, Datelike, Timelike};
use livestream_archiver::LivestreamArchiver;
#[tokio::test]
async fn test_livestream_archiver_creation() {
let temp_output = TempDir::new().unwrap();
// Test that we can create a new archiver
let archiver = LivestreamArchiver::new(temp_output.path().to_path_buf());
assert_eq!(archiver.get_output_path(), temp_output.path());
}
#[tokio::test]
async fn test_date_extraction_from_filename() {
let temp_output = TempDir::new().unwrap();
let archiver = LivestreamArchiver::new(temp_output.path().to_path_buf());
// Test valid filename
let result = archiver.extract_date_from_filename("2024-12-25_14-30-45.mp4").await;
assert!(result.is_ok());
let date = result.unwrap();
assert_eq!(date.year(), 2024);
assert_eq!(date.month(), 12);
assert_eq!(date.day(), 25);
assert_eq!(date.hour(), 14);
assert_eq!(date.minute(), 30);
assert_eq!(date.second(), 45);
// Test invalid filename
let result = archiver.extract_date_from_filename("invalid-filename.mp4").await;
assert!(result.is_err());
// Test non-mp4 file
let result = archiver.extract_date_from_filename("2024-12-25_14-30-45.avi").await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_file_already_processed_detection() {
let temp_output = TempDir::new().unwrap();
// Set up environment variables for program names
env::set_var("DIVINE_WORSHIP_NAME", "Test Divine Worship");
env::set_var("AFTERNOON_PROGRAM_NAME", "Test Afternoon Program");
let archiver = LivestreamArchiver::new(temp_output.path().to_path_buf());
let test_date = NaiveDateTime::parse_from_str("2024-12-25_14-30-45", "%Y-%m-%d_%H-%M-%S").unwrap();
// Initially no files should exist
assert!(!archiver.check_if_file_already_processed("2024-12-25_14-30-45.mp4", &test_date));
// Create the expected output directory structure
let year_dir = temp_output.path().join("2024");
let month_dir = year_dir.join("12-December");
fs::create_dir_all(&month_dir).unwrap();
// Create a divine worship file
let divine_worship_file = month_dir.join("Test Divine Worship (December 25 2024).mp4");
fs::write(&divine_worship_file, "test content").unwrap();
// Now it should detect the file as processed
assert!(archiver.check_if_file_already_processed("2024-12-25_14-30-45.mp4", &test_date));
// Clean up environment variables
env::remove_var("DIVINE_WORSHIP_NAME");
env::remove_var("AFTERNOON_PROGRAM_NAME");
}
#[tokio::test]
async fn test_environment_variable_configuration() {
// Test with custom environment variables
env::set_var("DIVINE_WORSHIP_NAME", "Custom Divine Service");
env::set_var("AFTERNOON_PROGRAM_NAME", "Custom Afternoon Show");
env::set_var("SHOW_TITLE", "Custom Livestreams");
let temp_output = TempDir::new().unwrap();
let archiver = LivestreamArchiver::new(temp_output.path().to_path_buf());
// These are internal fields, so we test them indirectly through the file processing logic
let test_date = NaiveDateTime::parse_from_str("2024-12-25_14-30-45", "%Y-%m-%d_%H-%M-%S").unwrap();
// The archiver should use the custom names when checking for existing files
assert!(!archiver.check_if_file_already_processed("test.mp4", &test_date));
// Create expected directory structure
let year_dir = temp_output.path().join("2024");
let month_dir = year_dir.join("12-December");
fs::create_dir_all(&month_dir).unwrap();
// Create file with custom name
let custom_file = month_dir.join("Custom Divine Service (December 25 2024).mp4");
fs::write(&custom_file, "test content").unwrap();
// Should now detect as processed
assert!(archiver.check_if_file_already_processed("test.mp4", &test_date));
// Clean up
env::remove_var("DIVINE_WORSHIP_NAME");
env::remove_var("AFTERNOON_PROGRAM_NAME");
env::remove_var("SHOW_TITLE");
}
#[tokio::test]
async fn test_stability_tracking_integration() {
let temp_output = TempDir::new().unwrap();
let archiver = LivestreamArchiver::new(temp_output.path().to_path_buf());
let temp_input = TempDir::new().unwrap();
let test_file = temp_input.path().join("2024-12-25_14-30-45.mp4");
// Create initial file
fs::write(&test_file, "initial content").unwrap();
// Test that stability tracking starts
let result = archiver.on_file_event(&test_file);
assert!(result.is_ok());
assert!(!result.unwrap(), "New file should not be immediately stable");
// File should be tracked
let tracked_files = archiver.get_tracked_files();
assert_eq!(tracked_files.len(), 1);
assert_eq!(tracked_files[0], test_file);
// Remove tracking
archiver.remove_file_tracker(&test_file);
assert_eq!(archiver.get_tracked_files().len(), 0);
}
#[tokio::test]
async fn test_directory_structure_creation() {
let temp_output = TempDir::new().unwrap();
// Test date parsing and directory structure
let test_cases = vec![
("2024-01-15_10-30-00.mp4", "2024", "01-January"),
("2024-12-31_23-59-59.mp4", "2024", "12-December"),
("2023-07-04_12-00-00.mp4", "2023", "07-July"),
];
for (filename, expected_year, expected_month) in test_cases {
let archiver = LivestreamArchiver::new(temp_output.path().to_path_buf());
let date = archiver.extract_date_from_filename(filename).await.unwrap();
// The directory structure should match what the archiver expects
let year_dir = temp_output.path().join(expected_year);
let month_dir = year_dir.join(expected_month);
// Create the directory structure as the archiver would
fs::create_dir_all(&month_dir).unwrap();
// Verify structure exists
assert!(year_dir.exists());
assert!(month_dir.exists());
// Test file detection logic
assert!(!archiver.check_if_file_already_processed(filename, &date));
}
}
// Helper function to create mock video files for testing
fn create_mock_mp4_file(path: &PathBuf, size_mb: usize) -> std::io::Result<()> {
let content = vec![0u8; size_mb * 1024 * 1024]; // Create file of specified size in MB
fs::write(path, content)
}