Skip to content

Commit

Permalink
feat: test with ffmpeg (no offer generated)
Browse files Browse the repository at this point in the history
  • Loading branch information
Wkkkkk committed Aug 7, 2023
1 parent dc41ba0 commit 08ec3a0
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 70 deletions.
8 changes: 7 additions & 1 deletion scripts/gst-commands
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ gst-launch-1.0 srtsrc uri="srt://127.0.0.1:1234" ! tsdemux ! queue ! h264parse
### Video & Audio
#### producer
gst-launch-1.0 -v \
videotestsrc ! clockoverlay ! video/x-raw, height=360, width=640 ! videoconvert ! x264enc tune=zerolatency ! video/x-h264, profile=main ! mux. \
videotestsrc ! clockoverlay ! video/x-raw, height=360, width=640 ! videoconvert ! x264enc tune=zerolatency ! video/x-h264, profile=constrained-baseline ! mux. \
audiotestsrc ! audio/x-raw, format=S16LE, channels=2, rate=44100 ! audioconvert ! voaacenc ! aacparse ! mux. \
mpegtsmux name=mux ! queue ! srtsink uri="srt://127.0.0.1:1234?mode=caller" wait-for-connection=false

gst-launch-1.0 -v \
Expand Down Expand Up @@ -70,3 +71,8 @@ gst-launch-1.0 -v whepsrc whep-endpoint="http://localhost:8000/subscriptions" \
rtpmp2tdepay ! decodebin name=d \
d. ! queue ! autovideosink sync=false \
d. ! queue ! audioconvert ! autoaudiosink sync=false


## FFmpeg cmds
ffmpeg -f lavfi -i testsrc=size=640x360:rate=30 -f lavfi -i sine=frequency=440 -pix_fmt yuv420p -c:v libx264 -vprofile main -c:a aac -f mpegts "srt://127.0.0.1:1234?mode=caller"
ffmpeg -f lavfi -i sine=frequency=440 -c:a aac -f mpegts "srt://127.0.0.1:1234?mode=caller"
3 changes: 3 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::signal;
use tokio::task;
use tokio::time::{sleep, Duration};

/// Run a pipeline until it encounters EOS or an error. Clean up the pipeline after it finishes.
/// This function can be called multiple times to handle EOS.
Expand Down Expand Up @@ -57,6 +58,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
};

// Reset and rerun the pipeline when it encounters EOS
sleep(Duration::from_secs(3)).await;
tracing::error!("Pipeline reaches EOS. Reset and rerun the pipeline");
}

// Stop the pipeline when the thread is aborted
Expand Down
151 changes: 82 additions & 69 deletions src/stream/gst_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,20 +69,24 @@ impl PipelineBase for SharablePipeline {
let demux = pipeline
.by_name("demux")
.ok_or(MyError::MissingElement("demux".to_string()))?;
let queue_video: gst::Element = gst::ElementFactory::make("queue")
.name("video-queue-".to_string() + &id)
.build()?;
let queue_audio: gst::Element = gst::ElementFactory::make("queue")
.name("audio-queue-".to_string() + &id)
.build()?;
if !demux
.pads()
.into_iter()
.any(|pad| pad.name().starts_with("video") || pad.name().starts_with("audio"))
{
tracing::error!("Demux has no pad available. No connection can be added.");
return Err(MyError::FailedOperation("No available stream".to_string()).into());
}

// Create whip sink when demux has at least one video or audio pad
let whipsink = gst::ElementFactory::make("whipsink")
.name("whip-sink-".to_string() + &id)
.property(
"whip-endpoint",
format!("http://localhost:{}/whip_sink/{}", pipeline_state.port, id),
)
.build()?;
pipeline.add_many(&[&queue_video, &queue_audio, &whipsink])?;
pipeline.add_many(&[&whipsink])?;

if demux
.pads()
Expand All @@ -92,9 +96,13 @@ impl PipelineBase for SharablePipeline {
let output_tee_video = pipeline
.by_name("output_tee_video")
.ok_or(MyError::MissingElement("output_tee_video".to_string()))?;
let queue_video: gst::Element = gst::ElementFactory::make("queue")
.name("video-queue-".to_string() + &id)
.build()?;
pipeline.add_many(&[&queue_video])?;
gst::Element::link_many(&[&output_tee_video, &queue_video, &whipsink])?;

let video_elements = &[&output_tee_video, &queue_video, &whipsink];
let video_elements = &[&output_tee_video, &queue_video];
for e in video_elements {
e.sync_state_with_parent()?;
}
Expand All @@ -110,24 +118,22 @@ impl PipelineBase for SharablePipeline {
let output_tee_audio = pipeline
.by_name("output_tee_audio")
.ok_or(MyError::MissingElement("output_tee_audio".to_string()))?;
let queue_audio: gst::Element = gst::ElementFactory::make("queue")
.name("audio-queue-".to_string() + &id)
.build()?;
pipeline.add_many(&[&queue_audio])?;
gst::Element::link_many(&[&output_tee_audio, &queue_audio, &whipsink])?;

let audio_elements = &[&output_tee_audio, &queue_audio, &whipsink];
let audio_elements = &[&output_tee_audio, &queue_audio];
for e in audio_elements {
e.sync_state_with_parent()?;
}

tracing::debug!("Successfully linked audio to whip sink");
}

if !demux
.pads()
.into_iter()
.any(|pad| pad.name().starts_with("video") || pad.name().starts_with("audio"))
{
tracing::error!("No pad's name starts with 'video' or 'audio'");
return Err(MyError::MissingElement("video or audio pad".to_string()).into());
}
whipsink.sync_state_with_parent()?;
demux.sync_state_with_parent()?;

Ok(())
}
Expand All @@ -146,67 +152,74 @@ impl PipelineBase for SharablePipeline {
let pipeline = pipeline_state.pipeline.as_ref().unwrap();
tracing::debug!("Remove connection {} from pipeline", id);

let video_tee_name = "output_tee_video";
let audio_tee_name = "output_tee_audio";
let video_queue_name = "video-queue-".to_string() + &id;
let audio_queue_name = "audio-queue-".to_string() + &id;
let whip_sink_name = "whip-sink-".to_string() + &id;

// Try to remove branch from pipeline
// Return Ok if branch is removed
let remove_branch_from_pipeline =
|pipeline: &Pipeline, tee_name: &str, queue_name: &str| -> Result<(), Error> {
tracing::debug!("Removing queue {} from pipeline", queue_name);
let queue = pipeline
.by_name(queue_name)
.ok_or(MyError::MissingElement(queue_name.to_string()))?;
let queue_sink_pad =
queue
.static_pad("sink")
.ok_or(MyError::MissingElement(format!(
"{}'s sink pad",
queue_name
)))?;

// Remove src pad from tee if queue is linked
let name = queue_name.to_string();
if queue_sink_pad.is_linked() {
let tee_src_pad = queue_sink_pad
.peer()
.ok_or(MyError::MissingElement(format!("{}'s src pad", tee_name)))?;
tracing::debug!("Queue {} is linked", queue_name);

let pipeline_weak = pipeline.downgrade();
// Block tee's source pad with a pad probe.
// the probe callback is called as soon as the pad becomes idle
tee_src_pad.add_probe(gst::PadProbeType::IDLE, move |_pad, _info| {
let pipeline = match pipeline_weak.upgrade() {
Some(pipeline) => pipeline,
// do nothing if pipeline is already dropped
None => return gst::PadProbeReturn::Ok,
};

if queue.set_state(gst::State::Null).is_ok()
&& pipeline.remove(&queue).is_ok()
{
tracing::debug!("Queue {} is removed from pipeline", name);
} else {
tracing::error!("Failed to remove queue {} from pipeline", name);
}
// Return Ok if branch is removed (or not exist)
let remove_branch_from_pipeline = |pipeline: &Pipeline,
queue_name: &str|
-> Result<(), Error> {
tracing::debug!("Removing queue {} from pipeline", queue_name);
// Check if queue exists
let queue = pipeline.by_name(queue_name);
if queue.is_none() {
tracing::warn!("Queue {} does not exist", queue_name);
return Ok(());
}

// drop src pad afterwards
gst::PadProbeReturn::Drop
});
} else {
tracing::error!("Queue {} is not linked", name);
}
let queue = queue.unwrap();
let queue_sink_pad =
queue
.static_pad("sink")
.ok_or(MyError::MissingElement(format!(
"{}'s sink pad",
queue_name
)))?;

// Remove src pad from tee if queue is linked
let name = queue_name.to_string();
if queue_sink_pad.is_linked() {
let tee_src_pad = queue_sink_pad
.peer()
.ok_or(MyError::MissingElement("tee's src pad".to_string()))?;

let pipeline_weak = pipeline.downgrade();
// Block tee's source pad with a pad probe.
// the probe callback is called as soon as the pad becomes idle
tee_src_pad.add_probe(gst::PadProbeType::IDLE, move |_pad, _info| {
let pipeline = match pipeline_weak.upgrade() {
Some(pipeline) => pipeline,
// drop pad if pipeline is already dropped
None => return gst::PadProbeReturn::Drop,
};

if queue.set_state(gst::State::Null).is_ok() && pipeline.remove(&queue).is_ok()
{
tracing::debug!("Queue {} is removed from pipeline", name);
} else {
tracing::error!("Failed to remove queue {} from pipeline", name);
}

Ok(())
};
// remove src pad afterwards
gst::PadProbeReturn::Remove
});
} else {
return Err(MyError::FailedOperation(format!(
"Queue {} is not linked and can not be removed.",
name
))
.into());
}

Ok(())
};

// TODO: check if the order would cause blocking issues when removing branches
remove_branch_from_pipeline(pipeline, audio_tee_name, &audio_queue_name)?;
remove_branch_from_pipeline(pipeline, video_tee_name, &video_queue_name)?;
// TODO: pause the demux and resume it after removing branches
remove_branch_from_pipeline(pipeline, &video_queue_name)?;
remove_branch_from_pipeline(pipeline, &audio_queue_name)?;

// Remove whip sink from pipeline
// If whip sink fails to send offer, it is removed from
Expand Down

0 comments on commit 08ec3a0

Please sign in to comment.