diff --git a/jicofo-selector/src/main/kotlin/org/jitsi/jicofo/bridge/BridgeSelector.kt b/jicofo-selector/src/main/kotlin/org/jitsi/jicofo/bridge/BridgeSelector.kt index 64e151c6b0..638d793980 100644 --- a/jicofo-selector/src/main/kotlin/org/jitsi/jicofo/bridge/BridgeSelector.kt +++ b/jicofo-selector/src/main/kotlin/org/jitsi/jicofo/bridge/BridgeSelector.kt @@ -27,6 +27,7 @@ import org.jitsi.xmpp.extensions.colibri.ColibriStatsExtension import org.json.simple.JSONObject import org.jxmpp.jid.Jid import java.time.Clock +import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.Executors /** @@ -50,17 +51,16 @@ class BridgeSelector @JvmOverloads constructor( fun addHandler(eventHandler: EventHandler) = eventEmitter.addHandler(eventHandler) fun removeHandler(eventHandler: EventHandler) = eventEmitter.removeHandler(eventHandler) - /** - * The bridge selection strategy. - */ + /** The bridge selection strategy. */ private val bridgeSelectionStrategy = BridgeConfig.config.selectionStrategy.also { logger.info("Using ${it.javaClass.name}") } - /** - * The map of bridge JID to Bridge. - */ - private val bridges: MutableMap = mutableMapOf() + /** The map of bridge JID to Bridge. */ + private val bridges: MutableMap = ConcurrentHashMap() + + /** Get the [Bridge] with a specific JID or null */ + fun get(jid: Jid) = bridges[jid] init { JicofoMetricsContainer.instance.metricsUpdater.addUpdateTask { updateMetrics() } diff --git a/jicofo-selector/src/main/kotlin/org/jitsi/jicofo/bridge/colibri/ColibriSessionManager.kt b/jicofo-selector/src/main/kotlin/org/jitsi/jicofo/bridge/colibri/ColibriSessionManager.kt index 802d2ec3b0..b30588c2cb 100644 --- a/jicofo-selector/src/main/kotlin/org/jitsi/jicofo/bridge/colibri/ColibriSessionManager.kt +++ b/jicofo-selector/src/main/kotlin/org/jitsi/jicofo/bridge/colibri/ColibriSessionManager.kt @@ -18,6 +18,7 @@ package org.jitsi.jicofo.bridge.colibri import org.jitsi.jicofo.bridge.Bridge +import org.jitsi.jicofo.bridge.ConferenceBridgeProperties import org.jitsi.jicofo.conference.source.EndpointSourceSet import org.jitsi.utils.MediaType import org.jitsi.utils.OrderedJsonObject @@ -41,8 +42,12 @@ interface ColibriSessionManager { val bridgeCount: Int val bridgeRegions: Set + /** Get the list of participant IDs that are currently allocated on a specific [Bridge]. */ + fun getParticipants(bridge: Bridge): List + @Throws(ColibriAllocationFailedException::class, BridgeSelectionFailedException::class) fun allocate(participant: ParticipantAllocationParameters): ColibriAllocation + fun getBridges(): Map fun updateParticipant( participantId: String, diff --git a/jicofo-selector/src/main/kotlin/org/jitsi/jicofo/bridge/colibri/ColibriV2SessionManager.kt b/jicofo-selector/src/main/kotlin/org/jitsi/jicofo/bridge/colibri/ColibriV2SessionManager.kt index e8fb0ce0c1..c4df29f694 100644 --- a/jicofo-selector/src/main/kotlin/org/jitsi/jicofo/bridge/colibri/ColibriV2SessionManager.kt +++ b/jicofo-selector/src/main/kotlin/org/jitsi/jicofo/bridge/colibri/ColibriV2SessionManager.kt @@ -82,7 +82,7 @@ class ColibriV2SessionManager( } /** - * The colibri2 sessions that are currently active, mapped by the [Bridge] that they use. + * The colibri2 sessions that are currently active, mapped by the relayId of the [Bridge] that they use. */ override val sessions = mutableMapOf() @@ -92,15 +92,18 @@ class ColibriV2SessionManager( */ private val participants = mutableMapOf() + override fun getParticipants(bridge: Bridge): List = synchronized(syncRoot) { + val session = sessions[bridge.relayId] ?: return emptyList() + return getSessionParticipants(session).map { it.id } + } + /** * Maintains the same set as [participants], but organized by their session. Needs to be kept in sync with * [participants] (see [add], [remove], [clear]). */ private val participantsBySession = mutableMapOf>() - /** - * Protects access to [sessions], [participants] and [participantsBySession]. - */ + /** Protects access to [sessions], [participants] and [participantsBySession]. */ private val syncRoot = Any() /** @@ -243,7 +246,7 @@ class ColibriV2SessionManager( } /** Get the bridge-to-bridge-properties map needed for bridge selection. */ - private fun getBridges(): Map = synchronized(syncRoot) { + override fun getBridges(): Map = synchronized(syncRoot) { return participantsBySession.entries .filter { it.key.bridge.isOperational } .associate { diff --git a/jicofo-selector/src/main/resources/reference.conf b/jicofo-selector/src/main/resources/reference.conf index 230c81e600..b470415bc8 100644 --- a/jicofo-selector/src/main/resources/reference.conf +++ b/jicofo-selector/src/main/resources/reference.conf @@ -338,6 +338,10 @@ jicofo { // Enable the conference-request endpoint enabled = true } + move-endpoints { + // Enable the move-endpoint API. + enabled = true + } } sctp { diff --git a/jicofo/src/main/java/org/jitsi/jicofo/conference/JitsiMeetConference.java b/jicofo/src/main/java/org/jitsi/jicofo/conference/JitsiMeetConference.java index 5a3b8844eb..9007f5bb03 100644 --- a/jicofo/src/main/java/org/jitsi/jicofo/conference/JitsiMeetConference.java +++ b/jicofo/src/main/java/org/jitsi/jicofo/conference/JitsiMeetConference.java @@ -18,6 +18,7 @@ package org.jitsi.jicofo.conference; import org.jetbrains.annotations.*; +import org.jitsi.jicofo.bridge.*; import org.jitsi.jicofo.jibri.*; import org.jitsi.jicofo.xmpp.*; import org.jitsi.jicofo.xmpp.muc.*; @@ -150,4 +151,20 @@ MuteResult handleMuteRequest( /** Get the stats for this conference that should be exported to rtcstats. */ @NotNull OrderedJsonObject getRtcstatsState(); + + /** Move (reinvite) an endpoint in this conference. Return true if the endpoint was moved. */ + boolean moveEndpoint(@NotNull String endpointId, Bridge bridge); + + /** + * Move (reinvite) a specific number of endpoints from the conference from a specific bridge. The implementation + * decides which endpoints to move. + * + * @param bridge the bridge from which to move endpoints. + * @param numEps the number of endpoints to move. + * @return the number of endpoints moved. + */ + int moveEndpoints(@NotNull Bridge bridge, int numEps); + + /** Get information about the bridges currently used by this conference. */ + Map getBridges(); } diff --git a/jicofo/src/main/java/org/jitsi/jicofo/conference/JitsiMeetConferenceImpl.java b/jicofo/src/main/java/org/jitsi/jicofo/conference/JitsiMeetConferenceImpl.java index 5f369d5290..3b5139da8d 100644 --- a/jicofo/src/main/java/org/jitsi/jicofo/conference/JitsiMeetConferenceImpl.java +++ b/jicofo/src/main/java/org/jitsi/jicofo/conference/JitsiMeetConferenceImpl.java @@ -1686,6 +1686,56 @@ public long getVisitorCount() } } + public Map getBridges() + { + ColibriSessionManager colibriSessionManager = this.colibriSessionManager; + if (colibriSessionManager == null) + { + return Collections.emptyMap(); + } + return colibriSessionManager.getBridges(); + } + + @Override + public boolean moveEndpoint(@NotNull String endpointId, Bridge bridge) + { + if (bridge != null) + { + List bridgeParticipants = colibriSessionManager.getParticipants(bridge); + if (!bridgeParticipants.contains(endpointId)) + { + logger.warn("Endpoint " + endpointId + " is not connected to bridge " + bridge.getJid()); + return false; + } + } + ColibriSessionManager colibriSessionManager = this.colibriSessionManager; + if (colibriSessionManager == null) + { + return false; + } + + colibriSessionManager.removeParticipant(endpointId); + return reInviteParticipantsById(Collections.singletonList(endpointId)) == 1; + } + + @Override + public int moveEndpoints(@NotNull Bridge bridge, int numEps) + { + logger.info("Moving " + numEps + " endpoints from " + bridge.getJid()); + ColibriSessionManager colibriSessionManager = this.colibriSessionManager; + if (colibriSessionManager == null) + { + return 0; + } + List participantIds + = colibriSessionManager.getParticipants(bridge).stream().limit(numEps).collect(Collectors.toList()); + for (String participantId : participantIds) + { + colibriSessionManager.removeParticipant(participantId); + } + return reInviteParticipantsById(participantIds); + } + /** * Checks whether a request for a new endpoint to join this conference should be redirected to a visitor node. * @return the name of the visitor node if it should be redirected, and null otherwise. @@ -1887,33 +1937,41 @@ private void onBridgeUp(Jid bridgeJid) } } - private void reInviteParticipantsById(@NotNull List participantIdsToReinvite) + private int reInviteParticipantsById(@NotNull List participantIdsToReinvite) { - reInviteParticipantsById(participantIdsToReinvite, true); + return reInviteParticipantsById(participantIdsToReinvite, true); } - private void reInviteParticipantsById(@NotNull List participantIdsToReinvite, boolean updateParticipant) + private int reInviteParticipantsById(@NotNull List participantIdsToReinvite, boolean updateParticipant) { - if (!participantIdsToReinvite.isEmpty()) + int n = participantIdsToReinvite.size(); + if (n == 0) { - ConferenceMetrics.participantsMoved.addAndGet(participantIdsToReinvite.size()); - synchronized (participantLock) + return 0; + } + + List participantsToReinvite = new ArrayList<>(); + synchronized (participantLock) + { + for (Participant participant : participants.values()) { - List participantsToReinvite = new ArrayList<>(); - for (Participant participant : participants.values()) + if (participantsToReinvite.size() == n) { - if (participantIdsToReinvite.contains(participant.getEndpointId())) - { - participantsToReinvite.add(participant); - } + break; } - if (participantsToReinvite.size() != participantIdsToReinvite.size()) + if (participantIdsToReinvite.contains(participant.getEndpointId())) { - logger.error("Can not re-invite all participants, no Participant object for some of them."); + participantsToReinvite.add(participant); } - reInviteParticipants(participantsToReinvite, updateParticipant); } + if (participantsToReinvite.size() != participantIdsToReinvite.size()) + { + logger.error("Can not re-invite all participants, no Participant object for some of them."); + } + reInviteParticipants(participantsToReinvite, updateParticipant); } + ConferenceMetrics.participantsMoved.addAndGet(participantsToReinvite.size()); + return participantsToReinvite.size(); } /** diff --git a/jicofo/src/main/kotlin/org/jitsi/jicofo/JicofoServices.kt b/jicofo/src/main/kotlin/org/jitsi/jicofo/JicofoServices.kt index 1510b2530e..1721010fbc 100644 --- a/jicofo/src/main/kotlin/org/jitsi/jicofo/JicofoServices.kt +++ b/jicofo/src/main/kotlin/org/jitsi/jicofo/JicofoServices.kt @@ -39,6 +39,8 @@ import org.jitsi.jicofo.metrics.JicofoMetricsContainer import org.jitsi.jicofo.rest.Application import org.jitsi.jicofo.rest.ConferenceRequest import org.jitsi.jicofo.rest.RestConfig +import org.jitsi.jicofo.rest.move.MoveEndpoints +import org.jitsi.jicofo.rest.move.MoveEndpointsConfig import org.jitsi.jicofo.util.SynchronizedDelegate import org.jitsi.jicofo.version.CurrentVersionImpl import org.jitsi.jicofo.xmpp.XmppServices @@ -147,8 +149,7 @@ class JicofoServices { jettyServer = if (RestConfig.config.enabled) { logger.info("Starting HTTP server with config: ${RestConfig.config.httpServerConfig}.") val restApp = Application( - buildList - { + buildList { healthChecker?.let { add(org.jitsi.rest.Health(it)) } @@ -159,6 +160,9 @@ class JicofoServices { if (RestConfig.config.enablePrometheus) { add(Prometheus(JicofoMetricsContainer.instance)) } + if (MoveEndpointsConfig.enabled) { + add(MoveEndpoints(focusManager, bridgeSelector)) + } } ) createServer(RestConfig.config.httpServerConfig).also { diff --git a/jicofo/src/main/kotlin/org/jitsi/jicofo/rest/ConferenceRequest.kt b/jicofo/src/main/kotlin/org/jitsi/jicofo/rest/ConferenceRequest.kt index f3bf497560..6f0f497052 100644 --- a/jicofo/src/main/kotlin/org/jitsi/jicofo/rest/ConferenceRequest.kt +++ b/jicofo/src/main/kotlin/org/jitsi/jicofo/rest/ConferenceRequest.kt @@ -82,6 +82,6 @@ class ConferenceRequest( * in the constructor below) is necessary. This class exists in order to expose * that behavior in a more concise way */ -private class BadRequestExceptionWithMessage(message: String?) : BadRequestException( +class BadRequestExceptionWithMessage(message: String?) : BadRequestException( Response.status(HttpServletResponse.SC_BAD_REQUEST, message).build() ) diff --git a/jicofo/src/main/kotlin/org/jitsi/jicofo/rest/move/MoveEndpoints.kt b/jicofo/src/main/kotlin/org/jitsi/jicofo/rest/move/MoveEndpoints.kt new file mode 100644 index 0000000000..8842a995ed --- /dev/null +++ b/jicofo/src/main/kotlin/org/jitsi/jicofo/rest/move/MoveEndpoints.kt @@ -0,0 +1,248 @@ +/* + * Jicofo, the Jitsi Conference Focus. + * + * Copyright @ 2024 - present 8x8, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jitsi.jicofo.rest.move + +import jakarta.servlet.http.HttpServletResponse +import jakarta.ws.rs.DefaultValue +import jakarta.ws.rs.GET +import jakarta.ws.rs.NotFoundException +import jakarta.ws.rs.Path +import jakarta.ws.rs.Produces +import jakarta.ws.rs.QueryParam +import jakarta.ws.rs.core.MediaType +import jakarta.ws.rs.core.Response +import org.jitsi.config.JitsiConfig +import org.jitsi.jicofo.ConferenceStore +import org.jitsi.jicofo.bridge.Bridge +import org.jitsi.jicofo.bridge.BridgeConfig +import org.jitsi.jicofo.bridge.BridgeSelector +import org.jitsi.jicofo.conference.JitsiMeetConference +import org.jitsi.jicofo.rest.BadRequestExceptionWithMessage +import org.jitsi.metaconfig.config +import org.jitsi.utils.logging2.createLogger +import org.jxmpp.jid.impl.JidCreate +import kotlin.math.min +import kotlin.math.roundToInt + +/** + * An API for moving (i.e. re-inviting) endpoints. The main goal is to facilitate reducing the load on a bridge. Note + * that when re-inviting the normal bridge selection logic is used again, so it's possible that the same bridge is + * selected (unless it's unhealthy/draining or overloaded and there are less loaded bridges). + */ +@Path("/move-endpoints") +class MoveEndpoints( + val conferenceStore: ConferenceStore, + val bridgeSelector: BridgeSelector +) { + val logger = createLogger() + + /** + * Move a specific endpoint in a specific conference. + */ + @Path("move-endpoint") + @GET + @Produces(MediaType.APPLICATION_JSON) + fun moveEndpoint( + /** + * Conference JID, e.g room@conference.example.com. This is a required parameter, but without a @DefaultValue + * jetty returns a 500 and prints a stack trace. + */ + @QueryParam("conference") @DefaultValue("") conferenceId: String, + /** + * Endpoint ID, e.g. abcdefgh. This is a required parameter, but without a @DefaultValue jetty returns a 500 + * and prints a stack trace. + */ + @QueryParam("endpoint") @DefaultValue("") endpointId: String, + /** + * Optional bridge JID. If specified, the endpoint will only be moved it if is indeed connected to this bridge. + */ + @QueryParam("bridge") @DefaultValue("") bridgeId: String + ): Result { + if (conferenceId.isEmpty()) throw BadRequestExceptionWithMessage("Conference ID is missing") + if (endpointId.isEmpty()) throw BadRequestExceptionWithMessage("Endpoint ID is missing") + val bridge = if (bridgeId.isEmpty()) null else getBridge(bridgeId) + val conference = getConference(conferenceId) + + logger.info("Moving conference=$conferenceId endpoint=$endpointId bridge=$bridgeId") + return if (conference.moveEndpoint(endpointId, bridge)) { + logger.info("Moved successfully") + Result(1, 1) + } else { + logger.info("Failed to move") + Result(0, 0) + } + } + + /** + * Moves a specific number E of endpoints from a specific bridge B. If a conference is specified, only endpoints in + * that conference are moved. Otherwise, all conferences are ordered by the number of endpoints on B, and endpoints + * from large conferences are removed until E is reached. + * + * If a conference is specified, the endpoints are selected randomly from it. Otherwise, the endpoints are selected + * by ordering the list of conferences that use the bridge by the number of endpoints on this bridge. Then we select + * greedily from the list until we've selected the desired count. Note that this may need to be adjusted if it leads + * to thundering horde issues (though the recentlyAddedEndpointCount correction should prevent them). + */ + @Path("move-endpoints") + @GET + @Produces(MediaType.APPLICATION_JSON) + fun moveEndpoints( + /** + * Bridge JID, e.g. jvbbrewery@muc.jvb.example.com/jvb1. This is a required parameter, but without a + * @DefaultValue jetty returns a 500 and prints a stack trace. + */ + @QueryParam("bridge") @DefaultValue("") bridgeId: String, + /** + * Optional conference JID, e.g room@conference.example.com. If specified only endpoints from this conference + * will be moved. + */ + @QueryParam("conference") @DefaultValue("") conferenceId: String, + /** Number of endpoints to move. */ + @QueryParam("endpoints") @DefaultValue("1") numEndpoints: Int + ): Result { + if (bridgeId.isEmpty()) throw BadRequestExceptionWithMessage("Bridge JID is missing") + val bridge = getBridge(bridgeId) + val conference = if (conferenceId.isEmpty()) null else getConference(conferenceId) + val bridgeConferences = if (conference == null) { + bridge.getConferences() + } else { + bridge.getConferences().filter { it.first == conference } + } + logger.info("Moving $numEndpoints from bridge=${bridge.jid} (conference=$conference)") + val endpointsToMove = bridgeConferences.select(numEndpoints) + return doMove(bridge, endpointsToMove) + } + + /** + * Move a specific fraction of the endpoints from a specific bridge. + * + * The endpoints to move are selected by ordering the list of conferences that use the bridge by the number of + * endpoints on this bridge. Then we select greedily from the list until we've selected the desired count. Note + * that this may need to be adjusted if it leads to thundering horde issues (though the recentlyAddedEndpointCount + * correction should prevent them). + */ + @Path("move-fraction") + @GET + @Produces(MediaType.APPLICATION_JSON) + fun moveFraction( + /** + * Bridge JID, e.g. jvbbrewery@muc.jvb.example.com/jvb1. This is a required parameter, but without a + * @DefaultValue jetty returns a 500 and prints a stack trace. + */ + @QueryParam("bridge") @DefaultValue("") bridgeId: String, + /** The fraction of endpoints to move. Defaults to 10% */ + @QueryParam("fraction") @DefaultValue("0.1") fraction: Double + ): Result { + if (bridgeId.isEmpty()) throw BadRequestExceptionWithMessage("Bridge JID is missing") + val bridge = getBridge(bridgeId) + val bridgeConferences = bridge.getConferences() + val totalEndpoints = bridgeConferences.sumOf { it.second } + val numEndpoints = (fraction * totalEndpoints).roundToInt() + logger.info("Moving $fraction of endpoints from bridge=$bridge ($numEndpoints out of $totalEndpoints)") + val endpointsToMove = bridgeConferences.select(numEndpoints) + return doMove(bridge, endpointsToMove) + } + + private fun doMove(bridge: Bridge, endpointsToMove: Map): Result { + logger.info("Moving endpoints from bridge ${bridge.jid}: $endpointsToMove") + var movedEndpoints = 0 + var conferences = 0 + endpointsToMove.forEach { (conference, numEps) -> + val moved = conference.moveEndpoints(bridge, numEps) + movedEndpoints += moved + if (moved > 0) conferences++ + } + logger.info("Moved $movedEndpoints endpoints from $conferences conferences.") + return Result(movedEndpoints, conferences) + } + + private fun getBridge(bridge: String): Bridge { + val bridgeJid = try { + JidCreate.from(bridge) + } catch (e: Exception) { + throw BadRequestExceptionWithMessage("Invalid bridge ID") + } + + bridgeSelector.get(bridgeJid)?.let { return it } + + val bridgeFullJid = try { + JidCreate.from("${BridgeConfig.config.breweryJid}/$bridge") + } catch (e: Exception) { + throw BadRequestExceptionWithMessage("Invalid bridge ID") + } + return bridgeSelector.get(bridgeFullJid) ?: throw NotFoundExceptionWithMessage("Bridge not found") + } + + private fun getConference(conferenceId: String): JitsiMeetConference { + val conferenceJid = try { + JidCreate.entityBareFrom(conferenceId) + } catch (e: Exception) { + throw BadRequestExceptionWithMessage("Invalid conference ID") + } + return conferenceStore.getConference(conferenceJid) + ?: throw NotFoundExceptionWithMessage("Conference not found") + } + + private fun Bridge.getConferences() = conferenceStore.getAllConferences().mapNotNull { conference -> + conference.bridges[this]?.participantCount?.let { Pair(conference, it) } + }.sortedByDescending { it.second } +} + +data class Result( + val movedEndpoints: Int, + val conferences: Int +) + +class MoveEndpointsConfig { + companion object { + val enabled: Boolean by config { + "jicofo.rest.move-endpoints.enabled".from(JitsiConfig.newConfig) + } + } +} + +/** + * Select endpoints to move, e.g. with a map m={a: 1, b: 3, c: 3}: + * select(m, 1) should return {a: 1} + * select(m, 2) should return {a: 1, b: 1} + * select(m, 3) should return {a: 1, b: 2} + * select(m, 6) should return {a: 1, b: 3, c: 2} + * select(m, 100) should return {a: 1, b: 3, c: 3} + * + * That is, it selects greedily in the order of the list. + */ +private fun List>.select(n: Int): Map { + var moved = 0 + return buildMap { + this@select.forEach { + if (moved >= n) { + return@forEach + } + val m = min(it.second, n - moved) + moved += m + put(it.first, m) + } + } +} + +/** + * The [NotFoundException(String message)] constructor doesn't actually include the message in the response. + */ +class NotFoundExceptionWithMessage(message: String?) : NotFoundException( + Response.status(HttpServletResponse.SC_NOT_FOUND, message).build() +)