Skip to content

Commit 36d7cc1

Browse files
authored
Merge pull request #116 from cloudflare/me/error-correlation-ids
Add error correlation ids
2 parents 8164687 + aae6383 commit 36d7cc1

File tree

9 files changed

+165
-28
lines changed

9 files changed

+165
-28
lines changed

Cargo.lock

Lines changed: 12 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

moq-relay-ietf/src/producer.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,13 @@ impl Producer {
108108
}
109109
}
110110

111-
Err(ServeError::NotFound.into())
111+
let namespace = subscribed.track_namespace.clone();
112+
let name = subscribed.track_name.clone();
113+
Err(ServeError::not_found_ctx(format!(
114+
"track '{}/{}' not found in local or remote tracks",
115+
namespace, name
116+
))
117+
.into())
112118
}
113119

114120
/// Serve a track_status request.
@@ -149,6 +155,11 @@ impl Producer {
149155

150156
track_status_requested.respond_error(4, "Track not found")?;
151157

152-
Err(ServeError::NotFound.into())
158+
Err(ServeError::not_found_ctx(format!(
159+
"track '{}/{}' not found for track_status",
160+
track_status_requested.request_msg.track_namespace,
161+
track_status_requested.request_msg.track_name
162+
))
163+
.into())
153164
}
154165
}

moq-transport/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ bytes = "1"
1919
thiserror = "1"
2020
tokio = { version = "1", features = ["macros", "io-util", "sync"] }
2121
log = "0.4"
22+
uuid = { version = "1", features = ["v4"] }
2223

2324
web-transport = { workspace = true }
2425

moq-transport/src/serve/error.rs

Lines changed: 104 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ pub enum ServeError {
1313
#[error("not found")]
1414
NotFound,
1515

16+
#[error("not found: {0} [error:{1}]")]
17+
NotFoundWithId(String, uuid::Uuid),
18+
1619
#[error("duplicate")]
1720
Duplicate,
1821

@@ -25,8 +28,14 @@ pub enum ServeError {
2528
#[error("internal error: {0}")]
2629
Internal(String),
2730

31+
#[error("internal error: {0} [error:{1}]")]
32+
InternalWithId(String, uuid::Uuid),
33+
2834
#[error("not implemented: {0}")]
2935
NotImplemented(String),
36+
37+
#[error("not implemented: {0} [error:{1}]")]
38+
NotImplementedWithId(String, uuid::Uuid),
3039
}
3140

3241
impl ServeError {
@@ -42,15 +51,107 @@ impl ServeError {
4251
// Pass through application-specific error codes
4352
Self::Closed(code) => *code,
4453
// TRACK_DOES_NOT_EXIST (0x4) from SUBSCRIBE_ERROR codes
45-
Self::NotFound => 0x4,
54+
Self::NotFound | Self::NotFoundWithId(_, _) => 0x4,
4655
// This is more of a session-level error, but keeping a reasonable code
4756
Self::Duplicate => 0x5,
4857
// NOT_SUPPORTED (0x3) - appears in multiple error code registries
4958
Self::Mode => 0x3,
5059
Self::Size => 0x3,
51-
Self::NotImplemented(_) => 0x3,
60+
Self::NotImplemented(_) | Self::NotImplementedWithId(_, _) => 0x3,
5261
// INTERNAL_ERROR (0x0) - per-request error registries use 0x0
53-
Self::Internal(_) => 0x0,
62+
Self::Internal(_) | Self::InternalWithId(_, _) => 0x0,
5463
}
5564
}
65+
66+
/// Create NotFound error with correlation ID but no additional context.
67+
/// Uses generic messages for both logging and wire protocol.
68+
///
69+
/// Example: `ServeError::not_found_id()`
70+
#[track_caller]
71+
pub fn not_found_id() -> Self {
72+
let id = uuid::Uuid::new_v4();
73+
let loc = std::panic::Location::caller();
74+
log::warn!("[{}] Not found at {}:{}", id, loc.file(), loc.line());
75+
Self::NotFoundWithId("Track not found".to_string(), id)
76+
}
77+
78+
/// Create NotFound error with correlation ID and internal context.
79+
/// The internal context is logged but a generic message is sent on the wire.
80+
///
81+
/// Example: `ServeError::not_found_ctx("subscribe_id=123 not in map")`
82+
#[track_caller]
83+
pub fn not_found_ctx(internal_context: impl Into<String>) -> Self {
84+
let context = internal_context.into();
85+
let id = uuid::Uuid::new_v4();
86+
let loc = std::panic::Location::caller();
87+
log::warn!(
88+
"[{}] Not found: {} at {}:{}",
89+
id,
90+
context,
91+
loc.file(),
92+
loc.line()
93+
);
94+
Self::NotFoundWithId("Track not found".to_string(), id)
95+
}
96+
97+
/// Create NotFound error with full control over internal and external messages.
98+
/// The internal context is logged, and the external message is sent on the wire.
99+
///
100+
/// Example: `ServeError::not_found_full("subscribe_id=123 not in map", "Subscription expired")`
101+
#[track_caller]
102+
pub fn not_found_full(
103+
internal_context: impl Into<String>,
104+
external_message: impl Into<String>,
105+
) -> Self {
106+
let context = internal_context.into();
107+
let message = external_message.into();
108+
let id = uuid::Uuid::new_v4();
109+
let loc = std::panic::Location::caller();
110+
log::warn!(
111+
"[{}] Not found: {} at {}:{}",
112+
id,
113+
context,
114+
loc.file(),
115+
loc.line()
116+
);
117+
Self::NotFoundWithId(message, id)
118+
}
119+
120+
/// Create Internal error with correlation ID and internal context.
121+
/// The internal context is logged but a generic message is sent on the wire.
122+
///
123+
/// Example: `ServeError::internal_ctx("subscriber map in bad state")`
124+
#[track_caller]
125+
pub fn internal_ctx(internal_context: impl Into<String>) -> Self {
126+
let context = internal_context.into();
127+
let id = uuid::Uuid::new_v4();
128+
let loc = std::panic::Location::caller();
129+
log::error!(
130+
"[{}] Internal error: {} at {}:{}",
131+
id,
132+
context,
133+
loc.file(),
134+
loc.line()
135+
);
136+
Self::InternalWithId("Internal error".to_string(), id)
137+
}
138+
139+
/// Create NotImplemented error with correlation ID and feature context.
140+
/// The feature name is logged but a generic message is sent on the wire.
141+
///
142+
/// Example: `ServeError::not_implemented_ctx("datagrams")`
143+
#[track_caller]
144+
pub fn not_implemented_ctx(feature: impl Into<String>) -> Self {
145+
let feature = feature.into();
146+
let id = uuid::Uuid::new_v4();
147+
let loc = std::panic::Location::caller();
148+
log::warn!(
149+
"[{}] Not implemented: {} at {}:{}",
150+
id,
151+
feature,
152+
loc.file(),
153+
loc.line()
154+
);
155+
Self::NotImplementedWithId("Feature not implemented".to_string(), id)
156+
}
56157
}

moq-transport/src/serve/tracks.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,9 @@ impl Drop for TracksRequest {
136136
fn drop(&mut self) {
137137
// Close any tracks still in the Queue
138138
for track in self.incoming.take().unwrap().close() {
139-
let _ = track.close(ServeError::NotFound);
139+
let _ = track.close(ServeError::not_found_ctx(
140+
"tracks request dropped before track handled",
141+
));
140142
}
141143
}
142144
}

moq-transport/src/session/announce.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,11 @@ impl Default for AnnounceState {
3333
impl Drop for AnnounceState {
3434
fn drop(&mut self) {
3535
for subscriber in self.subscribers.drain(..) {
36-
subscriber.close(ServeError::NotFound).ok();
36+
subscriber
37+
.close(ServeError::not_found_ctx(
38+
"announce dropped before subscription handled",
39+
))
40+
.ok();
3741
}
3842
}
3943
}

moq-transport/src/session/error.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,16 +74,15 @@ impl SessionError {
7474
/// Helper for unimplemented protocol features
7575
/// Logs a warning and returns a NotImplemented error instead of panicking
7676
pub fn unimplemented(feature: &str) -> Self {
77-
log::warn!("Protocol feature not implemented: {}", feature);
78-
Self::Serve(serve::ServeError::NotImplemented(feature.to_string()))
77+
Self::Serve(serve::ServeError::not_implemented_ctx(feature))
7978
}
8079
}
8180

8281
impl From<SessionError> for serve::ServeError {
8382
fn from(err: SessionError) -> Self {
8483
match err {
8584
SessionError::Serve(err) => err,
86-
_ => serve::ServeError::Internal(err.to_string()),
85+
_ => serve::ServeError::internal_ctx(format!("session error: {}", err)),
8786
}
8887
}
8988
}

moq-transport/src/session/publisher.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,12 @@ impl Publisher {
173173
) {
174174
subscribed.serve(track).await?;
175175
} else {
176-
subscribed.close(ServeError::NotFound)?;
176+
let namespace = subscribed.info.track_namespace.clone();
177+
let name = subscribed.info.track_name.clone();
178+
subscribed.close(ServeError::not_found_ctx(format!(
179+
"track '{}/{}' not found in tracks",
180+
namespace, name
181+
)))?;
177182
}
178183

179184
Ok(())
@@ -188,7 +193,13 @@ impl Publisher {
188193
track_status_request.request_msg.track_namespace.clone(),
189194
&track_status_request.request_msg.track_name,
190195
)
191-
.ok_or(ServeError::NotFound)?;
196+
.ok_or_else(|| {
197+
ServeError::not_found_ctx(format!(
198+
"track '{}/{}' not found for track_status",
199+
track_status_request.request_msg.track_namespace,
200+
track_status_request.request_msg.track_name
201+
))
202+
})?;
192203

193204
track_status_request.respond_ok(&track)?;
194205

@@ -326,7 +337,10 @@ impl Publisher {
326337
// then send SubscribeError.
327338
if let Err(err) = self.unknown_subscribed.push(subscribed) {
328339
// Default to closing with a not found error I guess.
329-
err.close(ServeError::NotFound)?;
340+
err.close(ServeError::not_found_ctx(format!(
341+
"unknown_subscribed queue full for namespace {:?}",
342+
namespace
343+
)))?;
330344
}
331345

332346
Ok(())

moq-transport/src/session/subscriber.rs

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -346,34 +346,27 @@ impl Subscriber {
346346
// Look up the subscribe by id
347347
let mut subscribes = self.subscribes.lock().unwrap();
348348
let subscribe = subscribes.get_mut(&subscribe_id).ok_or_else(|| {
349-
log::error!(
350-
"[SUBSCRIBER] recv_stream_inner: subscribe_id={} not found, track_alias={}",
351-
subscribe_id,
352-
track_alias
353-
);
354-
ServeError::NotFound
349+
ServeError::not_found_ctx(format!(
350+
"subscribe_id={} not found for track_alias={}",
351+
subscribe_id, track_alias
352+
))
355353
})?;
356354

357355
// Create the appropriate writer based on the stream header type
358356
if stream_header.header_type.is_subgroup() {
359357
log::trace!("[SUBSCRIBER] recv_stream_inner: creating subgroup writer");
360358
Writer::Subgroup(subscribe.subgroup(stream_header.subgroup_header.unwrap())?)
361359
} else {
362-
log::error!(
363-
"[SUBSCRIBER] recv_stream_inner: stream header_type={} not supported",
364-
stream_header.header_type
365-
);
366-
return Err(SessionError::Serve(ServeError::Internal(format!(
360+
return Err(SessionError::Serve(ServeError::internal_ctx(format!(
367361
"unsupported stream header type={}",
368362
stream_header.header_type
369363
))));
370364
}
371365
} else {
372-
log::error!(
373-
"[SUBSCRIBER] recv_stream_inner: subscription track_alias={} not found",
366+
return Err(SessionError::Serve(ServeError::not_found_ctx(format!(
367+
"subscription track_alias={} not found",
374368
track_alias
375-
);
376-
return Err(SessionError::Serve(ServeError::NotFound));
369+
))));
377370
}
378371
};
379372

0 commit comments

Comments
 (0)