Skip to content

Commit 1eb6d9f

Browse files
Merge pull request #67 from ably-labs/53-room-lifecycle-monitoring
[ECO-4988] Implement (some of) Room Lifecycle Monitoring spec
2 parents 96cdfa8 + 05836a6 commit 1eb6d9f

File tree

5 files changed

+604
-44
lines changed

5 files changed

+604
-44
lines changed

Package.swift

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ let package = Package(
4040
name: "Ably",
4141
package: "ably-cocoa"
4242
),
43+
.product(
44+
name: "AsyncAlgorithms",
45+
package: "swift-async-algorithms"
46+
),
4347
]
4448
),
4549
.testTarget(

Sources/AblyChat/RoomLifecycleManager.swift

Lines changed: 246 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,14 @@
1-
import Ably
1+
@preconcurrency import Ably
2+
import AsyncAlgorithms
23

34
/// The interface that the lifecycle manager expects its contributing realtime channels to conform to.
45
///
5-
/// We use this instead of the ``RealtimeChannel`` interface as its ``attach`` and ``detach`` methods are `async` instead of using callbacks. This makes it easier to write mocks for (since ``RealtimeChannel`` doesn’t express to the type system that the callbacks it receives need to be `Sendable`, it’s hard to, for example, create a mock that creates a `Task` and then calls the callback from inside this task).
6+
/// We use this instead of the ``RealtimeChannelProtocol`` interface as:
7+
///
8+
/// - its ``attach`` and ``detach`` methods are `async` instead of using callbacks
9+
/// - it uses `AsyncSequence` to emit state changes instead of using callbacks
10+
///
11+
/// This makes it easier to write mocks for (since ``RealtimeChannelProtocol`` doesn’t express to the type system that the callbacks it receives need to be `Sendable`, it’s hard to, for example, create a mock that creates a `Task` and then calls the callback from inside this task).
612
///
713
/// We choose to also mark the channel’s mutable state as `async`. This is a way of highlighting at the call site of accessing this state that, since `ARTRealtimeChannel` mutates this state on a separate thread, it’s possible for this state to have changed since the last time you checked it, or since the last time you performed an operation that might have mutated it, or since the last time you recieved an event informing you that it changed. To be clear, marking these as `async` doesn’t _solve_ these issues; it just makes them a bit more visible. We’ll decide how to address them in https://github.com/ably-labs/ably-chat-swift/issues/49.
814
internal protocol RoomLifecycleContributorChannel: Sendable {
@@ -11,31 +17,90 @@ internal protocol RoomLifecycleContributorChannel: Sendable {
1117

1218
var state: ARTRealtimeChannelState { get async }
1319
var errorReason: ARTErrorInfo? { get async }
20+
21+
/// Equivalent to subscribing to a `RealtimeChannelProtocol` object’s state changes via its `on(_:)` method. The subscription should use the ``BufferingPolicy.unbounded`` buffering policy.
22+
///
23+
/// It is marked as `async` purely to make it easier to write mocks for this method (i.e. to use an actor as a mock).
24+
func subscribeToState() async -> Subscription<ARTChannelStateChange>
1425
}
1526

16-
internal actor RoomLifecycleManager<Channel: RoomLifecycleContributorChannel> {
17-
/// A realtime channel that contributes to the room lifecycle.
18-
internal struct Contributor {
19-
/// The room feature that this contributor corresponds to. Used only for choosing which error to throw when a contributor operation fails.
20-
internal var feature: RoomFeature
27+
/// A realtime channel that contributes to the room lifecycle.
28+
///
29+
/// The identity implied by the `Identifiable` conformance must distinguish each of the contributors passed to a given ``RoomLifecycleManager`` instance.
30+
internal protocol RoomLifecycleContributor: Identifiable, Sendable {
31+
associatedtype Channel: RoomLifecycleContributorChannel
32+
33+
/// The room feature that this contributor corresponds to. Used only for choosing which error to throw when a contributor operation fails.
34+
var feature: RoomFeature { get }
35+
var channel: Channel { get }
36+
37+
/// Informs the contributor that there has been a break in channel continuity, which it should inform library users about.
38+
///
39+
/// It is marked as `async` purely to make it easier to write mocks for this method (i.e. to use an actor as a mock).
40+
func emitDiscontinuity(_ error: ARTErrorInfo) async
41+
}
2142

22-
internal var channel: Channel
43+
internal actor RoomLifecycleManager<Contributor: RoomLifecycleContributor> {
44+
/// Stores manager state relating to a given contributor.
45+
private struct ContributorAnnotation {
46+
// TODO: Not clear whether there can be multiple or just one (asked in https://github.com/ably/specification/pull/200/files#r1781927850)
47+
var pendingDiscontinuityEvents: [ARTErrorInfo] = []
2348
}
2449

2550
internal private(set) var current: RoomLifecycle
2651
internal private(set) var error: ARTErrorInfo?
52+
// TODO: This currently allows the the tests to inject a value in order to test the spec points that are predicated on whether “a channel lifecycle operation is in progress”. In https://github.com/ably-labs/ably-chat-swift/issues/52 we’ll set this property based on whether there actually is a lifecycle operation in progress.
53+
private let hasOperationInProgress: Bool
54+
/// Manager state that relates to individual contributors, keyed by contributors’ ``Contributor.id``. Stored separately from ``contributors`` so that the latter can be a `let`, to make it clear that the contributors remain fixed for the lifetime of the manager.
55+
private var contributorAnnotations: ContributorAnnotations
56+
57+
/// Provides a `Dictionary`-like interface for storing manager state about individual contributors.
58+
private struct ContributorAnnotations {
59+
private var storage: [Contributor.ID: ContributorAnnotation]
60+
61+
init(contributors: [Contributor], pendingDiscontinuityEvents: [Contributor.ID: [ARTErrorInfo]]) {
62+
storage = contributors.reduce(into: [:]) { result, contributor in
63+
result[contributor.id] = .init(pendingDiscontinuityEvents: pendingDiscontinuityEvents[contributor.id] ?? [])
64+
}
65+
}
66+
67+
/// It is a programmer error to call this subscript getter with a contributor that was not one of those passed to ``init(contributors:pendingDiscontinuityEvents)``.
68+
subscript(_ contributor: Contributor) -> ContributorAnnotation {
69+
get {
70+
guard let annotation = storage[contributor.id] else {
71+
preconditionFailure("Expected annotation for \(contributor)")
72+
}
73+
return annotation
74+
}
75+
76+
set {
77+
storage[contributor.id] = newValue
78+
}
79+
}
80+
81+
mutating func clearPendingDiscontinuityEvents() {
82+
storage = storage.mapValues { annotation in
83+
var newAnnotation = annotation
84+
newAnnotation.pendingDiscontinuityEvents = []
85+
return newAnnotation
86+
}
87+
}
88+
}
2789

2890
private let logger: InternalLogger
2991
private let clock: SimpleClock
3092
private let contributors: [Contributor]
93+
private var listenForStateChangesTask: Task<Void, Never>!
3194

3295
internal init(
3396
contributors: [Contributor],
3497
logger: InternalLogger,
3598
clock: SimpleClock
36-
) {
37-
self.init(
99+
) async {
100+
await self.init(
38101
current: nil,
102+
hasOperationInProgress: nil,
103+
pendingDiscontinuityEvents: [:],
39104
contributors: contributors,
40105
logger: logger,
41106
clock: clock
@@ -45,12 +110,16 @@ internal actor RoomLifecycleManager<Channel: RoomLifecycleContributorChannel> {
45110
#if DEBUG
46111
internal init(
47112
testsOnly_current current: RoomLifecycle? = nil,
113+
testsOnly_hasOperationInProgress hasOperationInProgress: Bool? = nil,
114+
testsOnly_pendingDiscontinuityEvents pendingDiscontinuityEvents: [Contributor.ID: [ARTErrorInfo]]? = nil,
48115
contributors: [Contributor],
49116
logger: InternalLogger,
50117
clock: SimpleClock
51-
) {
52-
self.init(
118+
) async {
119+
await self.init(
53120
current: current,
121+
hasOperationInProgress: hasOperationInProgress,
122+
pendingDiscontinuityEvents: pendingDiscontinuityEvents,
54123
contributors: contributors,
55124
logger: logger,
56125
clock: clock
@@ -60,16 +129,56 @@ internal actor RoomLifecycleManager<Channel: RoomLifecycleContributorChannel> {
60129

61130
private init(
62131
current: RoomLifecycle?,
132+
hasOperationInProgress: Bool?,
133+
pendingDiscontinuityEvents: [Contributor.ID: [ARTErrorInfo]]?,
63134
contributors: [Contributor],
64135
logger: InternalLogger,
65136
clock: SimpleClock
66-
) {
137+
) async {
67138
self.current = current ?? .initialized
139+
self.hasOperationInProgress = hasOperationInProgress ?? false
68140
self.contributors = contributors
141+
contributorAnnotations = .init(contributors: contributors, pendingDiscontinuityEvents: pendingDiscontinuityEvents ?? [:])
69142
self.logger = logger
70143
self.clock = clock
144+
145+
// The idea here is to make sure that, before the initializer completes, we are already listening for state changes, so that e.g. tests don’t miss a state change.
146+
let subscriptions = await withTaskGroup(of: (contributor: Contributor, subscription: Subscription<ARTChannelStateChange>).self) { group in
147+
for contributor in contributors {
148+
group.addTask {
149+
await (contributor: contributor, subscription: contributor.channel.subscribeToState())
150+
}
151+
}
152+
153+
return await Array(group)
154+
}
155+
156+
// CHA-RL4: listen for state changes from our contributors
157+
// TODO: Understand what happens when this task gets cancelled by `deinit`; I’m not convinced that the for-await loops will exit (https://github.com/ably-labs/ably-chat-swift/issues/29)
158+
listenForStateChangesTask = Task {
159+
await withTaskGroup(of: Void.self) { group in
160+
for (contributor, subscription) in subscriptions {
161+
// This `@Sendable` is to make the compiler error "'self'-isolated value of type '() async -> Void' passed as a strongly transferred parameter; later accesses could race" go away. I don’t hugely understand what it means, but given the "'self'-isolated value" I guessed it was something vaguely to do with the fact that `async` actor initializers are actor-isolated and thought that marking it as `@Sendable` would sever this isolation and make the error go away, which it did 🤷. But there are almost certainly consequences that I am incapable of reasoning about with my current level of Swift concurrency knowledge.
162+
group.addTask { @Sendable [weak self] in
163+
for await stateChange in subscription {
164+
await self?.didReceiveStateChange(stateChange, forContributor: contributor)
165+
}
166+
}
167+
}
168+
}
169+
}
71170
}
72171

172+
deinit {
173+
listenForStateChangesTask.cancel()
174+
}
175+
176+
#if DEBUG
177+
internal func testsOnly_pendingDiscontinuityEvents(for contributor: Contributor) -> [ARTErrorInfo] {
178+
contributorAnnotations[contributor].pendingDiscontinuityEvents
179+
}
180+
#endif
181+
73182
// TODO: clean up old subscriptions (https://github.com/ably-labs/ably-chat-swift/issues/36)
74183
private var subscriptions: [Subscription<RoomStatusChange>] = []
75184

@@ -79,6 +188,113 @@ internal actor RoomLifecycleManager<Channel: RoomLifecycleContributorChannel> {
79188
return subscription
80189
}
81190

191+
#if DEBUG
192+
// TODO: clean up old subscriptions (https://github.com/ably-labs/ably-chat-swift/issues/36)
193+
/// Supports the ``testsOnly_subscribeToHandledContributorStateChanges()`` method.
194+
private var stateChangeHandledSubscriptions: [Subscription<ARTChannelStateChange>] = []
195+
196+
/// Returns a subscription which emits the contributor state changes that have been handled by the manager.
197+
///
198+
/// A contributor state change is considered handled once the manager has performed all of the side effects that it will perform as a result of receiving this state change. Specifically, once:
199+
///
200+
/// - the manager has recorded all pending discontinuity events provoked by the state change (you can retrieve these using ``testsOnly_pendingDiscontinuityEventsForContributor(at:)``)
201+
/// - the manager has performed all status changes provoked by the state change
202+
/// - the manager has performed all contributor actions provoked by the state change, namely calls to ``RoomLifecycleContributorChannel.detach()`` or ``RoomLifecycleContributor.emitDiscontinuity(_:)``
203+
internal func testsOnly_subscribeToHandledContributorStateChanges() -> Subscription<ARTChannelStateChange> {
204+
let subscription = Subscription<ARTChannelStateChange>(bufferingPolicy: .unbounded)
205+
stateChangeHandledSubscriptions.append(subscription)
206+
return subscription
207+
}
208+
#endif
209+
210+
/// Implements CHA-RL4b’s contributor state change handling.
211+
private func didReceiveStateChange(_ stateChange: ARTChannelStateChange, forContributor contributor: Contributor) async {
212+
logger.log(message: "Got state change \(stateChange) for contributor \(contributor)", level: .info)
213+
214+
// TODO: The spec, which is written for a single-threaded environment, is presumably operating on the assumption that the channel is currently in the state given by `stateChange.current` (https://github.com/ably-labs/ably-chat-swift/issues/49)
215+
switch stateChange.event {
216+
case .update:
217+
// CHA-RL4a1 — if RESUMED then no-op
218+
guard !stateChange.resumed else {
219+
break
220+
}
221+
222+
guard let reason = stateChange.reason else {
223+
// TODO: Decide the right thing to do here (https://github.com/ably-labs/ably-chat-swift/issues/74)
224+
preconditionFailure("State change event with resumed == false should have a reason")
225+
}
226+
227+
if hasOperationInProgress {
228+
// CHA-RL4a3
229+
logger.log(message: "Recording pending discontinuity event for contributor \(contributor)", level: .info)
230+
231+
contributorAnnotations[contributor].pendingDiscontinuityEvents.append(reason)
232+
} else {
233+
// CHA-RL4a4
234+
logger.log(message: "Emitting discontinuity event for contributor \(contributor)", level: .info)
235+
236+
await contributor.emitDiscontinuity(reason)
237+
}
238+
case .attached:
239+
if hasOperationInProgress {
240+
if !stateChange.resumed {
241+
// CHA-RL4b1
242+
logger.log(message: "Recording pending discontinuity event for contributor \(contributor)", level: .info)
243+
244+
guard let reason = stateChange.reason else {
245+
// TODO: Decide the right thing to do here (https://github.com/ably-labs/ably-chat-swift/issues/74)
246+
preconditionFailure("State change event with resumed == false should have a reason")
247+
}
248+
249+
contributorAnnotations[contributor].pendingDiscontinuityEvents.append(reason)
250+
}
251+
} else if current != .attached {
252+
if await (contributors.async.map { await $0.channel.state }.allSatisfy { $0 == .attached }) {
253+
// CHA-RL4b8
254+
logger.log(message: "Now that all contributors are ATTACHED, transitioning room to ATTACHED", level: .info)
255+
changeStatus(to: .attached)
256+
}
257+
}
258+
case .failed:
259+
if !hasOperationInProgress {
260+
// CHA-RL4b5
261+
guard let reason = stateChange.reason else {
262+
// TODO: Decide the right thing to do here (https://github.com/ably-labs/ably-chat-swift/issues/74)
263+
preconditionFailure("FAILED state change event should have a reason")
264+
}
265+
266+
changeStatus(to: .failed, error: reason)
267+
268+
// TODO: CHA-RL4b5 is a bit unclear about how to handle failure, and whether they can be detached concurrently (asked in https://github.com/ably/specification/pull/200/files#r1777471810)
269+
for contributor in contributors {
270+
do {
271+
try await contributor.channel.detach()
272+
} catch {
273+
logger.log(message: "Failed to detach contributor \(contributor), error \(error)", level: .info)
274+
}
275+
}
276+
}
277+
case .suspended:
278+
if !hasOperationInProgress {
279+
// CHA-RL4b9
280+
guard let reason = stateChange.reason else {
281+
// TODO: Decide the right thing to do here (https://github.com/ably-labs/ably-chat-swift/issues/74)
282+
preconditionFailure("SUSPENDED state change event should have a reason")
283+
}
284+
285+
changeStatus(to: .suspended, error: reason)
286+
}
287+
default:
288+
break
289+
}
290+
291+
#if DEBUG
292+
for subscription in stateChangeHandledSubscriptions {
293+
subscription.emit(stateChange)
294+
}
295+
#endif
296+
}
297+
82298
/// Updates ``current`` and ``error`` and emits a status change event.
83299
private func changeStatus(to new: RoomLifecycle, error: ARTErrorInfo? = nil) {
84300
logger.log(message: "Transitioning from \(current) to \(new), error \(String(describing: error))", level: .info)
@@ -150,6 +366,23 @@ internal actor RoomLifecycleManager<Channel: RoomLifecycleContributorChannel> {
150366
151367
// CHA-RL1g1
152368
changeStatus(to: .attached)
369+
370+
// CHA-RL1g2
371+
await emitPendingDiscontinuityEvents()
372+
}
373+
374+
/// Implements CHA-RL1g2’s emitting of pending discontinuity events.
375+
private func emitPendingDiscontinuityEvents() async {
376+
// Emit all pending discontinuity events
377+
logger.log(message: "Emitting pending discontinuity events", level: .info)
378+
for contributor in contributors {
379+
for pendingDiscontinuityEvent in contributorAnnotations[contributor].pendingDiscontinuityEvents {
380+
logger.log(message: "Emitting pending discontinuity event \(pendingDiscontinuityEvent) to contributor \(contributor)", level: .info)
381+
await contributor.emitDiscontinuity(pendingDiscontinuityEvent)
382+
}
383+
}
384+
385+
contributorAnnotations.clearPendingDiscontinuityEvents()
153386
}
154387

155388
/// Implements CHA-RL1h5’s "detach all channels that are not in the FAILED state".
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import Ably
2+
@testable import AblyChat
3+
4+
actor MockRoomLifecycleContributor: RoomLifecycleContributor {
5+
nonisolated let feature: RoomFeature
6+
nonisolated let channel: MockRoomLifecycleContributorChannel
7+
8+
private(set) var emitDiscontinuityArguments: [ARTErrorInfo] = []
9+
10+
init(feature: RoomFeature, channel: MockRoomLifecycleContributorChannel) {
11+
self.feature = feature
12+
self.channel = channel
13+
}
14+
15+
func emitDiscontinuity(_ error: ARTErrorInfo) async {
16+
emitDiscontinuityArguments.append(error)
17+
}
18+
}

0 commit comments

Comments
 (0)