use gstreamer::prelude::*; use church_api::error::{ApiError, Result}; use std::path::Path; use std::env; use std::time::Duration; use tokio::time::{sleep, Instant}; /// Create a test video with known duration for seeking tests async fn create_seekable_test_video(path: &Path, duration_seconds: u32) -> Result<()> { // Initialize GStreamer gstreamer::init().map_err(|e| ApiError::Internal(format!("Failed to init GStreamer: {}", e)))?; let pipeline = gstreamer::Pipeline::new(); // Create a longer video with keyframes for reliable seeking let videotestsrc = gstreamer::ElementFactory::make("videotestsrc") .property("num-buffers", (duration_seconds * 30) as i32) // 30 fps .property("pattern", 0i32) // SMPTE bars .build() .map_err(|e| ApiError::Internal(format!("Failed to create videotestsrc: {}", e)))?; let audiotestsrc = gstreamer::ElementFactory::make("audiotestsrc") .property("num-buffers", (duration_seconds * 441) as i32) // 44.1kHz, 100 samples per buffer .property("freq", 440.0f64) .build() .map_err(|e| ApiError::Internal(format!("Failed to create audiotestsrc: {}", e)))?; // Use software encoders for consistent behavior across environments let x264enc = gstreamer::ElementFactory::make("x264enc") .property("bitrate", 1000u32) .property("speed-preset", 1u32) // ultrafast .property("key-int-max", 30u32) // Keyframe every 30 frames (1 second) .build() .map_err(|e| ApiError::Internal(format!("Failed to create x264enc: {}", e)))?; let audioconvert = gstreamer::ElementFactory::make("audioconvert") .build() .map_err(|e| ApiError::Internal(format!("Failed to create audioconvert: {}", e)))?; let avenc_aac = gstreamer::ElementFactory::make("avenc_aac") .property("bitrate", 128000i32) .build() .map_err(|e| ApiError::Internal(format!("Failed to create avenc_aac: {}", e)))?; let mp4mux = gstreamer::ElementFactory::make("mp4mux") .build() .map_err(|e| ApiError::Internal(format!("Failed to create mp4mux: {}", e)))?; let filesink = gstreamer::ElementFactory::make("filesink") .property("location", path.to_str().unwrap()) .build() .map_err(|e| ApiError::Internal(format!("Failed to create filesink: {}", e)))?; // Add elements and link pipeline.add_many([ &videotestsrc, &x264enc, &audiotestsrc, &audioconvert, &avenc_aac, &mp4mux, &filesink ]).map_err(|e| ApiError::Internal(format!("Failed to add elements: {}", e)))?; // Link chains videotestsrc.link(&x264enc).map_err(|_| ApiError::Internal("Failed to link video chain".to_string()))?; x264enc.link(&mp4mux).map_err(|_| ApiError::Internal("Failed to link video to mux".to_string()))?; gstreamer::Element::link_many([&audiotestsrc, &audioconvert, &avenc_aac]).map_err(|_| ApiError::Internal("Failed to link audio chain".to_string()))?; avenc_aac.link(&mp4mux).map_err(|_| ApiError::Internal("Failed to link audio to mux".to_string()))?; mp4mux.link(&filesink).map_err(|_| ApiError::Internal("Failed to link mux to sink".to_string()))?; // Run pipeline to completion pipeline.set_state(gstreamer::State::Playing).map_err(|e| ApiError::Internal(format!("Failed to start pipeline: {:?}", e)))?; let bus = pipeline.bus().unwrap(); let timeout = gstreamer::ClockTime::from_seconds(60); match bus.timed_pop_filtered(Some(timeout), &[gstreamer::MessageType::Error, gstreamer::MessageType::Eos]) { Some(msg) => { match msg.view() { gstreamer::MessageView::Eos(..) => {}, gstreamer::MessageView::Error(err) => { return Err(ApiError::Internal(format!("GStreamer error: {}", err.error()))); } _ => {} } } None => return Err(ApiError::Internal("Video creation timed out".to_string())), } pipeline.set_state(gstreamer::State::Null).map_err(|e| ApiError::Internal(format!("Failed to stop pipeline: {:?}", e)))?; Ok(()) } /// Test seeking functionality with proper state management async fn test_seeking_with_state_management(source_path: &str, seek_position_seconds: f64) -> Result<()> { gstreamer::init().map_err(|e| ApiError::Internal(format!("Failed to init GStreamer: {}", e)))?; let pipeline = gstreamer::Pipeline::new(); // Create a simple playback pipeline for testing seeking let filesrc = gstreamer::ElementFactory::make("filesrc") .property("location", source_path) .build() .map_err(|e| ApiError::Internal(format!("Failed to create filesrc: {}", e)))?; let qtdemux = gstreamer::ElementFactory::make("qtdemux") .build() .map_err(|e| ApiError::Internal(format!("Failed to create qtdemux: {}", e)))?; let queue_video = gstreamer::ElementFactory::make("queue") .build() .map_err(|e| ApiError::Internal(format!("Failed to create video queue: {}", e)))?; let queue_audio = gstreamer::ElementFactory::make("queue") .build() .map_err(|e| ApiError::Internal(format!("Failed to create audio queue: {}", e)))?; let fakesink_video = gstreamer::ElementFactory::make("fakesink") .build() .map_err(|e| ApiError::Internal(format!("Failed to create video fakesink: {}", e)))?; let fakesink_audio = gstreamer::ElementFactory::make("fakesink") .build() .map_err(|e| ApiError::Internal(format!("Failed to create audio fakesink: {}", e)))?; pipeline.add_many([ &filesrc, &qtdemux, &queue_video, &queue_audio, &fakesink_video, &fakesink_audio ]).map_err(|e| ApiError::Internal(format!("Failed to add elements: {}", e)))?; filesrc.link(&qtdemux).map_err(|_| ApiError::Internal("Failed to link filesrc to qtdemux".to_string()))?; queue_video.link(&fakesink_video).map_err(|_| ApiError::Internal("Failed to link video queue".to_string()))?; queue_audio.link(&fakesink_audio).map_err(|_| ApiError::Internal("Failed to link audio queue".to_string()))?; // Handle dynamic pad connections let queue_video_clone = queue_video.clone(); let queue_audio_clone = queue_audio.clone(); qtdemux.connect_pad_added(move |_element, pad| { let pad_name = pad.name(); if pad_name.starts_with("video_") { let sink_pad = queue_video_clone.static_pad("sink").unwrap(); if !sink_pad.is_linked() { let _ = pad.link(&sink_pad); } } else if pad_name.starts_with("audio_") { let sink_pad = queue_audio_clone.static_pad("sink").unwrap(); if !sink_pad.is_linked() { let _ = pad.link(&sink_pad); } } }); // Step 1: Set pipeline to PAUSED state tracing::debug!("Setting pipeline to PAUSED for seeking"); let state_change_result = pipeline.set_state(gstreamer::State::Paused); match state_change_result { Ok(gstreamer::StateChangeSuccess::Success) => {}, Ok(gstreamer::StateChangeSuccess::NoPreroll) => { tracing::debug!("Pipeline state change completed with no preroll"); }, Ok(gstreamer::StateChangeSuccess::Async) => { // Wait for async state change let bus = pipeline.bus().unwrap(); let timeout = gstreamer::ClockTime::from_seconds(30); let mut state_changed = false; while let Some(msg) = bus.timed_pop_filtered( Some(timeout), &[gstreamer::MessageType::StateChanged, gstreamer::MessageType::Error, gstreamer::MessageType::AsyncDone] ) { match msg.view() { gstreamer::MessageView::Error(err) => { let _ = pipeline.set_state(gstreamer::State::Null); return Err(ApiError::Internal(format!("Pipeline error during state change: {}", err.error()))); } gstreamer::MessageView::AsyncDone(..) => { state_changed = true; break; } gstreamer::MessageView::StateChanged(state_change) => { if state_change.src() == Some(pipeline.upcast_ref()) && state_change.current() == gstreamer::State::Paused { state_changed = true; break; } } _ => {} } } if !state_changed { let _ = pipeline.set_state(gstreamer::State::Null); return Err(ApiError::Internal("Pipeline failed to reach PAUSED state".to_string())); } } Err(e) => { let _ = pipeline.set_state(gstreamer::State::Null); return Err(ApiError::Internal(format!("Failed to pause pipeline: {}", e))); } } // Step 2: Query seeking capabilities let mut seek_query = gstreamer::query::Seeking::new(gstreamer::Format::Time); if !pipeline.query(&mut seek_query.get_mut().unwrap()) { let _ = pipeline.set_state(gstreamer::State::Null); return Err(ApiError::Internal("Failed to query seeking capabilities".to_string())); } let (seekable, start_pos, end_pos) = seek_query.result(); if !seekable { let _ = pipeline.set_state(gstreamer::State::Null); return Err(ApiError::Internal("Source is not seekable".to_string())); } // Step 3: Validate seek position let seek_ns = (seek_position_seconds * gstreamer::ClockTime::SECOND.nseconds() as f64) as u64; if let gstreamer::GenericFormattedValue::Time(Some(end)) = end_pos { if seek_ns >= end.nseconds() { let _ = pipeline.set_state(gstreamer::State::Null); return Err(ApiError::Internal("Seek position beyond range".to_string())); } } // Step 4: Perform seek let seek_result = pipeline.seek( 1.0, gstreamer::SeekFlags::FLUSH | gstreamer::SeekFlags::KEY_UNIT, gstreamer::SeekType::Set, gstreamer::GenericFormattedValue::Time(Some(gstreamer::ClockTime::from_nseconds(seek_ns))), gstreamer::SeekType::None, gstreamer::GenericFormattedValue::Time(None), ); if let Err(e) = seek_result { let _ = pipeline.set_state(gstreamer::State::Null); return Err(ApiError::Internal(format!("Seek failed: {:?}", e))); } // Step 5: Wait for seek completion let bus = pipeline.bus().unwrap(); let seek_timeout = gstreamer::ClockTime::from_seconds(10); while let Some(msg) = bus.timed_pop_filtered( Some(seek_timeout), &[gstreamer::MessageType::Error, gstreamer::MessageType::AsyncDone] ) { match msg.view() { gstreamer::MessageView::Error(err) => { let _ = pipeline.set_state(gstreamer::State::Null); return Err(ApiError::Internal(format!("Error during seek: {}", err.error()))); } gstreamer::MessageView::AsyncDone(..) => { break; } _ => {} } } // Step 6: Verify position let mut position_query = gstreamer::query::Position::new(gstreamer::Format::Time); if pipeline.query(&mut position_query) { if let gstreamer::GenericFormattedValue::Time(Some(pos)) = position_query.result() { let pos_seconds = pos.nseconds() as f64 / gstreamer::ClockTime::SECOND.nseconds() as f64; tracing::info!("Seek completed. Position: {:.2}s (target: {:.2}s)", pos_seconds, seek_position_seconds); // Allow some tolerance for keyframe seeking let tolerance = 1.0; // 1 second tolerance if (pos_seconds - seek_position_seconds).abs() > tolerance { tracing::warn!("Seek position not accurate: got {:.2}s, expected {:.2}s", pos_seconds, seek_position_seconds); } } } // Cleanup pipeline.set_state(gstreamer::State::Null).map_err(|e| ApiError::Internal(format!("Failed to stop pipeline: {:?}", e)))?; Ok(()) } #[cfg(test)] mod tests { use super::*; #[tokio::test] async fn test_gstreamer_seeking_initialization() { let result = gstreamer::init(); assert!(result.is_ok(), "GStreamer should initialize for seeking tests"); } #[tokio::test] async fn test_pipeline_state_transitions() { let _ = gstreamer::init(); let pipeline = gstreamer::Pipeline::new(); let videotestsrc = gstreamer::ElementFactory::make("videotestsrc") .property("num-buffers", 100i32) .build() .expect("Failed to create videotestsrc"); let fakesink = gstreamer::ElementFactory::make("fakesink") .build() .expect("Failed to create fakesink"); pipeline.add_many([&videotestsrc, &fakesink]).unwrap(); videotestsrc.link(&fakesink).unwrap(); // Test state transitions required for seeking assert!(pipeline.set_state(gstreamer::State::Ready).is_ok()); assert!(pipeline.set_state(gstreamer::State::Paused).is_ok()); // Query current state let (_, current_state, _) = pipeline.state(Some(gstreamer::ClockTime::from_seconds(5))); assert_eq!(current_state, gstreamer::State::Paused); assert!(pipeline.set_state(gstreamer::State::Playing).is_ok()); assert!(pipeline.set_state(gstreamer::State::Paused).is_ok()); assert!(pipeline.set_state(gstreamer::State::Null).is_ok()); println!("✅ Pipeline state transitions work correctly"); } #[tokio::test] async fn test_seeking_capability_queries() { let _ = gstreamer::init(); let pipeline = gstreamer::Pipeline::new(); let videotestsrc = gstreamer::ElementFactory::make("videotestsrc") .property("num-buffers", 300i32) // 10 seconds at 30fps .build() .expect("Failed to create videotestsrc"); let fakesink = gstreamer::ElementFactory::make("fakesink") .build() .expect("Failed to create fakesink"); pipeline.add_many([&videotestsrc, &fakesink]).unwrap(); videotestsrc.link(&fakesink).unwrap(); // Set to paused to enable queries pipeline.set_state(gstreamer::State::Paused).unwrap(); // Wait for preroll let bus = pipeline.bus().unwrap(); let timeout = gstreamer::ClockTime::from_seconds(5); while let Some(msg) = bus.timed_pop_filtered(Some(timeout), &[gstreamer::MessageType::AsyncDone, gstreamer::MessageType::StateChanged]) { match msg.view() { gstreamer::MessageView::AsyncDone(..) => break, gstreamer::MessageView::StateChanged(sc) => { if sc.src() == Some(pipeline.upcast_ref()) && sc.current() == gstreamer::State::Paused { break; } } _ => {} } } // Test seeking query let mut seek_query = gstreamer::query::Seeking::new(gstreamer::Format::Time); let query_success = pipeline.query(&mut seek_query.get_mut().unwrap()); assert!(query_success, "Seeking query should succeed"); let (seekable, start_pos, end_pos) = seek_query.result(); println!("Seekable: {}, Range: {:?} to {:?}", seekable, start_pos, end_pos); // videotestsrc should be seekable assert!(seekable, "videotestsrc should be seekable"); assert!(start_pos.is_some(), "Should have start position"); assert!(end_pos.is_some(), "Should have end position"); pipeline.set_state(gstreamer::State::Null).unwrap(); println!("✅ Seeking capability queries work correctly"); } #[tokio::test] async fn test_duration_and_position_queries() { let _ = gstreamer::init(); let pipeline = gstreamer::Pipeline::new(); let videotestsrc = gstreamer::ElementFactory::make("videotestsrc") .property("num-buffers", 150i32) // 5 seconds at 30fps .build() .expect("Failed to create videotestsrc"); let fakesink = gstreamer::ElementFactory::make("fakesink") .build() .expect("Failed to create fakesink"); pipeline.add_many([&videotestsrc, &fakesink]).unwrap(); videotestsrc.link(&fakesink).unwrap(); pipeline.set_state(gstreamer::State::Paused).unwrap(); // Wait for preroll sleep(Duration::from_millis(500)).await; // Test duration query let mut duration_query = gstreamer::query::Duration::new(gstreamer::Format::Time); if pipeline.query(&mut duration_query) { if let gstreamer::GenericFormattedValue::Time(Some(duration)) = duration_query.result() { let duration_seconds = duration.nseconds() as f64 / gstreamer::ClockTime::SECOND.nseconds() as f64; println!("Duration: {:.2} seconds", duration_seconds); assert!(duration_seconds > 4.0 && duration_seconds < 6.0, "Duration should be approximately 5 seconds"); } } // Test position query let mut position_query = gstreamer::query::Position::new(gstreamer::Format::Time); if pipeline.query(&mut position_query) { if let gstreamer::GenericFormattedValue::Time(Some(position)) = position_query.result() { let position_seconds = position.nseconds() as f64 / gstreamer::ClockTime::SECOND.nseconds() as f64; println!("Position: {:.2} seconds", position_seconds); assert!(position_seconds == 0.0, "Position should start at 0"); } } pipeline.set_state(gstreamer::State::Null).unwrap(); println!("✅ Duration and position queries work correctly"); } #[tokio::test] async fn test_basic_seeking_on_videotestsrc() { let _ = gstreamer::init(); let pipeline = gstreamer::Pipeline::new(); let videotestsrc = gstreamer::ElementFactory::make("videotestsrc") .property("num-buffers", 300i32) // 10 seconds at 30fps .build() .expect("Failed to create videotestsrc"); let fakesink = gstreamer::ElementFactory::make("fakesink") .build() .expect("Failed to create fakesink"); pipeline.add_many([&videotestsrc, &fakesink]).unwrap(); videotestsrc.link(&fakesink).unwrap(); // Set to paused for seeking pipeline.set_state(gstreamer::State::Paused).unwrap(); sleep(Duration::from_millis(500)).await; // Perform seek to 5 seconds let seek_time = 5.0 * gstreamer::ClockTime::SECOND.nseconds() as f64; let seek_result = pipeline.seek( 1.0, gstreamer::SeekFlags::FLUSH | gstreamer::SeekFlags::KEY_UNIT, gstreamer::SeekType::Set, gstreamer::GenericFormattedValue::Time(Some(gstreamer::ClockTime::from_nseconds(seek_time as u64))), gstreamer::SeekType::None, gstreamer::GenericFormattedValue::Time(None), ); assert!(seek_result.is_ok(), "Seek should succeed on videotestsrc"); // Wait for seek to complete sleep(Duration::from_millis(200)).await; // Verify position let mut position_query = gstreamer::query::Position::new(gstreamer::Format::Time); if pipeline.query(&mut position_query) { if let gstreamer::GenericFormattedValue::Time(Some(position)) = position_query.result() { let position_seconds = position.nseconds() as f64 / gstreamer::ClockTime::SECOND.nseconds() as f64; println!("Position after seek: {:.2} seconds", position_seconds); // Allow some tolerance due to keyframe seeking assert!(position_seconds >= 4.0 && position_seconds <= 6.0, "Position should be around 5 seconds"); } } pipeline.set_state(gstreamer::State::Null).unwrap(); println!("✅ Basic seeking on videotestsrc works correctly"); } #[tokio::test] async fn test_seek_beyond_bounds_error_handling() { let _ = gstreamer::init(); let pipeline = gstreamer::Pipeline::new(); let videotestsrc = gstreamer::ElementFactory::make("videotestsrc") .property("num-buffers", 150i32) // 5 seconds at 30fps .build() .expect("Failed to create videotestsrc"); let fakesink = gstreamer::ElementFactory::make("fakesink") .build() .expect("Failed to create fakesink"); pipeline.add_many([&videotestsrc, &fakesink]).unwrap(); videotestsrc.link(&fakesink).unwrap(); pipeline.set_state(gstreamer::State::Paused).unwrap(); sleep(Duration::from_millis(500)).await; // Try to seek beyond the video duration (10 seconds when video is 5 seconds) let seek_time = 10.0 * gstreamer::ClockTime::SECOND.nseconds() as f64; let seek_result = pipeline.seek( 1.0, gstreamer::SeekFlags::FLUSH | gstreamer::SeekFlags::KEY_UNIT, gstreamer::SeekType::Set, gstreamer::GenericFormattedValue::Time(Some(gstreamer::ClockTime::from_nseconds(seek_time as u64))), gstreamer::SeekType::None, gstreamer::GenericFormattedValue::Time(None), ); // This might succeed (GStreamer may clamp to end) or fail - both are acceptable println!("Seek beyond bounds result: {:?}", seek_result); // The important thing is that we should detect the bounds beforehand let mut seek_query = gstreamer::query::Seeking::new(gstreamer::Format::Time); if pipeline.query(&mut seek_query.get_mut().unwrap()) { let (seekable, _start, end) = seek_query.result(); if seekable { if let gstreamer::GenericFormattedValue::Time(Some(end_time)) = end { let end_seconds = end_time.nseconds() as f64 / gstreamer::ClockTime::SECOND.nseconds() as f64; println!("Video duration: {:.2} seconds", end_seconds); assert!(end_seconds > 4.0 && end_seconds < 6.0, "Should detect correct duration"); // Our application should prevent seeking beyond this assert!(10.0 > end_seconds, "Should detect that 10s is beyond bounds"); } } } pipeline.set_state(gstreamer::State::Null).unwrap(); println!("✅ Bounds checking works correctly"); } #[tokio::test] async fn test_seek_flags_behavior() { let _ = gstreamer::init(); let pipeline = gstreamer::Pipeline::new(); let videotestsrc = gstreamer::ElementFactory::make("videotestsrc") .property("num-buffers", 300i32) // 10 seconds at 30fps .build() .expect("Failed to create videotestsrc"); let fakesink = gstreamer::ElementFactory::make("fakesink") .build() .expect("Failed to create fakesink"); pipeline.add_many([&videotestsrc, &fakesink]).unwrap(); videotestsrc.link(&fakesink).unwrap(); pipeline.set_state(gstreamer::State::Paused).unwrap(); sleep(Duration::from_millis(500)).await; // Test different seek flag combinations let test_cases = vec![ ("FLUSH", gstreamer::SeekFlags::FLUSH), ("KEY_UNIT", gstreamer::SeekFlags::KEY_UNIT), ("FLUSH | KEY_UNIT", gstreamer::SeekFlags::FLUSH | gstreamer::SeekFlags::KEY_UNIT), ("ACCURATE", gstreamer::SeekFlags::ACCURATE), ]; for (name, flags) in test_cases { println!("Testing seek flags: {}", name); let seek_time = 3.0 * gstreamer::ClockTime::SECOND.nseconds() as f64; let seek_result = pipeline.seek( 1.0, flags, gstreamer::SeekType::Set, gstreamer::GenericFormattedValue::Time(Some(gstreamer::ClockTime::from_nseconds(seek_time as u64))), gstreamer::SeekType::None, gstreamer::GenericFormattedValue::Time(None), ); // All flag combinations should work with videotestsrc assert!(seek_result.is_ok(), "Seek with {} flags should succeed", name); sleep(Duration::from_millis(100)).await; // Verify position let mut position_query = gstreamer::query::Position::new(gstreamer::Format::Time); if pipeline.query(&mut position_query) { if let gstreamer::GenericFormattedValue::Time(Some(position)) = position_query.result() { let position_seconds = position.nseconds() as f64 / gstreamer::ClockTime::SECOND.nseconds() as f64; println!("Position with {} flags: {:.2}s", name, position_seconds); } } } pipeline.set_state(gstreamer::State::Null).unwrap(); println!("✅ Different seek flags work correctly"); } #[tokio::test] async fn test_segment_seeking() { let _ = gstreamer::init(); let pipeline = gstreamer::Pipeline::new(); let videotestsrc = gstreamer::ElementFactory::make("videotestsrc") .property("num-buffers", 600i32) // 20 seconds at 30fps .build() .expect("Failed to create videotestsrc"); let fakesink = gstreamer::ElementFactory::make("fakesink") .build() .expect("Failed to create fakesink"); pipeline.add_many([&videotestsrc, &fakesink]).unwrap(); videotestsrc.link(&fakesink).unwrap(); pipeline.set_state(gstreamer::State::Paused).unwrap(); sleep(Duration::from_millis(500)).await; // Test segment seeking (start at 5s, stop at 10s) let start_time = 5.0 * gstreamer::ClockTime::SECOND.nseconds() as f64; let stop_time = 10.0 * gstreamer::ClockTime::SECOND.nseconds() as f64; let seek_result = pipeline.seek( 1.0, gstreamer::SeekFlags::FLUSH | gstreamer::SeekFlags::KEY_UNIT, gstreamer::SeekType::Set, gstreamer::GenericFormattedValue::Time(Some(gstreamer::ClockTime::from_nseconds(start_time as u64))), gstreamer::SeekType::Set, gstreamer::GenericFormattedValue::Time(Some(gstreamer::ClockTime::from_nseconds(stop_time as u64))), ); assert!(seek_result.is_ok(), "Segment seek should succeed"); sleep(Duration::from_millis(200)).await; // Verify we're at the start of the segment let mut position_query = gstreamer::query::Position::new(gstreamer::Format::Time); if pipeline.query(&mut position_query) { if let gstreamer::GenericFormattedValue::Time(Some(position)) = position_query.result() { let position_seconds = position.nseconds() as f64 / gstreamer::ClockTime::SECOND.nseconds() as f64; println!("Segment start position: {:.2}s", position_seconds); assert!(position_seconds >= 4.0 && position_seconds <= 6.0, "Should be at segment start"); } } pipeline.set_state(gstreamer::State::Null).unwrap(); println!("✅ Segment seeking works correctly"); } #[tokio::test] async fn test_multiple_consecutive_seeks() { let _ = gstreamer::init(); let pipeline = gstreamer::Pipeline::new(); let videotestsrc = gstreamer::ElementFactory::make("videotestsrc") .property("num-buffers", 600i32) // 20 seconds at 30fps .build() .expect("Failed to create videotestsrc"); let fakesink = gstreamer::ElementFactory::make("fakesink") .build() .expect("Failed to create fakesink"); pipeline.add_many([&videotestsrc, &fakesink]).unwrap(); videotestsrc.link(&fakesink).unwrap(); pipeline.set_state(gstreamer::State::Paused).unwrap(); sleep(Duration::from_millis(500)).await; // Test multiple seeks in sequence let seek_positions = vec![5.0, 10.0, 2.0, 15.0, 8.0]; for (i, &seek_pos) in seek_positions.iter().enumerate() { println!("Performing seek {} to {:.1}s", i + 1, seek_pos); let seek_time = seek_pos * gstreamer::ClockTime::SECOND.nseconds() as f64; let seek_result = pipeline.seek( 1.0, gstreamer::SeekFlags::FLUSH | gstreamer::SeekFlags::KEY_UNIT, gstreamer::SeekType::Set, gstreamer::GenericFormattedValue::Time(Some(gstreamer::ClockTime::from_nseconds(seek_time as u64))), gstreamer::SeekType::None, gstreamer::GenericFormattedValue::Time(None), ); assert!(seek_result.is_ok(), "Seek {} should succeed", i + 1); // Small delay between seeks sleep(Duration::from_millis(100)).await; // Verify position let mut position_query = gstreamer::query::Position::new(gstreamer::Format::Time); if pipeline.query(&mut position_query) { if let gstreamer::GenericFormattedValue::Time(Some(position)) = position_query.result() { let position_seconds = position.nseconds() as f64 / gstreamer::ClockTime::SECOND.nseconds() as f64; println!("Position after seek {}: {:.2}s", i + 1, position_seconds); } } } pipeline.set_state(gstreamer::State::Null).unwrap(); println!("✅ Multiple consecutive seeks work correctly"); } // This test would use create_seekable_test_video if encoders are available #[tokio::test] async fn test_file_based_seeking() { let _ = gstreamer::init(); // Check if we have required encoders if !gstreamer::ElementFactory::find("x264enc").is_some() || !gstreamer::ElementFactory::find("avenc_aac").is_some() { println!("⚠️ Skipping file-based seeking test - encoders not available"); return; } let temp_dir = env::temp_dir(); let video_path = temp_dir.join("seekable_test.mp4"); // Create a test video file println!("Creating test video file..."); let create_result = create_seekable_test_video(&video_path, 30).await; // 30 second video assert!(create_result.is_ok(), "Should create test video successfully"); assert!(video_path.exists(), "Test video should exist"); // Test seeking on the actual file println!("Testing seeking on file..."); let seek_test_result = test_seeking_with_state_management(video_path.to_str().unwrap(), 10.0).await; assert!(seek_test_result.is_ok(), "File-based seeking should work: {:?}", seek_test_result.err()); // Test seeking to different positions let test_positions = vec![0.0, 5.0, 15.0, 25.0, 29.0]; for position in test_positions { let result = test_seeking_with_state_management(video_path.to_str().unwrap(), position).await; assert!(result.is_ok(), "Should be able to seek to {}s: {:?}", position, result.err()); } println!("✅ File-based seeking tests completed successfully"); } #[tokio::test] async fn test_seeking_error_recovery() { let _ = gstreamer::init(); let pipeline = gstreamer::Pipeline::new(); // Use a source that might have seeking limitations let videotestsrc = gstreamer::ElementFactory::make("videotestsrc") .property("num-buffers", 30i32) // Very short video (1 second) .build() .expect("Failed to create videotestsrc"); let fakesink = gstreamer::ElementFactory::make("fakesink") .build() .expect("Failed to create fakesink"); pipeline.add_many([&videotestsrc, &fakesink]).unwrap(); videotestsrc.link(&fakesink).unwrap(); pipeline.set_state(gstreamer::State::Paused).unwrap(); sleep(Duration::from_millis(500)).await; // Test seeking way beyond the video length let invalid_seek_time = 100.0 * gstreamer::ClockTime::SECOND.nseconds() as f64; let seek_result = pipeline.seek( 1.0, gstreamer::SeekFlags::FLUSH | gstreamer::SeekFlags::KEY_UNIT, gstreamer::SeekType::Set, gstreamer::GenericFormattedValue::Time(Some(gstreamer::ClockTime::from_nseconds(invalid_seek_time as u64))), gstreamer::SeekType::None, gstreamer::GenericFormattedValue::Time(None), ); // GStreamer might handle this gracefully or return an error println!("Seek to invalid position result: {:?}", seek_result); // The pipeline should still be functional after a failed/clamped seek let valid_seek_time = 0.5 * gstreamer::ClockTime::SECOND.nseconds() as f64; let recovery_seek_result = pipeline.seek( 1.0, gstreamer::SeekFlags::FLUSH | gstreamer::SeekFlags::KEY_UNIT, gstreamer::SeekType::Set, gstreamer::GenericFormattedValue::Time(Some(gstreamer::ClockTime::from_nseconds(valid_seek_time as u64))), gstreamer::SeekType::None, gstreamer::GenericFormattedValue::Time(None), ); assert!(recovery_seek_result.is_ok(), "Should be able to seek to valid position after invalid seek"); pipeline.set_state(gstreamer::State::Null).unwrap(); println!("✅ Error recovery in seeking works correctly"); } #[tokio::test] async fn test_async_state_change_handling() { let _ = gstreamer::init(); let pipeline = gstreamer::Pipeline::new(); // Create a more complex pipeline that might have async state changes let filesrc = gstreamer::ElementFactory::make("filesrc") .property("location", "/dev/zero") // Infinite source on Linux .build() .expect("Failed to create filesrc"); let identity = gstreamer::ElementFactory::make("identity") .build() .expect("Failed to create identity"); let fakesink = gstreamer::ElementFactory::make("fakesink") .build() .expect("Failed to create fakesink"); pipeline.add_many([&filesrc, &identity, &fakesink]).unwrap(); gstreamer::Element::link_many([&filesrc, &identity, &fakesink]).unwrap(); // Test async state change to PAUSED let start = Instant::now(); let state_change_result = pipeline.set_state(gstreamer::State::Paused); match state_change_result { Ok(gstreamer::StateChangeSuccess::Success) => { println!("State change was synchronous"); } Ok(gstreamer::StateChangeSuccess::NoPreroll) => { println!("State change completed with no preroll"); } Ok(gstreamer::StateChangeSuccess::Async) => { println!("State change is async, waiting for completion..."); let bus = pipeline.bus().unwrap(); let timeout = gstreamer::ClockTime::from_seconds(10); let mut completed = false; while let Some(msg) = bus.timed_pop_filtered( Some(timeout), &[gstreamer::MessageType::StateChanged, gstreamer::MessageType::Error, gstreamer::MessageType::AsyncDone] ) { match msg.view() { gstreamer::MessageView::Error(err) => { panic!("Pipeline error: {}", err.error()); } gstreamer::MessageView::AsyncDone(..) => { println!("Async state change completed"); completed = true; break; } gstreamer::MessageView::StateChanged(sc) => { if sc.src() == Some(pipeline.upcast_ref()) { println!("Pipeline state: {:?} -> {:?}", sc.old(), sc.current()); if sc.current() == gstreamer::State::Paused { completed = true; break; } } } _ => {} } } assert!(completed, "Async state change should complete"); } Err(e) => { // This might fail on systems without /dev/zero, which is okay println!("State change failed (expected on some systems): {}", e); return; } } let elapsed = start.elapsed(); println!("State change took: {:?}", elapsed); // Test that we can query the pipeline now let (_, current_state, _) = pipeline.state(None); assert_eq!(current_state, gstreamer::State::Paused); pipeline.set_state(gstreamer::State::Null).unwrap(); println!("✅ Async state change handling works correctly"); } } /// Performance and stress tests for seeking functionality #[cfg(test)] mod performance_tests { use super::*; #[tokio::test] #[ignore] // Heavy test, run manually async fn test_seeking_performance() { let _ = gstreamer::init(); let pipeline = gstreamer::Pipeline::new(); let videotestsrc = gstreamer::ElementFactory::make("videotestsrc") .property("num-buffers", 3000i32) // 100 seconds at 30fps .build() .expect("Failed to create videotestsrc"); let fakesink = gstreamer::ElementFactory::make("fakesink") .build() .expect("Failed to create fakesink"); pipeline.add_many([&videotestsrc, &fakesink]).unwrap(); videotestsrc.link(&fakesink).unwrap(); pipeline.set_state(gstreamer::State::Paused).unwrap(); sleep(Duration::from_millis(500)).await; // Benchmark seek performance let seek_positions = vec![10.0, 50.0, 25.0, 75.0, 5.0, 90.0, 15.0, 60.0]; let mut seek_times = Vec::new(); for (i, &position) in seek_positions.iter().enumerate() { let start = Instant::now(); let seek_time = position * gstreamer::ClockTime::SECOND.nseconds() as f64; let seek_result = pipeline.seek( 1.0, gstreamer::SeekFlags::FLUSH | gstreamer::SeekFlags::KEY_UNIT, gstreamer::SeekType::Set, gstreamer::GenericFormattedValue::Time(Some(gstreamer::ClockTime::from_nseconds(seek_time as u64))), gstreamer::SeekType::None, gstreamer::GenericFormattedValue::Time(None), ); assert!(seek_result.is_ok(), "Seek {} should succeed", i); // Wait for seek completion sleep(Duration::from_millis(50)).await; let elapsed = start.elapsed(); seek_times.push(elapsed); println!("Seek {} to {:.1}s took: {:?}", i + 1, position, elapsed); } pipeline.set_state(gstreamer::State::Null).unwrap(); // Analyze performance let total_time: Duration = seek_times.iter().sum(); let avg_time = total_time / seek_times.len() as u32; let max_time = seek_times.iter().max().unwrap(); let min_time = seek_times.iter().min().unwrap(); println!("Seek Performance Summary:"); println!(" Average: {:?}", avg_time); println!(" Min: {:?}", min_time); println!(" Max: {:?}", max_time); println!(" Total: {:?}", total_time); // Performance assertions (these are reasonable expectations) assert!(avg_time < Duration::from_millis(100), "Average seek should be under 100ms"); assert!(*max_time < Duration::from_millis(500), "No seek should take over 500ms"); println!("✅ Seeking performance is acceptable"); } #[tokio::test] #[ignore] // Stress test, run manually async fn test_rapid_seeking_stress() { let _ = gstreamer::init(); let pipeline = gstreamer::Pipeline::new(); let videotestsrc = gstreamer::ElementFactory::make("videotestsrc") .property("num-buffers", 1800i32) // 60 seconds at 30fps .build() .expect("Failed to create videotestsrc"); let fakesink = gstreamer::ElementFactory::make("fakesink") .build() .expect("Failed to create fakesink"); pipeline.add_many([&videotestsrc, &fakesink]).unwrap(); videotestsrc.link(&fakesink).unwrap(); pipeline.set_state(gstreamer::State::Paused).unwrap(); sleep(Duration::from_millis(500)).await; // Perform rapid seeks to stress test the pipeline let mut successful_seeks = 0; let mut failed_seeks = 0; for i in 0..100 { let position = (i % 50) as f64; // Seek between 0-50 seconds let seek_time = position * gstreamer::ClockTime::SECOND.nseconds() as f64; let seek_result = pipeline.seek( 1.0, gstreamer::SeekFlags::FLUSH | gstreamer::SeekFlags::KEY_UNIT, gstreamer::SeekType::Set, gstreamer::GenericFormattedValue::Time(Some(gstreamer::ClockTime::from_nseconds(seek_time as u64))), gstreamer::SeekType::None, gstreamer::GenericFormattedValue::Time(None), ); if seek_result.is_ok() { successful_seeks += 1; } else { failed_seeks += 1; println!("Seek {} failed: {:?}", i, seek_result.err()); } // Very small delay to simulate rapid seeking sleep(Duration::from_millis(10)).await; if i % 20 == 0 { println!("Completed {} rapid seeks ({} successful, {} failed)", i + 1, successful_seeks, failed_seeks); } } pipeline.set_state(gstreamer::State::Null).unwrap(); println!("Rapid seek stress test results:"); println!(" Successful: {}", successful_seeks); println!(" Failed: {}", failed_seeks); println!(" Success rate: {:.1}%", (successful_seeks as f64 / 100.0) * 100.0); // We expect most seeks to succeed, but some failures are acceptable under stress assert!(successful_seeks >= 80, "Should have at least 80% success rate under stress"); println!("✅ Rapid seeking stress test completed"); } }