From 035962e6b6e03c8721a91a9b96dc084289795cb4 Mon Sep 17 00:00:00 2001 From: Johannes Weiss Date: Mon, 5 Nov 2018 09:50:33 +0000 Subject: [PATCH] EventLoop.assertInEventLoop and preconditionInEventLoop (#644) Motivation: Dispatch can only precondition that code is run from a certain queue, it can't return if code is running on a certain queue. To support that this introduces new EventLoop.assertInEventLoop and EventLoop.preconditionInEventLoop methods instead of assert(eventLoop.inEventLoop). Modifications: add EventLoop.assertInEventLoop and EventLoop.preconditionInEventLoop Result: better support for Network.framework in NIOTS --- Sources/NIO/BaseSocketChannel.swift | 78 ++++++++++++++--------------- Sources/NIO/Bootstrap.swift | 14 +++--- Sources/NIO/ChannelPipeline.swift | 60 +++++++++++----------- Sources/NIO/EventLoop.swift | 38 ++++++++++---- Sources/NIO/EventLoopFuture.swift | 14 +++--- Sources/NIO/NonBlockingFileIO.swift | 2 +- Sources/NIO/SocketChannel.swift | 26 +++++----- Sources/NIO/Utilities.swift | 1 + 8 files changed, 125 insertions(+), 108 deletions(-) diff --git a/Sources/NIO/BaseSocketChannel.swift b/Sources/NIO/BaseSocketChannel.swift index ac8c00d344..067801200d 100644 --- a/Sources/NIO/BaseSocketChannel.swift +++ b/Sources/NIO/BaseSocketChannel.swift @@ -43,7 +43,7 @@ private struct SocketChannelLifecycleManager { private var currentState: State = .fresh { didSet { - assert(self.eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() switch (oldValue, self.currentState) { case (_, .activated): self.isActiveAtomic.store(true) @@ -90,7 +90,7 @@ private struct SocketChannelLifecycleManager { // MARK: private API @inline(__always) // we need to return a closure here and to not suffer from a potential allocation for that this must be inlined private mutating func moveState(event: Event, promise: EventLoopPromise?) -> ((ChannelPipeline) -> Void) { - assert(self.eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() switch (self.currentState, event) { // origin: .fresh @@ -158,12 +158,12 @@ private struct SocketChannelLifecycleManager { // MARK: convenience properties internal var isActive: Bool { - assert(self.eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() return self.currentState == .activated } internal var isPreRegistered: Bool { - assert(self.eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() switch self.currentState { case .fresh, .closed: return false @@ -173,7 +173,7 @@ private struct SocketChannelLifecycleManager { } internal var isRegisteredFully: Bool { - assert(self.eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() switch self.currentState { case .fresh, .closed, .preRegistered: return false @@ -185,7 +185,7 @@ private struct SocketChannelLifecycleManager { /// Returns whether the underlying file descriptor is open. This property will always be true (even before registration) /// until the Channel is closed. internal var isOpen: Bool { - assert(self.eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() return self.currentState != .closed } } @@ -229,13 +229,13 @@ class BaseSocketChannel: SelectableChannel, ChannelCore { private var autoRead: Bool = true private var lifecycleManager: SocketChannelLifecycleManager { didSet { - assert(self.eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() } } private var bufferAllocator: ByteBufferAllocator = ByteBufferAllocator() { didSet { - assert(self.eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() self.bufferAllocatorCached.store(Box(self.bufferAllocator)) } } @@ -287,12 +287,12 @@ class BaseSocketChannel: SelectableChannel, ChannelCore { /// `false` if the whole `Channel` is closed and so no more IO operation can be done. var isOpen: Bool { - assert(eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() return self.lifecycleManager.isOpen } var isRegistered: Bool { - assert(self.eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() return self.lifecycleManager.isPreRegistered } @@ -401,7 +401,7 @@ class BaseSocketChannel: SelectableChannel, ChannelCore { } public final func localAddress0() throws -> SocketAddress { - assert(self.eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() guard self.isOpen else { throw ChannelError.ioOnClosedChannel } @@ -409,7 +409,7 @@ class BaseSocketChannel: SelectableChannel, ChannelCore { } public final func remoteAddress0() throws -> SocketAddress { - assert(self.eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() guard self.isOpen else { throw ChannelError.ioOnClosedChannel } @@ -420,7 +420,7 @@ class BaseSocketChannel: SelectableChannel, ChannelCore { /// /// - returns: If this socket should be registered for write notifications. Ie. `IONotificationState.register` if _not_ all data could be written, so notifications are necessary; and `IONotificationState.unregister` if everything was written and we don't need to be notified about writability at the moment. func flushNow() -> IONotificationState { - assert(self.eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() // Guard against re-entry as data that will be put into `pendingWrites` will just be picked up by // `writeToSocket`. guard !self.inFlushNow && self.isOpen else { @@ -475,7 +475,7 @@ class BaseSocketChannel: SelectableChannel, ChannelCore { } func setOption0(option: T, value: T.OptionType) throws { - assert(eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() guard isOpen else { throw ChannelError.ioOnClosedChannel @@ -523,7 +523,7 @@ class BaseSocketChannel: SelectableChannel, ChannelCore { } func getOption0(option: T) throws -> T.OptionType { - assert(eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() guard isOpen else { throw ChannelError.ioOnClosedChannel @@ -550,7 +550,7 @@ class BaseSocketChannel: SelectableChannel, ChannelCore { /// /// - returns: `true` if `readPending` is `true`, `false` otherwise. @discardableResult func readIfNeeded0() -> Bool { - assert(eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() if !self.lifecycleManager.isActive { return false } @@ -563,7 +563,7 @@ class BaseSocketChannel: SelectableChannel, ChannelCore { // Methods invoked from the HeadHandler of the ChannelPipeline public func bind0(to address: SocketAddress, promise: EventLoopPromise?) { - assert(eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() guard self.isOpen else { promise?.fail(error: ChannelError.ioOnClosedChannel) @@ -581,7 +581,7 @@ class BaseSocketChannel: SelectableChannel, ChannelCore { } public final func write0(_ data: NIOAny, promise: EventLoopPromise?) { - assert(eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() guard self.isOpen else { // Channel was already closed, fail the promise and not even queue it. @@ -598,7 +598,7 @@ class BaseSocketChannel: SelectableChannel, ChannelCore { } private func registerForWritable() { - assert(eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() guard !self.interestedEvent.contains(.write) else { // nothing to do if we were previously interested in write @@ -608,7 +608,7 @@ class BaseSocketChannel: SelectableChannel, ChannelCore { } func unregisterForWritable() { - assert(eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() guard self.interestedEvent.contains(.write) else { // nothing to do if we were not previously interested in write @@ -618,7 +618,7 @@ class BaseSocketChannel: SelectableChannel, ChannelCore { } public final func flush0() { - assert(eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() guard self.isOpen else { return @@ -637,7 +637,7 @@ class BaseSocketChannel: SelectableChannel, ChannelCore { } public func read0() { - assert(eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() guard self.isOpen else { return @@ -650,7 +650,7 @@ class BaseSocketChannel: SelectableChannel, ChannelCore { } private final func pauseRead0() { - assert(eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() if self.lifecycleManager.isPreRegistered { unregisterForReadable() @@ -658,7 +658,7 @@ class BaseSocketChannel: SelectableChannel, ChannelCore { } private final func registerForReadable() { - assert(eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() assert(self.lifecycleManager.isRegisteredFully) guard !self.lifecycleManager.hasSeenEOFNotification else { @@ -674,7 +674,7 @@ class BaseSocketChannel: SelectableChannel, ChannelCore { } internal final func unregisterForReadable() { - assert(eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() assert(self.lifecycleManager.isRegisteredFully) guard self.interestedEvent.contains(.read) else { @@ -693,7 +693,7 @@ class BaseSocketChannel: SelectableChannel, ChannelCore { /// - mode: The close mode, must be `.all` for `BaseSocketChannel` /// - promise: The promise that gets notified about the result of the deregistration/close operations. public func close0(error: Error, mode: CloseMode, promise: EventLoopPromise?) { - assert(eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() guard self.isOpen else { promise?.fail(error: ChannelError.alreadyClosed) @@ -765,7 +765,7 @@ class BaseSocketChannel: SelectableChannel, ChannelCore { public final func register0(promise: EventLoopPromise?) { - assert(eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() guard self.isOpen else { promise?.fail(error: ChannelError.ioOnClosedChannel) @@ -792,7 +792,7 @@ class BaseSocketChannel: SelectableChannel, ChannelCore { } public final func registerAlreadyConfigured0(promise: EventLoopPromise?) { - assert(self.eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() assert(self.isOpen) assert(!self.lifecycleManager.isActive) let registerPromise: EventLoopPromise = self.eventLoop.newPromise() @@ -821,7 +821,7 @@ class BaseSocketChannel: SelectableChannel, ChannelCore { // Methods invoked from the EventLoop itself public final func writable() { - assert(self.eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() assert(self.isOpen) self.finishConnect() // If we were connecting, that has finished. @@ -832,7 +832,7 @@ class BaseSocketChannel: SelectableChannel, ChannelCore { } private func finishConnect() { - assert(eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() assert(self.lifecycleManager.isPreRegistered) if let connectPromise = self.pendingConnect { @@ -857,7 +857,7 @@ class BaseSocketChannel: SelectableChannel, ChannelCore { } private func finishWritable() { - assert(eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() if self.isOpen { assert(self.lifecycleManager.isPreRegistered) @@ -950,7 +950,7 @@ class BaseSocketChannel: SelectableChannel, ChannelCore { @discardableResult private final func readable0() -> ReadStreamState { - assert(eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() assert(self.lifecycleManager.isActive) defer { @@ -1016,7 +1016,7 @@ class BaseSocketChannel: SelectableChannel, ChannelCore { } internal final func updateCachedAddressesFromSocket(updateLocal: Bool = true, updateRemote: Bool = true) { - assert(self.eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() assert(updateLocal || updateRemote) let cached = addressesCached.load().value let local = updateLocal ? try? self.localAddress0() : cached.local @@ -1025,12 +1025,12 @@ class BaseSocketChannel: SelectableChannel, ChannelCore { } internal final func unsetCachedAddressesFromSocket() { - assert(self.eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() self.addressesCached.store(Box((local: nil, remote: nil))) } public final func connect0(to address: SocketAddress, promise: EventLoopPromise?) { - assert(eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() guard self.isOpen else { promise?.fail(error: ChannelError.ioOnClosedChannel) @@ -1087,7 +1087,7 @@ class BaseSocketChannel: SelectableChannel, ChannelCore { } private final func safeReregister(interested: SelectorEventSet) { - assert(eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() assert(self.lifecycleManager.isRegisteredFully) guard self.isOpen else { @@ -1108,7 +1108,7 @@ class BaseSocketChannel: SelectableChannel, ChannelCore { } private func safeRegister(interested: SelectorEventSet) throws { - assert(eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() assert(!self.lifecycleManager.isRegisteredFully) guard self.isOpen else { @@ -1126,7 +1126,7 @@ class BaseSocketChannel: SelectableChannel, ChannelCore { } final func becomeFullyRegistered0() throws { - assert(self.eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() assert(self.lifecycleManager.isPreRegistered) assert(!self.lifecycleManager.isRegisteredFully) @@ -1136,7 +1136,7 @@ class BaseSocketChannel: SelectableChannel, ChannelCore { } final func becomeActive0(promise: EventLoopPromise?) { - assert(self.eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() assert(self.lifecycleManager.isPreRegistered) if !self.lifecycleManager.isRegisteredFully { do { diff --git a/Sources/NIO/Bootstrap.swift b/Sources/NIO/Bootstrap.swift index 43eeface73..f52d3c445c 100644 --- a/Sources/NIO/Bootstrap.swift +++ b/Sources/NIO/Bootstrap.swift @@ -259,23 +259,23 @@ public final class ServerBootstrap { @inline(__always) func setupChildChannel() -> EventLoopFuture { return self.childChannelOptions.applyAll(channel: accepted).then { () -> EventLoopFuture in - assert(childEventLoop.inEventLoop) + childEventLoop.assertInEventLoop() return childChannelInit(accepted) } } @inline(__always) func fireThroughPipeline(_ future: EventLoopFuture) { - assert(ctxEventLoop.inEventLoop) + ctxEventLoop.assertInEventLoop() future.then { (_) -> EventLoopFuture in - assert(ctxEventLoop.inEventLoop) + ctxEventLoop.assertInEventLoop() guard !ctx.pipeline.destroyed else { return ctx.eventLoop.newFailedFuture(error: ChannelError.ioOnClosedChannel) } ctx.fireChannelRead(data) return ctx.eventLoop.newSucceededFuture(result: ()) }.whenFailure { error in - assert(ctx.eventLoop.inEventLoop) + ctxEventLoop.assertInEventLoop() self.closeAndFire(ctx: ctx, accepted: accepted, err: error) } } @@ -308,9 +308,9 @@ private extension Channel { // In many cases `body` must be _synchronously_ follow `register`, otherwise in our current // implementation, `epoll` will send us `EPOLLHUP`. To have it run synchronously, we need to invoke the // `then` on the eventloop that the `register` will succeed on. - assert(self.eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() return self.register().then { - assert(self.eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() return body(self) } } @@ -507,7 +507,7 @@ public final class ClientBootstrap { @inline(__always) func setupChannel() -> EventLoopFuture { - assert(eventLoop.inEventLoop) + eventLoop.assertInEventLoop() channelInitializer(channel).then { channelOptions.applyAll(channel: channel) }.then { diff --git a/Sources/NIO/ChannelPipeline.swift b/Sources/NIO/ChannelPipeline.swift index 5cb2642c08..2a007b8f26 100644 --- a/Sources/NIO/ChannelPipeline.swift +++ b/Sources/NIO/ChannelPipeline.swift @@ -150,7 +150,7 @@ public final class ChannelPipeline: ChannelInvoker { /// The `Channel` that this `ChannelPipeline` belongs to. internal var channel: Channel { - assert(self.eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() assert(self._channel != nil || self.destroyed) return self._channel ?? DeadChannel(pipeline: self) } @@ -257,7 +257,7 @@ public final class ChannelPipeline: ChannelInvoker { relativeHandler: ChannelHandler, operation: (ChannelHandlerContext, ChannelHandlerContext) -> Void, promise: EventLoopPromise) { - assert(eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() if self.destroyed { promise.fail(error: ChannelError.ioOnClosedChannel) return @@ -293,7 +293,7 @@ public final class ChannelPipeline: ChannelInvoker { relativeContext: ChannelHandlerContext, operation: (ChannelHandlerContext, ChannelHandlerContext) -> Void, promise: EventLoopPromise) { - assert(eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() if destroyed { promise.fail(error: ChannelError.ioOnClosedChannel) @@ -322,7 +322,7 @@ public final class ChannelPipeline: ChannelInvoker { /// - new: The `ChannelHandlerContext` to add to the pipeline. /// - existing: The `ChannelHandlerContext` that `new` will be added after. private func add0(context new: ChannelHandlerContext, after existing: ChannelHandlerContext) { - assert(eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() let next = existing.next new.prev = existing @@ -341,7 +341,7 @@ public final class ChannelPipeline: ChannelInvoker { /// - new: The `ChannelHandlerContext` to add to the pipeline. /// - existing: The `ChannelHandlerContext` that `new` will be added before. private func add0(context new: ChannelHandlerContext, before existing: ChannelHandlerContext) { - assert(eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() let prev = existing.prev new.prev = prev @@ -468,7 +468,7 @@ public final class ChannelPipeline: ChannelInvoker { /// Remove a `ChannelHandlerContext` from the `ChannelPipeline`. Must only be called from within the `EventLoop`. private func remove0(ctx: ChannelHandlerContext, promise: EventLoopPromise?) { - assert(self.eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() let nextCtx = ctx.next let prevCtx = ctx.prev @@ -493,7 +493,7 @@ public final class ChannelPipeline: ChannelInvoker { /// Returns the next name to use for a `ChannelHandler`. private func nextName() -> String { - assert(eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() let name = "handler\(idx)" idx += 1 @@ -505,7 +505,7 @@ public final class ChannelPipeline: ChannelInvoker { /// This method must only be called from within the `EventLoop`. It should only be called from a `ChannelCore` /// implementation. Once called, the `ChannelPipeline` is no longer active and cannot be used again. func removeHandlers() { - assert(eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() if let head = self.head { while let ctx = head.next { @@ -1239,7 +1239,7 @@ public final class ChannelHandlerContext: ChannelInvoker { } fileprivate func invokeChannelRegistered() { - assert(inEventLoop) + self.eventLoop.assertInEventLoop() if let inboundHandler = self.inboundHandler { inboundHandler.channelRegistered(ctx: self) @@ -1249,7 +1249,7 @@ public final class ChannelHandlerContext: ChannelInvoker { } fileprivate func invokeChannelUnregistered() { - assert(inEventLoop) + self.eventLoop.assertInEventLoop() if let inboundHandler = self.inboundHandler { inboundHandler.channelUnregistered(ctx: self) @@ -1259,7 +1259,7 @@ public final class ChannelHandlerContext: ChannelInvoker { } fileprivate func invokeChannelActive() { - assert(inEventLoop) + self.eventLoop.assertInEventLoop() if let inboundHandler = self.inboundHandler { inboundHandler.channelActive(ctx: self) @@ -1269,7 +1269,7 @@ public final class ChannelHandlerContext: ChannelInvoker { } fileprivate func invokeChannelInactive() { - assert(inEventLoop) + self.eventLoop.assertInEventLoop() if let inboundHandler = self.inboundHandler { inboundHandler.channelInactive(ctx: self) @@ -1279,7 +1279,7 @@ public final class ChannelHandlerContext: ChannelInvoker { } fileprivate func invokeChannelRead(_ data: NIOAny) { - assert(inEventLoop) + self.eventLoop.assertInEventLoop() if let inboundHandler = self.inboundHandler { inboundHandler.channelRead(ctx: self, data: data) @@ -1289,7 +1289,7 @@ public final class ChannelHandlerContext: ChannelInvoker { } fileprivate func invokeChannelReadComplete() { - assert(inEventLoop) + self.eventLoop.assertInEventLoop() if let inboundHandler = self.inboundHandler { inboundHandler.channelReadComplete(ctx: self) @@ -1299,7 +1299,7 @@ public final class ChannelHandlerContext: ChannelInvoker { } fileprivate func invokeChannelWritabilityChanged() { - assert(inEventLoop) + self.eventLoop.assertInEventLoop() if let inboundHandler = self.inboundHandler { inboundHandler.channelWritabilityChanged(ctx: self) @@ -1309,7 +1309,7 @@ public final class ChannelHandlerContext: ChannelInvoker { } fileprivate func invokeErrorCaught(_ error: Error) { - assert(inEventLoop) + self.eventLoop.assertInEventLoop() if let inboundHandler = self.inboundHandler { inboundHandler.errorCaught(ctx: self, error: error) @@ -1319,7 +1319,7 @@ public final class ChannelHandlerContext: ChannelInvoker { } fileprivate func invokeUserInboundEventTriggered(_ event: Any) { - assert(inEventLoop) + self.eventLoop.assertInEventLoop() if let inboundHandler = self.inboundHandler { inboundHandler.userInboundEventTriggered(ctx: self, event: event) @@ -1329,7 +1329,7 @@ public final class ChannelHandlerContext: ChannelInvoker { } fileprivate func invokeRegister(promise: EventLoopPromise?) { - assert(inEventLoop) + self.eventLoop.assertInEventLoop() assert(promise.map { !$0.futureResult.isFulfilled } ?? true, "Promise \(promise!) already fulfilled") if let outboundHandler = self.outboundHandler { @@ -1340,7 +1340,7 @@ public final class ChannelHandlerContext: ChannelInvoker { } fileprivate func invokeBind(to address: SocketAddress, promise: EventLoopPromise?) { - assert(inEventLoop) + self.eventLoop.assertInEventLoop() assert(promise.map { !$0.futureResult.isFulfilled } ?? true, "Promise \(promise!) already fulfilled") if let outboundHandler = self.outboundHandler { @@ -1351,7 +1351,7 @@ public final class ChannelHandlerContext: ChannelInvoker { } fileprivate func invokeConnect(to address: SocketAddress, promise: EventLoopPromise?) { - assert(inEventLoop) + self.eventLoop.assertInEventLoop() assert(promise.map { !$0.futureResult.isFulfilled } ?? true, "Promise \(promise!) already fulfilled") if let outboundHandler = self.outboundHandler { @@ -1362,7 +1362,7 @@ public final class ChannelHandlerContext: ChannelInvoker { } fileprivate func invokeWrite(_ data: NIOAny, promise: EventLoopPromise?) { - assert(inEventLoop) + self.eventLoop.assertInEventLoop() assert(promise.map { !$0.futureResult.isFulfilled } ?? true, "Promise \(promise!) already fulfilled") if let outboundHandler = self.outboundHandler { @@ -1373,7 +1373,7 @@ public final class ChannelHandlerContext: ChannelInvoker { } fileprivate func invokeFlush() { - assert(inEventLoop) + self.eventLoop.assertInEventLoop() if let outboundHandler = self.outboundHandler { outboundHandler.flush(ctx: self) @@ -1383,7 +1383,7 @@ public final class ChannelHandlerContext: ChannelInvoker { } fileprivate func invokeWriteAndFlush(_ data: NIOAny, promise: EventLoopPromise?) { - assert(inEventLoop) + self.eventLoop.assertInEventLoop() assert(promise.map { !$0.futureResult.isFulfilled } ?? true, "Promise \(promise!) already fulfilled") if let outboundHandler = self.outboundHandler { @@ -1395,7 +1395,7 @@ public final class ChannelHandlerContext: ChannelInvoker { } fileprivate func invokeRead() { - assert(inEventLoop) + self.eventLoop.assertInEventLoop() if let outboundHandler = self.outboundHandler { outboundHandler.read(ctx: self) @@ -1405,7 +1405,7 @@ public final class ChannelHandlerContext: ChannelInvoker { } fileprivate func invokeClose(mode: CloseMode, promise: EventLoopPromise?) { - assert(inEventLoop) + self.eventLoop.assertInEventLoop() assert(promise.map { !$0.futureResult.isFulfilled } ?? true, "Promise \(promise!) already fulfilled") if let outboundHandler = self.outboundHandler { @@ -1416,7 +1416,7 @@ public final class ChannelHandlerContext: ChannelInvoker { } fileprivate func invokeTriggerUserOutboundEvent(_ event: Any, promise: EventLoopPromise?) { - assert(inEventLoop) + self.eventLoop.assertInEventLoop() assert(promise.map { !$0.futureResult.isFulfilled } ?? true, "Promise \(promise!) already fulfilled") if let outboundHandler = self.outboundHandler { @@ -1427,20 +1427,16 @@ public final class ChannelHandlerContext: ChannelInvoker { } fileprivate func invokeHandlerAdded() throws { - assert(inEventLoop) + self.eventLoop.assertInEventLoop() handler.handlerAdded(ctx: self) } fileprivate func invokeHandlerRemoved() throws { - assert(inEventLoop) + self.eventLoop.assertInEventLoop() handler.handlerRemoved(ctx: self) } - - private var inEventLoop: Bool { - return eventLoop.inEventLoop - } } extension ChannelPipeline: CustomDebugStringConvertible { diff --git a/Sources/NIO/EventLoop.swift b/Sources/NIO/EventLoop.swift index b9c9de4633..71c4c8f033 100644 --- a/Sources/NIO/EventLoop.swift +++ b/Sources/NIO/EventLoop.swift @@ -76,7 +76,7 @@ public final class RepeatedTask { } private func begin0(in delay: TimeAmount) { - assert(self.eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() guard let task = self.task else { return } @@ -103,14 +103,14 @@ public final class RepeatedTask { } private func cancel0() { - assert(self.eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() self.scheduled?.cancel() self.scheduled = nil self.task = nil } private func reschedule() { - assert(self.eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() guard let scheduled = self.scheduled else { return } @@ -127,7 +127,7 @@ public final class RepeatedTask { } private func reschedule0() { - assert(self.eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() guard self.task != nil else { return } @@ -209,6 +209,10 @@ public protocol EventLoop: EventLoopGroup { /// Schedule a `task` that is executed by this `SelectableEventLoop` after the given amount of time. func scheduleTask(in: TimeAmount, _ task: @escaping () throws -> T) -> Scheduled + + /// Checks that this call is run from the `EventLoop`. If this is called from within the `EventLoop` this function + /// will have no effect, if called from outside the `EventLoop` it will crash the process with a trap. + func preconditionInEventLoop(file: StaticString, line: UInt) } /// Represents a time _interval_. @@ -384,6 +388,22 @@ extension EventLoop { public func makeIterator() -> EventLoopIterator? { return EventLoopIterator([self]) } + + /// Checks that this call is run from the EventLoop. If this is called from within the EventLoop this function will + /// have no effect, if called from outside the EventLoop it will crash the process with a trap if run in debug mode. + /// In release mode this function never has any effect. + /// + /// - note: This is not a customization point so calls to this function can be fully optimized out in release mode. + @_inlineable + public func assertInEventLoop(file: StaticString = #file, line: UInt = #line) { + debugOnly { + self.preconditionInEventLoop(file: file, line: line) + } + } + + public func preconditionInEventLoop(file: StaticString = #file, line: UInt = #line) { + precondition(self.inEventLoop, file: file, line: line) + } } /// Internal representation of a `Registration` to an `Selector`. @@ -508,14 +528,14 @@ internal final class SelectableEventLoop: EventLoop { /// Is this `SelectableEventLoop` still open (ie. not shutting down or shut down) internal var isOpen: Bool { - assert(self.inEventLoop) + self.assertInEventLoop() return self.lifecycleState == .open } /// Register the given `SelectableChannel` with this `SelectableEventLoop`. After this point all I/O for the `SelectableChannel` will be processed by this `SelectableEventLoop` until it /// is deregistered by calling `deregister`. public func register(channel: C) throws { - assert(inEventLoop) + self.assertInEventLoop() // Don't allow registration when we're closed. guard self.lifecycleState == .open else { @@ -527,7 +547,7 @@ internal final class SelectableEventLoop: EventLoop { /// Deregister the given `SelectableChannel` from this `SelectableEventLoop`. public func deregister(channel: C) throws { - assert(inEventLoop) + self.assertInEventLoop() guard lifecycleState == .open else { // It's possible the EventLoop was closed before we were able to call deregister, so just return in this case as there is no harm. return @@ -538,7 +558,7 @@ internal final class SelectableEventLoop: EventLoop { /// Register the given `SelectableChannel` with this `SelectableEventLoop`. This should be done whenever `channel.interestedEvents` has changed and it should be taken into account when /// waiting for new I/O for the given `SelectableChannel`. public func reregister(channel: C) throws { - assert(inEventLoop) + self.assertInEventLoop() try selector.reregister(selectable: channel.selectable, interested: channel.interestedEvent) } @@ -636,7 +656,7 @@ internal final class SelectableEventLoop: EventLoop { /// Start processing I/O and tasks for this `SelectableEventLoop`. This method will continue running (and so block) until the `SelectableEventLoop` is closed. public func run() throws { - precondition(self.inEventLoop, "tried to run the EventLoop on the wrong thread.") + self.preconditionInEventLoop() defer { var scheduledTasksCopy = ContiguousArray() tasksLock.withLockVoid { diff --git a/Sources/NIO/EventLoopFuture.swift b/Sources/NIO/EventLoopFuture.swift index acad176ad5..348f96d82f 100644 --- a/Sources/NIO/EventLoopFuture.swift +++ b/Sources/NIO/EventLoopFuture.swift @@ -589,7 +589,7 @@ extension EventLoopFuture { /// Add a callback. If there's already a value, invoke it and return the resulting list of new callback functions. fileprivate func _addCallback(_ callback: @escaping () -> CallbackList) -> CallbackList { - assert(eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() if value == nil { callbacks.append(callback) return CallbackList() @@ -672,7 +672,7 @@ extension EventLoopFuture { /// Internal: Set the value and return a list of callbacks that should be invoked as a result. fileprivate func _setValue(value: EventLoopFutureValue) -> CallbackList { - assert(eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() if self.value == nil { self.value = value let callbacks = self.callbacks @@ -711,7 +711,7 @@ extension EventLoopFuture { let hopOver = other.hopTo(eventLoop: self.eventLoop) hopOver._whenComplete { () -> CallbackList in - assert(self.eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() switch other.value! { case .failure(let error): return promise._setValue(value: .failure(error)) @@ -854,7 +854,7 @@ extension EventLoopFuture { let body = futures.reduce(self) { (f1: EventLoopFuture, f2: EventLoopFuture) -> EventLoopFuture in let newFuture = f1.and(f2).then { (args: (T, U)) -> EventLoopFuture in let (f1Value, f2Value) = args - assert(self.eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() return combiningFunction(f1Value, f2Value) } assert(newFuture.eventLoop === self.eventLoop) @@ -933,17 +933,17 @@ extension EventLoopFuture { let f0 = eventLoop.newSucceededFuture(result: ()) let future = f0.fold(futures) { (_: (), value: U) -> EventLoopFuture in - assert(eventLoop.inEventLoop) + eventLoop.assertInEventLoop() updateAccumulatingResult(&result, value) return eventLoop.newSucceededFuture(result: ()) } future.whenSuccess { - assert(eventLoop.inEventLoop) + eventLoop.assertInEventLoop() p0.succeed(result: result) } future.whenFailure { (error) in - assert(eventLoop.inEventLoop) + eventLoop.assertInEventLoop() p0.fail(error: error) } return p0.futureResult diff --git a/Sources/NIO/NonBlockingFileIO.swift b/Sources/NIO/NonBlockingFileIO.swift index 3d24be8074..1e063d65dc 100644 --- a/Sources/NIO/NonBlockingFileIO.swift +++ b/Sources/NIO/NonBlockingFileIO.swift @@ -126,7 +126,7 @@ public struct NonBlockingFileIO { assert(readSize > 0) return self.read(fileHandle: fileHandle, byteCount: readSize, allocator: allocator, eventLoop: eventLoop).then { buffer in chunkHandler(buffer).then { () -> EventLoopFuture in - assert(eventLoop.inEventLoop) + eventLoop.assertInEventLoop() return _read(remainingReads: remainingReads - 1) } } diff --git a/Sources/NIO/SocketChannel.swift b/Sources/NIO/SocketChannel.swift index 620915f7e5..f3ed4dc144 100644 --- a/Sources/NIO/SocketChannel.swift +++ b/Sources/NIO/SocketChannel.swift @@ -48,7 +48,7 @@ final class SocketChannel: BaseSocketChannel { } override var isOpen: Bool { - assert(eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() assert(super.isOpen == self.pendingWrites.isOpen) return super.isOpen } @@ -71,7 +71,7 @@ final class SocketChannel: BaseSocketChannel { } override func setOption0(option: T, value: T.OptionType) throws { - assert(eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() guard isOpen else { throw ChannelError.ioOnClosedChannel @@ -92,7 +92,7 @@ final class SocketChannel: BaseSocketChannel { } override func getOption0(option: T) throws -> T.OptionType { - assert(eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() guard isOpen else { throw ChannelError.ioOnClosedChannel @@ -122,7 +122,7 @@ final class SocketChannel: BaseSocketChannel { } override func readFromSocket() throws -> ReadResult { - assert(self.eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() // Just allocate one time for the while read loop. This is fine as ByteBuffer is a struct and uses COW. var buffer = recvAllocator.buffer(allocator: allocator) var result = ReadResult.none @@ -339,7 +339,7 @@ final class ServerSocketChannel: BaseSocketChannel { } override func setOption0(option: T, value: T.OptionType) throws { - assert(eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() guard isOpen else { throw ChannelError.ioOnClosedChannel @@ -354,7 +354,7 @@ final class ServerSocketChannel: BaseSocketChannel { } override func getOption0(option: T) throws -> T.OptionType { - assert(eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() guard isOpen else { throw ChannelError.ioOnClosedChannel @@ -369,7 +369,7 @@ final class ServerSocketChannel: BaseSocketChannel { } override public func bind0(to address: SocketAddress, promise: EventLoopPromise?) { - assert(eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() guard self.isOpen else { promise?.fail(error: ChannelError.ioOnClosedChannel) @@ -450,7 +450,7 @@ final class ServerSocketChannel: BaseSocketChannel { } override public func channelRead0(_ data: NIOAny) { - assert(eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() let ch = data.forceAsOther() as SocketChannel ch.eventLoop.execute { @@ -500,7 +500,7 @@ final class DatagramChannel: BaseSocketChannel { } override var isOpen: Bool { - assert(eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() assert(super.isOpen == self.pendingWrites.isOpen) return super.isOpen } @@ -545,7 +545,7 @@ final class DatagramChannel: BaseSocketChannel { // MARK: Datagram Channel overrides required by BaseSocketChannel override func setOption0(option: T, value: T.OptionType) throws { - assert(eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() guard isOpen else { throw ChannelError.ioOnClosedChannel @@ -562,7 +562,7 @@ final class DatagramChannel: BaseSocketChannel { } override func getOption0(option: T) throws -> T.OptionType { - assert(eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() guard isOpen else { throw ChannelError.ioOnClosedChannel @@ -696,7 +696,7 @@ final class DatagramChannel: BaseSocketChannel { // MARK: Datagram Channel overrides not required by BaseSocketChannel override func bind0(to address: SocketAddress, promise: EventLoopPromise?) { - assert(self.eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() guard self.isRegistered else { promise?.fail(error: ChannelLifecycleError.inappropriateOperationForState) return @@ -790,7 +790,7 @@ extension DatagramChannel: MulticastChannel { interface: NIONetworkInterface?, promise: EventLoopPromise?, operation: GroupOperation) { - assert(self.eventLoop.inEventLoop) + self.eventLoop.assertInEventLoop() guard self.isActive else { promise?.fail(error: ChannelLifecycleError.inappropriateOperationForState) diff --git a/Sources/NIO/Utilities.swift b/Sources/NIO/Utilities.swift index 2b9cad64b6..76f405d89d 100644 --- a/Sources/NIO/Utilities.swift +++ b/Sources/NIO/Utilities.swift @@ -19,6 +19,7 @@ /// https://forums.swift.org/t/support-debug-only-code/11037 for a discussion. import CNIOLinux +@_inlineable @_versioned internal func debugOnly(_ body: () -> Void) { assert({ body(); return true }()) }