Skip to content

Commit 14d8420

Browse files
authored
Telemetry cleanup and adding test for fixed crash (#376)
* Cleaning up code and removing noisy caller tag * Adding test that reproduced crash before fix * Refactoring out unhelpful size checks * Adding atomic to shared variables * Fixing NSObject plugin class names
1 parent cc47b9a commit 14d8420

File tree

4 files changed

+69
-49
lines changed

4 files changed

+69
-49
lines changed

Diff for: Sources/Segment/Errors.swift

+8-10
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,11 @@ extension Analytics {
7373
if fatal {
7474
exceptionFailure("A critical error occurred: \(translatedError)")
7575
}
76-
Telemetry.shared.error(metric: Telemetry.INVOKE_ERROR_METRIC, log: Thread.callStackSymbols.joined(separator: "\n")) {
77-
(_ it: inout [String: String]) in
78-
it["error"] = "\(translatedError)"
79-
it["writekey"] = configuration.values.writeKey
80-
it["caller"] = Thread.callStackSymbols[3]
81-
}
76+
Telemetry.shared.error(metric: Telemetry.INVOKE_ERROR_METRIC, log: Thread.callStackSymbols.joined(separator: "\n")) {
77+
(_ it: inout [String: String]) in
78+
it["error"] = "\(translatedError)"
79+
it["writekey"] = configuration.values.writeKey
80+
}
8281
}
8382

8483
static public func reportInternalError(_ error: Error, fatal: Bool = false) {
@@ -90,9 +89,8 @@ extension Analytics {
9089
exceptionFailure("A critical error occurred: \(translatedError)")
9190
}
9291
Telemetry.shared.error(metric: Telemetry.INVOKE_ERROR_METRIC, log: Thread.callStackSymbols.joined(separator: "\n")) {
93-
(_ it: inout [String: String]) in
94-
it["error"] = "\(translatedError)"
95-
it["caller"] = Thread.callStackSymbols[3]
96-
}
92+
(_ it: inout [String: String]) in
93+
it["error"] = "\(translatedError)"
94+
}
9795
}
9896
}

Diff for: Sources/Segment/Timeline.swift

+3-3
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ internal class Mediator {
7171
if let plugin = plugin as? DestinationPlugin, !plugin.key.isEmpty {
7272
it["plugin"] = "\(plugin.type)-\(plugin.key)"
7373
} else {
74-
it["plugin"] = "\(plugin.type)-\(String(describing: plugin))"
74+
it["plugin"] = "\(plugin.type)-\(String(describing: type(of: plugin)))"
7575
}
7676
}
7777
}
@@ -84,7 +84,7 @@ internal class Mediator {
8484
if let plugin = plugin as? DestinationPlugin, !plugin.key.isEmpty {
8585
it["plugin"] = "\(plugin.type)-\(plugin.key)"
8686
} else {
87-
it["plugin"] = "\(plugin.type)-\(String(describing: plugin))"
87+
it["plugin"] = "\(plugin.type)-\(String(describing: type(of: plugin)))"
8888
} }
8989
return plugin === storedPlugin
9090
}
@@ -109,7 +109,7 @@ internal class Mediator {
109109
if let plugin = plugin as? DestinationPlugin, !plugin.key.isEmpty {
110110
it["plugin"] = "\(plugin.type)-\(plugin.key)"
111111
} else {
112-
it["plugin"] = "\(plugin.type)-\(String(describing: plugin))"
112+
it["plugin"] = "\(plugin.type)-\(String(describing: type(of: plugin)))"
113113
} }
114114
}
115115
}

Diff for: Sources/Segment/Utilities/Telemetry.swift

+20-31
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@ public class Telemetry: Subscriber {
7171

7272
internal var session: any HTTPSession
7373
internal var host: String = HTTPClient.getDefaultAPIHost()
74-
var sampleRate: Double = 1.0 // inital sample rate should be 1.0, will be downsampled on start
74+
@Atomic internal var sampleRate: Double = 1.0 // inital sample rate should be 1.0, will be downsampled on start
75+
internal var sampleRateTest: Atomic<Double> { _sampleRate }
7576
private var flushTimer: Int = 30
7677
internal var maxQueueSize: Int = 20
7778
var errorLogSizeMax: Int = 4000
@@ -85,29 +86,28 @@ public class Telemetry: Subscriber {
8586

8687
internal var queue = [RemoteMetric]()
8788
private var queueBytes = 0
88-
private var queueSizeExceeded = false
89-
private var seenErrors = [String: Int]()
90-
internal var started = false
91-
private var rateLimitEndTime: TimeInterval = 0
92-
internal var flushFirstError = true
89+
@Atomic internal var started = false
90+
@Atomic private var rateLimitEndTime: TimeInterval = 0
91+
@Atomic internal var flushFirstError = true
92+
internal var flushFirstErrorTest: Atomic<Bool> { _flushFirstError }
9393
private var telemetryQueue = DispatchQueue(label: "telemetryQueue")
9494
private var updateQueue = DispatchQueue(label: "updateQueue")
9595
private var telemetryTimer: QueueTimer?
9696

9797
/// Starts the Telemetry send loop. Requires both `enable` to be set and a configuration to be retrieved from Segment.
9898
func start() {
9999
guard enable, !started, sampleRate > 0.0 && sampleRate <= 1.0 else { return }
100-
started = true
100+
_started.set(true)
101101

102102
// Queue contents were sampled at the default 100%
103103
// the values on flush will be adjusted in the send function
104104
if Double.random(in: 0...1) > sampleRate {
105105
resetQueue()
106106
}
107107

108-
self.telemetryTimer = QueueTimer(interval: .seconds(self.flushTimer), queue: .main) { [weak self] in
108+
self.telemetryTimer = QueueTimer(interval: .seconds(self.flushTimer), queue: updateQueue) { [weak self] in
109109
if (!(self?.enable ?? false)) {
110-
self?.started = false
110+
self?._started.set(false)
111111
self?.telemetryTimer?.suspend()
112112
}
113113
self?.flush()
@@ -118,17 +118,16 @@ public class Telemetry: Subscriber {
118118
func reset() {
119119
telemetryTimer?.suspend()
120120
resetQueue()
121-
seenErrors.removeAll()
122-
started = false
123-
rateLimitEndTime = 0
121+
_started.set(false)
122+
_rateLimitEndTime.set(0)
124123
}
125124

126125
/// Increments a metric with the provided tags.
127126
/// - Parameters:
128127
/// - metric: The metric name.
129128
/// - buildTags: A closure to build the tags dictionary.
130129
func increment(metric: String, buildTags: (inout [String: String]) -> Void) {
131-
guard enable, sampleRate > 0.0 && sampleRate <= 1.0, metric.hasPrefix(Telemetry.METRICS_BASE_TAG), queueHasSpace() else { return }
130+
guard enable, sampleRate > 0.0 && sampleRate <= 1.0, metric.hasPrefix(Telemetry.METRICS_BASE_TAG) else { return }
132131
if Double.random(in: 0...1) > sampleRate { return }
133132

134133
var tags = [String: String]()
@@ -144,7 +143,7 @@ public class Telemetry: Subscriber {
144143
/// - log: The log data.
145144
/// - buildTags: A closure to build the tags dictionary.
146145
func error(metric: String, log: String, buildTags: (inout [String: String]) -> Void) {
147-
guard enable, sampleRate > 0.0 && sampleRate <= 1.0, metric.hasPrefix(Telemetry.METRICS_BASE_TAG), queueHasSpace() else { return }
146+
guard enable, sampleRate > 0.0 && sampleRate <= 1.0, metric.hasPrefix(Telemetry.METRICS_BASE_TAG) else { return }
148147
if Double.random(in: 0...1) > sampleRate { return }
149148

150149
var tags = [String: String]()
@@ -164,7 +163,7 @@ public class Telemetry: Subscriber {
164163
addRemoteMetric(metric: metric, tags: filteredTags, log: logData)
165164

166165
if (flushFirstError) {
167-
flushFirstError = false
166+
_flushFirstError.set(false)
168167
flush()
169168
}
170169
}
@@ -178,14 +177,14 @@ public class Telemetry: Subscriber {
178177
if rateLimitEndTime > Date().timeIntervalSince1970 {
179178
return
180179
}
181-
rateLimitEndTime = 0
180+
_rateLimitEndTime.set(0)
182181

183182
do {
184183
try send()
185184
queueBytes = 0
186185
} catch {
187186
errorHandler?(error)
188-
sampleRate = 0.0
187+
_sampleRate.set(0.0)
189188
}
190189
}
191190
}
@@ -200,7 +199,6 @@ public class Telemetry: Subscriber {
200199
sendQueue.append(metric)
201200
}
202201
queueBytes = 0
203-
queueSizeExceeded = false
204202

205203
let payload = try JSONEncoder().encode(["series": sendQueue])
206204
var request = upload(apiHost: host)
@@ -214,7 +212,7 @@ public class Telemetry: Subscriber {
214212

215213
if let httpResponse = response as? HTTPURLResponse, httpResponse.statusCode == 429 {
216214
if let retryAfter = httpResponse.allHeaderFields["Retry-After"] as? String, let retryAfterSeconds = TimeInterval(retryAfter) {
217-
self.rateLimitEndTime = retryAfterSeconds + Date().timeIntervalSince1970
215+
self._rateLimitEndTime.set(retryAfterSeconds + Date().timeIntervalSince1970)
218216
}
219217
}
220218
}
@@ -255,6 +253,8 @@ public class Telemetry: Subscriber {
255253
return
256254
}
257255

256+
guard queue.count < maxQueueSize else { return }
257+
258258
let newMetric = RemoteMetric(
259259
type: METRIC_TYPE,
260260
metric: metric,
@@ -266,8 +266,6 @@ public class Telemetry: Subscriber {
266266
if queueBytes + newMetricSize <= maxQueueBytes {
267267
queue.append(newMetric)
268268
queueBytes += newMetricSize
269-
} else {
270-
queueSizeExceeded = true
271269
}
272270
}
273271
}
@@ -284,7 +282,7 @@ public class Telemetry: Subscriber {
284282

285283
private func systemUpdate(system: System) {
286284
if let settings = system.settings, let sampleRate = settings.metrics?["sampleRate"]?.doubleValue {
287-
self.sampleRate = sampleRate
285+
self._sampleRate.set(sampleRate)
288286
start()
289287
}
290288
}
@@ -297,19 +295,10 @@ public class Telemetry: Subscriber {
297295
return request
298296
}
299297

300-
private func queueHasSpace() -> Bool {
301-
var under = false
302-
telemetryQueue.sync {
303-
under = queue.count < maxQueueSize
304-
}
305-
return under
306-
}
307-
308298
private func resetQueue() {
309299
telemetryQueue.sync {
310300
queue.removeAll()
311301
queueBytes = 0
312-
queueSizeExceeded = false
313302
}
314303
}
315304
}

Diff for: Tests/Segment-Tests/Telemetry_Tests.swift

+38-5
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ class TelemetryTests: XCTestCase {
1212
self.errors.append("\(error)")
1313
}
1414
errors.removeAll()
15-
Telemetry.shared.sampleRate = 1.0
15+
Telemetry.shared.sampleRateTest.set(1.0)
1616
mockTelemetryHTTPClient()
1717
}
1818

@@ -29,12 +29,12 @@ class TelemetryTests: XCTestCase {
2929
}
3030

3131
func testTelemetryStart() {
32-
Telemetry.shared.sampleRate = 0.0
32+
Telemetry.shared.sampleRateTest.set(0.0)
3333
Telemetry.shared.enable = true
3434
Telemetry.shared.start()
3535
XCTAssertFalse(Telemetry.shared.started)
3636

37-
Telemetry.shared.sampleRate = 1.0
37+
Telemetry.shared.sampleRateTest.set(1.0)
3838
Telemetry.shared.start()
3939
XCTAssertTrue(Telemetry.shared.started)
4040
XCTAssertTrue(errors.isEmpty)
@@ -116,7 +116,7 @@ class TelemetryTests: XCTestCase {
116116

117117
func testHTTPException() {
118118
mockTelemetryHTTPClient(shouldThrow: true)
119-
Telemetry.shared.flushFirstError = true
119+
Telemetry.shared.flushFirstErrorTest.set(true)
120120
Telemetry.shared.enable = true
121121
Telemetry.shared.start()
122122
Telemetry.shared.error(metric: Telemetry.INVOKE_METRIC, log: "log") { $0["error"] = "test" }
@@ -143,6 +143,38 @@ class TelemetryTests: XCTestCase {
143143
Telemetry.shared.error(metric: Telemetry.INVOKE_ERROR_METRIC, log: longString) { $0["writekey"] = longString }
144144
XCTAssertTrue(Telemetry.shared.queue.count < 1000)
145145
}
146+
147+
func testConcurrentErrorReporting() {
148+
Telemetry.shared.enable = true
149+
let operationCount = 200
150+
151+
let concurrentExpectation = XCTestExpectation(description: "High pressure operations")
152+
concurrentExpectation.expectedFulfillmentCount = operationCount
153+
154+
// Use multiple dispatch queues to increase concurrency
155+
let queues = [
156+
DispatchQueue.global(qos: .userInitiated),
157+
DispatchQueue.global(qos: .default),
158+
DispatchQueue.global(qos: .utility)
159+
]
160+
for i in 0..<operationCount {
161+
// Round-robin between different queues
162+
let queue = queues[i % queues.count]
163+
queue.async {
164+
Telemetry.shared.error(
165+
metric: Telemetry.INVOKE_ERROR_METRIC,
166+
log: "High pressure test \(i)"
167+
) { tags in
168+
tags["error"] = "pressure_test_key"
169+
tags["queue"] = "\(i % queues.count)"
170+
tags["iteration"] = "\(i)"
171+
}
172+
concurrentExpectation.fulfill()
173+
}
174+
}
175+
wait(for: [concurrentExpectation], timeout: 15.0)
176+
XCTAssertTrue(Telemetry.shared.queue.count == Telemetry.shared.maxQueueSize)
177+
}
146178
}
147179

148180
// Mock URLSession
@@ -154,12 +186,13 @@ class URLSessionMock: RestrictedHTTPSession {
154186
var shouldThrow = false
155187

156188
override func dataTask(with request: URLRequest, completionHandler: @escaping (Data?, URLResponse?, Error?) -> Void) -> URLSessionDataTask {
189+
let task = URLSession.shared.dataTask(with: request) { _, _, _ in }
157190
if shouldThrow {
158191
completionHandler(nil, nil, NSError(domain: "Test", code: 1, userInfo: nil))
159192
} else {
160193
completionHandler(nil, HTTPURLResponse(url: request.url!, statusCode: 200, httpVersion: nil, headerFields: nil), nil)
161194
}
162-
return URLSessionDataTaskMock()
195+
return task
163196
}
164197
}
165198

0 commit comments

Comments
 (0)