Skip to content

Commit 1dd1873

Browse files
committed
log streams to database now
1 parent bb86d6d commit 1dd1873

10 files changed

+240
-3
lines changed

api_server/src/Database.ts

+93
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
import {Pool, ResultSetHeader, RowDataPacket} from "mysql2";
2+
import {Util} from "./Util";
3+
import {CONFIG} from "./config";
4+
5+
const mysql = require('mysql2');
6+
7+
export class Database {
8+
9+
protected static instance: Database;
10+
protected pool: Pool;
11+
12+
private constructor() {
13+
14+
const dbConfig = CONFIG.database;
15+
16+
this.pool = mysql.createPool({
17+
host: dbConfig.host,
18+
port: dbConfig.port,
19+
user: dbConfig.user,
20+
password: dbConfig.password,
21+
database: 'streaming',
22+
waitForConnections: true,
23+
connectionLimit: 10,
24+
maxIdle: 10, // max idle connections, the default value is the same as `connectionLimit`
25+
idleTimeout: 60000, // idle connections timeout, in milliseconds, the default value 60000
26+
queueLimit: 0,
27+
enableKeepAlive: true,
28+
keepAliveInitialDelay: 0
29+
});
30+
}
31+
32+
public static getInstance(): Database {
33+
34+
if (!Database.instance) {
35+
Database.instance = new Database();
36+
}
37+
38+
return Database.instance;
39+
}
40+
41+
public async select<T>(sql: string, parameters?: unknown[]): Promise<T[]> {
42+
const [rows] = await this.pool.promise().query(sql, parameters);
43+
return rows as T[];
44+
}
45+
46+
// contains affectedRows and lastInsertId properties.
47+
public async update() {
48+
// TODO
49+
}
50+
51+
async ping(): Promise<boolean> {
52+
await this.pool.promise().query("SELECT 1");
53+
return true;
54+
}
55+
56+
async checkIfChannelNameAvailable(name: string): Promise<boolean> {
57+
58+
const [rows, fields] = await this.pool.promise()
59+
.execute<RowDataPacket[]>("SELECT COUNT(1) AS cnt FROM channels WHERE name = ?", [name]);
60+
61+
return rows.length && rows[0]["cnt"] == 0;
62+
}
63+
64+
async createChannel(name: string): Promise<boolean> {
65+
66+
if (!await this.checkIfChannelNameAvailable(name)) {
67+
throw "Channel with name '" + name + "' is not available";
68+
}
69+
70+
const sk = Util.generateStreamKey();
71+
72+
const result = await this.pool.promise()
73+
.execute<ResultSetHeader>("INSERT INTO channels (created_at, name, stream_key) VALUES (UTC_TIMESTAMP(), ?, ?)", [
74+
name, sk
75+
]);
76+
77+
return true;
78+
}
79+
80+
async createNewStream(name: string, ipAddress: string, streamInfo: string): Promise<void> {
81+
await this.pool.promise().execute("INSERT INTO streams (name, user_ip, started_at, ffprobe_json) VALUES (?, ?, UTC_TIMESTAMP(), ?)", [
82+
name, ipAddress, streamInfo
83+
]);
84+
}
85+
86+
async updateStreamInfo(name: string, bytesIn: number): Promise<void> {
87+
88+
await this.pool.promise().execute("UPDATE streams SET ended_at = UTC_TIMESTAMP(), bytes_in = ? WHERE name = ?", [
89+
bytesIn,
90+
name
91+
]);
92+
}
93+
}

api_server/src/classes/StreamInfo.ts

+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import {ffprobe} from 'fluent-ffmpeg';
2+
import * as Ffmpeg from "fluent-ffmpeg";
3+
4+
export class StreamInfo {
5+
6+
static probeAsync(inputStream: string, timeout: number = 5000): Promise<Ffmpeg.FfprobeData> {
7+
8+
return new Promise((resolve, reject) => {
9+
10+
setTimeout(() => {
11+
reject("FFProbe timed out")
12+
}, timeout);
13+
14+
ffprobe(inputStream, function (error, data) {
15+
16+
if (error) {
17+
reject(error);
18+
}
19+
20+
resolve(data);
21+
22+
});
23+
24+
});
25+
}
26+
}

api_server/src/config.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ const urlFromEnv: URL | null = (() => {
1212
})();
1313

1414
const connectionOptions: ConnectionOptions = {
15-
host: urlFromEnv?.hostname || 'localhost',
15+
host: urlFromEnv?.hostname || 'db',
1616
port: +(urlFromEnv?.port || 3306),
1717
user: urlFromEnv?.username || 'root',
1818
password: urlFromEnv?.password || 'password',

api_server/src/controllers/IndexController.ts

+20
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import {BaseController} from "./BaseController";
22
import {Request, Response, Router} from "express";
3+
import {Database} from "../Database";
34

45
export class IndexController extends BaseController {
56

@@ -14,5 +15,24 @@ export class IndexController extends BaseController {
1415
message: 'Hello world from nginx-rtmp-server - API Server'
1516
});
1617
});
18+
19+
router.get('/health', async (req, res) => {
20+
21+
try {
22+
23+
await Database.getInstance().ping();
24+
25+
res.json({
26+
status: 'OK'
27+
});
28+
29+
} catch (e) {
30+
31+
res.json({
32+
error: e
33+
});
34+
}
35+
36+
})
1737
}
1838
}

api_server/src/controllers/PublishController.ts

+18-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@ import {Request, Response, Router} from "express";
33
import {OnPublishPayload} from "../classes/OnPublishPayload";
44
import {Hooks} from "../hooks";
55
import {Logger} from "../classes/Logger";
6+
import {Util} from "../Util";
7+
import {StreamInfo} from "../classes/StreamInfo";
8+
import {Database} from "../Database";
69

710
export class PublishController extends BaseController {
811

@@ -22,10 +25,24 @@ export class PublishController extends BaseController {
2225
let result = await Hooks.onPublish(params);
2326

2427
if (result) {
25-
return res.status(201).send('OK');
28+
res.status(201).send('OK');
29+
30+
res.on('finish', () => {
31+
32+
StreamInfo.probeAsync(Util.rtmpStreamUrl(params.name))
33+
.then((streamInfo) => {
34+
35+
Database.getInstance().createNewStream(params.name, params.addr, JSON.stringify(streamInfo));
36+
37+
});
38+
39+
});
40+
41+
return;
2642
}
2743

2844
} catch (ex) {
45+
console.error(ex);
2946
// report!
3047
}
3148

api_server/src/controllers/PublishDoneController.ts

+5
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import {Request, Response, Router} from "express";
33
import {OnPublishDonePayload} from "../classes/OnPublishDonePayload";
44
import {Hooks} from "../hooks";
55
import {Logger} from "../classes/Logger";
6+
import {Database} from "../Database";
67

78
export class PublishDoneController extends BaseController {
89

@@ -21,6 +22,10 @@ export class PublishDoneController extends BaseController {
2122

2223
}
2324

25+
res.on('finish', () => {
26+
Database.getInstance().updateStreamInfo(payload.name, payload.bytes_in);
27+
})
28+
2429
res.status(200).send('OK');
2530
});
2631
}

api_server/src/hooks.ts

+4-1
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,19 @@ export class Hooks {
1919
map.set(name, 1);
2020

2121
const transcoder = new Transcoder(name);
22+
const worker = new ScreenshotWorker(name);
2223

2324
transcoder.start()
2425
.then(function () {
2526
Logger.log('Transcoder finished. Exited gracefully.')
2627
})
2728
.catch(function (err) {
2829
Logger.error('Transcoder stopped with error: ' + err);
30+
})
31+
.finally(() => {
32+
worker.stop();
2933
});
3034

31-
let worker = new ScreenshotWorker(name);
3235
worker.start();
3336

3437
screenWorkers.set(name, worker);

api_server/src/types.ts

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
import {RowDataPacket} from "mysql2";
2+
3+
export interface IChannel extends RowDataPacket {
4+
id: number;
5+
created_at: Date;
6+
name: string;
7+
stream_key: string;
8+
}

docker-compose.yml

+23
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,19 @@
11
version: "3.9"
22
services:
33

4+
database:
5+
container_name: db
6+
image: mysql
7+
command: --default-authentication-plugin=mysql_native_password
8+
restart: always
9+
environment:
10+
- MYSQL_ROOT_PASSWORD=${MYSQL_ROOT_PASSWORD:-password}
11+
volumes:
12+
- ./docker/mysql/data:/var/lib/mysql
13+
- ./docker/mysql/init:/docker-entrypoint-initdb.d
14+
ports:
15+
- "${MYSQL_PORT_EXPOSE:-127.0.0.1:3306}:3306"
16+
417
rtmp:
518
container_name: 'rtmp'
619
build: './rtmp'
@@ -20,7 +33,17 @@ services:
2033
- ./public/storage:/var/storage
2134
environment:
2235
- NODE_ENV=production
36+
# - MYSQL_CONNECTION=mysql://root:password@db:3306
2337
- RTMP_SERVER=rtmp
2438

39+
adminer:
40+
image: adminer
41+
restart: always
42+
environment:
43+
- ADMINER_DEFAULT_SERVER=db
44+
- ADMINER_DESIGN='nette'
45+
ports:
46+
- "127.0.0.1:9000:8080"
47+
2548
volumes:
2649
storage: { }

docker/mysql/init/01-schema.sql

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
-- Adminer 4.8.1 MySQL 8.1.0 dump
2+
3+
SET NAMES utf8;
4+
SET time_zone = '+00:00';
5+
SET foreign_key_checks = 0;
6+
SET sql_mode = 'NO_AUTO_VALUE_ON_ZERO';
7+
8+
SET NAMES utf8mb4;
9+
10+
CREATE DATABASE `streaming` /*!40100 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci */ /*!80016 DEFAULT ENCRYPTION='N' */;
11+
USE `streaming`;
12+
13+
DROP TABLE IF EXISTS `channels`;
14+
CREATE TABLE `channels` (
15+
`id` int unsigned NOT NULL AUTO_INCREMENT,
16+
`created_at` timestamp NULL DEFAULT NULL,
17+
`name` varchar(255) NOT NULL,
18+
`stream_key` char(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,
19+
`is_live` tinyint NOT NULL DEFAULT '0',
20+
`title` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
21+
`description` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
22+
PRIMARY KEY (`id`),
23+
KEY `name` (`name`),
24+
KEY `stream_key` (`stream_key`)
25+
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
26+
27+
28+
DROP TABLE IF EXISTS `streams`;
29+
CREATE TABLE `streams` (
30+
`id` int NOT NULL AUTO_INCREMENT,
31+
`name` varchar(255) NOT NULL,
32+
`user_ip` varchar(22) NOT NULL,
33+
`started_at` timestamp NOT NULL,
34+
`ended_at` timestamp NULL DEFAULT NULL,
35+
`bytes_in` int NOT NULL DEFAULT '0',
36+
`ffprobe_json` text,
37+
PRIMARY KEY (`id`),
38+
KEY `name` (`name`)
39+
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
40+
41+
42+
-- 2023-09-24 23:48:02

0 commit comments

Comments
 (0)