diff --git a/Sources/Realtime/RealtimeClientV2.swift b/Sources/Realtime/RealtimeClientV2.swift index 90633906..831a1b0c 100644 --- a/Sources/Realtime/RealtimeClientV2.swift +++ b/Sources/Realtime/RealtimeClientV2.swift @@ -32,7 +32,7 @@ public final class RealtimeClientV2: Sendable { var messageTask: Task<Void, Never>? var connectionTask: Task<Void, Never>? - var channels: [RealtimeChannelV2] = [] + var channels: [String: RealtimeChannelV2] = [:] var sendBuffer: [@Sendable () -> Void] = [] var conn: (any WebSocket)? @@ -51,13 +51,11 @@ public final class RealtimeClientV2: Sendable { /// All managed channels indexed by their topics. public var channels: [String: RealtimeChannelV2] { - mutableState.channels.reduce( - into: [:], - { $0[$1.topic] = $1 } - ) + mutableState.channels } private let statusSubject = AsyncValueSubject<RealtimeClientStatus>(.disconnected) + private let heartbeatSubject = AsyncValueSubject<HeartbeatStatus?>(nil) /// Listen for connection status changes. /// @@ -72,6 +70,16 @@ public final class RealtimeClientV2: Sendable { set { statusSubject.yield(newValue) } } + /// Listen for heartbeat status. + /// + /// You can also use ``onHeartbeat(_:)`` for a closure based method. + public var heartbeat: AsyncStream<HeartbeatStatus> { + AsyncStream( + heartbeatSubject.values.compactMap { $0 } + as AsyncCompactMapSequence<AsyncStream<HeartbeatStatus?>, HeartbeatStatus> + ) + } + /// Listen for connection status changes. /// - Parameter listener: Closure that will be called when connection status changes. /// - Returns: An observation handle that can be used to stop listening. @@ -84,6 +92,21 @@ public final class RealtimeClientV2: Sendable { return RealtimeSubscription { task.cancel() } } + /// Listen for heatbeat checks. + /// - Parameter listener: Closure that will be called when heartbeat status changes. + /// - Returns: An observation handle that can be used to stop listening. + /// + /// - Note: Use ``heartbeat`` if you prefer to use Async/Await. + public func onHeartbeat( + _ listener: @escaping @Sendable (HeartbeatStatus) -> Void + ) -> RealtimeSubscription { + let task = heartbeatSubject.onChange { message in + guard let message else { return } + listener(message) + } + return RealtimeSubscription { task.cancel() } + } + public convenience init(url: URL, options: RealtimeClientOptions) { var interceptors: [any HTTPClientInterceptor] = [] @@ -139,7 +162,7 @@ public final class RealtimeClientV2: Sendable { mutableState.withValue { $0.heartbeatTask?.cancel() $0.messageTask?.cancel() - $0.channels = [] + $0.channels = [:] } } @@ -223,14 +246,15 @@ public final class RealtimeClientV2: Sendable { private func onClose(code: Int?, reason: String?) { options.logger?.debug( - "WebSocket closed. Code: \(code?.description ?? "<none>"), Reason: \(reason ?? "<none>")") + "WebSocket closed. Code: \(code?.description ?? "<none>"), Reason: \(reason ?? "<none>")" + ) reconnect() } - private func reconnect() { + private func reconnect(disconnectReason: String? = nil) { Task { - disconnect() + disconnect(reason: disconnectReason) await connect(reconnect: true) } } @@ -246,35 +270,42 @@ public final class RealtimeClientV2: Sendable { _ topic: String, options: @Sendable (inout RealtimeChannelConfig) -> Void = { _ in } ) -> RealtimeChannelV2 { - var config = RealtimeChannelConfig( - broadcast: BroadcastJoinConfig(acknowledgeBroadcasts: false, receiveOwnBroadcasts: false), - presence: PresenceJoinConfig(key: ""), - isPrivate: false - ) - options(&config) + mutableState.withValue { + let realtimeTopic = "realtime:\(topic)" - let channel = RealtimeChannelV2( - topic: "realtime:\(topic)", - config: config, - socket: self, - logger: self.options.logger - ) + if let channel = $0.channels[realtimeTopic] { + return channel + } - mutableState.withValue { - $0.channels.append(channel) - } + var config = RealtimeChannelConfig( + broadcast: BroadcastJoinConfig(acknowledgeBroadcasts: false, receiveOwnBroadcasts: false), + presence: PresenceJoinConfig(key: ""), + isPrivate: false + ) + options(&config) - return channel + let channel = RealtimeChannelV2( + topic: realtimeTopic, + config: config, + socket: self, + logger: self.options.logger + ) + + $0.channels[realtimeTopic] = channel + + return channel + } } @available( - *, deprecated, + *, + deprecated, message: "Client handles channels automatically, this method will be removed on the next major release." ) public func addChannel(_ channel: RealtimeChannelV2) { mutableState.withValue { - $0.channels.append(channel) + $0.channels[channel.topic] = channel } } @@ -294,9 +325,7 @@ public final class RealtimeClientV2: Sendable { func _remove(_ channel: RealtimeChannelV2) { mutableState.withValue { - $0.channels.removeAll { - $0.joinRef == channel.joinRef - } + $0.channels[channel.topic] = nil } } @@ -372,6 +401,11 @@ public final class RealtimeClientV2: Sendable { } private func sendHeartbeat() async { + if status != .connected { + heartbeatSubject.yield(.disconnected) + return + } + let pendingHeartbeatRef: String? = mutableState.withValue { if $0.pendingHeartbeatRef != nil { $0.pendingHeartbeatRef = nil @@ -393,10 +427,12 @@ public final class RealtimeClientV2: Sendable { payload: [:] ) ) + heartbeatSubject.yield(.sent) await setAuth() } else { options.logger?.debug("Heartbeat timeout") - reconnect() + heartbeatSubject.yield(.timeout) + reconnect(disconnectReason: "heartbeat timeout") } } @@ -453,7 +489,11 @@ public final class RealtimeClientV2: Sendable { } private func onMessage(_ message: RealtimeMessageV2) async { - let channels = mutableState.withValue { + if message.topic == "phoenix", message.event == "phx_reply" { + heartbeatSubject.yield(message.status == .ok ? .ok : .error) + } + + let channel = mutableState.withValue { if let ref = message.ref, ref == $0.pendingHeartbeatRef { $0.pendingHeartbeatRef = nil options.logger?.debug("heartbeat received") @@ -462,10 +502,10 @@ public final class RealtimeClientV2: Sendable { .debug("Received event \(message.event) for channel \(message.topic)") } - return $0.channels.filter { $0.topic == message.topic } + return $0.channels[message.topic] } - for channel in channels { + if let channel { await channel.onMessage(message) } } @@ -488,7 +528,8 @@ public final class RealtimeClientV2: Sendable { Error: \(error) - """) + """ + ) } } diff --git a/Sources/Realtime/Types.swift b/Sources/Realtime/Types.swift index dd2b19b7..5f45a248 100644 --- a/Sources/Realtime/Types.swift +++ b/Sources/Realtime/Types.swift @@ -86,6 +86,19 @@ public enum RealtimeClientStatus: Sendable, CustomStringConvertible { } } +public enum HeartbeatStatus: Sendable { + /// Heartbeat was sent. + case sent + /// Heartbeat was received. + case ok + /// Server responded with an error. + case error + /// Heartbeat wasn't received in time. + case timeout + /// Socket is disconnected. + case disconnected +} + extension HTTPField.Name { static let apiKey = Self("apiKey")! } diff --git a/Tests/RealtimeTests/RealtimeTests.swift b/Tests/RealtimeTests/RealtimeTests.swift index e89541d2..f1517558 100644 --- a/Tests/RealtimeTests/RealtimeTests.swift +++ b/Tests/RealtimeTests/RealtimeTests.swift @@ -18,11 +18,11 @@ final class RealtimeTests: XCTestCase { let apiKey = "anon.api.key" #if !os(Windows) && !os(Linux) && !os(Android) - override func invokeTest() { - withMainSerialExecutor { - super.invokeTest() + override func invokeTest() { + withMainSerialExecutor { + super.invokeTest() + } } - } #endif var server: FakeWebSocket! @@ -303,11 +303,21 @@ final class RealtimeTests: XCTestCase { } } + let heartbeatStatuses = LockIsolated<[HeartbeatStatus]>([]) + let subscription = sut.onHeartbeat { status in + heartbeatStatuses.withValue { + $0.append(status) + } + } + defer { subscription.cancel() } + await sut.connect() await testClock.advance(by: .seconds(heartbeatInterval * 2)) await fulfillment(of: [expectation], timeout: 3) + + expectNoDifference(heartbeatStatuses.value, [.sent, .ok, .sent, .ok]) } func testHeartbeat_whenNoResponse_shouldReconnect() async throws { @@ -354,6 +364,34 @@ final class RealtimeTests: XCTestCase { ) } + func testHeartbeat_timeout() async throws { + let heartbeatStatuses = LockIsolated<[HeartbeatStatus]>([]) + let s1 = sut.onHeartbeat { status in + heartbeatStatuses.withValue { + $0.append(status) + } + } + defer { s1.cancel() } + + // Don't respond to any heartbeats + server.onEvent = { _ in } + + await sut.connect() + await testClock.advance(by: .seconds(heartbeatInterval)) + + // First heartbeat sent + XCTAssertEqual(heartbeatStatuses.value, [.sent]) + + // Wait for timeout + await testClock.advance(by: .seconds(timeoutInterval)) + + // Wait for next heartbeat. + await testClock.advance(by: .seconds(heartbeatInterval)) + + // Should have timeout status + XCTAssertEqual(heartbeatStatuses.value, [.sent, .timeout]) + } + func testBroadcastWithHTTP() async throws { await http.when { $0.url.path.hasSuffix("broadcast")