Skip to content

Commit d403f0f

Browse files
committed
Add live metrics
1 parent 75c0acc commit d403f0f

File tree

23 files changed

+881
-1427
lines changed

23 files changed

+881
-1427
lines changed

apps/push-serverless/src/controllers/internal/on_publish.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ export const apiInternalOnPublish: Middleware = async ctx => {
3737
return;
3838
}
3939

40-
await Action.startStream(liveId, watchToken, body.client_id);
40+
await Action.startStream(liveId, watchToken, body.client_id, body.stream_id);
4141

4242
void client.v1.internals.push.action.$post({
4343
body: {

apps/push-serverless/src/services/action.ts

+31-2
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
import { lives, rejectSession, sessions } from '../utils/sessions';
22
import { Encoder } from './encoder';
33
import { generateToken } from '../utils/token';
4+
import { client, serverToken } from '../utils/api';
5+
import { getStream } from './srs-api';
46

57
export class Action {
68
static async startStream(
79
liveId: number,
810
watchToken: string,
9-
clientId: string
11+
clientId: string,
12+
streamId: string
1013
) {
1114
const currentSession = sessions.get(liveId);
1215
if (currentSession) {
@@ -17,17 +20,43 @@ export class Action {
1720
const internalToken = generateToken();
1821
const encoder = new Encoder(liveId, watchToken, internalToken);
1922

23+
const sendHeartbeat = async () => {
24+
const streamInfo = await getStream(streamId);
25+
const stats = streamInfo?.stream;
26+
if (!stats) {
27+
console.warn('stream not found', streamId, liveId);
28+
return;
29+
}
30+
31+
void client.v1.internals.push.action.$post({
32+
body: {
33+
liveId,
34+
action: 'stream:heartbeat',
35+
serverToken,
36+
stats: stats
37+
}
38+
});
39+
};
40+
41+
const heartbeatInterval = setInterval(
42+
() => void sendHeartbeat(),
43+
1000 * 15
44+
);
45+
2046
sessions.set(liveId, {
2147
clientId,
2248
encoder,
23-
internalToken
49+
internalToken,
50+
heartbeatInterval
2451
});
2552

2653
setTimeout(() => {
2754
void encoder.encodeToHighQualityHls();
2855
void encoder.encodeToLowQualityHls();
2956
void encoder.encodeToAudio();
3057

58+
void sendHeartbeat();
59+
3160
// this.startRecording(liveId);
3261
}, 500);
3362
}

apps/push-serverless/src/services/srs-api.ts

+98
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,101 @@ export const kickoffClient = async (id: string) => {
88
}
99
});
1010
};
11+
12+
/** "streams": [
13+
{
14+
"id": "vid-ff500jr",
15+
"name": "10_628f27feff6be7187b295317f8d5fe81e20e524f7f57e581c9430d3b3eea06ada579f23ba03c78ea43bf6c64520db4d6",
16+
"vhost": "vid-8n5l812",
17+
"app": "live",
18+
"tcUrl": "rtmp://127.0.0.1/live",
19+
"url": "/live/10_628f27feff6be7187b295317f8d5fe81e20e524f7f57e581c9430d3b3eea06ada579f23ba03c78ea43bf6c64520db4d6",
20+
"live_ms": 1717517428337,
21+
"clients": 6,
22+
"frames": 7795,
23+
"send_bytes": 186135731,
24+
"recv_bytes": 44801527,
25+
"kbps": {
26+
"recv_30s": 2668,
27+
"send_30s": 10686
28+
},
29+
"publish": {
30+
"active": true,
31+
"cid": "69kaucce"
32+
},
33+
"video": {
34+
"codec": "H264",
35+
"profile": "High",
36+
"level": "Other",
37+
"width": 1920,
38+
"height": 1080
39+
},
40+
"audio": {
41+
"codec": "AAC",
42+
"sample_rate": 44100,
43+
"channel": 2,
44+
"profile": "LC"
45+
}
46+
}
47+
]
48+
*/
49+
interface Stream {
50+
id: string;
51+
name: string;
52+
vhost: string;
53+
app: string;
54+
tcUrl: string;
55+
url: string;
56+
live_ms: number;
57+
clients: number;
58+
frames: number;
59+
send_bytes: number;
60+
recv_bytes: number;
61+
kbps: {
62+
recv_30s: number;
63+
send_30s: number;
64+
};
65+
publish: {
66+
active: boolean;
67+
cid: string;
68+
};
69+
video: {
70+
codec: string;
71+
profile: string;
72+
level: string;
73+
width: number;
74+
height: number;
75+
} | null;
76+
audio: {
77+
codec: string;
78+
sample_rate: number;
79+
channel: number;
80+
profile: string;
81+
} | null;
82+
}
83+
84+
interface StreamsResponse {
85+
code: number;
86+
server: string;
87+
service: string;
88+
pid: string;
89+
streams: Stream[];
90+
}
91+
92+
interface StreamResponse {
93+
code: number;
94+
server: string;
95+
service: string;
96+
pid: string;
97+
stream?: Stream;
98+
}
99+
100+
export const getStreams = async () => {
101+
const res = await fetch(`${API_ENDPOINT}/api/v1/streams`);
102+
return res.json() as Promise<StreamsResponse>;
103+
};
104+
105+
export const getStream = async (id: string) => {
106+
const res = await fetch(`${API_ENDPOINT}/api/v1/streams/${id}`);
107+
return res.json() as Promise<StreamResponse>;
108+
};

apps/push-serverless/src/types.ts

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ export type SRSPublishCallback = {
88
stream: string;
99
tcUrl: string;
1010
param: string;
11+
stream_id: string;
1112
};
1213

1314
export type SRSUnPublishCallback = {

apps/push-serverless/src/utils/sessions.ts

+5
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ export const sessions = new Map<
77
clientId: string;
88
encoder: Encoder;
99
internalToken: string;
10+
heartbeatInterval?: NodeJS.Timeout;
1011
}
1112
>();
1213

@@ -37,6 +38,10 @@ export const rejectSession = async (liveId: number) => {
3738
console.warn('cleanup error', liveId, e);
3839
}
3940

41+
if (session.heartbeatInterval) {
42+
clearInterval(session.heartbeatInterval);
43+
}
44+
4045
try {
4146
await kickoffClient(session.clientId);
4247
} catch (e) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- AlterTable
2+
ALTER TABLE "Live" ADD COLUMN "stats" JSONB NOT NULL DEFAULT '{}';

apps/server/prisma/schema.prisma

+1
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ model Live {
8181
watchToken String? @db.VarChar(100)
8282
thumbnailId Int?
8383
config Json @default("{}")
84+
stats Json @default("{}")
8485
isDeleted Boolean @default(false)
8586
isRecording Boolean @default(false)
8687
isPushing Boolean @default(false)

apps/server/src/controllers/v1/internals/push/action.ts

+72-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ const reqBodySchema: JSONSchemaType<Request> = {
2929
'stream:stop',
3030
'record:processing',
3131
'record:done',
32-
'record:failed'
32+
'record:failed',
33+
'stream:heartbeat'
3334
]
3435
},
3536
serverToken: {
@@ -42,6 +43,66 @@ const reqBodySchema: JSONSchemaType<Request> = {
4243
fileSize: {
4344
type: 'string',
4445
nullable: true
46+
},
47+
stats: {
48+
type: 'object',
49+
properties: {
50+
kbps: {
51+
type: 'object',
52+
properties: {
53+
recv_30s: {
54+
type: 'number'
55+
},
56+
send_30s: {
57+
type: 'number'
58+
}
59+
},
60+
required: ['recv_30s', 'send_30s']
61+
},
62+
video: {
63+
type: 'object',
64+
properties: {
65+
codec: {
66+
type: 'string'
67+
},
68+
profile: {
69+
type: 'string'
70+
},
71+
level: {
72+
type: 'string'
73+
},
74+
width: {
75+
type: 'number'
76+
},
77+
height: {
78+
type: 'number'
79+
}
80+
},
81+
required: ['codec', 'profile', 'level', 'width', 'height'],
82+
nullable: true
83+
},
84+
audio: {
85+
type: 'object',
86+
properties: {
87+
codec: {
88+
type: 'string'
89+
},
90+
sample_rate: {
91+
type: 'number'
92+
},
93+
channel: {
94+
type: 'number'
95+
},
96+
profile: {
97+
type: 'string'
98+
}
99+
},
100+
required: ['codec', 'sample_rate', 'channel', 'profile'],
101+
nullable: true
102+
}
103+
},
104+
required: ['kbps'],
105+
nullable: true
45106
}
46107
},
47108
required: ['liveId', 'action', 'serverToken'],
@@ -127,6 +188,16 @@ export const postV1InternalsPushAction: APIRoute<
127188
live.id,
128189
LiveRecordingStatus.Failed
129190
);
191+
} else if (body.action === 'stream:heartbeat') {
192+
if (!body.stats) {
193+
ctx.status = 400;
194+
ctx.body = {
195+
errorCode: 'invalid_request'
196+
};
197+
return;
198+
}
199+
200+
await lives.updateStats(live, body.stats);
130201
}
131202

132203
if (newLive) {

apps/server/src/controllers/v1/streams/get.ts

+3-2
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@ export const getV1Streams: APIRoute<
1010
never,
1111
Response,
1212
UserState & LiveState
13-
> = ctx => {
13+
> = async ctx => {
1414
ctx.body = {
15-
live: lives.getPrivate(ctx.state.live)
15+
live: lives.getPrivate(ctx.state.live),
16+
stats: await lives.getStats(ctx.state.live)
1617
};
1718
};

0 commit comments

Comments
 (0)