Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 13 additions & 2 deletions moq-relay-ietf/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,13 @@ impl Producer {
}
}

Err(ServeError::NotFound.into())
let namespace = subscribed.track_namespace.clone();
let name = subscribed.track_name.clone();
Err(ServeError::not_found_ctx(format!(
"track '{}/{}' not found in local or remote tracks",
namespace, name
))
.into())
}

/// Serve a track_status request.
Expand Down Expand Up @@ -149,6 +155,11 @@ impl Producer {

track_status_requested.respond_error(4, "Track not found")?;

Err(ServeError::NotFound.into())
Err(ServeError::not_found_ctx(format!(
"track '{}/{}' not found for track_status",
track_status_requested.request_msg.track_namespace,
track_status_requested.request_msg.track_name
))
.into())
}
}
1 change: 1 addition & 0 deletions moq-transport/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ bytes = "1"
thiserror = "1"
tokio = { version = "1", features = ["macros", "io-util", "sync"] }
log = "0.4"
uuid = { version = "1", features = ["v4"] }

web-transport = { workspace = true }

Expand Down
107 changes: 104 additions & 3 deletions moq-transport/src/serve/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ pub enum ServeError {
#[error("not found")]
NotFound,

#[error("not found: {0} [error:{1}]")]
NotFoundWithId(String, uuid::Uuid),

#[error("duplicate")]
Duplicate,

Expand All @@ -25,8 +28,14 @@ pub enum ServeError {
#[error("internal error: {0}")]
Internal(String),

#[error("internal error: {0} [error:{1}]")]
InternalWithId(String, uuid::Uuid),

#[error("not implemented: {0}")]
NotImplemented(String),

#[error("not implemented: {0} [error:{1}]")]
NotImplementedWithId(String, uuid::Uuid),
}

impl ServeError {
Expand All @@ -42,15 +51,107 @@ impl ServeError {
// Pass through application-specific error codes
Self::Closed(code) => *code,
// TRACK_DOES_NOT_EXIST (0x4) from SUBSCRIBE_ERROR codes
Self::NotFound => 0x4,
Self::NotFound | Self::NotFoundWithId(_, _) => 0x4,
// This is more of a session-level error, but keeping a reasonable code
Self::Duplicate => 0x5,
// NOT_SUPPORTED (0x3) - appears in multiple error code registries
Self::Mode => 0x3,
Self::Size => 0x3,
Self::NotImplemented(_) => 0x3,
Self::NotImplemented(_) | Self::NotImplementedWithId(_, _) => 0x3,
// INTERNAL_ERROR (0x0) - per-request error registries use 0x0
Self::Internal(_) => 0x0,
Self::Internal(_) | Self::InternalWithId(_, _) => 0x0,
}
}

/// Create NotFound error with correlation ID but no additional context.
/// Uses generic messages for both logging and wire protocol.
///
/// Example: `ServeError::not_found_id()`
#[track_caller]
pub fn not_found_id() -> Self {
let id = uuid::Uuid::new_v4();
let loc = std::panic::Location::caller();
log::warn!("[{}] Not found at {}:{}", id, loc.file(), loc.line());
Self::NotFoundWithId("Track not found".to_string(), id)
}

/// Create NotFound error with correlation ID and internal context.
/// The internal context is logged but a generic message is sent on the wire.
///
/// Example: `ServeError::not_found_ctx("subscribe_id=123 not in map")`
#[track_caller]
pub fn not_found_ctx(internal_context: impl Into<String>) -> Self {
let context = internal_context.into();
let id = uuid::Uuid::new_v4();
let loc = std::panic::Location::caller();
log::warn!(
"[{}] Not found: {} at {}:{}",
id,
context,
loc.file(),
loc.line()
);
Self::NotFoundWithId("Track not found".to_string(), id)
}

/// Create NotFound error with full control over internal and external messages.
/// The internal context is logged, and the external message is sent on the wire.
///
/// Example: `ServeError::not_found_full("subscribe_id=123 not in map", "Subscription expired")`
#[track_caller]
pub fn not_found_full(
internal_context: impl Into<String>,
external_message: impl Into<String>,
) -> Self {
let context = internal_context.into();
let message = external_message.into();
let id = uuid::Uuid::new_v4();
let loc = std::panic::Location::caller();
log::warn!(
"[{}] Not found: {} at {}:{}",
id,
context,
loc.file(),
loc.line()
);
Self::NotFoundWithId(message, id)
}

/// Create Internal error with correlation ID and internal context.
/// The internal context is logged but a generic message is sent on the wire.
///
/// Example: `ServeError::internal_ctx("subscriber map in bad state")`
#[track_caller]
pub fn internal_ctx(internal_context: impl Into<String>) -> Self {
let context = internal_context.into();
let id = uuid::Uuid::new_v4();
let loc = std::panic::Location::caller();
log::error!(
"[{}] Internal error: {} at {}:{}",
id,
context,
loc.file(),
loc.line()
);
Self::InternalWithId("Internal error".to_string(), id)
}

/// Create NotImplemented error with correlation ID and feature context.
/// The feature name is logged but a generic message is sent on the wire.
///
/// Example: `ServeError::not_implemented_ctx("datagrams")`
#[track_caller]
pub fn not_implemented_ctx(feature: impl Into<String>) -> Self {
let feature = feature.into();
let id = uuid::Uuid::new_v4();
let loc = std::panic::Location::caller();
log::warn!(
"[{}] Not implemented: {} at {}:{}",
id,
feature,
loc.file(),
loc.line()
);
Self::NotImplementedWithId("Feature not implemented".to_string(), id)
}
}
4 changes: 3 additions & 1 deletion moq-transport/src/serve/tracks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,9 @@ impl Drop for TracksRequest {
fn drop(&mut self) {
// Close any tracks still in the Queue
for track in self.incoming.take().unwrap().close() {
let _ = track.close(ServeError::NotFound);
let _ = track.close(ServeError::not_found_ctx(
"tracks request dropped before track handled",
));
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion moq-transport/src/session/announce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@ impl Default for AnnounceState {
impl Drop for AnnounceState {
fn drop(&mut self) {
for subscriber in self.subscribers.drain(..) {
subscriber.close(ServeError::NotFound).ok();
subscriber
.close(ServeError::not_found_ctx(
"announce dropped before subscription handled",
))
.ok();
}
}
}
Expand Down
5 changes: 2 additions & 3 deletions moq-transport/src/session/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,15 @@ impl SessionError {
/// Helper for unimplemented protocol features
/// Logs a warning and returns a NotImplemented error instead of panicking
pub fn unimplemented(feature: &str) -> Self {
log::warn!("Protocol feature not implemented: {}", feature);
Self::Serve(serve::ServeError::NotImplemented(feature.to_string()))
Self::Serve(serve::ServeError::not_implemented_ctx(feature))
}
}

impl From<SessionError> for serve::ServeError {
fn from(err: SessionError) -> Self {
match err {
SessionError::Serve(err) => err,
_ => serve::ServeError::Internal(err.to_string()),
_ => serve::ServeError::internal_ctx(format!("session error: {}", err)),
}
}
}
20 changes: 17 additions & 3 deletions moq-transport/src/session/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,12 @@ impl Publisher {
) {
subscribed.serve(track).await?;
} else {
subscribed.close(ServeError::NotFound)?;
let namespace = subscribed.info.track_namespace.clone();
let name = subscribed.info.track_name.clone();
subscribed.close(ServeError::not_found_ctx(format!(
"track '{}/{}' not found in tracks",
namespace, name
)))?;
}

Ok(())
Expand All @@ -188,7 +193,13 @@ impl Publisher {
track_status_request.request_msg.track_namespace.clone(),
&track_status_request.request_msg.track_name,
)
.ok_or(ServeError::NotFound)?;
.ok_or_else(|| {
ServeError::not_found_ctx(format!(
"track '{}/{}' not found for track_status",
track_status_request.request_msg.track_namespace,
track_status_request.request_msg.track_name
))
})?;

track_status_request.respond_ok(&track)?;

Expand Down Expand Up @@ -326,7 +337,10 @@ impl Publisher {
// then send SubscribeError.
if let Err(err) = self.unknown_subscribed.push(subscribed) {
// Default to closing with a not found error I guess.
err.close(ServeError::NotFound)?;
err.close(ServeError::not_found_ctx(format!(
"unknown_subscribed queue full for namespace {:?}",
namespace
)))?;
}

Ok(())
Expand Down
23 changes: 8 additions & 15 deletions moq-transport/src/session/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,34 +346,27 @@ impl Subscriber {
// Look up the subscribe by id
let mut subscribes = self.subscribes.lock().unwrap();
let subscribe = subscribes.get_mut(&subscribe_id).ok_or_else(|| {
log::error!(
"[SUBSCRIBER] recv_stream_inner: subscribe_id={} not found, track_alias={}",
subscribe_id,
track_alias
);
ServeError::NotFound
ServeError::not_found_ctx(format!(
"subscribe_id={} not found for track_alias={}",
subscribe_id, track_alias
))
})?;

// Create the appropriate writer based on the stream header type
if stream_header.header_type.is_subgroup() {
log::trace!("[SUBSCRIBER] recv_stream_inner: creating subgroup writer");
Writer::Subgroup(subscribe.subgroup(stream_header.subgroup_header.unwrap())?)
} else {
log::error!(
"[SUBSCRIBER] recv_stream_inner: stream header_type={} not supported",
stream_header.header_type
);
return Err(SessionError::Serve(ServeError::Internal(format!(
return Err(SessionError::Serve(ServeError::internal_ctx(format!(
"unsupported stream header type={}",
stream_header.header_type
))));
}
} else {
log::error!(
"[SUBSCRIBER] recv_stream_inner: subscription track_alias={} not found",
return Err(SessionError::Serve(ServeError::not_found_ctx(format!(
"subscription track_alias={} not found",
track_alias
);
return Err(SessionError::Serve(ServeError::NotFound));
))));
}
};

Expand Down