Skip to content

Commit

Permalink
Merge branch 'feature/roomlifecycle-attach-with-retry' into tests/roo…
Browse files Browse the repository at this point in the history
…mlifecycle-attach

# Conflicts:
#	chat-android/src/main/java/com/ably/chat/RoomLifecycleManager.kt
#	chat-android/src/main/java/com/ably/chat/RoomReactions.kt
#	chat-android/src/main/java/com/ably/chat/Utils.kt
  • Loading branch information
sacOO7 committed Nov 8, 2024
2 parents e73b9ba + 290fb84 commit a9fffc2
Show file tree
Hide file tree
Showing 20 changed files with 335 additions and 163 deletions.
16 changes: 8 additions & 8 deletions chat-android/src/main/java/com/ably/chat/AtomicCoroutineScope.kt
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,15 @@ class AtomicCoroutineScope(private val scope: CoroutineScope = CoroutineScope(Di
private var isRunning = false // Only accessed from sequentialScope
private var queueCounter = 0 // Only accessed from synchronized method

val finishedProcessing: Boolean
get() = jobs.isEmpty() && !isRunning

val pendingJobCount: Int
get() = jobs.count()

/**
* @param priority Defines priority for the operation execution.
* @param coroutineBlock Suspended function that needs to be executed mutually exclusive under given scope.
* Defines priority for the operation execution and
* executes given coroutineBlock mutually exclusive under given scope.
*/
@Synchronized
fun <T : Any>async(priority: Int = 0, coroutineBlock: suspend CoroutineScope.() -> T): CompletableDeferred<T> {
Expand Down Expand Up @@ -77,12 +83,6 @@ class AtomicCoroutineScope(private val scope: CoroutineScope = CoroutineScope(Di
}
}

val finishedProcessing: Boolean
get() = jobs.isEmpty() && !isRunning

val pendingJobCount: Int
get() = jobs.count()

/**
* Cancels ongoing and pending operations with given error.
*/
Expand Down
4 changes: 2 additions & 2 deletions chat-android/src/main/java/com/ably/chat/Discontinuities.kt
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ interface EmitsDiscontinuities {
}
}

open class DiscontinuityEmitter : EventEmitter<String, EmitsDiscontinuities.Listener>() {
class DiscontinuityEmitter : EventEmitter<String, EmitsDiscontinuities.Listener>() {
override fun apply(listener: EmitsDiscontinuities.Listener?, event: String?, vararg args: Any?) {
try {
listener?.discontinuityEmitted(args[0] as ErrorInfo?)
listener?.discontinuityEmitted(args[0] as? ErrorInfo?)
} catch (t: Throwable) {
Log.e("DiscontinuityEmitter", "Unexpected exception calling Discontinuity Listener", t)
}
Expand Down
15 changes: 1 addition & 14 deletions chat-android/src/main/java/com/ably/chat/Messages.kt
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ internal class DefaultMessages(
private val roomId: String,
realtimeChannels: AblyRealtime.Channels,
private val chatApi: ChatApi,
) : Messages, ContributesToRoomLifecycle, ResolvedContributor {
) : Messages, ContributesToRoomLifecycleImpl(), ResolvedContributor {

private var listeners: Map<Messages.Listener, DeferredValue<String>> = emptyMap()

Expand Down Expand Up @@ -303,19 +303,6 @@ internal class DefaultMessages(

override suspend fun send(params: SendMessageParams): Message = chatApi.sendMessage(roomId, params)

private val discontinuityEmitter = DiscontinuityEmitter()

override fun onDiscontinuity(listener: EmitsDiscontinuities.Listener): Subscription {
discontinuityEmitter.on(listener)
return Subscription {
discontinuityEmitter.off(listener)
}
}

override fun discontinuityDetected(reason: ErrorInfo?) {
discontinuityEmitter.emit("discontinuity", reason)
}

fun release() {
channel.off(channelStateListener)
}
Expand Down
16 changes: 1 addition & 15 deletions chat-android/src/main/java/com/ably/chat/Occupancy.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

package com.ably.chat

import io.ably.lib.types.ErrorInfo
import io.ably.lib.realtime.Channel as AblyRealtimeChannel

/**
Expand Down Expand Up @@ -62,7 +61,7 @@ data class OccupancyEvent(

internal class DefaultOccupancy(
private val messages: Messages,
) : Occupancy, ContributesToRoomLifecycle, ResolvedContributor {
) : Occupancy, ContributesToRoomLifecycleImpl(), ResolvedContributor {

override val featureName: String = "occupancy"

Expand All @@ -81,17 +80,4 @@ internal class DefaultOccupancy(
override suspend fun get(): OccupancyEvent {
TODO("Not yet implemented")
}

private val discontinuityEmitter = DiscontinuityEmitter()

override fun onDiscontinuity(listener: EmitsDiscontinuities.Listener): Subscription {
discontinuityEmitter.on(listener)
return Subscription {
discontinuityEmitter.off(listener)
}
}

override fun discontinuityDetected(reason: ErrorInfo?) {
discontinuityEmitter.emit("discontinuity", reason)
}
}
15 changes: 1 addition & 14 deletions chat-android/src/main/java/com/ably/chat/Presence.kt
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ data class PresenceEvent(

internal class DefaultPresence(
private val messages: Messages,
) : Presence, ContributesToRoomLifecycle, ResolvedContributor {
) : Presence, ContributesToRoomLifecycleImpl(), ResolvedContributor {

override val featureName = "presence"

Expand Down Expand Up @@ -167,17 +167,4 @@ internal class DefaultPresence(
override fun subscribe(listener: Presence.Listener): Subscription {
TODO("Not yet implemented")
}

private val discontinuityEmitter = DiscontinuityEmitter()

override fun onDiscontinuity(listener: EmitsDiscontinuities.Listener): Subscription {
discontinuityEmitter.on(listener)
return Subscription {
discontinuityEmitter.off(listener)
}
}

override fun discontinuityDetected(reason: ErrorInfo?) {
discontinuityEmitter.emit("discontinuity", reason)
}
}
4 changes: 3 additions & 1 deletion chat-android/src/main/java/com/ably/chat/Room.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
@file:Suppress("StringLiteralDuplication", "NotImplementedDeclaration")

package com.ably.chat

import io.ably.lib.util.Log.LogHandler
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
Expand Down Expand Up @@ -124,7 +125,8 @@ internal class DefaultRoom(

override val reactions = DefaultRoomReactions(
roomId = roomId,
realtimeClient = realtimeClient,
clientId = realtimeClient.auth.clientId,
realtimeChannels = realtimeClient.channels,
)

override val occupancy = DefaultOccupancy(
Expand Down
111 changes: 71 additions & 40 deletions chat-android/src/main/java/com/ably/chat/RoomLifecycleManager.kt
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,22 @@ interface ContributesToRoomLifecycle : EmitsDiscontinuities, HandlesDiscontinuit
val detachmentErrorCode: ErrorCodes
}

abstract class ContributesToRoomLifecycleImpl : ContributesToRoomLifecycle {

private val discontinuityEmitter = DiscontinuityEmitter()

override fun onDiscontinuity(listener: EmitsDiscontinuities.Listener): Subscription {
discontinuityEmitter.on(listener)
return Subscription {
discontinuityEmitter.off(listener)
}
}

override fun discontinuityDetected(reason: ErrorInfo?) {
discontinuityEmitter.emit("discontinuity", reason)
}
}

/**
* This interface represents a feature that contributes to the room lifecycle and
* exposes its channel directly. Objects of this type are created by awaiting the
Expand Down Expand Up @@ -78,27 +94,29 @@ interface RoomAttachmentResult : NewRoomStatus {
}

class DefaultRoomAttachmentResult : RoomAttachmentResult {
internal var _failedFeature: ResolvedContributor? = null
internal var _status: RoomLifecycle = RoomLifecycle.Attached
internal var _error: ErrorInfo? = null
internal var statusField: RoomLifecycle = RoomLifecycle.Attached
override val status: RoomLifecycle
get() = statusField

internal var failedFeatureField: ResolvedContributor? = null
override val failedFeature: ResolvedContributor?
get() = _failedFeature
get() = failedFeatureField

override val exception: AblyException
get() = AblyException.fromErrorInfo(
_error
?: ErrorInfo(
"unknown error in attach for ${failedFeature?.contributor?.featureName} feature",
500, ErrorCodes.RoomLifecycleError.errorCode,
),
)
internal var errorField: ErrorInfo? = null
override val error: ErrorInfo?
get() = errorField

override val status: RoomLifecycle
get() = _status
internal var throwable: Throwable? = null

override val error: ErrorInfo?
get() = _error
override val exception: AblyException
get() {
val errorInfo = errorField
?: ErrorInfo("unknown error in attach", HttpStatusCodes.InternalServerError, ErrorCodes.RoomLifecycleError.errorCode)
throwable?.let {
return AblyException.fromErrorInfo(throwable, errorInfo)
}
return AblyException.fromErrorInfo(errorInfo)
}
}

/**
Expand Down Expand Up @@ -153,6 +171,11 @@ class RoomLifecycleManager
*/
private val _firstAttachesCompleted = mutableMapOf<ResolvedContributor, Boolean>()

/**
* Retry duration in milliseconds, used by internal doRetry and runDownChannelsOnFailedAttach methods
*/
private val _retryDurationInMs: Long = 250

init {
if (_status.current != RoomLifecycle.Attached) {
_operationInProgress = true
Expand Down Expand Up @@ -183,6 +206,7 @@ class RoomLifecycleManager
* @param contributor The contributor that has entered a suspended state.
* @returns Returns when the room is attached, or the room enters a failed state.
*/
@SuppressWarnings("CognitiveComplexMethod")
private suspend fun doRetry(contributor: ResolvedContributor) {
// Handle the channel wind-down for other channels
var result = kotlin.runCatching { doChannelWindDown(contributor) }
Expand All @@ -191,7 +215,7 @@ class RoomLifecycleManager
if (this._status.current === RoomLifecycle.Failed) {
error("room is in a failed state")
}
delay(250)
delay(_retryDurationInMs)
result = kotlin.runCatching { doChannelWindDown(contributor) }
}

Expand All @@ -214,7 +238,11 @@ class RoomLifecycleManager
val failedFeature = attachmentResult.failedFeature
if (failedFeature == null) {
AblyException.fromErrorInfo(
ErrorInfo("no failed feature in doRetry", 500, ErrorCodes.RoomLifecycleError.errorCode),
ErrorInfo(
"no failed feature in doRetry",
HttpStatusCodes.InternalServerError,
ErrorCodes.RoomLifecycleError.errorCode,
),
)
}
// No need to catch errors, rather they should propagate to caller method
Expand Down Expand Up @@ -248,7 +276,7 @@ class RoomLifecycleManager
contributor.channel.once(ChannelState.failed) {
val exception = AblyException.fromErrorInfo(
it.reason
?: ErrorInfo("unknown error in _doRetry", 500, ErrorCodes.RoomLifecycleError.errorCode),
?: ErrorInfo("unknown error in _doRetry", HttpStatusCodes.InternalServerError, ErrorCodes.RoomLifecycleError.errorCode),
)
continuation.resumeWithException(exception)
}
Expand All @@ -263,6 +291,7 @@ class RoomLifecycleManager
* If a channel enters the failed state, we reject and then begin to wind down the other channels.
* Spec: CHA-RL1
*/
@SuppressWarnings("ThrowsCount")
internal suspend fun attach() {
val deferredAttach = atomicCoroutineScope.async(LifecycleOperationPrecedence.AttachOrDetach.priority) { // CHA-RL1d
when (_status.current) {
Expand All @@ -271,15 +300,15 @@ class RoomLifecycleManager
throw AblyException.fromErrorInfo(
ErrorInfo(
"unable to attach room; room is releasing",
500,
HttpStatusCodes.InternalServerError,
ErrorCodes.RoomIsReleasing.errorCode,
),
)
RoomLifecycle.Released -> // CHA-RL1c
throw AblyException.fromErrorInfo(
ErrorInfo(
"unable to attach room; room is released",
500,
HttpStatusCodes.InternalServerError,
ErrorCodes.RoomIsReleased.errorCode,
),
)
Expand All @@ -306,7 +335,11 @@ class RoomLifecycleManager
if (attachResult.status === RoomLifecycle.Suspended) {
if (attachResult.failedFeature == null) {
AblyException.fromErrorInfo(
ErrorInfo("no failed feature in attach", 500, ErrorCodes.RoomLifecycleError.errorCode),
ErrorInfo(
"no failed feature in attach",
HttpStatusCodes.InternalServerError,
ErrorCodes.RoomLifecycleError.errorCode,
),
)
}
attachResult.failedFeature?.let {
Expand Down Expand Up @@ -337,29 +370,26 @@ class RoomLifecycleManager
feature.channel.attachCoroutine()
_firstAttachesCompleted[feature] = true
} catch (ex: Throwable) { // CHA-RL1h - handle channel attach failure
attachResult._failedFeature = feature
attachResult._error = ErrorInfo(
attachResult.throwable = ex
attachResult.failedFeatureField = feature
attachResult.errorField = ErrorInfo(
"failed to attach ${feature.contributor.featureName} feature${feature.channel.errorMessage}",
500,
HttpStatusCodes.InternalServerError,
feature.contributor.attachmentErrorCode.errorCode,
)

// The current feature should be in one of two states, it will be either suspended or failed
// If it's in suspended, we wind down the other channels and wait for the reattach
// If it's failed, we can fail the entire room
when (feature.channel.state) {
ChannelState.suspended -> {
attachResult._status = RoomLifecycle.Suspended
}
ChannelState.failed -> {
attachResult._status = RoomLifecycle.Failed
}
ChannelState.suspended -> attachResult.statusField = RoomLifecycle.Suspended
ChannelState.failed -> attachResult.statusField = RoomLifecycle.Failed
else -> {
attachResult._status = RoomLifecycle.Failed
attachResult._error = ErrorInfo(
attachResult.statusField = RoomLifecycle.Failed
attachResult.errorField = ErrorInfo(
"unexpected channel state in doAttach ${feature.channel.state}${feature.channel.errorMessage}",
500,
feature.contributor.attachmentErrorCode.errorCode,
HttpStatusCodes.InternalServerError,
ErrorCodes.RoomLifecycleError.errorCode,
)
}
}
Expand Down Expand Up @@ -395,7 +425,7 @@ class RoomLifecycleManager
while (channelWindDown.isFailure) { // CHA-RL1h6 - repeat until all channels are detached
// Something went wrong during the wind down. After a short delay, to give others a turn, we should run down
// again until we reach a suitable conclusion.
delay(250)
delay(_retryDurationInMs)
channelWindDown = kotlin.runCatching { doChannelWindDown() }
}
}
Expand All @@ -408,6 +438,7 @@ class RoomLifecycleManager
* @returns Success/Failure when all channels are detached or at least one of them fails.
*
*/
@SuppressWarnings("CognitiveComplexMethod", "ComplexCondition")
private suspend fun doChannelWindDown(except: ResolvedContributor? = null) = coroutineScope {
_contributors.map { contributor: ResolvedContributor ->
async {
Expand All @@ -417,12 +448,12 @@ class RoomLifecycleManager
return@async
}
// If the room's already in the failed state, or it's releasing, we should not detach a failed channel
if (
(
if ((
_status.current === RoomLifecycle.Failed ||
_status.current === RoomLifecycle.Releasing ||
_status.current === RoomLifecycle.Released
) && contributor.channel.state === ChannelState.failed
) &&
contributor.channel.state === ChannelState.failed
) {
return@async
}
Expand All @@ -439,7 +470,7 @@ class RoomLifecycleManager
) {
val contributorError = ErrorInfo(
"failed to detach feature",
500,
HttpStatusCodes.InternalServerError,
contributor.contributor.detachmentErrorCode.errorCode,
)
_status.setStatus(RoomLifecycle.Failed, contributorError)
Expand Down
Loading

0 comments on commit a9fffc2

Please sign in to comment.