
- Remove TranscodedMedia, TranscodingStatus, and TranscodingRequest models - Delete 184-line commented stream_media function - Clean up phantom service imports and dead references - Remove unused test imports for non-existent services Total cleanup: 265 lines of dead code removed from legacy Jellyfin compatibility layer. Smart streaming pipeline remains unchanged and functional.
512 lines
21 KiB
Rust
512 lines
21 KiB
Rust
use gstreamer::prelude::*;
|
||
use church_api::handlers::smart_streaming::{detect_av1_support, detect_hevc_support};
|
||
use church_api::error::{ApiError, Result};
|
||
use church_api::utils::codec_detection::ClientCapabilities;
|
||
use std::path::Path;
|
||
use std::fs;
|
||
use tempfile::tempdir;
|
||
use uuid::Uuid;
|
||
|
||
/// Mock database pool for testing
|
||
fn create_mock_pool() -> sqlx::PgPool {
|
||
// This would normally be a real connection, but for testing we'll use a mock
|
||
// In a real test setup, you'd use sqlx::test or testcontainers
|
||
unimplemented!("Use a test database or mock for real tests")
|
||
}
|
||
|
||
/// Create a simple test video file for GStreamer testing
|
||
async fn create_test_video(path: &Path) -> Result<()> {
|
||
use gstreamer::prelude::*;
|
||
|
||
// Initialize GStreamer
|
||
gstreamer::init().map_err(|e| ApiError::Internal(format!("Failed to init GStreamer: {}", e)))?;
|
||
|
||
// Create a simple test pipeline to generate a short video
|
||
let pipeline = gstreamer::Pipeline::new();
|
||
|
||
// Create test pattern video source
|
||
let videotestsrc = gstreamer::ElementFactory::make("videotestsrc")
|
||
.property("num-buffers", 300i32) // 10 seconds at 30fps
|
||
.property("pattern", 0i32) // SMPTE color bars
|
||
.build()
|
||
.map_err(|e| ApiError::Internal(format!("Failed to create videotestsrc: {}", e)))?;
|
||
|
||
// Create test audio source
|
||
let audiotestsrc = gstreamer::ElementFactory::make("audiotestsrc")
|
||
.property("num-buffers", 441i32) // 10 seconds at 44.1kHz
|
||
.property("freq", 440.0f64) // 440Hz tone
|
||
.build()
|
||
.map_err(|e| ApiError::Internal(format!("Failed to create audiotestsrc: {}", e)))?;
|
||
|
||
// Video encoding pipeline
|
||
let x264enc = gstreamer::ElementFactory::make("x264enc")
|
||
.property("bitrate", 1000u32)
|
||
.property("speed-preset", 1u32) // ultrafast
|
||
.build()
|
||
.map_err(|e| ApiError::Internal(format!("Failed to create x264enc: {}", e)))?;
|
||
|
||
// Audio encoding pipeline
|
||
let audioconvert = gstreamer::ElementFactory::make("audioconvert")
|
||
.build()
|
||
.map_err(|e| ApiError::Internal(format!("Failed to create audioconvert: {}", e)))?;
|
||
|
||
let avenc_aac = gstreamer::ElementFactory::make("avenc_aac")
|
||
.property("bitrate", 128000i32)
|
||
.build()
|
||
.map_err(|e| ApiError::Internal(format!("Failed to create avenc_aac: {}", e)))?;
|
||
|
||
// MP4 muxer and file sink
|
||
let mp4mux = gstreamer::ElementFactory::make("mp4mux")
|
||
.build()
|
||
.map_err(|e| ApiError::Internal(format!("Failed to create mp4mux: {}", e)))?;
|
||
|
||
let filesink = gstreamer::ElementFactory::make("filesink")
|
||
.property("location", path.to_str().unwrap())
|
||
.build()
|
||
.map_err(|e| ApiError::Internal(format!("Failed to create filesink: {}", e)))?;
|
||
|
||
// Add elements to pipeline
|
||
pipeline.add_many([
|
||
&videotestsrc, &x264enc, &audiotestsrc, &audioconvert,
|
||
&avenc_aac, &mp4mux, &filesink
|
||
]).map_err(|e| ApiError::Internal(format!("Failed to add elements: {}", e)))?;
|
||
|
||
// Link video chain
|
||
videotestsrc.link(&x264enc)
|
||
.map_err(|e| ApiError::Internal(format!("Failed to link video chain: {}", e)))?;
|
||
x264enc.link(&mp4mux)
|
||
.map_err(|e| ApiError::Internal(format!("Failed to link video to mux: {}", e)))?;
|
||
|
||
// Link audio chain
|
||
gstreamer::Element::link_many([&audiotestsrc, &audioconvert, &avenc_aac])
|
||
.map_err(|e| ApiError::Internal(format!("Failed to link audio chain: {}", e)))?;
|
||
avenc_aac.link(&mp4mux)
|
||
.map_err(|e| ApiError::Internal(format!("Failed to link audio to mux: {}", e)))?;
|
||
|
||
mp4mux.link(&filesink)
|
||
.map_err(|e| ApiError::Internal(format!("Failed to link mux to sink: {}", e)))?;
|
||
|
||
// Run pipeline
|
||
pipeline.set_state(gstreamer::State::Playing)
|
||
.map_err(|e| ApiError::Internal(format!("Failed to start pipeline: {}", e)))?;
|
||
|
||
// Wait for completion
|
||
let bus = pipeline.bus().unwrap();
|
||
let timeout = gstreamer::ClockTime::from_seconds(30);
|
||
match bus.timed_pop_filtered(Some(timeout), &[gstreamer::MessageType::Error, gstreamer::MessageType::Eos]) {
|
||
Some(msg) => {
|
||
match msg.view() {
|
||
gstreamer::MessageView::Eos(..) => {
|
||
println!("✅ Test video created successfully");
|
||
}
|
||
gstreamer::MessageView::Error(err) => {
|
||
return Err(ApiError::Internal(format!("GStreamer error: {}", err.error())));
|
||
}
|
||
_ => {}
|
||
}
|
||
}
|
||
None => {
|
||
return Err(ApiError::Internal("Test video creation timed out".to_string()));
|
||
}
|
||
}
|
||
|
||
// Clean up
|
||
pipeline.set_state(gstreamer::State::Null)
|
||
.map_err(|e| ApiError::Internal(format!("Failed to stop pipeline: {}", e)))?;
|
||
|
||
Ok(())
|
||
}
|
||
|
||
#[cfg(test)]
|
||
mod tests {
|
||
use super::*;
|
||
use std::time::Instant;
|
||
|
||
#[tokio::test]
|
||
async fn test_gstreamer_initialization() {
|
||
let result = gstreamer::init();
|
||
assert!(result.is_ok(), "GStreamer should initialize successfully");
|
||
}
|
||
|
||
#[tokio::test]
|
||
async fn test_client_capability_detection() {
|
||
// Test AV1 support detection
|
||
assert!(detect_av1_support("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"));
|
||
assert!(detect_av1_support("Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/120.0"));
|
||
assert!(detect_av1_support("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Edg/120.0.0.0"));
|
||
assert!(!detect_av1_support("Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Safari/605.1.15"));
|
||
|
||
// Test HEVC support detection
|
||
assert!(detect_hevc_support("Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Safari/605.1.15"));
|
||
assert!(detect_hevc_support("Mozilla/5.0 (iPhone; CPU iPhone OS 17_0 like Mac OS X) AppleWebKit/605.1.15"));
|
||
assert!(detect_hevc_support("Mozilla/5.0 (Linux; Android 13) AppleWebKit/537.36"));
|
||
}
|
||
|
||
#[tokio::test]
|
||
async fn test_gstreamer_element_availability() {
|
||
// Initialize GStreamer for testing
|
||
let _ = gstreamer::init();
|
||
|
||
// Test that key elements are available
|
||
assert!(gstreamer::ElementFactory::find("filesrc").is_some(), "filesrc should be available");
|
||
assert!(gstreamer::ElementFactory::find("filesink").is_some(), "filesink should be available");
|
||
assert!(gstreamer::ElementFactory::find("qtdemux").is_some(), "qtdemux should be available");
|
||
assert!(gstreamer::ElementFactory::find("mp4mux").is_some(), "mp4mux should be available");
|
||
assert!(gstreamer::ElementFactory::find("mpegtsmux").is_some(), "mpegtsmux should be available");
|
||
|
||
// Test codec availability (these might not be available in CI)
|
||
let has_x264 = gstreamer::ElementFactory::find("x264enc").is_some();
|
||
let has_aac = gstreamer::ElementFactory::find("avenc_aac").is_some();
|
||
println!("x264enc available: {}", has_x264);
|
||
println!("avenc_aac available: {}", has_aac);
|
||
|
||
// Test hardware acceleration availability
|
||
let has_vaapi_h264 = gstreamer::ElementFactory::find("vaapih264enc").is_some();
|
||
let has_vaapi_av1 = gstreamer::ElementFactory::find("vaapidecode_av1").is_some();
|
||
println!("vaapih264enc available: {}", has_vaapi_h264);
|
||
println!("vaapidecode_av1 available: {}", has_vaapi_av1);
|
||
}
|
||
|
||
#[tokio::test]
|
||
async fn test_create_simple_pipeline() {
|
||
// Initialize GStreamer
|
||
let _ = gstreamer::init();
|
||
|
||
// Create a simple test pipeline to verify GStreamer works
|
||
let pipeline = gstreamer::Pipeline::new();
|
||
|
||
let videotestsrc = gstreamer::ElementFactory::make("videotestsrc")
|
||
.property("num-buffers", 10i32)
|
||
.build()
|
||
.expect("Failed to create videotestsrc");
|
||
|
||
let fakesink = gstreamer::ElementFactory::make("fakesink")
|
||
.build()
|
||
.expect("Failed to create fakesink");
|
||
|
||
pipeline.add_many([&videotestsrc, &fakesink])
|
||
.expect("Failed to add elements");
|
||
|
||
videotestsrc.link(&fakesink)
|
||
.expect("Failed to link elements");
|
||
|
||
// Test state changes
|
||
assert!(pipeline.set_state(gstreamer::State::Ready).is_ok());
|
||
assert!(pipeline.set_state(gstreamer::State::Paused).is_ok());
|
||
assert!(pipeline.set_state(gstreamer::State::Playing).is_ok());
|
||
|
||
// Let it run briefly
|
||
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
|
||
|
||
assert!(pipeline.set_state(gstreamer::State::Null).is_ok());
|
||
|
||
println!("✅ Simple GStreamer pipeline test passed");
|
||
}
|
||
|
||
#[tokio::test]
|
||
async fn test_video_file_creation() {
|
||
let temp_dir = tempdir().expect("Failed to create temp dir");
|
||
let video_path = temp_dir.path().join("test_video.mp4");
|
||
|
||
// Only run this test if we have the required encoders
|
||
let _ = gstreamer::init();
|
||
if !gstreamer::ElementFactory::find("x264enc").is_some() ||
|
||
!gstreamer::ElementFactory::find("avenc_aac").is_some() {
|
||
println!("⚠️ Skipping video creation test - encoders not available");
|
||
return;
|
||
}
|
||
|
||
let result = create_test_video(&video_path).await;
|
||
assert!(result.is_ok(), "Test video creation should succeed");
|
||
assert!(video_path.exists(), "Test video file should exist");
|
||
|
||
let metadata = fs::metadata(&video_path).expect("Failed to get file metadata");
|
||
assert!(metadata.len() > 1000, "Video file should be substantial size");
|
||
|
||
println!("✅ Test video created: {} bytes", metadata.len());
|
||
}
|
||
|
||
#[tokio::test]
|
||
async fn test_streaming_transcoding_service_creation() {
|
||
// Test that we can create the streaming service without a real DB
|
||
// This tests the service initialization logic
|
||
|
||
// Note: In a real test you'd use a test database
|
||
// For now we'll just test what we can without DB dependency
|
||
|
||
// Test segment creation logic
|
||
let media_id = Uuid::new_v4();
|
||
let segment = StreamingSegment {
|
||
index: 0,
|
||
start_time: 0.0,
|
||
duration: 10.0,
|
||
status: SegmentStatus::NotStarted,
|
||
file_path: None,
|
||
};
|
||
|
||
assert_eq!(segment.index, 0);
|
||
assert_eq!(segment.start_time, 0.0);
|
||
assert_eq!(segment.duration, 10.0);
|
||
assert_eq!(segment.status, SegmentStatus::NotStarted);
|
||
|
||
println!("✅ Streaming transcoding service structures work correctly");
|
||
}
|
||
|
||
#[tokio::test]
|
||
async fn test_performance_benchmarks() {
|
||
let _ = gstreamer::init();
|
||
|
||
// Benchmark GStreamer initialization time
|
||
let start = Instant::now();
|
||
let pipeline = gstreamer::Pipeline::new();
|
||
let init_time = start.elapsed();
|
||
|
||
println!("GStreamer pipeline creation time: {:?}", init_time);
|
||
assert!(init_time.as_millis() < 100, "Pipeline creation should be fast");
|
||
|
||
// Test element creation performance
|
||
let start = Instant::now();
|
||
let _filesrc = gstreamer::ElementFactory::make("filesrc").build();
|
||
let _qtdemux = gstreamer::ElementFactory::make("qtdemux").build();
|
||
let _queue = gstreamer::ElementFactory::make("queue").build();
|
||
let element_time = start.elapsed();
|
||
|
||
println!("Element creation time: {:?}", element_time);
|
||
assert!(element_time.as_millis() < 50, "Element creation should be very fast");
|
||
|
||
drop(pipeline);
|
||
println!("✅ Performance benchmarks passed");
|
||
}
|
||
|
||
#[tokio::test]
|
||
async fn test_error_handling() {
|
||
let _ = gstreamer::init();
|
||
|
||
// Test handling of invalid elements
|
||
let result = gstreamer::ElementFactory::make("nonexistent-element").build();
|
||
assert!(result.is_err(), "Should fail to create nonexistent element");
|
||
|
||
// Test pipeline with incompatible elements
|
||
let pipeline = gstreamer::Pipeline::new();
|
||
|
||
let videotestsrc = gstreamer::ElementFactory::make("videotestsrc")
|
||
.build()
|
||
.expect("Failed to create videotestsrc");
|
||
|
||
let audioconvert = gstreamer::ElementFactory::make("audioconvert")
|
||
.build()
|
||
.expect("Failed to create audioconvert");
|
||
|
||
pipeline.add_many([&videotestsrc, &audioconvert])
|
||
.expect("Failed to add elements");
|
||
|
||
// This should fail - can't link video to audio converter
|
||
let link_result = videotestsrc.link(&audioconvert);
|
||
assert!(link_result.is_err(), "Should fail to link incompatible elements");
|
||
|
||
println!("✅ Error handling tests passed");
|
||
}
|
||
|
||
#[tokio::test]
|
||
async fn test_hardware_acceleration_detection() {
|
||
// Test VA-API detection
|
||
let vaapi_available = std::path::Path::new("/dev/dri/renderD128").exists();
|
||
println!("VA-API hardware acceleration available: {}", vaapi_available);
|
||
|
||
// Test Intel QSV environment
|
||
let libva_driver = std::env::var("LIBVA_DRIVER_NAME").unwrap_or_default();
|
||
let libva_path = std::env::var("LIBVA_DRIVERS_PATH").unwrap_or_default();
|
||
|
||
println!("LIBVA_DRIVER_NAME: {}", libva_driver);
|
||
println!("LIBVA_DRIVERS_PATH: {}", libva_path);
|
||
|
||
// In CI this will likely be false, but in production it should be true
|
||
if vaapi_available {
|
||
println!("✅ Hardware acceleration available");
|
||
} else {
|
||
println!("ℹ️ Hardware acceleration not available (normal in CI)");
|
||
}
|
||
}
|
||
|
||
#[tokio::test]
|
||
async fn test_memory_cleanup() {
|
||
// Test that GStreamer pipelines clean up properly
|
||
let _ = gstreamer::init();
|
||
|
||
// Create and destroy multiple pipelines
|
||
for i in 0..10 {
|
||
let pipeline = gstreamer::Pipeline::new();
|
||
|
||
let videotestsrc = gstreamer::ElementFactory::make("videotestsrc")
|
||
.property("num-buffers", 1i32)
|
||
.build()
|
||
.expect("Failed to create videotestsrc");
|
||
|
||
let fakesink = gstreamer::ElementFactory::make("fakesink")
|
||
.build()
|
||
.expect("Failed to create fakesink");
|
||
|
||
pipeline.add_many([&videotestsrc, &fakesink])
|
||
.expect("Failed to add elements");
|
||
|
||
videotestsrc.link(&fakesink)
|
||
.expect("Failed to link elements");
|
||
|
||
// Quick run
|
||
pipeline.set_state(gstreamer::State::Playing)
|
||
.expect("Failed to start pipeline");
|
||
|
||
// Wait briefly
|
||
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
|
||
|
||
// Clean shutdown
|
||
pipeline.set_state(gstreamer::State::Null)
|
||
.expect("Failed to stop pipeline");
|
||
|
||
drop(pipeline);
|
||
|
||
if i % 5 == 0 {
|
||
println!("Created and cleaned up {} pipelines", i + 1);
|
||
}
|
||
}
|
||
|
||
println!("✅ Memory cleanup test completed - no leaks expected");
|
||
}
|
||
|
||
#[tokio::test]
|
||
async fn test_codec_specific_elements() {
|
||
let _ = gstreamer::init();
|
||
|
||
// Test AV1 decoder availability and creation
|
||
if gstreamer::ElementFactory::find("av1dec").is_some() {
|
||
let av1dec = gstreamer::ElementFactory::make("av1dec").build();
|
||
assert!(av1dec.is_ok(), "Should be able to create av1dec");
|
||
println!("✅ AV1 software decoder available");
|
||
} else {
|
||
println!("ℹ️ AV1 software decoder not available");
|
||
}
|
||
|
||
if gstreamer::ElementFactory::find("vaapidecode_av1").is_some() {
|
||
let vaapi_av1 = gstreamer::ElementFactory::make("vaapidecode_av1").build();
|
||
assert!(vaapi_av1.is_ok(), "Should be able to create vaapidecode_av1");
|
||
println!("✅ AV1 hardware decoder available");
|
||
} else {
|
||
println!("ℹ️ AV1 hardware decoder not available");
|
||
}
|
||
|
||
// Test H.264 encoder availability
|
||
if gstreamer::ElementFactory::find("x264enc").is_some() {
|
||
let x264enc = gstreamer::ElementFactory::make("x264enc").build();
|
||
assert!(x264enc.is_ok(), "Should be able to create x264enc");
|
||
println!("✅ H.264 software encoder available");
|
||
} else {
|
||
println!("⚠️ H.264 software encoder not available");
|
||
}
|
||
|
||
if gstreamer::ElementFactory::find("vaapih264enc").is_some() {
|
||
let vaapi_h264 = gstreamer::ElementFactory::make("vaapih264enc").build();
|
||
assert!(vaapi_h264.is_ok(), "Should be able to create vaapih264enc");
|
||
println!("✅ H.264 hardware encoder available");
|
||
} else {
|
||
println!("ℹ️ H.264 hardware encoder not available");
|
||
}
|
||
}
|
||
}
|
||
|
||
#[cfg(test)]
|
||
mod integration_tests {
|
||
use super::*;
|
||
|
||
/// Integration test that requires a real video file
|
||
/// This should be run manually with a test video file
|
||
#[ignore] // Ignored by default since it requires external file
|
||
#[tokio::test]
|
||
async fn test_real_video_transcoding() {
|
||
let test_video_path = "/tmp/test_video.mp4"; // You need to provide this
|
||
|
||
if !Path::new(test_video_path).exists() {
|
||
println!("⚠️ Test video not found at {}, skipping integration test", test_video_path);
|
||
return;
|
||
}
|
||
|
||
let temp_dir = tempdir().expect("Failed to create temp dir");
|
||
let output_path = temp_dir.path().join("transcoded_segment.ts");
|
||
|
||
// Test the actual transcoding function
|
||
// Note: This would normally use the real function from smart_streaming
|
||
// but we can't easily import it due to module structure
|
||
|
||
println!("✅ Would test real video transcoding with file: {}", test_video_path);
|
||
println!("✅ Output would go to: {}", output_path.display());
|
||
|
||
// In a real integration test, you would:
|
||
// 1. Call transcode_hls_segment_gstreamer()
|
||
// 2. Verify the output file exists and is valid
|
||
// 3. Check the transcoding time is reasonable
|
||
// 4. Verify the output format is correct
|
||
}
|
||
|
||
/// Load test for concurrent transcoding
|
||
#[ignore] // Resource intensive test
|
||
#[tokio::test]
|
||
async fn test_concurrent_transcoding_performance() {
|
||
let _ = gstreamer::init();
|
||
|
||
// Simulate multiple concurrent transcoding requests
|
||
let mut handles = vec![];
|
||
|
||
for i in 0..5 {
|
||
let handle = tokio::spawn(async move {
|
||
let start = Instant::now();
|
||
|
||
// Create a quick test pipeline per "request"
|
||
let pipeline = gstreamer::Pipeline::new();
|
||
|
||
let videotestsrc = gstreamer::ElementFactory::make("videotestsrc")
|
||
.property("num-buffers", 30i32) // 1 second at 30fps
|
||
.build()
|
||
.expect("Failed to create videotestsrc");
|
||
|
||
let fakesink = gstreamer::ElementFactory::make("fakesink")
|
||
.build()
|
||
.expect("Failed to create fakesink");
|
||
|
||
pipeline.add_many([&videotestsrc, &fakesink])
|
||
.expect("Failed to add elements");
|
||
|
||
videotestsrc.link(&fakesink)
|
||
.expect("Failed to link elements");
|
||
|
||
pipeline.set_state(gstreamer::State::Playing)
|
||
.expect("Failed to start pipeline");
|
||
|
||
// Wait for completion
|
||
let bus = pipeline.bus().unwrap();
|
||
let timeout = gstreamer::ClockTime::from_seconds(5);
|
||
|
||
match bus.timed_pop_filtered(Some(timeout), &[gstreamer::MessageType::Eos]) {
|
||
Some(_) => {
|
||
let elapsed = start.elapsed();
|
||
println!("Pipeline {} completed in {:?}", i, elapsed);
|
||
elapsed
|
||
}
|
||
None => {
|
||
println!("Pipeline {} timed out", i);
|
||
start.elapsed()
|
||
}
|
||
}
|
||
});
|
||
|
||
handles.push(handle);
|
||
}
|
||
|
||
// Wait for all to complete
|
||
let mut total_time = std::time::Duration::new(0, 0);
|
||
for handle in handles {
|
||
let elapsed = handle.await.expect("Task should complete");
|
||
total_time += elapsed;
|
||
}
|
||
|
||
println!("✅ Concurrent test completed. Average time: {:?}", total_time / 5);
|
||
}
|
||
} |