Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,13 @@ class Events @VisibleForTesting internal constructor(
val endpoint: String,
val connectAuthorizer: AppSyncAuthorizer,
val defaultChannelAuthorizers: ChannelAuthorizers,
options: Options,
okHttpClient: OkHttpClient
okHttpClient: OkHttpClient,
loggerProvider: LoggerProvider?,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why remove Options here instead of making it private val? Seems it would be simpler to keep it as the number of options potentially grows.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had originally created this internal constructor to make the okhttp client injectable to inject a mocking interceptor. Technically this isn't needed now so I can revert back to just using the public constructor.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pushed change. @mattcreaser can you reapprove

) {

data class Options(
val loggerProvider: LoggerProvider? = null
val loggerProvider: LoggerProvider? = null,
val okHttpConfigurationProvider: OkHttpConfigurationProvider? = null
)

/**
Expand All @@ -58,11 +59,13 @@ class Events @VisibleForTesting internal constructor(
defaultChannelAuthorizers: ChannelAuthorizers,
options: Options = Options()
) : this(
endpoint,
connectAuthorizer,
defaultChannelAuthorizers,
options,
OkHttpClient.Builder().build()
endpoint = endpoint,
connectAuthorizer = connectAuthorizer,
defaultChannelAuthorizers = defaultChannelAuthorizers,
okHttpClient = OkHttpClient.Builder().apply {
options.okHttpConfigurationProvider?.applyConfiguration(this)
}.build(),
loggerProvider = options.loggerProvider
)

private val json = Json {
Expand All @@ -76,7 +79,7 @@ class Events @VisibleForTesting internal constructor(
connectAuthorizer,
okHttpClient,
json,
options.loggerProvider
loggerProvider
)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,16 +84,11 @@ class EventsChannel internal constructor(
* @param authorizer for the publish call. If not provided, the EventChannel publish authorizer will be used.
* @return result of publish.
*/
@Throws(EventsException::class)
suspend fun publish(
event: JsonElement,
authorizer: AppSyncAuthorizer = this.authorizers.publishAuthorizer
): PublishResult {
return try {
publishToWebSocket(listOf(event), authorizer)
} catch (exception: Exception) {
throw exception.toEventsException()
}
return publish(listOf(event), authorizer)
}

/**
Expand All @@ -103,22 +98,27 @@ class EventsChannel internal constructor(
* @param authorizer for the publish call. If not provided, the EventChannel publish authorizer will be used.
* @return result of publish.
*/
@Throws(Exception::class)
suspend fun publish(
events: List<JsonElement>,
authorizer: AppSyncAuthorizer = this.authorizers.publishAuthorizer
): PublishResult {
return try {
publishToWebSocket(events, authorizer)
publishToWebSocket(events, authorizer).let {
PublishResult.Response(
successfulEvents = it.successfulEvents,
failedEvents = it.failedEvents
)
}
} catch (exception: Exception) {
throw exception.toEventsException()
PublishResult.Failure(exception.toEventsException())
}
}

@Throws(Exception::class)
private suspend fun publishToWebSocket(
events: List<JsonElement>,
authorizer: AppSyncAuthorizer
): PublishResult = coroutineScope {
): WebSocketMessage.Received.PublishSuccess = coroutineScope {
val publishId = UUID.randomUUID().toString()
val publishMessage = WebSocketMessage.Send.Publish(
id = publishId,
Expand All @@ -136,19 +136,16 @@ class EventsChannel internal constructor(

return@coroutineScope when (val response = deferredResponse.await()) {
is WebSocketMessage.Received.PublishSuccess -> {
PublishResult(response.successfulEvents, response.failedEvents)
response
}

is WebSocketMessage.ErrorContainer -> {
val fallbackMessage = "Failed to publish event(s)"
throw response.errors.firstOrNull()?.toEventsException(fallbackMessage)
?: EventsException(fallbackMessage)
}

is WebSocketMessage.Closed -> {
throw response.reason.toCloseException()
}

else -> throw EventsException("Received unexpected publish response of type: ${response::class}")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,8 @@ internal class EventsWebSocket(
url(eventsEndpoints.websocketRealtimeEndpoint)
addHeader(HeaderKeys.SEC_WEBSOCKET_PROTOCOL, HeaderValues.SEC_WEBSOCKET_PROTOCOL_APPSYNC_EVENTS)
addHeader(HeaderKeys.HOST, eventsEndpoints.restEndpoint.host)
addHeader(HeaderKeys.USER_AGENT, HeaderValues.USER_AGENT)
addHeader(HeaderKeys.X_AMZ_USER_AGENT, HeaderValues.USER_AGENT)
}.build()
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2025 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amplifyframework.aws.appsync.events

import okhttp3.OkHttpClient

/**
* An OkHttpConfigurationProvider is a hook provided to a customer, enabling them to customize
* the OkHttp client used by the Events Library.
*
* This hook is for advanced use cases, such as where a user may want to append some of
* their own request headers, configure timeouts, or otherwise manipulate an outgoing request.
*/
fun interface OkHttpConfigurationProvider {
/**
* The OkHttp.Builder() used for the Events library is provided. This mutable builder allows for setting custom
* configurations on the OkHttp.Builder() instance. The library will run this configuration when the Events
* class is constructed and then build() the OkHttp client which will be used for all library network calls.
* @param okHttpClientBuilder An [OkHttpClient.Builder] instance
*/
fun applyConfiguration(okHttpClientBuilder: OkHttpClient.Builder)
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,19 @@ internal class RestClient(
}

suspend fun post(channelName: String, authorizer: AppSyncAuthorizer, events: List<JsonElement>): PublishResult {
return try {
executePost(channelName, authorizer, events)
} catch (exception: Exception) {
PublishResult.Failure(exception.toEventsException())
}
}

@Throws(Exception::class)
internal suspend fun executePost(
channelName: String,
authorizer: AppSyncAuthorizer,
events: List<JsonElement>
): PublishResult.Response {
val postBody = JsonObject(
content = mapOf(
"channel" to JsonPrimitive(channelName),
Expand All @@ -57,6 +70,8 @@ internal class RestClient(
addHeader(HeaderKeys.ACCEPT, HeaderValues.ACCEPT_APPLICATION_JSON)
addHeader(HeaderKeys.CONTENT_TYPE, HeaderValues.CONTENT_TYPE_APPLICATION_JSON)
addHeader(HeaderKeys.HOST, url.host)
addHeader(HeaderKeys.USER_AGENT, HeaderValues.USER_AGENT)
addHeader(HeaderKeys.X_AMZ_USER_AGENT, HeaderValues.USER_AGENT)
post(postBody.toRequestBody(HeaderValues.CONTENT_TYPE_APPLICATION_JSON.toMediaType()))
}.build()

Expand All @@ -80,7 +95,7 @@ internal class RestClient(
val result = okHttpClient.newCall(authRequest).execute()
val body = result.body.string()
return if (result.isSuccessful) {
json.decodeFromString<PublishResult>(body)
json.decodeFromString<PublishResult.Response>(body)
} else {
throw try {
val errors = json.decodeFromString<EventsErrors>(body)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/
package com.amplifyframework.aws.appsync.events.data

import java.net.SocketTimeoutException
import java.net.UnknownHostException

/**
Expand Down Expand Up @@ -43,10 +44,10 @@ open class EventsException internal constructor(
}
}

fun Exception.toEventsException(): EventsException {
internal fun Exception.toEventsException(): EventsException {
return when (this) {
is EventsException -> this
is UnknownHostException -> NetworkException(throwable = this)
is UnknownHostException, is SocketTimeoutException -> NetworkException(throwable = this)
else -> EventsException.unknown(cause = this)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,38 +18,51 @@ import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable

/**
* Contains the result of an event(s) publish call.
* Sealed Result type of an event(s) publish call.
* PublishResult.Success = The publish call was successfully processed by the service
* PublishResult.Failure = The publish call did not succeed
*
* @property successfulEvents list of events successfully processed by AWS AppSync.
* @property failedEvents list of events that AWS AppSync failed to process.
* @property status of the publish call.
* Successful = All events published successfully
* Failed = All events failed to publish
* PartialSuccess = Mix of successful and failed events. Check event indexes to determine individual states.
*/
@Serializable
data class PublishResult internal constructor(
@SerialName("successful") val successfulEvents: List<SuccessfulEvent>,
@SerialName("failed") val failedEvents: List<FailedEvent>
) {
sealed class PublishResult {

/**
* Contains identifying information of an event AWS AppSync failed to process.
*/
sealed class Status {
data object Successful : Status()
data object Failed : Status()
data object PartialSuccess : Status()
}
* Represents a successful response, which may contain both
* successful and failed events. A Success case indicates the publish
* itself succeeded, not that all events were processed successfully.
*
* @property successfulEvents list of events successfully processed by AWS AppSync.
* @property failedEvents list of events that AWS AppSync failed to process.
* @property status of the publish call.
* Successful = All events published successfully
* Failed = All events failed to publish
* PartialSuccess = Mix of successful and failed events. Check event indexes to determine individual states. */
@Serializable
data class Response internal constructor(
@SerialName("successful") val successfulEvents: List<SuccessfulEvent>,
@SerialName("failed") val failedEvents: List<FailedEvent>
) : PublishResult() {

/**
* Contains identifying information of an event AWS AppSync failed to process.
*/
sealed class Status {
data object Successful : Status()
data object Failed : Status()
data object PartialSuccess : Status()
}

val status: Status
get() {
return when {
val status: Status
get() = when {
successfulEvents.isNotEmpty() && failedEvents.isNotEmpty() -> Status.PartialSuccess
failedEvents.isNotEmpty() -> Status.Failed
else -> Status.Successful
}
}
}

/**
* Represents a failed response where the publish was not successful
*/
data class Failure internal constructor(val error: EventsException) : PublishResult()
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,20 @@

package com.amplifyframework.aws.appsync.events.utils

import com.amplifyframework.aws.appsync.events.BuildConfig

internal object HeaderKeys {
const val HOST = "host"
const val ACCEPT = "accept"
const val CONTENT_TYPE = "content-type"
const val SEC_WEBSOCKET_PROTOCOL = "Sec-WebSocket-Protocol"
const val USER_AGENT = "User-Agent"
const val X_AMZ_USER_AGENT = "x-amz-user-agent"
}

internal object HeaderValues {
const val ACCEPT_APPLICATION_JSON = "application/json, text/javascript"
const val CONTENT_TYPE_APPLICATION_JSON = "application/json; charset=UTF-8"
const val SEC_WEBSOCKET_PROTOCOL_APPSYNC_EVENTS = "aws-appsync-event-ws"
const val USER_AGENT = "aws-appsync-events-android#${BuildConfig.VERSION_NAME}"
}
Loading