Skip to content

Commit 28ef75a

Browse files
authored
Merge pull request #26 from mcintyrehh/draft-06
MOQT-v6 Support
2 parents 6d00c66 + f907ba5 commit 28ef75a

File tree

15 files changed

+442
-356
lines changed

15 files changed

+442
-356
lines changed

lib/contribute/broadcast.ts

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import * as Catalog from "../media/catalog"
77
import { isAudioTrackSettings, isVideoTrackSettings } from "../common/settings"
88

99
export interface BroadcastConfig {
10-
namespace: string
10+
namespace: string[]
1111
connection: Connection
1212
media: MediaStream
1313

@@ -26,7 +26,7 @@ export class Broadcast {
2626
readonly config: BroadcastConfig
2727
readonly catalog: Catalog.Root
2828
readonly connection: Connection
29-
readonly namespace: string
29+
readonly namespace: string[]
3030

3131
#running: Promise<void>
3232

@@ -148,7 +148,7 @@ export class Broadcast {
148148
// Send a SUBSCRIBE_OK
149149
await subscriber.ack()
150150

151-
const stream = await subscriber.group({ group: 0 })
151+
const stream = await subscriber.subgroup({ group: 0, subgroup: 0 })
152152
await stream.write({ object: 0, payload: bytes })
153153
await stream.close()
154154
}
@@ -162,7 +162,7 @@ export class Broadcast {
162162

163163
const init = await track.init()
164164

165-
const stream = await subscriber.group({ group: 0 })
165+
const stream = await subscriber.subgroup({ group: 0, subgroup: 0 })
166166
await stream.write({ object: 0, payload: init })
167167
await stream.close()
168168
}
@@ -190,8 +190,9 @@ export class Broadcast {
190190

191191
async #serveSegment(subscriber: SubscribeRecv, segment: Segment) {
192192
// Create a new stream for each segment.
193-
const stream = await subscriber.group({
193+
const stream = await subscriber.subgroup({
194194
group: segment.id,
195+
subgroup: 0, // @todo: figure out the right way to do this
195196
priority: 127, // TODO,default to mid value, see: https://github.com/moq-wg/moq-transport/issues/504
196197
})
197198

lib/media/catalog/index.ts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { Connection } from "../../transport"
22
import { asError } from "../../common/error"
33

44
export interface CommonTrackFields {
5-
namespace?: string
5+
namespace?: string[]
66
packaging?: string
77
renderGroup?: number
88
altGroup?: number
@@ -28,6 +28,9 @@ export function decode(raw: Uint8Array): Root {
2828
const str = decoder.decode(raw)
2929

3030
const catalog = JSON.parse(str)
31+
// namespace comes serialized as a "/" joined string, cast it back to a tuple (array)
32+
catalog.commonTrackFields.namespace = catalog.commonTrackFields.namespace?.split("/").filter(Boolean) // remove empty strings
33+
3134
if (!isRoot(catalog)) {
3235
throw new Error("invalid catalog")
3336
}
@@ -43,7 +46,7 @@ export function decode(raw: Uint8Array): Root {
4346
return catalog
4447
}
4548

46-
export async function fetch(connection: Connection, namespace: string): Promise<Root> {
49+
export async function fetch(connection: Connection, namespace: string[]): Promise<Root> {
4750
const subscribe = await connection.subscribe(namespace, ".catalog")
4851
try {
4952
const segment = await subscribe.data()
@@ -61,6 +64,7 @@ export async function fetch(connection: Connection, namespace: string): Promise<
6164
throw new Error("invalid catalog chunk")
6265
}
6366
} catch (e) {
67+
console.error("Catalog fetch error: ", e)
6468
const err = asError(e)
6569

6670
// Close the subscription after we're done.
@@ -78,7 +82,7 @@ export function isRoot(catalog: any): catalog is Root {
7882
}
7983

8084
export interface Track {
81-
namespace?: string
85+
namespace?: string[]
8286
name: string
8387
depends?: any[]
8488
packaging?: string
@@ -174,7 +178,7 @@ function isCatalogFieldValid(catalog: any, field: string): boolean {
174178
}
175179

176180
function isValidNamespace(namespace: any): boolean {
177-
return typeof namespace === "string"
181+
return Array.isArray(namespace) && namespace.every((ns) => typeof ns === "string")
178182
}
179183

180184
let isValidField: (value: any) => boolean

lib/playback/backend.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import MediaWorker from "web-worker:./worker/index.ts"
66

77
import { RingShared } from "../common/ring"
88
import { Root, isAudioTrack } from "../media/catalog"
9-
import { GroupHeader } from "../transport/objects"
9+
import { SubgroupHeader } from "../transport/objects"
1010

1111
export interface PlayerConfig {
1212
canvas: OffscreenCanvas
@@ -123,7 +123,7 @@ export interface Init {
123123
export interface Segment {
124124
init: string // name of the init track
125125
kind: "audio" | "video"
126-
header: GroupHeader
126+
header: SubgroupHeader
127127
buffer: Uint8Array
128128
stream: ReadableStream<Uint8Array>
129129
}

lib/playback/index.ts

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import { asError } from "../common/error"
77
import Backend from "./backend"
88

99
import { Client } from "../transport/client"
10-
import { GroupReader } from "../transport/objects"
10+
import { SubgroupReader } from "../transport/objects"
1111

1212
export type Range = Message.Range
1313
export type Timeline = Message.Timeline
@@ -71,7 +71,7 @@ export default class Player {
7171
const client = new Client({ url: config.url, fingerprint: config.fingerprint, role: "subscriber" })
7272
const connection = await client.connect()
7373

74-
const catalog = await Catalog.fetch(connection, config.namespace)
74+
const catalog = await Catalog.fetch(connection, [config.namespace])
7575
console.log("catalog", catalog)
7676

7777
const canvas = config.canvas.transferControlToOffscreen()
@@ -81,13 +81,15 @@ export default class Player {
8181
}
8282

8383
async #run() {
84+
// Key is "/" serialized namespace for lookup ease
85+
// Value is Track.initTrack. @todo: type this properly
8486
const inits = new Set<[string, string]>()
8587
const tracks = new Array<Catalog.Track>()
8688

8789
this.#catalog.tracks.forEach((track, index) => {
8890
if (index == this.#tracknum || Catalog.isAudioTrack(track)) {
8991
if (!track.namespace) throw new Error("track has no namespace")
90-
if (track.initTrack) inits.add([track.namespace, track.initTrack])
92+
if (track.initTrack) inits.add([track.namespace.join("/"), track.initTrack])
9193
tracks.push(track)
9294
}
9395
})
@@ -103,7 +105,7 @@ export default class Player {
103105
}
104106

105107
async #runInit(namespace: string, name: string) {
106-
const sub = await this.#connection.subscribe(namespace, name)
108+
const sub = await this.#connection.subscribe([namespace], name)
107109
try {
108110
const init = await Promise.race([sub.data(), this.#running])
109111
if (!init) throw new Error("no init data")
@@ -143,7 +145,7 @@ export default class Player {
143145
const segment = await Promise.race([sub.data(), this.#running])
144146
if (!segment) continue
145147

146-
if (!(segment instanceof GroupReader)) {
148+
if (!(segment instanceof SubgroupReader)) {
147149
throw new Error(`expected group reader for segment: ${track.name}`)
148150
}
149151

@@ -281,6 +283,7 @@ export default class Player {
281283
try {
282284
await this.#running
283285
} catch (e) {
286+
console.error("Error in Player.closed():", e)
284287
return asError(e)
285288
}
286289
}

lib/playback/worker/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import * as MP4 from "../../media/mp4"
77
import * as Message from "./message"
88
import { asError } from "../../common/error"
99
import { Deferred } from "../../common/async"
10-
import { GroupReader, Reader } from "../../transport/objects"
10+
import { SubgroupReader, Reader } from "../../transport/objects"
1111

1212
class Worker {
1313
// Timeline receives samples, buffering them and choosing the timestamp to render.
@@ -71,7 +71,7 @@ class Worker {
7171
const container = new MP4.Parser(await init.promise)
7272

7373
const timeline = msg.kind === "audio" ? this.#timeline.audio : this.#timeline.video
74-
const reader = new GroupReader(msg.header, new Reader(msg.buffer, msg.stream))
74+
const reader = new SubgroupReader(msg.header, new Reader(msg.buffer, msg.stream))
7575

7676
// Create a queue that will contain each MP4 frame.
7777
const queue = new TransformStream<MP4.Frame>({})

lib/playback/worker/message.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { GroupHeader } from "../../transport/objects"
1+
import { SubgroupHeader } from "../../transport/objects"
22
import { RingShared } from "../../common/ring"
33

44
export interface Config {
@@ -25,7 +25,7 @@ export interface Init {
2525
export interface Segment {
2626
init: string // name of the init object
2727
kind: "audio" | "video"
28-
header: GroupHeader
28+
header: SubgroupHeader
2929
buffer: Uint8Array
3030
stream: ReadableStream<Uint8Array>
3131
}

lib/transport/client.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,16 @@ export class Client {
4747
const setup = new Setup.Stream(reader, writer)
4848

4949
// Send the setup message.
50-
await setup.send.client({ versions: [Setup.Version.DRAFT_05], role: this.config.role })
50+
await setup.send.client({
51+
versions: [Setup.Version.DRAFT_06],
52+
role: this.config.role,
53+
})
5154

5255
// Receive the setup message.
5356
// TODO verify the SETUP response.
5457
const server = await setup.recv.server()
5558

56-
if (server.version != Setup.Version.DRAFT_05) {
59+
if (server.version != Setup.Version.DRAFT_06) {
5760
throw new Error(`unsupported server version: ${server.version}`)
5861
}
5962

lib/transport/connection.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,15 +43,15 @@ export class Connection {
4343
await Promise.all([this.#runControl(), this.#runObjects()])
4444
}
4545

46-
announce(namespace: string) {
46+
announce(namespace: string[]) {
4747
return this.#publisher.announce(namespace)
4848
}
4949

5050
announced() {
5151
return this.#subscriber.announced()
5252
}
5353

54-
subscribe(namespace: string, track: string) {
54+
subscribe(namespace: string[], track: string) {
5555
return this.#subscriber.subscribe(namespace, track)
5656
}
5757

0 commit comments

Comments
 (0)