diff --git a/Sources/NIOCore/AsyncAwaitSupport.swift b/Sources/NIOCore/AsyncAwaitSupport.swift index 5cc6b6ace3..e8018d6e08 100644 --- a/Sources/NIOCore/AsyncAwaitSupport.swift +++ b/Sources/NIOCore/AsyncAwaitSupport.swift @@ -18,8 +18,9 @@ extension EventLoopFuture { /// This function can be used to bridge an `EventLoopFuture` into the `async` world. Ie. if you're in an `async` /// function and want to get the result of this future. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) + @preconcurrency @inlinable - public func get() async throws -> Value { + public func get() async throws -> Value where Value: Sendable { try await withUnsafeThrowingContinuation { (cont: UnsafeContinuation, Error>) in self.whenComplete { result in switch result { @@ -62,8 +63,11 @@ extension EventLoopPromise { /// - returns: A `Task` which was created to `await` the `body`. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) @discardableResult + @preconcurrency @inlinable - public func completeWithTask(_ body: @escaping @Sendable () async throws -> Value) -> Task { + public func completeWithTask( + _ body: @escaping @Sendable () async throws -> Value + ) -> Task where Value: Sendable { Task { do { let value = try await body() @@ -396,8 +400,9 @@ struct AsyncSequenceFromIterator: AsyncSeq @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) extension EventLoop { + @preconcurrency @inlinable - public func makeFutureWithTask( + public func makeFutureWithTask( _ body: @Sendable @escaping () async throws -> Return ) -> EventLoopFuture { let promise = self.makePromise(of: Return.self) diff --git a/Sources/NIOCore/ChannelPipeline.swift b/Sources/NIOCore/ChannelPipeline.swift index b3c0ef580f..b2c90c142d 100644 --- a/Sources/NIOCore/ChannelPipeline.swift +++ b/Sources/NIOCore/ChannelPipeline.swift @@ -457,10 +457,10 @@ public final class ChannelPipeline: ChannelInvoker { let promise = self.eventLoop.makePromise(of: ChannelHandlerContext.self) if self.eventLoop.inEventLoop { - promise.completeWith(self.contextSync(handler: handler)) + promise.assumeIsolated().completeWith(self.contextSync(handler: handler)) } else { self.eventLoop.execute { - promise.completeWith(self.contextSync(handler: handler)) + promise.assumeIsolated().completeWith(self.contextSync(handler: handler)) } } @@ -486,10 +486,10 @@ public final class ChannelPipeline: ChannelInvoker { let promise = self.eventLoop.makePromise(of: ChannelHandlerContext.self) if self.eventLoop.inEventLoop { - promise.completeWith(self.contextSync(name: name)) + promise.assumeIsolated().completeWith(self.contextSync(name: name)) } else { self.eventLoop.execute { - promise.completeWith(self.contextSync(name: name)) + promise.assumeIsolated().completeWith(self.contextSync(name: name)) } } @@ -519,10 +519,10 @@ public final class ChannelPipeline: ChannelInvoker { let promise = self.eventLoop.makePromise(of: ChannelHandlerContext.self) if self.eventLoop.inEventLoop { - promise.completeWith(self._contextSync(handlerType: handlerType)) + promise.assumeIsolated().completeWith(self._contextSync(handlerType: handlerType)) } else { self.eventLoop.execute { - promise.completeWith(self._contextSync(handlerType: handlerType)) + promise.assumeIsolated().completeWith(self._contextSync(handlerType: handlerType)) } } diff --git a/Sources/NIOCore/DispatchQueue+WithFuture.swift b/Sources/NIOCore/DispatchQueue+WithFuture.swift index 593380aa26..ec9bc03b49 100644 --- a/Sources/NIOCore/DispatchQueue+WithFuture.swift +++ b/Sources/NIOCore/DispatchQueue+WithFuture.swift @@ -29,9 +29,10 @@ extension DispatchQueue { /// - callbackMayBlock: The scheduled callback for the IO / task. /// - returns a new `EventLoopFuture` with value returned by the `block` parameter. @inlinable - public func asyncWithFuture( + @preconcurrency + public func asyncWithFuture( eventLoop: EventLoop, - _ callbackMayBlock: @escaping () throws -> NewValue + _ callbackMayBlock: @escaping @Sendable () throws -> NewValue ) -> EventLoopFuture { let promise = eventLoop.makePromise(of: NewValue.self) diff --git a/Sources/NIOCore/Docs.docc/index.md b/Sources/NIOCore/Docs.docc/index.md index 65e595732d..13d76894ec 100644 --- a/Sources/NIOCore/Docs.docc/index.md +++ b/Sources/NIOCore/Docs.docc/index.md @@ -15,6 +15,7 @@ More specialized modules provide concrete implementations of many of the abstrac - - +- ### Event Loops and Event Loop Groups diff --git a/Sources/NIOCore/Docs.docc/loops-futures-concurrency.md b/Sources/NIOCore/Docs.docc/loops-futures-concurrency.md new file mode 100644 index 0000000000..b5cbffa4f1 --- /dev/null +++ b/Sources/NIOCore/Docs.docc/loops-futures-concurrency.md @@ -0,0 +1,176 @@ +# EventLoops, EventLoopFutures, and Swift Concurrency + +This article aims to communicate how NIO's ``EventLoop``s and ``EventLoopFuture``s interact with the Swift 6 +concurrency model, particularly regarding data-race safety. It aims to be a reference for writing correct +concurrent code in the NIO model. + +NIO predates the Swift concurrency model. As a result, several of NIO's concepts are not perfect matches to +the concepts that Swift uses, or have overlapping responsibilities. + +## Isolation domains and executors + +First, a quick recap. The core of Swift 6's data-race safety protection is the concept of an "isolation +domain". Some valuable reading regarding the concept can be found in +[SE-0414 (Region based isolation)](https://github.com/swiftlang/swift-evolution/blob/main/proposals/0414-region-based-isolation.md) +but at a high level an isolation domain can be understood to be a collection of state and methods within which there cannot be +multiple executors executing code at the same time. + +In standard Swift Concurrency, the main boundaries of isolation domains are actors and tasks. Each actor, +including global actors, defines an isolation domain. Additionally, for functions and methods that are +not isolated to an actor, the `Task` within which that code executes defines an isolation domain. Passing +values between these isolation domains requires that these values are either `Sendable` (safe to hold in +multiple domains), or that the `sending` keyword is used to force the value to be passed from one domain +to another. + +A related concept to an "isolation domain" is an "executor". Again, useful reading can be found in +[SE-0392 (Custom actor executors)](https://github.com/swiftlang/swift-evolution/blob/main/proposals/0392-custom-actor-executors.md). +At a high level, an executor is simply an object that is capable of executing Swift `Task`s. Executors can be +concurrent, or they can be serial. Serial executors are the most common, as they can be used to back an +actor. + +## Event Loops + +NIO's core execution primitive is the ``EventLoop``. An ``EventLoop`` is fundamentally nothing more than +a Swift Concurrency Serial Executor that can also perform I/O operations directly. Indeed, NIO's +``EventLoop``s can be exposed as serial executors, using ``EventLoop/executor``. This provides a mechanism +to protect actor-isolated state using a NIO event-loop. With [the introduction of task executors](https://github.com/swiftlang/swift-evolution/blob/main/proposals/0417-task-executor-preference.md), +future versions of SwiftNIO will also be able to offer their event loops for individual `Task`s to execute +on as well. + +In a Swift 6 world, it is possible that these would be the API that NIO offered to execute tasks on the +loop. However, as NIO predates Swift 6, it also offers its own set of APIs to enqueue work. This includes +(but is not limited to): + +- ``EventLoop/execute(_:)`` +- ``EventLoop/submit(_:)`` +- ``EventLoop/scheduleTask(in:_:)`` +- ``EventLoop/scheduleRepeatedTask(initialDelay:delay:notifying:_:)`` +- ``EventLoop/scheduleCallback(at:handler:)-2xm6l`` + +The existence of these APIs requires us to also ask the question of where the submitted code executes. The +answer is that the submitted code executes on the event loop (or, in Swift Concurrency terms, on the +executor provided by the event loop). + +As the event loop only ever executes a single item of work (either an `async` function or one of the +closures above) at a time, it is a _serial_ executor. It also provides an _isolation domain_: code +submitted to a given `EventLoop` never runs in parallel with other code submitted to the same loop. + +The result here is that a all closures passed into the event loop to do work must be transferred +in: they may not be kept hold of outside of the event loop. That means they must be sent using +the `sending` keyword. + +> Note: As of the current 2.75.0 release, NIO enforces the stricter requirement that these closures + are `@Sendable`. This is not a long-term position, but reflects the need to continue + to support Swift 5 code which requires this stricter standard. In a future release of + SwiftNIO we expect to relax this constraint: if you need this relaxed constraint + then please file an issue. + +## Event loop futures + +In Swift NIO the most common mechanism to arrange a series of asynchronous work items is +_not_ to queue up a series of ``EventLoop/execute(_:)`` calls. Instead, users typically +use ``EventLoopFuture``. + +``EventLoopFuture`` has some extensive semantics documented in its API documentation. The +most important principal for this discussion is that all callbacks added to an +``EventLoopFuture`` will execute on the ``EventLoop`` to which that ``EventLoopFuture`` is +bound. By extension, then, all callbacks added to an ``EventLoopFuture`` execute on the same +executor (the ``EventLoop``) in the same isolation domain. + +The analogy to an actor here is hopefully fairly clear. Conceptually, an ``EventLoopFuture`` +could be modelled as an actor. That means all the callbacks have the same logical semantics: +the ``EventLoopFuture`` uses the isolation domain of its associated ``EventLoop``, and all +the callbacks are `sent` into the isolation domain. To that end, all the callback-taking APIs +require that the callback is sent using `sending` into the ``EventLoopFuture``. + +> Note: As of the current 2.75.0 release, NIO enforces the stricter requirement that these callbacks + are `@Sendable`. This is not a long-term position, but reflects the need to continue + to support Swift 5 code which requires this stricter standard. In a future release of + SwiftNIO we expect to relax this constraint: if you need this relaxed constraint + then please file an issue. + +Unlike ``EventLoop``s, however, ``EventLoopFuture``s also have value-receiving and value-taking +APIs. This is because ``EventLoopFuture``s pass a value along to their various callbacks, and +so need to be both given an initial value (via an ``EventLoopPromise``) and in some cases to +extract that value from the ``EventLoopFuture`` wrapper. + +This implies that ``EventLoopPromise``'s various success functions +(_and_ ``EventLoop/makeSucceededFuture(_:)``) need to take their value as `sending`. The value +is potentially sent from its current isolation domain into the ``EventLoop``, which will require +that the value is safe to move. + +> Note: As of the current 2.75.0 release, NIO enforces the stricter requirement that these values + are `Sendable`. This is not a long-term position, but reflects the need to continue + to support Swift 5 code which requires this stricter standard. In a future release of + SwiftNIO we expect to relax this constraint: if you need this relaxed constraint + then please file an issue. + +There are also a few ways to extract a value, such as ``EventLoopFuture/wait(file:line:)`` +and ``EventLoopFuture/get()``. These APIs can only safely be called when the ``EventLoopFuture`` +is carrying a `Sendable` value. This is because ``EventLoopFuture``s hold on to their value and +can give it to other closures or other callers of `get` and `wait`. Thus, `sending` is not +sufficient. + +## Combining Futures + +NIO provides a number of APIs for combining futures, such as ``EventLoopFuture/and(_:)``. +This potentially represents an issue, as two futures may not share the same isolation domain. +As a result, we can only safely call these combining functions when the ``EventLoopFuture`` +values are `Sendable`. + +> Note: We can conceptually relax this constraint somewhat by offering equivalent + functions that can only safely be called when all the combined futures share the + same bound event loop: that is, when they are all within the same isolation domain. + + This can be enforced with runtime isolation checks. If you have a need for these + functions, please reach out to the NIO team. + +## Interacting with Futures on the Event Loop + +In a number of contexts (such as in ``ChannelHandler``s), the programmer has static knowledge +that they are within an isolation domain. That isolation domain may well be shared with the +isolation domain of many futures and promises with which they interact. For example, +futures that are provided from ``ChannelHandlerContext/write(_:promise:)`` will be bound to +the event loop on which the ``ChannelHandler`` resides. + +In this context, the `sending` constraint is unnecessarily strict. The future callbacks are +guaranteed to fire on the same isolation domain as the ``ChannelHandlerContext``: no risk +of data race is present. However, Swift Concurrency cannot guarantee this at compile time, +as the specific isolation domain is determined only at runtime. + +In these contexts, today users can make their callbacks safe using ``NIOLoopBound`` and +``NIOLoopBoundBox``. These values can only be constructed on the event loop, and only allow +access to their values on the same event loop. These constraints are enforced at runtime, +so at compile time these are unconditionally `Sendable`. + +> Warning: ``NIOLoopBound`` and ``NIOLoopBoundBox`` replace compile-time isolation checks + with runtime ones. This makes it possible to introduce crashes in your code. Please + ensure that you are 100% confident that the isolation domains align. If you are not + sure that the ``EventLoopFuture`` you wish to attach a callback to is bound to your + ``EventLoop``, use ``EventLoopFuture/hop(to:)`` to move it to your isolation domain + before using these types. + +> Note: In a future NIO release we intend to improve the ergonomics of this common problem + by offering a related type that can only be created from an ``EventLoopFuture`` on a + given ``EventLoop``. This minimises the number of runtime checks, and will make it + easier and more pleasant to write this kind of code. + +## Interacting with Event Loops on the Event Loop + +As with Futures, there are occasionally times where it is necessary to schedule +``EventLoop`` operations on the ``EventLoop`` where your code is currently executing. + +Much like with ``EventLoopFuture``, you can use ``NIOLoopBound`` and ``NIOLoopBoundBox`` +to make these callbacks safe. + +> Warning: ``NIOLoopBound`` and ``NIOLoopBoundBox`` replace compile-time isolation checks + with runtime ones. This makes it possible to introduce crashes in your code. Please + ensure that you are 100% confident that the isolation domains align. If you are not + sure that the ``EventLoopFuture`` you wish to attach a callback to is bound to your + ``EventLoop``, use ``EventLoopFuture/hop(to:)`` to move it to your isolation domain + before using these types. + +> Note: In a future NIO release we intend to improve the ergonomics of this common problem + by offering a related type that can only be created from an ``EventLoopFuture`` on a + given ``EventLoop``. This minimises the number of runtime checks, and will make it + easier and more pleasant to write this kind of code. diff --git a/Sources/NIOCore/EventLoop+Deprecated.swift b/Sources/NIOCore/EventLoop+Deprecated.swift index e2321ceb74..dc3d356b5e 100644 --- a/Sources/NIOCore/EventLoop+Deprecated.swift +++ b/Sources/NIOCore/EventLoop+Deprecated.swift @@ -23,9 +23,10 @@ extension EventLoop { self.makeFailedFuture(error) } + @preconcurrency @inlinable @available(*, deprecated, message: "Please don't pass file:line:, there's no point.") - public func makeSucceededFuture( + public func makeSucceededFuture( _ value: Success, file: StaticString = #fileID, line: UInt = #line diff --git a/Sources/NIOCore/EventLoop.swift b/Sources/NIOCore/EventLoop.swift index 4e76abe821..5732a989be 100644 --- a/Sources/NIOCore/EventLoop.swift +++ b/Sources/NIOCore/EventLoop.swift @@ -60,7 +60,7 @@ public struct Scheduled { } } -extension Scheduled: Sendable where T: Sendable {} +extension Scheduled: Sendable {} /// Returned once a task was scheduled to be repeatedly executed on the `EventLoop`. /// @@ -368,20 +368,22 @@ public protocol EventLoop: EventLoopGroup { /// /// - NOTE: Event loops that provide a custom scheduled callback implementation **must** also implement /// `cancelScheduledCallback`. Failure to do so will result in a runtime error. + @preconcurrency @discardableResult func scheduleCallback( at deadline: NIODeadline, - handler: some NIOScheduledCallbackHandler + handler: some (NIOScheduledCallbackHandler & Sendable) ) throws -> NIOScheduledCallback /// Schedule a callback after given time. /// /// - NOTE: Event loops that provide a custom scheduled callback implementation **must** also implement /// `cancelScheduledCallback`. Failure to do so will result in a runtime error. + @preconcurrency @discardableResult func scheduleCallback( in amount: TimeAmount, - handler: some NIOScheduledCallbackHandler + handler: some (NIOScheduledCallbackHandler & Sendable) ) throws -> NIOScheduledCallback /// Cancel a scheduled callback. @@ -749,13 +751,7 @@ extension EventLoop { /// - returns: An `EventLoopFuture` containing the result of `task`'s execution. @inlinable @preconcurrency - public func submit(_ task: @escaping @Sendable () throws -> T) -> EventLoopFuture { - _submit(task) - } - @usableFromInline typealias SubmitCallback = @Sendable () throws -> T - - @inlinable - func _submit(_ task: @escaping SubmitCallback) -> EventLoopFuture { + public func submit(_ task: @escaping @Sendable () throws -> T) -> EventLoopFuture { let promise: EventLoopPromise = makePromise(file: #fileID, line: #line) self.execute { @@ -779,18 +775,15 @@ extension EventLoop { /// - returns: An `EventLoopFuture` identical to the `EventLoopFuture` returned from `task`. @inlinable @preconcurrency - public func flatSubmit(_ task: @escaping @Sendable () -> EventLoopFuture) -> EventLoopFuture { - self._flatSubmit(task) - } - @usableFromInline typealias FlatSubmitCallback = @Sendable () -> EventLoopFuture - - @inlinable - func _flatSubmit(_ task: @escaping FlatSubmitCallback) -> EventLoopFuture { + public func flatSubmit(_ task: @escaping @Sendable () -> EventLoopFuture) -> EventLoopFuture { self.submit(task).flatMap { $0 } } /// Schedule a `task` that is executed by this `EventLoop` at the given time. /// + /// - Note: The `T` must be `Sendable` since the isolation domains of the event loop future returned from `task` and + /// this event loop might differ. + /// /// - parameters: /// - task: The asynchronous task to run. As with everything that runs on the `EventLoop`, it must not block. /// - returns: A `Scheduled` object which may be used to cancel the task if it has not yet run, or to wait @@ -800,23 +793,11 @@ extension EventLoop { @discardableResult @inlinable @preconcurrency - public func flatScheduleTask( + public func flatScheduleTask( deadline: NIODeadline, file: StaticString = #fileID, line: UInt = #line, _ task: @escaping @Sendable () throws -> EventLoopFuture - ) -> Scheduled { - self._flatScheduleTask(deadline: deadline, file: file, line: line, task) - } - @usableFromInline typealias FlatScheduleTaskDeadlineCallback = () throws -> EventLoopFuture - - @discardableResult - @inlinable - func _flatScheduleTask( - deadline: NIODeadline, - file: StaticString, - line: UInt, - _ task: @escaping FlatScheduleTaskDelayCallback ) -> Scheduled { let promise: EventLoopPromise = self.makePromise(file: file, line: line) let scheduled = self.scheduleTask(deadline: deadline, task) @@ -827,6 +808,9 @@ extension EventLoop { /// Schedule a `task` that is executed by this `EventLoop` after the given amount of time. /// + /// - Note: The `T` must be `Sendable` since the isolation domains of the event loop future returned from `task` and + /// this event loop might differ. + /// /// - parameters: /// - task: The asynchronous task to run. As everything that runs on the `EventLoop`, it must not block. /// - returns: A `Scheduled` object which may be used to cancel the task if it has not yet run, or to wait @@ -836,7 +820,7 @@ extension EventLoop { @discardableResult @inlinable @preconcurrency - public func flatScheduleTask( + public func flatScheduleTask( in delay: TimeAmount, file: StaticString = #fileID, line: UInt = #line, @@ -848,7 +832,7 @@ extension EventLoop { @usableFromInline typealias FlatScheduleTaskDelayCallback = @Sendable () throws -> EventLoopFuture @inlinable - func _flatScheduleTask( + func _flatScheduleTask( in delay: TimeAmount, file: StaticString, line: UInt, @@ -886,8 +870,9 @@ extension EventLoop { /// - parameters: /// - result: the value that is used by the `EventLoopFuture`. /// - returns: a succeeded `EventLoopFuture`. + @preconcurrency @inlinable - public func makeSucceededFuture(_ value: Success) -> EventLoopFuture { + public func makeSucceededFuture(_ value: Success) -> EventLoopFuture { if Success.self == Void.self { // The as! will always succeed because we previously checked that Success.self == Void.self. return self.makeSucceededVoidFuture() as! EventLoopFuture @@ -901,8 +886,9 @@ extension EventLoop { /// - Parameters: /// - result: The value that is used by the `EventLoopFuture` /// - Returns: A completed `EventLoopFuture`. + @preconcurrency @inlinable - public func makeCompletedFuture(_ result: Result) -> EventLoopFuture { + public func makeCompletedFuture(_ result: Result) -> EventLoopFuture { switch result { case .success(let value): return self.makeSucceededFuture(value) @@ -916,8 +902,11 @@ extension EventLoop { /// - Parameters: /// - body: The function that is used to complete the `EventLoopFuture` /// - Returns: A completed `EventLoopFuture`. + @preconcurrency @inlinable - public func makeCompletedFuture(withResultOf body: () throws -> Success) -> EventLoopFuture { + public func makeCompletedFuture( + withResultOf body: () throws -> Success + ) -> EventLoopFuture { let trans = Result(catching: body) return self.makeCompletedFuture(trans) } @@ -999,7 +988,7 @@ extension EventLoop { notifying promise: EventLoopPromise?, _ task: @escaping ScheduleRepeatedTaskCallback ) -> RepeatedTask { - let futureTask: (RepeatedTask) -> EventLoopFuture = { repeatedTask in + let futureTask: @Sendable (RepeatedTask) -> EventLoopFuture = { repeatedTask in do { try task(repeatedTask) return self.makeSucceededFuture(()) diff --git a/Sources/NIOCore/EventLoopFuture+Deprecated.swift b/Sources/NIOCore/EventLoopFuture+Deprecated.swift index 6883c9ffd9..2e48e7c798 100644 --- a/Sources/NIOCore/EventLoopFuture+Deprecated.swift +++ b/Sources/NIOCore/EventLoopFuture+Deprecated.swift @@ -13,22 +13,24 @@ //===----------------------------------------------------------------------===// extension EventLoopFuture { + @preconcurrency @inlinable @available(*, deprecated, message: "Please don't pass file:line:, there's no point.") - public func flatMap( + public func flatMap( file: StaticString = #fileID, line: UInt = #line, - _ callback: @escaping (Value) -> EventLoopFuture + _ callback: @escaping @Sendable (Value) -> EventLoopFuture ) -> EventLoopFuture { self.flatMap(callback) } + @preconcurrency @inlinable @available(*, deprecated, message: "Please don't pass file:line:, there's no point.") - public func flatMapThrowing( + public func flatMapThrowing( file: StaticString = #fileID, line: UInt = #line, - _ callback: @escaping (Value) throws -> NewValue + _ callback: @escaping @Sendable (Value) throws -> NewValue ) -> EventLoopFuture { self.flatMapThrowing(callback) } @@ -38,7 +40,7 @@ extension EventLoopFuture { public func flatMapErrorThrowing( file: StaticString = #fileID, line: UInt = #line, - _ callback: @escaping (Error) throws -> Value + _ callback: @escaping @Sendable (Error) throws -> Value ) -> EventLoopFuture { self.flatMapErrorThrowing(callback) } @@ -48,7 +50,7 @@ extension EventLoopFuture { public func map( file: StaticString = #fileID, line: UInt = #line, - _ callback: @escaping (Value) -> (NewValue) + _ callback: @escaping @Sendable (Value) -> (NewValue) ) -> EventLoopFuture { self.map(callback) } @@ -58,34 +60,37 @@ extension EventLoopFuture { public func flatMapError( file: StaticString = #fileID, line: UInt = #line, - _ callback: @escaping (Error) -> EventLoopFuture - ) -> EventLoopFuture { + _ callback: @escaping @Sendable (Error) -> EventLoopFuture + ) -> EventLoopFuture where Value: Sendable { self.flatMapError(callback) } + @preconcurrency @inlinable @available(*, deprecated, message: "Please don't pass file:line:, there's no point.") public func flatMapResult( file: StaticString = #fileID, line: UInt = #line, - _ body: @escaping (Value) -> Result + _ body: @escaping @Sendable (Value) -> Result ) -> EventLoopFuture { self.flatMapResult(body) } + @preconcurrency @inlinable @available(*, deprecated, message: "Please don't pass file:line:, there's no point.") public func recover( file: StaticString = #fileID, line: UInt = #line, - _ callback: @escaping (Error) -> Value + _ callback: @escaping @Sendable (Error) -> Value ) -> EventLoopFuture { self.recover(callback) } + @preconcurrency @inlinable @available(*, deprecated, message: "Please don't pass file:line:, there's no point.") - public func and( + public func and( _ other: EventLoopFuture, file: StaticString = #fileID, line: UInt = #line @@ -93,9 +98,10 @@ extension EventLoopFuture { self.and(other) } + @preconcurrency @inlinable @available(*, deprecated, message: "Please don't pass file:line:, there's no point.") - public func and( + public func and( value: OtherValue, file: StaticString = #fileID, line: UInt = #line diff --git a/Sources/NIOCore/EventLoopFuture+WithEventLoop.swift b/Sources/NIOCore/EventLoopFuture+WithEventLoop.swift index bf76a0e97e..48e2b526f6 100644 --- a/Sources/NIOCore/EventLoopFuture+WithEventLoop.swift +++ b/Sources/NIOCore/EventLoopFuture+WithEventLoop.swift @@ -41,7 +41,7 @@ extension EventLoopFuture { /// - returns: A future that will receive the eventual value. @inlinable @preconcurrency - public func flatMapWithEventLoop( + public func flatMapWithEventLoop( _ callback: @escaping @Sendable (Value, EventLoop) -> EventLoopFuture ) -> EventLoopFuture { let next = EventLoopPromise.makeUnleakablePromise(eventLoop: self.eventLoop) @@ -79,7 +79,7 @@ extension EventLoopFuture { @preconcurrency public func flatMapErrorWithEventLoop( _ callback: @escaping @Sendable (Error, EventLoop) -> EventLoopFuture - ) -> EventLoopFuture { + ) -> EventLoopFuture where Value: Sendable { let next = EventLoopPromise.makeUnleakablePromise(eventLoop: self.eventLoop) self._whenComplete { [eventLoop = self.eventLoop] in switch self._value! { @@ -118,10 +118,11 @@ extension EventLoopFuture { /// - returns: A new `EventLoopFuture` with the folded value whose callbacks run on `self.eventLoop`. @inlinable @preconcurrency - public func foldWithEventLoop( + public func foldWithEventLoop( _ futures: [EventLoopFuture], with combiningFunction: @escaping @Sendable (Value, OtherValue, EventLoop) -> EventLoopFuture - ) -> EventLoopFuture { + ) -> EventLoopFuture where Value: Sendable { + @Sendable func fold0(eventLoop: EventLoop) -> EventLoopFuture { let body = futures.reduce(self) { (f1: EventLoopFuture, f2: EventLoopFuture) -> EventLoopFuture in diff --git a/Sources/NIOCore/EventLoopFuture.swift b/Sources/NIOCore/EventLoopFuture.swift index a6aee1b2c1..d72fe23c33 100644 --- a/Sources/NIOCore/EventLoopFuture.swift +++ b/Sources/NIOCore/EventLoopFuture.swift @@ -26,7 +26,7 @@ import Dispatch /// In particular, note that _run() here continues to obtain and execute lists of callbacks until it completes. /// This eliminates recursion when processing `flatMap()` chains. @usableFromInline -internal struct CallbackList { +internal struct CallbackList: Sendable { @usableFromInline internal typealias Element = @Sendable () -> CallbackList @usableFromInline @@ -183,8 +183,9 @@ public struct EventLoopPromise { /// /// - parameters: /// - value: The successful result of the operation. + @preconcurrency @inlinable - public func succeed(_ value: Value) { + public func succeed(_ value: Value) where Value: Sendable { self._resolve(value: .success(value)) } @@ -194,7 +195,13 @@ public struct EventLoopPromise { /// - error: The error from the operation. @inlinable public func fail(_ error: Error) { - self._resolve(value: .failure(error)) + if self.futureResult.eventLoop.inEventLoop { + self.futureResult._setError(error)._run() + } else { + self.futureResult.eventLoop.execute { + self.futureResult._setError(error)._run() + } + } } /// Complete the promise with the passed in `EventLoopFuture`. @@ -202,11 +209,15 @@ public struct EventLoopPromise { /// This method is equivalent to invoking `future.cascade(to: promise)`, /// but sometimes may read better than its cascade counterpart. /// + /// - Note: The `Value` must be `Sendable` since the isolation domains of the passed future and this promise might differ i.e. + /// they might be bound to different event loops. + /// /// - parameters: /// - future: The future whose value will be used to succeed or fail this promise. /// - seealso: `EventLoopFuture.cascade(to:)` + @preconcurrency @inlinable - public func completeWith(_ future: EventLoopFuture) { + public func completeWith(_ future: EventLoopFuture) where Value: Sendable { future.cascade(to: self) } @@ -224,8 +235,9 @@ public struct EventLoopPromise { /// /// - parameters: /// - result: The result which will be used to succeed or fail this promise. + @preconcurrency @inlinable - public func completeWith(_ result: Result) { + public func completeWith(_ result: Result) where Value: Sendable { self._resolve(value: result) } @@ -238,7 +250,7 @@ public struct EventLoopPromise { /// - parameters: /// - value: The value to fire the future with. @inlinable - internal func _resolve(value: Result) { + internal func _resolve(value: Result) where Value: Sendable { if self.futureResult.eventLoop.inEventLoop { self._setValue(value: value)._run() } else { @@ -411,7 +423,7 @@ public final class EventLoopFuture { /// A EventLoopFuture that has already succeeded @inlinable - internal init(eventLoop: EventLoop, value: Value) { + internal init(eventLoop: EventLoop, value: Value) where Value: Sendable { self.eventLoop = eventLoop self._value = .success(value) self._callbacks = .init() @@ -471,13 +483,16 @@ extension EventLoopFuture { /// /// Note: In a sense, the `EventLoopFuture` is returned before it's created. /// + /// - Note: The `NewValue` must be `Sendable` since the isolation domains of this future and the future returned from the callback + /// might differ i.e. they might be bound to different event loops. + /// /// - parameters: /// - callback: Function that will receive the value of this `EventLoopFuture` and return /// a new `EventLoopFuture`. /// - returns: A future that will receive the eventual value. @inlinable @preconcurrency - public func flatMap( + public func flatMap( _ callback: @escaping @Sendable (Value) -> EventLoopFuture ) -> EventLoopFuture { self._flatMap(callback) @@ -485,7 +500,7 @@ extension EventLoopFuture { @usableFromInline typealias FlatMapCallback = @Sendable (Value) -> EventLoopFuture @inlinable - func _flatMap(_ callback: @escaping FlatMapCallback) -> EventLoopFuture { + func _flatMap(_ callback: @escaping FlatMapCallback) -> EventLoopFuture { let next = EventLoopPromise.makeUnleakablePromise(eventLoop: self.eventLoop) self._whenComplete { switch self._value! { @@ -516,6 +531,9 @@ extension EventLoopFuture { /// /// If your callback function throws, the returned `EventLoopFuture` will error. /// + /// - Note: The `NewValue` must be `Sendable` since the isolation domains of this future and the future returned from the callback + /// might differ i.e. they might be bound to different event loops. + /// /// - parameters: /// - callback: Function that will receive the value of this `EventLoopFuture` and return /// a new value lifted into a new `EventLoopFuture`. @@ -566,8 +584,9 @@ extension EventLoopFuture { /// - returns: A future that will receive the eventual value or a rethrown error. @inlinable @preconcurrency - public func flatMapErrorThrowing(_ callback: @escaping @Sendable (Error) throws -> Value) -> EventLoopFuture - { + public func flatMapErrorThrowing( + _ callback: @escaping @Sendable (Error) throws -> Value + ) -> EventLoopFuture { self._flatMapErrorThrowing(callback) } @usableFromInline typealias FlatMapErrorThrowingCallback = @Sendable (Error) throws -> Value @@ -619,13 +638,17 @@ extension EventLoopFuture { /// - returns: A future that will receive the eventual value. @inlinable @preconcurrency - public func map(_ callback: @escaping @Sendable (Value) -> (NewValue)) -> EventLoopFuture { + public func map( + _ callback: @escaping @Sendable (Value) -> (NewValue) + ) -> EventLoopFuture { self._map(callback) } @usableFromInline typealias MapCallback = @Sendable (Value) -> (NewValue) @inlinable - func _map(_ callback: @escaping MapCallback) -> EventLoopFuture { + func _map( + _ callback: @escaping @Sendable (Value) -> (NewValue) + ) -> EventLoopFuture { if NewValue.self == Value.self && NewValue.self == Void.self { self.whenSuccess(callback as! @Sendable (Value) -> Void) return self as! EventLoopFuture @@ -645,6 +668,9 @@ extension EventLoopFuture { /// /// If the callback cannot recover it should return a failed `EventLoopFuture`. /// + /// - Note: The `Value` must be `Sendable` since the isolation domains of this future and the future returned from the callback + /// might differ i.e. they might be bound to different event loops. + /// /// - parameters: /// - callback: Function that will receive the error value of this `EventLoopFuture` and return /// a new value lifted into a new `EventLoopFuture`. @@ -653,13 +679,7 @@ extension EventLoopFuture { @preconcurrency public func flatMapError( _ callback: @escaping @Sendable (Error) -> EventLoopFuture - ) -> EventLoopFuture { - self._flatMapError(callback) - } - @usableFromInline typealias FlatMapErrorCallback = @Sendable (Error) -> EventLoopFuture - - @inlinable - func _flatMapError(_ callback: @escaping FlatMapErrorCallback) -> EventLoopFuture { + ) -> EventLoopFuture where Value: Sendable { let next = EventLoopPromise.makeUnleakablePromise(eventLoop: self.eventLoop) self._whenComplete { switch self._value! { @@ -740,12 +760,6 @@ extension EventLoopFuture { @inlinable @preconcurrency public func recover(_ callback: @escaping @Sendable (Error) -> Value) -> EventLoopFuture { - self._recover(callback) - } - @usableFromInline typealias RecoverCallback = @Sendable (Error) -> Value - - @inlinable - func _recover(_ callback: @escaping RecoverCallback) -> EventLoopFuture { let next = EventLoopPromise.makeUnleakablePromise(eventLoop: self.eventLoop) self._whenComplete { switch self._value! { @@ -758,10 +772,9 @@ extension EventLoopFuture { return next.futureResult } - @usableFromInline typealias AddCallbackCallback = @Sendable () -> CallbackList /// Add a callback. If there's already a value, invoke it and return the resulting list of new callback functions. @inlinable - internal func _addCallback(_ callback: @escaping AddCallbackCallback) -> CallbackList { + internal func _addCallback(_ callback: @escaping @Sendable () -> CallbackList) -> CallbackList { self.eventLoop.assertInEventLoop() if self._value == nil { self._callbacks.append(callback) @@ -777,11 +790,10 @@ extension EventLoopFuture { internal func _whenComplete(_ callback: @escaping @Sendable () -> CallbackList) { self._internalWhenComplete(callback) } - @usableFromInline typealias InternalWhenCompleteCallback = @Sendable () -> CallbackList /// Add a callback. If there's already a value, run as much of the chain as we can. @inlinable - internal func _internalWhenComplete(_ callback: @escaping InternalWhenCompleteCallback) { + internal func _internalWhenComplete(_ callback: @escaping @Sendable () -> CallbackList) { if self.eventLoop.inEventLoop { self._addCallback(callback)._run() } else { @@ -804,12 +816,6 @@ extension EventLoopFuture { @inlinable @preconcurrency public func whenSuccess(_ callback: @escaping @Sendable (Value) -> Void) { - self._whenSuccess(callback) - } - @usableFromInline typealias WhenSuccessCallback = @Sendable (Value) -> Void - - @inlinable - func _whenSuccess(_ callback: @escaping WhenSuccessCallback) { self._whenComplete { if case .success(let t) = self._value! { callback(t) @@ -831,12 +837,6 @@ extension EventLoopFuture { @inlinable @preconcurrency public func whenFailure(_ callback: @escaping @Sendable (Error) -> Void) { - self._whenFailure(callback) - } - @usableFromInline typealias WhenFailureCallback = @Sendable (Error) -> Void - - @inlinable - func _whenFailure(_ callback: @escaping WhenFailureCallback) { self._whenComplete { if case .failure(let e) = self._value! { callback(e) @@ -853,11 +853,6 @@ extension EventLoopFuture { @inlinable @preconcurrency public func whenComplete(_ callback: @escaping @Sendable (Result) -> Void) { - self._publicWhenComplete(callback) - } - @usableFromInline typealias WhenCompleteCallback = @Sendable (Result) -> Void - @inlinable - func _publicWhenComplete(_ callback: @escaping WhenCompleteCallback) { self._whenComplete { callback(self._value!) return CallbackList() @@ -876,6 +871,21 @@ extension EventLoopFuture { } return CallbackList() } + + /// Internal: Set the value and return a list of callbacks that should be invoked as a result. + /// + /// We need a seperate method for setting the error to avoid Sendable checking of `Value` + @inlinable + internal func _setError(_ error: Error) -> CallbackList { + self.eventLoop.assertInEventLoop() + if self._value == nil { + self._value = .failure(error) + let callbacks = self._callbacks + self._callbacks = CallbackList() + return callbacks + } + return CallbackList() + } } // MARK: and @@ -885,8 +895,14 @@ extension EventLoopFuture { /// provided `EventLoopFuture` both succeed. It then provides the pair /// of results. If either one fails, the combined `EventLoopFuture` will fail with /// the first error encountered. + /// + /// - Note: The `NewValue` must be `Sendable` since the isolation domains of this future and the other future might differ i.e. + /// they might be bound to different event loops. + @preconcurrency @inlinable - public func and(_ other: EventLoopFuture) -> EventLoopFuture<(Value, OtherValue)> { + public func and( + _ other: EventLoopFuture + ) -> EventLoopFuture<(Value, OtherValue)> { let promise = EventLoopPromise<(Value, OtherValue)>.makeUnleakablePromise(eventLoop: self.eventLoop) let box: UnsafeMutableTransferBox<(t: Value?, u: OtherValue?)> = .init((nil, nil)) @@ -926,8 +942,11 @@ extension EventLoopFuture { /// Return a new EventLoopFuture that contains this "and" another value. /// This is just syntactic sugar for `future.and(loop.makeSucceedFuture(value))`. + @preconcurrency @inlinable - public func and(value: OtherValue) -> EventLoopFuture<(Value, OtherValue)> { + public func and( + value: OtherValue // TODO: This should be transferring + ) -> EventLoopFuture<(Value, OtherValue)> { self.and(EventLoopFuture(eventLoop: self.eventLoop, value: value)) } } @@ -954,10 +973,14 @@ extension EventLoopFuture { /// }.cascade(to: userPromise) /// ``` /// + /// - Note: The `Value` must be `Sendable` since the isolation domains of this future and the promise might differ i.e. + /// they might be bound to different event loops. + /// /// - Parameter to: The `EventLoopPromise` to fulfill with the results of this future. /// - SeeAlso: `EventLoopPromise.completeWith(_:)` + @preconcurrency @inlinable - public func cascade(to promise: EventLoopPromise?) { + public func cascade(to promise: EventLoopPromise?) where Value: Sendable { guard let promise = promise else { return } self.whenComplete { result in switch result { @@ -978,9 +1001,13 @@ extension EventLoopFuture { /// doWorkReturningInt().map({ $0 >= 0 }).cascade(to: boolPromise) /// ``` /// + /// - Note: The `Value` must be `Sendable` since the isolation domains of this future and the promise might differ i.e. + /// they might be bound to different event loops. + /// /// - Parameter to: The `EventLoopPromise` to fulfill when a successful result is available. + @preconcurrency @inlinable - public func cascadeSuccess(to promise: EventLoopPromise?) { + public func cascadeSuccess(to promise: EventLoopPromise?) where Value: Sendable { guard let promise = promise else { return } self.whenSuccess { promise.succeed($0) } } @@ -990,6 +1017,7 @@ extension EventLoopFuture { /// This is an alternative variant of `cascade` that allows you to potentially return early failures in /// error cases, while passing the user `EventLoopPromise` onwards. /// + /// /// - Parameter to: The `EventLoopPromise` that should fail with the error of this `EventLoopFuture`. @inlinable public func cascadeFailure(to promise: EventLoopPromise?) { @@ -1012,16 +1040,19 @@ extension EventLoopFuture { /// /// This is also forbidden in async contexts: prefer ``EventLoopFuture/get()``. /// + /// - Note: The `Value` must be `Sendable` since it is shared outside of the isolation domain of the event loop. + /// /// - returns: The value of the `EventLoopFuture` when it completes. /// - throws: The error value of the `EventLoopFuture` if it errors. @available(*, noasync, message: "wait() can block indefinitely, prefer get()", renamed: "get()") + @preconcurrency @inlinable - public func wait(file: StaticString = #file, line: UInt = #line) throws -> Value { + public func wait(file: StaticString = #file, line: UInt = #line) throws -> Value where Value: Sendable { try self._wait(file: file, line: line) } @inlinable - func _wait(file: StaticString, line: UInt) throws -> Value { + func _wait(file: StaticString, line: UInt) throws -> Value where Value: Sendable { self.eventLoop._preconditionSafeToWait(file: file, line: line) let v: UnsafeMutableTransferBox?> = .init(nil) @@ -1059,25 +1090,20 @@ extension EventLoopFuture { /// `EventLoopFuture` objects will no longer be waited for. This function therefore fails fast: once /// a failure is encountered, it will immediately fail the overall EventLoopFuture. /// + /// - Note: The `Value` and `NewValue` must be `Sendable` since the isolation domains of this future and the other futures might differ i.e. + /// they might be bound to different event loops. + /// /// - parameters: /// - futures: An array of `EventLoopFuture` to wait for. /// - with: A function that will be used to fold the values of two `EventLoopFuture`s and return a new value wrapped in an `EventLoopFuture`. /// - returns: A new `EventLoopFuture` with the folded value whose callbacks run on `self.eventLoop`. @inlinable @preconcurrency - public func fold( + public func fold( _ futures: [EventLoopFuture], with combiningFunction: @escaping @Sendable (Value, OtherValue) -> EventLoopFuture - ) -> EventLoopFuture { - self._fold(futures, with: combiningFunction) - } - @usableFromInline typealias FoldCallback = @Sendable (Value, OtherValue) -> EventLoopFuture - - @inlinable - func _fold( - _ futures: [EventLoopFuture], - with combiningFunction: @escaping FoldCallback - ) -> EventLoopFuture { + ) -> EventLoopFuture where Value: Sendable { + @Sendable func fold0() -> EventLoopFuture { let body = futures.reduce(self) { (f1: EventLoopFuture, f2: EventLoopFuture) -> EventLoopFuture in @@ -1120,6 +1146,9 @@ extension EventLoopFuture { /// `EventLoopFuture` objects will no longer be waited for. This function therefore fails fast: once /// a failure is encountered, it will immediately fail the overall `EventLoopFuture`. /// + /// - Note: The `Value` and `InputValue` must be `Sendable` since the isolation domains of this future and the other futures might differ i.e. + /// they might be bound to different event loops. + /// /// - parameters: /// - initialResult: An initial result to begin the reduction. /// - futures: An array of `EventLoopFuture` to wait for. @@ -1128,23 +1157,23 @@ extension EventLoopFuture { /// - returns: A new `EventLoopFuture` with the reduced value. @preconcurrency @inlinable - public static func reduce( + public static func reduce( _ initialResult: Value, _ futures: [EventLoopFuture], on eventLoop: EventLoop, _ nextPartialResult: @escaping @Sendable (Value, InputValue) -> Value - ) -> EventLoopFuture { + ) -> EventLoopFuture where Value: Sendable { Self._reduce(initialResult, futures, on: eventLoop, nextPartialResult) } @usableFromInline typealias ReduceCallback = @Sendable (Value, InputValue) -> Value @inlinable - static func _reduce( + static func _reduce( _ initialResult: Value, _ futures: [EventLoopFuture], on eventLoop: EventLoop, _ nextPartialResult: @escaping ReduceCallback - ) -> EventLoopFuture { + ) -> EventLoopFuture where Value: Sendable { let f0 = eventLoop.makeSucceededFuture(initialResult) let body = f0.fold(futures) { (t: Value, u: InputValue) -> EventLoopFuture in @@ -1165,6 +1194,9 @@ extension EventLoopFuture { /// `EventLoopFuture` objects will no longer be waited for. This function therefore fails fast: once /// a failure is encountered, it will immediately fail the overall `EventLoopFuture`. /// + /// - Note: The `Value` and `InputValue` must be `Sendable` since the isolation domains of this future and the other futures might differ i.e. + /// they might be bound to different event loops. + /// /// - parameters: /// - initialResult: An initial result to begin the reduction. /// - futures: An array of `EventLoopFuture` to wait for. @@ -1173,36 +1205,27 @@ extension EventLoopFuture { /// - returns: A new `EventLoopFuture` with the combined value. @inlinable @preconcurrency - public static func reduce( + public static func reduce( into initialResult: Value, _ futures: [EventLoopFuture], on eventLoop: EventLoop, _ updateAccumulatingResult: @escaping @Sendable (inout Value, InputValue) -> Void - ) -> EventLoopFuture { - Self._reduce(into: initialResult, futures, on: eventLoop, updateAccumulatingResult) - } - @usableFromInline typealias ReduceIntoCallback = @Sendable (inout Value, InputValue) -> Void - - @inlinable - static func _reduce( - into initialResult: Value, - _ futures: [EventLoopFuture], - on eventLoop: EventLoop, - _ updateAccumulatingResult: @escaping ReduceIntoCallback - ) -> EventLoopFuture { + ) -> EventLoopFuture where Value: Sendable { let p0 = eventLoop.makePromise(of: Value.self) - var value: Value = initialResult + let value = NIOLoopBoundBox(_value: initialResult, uncheckedEventLoop: eventLoop) let f0 = eventLoop.makeSucceededFuture(()) let future = f0.fold(futures) { (_: (), newValue: InputValue) -> EventLoopFuture in eventLoop.assertInEventLoop() - updateAccumulatingResult(&value, newValue) + var v = value.value + updateAccumulatingResult(&v, newValue) + value.value = v return eventLoop.makeSucceededFuture(()) } future.whenSuccess { eventLoop.assertInEventLoop() - p0.succeed(value) + p0.succeed(value.value) } future.whenFailure { (error) in eventLoop.assertInEventLoop() @@ -1245,14 +1268,17 @@ extension EventLoopFuture { /// - futures: An array of homogenous `EventLoopFutures`s to wait for. /// - promise: The `EventLoopPromise` to complete with the result of this call. @inlinable - public static func andAllSucceed(_ futures: [EventLoopFuture], promise: EventLoopPromise) { + public static func andAllSucceed( + _ futures: [EventLoopFuture], + promise: EventLoopPromise + ) { let eventLoop = promise.futureResult.eventLoop if eventLoop.inEventLoop { - self._reduceSuccesses0(promise, futures, eventLoop, onValue: { _, _ in }) + self._reduceSuccesses0(promise, futures, eventLoop) } else { eventLoop.execute { - self._reduceSuccesses0(promise, futures, eventLoop, onValue: { _, _ in }) + self._reduceSuccesses0(promise, futures, eventLoop) } } } @@ -1261,14 +1287,19 @@ extension EventLoopFuture { /// The new `EventLoopFuture` will contain all of the values fulfilled by the futures. /// /// The returned `EventLoopFuture` will fail as soon as any of the futures fails. + /// + /// - Note: The `Value` must be `Sendable` since the isolation domains of the futures might differ i.e. + /// they might be bound to different event loops. + /// /// - Parameters: /// - futures: An array of homogenous `EventLoopFuture`s to wait on for fulfilled values. /// - on: The `EventLoop` on which the new `EventLoopFuture` callbacks will fire. /// - Returns: A new `EventLoopFuture` with all of the values fulfilled by the provided futures. + @preconcurrency public static func whenAllSucceed( _ futures: [EventLoopFuture], on eventLoop: EventLoop - ) -> EventLoopFuture<[Value]> { + ) -> EventLoopFuture<[Value]> where Value: Sendable { let promise = eventLoop.makePromise(of: [Value].self) EventLoopFuture.whenAllSucceed(futures, promise: promise) return promise.futureResult @@ -1279,10 +1310,17 @@ extension EventLoopFuture { /// /// If the _results of all futures should be collected use `andAllComplete` instead. /// + /// - Note: The `Value` must be `Sendable` since the isolation domains of the futures might differ i.e. + /// they might be bound to different event loops. + /// /// - Parameters: /// - futures: An array of homogenous `EventLoopFutures`s to wait for. /// - promise: The `EventLoopPromise` to complete with the result of this call. - public static func whenAllSucceed(_ futures: [EventLoopFuture], promise: EventLoopPromise<[Value]>) { + @preconcurrency + public static func whenAllSucceed( + _ futures: [EventLoopFuture], + promise: EventLoopPromise<[Value]> + ) where Value: Sendable { let eventLoop = promise.futureResult.eventLoop let reduced = eventLoop.makePromise(of: Void.self) @@ -1311,7 +1349,6 @@ extension EventLoopFuture { } } - @usableFromInline typealias ReduceSuccessCallback = @Sendable (Int, InputValue) -> Void /// Loops through the futures array and attaches callbacks to execute `onValue` on the provided `EventLoop` when /// they succeed. The `onValue` will receive the index of the future that fulfilled the provided `Result`. /// @@ -1322,25 +1359,26 @@ extension EventLoopFuture { _ promise: EventLoopPromise, _ futures: [EventLoopFuture], _ eventLoop: EventLoop, - onValue: @escaping ReduceSuccessCallback - ) { + onValue: @escaping @Sendable (Int, InputValue) -> Void + ) where InputValue: Sendable { eventLoop.assertInEventLoop() - var remainingCount = futures.count - - if remainingCount == 0 { + if futures.count == 0 { promise.succeed(()) return } + let remainingCount = NIOLoopBoundBox(_value: futures.count, uncheckedEventLoop: eventLoop) + // Sends the result to `onValue` in case of success and succeeds/fails the input promise, if appropriate. + @Sendable func processResult(_ index: Int, _ result: Result) { switch result { case .success(let result): onValue(index, result) - remainingCount -= 1 + remainingCount.value -= 1 - if remainingCount == 0 { + if remainingCount.value == 0 { promise.succeed(()) } case .failure(let error): @@ -1365,6 +1403,72 @@ extension EventLoopFuture { } } } + + /// Loops through the futures array and attaches callbacks to execute `onValue` on the provided `EventLoop` when + /// they succeed. The `onValue` will receive the index of the future that fulfilled the provided `Result`. + /// + /// Once all the futures have succeed, the provided promise will succeed. + /// Once any future fails, the provided promise will fail. + @inlinable + internal static func _reduceSuccesses0( + _ promise: EventLoopPromise, + _ futures: [EventLoopFuture], + _ eventLoop: EventLoop + ) { + eventLoop.assertInEventLoop() + + if futures.count == 0 { + promise.succeed(()) + return + } + + let remainingCount = NIOLoopBoundBox(_value: futures.count, uncheckedEventLoop: eventLoop) + + // Sends the result to `onValue` in case of success and succeeds/fails the input promise, if appropriate. + @Sendable + func processResult(_ index: Int, _ result: Result) { + switch result { + case .success: + remainingCount.value -= 1 + + if remainingCount.value == 0 { + promise.succeed(()) + } + case .failure(let error): + promise.fail(error) + } + } + // loop through the futures to chain callbacks to execute on the initiating event loop and grab their index + // in the "futures" to pass their result to the caller + for (index, future) in futures.enumerated() { + if future.eventLoop.inEventLoop, + let result = future._value + { + // Fast-track already-fulfilled results without the overhead of calling `whenComplete`. This can yield a + // ~20% performance improvement in the case of large arrays where all elements are already fulfilled. + switch result { + case .success: + processResult(index, .success(())) + case .failure(let error): + processResult(index, .failure(error)) + return + } + } else { + // We have to map to `Void` here to avoid sharing the potentially non-Sendable + // value across event loops. + future.whenComplete { result in + let voidResult = result.map { _ in } + if eventLoop.inEventLoop { + processResult(index, voidResult) + } else { + eventLoop.execute { + processResult(index, voidResult) + } + } + } + } + } + } } // MARK: "fail slow" reduce @@ -1400,14 +1504,17 @@ extension EventLoopFuture { /// - futures: An array of homogenous `EventLoopFuture`s to wait for. /// - promise: The `EventLoopPromise` to succeed when all futures have completed. @inlinable - public static func andAllComplete(_ futures: [EventLoopFuture], promise: EventLoopPromise) { + public static func andAllComplete( + _ futures: [EventLoopFuture], + promise: EventLoopPromise + ) { let eventLoop = promise.futureResult.eventLoop if eventLoop.inEventLoop { - self._reduceCompletions0(promise, futures, eventLoop, onResult: { _, _ in }) + self._reduceCompletions0(promise, futures, eventLoop) } else { eventLoop.execute { - self._reduceCompletions0(promise, futures, eventLoop, onResult: { _, _ in }) + self._reduceCompletions0(promise, futures, eventLoop) } } } @@ -1417,17 +1524,21 @@ extension EventLoopFuture { /// /// The returned `EventLoopFuture` always succeeds, regardless of any failures from the waiting futures. /// + /// - Note: The `Value` must be `Sendable` since the isolation domains of the futures might differ i.e. + /// they might be bound to different event loops. + /// /// If it is desired to flatten them into a single `EventLoopFuture` that fails on the first `EventLoopFuture` failure, /// use one of the `reduce` methods instead. /// - Parameters: /// - futures: An array of homogenous `EventLoopFuture`s to gather results from. /// - on: The `EventLoop` on which the new `EventLoopFuture` callbacks will fire. /// - Returns: A new `EventLoopFuture` with all the results of the provided futures. + @preconcurrency @inlinable public static func whenAllComplete( _ futures: [EventLoopFuture], on eventLoop: EventLoop - ) -> EventLoopFuture<[Result]> { + ) -> EventLoopFuture<[Result]> where Value: Sendable { let promise = eventLoop.makePromise(of: [Result].self) EventLoopFuture.whenAllComplete(futures, promise: promise) return promise.futureResult @@ -1437,14 +1548,18 @@ extension EventLoopFuture { /// /// The promise will always be succeeded, regardless of the outcome of the futures. /// + /// - Note: The `Value` must be `Sendable` since the isolation domains of the futures might differ i.e. + /// they might be bound to different event loops. + /// /// - Parameters: /// - futures: An array of homogenous `EventLoopFuture`s to gather results from. /// - promise: The `EventLoopPromise` to complete with the result of the futures. + @preconcurrency @inlinable public static func whenAllComplete( _ futures: [EventLoopFuture], promise: EventLoopPromise<[Result]> - ) { + ) where Value: Sendable { let eventLoop = promise.futureResult.eventLoop let reduced = eventLoop.makePromise(of: Void.self) @@ -1481,34 +1596,33 @@ extension EventLoopFuture { } } - @usableFromInline typealias ReduceCompletions = @Sendable (Int, Result) -> Void - /// Loops through the futures array and attaches callbacks to execute `onResult` on the provided `EventLoop` when /// they complete. The `onResult` will receive the index of the future that fulfilled the provided `Result`. /// /// Once all the futures have completed, the provided promise will succeed. @inlinable - internal static func _reduceCompletions0( + internal static func _reduceCompletions0( _ promise: EventLoopPromise, _ futures: [EventLoopFuture], _ eventLoop: EventLoop, - onResult: @escaping ReduceCompletions + onResult: @escaping @Sendable (Int, Result) -> Void ) { eventLoop.assertInEventLoop() - var remainingCount = futures.count - - if remainingCount == 0 { + if futures.count == 0 { promise.succeed(()) return } + let remainingCount = NIOLoopBoundBox(_value: futures.count, uncheckedEventLoop: eventLoop) + // Sends the result to `onResult` in case of success and succeeds the input promise, if appropriate. + @Sendable func processResult(_ index: Int, _ result: Result) { onResult(index, result) - remainingCount -= 1 + remainingCount.value -= 1 - if remainingCount == 0 { + if remainingCount.value == 0 { promise.succeed(()) } } @@ -1527,6 +1641,65 @@ extension EventLoopFuture { } } } + + /// Loops through the futures array and attaches callbacks to execute `onResult` on the provided `EventLoop` when + /// they complete. The `onResult` will receive the index of the future that fulfilled the provided `Result`. + /// + /// Once all the futures have completed, the provided promise will succeed. + @inlinable + internal static func _reduceCompletions0( + _ promise: EventLoopPromise, + _ futures: [EventLoopFuture], + _ eventLoop: EventLoop + ) { + eventLoop.assertInEventLoop() + + if futures.count == 0 { + promise.succeed(()) + return + } + + let remainingCount = NIOLoopBoundBox(_value: futures.count, uncheckedEventLoop: eventLoop) + + // Sends the result to `onResult` in case of success and succeeds the input promise, if appropriate. + @Sendable + func processResult(_ index: Int, _ result: Result) { + remainingCount.value -= 1 + + if remainingCount.value == 0 { + promise.succeed(()) + } + } + // loop through the futures to chain callbacks to execute on the initiating event loop and grab their index + // in the "futures" to pass their result to the caller + for (index, future) in futures.enumerated() { + if future.eventLoop.inEventLoop, + let result = future._value + { + // Fast-track already-fulfilled results without the overhead of calling `whenComplete`. This can yield a + // ~30% performance improvement in the case of large arrays where all elements are already fulfilled. + switch result { + case .success: + processResult(index, .success(())) + case .failure(let error): + processResult(index, .failure(error)) + } + } else { + // We have to map to `Void` here to avoid sharing the potentially non-Sendable + // value across event loops. + future.whenComplete { result in + let voidResult = result.map { _ in } + if eventLoop.inEventLoop { + processResult(index, voidResult) + } else { + eventLoop.execute { + processResult(index, voidResult) + } + } + } + } + } + } } // MARK: hop @@ -1540,11 +1713,14 @@ extension EventLoopFuture { /// succinctly. It also contains an optimisation for the case when the loop you're hopping *from* is the same as /// the one you're hopping *to*, allowing you to avoid doing allocations in that case. /// + /// - Note: The `Value` must be `Sendable` since it is shared with the isolation domain of the target event loop. + /// /// - parameters: /// - to: The `EventLoop` that the returned `EventLoopFuture` will run on. /// - returns: An `EventLoopFuture` whose callbacks run on `target` instead of the original loop. + @preconcurrency @inlinable - public func hop(to target: EventLoop) -> EventLoopFuture { + public func hop(to target: EventLoop) -> EventLoopFuture where Value: Sendable { if target === self.eventLoop { // We're already on that event loop, nothing to do here. Save an allocation. return self @@ -1567,12 +1743,6 @@ extension EventLoopFuture { @inlinable @preconcurrency public func always(_ callback: @escaping @Sendable (Result) -> Void) -> EventLoopFuture { - self._always(callback) - } - @usableFromInline typealias AlwaysCallback = @Sendable (Result) -> Void - - @inlinable - func _always(_ callback: @escaping AlwaysCallback) -> EventLoopFuture { self.whenComplete { result in callback(result) } return self } @@ -1620,9 +1790,11 @@ extension EventLoopFuture { /// - parameters: /// - orReplace: the value of the returned `EventLoopFuture` when then resolved future's value is `Optional.some()`. /// - returns: an new `EventLoopFuture` with new type parameter `NewValue` and the value passed in the `orReplace` parameter. + @preconcurrency @inlinable - public func unwrap(orReplace replacement: NewValue) -> EventLoopFuture - where Value == NewValue? { + public func unwrap( + orReplace replacement: NewValue + ) -> EventLoopFuture where Value == NewValue? { self.map { (value) -> NewValue in guard let value = value else { return replacement @@ -1677,25 +1849,18 @@ extension EventLoopFuture { /// blockingTask(value) /// } /// + /// - Note: The `Value` and `NewValue` must be `Sendable` since it is shared between the isolation region queue and the event loop. + /// /// - parameters: /// - onto: the `DispatchQueue` on which the blocking IO / task specified by `callbackMayBlock` is scheduled. /// - callbackMayBlock: Function that will receive the value of this `EventLoopFuture` and return /// a new `EventLoopFuture`. @inlinable @preconcurrency - public func flatMapBlocking( + public func flatMapBlocking( onto queue: DispatchQueue, _ callbackMayBlock: @escaping @Sendable (Value) throws -> NewValue - ) -> EventLoopFuture { - self._flatMapBlocking(onto: queue, callbackMayBlock) - } - @usableFromInline typealias FlatMapBlockingCallback = @Sendable (Value) throws -> NewValue - - @inlinable - func _flatMapBlocking( - onto queue: DispatchQueue, - _ callbackMayBlock: @escaping FlatMapBlockingCallback - ) -> EventLoopFuture { + ) -> EventLoopFuture where Value: Sendable { self.flatMap { result in queue.asyncWithFuture(eventLoop: self.eventLoop) { try callbackMayBlock(result) } } @@ -1709,11 +1874,17 @@ extension EventLoopFuture { /// If you find yourself passing the results from this `EventLoopFuture` to a new `EventLoopPromise` /// in the body of this function, consider using `cascade` instead. /// + /// - Note: The `NewValue` must be `Sendable` since it is shared between the isolation region queue and the event loop. + /// /// - parameters: /// - onto: the `DispatchQueue` on which the blocking IO / task specified by `callbackMayBlock` is scheduled. /// - callbackMayBlock: The callback that is called with the successful result of the `EventLoopFuture`. + @preconcurrency @inlinable - public func whenSuccessBlocking(onto queue: DispatchQueue, _ callbackMayBlock: @escaping (Value) -> Void) { + public func whenSuccessBlocking( + onto queue: DispatchQueue, + _ callbackMayBlock: @escaping @Sendable (Value) -> Void + ) where Value: Sendable { self.whenSuccess { value in queue.async { callbackMayBlock(value) } } @@ -1732,8 +1903,10 @@ extension EventLoopFuture { /// - callbackMayBlock: The callback that is called with the failed result of the `EventLoopFuture`. @inlinable @preconcurrency - public func whenFailureBlocking(onto queue: DispatchQueue, _ callbackMayBlock: @escaping @Sendable (Error) -> Void) - { + public func whenFailureBlocking( + onto queue: DispatchQueue, + _ callbackMayBlock: @escaping @Sendable (Error) -> Void + ) { self._whenFailureBlocking(onto: queue, callbackMayBlock) } @usableFromInline typealias WhenFailureBlockingCallback = @Sendable (Error) -> Void @@ -1748,6 +1921,8 @@ extension EventLoopFuture { /// Adds an observer callback to this `EventLoopFuture` that is called when the /// `EventLoopFuture` has any result. The observer callback is permitted to block. /// + /// - Note: The `NewValue` must be `Sendable` since it is shared between the isolation region queue and the event loop. + /// /// - parameters: /// - onto: the `DispatchQueue` on which the blocking IO / task specified by `callbackMayBlock` is scheduled. /// - callbackMayBlock: The callback that is called when the `EventLoopFuture` is fulfilled. @@ -1756,13 +1931,7 @@ extension EventLoopFuture { public func whenCompleteBlocking( onto queue: DispatchQueue, _ callbackMayBlock: @escaping @Sendable (Result) -> Void - ) { - self._whenCompleteBlocking(onto: queue, callbackMayBlock) - } - @usableFromInline typealias WhenCompleteBlocking = @Sendable (Result) -> Void - - @inlinable - func _whenCompleteBlocking(onto queue: DispatchQueue, _ callbackMayBlock: @escaping WhenCompleteBlocking) { + ) where Value: Sendable { self.whenComplete { value in queue.async { callbackMayBlock(value) } } @@ -1875,13 +2044,19 @@ public struct _NIOEventLoopFutureIdentifier: Hashable, Sendable { } } -// EventLoopPromise is a reference type, but by its very nature is Sendable (if its Value is). -extension EventLoopPromise: Sendable where Value: Sendable {} +// The future is unchecked Sendable following the below isolation rules this is safe +// +// 1. Receiving the value of the future is always done on the EventLoop of the future, hence +// the value is never transferred out of the event loops isolation domain. It only gets transferred +// by certain methods such as `hop()` and those methods are annotated with requiring the Value to be +// Sendable +// 2. The promise is `Sendable` but fulfilling the promise with a value requires the user to +// transfer the value to the promise. This ensures that the value is now isolated to the event loops +// isolation domain. Note: Sendable values can always be transferred + +extension EventLoopPromise: Sendable {} -// EventLoopFuture is a reference type, but it is Sendable (if its Value is). However, we enforce -// that by way of the guarantees of the EventLoop protocol, so the compiler cannot -// check it. -extension EventLoopFuture: @unchecked Sendable where Value: Sendable {} +extension EventLoopFuture: @unchecked Sendable {} extension EventLoopPromise where Value == Void { // Deliver a successful result to the associated `EventLoopFuture` object. @@ -1899,7 +2074,8 @@ extension Optional { /// to `promise`. /// /// - Parameter promise: The promise to set or cascade to. - public mutating func setOrCascade(to promise: EventLoopPromise?) + @preconcurrency + public mutating func setOrCascade(to promise: EventLoopPromise?) where Wrapped == EventLoopPromise { guard let promise = promise else { return } diff --git a/Sources/NIOCore/NIOScheduledCallback.swift b/Sources/NIOCore/NIOScheduledCallback.swift index 8d7a056a0a..b2af415aad 100644 --- a/Sources/NIOCore/NIOScheduledCallback.swift +++ b/Sources/NIOCore/NIOScheduledCallback.swift @@ -89,9 +89,10 @@ public struct NIOScheduledCallback: Sendable { } extension EventLoop { + @preconcurrency package func _scheduleCallback( at deadline: NIODeadline, - handler: some NIOScheduledCallbackHandler + handler: some (NIOScheduledCallbackHandler & Sendable) ) -> NIOScheduledCallback { let task = self.scheduleTask(deadline: deadline) { handler.handleScheduledCallback(eventLoop: self) } task.futureResult.whenFailure { error in @@ -130,20 +131,22 @@ extension EventLoop { /// /// The implementation of this default conformance has been further factored out so we can use it in /// `NIOAsyncTestingEventLoop`, where the use of `wait()` is _less bad_. + @preconcurrency @discardableResult public func scheduleCallback( at deadline: NIODeadline, - handler: some NIOScheduledCallbackHandler + handler: some (NIOScheduledCallbackHandler & Sendable) ) -> NIOScheduledCallback { self._scheduleCallback(at: deadline, handler: handler) } /// Default implementation of `scheduleCallback(in amount:handler:)`: calls `scheduleCallback(at deadline:handler:)`. + @preconcurrency @discardableResult @inlinable public func scheduleCallback( in amount: TimeAmount, - handler: some NIOScheduledCallbackHandler + handler: some (NIOScheduledCallbackHandler & Sendable) ) throws -> NIOScheduledCallback { try self.scheduleCallback(at: .now() + amount, handler: handler) } diff --git a/Sources/NIOEmbedded/AsyncTestingEventLoop.swift b/Sources/NIOEmbedded/AsyncTestingEventLoop.swift index 52d56bf9ca..14e5242df0 100644 --- a/Sources/NIOEmbedded/AsyncTestingEventLoop.swift +++ b/Sources/NIOEmbedded/AsyncTestingEventLoop.swift @@ -192,9 +192,10 @@ public final class NIOAsyncTestingEventLoop: EventLoop, @unchecked Sendable { self.scheduleTask(deadline: self.now + `in`, task) } + @preconcurrency public func scheduleCallback( at deadline: NIODeadline, - handler: some NIOScheduledCallbackHandler + handler: some (NIOScheduledCallbackHandler & Sendable) ) throws -> NIOScheduledCallback { /// The default implementation of `scheduledCallback(at:handler)` makes two calls to the event loop because it /// needs to hook the future of the backing scheduled task, which can lead to lost cancellation callbacks when @@ -213,10 +214,11 @@ public final class NIOAsyncTestingEventLoop: EventLoop, @unchecked Sendable { } } + @preconcurrency @discardableResult public func scheduleCallback( in amount: TimeAmount, - handler: some NIOScheduledCallbackHandler + handler: some (NIOScheduledCallbackHandler & Sendable) ) throws -> NIOScheduledCallback { /// Even though this type does not implement a custom `scheduleCallback(at:handler)`, it uses a manual clock so /// it cannot rely on the default implementation of `scheduleCallback(in:handler:)`, which computes the deadline diff --git a/Sources/NIOEmbedded/Embedded.swift b/Sources/NIOEmbedded/Embedded.swift index a6c2b33b5b..3105f3af85 100644 --- a/Sources/NIOEmbedded/Embedded.swift +++ b/Sources/NIOEmbedded/Embedded.swift @@ -160,10 +160,11 @@ public final class EmbeddedEventLoop: EventLoop, CustomStringConvertible { scheduleTask(deadline: self._now + `in`, task) } + @preconcurrency @discardableResult public func scheduleCallback( in amount: TimeAmount, - handler: some NIOScheduledCallbackHandler + handler: some (NIOScheduledCallbackHandler & Sendable) ) -> NIOScheduledCallback { /// Even though this type does not implement a custom `scheduleCallback(at:handler)`, it uses a manual clock so /// it cannot rely on the default implementation of `scheduleCallback(in:handler:)`, which computes the deadline diff --git a/Tests/NIOCoreTests/AsyncChannel/AsyncChannelTests.swift b/Tests/NIOCoreTests/AsyncChannel/AsyncChannelTests.swift index 7b38e49301..4e4c6a34e0 100644 --- a/Tests/NIOCoreTests/AsyncChannel/AsyncChannelTests.swift +++ b/Tests/NIOCoreTests/AsyncChannel/AsyncChannelTests.swift @@ -253,7 +253,9 @@ final class AsyncChannelTests: XCTestCase { let strongSentinel: Sentinel? = Sentinel() sentinel = strongSentinel! try await XCTAsyncAssertNotNil( - await channel.pipeline.handler(type: NIOAsyncChannelHandler.self).get() + await channel.pipeline.handler(type: NIOAsyncChannelHandler.self).map { _ in + true + }.get() ) try await channel.writeInbound(strongSentinel!) _ = try await channel.readInbound(as: Sentinel.self) diff --git a/Tests/NIOPosixTests/EventLoopFutureTest.swift b/Tests/NIOPosixTests/EventLoopFutureTest.swift index fac8f2ab6d..d0f7bb3335 100644 --- a/Tests/NIOPosixTests/EventLoopFutureTest.swift +++ b/Tests/NIOPosixTests/EventLoopFutureTest.swift @@ -13,6 +13,7 @@ //===----------------------------------------------------------------------===// import Dispatch +import NIOConcurrencyHelpers import NIOEmbedded import NIOPosix import XCTest @@ -1408,18 +1409,19 @@ class EventLoopFutureTest: XCTestCase { func testWhenSuccessBlocking() { let eventLoop = EmbeddedEventLoop() let sem = DispatchSemaphore(value: 0) - var nonBlockingRan = false + let nonBlockingRan = NIOLockedValueBox(false) let p = eventLoop.makePromise(of: String.self) p.futureResult.whenSuccessBlocking(onto: DispatchQueue.global()) { sem.wait() // Block in callback XCTAssertEqual($0, "hello") - XCTAssertTrue(nonBlockingRan) + nonBlockingRan.withLockedValue { XCTAssertTrue($0) } + } p.succeed("hello") let p2 = eventLoop.makePromise(of: Bool.self) p2.futureResult.whenSuccess { _ in - nonBlockingRan = true + nonBlockingRan.withLockedValue { $0 = true } } p2.succeed(true) diff --git a/Tests/NIOPosixTests/NonBlockingFileIOTest.swift b/Tests/NIOPosixTests/NonBlockingFileIOTest.swift index 3fbdc71e30..405f7e84b7 100644 --- a/Tests/NIOPosixTests/NonBlockingFileIOTest.swift +++ b/Tests/NIOPosixTests/NonBlockingFileIOTest.swift @@ -634,34 +634,41 @@ class NonBlockingFileIOTest: XCTestCase { func testFileOpenWorks() throws { let content = "123" try withTemporaryFile(content: content) { (fileHandle, path) -> Void in - let (fh, fr) = try self.fileIO.openFile(path: path, eventLoop: self.eventLoop).wait() - try fh.withUnsafeFileDescriptor { fd in - XCTAssertGreaterThanOrEqual(fd, 0) - } - XCTAssertTrue(fh.isOpen) - XCTAssertEqual(0, fr.readerIndex) - XCTAssertEqual(3, fr.endIndex) - try fh.close() + try self.fileIO.openFile(path: path, eventLoop: self.eventLoop).flatMapThrowing { vals in + let (fh, fr) = vals + try fh.withUnsafeFileDescriptor { fd in + XCTAssertGreaterThanOrEqual(fd, 0) + } + XCTAssertTrue(fh.isOpen) + XCTAssertEqual(0, fr.readerIndex) + XCTAssertEqual(3, fr.endIndex) + try fh.close() + }.wait() } } func testFileOpenWorksWithEmptyFile() throws { let content = "" try withTemporaryFile(content: content) { (fileHandle, path) -> Void in - let (fh, fr) = try self.fileIO.openFile(path: path, eventLoop: self.eventLoop).wait() - try fh.withUnsafeFileDescriptor { fd in - XCTAssertGreaterThanOrEqual(fd, 0) - } - XCTAssertTrue(fh.isOpen) - XCTAssertEqual(0, fr.readerIndex) - XCTAssertEqual(0, fr.endIndex) - try fh.close() + try self.fileIO.openFile(path: path, eventLoop: self.eventLoop).flatMapThrowing { vals in + let (fh, fr) = vals + try fh.withUnsafeFileDescriptor { fd in + XCTAssertGreaterThanOrEqual(fd, 0) + } + XCTAssertTrue(fh.isOpen) + XCTAssertEqual(0, fr.readerIndex) + XCTAssertEqual(0, fr.endIndex) + try fh.close() + }.wait() } } func testFileOpenFails() throws { do { - _ = try self.fileIO.openFile(path: "/dev/null/this/does/not/exist", eventLoop: self.eventLoop).wait() + try self.fileIO.openFile( + path: "/dev/null/this/does/not/exist", + eventLoop: self.eventLoop + ).map { _ in }.wait() XCTFail("should've thrown") } catch let e as IOError where e.errnoCode == ENOTDIR { // OK