Skip to content

Commit

Permalink
fix double remove of handlers (#1091)
Browse files Browse the repository at this point in the history
* fix double remove of handlers

Motivation:

Previously, it was possible that a handler was removed twice: Once by
Channel teardown and another time by a user-trigger removal that wasn't
instantaneous. This racy situation can happen in the real world. NIO
behaved wrongly in two ways:

1. we would call `handlerRemoved` twiced
2. ByteToMessageHandler would fail an assertion about the current
   removal state

Modifications:

- Only call `handlerRemoved` when the handler actually gets removed.
- Fix the assertion.

Result:

fewer bugs

* Update Sources/NIO/ChannelPipeline.swift

Co-Authored-By: Cory Benfield <[email protected]>
  • Loading branch information
weissi and Lukasa committed Jul 31, 2019
1 parent 0225a2b commit b07efec
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 4 deletions.
10 changes: 10 additions & 0 deletions Sources/NIO/ChannelPipeline.swift
Original file line number Diff line number Diff line change
Expand Up @@ -488,13 +488,23 @@ public final class ChannelPipeline: ChannelInvoker {

let nextCtx = context.next
let prevCtx = context.prev
var inThePipeline = false

if let prevCtx = prevCtx {
inThePipeline = true
prevCtx.next = nextCtx
}
if let nextCtx = nextCtx {
inThePipeline = true
nextCtx.prev = prevCtx
}

guard inThePipeline else {
// if both next and prev are nil already, then we were previously removed from the pipeline
promise?.succeed(())
return
}

do {
try context.invokeHandlerRemoved()
promise?.succeed(())
Expand Down
28 changes: 24 additions & 4 deletions Sources/NIO/Codec.swift
Original file line number Diff line number Diff line change
Expand Up @@ -376,9 +376,19 @@ public final class ByteToMessageHandler<Decoder: ByteToMessageDecoder> {
}

private enum RemovalState {
/// No one tried to remove this handler.
case notBeingRemoved

/// The user-triggered removal has been started but isn't complete yet. This state will not be entered if the
/// removal is triggered by Channel teardown.
case removalStarted

/// The user-triggered removal is complete. This state will not be entered if the removal is triggered by
/// Channel teardown.
case removalCompleted

/// This handler has been removed from the pipeline.
case handlerRemovedCalled
}

private enum State {
Expand Down Expand Up @@ -454,7 +464,8 @@ public final class ByteToMessageHandler<Decoder: ByteToMessageDecoder> {
}

deinit {
assert(self.removalState == .removalCompleted, "illegal state in deinit: removalState = \(self.removalState)")
assert(self.removalState == .handlerRemovedCalled,
"illegal state in deinit: removalState = \(self.removalState)")
assert(self.state.isFinalState, "illegal state in deinit: state = \(self.state)")
}
}
Expand Down Expand Up @@ -589,7 +600,7 @@ extension ByteToMessageHandler: ChannelInboundHandler {
public func handlerRemoved(context: ChannelHandlerContext) {
// very likely, the removal state is `.notBeingRemoved` or `.removalCompleted` here but we can't assert it
// because the pipeline might be torn down during the formal removal process.
self.removalState = .removalCompleted
self.removalState = .handlerRemovedCalled
if !self.state.isFinalState {
self.state = .done
}
Expand Down Expand Up @@ -691,8 +702,17 @@ extension ByteToMessageHandler: RemovableChannelHandler {
context.eventLoop.execute {
self.processLeftovers(context: context)
assert(!self.state.isLeftoversNeedProcessing, "illegal state: \(self.state)")
assert(self.removalState == .removalStarted, "illegal removal state: \(self.removalState)")
self.removalState = .removalCompleted
switch self.removalState {
case .removalStarted:
self.removalState = .removalCompleted
case .handlerRemovedCalled:
// if we're here, then the channel has also been torn down between the start and the completion of
// the user-triggered removal. That's okay.
()
default:
assertionFailure("illegal removal state: \(self.removalState)")
}
// this is necessary as it'll complete the promise.
context.leavePipeline(removalToken: removalToken)
}
}
Expand Down
1 change: 1 addition & 0 deletions Tests/NIOTests/CodecTest+XCTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ extension ByteToMessageDecoderTest {
("testWeAreOkayWithReceivingDataAfterFullClose", testWeAreOkayWithReceivingDataAfterFullClose),
("testPayloadTooLarge", testPayloadTooLarge),
("testPayloadTooLargeButHandlerOk", testPayloadTooLargeButHandlerOk),
("testRemoveHandlerBecauseOfChannelTearDownWhilstUserTriggeredRemovalIsInProgress", testRemoveHandlerBecauseOfChannelTearDownWhilstUserTriggeredRemovalIsInProgress),
]
}
}
Expand Down
60 changes: 60 additions & 0 deletions Tests/NIOTests/CodecTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1452,6 +1452,66 @@ public final class ByteToMessageDecoderTest: XCTestCase {
XCTAssertNoThrow(XCTAssertTrue(try channel.finish().isClean))
XCTAssertGreaterThan(decoder.decodeCalls, 0)
}

func testRemoveHandlerBecauseOfChannelTearDownWhilstUserTriggeredRemovalIsInProgress() {
class Decoder: ByteToMessageDecoder {
typealias InboundOut = Never

var removedCalls = 0

func decode(context: ChannelHandlerContext, buffer: inout ByteBuffer) throws -> DecodingState {
XCTFail("\(#function) should never have been called")
return .needMoreData
}

func decodeLast(context: ChannelHandlerContext, buffer: inout ByteBuffer, seenEOF: Bool) throws -> DecodingState {
XCTAssertEqual(0, buffer.readableBytes)
XCTAssertTrue(seenEOF)
return .needMoreData
}

func decoderRemoved(context: ChannelHandlerContext) {
self.removedCalls += 1
XCTAssertEqual(1, self.removedCalls)
}
}

let decoder = Decoder()
let decoderHandler = ByteToMessageHandler(decoder)
let channel = EmbeddedChannel(handler: decoderHandler)

XCTAssertNoThrow(try channel.connect(to: .init(ipAddress: "1.2.3.4", port: 5)).wait())

// We are now trying to get the channel into the following states (ordered by time):
// 1. user-triggered removal is in progress (started but not completed)
// 2. `removeHandlers()` as part of the Channel teardown is called
// 3. user-triggered removal completes
//
// The way we can get into this situation might be slightly counter-intuitive but currently, the easiest way
// to trigger this is:
// 1. `channel.close()` (because `removeHandlers()` is called inside an `eventLoop.execute` so is delayed
// 2. user-triggered removal start (`channel.pipeline.removeHandler`) which will also use an
// `eventLoop.execute` to ask for the handler to actually be removed.
// 3. run the event loop (this will now first call `removeHandlers()` which completes the channel tear down
// and a little later will complete the user-triggered removal.

let closeFuture = channel.close() // close the channel, `removeHandlers` will be called in next EL tick.

// user-trigger the handelr removal (the actual removal will be done on the next EL tick too)
let removalFuture = channel.pipeline.removeHandler(decoderHandler)

// run the event loop, this will make `removeHandlers` run first because it was enqueued before the
// user-triggered handler removal
channel.embeddedEventLoop.run()

// just to make sure everything has completed.
XCTAssertNoThrow(try closeFuture.wait())
XCTAssertNoThrow(try removalFuture.wait())

XCTAssertThrowsError(try channel.finish()) { error in
XCTAssertEqual(ChannelError.alreadyClosed, error as? ChannelError)
}
}
}

public final class MessageToByteEncoderTest: XCTestCase {
Expand Down

0 comments on commit b07efec

Please sign in to comment.