diff --git a/chat-android/src/main/java/com/ably/chat/Room.kt b/chat-android/src/main/java/com/ably/chat/Room.kt index 22d91f3..0ff81d9 100644 --- a/chat-android/src/main/java/com/ably/chat/Room.kt +++ b/chat-android/src/main/java/com/ably/chat/Room.kt @@ -105,7 +105,8 @@ internal class DefaultRoom( override val reactions: RoomReactions = DefaultRoomReactions( roomId = roomId, - realtimeClient = realtimeClient, + clientId = realtimeClient.auth.clientId, + realtimeChannels = realtimeClient.channels, ) override val typing: Typing = DefaultTyping( diff --git a/chat-android/src/main/java/com/ably/chat/RoomReactions.kt b/chat-android/src/main/java/com/ably/chat/RoomReactions.kt index 06b214c..f58b712 100644 --- a/chat-android/src/main/java/com/ably/chat/RoomReactions.kt +++ b/chat-android/src/main/java/com/ably/chat/RoomReactions.kt @@ -2,7 +2,12 @@ package com.ably.chat +import com.google.gson.JsonObject +import io.ably.lib.realtime.AblyRealtime import io.ably.lib.realtime.Channel +import io.ably.lib.types.AblyException +import io.ably.lib.types.ErrorInfo +import io.ably.lib.types.MessageExtras /** * This interface is used to interact with room-level reactions in a chat room: subscribing to reactions and sending them. @@ -100,19 +105,53 @@ data class SendReactionParams( internal class DefaultRoomReactions( roomId: String, - private val realtimeClient: RealtimeClient, + private val clientId: String, + realtimeChannels: AblyRealtime.Channels, ) : RoomReactions { + // (CHA-ER1) private val roomReactionsChannelName = "$roomId::\$chat::\$reactions" - override val channel: Channel - get() = realtimeClient.channels.get(roomReactionsChannelName, ChatChannelOptions()) + override val channel: Channel = realtimeChannels.get(roomReactionsChannelName, ChatChannelOptions()) + // (CHA-ER3) Ephemeral room reactions are sent to Ably via the Realtime connection via a send method. + // (CHA-ER3a) Reactions are sent on the channel using a message in a particular format - see spec for format. override suspend fun send(params: SendReactionParams) { - TODO("Not yet implemented") + val pubSubMessage = PubSubMessage().apply { + data = JsonObject().apply { + addProperty("type", params.type) + params.metadata?.let { add("metadata", it.toJson()) } + } + params.headers?.let { + extras = MessageExtras( + JsonObject().apply { + add("headers", it.toJson()) + }, + ) + } + } + channel.publishCoroutine(pubSubMessage) } override fun subscribe(listener: RoomReactions.Listener): Subscription { - TODO("Not yet implemented") + val messageListener = PubSubMessageListener { + val pubSubMessage = it ?: throw AblyException.fromErrorInfo( + ErrorInfo("Got empty pubsub channel message", HttpStatusCodes.BadRequest, ErrorCodes.BadRequest), + ) + val data = pubSubMessage.data as? JsonObject ?: throw AblyException.fromErrorInfo( + ErrorInfo("Unrecognized Pub/Sub channel's message for `roomReaction` event", HttpStatusCodes.InternalServerError), + ) + val reaction = Reaction( + type = data.requireString("type"), + createdAt = pubSubMessage.timestamp, + clientId = pubSubMessage.clientId, + metadata = data.get("metadata")?.toMap() ?: mapOf(), + headers = pubSubMessage.extras.asJsonObject().get("headers")?.toMap() ?: mapOf(), + isSelf = pubSubMessage.clientId == clientId, + ) + listener.onReaction(reaction) + } + channel.subscribe(RoomReactionEventType.Reaction.eventName, messageListener) + return Subscription { channel.unsubscribe(RoomReactionEventType.Reaction.eventName, messageListener) } } override fun onDiscontinuity(listener: EmitsDiscontinuities.Listener): Subscription { diff --git a/chat-android/src/main/java/com/ably/chat/Utils.kt b/chat-android/src/main/java/com/ably/chat/Utils.kt index a915530..8f57352 100644 --- a/chat-android/src/main/java/com/ably/chat/Utils.kt +++ b/chat-android/src/main/java/com/ably/chat/Utils.kt @@ -35,6 +35,21 @@ suspend fun Channel.detachCoroutine() = suspendCoroutine { continuation -> }) } +suspend fun Channel.publishCoroutine(message: PubSubMessage) = suspendCoroutine { continuation -> + publish( + message, + object : CompletionListener { + override fun onSuccess() { + continuation.resume(Unit) + } + + override fun onError(reason: ErrorInfo?) { + continuation.resumeWithException(AblyException.fromErrorInfo(reason)) + } + }, + ) +} + @Suppress("FunctionName") fun ChatChannelOptions(init: (ChannelOptions.() -> Unit)? = null): ChannelOptions { val options = ChannelOptions() diff --git a/chat-android/src/test/java/com/ably/chat/RoomReactionsTest.kt b/chat-android/src/test/java/com/ably/chat/RoomReactionsTest.kt new file mode 100644 index 0000000..182c4e9 --- /dev/null +++ b/chat-android/src/test/java/com/ably/chat/RoomReactionsTest.kt @@ -0,0 +1,109 @@ +package com.ably.chat + +import com.google.gson.JsonObject +import io.ably.lib.realtime.AblyRealtime.Channels +import io.ably.lib.realtime.Channel +import io.ably.lib.realtime.buildRealtimeChannel +import io.ably.lib.types.MessageExtras +import io.mockk.every +import io.mockk.mockk +import io.mockk.slot +import io.mockk.spyk +import io.mockk.verify +import kotlinx.coroutines.test.runTest +import org.junit.Assert.assertEquals +import org.junit.Before +import org.junit.Test + +class RoomReactionsTest { + private val realtimeChannels = mockk(relaxed = true) + private val realtimeChannel = spyk(buildRealtimeChannel("room1::\$chat::\$reactions")) + private lateinit var roomReactions: DefaultRoomReactions + + @Before + fun setUp() { + every { realtimeChannels.get(any(), any()) } answers { + val channelName = firstArg() + if (channelName == "room1::\$chat::\$reactions") { + realtimeChannel + } else { + buildRealtimeChannel(channelName) + } + } + + roomReactions = DefaultRoomReactions( + roomId = "room1", + clientId = "client1", + realtimeChannels = realtimeChannels, + ) + } + + /** + * @spec CHA-ER1 + */ + @Test + fun `channel name is set according to the spec`() = runTest { + val roomReactions = DefaultRoomReactions( + roomId = "foo", + clientId = "client1", + realtimeChannels = realtimeChannels, + ) + + assertEquals( + "foo::\$chat::\$reactions", + roomReactions.channel.name, + ) + } + + /** + * @spec CHA-ER3a + */ + @Test + fun `should be able to subscribe to incoming reactions`() = runTest { + val pubSubMessageListenerSlot = slot() + + every { realtimeChannel.subscribe("roomReaction", capture(pubSubMessageListenerSlot)) } returns Unit + + val deferredValue = DeferredValue() + + roomReactions.subscribe { + deferredValue.completeWith(it) + } + + verify { realtimeChannel.subscribe("roomReaction", any()) } + + pubSubMessageListenerSlot.captured.onMessage( + PubSubMessage().apply { + data = JsonObject().apply { + addProperty("type", "like") + } + clientId = "clientId" + timestamp = 1000L + extras = MessageExtras( + JsonObject().apply { + add( + "headers", + JsonObject().apply { + addProperty("foo", "bar") + }, + ) + }, + ) + }, + ) + + val reaction = deferredValue.await() + + assertEquals( + Reaction( + type = "like", + createdAt = 1000L, + clientId = "clientId", + metadata = mapOf(), + headers = mapOf("foo" to "bar"), + isSelf = false, + ), + reaction, + ) + } +}