Skip to content

Commit 00eb015

Browse files
committed
Event REST Publish for 200 codes
1 parent cbc383c commit 00eb015

File tree

12 files changed

+661
-18
lines changed

12 files changed

+661
-18
lines changed

appsync/aws-appsync-events/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,4 +53,5 @@ dependencies {
5353
testImplementation(libs.test.mockk)
5454
testImplementation(libs.test.kotlin.coroutines)
5555
testImplementation(libs.test.kotest.assertions)
56+
testImplementation(libs.test.mockwebserver)
5657
}

appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/Events.kt

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@ import com.amplifyframework.aws.appsync.core.AppSyncAuthorizer
1818
import com.amplifyframework.aws.appsync.events.data.ChannelAuthorizers
1919
import com.amplifyframework.aws.appsync.events.data.EventsException
2020
import com.amplifyframework.aws.appsync.events.data.PublishResult
21+
import kotlinx.serialization.json.Json
2122
import kotlinx.serialization.json.JsonElement
23+
import okhttp3.OkHttpClient
24+
import org.jetbrains.annotations.VisibleForTesting
2225

2326
/**
2427
* The main class for interacting with AWS AppSync Events
@@ -27,12 +30,33 @@ import kotlinx.serialization.json.JsonElement
2730
* @param connectAuthorizer for AWS AppSync Websocket Pub/Sub connection.
2831
* @param defaultChannelAuthorizers passed to created channels if not overridden.
2932
*/
30-
class Events(
33+
class Events @VisibleForTesting internal constructor(
3134
val endpoint: String,
3235
val connectAuthorizer: AppSyncAuthorizer,
33-
val defaultChannelAuthorizers: ChannelAuthorizers
36+
val defaultChannelAuthorizers: ChannelAuthorizers,
37+
okHttpClient: OkHttpClient
3438
) {
3539

40+
/**
41+
* The main class for interacting with AWS AppSync Events
42+
*
43+
* @property endpoint AWS AppSync Events endpoint.
44+
* @param connectAuthorizer for AWS AppSync Websocket Pub/Sub connection.
45+
* @param defaultChannelAuthorizers passed to created channels if not overridden.
46+
*/
47+
constructor(
48+
endpoint: String,
49+
connectAuthorizer: AppSyncAuthorizer,
50+
defaultChannelAuthorizers: ChannelAuthorizers
51+
) : this(endpoint, connectAuthorizer, defaultChannelAuthorizers, OkHttpClient.Builder().build())
52+
53+
private val json = Json {
54+
encodeDefaults = true
55+
ignoreUnknownKeys = true
56+
}
57+
private val endpoints = EventsEndpoints(endpoint)
58+
private val httpClient = RestClient(endpoints.restEndpoint, okHttpClient, json)
59+
3660
/**
3761
* Publish a single event to a channel.
3862
*
@@ -47,7 +71,7 @@ class Events(
4771
event: JsonElement,
4872
authorizer: AppSyncAuthorizer = this.defaultChannelAuthorizers.publishAuthorizer
4973
): PublishResult {
50-
TODO("Need to implement")
74+
return httpClient.post(channelName, authorizer, event)
5175
}
5276

5377
/**
@@ -64,7 +88,7 @@ class Events(
6488
events: List<JsonElement>,
6589
authorizer: AppSyncAuthorizer = this.defaultChannelAuthorizers.publishAuthorizer
6690
): PublishResult {
67-
TODO("Need to implement")
91+
return httpClient.post(channelName, authorizer, events)
6892
}
6993

7094
/**
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright 2025 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
package com.amplifyframework.aws.appsync.events
16+
17+
import com.amplifyframework.aws.appsync.events.data.EventsException
18+
import okhttp3.HttpUrl
19+
import okhttp3.HttpUrl.Companion.toHttpUrl
20+
21+
/**
22+
* Class representing the Events API endpoint. There are multiple URLs associated with each AppSync endpoint: the
23+
* appsync server URL, for sending HTTP requests, and the realtime URL, for establishing websocket connections. This
24+
* class derives the realtime URL from the server URL.
25+
*/
26+
internal class EventsEndpoints(private val endpoint: String) {
27+
28+
companion object {
29+
private val standardEndpointRegex =
30+
"^https://\\w{26}\\.appsync-api\\.\\w{2}(?:-\\w{2,})+-\\d\\.amazonaws.com(?:\\.cn)?/event$".toRegex()
31+
}
32+
33+
/**
34+
* The URL to use for HTTP requests.
35+
*/
36+
val restEndpoint = try {
37+
endpoint.toHttpUrl()
38+
} catch (e: Exception) {
39+
throw EventsException("Invalid endpoint provided", e)
40+
}
41+
42+
val host = restEndpoint.host
43+
44+
/**
45+
* The URL to use for IAM Signing of WebSocket requests.
46+
* While it may be confusing to return https instead of wss, Okhttp expects this and converts.
47+
*/
48+
val websocketBaseEndpoint: HttpUrl by lazy {
49+
restEndpoint.newBuilder().apply {
50+
if (standardEndpointRegex.matches(endpoint)) {
51+
host(restEndpoint.host.replace("appsync-api", "appsync-realtime-api"))
52+
}
53+
}.build()
54+
}
55+
56+
/**
57+
* The URL to use for WebSocket requests.
58+
* While it may be confusing to return https instead of wss, Okhttp expects this and converts.
59+
*/
60+
val websocketRealtimeEndpoint: HttpUrl by lazy {
61+
websocketBaseEndpoint.newBuilder().apply {
62+
addPathSegment("realtime")
63+
}.build()
64+
}
65+
66+
init {
67+
if (!this.restEndpoint.isHttps) {
68+
throw EventsException("AppSync URL must start with https")
69+
}
70+
}
71+
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Copyright 2025 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package com.amplifyframework.aws.appsync.events
17+
18+
import com.amplifyframework.aws.appsync.core.AppSyncAuthorizer
19+
import com.amplifyframework.aws.appsync.core.AppSyncRequest
20+
import com.amplifyframework.aws.appsync.events.data.PublishResult
21+
import com.amplifyframework.aws.appsync.events.utils.HeaderKeys
22+
import com.amplifyframework.aws.appsync.events.utils.HeaderValues
23+
import kotlinx.serialization.json.Json
24+
import kotlinx.serialization.json.JsonArray
25+
import kotlinx.serialization.json.JsonElement
26+
import kotlinx.serialization.json.JsonObject
27+
import kotlinx.serialization.json.JsonPrimitive
28+
import okhttp3.HttpUrl
29+
import okhttp3.MediaType.Companion.toMediaType
30+
import okhttp3.OkHttpClient
31+
import okhttp3.Request
32+
import okhttp3.RequestBody.Companion.toRequestBody
33+
34+
internal class RestClient(
35+
private val url: HttpUrl,
36+
private val okHttpClient: OkHttpClient,
37+
private val json: Json
38+
) {
39+
40+
suspend fun post(channelName: String, authorizer: AppSyncAuthorizer, event: JsonElement): PublishResult {
41+
return post(channelName, authorizer, events = listOf(event))
42+
}
43+
44+
suspend fun post(channelName: String, authorizer: AppSyncAuthorizer, events: List<JsonElement>): PublishResult {
45+
val postBody = JsonObject(
46+
content = mapOf(
47+
"channel" to JsonPrimitive(channelName),
48+
"events" to JsonArray(events.map { JsonPrimitive(it.toString()) })
49+
)
50+
).toString()
51+
52+
val preAuthRequest = Request.Builder().apply {
53+
url(url)
54+
addHeader(HeaderKeys.ACCEPT, HeaderValues.ACCEPT_APPLICATION_JSON)
55+
addHeader(HeaderKeys.CONTENT_TYPE, HeaderValues.CONTENT_TYPE_APPLICATION_JSON)
56+
addHeader(HeaderKeys.HOST, url.host)
57+
post(postBody.toRequestBody(HeaderValues.CONTENT_TYPE_APPLICATION_JSON.toMediaType()))
58+
}.build()
59+
60+
val authHeaders = authorizer.getAuthorizationHeaders(object : AppSyncRequest {
61+
override val method: AppSyncRequest.HttpMethod
62+
get() = AppSyncRequest.HttpMethod.POST
63+
override val url: String
64+
get() = preAuthRequest.url.toString()
65+
override val headers: Map<String, String>
66+
get() = preAuthRequest.headers.toMap()
67+
override val body: String
68+
get() = postBody
69+
})
70+
71+
val authRequest = preAuthRequest.newBuilder().apply {
72+
authHeaders.forEach {
73+
header(it.key, it.value)
74+
}
75+
}.build()
76+
77+
try {
78+
val result = okHttpClient.newCall(authRequest).execute()
79+
return if (result.isSuccessful) {
80+
json.decodeFromString<PublishResult>(result.body.string())
81+
} else {
82+
TODO("Convert to proper exception type")
83+
}
84+
} catch (e: Exception) {
85+
TODO("Convert to proper exception type")
86+
}
87+
}
88+
}

appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/data/EventsMessage.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,4 @@ import kotlinx.serialization.json.JsonElement
2121
*
2222
* @property data of the received event, formatted in json.
2323
*/
24-
data class EventsMessage(val data: JsonElement)
24+
data class EventsMessage(val data: JsonElement)

appsync/aws-appsync-events/src/main/java/com/amplifyframework/aws/appsync/events/data/PublishResult.kt

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414
*/
1515
package com.amplifyframework.aws.appsync.events.data
1616

17+
import kotlinx.serialization.SerialName
18+
import kotlinx.serialization.Serializable
19+
1720
/**
1821
* Contains the result of an event(s) publish call.
1922
*
@@ -24,28 +27,29 @@ package com.amplifyframework.aws.appsync.events.data
2427
* Failed = All events failed to publish
2528
* PartialSuccess = Mix of successful and failed events. Check event indexes to determine individual states.
2629
*/
30+
@Serializable
2731
data class PublishResult internal constructor(
28-
val successfulEvents: List<SuccessfulEvent>,
29-
val failedEvents: List<FailedEvent>
32+
@SerialName("successful") val successfulEvents: List<SuccessfulEvent>,
33+
@SerialName("failed") val failedEvents: List<FailedEvent>
3034
) {
3135

3236
/**
3337
* Contains identifying information of an event AWS AppSync failed to process.
3438
*/
3539
sealed class Status {
36-
data object Successful: Status()
37-
data object Failed: Status()
38-
data object PartialSuccess: Status()
40+
data object Successful : Status()
41+
data object Failed : Status()
42+
data object PartialSuccess : Status()
3943
}
4044

4145
val status: Status
4246
get() {
43-
return when {
44-
successfulEvents.isNotEmpty() && failedEvents.isNotEmpty() -> Status.PartialSuccess
45-
failedEvents.isNotEmpty() -> Status.Failed
46-
else -> Status.Successful
47+
return when {
48+
successfulEvents.isNotEmpty() && failedEvents.isNotEmpty() -> Status.PartialSuccess
49+
failedEvents.isNotEmpty() -> Status.Failed
50+
else -> Status.Successful
51+
}
4752
}
48-
}
4953
}
5054

5155
/**
@@ -54,6 +58,7 @@ data class PublishResult internal constructor(
5458
* @property identifier identifier of event used for logging purposes.
5559
* @property index of the event as it was sent in the publish.
5660
*/
61+
@Serializable
5762
data class SuccessfulEvent internal constructor(
5863
val identifier: String,
5964
val index: Int
@@ -67,9 +72,10 @@ data class SuccessfulEvent internal constructor(
6772
* @property errorCode for the failed event.
6873
* @property errorMessage for the failed event.
6974
*/
75+
@Serializable
7076
data class FailedEvent internal constructor(
7177
val identifier: String,
7278
val index: Int,
73-
val errorCode: Int?,
74-
val errorMessage: String?
75-
)
79+
@SerialName("code") val errorCode: Int? = null,
80+
@SerialName("message") val errorMessage: String? = null
81+
)
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright 2025 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package com.amplifyframework.aws.appsync.events.utils
17+
18+
internal object HeaderKeys {
19+
const val HOST = "host"
20+
const val ACCEPT = "accept"
21+
const val CONTENT_TYPE = "content-type"
22+
}
23+
24+
internal object HeaderValues {
25+
const val ACCEPT_APPLICATION_JSON = "application/json, text/javascript"
26+
const val CONTENT_TYPE_APPLICATION_JSON = "application/json; charset=UTF-8"
27+
}

0 commit comments

Comments
 (0)