diff --git a/moq-clock-ietf/src/clock.rs b/moq-clock-ietf/src/clock.rs index 0dc6ce06..67f9cae5 100644 --- a/moq-clock-ietf/src/clock.rs +++ b/moq-clock-ietf/src/clock.rs @@ -19,8 +19,8 @@ impl Publisher { let start = Utc::now(); let mut now = start; - // Just for fun, don't start at zero. - let mut sequence = start.minute(); + // For better reproducibility and easier testing, start at 0. + let mut sequence = 0; loop { let segment = self diff --git a/moq-clock-ietf/src/main.rs b/moq-clock-ietf/src/main.rs index 6e38a94e..3549f64d 100644 --- a/moq-clock-ietf/src/main.rs +++ b/moq-clock-ietf/src/main.rs @@ -10,7 +10,7 @@ mod clock; use moq_transport::{ coding::Tuple, serve, - session::{Publisher, Subscriber}, + session::{Publisher, SubscribeFilter, Subscriber}, }; #[derive(Parser, Clone)] @@ -93,7 +93,7 @@ async fn main() -> anyhow::Result<()> { tokio::select! { res = session.run() => res.context("session error")?, res = clock.run() => res.context("clock error")?, - res = subscriber.subscribe(prod) => res.context("failed to subscribe to track")?, + res = subscriber.subscribe(prod, SubscribeFilter::LatestObject) => res.context("failed to subscribe to track")?, } } diff --git a/moq-relay-ietf/src/consumer.rs b/moq-relay-ietf/src/consumer.rs index 52c57e76..0ed69d7f 100644 --- a/moq-relay-ietf/src/consumer.rs +++ b/moq-relay-ietf/src/consumer.rs @@ -2,7 +2,7 @@ use anyhow::Context; use futures::{stream::FuturesUnordered, FutureExt, StreamExt}; use moq_transport::{ serve::Tracks, - session::{Announced, SessionError, Subscriber}, + session::{Announced, SessionError, SubscribeFilter, Subscriber}, }; use crate::{Api, Locals, Producer}; @@ -96,7 +96,7 @@ impl Consumer { let info = track.clone(); log::info!("forwarding subscribe: {:?}", info); - if let Err(err) = remote.subscribe(track).await { + if let Err(err) = remote.subscribe(track, SubscribeFilter::LatestObject).await { log::warn!("failed forwarding subscribe: {:?}, error: {}", info, err) } diff --git a/moq-relay-ietf/src/remote.rs b/moq-relay-ietf/src/remote.rs index b64706a0..7769dc77 100644 --- a/moq-relay-ietf/src/remote.rs +++ b/moq-relay-ietf/src/remote.rs @@ -12,6 +12,7 @@ use futures::StreamExt; use moq_native_ietf::quic; use moq_transport::coding::Tuple; use moq_transport::serve::{Track, TrackReader, TrackWriter}; +use moq_transport::session::SubscribeFilter; use moq_transport::watch::State; use url::Url; @@ -233,7 +234,7 @@ impl RemoteProducer { let mut subscriber = subscriber.clone(); tasks.push(async move { - if let Err(err) = subscriber.subscribe(track).await { + if let Err(err) = subscriber.subscribe(track, SubscribeFilter::LatestObject).await { log::warn!("failed serving track: {:?}, error: {}", info, err); } }); diff --git a/moq-sub/src/media.rs b/moq-sub/src/media.rs index c33399c3..96d3da69 100644 --- a/moq-sub/src/media.rs +++ b/moq-sub/src/media.rs @@ -6,7 +6,7 @@ use moq_transport::serve::{ SubgroupObjectReader, SubgroupReader, TrackReader, TrackReaderMode, Tracks, TracksReader, TracksWriter, }; -use moq_transport::session::Subscriber; +use moq_transport::session::{SubscribeFilter, Subscriber}; use mp4::ReadBox; use tokio::{ io::{AsyncReadExt, AsyncWrite, AsyncWriteExt}, @@ -43,9 +43,12 @@ impl Media { let mut subscriber = self.subscriber.clone(); tokio::task::spawn(async move { - subscriber.subscribe(track).await.unwrap_or_else(|err| { - warn!("failed to subscribe to init track: {err:?}"); - }); + subscriber + .subscribe(track, SubscribeFilter::LatestObject) + .await + .unwrap_or_else(|err| { + warn!("failed to subscribe to init track: {err:?}"); + }); }); let track = self @@ -101,9 +104,12 @@ impl Media { let mut subscriber = self.subscriber.clone(); tokio::task::spawn(async move { - subscriber.subscribe(track).await.unwrap_or_else(|err| { - warn!("failed to subscribe to track: {err:?}"); - }); + subscriber + .subscribe(track, SubscribeFilter::LatestObject) + .await + .unwrap_or_else(|err| { + warn!("failed to subscribe to track: {err:?}"); + }); }); tracks.push(self.broadcast.subscribe(&name).context("no track")?); diff --git a/moq-transport/src/coding/params.rs b/moq-transport/src/coding/params.rs index dad2ae10..5c7a81d7 100644 --- a/moq-transport/src/coding/params.rs +++ b/moq-transport/src/coding/params.rs @@ -73,4 +73,8 @@ impl Params { Ok(None) } } + + pub fn is_empty(&self) -> bool { + self.0.is_empty() + } } diff --git a/moq-transport/src/message/filter_type.rs b/moq-transport/src/message/filter_type.rs index e826443c..5aa79f8e 100644 --- a/moq-transport/src/message/filter_type.rs +++ b/moq-transport/src/message/filter_type.rs @@ -4,7 +4,6 @@ use crate::coding::{Decode, DecodeError, Encode, EncodeError}; /// https://www.ietf.org/archive/id/draft-ietf-moq-transport-04.html#name-filter-types #[derive(Clone, Debug, PartialEq)] pub enum FilterType { - LatestGroup = 0x1, LatestObject = 0x2, AbsoluteStart = 0x3, AbsoluteRange = 0x4, @@ -13,7 +12,6 @@ pub enum FilterType { impl Encode for FilterType { fn encode(&self, w: &mut W) -> Result<(), EncodeError> { match self { - Self::LatestGroup => (0x1_u64).encode(w), Self::LatestObject => (0x2_u64).encode(w), Self::AbsoluteStart => (0x3_u64).encode(w), Self::AbsoluteRange => (0x4_u64).encode(w), @@ -24,7 +22,6 @@ impl Encode for FilterType { impl Decode for FilterType { fn decode(r: &mut R) -> Result { match u64::decode(r)? { - 0x01 => Ok(Self::LatestGroup), 0x02 => Ok(Self::LatestObject), 0x03 => Ok(Self::AbsoluteStart), 0x04 => Ok(Self::AbsoluteRange), diff --git a/moq-transport/src/message/subscribe.rs b/moq-transport/src/message/subscribe.rs index 6dd32723..902ea8d4 100644 --- a/moq-transport/src/message/subscribe.rs +++ b/moq-transport/src/message/subscribe.rs @@ -15,16 +15,18 @@ pub struct Subscribe { pub track_namespace: Tuple, pub track_name: String, - // Subscriber Priority + /// Subscriber Priority pub subscriber_priority: u8, + + /// Group Order pub group_order: GroupOrder, /// Filter type pub filter_type: FilterType, - /// The start/end group/object. (TODO: Make optional) - pub start: Option, // TODO: Make optional - pub end: Option, // TODO: Make optional + /// Start and End depending on the Filter Type set + pub start: Option, + pub end_group: Option, /// Optional parameters pub params: Params, @@ -43,43 +45,28 @@ impl Decode for Subscribe { let filter_type = FilterType::decode(r)?; let start: Option; - let end: Option; + let end_group: Option; match filter_type { FilterType::AbsoluteStart => { if r.remaining() < 2 { return Err(DecodeError::MissingField); } start = Some(SubscribePair::decode(r)?); - end = None; + end_group = None; } FilterType::AbsoluteRange => { if r.remaining() < 4 { return Err(DecodeError::MissingField); } start = Some(SubscribePair::decode(r)?); - end = Some(SubscribePair::decode(r)?); + end_group = Some(u64::decode(r)?); } _ => { start = None; - end = None; - } - } - - if let Some(s) = &start { - // You can't have a start object without a start group. - if s.group == SubscribeLocation::None && s.object != SubscribeLocation::None { - return Err(DecodeError::InvalidSubscribeLocation); - } - } - if let Some(e) = &end { - // You can't have an end object without an end group. - if e.group == SubscribeLocation::None && e.object != SubscribeLocation::None { - return Err(DecodeError::InvalidSubscribeLocation); + end_group = None; } } - // NOTE: There's some more location restrictions in the draft, but they're enforced at a higher level. - let params = Params::decode(r)?; Ok(Self { @@ -91,7 +78,7 @@ impl Decode for Subscribe { group_order, filter_type, start, - end, + end_group, params, }) } @@ -113,14 +100,18 @@ impl Encode for Subscribe { if self.filter_type == FilterType::AbsoluteStart || self.filter_type == FilterType::AbsoluteRange { - if self.start.is_none() || self.end.is_none() { - return Err(EncodeError::MissingField); - } if let Some(start) = &self.start { start.encode(w)?; + } else { + return Err(EncodeError::MissingField); } - if let Some(end) = &self.end { - end.encode(w)?; + } + + if self.filter_type == FilterType::AbsoluteRange { + if let Some(end_group) = &self.end_group { + end_group.encode(w)?; + } else { + return Err(EncodeError::MissingField); } } @@ -130,17 +121,20 @@ impl Encode for Subscribe { } } -#[derive(Clone, Debug, PartialEq)] +// Note: When derived on structs, it will produce a lexicographic ordering +// based on the top-to-bottom declaration order of the struct’s members. +// Therefore, it will work just as expected. +#[derive(Clone, Debug, PartialEq, PartialOrd)] pub struct SubscribePair { - pub group: SubscribeLocation, - pub object: SubscribeLocation, + pub group: u64, + pub object: u64, } impl Decode for SubscribePair { fn decode(r: &mut R) -> Result { Ok(Self { - group: SubscribeLocation::decode(r)?, - object: SubscribeLocation::decode(r)?, + group: u64::decode(r)?, + object: u64::decode(r)?, }) } } @@ -152,50 +146,3 @@ impl Encode for SubscribePair { Ok(()) } } - -/// Signal where the subscription should begin, relative to the current cache. -#[derive(Clone, Debug, PartialEq)] -pub enum SubscribeLocation { - None, - Absolute(u64), - Latest(u64), - Future(u64), -} - -impl Decode for SubscribeLocation { - fn decode(r: &mut R) -> Result { - let kind = u64::decode(r)?; - - match kind { - 0 => Ok(Self::None), - 1 => Ok(Self::Absolute(u64::decode(r)?)), - 2 => Ok(Self::Latest(u64::decode(r)?)), - 3 => Ok(Self::Future(u64::decode(r)?)), - _ => Err(DecodeError::InvalidSubscribeLocation), - } - } -} - -impl Encode for SubscribeLocation { - fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - self.id().encode(w)?; - - match self { - Self::None => Ok(()), - Self::Absolute(val) => val.encode(w), - Self::Latest(val) => val.encode(w), - Self::Future(val) => val.encode(w), - } - } -} - -impl SubscribeLocation { - fn id(&self) -> u64 { - match self { - Self::None => 0, - Self::Absolute(_) => 1, - Self::Latest(_) => 2, - Self::Future(_) => 3, - } - } -} diff --git a/moq-transport/src/message/subscribe_done.rs b/moq-transport/src/message/subscribe_done.rs index a7f0c2e6..36656dc0 100644 --- a/moq-transport/src/message/subscribe_done.rs +++ b/moq-transport/src/message/subscribe_done.rs @@ -3,37 +3,32 @@ use crate::coding::{Decode, DecodeError, Encode, EncodeError}; /// Sent by the publisher to cleanly terminate a Subscribe. #[derive(Clone, Debug)] pub struct SubscribeDone { - /// The ID for this subscription. + /// The ID for this subscription pub id: u64, /// The error code pub code: u64, + /// Number of DATA streams opened for this subscription + /// (1 << 62) - 1 if it cannot be determined + pub count: u64, + /// An optional error reason pub reason: String, - - /// The final group/object sent on this subscription. - pub last: Option<(u64, u64)>, } impl Decode for SubscribeDone { fn decode(r: &mut R) -> Result { let id = u64::decode(r)?; let code = u64::decode(r)?; + let count = u64::decode(r)?; let reason = String::decode(r)?; - Self::decode_remaining(r, 1)?; - let last = match r.get_u8() { - 0 => None, - 1 => Some((u64::decode(r)?, u64::decode(r)?)), - _ => return Err(DecodeError::InvalidValue), - }; - Ok(Self { id, code, + count, reason, - last, }) } } @@ -42,18 +37,9 @@ impl Encode for SubscribeDone { fn encode(&self, w: &mut W) -> Result<(), EncodeError> { self.id.encode(w)?; self.code.encode(w)?; + self.count.encode(w)?; self.reason.encode(w)?; - Self::encode_remaining(w, 1)?; - - if let Some((group, object)) = self.last { - w.put_u8(1); - group.encode(w)?; - object.encode(w)?; - } else { - w.put_u8(0); - } - Ok(()) } } diff --git a/moq-transport/src/message/subscribe_update.rs b/moq-transport/src/message/subscribe_update.rs index 94c749f9..dba9deb7 100644 --- a/moq-transport/src/message/subscribe_update.rs +++ b/moq-transport/src/message/subscribe_update.rs @@ -1,7 +1,6 @@ -use crate::coding::{Decode, DecodeError, Encode, EncodeError, Params, Tuple}; -use crate::message::subscribe::{SubscribeLocation, SubscribePair}; -use crate::message::FilterType; -use crate::message::GroupOrder; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, Params}; +use crate::message::subscribe::SubscribePair; +use crate::session::{SubscribeFilter, Subscriber}; /// Sent by the subscriber to request all future objects for the given track. /// @@ -11,88 +10,61 @@ pub struct SubscribeUpdate { /// The subscription ID pub id: u64, - /// Track properties - pub track_alias: u64, // This alias is useless but part of the spec - pub track_namespace: Tuple, - pub track_name: String, + /// Start and End depending on the Filter Type set + pub start: SubscribePair, + pub end_group: u64, // Subscriber Priority pub subscriber_priority: u8, - pub group_order: GroupOrder, - - /// Filter type - pub filter_type: FilterType, - - /// The start/end group/object. (TODO: Make optional) - pub start: Option, // TODO: Make optional - pub end: Option, // TODO: Make optional /// Optional parameters pub params: Params, } +impl SubscribeUpdate { + pub fn new(mut subscriber: Subscriber, id: u64, filter: SubscribeFilter, priority: u8) -> Self { + let (start, end_group) = match filter { + SubscribeFilter::AbsoluteStart(start) => (start, 0), + SubscribeFilter::AbsoluteRange(start, end_group) => (start, end_group), + SubscribeFilter::LatestObject => ( + SubscribePair { + group: 0, + object: 0, + }, + 0, + ), + }; + + let update = SubscribeUpdate { + id, + start, + end_group, + subscriber_priority: priority, + params: Params::new(), + }; + + subscriber.send_message(update.clone()); + + update + } +} + impl Decode for SubscribeUpdate { fn decode(r: &mut R) -> Result { let id = u64::decode(r)?; - let track_alias = u64::decode(r)?; - let track_namespace = Tuple::decode(r)?; - let track_name = String::decode(r)?; + + let start = SubscribePair::decode(r)?; + let end_group = u64::decode(r)?; let subscriber_priority = u8::decode(r)?; - let group_order = GroupOrder::decode(r)?; - - let filter_type = FilterType::decode(r)?; - - let start: Option; - let end: Option; - match filter_type { - FilterType::AbsoluteStart => { - if r.remaining() < 2 { - return Err(DecodeError::MissingField); - } - start = Some(SubscribePair::decode(r)?); - end = None; - } - FilterType::AbsoluteRange => { - if r.remaining() < 4 { - return Err(DecodeError::MissingField); - } - start = Some(SubscribePair::decode(r)?); - end = Some(SubscribePair::decode(r)?); - } - _ => { - start = None; - end = None; - } - } - - if let Some(s) = &start { - // You can't have a start object without a start group. - if s.group == SubscribeLocation::None && s.object != SubscribeLocation::None { - return Err(DecodeError::InvalidSubscribeLocation); - } - } - if let Some(e) = &end { - // You can't have an end object without an end group. - if e.group == SubscribeLocation::None && e.object != SubscribeLocation::None { - return Err(DecodeError::InvalidSubscribeLocation); - } - } - - // NOTE: There's some more location restrictions in the draft, but they're enforced at a higher level. let params = Params::decode(r)?; Ok(Self { id, - track_alias, - track_namespace, - track_name, - subscriber_priority, - group_order, - filter_type, start, - end, + end_group, + subscriber_priority, params, }) } @@ -101,28 +73,11 @@ impl Decode for SubscribeUpdate { impl Encode for SubscribeUpdate { fn encode(&self, w: &mut W) -> Result<(), EncodeError> { self.id.encode(w)?; - self.track_alias.encode(w)?; - self.track_namespace.encode(w)?; - self.track_name.encode(w)?; + + self.start.encode(w)?; + self.end_group.encode(w)?; self.subscriber_priority.encode(w)?; - self.group_order.encode(w)?; - - self.filter_type.encode(w)?; - - if self.filter_type == FilterType::AbsoluteStart - || self.filter_type == FilterType::AbsoluteRange - { - if self.start.is_none() || self.end.is_none() { - return Err(EncodeError::MissingField); - } - if let Some(start) = &self.start { - start.encode(w)?; - } - if let Some(end) = &self.end { - end.encode(w)?; - } - } self.params.encode(w)?; diff --git a/moq-transport/src/serve/subgroup.rs b/moq-transport/src/serve/subgroup.rs index d3fa889e..25299761 100644 --- a/moq-transport/src/serve/subgroup.rs +++ b/moq-transport/src/serve/subgroup.rs @@ -299,24 +299,33 @@ impl SubgroupWriter { /// Create the next object ID with the given payload. pub fn write(&mut self, payload: bytes::Bytes) -> Result<(), ServeError> { - let mut object = self.create(payload.len())?; + let mut object = self.create(payload.len(), None)?; object.write(payload)?; Ok(()) } /// Write an object over multiple writes. /// + /// The argument id is optional for convenience (e.g., when the function + /// is invoked at the original publisher and the GroupIDs are sequential). + /// /// BAD STUFF will happen if the size is wrong; this is an advanced feature. - pub fn create(&mut self, size: usize) -> Result { + pub fn create( + &mut self, + size: usize, + id: Option, + ) -> Result { let (writer, reader) = SubgroupObject { group: self.info.clone(), - object_id: self.next, + object_id: id.unwrap_or(self.next), status: ObjectStatus::Object, size, } .produce(); - self.next += 1; + if id.is_none() { + self.next += 1; + } let mut state = self.state.lock_mut().ok_or(ServeError::Cancel)?; state.objects.push(reader); @@ -392,13 +401,23 @@ impl SubgroupReader { } pub async fn next(&mut self) -> Result, ServeError> { + self._next(true).await + } + + pub async fn peek(&mut self) -> Result, ServeError> { + self._next(false).await + } + + async fn _next(&mut self, inc: bool) -> Result, ServeError> { loop { { let state = self.state.lock(); if self.index < state.objects.len() { let object = state.objects[self.index].clone(); - self.index += 1; + if inc { + self.index += 1; + } return Ok(Some(object)); } diff --git a/moq-transport/src/session/publisher.rs b/moq-transport/src/session/publisher.rs index aad7203f..7f95dac3 100644 --- a/moq-transport/src/session/publisher.rs +++ b/moq-transport/src/session/publisher.rs @@ -254,12 +254,16 @@ impl Publisher { Ok(()) } - fn recv_subscribe_update( - &mut self, - _msg: message::SubscribeUpdate, - ) -> Result<(), SessionError> { - // TODO: Implement updating subscriptions. - Err(SessionError::Internal) + fn recv_subscribe_update(&mut self, msg: message::SubscribeUpdate) -> Result<(), SessionError> { + if let Some(subscribed) = self.subscribed.lock().unwrap().get_mut(&msg.id) { + subscribed.recv_subscribe_update(msg)?; + Ok(()) + } else { + // Non-existent Subscribe ID, we should close the Session as a + // 'Protocol Violation'; however, it is not implemented yet, + // thus we are sending an internal session error instead. + Err(SessionError::Internal) + } } fn recv_track_status_request( diff --git a/moq-transport/src/session/subscribe.rs b/moq-transport/src/session/subscribe.rs index 620ab09a..7bc25f14 100644 --- a/moq-transport/src/session/subscribe.rs +++ b/moq-transport/src/session/subscribe.rs @@ -3,7 +3,7 @@ use std::ops; use crate::{ coding::Tuple, data, - message::{self, FilterType, GroupOrder, SubscribeLocation, SubscribePair}, + message::{self, FilterType, GroupOrder, SubscribePair}, serve::{self, ServeError, TrackWriter, TrackWriterMode}, }; @@ -13,8 +13,10 @@ use super::Subscriber; #[derive(Debug, Clone)] pub struct SubscribeInfo { + pub id: u64, pub namespace: Tuple, pub name: String, + pub alias: u64, } struct SubscribeState { @@ -36,17 +38,73 @@ impl Default for SubscribeState { pub struct Subscribe { state: State, subscriber: Subscriber, - id: u64, pub info: SubscribeInfo, } +#[derive(Debug, Clone)] +pub enum SubscribeFilter { + AbsoluteStart(SubscribePair), + AbsoluteRange(SubscribePair, u64), + LatestObject, +} + +impl SubscribeFilter { + fn unwrap(self) -> (FilterType, Option, Option) { + match self { + SubscribeFilter::AbsoluteStart(start) => (FilterType::AbsoluteStart, Some(start), None), + SubscribeFilter::AbsoluteRange(start, end_group) => { + (FilterType::AbsoluteRange, Some(start), Some(end_group)) + } + SubscribeFilter::LatestObject => (FilterType::LatestObject, None, None), + } + } +} + +impl From<&message::Subscribe> for SubscribeFilter { + fn from(subscribe: &message::Subscribe) -> Self { + match subscribe.filter_type { + FilterType::AbsoluteStart => Self::AbsoluteStart( + subscribe + .start + .clone() + .expect("AbsoluteStart, but no StartGroup nor StartObject"), + ), + FilterType::AbsoluteRange => Self::AbsoluteRange( + subscribe + .start + .clone() + .expect("AbsoluteRange, but no StartGroup nor StartObject"), + subscribe + .end_group + .clone() + .expect("AbsoluteRange, but no EndGroup"), + ), + FilterType::LatestObject => Self::LatestObject, + } + } +} + +impl From<&message::SubscribeUpdate> for SubscribeFilter { + fn from(update: &message::SubscribeUpdate) -> Self { + if update.end_group != 0 { + Self::AbsoluteRange(update.start.clone(), update.end_group - 1) + } else if !(update.start.group == 0 && update.start.object == 0) { + Self::AbsoluteStart(update.start.clone()) // A value of 0 means the subscription is open-ended. + } else { + Self::LatestObject // If starts from the beginning and is open-ended, it must be LatestObject. + } + } +} + impl Subscribe { pub(super) fn new( mut subscriber: Subscriber, id: u64, track: TrackWriter, + filter: SubscribeFilter, ) -> (Subscribe, SubscribeRecv) { + let (filter_type, start, end_group) = filter.clone().unwrap(); subscriber.send_message(message::Subscribe { id, track_alias: id, @@ -55,22 +113,17 @@ impl Subscribe { // TODO add prioritization logic on the publisher side subscriber_priority: 127, // default to mid value, see: https://github.com/moq-wg/moq-transport/issues/504 group_order: GroupOrder::Publisher, // defer to publisher send order - filter_type: FilterType::LatestGroup, - // TODO add these to the publisher. - start: Some(SubscribePair { - group: SubscribeLocation::Latest(0), - object: SubscribeLocation::Absolute(0), - }), - end: Some(SubscribePair { - group: SubscribeLocation::None, - object: SubscribeLocation::None, - }), + filter_type, + start, + end_group, params: Default::default(), }); let info = SubscribeInfo { + id, namespace: track.namespace.clone(), name: track.name.clone(), + alias: id, }; let (send, recv) = State::default().split(); @@ -78,7 +131,6 @@ impl Subscribe { let send = Subscribe { state: send, subscriber, - id, info, }; @@ -109,7 +161,7 @@ impl Subscribe { impl Drop for Subscribe { fn drop(&mut self) { self.subscriber - .send_message(message::Unsubscribe { id: self.id }); + .send_message(message::Unsubscribe { id: self.info.id }); } } diff --git a/moq-transport/src/session/subscribed.rs b/moq-transport/src/session/subscribed.rs index 04dd95ac..ae2ef5cf 100644 --- a/moq-transport/src/session/subscribed.rs +++ b/moq-transport/src/session/subscribed.rs @@ -4,26 +4,35 @@ use futures::stream::FuturesUnordered; use futures::StreamExt; use crate::coding::Encode; +use crate::message::{SubscribePair, SubscribeUpdate}; use crate::serve::{ServeError, TrackReaderMode}; use crate::watch::State; use crate::{data, message, serve}; -use super::{Publisher, SessionError, SubscribeInfo, Writer}; +use super::{Publisher, SessionError, SubscribeFilter, SubscribeInfo, Writer}; #[derive(Debug)] struct SubscribedState { - max_group_id: Option<(u64, u64)>, + max_group_id: Option, + stream_count: u64, + filter: SubscribeFilter, + priority: u8, closed: Result<(), ServeError>, } impl SubscribedState { - fn update_max_group_id(&mut self, group_id: u64, object_id: u64) -> Result<(), ServeError> { - if let Some((max_group, max_object)) = self.max_group_id { - if group_id >= max_group && object_id >= max_object { - self.max_group_id = Some((group_id, object_id)); + fn update_max_group_id(&mut self, group_id: u64) -> Result<(), ServeError> { + if let Some(max_group_id) = self.max_group_id { + if group_id > max_group_id { + self.max_group_id = Some(group_id); } } + Ok(()) + } + fn increment_stream_count(&mut self) -> Result<(), ServeError> { + self.stream_count += 1; + Ok(()) } } @@ -32,6 +41,9 @@ impl Default for SubscribedState { fn default() -> Self { Self { max_group_id: None, + stream_count: 0, + filter: SubscribeFilter::LatestObject, + priority: 127, closed: Ok(()), } } @@ -40,7 +52,6 @@ impl Default for SubscribedState { pub struct Subscribed { publisher: Publisher, state: State, - msg: message::Subscribe, ok: bool, pub info: SubscribeInfo, @@ -48,16 +59,27 @@ pub struct Subscribed { impl Subscribed { pub(super) fn new(publisher: Publisher, msg: message::Subscribe) -> (Self, SubscribedRecv) { - let (send, recv) = State::default().split(); + let (send, recv) = State::new(SubscribedState { + filter: SubscribeFilter::from(&msg), + ..Default::default() + }) + .split(); + let info = SubscribeInfo { + id: msg.id, namespace: msg.track_namespace.clone(), name: msg.track_name.clone(), + alias: msg.track_alias, }; + if !msg.params.is_empty() { + // TODO: handle subscribe parameters + log::warn!("subscription parameters are not supported"); + } + let send = Self { publisher, state: send, - msg, info, ok: false, }; @@ -82,10 +104,13 @@ impl Subscribed { self.state .lock_mut() .ok_or(ServeError::Cancel)? - .max_group_id = latest; + .max_group_id = match latest { + None => None, + Some(latest) => Some(latest.0), + }; self.publisher.send_message(message::SubscribeOk { - id: self.msg.id, + id: self.info.id, expires: None, group_order: message::GroupOrder::Descending, // TODO: resolve correct value from publisher / subscriber prefs latest, @@ -144,19 +169,19 @@ impl Drop for Subscribed { .err() .cloned() .unwrap_or(ServeError::Done); - let max_group_id = state.max_group_id; + let stream_count = state.stream_count; drop(state); // Important to avoid a deadlock if self.ok { self.publisher.send_message(message::SubscribeDone { - id: self.msg.id, - last: max_group_id, + id: self.info.id, code: err.code(), + count: stream_count, reason: err.to_string(), }); } else { self.publisher.send_message(message::SubscribeError { - id: self.msg.id, + id: self.info.id, alias: 0, code: err.code(), reason: err.to_string(), @@ -168,6 +193,10 @@ impl Drop for Subscribed { impl Subscribed { async fn serve_track(&mut self, mut track: serve::StreamReader) -> Result<(), SessionError> { let mut stream = self.publisher.open_uni().await?; + self.state + .lock_mut() + .ok_or(ServeError::Done)? + .increment_stream_count()?; // TODO figure out u32 vs u64 priority stream.set_priority(track.priority as i32); @@ -175,8 +204,8 @@ impl Subscribed { let mut writer = Writer::new(stream); let header: data::Header = data::TrackHeader { - subscribe_id: self.msg.id, - track_alias: self.msg.track_alias, + subscribe_id: self.info.id, + track_alias: self.info.alias, publisher_priority: track.priority, } .into(); @@ -193,11 +222,10 @@ impl Subscribed { size: object.size, status: object.status, }; - self.state .lock_mut() .ok_or(ServeError::Done)? - .update_max_group_id(object.group_id, object.object_id)?; + .update_max_group_id(object.group_id)?; writer.encode(&header).await?; @@ -227,8 +255,8 @@ impl Subscribed { res = subgroups.next(), if done.is_none() => match res { Ok(Some(subgroup)) => { let header = data::SubgroupHeader { - subscribe_id: self.msg.id, - track_alias: self.msg.track_alias, + subscribe_id: self.info.id, + track_alias: self.info.alias, group_id: subgroup.group_id, subgroup_id: subgroup.subgroup_id, publisher_priority: subgroup.priority, @@ -260,7 +288,37 @@ impl Subscribed { mut publisher: Publisher, state: State, ) -> Result<(), SessionError> { + log::trace!("serving group {}", subgroup.group_id); + + let filter = state.lock().filter.clone(); + if let SubscribeFilter::AbsoluteStart(SubscribePair { + group: group_id, + object: object_id, + }) = filter + { + if subgroup.group_id < group_id { + log::trace!("skipping group {}", subgroup.group_id); + return Ok(()); + } else if subgroup.group_id == group_id { + while let Some(object) = subgroup.peek().await? { + if object.object_id >= object_id { + break; + } + log::trace!( + "skipping object {} of group {}", + object.object_id, + subgroup.group_id + ); + subgroup.next().await?; + } + } + } + let mut stream = publisher.open_uni().await?; + state + .lock_mut() + .ok_or(ServeError::Done)? + .increment_stream_count()?; // TODO figure out u32 vs u64 priority stream.set_priority(subgroup.priority as i32); @@ -284,7 +342,7 @@ impl Subscribed { state .lock_mut() .ok_or(ServeError::Done)? - .update_max_group_id(subgroup.group_id, object.object_id)?; + .update_max_group_id(subgroup.group_id)?; log::trace!("sent group object: {:?}", header); @@ -305,8 +363,8 @@ impl Subscribed { ) -> Result<(), SessionError> { while let Some(datagram) = datagrams.read().await? { let datagram = data::Datagram { - subscribe_id: self.msg.id, - track_alias: self.msg.track_alias, + subscribe_id: self.info.id, + track_alias: self.info.alias, group_id: datagram.group_id, object_id: datagram.object_id, publisher_priority: datagram.priority, @@ -324,7 +382,7 @@ impl Subscribed { self.state .lock_mut() .ok_or(ServeError::Done)? - .update_max_group_id(datagram.group_id, datagram.object_id)?; + .update_max_group_id(datagram.group_id)?; } Ok(()) @@ -336,6 +394,68 @@ pub(super) struct SubscribedRecv { } impl SubscribedRecv { + pub fn recv_subscribe_update(&mut self, msg: SubscribeUpdate) -> Result<(), ServeError> { + let state = self.state.lock(); + + if let Some(mut state) = state.into_mut() { + log::trace!("received subscribe update"); + + state.priority = msg.subscriber_priority; + + let filter = SubscribeFilter::from(&msg); + // Check for assumptions: + // - Subscriptions can only become more narrow, not wider + // - A publisher SHOULD close the Session as a 'Protocol Violation' + // if the SUBSCRIBE_UPDATE violates either rule [...] + use SubscribeFilter::*; + match (&state.filter, &filter) { + (AbsoluteStart(start_old), AbsoluteStart(start_new)) if start_old <= start_new => { + state.filter = filter + } + (AbsoluteStart(start_old), AbsoluteRange(start_new, end_group_new)) + if start_old <= start_new && start_new.group <= *end_group_new => + { + state.filter = filter + } + (AbsoluteStart(_), LatestObject) => state.filter = filter, + ( + AbsoluteRange(start_old, end_group_old), + AbsoluteRange(start_new, end_group_new), + ) if start_old <= start_new + && start_new.group <= *end_group_new + && end_group_new <= end_group_old => + { + state.filter = filter + } + (LatestObject, AbsoluteStart(start_new)) + if state.max_group_id.is_none() + || start_new.group >= state.max_group_id.unwrap() => + { + state.filter = filter + } + (LatestObject, AbsoluteRange(start_new, end_group_new)) + if (state.max_group_id.is_none() + || start_new.group >= state.max_group_id.unwrap()) + && start_new.group <= *end_group_new => + { + state.filter = filter + } + (LatestObject, LatestObject) => state.filter = filter, + _ => { + return Err(ServeError::Internal( + "narrowing subscribe update".to_string(), + )); + } + }; + + if !msg.params.is_empty() { + // TODO: handle subscribe parameters + log::warn!("subscription parameters are not supported"); + } + } + + Ok(()) + } pub fn recv_unsubscribe(&mut self) -> Result<(), ServeError> { let state = self.state.lock(); state.closed.clone()?; diff --git a/moq-transport/src/session/subscriber.rs b/moq-transport/src/session/subscriber.rs index 8778b1b0..fdbac035 100644 --- a/moq-transport/src/session/subscriber.rs +++ b/moq-transport/src/session/subscriber.rs @@ -7,14 +7,17 @@ use std::{ use crate::{ coding::{Decode, Tuple}, data, - message::{self, Message}, + message::{self, Message, SubscribeUpdate}, serve::{self, ServeError}, setup, }; use crate::watch::Queue; -use super::{Announced, AnnouncedRecv, Reader, Session, SessionError, Subscribe, SubscribeRecv}; +use super::{ + Announced, AnnouncedRecv, Reader, Session, SessionError, Subscribe, SubscribeFilter, + SubscribeRecv, +}; // TODO remove Clone. #[derive(Clone)] @@ -55,16 +58,32 @@ impl Subscriber { self.announced_queue.pop().await } - pub async fn subscribe(&mut self, track: serve::TrackWriter) -> Result<(), ServeError> { + pub async fn subscribe( + &mut self, + track: serve::TrackWriter, + filter: SubscribeFilter, + ) -> Result<(), ServeError> { let id = self.subscribe_next.fetch_add(1, atomic::Ordering::Relaxed); - let (send, recv) = Subscribe::new(self.clone(), id, track); + let (send, recv) = Subscribe::new(self.clone(), id, track, filter); self.subscribes.lock().unwrap().insert(id, recv); send.closed().await } - pub(super) fn send_message>(&mut self, msg: M) { + pub fn subscribe_update( + &mut self, + id: u64, + filter: SubscribeFilter, + priority: u8, + ) -> Result<(), ServeError> { + let _update = SubscribeUpdate::new(self.clone(), id, filter, priority); + log::trace!("sent subscribe update"); + + Ok(()) + } + + pub fn send_message>(&mut self, msg: M) { let msg = msg.into(); // Remove our entry on terminal state. @@ -272,7 +291,7 @@ impl Subscriber { log::trace!("received group object: {:?}", object); let mut remain = object.size; - let mut object = group.create(object.size)?; + let mut object = group.create(object.size, Some(object.object_id))?; while remain > 0 { let data = reader