Skip to content

Commit 6ecfcd7

Browse files
Merge pull request #102 from sgodin/datagram-logging
Update logging for Datagrams
2 parents d5a6721 + f6ca5a5 commit 6ecfcd7

File tree

4 files changed

+124
-10
lines changed

4 files changed

+124
-10
lines changed

moq-transport/src/mlog/events.rs

Lines changed: 79 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
//
1212
// TODO: Unimplemented data plane events (from draft-pardue-moq-qlog-moq-events):
1313
// - stream_type_set (when stream type becomes known)
14-
// - object_datagram_created/parsed
1514
// - object_datagram_status_created/parsed
1615
// - fetch_header_created/parsed
1716
// - fetch_object_created/parsed
@@ -61,6 +60,12 @@ pub enum EventData {
6160
#[serde(rename = "subgroup_object_created")]
6261
SubgroupObjectCreated(SubgroupObjectCreated),
6362

63+
#[serde(rename = "object_datagram_parsed")]
64+
ObjectDatagramParsed(ObjectDatagramParsed),
65+
66+
#[serde(rename = "object_datagram_created")]
67+
ObjectDatagramCreated(ObjectDatagramCreated),
68+
6469
#[serde(rename = "loglevel")]
6570
LogLevel(LogLevelEvent),
6671
}
@@ -133,6 +138,28 @@ pub struct SubgroupObjectCreated {
133138
pub object: JsonValue,
134139
}
135140

141+
/// Object Datagram parsed event (data plane)
142+
#[serde_with::skip_serializing_none]
143+
#[derive(Debug, Clone, Serialize, Deserialize)]
144+
pub struct ObjectDatagramParsed {
145+
pub stream_id: u64,
146+
147+
/// Object-specific fields
148+
#[serde(flatten)]
149+
pub object: JsonValue,
150+
}
151+
152+
/// Object Datagram created event (data plane)
153+
#[serde_with::skip_serializing_none]
154+
#[derive(Debug, Clone, Serialize, Deserialize)]
155+
pub struct ObjectDatagramCreated {
156+
pub stream_id: u64,
157+
158+
/// Object-specific fields
159+
#[serde(flatten)]
160+
pub object: JsonValue,
161+
}
162+
136163
/// LogLevel event for flexible logging (qlog loglevel schema)
137164
/// See: https://www.ietf.org/archive/id/draft-ietf-quic-qlog-main-schema-12.html#name-loglevel-events
138165
#[serde_with::skip_serializing_none]
@@ -492,10 +519,10 @@ pub fn go_away_created(time: f64, stream_id: u64, msg: &message::GoAway) -> Even
492519
/// Helper to convert SubgroupHeader to JSON
493520
fn subgroup_header_to_json(header: &data::SubgroupHeader) -> JsonValue {
494521
let mut json = json!({
522+
"header_type": format!("{:?}", header.header_type),
495523
"track_alias": header.track_alias,
496524
"group_id": header.group_id,
497525
"publisher_priority": header.publisher_priority,
498-
"header_type": format!("{:?}", header.header_type),
499526
});
500527

501528
if let Some(subgroup_id) = header.subgroup_id {
@@ -540,7 +567,7 @@ fn subgroup_object_to_json(
540567
"group_id": group_id,
541568
"subgroup_id": subgroup_id,
542569
"object_id": object_id,
543-
"extension_headers_length": 0,
570+
// TODO send object_playload itself
544571
"object_payload_length": object.payload_length,
545572
});
546573

@@ -596,12 +623,12 @@ fn subgroup_object_ext_to_json(
596623
object_id: u64,
597624
object: &data::SubgroupObjectExt,
598625
) -> JsonValue {
599-
// TODO encode extension headers
600626
let mut object_data = json!({
601627
"group_id": group_id,
602628
"subgroup_id": subgroup_id,
603629
"object_id": object_id,
604-
"extension_headers_length": object.extension_headers.0.len(),
630+
"extension_headers": key_value_pairs_to_vec(&object.extension_headers),
631+
// TODO send object_playload itself
605632
"object_payload_length": object.payload_length,
606633
});
607634

@@ -650,6 +677,53 @@ pub fn subgroup_object_ext_created(
650677
}
651678
}
652679

680+
/// Helper to convert Datagram to JSON
681+
fn object_datagram_to_json(datagram: &data::Datagram) -> JsonValue {
682+
let mut json = json!({
683+
"datagram_type": format!("{:?}", datagram.datagram_type),
684+
"track_alias": datagram.track_alias,
685+
"group_id": datagram.group_id,
686+
"object_id": datagram.object_id.unwrap_or(0),
687+
"publisher_priority": datagram.publisher_priority,
688+
// TODO send object_playload
689+
"payload_length": datagram.payload.as_ref().map_or(0, |p| p.len()),
690+
});
691+
692+
if let Some(extension_headers) = &datagram.extension_headers {
693+
json["extension_headers"] = json!(key_value_pairs_to_vec(extension_headers));
694+
}
695+
696+
if let Some(status) = datagram.status {
697+
json["object_status"] = json!(format!("{:?}", status));
698+
}
699+
700+
json
701+
}
702+
703+
/// Create a object_datagram_parsed event
704+
pub fn object_datagram_parsed(time: f64, stream_id: u64, datagram: &data::Datagram) -> Event {
705+
Event {
706+
time,
707+
name: "moqt:object_datagram_parsed".to_string(),
708+
data: EventData::ObjectDatagramParsed(ObjectDatagramParsed {
709+
stream_id,
710+
object: object_datagram_to_json(datagram),
711+
}),
712+
}
713+
}
714+
715+
/// Create a object_datagram_created event
716+
pub fn object_datagram_created(time: f64, stream_id: u64, datagram: &data::Datagram) -> Event {
717+
Event {
718+
time,
719+
name: "moqt:object_datagram_created".to_string(),
720+
data: EventData::ObjectDatagramCreated(ObjectDatagramCreated {
721+
stream_id,
722+
object: object_datagram_to_json(datagram),
723+
}),
724+
}
725+
}
726+
653727
// LogLevel events (generic logging)
654728

655729
/// Log levels for qlog loglevel events

moq-transport/src/mlog/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ pub use writer::MlogWriter;
88

99
pub mod events;
1010
pub use events::{
11-
client_setup_parsed, loglevel_event, server_setup_created, subgroup_header_created,
12-
subgroup_header_parsed, subgroup_object_created, subgroup_object_ext_created,
13-
subgroup_object_ext_parsed, subgroup_object_parsed, Event, EventData, LogLevel,
11+
client_setup_parsed, loglevel_event, object_datagram_created, object_datagram_parsed,
12+
server_setup_created, subgroup_header_created, subgroup_header_parsed, subgroup_object_created,
13+
subgroup_object_ext_created, subgroup_object_ext_parsed, subgroup_object_parsed, Event,
14+
EventData, LogLevel,
1415
};

moq-transport/src/session/subscribed.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -382,15 +382,29 @@ impl Subscribed {
382382
encoded_datagram.encode(&mut buffer)?;
383383

384384
log::debug!(
385-
"[PUBLISHER] serve_datagrams: sending datagram #{} - group_id={}, object_id={}, priority={}, payload_len={}, total_encoded_len={}",
385+
"[PUBLISHER] serve_datagrams: sending datagram #{} - track_alias={}, group_id={}, object_id={}, priority={}, payload_len={}, total_encoded_len={}",
386386
datagram_count + 1,
387+
encoded_datagram.track_alias,
387388
encoded_datagram.group_id,
388389
encoded_datagram.object_id.unwrap(),
389390
encoded_datagram.publisher_priority,
390391
payload_len,
391392
buffer.len()
392393
);
393394

395+
// Create mlog event for datagram created
396+
if let Some(ref mlog) = self.mlog {
397+
if let Ok(mut mlog_guard) = mlog.lock() {
398+
let time = mlog_guard.elapsed_ms();
399+
let stream_id = 0; // TODO: Placeholder, need actual QUIC stream ID
400+
let _ = mlog_guard.add_event(mlog::object_datagram_created(
401+
time,
402+
stream_id,
403+
&encoded_datagram,
404+
));
405+
}
406+
}
407+
394408
self.publisher.send_datagram(buffer.into()).await?;
395409

396410
self.state

moq-transport/src/session/subscriber.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -550,14 +550,39 @@ impl Subscriber {
550550
let mut cursor = io::Cursor::new(datagram);
551551
let datagram = data::Datagram::decode(&mut cursor)?;
552552

553+
if let Some(ref mlog) = self.mlog {
554+
if let Ok(mut mlog_guard) = mlog.lock() {
555+
let time = mlog_guard.elapsed_ms();
556+
let stream_id = 0; // TODO: Placeholder, need actual QUIC stream ID
557+
let _ =
558+
mlog_guard.add_event(mlog::object_datagram_parsed(time, stream_id, &datagram));
559+
}
560+
}
561+
553562
// Look up the subscribe id for this track alias
554563
if let Some(subscribe_id) = self.get_subscribe_id_by_alias(datagram.track_alias) {
555564
// Look up the subscribe by id
556565
if let Some(subscribe) = self.subscribes.lock().unwrap().get_mut(&subscribe_id) {
566+
log::trace!(
567+
"[SUBSCRIBER] recv_datagram: track_alias={}, group_id={}, object_id={}, publisher_priority={}, status={}, payload_length={}",
568+
datagram.track_alias,
569+
datagram.group_id,
570+
datagram.object_id.unwrap_or(0),
571+
datagram.publisher_priority,
572+
datagram.status.as_ref().map_or("None".to_string(), |s| format!("{:?}", s)),
573+
datagram.payload.as_ref().map_or(0, |p| p.len()));
557574
subscribe.datagram(datagram)?;
558575
}
576+
} else {
577+
log::warn!(
578+
"[SUBSCRIBER] recv_datagram: discarded due to unknown track_alias: track_alias={}, group_id={}, object_id={}, publisher_priority={}, status={}, payload_length={}",
579+
datagram.track_alias,
580+
datagram.group_id,
581+
datagram.object_id.unwrap_or(0),
582+
datagram.publisher_priority,
583+
datagram.status.as_ref().map_or("None".to_string(), |s| format!("{:?}", s)),
584+
datagram.payload.as_ref().map_or(0, |p| p.len()));
559585
}
560-
// TODO do we want to return an error if we can't find the subscribe?
561586

562587
Ok(())
563588
}

0 commit comments

Comments
 (0)