Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions chat-android/src/main/java/com/ably/chat/ChatClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ internal class DefaultChatClient(
chatApi = chatApi,
clientOptions = clientOptions,
clientId = clientId,
logger = logger,
)

override val connection: Connection
Expand Down
20 changes: 3 additions & 17 deletions chat-android/src/main/java/com/ably/chat/Presence.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@ package com.ably.chat
import com.google.gson.JsonElement
import com.google.gson.JsonObject
import io.ably.lib.realtime.Channel
import io.ably.lib.realtime.Presence.GET_CLIENTID
import io.ably.lib.realtime.Presence.GET_CONNECTIONID
import io.ably.lib.realtime.Presence.GET_WAITFORSYNC
import io.ably.lib.types.Param
import io.ably.lib.types.PresenceMessage
import io.ably.lib.realtime.Presence as PubSubPresence
import io.ably.lib.realtime.Presence.PresenceListener as PubSubPresenceListener
Expand Down Expand Up @@ -143,9 +139,8 @@ internal class DefaultPresence(
private val presence: PubSubPresence,
) : Presence {

suspend fun get(params: List<Param>): List<PresenceMember> {
val usersOnPresence = presence.getCoroutine(params)
return usersOnPresence.map { user ->
override suspend fun get(waitForSync: Boolean, clientId: String?, connectionId: String?): List<PresenceMember> {
return presence.getCoroutine(waitForSync, clientId, connectionId).map { user ->
PresenceMember(
clientId = user.clientId,
action = user.action,
Expand All @@ -155,16 +150,7 @@ internal class DefaultPresence(
}
}

override suspend fun get(waitForSync: Boolean, clientId: String?, connectionId: String?): List<PresenceMember> {
val params = buildList {
if (waitForSync) add(Param(GET_WAITFORSYNC, true))
clientId?.let { add(Param(GET_CLIENTID, it)) }
connectionId?.let { add(Param(GET_CONNECTIONID, it)) }
}
return get(params)
}

override suspend fun isUserPresent(clientId: String): Boolean = presence.getCoroutine(Param(GET_CLIENTID, clientId)).isNotEmpty()
override suspend fun isUserPresent(clientId: String): Boolean = presence.getCoroutine(clientId = clientId).isNotEmpty()

override suspend fun enter(data: PresenceData?) {
presence.enterClientCoroutine(clientId, wrapInUserCustomData(data))
Expand Down
23 changes: 16 additions & 7 deletions chat-android/src/main/java/com/ably/chat/Room.kt
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,10 @@ interface Room {
internal class DefaultRoom(
override val roomId: String,
override val options: RoomOptions,
val realtimeClient: RealtimeClient,
private val realtimeClient: RealtimeClient,
chatApi: ChatApi,
clientId: String,
private val logger: Logger,
) : Room {

private val _messages = DefaultMessages(
Expand All @@ -98,7 +99,19 @@ internal class DefaultRoom(
chatApi = chatApi,
)

override val messages: Messages = _messages
private val _typing: DefaultTyping = DefaultTyping(
roomId = roomId,
realtimeClient = realtimeClient,
options = options.typing,
clientId = clientId,
logger = logger.withContext(tag = "Typing"),
)

override val messages: Messages
get() = _messages

override val typing: Typing
get() = _typing

override val presence: Presence = DefaultPresence(
channel = messages.channel,
Expand All @@ -112,11 +125,6 @@ internal class DefaultRoom(
realtimeChannels = realtimeClient.channels,
)

override val typing: Typing = DefaultTyping(
roomId = roomId,
realtimeClient = realtimeClient,
)

override val occupancy: Occupancy = DefaultOccupancy(
messages = messages,
)
Expand All @@ -140,5 +148,6 @@ internal class DefaultRoom(

fun release() {
_messages.release()
_typing.release()
}
}
2 changes: 2 additions & 0 deletions chat-android/src/main/java/com/ably/chat/Rooms.kt
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ internal class DefaultRooms(
private val chatApi: ChatApi,
override val clientOptions: ClientOptions,
private val clientId: String,
private val logger: Logger,
) : Rooms {
private val roomIdToRoom: MutableMap<String, DefaultRoom> = mutableMapOf()

Expand All @@ -59,6 +60,7 @@ internal class DefaultRooms(
realtimeClient = realtimeClient,
chatApi = chatApi,
clientId = clientId,
logger = logger,
)
}

Expand Down
132 changes: 123 additions & 9 deletions chat-android/src/main/java/com/ably/chat/Typing.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,30 @@
package com.ably.chat

import io.ably.lib.realtime.Channel
import io.ably.lib.types.AblyException
import io.ably.lib.types.ErrorInfo
import java.util.concurrent.CopyOnWriteArrayList
import kotlin.math.min
import kotlin.math.pow
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch

/**
* base retry interval, we double it each time
*/
const val PRESENCE_GET_RETRY_INTERVAL_MS = 1500
const val PRESENCE_GET_RETRY_INTERVAL_MS: Long = 1500

/**
* max retry interval
*/
const val PRESENCE_GET_RETRY_MAX_INTERVAL_MS = 30_000
const val PRESENCE_GET_RETRY_MAX_INTERVAL_MS: Long = 30_000

/**
* max num of retries
Expand Down Expand Up @@ -77,30 +91,130 @@ data class TypingEvent(val currentlyTyping: Set<String>)

internal class DefaultTyping(
roomId: String,
private val realtimeClient: RealtimeClient,
realtimeClient: RealtimeClient,
private val clientId: String,
private val options: TypingOptions?,
private val logger: Logger,
) : Typing {
private val typingIndicatorsChannelName = "$roomId::\$chat::\$typingIndicators"

override val channel: Channel
get() = realtimeClient.channels.get(typingIndicatorsChannelName, ChatChannelOptions())
private val typingScope = CoroutineScope(Dispatchers.Default.limitedParallelism(1) + SupervisorJob())

private val eventBus = MutableSharedFlow<Unit>(
extraBufferCapacity = 1,
onBufferOverflow = BufferOverflow.DROP_OLDEST,
)

override val channel: Channel = realtimeClient.channels.get(typingIndicatorsChannelName, ChatChannelOptions())

private var typingJob: Job? = null

private val listeners: MutableList<Typing.Listener> = CopyOnWriteArrayList()

private var lastTyping: Set<String> = setOf()

init {
typingScope.launch {
eventBus.collect {
processEvent()
}
}

channel.presence.subscribe {
if (it.clientId == null) {
logger.error("unable to handle typing event; no clientId", staticContext = mapOf("member" to it.toString()))
} else {
eventBus.tryEmit(Unit)
}
}
}

override fun subscribe(listener: Typing.Listener): Subscription {
TODO("Not yet implemented")
logger.trace("DefaultTyping.subscribe()")
listeners.add(listener)
return Subscription {
logger.trace("DefaultTyping.unsubscribe()")
listeners.remove(listener)
}
}

override suspend fun get(): Set<String> {
TODO("Not yet implemented")
logger.trace("DefaultTyping.get()")
return channel.presence.getCoroutine().map { it.clientId }.toSet()
}

override suspend fun start() {
TODO("Not yet implemented")
logger.trace("DefaultTyping.start()")

typingScope.launch {
// If the user is already typing, reset the timer
if (typingJob != null) {
logger.debug("DefaultTyping.start(); already typing, resetting timer")
typingJob?.cancel()
startTypingTimer()
} else {
startTypingTimer()
channel.presence.enterClientCoroutine(clientId)
}
}.join()
}

override suspend fun stop() {
TODO("Not yet implemented")
logger.trace("DefaultTyping.stop()")
typingScope.launch {
typingJob?.cancel()
channel.presence.leaveClientCoroutine(clientId)
}.join()
}

override fun onDiscontinuity(listener: EmitsDiscontinuities.Listener): Subscription {
TODO("Not yet implemented")
}

fun release() {
typingScope.cancel()
}

private fun startTypingTimer() {
val timeout = options?.timeoutMs ?: throw AblyException.fromErrorInfo(
ErrorInfo(
"Typing options hasn't been initialized",
ErrorCodes.BadRequest,
),
)
logger.trace("DefaultTyping.startTypingTimer()")
typingJob = typingScope.launch {
delay(timeout)
logger.debug("DefaultTyping.startTypingTimer(); timeout expired")
stop()
}
}

private suspend fun processEvent() {
var numRetries = 0
while (numRetries <= PRESENCE_GET_MAX_RETRIES) {
try {
val currentlyTyping = get()
emit(currentlyTyping)
return // Exit if successful
} catch (e: Exception) {
numRetries++
val delayDuration = min(
PRESENCE_GET_RETRY_MAX_INTERVAL_MS,
PRESENCE_GET_RETRY_INTERVAL_MS * 2.0.pow(numRetries).toLong(),
)
logger.debug("Retrying in $delayDuration ms... (Attempt $numRetries of $PRESENCE_GET_MAX_RETRIES)", e)
delay(delayDuration)
}
}
logger.error("Failed to get members after $PRESENCE_GET_MAX_RETRIES retries")
Comment on lines +194 to +210
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Address off-by-one error in the retry loop of 'processEvent'

In the processEvent function, the condition while (numRetries <= PRESENCE_GET_MAX_RETRIES) may result in one extra retry beyond the intended maximum. Since numRetries starts at 0, the loop can execute up to PRESENCE_GET_MAX_RETRIES + 1 times.

Consider adjusting the condition to ensure the maximum number of retries is not exceeded:

  • Change the condition to while (numRetries < PRESENCE_GET_MAX_RETRIES).

Apply this diff to correct the loop condition:

 private suspend fun processEvent() {
     var numRetries = 0
-    while (numRetries <= PRESENCE_GET_MAX_RETRIES) {
+    while (numRetries < PRESENCE_GET_MAX_RETRIES) {
         try {
             val currentlyTyping = get()
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
var numRetries = 0
while (numRetries <= PRESENCE_GET_MAX_RETRIES) {
try {
val currentlyTyping = get()
emit(currentlyTyping)
return // Exit if successful
} catch (e: Exception) {
numRetries++
val delayDuration = min(
PRESENCE_GET_RETRY_MAX_INTERVAL_MS,
PRESENCE_GET_RETRY_INTERVAL_MS * 2.0.pow(numRetries).toLong(),
)
logger.debug("Retrying in $delayDuration ms... (Attempt $numRetries of $PRESENCE_GET_MAX_RETRIES)", e)
delay(delayDuration)
}
}
logger.error("Failed to get members after $PRESENCE_GET_MAX_RETRIES retries")
var numRetries = 0
while (numRetries < PRESENCE_GET_MAX_RETRIES) {
try {
val currentlyTyping = get()
emit(currentlyTyping)
return // Exit if successful
} catch (e: Exception) {
numRetries++
val delayDuration = min(
PRESENCE_GET_RETRY_MAX_INTERVAL_MS,
PRESENCE_GET_RETRY_INTERVAL_MS * 2.0.pow(numRetries).toLong(),
)
logger.debug("Retrying in $delayDuration ms... (Attempt $numRetries of $PRESENCE_GET_MAX_RETRIES)", e)
delay(delayDuration)
}
}
logger.error("Failed to get members after $PRESENCE_GET_MAX_RETRIES retries")

}

private fun emit(currentlyTyping: Set<String>) {
if (lastTyping == currentlyTyping) return
lastTyping = currentlyTyping
listeners.forEach {
it.onEvent(TypingEvent(currentlyTyping))
}
}
}
Loading