Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions appsync/aws-appsync-events/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,5 @@ dependencies {
testImplementation(libs.test.mockk)
testImplementation(libs.test.kotlin.coroutines)
testImplementation(libs.test.kotest.assertions)
testImplementation(libs.test.mockwebserver)
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ import com.amplifyframework.aws.appsync.core.AppSyncAuthorizer
import com.amplifyframework.aws.appsync.events.data.ChannelAuthorizers
import com.amplifyframework.aws.appsync.events.data.EventsException
import com.amplifyframework.aws.appsync.events.data.PublishResult
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonElement
import okhttp3.OkHttpClient
import org.jetbrains.annotations.VisibleForTesting

/**
* The main class for interacting with AWS AppSync Events
Expand All @@ -27,12 +30,33 @@ import kotlinx.serialization.json.JsonElement
* @param connectAuthorizer for AWS AppSync Websocket Pub/Sub connection.
* @param defaultChannelAuthorizers passed to created channels if not overridden.
*/
class Events(
class Events @VisibleForTesting internal constructor(
val endpoint: String,
val connectAuthorizer: AppSyncAuthorizer,
val defaultChannelAuthorizers: ChannelAuthorizers
val defaultChannelAuthorizers: ChannelAuthorizers,
okHttpClient: OkHttpClient
) {

/**
* The main class for interacting with AWS AppSync Events
*
* @property endpoint AWS AppSync Events endpoint.
* @param connectAuthorizer for AWS AppSync Websocket Pub/Sub connection.
* @param defaultChannelAuthorizers passed to created channels if not overridden.
*/
constructor(
endpoint: String,
connectAuthorizer: AppSyncAuthorizer,
defaultChannelAuthorizers: ChannelAuthorizers
) : this(endpoint, connectAuthorizer, defaultChannelAuthorizers, OkHttpClient.Builder().build())

private val json = Json {
encodeDefaults = true
ignoreUnknownKeys = true
}
private val endpoints = EventsEndpoints(endpoint)
private val httpClient = RestClient(endpoints.restEndpoint, okHttpClient, json)

/**
* Publish a single event to a channel.
*
Expand All @@ -47,7 +71,7 @@ class Events(
event: JsonElement,
authorizer: AppSyncAuthorizer = this.defaultChannelAuthorizers.publishAuthorizer
): PublishResult {
TODO("Need to implement")
return httpClient.post(channelName, authorizer, event)
}

/**
Expand All @@ -64,7 +88,7 @@ class Events(
events: List<JsonElement>,
authorizer: AppSyncAuthorizer = this.defaultChannelAuthorizers.publishAuthorizer
): PublishResult {
TODO("Need to implement")
return httpClient.post(channelName, authorizer, events)
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright 2025 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amplifyframework.aws.appsync.events

import com.amplifyframework.aws.appsync.events.data.EventsException
import okhttp3.HttpUrl
import okhttp3.HttpUrl.Companion.toHttpUrl

/**
* Class representing the Events API endpoint. There are multiple URLs associated with each AppSync endpoint: the
* appsync server URL, for sending HTTP requests, and the realtime URL, for establishing websocket connections. This
* class derives the realtime URL from the server URL.
*/
internal class EventsEndpoints(private val endpoint: String) {

companion object {
private val standardEndpointRegex =
"^https://\\w{26}\\.appsync-api\\.\\w{2}(?:-\\w{2,})+-\\d\\.amazonaws.com(?:\\.cn)?/event$".toRegex()
}

/**
* The URL to use for HTTP requests.
*/
val restEndpoint = try {
endpoint.toHttpUrl()
} catch (e: Exception) {
throw EventsException("Invalid endpoint provided", e)
}

val host = restEndpoint.host

/**
* The URL to use for IAM Signing of WebSocket requests.
* While it may be confusing to return https instead of wss, Okhttp expects this and converts.
*/
val websocketBaseEndpoint: HttpUrl by lazy {
restEndpoint.newBuilder().apply {
if (standardEndpointRegex.matches(endpoint)) {
host(restEndpoint.host.replace("appsync-api", "appsync-realtime-api"))
}
}.build()
}

/**
* The URL to use for WebSocket requests.
* While it may be confusing to return https instead of wss, Okhttp expects this and converts.
*/
val websocketRealtimeEndpoint: HttpUrl by lazy {
websocketBaseEndpoint.newBuilder().apply {
addPathSegment("realtime")
}.build()
}

init {
if (!this.restEndpoint.isHttps) {
throw EventsException("AppSync URL must start with https")
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright 2025 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amplifyframework.aws.appsync.events

import com.amplifyframework.aws.appsync.core.AppSyncAuthorizer
import com.amplifyframework.aws.appsync.core.AppSyncRequest
import com.amplifyframework.aws.appsync.events.data.PublishResult
import com.amplifyframework.aws.appsync.events.utils.HeaderKeys
import com.amplifyframework.aws.appsync.events.utils.HeaderValues
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonArray
import kotlinx.serialization.json.JsonElement
import kotlinx.serialization.json.JsonObject
import kotlinx.serialization.json.JsonPrimitive
import okhttp3.HttpUrl
import okhttp3.MediaType.Companion.toMediaType
import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.RequestBody.Companion.toRequestBody

internal class RestClient(
private val url: HttpUrl,
private val okHttpClient: OkHttpClient,
private val json: Json
) {

suspend fun post(channelName: String, authorizer: AppSyncAuthorizer, event: JsonElement): PublishResult {
return post(channelName, authorizer, events = listOf(event))
}

suspend fun post(channelName: String, authorizer: AppSyncAuthorizer, events: List<JsonElement>): PublishResult {
val postBody = JsonObject(
content = mapOf(
"channel" to JsonPrimitive(channelName),
"events" to JsonArray(events.map { JsonPrimitive(it.toString()) })
)
).toString()

val preAuthRequest = Request.Builder().apply {
url(url)
addHeader(HeaderKeys.ACCEPT, HeaderValues.ACCEPT_APPLICATION_JSON)
addHeader(HeaderKeys.CONTENT_TYPE, HeaderValues.CONTENT_TYPE_APPLICATION_JSON)
addHeader(HeaderKeys.HOST, url.host)
post(postBody.toRequestBody(HeaderValues.CONTENT_TYPE_APPLICATION_JSON.toMediaType()))
}.build()

val authHeaders = authorizer.getAuthorizationHeaders(object : AppSyncRequest {
override val method: AppSyncRequest.HttpMethod
get() = AppSyncRequest.HttpMethod.POST
override val url: String
get() = preAuthRequest.url.toString()
override val headers: Map<String, String>
get() = preAuthRequest.headers.toMap()
override val body: String
get() = postBody
})

val authRequest = preAuthRequest.newBuilder().apply {
authHeaders.forEach {
header(it.key, it.value)
}
}.build()

try {
val result = okHttpClient.newCall(authRequest).execute()
return if (result.isSuccessful) {
json.decodeFromString<PublishResult>(result.body.string())
} else {
TODO("Convert to proper exception type")
}
} catch (e: Exception) {
TODO("Convert to proper exception type")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@ import kotlinx.serialization.json.JsonElement
*
* @property data of the received event, formatted in json.
*/
data class EventsMessage(val data: JsonElement)
data class EventsMessage(val data: JsonElement)
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
*/
package com.amplifyframework.aws.appsync.events.data

import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable

/**
* Contains the result of an event(s) publish call.
*
Expand All @@ -24,28 +27,29 @@ package com.amplifyframework.aws.appsync.events.data
* Failed = All events failed to publish
* PartialSuccess = Mix of successful and failed events. Check event indexes to determine individual states.
*/
@Serializable
data class PublishResult internal constructor(
val successfulEvents: List<SuccessfulEvent>,
val failedEvents: List<FailedEvent>
@SerialName("successful") val successfulEvents: List<SuccessfulEvent>,
@SerialName("failed") val failedEvents: List<FailedEvent>
) {

/**
* Contains identifying information of an event AWS AppSync failed to process.
*/
sealed class Status {
data object Successful: Status()
data object Failed: Status()
data object PartialSuccess: Status()
data object Successful : Status()
data object Failed : Status()
data object PartialSuccess : Status()
}

val status: Status
get() {
return when {
successfulEvents.isNotEmpty() && failedEvents.isNotEmpty() -> Status.PartialSuccess
failedEvents.isNotEmpty() -> Status.Failed
else -> Status.Successful
return when {
successfulEvents.isNotEmpty() && failedEvents.isNotEmpty() -> Status.PartialSuccess
failedEvents.isNotEmpty() -> Status.Failed
else -> Status.Successful
}
}
}
}

/**
Expand All @@ -54,6 +58,7 @@ data class PublishResult internal constructor(
* @property identifier identifier of event used for logging purposes.
* @property index of the event as it was sent in the publish.
*/
@Serializable
data class SuccessfulEvent internal constructor(
val identifier: String,
val index: Int
Expand All @@ -67,9 +72,10 @@ data class SuccessfulEvent internal constructor(
* @property errorCode for the failed event.
* @property errorMessage for the failed event.
*/
@Serializable
data class FailedEvent internal constructor(
val identifier: String,
val index: Int,
val errorCode: Int?,
val errorMessage: String?
)
@SerialName("code") val errorCode: Int? = null,
@SerialName("message") val errorMessage: String? = null
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2025 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amplifyframework.aws.appsync.events.utils

internal object HeaderKeys {
const val HOST = "host"
const val ACCEPT = "accept"
const val CONTENT_TYPE = "content-type"
}

internal object HeaderValues {
const val ACCEPT_APPLICATION_JSON = "application/json, text/javascript"
const val CONTENT_TYPE_APPLICATION_JSON = "application/json; charset=UTF-8"
}
Loading