From 560df9270651fba098fae3ea751eada61fe93486 Mon Sep 17 00:00:00 2001 From: Zotyamester Date: Wed, 2 Apr 2025 10:20:16 +0200 Subject: [PATCH 1/6] Handle selective subscription start To do this while adhering to the draft as much as possible, the following changes had to be made: * Add filter type parameter for subscribe calls * Fix filter type checks in Subscribe encoding * Remove SubscribeLocation according to draft-04 * Update Subscribe according to draft-08 (end -> end_group) * Update FilterType according to draft-09 (remove LatestGroup) --- moq-clock-ietf/src/clock.rs | 4 +- moq-clock-ietf/src/main.rs | 4 +- moq-relay-ietf/src/consumer.rs | 4 +- moq-relay-ietf/src/remote.rs | 3 +- moq-sub/src/media.rs | 7 +- moq-transport/src/message/filter_type.rs | 3 - moq-transport/src/message/subscribe.rs | 105 ++++++----------------- moq-transport/src/serve/subgroup.rs | 25 ++++-- moq-transport/src/session/subscribe.rs | 61 ++++++++++--- moq-transport/src/session/subscribed.rs | 60 ++++++++++--- moq-transport/src/session/subscriber.rs | 16 ++-- 11 files changed, 163 insertions(+), 129 deletions(-) diff --git a/moq-clock-ietf/src/clock.rs b/moq-clock-ietf/src/clock.rs index 0dc6ce06..67f9cae5 100644 --- a/moq-clock-ietf/src/clock.rs +++ b/moq-clock-ietf/src/clock.rs @@ -19,8 +19,8 @@ impl Publisher { let start = Utc::now(); let mut now = start; - // Just for fun, don't start at zero. - let mut sequence = start.minute(); + // For better reproducibility and easier testing, start at 0. + let mut sequence = 0; loop { let segment = self diff --git a/moq-clock-ietf/src/main.rs b/moq-clock-ietf/src/main.rs index 6e38a94e..3549f64d 100644 --- a/moq-clock-ietf/src/main.rs +++ b/moq-clock-ietf/src/main.rs @@ -10,7 +10,7 @@ mod clock; use moq_transport::{ coding::Tuple, serve, - session::{Publisher, Subscriber}, + session::{Publisher, SubscribeFilter, Subscriber}, }; #[derive(Parser, Clone)] @@ -93,7 +93,7 @@ async fn main() -> anyhow::Result<()> { tokio::select! { res = session.run() => res.context("session error")?, res = clock.run() => res.context("clock error")?, - res = subscriber.subscribe(prod) => res.context("failed to subscribe to track")?, + res = subscriber.subscribe(prod, SubscribeFilter::LatestObject) => res.context("failed to subscribe to track")?, } } diff --git a/moq-relay-ietf/src/consumer.rs b/moq-relay-ietf/src/consumer.rs index 52c57e76..0ed69d7f 100644 --- a/moq-relay-ietf/src/consumer.rs +++ b/moq-relay-ietf/src/consumer.rs @@ -2,7 +2,7 @@ use anyhow::Context; use futures::{stream::FuturesUnordered, FutureExt, StreamExt}; use moq_transport::{ serve::Tracks, - session::{Announced, SessionError, Subscriber}, + session::{Announced, SessionError, SubscribeFilter, Subscriber}, }; use crate::{Api, Locals, Producer}; @@ -96,7 +96,7 @@ impl Consumer { let info = track.clone(); log::info!("forwarding subscribe: {:?}", info); - if let Err(err) = remote.subscribe(track).await { + if let Err(err) = remote.subscribe(track, SubscribeFilter::LatestObject).await { log::warn!("failed forwarding subscribe: {:?}, error: {}", info, err) } diff --git a/moq-relay-ietf/src/remote.rs b/moq-relay-ietf/src/remote.rs index b64706a0..7769dc77 100644 --- a/moq-relay-ietf/src/remote.rs +++ b/moq-relay-ietf/src/remote.rs @@ -12,6 +12,7 @@ use futures::StreamExt; use moq_native_ietf::quic; use moq_transport::coding::Tuple; use moq_transport::serve::{Track, TrackReader, TrackWriter}; +use moq_transport::session::SubscribeFilter; use moq_transport::watch::State; use url::Url; @@ -233,7 +234,7 @@ impl RemoteProducer { let mut subscriber = subscriber.clone(); tasks.push(async move { - if let Err(err) = subscriber.subscribe(track).await { + if let Err(err) = subscriber.subscribe(track, SubscribeFilter::LatestObject).await { log::warn!("failed serving track: {:?}, error: {}", info, err); } }); diff --git a/moq-sub/src/media.rs b/moq-sub/src/media.rs index c33399c3..9044f6f0 100644 --- a/moq-sub/src/media.rs +++ b/moq-sub/src/media.rs @@ -6,7 +6,7 @@ use moq_transport::serve::{ SubgroupObjectReader, SubgroupReader, TrackReader, TrackReaderMode, Tracks, TracksReader, TracksWriter, }; -use moq_transport::session::Subscriber; +use moq_transport::session::{SubscribeFilter, Subscriber}; use mp4::ReadBox; use tokio::{ io::{AsyncReadExt, AsyncWrite, AsyncWriteExt}, @@ -43,7 +43,7 @@ impl Media { let mut subscriber = self.subscriber.clone(); tokio::task::spawn(async move { - subscriber.subscribe(track).await.unwrap_or_else(|err| { + subscriber.subscribe(track, SubscribeFilter::LatestObject).await.unwrap_or_else(|err| { warn!("failed to subscribe to init track: {err:?}"); }); }); @@ -101,7 +101,8 @@ impl Media { let mut subscriber = self.subscriber.clone(); tokio::task::spawn(async move { - subscriber.subscribe(track).await.unwrap_or_else(|err| { + subscriber.subscribe(track, SubscribeFilter::LatestObject) + .await.unwrap_or_else(|err| { warn!("failed to subscribe to track: {err:?}"); }); }); diff --git a/moq-transport/src/message/filter_type.rs b/moq-transport/src/message/filter_type.rs index e826443c..5aa79f8e 100644 --- a/moq-transport/src/message/filter_type.rs +++ b/moq-transport/src/message/filter_type.rs @@ -4,7 +4,6 @@ use crate::coding::{Decode, DecodeError, Encode, EncodeError}; /// https://www.ietf.org/archive/id/draft-ietf-moq-transport-04.html#name-filter-types #[derive(Clone, Debug, PartialEq)] pub enum FilterType { - LatestGroup = 0x1, LatestObject = 0x2, AbsoluteStart = 0x3, AbsoluteRange = 0x4, @@ -13,7 +12,6 @@ pub enum FilterType { impl Encode for FilterType { fn encode(&self, w: &mut W) -> Result<(), EncodeError> { match self { - Self::LatestGroup => (0x1_u64).encode(w), Self::LatestObject => (0x2_u64).encode(w), Self::AbsoluteStart => (0x3_u64).encode(w), Self::AbsoluteRange => (0x4_u64).encode(w), @@ -24,7 +22,6 @@ impl Encode for FilterType { impl Decode for FilterType { fn decode(r: &mut R) -> Result { match u64::decode(r)? { - 0x01 => Ok(Self::LatestGroup), 0x02 => Ok(Self::LatestObject), 0x03 => Ok(Self::AbsoluteStart), 0x04 => Ok(Self::AbsoluteRange), diff --git a/moq-transport/src/message/subscribe.rs b/moq-transport/src/message/subscribe.rs index 6dd32723..edc39eb9 100644 --- a/moq-transport/src/message/subscribe.rs +++ b/moq-transport/src/message/subscribe.rs @@ -22,9 +22,9 @@ pub struct Subscribe { /// Filter type pub filter_type: FilterType, - /// The start/end group/object. (TODO: Make optional) - pub start: Option, // TODO: Make optional - pub end: Option, // TODO: Make optional + /// The start/end group/object + pub start: Option, + pub end_group: Option, /// Optional parameters pub params: Params, @@ -43,43 +43,28 @@ impl Decode for Subscribe { let filter_type = FilterType::decode(r)?; let start: Option; - let end: Option; + let end_group: Option; match filter_type { FilterType::AbsoluteStart => { if r.remaining() < 2 { return Err(DecodeError::MissingField); } start = Some(SubscribePair::decode(r)?); - end = None; + end_group = None; } FilterType::AbsoluteRange => { if r.remaining() < 4 { return Err(DecodeError::MissingField); } start = Some(SubscribePair::decode(r)?); - end = Some(SubscribePair::decode(r)?); + end_group = Some(u64::decode(r)?); } _ => { start = None; - end = None; + end_group = None; } } - if let Some(s) = &start { - // You can't have a start object without a start group. - if s.group == SubscribeLocation::None && s.object != SubscribeLocation::None { - return Err(DecodeError::InvalidSubscribeLocation); - } - } - if let Some(e) = &end { - // You can't have an end object without an end group. - if e.group == SubscribeLocation::None && e.object != SubscribeLocation::None { - return Err(DecodeError::InvalidSubscribeLocation); - } - } - - // NOTE: There's some more location restrictions in the draft, but they're enforced at a higher level. - let params = Params::decode(r)?; Ok(Self { @@ -91,7 +76,7 @@ impl Decode for Subscribe { group_order, filter_type, start, - end, + end_group, params, }) } @@ -113,14 +98,18 @@ impl Encode for Subscribe { if self.filter_type == FilterType::AbsoluteStart || self.filter_type == FilterType::AbsoluteRange { - if self.start.is_none() || self.end.is_none() { - return Err(EncodeError::MissingField); - } if let Some(start) = &self.start { start.encode(w)?; + } else { + return Err(EncodeError::MissingField); } - if let Some(end) = &self.end { - end.encode(w)?; + } + + if self.filter_type == FilterType::AbsoluteRange { + if let Some(end_group) = &self.end_group { + end_group.encode(w)?; + } else { + return Err(EncodeError::MissingField); } } @@ -130,17 +119,20 @@ impl Encode for Subscribe { } } -#[derive(Clone, Debug, PartialEq)] +// Note: When derived on structs, it will produce a lexicographic ordering +// based on the top-to-bottom declaration order of the struct’s members. +// Therefore, it will work just as expected. +#[derive(Clone, Debug, PartialEq, PartialOrd)] pub struct SubscribePair { - pub group: SubscribeLocation, - pub object: SubscribeLocation, + pub group: u64, + pub object: u64, } impl Decode for SubscribePair { fn decode(r: &mut R) -> Result { Ok(Self { - group: SubscribeLocation::decode(r)?, - object: SubscribeLocation::decode(r)?, + group: u64::decode(r)?, + object: u64::decode(r)?, }) } } @@ -152,50 +144,3 @@ impl Encode for SubscribePair { Ok(()) } } - -/// Signal where the subscription should begin, relative to the current cache. -#[derive(Clone, Debug, PartialEq)] -pub enum SubscribeLocation { - None, - Absolute(u64), - Latest(u64), - Future(u64), -} - -impl Decode for SubscribeLocation { - fn decode(r: &mut R) -> Result { - let kind = u64::decode(r)?; - - match kind { - 0 => Ok(Self::None), - 1 => Ok(Self::Absolute(u64::decode(r)?)), - 2 => Ok(Self::Latest(u64::decode(r)?)), - 3 => Ok(Self::Future(u64::decode(r)?)), - _ => Err(DecodeError::InvalidSubscribeLocation), - } - } -} - -impl Encode for SubscribeLocation { - fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - self.id().encode(w)?; - - match self { - Self::None => Ok(()), - Self::Absolute(val) => val.encode(w), - Self::Latest(val) => val.encode(w), - Self::Future(val) => val.encode(w), - } - } -} - -impl SubscribeLocation { - fn id(&self) -> u64 { - match self { - Self::None => 0, - Self::Absolute(_) => 1, - Self::Latest(_) => 2, - Self::Future(_) => 3, - } - } -} diff --git a/moq-transport/src/serve/subgroup.rs b/moq-transport/src/serve/subgroup.rs index d3fa889e..f6382e6c 100644 --- a/moq-transport/src/serve/subgroup.rs +++ b/moq-transport/src/serve/subgroup.rs @@ -299,24 +299,29 @@ impl SubgroupWriter { /// Create the next object ID with the given payload. pub fn write(&mut self, payload: bytes::Bytes) -> Result<(), ServeError> { - let mut object = self.create(payload.len())?; + let mut object = self.create(payload.len(), None)?; object.write(payload)?; Ok(()) } /// Write an object over multiple writes. /// + /// The argument id is optional for convenience (e.g., when the function + /// is invoked at the original publisher and the GroupIDs are sequential). + /// /// BAD STUFF will happen if the size is wrong; this is an advanced feature. - pub fn create(&mut self, size: usize) -> Result { + pub fn create(&mut self, size: usize, id: Option) -> Result { let (writer, reader) = SubgroupObject { group: self.info.clone(), - object_id: self.next, + object_id: id.unwrap_or(self.next), status: ObjectStatus::Object, size, } .produce(); - self.next += 1; + if id.is_none() { + self.next += 1; + } let mut state = self.state.lock_mut().ok_or(ServeError::Cancel)?; state.objects.push(reader); @@ -392,13 +397,23 @@ impl SubgroupReader { } pub async fn next(&mut self) -> Result, ServeError> { + self._next(true).await + } + + pub async fn peek(&mut self) -> Result, ServeError> { + self._next(false).await + } + + async fn _next(&mut self, inc: bool) -> Result, ServeError> { loop { { let state = self.state.lock(); if self.index < state.objects.len() { let object = state.objects[self.index].clone(); - self.index += 1; + if inc { + self.index += 1; + } return Ok(Some(object)); } diff --git a/moq-transport/src/session/subscribe.rs b/moq-transport/src/session/subscribe.rs index 620ab09a..f972675b 100644 --- a/moq-transport/src/session/subscribe.rs +++ b/moq-transport/src/session/subscribe.rs @@ -3,7 +3,7 @@ use std::ops; use crate::{ coding::Tuple, data, - message::{self, FilterType, GroupOrder, SubscribeLocation, SubscribePair}, + message::{self, FilterType, GroupOrder, SubscribePair}, serve::{self, ServeError, TrackWriter, TrackWriterMode}, }; @@ -41,12 +41,58 @@ pub struct Subscribe { pub info: SubscribeInfo, } +#[derive(Debug, Clone)] +pub enum SubscribeFilter { + AbsoluteStart(SubscribePair), + AbsoluteRange(SubscribePair, u64), + LatestObject +} + +impl SubscribeFilter { + fn unwrap(self) -> (FilterType, Option, Option) { + match self { + SubscribeFilter::AbsoluteStart(start) => (FilterType::AbsoluteStart, Some(start), None), + SubscribeFilter::AbsoluteRange(start, end_group) => (FilterType::AbsoluteRange, Some(start), Some(end_group)), + SubscribeFilter::LatestObject => (FilterType::LatestObject, None, None), + } + } +} + +impl From<&message::Subscribe> for SubscribeFilter { + fn from(subscribe: &message::Subscribe) -> Self { + match subscribe.filter_type { + FilterType::AbsoluteStart => Self::AbsoluteStart( + subscribe.start.clone().expect("AbsoluteStart, but no StartGroup nor StartObject") + ), + FilterType::AbsoluteRange => Self::AbsoluteRange( + subscribe.start.clone().expect("AbsoluteRange, but no StartGroup nor StartObject"), + subscribe.end_group.clone().expect("AbsoluteRange, but no EndGroup"), + ), + FilterType::LatestObject => Self::LatestObject, + } + } +} + +impl From<&message::SubscribeUpdate> for SubscribeFilter { + fn from(update: &message::SubscribeUpdate) -> Self { + if update.end_group != 0 { + Self::AbsoluteRange(update.start.clone(), update.end_group - 1) + } else if !(update.start.group == 0 && update.start.object == 0) { + Self::AbsoluteStart(update.start.clone()) // A value of 0 means the subscription is open-ended. + } else { + Self::LatestObject // If starts from the beginning and is open-ended, it must be LatestObject. + } + } +} + impl Subscribe { pub(super) fn new( mut subscriber: Subscriber, id: u64, track: TrackWriter, + filter: SubscribeFilter, ) -> (Subscribe, SubscribeRecv) { + let (filter_type, start, end_group) = filter.clone().unwrap(); subscriber.send_message(message::Subscribe { id, track_alias: id, @@ -55,16 +101,9 @@ impl Subscribe { // TODO add prioritization logic on the publisher side subscriber_priority: 127, // default to mid value, see: https://github.com/moq-wg/moq-transport/issues/504 group_order: GroupOrder::Publisher, // defer to publisher send order - filter_type: FilterType::LatestGroup, - // TODO add these to the publisher. - start: Some(SubscribePair { - group: SubscribeLocation::Latest(0), - object: SubscribeLocation::Absolute(0), - }), - end: Some(SubscribePair { - group: SubscribeLocation::None, - object: SubscribeLocation::None, - }), + filter_type, + start, + end_group, params: Default::default(), }); diff --git a/moq-transport/src/session/subscribed.rs b/moq-transport/src/session/subscribed.rs index 04dd95ac..3329c6a6 100644 --- a/moq-transport/src/session/subscribed.rs +++ b/moq-transport/src/session/subscribed.rs @@ -8,30 +8,34 @@ use crate::serve::{ServeError, TrackReaderMode}; use crate::watch::State; use crate::{data, message, serve}; -use super::{Publisher, SessionError, SubscribeInfo, Writer}; +use super::{Publisher, SessionError, SubscribeFilter, SubscribeInfo, Writer}; #[derive(Debug)] struct SubscribedState { - max_group_id: Option<(u64, u64)>, + max_group_id: Option, + filter: SubscribeFilter, closed: Result<(), ServeError>, } impl SubscribedState { - fn update_max_group_id(&mut self, group_id: u64, object_id: u64) -> Result<(), ServeError> { - if let Some((max_group, max_object)) = self.max_group_id { - if group_id >= max_group && object_id >= max_object { - self.max_group_id = Some((group_id, object_id)); + fn update_max_group_id(&mut self, group_id: u64) -> Result<(), ServeError> { + if let Some(max_group_id) = self.max_group_id { + if group_id > max_group_id { + self.max_group_id = Some(group_id); } } Ok(()) } + Ok(()) + } } impl Default for SubscribedState { fn default() -> Self { Self { max_group_id: None, + filter: SubscribeFilter::LatestObject, closed: Ok(()), } } @@ -48,7 +52,12 @@ pub struct Subscribed { impl Subscribed { pub(super) fn new(publisher: Publisher, msg: message::Subscribe) -> (Self, SubscribedRecv) { - let (send, recv) = State::default().split(); + let (send, recv) = State::new( + SubscribedState { + filter: SubscribeFilter::from(&msg), + ..Default::default() + } + ).split(); let info = SubscribeInfo { namespace: msg.track_namespace.clone(), name: msg.track_name.clone(), @@ -82,7 +91,10 @@ impl Subscribed { self.state .lock_mut() .ok_or(ServeError::Cancel)? - .max_group_id = latest; + .max_group_id = match latest { + None => None, + Some(latest) => Some(latest.0), + }; self.publisher.send_message(message::SubscribeOk { id: self.msg.id, @@ -197,7 +209,7 @@ impl Subscribed { self.state .lock_mut() .ok_or(ServeError::Done)? - .update_max_group_id(object.group_id, object.object_id)?; + .update_max_group_id(object.group_id)?; writer.encode(&header).await?; @@ -260,6 +272,32 @@ impl Subscribed { mut publisher: Publisher, state: State, ) -> Result<(), SessionError> { + log::trace!("serving group {}", subgroup.group_id); + + let filter = state.lock().filter.clone(); + if let SubscribeFilter::AbsoluteStart(SubscribePair { + group: group_id, + object: object_id, + }) = filter + { + if subgroup.group_id < group_id { + log::trace!("skipping group {}", subgroup.group_id); + return Ok(()); + } else if subgroup.group_id == group_id { + while let Some(object) = subgroup.peek().await? { + if object.object_id >= object_id { + break; + } + log::trace!( + "skipping object {} of group {}", + object.object_id, + subgroup.group_id + ); + subgroup.next().await?; + } + } + } + let mut stream = publisher.open_uni().await?; // TODO figure out u32 vs u64 priority @@ -284,7 +322,7 @@ impl Subscribed { state .lock_mut() .ok_or(ServeError::Done)? - .update_max_group_id(subgroup.group_id, object.object_id)?; + .update_max_group_id(subgroup.group_id)?; log::trace!("sent group object: {:?}", header); @@ -324,7 +362,7 @@ impl Subscribed { self.state .lock_mut() .ok_or(ServeError::Done)? - .update_max_group_id(datagram.group_id, datagram.object_id)?; + .update_max_group_id(datagram.group_id)?; } Ok(()) diff --git a/moq-transport/src/session/subscriber.rs b/moq-transport/src/session/subscriber.rs index 8778b1b0..1b6d09a5 100644 --- a/moq-transport/src/session/subscriber.rs +++ b/moq-transport/src/session/subscriber.rs @@ -1,20 +1,18 @@ use std::{ - collections::{hash_map, HashMap}, - io, - sync::{atomic, Arc, Mutex}, + collections::{hash_map, HashMap}, io, sync::{atomic, Arc, Mutex} }; use crate::{ coding::{Decode, Tuple}, data, - message::{self, Message}, + message::{self, Message, SubscribeUpdate}, serve::{self, ServeError}, setup, }; use crate::watch::Queue; -use super::{Announced, AnnouncedRecv, Reader, Session, SessionError, Subscribe, SubscribeRecv}; +use super::{Announced, AnnouncedRecv, Reader, Session, SessionError, Subscribe, SubscribeFilter, SubscribeRecv}; // TODO remove Clone. #[derive(Clone)] @@ -55,16 +53,16 @@ impl Subscriber { self.announced_queue.pop().await } - pub async fn subscribe(&mut self, track: serve::TrackWriter) -> Result<(), ServeError> { + pub async fn subscribe(&mut self, track: serve::TrackWriter, filter: SubscribeFilter) -> Result<(), ServeError> { let id = self.subscribe_next.fetch_add(1, atomic::Ordering::Relaxed); - let (send, recv) = Subscribe::new(self.clone(), id, track); + let (send, recv) = Subscribe::new(self.clone(), id, track, filter); self.subscribes.lock().unwrap().insert(id, recv); send.closed().await } - pub(super) fn send_message>(&mut self, msg: M) { + pub fn send_message>(&mut self, msg: M) { let msg = msg.into(); // Remove our entry on terminal state. @@ -272,7 +270,7 @@ impl Subscriber { log::trace!("received group object: {:?}", object); let mut remain = object.size; - let mut object = group.create(object.size)?; + let mut object = group.create(object.size, Some(object.object_id))?; while remain > 0 { let data = reader From 58beb17d731d94b8d80299f9995aa925938fc1e5 Mon Sep 17 00:00:00 2001 From: Zotyamester Date: Wed, 2 Apr 2025 10:36:44 +0200 Subject: [PATCH 2/6] Update SubscribeDone according to draft-08 * Change fields of SubscribeDone * Introduce stream_count to `SubscribedState` * Remove dependency on msg format from `Subscribed` --- moq-transport/src/message/subscribe_done.rs | 30 +++++------------ moq-transport/src/session/subscribe.rs | 8 +++-- moq-transport/src/session/subscribed.rs | 37 +++++++++++++-------- 3 files changed, 36 insertions(+), 39 deletions(-) diff --git a/moq-transport/src/message/subscribe_done.rs b/moq-transport/src/message/subscribe_done.rs index a7f0c2e6..36656dc0 100644 --- a/moq-transport/src/message/subscribe_done.rs +++ b/moq-transport/src/message/subscribe_done.rs @@ -3,37 +3,32 @@ use crate::coding::{Decode, DecodeError, Encode, EncodeError}; /// Sent by the publisher to cleanly terminate a Subscribe. #[derive(Clone, Debug)] pub struct SubscribeDone { - /// The ID for this subscription. + /// The ID for this subscription pub id: u64, /// The error code pub code: u64, + /// Number of DATA streams opened for this subscription + /// (1 << 62) - 1 if it cannot be determined + pub count: u64, + /// An optional error reason pub reason: String, - - /// The final group/object sent on this subscription. - pub last: Option<(u64, u64)>, } impl Decode for SubscribeDone { fn decode(r: &mut R) -> Result { let id = u64::decode(r)?; let code = u64::decode(r)?; + let count = u64::decode(r)?; let reason = String::decode(r)?; - Self::decode_remaining(r, 1)?; - let last = match r.get_u8() { - 0 => None, - 1 => Some((u64::decode(r)?, u64::decode(r)?)), - _ => return Err(DecodeError::InvalidValue), - }; - Ok(Self { id, code, + count, reason, - last, }) } } @@ -42,18 +37,9 @@ impl Encode for SubscribeDone { fn encode(&self, w: &mut W) -> Result<(), EncodeError> { self.id.encode(w)?; self.code.encode(w)?; + self.count.encode(w)?; self.reason.encode(w)?; - Self::encode_remaining(w, 1)?; - - if let Some((group, object)) = self.last { - w.put_u8(1); - group.encode(w)?; - object.encode(w)?; - } else { - w.put_u8(0); - } - Ok(()) } } diff --git a/moq-transport/src/session/subscribe.rs b/moq-transport/src/session/subscribe.rs index f972675b..a1262e24 100644 --- a/moq-transport/src/session/subscribe.rs +++ b/moq-transport/src/session/subscribe.rs @@ -13,8 +13,10 @@ use super::Subscriber; #[derive(Debug, Clone)] pub struct SubscribeInfo { + pub id: u64, pub namespace: Tuple, pub name: String, + pub alias: u64, } struct SubscribeState { @@ -36,7 +38,6 @@ impl Default for SubscribeState { pub struct Subscribe { state: State, subscriber: Subscriber, - id: u64, pub info: SubscribeInfo, } @@ -108,8 +109,10 @@ impl Subscribe { }); let info = SubscribeInfo { + id, namespace: track.namespace.clone(), name: track.name.clone(), + alias: id, }; let (send, recv) = State::default().split(); @@ -117,7 +120,6 @@ impl Subscribe { let send = Subscribe { state: send, subscriber, - id, info, }; @@ -148,7 +150,7 @@ impl Subscribe { impl Drop for Subscribe { fn drop(&mut self) { self.subscriber - .send_message(message::Unsubscribe { id: self.id }); + .send_message(message::Unsubscribe { id: self.info.id }); } } diff --git a/moq-transport/src/session/subscribed.rs b/moq-transport/src/session/subscribed.rs index 3329c6a6..0ac5e0e8 100644 --- a/moq-transport/src/session/subscribed.rs +++ b/moq-transport/src/session/subscribed.rs @@ -13,7 +13,9 @@ use super::{Publisher, SessionError, SubscribeFilter, SubscribeInfo, Writer}; #[derive(Debug)] struct SubscribedState { max_group_id: Option, + stream_count: u64, filter: SubscribeFilter, + priority: u8, closed: Result<(), ServeError>, } @@ -27,6 +29,9 @@ impl SubscribedState { Ok(()) } + fn increment_stream_count(&mut self) -> Result<(), ServeError> { + self.stream_count += 1; + Ok(()) } } @@ -35,7 +40,9 @@ impl Default for SubscribedState { fn default() -> Self { Self { max_group_id: None, + stream_count: 0, filter: SubscribeFilter::LatestObject, + priority: 127, closed: Ok(()), } } @@ -44,7 +51,6 @@ impl Default for SubscribedState { pub struct Subscribed { publisher: Publisher, state: State, - msg: message::Subscribe, ok: bool, pub info: SubscribeInfo, @@ -59,14 +65,15 @@ impl Subscribed { } ).split(); let info = SubscribeInfo { + id: msg.id, namespace: msg.track_namespace.clone(), name: msg.track_name.clone(), + alias: msg.track_alias, }; let send = Self { publisher, state: send, - msg, info, ok: false, }; @@ -97,7 +104,7 @@ impl Subscribed { }; self.publisher.send_message(message::SubscribeOk { - id: self.msg.id, + id: self.info.id, expires: None, group_order: message::GroupOrder::Descending, // TODO: resolve correct value from publisher / subscriber prefs latest, @@ -156,19 +163,19 @@ impl Drop for Subscribed { .err() .cloned() .unwrap_or(ServeError::Done); - let max_group_id = state.max_group_id; + let stream_count = state.stream_count; drop(state); // Important to avoid a deadlock if self.ok { self.publisher.send_message(message::SubscribeDone { - id: self.msg.id, - last: max_group_id, + id: self.info.id, code: err.code(), + count: stream_count, reason: err.to_string(), }); } else { self.publisher.send_message(message::SubscribeError { - id: self.msg.id, + id: self.info.id, alias: 0, code: err.code(), reason: err.to_string(), @@ -180,6 +187,7 @@ impl Drop for Subscribed { impl Subscribed { async fn serve_track(&mut self, mut track: serve::StreamReader) -> Result<(), SessionError> { let mut stream = self.publisher.open_uni().await?; + self.state.lock_mut().ok_or(ServeError::Done)?.increment_stream_count()?; // TODO figure out u32 vs u64 priority stream.set_priority(track.priority as i32); @@ -187,8 +195,8 @@ impl Subscribed { let mut writer = Writer::new(stream); let header: data::Header = data::TrackHeader { - subscribe_id: self.msg.id, - track_alias: self.msg.track_alias, + subscribe_id: self.info.id, + track_alias: self.info.alias, publisher_priority: track.priority, } .into(); @@ -205,7 +213,6 @@ impl Subscribed { size: object.size, status: object.status, }; - self.state .lock_mut() .ok_or(ServeError::Done)? @@ -239,8 +246,8 @@ impl Subscribed { res = subgroups.next(), if done.is_none() => match res { Ok(Some(subgroup)) => { let header = data::SubgroupHeader { - subscribe_id: self.msg.id, - track_alias: self.msg.track_alias, + subscribe_id: self.info.id, + track_alias: self.info.alias, group_id: subgroup.group_id, subgroup_id: subgroup.subgroup_id, publisher_priority: subgroup.priority, @@ -299,6 +306,8 @@ impl Subscribed { } let mut stream = publisher.open_uni().await?; + state.lock_mut().ok_or(ServeError::Done)?.increment_stream_count()?; + // TODO figure out u32 vs u64 priority stream.set_priority(subgroup.priority as i32); @@ -343,8 +352,8 @@ impl Subscribed { ) -> Result<(), SessionError> { while let Some(datagram) = datagrams.read().await? { let datagram = data::Datagram { - subscribe_id: self.msg.id, - track_alias: self.msg.track_alias, + subscribe_id: self.info.id, + track_alias: self.info.alias, group_id: datagram.group_id, object_id: datagram.object_id, publisher_priority: datagram.priority, From 3945a4d10505992cb66517e7452ec5d9d5cd74ea Mon Sep 17 00:00:00 2001 From: Zotyamester Date: Wed, 2 Apr 2025 13:37:32 +0200 Subject: [PATCH 3/6] Implement SubscribeUpdate --- moq-transport/src/message/subscribe_update.rs | 123 +++++------------- moq-transport/src/session/publisher.rs | 8 +- moq-transport/src/session/subscribed.rs | 38 ++++++ moq-transport/src/session/subscriber.rs | 7 + 4 files changed, 86 insertions(+), 90 deletions(-) diff --git a/moq-transport/src/message/subscribe_update.rs b/moq-transport/src/message/subscribe_update.rs index 94c749f9..abe6204f 100644 --- a/moq-transport/src/message/subscribe_update.rs +++ b/moq-transport/src/message/subscribe_update.rs @@ -1,7 +1,6 @@ -use crate::coding::{Decode, DecodeError, Encode, EncodeError, Params, Tuple}; -use crate::message::subscribe::{SubscribeLocation, SubscribePair}; -use crate::message::FilterType; -use crate::message::GroupOrder; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, Params}; +use crate::message::subscribe::SubscribePair; +use crate::session::{SubscribeFilter, Subscriber}; /// Sent by the subscriber to request all future objects for the given track. /// @@ -11,88 +10,55 @@ pub struct SubscribeUpdate { /// The subscription ID pub id: u64, - /// Track properties - pub track_alias: u64, // This alias is useless but part of the spec - pub track_namespace: Tuple, - pub track_name: String, + /// Start and End depending on the Filter Type set + pub start: SubscribePair, + pub end_group: u64, // Subscriber Priority pub subscriber_priority: u8, - pub group_order: GroupOrder, - - /// Filter type - pub filter_type: FilterType, - - /// The start/end group/object. (TODO: Make optional) - pub start: Option, // TODO: Make optional - pub end: Option, // TODO: Make optional /// Optional parameters pub params: Params, } +impl SubscribeUpdate { + pub fn new(mut subscriber: Subscriber, id: u64, filter: SubscribeFilter, priority: u8) -> Self { + let (start, end_group) = match filter { + SubscribeFilter::AbsoluteStart(start) => (start, 0), + SubscribeFilter::AbsoluteRange(start, end_group) => (start, end_group), + SubscribeFilter::LatestObject => (SubscribePair { group: 0, object: 0 }, 0), + }; + + let update = SubscribeUpdate { + id, + start, + end_group, + subscriber_priority: priority, + params: Params::new(), + }; + + subscriber.send_message(update.clone()); + + update + } +} + impl Decode for SubscribeUpdate { fn decode(r: &mut R) -> Result { let id = u64::decode(r)?; - let track_alias = u64::decode(r)?; - let track_namespace = Tuple::decode(r)?; - let track_name = String::decode(r)?; + + let start = SubscribePair::decode(r)?; + let end_group = u64::decode(r)?; let subscriber_priority = u8::decode(r)?; - let group_order = GroupOrder::decode(r)?; - - let filter_type = FilterType::decode(r)?; - - let start: Option; - let end: Option; - match filter_type { - FilterType::AbsoluteStart => { - if r.remaining() < 2 { - return Err(DecodeError::MissingField); - } - start = Some(SubscribePair::decode(r)?); - end = None; - } - FilterType::AbsoluteRange => { - if r.remaining() < 4 { - return Err(DecodeError::MissingField); - } - start = Some(SubscribePair::decode(r)?); - end = Some(SubscribePair::decode(r)?); - } - _ => { - start = None; - end = None; - } - } - - if let Some(s) = &start { - // You can't have a start object without a start group. - if s.group == SubscribeLocation::None && s.object != SubscribeLocation::None { - return Err(DecodeError::InvalidSubscribeLocation); - } - } - if let Some(e) = &end { - // You can't have an end object without an end group. - if e.group == SubscribeLocation::None && e.object != SubscribeLocation::None { - return Err(DecodeError::InvalidSubscribeLocation); - } - } - - // NOTE: There's some more location restrictions in the draft, but they're enforced at a higher level. let params = Params::decode(r)?; Ok(Self { id, - track_alias, - track_namespace, - track_name, - subscriber_priority, - group_order, - filter_type, start, - end, + end_group, + subscriber_priority, params, }) } @@ -101,28 +67,11 @@ impl Decode for SubscribeUpdate { impl Encode for SubscribeUpdate { fn encode(&self, w: &mut W) -> Result<(), EncodeError> { self.id.encode(w)?; - self.track_alias.encode(w)?; - self.track_namespace.encode(w)?; - self.track_name.encode(w)?; + + self.start.encode(w)?; + self.end_group.encode(w)?; self.subscriber_priority.encode(w)?; - self.group_order.encode(w)?; - - self.filter_type.encode(w)?; - - if self.filter_type == FilterType::AbsoluteStart - || self.filter_type == FilterType::AbsoluteRange - { - if self.start.is_none() || self.end.is_none() { - return Err(EncodeError::MissingField); - } - if let Some(start) = &self.start { - start.encode(w)?; - } - if let Some(end) = &self.end { - end.encode(w)?; - } - } self.params.encode(w)?; diff --git a/moq-transport/src/session/publisher.rs b/moq-transport/src/session/publisher.rs index aad7203f..e1b97354 100644 --- a/moq-transport/src/session/publisher.rs +++ b/moq-transport/src/session/publisher.rs @@ -256,10 +256,12 @@ impl Publisher { fn recv_subscribe_update( &mut self, - _msg: message::SubscribeUpdate, + msg: message::SubscribeUpdate, ) -> Result<(), SessionError> { - // TODO: Implement updating subscriptions. - Err(SessionError::Internal) + if let Some(subscribed) = self.subscribed.lock().unwrap().get_mut(&msg.id) { + subscribed.recv_subscribe_update(msg)?; + Ok(()) + } else { } fn recv_track_status_request( diff --git a/moq-transport/src/session/subscribed.rs b/moq-transport/src/session/subscribed.rs index 0ac5e0e8..e15c9035 100644 --- a/moq-transport/src/session/subscribed.rs +++ b/moq-transport/src/session/subscribed.rs @@ -4,6 +4,7 @@ use futures::stream::FuturesUnordered; use futures::StreamExt; use crate::coding::Encode; +use crate::message::{SubscribePair, SubscribeUpdate}; use crate::serve::{ServeError, TrackReaderMode}; use crate::watch::State; use crate::{data, message, serve}; @@ -64,6 +65,7 @@ impl Subscribed { ..Default::default() } ).split(); + let info = SubscribeInfo { id: msg.id, namespace: msg.track_namespace.clone(), @@ -383,6 +385,42 @@ pub(super) struct SubscribedRecv { } impl SubscribedRecv { + pub fn recv_subscribe_update(&mut self, msg: SubscribeUpdate) -> Result<(), ServeError> { + let state = self.state.lock(); + + if let Some(mut state) = state.into_mut() { + log::trace!("received subscribe update"); + + state.priority = msg.subscriber_priority; + + let filter = SubscribeFilter::from(&msg); + // Check for assumptions: + // - Subscriptions can only become more narrow, not wider + // - A publisher SHOULD close the Session as a 'Protocol Violation' + // if the SUBSCRIBE_UPDATE violates either rule [...] + use SubscribeFilter::*; + match (&state.filter, &filter) { + (AbsoluteStart(start_old), AbsoluteStart(start_new)) + if start_old <= start_new => state.filter = filter, + (AbsoluteStart(start_old), AbsoluteRange(start_new, end_group_new)) + if start_old <= start_new && start_new.group <= *end_group_new => state.filter = filter, + (AbsoluteStart(_), LatestObject) => state.filter = filter, + (AbsoluteRange(start_old, end_group_old), AbsoluteRange(start_new, end_group_new)) + if start_old <= start_new && start_new.group <= *end_group_new && end_group_new <= end_group_old => state.filter = filter, + (LatestObject, AbsoluteStart(start_new)) + if state.max_group_id.is_none() || start_new.group >= state.max_group_id.unwrap() => state.filter = filter, + (LatestObject, AbsoluteRange(start_new, end_group_new)) + if (state.max_group_id.is_none() || start_new.group >= state.max_group_id.unwrap()) && start_new.group <= *end_group_new => state.filter = filter, + (LatestObject, LatestObject) => state.filter = filter, + _ => { + return Err(ServeError::Internal("narrowing subscribe update".to_string())); + }, + }; + + } + + Ok(()) + } pub fn recv_unsubscribe(&mut self) -> Result<(), ServeError> { let state = self.state.lock(); state.closed.clone()?; diff --git a/moq-transport/src/session/subscriber.rs b/moq-transport/src/session/subscriber.rs index 1b6d09a5..b10f7eb2 100644 --- a/moq-transport/src/session/subscriber.rs +++ b/moq-transport/src/session/subscriber.rs @@ -62,6 +62,13 @@ impl Subscriber { send.closed().await } + pub fn subscribe_update(&mut self, id: u64, filter: SubscribeFilter, priority: u8) -> Result<(), ServeError> { + let _update = SubscribeUpdate::new(self.clone(), id, filter, priority); + log::trace!("sent subscribe update"); + + Ok(()) + } + pub fn send_message>(&mut self, msg: M) { let msg = msg.into(); From cbac9c0912e5d1e9c2357c56958359ae7e489961 Mon Sep 17 00:00:00 2001 From: Zotyamester Date: Wed, 2 Apr 2025 13:39:42 +0200 Subject: [PATCH 4/6] Update commments in `Subscribe` --- moq-transport/src/message/subscribe.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/moq-transport/src/message/subscribe.rs b/moq-transport/src/message/subscribe.rs index edc39eb9..902ea8d4 100644 --- a/moq-transport/src/message/subscribe.rs +++ b/moq-transport/src/message/subscribe.rs @@ -15,14 +15,16 @@ pub struct Subscribe { pub track_namespace: Tuple, pub track_name: String, - // Subscriber Priority + /// Subscriber Priority pub subscriber_priority: u8, + + /// Group Order pub group_order: GroupOrder, /// Filter type pub filter_type: FilterType, - /// The start/end group/object + /// Start and End depending on the Filter Type set pub start: Option, pub end_group: Option, From a8fad1dbfbdf394859680baf0c5cd40b7ed07006 Mon Sep 17 00:00:00 2001 From: Zotyamester Date: Wed, 2 Apr 2025 18:33:14 +0200 Subject: [PATCH 5/6] Add meaningful log traces for unimplemented features When a `Subscribe` or a `SubscribeUpdate` is received with a non-empty list of Subscribe Parameters, a warning is logged to note the use of the unimplemented feature. --- moq-transport/src/coding/params.rs | 4 ++++ moq-transport/src/session/publisher.rs | 5 +++++ moq-transport/src/session/subscribed.rs | 9 +++++++++ 3 files changed, 18 insertions(+) diff --git a/moq-transport/src/coding/params.rs b/moq-transport/src/coding/params.rs index dad2ae10..5c7a81d7 100644 --- a/moq-transport/src/coding/params.rs +++ b/moq-transport/src/coding/params.rs @@ -73,4 +73,8 @@ impl Params { Ok(None) } } + + pub fn is_empty(&self) -> bool { + self.0.is_empty() + } } diff --git a/moq-transport/src/session/publisher.rs b/moq-transport/src/session/publisher.rs index e1b97354..6fc5677a 100644 --- a/moq-transport/src/session/publisher.rs +++ b/moq-transport/src/session/publisher.rs @@ -262,6 +262,11 @@ impl Publisher { subscribed.recv_subscribe_update(msg)?; Ok(()) } else { + // Non-existent Subscribe ID, we should close the Session as a + // 'Protocol Violation'; however, it is not implemented yet, + // thus we are sending an internal session error instead. + Err(SessionError::Internal) + } } fn recv_track_status_request( diff --git a/moq-transport/src/session/subscribed.rs b/moq-transport/src/session/subscribed.rs index e15c9035..5c20f7d1 100644 --- a/moq-transport/src/session/subscribed.rs +++ b/moq-transport/src/session/subscribed.rs @@ -73,6 +73,11 @@ impl Subscribed { alias: msg.track_alias, }; + if !msg.params.is_empty() { + // TODO: handle subscribe parameters + log::warn!("subscription parameters are not supported"); + } + let send = Self { publisher, state: send, @@ -417,6 +422,10 @@ impl SubscribedRecv { }, }; + if !msg.params.is_empty() { + // TODO: handle subscribe parameters + log::warn!("subscription parameters are not supported"); + } } Ok(()) From 1f090d04d14fb4afa099987040da3afd81ffdae7 Mon Sep 17 00:00:00 2001 From: Zotyamester Date: Wed, 2 Apr 2025 19:00:22 +0200 Subject: [PATCH 6/6] cargo fmt --- moq-sub/src/media.rs | 19 ++++-- moq-transport/src/message/subscribe_update.rs | 8 ++- moq-transport/src/serve/subgroup.rs | 6 +- moq-transport/src/session/publisher.rs | 5 +- moq-transport/src/session/subscribe.rs | 23 +++++-- moq-transport/src/session/subscribed.rs | 68 +++++++++++++------ moq-transport/src/session/subscriber.rs | 22 ++++-- 7 files changed, 107 insertions(+), 44 deletions(-) diff --git a/moq-sub/src/media.rs b/moq-sub/src/media.rs index 9044f6f0..96d3da69 100644 --- a/moq-sub/src/media.rs +++ b/moq-sub/src/media.rs @@ -43,9 +43,12 @@ impl Media { let mut subscriber = self.subscriber.clone(); tokio::task::spawn(async move { - subscriber.subscribe(track, SubscribeFilter::LatestObject).await.unwrap_or_else(|err| { - warn!("failed to subscribe to init track: {err:?}"); - }); + subscriber + .subscribe(track, SubscribeFilter::LatestObject) + .await + .unwrap_or_else(|err| { + warn!("failed to subscribe to init track: {err:?}"); + }); }); let track = self @@ -101,10 +104,12 @@ impl Media { let mut subscriber = self.subscriber.clone(); tokio::task::spawn(async move { - subscriber.subscribe(track, SubscribeFilter::LatestObject) - .await.unwrap_or_else(|err| { - warn!("failed to subscribe to track: {err:?}"); - }); + subscriber + .subscribe(track, SubscribeFilter::LatestObject) + .await + .unwrap_or_else(|err| { + warn!("failed to subscribe to track: {err:?}"); + }); }); tracks.push(self.broadcast.subscribe(&name).context("no track")?); diff --git a/moq-transport/src/message/subscribe_update.rs b/moq-transport/src/message/subscribe_update.rs index abe6204f..dba9deb7 100644 --- a/moq-transport/src/message/subscribe_update.rs +++ b/moq-transport/src/message/subscribe_update.rs @@ -26,7 +26,13 @@ impl SubscribeUpdate { let (start, end_group) = match filter { SubscribeFilter::AbsoluteStart(start) => (start, 0), SubscribeFilter::AbsoluteRange(start, end_group) => (start, end_group), - SubscribeFilter::LatestObject => (SubscribePair { group: 0, object: 0 }, 0), + SubscribeFilter::LatestObject => ( + SubscribePair { + group: 0, + object: 0, + }, + 0, + ), }; let update = SubscribeUpdate { diff --git a/moq-transport/src/serve/subgroup.rs b/moq-transport/src/serve/subgroup.rs index f6382e6c..25299761 100644 --- a/moq-transport/src/serve/subgroup.rs +++ b/moq-transport/src/serve/subgroup.rs @@ -310,7 +310,11 @@ impl SubgroupWriter { /// is invoked at the original publisher and the GroupIDs are sequential). /// /// BAD STUFF will happen if the size is wrong; this is an advanced feature. - pub fn create(&mut self, size: usize, id: Option) -> Result { + pub fn create( + &mut self, + size: usize, + id: Option, + ) -> Result { let (writer, reader) = SubgroupObject { group: self.info.clone(), object_id: id.unwrap_or(self.next), diff --git a/moq-transport/src/session/publisher.rs b/moq-transport/src/session/publisher.rs index 6fc5677a..7f95dac3 100644 --- a/moq-transport/src/session/publisher.rs +++ b/moq-transport/src/session/publisher.rs @@ -254,10 +254,7 @@ impl Publisher { Ok(()) } - fn recv_subscribe_update( - &mut self, - msg: message::SubscribeUpdate, - ) -> Result<(), SessionError> { + fn recv_subscribe_update(&mut self, msg: message::SubscribeUpdate) -> Result<(), SessionError> { if let Some(subscribed) = self.subscribed.lock().unwrap().get_mut(&msg.id) { subscribed.recv_subscribe_update(msg)?; Ok(()) diff --git a/moq-transport/src/session/subscribe.rs b/moq-transport/src/session/subscribe.rs index a1262e24..7bc25f14 100644 --- a/moq-transport/src/session/subscribe.rs +++ b/moq-transport/src/session/subscribe.rs @@ -46,14 +46,16 @@ pub struct Subscribe { pub enum SubscribeFilter { AbsoluteStart(SubscribePair), AbsoluteRange(SubscribePair, u64), - LatestObject + LatestObject, } impl SubscribeFilter { - fn unwrap(self) -> (FilterType, Option, Option) { + fn unwrap(self) -> (FilterType, Option, Option) { match self { SubscribeFilter::AbsoluteStart(start) => (FilterType::AbsoluteStart, Some(start), None), - SubscribeFilter::AbsoluteRange(start, end_group) => (FilterType::AbsoluteRange, Some(start), Some(end_group)), + SubscribeFilter::AbsoluteRange(start, end_group) => { + (FilterType::AbsoluteRange, Some(start), Some(end_group)) + } SubscribeFilter::LatestObject => (FilterType::LatestObject, None, None), } } @@ -63,11 +65,20 @@ impl From<&message::Subscribe> for SubscribeFilter { fn from(subscribe: &message::Subscribe) -> Self { match subscribe.filter_type { FilterType::AbsoluteStart => Self::AbsoluteStart( - subscribe.start.clone().expect("AbsoluteStart, but no StartGroup nor StartObject") + subscribe + .start + .clone() + .expect("AbsoluteStart, but no StartGroup nor StartObject"), ), FilterType::AbsoluteRange => Self::AbsoluteRange( - subscribe.start.clone().expect("AbsoluteRange, but no StartGroup nor StartObject"), - subscribe.end_group.clone().expect("AbsoluteRange, but no EndGroup"), + subscribe + .start + .clone() + .expect("AbsoluteRange, but no StartGroup nor StartObject"), + subscribe + .end_group + .clone() + .expect("AbsoluteRange, but no EndGroup"), ), FilterType::LatestObject => Self::LatestObject, } diff --git a/moq-transport/src/session/subscribed.rs b/moq-transport/src/session/subscribed.rs index 5c20f7d1..ae2ef5cf 100644 --- a/moq-transport/src/session/subscribed.rs +++ b/moq-transport/src/session/subscribed.rs @@ -59,12 +59,11 @@ pub struct Subscribed { impl Subscribed { pub(super) fn new(publisher: Publisher, msg: message::Subscribe) -> (Self, SubscribedRecv) { - let (send, recv) = State::new( - SubscribedState { - filter: SubscribeFilter::from(&msg), - ..Default::default() - } - ).split(); + let (send, recv) = State::new(SubscribedState { + filter: SubscribeFilter::from(&msg), + ..Default::default() + }) + .split(); let info = SubscribeInfo { id: msg.id, @@ -106,9 +105,9 @@ impl Subscribed { .lock_mut() .ok_or(ServeError::Cancel)? .max_group_id = match latest { - None => None, - Some(latest) => Some(latest.0), - }; + None => None, + Some(latest) => Some(latest.0), + }; self.publisher.send_message(message::SubscribeOk { id: self.info.id, @@ -194,7 +193,10 @@ impl Drop for Subscribed { impl Subscribed { async fn serve_track(&mut self, mut track: serve::StreamReader) -> Result<(), SessionError> { let mut stream = self.publisher.open_uni().await?; - self.state.lock_mut().ok_or(ServeError::Done)?.increment_stream_count()?; + self.state + .lock_mut() + .ok_or(ServeError::Done)? + .increment_stream_count()?; // TODO figure out u32 vs u64 priority stream.set_priority(track.priority as i32); @@ -313,8 +315,10 @@ impl Subscribed { } let mut stream = publisher.open_uni().await?; - state.lock_mut().ok_or(ServeError::Done)?.increment_stream_count()?; - + state + .lock_mut() + .ok_or(ServeError::Done)? + .increment_stream_count()?; // TODO figure out u32 vs u64 priority stream.set_priority(subgroup.priority as i32); @@ -405,21 +409,43 @@ impl SubscribedRecv { // if the SUBSCRIBE_UPDATE violates either rule [...] use SubscribeFilter::*; match (&state.filter, &filter) { - (AbsoluteStart(start_old), AbsoluteStart(start_new)) - if start_old <= start_new => state.filter = filter, + (AbsoluteStart(start_old), AbsoluteStart(start_new)) if start_old <= start_new => { + state.filter = filter + } (AbsoluteStart(start_old), AbsoluteRange(start_new, end_group_new)) - if start_old <= start_new && start_new.group <= *end_group_new => state.filter = filter, + if start_old <= start_new && start_new.group <= *end_group_new => + { + state.filter = filter + } (AbsoluteStart(_), LatestObject) => state.filter = filter, - (AbsoluteRange(start_old, end_group_old), AbsoluteRange(start_new, end_group_new)) - if start_old <= start_new && start_new.group <= *end_group_new && end_group_new <= end_group_old => state.filter = filter, + ( + AbsoluteRange(start_old, end_group_old), + AbsoluteRange(start_new, end_group_new), + ) if start_old <= start_new + && start_new.group <= *end_group_new + && end_group_new <= end_group_old => + { + state.filter = filter + } (LatestObject, AbsoluteStart(start_new)) - if state.max_group_id.is_none() || start_new.group >= state.max_group_id.unwrap() => state.filter = filter, + if state.max_group_id.is_none() + || start_new.group >= state.max_group_id.unwrap() => + { + state.filter = filter + } (LatestObject, AbsoluteRange(start_new, end_group_new)) - if (state.max_group_id.is_none() || start_new.group >= state.max_group_id.unwrap()) && start_new.group <= *end_group_new => state.filter = filter, + if (state.max_group_id.is_none() + || start_new.group >= state.max_group_id.unwrap()) + && start_new.group <= *end_group_new => + { + state.filter = filter + } (LatestObject, LatestObject) => state.filter = filter, _ => { - return Err(ServeError::Internal("narrowing subscribe update".to_string())); - }, + return Err(ServeError::Internal( + "narrowing subscribe update".to_string(), + )); + } }; if !msg.params.is_empty() { diff --git a/moq-transport/src/session/subscriber.rs b/moq-transport/src/session/subscriber.rs index b10f7eb2..fdbac035 100644 --- a/moq-transport/src/session/subscriber.rs +++ b/moq-transport/src/session/subscriber.rs @@ -1,5 +1,7 @@ use std::{ - collections::{hash_map, HashMap}, io, sync::{atomic, Arc, Mutex} + collections::{hash_map, HashMap}, + io, + sync::{atomic, Arc, Mutex}, }; use crate::{ @@ -12,7 +14,10 @@ use crate::{ use crate::watch::Queue; -use super::{Announced, AnnouncedRecv, Reader, Session, SessionError, Subscribe, SubscribeFilter, SubscribeRecv}; +use super::{ + Announced, AnnouncedRecv, Reader, Session, SessionError, Subscribe, SubscribeFilter, + SubscribeRecv, +}; // TODO remove Clone. #[derive(Clone)] @@ -53,7 +58,11 @@ impl Subscriber { self.announced_queue.pop().await } - pub async fn subscribe(&mut self, track: serve::TrackWriter, filter: SubscribeFilter) -> Result<(), ServeError> { + pub async fn subscribe( + &mut self, + track: serve::TrackWriter, + filter: SubscribeFilter, + ) -> Result<(), ServeError> { let id = self.subscribe_next.fetch_add(1, atomic::Ordering::Relaxed); let (send, recv) = Subscribe::new(self.clone(), id, track, filter); @@ -62,7 +71,12 @@ impl Subscriber { send.closed().await } - pub fn subscribe_update(&mut self, id: u64, filter: SubscribeFilter, priority: u8) -> Result<(), ServeError> { + pub fn subscribe_update( + &mut self, + id: u64, + filter: SubscribeFilter, + priority: u8, + ) -> Result<(), ServeError> { let _update = SubscribeUpdate::new(self.clone(), id, filter, priority); log::trace!("sent subscribe update");