
Complete church management system with bulletin management, media processing, live streaming integration, and web interface. Includes authentication, email notifications, database migrations, and comprehensive test suite.
513 lines
21 KiB
Rust
513 lines
21 KiB
Rust
use church_api::services::unified_transcoding::UnifiedTranscodingService;
|
||
use gstreamer::prelude::*;
|
||
use church_api::handlers::smart_streaming::{detect_av1_support, detect_hevc_support};
|
||
use church_api::error::{ApiError, Result};
|
||
use church_api::utils::codec_detection::ClientCapabilities;
|
||
use std::path::Path;
|
||
use std::fs;
|
||
use tempfile::tempdir;
|
||
use uuid::Uuid;
|
||
|
||
/// Mock database pool for testing
|
||
fn create_mock_pool() -> sqlx::PgPool {
|
||
// This would normally be a real connection, but for testing we'll use a mock
|
||
// In a real test setup, you'd use sqlx::test or testcontainers
|
||
unimplemented!("Use a test database or mock for real tests")
|
||
}
|
||
|
||
/// Create a simple test video file for GStreamer testing
|
||
async fn create_test_video(path: &Path) -> Result<()> {
|
||
use gstreamer::prelude::*;
|
||
|
||
// Initialize GStreamer
|
||
gstreamer::init().map_err(|e| ApiError::Internal(format!("Failed to init GStreamer: {}", e)))?;
|
||
|
||
// Create a simple test pipeline to generate a short video
|
||
let pipeline = gstreamer::Pipeline::new();
|
||
|
||
// Create test pattern video source
|
||
let videotestsrc = gstreamer::ElementFactory::make("videotestsrc")
|
||
.property("num-buffers", 300i32) // 10 seconds at 30fps
|
||
.property("pattern", 0i32) // SMPTE color bars
|
||
.build()
|
||
.map_err(|e| ApiError::Internal(format!("Failed to create videotestsrc: {}", e)))?;
|
||
|
||
// Create test audio source
|
||
let audiotestsrc = gstreamer::ElementFactory::make("audiotestsrc")
|
||
.property("num-buffers", 441i32) // 10 seconds at 44.1kHz
|
||
.property("freq", 440.0f64) // 440Hz tone
|
||
.build()
|
||
.map_err(|e| ApiError::Internal(format!("Failed to create audiotestsrc: {}", e)))?;
|
||
|
||
// Video encoding pipeline
|
||
let x264enc = gstreamer::ElementFactory::make("x264enc")
|
||
.property("bitrate", 1000u32)
|
||
.property("speed-preset", 1u32) // ultrafast
|
||
.build()
|
||
.map_err(|e| ApiError::Internal(format!("Failed to create x264enc: {}", e)))?;
|
||
|
||
// Audio encoding pipeline
|
||
let audioconvert = gstreamer::ElementFactory::make("audioconvert")
|
||
.build()
|
||
.map_err(|e| ApiError::Internal(format!("Failed to create audioconvert: {}", e)))?;
|
||
|
||
let avenc_aac = gstreamer::ElementFactory::make("avenc_aac")
|
||
.property("bitrate", 128000i32)
|
||
.build()
|
||
.map_err(|e| ApiError::Internal(format!("Failed to create avenc_aac: {}", e)))?;
|
||
|
||
// MP4 muxer and file sink
|
||
let mp4mux = gstreamer::ElementFactory::make("mp4mux")
|
||
.build()
|
||
.map_err(|e| ApiError::Internal(format!("Failed to create mp4mux: {}", e)))?;
|
||
|
||
let filesink = gstreamer::ElementFactory::make("filesink")
|
||
.property("location", path.to_str().unwrap())
|
||
.build()
|
||
.map_err(|e| ApiError::Internal(format!("Failed to create filesink: {}", e)))?;
|
||
|
||
// Add elements to pipeline
|
||
pipeline.add_many([
|
||
&videotestsrc, &x264enc, &audiotestsrc, &audioconvert,
|
||
&avenc_aac, &mp4mux, &filesink
|
||
]).map_err(|e| ApiError::Internal(format!("Failed to add elements: {}", e)))?;
|
||
|
||
// Link video chain
|
||
videotestsrc.link(&x264enc)
|
||
.map_err(|e| ApiError::Internal(format!("Failed to link video chain: {}", e)))?;
|
||
x264enc.link(&mp4mux)
|
||
.map_err(|e| ApiError::Internal(format!("Failed to link video to mux: {}", e)))?;
|
||
|
||
// Link audio chain
|
||
gstreamer::Element::link_many([&audiotestsrc, &audioconvert, &avenc_aac])
|
||
.map_err(|e| ApiError::Internal(format!("Failed to link audio chain: {}", e)))?;
|
||
avenc_aac.link(&mp4mux)
|
||
.map_err(|e| ApiError::Internal(format!("Failed to link audio to mux: {}", e)))?;
|
||
|
||
mp4mux.link(&filesink)
|
||
.map_err(|e| ApiError::Internal(format!("Failed to link mux to sink: {}", e)))?;
|
||
|
||
// Run pipeline
|
||
pipeline.set_state(gstreamer::State::Playing)
|
||
.map_err(|e| ApiError::Internal(format!("Failed to start pipeline: {}", e)))?;
|
||
|
||
// Wait for completion
|
||
let bus = pipeline.bus().unwrap();
|
||
let timeout = gstreamer::ClockTime::from_seconds(30);
|
||
match bus.timed_pop_filtered(Some(timeout), &[gstreamer::MessageType::Error, gstreamer::MessageType::Eos]) {
|
||
Some(msg) => {
|
||
match msg.view() {
|
||
gstreamer::MessageView::Eos(..) => {
|
||
println!("✅ Test video created successfully");
|
||
}
|
||
gstreamer::MessageView::Error(err) => {
|
||
return Err(ApiError::Internal(format!("GStreamer error: {}", err.error())));
|
||
}
|
||
_ => {}
|
||
}
|
||
}
|
||
None => {
|
||
return Err(ApiError::Internal("Test video creation timed out".to_string()));
|
||
}
|
||
}
|
||
|
||
// Clean up
|
||
pipeline.set_state(gstreamer::State::Null)
|
||
.map_err(|e| ApiError::Internal(format!("Failed to stop pipeline: {}", e)))?;
|
||
|
||
Ok(())
|
||
}
|
||
|
||
#[cfg(test)]
|
||
mod tests {
|
||
use super::*;
|
||
use std::time::Instant;
|
||
|
||
#[tokio::test]
|
||
async fn test_gstreamer_initialization() {
|
||
let result = gstreamer::init();
|
||
assert!(result.is_ok(), "GStreamer should initialize successfully");
|
||
}
|
||
|
||
#[tokio::test]
|
||
async fn test_client_capability_detection() {
|
||
// Test AV1 support detection
|
||
assert!(detect_av1_support("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"));
|
||
assert!(detect_av1_support("Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/120.0"));
|
||
assert!(detect_av1_support("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Edg/120.0.0.0"));
|
||
assert!(!detect_av1_support("Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Safari/605.1.15"));
|
||
|
||
// Test HEVC support detection
|
||
assert!(detect_hevc_support("Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Safari/605.1.15"));
|
||
assert!(detect_hevc_support("Mozilla/5.0 (iPhone; CPU iPhone OS 17_0 like Mac OS X) AppleWebKit/605.1.15"));
|
||
assert!(detect_hevc_support("Mozilla/5.0 (Linux; Android 13) AppleWebKit/537.36"));
|
||
}
|
||
|
||
#[tokio::test]
|
||
async fn test_gstreamer_element_availability() {
|
||
// Initialize GStreamer for testing
|
||
let _ = gstreamer::init();
|
||
|
||
// Test that key elements are available
|
||
assert!(gstreamer::ElementFactory::find("filesrc").is_some(), "filesrc should be available");
|
||
assert!(gstreamer::ElementFactory::find("filesink").is_some(), "filesink should be available");
|
||
assert!(gstreamer::ElementFactory::find("qtdemux").is_some(), "qtdemux should be available");
|
||
assert!(gstreamer::ElementFactory::find("mp4mux").is_some(), "mp4mux should be available");
|
||
assert!(gstreamer::ElementFactory::find("mpegtsmux").is_some(), "mpegtsmux should be available");
|
||
|
||
// Test codec availability (these might not be available in CI)
|
||
let has_x264 = gstreamer::ElementFactory::find("x264enc").is_some();
|
||
let has_aac = gstreamer::ElementFactory::find("avenc_aac").is_some();
|
||
println!("x264enc available: {}", has_x264);
|
||
println!("avenc_aac available: {}", has_aac);
|
||
|
||
// Test hardware acceleration availability
|
||
let has_vaapi_h264 = gstreamer::ElementFactory::find("vaapih264enc").is_some();
|
||
let has_vaapi_av1 = gstreamer::ElementFactory::find("vaapidecode_av1").is_some();
|
||
println!("vaapih264enc available: {}", has_vaapi_h264);
|
||
println!("vaapidecode_av1 available: {}", has_vaapi_av1);
|
||
}
|
||
|
||
#[tokio::test]
|
||
async fn test_create_simple_pipeline() {
|
||
// Initialize GStreamer
|
||
let _ = gstreamer::init();
|
||
|
||
// Create a simple test pipeline to verify GStreamer works
|
||
let pipeline = gstreamer::Pipeline::new();
|
||
|
||
let videotestsrc = gstreamer::ElementFactory::make("videotestsrc")
|
||
.property("num-buffers", 10i32)
|
||
.build()
|
||
.expect("Failed to create videotestsrc");
|
||
|
||
let fakesink = gstreamer::ElementFactory::make("fakesink")
|
||
.build()
|
||
.expect("Failed to create fakesink");
|
||
|
||
pipeline.add_many([&videotestsrc, &fakesink])
|
||
.expect("Failed to add elements");
|
||
|
||
videotestsrc.link(&fakesink)
|
||
.expect("Failed to link elements");
|
||
|
||
// Test state changes
|
||
assert!(pipeline.set_state(gstreamer::State::Ready).is_ok());
|
||
assert!(pipeline.set_state(gstreamer::State::Paused).is_ok());
|
||
assert!(pipeline.set_state(gstreamer::State::Playing).is_ok());
|
||
|
||
// Let it run briefly
|
||
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
|
||
|
||
assert!(pipeline.set_state(gstreamer::State::Null).is_ok());
|
||
|
||
println!("✅ Simple GStreamer pipeline test passed");
|
||
}
|
||
|
||
#[tokio::test]
|
||
async fn test_video_file_creation() {
|
||
let temp_dir = tempdir().expect("Failed to create temp dir");
|
||
let video_path = temp_dir.path().join("test_video.mp4");
|
||
|
||
// Only run this test if we have the required encoders
|
||
let _ = gstreamer::init();
|
||
if !gstreamer::ElementFactory::find("x264enc").is_some() ||
|
||
!gstreamer::ElementFactory::find("avenc_aac").is_some() {
|
||
println!("⚠️ Skipping video creation test - encoders not available");
|
||
return;
|
||
}
|
||
|
||
let result = create_test_video(&video_path).await;
|
||
assert!(result.is_ok(), "Test video creation should succeed");
|
||
assert!(video_path.exists(), "Test video file should exist");
|
||
|
||
let metadata = fs::metadata(&video_path).expect("Failed to get file metadata");
|
||
assert!(metadata.len() > 1000, "Video file should be substantial size");
|
||
|
||
println!("✅ Test video created: {} bytes", metadata.len());
|
||
}
|
||
|
||
#[tokio::test]
|
||
async fn test_streaming_transcoding_service_creation() {
|
||
// Test that we can create the streaming service without a real DB
|
||
// This tests the service initialization logic
|
||
|
||
// Note: In a real test you'd use a test database
|
||
// For now we'll just test what we can without DB dependency
|
||
|
||
// Test segment creation logic
|
||
let media_id = Uuid::new_v4();
|
||
let segment = StreamingSegment {
|
||
index: 0,
|
||
start_time: 0.0,
|
||
duration: 10.0,
|
||
status: SegmentStatus::NotStarted,
|
||
file_path: None,
|
||
};
|
||
|
||
assert_eq!(segment.index, 0);
|
||
assert_eq!(segment.start_time, 0.0);
|
||
assert_eq!(segment.duration, 10.0);
|
||
assert_eq!(segment.status, SegmentStatus::NotStarted);
|
||
|
||
println!("✅ Streaming transcoding service structures work correctly");
|
||
}
|
||
|
||
#[tokio::test]
|
||
async fn test_performance_benchmarks() {
|
||
let _ = gstreamer::init();
|
||
|
||
// Benchmark GStreamer initialization time
|
||
let start = Instant::now();
|
||
let pipeline = gstreamer::Pipeline::new();
|
||
let init_time = start.elapsed();
|
||
|
||
println!("GStreamer pipeline creation time: {:?}", init_time);
|
||
assert!(init_time.as_millis() < 100, "Pipeline creation should be fast");
|
||
|
||
// Test element creation performance
|
||
let start = Instant::now();
|
||
let _filesrc = gstreamer::ElementFactory::make("filesrc").build();
|
||
let _qtdemux = gstreamer::ElementFactory::make("qtdemux").build();
|
||
let _queue = gstreamer::ElementFactory::make("queue").build();
|
||
let element_time = start.elapsed();
|
||
|
||
println!("Element creation time: {:?}", element_time);
|
||
assert!(element_time.as_millis() < 50, "Element creation should be very fast");
|
||
|
||
drop(pipeline);
|
||
println!("✅ Performance benchmarks passed");
|
||
}
|
||
|
||
#[tokio::test]
|
||
async fn test_error_handling() {
|
||
let _ = gstreamer::init();
|
||
|
||
// Test handling of invalid elements
|
||
let result = gstreamer::ElementFactory::make("nonexistent-element").build();
|
||
assert!(result.is_err(), "Should fail to create nonexistent element");
|
||
|
||
// Test pipeline with incompatible elements
|
||
let pipeline = gstreamer::Pipeline::new();
|
||
|
||
let videotestsrc = gstreamer::ElementFactory::make("videotestsrc")
|
||
.build()
|
||
.expect("Failed to create videotestsrc");
|
||
|
||
let audioconvert = gstreamer::ElementFactory::make("audioconvert")
|
||
.build()
|
||
.expect("Failed to create audioconvert");
|
||
|
||
pipeline.add_many([&videotestsrc, &audioconvert])
|
||
.expect("Failed to add elements");
|
||
|
||
// This should fail - can't link video to audio converter
|
||
let link_result = videotestsrc.link(&audioconvert);
|
||
assert!(link_result.is_err(), "Should fail to link incompatible elements");
|
||
|
||
println!("✅ Error handling tests passed");
|
||
}
|
||
|
||
#[tokio::test]
|
||
async fn test_hardware_acceleration_detection() {
|
||
// Test VA-API detection
|
||
let vaapi_available = std::path::Path::new("/dev/dri/renderD128").exists();
|
||
println!("VA-API hardware acceleration available: {}", vaapi_available);
|
||
|
||
// Test Intel QSV environment
|
||
let libva_driver = std::env::var("LIBVA_DRIVER_NAME").unwrap_or_default();
|
||
let libva_path = std::env::var("LIBVA_DRIVERS_PATH").unwrap_or_default();
|
||
|
||
println!("LIBVA_DRIVER_NAME: {}", libva_driver);
|
||
println!("LIBVA_DRIVERS_PATH: {}", libva_path);
|
||
|
||
// In CI this will likely be false, but in production it should be true
|
||
if vaapi_available {
|
||
println!("✅ Hardware acceleration available");
|
||
} else {
|
||
println!("ℹ️ Hardware acceleration not available (normal in CI)");
|
||
}
|
||
}
|
||
|
||
#[tokio::test]
|
||
async fn test_memory_cleanup() {
|
||
// Test that GStreamer pipelines clean up properly
|
||
let _ = gstreamer::init();
|
||
|
||
// Create and destroy multiple pipelines
|
||
for i in 0..10 {
|
||
let pipeline = gstreamer::Pipeline::new();
|
||
|
||
let videotestsrc = gstreamer::ElementFactory::make("videotestsrc")
|
||
.property("num-buffers", 1i32)
|
||
.build()
|
||
.expect("Failed to create videotestsrc");
|
||
|
||
let fakesink = gstreamer::ElementFactory::make("fakesink")
|
||
.build()
|
||
.expect("Failed to create fakesink");
|
||
|
||
pipeline.add_many([&videotestsrc, &fakesink])
|
||
.expect("Failed to add elements");
|
||
|
||
videotestsrc.link(&fakesink)
|
||
.expect("Failed to link elements");
|
||
|
||
// Quick run
|
||
pipeline.set_state(gstreamer::State::Playing)
|
||
.expect("Failed to start pipeline");
|
||
|
||
// Wait briefly
|
||
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
|
||
|
||
// Clean shutdown
|
||
pipeline.set_state(gstreamer::State::Null)
|
||
.expect("Failed to stop pipeline");
|
||
|
||
drop(pipeline);
|
||
|
||
if i % 5 == 0 {
|
||
println!("Created and cleaned up {} pipelines", i + 1);
|
||
}
|
||
}
|
||
|
||
println!("✅ Memory cleanup test completed - no leaks expected");
|
||
}
|
||
|
||
#[tokio::test]
|
||
async fn test_codec_specific_elements() {
|
||
let _ = gstreamer::init();
|
||
|
||
// Test AV1 decoder availability and creation
|
||
if gstreamer::ElementFactory::find("av1dec").is_some() {
|
||
let av1dec = gstreamer::ElementFactory::make("av1dec").build();
|
||
assert!(av1dec.is_ok(), "Should be able to create av1dec");
|
||
println!("✅ AV1 software decoder available");
|
||
} else {
|
||
println!("ℹ️ AV1 software decoder not available");
|
||
}
|
||
|
||
if gstreamer::ElementFactory::find("vaapidecode_av1").is_some() {
|
||
let vaapi_av1 = gstreamer::ElementFactory::make("vaapidecode_av1").build();
|
||
assert!(vaapi_av1.is_ok(), "Should be able to create vaapidecode_av1");
|
||
println!("✅ AV1 hardware decoder available");
|
||
} else {
|
||
println!("ℹ️ AV1 hardware decoder not available");
|
||
}
|
||
|
||
// Test H.264 encoder availability
|
||
if gstreamer::ElementFactory::find("x264enc").is_some() {
|
||
let x264enc = gstreamer::ElementFactory::make("x264enc").build();
|
||
assert!(x264enc.is_ok(), "Should be able to create x264enc");
|
||
println!("✅ H.264 software encoder available");
|
||
} else {
|
||
println!("⚠️ H.264 software encoder not available");
|
||
}
|
||
|
||
if gstreamer::ElementFactory::find("vaapih264enc").is_some() {
|
||
let vaapi_h264 = gstreamer::ElementFactory::make("vaapih264enc").build();
|
||
assert!(vaapi_h264.is_ok(), "Should be able to create vaapih264enc");
|
||
println!("✅ H.264 hardware encoder available");
|
||
} else {
|
||
println!("ℹ️ H.264 hardware encoder not available");
|
||
}
|
||
}
|
||
}
|
||
|
||
#[cfg(test)]
|
||
mod integration_tests {
|
||
use super::*;
|
||
|
||
/// Integration test that requires a real video file
|
||
/// This should be run manually with a test video file
|
||
#[ignore] // Ignored by default since it requires external file
|
||
#[tokio::test]
|
||
async fn test_real_video_transcoding() {
|
||
let test_video_path = "/tmp/test_video.mp4"; // You need to provide this
|
||
|
||
if !Path::new(test_video_path).exists() {
|
||
println!("⚠️ Test video not found at {}, skipping integration test", test_video_path);
|
||
return;
|
||
}
|
||
|
||
let temp_dir = tempdir().expect("Failed to create temp dir");
|
||
let output_path = temp_dir.path().join("transcoded_segment.ts");
|
||
|
||
// Test the actual transcoding function
|
||
// Note: This would normally use the real function from smart_streaming
|
||
// but we can't easily import it due to module structure
|
||
|
||
println!("✅ Would test real video transcoding with file: {}", test_video_path);
|
||
println!("✅ Output would go to: {}", output_path.display());
|
||
|
||
// In a real integration test, you would:
|
||
// 1. Call transcode_hls_segment_gstreamer()
|
||
// 2. Verify the output file exists and is valid
|
||
// 3. Check the transcoding time is reasonable
|
||
// 4. Verify the output format is correct
|
||
}
|
||
|
||
/// Load test for concurrent transcoding
|
||
#[ignore] // Resource intensive test
|
||
#[tokio::test]
|
||
async fn test_concurrent_transcoding_performance() {
|
||
let _ = gstreamer::init();
|
||
|
||
// Simulate multiple concurrent transcoding requests
|
||
let mut handles = vec![];
|
||
|
||
for i in 0..5 {
|
||
let handle = tokio::spawn(async move {
|
||
let start = Instant::now();
|
||
|
||
// Create a quick test pipeline per "request"
|
||
let pipeline = gstreamer::Pipeline::new();
|
||
|
||
let videotestsrc = gstreamer::ElementFactory::make("videotestsrc")
|
||
.property("num-buffers", 30i32) // 1 second at 30fps
|
||
.build()
|
||
.expect("Failed to create videotestsrc");
|
||
|
||
let fakesink = gstreamer::ElementFactory::make("fakesink")
|
||
.build()
|
||
.expect("Failed to create fakesink");
|
||
|
||
pipeline.add_many([&videotestsrc, &fakesink])
|
||
.expect("Failed to add elements");
|
||
|
||
videotestsrc.link(&fakesink)
|
||
.expect("Failed to link elements");
|
||
|
||
pipeline.set_state(gstreamer::State::Playing)
|
||
.expect("Failed to start pipeline");
|
||
|
||
// Wait for completion
|
||
let bus = pipeline.bus().unwrap();
|
||
let timeout = gstreamer::ClockTime::from_seconds(5);
|
||
|
||
match bus.timed_pop_filtered(Some(timeout), &[gstreamer::MessageType::Eos]) {
|
||
Some(_) => {
|
||
let elapsed = start.elapsed();
|
||
println!("Pipeline {} completed in {:?}", i, elapsed);
|
||
elapsed
|
||
}
|
||
None => {
|
||
println!("Pipeline {} timed out", i);
|
||
start.elapsed()
|
||
}
|
||
}
|
||
});
|
||
|
||
handles.push(handle);
|
||
}
|
||
|
||
// Wait for all to complete
|
||
let mut total_time = std::time::Duration::new(0, 0);
|
||
for handle in handles {
|
||
let elapsed = handle.await.expect("Task should complete");
|
||
total_time += elapsed;
|
||
}
|
||
|
||
println!("✅ Concurrent test completed. Average time: {:?}", total_time / 5);
|
||
}
|
||
} |