
Complete church management system with bulletin management, media processing, live streaming integration, and web interface. Includes authentication, email notifications, database migrations, and comprehensive test suite.
1027 lines
44 KiB
Rust
1027 lines
44 KiB
Rust
use gstreamer::prelude::*;
|
|
use church_api::error::{ApiError, Result};
|
|
use std::path::Path;
|
|
use std::env;
|
|
use std::time::Duration;
|
|
use tokio::time::{sleep, Instant};
|
|
|
|
/// Create a test video with known duration for seeking tests
|
|
async fn create_seekable_test_video(path: &Path, duration_seconds: u32) -> Result<()> {
|
|
// Initialize GStreamer
|
|
gstreamer::init().map_err(|e| ApiError::Internal(format!("Failed to init GStreamer: {}", e)))?;
|
|
|
|
let pipeline = gstreamer::Pipeline::new();
|
|
|
|
// Create a longer video with keyframes for reliable seeking
|
|
let videotestsrc = gstreamer::ElementFactory::make("videotestsrc")
|
|
.property("num-buffers", (duration_seconds * 30) as i32) // 30 fps
|
|
.property("pattern", 0i32) // SMPTE bars
|
|
.build()
|
|
.map_err(|e| ApiError::Internal(format!("Failed to create videotestsrc: {}", e)))?;
|
|
|
|
let audiotestsrc = gstreamer::ElementFactory::make("audiotestsrc")
|
|
.property("num-buffers", (duration_seconds * 441) as i32) // 44.1kHz, 100 samples per buffer
|
|
.property("freq", 440.0f64)
|
|
.build()
|
|
.map_err(|e| ApiError::Internal(format!("Failed to create audiotestsrc: {}", e)))?;
|
|
|
|
// Use software encoders for consistent behavior across environments
|
|
let x264enc = gstreamer::ElementFactory::make("x264enc")
|
|
.property("bitrate", 1000u32)
|
|
.property("speed-preset", 1u32) // ultrafast
|
|
.property("key-int-max", 30u32) // Keyframe every 30 frames (1 second)
|
|
.build()
|
|
.map_err(|e| ApiError::Internal(format!("Failed to create x264enc: {}", e)))?;
|
|
|
|
let audioconvert = gstreamer::ElementFactory::make("audioconvert")
|
|
.build()
|
|
.map_err(|e| ApiError::Internal(format!("Failed to create audioconvert: {}", e)))?;
|
|
|
|
let avenc_aac = gstreamer::ElementFactory::make("avenc_aac")
|
|
.property("bitrate", 128000i32)
|
|
.build()
|
|
.map_err(|e| ApiError::Internal(format!("Failed to create avenc_aac: {}", e)))?;
|
|
|
|
let mp4mux = gstreamer::ElementFactory::make("mp4mux")
|
|
.build()
|
|
.map_err(|e| ApiError::Internal(format!("Failed to create mp4mux: {}", e)))?;
|
|
|
|
let filesink = gstreamer::ElementFactory::make("filesink")
|
|
.property("location", path.to_str().unwrap())
|
|
.build()
|
|
.map_err(|e| ApiError::Internal(format!("Failed to create filesink: {}", e)))?;
|
|
|
|
// Add elements and link
|
|
pipeline.add_many([
|
|
&videotestsrc, &x264enc, &audiotestsrc, &audioconvert,
|
|
&avenc_aac, &mp4mux, &filesink
|
|
]).map_err(|e| ApiError::Internal(format!("Failed to add elements: {}", e)))?;
|
|
|
|
// Link chains
|
|
videotestsrc.link(&x264enc).map_err(|_| ApiError::Internal("Failed to link video chain".to_string()))?;
|
|
x264enc.link(&mp4mux).map_err(|_| ApiError::Internal("Failed to link video to mux".to_string()))?;
|
|
gstreamer::Element::link_many([&audiotestsrc, &audioconvert, &avenc_aac]).map_err(|_| ApiError::Internal("Failed to link audio chain".to_string()))?;
|
|
avenc_aac.link(&mp4mux).map_err(|_| ApiError::Internal("Failed to link audio to mux".to_string()))?;
|
|
mp4mux.link(&filesink).map_err(|_| ApiError::Internal("Failed to link mux to sink".to_string()))?;
|
|
|
|
// Run pipeline to completion
|
|
pipeline.set_state(gstreamer::State::Playing).map_err(|e| ApiError::Internal(format!("Failed to start pipeline: {:?}", e)))?;
|
|
|
|
let bus = pipeline.bus().unwrap();
|
|
let timeout = gstreamer::ClockTime::from_seconds(60);
|
|
match bus.timed_pop_filtered(Some(timeout), &[gstreamer::MessageType::Error, gstreamer::MessageType::Eos]) {
|
|
Some(msg) => {
|
|
match msg.view() {
|
|
gstreamer::MessageView::Eos(..) => {},
|
|
gstreamer::MessageView::Error(err) => {
|
|
return Err(ApiError::Internal(format!("GStreamer error: {}", err.error())));
|
|
}
|
|
_ => {}
|
|
}
|
|
}
|
|
None => return Err(ApiError::Internal("Video creation timed out".to_string())),
|
|
}
|
|
|
|
pipeline.set_state(gstreamer::State::Null).map_err(|e| ApiError::Internal(format!("Failed to stop pipeline: {:?}", e)))?;
|
|
Ok(())
|
|
}
|
|
|
|
/// Test seeking functionality with proper state management
|
|
async fn test_seeking_with_state_management(source_path: &str, seek_position_seconds: f64) -> Result<()> {
|
|
gstreamer::init().map_err(|e| ApiError::Internal(format!("Failed to init GStreamer: {}", e)))?;
|
|
|
|
let pipeline = gstreamer::Pipeline::new();
|
|
|
|
// Create a simple playback pipeline for testing seeking
|
|
let filesrc = gstreamer::ElementFactory::make("filesrc")
|
|
.property("location", source_path)
|
|
.build()
|
|
.map_err(|e| ApiError::Internal(format!("Failed to create filesrc: {}", e)))?;
|
|
|
|
let qtdemux = gstreamer::ElementFactory::make("qtdemux")
|
|
.build()
|
|
.map_err(|e| ApiError::Internal(format!("Failed to create qtdemux: {}", e)))?;
|
|
|
|
let queue_video = gstreamer::ElementFactory::make("queue")
|
|
.build()
|
|
.map_err(|e| ApiError::Internal(format!("Failed to create video queue: {}", e)))?;
|
|
|
|
let queue_audio = gstreamer::ElementFactory::make("queue")
|
|
.build()
|
|
.map_err(|e| ApiError::Internal(format!("Failed to create audio queue: {}", e)))?;
|
|
|
|
let fakesink_video = gstreamer::ElementFactory::make("fakesink")
|
|
.build()
|
|
.map_err(|e| ApiError::Internal(format!("Failed to create video fakesink: {}", e)))?;
|
|
|
|
let fakesink_audio = gstreamer::ElementFactory::make("fakesink")
|
|
.build()
|
|
.map_err(|e| ApiError::Internal(format!("Failed to create audio fakesink: {}", e)))?;
|
|
|
|
pipeline.add_many([
|
|
&filesrc, &qtdemux, &queue_video, &queue_audio,
|
|
&fakesink_video, &fakesink_audio
|
|
]).map_err(|e| ApiError::Internal(format!("Failed to add elements: {}", e)))?;
|
|
|
|
filesrc.link(&qtdemux).map_err(|_| ApiError::Internal("Failed to link filesrc to qtdemux".to_string()))?;
|
|
queue_video.link(&fakesink_video).map_err(|_| ApiError::Internal("Failed to link video queue".to_string()))?;
|
|
queue_audio.link(&fakesink_audio).map_err(|_| ApiError::Internal("Failed to link audio queue".to_string()))?;
|
|
|
|
// Handle dynamic pad connections
|
|
let queue_video_clone = queue_video.clone();
|
|
let queue_audio_clone = queue_audio.clone();
|
|
|
|
qtdemux.connect_pad_added(move |_element, pad| {
|
|
let pad_name = pad.name();
|
|
if pad_name.starts_with("video_") {
|
|
let sink_pad = queue_video_clone.static_pad("sink").unwrap();
|
|
if !sink_pad.is_linked() {
|
|
let _ = pad.link(&sink_pad);
|
|
}
|
|
} else if pad_name.starts_with("audio_") {
|
|
let sink_pad = queue_audio_clone.static_pad("sink").unwrap();
|
|
if !sink_pad.is_linked() {
|
|
let _ = pad.link(&sink_pad);
|
|
}
|
|
}
|
|
});
|
|
|
|
// Step 1: Set pipeline to PAUSED state
|
|
tracing::debug!("Setting pipeline to PAUSED for seeking");
|
|
let state_change_result = pipeline.set_state(gstreamer::State::Paused);
|
|
|
|
match state_change_result {
|
|
Ok(gstreamer::StateChangeSuccess::Success) => {},
|
|
Ok(gstreamer::StateChangeSuccess::NoPreroll) => {
|
|
tracing::debug!("Pipeline state change completed with no preroll");
|
|
},
|
|
Ok(gstreamer::StateChangeSuccess::Async) => {
|
|
// Wait for async state change
|
|
let bus = pipeline.bus().unwrap();
|
|
let timeout = gstreamer::ClockTime::from_seconds(30);
|
|
|
|
let mut state_changed = false;
|
|
while let Some(msg) = bus.timed_pop_filtered(
|
|
Some(timeout),
|
|
&[gstreamer::MessageType::StateChanged, gstreamer::MessageType::Error, gstreamer::MessageType::AsyncDone]
|
|
) {
|
|
match msg.view() {
|
|
gstreamer::MessageView::Error(err) => {
|
|
let _ = pipeline.set_state(gstreamer::State::Null);
|
|
return Err(ApiError::Internal(format!("Pipeline error during state change: {}", err.error())));
|
|
}
|
|
gstreamer::MessageView::AsyncDone(..) => {
|
|
state_changed = true;
|
|
break;
|
|
}
|
|
gstreamer::MessageView::StateChanged(state_change) => {
|
|
if state_change.src() == Some(pipeline.upcast_ref()) &&
|
|
state_change.current() == gstreamer::State::Paused {
|
|
state_changed = true;
|
|
break;
|
|
}
|
|
}
|
|
_ => {}
|
|
}
|
|
}
|
|
|
|
if !state_changed {
|
|
let _ = pipeline.set_state(gstreamer::State::Null);
|
|
return Err(ApiError::Internal("Pipeline failed to reach PAUSED state".to_string()));
|
|
}
|
|
}
|
|
Err(e) => {
|
|
let _ = pipeline.set_state(gstreamer::State::Null);
|
|
return Err(ApiError::Internal(format!("Failed to pause pipeline: {}", e)));
|
|
}
|
|
}
|
|
|
|
// Step 2: Query seeking capabilities
|
|
let mut seek_query = gstreamer::query::Seeking::new(gstreamer::Format::Time);
|
|
if !pipeline.query(&mut seek_query.get_mut().unwrap()) {
|
|
let _ = pipeline.set_state(gstreamer::State::Null);
|
|
return Err(ApiError::Internal("Failed to query seeking capabilities".to_string()));
|
|
}
|
|
|
|
let (seekable, start_pos, end_pos) = seek_query.result();
|
|
if !seekable {
|
|
let _ = pipeline.set_state(gstreamer::State::Null);
|
|
return Err(ApiError::Internal("Source is not seekable".to_string()));
|
|
}
|
|
|
|
// Step 3: Validate seek position
|
|
let seek_ns = (seek_position_seconds * gstreamer::ClockTime::SECOND.nseconds() as f64) as u64;
|
|
|
|
if let gstreamer::GenericFormattedValue::Time(Some(end)) = end_pos {
|
|
if seek_ns >= end.nseconds() {
|
|
let _ = pipeline.set_state(gstreamer::State::Null);
|
|
return Err(ApiError::Internal("Seek position beyond range".to_string()));
|
|
}
|
|
}
|
|
|
|
// Step 4: Perform seek
|
|
let seek_result = pipeline.seek(
|
|
1.0,
|
|
gstreamer::SeekFlags::FLUSH | gstreamer::SeekFlags::KEY_UNIT,
|
|
gstreamer::SeekType::Set,
|
|
gstreamer::GenericFormattedValue::Time(Some(gstreamer::ClockTime::from_nseconds(seek_ns))),
|
|
gstreamer::SeekType::None,
|
|
gstreamer::GenericFormattedValue::Time(None),
|
|
);
|
|
|
|
if let Err(e) = seek_result {
|
|
let _ = pipeline.set_state(gstreamer::State::Null);
|
|
return Err(ApiError::Internal(format!("Seek failed: {:?}", e)));
|
|
}
|
|
|
|
// Step 5: Wait for seek completion
|
|
let bus = pipeline.bus().unwrap();
|
|
let seek_timeout = gstreamer::ClockTime::from_seconds(10);
|
|
|
|
while let Some(msg) = bus.timed_pop_filtered(
|
|
Some(seek_timeout),
|
|
&[gstreamer::MessageType::Error, gstreamer::MessageType::AsyncDone]
|
|
) {
|
|
match msg.view() {
|
|
gstreamer::MessageView::Error(err) => {
|
|
let _ = pipeline.set_state(gstreamer::State::Null);
|
|
return Err(ApiError::Internal(format!("Error during seek: {}", err.error())));
|
|
}
|
|
gstreamer::MessageView::AsyncDone(..) => {
|
|
break;
|
|
}
|
|
_ => {}
|
|
}
|
|
}
|
|
|
|
// Step 6: Verify position
|
|
let mut position_query = gstreamer::query::Position::new(gstreamer::Format::Time);
|
|
if pipeline.query(&mut position_query) {
|
|
if let gstreamer::GenericFormattedValue::Time(Some(pos)) = position_query.result() {
|
|
let pos_seconds = pos.nseconds() as f64 / gstreamer::ClockTime::SECOND.nseconds() as f64;
|
|
tracing::info!("Seek completed. Position: {:.2}s (target: {:.2}s)", pos_seconds, seek_position_seconds);
|
|
|
|
// Allow some tolerance for keyframe seeking
|
|
let tolerance = 1.0; // 1 second tolerance
|
|
if (pos_seconds - seek_position_seconds).abs() > tolerance {
|
|
tracing::warn!("Seek position not accurate: got {:.2}s, expected {:.2}s", pos_seconds, seek_position_seconds);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Cleanup
|
|
pipeline.set_state(gstreamer::State::Null).map_err(|e| ApiError::Internal(format!("Failed to stop pipeline: {:?}", e)))?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::*;
|
|
|
|
#[tokio::test]
|
|
async fn test_gstreamer_seeking_initialization() {
|
|
let result = gstreamer::init();
|
|
assert!(result.is_ok(), "GStreamer should initialize for seeking tests");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_pipeline_state_transitions() {
|
|
let _ = gstreamer::init();
|
|
|
|
let pipeline = gstreamer::Pipeline::new();
|
|
|
|
let videotestsrc = gstreamer::ElementFactory::make("videotestsrc")
|
|
.property("num-buffers", 100i32)
|
|
.build()
|
|
.expect("Failed to create videotestsrc");
|
|
|
|
let fakesink = gstreamer::ElementFactory::make("fakesink")
|
|
.build()
|
|
.expect("Failed to create fakesink");
|
|
|
|
pipeline.add_many([&videotestsrc, &fakesink]).unwrap();
|
|
videotestsrc.link(&fakesink).unwrap();
|
|
|
|
// Test state transitions required for seeking
|
|
assert!(pipeline.set_state(gstreamer::State::Ready).is_ok());
|
|
assert!(pipeline.set_state(gstreamer::State::Paused).is_ok());
|
|
|
|
// Query current state
|
|
let (_, current_state, _) = pipeline.state(Some(gstreamer::ClockTime::from_seconds(5)));
|
|
assert_eq!(current_state, gstreamer::State::Paused);
|
|
|
|
assert!(pipeline.set_state(gstreamer::State::Playing).is_ok());
|
|
assert!(pipeline.set_state(gstreamer::State::Paused).is_ok());
|
|
assert!(pipeline.set_state(gstreamer::State::Null).is_ok());
|
|
|
|
println!("✅ Pipeline state transitions work correctly");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_seeking_capability_queries() {
|
|
let _ = gstreamer::init();
|
|
|
|
let pipeline = gstreamer::Pipeline::new();
|
|
|
|
let videotestsrc = gstreamer::ElementFactory::make("videotestsrc")
|
|
.property("num-buffers", 300i32) // 10 seconds at 30fps
|
|
.build()
|
|
.expect("Failed to create videotestsrc");
|
|
|
|
let fakesink = gstreamer::ElementFactory::make("fakesink")
|
|
.build()
|
|
.expect("Failed to create fakesink");
|
|
|
|
pipeline.add_many([&videotestsrc, &fakesink]).unwrap();
|
|
videotestsrc.link(&fakesink).unwrap();
|
|
|
|
// Set to paused to enable queries
|
|
pipeline.set_state(gstreamer::State::Paused).unwrap();
|
|
|
|
// Wait for preroll
|
|
let bus = pipeline.bus().unwrap();
|
|
let timeout = gstreamer::ClockTime::from_seconds(5);
|
|
while let Some(msg) = bus.timed_pop_filtered(Some(timeout), &[gstreamer::MessageType::AsyncDone, gstreamer::MessageType::StateChanged]) {
|
|
match msg.view() {
|
|
gstreamer::MessageView::AsyncDone(..) => break,
|
|
gstreamer::MessageView::StateChanged(sc) => {
|
|
if sc.src() == Some(pipeline.upcast_ref()) && sc.current() == gstreamer::State::Paused {
|
|
break;
|
|
}
|
|
}
|
|
_ => {}
|
|
}
|
|
}
|
|
|
|
// Test seeking query
|
|
let mut seek_query = gstreamer::query::Seeking::new(gstreamer::Format::Time);
|
|
let query_success = pipeline.query(&mut seek_query.get_mut().unwrap());
|
|
|
|
assert!(query_success, "Seeking query should succeed");
|
|
|
|
let (seekable, start_pos, end_pos) = seek_query.result();
|
|
println!("Seekable: {}, Range: {:?} to {:?}", seekable, start_pos, end_pos);
|
|
|
|
// videotestsrc should be seekable
|
|
assert!(seekable, "videotestsrc should be seekable");
|
|
assert!(start_pos.is_some(), "Should have start position");
|
|
assert!(end_pos.is_some(), "Should have end position");
|
|
|
|
pipeline.set_state(gstreamer::State::Null).unwrap();
|
|
|
|
println!("✅ Seeking capability queries work correctly");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_duration_and_position_queries() {
|
|
let _ = gstreamer::init();
|
|
|
|
let pipeline = gstreamer::Pipeline::new();
|
|
|
|
let videotestsrc = gstreamer::ElementFactory::make("videotestsrc")
|
|
.property("num-buffers", 150i32) // 5 seconds at 30fps
|
|
.build()
|
|
.expect("Failed to create videotestsrc");
|
|
|
|
let fakesink = gstreamer::ElementFactory::make("fakesink")
|
|
.build()
|
|
.expect("Failed to create fakesink");
|
|
|
|
pipeline.add_many([&videotestsrc, &fakesink]).unwrap();
|
|
videotestsrc.link(&fakesink).unwrap();
|
|
|
|
pipeline.set_state(gstreamer::State::Paused).unwrap();
|
|
|
|
// Wait for preroll
|
|
sleep(Duration::from_millis(500)).await;
|
|
|
|
// Test duration query
|
|
let mut duration_query = gstreamer::query::Duration::new(gstreamer::Format::Time);
|
|
if pipeline.query(&mut duration_query) {
|
|
if let gstreamer::GenericFormattedValue::Time(Some(duration)) = duration_query.result() {
|
|
let duration_seconds = duration.nseconds() as f64 / gstreamer::ClockTime::SECOND.nseconds() as f64;
|
|
println!("Duration: {:.2} seconds", duration_seconds);
|
|
assert!(duration_seconds > 4.0 && duration_seconds < 6.0, "Duration should be approximately 5 seconds");
|
|
}
|
|
}
|
|
|
|
// Test position query
|
|
let mut position_query = gstreamer::query::Position::new(gstreamer::Format::Time);
|
|
if pipeline.query(&mut position_query) {
|
|
if let gstreamer::GenericFormattedValue::Time(Some(position)) = position_query.result() {
|
|
let position_seconds = position.nseconds() as f64 / gstreamer::ClockTime::SECOND.nseconds() as f64;
|
|
println!("Position: {:.2} seconds", position_seconds);
|
|
assert!(position_seconds == 0.0, "Position should start at 0");
|
|
}
|
|
}
|
|
|
|
pipeline.set_state(gstreamer::State::Null).unwrap();
|
|
|
|
println!("✅ Duration and position queries work correctly");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_basic_seeking_on_videotestsrc() {
|
|
let _ = gstreamer::init();
|
|
|
|
let pipeline = gstreamer::Pipeline::new();
|
|
|
|
let videotestsrc = gstreamer::ElementFactory::make("videotestsrc")
|
|
.property("num-buffers", 300i32) // 10 seconds at 30fps
|
|
.build()
|
|
.expect("Failed to create videotestsrc");
|
|
|
|
let fakesink = gstreamer::ElementFactory::make("fakesink")
|
|
.build()
|
|
.expect("Failed to create fakesink");
|
|
|
|
pipeline.add_many([&videotestsrc, &fakesink]).unwrap();
|
|
videotestsrc.link(&fakesink).unwrap();
|
|
|
|
// Set to paused for seeking
|
|
pipeline.set_state(gstreamer::State::Paused).unwrap();
|
|
sleep(Duration::from_millis(500)).await;
|
|
|
|
// Perform seek to 5 seconds
|
|
let seek_time = 5.0 * gstreamer::ClockTime::SECOND.nseconds() as f64;
|
|
let seek_result = pipeline.seek(
|
|
1.0,
|
|
gstreamer::SeekFlags::FLUSH | gstreamer::SeekFlags::KEY_UNIT,
|
|
gstreamer::SeekType::Set,
|
|
gstreamer::GenericFormattedValue::Time(Some(gstreamer::ClockTime::from_nseconds(seek_time as u64))),
|
|
gstreamer::SeekType::None,
|
|
gstreamer::GenericFormattedValue::Time(None),
|
|
);
|
|
|
|
assert!(seek_result.is_ok(), "Seek should succeed on videotestsrc");
|
|
|
|
// Wait for seek to complete
|
|
sleep(Duration::from_millis(200)).await;
|
|
|
|
// Verify position
|
|
let mut position_query = gstreamer::query::Position::new(gstreamer::Format::Time);
|
|
if pipeline.query(&mut position_query) {
|
|
if let gstreamer::GenericFormattedValue::Time(Some(position)) = position_query.result() {
|
|
let position_seconds = position.nseconds() as f64 / gstreamer::ClockTime::SECOND.nseconds() as f64;
|
|
println!("Position after seek: {:.2} seconds", position_seconds);
|
|
// Allow some tolerance due to keyframe seeking
|
|
assert!(position_seconds >= 4.0 && position_seconds <= 6.0, "Position should be around 5 seconds");
|
|
}
|
|
}
|
|
|
|
pipeline.set_state(gstreamer::State::Null).unwrap();
|
|
|
|
println!("✅ Basic seeking on videotestsrc works correctly");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_seek_beyond_bounds_error_handling() {
|
|
let _ = gstreamer::init();
|
|
|
|
let pipeline = gstreamer::Pipeline::new();
|
|
|
|
let videotestsrc = gstreamer::ElementFactory::make("videotestsrc")
|
|
.property("num-buffers", 150i32) // 5 seconds at 30fps
|
|
.build()
|
|
.expect("Failed to create videotestsrc");
|
|
|
|
let fakesink = gstreamer::ElementFactory::make("fakesink")
|
|
.build()
|
|
.expect("Failed to create fakesink");
|
|
|
|
pipeline.add_many([&videotestsrc, &fakesink]).unwrap();
|
|
videotestsrc.link(&fakesink).unwrap();
|
|
|
|
pipeline.set_state(gstreamer::State::Paused).unwrap();
|
|
sleep(Duration::from_millis(500)).await;
|
|
|
|
// Try to seek beyond the video duration (10 seconds when video is 5 seconds)
|
|
let seek_time = 10.0 * gstreamer::ClockTime::SECOND.nseconds() as f64;
|
|
let seek_result = pipeline.seek(
|
|
1.0,
|
|
gstreamer::SeekFlags::FLUSH | gstreamer::SeekFlags::KEY_UNIT,
|
|
gstreamer::SeekType::Set,
|
|
gstreamer::GenericFormattedValue::Time(Some(gstreamer::ClockTime::from_nseconds(seek_time as u64))),
|
|
gstreamer::SeekType::None,
|
|
gstreamer::GenericFormattedValue::Time(None),
|
|
);
|
|
|
|
// This might succeed (GStreamer may clamp to end) or fail - both are acceptable
|
|
println!("Seek beyond bounds result: {:?}", seek_result);
|
|
|
|
// The important thing is that we should detect the bounds beforehand
|
|
let mut seek_query = gstreamer::query::Seeking::new(gstreamer::Format::Time);
|
|
if pipeline.query(&mut seek_query.get_mut().unwrap()) {
|
|
let (seekable, _start, end) = seek_query.result();
|
|
if seekable {
|
|
if let gstreamer::GenericFormattedValue::Time(Some(end_time)) = end {
|
|
let end_seconds = end_time.nseconds() as f64 / gstreamer::ClockTime::SECOND.nseconds() as f64;
|
|
println!("Video duration: {:.2} seconds", end_seconds);
|
|
assert!(end_seconds > 4.0 && end_seconds < 6.0, "Should detect correct duration");
|
|
|
|
// Our application should prevent seeking beyond this
|
|
assert!(10.0 > end_seconds, "Should detect that 10s is beyond bounds");
|
|
}
|
|
}
|
|
}
|
|
|
|
pipeline.set_state(gstreamer::State::Null).unwrap();
|
|
|
|
println!("✅ Bounds checking works correctly");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_seek_flags_behavior() {
|
|
let _ = gstreamer::init();
|
|
|
|
let pipeline = gstreamer::Pipeline::new();
|
|
|
|
let videotestsrc = gstreamer::ElementFactory::make("videotestsrc")
|
|
.property("num-buffers", 300i32) // 10 seconds at 30fps
|
|
.build()
|
|
.expect("Failed to create videotestsrc");
|
|
|
|
let fakesink = gstreamer::ElementFactory::make("fakesink")
|
|
.build()
|
|
.expect("Failed to create fakesink");
|
|
|
|
pipeline.add_many([&videotestsrc, &fakesink]).unwrap();
|
|
videotestsrc.link(&fakesink).unwrap();
|
|
|
|
pipeline.set_state(gstreamer::State::Paused).unwrap();
|
|
sleep(Duration::from_millis(500)).await;
|
|
|
|
// Test different seek flag combinations
|
|
let test_cases = vec![
|
|
("FLUSH", gstreamer::SeekFlags::FLUSH),
|
|
("KEY_UNIT", gstreamer::SeekFlags::KEY_UNIT),
|
|
("FLUSH | KEY_UNIT", gstreamer::SeekFlags::FLUSH | gstreamer::SeekFlags::KEY_UNIT),
|
|
("ACCURATE", gstreamer::SeekFlags::ACCURATE),
|
|
];
|
|
|
|
for (name, flags) in test_cases {
|
|
println!("Testing seek flags: {}", name);
|
|
|
|
let seek_time = 3.0 * gstreamer::ClockTime::SECOND.nseconds() as f64;
|
|
let seek_result = pipeline.seek(
|
|
1.0,
|
|
flags,
|
|
gstreamer::SeekType::Set,
|
|
gstreamer::GenericFormattedValue::Time(Some(gstreamer::ClockTime::from_nseconds(seek_time as u64))),
|
|
gstreamer::SeekType::None,
|
|
gstreamer::GenericFormattedValue::Time(None),
|
|
);
|
|
|
|
// All flag combinations should work with videotestsrc
|
|
assert!(seek_result.is_ok(), "Seek with {} flags should succeed", name);
|
|
|
|
sleep(Duration::from_millis(100)).await;
|
|
|
|
// Verify position
|
|
let mut position_query = gstreamer::query::Position::new(gstreamer::Format::Time);
|
|
if pipeline.query(&mut position_query) {
|
|
if let gstreamer::GenericFormattedValue::Time(Some(position)) = position_query.result() {
|
|
let position_seconds = position.nseconds() as f64 / gstreamer::ClockTime::SECOND.nseconds() as f64;
|
|
println!("Position with {} flags: {:.2}s", name, position_seconds);
|
|
}
|
|
}
|
|
}
|
|
|
|
pipeline.set_state(gstreamer::State::Null).unwrap();
|
|
|
|
println!("✅ Different seek flags work correctly");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_segment_seeking() {
|
|
let _ = gstreamer::init();
|
|
|
|
let pipeline = gstreamer::Pipeline::new();
|
|
|
|
let videotestsrc = gstreamer::ElementFactory::make("videotestsrc")
|
|
.property("num-buffers", 600i32) // 20 seconds at 30fps
|
|
.build()
|
|
.expect("Failed to create videotestsrc");
|
|
|
|
let fakesink = gstreamer::ElementFactory::make("fakesink")
|
|
.build()
|
|
.expect("Failed to create fakesink");
|
|
|
|
pipeline.add_many([&videotestsrc, &fakesink]).unwrap();
|
|
videotestsrc.link(&fakesink).unwrap();
|
|
|
|
pipeline.set_state(gstreamer::State::Paused).unwrap();
|
|
sleep(Duration::from_millis(500)).await;
|
|
|
|
// Test segment seeking (start at 5s, stop at 10s)
|
|
let start_time = 5.0 * gstreamer::ClockTime::SECOND.nseconds() as f64;
|
|
let stop_time = 10.0 * gstreamer::ClockTime::SECOND.nseconds() as f64;
|
|
|
|
let seek_result = pipeline.seek(
|
|
1.0,
|
|
gstreamer::SeekFlags::FLUSH | gstreamer::SeekFlags::KEY_UNIT,
|
|
gstreamer::SeekType::Set,
|
|
gstreamer::GenericFormattedValue::Time(Some(gstreamer::ClockTime::from_nseconds(start_time as u64))),
|
|
gstreamer::SeekType::Set,
|
|
gstreamer::GenericFormattedValue::Time(Some(gstreamer::ClockTime::from_nseconds(stop_time as u64))),
|
|
);
|
|
|
|
assert!(seek_result.is_ok(), "Segment seek should succeed");
|
|
|
|
sleep(Duration::from_millis(200)).await;
|
|
|
|
// Verify we're at the start of the segment
|
|
let mut position_query = gstreamer::query::Position::new(gstreamer::Format::Time);
|
|
if pipeline.query(&mut position_query) {
|
|
if let gstreamer::GenericFormattedValue::Time(Some(position)) = position_query.result() {
|
|
let position_seconds = position.nseconds() as f64 / gstreamer::ClockTime::SECOND.nseconds() as f64;
|
|
println!("Segment start position: {:.2}s", position_seconds);
|
|
assert!(position_seconds >= 4.0 && position_seconds <= 6.0, "Should be at segment start");
|
|
}
|
|
}
|
|
|
|
pipeline.set_state(gstreamer::State::Null).unwrap();
|
|
|
|
println!("✅ Segment seeking works correctly");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_multiple_consecutive_seeks() {
|
|
let _ = gstreamer::init();
|
|
|
|
let pipeline = gstreamer::Pipeline::new();
|
|
|
|
let videotestsrc = gstreamer::ElementFactory::make("videotestsrc")
|
|
.property("num-buffers", 600i32) // 20 seconds at 30fps
|
|
.build()
|
|
.expect("Failed to create videotestsrc");
|
|
|
|
let fakesink = gstreamer::ElementFactory::make("fakesink")
|
|
.build()
|
|
.expect("Failed to create fakesink");
|
|
|
|
pipeline.add_many([&videotestsrc, &fakesink]).unwrap();
|
|
videotestsrc.link(&fakesink).unwrap();
|
|
|
|
pipeline.set_state(gstreamer::State::Paused).unwrap();
|
|
sleep(Duration::from_millis(500)).await;
|
|
|
|
// Test multiple seeks in sequence
|
|
let seek_positions = vec![5.0, 10.0, 2.0, 15.0, 8.0];
|
|
|
|
for (i, &seek_pos) in seek_positions.iter().enumerate() {
|
|
println!("Performing seek {} to {:.1}s", i + 1, seek_pos);
|
|
|
|
let seek_time = seek_pos * gstreamer::ClockTime::SECOND.nseconds() as f64;
|
|
let seek_result = pipeline.seek(
|
|
1.0,
|
|
gstreamer::SeekFlags::FLUSH | gstreamer::SeekFlags::KEY_UNIT,
|
|
gstreamer::SeekType::Set,
|
|
gstreamer::GenericFormattedValue::Time(Some(gstreamer::ClockTime::from_nseconds(seek_time as u64))),
|
|
gstreamer::SeekType::None,
|
|
gstreamer::GenericFormattedValue::Time(None),
|
|
);
|
|
|
|
assert!(seek_result.is_ok(), "Seek {} should succeed", i + 1);
|
|
|
|
// Small delay between seeks
|
|
sleep(Duration::from_millis(100)).await;
|
|
|
|
// Verify position
|
|
let mut position_query = gstreamer::query::Position::new(gstreamer::Format::Time);
|
|
if pipeline.query(&mut position_query) {
|
|
if let gstreamer::GenericFormattedValue::Time(Some(position)) = position_query.result() {
|
|
let position_seconds = position.nseconds() as f64 / gstreamer::ClockTime::SECOND.nseconds() as f64;
|
|
println!("Position after seek {}: {:.2}s", i + 1, position_seconds);
|
|
}
|
|
}
|
|
}
|
|
|
|
pipeline.set_state(gstreamer::State::Null).unwrap();
|
|
|
|
println!("✅ Multiple consecutive seeks work correctly");
|
|
}
|
|
|
|
// This test would use create_seekable_test_video if encoders are available
|
|
#[tokio::test]
|
|
async fn test_file_based_seeking() {
|
|
let _ = gstreamer::init();
|
|
|
|
// Check if we have required encoders
|
|
if !gstreamer::ElementFactory::find("x264enc").is_some() ||
|
|
!gstreamer::ElementFactory::find("avenc_aac").is_some() {
|
|
println!("⚠️ Skipping file-based seeking test - encoders not available");
|
|
return;
|
|
}
|
|
|
|
let temp_dir = env::temp_dir();
|
|
let video_path = temp_dir.join("seekable_test.mp4");
|
|
|
|
// Create a test video file
|
|
println!("Creating test video file...");
|
|
let create_result = create_seekable_test_video(&video_path, 30).await; // 30 second video
|
|
assert!(create_result.is_ok(), "Should create test video successfully");
|
|
assert!(video_path.exists(), "Test video should exist");
|
|
|
|
// Test seeking on the actual file
|
|
println!("Testing seeking on file...");
|
|
let seek_test_result = test_seeking_with_state_management(video_path.to_str().unwrap(), 10.0).await;
|
|
assert!(seek_test_result.is_ok(), "File-based seeking should work: {:?}", seek_test_result.err());
|
|
|
|
// Test seeking to different positions
|
|
let test_positions = vec![0.0, 5.0, 15.0, 25.0, 29.0];
|
|
for position in test_positions {
|
|
let result = test_seeking_with_state_management(video_path.to_str().unwrap(), position).await;
|
|
assert!(result.is_ok(), "Should be able to seek to {}s: {:?}", position, result.err());
|
|
}
|
|
|
|
println!("✅ File-based seeking tests completed successfully");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_seeking_error_recovery() {
|
|
let _ = gstreamer::init();
|
|
|
|
let pipeline = gstreamer::Pipeline::new();
|
|
|
|
// Use a source that might have seeking limitations
|
|
let videotestsrc = gstreamer::ElementFactory::make("videotestsrc")
|
|
.property("num-buffers", 30i32) // Very short video (1 second)
|
|
.build()
|
|
.expect("Failed to create videotestsrc");
|
|
|
|
let fakesink = gstreamer::ElementFactory::make("fakesink")
|
|
.build()
|
|
.expect("Failed to create fakesink");
|
|
|
|
pipeline.add_many([&videotestsrc, &fakesink]).unwrap();
|
|
videotestsrc.link(&fakesink).unwrap();
|
|
|
|
pipeline.set_state(gstreamer::State::Paused).unwrap();
|
|
sleep(Duration::from_millis(500)).await;
|
|
|
|
// Test seeking way beyond the video length
|
|
let invalid_seek_time = 100.0 * gstreamer::ClockTime::SECOND.nseconds() as f64;
|
|
let seek_result = pipeline.seek(
|
|
1.0,
|
|
gstreamer::SeekFlags::FLUSH | gstreamer::SeekFlags::KEY_UNIT,
|
|
gstreamer::SeekType::Set,
|
|
gstreamer::GenericFormattedValue::Time(Some(gstreamer::ClockTime::from_nseconds(invalid_seek_time as u64))),
|
|
gstreamer::SeekType::None,
|
|
gstreamer::GenericFormattedValue::Time(None),
|
|
);
|
|
|
|
// GStreamer might handle this gracefully or return an error
|
|
println!("Seek to invalid position result: {:?}", seek_result);
|
|
|
|
// The pipeline should still be functional after a failed/clamped seek
|
|
let valid_seek_time = 0.5 * gstreamer::ClockTime::SECOND.nseconds() as f64;
|
|
let recovery_seek_result = pipeline.seek(
|
|
1.0,
|
|
gstreamer::SeekFlags::FLUSH | gstreamer::SeekFlags::KEY_UNIT,
|
|
gstreamer::SeekType::Set,
|
|
gstreamer::GenericFormattedValue::Time(Some(gstreamer::ClockTime::from_nseconds(valid_seek_time as u64))),
|
|
gstreamer::SeekType::None,
|
|
gstreamer::GenericFormattedValue::Time(None),
|
|
);
|
|
|
|
assert!(recovery_seek_result.is_ok(), "Should be able to seek to valid position after invalid seek");
|
|
|
|
pipeline.set_state(gstreamer::State::Null).unwrap();
|
|
|
|
println!("✅ Error recovery in seeking works correctly");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_async_state_change_handling() {
|
|
let _ = gstreamer::init();
|
|
|
|
let pipeline = gstreamer::Pipeline::new();
|
|
|
|
// Create a more complex pipeline that might have async state changes
|
|
let filesrc = gstreamer::ElementFactory::make("filesrc")
|
|
.property("location", "/dev/zero") // Infinite source on Linux
|
|
.build()
|
|
.expect("Failed to create filesrc");
|
|
|
|
let identity = gstreamer::ElementFactory::make("identity")
|
|
.build()
|
|
.expect("Failed to create identity");
|
|
|
|
let fakesink = gstreamer::ElementFactory::make("fakesink")
|
|
.build()
|
|
.expect("Failed to create fakesink");
|
|
|
|
pipeline.add_many([&filesrc, &identity, &fakesink]).unwrap();
|
|
gstreamer::Element::link_many([&filesrc, &identity, &fakesink]).unwrap();
|
|
|
|
// Test async state change to PAUSED
|
|
let start = Instant::now();
|
|
let state_change_result = pipeline.set_state(gstreamer::State::Paused);
|
|
|
|
match state_change_result {
|
|
Ok(gstreamer::StateChangeSuccess::Success) => {
|
|
println!("State change was synchronous");
|
|
}
|
|
Ok(gstreamer::StateChangeSuccess::NoPreroll) => {
|
|
println!("State change completed with no preroll");
|
|
}
|
|
Ok(gstreamer::StateChangeSuccess::Async) => {
|
|
println!("State change is async, waiting for completion...");
|
|
|
|
let bus = pipeline.bus().unwrap();
|
|
let timeout = gstreamer::ClockTime::from_seconds(10);
|
|
|
|
let mut completed = false;
|
|
while let Some(msg) = bus.timed_pop_filtered(
|
|
Some(timeout),
|
|
&[gstreamer::MessageType::StateChanged, gstreamer::MessageType::Error, gstreamer::MessageType::AsyncDone]
|
|
) {
|
|
match msg.view() {
|
|
gstreamer::MessageView::Error(err) => {
|
|
panic!("Pipeline error: {}", err.error());
|
|
}
|
|
gstreamer::MessageView::AsyncDone(..) => {
|
|
println!("Async state change completed");
|
|
completed = true;
|
|
break;
|
|
}
|
|
gstreamer::MessageView::StateChanged(sc) => {
|
|
if sc.src() == Some(pipeline.upcast_ref()) {
|
|
println!("Pipeline state: {:?} -> {:?}", sc.old(), sc.current());
|
|
if sc.current() == gstreamer::State::Paused {
|
|
completed = true;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
_ => {}
|
|
}
|
|
}
|
|
|
|
assert!(completed, "Async state change should complete");
|
|
}
|
|
Err(e) => {
|
|
// This might fail on systems without /dev/zero, which is okay
|
|
println!("State change failed (expected on some systems): {}", e);
|
|
return;
|
|
}
|
|
}
|
|
|
|
let elapsed = start.elapsed();
|
|
println!("State change took: {:?}", elapsed);
|
|
|
|
// Test that we can query the pipeline now
|
|
let (_, current_state, _) = pipeline.state(None);
|
|
assert_eq!(current_state, gstreamer::State::Paused);
|
|
|
|
pipeline.set_state(gstreamer::State::Null).unwrap();
|
|
|
|
println!("✅ Async state change handling works correctly");
|
|
}
|
|
}
|
|
|
|
/// Performance and stress tests for seeking functionality
|
|
#[cfg(test)]
|
|
mod performance_tests {
|
|
use super::*;
|
|
|
|
#[tokio::test]
|
|
#[ignore] // Heavy test, run manually
|
|
async fn test_seeking_performance() {
|
|
let _ = gstreamer::init();
|
|
|
|
let pipeline = gstreamer::Pipeline::new();
|
|
|
|
let videotestsrc = gstreamer::ElementFactory::make("videotestsrc")
|
|
.property("num-buffers", 3000i32) // 100 seconds at 30fps
|
|
.build()
|
|
.expect("Failed to create videotestsrc");
|
|
|
|
let fakesink = gstreamer::ElementFactory::make("fakesink")
|
|
.build()
|
|
.expect("Failed to create fakesink");
|
|
|
|
pipeline.add_many([&videotestsrc, &fakesink]).unwrap();
|
|
videotestsrc.link(&fakesink).unwrap();
|
|
|
|
pipeline.set_state(gstreamer::State::Paused).unwrap();
|
|
sleep(Duration::from_millis(500)).await;
|
|
|
|
// Benchmark seek performance
|
|
let seek_positions = vec![10.0, 50.0, 25.0, 75.0, 5.0, 90.0, 15.0, 60.0];
|
|
let mut seek_times = Vec::new();
|
|
|
|
for (i, &position) in seek_positions.iter().enumerate() {
|
|
let start = Instant::now();
|
|
|
|
let seek_time = position * gstreamer::ClockTime::SECOND.nseconds() as f64;
|
|
let seek_result = pipeline.seek(
|
|
1.0,
|
|
gstreamer::SeekFlags::FLUSH | gstreamer::SeekFlags::KEY_UNIT,
|
|
gstreamer::SeekType::Set,
|
|
gstreamer::GenericFormattedValue::Time(Some(gstreamer::ClockTime::from_nseconds(seek_time as u64))),
|
|
gstreamer::SeekType::None,
|
|
gstreamer::GenericFormattedValue::Time(None),
|
|
);
|
|
|
|
assert!(seek_result.is_ok(), "Seek {} should succeed", i);
|
|
|
|
// Wait for seek completion
|
|
sleep(Duration::from_millis(50)).await;
|
|
|
|
let elapsed = start.elapsed();
|
|
seek_times.push(elapsed);
|
|
|
|
println!("Seek {} to {:.1}s took: {:?}", i + 1, position, elapsed);
|
|
}
|
|
|
|
pipeline.set_state(gstreamer::State::Null).unwrap();
|
|
|
|
// Analyze performance
|
|
let total_time: Duration = seek_times.iter().sum();
|
|
let avg_time = total_time / seek_times.len() as u32;
|
|
let max_time = seek_times.iter().max().unwrap();
|
|
let min_time = seek_times.iter().min().unwrap();
|
|
|
|
println!("Seek Performance Summary:");
|
|
println!(" Average: {:?}", avg_time);
|
|
println!(" Min: {:?}", min_time);
|
|
println!(" Max: {:?}", max_time);
|
|
println!(" Total: {:?}", total_time);
|
|
|
|
// Performance assertions (these are reasonable expectations)
|
|
assert!(avg_time < Duration::from_millis(100), "Average seek should be under 100ms");
|
|
assert!(*max_time < Duration::from_millis(500), "No seek should take over 500ms");
|
|
|
|
println!("✅ Seeking performance is acceptable");
|
|
}
|
|
|
|
#[tokio::test]
|
|
#[ignore] // Stress test, run manually
|
|
async fn test_rapid_seeking_stress() {
|
|
let _ = gstreamer::init();
|
|
|
|
let pipeline = gstreamer::Pipeline::new();
|
|
|
|
let videotestsrc = gstreamer::ElementFactory::make("videotestsrc")
|
|
.property("num-buffers", 1800i32) // 60 seconds at 30fps
|
|
.build()
|
|
.expect("Failed to create videotestsrc");
|
|
|
|
let fakesink = gstreamer::ElementFactory::make("fakesink")
|
|
.build()
|
|
.expect("Failed to create fakesink");
|
|
|
|
pipeline.add_many([&videotestsrc, &fakesink]).unwrap();
|
|
videotestsrc.link(&fakesink).unwrap();
|
|
|
|
pipeline.set_state(gstreamer::State::Paused).unwrap();
|
|
sleep(Duration::from_millis(500)).await;
|
|
|
|
// Perform rapid seeks to stress test the pipeline
|
|
let mut successful_seeks = 0;
|
|
let mut failed_seeks = 0;
|
|
|
|
for i in 0..100 {
|
|
let position = (i % 50) as f64; // Seek between 0-50 seconds
|
|
let seek_time = position * gstreamer::ClockTime::SECOND.nseconds() as f64;
|
|
|
|
let seek_result = pipeline.seek(
|
|
1.0,
|
|
gstreamer::SeekFlags::FLUSH | gstreamer::SeekFlags::KEY_UNIT,
|
|
gstreamer::SeekType::Set,
|
|
gstreamer::GenericFormattedValue::Time(Some(gstreamer::ClockTime::from_nseconds(seek_time as u64))),
|
|
gstreamer::SeekType::None,
|
|
gstreamer::GenericFormattedValue::Time(None),
|
|
);
|
|
|
|
if seek_result.is_ok() {
|
|
successful_seeks += 1;
|
|
} else {
|
|
failed_seeks += 1;
|
|
println!("Seek {} failed: {:?}", i, seek_result.err());
|
|
}
|
|
|
|
// Very small delay to simulate rapid seeking
|
|
sleep(Duration::from_millis(10)).await;
|
|
|
|
if i % 20 == 0 {
|
|
println!("Completed {} rapid seeks ({} successful, {} failed)", i + 1, successful_seeks, failed_seeks);
|
|
}
|
|
}
|
|
|
|
pipeline.set_state(gstreamer::State::Null).unwrap();
|
|
|
|
println!("Rapid seek stress test results:");
|
|
println!(" Successful: {}", successful_seeks);
|
|
println!(" Failed: {}", failed_seeks);
|
|
println!(" Success rate: {:.1}%", (successful_seeks as f64 / 100.0) * 100.0);
|
|
|
|
// We expect most seeks to succeed, but some failures are acceptable under stress
|
|
assert!(successful_seeks >= 80, "Should have at least 80% success rate under stress");
|
|
|
|
println!("✅ Rapid seeking stress test completed");
|
|
}
|
|
} |