diff --git a/jvb/src/main/kotlin/org/jitsi/videobridge/export/Exporter.kt b/jvb/src/main/kotlin/org/jitsi/videobridge/export/Exporter.kt index 74c92e0a7e..23e2735253 100644 --- a/jvb/src/main/kotlin/org/jitsi/videobridge/export/Exporter.kt +++ b/jvb/src/main/kotlin/org/jitsi/videobridge/export/Exporter.kt @@ -1,21 +1,31 @@ package org.jitsi.videobridge.export +import org.eclipse.jetty.websocket.api.WebSocketAdapter +import org.eclipse.jetty.websocket.client.ClientUpgradeRequest +import org.eclipse.jetty.websocket.client.WebSocketClient import org.jitsi.nlj.PacketInfo import org.jitsi.nlj.rtp.AudioRtpPacket import org.jitsi.utils.logging2.createLogger import org.jitsi.videobridge.PotentialPacketHandler import org.jitsi.videobridge.colibri2.FeatureNotImplementedException import org.jitsi.videobridge.exporter.MediaJsonEncoder -import org.jitsi.videobridge.recorder.MediaJsonRecorder import org.jitsi.videobridge.util.ByteBufferPool +import org.jitsi.videobridge.websocket.config.WebsocketServiceConfig import org.jitsi.xmpp.extensions.colibri2.Export class Exporter : PotentialPacketHandler { val logger = createLogger() var started = false - private val encoder = MediaJsonEncoder { recorder.handleEvent(it) } - private val recorder = MediaJsonRecorder() + private val encoder = MediaJsonEncoder { + if (recorderWebSocket.isConnected) { + recorderWebSocket.remote?.sendString(it.toXml()) ?: + logger.info("Websocket is connected, but remote is null?") + } else { + logger.info("Can not send packet, websocket is not connected.") + } + } + private var recorderWebSocket = WebSocketAdapter() fun setExports(exports: List) { when { @@ -40,10 +50,19 @@ class Exporter : PotentialPacketHandler { fun stop() { started = false logger.info("Stopping.") + recorderWebSocket.session?.close(org.eclipse.jetty.websocket.core.CloseStatus.SHUTDOWN, "closing") } fun start(export: Export) { logger.info("Starting with url=${export.url}") + webSocketClient.connect(recorderWebSocket, export.url, ClientUpgradeRequest()) started = true } + + companion object { + val webSocketClient = WebSocketClient().apply { + idleTimeout = WebsocketServiceConfig.config.idleTimeout + start() + } + } } \ No newline at end of file diff --git a/jvb/src/main/kotlin/org/jitsi/videobridge/exporter/MediaJsonEncoder.kt b/jvb/src/main/kotlin/org/jitsi/videobridge/exporter/MediaJsonEncoder.kt index 24ac542226..3c88ce8301 100644 --- a/jvb/src/main/kotlin/org/jitsi/videobridge/exporter/MediaJsonEncoder.kt +++ b/jvb/src/main/kotlin/org/jitsi/videobridge/exporter/MediaJsonEncoder.kt @@ -1,6 +1,5 @@ package org.jitsi.videobridge.exporter -import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import org.jitsi.mediajson.Event import org.jitsi.mediajson.Media import org.jitsi.mediajson.MediaEvent @@ -30,7 +29,6 @@ class MediaJsonEncoder( private val ssrcsStarted = mutableSetOf() var seq = 0 - val om = jacksonObjectMapper() fun encode(p: AudioRtpPacket, epId: String) = synchronized(ssrcsStarted) { if (ssrcsStarted.none { it.ssrc == p.ssrc } ) { @@ -38,7 +36,7 @@ class MediaJsonEncoder( val state = SsrcState(p.ssrc, p.timestamp, offset) ssrcsStarted.add(state) val e = StartEvent( - (++seq).toString(), + ++seq, Start( "$epId-${p.ssrc}", MediaFormat( @@ -61,11 +59,11 @@ class MediaJsonEncoder( val elapsedRtpTime = this.timestamp - ssrcState.initialRtpTs val ts = elapsedRtpTime + ssrcState.offset val p = MediaEvent( - seq.toString(), + seq, media = Media( "$epId-${this.ssrc}", - this.sequenceNumber.toString(), - ts.toString(), + this.sequenceNumber, + ts, Base64.encode(this.buffer, this.payloadOffset, this.payloadOffset + this.payloadLength) ) ) diff --git a/jvb/src/main/kotlin/org/jitsi/videobridge/recorder/MediaJsonRecorder.kt b/jvb/src/main/kotlin/org/jitsi/videobridge/recorder/MediaJsonRecorder.kt deleted file mode 100644 index 89067af96d..0000000000 --- a/jvb/src/main/kotlin/org/jitsi/videobridge/recorder/MediaJsonRecorder.kt +++ /dev/null @@ -1,33 +0,0 @@ -package org.jitsi.videobridge.recorder - -import org.bouncycastle.util.encoders.Base64 -import org.jitsi.mediajson.Event -import org.jitsi.mediajson.MediaEvent -import org.jitsi.mediajson.StartEvent -import org.jitsi.utils.logging2.createLogger - -class MediaJsonRecorder { - val mkaRecorder = MkaRecorder() - val logger = createLogger() - - fun handleEvent(event: Event) { - when(event) { - is StartEvent -> { - logger.info("Start new stream: $event") - mkaRecorder.startTrack(event.start.tag) - } - is MediaEvent -> { - mkaRecorder.addFrame( - event.media.tag, - event.media.timestamp.toLong(), - Base64.decode(event.media.payload) - ) - } - } - } - - fun stop() { - logger.info("Stopping.") - mkaRecorder.close() - } -} \ No newline at end of file diff --git a/jvb/src/main/kotlin/org/jitsi/videobridge/recorder/MkaRecorder.kt b/jvb/src/main/kotlin/org/jitsi/videobridge/recorder/MkaRecorder.kt deleted file mode 100644 index c20af4cc8f..0000000000 --- a/jvb/src/main/kotlin/org/jitsi/videobridge/recorder/MkaRecorder.kt +++ /dev/null @@ -1,112 +0,0 @@ -package org.jitsi.videobridge.recorder - -//import org.ebml.EBMLReader -//import org.ebml.Element -//import org.ebml.MasterElement -//import org.ebml.io.DataSource -//import org.ebml.io.FileDataSource -import org.ebml.io.FileDataWriter -import org.ebml.matroska.MatroskaFileFrame -import org.ebml.matroska.MatroskaFileTrack -import org.ebml.matroska.MatroskaFileTrack.TrackType -import org.ebml.matroska.MatroskaFileWriter -//import org.jitsi.rtp.extensions.get3Bytes -import org.jitsi.utils.logging2.createLogger -import java.io.File -import java.nio.ByteBuffer - -class MkaRecorder { - private val logger = createLogger() - private val destination: File = File.createTempFile("test", ".mkv").apply { - logger.warn("Writing to $this") - } - - private val ioDW = FileDataWriter(destination.path) - private val writer: MatroskaFileWriter = MatroskaFileWriter(ioDW) - private val tracks = mutableMapOf() - - private var f = 0 - fun startTrack(name: String) { - val track = MatroskaFileTrack().apply { - trackNo = tracks.size + 1 - trackType = TrackType.AUDIO - codecID = "A_OPUS" - defaultDuration = 20000000 - audio = MatroskaFileTrack.MatroskaAudioTrack().apply { - channels = 2 - samplingFrequency = 48000F - } - } - tracks[name] = track - writer.addTrack(track) - } - - fun addFrame(trackName: String, timecode: Long, payload: ByteArray) { - val track = tracks[trackName] ?: throw Exception("Track not started") - val frame = MatroskaFileFrame() - frame.data = ByteBuffer.wrap(payload) - frame.trackNo = track.trackNo - //frame.timecode = timecode / 48 - logger.warn("Add to $trackName timecode=${timecode/48}") - writer.addFrame(frame) - } - - fun close() { - writer.close() - ioDW.close() - //testDocTraversal() - //destination.delete() - } -// -// fun testDocTraversal() { -// val ioDS = FileDataSource(destination.path) -// val reader = EBMLReader(ioDS) -// var level0 = reader.readNextElement() -// while (level0 != null) { -// traverseElement(level0, ioDS, reader, 0) -// level0.skipData(ioDS) -// level0 = reader.readNextElement() -// } -// } -// -// var x = 0 -// private fun traverseElement(levelN: Element?, ioDS: DataSource, reader: EBMLReader, level: Int) { -// if (levelN == null) { -// return -// } -// -// logger.info("Found element: ${".".repeat(level*2)} ${levelN.elementType.name}") -//// if (levelN.elementType.name == "TimecodeScale") { -//// levelN.readData(ioDS) -//// println("oops: "+ levelN.data.get3Bytes()) -//// return -//// } -// if (levelN.elementType.name == "Timecode") { -// if (x == 0) { x++} else { -// levelN.readData(ioDS) -// if (levelN.data.capacity() == 1) -// println("oops: " + levelN.data.get().toInt()) -// else if (levelN.data.capacity() == 2) -// println("oops: " + levelN.data.getShort().toInt()) -// else if (levelN.data.capacity() == 3) -// println("oops: " + levelN.data.get3Bytes().toInt()) -// else if (levelN.data.capacity() == 4) -// println("oops: " + levelN.data.getInt()) -// return -// } -// } -// -// val elemLevel = levelN.elementType.level -// if (elemLevel != -1) { -// check(level.toLong() == elemLevel.toLong()) -// } -// if (levelN is MasterElement) { -// var levelNPlusOne = levelN.readNextChild(reader) -// while (levelNPlusOne != null) { -// traverseElement(levelNPlusOne, ioDS, reader, level + 1) -// levelNPlusOne.skipData(ioDS) -// levelNPlusOne = levelN.readNextChild(reader) -// } -// } -// } -} \ No newline at end of file diff --git a/jvb/src/test/kotlin/org/jitsi/videobridge/recorder/RecorderTest.kt b/jvb/src/test/kotlin/org/jitsi/videobridge/recorder/RecorderTest.kt deleted file mode 100644 index ea9e7035b2..0000000000 --- a/jvb/src/test/kotlin/org/jitsi/videobridge/recorder/RecorderTest.kt +++ /dev/null @@ -1,38 +0,0 @@ -package org.jitsi.videobridge.recorder - -import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper -import io.kotest.assertions.fail -import io.kotest.core.spec.style.ShouldSpec -import org.bouncycastle.util.encoders.Base64 -import org.jitsi.mediajson.Event -import org.jitsi.mediajson.MediaEvent -import org.jitsi.mediajson.StartEvent -import org.jitsi.utils.logging2.createLogger - -class RecorderTest : ShouldSpec(){ - val logger = createLogger() - init { - val input = javaClass.getResource("/opus-sample3.json")?.readText()?.lines()?.dropLast(1) ?: fail("Can not read opus-sample.json") - val objectMapper = jacksonObjectMapper() - val inputJson: List = input.map { objectMapper.readValue(it, Event::class.java) } - logger.info("Parsed ${inputJson.size} events") - - context("Recording") { - logger.warn("Running") - val recorder = MkaRecorder() - - - inputJson.forEach { - if (it is StartEvent) { - logger.info("Start new stream: $it") - recorder.startTrack(it.start.tag) - } else if (it is MediaEvent) { - //logger.info("Adding frame to ${it.media.tag}") - recorder.addFrame(it.media.tag, it.media.timestamp.toLong(), Base64.decode(it.media.payload)) - } - } - - recorder.close() - } - } -} \ No newline at end of file