Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions moq-clock-ietf/src/clock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ impl Publisher {
let start = Utc::now();
let mut now = start;

// Just for fun, don't start at zero.
let mut sequence = start.minute();
// For better reproducibility and easier testing, start at 0.
let mut sequence = 0;

loop {
let segment = self
Expand Down
4 changes: 2 additions & 2 deletions moq-clock-ietf/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ mod clock;
use moq_transport::{
coding::Tuple,
serve,
session::{Publisher, Subscriber},
session::{Publisher, SubscribeFilter, Subscriber},
};

#[derive(Parser, Clone)]
Expand Down Expand Up @@ -93,7 +93,7 @@ async fn main() -> anyhow::Result<()> {
tokio::select! {
res = session.run() => res.context("session error")?,
res = clock.run() => res.context("clock error")?,
res = subscriber.subscribe(prod) => res.context("failed to subscribe to track")?,
res = subscriber.subscribe(prod, SubscribeFilter::LatestObject) => res.context("failed to subscribe to track")?,
}
}

Expand Down
4 changes: 2 additions & 2 deletions moq-relay-ietf/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use anyhow::Context;
use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
use moq_transport::{
serve::Tracks,
session::{Announced, SessionError, Subscriber},
session::{Announced, SessionError, SubscribeFilter, Subscriber},
};

use crate::{Api, Locals, Producer};
Expand Down Expand Up @@ -96,7 +96,7 @@ impl Consumer {
let info = track.clone();
log::info!("forwarding subscribe: {:?}", info);

if let Err(err) = remote.subscribe(track).await {
if let Err(err) = remote.subscribe(track, SubscribeFilter::LatestObject).await {
log::warn!("failed forwarding subscribe: {:?}, error: {}", info, err)
}

Expand Down
3 changes: 2 additions & 1 deletion moq-relay-ietf/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use futures::StreamExt;
use moq_native_ietf::quic;
use moq_transport::coding::Tuple;
use moq_transport::serve::{Track, TrackReader, TrackWriter};
use moq_transport::session::SubscribeFilter;
use moq_transport::watch::State;
use url::Url;

Expand Down Expand Up @@ -233,7 +234,7 @@ impl RemoteProducer {
let mut subscriber = subscriber.clone();

tasks.push(async move {
if let Err(err) = subscriber.subscribe(track).await {
if let Err(err) = subscriber.subscribe(track, SubscribeFilter::LatestObject).await {
log::warn!("failed serving track: {:?}, error: {}", info, err);
}
});
Expand Down
20 changes: 13 additions & 7 deletions moq-sub/src/media.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use moq_transport::serve::{
SubgroupObjectReader, SubgroupReader, TrackReader, TrackReaderMode, Tracks, TracksReader,
TracksWriter,
};
use moq_transport::session::Subscriber;
use moq_transport::session::{SubscribeFilter, Subscriber};
use mp4::ReadBox;
use tokio::{
io::{AsyncReadExt, AsyncWrite, AsyncWriteExt},
Expand Down Expand Up @@ -43,9 +43,12 @@ impl<O: AsyncWrite + Send + Unpin + 'static> Media<O> {

let mut subscriber = self.subscriber.clone();
tokio::task::spawn(async move {
subscriber.subscribe(track).await.unwrap_or_else(|err| {
warn!("failed to subscribe to init track: {err:?}");
});
subscriber
.subscribe(track, SubscribeFilter::LatestObject)
.await
.unwrap_or_else(|err| {
warn!("failed to subscribe to init track: {err:?}");
});
});

let track = self
Expand Down Expand Up @@ -101,9 +104,12 @@ impl<O: AsyncWrite + Send + Unpin + 'static> Media<O> {

let mut subscriber = self.subscriber.clone();
tokio::task::spawn(async move {
subscriber.subscribe(track).await.unwrap_or_else(|err| {
warn!("failed to subscribe to track: {err:?}");
});
subscriber
.subscribe(track, SubscribeFilter::LatestObject)
.await
.unwrap_or_else(|err| {
warn!("failed to subscribe to track: {err:?}");
});
});

tracks.push(self.broadcast.subscribe(&name).context("no track")?);
Expand Down
4 changes: 4 additions & 0 deletions moq-transport/src/coding/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,8 @@ impl Params {
Ok(None)
}
}

pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
}
3 changes: 0 additions & 3 deletions moq-transport/src/message/filter_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use crate::coding::{Decode, DecodeError, Encode, EncodeError};
/// https://www.ietf.org/archive/id/draft-ietf-moq-transport-04.html#name-filter-types
#[derive(Clone, Debug, PartialEq)]
pub enum FilterType {
LatestGroup = 0x1,
LatestObject = 0x2,
AbsoluteStart = 0x3,
AbsoluteRange = 0x4,
Expand All @@ -13,7 +12,6 @@ pub enum FilterType {
impl Encode for FilterType {
fn encode<W: bytes::BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
match self {
Self::LatestGroup => (0x1_u64).encode(w),
Self::LatestObject => (0x2_u64).encode(w),
Self::AbsoluteStart => (0x3_u64).encode(w),
Self::AbsoluteRange => (0x4_u64).encode(w),
Expand All @@ -24,7 +22,6 @@ impl Encode for FilterType {
impl Decode for FilterType {
fn decode<R: bytes::Buf>(r: &mut R) -> Result<Self, DecodeError> {
match u64::decode(r)? {
0x01 => Ok(Self::LatestGroup),
0x02 => Ok(Self::LatestObject),
0x03 => Ok(Self::AbsoluteStart),
0x04 => Ok(Self::AbsoluteRange),
Expand Down
109 changes: 28 additions & 81 deletions moq-transport/src/message/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,18 @@ pub struct Subscribe {
pub track_namespace: Tuple,
pub track_name: String,

// Subscriber Priority
/// Subscriber Priority
pub subscriber_priority: u8,

/// Group Order
pub group_order: GroupOrder,

/// Filter type
pub filter_type: FilterType,

/// The start/end group/object. (TODO: Make optional)
pub start: Option<SubscribePair>, // TODO: Make optional
pub end: Option<SubscribePair>, // TODO: Make optional
/// Start and End depending on the Filter Type set
pub start: Option<SubscribePair>,
pub end_group: Option<u64>,

/// Optional parameters
pub params: Params,
Expand All @@ -43,43 +45,28 @@ impl Decode for Subscribe {
let filter_type = FilterType::decode(r)?;

let start: Option<SubscribePair>;
let end: Option<SubscribePair>;
let end_group: Option<u64>;
match filter_type {
FilterType::AbsoluteStart => {
if r.remaining() < 2 {
return Err(DecodeError::MissingField);
}
start = Some(SubscribePair::decode(r)?);
end = None;
end_group = None;
}
FilterType::AbsoluteRange => {
if r.remaining() < 4 {
return Err(DecodeError::MissingField);
}
start = Some(SubscribePair::decode(r)?);
end = Some(SubscribePair::decode(r)?);
end_group = Some(u64::decode(r)?);
}
_ => {
start = None;
end = None;
}
}

if let Some(s) = &start {
// You can't have a start object without a start group.
if s.group == SubscribeLocation::None && s.object != SubscribeLocation::None {
return Err(DecodeError::InvalidSubscribeLocation);
}
}
if let Some(e) = &end {
// You can't have an end object without an end group.
if e.group == SubscribeLocation::None && e.object != SubscribeLocation::None {
return Err(DecodeError::InvalidSubscribeLocation);
end_group = None;
}
}

// NOTE: There's some more location restrictions in the draft, but they're enforced at a higher level.

let params = Params::decode(r)?;

Ok(Self {
Expand All @@ -91,7 +78,7 @@ impl Decode for Subscribe {
group_order,
filter_type,
start,
end,
end_group,
params,
})
}
Expand All @@ -113,14 +100,18 @@ impl Encode for Subscribe {
if self.filter_type == FilterType::AbsoluteStart
|| self.filter_type == FilterType::AbsoluteRange
{
if self.start.is_none() || self.end.is_none() {
return Err(EncodeError::MissingField);
}
if let Some(start) = &self.start {
start.encode(w)?;
} else {
return Err(EncodeError::MissingField);
}
if let Some(end) = &self.end {
end.encode(w)?;
}

if self.filter_type == FilterType::AbsoluteRange {
if let Some(end_group) = &self.end_group {
end_group.encode(w)?;
} else {
return Err(EncodeError::MissingField);
}
}

Expand All @@ -130,17 +121,20 @@ impl Encode for Subscribe {
}
}

#[derive(Clone, Debug, PartialEq)]
// Note: When derived on structs, it will produce a lexicographic ordering
// based on the top-to-bottom declaration order of the struct’s members.
// Therefore, it will work just as expected.
#[derive(Clone, Debug, PartialEq, PartialOrd)]
pub struct SubscribePair {
pub group: SubscribeLocation,
pub object: SubscribeLocation,
pub group: u64,
pub object: u64,
}

impl Decode for SubscribePair {
fn decode<R: bytes::Buf>(r: &mut R) -> Result<Self, DecodeError> {
Ok(Self {
group: SubscribeLocation::decode(r)?,
object: SubscribeLocation::decode(r)?,
group: u64::decode(r)?,
object: u64::decode(r)?,
})
}
}
Expand All @@ -152,50 +146,3 @@ impl Encode for SubscribePair {
Ok(())
}
}

/// Signal where the subscription should begin, relative to the current cache.
#[derive(Clone, Debug, PartialEq)]
pub enum SubscribeLocation {
None,
Absolute(u64),
Latest(u64),
Future(u64),
}

impl Decode for SubscribeLocation {
fn decode<R: bytes::Buf>(r: &mut R) -> Result<Self, DecodeError> {
let kind = u64::decode(r)?;

match kind {
0 => Ok(Self::None),
1 => Ok(Self::Absolute(u64::decode(r)?)),
2 => Ok(Self::Latest(u64::decode(r)?)),
3 => Ok(Self::Future(u64::decode(r)?)),
_ => Err(DecodeError::InvalidSubscribeLocation),
}
}
}

impl Encode for SubscribeLocation {
fn encode<W: bytes::BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
self.id().encode(w)?;

match self {
Self::None => Ok(()),
Self::Absolute(val) => val.encode(w),
Self::Latest(val) => val.encode(w),
Self::Future(val) => val.encode(w),
}
}
}

impl SubscribeLocation {
fn id(&self) -> u64 {
match self {
Self::None => 0,
Self::Absolute(_) => 1,
Self::Latest(_) => 2,
Self::Future(_) => 3,
}
}
}
30 changes: 8 additions & 22 deletions moq-transport/src/message/subscribe_done.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,37 +3,32 @@ use crate::coding::{Decode, DecodeError, Encode, EncodeError};
/// Sent by the publisher to cleanly terminate a Subscribe.
#[derive(Clone, Debug)]
pub struct SubscribeDone {
/// The ID for this subscription.
/// The ID for this subscription
pub id: u64,

/// The error code
pub code: u64,

/// Number of DATA streams opened for this subscription
/// (1 << 62) - 1 if it cannot be determined
pub count: u64,

/// An optional error reason
pub reason: String,

/// The final group/object sent on this subscription.
pub last: Option<(u64, u64)>,
}

impl Decode for SubscribeDone {
fn decode<R: bytes::Buf>(r: &mut R) -> Result<Self, DecodeError> {
let id = u64::decode(r)?;
let code = u64::decode(r)?;
let count = u64::decode(r)?;
let reason = String::decode(r)?;

Self::decode_remaining(r, 1)?;
let last = match r.get_u8() {
0 => None,
1 => Some((u64::decode(r)?, u64::decode(r)?)),
_ => return Err(DecodeError::InvalidValue),
};

Ok(Self {
id,
code,
count,
reason,
last,
})
}
}
Expand All @@ -42,18 +37,9 @@ impl Encode for SubscribeDone {
fn encode<W: bytes::BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
self.id.encode(w)?;
self.code.encode(w)?;
self.count.encode(w)?;
self.reason.encode(w)?;

Self::encode_remaining(w, 1)?;

if let Some((group, object)) = self.last {
w.put_u8(1);
group.encode(w)?;
object.encode(w)?;
} else {
w.put_u8(0);
}

Ok(())
}
}
Loading