use gstreamer::prelude::*; use church_api::handlers::smart_streaming::{detect_av1_support, detect_hevc_support}; use church_api::error::{ApiError, Result}; use church_api::utils::codec_detection::ClientCapabilities; use std::path::Path; use std::fs; use tempfile::tempdir; use uuid::Uuid; /// Mock database pool for testing fn create_mock_pool() -> sqlx::PgPool { // This would normally be a real connection, but for testing we'll use a mock // In a real test setup, you'd use sqlx::test or testcontainers unimplemented!("Use a test database or mock for real tests") } /// Create a simple test video file for GStreamer testing async fn create_test_video(path: &Path) -> Result<()> { use gstreamer::prelude::*; // Initialize GStreamer gstreamer::init().map_err(|e| ApiError::Internal(format!("Failed to init GStreamer: {}", e)))?; // Create a simple test pipeline to generate a short video let pipeline = gstreamer::Pipeline::new(); // Create test pattern video source let videotestsrc = gstreamer::ElementFactory::make("videotestsrc") .property("num-buffers", 300i32) // 10 seconds at 30fps .property("pattern", 0i32) // SMPTE color bars .build() .map_err(|e| ApiError::Internal(format!("Failed to create videotestsrc: {}", e)))?; // Create test audio source let audiotestsrc = gstreamer::ElementFactory::make("audiotestsrc") .property("num-buffers", 441i32) // 10 seconds at 44.1kHz .property("freq", 440.0f64) // 440Hz tone .build() .map_err(|e| ApiError::Internal(format!("Failed to create audiotestsrc: {}", e)))?; // Video encoding pipeline let x264enc = gstreamer::ElementFactory::make("x264enc") .property("bitrate", 1000u32) .property("speed-preset", 1u32) // ultrafast .build() .map_err(|e| ApiError::Internal(format!("Failed to create x264enc: {}", e)))?; // Audio encoding pipeline let audioconvert = gstreamer::ElementFactory::make("audioconvert") .build() .map_err(|e| ApiError::Internal(format!("Failed to create audioconvert: {}", e)))?; let avenc_aac = gstreamer::ElementFactory::make("avenc_aac") .property("bitrate", 128000i32) .build() .map_err(|e| ApiError::Internal(format!("Failed to create avenc_aac: {}", e)))?; // MP4 muxer and file sink let mp4mux = gstreamer::ElementFactory::make("mp4mux") .build() .map_err(|e| ApiError::Internal(format!("Failed to create mp4mux: {}", e)))?; let filesink = gstreamer::ElementFactory::make("filesink") .property("location", path.to_str().unwrap()) .build() .map_err(|e| ApiError::Internal(format!("Failed to create filesink: {}", e)))?; // Add elements to pipeline pipeline.add_many([ &videotestsrc, &x264enc, &audiotestsrc, &audioconvert, &avenc_aac, &mp4mux, &filesink ]).map_err(|e| ApiError::Internal(format!("Failed to add elements: {}", e)))?; // Link video chain videotestsrc.link(&x264enc) .map_err(|e| ApiError::Internal(format!("Failed to link video chain: {}", e)))?; x264enc.link(&mp4mux) .map_err(|e| ApiError::Internal(format!("Failed to link video to mux: {}", e)))?; // Link audio chain gstreamer::Element::link_many([&audiotestsrc, &audioconvert, &avenc_aac]) .map_err(|e| ApiError::Internal(format!("Failed to link audio chain: {}", e)))?; avenc_aac.link(&mp4mux) .map_err(|e| ApiError::Internal(format!("Failed to link audio to mux: {}", e)))?; mp4mux.link(&filesink) .map_err(|e| ApiError::Internal(format!("Failed to link mux to sink: {}", e)))?; // Run pipeline pipeline.set_state(gstreamer::State::Playing) .map_err(|e| ApiError::Internal(format!("Failed to start pipeline: {}", e)))?; // Wait for completion let bus = pipeline.bus().unwrap(); let timeout = gstreamer::ClockTime::from_seconds(30); match bus.timed_pop_filtered(Some(timeout), &[gstreamer::MessageType::Error, gstreamer::MessageType::Eos]) { Some(msg) => { match msg.view() { gstreamer::MessageView::Eos(..) => { println!("✅ Test video created successfully"); } gstreamer::MessageView::Error(err) => { return Err(ApiError::Internal(format!("GStreamer error: {}", err.error()))); } _ => {} } } None => { return Err(ApiError::Internal("Test video creation timed out".to_string())); } } // Clean up pipeline.set_state(gstreamer::State::Null) .map_err(|e| ApiError::Internal(format!("Failed to stop pipeline: {}", e)))?; Ok(()) } #[cfg(test)] mod tests { use super::*; use std::time::Instant; #[tokio::test] async fn test_gstreamer_initialization() { let result = gstreamer::init(); assert!(result.is_ok(), "GStreamer should initialize successfully"); } #[tokio::test] async fn test_client_capability_detection() { // Test AV1 support detection assert!(detect_av1_support("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36")); assert!(detect_av1_support("Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/120.0")); assert!(detect_av1_support("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Edg/120.0.0.0")); assert!(!detect_av1_support("Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Safari/605.1.15")); // Test HEVC support detection assert!(detect_hevc_support("Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Safari/605.1.15")); assert!(detect_hevc_support("Mozilla/5.0 (iPhone; CPU iPhone OS 17_0 like Mac OS X) AppleWebKit/605.1.15")); assert!(detect_hevc_support("Mozilla/5.0 (Linux; Android 13) AppleWebKit/537.36")); } #[tokio::test] async fn test_gstreamer_element_availability() { // Initialize GStreamer for testing let _ = gstreamer::init(); // Test that key elements are available assert!(gstreamer::ElementFactory::find("filesrc").is_some(), "filesrc should be available"); assert!(gstreamer::ElementFactory::find("filesink").is_some(), "filesink should be available"); assert!(gstreamer::ElementFactory::find("qtdemux").is_some(), "qtdemux should be available"); assert!(gstreamer::ElementFactory::find("mp4mux").is_some(), "mp4mux should be available"); assert!(gstreamer::ElementFactory::find("mpegtsmux").is_some(), "mpegtsmux should be available"); // Test codec availability (these might not be available in CI) let has_x264 = gstreamer::ElementFactory::find("x264enc").is_some(); let has_aac = gstreamer::ElementFactory::find("avenc_aac").is_some(); println!("x264enc available: {}", has_x264); println!("avenc_aac available: {}", has_aac); // Test hardware acceleration availability let has_vaapi_h264 = gstreamer::ElementFactory::find("vaapih264enc").is_some(); let has_vaapi_av1 = gstreamer::ElementFactory::find("vaapidecode_av1").is_some(); println!("vaapih264enc available: {}", has_vaapi_h264); println!("vaapidecode_av1 available: {}", has_vaapi_av1); } #[tokio::test] async fn test_create_simple_pipeline() { // Initialize GStreamer let _ = gstreamer::init(); // Create a simple test pipeline to verify GStreamer works let pipeline = gstreamer::Pipeline::new(); let videotestsrc = gstreamer::ElementFactory::make("videotestsrc") .property("num-buffers", 10i32) .build() .expect("Failed to create videotestsrc"); let fakesink = gstreamer::ElementFactory::make("fakesink") .build() .expect("Failed to create fakesink"); pipeline.add_many([&videotestsrc, &fakesink]) .expect("Failed to add elements"); videotestsrc.link(&fakesink) .expect("Failed to link elements"); // Test state changes assert!(pipeline.set_state(gstreamer::State::Ready).is_ok()); assert!(pipeline.set_state(gstreamer::State::Paused).is_ok()); assert!(pipeline.set_state(gstreamer::State::Playing).is_ok()); // Let it run briefly tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; assert!(pipeline.set_state(gstreamer::State::Null).is_ok()); println!("✅ Simple GStreamer pipeline test passed"); } #[tokio::test] async fn test_video_file_creation() { let temp_dir = tempdir().expect("Failed to create temp dir"); let video_path = temp_dir.path().join("test_video.mp4"); // Only run this test if we have the required encoders let _ = gstreamer::init(); if !gstreamer::ElementFactory::find("x264enc").is_some() || !gstreamer::ElementFactory::find("avenc_aac").is_some() { println!("⚠️ Skipping video creation test - encoders not available"); return; } let result = create_test_video(&video_path).await; assert!(result.is_ok(), "Test video creation should succeed"); assert!(video_path.exists(), "Test video file should exist"); let metadata = fs::metadata(&video_path).expect("Failed to get file metadata"); assert!(metadata.len() > 1000, "Video file should be substantial size"); println!("✅ Test video created: {} bytes", metadata.len()); } #[tokio::test] async fn test_streaming_transcoding_service_creation() { // Test that we can create the streaming service without a real DB // This tests the service initialization logic // Note: In a real test you'd use a test database // For now we'll just test what we can without DB dependency // Test segment creation logic let media_id = Uuid::new_v4(); let segment = StreamingSegment { index: 0, start_time: 0.0, duration: 10.0, status: SegmentStatus::NotStarted, file_path: None, }; assert_eq!(segment.index, 0); assert_eq!(segment.start_time, 0.0); assert_eq!(segment.duration, 10.0); assert_eq!(segment.status, SegmentStatus::NotStarted); println!("✅ Streaming transcoding service structures work correctly"); } #[tokio::test] async fn test_performance_benchmarks() { let _ = gstreamer::init(); // Benchmark GStreamer initialization time let start = Instant::now(); let pipeline = gstreamer::Pipeline::new(); let init_time = start.elapsed(); println!("GStreamer pipeline creation time: {:?}", init_time); assert!(init_time.as_millis() < 100, "Pipeline creation should be fast"); // Test element creation performance let start = Instant::now(); let _filesrc = gstreamer::ElementFactory::make("filesrc").build(); let _qtdemux = gstreamer::ElementFactory::make("qtdemux").build(); let _queue = gstreamer::ElementFactory::make("queue").build(); let element_time = start.elapsed(); println!("Element creation time: {:?}", element_time); assert!(element_time.as_millis() < 50, "Element creation should be very fast"); drop(pipeline); println!("✅ Performance benchmarks passed"); } #[tokio::test] async fn test_error_handling() { let _ = gstreamer::init(); // Test handling of invalid elements let result = gstreamer::ElementFactory::make("nonexistent-element").build(); assert!(result.is_err(), "Should fail to create nonexistent element"); // Test pipeline with incompatible elements let pipeline = gstreamer::Pipeline::new(); let videotestsrc = gstreamer::ElementFactory::make("videotestsrc") .build() .expect("Failed to create videotestsrc"); let audioconvert = gstreamer::ElementFactory::make("audioconvert") .build() .expect("Failed to create audioconvert"); pipeline.add_many([&videotestsrc, &audioconvert]) .expect("Failed to add elements"); // This should fail - can't link video to audio converter let link_result = videotestsrc.link(&audioconvert); assert!(link_result.is_err(), "Should fail to link incompatible elements"); println!("✅ Error handling tests passed"); } #[tokio::test] async fn test_hardware_acceleration_detection() { // Test VA-API detection let vaapi_available = std::path::Path::new("/dev/dri/renderD128").exists(); println!("VA-API hardware acceleration available: {}", vaapi_available); // Test Intel QSV environment let libva_driver = std::env::var("LIBVA_DRIVER_NAME").unwrap_or_default(); let libva_path = std::env::var("LIBVA_DRIVERS_PATH").unwrap_or_default(); println!("LIBVA_DRIVER_NAME: {}", libva_driver); println!("LIBVA_DRIVERS_PATH: {}", libva_path); // In CI this will likely be false, but in production it should be true if vaapi_available { println!("✅ Hardware acceleration available"); } else { println!("ℹ️ Hardware acceleration not available (normal in CI)"); } } #[tokio::test] async fn test_memory_cleanup() { // Test that GStreamer pipelines clean up properly let _ = gstreamer::init(); // Create and destroy multiple pipelines for i in 0..10 { let pipeline = gstreamer::Pipeline::new(); let videotestsrc = gstreamer::ElementFactory::make("videotestsrc") .property("num-buffers", 1i32) .build() .expect("Failed to create videotestsrc"); let fakesink = gstreamer::ElementFactory::make("fakesink") .build() .expect("Failed to create fakesink"); pipeline.add_many([&videotestsrc, &fakesink]) .expect("Failed to add elements"); videotestsrc.link(&fakesink) .expect("Failed to link elements"); // Quick run pipeline.set_state(gstreamer::State::Playing) .expect("Failed to start pipeline"); // Wait briefly tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; // Clean shutdown pipeline.set_state(gstreamer::State::Null) .expect("Failed to stop pipeline"); drop(pipeline); if i % 5 == 0 { println!("Created and cleaned up {} pipelines", i + 1); } } println!("✅ Memory cleanup test completed - no leaks expected"); } #[tokio::test] async fn test_codec_specific_elements() { let _ = gstreamer::init(); // Test AV1 decoder availability and creation if gstreamer::ElementFactory::find("av1dec").is_some() { let av1dec = gstreamer::ElementFactory::make("av1dec").build(); assert!(av1dec.is_ok(), "Should be able to create av1dec"); println!("✅ AV1 software decoder available"); } else { println!("ℹ️ AV1 software decoder not available"); } if gstreamer::ElementFactory::find("vaapidecode_av1").is_some() { let vaapi_av1 = gstreamer::ElementFactory::make("vaapidecode_av1").build(); assert!(vaapi_av1.is_ok(), "Should be able to create vaapidecode_av1"); println!("✅ AV1 hardware decoder available"); } else { println!("ℹ️ AV1 hardware decoder not available"); } // Test H.264 encoder availability if gstreamer::ElementFactory::find("x264enc").is_some() { let x264enc = gstreamer::ElementFactory::make("x264enc").build(); assert!(x264enc.is_ok(), "Should be able to create x264enc"); println!("✅ H.264 software encoder available"); } else { println!("⚠️ H.264 software encoder not available"); } if gstreamer::ElementFactory::find("vaapih264enc").is_some() { let vaapi_h264 = gstreamer::ElementFactory::make("vaapih264enc").build(); assert!(vaapi_h264.is_ok(), "Should be able to create vaapih264enc"); println!("✅ H.264 hardware encoder available"); } else { println!("ℹ️ H.264 hardware encoder not available"); } } } #[cfg(test)] mod integration_tests { use super::*; /// Integration test that requires a real video file /// This should be run manually with a test video file #[ignore] // Ignored by default since it requires external file #[tokio::test] async fn test_real_video_transcoding() { let test_video_path = "/tmp/test_video.mp4"; // You need to provide this if !Path::new(test_video_path).exists() { println!("⚠️ Test video not found at {}, skipping integration test", test_video_path); return; } let temp_dir = tempdir().expect("Failed to create temp dir"); let output_path = temp_dir.path().join("transcoded_segment.ts"); // Test the actual transcoding function // Note: This would normally use the real function from smart_streaming // but we can't easily import it due to module structure println!("✅ Would test real video transcoding with file: {}", test_video_path); println!("✅ Output would go to: {}", output_path.display()); // In a real integration test, you would: // 1. Call transcode_hls_segment_gstreamer() // 2. Verify the output file exists and is valid // 3. Check the transcoding time is reasonable // 4. Verify the output format is correct } /// Load test for concurrent transcoding #[ignore] // Resource intensive test #[tokio::test] async fn test_concurrent_transcoding_performance() { let _ = gstreamer::init(); // Simulate multiple concurrent transcoding requests let mut handles = vec![]; for i in 0..5 { let handle = tokio::spawn(async move { let start = Instant::now(); // Create a quick test pipeline per "request" let pipeline = gstreamer::Pipeline::new(); let videotestsrc = gstreamer::ElementFactory::make("videotestsrc") .property("num-buffers", 30i32) // 1 second at 30fps .build() .expect("Failed to create videotestsrc"); let fakesink = gstreamer::ElementFactory::make("fakesink") .build() .expect("Failed to create fakesink"); pipeline.add_many([&videotestsrc, &fakesink]) .expect("Failed to add elements"); videotestsrc.link(&fakesink) .expect("Failed to link elements"); pipeline.set_state(gstreamer::State::Playing) .expect("Failed to start pipeline"); // Wait for completion let bus = pipeline.bus().unwrap(); let timeout = gstreamer::ClockTime::from_seconds(5); match bus.timed_pop_filtered(Some(timeout), &[gstreamer::MessageType::Eos]) { Some(_) => { let elapsed = start.elapsed(); println!("Pipeline {} completed in {:?}", i, elapsed); elapsed } None => { println!("Pipeline {} timed out", i); start.elapsed() } } }); handles.push(handle); } // Wait for all to complete let mut total_time = std::time::Duration::new(0, 0); for handle in handles { let elapsed = handle.await.expect("Task should complete"); total_time += elapsed; } println!("✅ Concurrent test completed. Average time: {:?}", total_time / 5); } }