Skip to content

Commit c5b8cd4

Browse files
authored
Prepare connection / region pinning (#450)
1 parent 5b53abf commit c5b8cd4

File tree

17 files changed

+917
-28
lines changed

17 files changed

+917
-28
lines changed

.changes/prepare-connection

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
patch type="added" "Prepare connection & region pinning"

.swiftlint.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
11
excluded:
2+
- .build
3+
- .build-test
4+
- .cache
5+
- .swiftpm
6+
- build
27
- Sources/LiveKit/Protos
38

49
disabled_rules:
Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
/*
2+
* Copyright 2026 LiveKit
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import Foundation
18+
19+
// MARK: - RegionManager
20+
21+
actor RegionManager: Loggable {
22+
struct State: Sendable {
23+
var lastRequested: Date?
24+
var all: [RegionInfo] = []
25+
var remaining: [RegionInfo] = []
26+
}
27+
28+
static let cacheInterval: TimeInterval = 30
29+
30+
nonisolated let providedUrl: URL
31+
private var state = State()
32+
private var settingsFetchTask: Task<[RegionInfo], Error>?
33+
private var settingsFetchTaskId: UUID?
34+
35+
init(providedUrl: URL) {
36+
self.providedUrl = providedUrl
37+
}
38+
39+
func cancel() {
40+
settingsFetchTask?.cancel()
41+
settingsFetchTask = nil
42+
settingsFetchTaskId = nil
43+
}
44+
45+
func resetAttempts(onlyIfExhausted: Bool = false) {
46+
if onlyIfExhausted {
47+
guard state.remaining.isEmpty else { return }
48+
}
49+
guard !state.all.isEmpty else { return }
50+
state.remaining = state.all
51+
}
52+
53+
func resetAll() {
54+
state = State()
55+
}
56+
57+
func markFailed(region: RegionInfo) {
58+
state.remaining.removeAll { $0 == region }
59+
}
60+
61+
func shouldRequestSettings() -> Bool {
62+
guard providedUrl.isCloud else { return false }
63+
guard let lastRequested = state.lastRequested else { return true }
64+
return Date().timeIntervalSince(lastRequested) > Self.cacheInterval
65+
}
66+
67+
func prepareSettingsFetch(token: String) {
68+
guard shouldRequestSettings() else { return }
69+
_ = startSettingsFetchIfNeeded(token: token)
70+
}
71+
72+
func resolveBest(token: String) async throws -> RegionInfo {
73+
try await requestSettingsIfNeeded(token: token)
74+
guard let selected = state.remaining.first else {
75+
throw LiveKitError(.regionManager, message: "No more remaining regions.")
76+
}
77+
78+
log("[Region] Resolved region: \(String(describing: selected))", .debug)
79+
return selected
80+
}
81+
82+
func updateFromServerReportedRegions(_ regions: Livekit_RegionSettings) {
83+
guard providedUrl.isCloud else { return }
84+
85+
let allRegions = regions.regions.compactMap { $0.toLKType() }
86+
guard !allRegions.isEmpty else { return }
87+
88+
// Keep previously failed regions excluded when updating the list.
89+
let allIds = Set(state.all.map(\.regionId))
90+
let remainingIds = Set(state.remaining.map(\.regionId))
91+
let failedRegionIds = allIds.subtracting(remainingIds)
92+
93+
let remainingRegions = allRegions.filter { !failedRegionIds.contains($0.regionId) }
94+
log("[Region] Updating regions from server-reported settings (\(allRegions.count)), remaining: \(remainingRegions.count)", .info)
95+
96+
state.all = allRegions
97+
state.remaining = remainingRegions
98+
state.lastRequested = Date()
99+
}
100+
101+
// MARK: - Testing
102+
103+
func snapshot() -> State { state }
104+
105+
func setStateForTesting(_ state: State) {
106+
self.state = state
107+
}
108+
109+
// MARK: - Private
110+
111+
private func startSettingsFetchIfNeeded(token: String) -> Task<[RegionInfo], Error> {
112+
if let task = settingsFetchTask { return task }
113+
114+
let taskId = UUID()
115+
settingsFetchTaskId = taskId
116+
117+
let task = Task { [providedUrl, token, taskId] in
118+
defer { clearSettingsFetchTask(matching: taskId) }
119+
do {
120+
let data = try await Self.fetchRegionSettings(providedUrl: providedUrl, token: token)
121+
let allRegions = try Self.parseRegionSettings(data: data)
122+
try Task.checkCancellation()
123+
applyFetchedRegions(allRegions)
124+
return allRegions
125+
} catch {
126+
log("[Region] Failed to fetch region settings: \(error)", .error)
127+
throw error
128+
}
129+
}
130+
131+
settingsFetchTask = task
132+
return task
133+
}
134+
135+
private func requestSettingsIfNeeded(token: String) async throws {
136+
guard providedUrl.isCloud else {
137+
throw LiveKitError(.onlyForCloud)
138+
}
139+
140+
guard shouldRequestSettings() else { return }
141+
let task = startSettingsFetchIfNeeded(token: token)
142+
_ = try await task.value
143+
}
144+
145+
private func applyFetchedRegions(_ allRegions: [RegionInfo]) {
146+
log("[Region] all regions: \(String(describing: allRegions))", .debug)
147+
state.all = allRegions
148+
state.remaining = allRegions
149+
state.lastRequested = Date()
150+
}
151+
152+
private func clearSettingsFetchTask(matching taskId: UUID) {
153+
guard settingsFetchTaskId == taskId else { return }
154+
settingsFetchTaskId = nil
155+
settingsFetchTask = nil
156+
}
157+
158+
// MARK: - Static helpers (non-isolated)
159+
160+
private nonisolated static func fetchRegionSettings(providedUrl: URL, token: String) async throws -> Data {
161+
var request = URLRequest(url: providedUrl.regionSettingsUrl(),
162+
cachePolicy: .reloadIgnoringLocalAndRemoteCacheData)
163+
request.addValue("Bearer \(token)", forHTTPHeaderField: "authorization")
164+
165+
let (data, response) = try await URLSession.shared.data(for: request)
166+
guard let httpResponse = response as? HTTPURLResponse else {
167+
throw LiveKitError(.regionManager, message: "Failed to fetch region settings")
168+
}
169+
170+
let statusCode = httpResponse.statusCode
171+
guard (200 ..< 300).contains(statusCode) else {
172+
let rawBody = String(data: data, encoding: .utf8)?
173+
.trimmingCharacters(in: .whitespacesAndNewlines)
174+
let body = if let rawBody, !rawBody.isEmpty {
175+
rawBody.count > 1024 ? String(rawBody.prefix(1024)) + "..." : rawBody
176+
} else {
177+
"(No server message)"
178+
}
179+
180+
if (400 ..< 500).contains(statusCode) {
181+
throw LiveKitError(.validation, message: "Region settings error: HTTP \(statusCode): \(body)")
182+
}
183+
184+
throw LiveKitError(.regionManager, message: "Failed to fetch region settings: HTTP \(statusCode): \(body)")
185+
}
186+
187+
return data
188+
}
189+
190+
private nonisolated static func parseRegionSettings(data: Data) throws -> [RegionInfo] {
191+
do {
192+
let regionSettings = try Livekit_RegionSettings(jsonUTF8Data: data)
193+
let allRegions = regionSettings.regions.compactMap { $0.toLKType() }
194+
guard !allRegions.isEmpty else {
195+
throw LiveKitError(.regionManager, message: "Fetched region data is empty.")
196+
}
197+
return allRegions
198+
} catch {
199+
throw LiveKitError(.regionManager, message: "Failed to parse region settings with error: \(error)")
200+
}
201+
}
202+
}

Sources/LiveKit/Core/Room+Engine.swift

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,8 @@ extension Room {
280280
throw LiveKitError(.invalidState)
281281
}
282282

283-
guard let url = _state.url, let token = _state.token else {
283+
let url = _state.read { $0.connectedUrl ?? $0.providedUrl }
284+
guard let url, let token = _state.token else {
284285
log("[Connect] Url or token is nil", .error)
285286
throw LiveKitError(.invalidState)
286287
}
@@ -360,16 +361,30 @@ extension Room {
360361
$0.connectionState = .reconnecting
361362
}
362363

363-
await cleanUp(isFullReconnect: true)
364+
let (providedUrl, connectedUrl, token) = _state.read { ($0.providedUrl, $0.connectedUrl, $0.token) }
364365

365-
guard let url = _state.url,
366-
let token = _state.token
367-
else {
366+
guard let providedUrl, let connectedUrl, let token else {
368367
log("[Connect] Url or token is nil")
369368
throw LiveKitError(.invalidState)
370369
}
371370

372-
try await fullConnectSequence(url, token)
371+
let finalUrl: URL
372+
await cleanUp(isFullReconnect: true)
373+
if providedUrl.isCloud {
374+
guard let regionManager = await regionManager(for: providedUrl) else {
375+
throw LiveKitError(.onlyForCloud)
376+
}
377+
378+
finalUrl = try await connectWithCloudRegionFailover(regionManager: regionManager,
379+
initialUrl: connectedUrl,
380+
initialRegion: nil,
381+
token: token)
382+
} else {
383+
try await fullConnectSequence(connectedUrl, token)
384+
finalUrl = connectedUrl
385+
}
386+
387+
_state.mutate { $0.connectedUrl = finalUrl }
373388
}
374389

375390
do {
@@ -435,6 +450,11 @@ extension Room {
435450
$0.isReconnectingWithMode = nil
436451
$0.nextReconnectMode = nil
437452
}
453+
454+
if let providedUrl = _state.providedUrl, providedUrl.isCloud, let regionManager = await regionManager(for: providedUrl) {
455+
// Clear failed region attempts after a successful reconnect.
456+
await regionManager.resetAttempts()
457+
}
438458
} catch {
439459
log("[Connect] Sequence failed with error: \(error)")
440460

0 commit comments

Comments
 (0)