From 1217dbb7d70594bcc19f1e53a61b62b82e565a8f Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Tue, 22 Apr 2025 06:33:11 -0300 Subject: [PATCH 1/8] feat(realtime): add heartbeat callback --- Sources/Realtime/RealtimeClientV2.swift | 86 ++++++++++++++++--------- 1 file changed, 57 insertions(+), 29 deletions(-) diff --git a/Sources/Realtime/RealtimeClientV2.swift b/Sources/Realtime/RealtimeClientV2.swift index 90633906..e9a9092e 100644 --- a/Sources/Realtime/RealtimeClientV2.swift +++ b/Sources/Realtime/RealtimeClientV2.swift @@ -32,7 +32,7 @@ public final class RealtimeClientV2: Sendable { var messageTask: Task? var connectionTask: Task? - var channels: [RealtimeChannelV2] = [] + var channels: [String: RealtimeChannelV2] = [:] var sendBuffer: [@Sendable () -> Void] = [] var conn: (any WebSocket)? @@ -51,13 +51,11 @@ public final class RealtimeClientV2: Sendable { /// All managed channels indexed by their topics. public var channels: [String: RealtimeChannelV2] { - mutableState.channels.reduce( - into: [:], - { $0[$1.topic] = $1 } - ) + mutableState.channels } private let statusSubject = AsyncValueSubject(.disconnected) + private let heartbeatSubject = AsyncValueSubject(nil) /// Listen for connection status changes. /// @@ -72,6 +70,13 @@ public final class RealtimeClientV2: Sendable { set { statusSubject.yield(newValue) } } + /// Listen for heartbeat checks. + /// + /// You can also use ``onHeartbeat(_:)`` for a closure based method. + public var heartbeat: AsyncStream { + heartbeatSubject.values.compactMap { $0 }.eraseToStream() + } + /// Listen for connection status changes. /// - Parameter listener: Closure that will be called when connection status changes. /// - Returns: An observation handle that can be used to stop listening. @@ -84,6 +89,21 @@ public final class RealtimeClientV2: Sendable { return RealtimeSubscription { task.cancel() } } + /// Listen for heatbeat checks. + /// - Parameter listener: Closure that will be called when a heartbeat is received. + /// - Returns: An observation handle that can be used to stop listening. + /// + /// - Nite: Use ``heartbeat`` if you prefer to use Async/Await. + public func onHeartbeat( + _ listener: @escaping @Sendable (RealtimeMessageV2) -> Void + ) -> RealtimeSubscription { + let task = heartbeatSubject.onChange { message in + guard let message else { return } + listener(message) + } + return RealtimeSubscription { task.cancel() } + } + public convenience init(url: URL, options: RealtimeClientOptions) { var interceptors: [any HTTPClientInterceptor] = [] @@ -139,7 +159,7 @@ public final class RealtimeClientV2: Sendable { mutableState.withValue { $0.heartbeatTask?.cancel() $0.messageTask?.cancel() - $0.channels = [] + $0.channels = [:] } } @@ -246,25 +266,31 @@ public final class RealtimeClientV2: Sendable { _ topic: String, options: @Sendable (inout RealtimeChannelConfig) -> Void = { _ in } ) -> RealtimeChannelV2 { - var config = RealtimeChannelConfig( - broadcast: BroadcastJoinConfig(acknowledgeBroadcasts: false, receiveOwnBroadcasts: false), - presence: PresenceJoinConfig(key: ""), - isPrivate: false - ) - options(&config) + mutableState.withValue { + let realtimeTopic = "realtime:\(topic)" - let channel = RealtimeChannelV2( - topic: "realtime:\(topic)", - config: config, - socket: self, - logger: self.options.logger - ) + if let channel = $0.channels[realtimeTopic] { + return channel + } - mutableState.withValue { - $0.channels.append(channel) - } + var config = RealtimeChannelConfig( + broadcast: BroadcastJoinConfig(acknowledgeBroadcasts: false, receiveOwnBroadcasts: false), + presence: PresenceJoinConfig(key: ""), + isPrivate: false + ) + options(&config) + + let channel = RealtimeChannelV2( + topic: realtimeTopic, + config: config, + socket: self, + logger: self.options.logger + ) + + $0.channels[realtimeTopic] = channel - return channel + return channel + } } @available( @@ -274,7 +300,7 @@ public final class RealtimeClientV2: Sendable { ) public func addChannel(_ channel: RealtimeChannelV2) { mutableState.withValue { - $0.channels.append(channel) + $0.channels[channel.topic] = channel } } @@ -294,9 +320,7 @@ public final class RealtimeClientV2: Sendable { func _remove(_ channel: RealtimeChannelV2) { mutableState.withValue { - $0.channels.removeAll { - $0.joinRef == channel.joinRef - } + $0.channels[channel.topic] = nil } } @@ -453,7 +477,11 @@ public final class RealtimeClientV2: Sendable { } private func onMessage(_ message: RealtimeMessageV2) async { - let channels = mutableState.withValue { + if message.topic == "phoenix", message.event == "phx_reply" { + heartbeatSubject.yield(message) + } + + let channel = mutableState.withValue { if let ref = message.ref, ref == $0.pendingHeartbeatRef { $0.pendingHeartbeatRef = nil options.logger?.debug("heartbeat received") @@ -462,10 +490,10 @@ public final class RealtimeClientV2: Sendable { .debug("Received event \(message.event) for channel \(message.topic)") } - return $0.channels.filter { $0.topic == message.topic } + return $0.channels[message.topic] } - for channel in channels { + if let channel { await channel.onMessage(message) } } From 91b4cae7d14adf9957278c17ba41e50864691f94 Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Tue, 22 Apr 2025 06:40:09 -0300 Subject: [PATCH 2/8] Add heartbeat status --- Sources/Realtime/RealtimeClientV2.swift | 30 ++++++++++++++++--------- Sources/Realtime/Types.swift | 13 +++++++++++ 2 files changed, 33 insertions(+), 10 deletions(-) diff --git a/Sources/Realtime/RealtimeClientV2.swift b/Sources/Realtime/RealtimeClientV2.swift index e9a9092e..5e22c832 100644 --- a/Sources/Realtime/RealtimeClientV2.swift +++ b/Sources/Realtime/RealtimeClientV2.swift @@ -55,7 +55,7 @@ public final class RealtimeClientV2: Sendable { } private let statusSubject = AsyncValueSubject(.disconnected) - private let heartbeatSubject = AsyncValueSubject(nil) + private let heartbeatSubject = AsyncValueSubject(nil) /// Listen for connection status changes. /// @@ -73,7 +73,7 @@ public final class RealtimeClientV2: Sendable { /// Listen for heartbeat checks. /// /// You can also use ``onHeartbeat(_:)`` for a closure based method. - public var heartbeat: AsyncStream { + public var heartbeat: AsyncStream { heartbeatSubject.values.compactMap { $0 }.eraseToStream() } @@ -95,7 +95,7 @@ public final class RealtimeClientV2: Sendable { /// /// - Nite: Use ``heartbeat`` if you prefer to use Async/Await. public func onHeartbeat( - _ listener: @escaping @Sendable (RealtimeMessageV2) -> Void + _ listener: @escaping @Sendable (HeartbeatStatus) -> Void ) -> RealtimeSubscription { let task = heartbeatSubject.onChange { message in guard let message else { return } @@ -243,14 +243,15 @@ public final class RealtimeClientV2: Sendable { private func onClose(code: Int?, reason: String?) { options.logger?.debug( - "WebSocket closed. Code: \(code?.description ?? ""), Reason: \(reason ?? "")") + "WebSocket closed. Code: \(code?.description ?? ""), Reason: \(reason ?? "")" + ) reconnect() } - private func reconnect() { + private func reconnect(disconnectReason: String? = nil) { Task { - disconnect() + disconnect(reason: disconnectReason) await connect(reconnect: true) } } @@ -294,7 +295,8 @@ public final class RealtimeClientV2: Sendable { } @available( - *, deprecated, + *, + deprecated, message: "Client handles channels automatically, this method will be removed on the next major release." ) @@ -396,6 +398,11 @@ public final class RealtimeClientV2: Sendable { } private func sendHeartbeat() async { + if status != .connected { + heartbeatSubject.yield(.disconnected) + return + } + let pendingHeartbeatRef: String? = mutableState.withValue { if $0.pendingHeartbeatRef != nil { $0.pendingHeartbeatRef = nil @@ -417,10 +424,12 @@ public final class RealtimeClientV2: Sendable { payload: [:] ) ) + heartbeatSubject.yield(.sent) await setAuth() } else { options.logger?.debug("Heartbeat timeout") - reconnect() + heartbeatSubject.yield(.timeout) + reconnect(disconnectReason: "heartbeat timeout") } } @@ -478,7 +487,7 @@ public final class RealtimeClientV2: Sendable { private func onMessage(_ message: RealtimeMessageV2) async { if message.topic == "phoenix", message.event == "phx_reply" { - heartbeatSubject.yield(message) + heartbeatSubject.yield(message.status == .ok ? .ok : .error) } let channel = mutableState.withValue { @@ -516,7 +525,8 @@ public final class RealtimeClientV2: Sendable { Error: \(error) - """) + """ + ) } } diff --git a/Sources/Realtime/Types.swift b/Sources/Realtime/Types.swift index dd2b19b7..225f612b 100644 --- a/Sources/Realtime/Types.swift +++ b/Sources/Realtime/Types.swift @@ -86,6 +86,19 @@ public enum RealtimeClientStatus: Sendable, CustomStringConvertible { } } +public enum HeartbeatStatus: Sendable { + /// Hearbeat was sent. + case sent + /// Hearbeat was received. + case ok + /// Server responded with an error. + case error + /// Hearbeat wasn't received in time. + case timeout + /// Socket is disconnected. + case disconnected +} + extension HTTPField.Name { static let apiKey = Self("apiKey")! } From c7735cacc10f151a2e51b4bdb9fda6d99f6d3f96 Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Tue, 22 Apr 2025 06:41:57 -0300 Subject: [PATCH 3/8] fix typo --- Sources/Realtime/Types.swift | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Sources/Realtime/Types.swift b/Sources/Realtime/Types.swift index 225f612b..5f45a248 100644 --- a/Sources/Realtime/Types.swift +++ b/Sources/Realtime/Types.swift @@ -87,13 +87,13 @@ public enum RealtimeClientStatus: Sendable, CustomStringConvertible { } public enum HeartbeatStatus: Sendable { - /// Hearbeat was sent. + /// Heartbeat was sent. case sent - /// Hearbeat was received. + /// Heartbeat was received. case ok /// Server responded with an error. case error - /// Hearbeat wasn't received in time. + /// Heartbeat wasn't received in time. case timeout /// Socket is disconnected. case disconnected From 6a8241dc5d81d2182761150e3e5337a2d10e7b64 Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Tue, 22 Apr 2025 06:43:19 -0300 Subject: [PATCH 4/8] fix docs --- Sources/Realtime/RealtimeClientV2.swift | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Sources/Realtime/RealtimeClientV2.swift b/Sources/Realtime/RealtimeClientV2.swift index 5e22c832..f48dc8c3 100644 --- a/Sources/Realtime/RealtimeClientV2.swift +++ b/Sources/Realtime/RealtimeClientV2.swift @@ -70,7 +70,7 @@ public final class RealtimeClientV2: Sendable { set { statusSubject.yield(newValue) } } - /// Listen for heartbeat checks. + /// Listen for heartbeat status. /// /// You can also use ``onHeartbeat(_:)`` for a closure based method. public var heartbeat: AsyncStream { @@ -90,7 +90,7 @@ public final class RealtimeClientV2: Sendable { } /// Listen for heatbeat checks. - /// - Parameter listener: Closure that will be called when a heartbeat is received. + /// - Parameter listener: Closure that will be called when heartbeat status changes. /// - Returns: An observation handle that can be used to stop listening. /// /// - Nite: Use ``heartbeat`` if you prefer to use Async/Await. From 2ee8c1601b1633a66f6cb32cfe140a552d22de09 Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Tue, 22 Apr 2025 07:14:35 -0300 Subject: [PATCH 5/8] add tests --- Tests/RealtimeTests/RealtimeTests.swift | 46 ++++++++++++++++++++++--- 1 file changed, 42 insertions(+), 4 deletions(-) diff --git a/Tests/RealtimeTests/RealtimeTests.swift b/Tests/RealtimeTests/RealtimeTests.swift index e89541d2..f1517558 100644 --- a/Tests/RealtimeTests/RealtimeTests.swift +++ b/Tests/RealtimeTests/RealtimeTests.swift @@ -18,11 +18,11 @@ final class RealtimeTests: XCTestCase { let apiKey = "anon.api.key" #if !os(Windows) && !os(Linux) && !os(Android) - override func invokeTest() { - withMainSerialExecutor { - super.invokeTest() + override func invokeTest() { + withMainSerialExecutor { + super.invokeTest() + } } - } #endif var server: FakeWebSocket! @@ -303,11 +303,21 @@ final class RealtimeTests: XCTestCase { } } + let heartbeatStatuses = LockIsolated<[HeartbeatStatus]>([]) + let subscription = sut.onHeartbeat { status in + heartbeatStatuses.withValue { + $0.append(status) + } + } + defer { subscription.cancel() } + await sut.connect() await testClock.advance(by: .seconds(heartbeatInterval * 2)) await fulfillment(of: [expectation], timeout: 3) + + expectNoDifference(heartbeatStatuses.value, [.sent, .ok, .sent, .ok]) } func testHeartbeat_whenNoResponse_shouldReconnect() async throws { @@ -354,6 +364,34 @@ final class RealtimeTests: XCTestCase { ) } + func testHeartbeat_timeout() async throws { + let heartbeatStatuses = LockIsolated<[HeartbeatStatus]>([]) + let s1 = sut.onHeartbeat { status in + heartbeatStatuses.withValue { + $0.append(status) + } + } + defer { s1.cancel() } + + // Don't respond to any heartbeats + server.onEvent = { _ in } + + await sut.connect() + await testClock.advance(by: .seconds(heartbeatInterval)) + + // First heartbeat sent + XCTAssertEqual(heartbeatStatuses.value, [.sent]) + + // Wait for timeout + await testClock.advance(by: .seconds(timeoutInterval)) + + // Wait for next heartbeat. + await testClock.advance(by: .seconds(heartbeatInterval)) + + // Should have timeout status + XCTAssertEqual(heartbeatStatuses.value, [.sent, .timeout]) + } + func testBroadcastWithHTTP() async throws { await http.when { $0.url.path.hasSuffix("broadcast") From a5c87edd9ec9da5b4385373f23ef5917d9d96167 Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Tue, 22 Apr 2025 08:06:07 -0300 Subject: [PATCH 6/8] desambiguate compactMap by providing return type --- Sources/Realtime/RealtimeClientV2.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/Realtime/RealtimeClientV2.swift b/Sources/Realtime/RealtimeClientV2.swift index f48dc8c3..28e962be 100644 --- a/Sources/Realtime/RealtimeClientV2.swift +++ b/Sources/Realtime/RealtimeClientV2.swift @@ -74,7 +74,7 @@ public final class RealtimeClientV2: Sendable { /// /// You can also use ``onHeartbeat(_:)`` for a closure based method. public var heartbeat: AsyncStream { - heartbeatSubject.values.compactMap { $0 }.eraseToStream() + heartbeatSubject.values.compactMap { value -> HeartbeatStatus? in value }.eraseToStream() } /// Listen for connection status changes. From af7009d379734b666361428876173bbcc6a8a6fa Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Mon, 28 Apr 2025 15:13:18 -0300 Subject: [PATCH 7/8] try to fix ambiguity: --- Sources/Realtime/RealtimeClientV2.swift | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/Sources/Realtime/RealtimeClientV2.swift b/Sources/Realtime/RealtimeClientV2.swift index 28e962be..c647673d 100644 --- a/Sources/Realtime/RealtimeClientV2.swift +++ b/Sources/Realtime/RealtimeClientV2.swift @@ -74,7 +74,10 @@ public final class RealtimeClientV2: Sendable { /// /// You can also use ``onHeartbeat(_:)`` for a closure based method. public var heartbeat: AsyncStream { - heartbeatSubject.values.compactMap { value -> HeartbeatStatus? in value }.eraseToStream() + AsyncStream( + heartbeatSubject.values.compactMap { $0 } + as AsyncCompactMapSequence, HeartbeatStatus> + ) } /// Listen for connection status changes. From cb60535fbf7def32ede6655d62776adb16a125fb Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Mon, 28 Apr 2025 15:23:44 -0300 Subject: [PATCH 8/8] Update Sources/Realtime/RealtimeClientV2.swift Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- Sources/Realtime/RealtimeClientV2.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/Realtime/RealtimeClientV2.swift b/Sources/Realtime/RealtimeClientV2.swift index c647673d..831a1b0c 100644 --- a/Sources/Realtime/RealtimeClientV2.swift +++ b/Sources/Realtime/RealtimeClientV2.swift @@ -96,7 +96,7 @@ public final class RealtimeClientV2: Sendable { /// - Parameter listener: Closure that will be called when heartbeat status changes. /// - Returns: An observation handle that can be used to stop listening. /// - /// - Nite: Use ``heartbeat`` if you prefer to use Async/Await. + /// - Note: Use ``heartbeat`` if you prefer to use Async/Await. public func onHeartbeat( _ listener: @escaping @Sendable (HeartbeatStatus) -> Void ) -> RealtimeSubscription {