From 49cc46e69df6497aed27bdf5ec4e4a85916daa69 Mon Sep 17 00:00:00 2001 From: Jonathan Lennox Date: Tue, 27 Feb 2024 18:05:14 -0500 Subject: [PATCH] Monitor visitors' codec lists, and aggregate them into a conference property. (#1137) --- .../jitsi/jicofo/xmpp/muc/ChatRoomMember.kt | 3 + .../jicofo/xmpp/muc/ChatRoomMemberImpl.kt | 46 +++++- .../conference/JitsiMeetConferenceImpl.java | 32 +++- .../jitsi/jicofo/util/PreferenceAggregator.kt | 150 ++++++++++++++++++ .../kotlin/org/jitsi/jicofo/xmpp/Smack.kt | 6 + .../jitsi/jicofo/jibri/JibriDetectorTest.kt | 1 + .../jicofo/util/PreferenceAggregatorTest.kt | 109 +++++++++++++ pom.xml | 2 +- 8 files changed, 340 insertions(+), 9 deletions(-) create mode 100644 jicofo/src/main/kotlin/org/jitsi/jicofo/util/PreferenceAggregator.kt create mode 100644 jicofo/src/test/kotlin/org/jitsi/jicofo/util/PreferenceAggregatorTest.kt diff --git a/jicofo-common/src/main/kotlin/org/jitsi/jicofo/xmpp/muc/ChatRoomMember.kt b/jicofo-common/src/main/kotlin/org/jitsi/jicofo/xmpp/muc/ChatRoomMember.kt index 36b5ec3f38..0dd156b2a5 100644 --- a/jicofo-common/src/main/kotlin/org/jitsi/jicofo/xmpp/muc/ChatRoomMember.kt +++ b/jicofo-common/src/main/kotlin/org/jitsi/jicofo/xmpp/muc/ChatRoomMember.kt @@ -66,6 +66,9 @@ interface ChatRoomMember { /** The statistics id if any. */ val statsId: String? + /** The supported video codecs if any */ + val videoCodecs: List? + /** * The list of features advertised as XMPP capabilities. Note that although the features are cached (XEP-0115), * the first time [features] is accessed it may block waiting for a disco#info response! diff --git a/jicofo-common/src/main/kotlin/org/jitsi/jicofo/xmpp/muc/ChatRoomMemberImpl.kt b/jicofo-common/src/main/kotlin/org/jitsi/jicofo/xmpp/muc/ChatRoomMemberImpl.kt index f998e8d5f8..b4c54c8427 100644 --- a/jicofo-common/src/main/kotlin/org/jitsi/jicofo/xmpp/muc/ChatRoomMemberImpl.kt +++ b/jicofo-common/src/main/kotlin/org/jitsi/jicofo/xmpp/muc/ChatRoomMemberImpl.kt @@ -27,6 +27,7 @@ import org.jitsi.utils.logging2.Logger import org.jitsi.utils.logging2.createChildLogger import org.jitsi.xmpp.extensions.jitsimeet.AudioMutedExtension import org.jitsi.xmpp.extensions.jitsimeet.FeaturesExtension +import org.jitsi.xmpp.extensions.jitsimeet.JitsiParticipantCodecList import org.jitsi.xmpp.extensions.jitsimeet.JitsiParticipantRegionPacketExtension import org.jitsi.xmpp.extensions.jitsimeet.StartMutedPacketExtension import org.jitsi.xmpp.extensions.jitsimeet.StatsId @@ -36,6 +37,7 @@ import org.jitsi.xmpp.extensions.jitsimeet.VideoMutedExtension import org.jivesoftware.smack.packet.Presence import org.jivesoftware.smack.packet.StandardExtensionElement import org.jivesoftware.smackx.caps.packet.CapsExtension +import org.json.simple.JSONArray import org.jxmpp.jid.EntityFullJid import org.jxmpp.jid.Jid @@ -68,6 +70,8 @@ class ChatRoomMemberImpl( private set override var statsId: String? = null private set + override var videoCodecs: List? = null + private set override var isAudioMuted = true private set override var isVideoMuted = true @@ -150,6 +154,7 @@ class ChatRoomMemberImpl( fun processPresence(presence: Presence) { require(presence.from == occupantJid) { "Ignoring presence for a different member: ${presence.from}" } + val firstPresence = (this.presence == null) this.presence = presence presence.getExtension(UserInfoPacketExt::class.java)?.let { val newStatus = it.isRobot @@ -185,12 +190,14 @@ class ChatRoomMemberImpl( isJibri = false } - val oldRole = role - chatRoom.getOccupant(this)?.let { role = fromSmack(it.role, it.affiliation) } - if ((role == MemberRole.VISITOR) != (oldRole == MemberRole.VISITOR)) { + var newRole: MemberRole = MemberRole.VISITOR + chatRoom.getOccupant(this)?.let { newRole = fromSmack(it.role, it.affiliation) } + if (!firstPresence && (role == MemberRole.VISITOR) != (newRole == MemberRole.VISITOR)) { // This will mess up various member counts // TODO: Should we try to update them, instead? - logger.warn("Member role changed from $oldRole to $role - not supported!") + logger.warn("Member role changed from $role to $newRole - not supported!") + } else { + role = newRole } isTranscriber = isJigasi && presence.getExtension(TranscriptionStatusExtension::class.java) != null @@ -209,6 +216,36 @@ class ChatRoomMemberImpl( presence.getExtension(StatsId::class.java)?.let { statsId = it.statsId } + + presence.getExtension(JitsiParticipantCodecList::class.java)?.let { + if (!firstPresence && it.codecs != videoCodecs) { + logger.warn("Video codec list changed from $videoCodecs to ${it.codecs} - not supported!") + } else { + if (!it.codecs.contains("vp8")) { + if (firstPresence) { + logger.warn("Video codec list {${it.codecs}} does not contain vp8! Adding manually.") + } + videoCodecs = it.codecs + "vp8" + } else { + videoCodecs = it.codecs + } + } + } ?: // Older clients sent a single codec in codecType rather than all supported ones in codecList + presence.getExtensionElement("jitsi_participant_codecType", "jabber:client")?.let { + if (it is StandardExtensionElement) { + val codec = it.text.lowercase() + val codecList = if (codec == "vp8") { + listOf(codec) + } else { + listOf(codec, "vp8") + } + if (!firstPresence && codecList != videoCodecs) { + logger.warn("Video codec list changed from $videoCodecs to $codecList - not supported!") + } else { + videoCodecs = codecList + } + } + } } /** @@ -240,6 +277,7 @@ class ChatRoomMemberImpl( this["is_jibri"] = isJibri this["is_jigasi"] = isJigasi this["role"] = role.toString() + this["video_codecs"] = JSONArray().apply { videoCodecs?.let { addAll(it) } } this["stats_id"] = statsId.toString() this["is_audio_muted"] = isAudioMuted this["is_video_muted"] = isVideoMuted diff --git a/jicofo/src/main/java/org/jitsi/jicofo/conference/JitsiMeetConferenceImpl.java b/jicofo/src/main/java/org/jitsi/jicofo/conference/JitsiMeetConferenceImpl.java index 3e6c200057..5f369d5290 100644 --- a/jicofo/src/main/java/org/jitsi/jicofo/conference/JitsiMeetConferenceImpl.java +++ b/jicofo/src/main/java/org/jitsi/jicofo/conference/JitsiMeetConferenceImpl.java @@ -133,6 +133,11 @@ public class JitsiMeetConferenceImpl return null; }); + /** + * The aggregated count of visitors' supported codecs + */ + private final PreferenceAggregator visitorCodecs; + /** * The {@link JibriRecorder} instance used to provide live streaming through * Jibri. @@ -290,6 +295,16 @@ public JitsiMeetConferenceImpl( TimeUnit.MILLISECONDS); + visitorCodecs = new PreferenceAggregator( + logger, + (codecs) -> { + setConferenceProperty( + ConferenceProperties.KEY_VISITOR_CODECS, + String.join(",", codecs) + ); + return null; + }); + logger.info("Created new conference."); } @@ -816,7 +831,7 @@ private void inviteChatMember(ChatRoomMember chatRoomMember, boolean justJoined) } else if (participant.getChatMember().getRole() == MemberRole.VISITOR) { - visitorAdded(); + visitorAdded(participant.getChatMember().getVideoCodecs()); } } @@ -1042,7 +1057,7 @@ private void terminateParticipant( } else if (removed.getChatMember().getRole() == MemberRole.VISITOR) { - visitorRemoved(); + visitorRemoved(removed.getChatMember().getVideoCodecs()); } } } @@ -1579,6 +1594,7 @@ else if (member.isJigasi()) } } o.put("visitor_count", visitorCount); + o.put("visitor_codecs", visitorCodecs.debugState()); o.put("participant_count", participantCount); o.put("jibri_count", jibriCount); o.put("jigasi_count", jigasiCount); @@ -2013,15 +2029,23 @@ private void rescheduleSingleParticipantTimeout() } /** Called when a new visitor has been added to the conference. */ - private void visitorAdded() + private void visitorAdded(List codecs) { visitorCount.adjustValue(+1); + if (codecs != null) + { + visitorCodecs.addPreference(codecs); + } } /** Called when a new visitor has been added to the conference. */ - private void visitorRemoved() + private void visitorRemoved(List codecs) { visitorCount.adjustValue(-1); + if (codecs != null) + { + visitorCodecs.removePreference(codecs); + } } /** diff --git a/jicofo/src/main/kotlin/org/jitsi/jicofo/util/PreferenceAggregator.kt b/jicofo/src/main/kotlin/org/jitsi/jicofo/util/PreferenceAggregator.kt new file mode 100644 index 0000000000..235c0f2b5d --- /dev/null +++ b/jicofo/src/main/kotlin/org/jitsi/jicofo/util/PreferenceAggregator.kt @@ -0,0 +1,150 @@ +/* + * Jicofo, the Jitsi Conference Focus. + * + * Copyright @ 2015-Present 8x8, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jitsi.jicofo.util + +import org.jitsi.utils.OrderedJsonObject +import org.jitsi.utils.logging2.Logger +import org.jitsi.utils.logging2.createChildLogger +import org.json.simple.JSONArray + +/** Aggregate lists of preferences coming from a large group of people, such that the resulting aggregated + * list consists of preference items supported by everyone, and in a rough consensus of preference order. + * + * The intended use case is maintaining the list of supported codecs for conference visitors. + * + * Preference orders are aggregated using the Borda count; this isn't theoretically optimal, but it should be + * good enough, and it's computationally cheap. + */ +class PreferenceAggregator( + parentLogger: Logger, + private val onChanged: (List) -> Unit +) { + private val logger = createChildLogger(parentLogger) + private val lock = Any() + + var aggregate: List = emptyList() + private set + + var count = 0 + private set + + private val values = mutableMapOf() + + /** + * Add a preference to the aggregator. + */ + fun addPreference(prefs: List) { + val distinctPrefs = prefs.distinct() + if (distinctPrefs != prefs) { + logger.warn("Preferences $prefs contains repeated values") + } + synchronized(lock) { + count++ + distinctPrefs.forEachIndexed { index, element -> + val info = values.computeIfAbsent(element) { ValueInfo() } + info.count++ + info.rankAggregate += index + } + updateAggregate() + } + } + + /** + * Remove a preference from the aggregator. + */ + fun removePreference(prefs: List) { + val distinctPrefs = prefs.distinct() + if (distinctPrefs != prefs) { + logger.warn("Preferences $prefs contains repeated values") + } + synchronized(lock) { + count-- + check(count >= 0) { + "Preference count $count should not be negative" + } + distinctPrefs.forEachIndexed { index, element -> + val info = values[element] + check(info != null) { + "Preference info for $element should exist when preferences are being removed" + } + info.count-- + check(info.count >= 0) { + "Preference count for $element ${info.count} should not be negative" + } + info.rankAggregate -= index + check(info.rankAggregate >= 0) { + "Preference rank aggregate for $element ${info.rankAggregate} should not be negative" + } + if (info.count == 0) { + check(info.rankAggregate == 0) { + "Preference rank aggregate for $element ${info.rankAggregate} should be zero " + + "when preference count is 0" + } + values.remove(element) + } + } + updateAggregate() + } + } + + fun reset() { + synchronized(lock) { + aggregate = emptyList() + count = 0 + values.clear() + } + } + + fun debugState() = OrderedJsonObject().apply { + synchronized(lock) { + put("count", count) + put( + "ranks", + OrderedJsonObject().apply { + this@PreferenceAggregator.values.asSequence() + .sortedBy { it.value.rankAggregate } + .forEach { put(it.key, it.value.debugState()) } + } + ) + put("aggregate", JSONArray().apply { addAll(aggregate) }) + } + } + + private fun updateAggregate() { + val newAggregate = values.asSequence() + .filter { it.value.count == count } + .sortedBy { it.value.rankAggregate } + .map { it.key } + .toList() + if (aggregate != newAggregate) { + aggregate = newAggregate + /* ?? Do we need to drop the lock before calling this? */ + onChanged(aggregate) + } + } + + private class ValueInfo { + var count = 0 + var rankAggregate = 0 + + fun debugState() = OrderedJsonObject().apply { + put("count", count) + put("rank_aggregate", rankAggregate) + } + } +} diff --git a/jicofo/src/main/kotlin/org/jitsi/jicofo/xmpp/Smack.kt b/jicofo/src/main/kotlin/org/jitsi/jicofo/xmpp/Smack.kt index b04a88fde9..fe7b36e8d9 100644 --- a/jicofo/src/main/kotlin/org/jitsi/jicofo/xmpp/Smack.kt +++ b/jicofo/src/main/kotlin/org/jitsi/jicofo/xmpp/Smack.kt @@ -36,6 +36,7 @@ import org.jitsi.xmpp.extensions.jitsimeet.ConferenceIqProvider import org.jitsi.xmpp.extensions.jitsimeet.FeatureExtension import org.jitsi.xmpp.extensions.jitsimeet.FeaturesExtension import org.jitsi.xmpp.extensions.jitsimeet.IceStatePacketExtension +import org.jitsi.xmpp.extensions.jitsimeet.JitsiParticipantCodecList import org.jitsi.xmpp.extensions.jitsimeet.JitsiParticipantRegionPacketExtension import org.jitsi.xmpp.extensions.jitsimeet.JsonMessageExtension import org.jitsi.xmpp.extensions.jitsimeet.LoginUrlIqProvider @@ -119,6 +120,11 @@ fun registerXmppExtensions() { StatsId.NAMESPACE, StatsId.Provider() ) + ProviderManager.addExtensionProvider( + JitsiParticipantCodecList.ELEMENT, + JitsiParticipantCodecList.NAMESPACE, + DefaultPacketExtensionProvider(JitsiParticipantCodecList::class.java) + ) // Add the extensions used for handling the inviting of transcriber ProviderManager.addExtensionProvider( diff --git a/jicofo/src/test/kotlin/org/jitsi/jicofo/jibri/JibriDetectorTest.kt b/jicofo/src/test/kotlin/org/jitsi/jicofo/jibri/JibriDetectorTest.kt index 8758a5fc97..820e4f0550 100644 --- a/jicofo/src/test/kotlin/org/jitsi/jicofo/jibri/JibriDetectorTest.kt +++ b/jicofo/src/test/kotlin/org/jitsi/jicofo/jibri/JibriDetectorTest.kt @@ -144,6 +144,7 @@ class JibriChatRoomMember( override val isVideoMuted: Boolean get() = TODO("Not yet implemented") override val region: String? get() = TODO("Not yet implemented") override val statsId: String? get() = TODO("Not yet implemented") + override val videoCodecs: List? get() = TODO("Not yet implemented") override val features: Set get() = TODO("Not yet implemented") override val debugState: OrderedJsonObject get() = TODO("Not yet implemented") diff --git a/jicofo/src/test/kotlin/org/jitsi/jicofo/util/PreferenceAggregatorTest.kt b/jicofo/src/test/kotlin/org/jitsi/jicofo/util/PreferenceAggregatorTest.kt new file mode 100644 index 0000000000..acce3bae7d --- /dev/null +++ b/jicofo/src/test/kotlin/org/jitsi/jicofo/util/PreferenceAggregatorTest.kt @@ -0,0 +1,109 @@ +package org.jitsi.jicofo.util + +import io.kotest.core.spec.IsolationMode +import io.kotest.core.spec.style.ShouldSpec +import io.kotest.matchers.collections.shouldContainExactly +import io.kotest.matchers.collections.shouldContainExactlyInAnyOrder +import io.kotest.matchers.shouldBe +import org.jitsi.utils.logging2.createLogger + +class PreferenceAggregatorTest : ShouldSpec() { + override fun isolationMode() = IsolationMode.InstancePerLeaf + + private val logger = createLogger() + + private val calledWith = mutableListOf>() + private val aggregator = PreferenceAggregator(logger) { + calledWith.add(it) + } + + init { + context("An aggregator with no values added") { + should("Not call its callback") { + calledWith shouldBe emptyList() + } + } + context("An aggregator called once") { + aggregator.addPreference(listOf("vp9", "vp8", "h264")) + should("Call the callback exactly once with that set of values") { + calledWith shouldContainExactly listOf(listOf("vp9", "vp8", "h264")) + } + } + context("An aggregator called twice with the same values") { + aggregator.addPreference(listOf("vp9", "vp8", "h264")) + aggregator.addPreference(listOf("vp9", "vp8", "h264")) + should("Call the callback exactly once") { + calledWith shouldContainExactly listOf(listOf("vp9", "vp8", "h264")) + } + } + context("An aggregator with all preferences removed") { + aggregator.addPreference(listOf("vp9", "vp8", "h264")) + aggregator.removePreference(listOf("vp9", "vp8", "h264")) + should("Have its final output be the empty set") { + calledWith.last() shouldBe emptyList() + } + } + context("Aggregating preferences with disparate values (subset)") { + aggregator.addPreference(listOf("vp9", "vp8", "h264")) + aggregator.addPreference(listOf("vp8", "h264")) + should("Output the minimal agreed set") { + calledWith.last().shouldContainExactly("vp8", "h264") + } + } + context("Aggregating preferences with disparate values (non-subset)") { + aggregator.addPreference(listOf("vp9", "vp8", "h264")) + aggregator.addPreference(listOf("vp8", "h264")) + aggregator.addPreference(listOf("vp9", "vp8")) + should("Output the minimal agreed set") { + calledWith.last().shouldContainExactly("vp8") + } + } + context("Aggregating a new superset") { + aggregator.addPreference(listOf("vp9", "vp8", "h264")) + aggregator.addPreference(listOf("av1", "vp9", "vp8", "h264")) + should("Not call the callback a second time") { + calledWith shouldContainExactly listOf(listOf("vp9", "vp8", "h264")) + } + } + context("Removing the only preference that does not support a value") { + aggregator.addPreference(listOf("vp9", "vp8", "h264")) + aggregator.addPreference(listOf("vp8", "h264")) + aggregator.addPreference(listOf("vp9", "vp8")) + + aggregator.removePreference(listOf("vp8", "h264")) + should("Return that value to the set of preferences") { + calledWith.last().shouldContainExactly(listOf("vp9", "vp8")) + } + } + context("Preferences that express different orders") { + aggregator.addPreference(listOf("vp9", "vp8", "h264")) + aggregator.addPreference(listOf("vp9", "vp8", "h264")) + aggregator.addPreference(listOf("vp9", "h264", "vp8")) + + should("Reflect the majority preference") { + calledWith shouldContainExactly listOf(listOf("vp9", "vp8", "h264")) + } + } + context("Ties in preference order") { + aggregator.addPreference(listOf("vp9", "vp8", "h264")) + aggregator.addPreference(listOf("vp9", "h264", "vp8")) + + should("Result in the correct set, in some order, with consensus where it exists") { + calledWith.last().shouldContainExactlyInAnyOrder("h264", "vp9", "vp8") + calledWith.last().first() shouldBe "vp9" + } + } + context("Repeated values in preferences") { + aggregator.addPreference(listOf("vp9", "vp8")) + aggregator.addPreference(listOf("vp9", "vp8", "vp9")) + should("not confuse things") { + calledWith shouldContainExactly listOf(listOf("vp9", "vp8")) + } + aggregator.removePreference(listOf("vp9", "vp8", "vp9")) + aggregator.removePreference(listOf("vp9", "vp8")) + should("not confuse things on removal") { + calledWith.last().shouldContainExactly(emptyList()) + } + } + } +} diff --git a/pom.xml b/pom.xml index a48921b030..51141dfc81 100644 --- a/pom.xml +++ b/pom.xml @@ -185,7 +185,7 @@ ${project.groupId} jitsi-xmpp-extensions - 1.0-78-g62d03d4 + 1.0-79-gdc9285e org.slf4j