Skip to content

Commit

Permalink
Merge pull request #19 from d-exclaimation/pubsub-and-fixes
Browse files Browse the repository at this point in the history
Non-breaking bug fixes and PubSub data structure
  • Loading branch information
d-exclaimation authored Dec 18, 2021
2 parents 7d565ab + 9ddd42d commit f802681
Show file tree
Hide file tree
Showing 7 changed files with 333 additions and 131 deletions.
5 changes: 5 additions & 0 deletions Sources/Pioneer/GraphQL/BuiltinTypes.swift
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,9 @@ public extension String {
var id: ID {
.init(self)
}

/// ID from this string
func toID() -> ID {
.init(self)
}
}
102 changes: 102 additions & 0 deletions Sources/Pioneer/Streaming/AsyncPubSub.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
//
// AsyncPubSub.swift
// Pioneer
//
// Created by d-exclaimation.
//

import Foundation

public struct AsyncPubSub: Sendable {
public typealias Consumer = AsyncStream<Any>.Continuation

public actor Engine {
private var emitters: [String: Emitter] = [:]

internal func subscribe(for key: String) async -> Emitter {
let emitter = emitters.getOrElse(key) {
.init()
}
emitters.update(key, with: emitter)
return emitter
}

internal func asyncStream(for key: String) async -> AsyncStream<Any> {
let emitter = await subscribe(for: key)
let id = UUID().uuidString.lowercased()
return AsyncStream<Any> { con in
con.onTermination = { @Sendable _ in
Task {
await emitter.unsubscribe(id)
}
}

Task {
await emitter.subscribe(id, with: con)
}
}

}

internal func publish(for key: String, _ value: Any) async {
await emitters[key]?.publish(value)
}

internal func close(for key: String) async {
await emitters[key]?.close()
emitters.delete(key)
}

}

public actor Emitter {
private var consumers: [String: Consumer] = [:]

internal func subscribe(_ key: String, with consumer: Consumer) {
consumers.update(key, with: consumer)
}

internal func unsubscribe(_ key: String) {
consumers.delete(key)
}

internal func publish(_ value: Any) {
consumers.values.forEach { consumer in
consumer.yield(value)
}
}

internal func close() {
consumers.values.forEach { consumer in
consumer.finish()
}
consumers.removeAll()
}
}

private let engine: Engine = .init()

public func asyncStream<DataType>(_ type: DataType.Type = DataType.self, for trigger: String) async -> AsyncStream<DataType> {
let pipe = await engine.asyncStream(for: trigger)
return AsyncStream<DataType> { con in
let task = Task {
for await untyped in pipe {
guard let typed = untyped as? DataType else { continue }
con.yield(typed)
}
con.finish()
}
con.onTermination = { @Sendable _ in
task.cancel()
}
}
}

public func publish(for trigger: String, payload: Any) async {
await engine.publish(for: trigger, payload)
}

public func close(for trigger: String) async {
await engine.close(for: trigger)
}
}
182 changes: 108 additions & 74 deletions Sources/Pioneer/WebSocket/Probe/Drone/Drone.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,99 +33,125 @@ extension Pioneer {
self.process = process
}


enum Act {
case start(oid: String, gql: GraphQLRequest)
case stop(oid: String)
case ended(oid: String)
case output(oid: String, GraphQLMessage)
case acid
}

// Mark: -- States --
// MARK: - Private mutable states
var status: Signal = .running
var tasks: [String: Deferred<Void>] = [:]

func onMessage(msg: Act) async -> Signal {
switch msg {
// Start subscriptions, setup pipe pattern, and callbacks
case .start(oid: let oid, gql: let gql):
// Guards for getting all the required subscriptions stream
guard let subscriptionResult = await subscription(gql: gql) else {
process.send(GraphQLMessage.errors(id: oid, type: proto.next, [.init(message: "Internal server error")]).jsonString)
break
}
guard let subscription = subscriptionResult.stream else {
let res = GraphQL.GraphQLResult(errors: subscriptionResult.errors)
process.send(GraphQLMessage.from(type: proto.next, id: oid, res).jsonString)
process.send(GraphQLMessage(id: oid, type: proto.complete).jsonString)
break
}
guard let asyncStream = subscription.asyncStream() else {
let res = GraphQL.GraphQLResult(errors: [.init(message: "Internal server error, failed to fetch AsyncStream")])
process.send(GraphQLMessage.from(type: proto.next, id: oid, res).jsonString)
process.send(GraphQLMessage(id: oid, type: proto.complete).jsonString)
break
}

let next = proto.next
await onStart(for: oid, given: gql)

// Transform nozzle into flow and Pipe all messages into the Actor itself
let task = asyncStream.pipeBack(to: oneself,
onComplete: {
.ended(oid: oid)
},
onFailure: { _ in
.ended(oid: oid)
},
transform: { res in
.output(oid: oid, GraphQLMessage.from(type: next, id: oid, res))
}
)

tasks.update(oid, with: task)

// Stop subscription, shutdown nozzle and remove it so preventing overflow of any messages
case .stop(oid: let oid):
guard let task = tasks[oid] else { break }
onStop(for: oid)

tasks.delete(oid)
task.cancel()

// Send an ending message
// but prevent completion message if nozzle doesn't exist
// e.g: - Shutdown-ed operation
case .ended(oid: let oid):
guard tasks.has(oid) else { break }
let message = GraphQLMessage(id: oid, type: proto.complete)
process.send(message.jsonString)
onEnd(for: oid)

// Push message to websocket connection
// but prevent completion message if nozzle doesn't exist
// e.g: - Shutdown-ed operation
case .output(oid: let oid, let message):
guard tasks.has(oid) else { break }
process.send(message.jsonString)

// Kill actor
onOutput(for: oid, given: message)

case .acid:
tasks.values.forEach { $0.cancel() }
tasks.removeAll()
onTerminate()
return .stopped
}
return .running
}

// MARK: - Event callbacks

/// Start subscriptions, setup pipe pattern, and callbacks
private func onStart(for oid: String, given gql: GraphQLRequest) async {
let nextTypename = proto.next
let subscriptionResult = await subscription(gql: gql)

// Guard for getting the required subscriptions stream, if not send `next` with errors, and end subscription
guard let subscription = subscriptionResult.stream else {
let res = GraphQL.GraphQLResult(errors: subscriptionResult.errors)
process.send(GraphQLMessage.from(type: proto.next, id: oid, res).jsonString)
process.send(GraphQLMessage(id: oid, type: proto.complete).jsonString)
return
}

// Guard for getting the async stream, if not sent `next` saying failure in convertion, and end subscription
guard let asyncStream = subscription.asyncStream() else {
let res = GraphQL.GraphQLResult(errors: [
.init(message: "Internal server error, failed to fetch AsyncStream")
])
process.send(GraphQLMessage.from(type: proto.next, id: oid, res).jsonString)
process.send(GraphQLMessage(id: oid, type: proto.complete).jsonString)
return
}

private func subscription(gql: GraphQLRequest) async -> SubscriptionResult? {
try? await subscribeGraphQL(
schema: schema,
request: gql.query,
resolver: resolver,
context: process.ctx,
eventLoopGroup: process.req.eventLoop,
variables: gql.variables,
operationName: gql.operationName

// Transform async stream into messages and pipe back all messages into the Actor itself
let task = asyncStream.pipeBack(to: oneself,
onComplete: {
.ended(oid: oid)
},
onFailure: { _ in
.ended(oid: oid)
},
transform: { res in
.output(oid: oid, GraphQLMessage.from(type: nextTypename, id: oid, res))
}
)

tasks.update(oid, with: task)
}

/// Stop subscription, shutdown nozzle and remove it so preventing overflow of any messages
private func onStop(for oid: String) {
guard let task = tasks[oid] else { return }

tasks.delete(oid)
task.cancel()
}

/// Send an ending message
/// but prevent completion message if nozzle doesn't exist
/// e.g: - Shutdown-ed operation
private func onEnd(for oid: String) {
guard tasks.has(oid) else { return }
tasks.delete(oid)
let message = GraphQLMessage(id: oid, type: proto.complete)
process.send(message.jsonString)
}

/// Push message to websocket connection
/// but prevent completion message if nozzle doesn't exist
/// e.g: - Shutdown-ed operation
private func onOutput(for oid: String, given msg: GraphQLMessage) {
guard tasks.has(oid) else { return }
process.send(msg.jsonString)
}

/// Kill actor by cancelling and deallocating all stored task
private func onTerminate() {
tasks.values.forEach { $0.cancel() }
tasks.removeAll()
}

// MARK: - Utility methods

/// Execute subscription from GraphQL Resolver and Schema, await the future value and catch error into a SubscriptionResult
private func subscription(gql: GraphQLRequest) async -> SubscriptionResult {
do {
return try await subscribeGraphQL(
schema: schema,
request: gql.query,
resolver: resolver,
context: process.ctx,
eventLoopGroup: process.req.eventLoop,
variables: gql.variables,
operationName: gql.operationName
)
} catch {
return .init(
stream: nil,
errors: [.init(error)]
)
}
}

deinit {
Expand All @@ -135,5 +161,13 @@ extension Pioneer {
task.cancel()
}
}

enum Act {
case start(oid: String, gql: GraphQLRequest)
case stop(oid: String)
case ended(oid: String)
case output(oid: String, GraphQLMessage)
case acid
}
}
}
Loading

0 comments on commit f802681

Please sign in to comment.