diff --git a/Sources/SWBUtil/Dispatch+Async.swift b/Sources/SWBUtil/Dispatch+Async.swift index bf70a86d..fa81ddef 100644 --- a/Sources/SWBUtil/Dispatch+Async.swift +++ b/Sources/SWBUtil/Dispatch+Async.swift @@ -57,114 +57,38 @@ extension DispatchFD { } } } -} -extension AsyncThrowingStream where Element == UInt8, Failure == any Error { /// Returns an async stream which reads bytes from the specified file descriptor. Unlike `FileHandle.bytes`, it does not block the caller. @available(macOS, deprecated: 15.0, message: "Use the AsyncSequence-returning overload.") @available(iOS, deprecated: 18.0, message: "Use the AsyncSequence-returning overload.") @available(tvOS, deprecated: 18.0, message: "Use the AsyncSequence-returning overload.") @available(watchOS, deprecated: 11.0, message: "Use the AsyncSequence-returning overload.") @available(visionOS, deprecated: 2.0, message: "Use the AsyncSequence-returning overload.") - public static func _dataStream(reading fileDescriptor: DispatchFD, on queue: SWBQueue) -> AsyncThrowingStream { - AsyncThrowingStream { continuation in - let newFD: DispatchFD - do { - newFD = try fileDescriptor._duplicate() - } catch { - continuation.finish(throwing: error) - return - } - - let io = SWBDispatchIO.stream(fileDescriptor: newFD, queue: queue) { error in - do { - try newFD._close() - if error != 0 { - continuation.finish(throwing: POSIXError(error, context: "dataStream(reading: \(fileDescriptor))#1")) - } - } catch { - continuation.finish(throwing: error) - } - } - io.setLimit(lowWater: 0) - io.setLimit(highWater: 4096) - - continuation.onTermination = { termination in - if case .cancelled = termination { - io.close(flags: .stop) - } else { - io.close() - } - } - - io.read(offset: 0, length: .max, queue: queue) { done, data, error in - guard error == 0 else { - continuation.finish(throwing: POSIXError(error, context: "dataStream(reading: \(fileDescriptor))#2")) - return - } - - let data = data ?? .empty - for element in data { - continuation.yield(element) - } - - if done { - continuation.finish() + public func _dataStream() -> AsyncThrowingStream { + AsyncThrowingStream { + while !Task.isCancelled { + let chunk = try await readChunk(upToLength: 4096) + if chunk.isEmpty { + return nil } + return chunk } + throw CancellationError() } } -} -@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) -extension AsyncSequence where Element == UInt8, Failure == any Error { /// Returns an async stream which reads bytes from the specified file descriptor. Unlike `FileHandle.bytes`, it does not block the caller. - public static func dataStream(reading fileDescriptor: DispatchFD, on queue: SWBQueue) -> any AsyncSequence { - AsyncThrowingStream { continuation in - let newFD: DispatchFD - do { - newFD = try fileDescriptor._duplicate() - } catch { - continuation.finish(throwing: error) - return - } - - let io = SWBDispatchIO.stream(fileDescriptor: newFD, queue: queue) { error in - do { - try newFD._close() - if error != 0 { - let context = "dataStream(reading: \(fileDescriptor) \"\(Result { try fileDescriptor._filePath() })\")#1" - continuation.finish(throwing: POSIXError(error, context: context)) - } - } catch { - continuation.finish(throwing: error) - } - } - io.setLimit(lowWater: 0) - io.setLimit(highWater: 4096) - - continuation.onTermination = { termination in - if case .cancelled = termination { - io.close(flags: .stop) - } else { - io.close() - } - } - - io.read(offset: 0, length: .max, queue: queue) { done, data, error in - guard error == 0 else { - let context = "dataStream(reading: \(fileDescriptor) \"\(Result { try fileDescriptor._filePath() })\")#2" - continuation.finish(throwing: POSIXError(error, context: context)) - return - } - - let data = data ?? .empty - continuation.yield(data) - - if done { - continuation.finish() + @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) + public func dataStream() -> some AsyncSequence { + AsyncThrowingStream { + while !Task.isCancelled { + let chunk = try await readChunk(upToLength: 4096) + if chunk.isEmpty { + return nil } + return chunk } - }.flattened + throw CancellationError() + } } } diff --git a/Sources/SWBUtil/FileHandle+Async.swift b/Sources/SWBUtil/FileHandle+Async.swift index 01746218..2f0874e9 100644 --- a/Sources/SWBUtil/FileHandle+Async.swift +++ b/Sources/SWBUtil/FileHandle+Async.swift @@ -19,13 +19,13 @@ extension FileHandle { @available(tvOS, deprecated: 18.0, message: "Use the AsyncSequence-returning overload.") @available(watchOS, deprecated: 11.0, message: "Use the AsyncSequence-returning overload.") @available(visionOS, deprecated: 2.0, message: "Use the AsyncSequence-returning overload.") - public func _bytes(on queue: SWBQueue) -> AsyncThrowingStream { - ._dataStream(reading: DispatchFD(fileHandle: self), on: queue) + public func _bytes() -> AsyncThrowingStream { + DispatchFD(fileHandle: self)._dataStream() } /// Replacement for `bytes` which uses DispatchIO to avoid blocking the caller. @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) - public func bytes(on queue: SWBQueue) -> any AsyncSequence { - AsyncThrowingStream.dataStream(reading: DispatchFD(fileHandle: self), on: queue) + public func bytes() -> some AsyncSequence { + DispatchFD(fileHandle: self).dataStream() } } diff --git a/Sources/SWBUtil/Misc+Async.swift b/Sources/SWBUtil/Misc+Async.swift index 22818c12..ff01e3cf 100644 --- a/Sources/SWBUtil/Misc+Async.swift +++ b/Sources/SWBUtil/Misc+Async.swift @@ -23,6 +23,18 @@ extension AsyncSequence { } } +extension AsyncSequence where Element: RandomAccessCollection { + @inlinable + public func collect() async rethrows -> [Element.Element] { + var items = [Element.Element]() + var it = makeAsyncIterator() + while let e = try await it.next() { + items.append(contentsOf: e) + } + return items + } +} + extension TaskGroup where Element == Void { /// Concurrency-friendly replacement for `DispatchQueue.concurrentPerform(iterations:execute:)`. public static func concurrentPerform(iterations: Int, maximumParallelism: Int, execute work: @Sendable @escaping (Int) async -> Element) async { diff --git a/Sources/SWBUtil/Process+Async.swift b/Sources/SWBUtil/Process+Async.swift index aa22ecc1..93cecc3e 100644 --- a/Sources/SWBUtil/Process+Async.swift +++ b/Sources/SWBUtil/Process+Async.swift @@ -35,20 +35,20 @@ extension Process { @available(tvOS, deprecated: 18.0, message: "Use the AsyncSequence-returning overload.") @available(watchOS, deprecated: 11.0, message: "Use the AsyncSequence-returning overload.") @available(visionOS, deprecated: 2.0, message: "Use the AsyncSequence-returning overload.") - public func _makeStream(for keyPath: ReferenceWritableKeyPath, using pipe: Pipe) -> AsyncThrowingStream { + public func _makeStream(for keyPath: ReferenceWritableKeyPath, using pipe: Pipe) -> AsyncThrowingStream { precondition(!isRunning) // the pipe setters will raise `NSInvalidArgumentException` anyways self[keyPath: keyPath] = pipe - return pipe.fileHandleForReading._bytes(on: .global()) + return pipe.fileHandleForReading._bytes() } /// Returns an ``AsyncStream`` configured to read the standard output or error stream of the process. /// /// - note: This method will mutate the `standardOutput` or `standardError` property of the Process object, replacing any existing `Pipe` or `FileHandle` which may be set. It must be called before the process is started. @available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *) - public func makeStream(for keyPath: ReferenceWritableKeyPath, using pipe: Pipe) -> any AsyncSequence { + public func makeStream(for keyPath: ReferenceWritableKeyPath, using pipe: Pipe) -> some AsyncSequence { precondition(!isRunning) // the pipe setters will raise `NSInvalidArgumentException` anyways self[keyPath: keyPath] = pipe - return pipe.fileHandleForReading.bytes(on: .global()) + return pipe.fileHandleForReading.bytes() } } diff --git a/Sources/SWBUtil/Process.swift b/Sources/SWBUtil/Process.swift index 634c3f0d..5243e5d7 100644 --- a/Sources/SWBUtil/Process.swift +++ b/Sources/SWBUtil/Process.swift @@ -103,7 +103,7 @@ extension Process { let (exitStatus, output) = try await _getOutput(url: url, arguments: arguments, currentDirectoryURL: currentDirectoryURL, environment: environment, interruptible: interruptible) { process in process.standardOutputPipe = pipe process.standardErrorPipe = pipe - return pipe.fileHandleForReading.bytes(on: .global()) + return pipe.fileHandleForReading.bytes() } collect: { stream in try await stream.collect() } @@ -115,7 +115,7 @@ extension Process { let (exitStatus, output) = try await _getOutput(url: url, arguments: arguments, currentDirectoryURL: currentDirectoryURL, environment: environment, interruptible: interruptible) { process in process.standardOutputPipe = pipe process.standardErrorPipe = pipe - return pipe.fileHandleForReading._bytes(on: .global()) + return pipe.fileHandleForReading._bytes() } collect: { stream in try await stream.collect() } diff --git a/Sources/SWBUtil/SWBDispatch.swift b/Sources/SWBUtil/SWBDispatch.swift index 26aad751..0aa561b2 100644 --- a/Sources/SWBUtil/SWBDispatch.swift +++ b/Sources/SWBUtil/SWBDispatch.swift @@ -49,33 +49,6 @@ public struct DispatchFD { rawValue = fileHandle.fileDescriptor #endif } - - internal func _duplicate() throws -> DispatchFD { - #if os(Windows) - return self - #else - return try DispatchFD(fileDescriptor: FileDescriptor(rawValue: rawValue).duplicate()) - #endif - } - - internal func _close() throws { - #if !os(Windows) - try FileDescriptor(rawValue: rawValue).close() - #endif - } - - // Only exists to help debug a rare concurrency issue where the file descriptor goes invalid - internal func _filePath() throws -> String { - #if canImport(Darwin) - var buffer = [CChar](repeating: 0, count: Int(MAXPATHLEN)) - if fcntl(rawValue, F_GETPATH, &buffer) == -1 { - throw POSIXError(errno, "fcntl", String(rawValue), "F_GETPATH") - } - return String(cString: buffer) - #else - return String() - #endif - } } // @unchecked: rdar://130051790 (DispatchData should be Sendable) diff --git a/Tests/SWBUtilTests/FileHandleTests.swift b/Tests/SWBUtilTests/FileHandleTests.swift index e118f507..3661db8a 100644 --- a/Tests/SWBUtilTests/FileHandleTests.swift +++ b/Tests/SWBUtilTests/FileHandleTests.swift @@ -38,21 +38,21 @@ import SystemPackage let fh = FileHandle(fileDescriptor: fd.rawValue, closeOnDealloc: false) try await fd.closeAfter { if #available(macOS 15, iOS 18, tvOS 18, watchOS 11, visionOS 2, *) { - var it = fh.bytes(on: .global()).makeAsyncIterator() + var it = fh.bytes().makeAsyncIterator() var bytesOfFile: [UInt8] = [] await #expect(throws: Never.self) { - while let byte = try await it.next() { - bytesOfFile.append(byte) + while let chunk = try await it.next() { + bytesOfFile.append(contentsOf: chunk) } } #expect(bytesOfFile.count == 1448) #expect(plist.bytes == bytesOfFile) } else { - var it = fh._bytes(on: .global()).makeAsyncIterator() + var it = fh._bytes().makeAsyncIterator() var bytesOfFile: [UInt8] = [] await #expect(throws: Never.self) { - while let byte = try await it.next() { - bytesOfFile.append(byte) + while let chunk = try await it.next() { + bytesOfFile.append(contentsOf: chunk) } } #expect(bytesOfFile.count == 1448) @@ -72,7 +72,7 @@ import SystemPackage let fh = FileHandle(fileDescriptor: fd.rawValue, closeOnDealloc: false) if #available(macOS 15, iOS 18, tvOS 18, watchOS 11, visionOS 2, *) { - var it = fh.bytes(on: .global()).makeAsyncIterator() + var it = fh.bytes().makeAsyncIterator() try fd.close() await #expect(throws: (any Error).self) { @@ -80,7 +80,7 @@ import SystemPackage } } } else { - var it = fh._bytes(on: .global()).makeAsyncIterator() + var it = fh._bytes().makeAsyncIterator() try fd.close() await #expect(throws: (any Error).self) { @@ -99,21 +99,21 @@ import SystemPackage try await fd.closeAfter { let fh = FileHandle(fileDescriptor: fd.rawValue, closeOnDealloc: false) if #available(macOS 15, iOS 18, tvOS 18, watchOS 11, visionOS 2, *) { - var it = fh.bytes(on: .global()).makeAsyncIterator() + var it = fh.bytes().makeAsyncIterator() var bytes: [UInt8] = [] - while let byte = try await it.next() { - bytes.append(byte) - if bytes.count == 100 { + while let chunk = try await it.next() { + bytes.append(contentsOf: chunk) + if bytes.count >= 100 { condition.signal() throw CancellationError() } } } else { - var it = fh._bytes(on: .global()).makeAsyncIterator() + var it = fh._bytes().makeAsyncIterator() var bytes: [UInt8] = [] - while let byte = try await it.next() { - bytes.append(byte) - if bytes.count == 100 { + while let chunk = try await it.next() { + bytes.append(contentsOf: chunk) + if bytes.count >= 100 { condition.signal() throw CancellationError() } diff --git a/Tests/SwiftBuildTests/ConsoleCommands/CLIConnection.swift b/Tests/SwiftBuildTests/ConsoleCommands/CLIConnection.swift index 3b987a81..7de7fb62 100644 --- a/Tests/SwiftBuildTests/ConsoleCommands/CLIConnection.swift +++ b/Tests/SwiftBuildTests/ConsoleCommands/CLIConnection.swift @@ -32,8 +32,8 @@ final class CLIConnection { private let monitorHandle: FileHandle private let temporaryDirectory: NamedTemporaryDirectory private let exitPromise: Promise - private let outputStream: AsyncThrowingStream - private var outputStreamIterator: AsyncCLIConnectionResponseSequence>.AsyncIterator + private let outputStream: AsyncThrowingStream + private var outputStreamIterator: AsyncCLIConnectionResponseSequence>>.AsyncIterator static var swiftbuildToolSearchPaths: [URL] { var searchPaths: [URL] = [] @@ -138,8 +138,8 @@ final class CLIConnection { // Close the session handle, so the FD will close once the service stops. try sessionHandle.close() - outputStream = monitorHandle._bytes(on: .global()) - outputStreamIterator = outputStream.cliResponses.makeAsyncIterator() + outputStream = monitorHandle._bytes() + outputStreamIterator = outputStream.flattened.cliResponses.makeAsyncIterator() #endif } @@ -253,6 +253,9 @@ public struct AsyncCLIConnectionResponseSequence: AsyncSequ // BSDs send EOF, Linux raises EIO... #if os(Linux) || os(Android) if error.code == EIO { + if reply.isEmpty { + return nil + } break } #endif @@ -282,6 +285,9 @@ public struct AsyncCLIConnectionResponseSequence: AsyncSequ // BSDs send EOF, Linux raises EIO... #if os(Linux) || os(Android) if error.code == EIO { + if reply.isEmpty { + return nil + } break } #endif diff --git a/Tests/SwiftBuildTests/ConsoleCommands/ServiceConsoleTests.swift b/Tests/SwiftBuildTests/ConsoleCommands/ServiceConsoleTests.swift index 017090a5..26985a79 100644 --- a/Tests/SwiftBuildTests/ConsoleCommands/ServiceConsoleTests.swift +++ b/Tests/SwiftBuildTests/ConsoleCommands/ServiceConsoleTests.swift @@ -33,7 +33,7 @@ fileprivate struct ServiceConsoleTests { let standardOutput = task._makeStream(for: \.standardOutputPipe, using: outputPipe) let promise: Promise = try task.launch() - let data = try await standardOutput.reduce(into: [], { $0.append($1) }) + let data = try await standardOutput.reduce(into: [], { $0.append(contentsOf: $1) }) let output = String(decoding: data, as: UTF8.self) // Verify there were no errors.