@@ -3,7 +3,7 @@ import { Effect, Signal } from "@kixelated/signals";
 import type * as Catalog from "../../catalog";
 import * as Frame from "../../frame";
 import { PRIORITY } from "../../publish/priority";
-import * as Time from "../../time";
+import type * as Time from "../../time";
 import * as Hex from "../../util/hex";
 
 export type SourceProps = {
@@ -33,10 +33,16 @@ type SyncStatus = {
 	bufferDuration?: number;
 };
 
+// Only count it as buffering if we had to sleep for 200ms or more before rendering the next frame.
+// Unfortunately, this has to be quite high because of b-frames.
+// TODO Maybe we need to detect b-frames and make this dynamic?
+const MIN_SYNC_WAIT_MS = 200 as Time.Milli;
+
+// The maximum number of concurrent b-frames that we support.
+const MAX_BFRAMES = 10;
+
 // Responsible for switching between video tracks and buffering frames.
 export class Source {
-	#MIN_SYNC_WAIT_MS = 50 as Time.Milli;
-
 	broadcast: Signal<Moq.Broadcast | undefined>;
 	enabled: Signal<boolean>; // Don't download any longer
 
@@ -59,12 +65,10 @@ export class Source {
 	// Used as a tiebreaker when there are multiple tracks (HD vs SD).
 	target = new Signal<Target | undefined>(undefined);
 
-	// Unfortunately, browsers don't let us hold on to multiple VideoFrames.
-	// ex. Firefox only allows 2 outstanding VideoFrames at a time.
-	// In order to semi-support b-frames, we buffer two frames and expose the earliest one.
+	// Expose the current frame to render as a signal.
 	frame = new Signal<VideoFrame | undefined>(undefined);
-	#next?: VideoFrame;
 
+	// The target latency in milliseconds.
 	latency: Signal<Time.Milli>;
 
 	// The display size of the video in pixels, ideally sourced from the catalog.
@@ -76,11 +80,7 @@ export class Source {
 	// Used to convert PTS to wall time.
 	#reference: DOMHighResTimeStamp | undefined;
 
-	// The latency after we've accounted for the extra frame buffering and jitter buffer.
-	#jitter: Signal<Time.Milli>;
-
 	bufferStatus = new Signal<BufferStatus>({ state: "empty" });
-
 	syncStatus = new Signal<SyncStatus>({ state: "ready" });
 
 	#signals = new Effect();
@@ -94,10 +94,6 @@ export class Source {
 		this.latency = Signal.from(props?.latency ?? (100 as Time.Milli));
 		this.enabled = Signal.from(props?.enabled ?? false);
 
-		// We subtract a frame from the jitter buffer to account for the extra buffered frame.
-		// Assume 30fps by default.
-		this.#jitter = new Signal(Math.max(0, this.latency.peek() - 33) as Time.Milli);
-
 		this.#signals.effect((effect) => {
 			const c = effect.get(catalog)?.video;
 			effect.set(this.catalog, c);
@@ -108,7 +104,6 @@ export class Source {
 		this.#signals.effect(this.#runSelected.bind(this));
 		this.#signals.effect(this.#runPending.bind(this));
 		this.#signals.effect(this.#runDisplay.bind(this));
-		this.#signals.effect(this.#runJitter.bind(this));
 		this.#signals.effect(this.#runBuffer.bind(this));
 	}
 
@@ -170,9 +165,6 @@ export class Source {
 			return undefined;
 		});
 
-		this.#next?.close();
-		this.#next = undefined;
-
 		return;
 	}
 
@@ -192,62 +184,89 @@ export class Source {
 
 		// Create consumer that reorders groups/frames up to the provided latency.
 		const consumer = new Frame.Consumer(sub, {
-			latency: this.#jitter,
+			latency: this.latency,
 		});
 		effect.cleanup(() => consumer.close());
 
+		// We need a queue because VideoDecoder doesn't block on a Promise returned by output.
+		// NOTE: We will drain this queue almost immediately, so the highWaterMark is just a safety net.
+		const queue = new TransformStream<VideoFrame, VideoFrame>(
+			undefined,
+			{ highWaterMark: MAX_BFRAMES },
+			{ highWaterMark: MAX_BFRAMES },
+		);
+
+		const writer = queue.writable.getWriter();
+		effect.cleanup(() => writer.close());
+
+		const reader = queue.readable.getReader();
+		effect.cleanup(() => reader.cancel());
+
 		const decoder = new VideoDecoder({
-			output: (frame) => {
-				// Keep track of the two newest frames.
-				// this.frame is older than this.#next, if it exists.
-				const prev = this.frame.peek();
-				if (prev && prev.timestamp >= frame.timestamp) {
-					// NOTE: This can happen if you have more than 1 b-frame in a row.
-					// Sorry, blame Firefox.
+			output: async (frame: VideoFrame) => {
+				// Insert into a queue so we can perform ordered sleeps.
+				// If this were to block, I believe WritableStream is still ordered.
+				try {
+					await writer.write(frame);
+				} catch {
 					frame.close();
-					return;
 				}
+			},
+			// TODO bubble up error
+			error: (error) => {
+				console.error(error);
+				effect.close();
+			},
+		});
+		effect.cleanup(() => decoder.close());
 
-				if (!prev) {
-					// As time-to-video optimization, use the first frame we see.
-					// We know this is an i-frame so there's no need to re-order it.
-					this.frame.set(frame);
-					return;
-				}
+		effect.spawn(async () => {
+			for (;;) {
+				const { value: frame } = await reader.read();
+				if (!frame) break;
+
+				// Sleep until it's time to decode the next frame.
+				const ref = performance.now() - frame.timestamp / 1000;
 
-				// If jitter is 0, then we disable buffering frames.
-				const jitter = this.#jitter.peek();
-				if (jitter === 0) {
-					prev.close();
-					this.frame.set(frame);
-					return;
+				let sleep = 0;
+				if (!this.#reference || ref < this.#reference) {
+					this.#reference = ref;
+					// Don't sleep so we immediately render this frame.
+				} else {
+					sleep = this.#reference - ref + this.latency.peek();
 				}
 
-				if (!this.#next) {
-					// We know we're newer than the current frame, so buffer it.
-					this.#next = frame;
-					return;
+				if (sleep > MIN_SYNC_WAIT_MS) {
+					this.syncStatus.set({ state: "wait", bufferDuration: sleep });
 				}
 
-				// Close the previous frame, and check if we need to replace #next or this.frame.
-				prev.close();
+				if (sleep > 0) {
+					// NOTE: WebCodecs doesn't block on output promises (I think?), so these sleeps will occur concurrently.
+					// TODO: This causes `syncStatus` to be racy, especially when multiple sleeps are in flight.
+					await new Promise((resolve) => setTimeout(resolve, sleep));
+				}
 
-				if (this.#next.timestamp < frame.timestamp) {
-					// Replace #next with the new frame.
-					this.frame.set(this.#next);
-					this.#next = frame;
+				if (sleep > MIN_SYNC_WAIT_MS) {
+					// Include how long we slept if it was above the threshold.
+					this.syncStatus.set({ state: "ready", bufferDuration: sleep });
 				} else {
-					// #next is newer than this new frame, so keep it.
-					this.frame.set(frame);
+					this.syncStatus.set({ state: "ready" });
+
+					// If the track switch was pending, complete it now.
+					if (this.#pending === effect) {
+						this.#active?.close();
+						this.#active = effect;
+						this.#pending = undefined;
+						effect.set(this.active, name);
+					}
 				}
-			},
-			// TODO bubble up error
-			error: (error) => {
-				console.error(error);
-				effect.close();
-			},
+
+				this.frame.update((prev) => {
+					prev?.close();
+					return frame;
+				});
+			}
 		});
-		effect.cleanup(() => decoder.close());
 
 		decoder.configure({
 			...config,
@@ -262,47 +281,6 @@ export class Source {
 			const next = await Promise.race([consumer.decode(), effect.cancel]);
 			if (!next) break;
 
-			// See if we can upgrade ourselves to the active track once we catch up.
-			// TODO: This is a racey when latency === 0, but I think it's fine.
-			const prev = this.frame.peek();
-			if (this.#pending === effect && (!prev || next.timestamp > prev.timestamp)) {
-				this.#active?.close();
-				this.#active = effect;
-				this.#pending = undefined;
-				effect.set(this.active, name);
-			}
-
-			// Sleep until it's time to decode the next frame.
-			const ref = performance.now() - Time.Milli.fromMicro(next.timestamp);
-
-			if (!this.#reference || ref < this.#reference) {
-				this.#reference = ref;
-			} else {
-				const sleep = this.#reference - ref + this.#jitter.peek();
-				const isWaitRequired = sleep >= this.#MIN_SYNC_WAIT_MS;
-				if (sleep > 0) {
-					// The planned jitter buffer size
-					const bufferDuration: Time.Milli = this.#jitter.peek();
-
-					if (isWaitRequired) {
-						this.syncStatus.set({ state: "wait", bufferDuration });
-					}
-
-					await new Promise((resolve) => setTimeout(resolve, sleep));
-
-					if (isWaitRequired) {
-						this.syncStatus.set({ state: "ready", bufferDuration });
-					}
-				} else {
-					this.syncStatus.set({ state: "ready" });
-				}
-			}
-
-			if (decoder.state === "closed") {
-				// Closed during the sleep
-				break;
-			}
-
 			const chunk = new EncodedVideoChunk({
 				type: next.keyframe ? "key" : "delta",
 				data: next.data,
@@ -373,48 +351,23 @@ export class Source {
 	}
 
 	#runBuffer(effect: Effect): void {
-		const currentFrame = effect.get(this.frame);
-		const nextFrame = this.#next;
+		const frame = effect.get(this.frame);
 		const enabled = effect.get(this.enabled);
 
-		const isBufferEmpty = enabled && !currentFrame && !nextFrame;
-
+		const isBufferEmpty = enabled && !frame;
 		if (isBufferEmpty) {
 			this.bufferStatus.set({ state: "empty" });
 		} else {
 			this.bufferStatus.set({ state: "filled" });
 		}
 	}
 
-	#runJitter(effect: Effect): void {
-		const config = effect.get(this.#selectedConfig);
-		if (!config) return;
-
-		// Use the framerate to compute the jitter buffer size.
-		// We always buffer a single frame, so subtract that from the jitter buffer.
-		const fps = config.framerate && config.framerate > 0 ? config.framerate : 30;
-		const delay = 1000 / fps;
-		const latency = effect.get(this.latency);
-
-		const jitter = Math.max(0, latency - delay) as Time.Milli;
-		this.#jitter.set(jitter);
-
-		// If we're not buffering any frames, then close the next frame.
-		if (jitter === 0 && this.#next) {
-			this.#next.close();
-			this.#next = undefined;
-		}
-	}
-
 	close() {
 		this.frame.update((prev) => {
 			prev?.close();
 			return undefined;
 		});
 
-		this.#next?.close();
-		this.#next = undefined;
-
 		this.#signals.close();
 	}
 }
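
For readers without the surrounding file: the change replaces the old two-frame (`#next`) reordering buffer with a `TransformStream` queue that `VideoDecoder.output` writes into and a single spawned loop drains, so the pacing sleeps happen sequentially in frame order rather than inside the output callback. Below is a minimal, standalone sketch of that pacing scheme under the same assumptions (microsecond frame timestamps as in WebCodecs, a wall-clock anchor that is re-set whenever a frame arrives early). The names `createPacer`, `PacedFrame`, and `push` are illustrative only and do not exist in the codebase.

```ts
// Minimal sketch of the pacing scheme introduced in this diff (not the actual Source class).
// The caller is responsible for rendering and eventually closing each frame it receives.
type PacedFrame = { timestamp: number; close(): void }; // stand-in for VideoFrame

function createPacer(latencyMs: number, render: (frame: PacedFrame) => void) {
	// The queue decouples the decoder's output callback from the paced loop.
	// The highWaterMark is only a safety net; the loop drains almost immediately.
	const queue = new TransformStream<PacedFrame, PacedFrame>(
		undefined,
		{ highWaterMark: 10 },
		{ highWaterMark: 10 },
	);
	const writer = queue.writable.getWriter();
	const reader = queue.readable.getReader();

	// Wall-clock time corresponding to media timestamp 0.
	// Re-anchored whenever a frame arrives earlier than expected, so we never over-sleep.
	let reference: number | undefined;

	const loop = (async () => {
		for (;;) {
			const { value: frame, done } = await reader.read();
			if (done || !frame) break;

			const ref = performance.now() - frame.timestamp / 1000;
			let sleep = 0;
			if (reference === undefined || ref < reference) {
				reference = ref; // ahead of schedule: adopt the new anchor and render immediately
			} else {
				sleep = reference - ref + latencyMs;
			}

			if (sleep > 0) await new Promise((resolve) => setTimeout(resolve, sleep));
			render(frame);
		}
	})();

	return {
		// Call from VideoDecoder's output callback; drops the frame if the queue is closed.
		push: (frame: PacedFrame) => writer.write(frame).catch(() => frame.close()),
		close: async () => {
			await writer.close();
			await loop;
		},
	};
}
```

In the actual class, `push` corresponds to the `await writer.write(frame)` in `output`, and the render step corresponds to the `this.frame.update(...)` at the end of the spawned loop, with `syncStatus` updates layered on top when the sleep exceeds `MIN_SYNC_WAIT_MS`.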