Skip to content

Commit

Permalink
Merge branch 'main' into broken-swift-format-doc
Browse files Browse the repository at this point in the history
  • Loading branch information
FranzBusch authored Oct 29, 2024
2 parents 2c3a30e + 411c2c5 commit 332f95c
Show file tree
Hide file tree
Showing 76 changed files with 400 additions and 287 deletions.
8 changes: 2 additions & 6 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,7 @@ import class Foundation.ProcessInfo

let swiftAtomics: PackageDescription.Target.Dependency = .product(name: "Atomics", package: "swift-atomics")
let swiftCollections: PackageDescription.Target.Dependency = .product(name: "DequeModule", package: "swift-collections")
let swiftSystem: PackageDescription.Target.Dependency = .product(
name: "SystemPackage",
package: "swift-system",
condition: .when(platforms: [.macOS, .iOS, .tvOS, .watchOS, .linux, .android])
)
let swiftSystem: PackageDescription.Target.Dependency = .product(name: "SystemPackage", package: "swift-system")

// These platforms require a depdency on `NIOPosix` from `NIOHTTP1` to maintain backward
// compatibility with previous NIO versions.
Expand Down Expand Up @@ -558,7 +554,7 @@ if Context.environment["SWIFTCI_USE_LOCAL_DEPS"] == nil {
package.dependencies += [
.package(url: "https://github.com/apple/swift-atomics.git", from: "1.1.0"),
.package(url: "https://github.com/apple/swift-collections.git", from: "1.0.2"),
.package(url: "https://github.com/apple/swift-system.git", from: "1.2.0"),
.package(url: "https://github.com/apple/swift-system.git", from: "1.4.0"),
]
} else {
package.dependencies += [
Expand Down
5 changes: 5 additions & 0 deletions Sources/NIOCore/AsyncAwaitSupport.swift
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,11 @@ extension ChannelPipeline {
}

@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
@available(
*,
deprecated,
message: "Use .syncOperations.removeHandler(context:) instead, this method is not Sendable-safe."
)
public func removeHandler(context: ChannelHandlerContext) async throws {
try await self.removeHandler(context: context).get()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public enum NIOAsyncSequenceProducerBackPressureStrategies {
public struct HighLowWatermark: NIOAsyncSequenceProducerBackPressureStrategy {
private let lowWatermark: Int
private let highWatermark: Int
private var hasOustandingDemand: Bool = true

/// Initializes a new ``NIOAsyncSequenceProducerBackPressureStrategies/HighLowWatermark``.
///
Expand All @@ -36,12 +37,29 @@ public enum NIOAsyncSequenceProducerBackPressureStrategies {

public mutating func didYield(bufferDepth: Int) -> Bool {
// We are demanding more until we reach the high watermark
bufferDepth < self.highWatermark
if bufferDepth < self.highWatermark {
precondition(self.hasOustandingDemand)
return true
} else {
self.hasOustandingDemand = false
return false
}
}

public mutating func didConsume(bufferDepth: Int) -> Bool {
// We start demanding again once we are below the low watermark
bufferDepth < self.lowWatermark
if bufferDepth < self.lowWatermark {
if self.hasOustandingDemand {
// We are below and have outstanding demand
return true
} else {
// We are below but don't have outstanding demand but need more
self.hasOustandingDemand = true
return true
}
} else {
return self.hasOustandingDemand
}
}
}
}
161 changes: 129 additions & 32 deletions Sources/NIOCore/ChannelPipeline.swift
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,9 @@ public final class ChannelPipeline: ChannelInvoker {
/// - handler: the `ChannelHandler` to add
/// - position: The position in the `ChannelPipeline` to add `handler`. Defaults to `.last`.
/// - returns: the `EventLoopFuture` which will be notified once the `ChannelHandler` was added.
@preconcurrency
public func addHandler(
_ handler: ChannelHandler,
_ handler: ChannelHandler & Sendable,
name: String? = nil,
position: ChannelPipeline.Position = .last
) -> EventLoopFuture<Void> {
Expand Down Expand Up @@ -349,7 +350,8 @@ public final class ChannelPipeline: ChannelInvoker {
/// - parameters:
/// - handler: the `ChannelHandler` to remove.
/// - returns: the `EventLoopFuture` which will be notified once the `ChannelHandler` was removed.
public func removeHandler(_ handler: RemovableChannelHandler) -> EventLoopFuture<Void> {
@preconcurrency
public func removeHandler(_ handler: RemovableChannelHandler & Sendable) -> EventLoopFuture<Void> {
let promise = self.eventLoop.makePromise(of: Void.self)
self.removeHandler(handler, promise: promise)
return promise.futureResult
Expand All @@ -371,6 +373,11 @@ public final class ChannelPipeline: ChannelInvoker {
/// - parameters:
/// - context: the `ChannelHandlerContext` that belongs to `ChannelHandler` that should be removed.
/// - returns: the `EventLoopFuture` which will be notified once the `ChannelHandler` was removed.
@available(
*,
deprecated,
message: "Use .syncOperations.removeHandler(context:) instead, this method is not Sendable-safe."
)
public func removeHandler(context: ChannelHandlerContext) -> EventLoopFuture<Void> {
let promise = self.eventLoop.makePromise(of: Void.self)
self.removeHandler(context: context, promise: promise)
Expand All @@ -382,14 +389,11 @@ public final class ChannelPipeline: ChannelInvoker {
/// - parameters:
/// - handler: the `ChannelHandler` to remove.
/// - promise: An `EventLoopPromise` that will complete when the `ChannelHandler` is removed.
public func removeHandler(_ handler: RemovableChannelHandler, promise: EventLoopPromise<Void>?) {
@preconcurrency
public func removeHandler(_ handler: RemovableChannelHandler & Sendable, promise: EventLoopPromise<Void>?) {
@Sendable
func removeHandler0() {
switch self.contextSync(handler: handler) {
case .success(let context):
self.removeHandler(context: context, promise: promise)
case .failure(let error):
promise?.fail(error)
}
self.syncOperations.removeHandler(handler, promise: promise)
}

if self.eventLoop.inEventLoop {
Expand All @@ -407,13 +411,9 @@ public final class ChannelPipeline: ChannelInvoker {
/// - name: the name that was used to add the `ChannelHandler` to the `ChannelPipeline` before.
/// - promise: An `EventLoopPromise` that will complete when the `ChannelHandler` is removed.
public func removeHandler(name: String, promise: EventLoopPromise<Void>?) {
@Sendable
func removeHandler0() {
switch self.contextSync(name: name) {
case .success(let context):
self.removeHandler(context: context, promise: promise)
case .failure(let error):
promise?.fail(error)
}
self.syncOperations.removeHandler(name: name, promise: promise)
}

if self.eventLoop.inEventLoop {
Expand All @@ -430,13 +430,22 @@ public final class ChannelPipeline: ChannelInvoker {
/// - parameters:
/// - context: the `ChannelHandlerContext` that belongs to `ChannelHandler` that should be removed.
/// - promise: An `EventLoopPromise` that will complete when the `ChannelHandler` is removed.
@available(
*,
deprecated,
message: "Use .syncOperations.removeHandler(context:) instead, this method is not Sendable-safe."
)
public func removeHandler(context: ChannelHandlerContext, promise: EventLoopPromise<Void>?) {
guard context.handler is RemovableChannelHandler else {
let sendableView = context.sendableView

guard sendableView.channelHandlerIsRemovable else {
promise?.fail(ChannelError._unremovableHandler)
return
}

@Sendable
func removeHandler0() {
context.startUserTriggeredRemoval(promise: promise)
sendableView.wrappedValue.startUserTriggeredRemoval(promise: promise)
}

if self.eventLoop.inEventLoop {
Expand All @@ -453,7 +462,13 @@ public final class ChannelPipeline: ChannelInvoker {
/// - parameters:
/// - handler: the `ChannelHandler` for which the `ChannelHandlerContext` should be returned
/// - returns: the `EventLoopFuture` which will be notified once the the operation completes.
public func context(handler: ChannelHandler) -> EventLoopFuture<ChannelHandlerContext> {
@available(
*,
deprecated,
message: "This method is not strict concurrency safe. Prefer .syncOperations.context(handler:)"
)
@preconcurrency
public func context(handler: ChannelHandler & Sendable) -> EventLoopFuture<ChannelHandlerContext> {
let promise = self.eventLoop.makePromise(of: ChannelHandlerContext.self)

if self.eventLoop.inEventLoop {
Expand Down Expand Up @@ -1005,8 +1020,9 @@ extension ChannelPipeline {
/// - position: The position in the `ChannelPipeline` to add `handlers`. Defaults to `.last`.
///
/// - returns: A future that will be completed when all of the supplied `ChannelHandler`s were added.
@preconcurrency
public func addHandlers(
_ handlers: [ChannelHandler],
_ handlers: [ChannelHandler & Sendable],
position: ChannelPipeline.Position = .last
) -> EventLoopFuture<Void> {
let future: EventLoopFuture<Void>
Expand All @@ -1030,8 +1046,9 @@ extension ChannelPipeline {
/// - position: The position in the `ChannelPipeline` to add `handlers`. Defaults to `.last`.
///
/// - returns: A future that will be completed when all of the supplied `ChannelHandler`s were added.
@preconcurrency
public func addHandlers(
_ handlers: ChannelHandler...,
_ handlers: (ChannelHandler & Sendable)...,
position: ChannelPipeline.Position = .last
) -> EventLoopFuture<Void> {
self.addHandlers(handlers, position: position)
Expand Down Expand Up @@ -1149,29 +1166,75 @@ extension ChannelPipeline {
/// - parameters:
/// - handler: the `ChannelHandler` to remove.
/// - returns: the `EventLoopFuture` which will be notified once the `ChannelHandler` was removed.
@preconcurrency
public func removeHandler(_ handler: RemovableChannelHandler) -> EventLoopFuture<Void> {
let promise = self.eventLoop.makePromise(of: Void.self)
self.removeHandler(handler, promise: promise)
return promise.futureResult
}

/// Remove a ``ChannelHandler`` from the ``ChannelPipeline``.
///
/// - parameters:
/// - handler: the ``ChannelHandler`` to remove.
/// - promise: an ``EventLoopPromise`` to notify when the ``ChannelHandler`` was removed.
public func removeHandler(_ handler: RemovableChannelHandler, promise: EventLoopPromise<Void>?) {
switch self._pipeline.contextSync(handler: handler) {
case .success(let context):
self._pipeline.removeHandler(context: context, promise: promise)
self.removeHandler(context: context, promise: promise)
case .failure(let error):
promise.fail(error)
promise?.fail(error)
}
}

/// Remove a `ChannelHandler` from the `ChannelPipeline`.
///
/// - parameters:
/// - name: the name that was used to add the `ChannelHandler` to the `ChannelPipeline` before.
/// - returns: the `EventLoopFuture` which will be notified once the `ChannelHandler` was removed.
public func removeHandler(name: String) -> EventLoopFuture<Void> {
let promise = self.eventLoop.makePromise(of: Void.self)
self.removeHandler(name: name, promise: promise)
return promise.futureResult
}

/// Remove a ``ChannelHandler`` from the ``ChannelPipeline``.
///
/// - parameters:
/// - name: the name that was used to add the `ChannelHandler` to the `ChannelPipeline` before.
/// - promise: an ``EventLoopPromise`` to notify when the ``ChannelHandler`` was removed.
public func removeHandler(name: String, promise: EventLoopPromise<Void>?) {
switch self._pipeline.contextSync(name: name) {
case .success(let context):
self.removeHandler(context: context, promise: promise)
case .failure(let error):
promise?.fail(error)
}
}

/// Remove a `ChannelHandler` from the `ChannelPipeline`.
///
/// - parameters:
/// - context: the `ChannelHandlerContext` that belongs to `ChannelHandler` that should be removed.
/// - returns: the `EventLoopFuture` which will be notified once the `ChannelHandler` was removed.
public func removeHandler(context: ChannelHandlerContext) -> EventLoopFuture<Void> {
let promise = self.eventLoop.makePromise(of: Void.self)
self._pipeline.removeHandler(context: context, promise: promise)
self.removeHandler(context: context, promise: promise)
return promise.futureResult
}

/// Remove a `ChannelHandler` from the `ChannelPipeline`.
///
/// - parameters:
/// - context: the `ChannelHandlerContext` that belongs to `ChannelHandler` that should be removed.
/// - promise: an ``EventLoopPromise`` to notify when the ``ChannelHandler`` was removed.
public func removeHandler(context: ChannelHandlerContext, promise: EventLoopPromise<Void>?) {
if context.handler is RemovableChannelHandler {
context.startUserTriggeredRemoval(promise: promise)
} else {
promise?.fail(ChannelError.unremovableHandler)
}
}

/// Returns the `ChannelHandlerContext` for the given handler instance if it is in
/// the `ChannelPipeline`, if it exists.
///
Expand Down Expand Up @@ -1367,26 +1430,24 @@ extension ChannelPipeline.SynchronousOperations: Sendable {}

extension ChannelPipeline {
/// A `Position` within the `ChannelPipeline` used to insert handlers into the `ChannelPipeline`.
public enum Position {
@preconcurrency
public enum Position: Sendable {
/// The first `ChannelHandler` -- the front of the `ChannelPipeline`.
case first

/// The last `ChannelHandler` -- the back of the `ChannelPipeline`.
case last

/// Before the given `ChannelHandler`.
case before(ChannelHandler)
case before(ChannelHandler & Sendable)

/// After the given `ChannelHandler`.
case after(ChannelHandler)
case after(ChannelHandler & Sendable)
}
}

@available(*, unavailable)
extension ChannelPipeline.Position: Sendable {}

/// Special `ChannelHandler` that forwards all events to the `Channel.Unsafe` implementation.
final class HeadChannelHandler: _ChannelOutboundHandler {
final class HeadChannelHandler: _ChannelOutboundHandler, Sendable {

static let name = "head"
static let sharedInstance = HeadChannelHandler()
Expand Down Expand Up @@ -1442,7 +1503,7 @@ extension CloseMode {
}

/// Special `ChannelInboundHandler` which will consume all inbound events.
final class TailChannelHandler: _ChannelInboundHandler {
final class TailChannelHandler: _ChannelInboundHandler, Sendable {

static let name = "tail"
static let sharedInstance = TailChannelHandler()
Expand Down Expand Up @@ -1977,6 +2038,42 @@ extension ChannelHandlerContext {
}
}

extension ChannelHandlerContext {
var sendableView: SendableView {
SendableView(wrapping: self)
}

/// A wrapper over ``ChannelHandlerContext`` that allows access to the thread-safe API
/// surface on the type.
///
/// Very little of ``ChannelHandlerContext`` is thread-safe, but in a rare few places
/// there are things we can access. This type makes those available.
struct SendableView: @unchecked Sendable {
private let context: ChannelHandlerContext

fileprivate init(wrapping context: ChannelHandlerContext) {
self.context = context
}

/// Whether the ``ChannelHandler`` associated with this context conforms to
/// ``RemovableChannelHandler``.
var channelHandlerIsRemovable: Bool {
// `context.handler` is not mutable, and set at construction, so this access is
// acceptable. The protocol conformance check is also safe.
self.context.handler is RemovableChannelHandler
}

/// Grabs the underlying ``ChannelHandlerContext``. May only be called on the
/// event loop.
var wrappedValue: ChannelHandlerContext {
// The event loop lookup here is also thread-safe, so we can grab the value out
// and use it.
self.context.eventLoop.preconditionInEventLoop()
return self.context
}
}
}

extension ChannelPipeline: CustomDebugStringConvertible {
public var debugDescription: String {
// This method forms output in the following format:
Expand Down
Loading

0 comments on commit 332f95c

Please sign in to comment.