Skip to content

Commit 34cd7d9

Browse files
authored
Merge pull request #32 from englishm/me/draft-07
moq-transport draft-07 initial wire format updates
2 parents 06a8e2e + b35c68c commit 34cd7d9

File tree

11 files changed

+237
-4
lines changed

11 files changed

+237
-4
lines changed

moq-transport/src/message/fetch.rs

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
use crate::coding::{Decode, DecodeError, Encode, EncodeError, Params, Tuple};
2+
use crate::message::GroupOrder;
3+
4+
/// Sent by the subscriber to request to request a range
5+
/// of already published objects within a track.
6+
#[derive(Clone, Debug)]
7+
pub struct Fetch {
8+
/// The subscription ID
9+
pub id: u64,
10+
11+
/// Track properties
12+
pub track_namespace: Tuple,
13+
pub track_name: String,
14+
15+
/// Subscriber Priority
16+
pub subscriber_priority: u8,
17+
18+
pub group_order: GroupOrder,
19+
20+
/// The start/end group/object.
21+
pub start_group: u64,
22+
pub start_object: u64,
23+
pub end_group: u64,
24+
pub end_object: u64,
25+
26+
/// Optional parameters
27+
pub params: Params,
28+
}
29+
30+
impl Decode for Fetch {
31+
fn decode<R: bytes::Buf>(r: &mut R) -> Result<Self, DecodeError> {
32+
let id = u64::decode(r)?;
33+
34+
let track_namespace = Tuple::decode(r)?;
35+
let track_name = String::decode(r)?;
36+
37+
let subscriber_priority = u8::decode(r)?;
38+
39+
let group_order = GroupOrder::decode(r)?;
40+
41+
let start_group = u64::decode(r)?;
42+
let start_object = u64::decode(r)?;
43+
let end_group = u64::decode(r)?;
44+
let end_object = u64::decode(r)?;
45+
46+
let params = Params::decode(r)?;
47+
48+
Ok(Self {
49+
id,
50+
track_namespace,
51+
track_name,
52+
subscriber_priority,
53+
group_order,
54+
start_group,
55+
start_object,
56+
end_group,
57+
end_object,
58+
params,
59+
})
60+
}
61+
}
62+
63+
impl Encode for Fetch {
64+
fn encode<W: bytes::BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
65+
self.id.encode(w)?;
66+
67+
self.track_namespace.encode(w)?;
68+
self.track_name.encode(w)?;
69+
70+
self.subscriber_priority.encode(w)?;
71+
72+
self.group_order.encode(w)?;
73+
74+
self.start_group.encode(w)?;
75+
self.start_object.encode(w)?;
76+
self.end_group.encode(w)?;
77+
self.end_object.encode(w)?;
78+
79+
self.params.encode(w)?;
80+
81+
Ok(())
82+
}
83+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
use crate::coding::{Decode, DecodeError, Encode, EncodeError};
2+
3+
/// A subscriber issues a FETCH_CANCEL message to a publisher indicating it is
4+
/// no longer interested in receiving Objects for the fetch specified by 'Subscribe ID'.
5+
#[derive(Clone, Debug)]
6+
pub struct FetchCancel {
7+
/// The subscription ID
8+
pub id: u64,
9+
}
10+
11+
impl Decode for FetchCancel {
12+
fn decode<R: bytes::Buf>(r: &mut R) -> Result<Self, DecodeError> {
13+
let id = u64::decode(r)?;
14+
Ok(Self { id })
15+
}
16+
}
17+
18+
impl Encode for FetchCancel {
19+
fn encode<W: bytes::BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
20+
self.id.encode(w)?;
21+
22+
Ok(())
23+
}
24+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
use crate::coding::{Decode, DecodeError, Encode, EncodeError};
2+
3+
/// Sent by the server to indicate that the client should connect to a different server.
4+
#[derive(Clone, Debug)]
5+
pub struct FetchError {
6+
/// The ID for this subscription.
7+
pub id: u64,
8+
9+
/// An error code.
10+
pub code: u64,
11+
12+
/// An optional, human-readable reason.
13+
pub reason: String,
14+
}
15+
16+
impl Decode for FetchError {
17+
fn decode<R: bytes::Buf>(r: &mut R) -> Result<Self, DecodeError> {
18+
let id = u64::decode(r)?;
19+
20+
let code = u64::decode(r)?;
21+
let reason = String::decode(r)?;
22+
23+
Ok(Self { id, code, reason })
24+
}
25+
}
26+
27+
impl Encode for FetchError {
28+
fn encode<W: bytes::BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
29+
self.id.encode(w)?;
30+
31+
self.code.encode(w)?;
32+
self.reason.encode(w)?;
33+
34+
Ok(())
35+
}
36+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
use crate::coding::{Decode, DecodeError, Encode, EncodeError, Params};
2+
3+
/// A publisher sends a FETCH_OK control message in response to successful fetches.
4+
#[derive(Clone, Debug)]
5+
pub struct FetchOk {
6+
/// The subscription ID
7+
pub id: u64,
8+
9+
/// Order groups will be delivered in
10+
pub group_order: u8,
11+
12+
/// True if all objects have been published on this track
13+
pub end_of_track: u8,
14+
15+
/// The largest Group ID available for this track (last if end_of_track)
16+
pub largest_group_id: u64,
17+
/// The largest Object ID available within the largest Group ID for this track (last if end_of_track)
18+
pub largest_object_id: u64,
19+
20+
/// Optional parameters
21+
pub params: Params,
22+
}
23+
24+
impl Decode for FetchOk {
25+
fn decode<R: bytes::Buf>(r: &mut R) -> Result<Self, DecodeError> {
26+
let id = u64::decode(r)?;
27+
28+
let group_order = u8::decode(r)?;
29+
30+
let end_of_track = u8::decode(r)?;
31+
32+
let largest_group_id = u64::decode(r)?;
33+
let largest_object_id = u64::decode(r)?;
34+
35+
let params = Params::decode(r)?;
36+
37+
Ok(Self {
38+
id,
39+
group_order,
40+
end_of_track,
41+
largest_group_id,
42+
largest_object_id,
43+
params,
44+
})
45+
}
46+
}
47+
48+
impl Encode for FetchOk {
49+
fn encode<W: bytes::BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
50+
self.id.encode(w)?;
51+
52+
self.group_order.encode(w)?;
53+
54+
self.end_of_track.encode(w)?;
55+
56+
self.largest_group_id.encode(w)?;
57+
self.largest_object_id.encode(w)?;
58+
59+
self.params.encode(w)?;
60+
61+
Ok(())
62+
}
63+
}

moq-transport/src/message/mod.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ mod announce;
3636
mod announce_cancel;
3737
mod announce_error;
3838
mod announce_ok;
39+
mod fetch;
40+
mod fetch_cancel;
41+
mod fetch_error;
42+
mod fetch_ok;
3943
mod filter_type;
4044
mod go_away;
4145
mod group_order;
@@ -60,6 +64,10 @@ pub use announce::*;
6064
pub use announce_cancel::*;
6165
pub use announce_error::*;
6266
pub use announce_ok::*;
67+
pub use fetch::*;
68+
pub use fetch_cancel::*;
69+
pub use fetch_error::*;
70+
pub use fetch_ok::*;
6371
pub use filter_type::*;
6472
pub use go_away::*;
6573
pub use group_order::*;
@@ -209,6 +217,12 @@ message_types! {
209217
SubscribeNamespaceOk = 0x12,
210218
SubscribeNamespaceError = 0x13,
211219
UnsubscribeNamespace = 0x14,
220+
221+
// FETCH family, sent by subscriber
222+
Fetch = 0x16,
223+
FetchCancel = 0x17,
224+
FetchOk = 0x18,
225+
FetchError = 0x19,
212226
}
213227

214228
/// Track Status Codes

moq-transport/src/message/publisher.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,4 +52,6 @@ publisher_msgs! {
5252
SubscribeDone,
5353
MaxSubscribeId,
5454
TrackStatus,
55+
FetchOk,
56+
FetchError,
5557
}

moq-transport/src/message/subscriber.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,4 +56,6 @@ subscriber_msgs! {
5656
SubscribeNamespaceOk,
5757
SubscribeNamespaceError,
5858
UnsubscribeNamespace,
59+
Fetch,
60+
FetchCancel,
5961
}

moq-transport/src/session/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ impl Session {
8181
let mut sender = Writer::new(control.0);
8282
let mut recver = Reader::new(control.1);
8383

84-
let versions: setup::Versions = [setup::Version::DRAFT_06].into();
84+
let versions: setup::Versions = [setup::Version::DRAFT_07].into();
8585

8686
let client = setup::Client {
8787
role,
@@ -134,10 +134,10 @@ impl Session {
134134
let client: setup::Client = recver.decode().await?;
135135
log::debug!("received client SETUP: {:?}", client);
136136

137-
if !client.versions.contains(&setup::Version::DRAFT_06) {
137+
if !client.versions.contains(&setup::Version::DRAFT_07) {
138138
return Err(SessionError::Version(
139139
client.versions,
140-
[setup::Version::DRAFT_06].into(),
140+
[setup::Version::DRAFT_07].into(),
141141
));
142142
}
143143

@@ -162,7 +162,7 @@ impl Session {
162162

163163
let server = setup::Server {
164164
role,
165-
version: setup::Version::DRAFT_06,
165+
version: setup::Version::DRAFT_07,
166166
params: Default::default(),
167167
};
168168

moq-transport/src/session/publisher.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,9 @@ impl Publisher {
183183
message::Subscriber::SubscribeNamespaceOk(_msg) => unimplemented!(),
184184
message::Subscriber::SubscribeNamespaceError(_msg) => unimplemented!(),
185185
message::Subscriber::UnsubscribeNamespace(_msg) => unimplemented!(),
186+
// TODO: Implement fetch messages
187+
message::Subscriber::Fetch(_msg) => todo!(),
188+
message::Subscriber::FetchCancel(_msg) => todo!(),
186189
};
187190

188191
if let Err(err) = res {

moq-transport/src/session/subscriber.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,9 @@ impl Subscriber {
8787
message::Publisher::SubscribeDone(msg) => self.recv_subscribe_done(msg),
8888
message::Publisher::MaxSubscribeId(msg) => self.recv_max_subscribe_id(msg),
8989
message::Publisher::TrackStatus(msg) => self.recv_track_status(msg),
90+
// TODO: Implement fetch messages
91+
message::Publisher::FetchOk(_msg) => todo!(),
92+
message::Publisher::FetchError(_msg) => todo!(),
9093
};
9194

9295
if let Err(SessionError::Serve(err)) = res {

0 commit comments

Comments
 (0)