church-api/tests/gstreamer_integration_tests.rs
Benjamin Slingo cabf8aa83d Remove legacy batch transcoding infrastructure
- Remove TranscodedMedia, TranscodingStatus, and TranscodingRequest models
- Delete 184-line commented stream_media function
- Clean up phantom service imports and dead references
- Remove unused test imports for non-existent services

Total cleanup: 265 lines of dead code removed from legacy Jellyfin compatibility layer.
Smart streaming pipeline remains unchanged and functional.
2025-09-08 12:31:17 -04:00

512 lines
21 KiB
Rust
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

use gstreamer::prelude::*;
use church_api::handlers::smart_streaming::{detect_av1_support, detect_hevc_support};
use church_api::error::{ApiError, Result};
use church_api::utils::codec_detection::ClientCapabilities;
use std::path::Path;
use std::fs;
use tempfile::tempdir;
use uuid::Uuid;
/// Mock database pool for testing
fn create_mock_pool() -> sqlx::PgPool {
// This would normally be a real connection, but for testing we'll use a mock
// In a real test setup, you'd use sqlx::test or testcontainers
unimplemented!("Use a test database or mock for real tests")
}
/// Create a simple test video file for GStreamer testing
async fn create_test_video(path: &Path) -> Result<()> {
use gstreamer::prelude::*;
// Initialize GStreamer
gstreamer::init().map_err(|e| ApiError::Internal(format!("Failed to init GStreamer: {}", e)))?;
// Create a simple test pipeline to generate a short video
let pipeline = gstreamer::Pipeline::new();
// Create test pattern video source
let videotestsrc = gstreamer::ElementFactory::make("videotestsrc")
.property("num-buffers", 300i32) // 10 seconds at 30fps
.property("pattern", 0i32) // SMPTE color bars
.build()
.map_err(|e| ApiError::Internal(format!("Failed to create videotestsrc: {}", e)))?;
// Create test audio source
let audiotestsrc = gstreamer::ElementFactory::make("audiotestsrc")
.property("num-buffers", 441i32) // 10 seconds at 44.1kHz
.property("freq", 440.0f64) // 440Hz tone
.build()
.map_err(|e| ApiError::Internal(format!("Failed to create audiotestsrc: {}", e)))?;
// Video encoding pipeline
let x264enc = gstreamer::ElementFactory::make("x264enc")
.property("bitrate", 1000u32)
.property("speed-preset", 1u32) // ultrafast
.build()
.map_err(|e| ApiError::Internal(format!("Failed to create x264enc: {}", e)))?;
// Audio encoding pipeline
let audioconvert = gstreamer::ElementFactory::make("audioconvert")
.build()
.map_err(|e| ApiError::Internal(format!("Failed to create audioconvert: {}", e)))?;
let avenc_aac = gstreamer::ElementFactory::make("avenc_aac")
.property("bitrate", 128000i32)
.build()
.map_err(|e| ApiError::Internal(format!("Failed to create avenc_aac: {}", e)))?;
// MP4 muxer and file sink
let mp4mux = gstreamer::ElementFactory::make("mp4mux")
.build()
.map_err(|e| ApiError::Internal(format!("Failed to create mp4mux: {}", e)))?;
let filesink = gstreamer::ElementFactory::make("filesink")
.property("location", path.to_str().unwrap())
.build()
.map_err(|e| ApiError::Internal(format!("Failed to create filesink: {}", e)))?;
// Add elements to pipeline
pipeline.add_many([
&videotestsrc, &x264enc, &audiotestsrc, &audioconvert,
&avenc_aac, &mp4mux, &filesink
]).map_err(|e| ApiError::Internal(format!("Failed to add elements: {}", e)))?;
// Link video chain
videotestsrc.link(&x264enc)
.map_err(|e| ApiError::Internal(format!("Failed to link video chain: {}", e)))?;
x264enc.link(&mp4mux)
.map_err(|e| ApiError::Internal(format!("Failed to link video to mux: {}", e)))?;
// Link audio chain
gstreamer::Element::link_many([&audiotestsrc, &audioconvert, &avenc_aac])
.map_err(|e| ApiError::Internal(format!("Failed to link audio chain: {}", e)))?;
avenc_aac.link(&mp4mux)
.map_err(|e| ApiError::Internal(format!("Failed to link audio to mux: {}", e)))?;
mp4mux.link(&filesink)
.map_err(|e| ApiError::Internal(format!("Failed to link mux to sink: {}", e)))?;
// Run pipeline
pipeline.set_state(gstreamer::State::Playing)
.map_err(|e| ApiError::Internal(format!("Failed to start pipeline: {}", e)))?;
// Wait for completion
let bus = pipeline.bus().unwrap();
let timeout = gstreamer::ClockTime::from_seconds(30);
match bus.timed_pop_filtered(Some(timeout), &[gstreamer::MessageType::Error, gstreamer::MessageType::Eos]) {
Some(msg) => {
match msg.view() {
gstreamer::MessageView::Eos(..) => {
println!("✅ Test video created successfully");
}
gstreamer::MessageView::Error(err) => {
return Err(ApiError::Internal(format!("GStreamer error: {}", err.error())));
}
_ => {}
}
}
None => {
return Err(ApiError::Internal("Test video creation timed out".to_string()));
}
}
// Clean up
pipeline.set_state(gstreamer::State::Null)
.map_err(|e| ApiError::Internal(format!("Failed to stop pipeline: {}", e)))?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Instant;
#[tokio::test]
async fn test_gstreamer_initialization() {
let result = gstreamer::init();
assert!(result.is_ok(), "GStreamer should initialize successfully");
}
#[tokio::test]
async fn test_client_capability_detection() {
// Test AV1 support detection
assert!(detect_av1_support("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"));
assert!(detect_av1_support("Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/120.0"));
assert!(detect_av1_support("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Edg/120.0.0.0"));
assert!(!detect_av1_support("Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Safari/605.1.15"));
// Test HEVC support detection
assert!(detect_hevc_support("Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Safari/605.1.15"));
assert!(detect_hevc_support("Mozilla/5.0 (iPhone; CPU iPhone OS 17_0 like Mac OS X) AppleWebKit/605.1.15"));
assert!(detect_hevc_support("Mozilla/5.0 (Linux; Android 13) AppleWebKit/537.36"));
}
#[tokio::test]
async fn test_gstreamer_element_availability() {
// Initialize GStreamer for testing
let _ = gstreamer::init();
// Test that key elements are available
assert!(gstreamer::ElementFactory::find("filesrc").is_some(), "filesrc should be available");
assert!(gstreamer::ElementFactory::find("filesink").is_some(), "filesink should be available");
assert!(gstreamer::ElementFactory::find("qtdemux").is_some(), "qtdemux should be available");
assert!(gstreamer::ElementFactory::find("mp4mux").is_some(), "mp4mux should be available");
assert!(gstreamer::ElementFactory::find("mpegtsmux").is_some(), "mpegtsmux should be available");
// Test codec availability (these might not be available in CI)
let has_x264 = gstreamer::ElementFactory::find("x264enc").is_some();
let has_aac = gstreamer::ElementFactory::find("avenc_aac").is_some();
println!("x264enc available: {}", has_x264);
println!("avenc_aac available: {}", has_aac);
// Test hardware acceleration availability
let has_vaapi_h264 = gstreamer::ElementFactory::find("vaapih264enc").is_some();
let has_vaapi_av1 = gstreamer::ElementFactory::find("vaapidecode_av1").is_some();
println!("vaapih264enc available: {}", has_vaapi_h264);
println!("vaapidecode_av1 available: {}", has_vaapi_av1);
}
#[tokio::test]
async fn test_create_simple_pipeline() {
// Initialize GStreamer
let _ = gstreamer::init();
// Create a simple test pipeline to verify GStreamer works
let pipeline = gstreamer::Pipeline::new();
let videotestsrc = gstreamer::ElementFactory::make("videotestsrc")
.property("num-buffers", 10i32)
.build()
.expect("Failed to create videotestsrc");
let fakesink = gstreamer::ElementFactory::make("fakesink")
.build()
.expect("Failed to create fakesink");
pipeline.add_many([&videotestsrc, &fakesink])
.expect("Failed to add elements");
videotestsrc.link(&fakesink)
.expect("Failed to link elements");
// Test state changes
assert!(pipeline.set_state(gstreamer::State::Ready).is_ok());
assert!(pipeline.set_state(gstreamer::State::Paused).is_ok());
assert!(pipeline.set_state(gstreamer::State::Playing).is_ok());
// Let it run briefly
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
assert!(pipeline.set_state(gstreamer::State::Null).is_ok());
println!("✅ Simple GStreamer pipeline test passed");
}
#[tokio::test]
async fn test_video_file_creation() {
let temp_dir = tempdir().expect("Failed to create temp dir");
let video_path = temp_dir.path().join("test_video.mp4");
// Only run this test if we have the required encoders
let _ = gstreamer::init();
if !gstreamer::ElementFactory::find("x264enc").is_some() ||
!gstreamer::ElementFactory::find("avenc_aac").is_some() {
println!("⚠️ Skipping video creation test - encoders not available");
return;
}
let result = create_test_video(&video_path).await;
assert!(result.is_ok(), "Test video creation should succeed");
assert!(video_path.exists(), "Test video file should exist");
let metadata = fs::metadata(&video_path).expect("Failed to get file metadata");
assert!(metadata.len() > 1000, "Video file should be substantial size");
println!("✅ Test video created: {} bytes", metadata.len());
}
#[tokio::test]
async fn test_streaming_transcoding_service_creation() {
// Test that we can create the streaming service without a real DB
// This tests the service initialization logic
// Note: In a real test you'd use a test database
// For now we'll just test what we can without DB dependency
// Test segment creation logic
let media_id = Uuid::new_v4();
let segment = StreamingSegment {
index: 0,
start_time: 0.0,
duration: 10.0,
status: SegmentStatus::NotStarted,
file_path: None,
};
assert_eq!(segment.index, 0);
assert_eq!(segment.start_time, 0.0);
assert_eq!(segment.duration, 10.0);
assert_eq!(segment.status, SegmentStatus::NotStarted);
println!("✅ Streaming transcoding service structures work correctly");
}
#[tokio::test]
async fn test_performance_benchmarks() {
let _ = gstreamer::init();
// Benchmark GStreamer initialization time
let start = Instant::now();
let pipeline = gstreamer::Pipeline::new();
let init_time = start.elapsed();
println!("GStreamer pipeline creation time: {:?}", init_time);
assert!(init_time.as_millis() < 100, "Pipeline creation should be fast");
// Test element creation performance
let start = Instant::now();
let _filesrc = gstreamer::ElementFactory::make("filesrc").build();
let _qtdemux = gstreamer::ElementFactory::make("qtdemux").build();
let _queue = gstreamer::ElementFactory::make("queue").build();
let element_time = start.elapsed();
println!("Element creation time: {:?}", element_time);
assert!(element_time.as_millis() < 50, "Element creation should be very fast");
drop(pipeline);
println!("✅ Performance benchmarks passed");
}
#[tokio::test]
async fn test_error_handling() {
let _ = gstreamer::init();
// Test handling of invalid elements
let result = gstreamer::ElementFactory::make("nonexistent-element").build();
assert!(result.is_err(), "Should fail to create nonexistent element");
// Test pipeline with incompatible elements
let pipeline = gstreamer::Pipeline::new();
let videotestsrc = gstreamer::ElementFactory::make("videotestsrc")
.build()
.expect("Failed to create videotestsrc");
let audioconvert = gstreamer::ElementFactory::make("audioconvert")
.build()
.expect("Failed to create audioconvert");
pipeline.add_many([&videotestsrc, &audioconvert])
.expect("Failed to add elements");
// This should fail - can't link video to audio converter
let link_result = videotestsrc.link(&audioconvert);
assert!(link_result.is_err(), "Should fail to link incompatible elements");
println!("✅ Error handling tests passed");
}
#[tokio::test]
async fn test_hardware_acceleration_detection() {
// Test VA-API detection
let vaapi_available = std::path::Path::new("/dev/dri/renderD128").exists();
println!("VA-API hardware acceleration available: {}", vaapi_available);
// Test Intel QSV environment
let libva_driver = std::env::var("LIBVA_DRIVER_NAME").unwrap_or_default();
let libva_path = std::env::var("LIBVA_DRIVERS_PATH").unwrap_or_default();
println!("LIBVA_DRIVER_NAME: {}", libva_driver);
println!("LIBVA_DRIVERS_PATH: {}", libva_path);
// In CI this will likely be false, but in production it should be true
if vaapi_available {
println!("✅ Hardware acceleration available");
} else {
println!(" Hardware acceleration not available (normal in CI)");
}
}
#[tokio::test]
async fn test_memory_cleanup() {
// Test that GStreamer pipelines clean up properly
let _ = gstreamer::init();
// Create and destroy multiple pipelines
for i in 0..10 {
let pipeline = gstreamer::Pipeline::new();
let videotestsrc = gstreamer::ElementFactory::make("videotestsrc")
.property("num-buffers", 1i32)
.build()
.expect("Failed to create videotestsrc");
let fakesink = gstreamer::ElementFactory::make("fakesink")
.build()
.expect("Failed to create fakesink");
pipeline.add_many([&videotestsrc, &fakesink])
.expect("Failed to add elements");
videotestsrc.link(&fakesink)
.expect("Failed to link elements");
// Quick run
pipeline.set_state(gstreamer::State::Playing)
.expect("Failed to start pipeline");
// Wait briefly
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
// Clean shutdown
pipeline.set_state(gstreamer::State::Null)
.expect("Failed to stop pipeline");
drop(pipeline);
if i % 5 == 0 {
println!("Created and cleaned up {} pipelines", i + 1);
}
}
println!("✅ Memory cleanup test completed - no leaks expected");
}
#[tokio::test]
async fn test_codec_specific_elements() {
let _ = gstreamer::init();
// Test AV1 decoder availability and creation
if gstreamer::ElementFactory::find("av1dec").is_some() {
let av1dec = gstreamer::ElementFactory::make("av1dec").build();
assert!(av1dec.is_ok(), "Should be able to create av1dec");
println!("✅ AV1 software decoder available");
} else {
println!(" AV1 software decoder not available");
}
if gstreamer::ElementFactory::find("vaapidecode_av1").is_some() {
let vaapi_av1 = gstreamer::ElementFactory::make("vaapidecode_av1").build();
assert!(vaapi_av1.is_ok(), "Should be able to create vaapidecode_av1");
println!("✅ AV1 hardware decoder available");
} else {
println!(" AV1 hardware decoder not available");
}
// Test H.264 encoder availability
if gstreamer::ElementFactory::find("x264enc").is_some() {
let x264enc = gstreamer::ElementFactory::make("x264enc").build();
assert!(x264enc.is_ok(), "Should be able to create x264enc");
println!("✅ H.264 software encoder available");
} else {
println!("⚠️ H.264 software encoder not available");
}
if gstreamer::ElementFactory::find("vaapih264enc").is_some() {
let vaapi_h264 = gstreamer::ElementFactory::make("vaapih264enc").build();
assert!(vaapi_h264.is_ok(), "Should be able to create vaapih264enc");
println!("✅ H.264 hardware encoder available");
} else {
println!(" H.264 hardware encoder not available");
}
}
}
#[cfg(test)]
mod integration_tests {
use super::*;
/// Integration test that requires a real video file
/// This should be run manually with a test video file
#[ignore] // Ignored by default since it requires external file
#[tokio::test]
async fn test_real_video_transcoding() {
let test_video_path = "/tmp/test_video.mp4"; // You need to provide this
if !Path::new(test_video_path).exists() {
println!("⚠️ Test video not found at {}, skipping integration test", test_video_path);
return;
}
let temp_dir = tempdir().expect("Failed to create temp dir");
let output_path = temp_dir.path().join("transcoded_segment.ts");
// Test the actual transcoding function
// Note: This would normally use the real function from smart_streaming
// but we can't easily import it due to module structure
println!("✅ Would test real video transcoding with file: {}", test_video_path);
println!("✅ Output would go to: {}", output_path.display());
// In a real integration test, you would:
// 1. Call transcode_hls_segment_gstreamer()
// 2. Verify the output file exists and is valid
// 3. Check the transcoding time is reasonable
// 4. Verify the output format is correct
}
/// Load test for concurrent transcoding
#[ignore] // Resource intensive test
#[tokio::test]
async fn test_concurrent_transcoding_performance() {
let _ = gstreamer::init();
// Simulate multiple concurrent transcoding requests
let mut handles = vec![];
for i in 0..5 {
let handle = tokio::spawn(async move {
let start = Instant::now();
// Create a quick test pipeline per "request"
let pipeline = gstreamer::Pipeline::new();
let videotestsrc = gstreamer::ElementFactory::make("videotestsrc")
.property("num-buffers", 30i32) // 1 second at 30fps
.build()
.expect("Failed to create videotestsrc");
let fakesink = gstreamer::ElementFactory::make("fakesink")
.build()
.expect("Failed to create fakesink");
pipeline.add_many([&videotestsrc, &fakesink])
.expect("Failed to add elements");
videotestsrc.link(&fakesink)
.expect("Failed to link elements");
pipeline.set_state(gstreamer::State::Playing)
.expect("Failed to start pipeline");
// Wait for completion
let bus = pipeline.bus().unwrap();
let timeout = gstreamer::ClockTime::from_seconds(5);
match bus.timed_pop_filtered(Some(timeout), &[gstreamer::MessageType::Eos]) {
Some(_) => {
let elapsed = start.elapsed();
println!("Pipeline {} completed in {:?}", i, elapsed);
elapsed
}
None => {
println!("Pipeline {} timed out", i);
start.elapsed()
}
}
});
handles.push(handle);
}
// Wait for all to complete
let mut total_time = std::time::Duration::new(0, 0);
for handle in handles {
let elapsed = handle.await.expect("Task should complete");
total_time += elapsed;
}
println!("✅ Concurrent test completed. Average time: {:?}", total_time / 5);
}
}