Skip to content

Commit d28f9e9

Browse files
authored
feat(data): Events API Connection ack timeout handling (#3020)
1 parent 9ee12ba commit d28f9e9

File tree

7 files changed

+130
-53
lines changed

7 files changed

+130
-53
lines changed
Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,14 @@
1313
* permissions and limitations under the License.
1414
*/
1515

16-
package com.amplifyframework.aws.appsync.core.util
16+
package com.amplifyframework.aws.appsync.core
1717

1818
import java.util.function.Supplier
1919

20+
fun interface LoggerProvider {
21+
fun getLogger(namespace: String): Logger
22+
}
23+
2024
/**
2125
* A component which can emit logs.
2226
*/
@@ -28,12 +32,6 @@ interface Logger {
2832
*/
2933
val thresholdLevel: LogLevel
3034

31-
/**
32-
* Gets the namespace of the logger.
33-
* @return namespace for logger
34-
*/
35-
val namespace: String
36-
3735
/**
3836
* Logs a message at the [LogLevel.ERROR] level.
3937
* @param message An error message

appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/Events.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
package com.amplifyframework.aws.appsync.events
1616

1717
import com.amplifyframework.aws.appsync.core.AppSyncAuthorizer
18-
import com.amplifyframework.aws.appsync.core.util.Logger
18+
import com.amplifyframework.aws.appsync.core.LoggerProvider
1919
import com.amplifyframework.aws.appsync.events.data.ChannelAuthorizers
2020
import com.amplifyframework.aws.appsync.events.data.EventsException
2121
import com.amplifyframework.aws.appsync.events.data.PublishResult
@@ -41,7 +41,7 @@ class Events @VisibleForTesting internal constructor(
4141
) {
4242

4343
data class Options(
44-
val logger: Logger? = null
44+
val loggerProvider: LoggerProvider? = null
4545
)
4646

4747
/**
@@ -75,7 +75,7 @@ class Events @VisibleForTesting internal constructor(
7575
connectAuthorizer,
7676
okHttpClient,
7777
json,
78-
options.logger
78+
options.loggerProvider
7979
)
8080

8181
/**
@@ -133,6 +133,6 @@ class Events @VisibleForTesting internal constructor(
133133
* @return a channel to manage subscriptions and publishes.
134134
*/
135135
suspend fun disconnect(flushEvents: Boolean = true): Unit = coroutineScope {
136-
eventsWebSocketProvider.getExistingWebSocket()?.disconnect(flushEvents)
136+
eventsWebSocketProvider.existingWebSocket?.disconnect(flushEvents)
137137
}
138138
}

appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/EventsChannel.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,10 +116,10 @@ class EventsChannel internal constructor(
116116
emit(EventsMessage(it.event))
117117
}
118118
it is WebSocketMessage.Closed -> {
119-
if (it.userInitiated) {
119+
if (it.reason is DisconnectReason.UserInitiated) {
120120
throw UserClosedConnectionException()
121121
} else {
122-
throw ConnectionClosedException(it.throwable)
122+
throw ConnectionClosedException(it.reason.throwable)
123123
}
124124
}
125125
else -> Unit

appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/EventsWebSocket.kt

Lines changed: 39 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,11 @@ package com.amplifyframework.aws.appsync.events
1717

1818
import com.amplifyframework.aws.appsync.core.AppSyncAuthorizer
1919
import com.amplifyframework.aws.appsync.core.AppSyncRequest
20-
import com.amplifyframework.aws.appsync.core.util.Logger
20+
import com.amplifyframework.aws.appsync.core.LoggerProvider
2121
import com.amplifyframework.aws.appsync.events.data.ConnectException
2222
import com.amplifyframework.aws.appsync.events.data.EventsException
2323
import com.amplifyframework.aws.appsync.events.data.WebSocketMessage
24+
import com.amplifyframework.aws.appsync.events.utils.ConnectionTimeoutTimer
2425
import com.amplifyframework.aws.appsync.events.utils.HeaderKeys
2526
import com.amplifyframework.aws.appsync.events.utils.HeaderValues
2627
import kotlinx.coroutines.async
@@ -35,22 +36,23 @@ import okhttp3.Request
3536
import okhttp3.Response
3637
import okhttp3.WebSocket
3738
import okhttp3.WebSocketListener
38-
import java.util.concurrent.atomic.AtomicBoolean
3939

4040
internal class EventsWebSocket(
4141
private val eventsEndpoints: EventsEndpoints,
4242
private val authorizer: AppSyncAuthorizer,
4343
private val okHttpClient: OkHttpClient,
4444
private val json: Json,
45-
private val logger: Logger?
45+
loggerProvider: LoggerProvider?
4646
) : WebSocketListener() {
4747

4848
private val _events = MutableSharedFlow<WebSocketMessage>(extraBufferCapacity = Int.MAX_VALUE)
4949
val events = _events.asSharedFlow() // publicly exposed as read-only shared flow
5050

5151
private lateinit var webSocket: WebSocket
52-
internal val isClosed = AtomicBoolean(false)
53-
private var userInitiatedDisconnect = false
52+
@Volatile internal var isClosed = false
53+
private var disconnectReason: DisconnectReason? = null
54+
private val connectionTimeoutTimer = ConnectionTimeoutTimer(onTimeout = ::onTimeout)
55+
private val logger = loggerProvider?.getLogger(TAG)
5456

5557
@Throws(ConnectException::class)
5658
suspend fun connect() = coroutineScope {
@@ -71,7 +73,7 @@ internal class EventsWebSocket(
7173
when (val connectionResponse = deferredConnectResponse.await()) {
7274
is WebSocketMessage.Closed -> {
7375
webSocket.cancel()
74-
throw ConnectException(connectionResponse.throwable)
76+
throw ConnectException(connectionResponse.reason.throwable)
7577
}
7678
is WebSocketMessage.Received.ConnectionError -> {
7779
webSocket.cancel()
@@ -80,13 +82,16 @@ internal class EventsWebSocket(
8082
?: EventsException.unknown()
8183
)
8284
}
83-
else -> Unit // It isn't obvious here, but only other connect response type is ConnectionAck
85+
is WebSocketMessage.Received.ConnectionAck -> {
86+
connectionTimeoutTimer.resetTimeoutTimer(connectionResponse.connectionTimeoutMs)
87+
}
88+
else -> Unit // Not obvious here but this block should never run
8489
}
8590
logger?.debug("Websocket Connection Open")
8691
}
8792

8893
suspend fun disconnect(flushEvents: Boolean) = coroutineScope {
89-
userInitiatedDisconnect = true
94+
disconnectReason = DisconnectReason.UserInitiated
9095
val deferredClosedResponse = async { getClosedResponse() }
9196
when (flushEvents) {
9297
true -> webSocket.close(NORMAL_CLOSE_CODE, "User initiated disconnect")
@@ -96,12 +101,12 @@ internal class EventsWebSocket(
96101
}
97102

98103
override fun onOpen(webSocket: WebSocket, response: Response) {
99-
val connectionInitMessage = json.encodeToString(WebSocketMessage.Send.ConnectionInit())
100-
logger?.debug { "$TAG onOpen: sending connection init" }
101-
webSocket.send(connectionInitMessage)
104+
logger?.debug ("onOpen: sending connection init")
105+
send(WebSocketMessage.Send.ConnectionInit())
102106
}
103107

104108
override fun onMessage(webSocket: WebSocket, text: String) {
109+
connectionTimeoutTimer.resetTimeoutTimer()
105110
logger?.debug { "Websocket onMessage: $text" }
106111
try {
107112
val eventMessage = json.decodeFromString<WebSocketMessage.Received>(text)
@@ -112,29 +117,37 @@ internal class EventsWebSocket(
112117
}
113118

114119
override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
115-
logger?.error(t) { "$TAG onFailure" }
116-
notifyClosed() // onClosed doesn't get called in failure. Treat this block the same as onClosed
120+
logger?.error(t) { "onFailure" }
121+
handleClosed() // onClosed doesn't get called in failure. Treat this block the same as onClosed
117122
}
118123

119124
override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
120-
logger?.debug("$TAG onClosing")
125+
logger?.debug("onClosing")
121126
}
122127

123128
override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
124129
// Events api sends normal close code even in failure
125130
// so inspecting code/reason isn't helpful as it should be
126-
logger?.debug("$TAG onClosed: userInitiated = $userInitiatedDisconnect")
127-
notifyClosed()
131+
logger?.debug {"onClosed: reason = $disconnectReason" }
132+
handleClosed()
133+
}
134+
135+
private fun onTimeout() {
136+
disconnectReason = DisconnectReason.Timeout
137+
webSocket.cancel()
128138
}
129139

130-
private fun notifyClosed() {
131-
_events.tryEmit(WebSocketMessage.Closed(userInitiated = userInitiatedDisconnect))
132-
isClosed.set(true)
140+
private fun handleClosed() {
141+
connectionTimeoutTimer.stop()
142+
_events.tryEmit(
143+
WebSocketMessage.Closed(reason = disconnectReason ?: DisconnectReason.Service())
144+
)
145+
isClosed = true
133146
}
134147

135148
inline fun <reified T : WebSocketMessage> send(webSocketMessage: T) {
136149
val message = json.encodeToString(webSocketMessage)
137-
logger?.debug("$TAG send: $message")
150+
logger?.debug { "send: ${webSocketMessage::class.java}" }
138151
webSocket.send(message)
139152
}
140153

@@ -193,3 +206,9 @@ private class ConnectAppSyncRequest(
193206
override val body: String
194207
get() = "{}"
195208
}
209+
210+
internal sealed class DisconnectReason(val throwable: Throwable?) {
211+
data object UserInitiated : DisconnectReason(null)
212+
data object Timeout : DisconnectReason(EventsException("Connection timed out."))
213+
class Service(throwable: Throwable? = null) : DisconnectReason(throwable)
214+
}

appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/EventsWebSocketProvider.kt

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
package com.amplifyframework.aws.appsync.events
1717

1818
import com.amplifyframework.aws.appsync.core.AppSyncAuthorizer
19-
import com.amplifyframework.aws.appsync.core.util.Logger
19+
import com.amplifyframework.aws.appsync.core.LoggerProvider
2020
import java.util.concurrent.atomic.AtomicReference
2121
import kotlinx.coroutines.Deferred
2222
import kotlinx.coroutines.async
@@ -31,56 +31,58 @@ internal class EventsWebSocketProvider(
3131
private val authorizer: AppSyncAuthorizer,
3232
private val okHttpClient: OkHttpClient,
3333
private val json: Json,
34-
private val logger: Logger?
34+
private val loggerProvider: LoggerProvider?
3535
) {
3636
private val mutex = Mutex()
37-
private val _connectResult = AtomicReference<Result<EventsWebSocket>?>(null)
38-
private val _connectionInProgress = AtomicReference<Deferred<Result<EventsWebSocket>>?>(null)
37+
private val connectionResultReference = AtomicReference<Result<EventsWebSocket>?>(null)
38+
private val connectionInProgressReference = AtomicReference<Deferred<Result<EventsWebSocket>>?>(null)
39+
40+
val existingWebSocket: EventsWebSocket?
41+
get() = connectionResultReference.get()?.getOrNull()
3942

40-
fun getExistingWebSocket(): EventsWebSocket? = _connectResult.get()?.getOrNull()
4143

4244
suspend fun getConnectedWebSocket(): EventsWebSocket = getConnectedWebSocketResult().getOrThrow()
4345

4446
private suspend fun getConnectedWebSocketResult(): Result<EventsWebSocket> = coroutineScope {
4547
// If connection is already established, return it
4648
mutex.withLock {
47-
val existingResult = _connectResult.get()
49+
val existingResult = connectionResultReference.get()
4850
val existingWebSocket = existingResult?.getOrNull()
4951
if (existingWebSocket != null) {
50-
if (existingWebSocket.isClosed.get()) {
51-
_connectResult.set(null)
52+
if (existingWebSocket.isClosed) {
53+
connectionResultReference.set(null)
5254
} else {
5355
return@coroutineScope existingResult
5456
}
5557
}
5658
}
5759

58-
val deferredInProgressConnection = _connectionInProgress.get()
60+
val deferredInProgressConnection = connectionInProgressReference.get()
5961
if (deferredInProgressConnection != null && !deferredInProgressConnection.isCompleted) {
6062
return@coroutineScope deferredInProgressConnection.await()
6163
}
6264

6365
mutex.withLock {
64-
val existingResultInLock = _connectResult.get()
66+
val existingResultInLock = connectionResultReference.get()
6567
val existingWebSocket = existingResultInLock?.getOrNull()
6668
if (existingWebSocket != null) {
67-
if (existingWebSocket.isClosed.get()) {
68-
_connectResult.set(null)
69+
if (existingWebSocket.isClosed) {
70+
connectionResultReference.set(null)
6971
} else {
7072
return@coroutineScope existingResultInLock
7173
}
7274
}
7375

74-
val deferredInProgressConnectionInLock = _connectionInProgress.get()
76+
val deferredInProgressConnectionInLock = connectionInProgressReference.get()
7577
if (deferredInProgressConnectionInLock != null && !deferredInProgressConnectionInLock.isCompleted) {
7678
return@coroutineScope deferredInProgressConnectionInLock.await()
7779
}
7880

7981
val newDeferredInProgressConnection = async { attemptConnection() }
80-
_connectionInProgress.set(newDeferredInProgressConnection)
82+
connectionInProgressReference.set(newDeferredInProgressConnection)
8183
val connectionResult = newDeferredInProgressConnection.await()
82-
_connectResult.set(connectionResult)
83-
_connectionInProgress.set(null)
84+
connectionResultReference.set(connectionResult)
85+
connectionInProgressReference.set(null)
8486
connectionResult
8587
}
8688
}
@@ -92,7 +94,7 @@ internal class EventsWebSocketProvider(
9294
authorizer,
9395
okHttpClient,
9496
json,
95-
logger
97+
loggerProvider
9698
)
9799
eventsWebSocket.connect()
98100
Result.success(eventsWebSocket)

appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/data/WebSocketMessage.kt

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,10 @@
1515

1616
package com.amplifyframework.aws.appsync.events.data
1717

18+
import com.amplifyframework.aws.appsync.events.DisconnectReason
1819
import kotlinx.serialization.SerialName
1920
import kotlinx.serialization.Serializable
21+
import kotlinx.serialization.json.JsonArray
2022
import kotlinx.serialization.json.JsonElement
2123

2224
@Serializable
@@ -51,7 +53,8 @@ internal sealed class WebSocketMessage {
5153
internal data class Publish(
5254
val id: String,
5355
val channel: String,
54-
val events: List<Boolean>
56+
val events: JsonArray,
57+
val authorization: Map<String, String>
5558
) : Send() {
5659
override val type = "publish"
5760
}
@@ -97,22 +100,30 @@ internal sealed class WebSocketMessage {
97100
override val id: String,
98101
val errors: List<WebSocketError>
99102
) : Subscription()
103+
104+
@Serializable @SerialName("publish_success")
105+
internal data class PublishSuccess(
106+
override val id: String,
107+
@SerialName("successful") val successfulEvents: List<SuccessfulEvent>,
108+
@SerialName("failed") val failedEvents: List<FailedEvent>
109+
) : Subscription()
100110
}
101111

102112
@Serializable @SerialName("error")
103113
data class Error(val errors: List<WebSocketError>)
104114
}
105115

106-
internal data class Closed(val userInitiated: Boolean, val throwable: Throwable? = null) : WebSocketMessage()
116+
internal data class Closed(val reason: DisconnectReason) : WebSocketMessage()
107117
}
108118

109119
@Serializable
110120
data class WebSocketError(val errorType: String, val message: String? = null) {
111121

112122
// fallback message is only used if WebSocketError didn't provide a message
113123
fun toEventsException(fallbackMessage: String? = null): EventsException {
124+
val message = this.message ?: fallbackMessage
114125
return when (errorType) {
115-
"UnauthorizedException" -> UnauthorizedException(message ?: fallbackMessage)
126+
"UnauthorizedException" -> UnauthorizedException(message)
116127
else -> EventsException(message = "$errorType: $message")
117128
}
118129
}

0 commit comments

Comments
 (0)