Skip to content

Commit 5f3d75c

Browse files
grdsdevCopilot
andauthored
feat(realtime): add heartbeat callback (#709)
* feat(realtime): add heartbeat callback * Add heartbeat status * fix typo * fix docs * add tests * desambiguate compactMap by providing return type * try to fix ambiguity: * Update Sources/Realtime/RealtimeClientV2.swift Co-authored-by: Copilot <[email protected]> --------- Co-authored-by: Copilot <[email protected]>
1 parent 0104331 commit 5f3d75c

File tree

3 files changed

+131
-39
lines changed

3 files changed

+131
-39
lines changed

Sources/Realtime/RealtimeClientV2.swift

+76-35
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public final class RealtimeClientV2: Sendable {
3232
var messageTask: Task<Void, Never>?
3333

3434
var connectionTask: Task<Void, Never>?
35-
var channels: [RealtimeChannelV2] = []
35+
var channels: [String: RealtimeChannelV2] = [:]
3636
var sendBuffer: [@Sendable () -> Void] = []
3737

3838
var conn: (any WebSocket)?
@@ -51,13 +51,11 @@ public final class RealtimeClientV2: Sendable {
5151

5252
/// All managed channels indexed by their topics.
5353
public var channels: [String: RealtimeChannelV2] {
54-
mutableState.channels.reduce(
55-
into: [:],
56-
{ $0[$1.topic] = $1 }
57-
)
54+
mutableState.channels
5855
}
5956

6057
private let statusSubject = AsyncValueSubject<RealtimeClientStatus>(.disconnected)
58+
private let heartbeatSubject = AsyncValueSubject<HeartbeatStatus?>(nil)
6159

6260
/// Listen for connection status changes.
6361
///
@@ -72,6 +70,16 @@ public final class RealtimeClientV2: Sendable {
7270
set { statusSubject.yield(newValue) }
7371
}
7472

73+
/// Listen for heartbeat status.
74+
///
75+
/// You can also use ``onHeartbeat(_:)`` for a closure based method.
76+
public var heartbeat: AsyncStream<HeartbeatStatus> {
77+
AsyncStream(
78+
heartbeatSubject.values.compactMap { $0 }
79+
as AsyncCompactMapSequence<AsyncStream<HeartbeatStatus?>, HeartbeatStatus>
80+
)
81+
}
82+
7583
/// Listen for connection status changes.
7684
/// - Parameter listener: Closure that will be called when connection status changes.
7785
/// - Returns: An observation handle that can be used to stop listening.
@@ -84,6 +92,21 @@ public final class RealtimeClientV2: Sendable {
8492
return RealtimeSubscription { task.cancel() }
8593
}
8694

95+
/// Listen for heatbeat checks.
96+
/// - Parameter listener: Closure that will be called when heartbeat status changes.
97+
/// - Returns: An observation handle that can be used to stop listening.
98+
///
99+
/// - Note: Use ``heartbeat`` if you prefer to use Async/Await.
100+
public func onHeartbeat(
101+
_ listener: @escaping @Sendable (HeartbeatStatus) -> Void
102+
) -> RealtimeSubscription {
103+
let task = heartbeatSubject.onChange { message in
104+
guard let message else { return }
105+
listener(message)
106+
}
107+
return RealtimeSubscription { task.cancel() }
108+
}
109+
87110
public convenience init(url: URL, options: RealtimeClientOptions) {
88111
var interceptors: [any HTTPClientInterceptor] = []
89112

@@ -139,7 +162,7 @@ public final class RealtimeClientV2: Sendable {
139162
mutableState.withValue {
140163
$0.heartbeatTask?.cancel()
141164
$0.messageTask?.cancel()
142-
$0.channels = []
165+
$0.channels = [:]
143166
}
144167
}
145168

@@ -223,14 +246,15 @@ public final class RealtimeClientV2: Sendable {
223246

224247
private func onClose(code: Int?, reason: String?) {
225248
options.logger?.debug(
226-
"WebSocket closed. Code: \(code?.description ?? "<none>"), Reason: \(reason ?? "<none>")")
249+
"WebSocket closed. Code: \(code?.description ?? "<none>"), Reason: \(reason ?? "<none>")"
250+
)
227251

228252
reconnect()
229253
}
230254

231-
private func reconnect() {
255+
private func reconnect(disconnectReason: String? = nil) {
232256
Task {
233-
disconnect()
257+
disconnect(reason: disconnectReason)
234258
await connect(reconnect: true)
235259
}
236260
}
@@ -246,35 +270,42 @@ public final class RealtimeClientV2: Sendable {
246270
_ topic: String,
247271
options: @Sendable (inout RealtimeChannelConfig) -> Void = { _ in }
248272
) -> RealtimeChannelV2 {
249-
var config = RealtimeChannelConfig(
250-
broadcast: BroadcastJoinConfig(acknowledgeBroadcasts: false, receiveOwnBroadcasts: false),
251-
presence: PresenceJoinConfig(key: ""),
252-
isPrivate: false
253-
)
254-
options(&config)
273+
mutableState.withValue {
274+
let realtimeTopic = "realtime:\(topic)"
255275

256-
let channel = RealtimeChannelV2(
257-
topic: "realtime:\(topic)",
258-
config: config,
259-
socket: self,
260-
logger: self.options.logger
261-
)
276+
if let channel = $0.channels[realtimeTopic] {
277+
return channel
278+
}
262279

263-
mutableState.withValue {
264-
$0.channels.append(channel)
265-
}
280+
var config = RealtimeChannelConfig(
281+
broadcast: BroadcastJoinConfig(acknowledgeBroadcasts: false, receiveOwnBroadcasts: false),
282+
presence: PresenceJoinConfig(key: ""),
283+
isPrivate: false
284+
)
285+
options(&config)
266286

267-
return channel
287+
let channel = RealtimeChannelV2(
288+
topic: realtimeTopic,
289+
config: config,
290+
socket: self,
291+
logger: self.options.logger
292+
)
293+
294+
$0.channels[realtimeTopic] = channel
295+
296+
return channel
297+
}
268298
}
269299

270300
@available(
271-
*, deprecated,
301+
*,
302+
deprecated,
272303
message:
273304
"Client handles channels automatically, this method will be removed on the next major release."
274305
)
275306
public func addChannel(_ channel: RealtimeChannelV2) {
276307
mutableState.withValue {
277-
$0.channels.append(channel)
308+
$0.channels[channel.topic] = channel
278309
}
279310
}
280311

@@ -294,9 +325,7 @@ public final class RealtimeClientV2: Sendable {
294325

295326
func _remove(_ channel: RealtimeChannelV2) {
296327
mutableState.withValue {
297-
$0.channels.removeAll {
298-
$0.joinRef == channel.joinRef
299-
}
328+
$0.channels[channel.topic] = nil
300329
}
301330
}
302331

@@ -372,6 +401,11 @@ public final class RealtimeClientV2: Sendable {
372401
}
373402

374403
private func sendHeartbeat() async {
404+
if status != .connected {
405+
heartbeatSubject.yield(.disconnected)
406+
return
407+
}
408+
375409
let pendingHeartbeatRef: String? = mutableState.withValue {
376410
if $0.pendingHeartbeatRef != nil {
377411
$0.pendingHeartbeatRef = nil
@@ -393,10 +427,12 @@ public final class RealtimeClientV2: Sendable {
393427
payload: [:]
394428
)
395429
)
430+
heartbeatSubject.yield(.sent)
396431
await setAuth()
397432
} else {
398433
options.logger?.debug("Heartbeat timeout")
399-
reconnect()
434+
heartbeatSubject.yield(.timeout)
435+
reconnect(disconnectReason: "heartbeat timeout")
400436
}
401437
}
402438

@@ -453,7 +489,11 @@ public final class RealtimeClientV2: Sendable {
453489
}
454490

455491
private func onMessage(_ message: RealtimeMessageV2) async {
456-
let channels = mutableState.withValue {
492+
if message.topic == "phoenix", message.event == "phx_reply" {
493+
heartbeatSubject.yield(message.status == .ok ? .ok : .error)
494+
}
495+
496+
let channel = mutableState.withValue {
457497
if let ref = message.ref, ref == $0.pendingHeartbeatRef {
458498
$0.pendingHeartbeatRef = nil
459499
options.logger?.debug("heartbeat received")
@@ -462,10 +502,10 @@ public final class RealtimeClientV2: Sendable {
462502
.debug("Received event \(message.event) for channel \(message.topic)")
463503
}
464504

465-
return $0.channels.filter { $0.topic == message.topic }
505+
return $0.channels[message.topic]
466506
}
467507

468-
for channel in channels {
508+
if let channel {
469509
await channel.onMessage(message)
470510
}
471511
}
@@ -488,7 +528,8 @@ public final class RealtimeClientV2: Sendable {
488528
489529
Error:
490530
\(error)
491-
""")
531+
"""
532+
)
492533
}
493534
}
494535

Sources/Realtime/Types.swift

+13
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,19 @@ public enum RealtimeClientStatus: Sendable, CustomStringConvertible {
8686
}
8787
}
8888

89+
public enum HeartbeatStatus: Sendable {
90+
/// Heartbeat was sent.
91+
case sent
92+
/// Heartbeat was received.
93+
case ok
94+
/// Server responded with an error.
95+
case error
96+
/// Heartbeat wasn't received in time.
97+
case timeout
98+
/// Socket is disconnected.
99+
case disconnected
100+
}
101+
89102
extension HTTPField.Name {
90103
static let apiKey = Self("apiKey")!
91104
}

Tests/RealtimeTests/RealtimeTests.swift

+42-4
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@ final class RealtimeTests: XCTestCase {
1818
let apiKey = "anon.api.key"
1919

2020
#if !os(Windows) && !os(Linux) && !os(Android)
21-
override func invokeTest() {
22-
withMainSerialExecutor {
23-
super.invokeTest()
21+
override func invokeTest() {
22+
withMainSerialExecutor {
23+
super.invokeTest()
24+
}
2425
}
25-
}
2626
#endif
2727

2828
var server: FakeWebSocket!
@@ -303,11 +303,21 @@ final class RealtimeTests: XCTestCase {
303303
}
304304
}
305305

306+
let heartbeatStatuses = LockIsolated<[HeartbeatStatus]>([])
307+
let subscription = sut.onHeartbeat { status in
308+
heartbeatStatuses.withValue {
309+
$0.append(status)
310+
}
311+
}
312+
defer { subscription.cancel() }
313+
306314
await sut.connect()
307315

308316
await testClock.advance(by: .seconds(heartbeatInterval * 2))
309317

310318
await fulfillment(of: [expectation], timeout: 3)
319+
320+
expectNoDifference(heartbeatStatuses.value, [.sent, .ok, .sent, .ok])
311321
}
312322

313323
func testHeartbeat_whenNoResponse_shouldReconnect() async throws {
@@ -354,6 +364,34 @@ final class RealtimeTests: XCTestCase {
354364
)
355365
}
356366

367+
func testHeartbeat_timeout() async throws {
368+
let heartbeatStatuses = LockIsolated<[HeartbeatStatus]>([])
369+
let s1 = sut.onHeartbeat { status in
370+
heartbeatStatuses.withValue {
371+
$0.append(status)
372+
}
373+
}
374+
defer { s1.cancel() }
375+
376+
// Don't respond to any heartbeats
377+
server.onEvent = { _ in }
378+
379+
await sut.connect()
380+
await testClock.advance(by: .seconds(heartbeatInterval))
381+
382+
// First heartbeat sent
383+
XCTAssertEqual(heartbeatStatuses.value, [.sent])
384+
385+
// Wait for timeout
386+
await testClock.advance(by: .seconds(timeoutInterval))
387+
388+
// Wait for next heartbeat.
389+
await testClock.advance(by: .seconds(heartbeatInterval))
390+
391+
// Should have timeout status
392+
XCTAssertEqual(heartbeatStatuses.value, [.sent, .timeout])
393+
}
394+
357395
func testBroadcastWithHTTP() async throws {
358396
await http.when {
359397
$0.url.path.hasSuffix("broadcast")

0 commit comments

Comments
 (0)