diff --git a/Cargo.lock b/Cargo.lock index cd82a47..4949f36 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1331,6 +1331,7 @@ dependencies = [ "serde_with", "thiserror 1.0.61", "tokio", + "uuid", "web-transport", ] @@ -2731,6 +2732,17 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" +[[package]] +name = "uuid" +version = "1.18.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f87b8aa10b915a06587d0dec516c282ff295b475d94abf425d62b57710070a2" +dependencies = [ + "getrandom 0.3.3", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "valuable" version = "0.1.0" diff --git a/moq-relay-ietf/src/producer.rs b/moq-relay-ietf/src/producer.rs index a55b76e..3de13be 100644 --- a/moq-relay-ietf/src/producer.rs +++ b/moq-relay-ietf/src/producer.rs @@ -108,7 +108,13 @@ impl Producer { } } - Err(ServeError::NotFound.into()) + let namespace = subscribed.track_namespace.clone(); + let name = subscribed.track_name.clone(); + Err(ServeError::not_found_ctx(format!( + "track '{}/{}' not found in local or remote tracks", + namespace, name + )) + .into()) } /// Serve a track_status request. @@ -149,6 +155,11 @@ impl Producer { track_status_requested.respond_error(4, "Track not found")?; - Err(ServeError::NotFound.into()) + Err(ServeError::not_found_ctx(format!( + "track '{}/{}' not found for track_status", + track_status_requested.request_msg.track_namespace, + track_status_requested.request_msg.track_name + )) + .into()) } } diff --git a/moq-transport/Cargo.toml b/moq-transport/Cargo.toml index 23032a3..8675645 100644 --- a/moq-transport/Cargo.toml +++ b/moq-transport/Cargo.toml @@ -19,6 +19,7 @@ bytes = "1" thiserror = "1" tokio = { version = "1", features = ["macros", "io-util", "sync"] } log = "0.4" +uuid = { version = "1", features = ["v4"] } web-transport = { workspace = true } diff --git a/moq-transport/src/serve/error.rs b/moq-transport/src/serve/error.rs index 183fbab..bb0995b 100644 --- a/moq-transport/src/serve/error.rs +++ b/moq-transport/src/serve/error.rs @@ -13,6 +13,9 @@ pub enum ServeError { #[error("not found")] NotFound, + #[error("not found: {0} [error:{1}]")] + NotFoundWithId(String, uuid::Uuid), + #[error("duplicate")] Duplicate, @@ -25,8 +28,14 @@ pub enum ServeError { #[error("internal error: {0}")] Internal(String), + #[error("internal error: {0} [error:{1}]")] + InternalWithId(String, uuid::Uuid), + #[error("not implemented: {0}")] NotImplemented(String), + + #[error("not implemented: {0} [error:{1}]")] + NotImplementedWithId(String, uuid::Uuid), } impl ServeError { @@ -42,15 +51,107 @@ impl ServeError { // Pass through application-specific error codes Self::Closed(code) => *code, // TRACK_DOES_NOT_EXIST (0x4) from SUBSCRIBE_ERROR codes - Self::NotFound => 0x4, + Self::NotFound | Self::NotFoundWithId(_, _) => 0x4, // This is more of a session-level error, but keeping a reasonable code Self::Duplicate => 0x5, // NOT_SUPPORTED (0x3) - appears in multiple error code registries Self::Mode => 0x3, Self::Size => 0x3, - Self::NotImplemented(_) => 0x3, + Self::NotImplemented(_) | Self::NotImplementedWithId(_, _) => 0x3, // INTERNAL_ERROR (0x0) - per-request error registries use 0x0 - Self::Internal(_) => 0x0, + Self::Internal(_) | Self::InternalWithId(_, _) => 0x0, } } + + /// Create NotFound error with correlation ID but no additional context. + /// Uses generic messages for both logging and wire protocol. + /// + /// Example: `ServeError::not_found_id()` + #[track_caller] + pub fn not_found_id() -> Self { + let id = uuid::Uuid::new_v4(); + let loc = std::panic::Location::caller(); + log::warn!("[{}] Not found at {}:{}", id, loc.file(), loc.line()); + Self::NotFoundWithId("Track not found".to_string(), id) + } + + /// Create NotFound error with correlation ID and internal context. + /// The internal context is logged but a generic message is sent on the wire. + /// + /// Example: `ServeError::not_found_ctx("subscribe_id=123 not in map")` + #[track_caller] + pub fn not_found_ctx(internal_context: impl Into) -> Self { + let context = internal_context.into(); + let id = uuid::Uuid::new_v4(); + let loc = std::panic::Location::caller(); + log::warn!( + "[{}] Not found: {} at {}:{}", + id, + context, + loc.file(), + loc.line() + ); + Self::NotFoundWithId("Track not found".to_string(), id) + } + + /// Create NotFound error with full control over internal and external messages. + /// The internal context is logged, and the external message is sent on the wire. + /// + /// Example: `ServeError::not_found_full("subscribe_id=123 not in map", "Subscription expired")` + #[track_caller] + pub fn not_found_full( + internal_context: impl Into, + external_message: impl Into, + ) -> Self { + let context = internal_context.into(); + let message = external_message.into(); + let id = uuid::Uuid::new_v4(); + let loc = std::panic::Location::caller(); + log::warn!( + "[{}] Not found: {} at {}:{}", + id, + context, + loc.file(), + loc.line() + ); + Self::NotFoundWithId(message, id) + } + + /// Create Internal error with correlation ID and internal context. + /// The internal context is logged but a generic message is sent on the wire. + /// + /// Example: `ServeError::internal_ctx("subscriber map in bad state")` + #[track_caller] + pub fn internal_ctx(internal_context: impl Into) -> Self { + let context = internal_context.into(); + let id = uuid::Uuid::new_v4(); + let loc = std::panic::Location::caller(); + log::error!( + "[{}] Internal error: {} at {}:{}", + id, + context, + loc.file(), + loc.line() + ); + Self::InternalWithId("Internal error".to_string(), id) + } + + /// Create NotImplemented error with correlation ID and feature context. + /// The feature name is logged but a generic message is sent on the wire. + /// + /// Example: `ServeError::not_implemented_ctx("datagrams")` + #[track_caller] + pub fn not_implemented_ctx(feature: impl Into) -> Self { + let feature = feature.into(); + let id = uuid::Uuid::new_v4(); + let loc = std::panic::Location::caller(); + log::warn!( + "[{}] Not implemented: {} at {}:{}", + id, + feature, + loc.file(), + loc.line() + ); + Self::NotImplementedWithId("Feature not implemented".to_string(), id) + } } diff --git a/moq-transport/src/serve/tracks.rs b/moq-transport/src/serve/tracks.rs index df619de..bb9ba69 100644 --- a/moq-transport/src/serve/tracks.rs +++ b/moq-transport/src/serve/tracks.rs @@ -136,7 +136,9 @@ impl Drop for TracksRequest { fn drop(&mut self) { // Close any tracks still in the Queue for track in self.incoming.take().unwrap().close() { - let _ = track.close(ServeError::NotFound); + let _ = track.close(ServeError::not_found_ctx( + "tracks request dropped before track handled", + )); } } } diff --git a/moq-transport/src/session/announce.rs b/moq-transport/src/session/announce.rs index d9aa8f0..278c614 100644 --- a/moq-transport/src/session/announce.rs +++ b/moq-transport/src/session/announce.rs @@ -33,7 +33,11 @@ impl Default for AnnounceState { impl Drop for AnnounceState { fn drop(&mut self) { for subscriber in self.subscribers.drain(..) { - subscriber.close(ServeError::NotFound).ok(); + subscriber + .close(ServeError::not_found_ctx( + "announce dropped before subscription handled", + )) + .ok(); } } } diff --git a/moq-transport/src/session/error.rs b/moq-transport/src/session/error.rs index 5c42a0b..25006b9 100644 --- a/moq-transport/src/session/error.rs +++ b/moq-transport/src/session/error.rs @@ -74,8 +74,7 @@ impl SessionError { /// Helper for unimplemented protocol features /// Logs a warning and returns a NotImplemented error instead of panicking pub fn unimplemented(feature: &str) -> Self { - log::warn!("Protocol feature not implemented: {}", feature); - Self::Serve(serve::ServeError::NotImplemented(feature.to_string())) + Self::Serve(serve::ServeError::not_implemented_ctx(feature)) } } @@ -83,7 +82,7 @@ impl From for serve::ServeError { fn from(err: SessionError) -> Self { match err { SessionError::Serve(err) => err, - _ => serve::ServeError::Internal(err.to_string()), + _ => serve::ServeError::internal_ctx(format!("session error: {}", err)), } } } diff --git a/moq-transport/src/session/publisher.rs b/moq-transport/src/session/publisher.rs index 7afeecc..1d0c45b 100644 --- a/moq-transport/src/session/publisher.rs +++ b/moq-transport/src/session/publisher.rs @@ -173,7 +173,12 @@ impl Publisher { ) { subscribed.serve(track).await?; } else { - subscribed.close(ServeError::NotFound)?; + let namespace = subscribed.info.track_namespace.clone(); + let name = subscribed.info.track_name.clone(); + subscribed.close(ServeError::not_found_ctx(format!( + "track '{}/{}' not found in tracks", + namespace, name + )))?; } Ok(()) @@ -188,7 +193,13 @@ impl Publisher { track_status_request.request_msg.track_namespace.clone(), &track_status_request.request_msg.track_name, ) - .ok_or(ServeError::NotFound)?; + .ok_or_else(|| { + ServeError::not_found_ctx(format!( + "track '{}/{}' not found for track_status", + track_status_request.request_msg.track_namespace, + track_status_request.request_msg.track_name + )) + })?; track_status_request.respond_ok(&track)?; @@ -326,7 +337,10 @@ impl Publisher { // then send SubscribeError. if let Err(err) = self.unknown_subscribed.push(subscribed) { // Default to closing with a not found error I guess. - err.close(ServeError::NotFound)?; + err.close(ServeError::not_found_ctx(format!( + "unknown_subscribed queue full for namespace {:?}", + namespace + )))?; } Ok(()) diff --git a/moq-transport/src/session/subscriber.rs b/moq-transport/src/session/subscriber.rs index 8f6f3cb..a9fa2e1 100644 --- a/moq-transport/src/session/subscriber.rs +++ b/moq-transport/src/session/subscriber.rs @@ -346,12 +346,10 @@ impl Subscriber { // Look up the subscribe by id let mut subscribes = self.subscribes.lock().unwrap(); let subscribe = subscribes.get_mut(&subscribe_id).ok_or_else(|| { - log::error!( - "[SUBSCRIBER] recv_stream_inner: subscribe_id={} not found, track_alias={}", - subscribe_id, - track_alias - ); - ServeError::NotFound + ServeError::not_found_ctx(format!( + "subscribe_id={} not found for track_alias={}", + subscribe_id, track_alias + )) })?; // Create the appropriate writer based on the stream header type @@ -359,21 +357,16 @@ impl Subscriber { log::trace!("[SUBSCRIBER] recv_stream_inner: creating subgroup writer"); Writer::Subgroup(subscribe.subgroup(stream_header.subgroup_header.unwrap())?) } else { - log::error!( - "[SUBSCRIBER] recv_stream_inner: stream header_type={} not supported", - stream_header.header_type - ); - return Err(SessionError::Serve(ServeError::Internal(format!( + return Err(SessionError::Serve(ServeError::internal_ctx(format!( "unsupported stream header type={}", stream_header.header_type )))); } } else { - log::error!( - "[SUBSCRIBER] recv_stream_inner: subscription track_alias={} not found", + return Err(SessionError::Serve(ServeError::not_found_ctx(format!( + "subscription track_alias={} not found", track_alias - ); - return Err(SessionError::Serve(ServeError::NotFound)); + )))); } };