Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 13 additions & 2 deletions moq-relay-ietf/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,13 @@ impl Producer {
}
}

Err(ServeError::NotFound.into())
let namespace = subscribed.track_namespace.clone();
let name = subscribed.track_name.clone();
Err(ServeError::not_found_ctx(format!(
"track '{}/{}' not found in local or remote tracks",
namespace, name
))
.into())
}

/// Serve a track_status request.
Expand Down Expand Up @@ -149,6 +155,11 @@ impl Producer {

track_status_requested.respond_error(4, "Track not found")?;

Err(ServeError::NotFound.into())
Err(ServeError::not_found_ctx(format!(
"track '{}/{}' not found for track_status",
track_status_requested.request_msg.track_namespace,
track_status_requested.request_msg.track_name
))
.into())
}
}
1 change: 1 addition & 0 deletions moq-transport/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ bytes = "1"
thiserror = "1"
tokio = { version = "1", features = ["macros", "io-util", "sync"] }
log = "0.4"
uuid = { version = "1", features = ["v4"] }

web-transport = { workspace = true }

Expand Down
63 changes: 60 additions & 3 deletions moq-transport/src/serve/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ pub enum ServeError {
#[error("not found")]
NotFound,

#[error("not found [error:{0}]")]
NotFoundWithId(uuid::Uuid),

#[error("duplicate")]
Duplicate,

Expand All @@ -25,8 +28,14 @@ pub enum ServeError {
#[error("internal error: {0}")]
Internal(String),

#[error("internal error: {0} [error:{1}]")]
InternalWithId(String, uuid::Uuid),

#[error("not implemented: {0}")]
NotImplemented(String),

#[error("not implemented: {0} [error:{1}]")]
NotImplementedWithId(String, uuid::Uuid),
}

impl ServeError {
Expand All @@ -42,15 +51,63 @@ impl ServeError {
// Pass through application-specific error codes
Self::Closed(code) => *code,
// TRACK_DOES_NOT_EXIST (0x4) from SUBSCRIBE_ERROR codes
Self::NotFound => 0x4,
Self::NotFound | Self::NotFoundWithId(_) => 0x4,
// This is more of a session-level error, but keeping a reasonable code
Self::Duplicate => 0x5,
// NOT_SUPPORTED (0x3) - appears in multiple error code registries
Self::Mode => 0x3,
Self::Size => 0x3,
Self::NotImplemented(_) => 0x3,
Self::NotImplemented(_) | Self::NotImplementedWithId(_, _) => 0x3,
// INTERNAL_ERROR (0x0) - per-request error registries use 0x0
Self::Internal(_) => 0x0,
Self::Internal(_) | Self::InternalWithId(_, _) => 0x0,
}
}

/// Create NotFound error with correlation ID and context logging.
/// The correlation ID will be included in the error message sent to the client.
#[track_caller]
pub fn not_found_ctx(context: impl std::fmt::Display) -> Self {
let id = uuid::Uuid::new_v4();
let loc = std::panic::Location::caller();
log::warn!(
"[{}] Not found: {} at {}:{}",
id,
context,
loc.file(),
loc.line()
);
Self::NotFoundWithId(id)
}

/// Create Internal error with correlation ID and context logging.
/// The correlation ID will be included in the error message sent to the client.
#[track_caller]
pub fn internal_ctx(context: impl std::fmt::Display) -> Self {
let id = uuid::Uuid::new_v4();
let loc = std::panic::Location::caller();
log::error!(
"[{}] Internal error: {} at {}:{}",
id,
context,
loc.file(),
loc.line()
);
Self::InternalWithId(context.to_string(), id)
}

/// Create NotImplemented error with correlation ID and context logging.
/// The correlation ID will be included in the error message sent to the client.
#[track_caller]
pub fn not_implemented_ctx(feature: impl std::fmt::Display) -> Self {
let id = uuid::Uuid::new_v4();
let loc = std::panic::Location::caller();
log::warn!(
"[{}] Not implemented: {} at {}:{}",
id,
feature,
loc.file(),
loc.line()
);
Self::NotImplementedWithId(feature.to_string(), id)
}
}
4 changes: 3 additions & 1 deletion moq-transport/src/serve/tracks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,9 @@ impl Drop for TracksRequest {
fn drop(&mut self) {
// Close any tracks still in the Queue
for track in self.incoming.take().unwrap().close() {
let _ = track.close(ServeError::NotFound);
let _ = track.close(ServeError::not_found_ctx(
"tracks request dropped before track handled",
));
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion moq-transport/src/session/announce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@ impl Default for AnnounceState {
impl Drop for AnnounceState {
fn drop(&mut self) {
for subscriber in self.subscribers.drain(..) {
subscriber.close(ServeError::NotFound).ok();
subscriber
.close(ServeError::not_found_ctx(
"announce dropped before subscription handled",
))
.ok();
}
}
}
Expand Down
5 changes: 2 additions & 3 deletions moq-transport/src/session/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,15 @@ impl SessionError {
/// Helper for unimplemented protocol features
/// Logs a warning and returns a NotImplemented error instead of panicking
pub fn unimplemented(feature: &str) -> Self {
log::warn!("Protocol feature not implemented: {}", feature);
Self::Serve(serve::ServeError::NotImplemented(feature.to_string()))
Self::Serve(serve::ServeError::not_implemented_ctx(feature))
}
}

impl From<SessionError> for serve::ServeError {
fn from(err: SessionError) -> Self {
match err {
SessionError::Serve(err) => err,
_ => serve::ServeError::Internal(err.to_string()),
_ => serve::ServeError::internal_ctx(format!("session error: {}", err)),
}
}
}
20 changes: 17 additions & 3 deletions moq-transport/src/session/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,12 @@ impl Publisher {
) {
subscribed.serve(track).await?;
} else {
subscribed.close(ServeError::NotFound)?;
let namespace = subscribed.info.track_namespace.clone();
let name = subscribed.info.track_name.clone();
subscribed.close(ServeError::not_found_ctx(format!(
"track '{}/{}' not found in tracks",
namespace, name
)))?;
}

Ok(())
Expand All @@ -188,7 +193,13 @@ impl Publisher {
track_status_request.request_msg.track_namespace.clone(),
&track_status_request.request_msg.track_name,
)
.ok_or(ServeError::NotFound)?;
.ok_or_else(|| {
ServeError::not_found_ctx(format!(
"track '{}/{}' not found for track_status",
track_status_request.request_msg.track_namespace,
track_status_request.request_msg.track_name
))
})?;

track_status_request.respond_ok(&track)?;

Expand Down Expand Up @@ -326,7 +337,10 @@ impl Publisher {
// then send SubscribeError.
if let Err(err) = self.unknown_subscribed.push(subscribed) {
// Default to closing with a not found error I guess.
err.close(ServeError::NotFound)?;
err.close(ServeError::not_found_ctx(format!(
"unknown_subscribed queue full for namespace {:?}",
namespace
)))?;
}

Ok(())
Expand Down
23 changes: 8 additions & 15 deletions moq-transport/src/session/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,34 +346,27 @@ impl Subscriber {
// Look up the subscribe by id
let mut subscribes = self.subscribes.lock().unwrap();
let subscribe = subscribes.get_mut(&subscribe_id).ok_or_else(|| {
log::error!(
"[SUBSCRIBER] recv_stream_inner: subscribe_id={} not found, track_alias={}",
subscribe_id,
track_alias
);
ServeError::NotFound
ServeError::not_found_ctx(format!(
"subscribe_id={} not found for track_alias={}",
subscribe_id, track_alias
))
})?;

// Create the appropriate writer based on the stream header type
if stream_header.header_type.is_subgroup() {
log::trace!("[SUBSCRIBER] recv_stream_inner: creating subgroup writer");
Writer::Subgroup(subscribe.subgroup(stream_header.subgroup_header.unwrap())?)
} else {
log::error!(
"[SUBSCRIBER] recv_stream_inner: stream header_type={} not supported",
stream_header.header_type
);
return Err(SessionError::Serve(ServeError::Internal(format!(
return Err(SessionError::Serve(ServeError::internal_ctx(format!(
"unsupported stream header type={}",
stream_header.header_type
))));
}
} else {
log::error!(
"[SUBSCRIBER] recv_stream_inner: subscription track_alias={} not found",
return Err(SessionError::Serve(ServeError::not_found_ctx(format!(
"subscription track_alias={} not found",
track_alias
);
return Err(SessionError::Serve(ServeError::NotFound));
))));
}
};

Expand Down