Skip to content

Commit

Permalink
fix(realtime): crash when connecting socket (#470)
Browse files Browse the repository at this point in the history
* fix(realtime): crash when connecting socket

* fix test

* Fix slack clone example

* import FoundationNetworking
  • Loading branch information
grdsdev authored Jul 19, 2024
1 parent b8f8164 commit 5cf4f56
Show file tree
Hide file tree
Showing 11 changed files with 118 additions and 158 deletions.
4 changes: 2 additions & 2 deletions Examples/Examples.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -768,7 +768,7 @@
"LD_RUNPATH_SEARCH_PATHS[sdk=macosx*]" = "@executable_path/../Frameworks";
LOCALIZATION_PREFERS_STRING_CATALOGS = YES;
MARKETING_VERSION = 1.0;
PRODUCT_BUNDLE_IDENTIFIER = com.supabase.SlackClone;
PRODUCT_BUNDLE_IDENTIFIER = "com.supabase.slack-clone";
PRODUCT_NAME = "$(TARGET_NAME)";
SDKROOT = auto;
SUPPORTED_PLATFORMS = "iphoneos iphonesimulator macosx xros xrsimulator";
Expand Down Expand Up @@ -809,7 +809,7 @@
"LD_RUNPATH_SEARCH_PATHS[sdk=macosx*]" = "@executable_path/../Frameworks";
LOCALIZATION_PREFERS_STRING_CATALOGS = YES;
MARKETING_VERSION = 1.0;
PRODUCT_BUNDLE_IDENTIFIER = com.supabase.SlackClone;
PRODUCT_BUNDLE_IDENTIFIER = "com.supabase.slack-clone";
PRODUCT_NAME = "$(TARGET_NAME)";
SDKROOT = auto;
SUPPORTED_PLATFORMS = "iphoneos iphonesimulator macosx xros xrsimulator";
Expand Down
36 changes: 14 additions & 22 deletions Examples/SlackClone/AuthView.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,20 @@ final class AuthViewModel {
var email = ""
var toast: ToastState?

func signInButtonTapped() {
Task {
do {
try await supabase.auth.signInWithOTP(
email: email,
redirectTo: URL(string: "slackclone://sign-in")
)
toast = ToastState(status: .success, title: "Check your inbox.")
} catch {
toast = ToastState(status: .error, title: "Error", description: error.localizedDescription)
}
}
}
func signInButtonTapped() async {
do {
try await supabase.auth.signInWithOTP(email: email)
toast = ToastState(status: .success, title: "Check your inbox.")

func handle(_ url: URL) {
Task {
do {
try await supabase.auth.session(from: url)
} catch {
toast = ToastState(status: .error, title: "Error", description: error.localizedDescription)
}
try? await Task.sleep(for: .seconds(1))

#if os(macOS)
NSWorkspace.shared.open(URL(string: "http://127.0.0.1:54324")!)
#else
await UIApplication.shared.open(URL(string: "http://127.0.0.1:54324")!)
#endif
} catch {
toast = ToastState(status: .error, title: "Error", description: error.localizedDescription)
}
}
}
Expand All @@ -54,12 +47,11 @@ struct AuthView: View {
.autocorrectionDisabled()
}
Button("Sign in with Magic Link") {
model.signInButtonTapped()
Task { await model.signInButtonTapped() }
}
}
.padding()
.toast(state: $model.toast)
.onOpenURL { model.handle($0) }
}
}

Expand Down
5 changes: 2 additions & 3 deletions Examples/SlackClone/Info.plist
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,12 @@
<key>CFBundleURLIconFile</key>
<string></string>
<key>CFBundleURLName</key>
<string>com.supabase.SlackClone</string>
<string>com.supabase.slack-clone</string>
<key>CFBundleURLSchemes</key>
<array>
<string>slackclone</string>
<string>com.supabase.slack-clone</string>
</array>
</dict>
<dict/>
</array>
</dict>
</plist>
3 changes: 3 additions & 0 deletions Examples/SlackClone/SlackCloneApp.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ struct SlackCloneApp: App {
var body: some Scene {
WindowGroup {
AppView(model: model)
.onOpenURL { url in
supabase.handle(url)
}
}
}
}
1 change: 1 addition & 0 deletions Examples/SlackClone/Supabase.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ let supabase = SupabaseClient(
supabaseKey: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6ImFub24iLCJleHAiOjE5ODM4MTI5OTZ9.CRXP1A7WOeoJeXxjNni43kdQwgnWNReilDMblYTn_I0",
options: SupabaseClientOptions(
db: .init(encoder: encoder, decoder: decoder),
auth: .init(redirectToURL: URL(string: "com.supabase.slack-clone://")),
global: SupabaseClientOptions.GlobalOptions(logger: LogStore.shared)
)
)
2 changes: 1 addition & 1 deletion Examples/SlackClone/supabase/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ enabled = true
# in emails.
site_url = "http://127.0.0.1:3000"
# A list of *exact* URLs that auth providers are permitted to redirect to post authentication.
additional_redirect_urls = ["https://127.0.0.1:3000", "slackclone://*"]
additional_redirect_urls = ["https://127.0.0.1:3000", "com.supabase.slack-clone://"]
# How long tokens are valid for, in seconds. Defaults to 3600 (1 hour), maximum 604,800 (1 week).
jwt_expiry = 3600
# If disabled, the refresh token will never expire.
Expand Down
140 changes: 14 additions & 126 deletions Sources/Realtime/V2/WebSocketClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,7 @@ protocol WebSocketClient: Sendable {
func send(_ message: RealtimeMessageV2) async throws
func receive() -> AsyncThrowingStream<RealtimeMessageV2, any Error>
func connect() -> AsyncStream<ConnectionStatus>
func disconnect(closeCode: URLSessionWebSocketTask.CloseCode)
}

extension WebSocketClient {
func disconnect() {
disconnect(closeCode: .normalClosure)
}
func disconnect()
}

final class WebSocket: NSObject, URLSessionWebSocketDelegate, WebSocketClient, @unchecked Sendable {
Expand All @@ -39,7 +33,7 @@ final class WebSocket: NSObject, URLSessionWebSocketDelegate, WebSocketClient, @

struct MutableState {
var continuation: AsyncStream<ConnectionStatus>.Continuation?
var stream: SocketStream?
var connection: WebSocketConnection<RealtimeMessageV2, RealtimeMessageV2>?
}

private let mutableState = LockIsolated(MutableState())
Expand All @@ -57,7 +51,7 @@ final class WebSocket: NSObject, URLSessionWebSocketDelegate, WebSocketClient, @
mutableState.withValue { state in
let session = URLSession(configuration: configuration, delegate: self, delegateQueue: nil)
let task = session.webSocketTask(with: realtimeURL)
state.stream = SocketStream(task: task)
state.connection = WebSocketConnection(task: task)
task.resume()

let (stream, continuation) = AsyncStream<ConnectionStatus>.makeStream()
Expand All @@ -66,51 +60,27 @@ final class WebSocket: NSObject, URLSessionWebSocketDelegate, WebSocketClient, @
}
}

func disconnect(closeCode: URLSessionWebSocketTask.CloseCode) {
func disconnect() {
mutableState.withValue { state in
state.stream?.cancel(with: closeCode)
state.connection?.close()
}
}

func receive() -> AsyncThrowingStream<RealtimeMessageV2, any Error> {
mutableState.withValue { mutableState in
guard let stream = mutableState.stream else {
return .finished(
throwing: RealtimeError(
"receive() called before connect(). Make sure to call `connect()` before calling `receive()`."
)
guard let connection = mutableState.connection else {
return .finished(
throwing: RealtimeError(
"receive() called before connect(). Make sure to call `connect()` before calling `receive()`."
)
}

return stream.map { message in
switch message {
case let .string(stringMessage):
self.logger?.verbose("Received message: \(stringMessage)")

guard let data = stringMessage.data(using: .utf8) else {
throw RealtimeError("Expected a UTF8 encoded message.")
}

let message = try JSONDecoder().decode(RealtimeMessageV2.self, from: data)
return message

case .data:
fallthrough

default:
throw RealtimeError("Unsupported message type.")
}
}
.eraseToThrowingStream()
)
}

return connection.receive()
}

func send(_ message: RealtimeMessageV2) async throws {
let data = try JSONEncoder().encode(message)
let string = String(decoding: data, as: UTF8.self)

logger?.verbose("Sending message: \(string)")
try await mutableState.stream?.send(.string(string))
logger?.verbose("Sending message: \(message)")
try await mutableState.connection?.send(message)
}

// MARK: - URLSessionWebSocketDelegate
Expand Down Expand Up @@ -145,85 +115,3 @@ final class WebSocket: NSObject, URLSessionWebSocketDelegate, WebSocketClient, @
mutableState.continuation?.yield(.error(error))
}
}

typealias WebSocketStream = AsyncThrowingStream<URLSessionWebSocketTask.Message, any Error>

final class SocketStream: AsyncSequence, Sendable {
typealias AsyncIterator = WebSocketStream.Iterator
typealias Element = URLSessionWebSocketTask.Message

struct MutableState {
var continuation: WebSocketStream.Continuation?
var stream: WebSocketStream?
}

private let task: URLSessionWebSocketTask
private let mutableState = LockIsolated(MutableState())

private func makeStreamIfNeeded() -> WebSocketStream {
mutableState.withValue { state in
if let stream = state.stream {
return stream
}

let stream = WebSocketStream { continuation in
state.continuation = continuation
waitForNextValue()
}

state.stream = stream
return stream
}
}

private func waitForNextValue() {
guard task.closeCode == .invalid else {
mutableState.continuation?.finish()
return
}

task.receive { [weak self] result in
guard let continuation = self?.mutableState.continuation else { return }

do {
let message = try result.get()
continuation.yield(message)
self?.waitForNextValue()
} catch {
continuation.finish(throwing: error)
}
}
}

init(task: URLSessionWebSocketTask) {
self.task = task
}

deinit {
mutableState.continuation?.finish()
}

func makeAsyncIterator() -> WebSocketStream.Iterator {
makeStreamIfNeeded().makeAsyncIterator()
}

func cancel(with closeCode: URLSessionWebSocketTask.CloseCode = .goingAway) {
task.cancel(with: closeCode, reason: nil)
mutableState.continuation?.finish()
}

func send(_ message: URLSessionWebSocketTask.Message) async throws {
try await task.send(message)
}
}

#if os(Linux) || os(Windows)
extension URLSessionWebSocketTask {
func receive(completionHandler: @Sendable @escaping (Result<URLSessionWebSocketTask.Message, any Error>) -> Void) {
Task {
let result = await Result(catching: { try await self.receive() })
completionHandler(result)
}
}
}
#endif
77 changes: 77 additions & 0 deletions Sources/Realtime/V2/WebSocketConnection.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
//
// WebSocketConnection.swift
//
//
// Created by Guilherme Souza on 29/03/24.
//

import Foundation

#if canImport(FoundationNetworking)
import FoundationNetworking
#endif

enum WebSocketConnectionError: Error {
case unsupportedData
}

final class WebSocketConnection<Incoming: Codable, Outgoing: Codable>: Sendable {
private let task: URLSessionWebSocketTask
private let encoder: JSONEncoder
private let decoder: JSONDecoder

init(
task: URLSessionWebSocketTask,
encoder: JSONEncoder = JSONEncoder(),
decoder: JSONDecoder = JSONDecoder()
) {
self.task = task
self.encoder = encoder
self.decoder = decoder

task.resume()
}

deinit {
task.cancel(with: .goingAway, reason: nil)
}

func receiveOnce() async throws -> Incoming {
switch try await task.receive() {
case let .data(data):
let message = try decoder.decode(Incoming.self, from: data)
return message

case let .string(string):
guard let data = string.data(using: .utf8) else {
throw WebSocketConnectionError.unsupportedData
}

let message = try decoder.decode(Incoming.self, from: data)
return message

@unknown default:
assertionFailure("Unsupported message type.")
task.cancel(with: .unsupportedData, reason: nil)
throw WebSocketConnectionError.unsupportedData
}
}

func send(_ message: Outgoing) async throws {
let data = try encoder.encode(message)
try await task.send(.data(data))
}

func receive() -> AsyncThrowingStream<Incoming, any Error> {
AsyncThrowingStream { [weak self] in
guard let self else { return nil }

let message = try await receiveOnce()
return Task.isCancelled ? nil : message
}
}

func close() {
task.cancel(with: .normalClosure, reason: nil)
}
}
4 changes: 2 additions & 2 deletions Sources/TestHelpers/HTTPClientMock.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ package actor HTTPClientMock: HTTPClientType {
package struct MockNotFound: Error {}

private var mocks = [@Sendable (HTTPRequest) async throws -> HTTPResponse?]()

/// Requests received by this client in order.
package var receivedRequests: [HTTPRequest] = []

Expand Down Expand Up @@ -47,7 +47,7 @@ package actor HTTPClientMock: HTTPClientType {
package func send(_ request: HTTPRequest) async throws -> HTTPResponse {
receivedRequests.append(request)

for mock in mocks{
for mock in mocks {
do {
if let response = try await mock(request) {
returnedResponses.append(.success(response))
Expand Down
Loading

0 comments on commit 5cf4f56

Please sign in to comment.