Skip to content

Commit

Permalink
EventLoop.assertInEventLoop and preconditionInEventLoop (#644)
Browse files Browse the repository at this point in the history
Motivation:

Dispatch can only precondition that code is run from a certain queue, it
can't return if code is running on a certain queue.
To support that this introduces new EventLoop.assertInEventLoop and
EventLoop.preconditionInEventLoop methods instead of
assert(eventLoop.inEventLoop).

Modifications:

add EventLoop.assertInEventLoop and EventLoop.preconditionInEventLoop

Result:

better support for Network.framework in NIOTS
  • Loading branch information
weissi authored and Lukasa committed Nov 5, 2018
1 parent 26bee21 commit 035962e
Show file tree
Hide file tree
Showing 8 changed files with 125 additions and 108 deletions.
78 changes: 39 additions & 39 deletions Sources/NIO/BaseSocketChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ private struct SocketChannelLifecycleManager {

private var currentState: State = .fresh {
didSet {
assert(self.eventLoop.inEventLoop)
self.eventLoop.assertInEventLoop()
switch (oldValue, self.currentState) {
case (_, .activated):
self.isActiveAtomic.store(true)
Expand Down Expand Up @@ -90,7 +90,7 @@ private struct SocketChannelLifecycleManager {
// MARK: private API
@inline(__always) // we need to return a closure here and to not suffer from a potential allocation for that this must be inlined
private mutating func moveState(event: Event, promise: EventLoopPromise<Void>?) -> ((ChannelPipeline) -> Void) {
assert(self.eventLoop.inEventLoop)
self.eventLoop.assertInEventLoop()

switch (self.currentState, event) {
// origin: .fresh
Expand Down Expand Up @@ -158,12 +158,12 @@ private struct SocketChannelLifecycleManager {

// MARK: convenience properties
internal var isActive: Bool {
assert(self.eventLoop.inEventLoop)
self.eventLoop.assertInEventLoop()
return self.currentState == .activated
}

internal var isPreRegistered: Bool {
assert(self.eventLoop.inEventLoop)
self.eventLoop.assertInEventLoop()
switch self.currentState {
case .fresh, .closed:
return false
Expand All @@ -173,7 +173,7 @@ private struct SocketChannelLifecycleManager {
}

internal var isRegisteredFully: Bool {
assert(self.eventLoop.inEventLoop)
self.eventLoop.assertInEventLoop()
switch self.currentState {
case .fresh, .closed, .preRegistered:
return false
Expand All @@ -185,7 +185,7 @@ private struct SocketChannelLifecycleManager {
/// Returns whether the underlying file descriptor is open. This property will always be true (even before registration)
/// until the Channel is closed.
internal var isOpen: Bool {
assert(self.eventLoop.inEventLoop)
self.eventLoop.assertInEventLoop()
return self.currentState != .closed
}
}
Expand Down Expand Up @@ -229,13 +229,13 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
private var autoRead: Bool = true
private var lifecycleManager: SocketChannelLifecycleManager {
didSet {
assert(self.eventLoop.inEventLoop)
self.eventLoop.assertInEventLoop()
}
}

private var bufferAllocator: ByteBufferAllocator = ByteBufferAllocator() {
didSet {
assert(self.eventLoop.inEventLoop)
self.eventLoop.assertInEventLoop()
self.bufferAllocatorCached.store(Box(self.bufferAllocator))
}
}
Expand Down Expand Up @@ -287,12 +287,12 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {

/// `false` if the whole `Channel` is closed and so no more IO operation can be done.
var isOpen: Bool {
assert(eventLoop.inEventLoop)
self.eventLoop.assertInEventLoop()
return self.lifecycleManager.isOpen
}

var isRegistered: Bool {
assert(self.eventLoop.inEventLoop)
self.eventLoop.assertInEventLoop()
return self.lifecycleManager.isPreRegistered
}

Expand Down Expand Up @@ -401,15 +401,15 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
}

public final func localAddress0() throws -> SocketAddress {
assert(self.eventLoop.inEventLoop)
self.eventLoop.assertInEventLoop()
guard self.isOpen else {
throw ChannelError.ioOnClosedChannel
}
return try self.socket.localAddress()
}

public final func remoteAddress0() throws -> SocketAddress {
assert(self.eventLoop.inEventLoop)
self.eventLoop.assertInEventLoop()
guard self.isOpen else {
throw ChannelError.ioOnClosedChannel
}
Expand All @@ -420,7 +420,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
///
/// - returns: If this socket should be registered for write notifications. Ie. `IONotificationState.register` if _not_ all data could be written, so notifications are necessary; and `IONotificationState.unregister` if everything was written and we don't need to be notified about writability at the moment.
func flushNow() -> IONotificationState {
assert(self.eventLoop.inEventLoop)
self.eventLoop.assertInEventLoop()
// Guard against re-entry as data that will be put into `pendingWrites` will just be picked up by
// `writeToSocket`.
guard !self.inFlushNow && self.isOpen else {
Expand Down Expand Up @@ -475,7 +475,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
}

func setOption0<T: ChannelOption>(option: T, value: T.OptionType) throws {
assert(eventLoop.inEventLoop)
self.eventLoop.assertInEventLoop()

guard isOpen else {
throw ChannelError.ioOnClosedChannel
Expand Down Expand Up @@ -523,7 +523,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
}

func getOption0<T: ChannelOption>(option: T) throws -> T.OptionType {
assert(eventLoop.inEventLoop)
self.eventLoop.assertInEventLoop()

guard isOpen else {
throw ChannelError.ioOnClosedChannel
Expand All @@ -550,7 +550,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
///
/// - returns: `true` if `readPending` is `true`, `false` otherwise.
@discardableResult func readIfNeeded0() -> Bool {
assert(eventLoop.inEventLoop)
self.eventLoop.assertInEventLoop()
if !self.lifecycleManager.isActive {
return false
}
Expand All @@ -563,7 +563,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {

// Methods invoked from the HeadHandler of the ChannelPipeline
public func bind0(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
assert(eventLoop.inEventLoop)
self.eventLoop.assertInEventLoop()

guard self.isOpen else {
promise?.fail(error: ChannelError.ioOnClosedChannel)
Expand All @@ -581,7 +581,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
}

public final func write0(_ data: NIOAny, promise: EventLoopPromise<Void>?) {
assert(eventLoop.inEventLoop)
self.eventLoop.assertInEventLoop()

guard self.isOpen else {
// Channel was already closed, fail the promise and not even queue it.
Expand All @@ -598,7 +598,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
}

private func registerForWritable() {
assert(eventLoop.inEventLoop)
self.eventLoop.assertInEventLoop()

guard !self.interestedEvent.contains(.write) else {
// nothing to do if we were previously interested in write
Expand All @@ -608,7 +608,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
}

func unregisterForWritable() {
assert(eventLoop.inEventLoop)
self.eventLoop.assertInEventLoop()

guard self.interestedEvent.contains(.write) else {
// nothing to do if we were not previously interested in write
Expand All @@ -618,7 +618,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
}

public final func flush0() {
assert(eventLoop.inEventLoop)
self.eventLoop.assertInEventLoop()

guard self.isOpen else {
return
Expand All @@ -637,7 +637,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
}

public func read0() {
assert(eventLoop.inEventLoop)
self.eventLoop.assertInEventLoop()

guard self.isOpen else {
return
Expand All @@ -650,15 +650,15 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
}

private final func pauseRead0() {
assert(eventLoop.inEventLoop)
self.eventLoop.assertInEventLoop()

if self.lifecycleManager.isPreRegistered {
unregisterForReadable()
}
}

private final func registerForReadable() {
assert(eventLoop.inEventLoop)
self.eventLoop.assertInEventLoop()
assert(self.lifecycleManager.isRegisteredFully)

guard !self.lifecycleManager.hasSeenEOFNotification else {
Expand All @@ -674,7 +674,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
}

internal final func unregisterForReadable() {
assert(eventLoop.inEventLoop)
self.eventLoop.assertInEventLoop()
assert(self.lifecycleManager.isRegisteredFully)

guard self.interestedEvent.contains(.read) else {
Expand All @@ -693,7 +693,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
/// - mode: The close mode, must be `.all` for `BaseSocketChannel`
/// - promise: The promise that gets notified about the result of the deregistration/close operations.
public func close0(error: Error, mode: CloseMode, promise: EventLoopPromise<Void>?) {
assert(eventLoop.inEventLoop)
self.eventLoop.assertInEventLoop()

guard self.isOpen else {
promise?.fail(error: ChannelError.alreadyClosed)
Expand Down Expand Up @@ -765,7 +765,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {


public final func register0(promise: EventLoopPromise<Void>?) {
assert(eventLoop.inEventLoop)
self.eventLoop.assertInEventLoop()

guard self.isOpen else {
promise?.fail(error: ChannelError.ioOnClosedChannel)
Expand All @@ -792,7 +792,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
}

public final func registerAlreadyConfigured0(promise: EventLoopPromise<Void>?) {
assert(self.eventLoop.inEventLoop)
self.eventLoop.assertInEventLoop()
assert(self.isOpen)
assert(!self.lifecycleManager.isActive)
let registerPromise: EventLoopPromise<Void> = self.eventLoop.newPromise()
Expand Down Expand Up @@ -821,7 +821,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {

// Methods invoked from the EventLoop itself
public final func writable() {
assert(self.eventLoop.inEventLoop)
self.eventLoop.assertInEventLoop()
assert(self.isOpen)

self.finishConnect() // If we were connecting, that has finished.
Expand All @@ -832,7 +832,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
}

private func finishConnect() {
assert(eventLoop.inEventLoop)
self.eventLoop.assertInEventLoop()
assert(self.lifecycleManager.isPreRegistered)

if let connectPromise = self.pendingConnect {
Expand All @@ -857,7 +857,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
}

private func finishWritable() {
assert(eventLoop.inEventLoop)
self.eventLoop.assertInEventLoop()

if self.isOpen {
assert(self.lifecycleManager.isPreRegistered)
Expand Down Expand Up @@ -950,7 +950,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {

@discardableResult
private final func readable0() -> ReadStreamState {
assert(eventLoop.inEventLoop)
self.eventLoop.assertInEventLoop()
assert(self.lifecycleManager.isActive)

defer {
Expand Down Expand Up @@ -1016,7 +1016,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
}

internal final func updateCachedAddressesFromSocket(updateLocal: Bool = true, updateRemote: Bool = true) {
assert(self.eventLoop.inEventLoop)
self.eventLoop.assertInEventLoop()
assert(updateLocal || updateRemote)
let cached = addressesCached.load().value
let local = updateLocal ? try? self.localAddress0() : cached.local
Expand All @@ -1025,12 +1025,12 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
}

internal final func unsetCachedAddressesFromSocket() {
assert(self.eventLoop.inEventLoop)
self.eventLoop.assertInEventLoop()
self.addressesCached.store(Box((local: nil, remote: nil)))
}

public final func connect0(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
assert(eventLoop.inEventLoop)
self.eventLoop.assertInEventLoop()

guard self.isOpen else {
promise?.fail(error: ChannelError.ioOnClosedChannel)
Expand Down Expand Up @@ -1087,7 +1087,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
}

private final func safeReregister(interested: SelectorEventSet) {
assert(eventLoop.inEventLoop)
self.eventLoop.assertInEventLoop()
assert(self.lifecycleManager.isRegisteredFully)

guard self.isOpen else {
Expand All @@ -1108,7 +1108,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
}

private func safeRegister(interested: SelectorEventSet) throws {
assert(eventLoop.inEventLoop)
self.eventLoop.assertInEventLoop()
assert(!self.lifecycleManager.isRegisteredFully)

guard self.isOpen else {
Expand All @@ -1126,7 +1126,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
}

final func becomeFullyRegistered0() throws {
assert(self.eventLoop.inEventLoop)
self.eventLoop.assertInEventLoop()
assert(self.lifecycleManager.isPreRegistered)
assert(!self.lifecycleManager.isRegisteredFully)

Expand All @@ -1136,7 +1136,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
}

final func becomeActive0(promise: EventLoopPromise<Void>?) {
assert(self.eventLoop.inEventLoop)
self.eventLoop.assertInEventLoop()
assert(self.lifecycleManager.isPreRegistered)
if !self.lifecycleManager.isRegisteredFully {
do {
Expand Down
14 changes: 7 additions & 7 deletions Sources/NIO/Bootstrap.swift
Original file line number Diff line number Diff line change
Expand Up @@ -259,23 +259,23 @@ public final class ServerBootstrap {
@inline(__always)
func setupChildChannel() -> EventLoopFuture<Void> {
return self.childChannelOptions.applyAll(channel: accepted).then { () -> EventLoopFuture<Void> in
assert(childEventLoop.inEventLoop)
childEventLoop.assertInEventLoop()
return childChannelInit(accepted)
}
}

@inline(__always)
func fireThroughPipeline(_ future: EventLoopFuture<Void>) {
assert(ctxEventLoop.inEventLoop)
ctxEventLoop.assertInEventLoop()
future.then { (_) -> EventLoopFuture<Void> in
assert(ctxEventLoop.inEventLoop)
ctxEventLoop.assertInEventLoop()
guard !ctx.pipeline.destroyed else {
return ctx.eventLoop.newFailedFuture(error: ChannelError.ioOnClosedChannel)
}
ctx.fireChannelRead(data)
return ctx.eventLoop.newSucceededFuture(result: ())
}.whenFailure { error in
assert(ctx.eventLoop.inEventLoop)
ctxEventLoop.assertInEventLoop()
self.closeAndFire(ctx: ctx, accepted: accepted, err: error)
}
}
Expand Down Expand Up @@ -308,9 +308,9 @@ private extension Channel {
// In many cases `body` must be _synchronously_ follow `register`, otherwise in our current
// implementation, `epoll` will send us `EPOLLHUP`. To have it run synchronously, we need to invoke the
// `then` on the eventloop that the `register` will succeed on.
assert(self.eventLoop.inEventLoop)
self.eventLoop.assertInEventLoop()
return self.register().then {
assert(self.eventLoop.inEventLoop)
self.eventLoop.assertInEventLoop()
return body(self)
}
}
Expand Down Expand Up @@ -507,7 +507,7 @@ public final class ClientBootstrap {

@inline(__always)
func setupChannel() -> EventLoopFuture<Channel> {
assert(eventLoop.inEventLoop)
eventLoop.assertInEventLoop()
channelInitializer(channel).then {
channelOptions.applyAll(channel: channel)
}.then {
Expand Down
Loading

0 comments on commit 035962e

Please sign in to comment.