From 41cd74f76ca8e857553a0f9b59571ed7b8c2c903 Mon Sep 17 00:00:00 2001 From: Yuzuki Aida Date: Sat, 1 Jun 2024 03:02:31 +0900 Subject: [PATCH] Add flv (websocket) protocol --- apps/push-serverless/src/streaming/index.ts | 92 ++++++++++++++++--- .../src/controllers/v1/lives/get-url.ts | 5 +- apps/server/src/utils/constants.ts | 5 + apps/web/locales/en.json | 6 +- apps/web/locales/ja.json | 8 +- apps/web/organisms/live/video/controller.tsx | 18 ++-- apps/web/utils/hooks/use-video-stream.ts | 39 +++++--- .../api/v1/lives/_liveId@number/url/index.ts | 3 +- 8 files changed, 139 insertions(+), 37 deletions(-) diff --git a/apps/push-serverless/src/streaming/index.ts b/apps/push-serverless/src/streaming/index.ts index 062394c3..1d3daa9d 100644 --- a/apps/push-serverless/src/streaming/index.ts +++ b/apps/push-serverless/src/streaming/index.ts @@ -1,6 +1,6 @@ import { Server } from 'http'; import { WebSocketServer } from 'ws'; -import type WebSocket from 'ws'; +import WebSocket from 'ws'; import { pathToRegexp } from 'path-to-regexp'; import { Encoder } from '../services/encoder'; import { Readable } from 'stream'; @@ -9,6 +9,9 @@ import { checkToken } from '../utils/api'; const streamPushRegexp = pathToRegexp( '/api/externals/websocket/v1/stream-push/:liveId' ); +const streamWatchRegexp = pathToRegexp( + '/api/externals/websocket/v1/stream-watch/:liveId' +); export class Streaming { constructor(server: Server) { @@ -17,31 +20,53 @@ export class Streaming { }); ws.on('connection', (socket, req) => { - try { - console.log('websocket connected', req.url); - - if (req.url) { - void this.handleConnection(socket, req.url); + void (async () => { + try { + console.log('websocket connected', req.url); + + if (req.url) { + await this.handleConnection(socket, req.url); + } + } catch (e) { + console.warn(e); + socket.close(); } - } catch (e) { - console.warn(e); - socket.close(); - } + })(); }); } - private handleConnection(socket: WebSocket, Url: string) { + private async handleConnection(socket: WebSocket, Url: string) { const url = new URL(Url, process.env.SERVER_ENDPOINT); const watchToken = url.searchParams.get('watchToken') || undefined; const token = url.searchParams.get('token') || undefined; const streamPush = streamPushRegexp.exec(url.pathname); + const streamWatch = streamWatchRegexp.exec(url.pathname); + if (streamPush) { const liveId = Number(streamPush[1]); if (!token || !watchToken) { return this.closeConnection(socket, 'Token is required'); } - return this.handleV1Push(socket, liveId, watchToken, token); + try { + await this.handleV1Push(socket, liveId, watchToken, token); + return; + } catch (e) { + console.error(e); + return this.closeConnection(socket, 'An error occurred'); + } + } else if (streamWatch) { + const liveId = Number(streamWatch[1]); + if (!token || !watchToken) { + return this.closeConnection(socket, 'Token is required'); + } + try { + await this.handleV1Watch(socket, liveId, watchToken, token); + return; + } catch (e) { + console.error(e); + return this.closeConnection(socket, 'An error occurred'); + } } return this.closeConnection(socket, 'invalid_path'); @@ -94,6 +119,49 @@ export class Streaming { socket.send(JSON.stringify({ type: 'ready' })); } + private async handleV1Watch( + socket: WebSocket, + liveId: number, + watchToken: string, + token: string + ) { + const url = `http://localhost:8080/streaming/live/${liveId}_${watchToken}.flv?token=${token}`; + console.log('websocket connected', liveId); + const response = await fetch(url); + + if (!response.ok) { + return this.closeConnection( + socket, + `Failed to connect: ${response.status}` + ); + } + + const reader = response.body?.getReader(); + if (!reader) { + return this.closeConnection(socket, 'Failed to get reader'); + } + + let success = false; + while (!success) { + const { done, value } = await reader.read(); + if (done || socket.readyState !== WebSocket.OPEN) { + success = true; + socket.close(); + break; + } + + socket.send(value, err => { + if (err) { + console.error('[handleV1Watch] failed to send data', err); + this.closeConnection(socket, 'Failed to send data'); + success = true; + } + }); + } + + console.log('websocket closed', liveId); + } + private closeConnection(socket: WebSocket, error?: string) { if (error) { socket.send(JSON.stringify({ error })); diff --git a/apps/server/src/controllers/v1/lives/get-url.ts b/apps/server/src/controllers/v1/lives/get-url.ts index d1368068..d1d4674c 100644 --- a/apps/server/src/controllers/v1/lives/get-url.ts +++ b/apps/server/src/controllers/v1/lives/get-url.ts @@ -1,7 +1,7 @@ import { Methods } from 'api-types/api/v1/lives/_liveId@number/url'; import { lives } from '../../../models'; import { jwtEdge } from '../../../services/jwt'; -import { basePushPlay } from '../../../utils/constants'; +import { basePushPlay, basePushPlayWs } from '../../../utils/constants'; import { APIRoute, LiveState } from '../../../utils/types'; type Response = Methods['get']['resBody']; @@ -45,7 +45,8 @@ export const getV1LivesUrl: APIRoute< } ctx.body = { - flv: `${basePushPlay}/streaming/live/${live.id}_${live.watchToken}.flv?token=${token}`, + flvWs: `${basePushPlayWs}/api/externals/websocket/v1/stream-watch/${live.id}?watchToken=${live.watchToken}&token=${token}`, + flvHttp: `${basePushPlay}/streaming/live/${live.id}_${live.watchToken}.flv?token=${token}`, hlsHq: `${basePushPlay}/static/live/${live.id}_${live.watchToken}/high/stream.m3u8?token=${token}`, hlsLq: `${basePushPlay}/static/live/${live.id}_${live.watchToken}/low/stream.m3u8?token=${token}`, audio: `${basePushPlay}/static/live/${live.id}_${live.watchToken}/audio/stream.m3u8?token=${token}` diff --git a/apps/server/src/utils/constants.ts b/apps/server/src/utils/constants.ts index 53db0670..e3c2659f 100644 --- a/apps/server/src/utils/constants.ts +++ b/apps/server/src/utils/constants.ts @@ -7,6 +7,8 @@ export const REDIS_CONNECTION = { }; export const PROTOCOL = `http${process.env.USE_HTTP ? '' : 's'}`; +export const PROTOCOL_WS = `ws${process.env.USE_HTTP ? '' : 's'}`; + export const basePushStream = `${PROTOCOL}://${process.env.PUSH_DOMAIN || ''}`; export const enableVideo = !!process.env.VIDEO_DOMAIN; export const baseVideoStream = `${PROTOCOL}://${ @@ -15,6 +17,9 @@ export const baseVideoStream = `${PROTOCOL}://${ export const basePushPlay = `${PROTOCOL}://${ process.env.PUSH_CDN_DOMAIN || process.env.PUSH_DOMAIN || '' }`; +export const basePushPlayWs = `${PROTOCOL_WS}://${ + process.env.PUSH_CDN_DOMAIN || process.env.PUSH_DOMAIN || '' +}`; export const baseVideoPlay = `${PROTOCOL}://${ process.env.VIDEO_CDN_DOMAIN || process.env.VIDEO_DOMAIN || '' }`; diff --git a/apps/web/locales/en.json b/apps/web/locales/en.json index 2cc641d2..175d618d 100644 --- a/apps/web/locales/en.json +++ b/apps/web/locales/en.json @@ -97,8 +97,10 @@ "video.cache-deleted": "The video cannot be played because the cache has been deleted. The streamer can restore the recording from the live history.", "video.recording-deleted": "This live was not recorded.", "live.player.settings.type": "Change streaming method", - "live.player.settings.type.flv.title": "Source quality", - "live.player.settings.type.flv.note": "Ultra-Low latency (iOS is not supported)", + "live.player.settings.type.flvWs.title": "Source quality", + "live.player.settings.type.flvWs.note": "Ultra-Low latency (iOS is not supported) (Beta)", + "live.player.settings.type.flvHttp.title": "Source quality", + "live.player.settings.type.flvHttp.note": "Low latency (iOS is not supported)", "live.player.settings.type.hlsHq.title": "High quality", "live.player.settings.type.hlsHq.note": "Low latency (For iOS)", "live.player.settings.type.hlsLq.title": "Low quality", diff --git a/apps/web/locales/ja.json b/apps/web/locales/ja.json index a000f52d..1ddd57f3 100644 --- a/apps/web/locales/ja.json +++ b/apps/web/locales/ja.json @@ -97,12 +97,14 @@ "video.cache-deleted": "キャッシュが削除されたため動画は再生できません。配信履歴より復活させる事ができます。", "video.recording-deleted": "この配信の録画はありません。", "live.player.settings.type": "再生方式を変更", - "live.player.settings.type.flv.title": "ソース画質", - "live.player.settings.type.flv.note": "超低遅延 (iOS 非対応)", + "live.player.settings.type.flvWs.title": "ソース画質", + "live.player.settings.type.flvWs.note": "超低遅延 (iOS 非対応) (Beta)", + "live.player.settings.type.flvHttp.title": "ソース画質", + "live.player.settings.type.flvHttp.note": "低遅延 (iOS 非対応)", "live.player.settings.type.hlsHq.title": "高画質", "live.player.settings.type.hlsHq.note": "低遅延 (iOS 向け)", "live.player.settings.type.hlsLq.title": "低画質", - "live.player.settings.type.hlsLq.note": "携帯回線向け", + "live.player.settings.type.hlsLq.note": "安定性重視モード (携帯回線向け)", "live.player.settings.type.audio.title": "音声のみ", "live.player.settings.type.audio.note": "バックグラウンド再生向け", "video.player.settings.type.hlsHq.title": "高画質", diff --git a/apps/web/organisms/live/video/controller.tsx b/apps/web/organisms/live/video/controller.tsx index 370439ee..d4fb0283 100644 --- a/apps/web/organisms/live/video/controller.tsx +++ b/apps/web/organisms/live/video/controller.tsx @@ -230,16 +230,18 @@ export const Controller: FC = props => { > {(isLive ? livePlayTypes : videoPlayTypes).map(playType => ( - {playType.badge && ( - {playType.badge} - )} - + {playType.badge && ( + + {playType.badge} + + )} + ( }, [videoTagRef]); const handleFlv = useCallback( - async (url: string) => { + async (playType: LivePlayType, url: string) => { if (!videoTagRef.current) { return; } @@ -61,7 +67,7 @@ export const useVideoStream = ( const player = Mpegts.createPlayer( { - type: 'flv', + type: playType === LivePlayType.FlvWs ? 'mse' : 'flv', isLive: true, url }, @@ -146,6 +152,10 @@ export const useVideoStream = ( player.on(Hls.Events.ERROR, (event, data) => { console.warn(event, data); + if (!data.fatal) { + return; + } + switch (data.type) { case Hls.ErrorTypes.NETWORK_ERROR: { if (data.details === Hls.ErrorDetails.MANIFEST_LOAD_ERROR) { @@ -164,12 +174,9 @@ export const useVideoStream = ( setTimeout(() => { player.recoverMediaError(); }, 1000); - break; default: - if (data.fatal) { - player.destroy(); - } + player.destroy(); break; } }); @@ -196,10 +203,20 @@ export const useVideoStream = ( if (entityType === 'live') { const live = url as LiveUrls; - if (playType === LivePlayType.Flv) { + if (playType === LivePlayType.FlvHttp) { + void (async () => { + try { + await handleFlv(playType, live.flvHttp); + } catch (e) { + if (e instanceof FlvNotSupportedError) { + setPlayType(LivePlayType.HlsHq as PlayType); + } + } + })(); + } else if (playType === LivePlayType.FlvWs) { void (async () => { try { - await handleFlv(live.flv); + await handleFlv(playType, live.flvWs); } catch (e) { if (e instanceof FlvNotSupportedError) { setPlayType(LivePlayType.HlsHq as PlayType); @@ -213,7 +230,7 @@ export const useVideoStream = ( } else if (playType === LivePlayType.Audio) { void handleHls(live.audio, playType, true); } else { - setPlayType(LivePlayType.Flv as PlayType); + setPlayType(LivePlayType.FlvWs as PlayType); } } else { const video = url as VideoUrls; diff --git a/packages/api-types/api/v1/lives/_liveId@number/url/index.ts b/packages/api-types/api/v1/lives/_liveId@number/url/index.ts index 9f0c222a..32baaee3 100644 --- a/packages/api-types/api/v1/lives/_liveId@number/url/index.ts +++ b/packages/api-types/api/v1/lives/_liveId@number/url/index.ts @@ -1,7 +1,8 @@ import { AuthorizationHeader } from '../../../../../common/types'; export type LiveUrls = { - flv: string; + flvWs: string; + flvHttp: string; hlsHq: string; hlsLq: string; audio: string;