From 961f150a00991c76cd5e15c2e6b5cbc9683d7953 Mon Sep 17 00:00:00 2001 From: Mike English Date: Fri, 7 Nov 2025 17:08:50 -0500 Subject: [PATCH 1/5] Add error correlation ids --- Cargo.lock | 12 ++++++ moq-transport/Cargo.toml | 1 + moq-transport/src/serve/error.rs | 63 ++++++++++++++++++++++++++++++-- 3 files changed, 73 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cd82a47..4949f36 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1331,6 +1331,7 @@ dependencies = [ "serde_with", "thiserror 1.0.61", "tokio", + "uuid", "web-transport", ] @@ -2731,6 +2732,17 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" +[[package]] +name = "uuid" +version = "1.18.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f87b8aa10b915a06587d0dec516c282ff295b475d94abf425d62b57710070a2" +dependencies = [ + "getrandom 0.3.3", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "valuable" version = "0.1.0" diff --git a/moq-transport/Cargo.toml b/moq-transport/Cargo.toml index 23032a3..8675645 100644 --- a/moq-transport/Cargo.toml +++ b/moq-transport/Cargo.toml @@ -19,6 +19,7 @@ bytes = "1" thiserror = "1" tokio = { version = "1", features = ["macros", "io-util", "sync"] } log = "0.4" +uuid = { version = "1", features = ["v4"] } web-transport = { workspace = true } diff --git a/moq-transport/src/serve/error.rs b/moq-transport/src/serve/error.rs index 183fbab..4b03629 100644 --- a/moq-transport/src/serve/error.rs +++ b/moq-transport/src/serve/error.rs @@ -13,6 +13,9 @@ pub enum ServeError { #[error("not found")] NotFound, + #[error("not found [error:{0}]")] + NotFoundWithId(uuid::Uuid), + #[error("duplicate")] Duplicate, @@ -25,8 +28,14 @@ pub enum ServeError { #[error("internal error: {0}")] Internal(String), + #[error("internal error: {0} [error:{1}]")] + InternalWithId(String, uuid::Uuid), + #[error("not implemented: {0}")] NotImplemented(String), + + #[error("not implemented: {0} [error:{1}]")] + NotImplementedWithId(String, uuid::Uuid), } impl ServeError { @@ -42,15 +51,63 @@ impl ServeError { // Pass through application-specific error codes Self::Closed(code) => *code, // TRACK_DOES_NOT_EXIST (0x4) from SUBSCRIBE_ERROR codes - Self::NotFound => 0x4, + Self::NotFound | Self::NotFoundWithId(_) => 0x4, // This is more of a session-level error, but keeping a reasonable code Self::Duplicate => 0x5, // NOT_SUPPORTED (0x3) - appears in multiple error code registries Self::Mode => 0x3, Self::Size => 0x3, - Self::NotImplemented(_) => 0x3, + Self::NotImplemented(_) | Self::NotImplementedWithId(_, _) => 0x3, // INTERNAL_ERROR (0x0) - per-request error registries use 0x0 - Self::Internal(_) => 0x0, + Self::Internal(_) | Self::InternalWithId(_, _) => 0x0, } } + + /// Create NotFound error with correlation ID and context logging. + /// The correlation ID will be included in the error message sent to the client. + #[track_caller] + pub fn not_found_ctx(context: impl std::fmt::Display) -> Self { + let id = uuid::Uuid::new_v4(); + let loc = std::panic::Location::caller(); + log::warn!( + "[{}] Not found: {} at {}:{}", + id, + context, + loc.file(), + loc.line() + ); + Self::NotFoundWithId(id) + } + + /// Create Internal error with correlation ID and context logging. + /// The correlation ID will be included in the error message sent to the client. + #[track_caller] + pub fn internal_ctx(context: impl std::fmt::Display) -> Self { + let id = uuid::Uuid::new_v4(); + let loc = std::panic::Location::caller(); + log::error!( + "[{}] Internal error: {} at {}:{}", + id, + context, + loc.file(), + loc.line() + ); + Self::InternalWithId(context.to_string(), id) + } + + /// Create NotImplemented error with correlation ID and context logging. + /// The correlation ID will be included in the error message sent to the client. + #[track_caller] + pub fn not_implemented_ctx(feature: impl std::fmt::Display) -> Self { + let id = uuid::Uuid::new_v4(); + let loc = std::panic::Location::caller(); + log::warn!( + "[{}] Not implemented: {} at {}:{}", + id, + feature, + loc.file(), + loc.line() + ); + Self::NotImplementedWithId(feature.to_string(), id) + } } From b84488355011a99d56efe44d694be03413d9fe82 Mon Sep 17 00:00:00 2001 From: Mike English Date: Fri, 7 Nov 2025 17:21:19 -0500 Subject: [PATCH 2/5] Use correlation IDs in errors --- moq-relay-ietf/src/producer.rs | 15 +++++++++++++-- moq-transport/src/serve/tracks.rs | 4 +++- moq-transport/src/session/announce.rs | 6 +++++- moq-transport/src/session/error.rs | 5 ++--- moq-transport/src/session/publisher.rs | 20 +++++++++++++++++--- moq-transport/src/session/subscriber.rs | 23 ++++++++--------------- 6 files changed, 48 insertions(+), 25 deletions(-) diff --git a/moq-relay-ietf/src/producer.rs b/moq-relay-ietf/src/producer.rs index a55b76e..3de13be 100644 --- a/moq-relay-ietf/src/producer.rs +++ b/moq-relay-ietf/src/producer.rs @@ -108,7 +108,13 @@ impl Producer { } } - Err(ServeError::NotFound.into()) + let namespace = subscribed.track_namespace.clone(); + let name = subscribed.track_name.clone(); + Err(ServeError::not_found_ctx(format!( + "track '{}/{}' not found in local or remote tracks", + namespace, name + )) + .into()) } /// Serve a track_status request. @@ -149,6 +155,11 @@ impl Producer { track_status_requested.respond_error(4, "Track not found")?; - Err(ServeError::NotFound.into()) + Err(ServeError::not_found_ctx(format!( + "track '{}/{}' not found for track_status", + track_status_requested.request_msg.track_namespace, + track_status_requested.request_msg.track_name + )) + .into()) } } diff --git a/moq-transport/src/serve/tracks.rs b/moq-transport/src/serve/tracks.rs index df619de..bb9ba69 100644 --- a/moq-transport/src/serve/tracks.rs +++ b/moq-transport/src/serve/tracks.rs @@ -136,7 +136,9 @@ impl Drop for TracksRequest { fn drop(&mut self) { // Close any tracks still in the Queue for track in self.incoming.take().unwrap().close() { - let _ = track.close(ServeError::NotFound); + let _ = track.close(ServeError::not_found_ctx( + "tracks request dropped before track handled", + )); } } } diff --git a/moq-transport/src/session/announce.rs b/moq-transport/src/session/announce.rs index d9aa8f0..278c614 100644 --- a/moq-transport/src/session/announce.rs +++ b/moq-transport/src/session/announce.rs @@ -33,7 +33,11 @@ impl Default for AnnounceState { impl Drop for AnnounceState { fn drop(&mut self) { for subscriber in self.subscribers.drain(..) { - subscriber.close(ServeError::NotFound).ok(); + subscriber + .close(ServeError::not_found_ctx( + "announce dropped before subscription handled", + )) + .ok(); } } } diff --git a/moq-transport/src/session/error.rs b/moq-transport/src/session/error.rs index 5c42a0b..25006b9 100644 --- a/moq-transport/src/session/error.rs +++ b/moq-transport/src/session/error.rs @@ -74,8 +74,7 @@ impl SessionError { /// Helper for unimplemented protocol features /// Logs a warning and returns a NotImplemented error instead of panicking pub fn unimplemented(feature: &str) -> Self { - log::warn!("Protocol feature not implemented: {}", feature); - Self::Serve(serve::ServeError::NotImplemented(feature.to_string())) + Self::Serve(serve::ServeError::not_implemented_ctx(feature)) } } @@ -83,7 +82,7 @@ impl From for serve::ServeError { fn from(err: SessionError) -> Self { match err { SessionError::Serve(err) => err, - _ => serve::ServeError::Internal(err.to_string()), + _ => serve::ServeError::internal_ctx(format!("session error: {}", err)), } } } diff --git a/moq-transport/src/session/publisher.rs b/moq-transport/src/session/publisher.rs index 7afeecc..1d0c45b 100644 --- a/moq-transport/src/session/publisher.rs +++ b/moq-transport/src/session/publisher.rs @@ -173,7 +173,12 @@ impl Publisher { ) { subscribed.serve(track).await?; } else { - subscribed.close(ServeError::NotFound)?; + let namespace = subscribed.info.track_namespace.clone(); + let name = subscribed.info.track_name.clone(); + subscribed.close(ServeError::not_found_ctx(format!( + "track '{}/{}' not found in tracks", + namespace, name + )))?; } Ok(()) @@ -188,7 +193,13 @@ impl Publisher { track_status_request.request_msg.track_namespace.clone(), &track_status_request.request_msg.track_name, ) - .ok_or(ServeError::NotFound)?; + .ok_or_else(|| { + ServeError::not_found_ctx(format!( + "track '{}/{}' not found for track_status", + track_status_request.request_msg.track_namespace, + track_status_request.request_msg.track_name + )) + })?; track_status_request.respond_ok(&track)?; @@ -326,7 +337,10 @@ impl Publisher { // then send SubscribeError. if let Err(err) = self.unknown_subscribed.push(subscribed) { // Default to closing with a not found error I guess. - err.close(ServeError::NotFound)?; + err.close(ServeError::not_found_ctx(format!( + "unknown_subscribed queue full for namespace {:?}", + namespace + )))?; } Ok(()) diff --git a/moq-transport/src/session/subscriber.rs b/moq-transport/src/session/subscriber.rs index 8f6f3cb..a9fa2e1 100644 --- a/moq-transport/src/session/subscriber.rs +++ b/moq-transport/src/session/subscriber.rs @@ -346,12 +346,10 @@ impl Subscriber { // Look up the subscribe by id let mut subscribes = self.subscribes.lock().unwrap(); let subscribe = subscribes.get_mut(&subscribe_id).ok_or_else(|| { - log::error!( - "[SUBSCRIBER] recv_stream_inner: subscribe_id={} not found, track_alias={}", - subscribe_id, - track_alias - ); - ServeError::NotFound + ServeError::not_found_ctx(format!( + "subscribe_id={} not found for track_alias={}", + subscribe_id, track_alias + )) })?; // Create the appropriate writer based on the stream header type @@ -359,21 +357,16 @@ impl Subscriber { log::trace!("[SUBSCRIBER] recv_stream_inner: creating subgroup writer"); Writer::Subgroup(subscribe.subgroup(stream_header.subgroup_header.unwrap())?) } else { - log::error!( - "[SUBSCRIBER] recv_stream_inner: stream header_type={} not supported", - stream_header.header_type - ); - return Err(SessionError::Serve(ServeError::Internal(format!( + return Err(SessionError::Serve(ServeError::internal_ctx(format!( "unsupported stream header type={}", stream_header.header_type )))); } } else { - log::error!( - "[SUBSCRIBER] recv_stream_inner: subscription track_alias={} not found", + return Err(SessionError::Serve(ServeError::not_found_ctx(format!( + "subscription track_alias={} not found", track_alias - ); - return Err(SessionError::Serve(ServeError::NotFound)); + )))); } }; From 2f01b57d0de2235440afb8274dc3de6a2ce438ee Mon Sep 17 00:00:00 2001 From: Mike English Date: Fri, 7 Nov 2025 17:56:00 -0500 Subject: [PATCH 3/5] Address PR feedback --- moq-transport/src/serve/error.rs | 45 +++++++++++--------------------- 1 file changed, 15 insertions(+), 30 deletions(-) diff --git a/moq-transport/src/serve/error.rs b/moq-transport/src/serve/error.rs index 4b03629..457362a 100644 --- a/moq-transport/src/serve/error.rs +++ b/moq-transport/src/serve/error.rs @@ -13,8 +13,8 @@ pub enum ServeError { #[error("not found")] NotFound, - #[error("not found [error:{0}]")] - NotFoundWithId(uuid::Uuid), + #[error("not found: {0} [error:{1}]")] + NotFoundWithId(String, uuid::Uuid), #[error("duplicate")] Duplicate, @@ -51,7 +51,7 @@ impl ServeError { // Pass through application-specific error codes Self::Closed(code) => *code, // TRACK_DOES_NOT_EXIST (0x4) from SUBSCRIBE_ERROR codes - Self::NotFound | Self::NotFoundWithId(_) => 0x4, + Self::NotFound | Self::NotFoundWithId(_, _) => 0x4, // This is more of a session-level error, but keeping a reasonable code Self::Duplicate => 0x5, // NOT_SUPPORTED (0x3) - appears in multiple error code registries @@ -66,48 +66,33 @@ impl ServeError { /// Create NotFound error with correlation ID and context logging. /// The correlation ID will be included in the error message sent to the client. #[track_caller] - pub fn not_found_ctx(context: impl std::fmt::Display) -> Self { + pub fn not_found_ctx(context: impl Into) -> Self { + let context = context.into(); let id = uuid::Uuid::new_v4(); let loc = std::panic::Location::caller(); - log::warn!( - "[{}] Not found: {} at {}:{}", - id, - context, - loc.file(), - loc.line() - ); - Self::NotFoundWithId(id) + log::warn!("[{}] Not found: {} at {}:{}", id, context, loc.file(), loc.line()); + Self::NotFoundWithId(context, id) } /// Create Internal error with correlation ID and context logging. /// The correlation ID will be included in the error message sent to the client. #[track_caller] - pub fn internal_ctx(context: impl std::fmt::Display) -> Self { + pub fn internal_ctx(context: impl Into) -> Self { + let context = context.into(); let id = uuid::Uuid::new_v4(); let loc = std::panic::Location::caller(); - log::error!( - "[{}] Internal error: {} at {}:{}", - id, - context, - loc.file(), - loc.line() - ); - Self::InternalWithId(context.to_string(), id) + log::error!("[{}] Internal error: {} at {}:{}", id, context, loc.file(), loc.line()); + Self::InternalWithId(context, id) } /// Create NotImplemented error with correlation ID and context logging. /// The correlation ID will be included in the error message sent to the client. #[track_caller] - pub fn not_implemented_ctx(feature: impl std::fmt::Display) -> Self { + pub fn not_implemented_ctx(feature: impl Into) -> Self { + let feature = feature.into(); let id = uuid::Uuid::new_v4(); let loc = std::panic::Location::caller(); - log::warn!( - "[{}] Not implemented: {} at {}:{}", - id, - feature, - loc.file(), - loc.line() - ); - Self::NotImplementedWithId(feature.to_string(), id) + log::warn!("[{}] Not implemented: {} at {}:{}", id, feature, loc.file(), loc.line()); + Self::NotImplementedWithId(feature, id) } } From f9d6c8e52a7fd98b87b64f7ef3f96c41202e74cc Mon Sep 17 00:00:00 2001 From: Mike English Date: Fri, 7 Nov 2025 18:16:45 -0500 Subject: [PATCH 4/5] Improve error expressivity and safety Establish patterns for sending errors with correlation ids that can be mapped to logs, the same with additional logged information, and the same with custom user-facing reason phrases. --- moq-transport/src/serve/error.rs | 61 +++++++++++++++++++++++++------- 1 file changed, 48 insertions(+), 13 deletions(-) diff --git a/moq-transport/src/serve/error.rs b/moq-transport/src/serve/error.rs index 457362a..377c5a0 100644 --- a/moq-transport/src/serve/error.rs +++ b/moq-transport/src/serve/error.rs @@ -63,36 +63,71 @@ impl ServeError { } } - /// Create NotFound error with correlation ID and context logging. - /// The correlation ID will be included in the error message sent to the client. + /// Create NotFound error with correlation ID but no additional context. + /// Uses generic messages for both logging and wire protocol. + /// + /// Example: `ServeError::not_found_id()` #[track_caller] - pub fn not_found_ctx(context: impl Into) -> Self { - let context = context.into(); + pub fn not_found_id() -> Self { + let id = uuid::Uuid::new_v4(); + let loc = std::panic::Location::caller(); + log::warn!("[{}] Not found at {}:{}", id, loc.file(), loc.line()); + Self::NotFoundWithId("Track not found".to_string(), id) + } + + /// Create NotFound error with correlation ID and internal context. + /// The internal context is logged but a generic message is sent on the wire. + /// + /// Example: `ServeError::not_found_ctx("subscribe_id=123 not in map")` + #[track_caller] + pub fn not_found_ctx(internal_context: impl Into) -> Self { + let context = internal_context.into(); + let id = uuid::Uuid::new_v4(); + let loc = std::panic::Location::caller(); + log::warn!("[{}] Not found: {} at {}:{}", id, context, loc.file(), loc.line()); + Self::NotFoundWithId("Track not found".to_string(), id) + } + + /// Create NotFound error with full control over internal and external messages. + /// The internal context is logged, and the external message is sent on the wire. + /// + /// Example: `ServeError::not_found_full("subscribe_id=123 not in map", "Subscription expired")` + #[track_caller] + pub fn not_found_full( + internal_context: impl Into, + external_message: impl Into, + ) -> Self { + let context = internal_context.into(); + let message = external_message.into(); let id = uuid::Uuid::new_v4(); let loc = std::panic::Location::caller(); log::warn!("[{}] Not found: {} at {}:{}", id, context, loc.file(), loc.line()); - Self::NotFoundWithId(context, id) + Self::NotFoundWithId(message, id) } - /// Create Internal error with correlation ID and context logging. - /// The correlation ID will be included in the error message sent to the client. + /// Create Internal error with correlation ID and internal context. + /// The internal context is logged but a generic message is sent on the wire. + /// + /// Example: `ServeError::internal_ctx("subscriber map in bad state")` #[track_caller] - pub fn internal_ctx(context: impl Into) -> Self { - let context = context.into(); + pub fn internal_ctx(internal_context: impl Into) -> Self { + let context = internal_context.into(); let id = uuid::Uuid::new_v4(); let loc = std::panic::Location::caller(); log::error!("[{}] Internal error: {} at {}:{}", id, context, loc.file(), loc.line()); - Self::InternalWithId(context, id) + Self::InternalWithId("Internal error".to_string(), id) } - /// Create NotImplemented error with correlation ID and context logging. - /// The correlation ID will be included in the error message sent to the client. + /// Create NotImplemented error with correlation ID and feature context. + /// The feature name is logged but a generic message is sent on the wire. + /// + /// Example: `ServeError::not_implemented_ctx("datagrams")` #[track_caller] pub fn not_implemented_ctx(feature: impl Into) -> Self { let feature = feature.into(); let id = uuid::Uuid::new_v4(); let loc = std::panic::Location::caller(); log::warn!("[{}] Not implemented: {} at {}:{}", id, feature, loc.file(), loc.line()); - Self::NotImplementedWithId(feature, id) + Self::NotImplementedWithId("Feature not implemented".to_string(), id) } } From aae63837e207d91a5ba5f4f79d17126210e8bc1c Mon Sep 17 00:00:00 2001 From: Mike English Date: Fri, 7 Nov 2025 18:17:49 -0500 Subject: [PATCH 5/5] cargo fmt --- moq-transport/src/serve/error.rs | 42 +++++++++++++++++++++++++------- 1 file changed, 33 insertions(+), 9 deletions(-) diff --git a/moq-transport/src/serve/error.rs b/moq-transport/src/serve/error.rs index 377c5a0..bb0995b 100644 --- a/moq-transport/src/serve/error.rs +++ b/moq-transport/src/serve/error.rs @@ -65,7 +65,7 @@ impl ServeError { /// Create NotFound error with correlation ID but no additional context. /// Uses generic messages for both logging and wire protocol. - /// + /// /// Example: `ServeError::not_found_id()` #[track_caller] pub fn not_found_id() -> Self { @@ -77,20 +77,26 @@ impl ServeError { /// Create NotFound error with correlation ID and internal context. /// The internal context is logged but a generic message is sent on the wire. - /// + /// /// Example: `ServeError::not_found_ctx("subscribe_id=123 not in map")` #[track_caller] pub fn not_found_ctx(internal_context: impl Into) -> Self { let context = internal_context.into(); let id = uuid::Uuid::new_v4(); let loc = std::panic::Location::caller(); - log::warn!("[{}] Not found: {} at {}:{}", id, context, loc.file(), loc.line()); + log::warn!( + "[{}] Not found: {} at {}:{}", + id, + context, + loc.file(), + loc.line() + ); Self::NotFoundWithId("Track not found".to_string(), id) } /// Create NotFound error with full control over internal and external messages. /// The internal context is logged, and the external message is sent on the wire. - /// + /// /// Example: `ServeError::not_found_full("subscribe_id=123 not in map", "Subscription expired")` #[track_caller] pub fn not_found_full( @@ -101,33 +107,51 @@ impl ServeError { let message = external_message.into(); let id = uuid::Uuid::new_v4(); let loc = std::panic::Location::caller(); - log::warn!("[{}] Not found: {} at {}:{}", id, context, loc.file(), loc.line()); + log::warn!( + "[{}] Not found: {} at {}:{}", + id, + context, + loc.file(), + loc.line() + ); Self::NotFoundWithId(message, id) } /// Create Internal error with correlation ID and internal context. /// The internal context is logged but a generic message is sent on the wire. - /// + /// /// Example: `ServeError::internal_ctx("subscriber map in bad state")` #[track_caller] pub fn internal_ctx(internal_context: impl Into) -> Self { let context = internal_context.into(); let id = uuid::Uuid::new_v4(); let loc = std::panic::Location::caller(); - log::error!("[{}] Internal error: {} at {}:{}", id, context, loc.file(), loc.line()); + log::error!( + "[{}] Internal error: {} at {}:{}", + id, + context, + loc.file(), + loc.line() + ); Self::InternalWithId("Internal error".to_string(), id) } /// Create NotImplemented error with correlation ID and feature context. /// The feature name is logged but a generic message is sent on the wire. - /// + /// /// Example: `ServeError::not_implemented_ctx("datagrams")` #[track_caller] pub fn not_implemented_ctx(feature: impl Into) -> Self { let feature = feature.into(); let id = uuid::Uuid::new_v4(); let loc = std::panic::Location::caller(); - log::warn!("[{}] Not implemented: {} at {}:{}", id, feature, loc.file(), loc.line()); + log::warn!( + "[{}] Not implemented: {} at {}:{}", + id, + feature, + loc.file(), + loc.line() + ); Self::NotImplementedWithId("Feature not implemented".to_string(), id) } }