From 2ad086cf1919f3cfd282477f2bff16e0b74e2938 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Tue, 24 Oct 2023 15:58:34 -0300 Subject: [PATCH 1/3] cargo: Add derivative crate --- Cargo.lock | 12 ++++++++++++ Cargo.toml | 1 + 2 files changed, 13 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 5cb22058..2078da13 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1006,6 +1006,17 @@ version = "0.3.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f578e8e2c440e7297e008bb5486a3a8a194775224bbc23729b0dbdfaeebf162e" +[[package]] +name = "derivative" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcc3dd5e9e9c0b295d6e1e4d811fb6f157d5ffd784b8d202fc62eac8035a770b" +dependencies = [ + "proc-macro2", + "quote 1.0.33", + "syn 1.0.109", +] + [[package]] name = "derive_more" version = "0.99.17" @@ -2273,6 +2284,7 @@ dependencies = [ "cached", "chrono", "clap", + "derivative", "directories", "enum_dispatch", "futures", diff --git a/Cargo.toml b/Cargo.toml index b6bf7464..3448d33d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -66,6 +66,7 @@ enum_dispatch = "0.3.12" uuid = { version = "0.8", features = ["v4", "serde"] } ts-rs = "7.0" cached = { version = "0.45", features = ["serde", "serde_json", "async_tokio_rt_multi_thread"] } +derivative = "2.2.0" ## Mavlink mavlink = { version = "0.10.1", features = ["default", "emit-extensions"] } From 6387d1d3fd4136bd947e7f8b2fd6ea0b669cf4fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Tue, 24 Oct 2023 15:37:22 -0300 Subject: [PATCH 2/3] src: stream, video: Improve debug --- src/stream/pipeline/mod.rs | 13 ++++--- src/stream/sink/image_sink.rs | 28 ++++++++++----- src/stream/sink/rtsp_sink.rs | 23 ++++++++----- src/stream/sink/udp_sink.rs | 21 ++++++++---- src/stream/sink/webrtc_sink.rs | 55 +++++++++++++++++------------- src/video/video_source.rs | 7 +++- src/video/video_source_gst.rs | 12 +++++++ src/video/video_source_local.rs | 23 ++++++++++++- src/video/video_source_redirect.rs | 12 +++++++ src/video/xml.rs | 3 ++ 10 files changed, 142 insertions(+), 55 deletions(-) diff --git a/src/stream/pipeline/mod.rs b/src/stream/pipeline/mod.rs index 260dbd77..f25fdd42 100644 --- a/src/stream/pipeline/mod.rs +++ b/src/stream/pipeline/mod.rs @@ -77,22 +77,25 @@ impl Pipeline { }) } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug")] pub fn add_sink(&mut self, sink: Sink) -> Result<()> { self.inner_state_mut().add_sink(sink) } #[allow(dead_code)] // This functions is reserved here for when we start dynamically add/remove Sinks - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug")] pub fn remove_sink(&mut self, sink_id: &uuid::Uuid) -> Result<()> { self.inner_state_mut().remove_sink(sink_id) } } -#[derive(Debug)] +#[derive(derivative::Derivative)] +#[derivative(Debug)] pub struct PipelineState { pub pipeline_id: uuid::Uuid, + #[derivative(Debug = "ignore")] pub pipeline: gst::Pipeline, + #[derivative(Debug = "ignore")] pub sink_tee: gst::Element, pub sinks: HashMap, pub pipeline_runner: PipelineRunner, @@ -140,7 +143,7 @@ impl PipelineState { } /// Links the sink pad from the given Sink to this Pipeline's Tee element - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug")] pub fn add_sink(&mut self, mut sink: Sink) -> Result<()> { let pipeline_id = &self.pipeline_id; @@ -213,7 +216,7 @@ impl PipelineState { /// Unlinks the src pad from this Sink from the given sink pad of a Tee element /// /// Important notes about pad unlinking: [here](https://gstreamer.freedesktop.org/documentation/application-development/advanced/pipeline-manipulation.html?gi-language=c#dynamically-changing-the-pipeline) - #[instrument(level = "info", skip(self))] + #[instrument(level = "info")] pub fn remove_sink(&mut self, sink_id: &uuid::Uuid) -> Result<()> { let pipeline_id = &self.pipeline_id; let sink = self.sinks.remove(sink_id).context(format!( diff --git a/src/stream/sink/image_sink.rs b/src/stream/sink/image_sink.rs index 9c0bf4bd..7e4d575b 100644 --- a/src/stream/sink/image_sink.rs +++ b/src/stream/sink/image_sink.rs @@ -57,23 +57,33 @@ impl CachedThumbnails { } } -#[derive(Debug)] +#[derive(derivative::Derivative)] +#[derivative(Debug)] pub struct ImageSink { sink_id: uuid::Uuid, + #[derivative(Debug = "ignore")] pipeline: gst::Pipeline, + #[derivative(Debug = "ignore")] queue: gst::Element, + #[derivative(Debug = "ignore")] proxysink: gst::Element, + #[derivative(Debug = "ignore")] _proxysrc: gst::Element, + #[derivative(Debug = "ignore")] _transcoding_elements: Vec, + #[derivative(Debug = "ignore")] appsink: gst_app::AppSink, + #[derivative(Debug = "ignore")] tee_src_pad: Option, + #[derivative(Debug = "ignore")] flat_samples_sender: tokio::sync::broadcast::Sender>>>, + #[derivative(Debug = "ignore")] pad_blocker: Arc>>, pipeline_runner: PipelineRunner, thumbnails: Arc>, } impl SinkInterface for ImageSink { - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug")] fn link( &mut self, pipeline: &gst::Pipeline, @@ -207,7 +217,7 @@ impl SinkInterface for ImageSink { Ok(()) } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug")] fn unlink(&self, pipeline: &gst::Pipeline, pipeline_id: &uuid::Uuid) -> Result<()> { let Some(tee_src_pad) = &self.tee_src_pad else { warn!("Tried to unlink Sink from a pipeline without a Tee src pad."); @@ -265,24 +275,24 @@ impl SinkInterface for ImageSink { Ok(()) } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug")] fn get_id(&self) -> uuid::Uuid { self.sink_id } - #[instrument(level = "trace", skip(self))] + #[instrument(level = "trace")] fn get_sdp(&self) -> Result { Err(anyhow!( "Not available. Reason: Image Sink doesn't provide endpoints" )) } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug")] fn start(&self) -> Result<()> { self.pipeline_runner.start() } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug")] fn eos(&self) { if let Err(error) = self.pipeline.post_message(gst::message::Eos::new()) { error!("Failed posting Eos message into Sink bus. Reason: {error:?}"); @@ -532,7 +542,7 @@ impl ImageSink { }) } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug")] async fn try_get_flat_sample(&self) -> Result>> { // Play the pipeline if it's not playing yet. // Here we can ignore the result because we have a timeout when waiting for the snapshot @@ -614,7 +624,7 @@ impl ImageSink { Ok(thumbnail) } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug")] pub async fn make_jpeg_thumbnail_from_last_frame( &self, quality: u8, diff --git a/src/stream/sink/rtsp_sink.rs b/src/stream/sink/rtsp_sink.rs index 50c713d1..cf3885bb 100644 --- a/src/stream/sink/rtsp_sink.rs +++ b/src/stream/sink/rtsp_sink.rs @@ -6,18 +6,23 @@ use gst::prelude::*; use super::SinkInterface; -#[derive(Debug)] +#[derive(derivative::Derivative)] +#[derivative(Debug)] pub struct RtspSink { sink_id: uuid::Uuid, + #[derivative(Debug = "ignore")] queue: gst::Element, + #[derivative(Debug = "ignore")] sink: gst::Element, + #[derivative(Debug = "ignore")] sink_sink_pad: gst::Pad, + #[derivative(Debug = "ignore")] tee_src_pad: Option, path: String, socket_path: String, } impl SinkInterface for RtspSink { - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug")] fn link( &mut self, pipeline: &gst::Pipeline, @@ -144,7 +149,7 @@ impl SinkInterface for RtspSink { Ok(()) } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug")] fn unlink(&self, pipeline: &gst::Pipeline, pipeline_id: &uuid::Uuid) -> Result<()> { if let Err(error) = std::fs::remove_file(&self.socket_path) { warn!("Failed removing the RTSP Sink socket file. Reason: {error:?}"); @@ -201,24 +206,24 @@ impl SinkInterface for RtspSink { Ok(()) } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug")] fn get_id(&self) -> uuid::Uuid { self.sink_id } - #[instrument(level = "trace", skip(self))] + #[instrument(level = "trace")] fn get_sdp(&self) -> Result { Err(anyhow!( "Not available. Reason: RTSP Sink should only be connected from its RTSP endpoint." )) } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug")] fn start(&self) -> Result<()> { Ok(()) } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug")] fn eos(&self) {} } @@ -259,12 +264,12 @@ impl RtspSink { }) } - #[instrument(level = "trace", skip(self))] + #[instrument(level = "trace")] pub fn path(&self) -> String { self.path.clone() } - #[instrument(level = "trace", skip(self))] + #[instrument(level = "trace")] pub fn socket_path(&self) -> String { self.socket_path.clone() } diff --git a/src/stream/sink/udp_sink.rs b/src/stream/sink/udp_sink.rs index cc09ee83..d5e087b8 100644 --- a/src/stream/sink/udp_sink.rs +++ b/src/stream/sink/udp_sink.rs @@ -7,21 +7,28 @@ use gst::prelude::*; use super::SinkInterface; use crate::stream::pipeline::runner::PipelineRunner; -#[derive(Debug)] +#[derive(derivative::Derivative)] +#[derivative(Debug)] pub struct UdpSink { sink_id: uuid::Uuid, pipeline: gst::Pipeline, + #[derivative(Debug = "ignore")] queue: gst::Element, + #[derivative(Debug = "ignore")] proxysink: gst::Element, + #[derivative(Debug = "ignore")] _proxysrc: gst::Element, + #[derivative(Debug = "ignore")] _udpsink: gst::Element, + #[derivative(Debug = "ignore")] udpsink_sink_pad: gst::Pad, + #[derivative(Debug = "ignore")] tee_src_pad: Option, addresses: Vec, pipeline_runner: PipelineRunner, } impl SinkInterface for UdpSink { - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug")] fn link( &mut self, pipeline: &gst::Pipeline, @@ -155,7 +162,7 @@ impl SinkInterface for UdpSink { Ok(()) } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug")] fn unlink(&self, pipeline: &gst::Pipeline, pipeline_id: &uuid::Uuid) -> Result<()> { let Some(tee_src_pad) = &self.tee_src_pad else { warn!("Tried to unlink Sink from a pipeline without a Tee src pad."); @@ -213,12 +220,12 @@ impl SinkInterface for UdpSink { Ok(()) } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug")] fn get_id(&self) -> uuid::Uuid { self.sink_id } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug")] fn get_sdp(&self) -> Result { let caps = self .udpsink_sink_pad @@ -257,12 +264,12 @@ impl SinkInterface for UdpSink { Ok(sdp) } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug")] fn start(&self) -> Result<()> { self.pipeline_runner.start() } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug")] fn eos(&self) { if let Err(error) = self.pipeline.post_message(gst::message::Eos::new()) { error!("Failed posting Eos message into Sink bus. Reason: {error:?}"); diff --git a/src/stream/sink/webrtc_sink.rs b/src/stream/sink/webrtc_sink.rs index f253ac5b..def36b3e 100644 --- a/src/stream/sink/webrtc_sink.rs +++ b/src/stream/sink/webrtc_sink.rs @@ -21,11 +21,16 @@ pub struct WebRTCSinkWeakProxy { sender: WeakUnboundedSender>, } -#[derive(Debug)] +#[derive(derivative::Derivative)] +#[derivative(Debug)] pub struct WebRTCSink { + #[derivative(Debug = "ignore")] pub queue: gst::Element, + #[derivative(Debug = "ignore")] pub webrtcbin: gst::Element, + #[derivative(Debug = "ignore")] pub webrtcbin_sink_pad: gst::Pad, + #[derivative(Debug = "ignore")] pub tee_src_pad: Option, pub bind: BindAnswer, /// MPSC channel's sender to send messages to the respective Websocket from Signaller server. Err can be used to end the WebSocket. @@ -33,7 +38,7 @@ pub struct WebRTCSink { pub end_reason: Option, } impl SinkInterface for WebRTCSink { - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug")] fn link( self: &mut WebRTCSink, pipeline: &gst::Pipeline, @@ -192,7 +197,7 @@ impl SinkInterface for WebRTCSink { let bind = bind.clone(); let state = transport.state(); - debug!("DTLS Transport Connection changed to {state:#?}"); + debug!("DTLS Transport Connection changed"); match state { Failed | Closed => { if let Some(webrtcbin) = webrtcbin_clone.upgrade() { @@ -223,7 +228,7 @@ impl SinkInterface for WebRTCSink { Ok(()) } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug")] fn unlink(&self, pipeline: &gst::Pipeline, pipeline_id: &uuid::Uuid) -> Result<()> { let Some(tee_src_pad) = &self.tee_src_pad else { warn!("Tried to unlink Sink from a pipeline without a Tee src pad."); @@ -276,24 +281,24 @@ impl SinkInterface for WebRTCSink { Ok(()) } - #[instrument(level = "trace", skip(self))] + #[instrument(level = "trace")] fn get_id(&self) -> uuid::Uuid { self.bind.session_id } - #[instrument(level = "trace", skip(self))] + #[instrument(level = "trace")] fn get_sdp(&self) -> Result { Err(anyhow!( "Not available: WebRTC Sink should only be connected by means of its Signalling protocol." )) } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug")] fn start(&self) -> Result<()> { Ok(()) } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug")] fn eos(&self) { if let Err(error) = self.webrtcbin.post_message(gst::message::Eos::new()) { error!("Failed posting Eos message into Sink bus. Reason: {error:?}"); @@ -419,7 +424,7 @@ impl WebRTCSink { Ok(this) } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug")] fn downgrade(&self) -> WebRTCSinkWeakProxy { WebRTCSinkWeakProxy { bind: self.bind.clone(), @@ -427,12 +432,12 @@ impl WebRTCSink { } } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug")] pub fn handle_sdp(&self, sdp: &gst_webrtc::WebRTCSessionDescription) -> Result<()> { self.downgrade().handle_sdp(&self.webrtcbin, sdp) } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug")] pub fn handle_ice(&self, sdp_m_line_index: &u32, candidate: &str) -> Result<()> { self.downgrade() .handle_ice(&self.webrtcbin, sdp_m_line_index, candidate) @@ -443,7 +448,7 @@ impl WebRTCBinInterface for WebRTCSinkWeakProxy { // Whenever webrtcbin tells us that (re-)negotiation is needed, simply ask // for a new offer SDP from webrtcbin without any customization and then // asynchronously send it to the peer via the WebSocket connection - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug", skip(self, webrtcbin))] fn on_negotiation_needed(&self, webrtcbin: &gst::Element) -> Result<()> { let this = self.clone(); let webrtcbin_weak = webrtcbin.downgrade(); @@ -486,7 +491,7 @@ impl WebRTCBinInterface for WebRTCSinkWeakProxy { // Once webrtcbin has create the offer SDP for us, handle it by sending it to the peer via the // WebSocket connection - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug", skip(self, webrtcbin))] fn on_offer_created( &self, webrtcbin: &gst::Element, @@ -522,7 +527,7 @@ impl WebRTCBinInterface for WebRTCSinkWeakProxy { // Once webrtcbin has create the answer SDP for us, handle it by sending it to the peer via the // WebSocket connection - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug", skip(self, _webrtcbin))] fn on_answer_created( &self, _webrtcbin: &gst::Element, @@ -556,7 +561,7 @@ impl WebRTCBinInterface for WebRTCSinkWeakProxy { Ok(()) } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug", skip(self, _webrtcbin))] fn on_ice_candidate( &self, _webrtcbin: &gst::Element, @@ -580,12 +585,12 @@ impl WebRTCBinInterface for WebRTCSinkWeakProxy { .send(Ok(message)) .context("Failed to send ICE candidate")?; - debug!("ICE candidate created!"); + debug!("ICE candidate created"); Ok(()) } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug", skip(self, _webrtcbin))] fn on_ice_gathering_state_change( &self, _webrtcbin: &gst::Element, @@ -598,7 +603,7 @@ impl WebRTCBinInterface for WebRTCSinkWeakProxy { Ok(()) } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug", skip(self, webrtcbin))] fn on_ice_connection_state_change( &self, webrtcbin: &gst::Element, @@ -606,7 +611,7 @@ impl WebRTCBinInterface for WebRTCSinkWeakProxy { ) -> Result<()> { use gst_webrtc::WebRTCICEConnectionState::*; - debug!("ICE connection changed to {state:#?}"); + debug!("ICE connection changed"); match state { Completed => { let srcpads = webrtcbin.src_pads(); @@ -631,7 +636,7 @@ impl WebRTCBinInterface for WebRTCSinkWeakProxy { Ok(()) } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug", skip(self, _webrtcbin))] fn on_connection_state_change( &self, _webrtcbin: &gst::Element, @@ -639,7 +644,7 @@ impl WebRTCBinInterface for WebRTCSinkWeakProxy { ) -> Result<()> { use gst_webrtc::WebRTCPeerConnectionState::*; - debug!("Connection changed to {state:#?}"); + debug!("Connection changed"); match state { // TODO: This would be the desired workflow, but it is not being detected, so we are using a workaround connecting direcly to the DTLS Transport connection state in the Session constructor. Disconnected | Failed | Closed => { @@ -652,23 +657,27 @@ impl WebRTCBinInterface for WebRTCSinkWeakProxy { Ok(()) } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug", skip(self, webrtcbin))] fn handle_sdp( &self, webrtcbin: &gst::Element, sdp: &gst_webrtc::WebRTCSessionDescription, ) -> Result<()> { + debug!("Received a new SDP"); + webrtcbin.emit_by_name::<()>("set-remote-description", &[&sdp, &None::]); Ok(()) } - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug", skip(self, webrtcbin))] fn handle_ice( &self, webrtcbin: &gst::Element, sdp_m_line_index: &u32, candidate: &str, ) -> Result<()> { + debug!("Received a new ICE Candidate"); + webrtcbin.emit_by_name::<()>("add-ice-candidate", &[&sdp_m_line_index, &candidate]); Ok(()) } diff --git a/src/video/video_source.rs b/src/video/video_source.rs index 48b01914..2edb7f26 100644 --- a/src/video/video_source.rs +++ b/src/video/video_source.rs @@ -4,7 +4,7 @@ use super::video_source_local::VideoSourceLocal; use super::video_source_redirect::VideoSourceRedirect; use tracing::*; -pub trait VideoSource { +pub trait VideoSource: std::fmt::Debug { fn name(&self) -> &String; fn source_string(&self) -> &str; fn formats(&self) -> Vec; @@ -21,6 +21,7 @@ pub trait VideoSourceAvailable { fn cameras_available() -> Vec; } +#[instrument(level = "debug")] pub fn cameras_available() -> Vec { [ &VideoSourceLocal::cameras_available()[..], @@ -30,6 +31,7 @@ pub fn cameras_available() -> Vec { .concat() } +#[instrument(level = "debug")] pub fn get_video_source(source_string: &str) -> Result { let cameras = cameras_available(); @@ -53,12 +55,14 @@ pub fn get_video_source(source_string: &str) -> Result std::io::Result<()> { let camera = get_video_source(source_string)?; debug!("Set camera ({source_string}) control ({control_id}) value ({value})."); return camera.inner().set_control_by_id(control_id, value); } +#[instrument(level = "debug")] pub fn reset_controls(source_string: &str) -> Result<(), Vec> { let camera = match get_video_source(source_string) { Ok(camera) => camera, @@ -103,6 +107,7 @@ mod tests { use super::*; #[test] + #[instrument(level = "debug")] fn simple_test() { println!("{:#?}", cameras_available()); } diff --git a/src/video/video_source_gst.rs b/src/video/video_source_gst.rs index 63823db2..69a87904 100644 --- a/src/video/video_source_gst.rs +++ b/src/video/video_source_gst.rs @@ -4,6 +4,7 @@ use super::video_source_local::VideoSourceLocal; use paperclip::actix::Apiv2Schema; use serde::{Deserialize, Serialize}; +use tracing::*; #[derive(Apiv2Schema, Clone, Debug, PartialEq, Serialize, Deserialize)] pub enum VideoSourceGstType { @@ -19,10 +20,12 @@ pub struct VideoSourceGst { } impl VideoSource for VideoSourceGst { + #[instrument(level = "debug")] fn name(&self) -> &String { &self.name } + #[instrument(level = "debug")] fn source_string(&self) -> &str { match &self.source { VideoSourceGstType::Local(local) => local.source_string(), @@ -30,6 +33,7 @@ impl VideoSource for VideoSourceGst { } } + #[instrument(level = "debug")] fn formats(&self) -> Vec { match &self.source { VideoSourceGstType::Local(local) => local.formats(), @@ -78,6 +82,7 @@ impl VideoSource for VideoSourceGst { } } + #[instrument(level = "debug")] fn set_control_by_name(&self, _control_name: &str, _value: i64) -> std::io::Result<()> { Err(std::io::Error::new( std::io::ErrorKind::NotFound, @@ -85,6 +90,7 @@ impl VideoSource for VideoSourceGst { )) } + #[instrument(level = "debug")] fn set_control_by_id(&self, _control_id: u64, _value: i64) -> std::io::Result<()> { Err(std::io::Error::new( std::io::ErrorKind::NotFound, @@ -92,6 +98,7 @@ impl VideoSource for VideoSourceGst { )) } + #[instrument(level = "debug")] fn control_value_by_name(&self, _control_name: &str) -> std::io::Result { Err(std::io::Error::new( std::io::ErrorKind::NotFound, @@ -99,6 +106,7 @@ impl VideoSource for VideoSourceGst { )) } + #[instrument(level = "debug")] fn control_value_by_id(&self, _control_id: u64) -> std::io::Result { Err(std::io::Error::new( std::io::ErrorKind::NotFound, @@ -106,10 +114,12 @@ impl VideoSource for VideoSourceGst { )) } + #[instrument(level = "debug")] fn controls(&self) -> Vec { vec![] } + #[instrument(level = "debug")] fn is_valid(&self) -> bool { match &self.source { VideoSourceGstType::Local(local) => local.is_valid(), @@ -123,12 +133,14 @@ impl VideoSource for VideoSourceGst { } } + #[instrument(level = "debug")] fn is_shareable(&self) -> bool { true } } impl VideoSourceAvailable for VideoSourceGst { + #[instrument(level = "debug")] fn cameras_available() -> Vec { vec![VideoSourceType::Gst(VideoSourceGst { name: "Fake source".into(), diff --git a/src/video/video_source_local.rs b/src/video/video_source_local.rs index 44a4bb0c..4575e849 100644 --- a/src/video/video_source_local.rs +++ b/src/video/video_source_local.rs @@ -55,6 +55,7 @@ impl VideoSourceLocalType { // // https://www.kernel.org/doc/html/v4.9/media/uapi/v4l/vidioc-querycap.html#:~:text=__u8-,bus_info,-%5B32%5D + #[instrument(level = "debug")] pub fn from_str(description: &str) -> Self { if let Some(result) = VideoSourceLocalType::usb_from_str(description) { return result; @@ -74,6 +75,7 @@ impl VideoSourceLocalType { VideoSourceLocalType::Unknown(description.into()) } + #[instrument(level = "debug")] fn usb_from_str(description: &str) -> Option { let regex = match Regex::new( r"usb-(?P(([0-9a-fA-F]{2}){1,2}:?){4})?\.(usb-)?(?P.*)", @@ -91,6 +93,7 @@ impl VideoSourceLocalType { None } + #[instrument(level = "debug")] fn v4l2_from_str(description: &str) -> Option { let regex = match Regex::new(r"platform:(?P\S+)-v4l2-[0-9]") { Ok(regex) => regex, @@ -108,6 +111,7 @@ impl VideoSourceLocalType { } impl VideoSourceLocal { + #[instrument(level = "debug")] pub fn try_identify_device( &mut self, capture_configuration: &VideoCaptureConfiguration, @@ -168,10 +172,11 @@ impl VideoSourceLocal { // Then in the next reboot, `Alpha` changed to port B, and `Beta` was changed to port A, then it's // impossible to differentiate them using only the name. trace!("Outcome n.5!"); - warn!("There is more than one camera with the same name and encode, which means that their identification/configurations could have been swaped"); + warn!("There is more than one camera with the same name and encode, which means that their identification/configurations could have been swapped"); Ok(None) } + #[instrument(level = "debug")] fn get_cameras_with_same_name( candidates: &[VideoSourceType], name: &str, @@ -189,6 +194,7 @@ impl VideoSourceLocal { .collect() } + #[instrument(level = "debug")] fn get_cameras_with_same_encode( candidates: &[VideoSourceType], encode: &VideoEncodeType, @@ -206,6 +212,7 @@ impl VideoSourceLocal { .collect() } + #[instrument(level = "debug")] fn get_cameras_with_same_bus( candidates: &[VideoSourceType], typ: &VideoSourceLocalType, @@ -223,6 +230,7 @@ impl VideoSourceLocal { } } +#[instrument(level = "debug")] fn convert_v4l_intervals(v4l_intervals: &[v4l::FrameInterval]) -> Vec { let mut intervals: Vec = vec![]; @@ -269,6 +277,7 @@ fn convert_v4l_intervals(v4l_intervals: &[v4l::FrameInterval]) -> Vec Vec { if let Some(formats) = VIDEO_FORMATS.lock().unwrap().get(&device_path.to_string()) { return formats.clone(); @@ -414,6 +423,7 @@ fn get_device_formats(device_path: &str, typ: &VideoSourceLocalType) -> Vec
Result<(), String> { if control.state.is_inactive { return Err("Control is inactive".to_string()); @@ -457,18 +467,22 @@ fn validate_control(control: &Control, value: i64) -> Result<(), String> { } impl VideoSource for VideoSourceLocal { + #[instrument(level = "debug")] fn name(&self) -> &String { &self.name } + #[instrument(level = "debug")] fn source_string(&self) -> &str { &self.device_path } + #[instrument(level = "debug")] fn formats(&self) -> Vec { get_device_formats(&self.device_path, &self.typ) } + #[instrument(level = "debug")] fn set_control_by_name(&self, control_name: &str, value: i64) -> std::io::Result<()> { let Some(control_id) = self .controls() @@ -489,6 +503,7 @@ impl VideoSource for VideoSourceLocal { self.set_control_by_id(control_id, value) } + #[instrument(level = "debug")] fn set_control_by_id(&self, control_id: u64, value: i64) -> std::io::Result<()> { let Some(control) = self .controls() @@ -523,6 +538,7 @@ impl VideoSource for VideoSourceLocal { } } + #[instrument(level = "debug")] fn control_value_by_name(&self, control_name: &str) -> std::io::Result { let Some(control_id) = self .controls() @@ -543,6 +559,7 @@ impl VideoSource for VideoSourceLocal { self.control_value_by_id(control_id) } + #[instrument(level = "debug")] fn control_value_by_id(&self, control_id: u64) -> std::io::Result { let device = Device::with_path(&self.device_path)?; let value = device.control(control_id as u32)?.value; @@ -556,6 +573,7 @@ impl VideoSource for VideoSourceLocal { } } + #[instrument(level = "debug")] fn controls(&self) -> Vec { let mut controls: Vec = vec![]; @@ -643,16 +661,19 @@ impl VideoSource for VideoSourceLocal { controls } + #[instrument(level = "debug")] fn is_valid(&self) -> bool { !self.device_path.is_empty() } + #[instrument(level = "debug")] fn is_shareable(&self) -> bool { false } } impl VideoSourceAvailable for VideoSourceLocal { + #[instrument(level = "debug")] fn cameras_available() -> Vec { let mut cameras: Vec = vec![]; diff --git a/src/video/video_source_redirect.rs b/src/video/video_source_redirect.rs index e1bde715..9af0f302 100644 --- a/src/video/video_source_redirect.rs +++ b/src/video/video_source_redirect.rs @@ -3,6 +3,7 @@ use super::video_source::{VideoSource, VideoSourceAvailable}; use paperclip::actix::Apiv2Schema; use serde::{Deserialize, Serialize}; +use tracing::*; #[derive(Apiv2Schema, Clone, Debug, PartialEq, Serialize, Deserialize)] pub enum VideoSourceRedirectType { @@ -16,16 +17,19 @@ pub struct VideoSourceRedirect { } impl VideoSource for VideoSourceRedirect { + #[instrument(level = "debug")] fn name(&self) -> &String { &self.name } + #[instrument(level = "debug")] fn source_string(&self) -> &str { match &self.source { VideoSourceRedirectType::Redirect(string) => string, } } + #[instrument(level = "debug")] fn formats(&self) -> Vec { match &self.source { VideoSourceRedirectType::Redirect(_) => { @@ -34,6 +38,7 @@ impl VideoSource for VideoSourceRedirect { } } + #[instrument(level = "debug")] fn set_control_by_name(&self, _control_name: &str, _value: i64) -> std::io::Result<()> { Err(std::io::Error::new( std::io::ErrorKind::NotFound, @@ -41,6 +46,7 @@ impl VideoSource for VideoSourceRedirect { )) } + #[instrument(level = "debug")] fn set_control_by_id(&self, _control_id: u64, _value: i64) -> std::io::Result<()> { Err(std::io::Error::new( std::io::ErrorKind::NotFound, @@ -48,6 +54,7 @@ impl VideoSource for VideoSourceRedirect { )) } + #[instrument(level = "debug")] fn control_value_by_name(&self, _control_name: &str) -> std::io::Result { Err(std::io::Error::new( std::io::ErrorKind::NotFound, @@ -55,6 +62,7 @@ impl VideoSource for VideoSourceRedirect { )) } + #[instrument(level = "debug")] fn control_value_by_id(&self, _control_id: u64) -> std::io::Result { Err(std::io::Error::new( std::io::ErrorKind::NotFound, @@ -62,22 +70,26 @@ impl VideoSource for VideoSourceRedirect { )) } + #[instrument(level = "debug")] fn controls(&self) -> Vec { vec![] } + #[instrument(level = "debug")] fn is_valid(&self) -> bool { match &self.source { VideoSourceRedirectType::Redirect(_) => true, } } + #[instrument(level = "debug")] fn is_shareable(&self) -> bool { true } } impl VideoSourceAvailable for VideoSourceRedirect { + #[instrument(level = "debug")] fn cameras_available() -> Vec { vec![VideoSourceType::Redirect(VideoSourceRedirect { name: "Redirect source".into(), diff --git a/src/video/xml.rs b/src/video/xml.rs index 2dc176f0..c21181a8 100644 --- a/src/video/xml.rs +++ b/src/video/xml.rs @@ -3,6 +3,7 @@ use super::video_source::VideoSource; use anyhow::{anyhow, Result}; use serde::Serialize; +use tracing::*; #[derive(Debug, Serialize)] #[serde(rename = "mavlinkcamera")] @@ -122,6 +123,7 @@ pub struct Description { impl Description { //TODO: impl fromStr + #[instrument(level = "debug")] pub fn new(description: &str) -> Self { Self { body: description.into(), @@ -129,6 +131,7 @@ impl Description { } } +#[instrument(level = "debug")] pub fn from_video_source(video_source: &dyn VideoSource) -> Result { let controls = video_source.controls(); From aec6968967d15536a8d34ffcf0099dff59bec9e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Tue, 24 Oct 2023 15:39:34 -0300 Subject: [PATCH 3/3] src: stream: Fix typo --- src/stream/gst/utils.rs | 2 +- src/stream/mod.rs | 2 +- src/stream/pipeline/runner.rs | 2 +- src/stream/rtsp/rtsp_server.rs | 2 +- src/stream/sink/webrtc_sink.rs | 2 +- src/stream/webrtc/signalling_server.rs | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/stream/gst/utils.rs b/src/stream/gst/utils.rs index 0c7d6208..51246eab 100644 --- a/src/stream/gst/utils.rs +++ b/src/stream/gst/utils.rs @@ -33,7 +33,7 @@ pub fn set_plugin_rank(plugin_name: &str, rank: gst::Rank) -> Result<()> { feature.set_rank(rank); } else { return Err(anyhow!( - "Cannot found Gstreamer feature {plugin_name:#?} in the registry.", + "Cannot found GStreamer feature {plugin_name:#?} in the registry.", )); } diff --git a/src/stream/mod.rs b/src/stream/mod.rs index c7ad3707..4ac44276 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -71,7 +71,7 @@ impl Stream { ) }) .context(format!( - "Failed when spawing PipelineRunner thread for Pipeline {pipeline_id:#?}" + "Failed when spawning PipelineRunner thread for Pipeline {pipeline_id:#?}" ))?; Ok(Self { diff --git a/src/stream/pipeline/runner.rs b/src/stream/pipeline/runner.rs index 2b38982f..e6915e67 100644 --- a/src/stream/pipeline/runner.rs +++ b/src/stream/pipeline/runner.rs @@ -43,7 +43,7 @@ impl PipelineRunner { } }) .context(format!( - "Failed when spawing PipelineRunner thread for Pipeline {pipeline_id:#?}" + "Failed when spawning PipelineRunner thread for Pipeline {pipeline_id:#?}" ))?, }) } diff --git a/src/stream/rtsp/rtsp_server.rs b/src/stream/rtsp/rtsp_server.rs index e7332b0f..e37a3867 100644 --- a/src/stream/rtsp/rtsp_server.rs +++ b/src/stream/rtsp/rtsp_server.rs @@ -51,7 +51,7 @@ impl RTSPServer { thread::Builder::new() .name("RTSPServer".to_string()) .spawn(move || RTSPServer::run_main_loop(sender)) - .expect("Failed when spawing RTSPServer thread"), + .expect("Failed when spawning RTSPServer thread"), ), main_loop_thread_rx_channel: receiver, } diff --git a/src/stream/sink/webrtc_sink.rs b/src/stream/sink/webrtc_sink.rs index def36b3e..782ba558 100644 --- a/src/stream/sink/webrtc_sink.rs +++ b/src/stream/sink/webrtc_sink.rs @@ -216,7 +216,7 @@ impl SinkInterface for WebRTCSink { error!("Failed removing session {bind:#?}: {error}"); } }) - .expect("Failed spawing DTLSKiller thread"); + .expect("Failed spawning DTLSKiller thread"); } } } diff --git a/src/stream/webrtc/signalling_server.rs b/src/stream/webrtc/signalling_server.rs index 524a9fbf..592c3198 100644 --- a/src/stream/webrtc/signalling_server.rs +++ b/src/stream/webrtc/signalling_server.rs @@ -48,7 +48,7 @@ impl Default for SignallingServer { _server_thread_handle: thread::Builder::new() .name("SignallingServer".to_string()) .spawn(SignallingServer::run_main_loop) - .expect("Failed spawing SignallingServer thread"), + .expect("Failed spawning SignallingServer thread"), } } }