church-api/tests/gstreamer_seeking_tests.rs
Benjamin Slingo 0c06e159bb Initial commit: Church API Rust implementation
Complete church management system with bulletin management, media processing, live streaming integration, and web interface. Includes authentication, email notifications, database migrations, and comprehensive test suite.
2025-08-19 20:56:41 -04:00

1027 lines
44 KiB
Rust

use gstreamer::prelude::*;
use church_api::error::{ApiError, Result};
use std::path::Path;
use std::env;
use std::time::Duration;
use tokio::time::{sleep, Instant};
/// Create a test video with known duration for seeking tests
async fn create_seekable_test_video(path: &Path, duration_seconds: u32) -> Result<()> {
// Initialize GStreamer
gstreamer::init().map_err(|e| ApiError::Internal(format!("Failed to init GStreamer: {}", e)))?;
let pipeline = gstreamer::Pipeline::new();
// Create a longer video with keyframes for reliable seeking
let videotestsrc = gstreamer::ElementFactory::make("videotestsrc")
.property("num-buffers", (duration_seconds * 30) as i32) // 30 fps
.property("pattern", 0i32) // SMPTE bars
.build()
.map_err(|e| ApiError::Internal(format!("Failed to create videotestsrc: {}", e)))?;
let audiotestsrc = gstreamer::ElementFactory::make("audiotestsrc")
.property("num-buffers", (duration_seconds * 441) as i32) // 44.1kHz, 100 samples per buffer
.property("freq", 440.0f64)
.build()
.map_err(|e| ApiError::Internal(format!("Failed to create audiotestsrc: {}", e)))?;
// Use software encoders for consistent behavior across environments
let x264enc = gstreamer::ElementFactory::make("x264enc")
.property("bitrate", 1000u32)
.property("speed-preset", 1u32) // ultrafast
.property("key-int-max", 30u32) // Keyframe every 30 frames (1 second)
.build()
.map_err(|e| ApiError::Internal(format!("Failed to create x264enc: {}", e)))?;
let audioconvert = gstreamer::ElementFactory::make("audioconvert")
.build()
.map_err(|e| ApiError::Internal(format!("Failed to create audioconvert: {}", e)))?;
let avenc_aac = gstreamer::ElementFactory::make("avenc_aac")
.property("bitrate", 128000i32)
.build()
.map_err(|e| ApiError::Internal(format!("Failed to create avenc_aac: {}", e)))?;
let mp4mux = gstreamer::ElementFactory::make("mp4mux")
.build()
.map_err(|e| ApiError::Internal(format!("Failed to create mp4mux: {}", e)))?;
let filesink = gstreamer::ElementFactory::make("filesink")
.property("location", path.to_str().unwrap())
.build()
.map_err(|e| ApiError::Internal(format!("Failed to create filesink: {}", e)))?;
// Add elements and link
pipeline.add_many([
&videotestsrc, &x264enc, &audiotestsrc, &audioconvert,
&avenc_aac, &mp4mux, &filesink
]).map_err(|e| ApiError::Internal(format!("Failed to add elements: {}", e)))?;
// Link chains
videotestsrc.link(&x264enc).map_err(|_| ApiError::Internal("Failed to link video chain".to_string()))?;
x264enc.link(&mp4mux).map_err(|_| ApiError::Internal("Failed to link video to mux".to_string()))?;
gstreamer::Element::link_many([&audiotestsrc, &audioconvert, &avenc_aac]).map_err(|_| ApiError::Internal("Failed to link audio chain".to_string()))?;
avenc_aac.link(&mp4mux).map_err(|_| ApiError::Internal("Failed to link audio to mux".to_string()))?;
mp4mux.link(&filesink).map_err(|_| ApiError::Internal("Failed to link mux to sink".to_string()))?;
// Run pipeline to completion
pipeline.set_state(gstreamer::State::Playing).map_err(|e| ApiError::Internal(format!("Failed to start pipeline: {:?}", e)))?;
let bus = pipeline.bus().unwrap();
let timeout = gstreamer::ClockTime::from_seconds(60);
match bus.timed_pop_filtered(Some(timeout), &[gstreamer::MessageType::Error, gstreamer::MessageType::Eos]) {
Some(msg) => {
match msg.view() {
gstreamer::MessageView::Eos(..) => {},
gstreamer::MessageView::Error(err) => {
return Err(ApiError::Internal(format!("GStreamer error: {}", err.error())));
}
_ => {}
}
}
None => return Err(ApiError::Internal("Video creation timed out".to_string())),
}
pipeline.set_state(gstreamer::State::Null).map_err(|e| ApiError::Internal(format!("Failed to stop pipeline: {:?}", e)))?;
Ok(())
}
/// Test seeking functionality with proper state management
async fn test_seeking_with_state_management(source_path: &str, seek_position_seconds: f64) -> Result<()> {
gstreamer::init().map_err(|e| ApiError::Internal(format!("Failed to init GStreamer: {}", e)))?;
let pipeline = gstreamer::Pipeline::new();
// Create a simple playback pipeline for testing seeking
let filesrc = gstreamer::ElementFactory::make("filesrc")
.property("location", source_path)
.build()
.map_err(|e| ApiError::Internal(format!("Failed to create filesrc: {}", e)))?;
let qtdemux = gstreamer::ElementFactory::make("qtdemux")
.build()
.map_err(|e| ApiError::Internal(format!("Failed to create qtdemux: {}", e)))?;
let queue_video = gstreamer::ElementFactory::make("queue")
.build()
.map_err(|e| ApiError::Internal(format!("Failed to create video queue: {}", e)))?;
let queue_audio = gstreamer::ElementFactory::make("queue")
.build()
.map_err(|e| ApiError::Internal(format!("Failed to create audio queue: {}", e)))?;
let fakesink_video = gstreamer::ElementFactory::make("fakesink")
.build()
.map_err(|e| ApiError::Internal(format!("Failed to create video fakesink: {}", e)))?;
let fakesink_audio = gstreamer::ElementFactory::make("fakesink")
.build()
.map_err(|e| ApiError::Internal(format!("Failed to create audio fakesink: {}", e)))?;
pipeline.add_many([
&filesrc, &qtdemux, &queue_video, &queue_audio,
&fakesink_video, &fakesink_audio
]).map_err(|e| ApiError::Internal(format!("Failed to add elements: {}", e)))?;
filesrc.link(&qtdemux).map_err(|_| ApiError::Internal("Failed to link filesrc to qtdemux".to_string()))?;
queue_video.link(&fakesink_video).map_err(|_| ApiError::Internal("Failed to link video queue".to_string()))?;
queue_audio.link(&fakesink_audio).map_err(|_| ApiError::Internal("Failed to link audio queue".to_string()))?;
// Handle dynamic pad connections
let queue_video_clone = queue_video.clone();
let queue_audio_clone = queue_audio.clone();
qtdemux.connect_pad_added(move |_element, pad| {
let pad_name = pad.name();
if pad_name.starts_with("video_") {
let sink_pad = queue_video_clone.static_pad("sink").unwrap();
if !sink_pad.is_linked() {
let _ = pad.link(&sink_pad);
}
} else if pad_name.starts_with("audio_") {
let sink_pad = queue_audio_clone.static_pad("sink").unwrap();
if !sink_pad.is_linked() {
let _ = pad.link(&sink_pad);
}
}
});
// Step 1: Set pipeline to PAUSED state
tracing::debug!("Setting pipeline to PAUSED for seeking");
let state_change_result = pipeline.set_state(gstreamer::State::Paused);
match state_change_result {
Ok(gstreamer::StateChangeSuccess::Success) => {},
Ok(gstreamer::StateChangeSuccess::NoPreroll) => {
tracing::debug!("Pipeline state change completed with no preroll");
},
Ok(gstreamer::StateChangeSuccess::Async) => {
// Wait for async state change
let bus = pipeline.bus().unwrap();
let timeout = gstreamer::ClockTime::from_seconds(30);
let mut state_changed = false;
while let Some(msg) = bus.timed_pop_filtered(
Some(timeout),
&[gstreamer::MessageType::StateChanged, gstreamer::MessageType::Error, gstreamer::MessageType::AsyncDone]
) {
match msg.view() {
gstreamer::MessageView::Error(err) => {
let _ = pipeline.set_state(gstreamer::State::Null);
return Err(ApiError::Internal(format!("Pipeline error during state change: {}", err.error())));
}
gstreamer::MessageView::AsyncDone(..) => {
state_changed = true;
break;
}
gstreamer::MessageView::StateChanged(state_change) => {
if state_change.src() == Some(pipeline.upcast_ref()) &&
state_change.current() == gstreamer::State::Paused {
state_changed = true;
break;
}
}
_ => {}
}
}
if !state_changed {
let _ = pipeline.set_state(gstreamer::State::Null);
return Err(ApiError::Internal("Pipeline failed to reach PAUSED state".to_string()));
}
}
Err(e) => {
let _ = pipeline.set_state(gstreamer::State::Null);
return Err(ApiError::Internal(format!("Failed to pause pipeline: {}", e)));
}
}
// Step 2: Query seeking capabilities
let mut seek_query = gstreamer::query::Seeking::new(gstreamer::Format::Time);
if !pipeline.query(&mut seek_query.get_mut().unwrap()) {
let _ = pipeline.set_state(gstreamer::State::Null);
return Err(ApiError::Internal("Failed to query seeking capabilities".to_string()));
}
let (seekable, start_pos, end_pos) = seek_query.result();
if !seekable {
let _ = pipeline.set_state(gstreamer::State::Null);
return Err(ApiError::Internal("Source is not seekable".to_string()));
}
// Step 3: Validate seek position
let seek_ns = (seek_position_seconds * gstreamer::ClockTime::SECOND.nseconds() as f64) as u64;
if let gstreamer::GenericFormattedValue::Time(Some(end)) = end_pos {
if seek_ns >= end.nseconds() {
let _ = pipeline.set_state(gstreamer::State::Null);
return Err(ApiError::Internal("Seek position beyond range".to_string()));
}
}
// Step 4: Perform seek
let seek_result = pipeline.seek(
1.0,
gstreamer::SeekFlags::FLUSH | gstreamer::SeekFlags::KEY_UNIT,
gstreamer::SeekType::Set,
gstreamer::GenericFormattedValue::Time(Some(gstreamer::ClockTime::from_nseconds(seek_ns))),
gstreamer::SeekType::None,
gstreamer::GenericFormattedValue::Time(None),
);
if let Err(e) = seek_result {
let _ = pipeline.set_state(gstreamer::State::Null);
return Err(ApiError::Internal(format!("Seek failed: {:?}", e)));
}
// Step 5: Wait for seek completion
let bus = pipeline.bus().unwrap();
let seek_timeout = gstreamer::ClockTime::from_seconds(10);
while let Some(msg) = bus.timed_pop_filtered(
Some(seek_timeout),
&[gstreamer::MessageType::Error, gstreamer::MessageType::AsyncDone]
) {
match msg.view() {
gstreamer::MessageView::Error(err) => {
let _ = pipeline.set_state(gstreamer::State::Null);
return Err(ApiError::Internal(format!("Error during seek: {}", err.error())));
}
gstreamer::MessageView::AsyncDone(..) => {
break;
}
_ => {}
}
}
// Step 6: Verify position
let mut position_query = gstreamer::query::Position::new(gstreamer::Format::Time);
if pipeline.query(&mut position_query) {
if let gstreamer::GenericFormattedValue::Time(Some(pos)) = position_query.result() {
let pos_seconds = pos.nseconds() as f64 / gstreamer::ClockTime::SECOND.nseconds() as f64;
tracing::info!("Seek completed. Position: {:.2}s (target: {:.2}s)", pos_seconds, seek_position_seconds);
// Allow some tolerance for keyframe seeking
let tolerance = 1.0; // 1 second tolerance
if (pos_seconds - seek_position_seconds).abs() > tolerance {
tracing::warn!("Seek position not accurate: got {:.2}s, expected {:.2}s", pos_seconds, seek_position_seconds);
}
}
}
// Cleanup
pipeline.set_state(gstreamer::State::Null).map_err(|e| ApiError::Internal(format!("Failed to stop pipeline: {:?}", e)))?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
/// GStreamer has to initialise before any of the seeking tests can do useful work.
#[tokio::test]
async fn test_gstreamer_seeking_initialization() {
    assert!(
        gstreamer::init().is_ok(),
        "GStreamer should initialize for seeking tests"
    );
}
#[tokio::test]
async fn test_pipeline_state_transitions() {
let _ = gstreamer::init();
let pipeline = gstreamer::Pipeline::new();
let videotestsrc = gstreamer::ElementFactory::make("videotestsrc")
.property("num-buffers", 100i32)
.build()
.expect("Failed to create videotestsrc");
let fakesink = gstreamer::ElementFactory::make("fakesink")
.build()
.expect("Failed to create fakesink");
pipeline.add_many([&videotestsrc, &fakesink]).unwrap();
videotestsrc.link(&fakesink).unwrap();
// Test state transitions required for seeking
assert!(pipeline.set_state(gstreamer::State::Ready).is_ok());
assert!(pipeline.set_state(gstreamer::State::Paused).is_ok());
// Query current state
let (_, current_state, _) = pipeline.state(Some(gstreamer::ClockTime::from_seconds(5)));
assert_eq!(current_state, gstreamer::State::Paused);
assert!(pipeline.set_state(gstreamer::State::Playing).is_ok());
assert!(pipeline.set_state(gstreamer::State::Paused).is_ok());
assert!(pipeline.set_state(gstreamer::State::Null).is_ok());
println!("✅ Pipeline state transitions work correctly");
}
#[tokio::test]
async fn test_seeking_capability_queries() {
let _ = gstreamer::init();
let pipeline = gstreamer::Pipeline::new();
let videotestsrc = gstreamer::ElementFactory::make("videotestsrc")
.property("num-buffers", 300i32) // 10 seconds at 30fps
.build()
.expect("Failed to create videotestsrc");
let fakesink = gstreamer::ElementFactory::make("fakesink")
.build()
.expect("Failed to create fakesink");
pipeline.add_many([&videotestsrc, &fakesink]).unwrap();
videotestsrc.link(&fakesink).unwrap();
// Set to paused to enable queries
pipeline.set_state(gstreamer::State::Paused).unwrap();
// Wait for preroll
let bus = pipeline.bus().unwrap();
let timeout = gstreamer::ClockTime::from_seconds(5);
while let Some(msg) = bus.timed_pop_filtered(Some(timeout), &[gstreamer::MessageType::AsyncDone, gstreamer::MessageType::StateChanged]) {
match msg.view() {
gstreamer::MessageView::AsyncDone(..) => break,
gstreamer::MessageView::StateChanged(sc) => {
if sc.src() == Some(pipeline.upcast_ref()) && sc.current() == gstreamer::State::Paused {
break;
}
}
_ => {}
}
}
// Test seeking query
let mut seek_query = gstreamer::query::Seeking::new(gstreamer::Format::Time);
let query_success = pipeline.query(&mut seek_query.get_mut().unwrap());
assert!(query_success, "Seeking query should succeed");
let (seekable, start_pos, end_pos) = seek_query.result();
println!("Seekable: {}, Range: {:?} to {:?}", seekable, start_pos, end_pos);
// videotestsrc should be seekable
assert!(seekable, "videotestsrc should be seekable");
assert!(start_pos.is_some(), "Should have start position");
assert!(end_pos.is_some(), "Should have end position");
pipeline.set_state(gstreamer::State::Null).unwrap();
println!("✅ Seeking capability queries work correctly");
}
#[tokio::test]
async fn test_duration_and_position_queries() {
let _ = gstreamer::init();
let pipeline = gstreamer::Pipeline::new();
let videotestsrc = gstreamer::ElementFactory::make("videotestsrc")
.property("num-buffers", 150i32) // 5 seconds at 30fps
.build()
.expect("Failed to create videotestsrc");
let fakesink = gstreamer::ElementFactory::make("fakesink")
.build()
.expect("Failed to create fakesink");
pipeline.add_many([&videotestsrc, &fakesink]).unwrap();
videotestsrc.link(&fakesink).unwrap();
pipeline.set_state(gstreamer::State::Paused).unwrap();
// Wait for preroll
sleep(Duration::from_millis(500)).await;
// Test duration query
let mut duration_query = gstreamer::query::Duration::new(gstreamer::Format::Time);
if pipeline.query(&mut duration_query) {
if let gstreamer::GenericFormattedValue::Time(Some(duration)) = duration_query.result() {
let duration_seconds = duration.nseconds() as f64 / gstreamer::ClockTime::SECOND.nseconds() as f64;
println!("Duration: {:.2} seconds", duration_seconds);
assert!(duration_seconds > 4.0 && duration_seconds < 6.0, "Duration should be approximately 5 seconds");
}
}
// Test position query
let mut position_query = gstreamer::query::Position::new(gstreamer::Format::Time);
if pipeline.query(&mut position_query) {
if let gstreamer::GenericFormattedValue::Time(Some(position)) = position_query.result() {
let position_seconds = position.nseconds() as f64 / gstreamer::ClockTime::SECOND.nseconds() as f64;
println!("Position: {:.2} seconds", position_seconds);
assert!(position_seconds == 0.0, "Position should start at 0");
}
}
pipeline.set_state(gstreamer::State::Null).unwrap();
println!("✅ Duration and position queries work correctly");
}
#[tokio::test]
async fn test_basic_seeking_on_videotestsrc() {
let _ = gstreamer::init();
let pipeline = gstreamer::Pipeline::new();
let videotestsrc = gstreamer::ElementFactory::make("videotestsrc")
.property("num-buffers", 300i32) // 10 seconds at 30fps
.build()
.expect("Failed to create videotestsrc");
let fakesink = gstreamer::ElementFactory::make("fakesink")
.build()
.expect("Failed to create fakesink");
pipeline.add_many([&videotestsrc, &fakesink]).unwrap();
videotestsrc.link(&fakesink).unwrap();
// Set to paused for seeking
pipeline.set_state(gstreamer::State::Paused).unwrap();
sleep(Duration::from_millis(500)).await;
// Perform seek to 5 seconds
let seek_time = 5.0 * gstreamer::ClockTime::SECOND.nseconds() as f64;
let seek_result = pipeline.seek(
1.0,
gstreamer::SeekFlags::FLUSH | gstreamer::SeekFlags::KEY_UNIT,
gstreamer::SeekType::Set,
gstreamer::GenericFormattedValue::Time(Some(gstreamer::ClockTime::from_nseconds(seek_time as u64))),
gstreamer::SeekType::None,
gstreamer::GenericFormattedValue::Time(None),
);
assert!(seek_result.is_ok(), "Seek should succeed on videotestsrc");
// Wait for seek to complete
sleep(Duration::from_millis(200)).await;
// Verify position
let mut position_query = gstreamer::query::Position::new(gstreamer::Format::Time);
if pipeline.query(&mut position_query) {
if let gstreamer::GenericFormattedValue::Time(Some(position)) = position_query.result() {
let position_seconds = position.nseconds() as f64 / gstreamer::ClockTime::SECOND.nseconds() as f64;
println!("Position after seek: {:.2} seconds", position_seconds);
// Allow some tolerance due to keyframe seeking
assert!(position_seconds >= 4.0 && position_seconds <= 6.0, "Position should be around 5 seconds");
}
}
pipeline.set_state(gstreamer::State::Null).unwrap();
println!("✅ Basic seeking on videotestsrc works correctly");
}
#[tokio::test]
async fn test_seek_beyond_bounds_error_handling() {
let _ = gstreamer::init();
let pipeline = gstreamer::Pipeline::new();
let videotestsrc = gstreamer::ElementFactory::make("videotestsrc")
.property("num-buffers", 150i32) // 5 seconds at 30fps
.build()
.expect("Failed to create videotestsrc");
let fakesink = gstreamer::ElementFactory::make("fakesink")
.build()
.expect("Failed to create fakesink");
pipeline.add_many([&videotestsrc, &fakesink]).unwrap();
videotestsrc.link(&fakesink).unwrap();
pipeline.set_state(gstreamer::State::Paused).unwrap();
sleep(Duration::from_millis(500)).await;
// Try to seek beyond the video duration (10 seconds when video is 5 seconds)
let seek_time = 10.0 * gstreamer::ClockTime::SECOND.nseconds() as f64;
let seek_result = pipeline.seek(
1.0,
gstreamer::SeekFlags::FLUSH | gstreamer::SeekFlags::KEY_UNIT,
gstreamer::SeekType::Set,
gstreamer::GenericFormattedValue::Time(Some(gstreamer::ClockTime::from_nseconds(seek_time as u64))),
gstreamer::SeekType::None,
gstreamer::GenericFormattedValue::Time(None),
);
// This might succeed (GStreamer may clamp to end) or fail - both are acceptable
println!("Seek beyond bounds result: {:?}", seek_result);
// The important thing is that we should detect the bounds beforehand
let mut seek_query = gstreamer::query::Seeking::new(gstreamer::Format::Time);
if pipeline.query(&mut seek_query.get_mut().unwrap()) {
let (seekable, _start, end) = seek_query.result();
if seekable {
if let gstreamer::GenericFormattedValue::Time(Some(end_time)) = end {
let end_seconds = end_time.nseconds() as f64 / gstreamer::ClockTime::SECOND.nseconds() as f64;
println!("Video duration: {:.2} seconds", end_seconds);
assert!(end_seconds > 4.0 && end_seconds < 6.0, "Should detect correct duration");
// Our application should prevent seeking beyond this
assert!(10.0 > end_seconds, "Should detect that 10s is beyond bounds");
}
}
}
pipeline.set_state(gstreamer::State::Null).unwrap();
println!("✅ Bounds checking works correctly");
}
/// Seek a paused `videotestsrc ! fakesink` pipeline to 3 s with several flag
/// combinations and print the position reported after each seek.
///
/// Every combination includes `FLUSH`. The pipeline stays in PAUSED for the whole test,
/// and a non-flushing seek on a PAUSED pipeline can block until the pipeline is set to
/// PLAYING (see GStreamer's seeking design notes), which would hang the test.
#[tokio::test]
async fn test_seek_flags_behavior() {
    let _ = gstreamer::init();
    let pipeline = gstreamer::Pipeline::new();
    let videotestsrc = gstreamer::ElementFactory::make("videotestsrc")
        .property("num-buffers", 300i32) // 10 seconds at 30fps
        .build()
        .expect("Failed to create videotestsrc");
    let fakesink = gstreamer::ElementFactory::make("fakesink")
        .build()
        .expect("Failed to create fakesink");
    pipeline.add_many([&videotestsrc, &fakesink]).unwrap();
    videotestsrc.link(&fakesink).unwrap();

    let settle_timeout = gstreamer::ClockTime::from_seconds(5);

    pipeline.set_state(gstreamer::State::Paused).unwrap();
    // Block until preroll has finished instead of guessing with a sleep.
    let (preroll, current_state, _) = pipeline.state(Some(settle_timeout));
    assert!(preroll.is_ok(), "Pipeline should preroll: {:?}", preroll);
    assert_eq!(current_state, gstreamer::State::Paused, "Pipeline should be PAUSED before seeking");

    // Test different seek flag combinations
    let test_cases = [
        ("FLUSH", gstreamer::SeekFlags::FLUSH),
        ("FLUSH | KEY_UNIT", gstreamer::SeekFlags::FLUSH | gstreamer::SeekFlags::KEY_UNIT),
        ("FLUSH | ACCURATE", gstreamer::SeekFlags::FLUSH | gstreamer::SeekFlags::ACCURATE),
    ];
    let seek_time = 3.0 * gstreamer::ClockTime::SECOND.nseconds() as f64;

    for (name, flags) in test_cases.iter().copied() {
        println!("Testing seek flags: {}", name);
        let seek_result = pipeline.seek(
            1.0,
            flags,
            gstreamer::SeekType::Set,
            gstreamer::GenericFormattedValue::Time(Some(gstreamer::ClockTime::from_nseconds(seek_time as u64))),
            gstreamer::SeekType::None,
            gstreamer::GenericFormattedValue::Time(None),
        );
        // All flag combinations should work with videotestsrc
        assert!(seek_result.is_ok(), "Seek with {} flags should succeed", name);

        // The flushing seek makes the pipeline preroll again; wait for that to finish.
        let (settled, _, _) = pipeline.state(Some(settle_timeout));
        assert!(settled.is_ok(), "Seek with {} flags should complete: {:?}", name, settled);

        // Verify position
        let mut position_query = gstreamer::query::Position::new(gstreamer::Format::Time);
        if pipeline.query(&mut position_query) {
            if let gstreamer::GenericFormattedValue::Time(Some(position)) = position_query.result() {
                let position_seconds = position.nseconds() as f64 / gstreamer::ClockTime::SECOND.nseconds() as f64;
                println!("Position with {} flags: {:.2}s", name, position_seconds);
            }
        }
    }

    pipeline.set_state(gstreamer::State::Null).unwrap();
    println!("✅ Different seek flags work correctly");
}
#[tokio::test]
async fn test_segment_seeking() {
let _ = gstreamer::init();
let pipeline = gstreamer::Pipeline::new();
let videotestsrc = gstreamer::ElementFactory::make("videotestsrc")
.property("num-buffers", 600i32) // 20 seconds at 30fps
.build()
.expect("Failed to create videotestsrc");
let fakesink = gstreamer::ElementFactory::make("fakesink")
.build()
.expect("Failed to create fakesink");
pipeline.add_many([&videotestsrc, &fakesink]).unwrap();
videotestsrc.link(&fakesink).unwrap();
pipeline.set_state(gstreamer::State::Paused).unwrap();
sleep(Duration::from_millis(500)).await;
// Test segment seeking (start at 5s, stop at 10s)
let start_time = 5.0 * gstreamer::ClockTime::SECOND.nseconds() as f64;
let stop_time = 10.0 * gstreamer::ClockTime::SECOND.nseconds() as f64;
let seek_result = pipeline.seek(
1.0,
gstreamer::SeekFlags::FLUSH | gstreamer::SeekFlags::KEY_UNIT,
gstreamer::SeekType::Set,
gstreamer::GenericFormattedValue::Time(Some(gstreamer::ClockTime::from_nseconds(start_time as u64))),
gstreamer::SeekType::Set,
gstreamer::GenericFormattedValue::Time(Some(gstreamer::ClockTime::from_nseconds(stop_time as u64))),
);
assert!(seek_result.is_ok(), "Segment seek should succeed");
sleep(Duration::from_millis(200)).await;
// Verify we're at the start of the segment
let mut position_query = gstreamer::query::Position::new(gstreamer::Format::Time);
if pipeline.query(&mut position_query) {
if let gstreamer::GenericFormattedValue::Time(Some(position)) = position_query.result() {
let position_seconds = position.nseconds() as f64 / gstreamer::ClockTime::SECOND.nseconds() as f64;
println!("Segment start position: {:.2}s", position_seconds);
assert!(position_seconds >= 4.0 && position_seconds <= 6.0, "Should be at segment start");
}
}
pipeline.set_state(gstreamer::State::Null).unwrap();
println!("✅ Segment seeking works correctly");
}
#[tokio::test]
async fn test_multiple_consecutive_seeks() {
let _ = gstreamer::init();
let pipeline = gstreamer::Pipeline::new();
let videotestsrc = gstreamer::ElementFactory::make("videotestsrc")
.property("num-buffers", 600i32) // 20 seconds at 30fps
.build()
.expect("Failed to create videotestsrc");
let fakesink = gstreamer::ElementFactory::make("fakesink")
.build()
.expect("Failed to create fakesink");
pipeline.add_many([&videotestsrc, &fakesink]).unwrap();
videotestsrc.link(&fakesink).unwrap();
pipeline.set_state(gstreamer::State::Paused).unwrap();
sleep(Duration::from_millis(500)).await;
// Test multiple seeks in sequence
let seek_positions = vec![5.0, 10.0, 2.0, 15.0, 8.0];
for (i, &seek_pos) in seek_positions.iter().enumerate() {
println!("Performing seek {} to {:.1}s", i + 1, seek_pos);
let seek_time = seek_pos * gstreamer::ClockTime::SECOND.nseconds() as f64;
let seek_result = pipeline.seek(
1.0,
gstreamer::SeekFlags::FLUSH | gstreamer::SeekFlags::KEY_UNIT,
gstreamer::SeekType::Set,
gstreamer::GenericFormattedValue::Time(Some(gstreamer::ClockTime::from_nseconds(seek_time as u64))),
gstreamer::SeekType::None,
gstreamer::GenericFormattedValue::Time(None),
);
assert!(seek_result.is_ok(), "Seek {} should succeed", i + 1);
// Small delay between seeks
sleep(Duration::from_millis(100)).await;
// Verify position
let mut position_query = gstreamer::query::Position::new(gstreamer::Format::Time);
if pipeline.query(&mut position_query) {
if let gstreamer::GenericFormattedValue::Time(Some(position)) = position_query.result() {
let position_seconds = position.nseconds() as f64 / gstreamer::ClockTime::SECOND.nseconds() as f64;
println!("Position after seek {}: {:.2}s", i + 1, position_seconds);
}
}
}
pipeline.set_state(gstreamer::State::Null).unwrap();
println!("✅ Multiple consecutive seeks work correctly");
}
/// Encode a 30 second MP4 with `create_seekable_test_video` and seek within it through
/// `test_seeking_with_state_management`.
///
/// The test is skipped (returns early) when any GStreamer element the two helpers
/// create by name is not installed. The generated file carries the process id in its
/// name so concurrent test runs do not overwrite each other, and it is removed when the
/// test ends, including when an assertion fails.
#[tokio::test]
async fn test_file_based_seeking() {
    /// Deletes the wrapped file when dropped, so the clip does not outlive the test.
    struct TempVideo(std::path::PathBuf);
    impl Drop for TempVideo {
        fn drop(&mut self) {
            // Best effort: the file may never have been created.
            let _ = std::fs::remove_file(&self.0);
        }
    }

    let _ = gstreamer::init();

    // Check if we have the encoders plus the muxer/demuxer the helpers rely on
    let required_elements = ["x264enc", "avenc_aac", "mp4mux", "qtdemux"];
    let missing: Vec<&str> = required_elements
        .iter()
        .copied()
        .filter(|name| gstreamer::ElementFactory::find(name).is_none())
        .collect();
    if !missing.is_empty() {
        println!("⚠️ Skipping file-based seeking test - encoders not available");
        println!("Missing elements: {}", missing.join(", "));
        return;
    }

    let video = TempVideo(env::temp_dir().join(format!("seekable_test_{}.mp4", std::process::id())));
    let video_path = video.0.as_path();
    let video_location = video_path
        .to_str()
        .expect("temporary video path should be valid UTF-8");

    // Create a test video file
    println!("Creating test video file...");
    let create_result = create_seekable_test_video(video_path, 30).await; // 30 second video
    assert!(create_result.is_ok(), "Should create test video successfully: {:?}", create_result.err());
    assert!(video_path.exists(), "Test video should exist");

    // Test seeking on the actual file
    println!("Testing seeking on file...");
    let seek_test_result = test_seeking_with_state_management(video_location, 10.0).await;
    assert!(seek_test_result.is_ok(), "File-based seeking should work: {:?}", seek_test_result.err());

    // Test seeking to different positions
    for position in [0.0, 5.0, 15.0, 25.0, 29.0].iter().copied() {
        let result = test_seeking_with_state_management(video_location, position).await;
        assert!(result.is_ok(), "Should be able to seek to {}s: {:?}", position, result.err());
    }

    println!("✅ File-based seeking tests completed successfully");
}
#[tokio::test]
async fn test_seeking_error_recovery() {
let _ = gstreamer::init();
let pipeline = gstreamer::Pipeline::new();
// Use a source that might have seeking limitations
let videotestsrc = gstreamer::ElementFactory::make("videotestsrc")
.property("num-buffers", 30i32) // Very short video (1 second)
.build()
.expect("Failed to create videotestsrc");
let fakesink = gstreamer::ElementFactory::make("fakesink")
.build()
.expect("Failed to create fakesink");
pipeline.add_many([&videotestsrc, &fakesink]).unwrap();
videotestsrc.link(&fakesink).unwrap();
pipeline.set_state(gstreamer::State::Paused).unwrap();
sleep(Duration::from_millis(500)).await;
// Test seeking way beyond the video length
let invalid_seek_time = 100.0 * gstreamer::ClockTime::SECOND.nseconds() as f64;
let seek_result = pipeline.seek(
1.0,
gstreamer::SeekFlags::FLUSH | gstreamer::SeekFlags::KEY_UNIT,
gstreamer::SeekType::Set,
gstreamer::GenericFormattedValue::Time(Some(gstreamer::ClockTime::from_nseconds(invalid_seek_time as u64))),
gstreamer::SeekType::None,
gstreamer::GenericFormattedValue::Time(None),
);
// GStreamer might handle this gracefully or return an error
println!("Seek to invalid position result: {:?}", seek_result);
// The pipeline should still be functional after a failed/clamped seek
let valid_seek_time = 0.5 * gstreamer::ClockTime::SECOND.nseconds() as f64;
let recovery_seek_result = pipeline.seek(
1.0,
gstreamer::SeekFlags::FLUSH | gstreamer::SeekFlags::KEY_UNIT,
gstreamer::SeekType::Set,
gstreamer::GenericFormattedValue::Time(Some(gstreamer::ClockTime::from_nseconds(valid_seek_time as u64))),
gstreamer::SeekType::None,
gstreamer::GenericFormattedValue::Time(None),
);
assert!(recovery_seek_result.is_ok(), "Should be able to seek to valid position after invalid seek");
pipeline.set_state(gstreamer::State::Null).unwrap();
println!("✅ Error recovery in seeking works correctly");
}
#[tokio::test]
async fn test_async_state_change_handling() {
let _ = gstreamer::init();
let pipeline = gstreamer::Pipeline::new();
// Create a more complex pipeline that might have async state changes
let filesrc = gstreamer::ElementFactory::make("filesrc")
.property("location", "/dev/zero") // Infinite source on Linux
.build()
.expect("Failed to create filesrc");
let identity = gstreamer::ElementFactory::make("identity")
.build()
.expect("Failed to create identity");
let fakesink = gstreamer::ElementFactory::make("fakesink")
.build()
.expect("Failed to create fakesink");
pipeline.add_many([&filesrc, &identity, &fakesink]).unwrap();
gstreamer::Element::link_many([&filesrc, &identity, &fakesink]).unwrap();
// Test async state change to PAUSED
let start = Instant::now();
let state_change_result = pipeline.set_state(gstreamer::State::Paused);
match state_change_result {
Ok(gstreamer::StateChangeSuccess::Success) => {
println!("State change was synchronous");
}
Ok(gstreamer::StateChangeSuccess::NoPreroll) => {
println!("State change completed with no preroll");
}
Ok(gstreamer::StateChangeSuccess::Async) => {
println!("State change is async, waiting for completion...");
let bus = pipeline.bus().unwrap();
let timeout = gstreamer::ClockTime::from_seconds(10);
let mut completed = false;
while let Some(msg) = bus.timed_pop_filtered(
Some(timeout),
&[gstreamer::MessageType::StateChanged, gstreamer::MessageType::Error, gstreamer::MessageType::AsyncDone]
) {
match msg.view() {
gstreamer::MessageView::Error(err) => {
panic!("Pipeline error: {}", err.error());
}
gstreamer::MessageView::AsyncDone(..) => {
println!("Async state change completed");
completed = true;
break;
}
gstreamer::MessageView::StateChanged(sc) => {
if sc.src() == Some(pipeline.upcast_ref()) {
println!("Pipeline state: {:?} -> {:?}", sc.old(), sc.current());
if sc.current() == gstreamer::State::Paused {
completed = true;
break;
}
}
}
_ => {}
}
}
assert!(completed, "Async state change should complete");
}
Err(e) => {
// This might fail on systems without /dev/zero, which is okay
println!("State change failed (expected on some systems): {}", e);
return;
}
}
let elapsed = start.elapsed();
println!("State change took: {:?}", elapsed);
// Test that we can query the pipeline now
let (_, current_state, _) = pipeline.state(None);
assert_eq!(current_state, gstreamer::State::Paused);
pipeline.set_state(gstreamer::State::Null).unwrap();
println!("✅ Async state change handling works correctly");
}
}
/// Performance and stress tests for seeking functionality
#[cfg(test)]
mod performance_tests {
use super::*;
#[tokio::test]
#[ignore] // Heavy test, run manually
async fn test_seeking_performance() {
let _ = gstreamer::init();
let pipeline = gstreamer::Pipeline::new();
let videotestsrc = gstreamer::ElementFactory::make("videotestsrc")
.property("num-buffers", 3000i32) // 100 seconds at 30fps
.build()
.expect("Failed to create videotestsrc");
let fakesink = gstreamer::ElementFactory::make("fakesink")
.build()
.expect("Failed to create fakesink");
pipeline.add_many([&videotestsrc, &fakesink]).unwrap();
videotestsrc.link(&fakesink).unwrap();
pipeline.set_state(gstreamer::State::Paused).unwrap();
sleep(Duration::from_millis(500)).await;
// Benchmark seek performance
let seek_positions = vec![10.0, 50.0, 25.0, 75.0, 5.0, 90.0, 15.0, 60.0];
let mut seek_times = Vec::new();
for (i, &position) in seek_positions.iter().enumerate() {
let start = Instant::now();
let seek_time = position * gstreamer::ClockTime::SECOND.nseconds() as f64;
let seek_result = pipeline.seek(
1.0,
gstreamer::SeekFlags::FLUSH | gstreamer::SeekFlags::KEY_UNIT,
gstreamer::SeekType::Set,
gstreamer::GenericFormattedValue::Time(Some(gstreamer::ClockTime::from_nseconds(seek_time as u64))),
gstreamer::SeekType::None,
gstreamer::GenericFormattedValue::Time(None),
);
assert!(seek_result.is_ok(), "Seek {} should succeed", i);
// Wait for seek completion
sleep(Duration::from_millis(50)).await;
let elapsed = start.elapsed();
seek_times.push(elapsed);
println!("Seek {} to {:.1}s took: {:?}", i + 1, position, elapsed);
}
pipeline.set_state(gstreamer::State::Null).unwrap();
// Analyze performance
let total_time: Duration = seek_times.iter().sum();
let avg_time = total_time / seek_times.len() as u32;
let max_time = seek_times.iter().max().unwrap();
let min_time = seek_times.iter().min().unwrap();
println!("Seek Performance Summary:");
println!(" Average: {:?}", avg_time);
println!(" Min: {:?}", min_time);
println!(" Max: {:?}", max_time);
println!(" Total: {:?}", total_time);
// Performance assertions (these are reasonable expectations)
assert!(avg_time < Duration::from_millis(100), "Average seek should be under 100ms");
assert!(*max_time < Duration::from_millis(500), "No seek should take over 500ms");
println!("✅ Seeking performance is acceptable");
}
#[tokio::test]
#[ignore] // Stress test, run manually
async fn test_rapid_seeking_stress() {
let _ = gstreamer::init();
let pipeline = gstreamer::Pipeline::new();
let videotestsrc = gstreamer::ElementFactory::make("videotestsrc")
.property("num-buffers", 1800i32) // 60 seconds at 30fps
.build()
.expect("Failed to create videotestsrc");
let fakesink = gstreamer::ElementFactory::make("fakesink")
.build()
.expect("Failed to create fakesink");
pipeline.add_many([&videotestsrc, &fakesink]).unwrap();
videotestsrc.link(&fakesink).unwrap();
pipeline.set_state(gstreamer::State::Paused).unwrap();
sleep(Duration::from_millis(500)).await;
// Perform rapid seeks to stress test the pipeline
let mut successful_seeks = 0;
let mut failed_seeks = 0;
for i in 0..100 {
let position = (i % 50) as f64; // Seek between 0-50 seconds
let seek_time = position * gstreamer::ClockTime::SECOND.nseconds() as f64;
let seek_result = pipeline.seek(
1.0,
gstreamer::SeekFlags::FLUSH | gstreamer::SeekFlags::KEY_UNIT,
gstreamer::SeekType::Set,
gstreamer::GenericFormattedValue::Time(Some(gstreamer::ClockTime::from_nseconds(seek_time as u64))),
gstreamer::SeekType::None,
gstreamer::GenericFormattedValue::Time(None),
);
if seek_result.is_ok() {
successful_seeks += 1;
} else {
failed_seeks += 1;
println!("Seek {} failed: {:?}", i, seek_result.err());
}
// Very small delay to simulate rapid seeking
sleep(Duration::from_millis(10)).await;
if i % 20 == 0 {
println!("Completed {} rapid seeks ({} successful, {} failed)", i + 1, successful_seeks, failed_seeks);
}
}
pipeline.set_state(gstreamer::State::Null).unwrap();
println!("Rapid seek stress test results:");
println!(" Successful: {}", successful_seeks);
println!(" Failed: {}", failed_seeks);
println!(" Success rate: {:.1}%", (successful_seeks as f64 / 100.0) * 100.0);
// We expect most seeks to succeed, but some failures are acceptable under stress
assert!(successful_seeks >= 80, "Should have at least 80% success rate under stress");
println!("✅ Rapid seeking stress test completed");
}
}