Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add new option to allow waiting thread start before thread-spawn return #116

Merged
merged 2 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/core/src/emnapi/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ export declare type BaseCreateOptions = {
nodeBinding?: NodeBinding
reuseWorker?: boolean
asyncWorkPoolSize?: number
waitThreadStart?: boolean
onCreateWorker?: (info: CreateWorkerInfo) => any
print?: (str: string) => void
printErr?: (str: string) => void
Expand Down
51 changes: 43 additions & 8 deletions packages/core/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,27 @@ export class MessageHandler {
const onLoad = this.onLoad
if (type === 'load') {
if (this.instance !== undefined) return
const source = onLoad(payload)
let source: InstantiatedSource | Promise<InstantiatedSource>
try {
source = onLoad(payload)
} catch (err) {
onLoaded.call(this, err, null, payload)
return
}
const then = source && 'then' in source ? source.then : undefined
if (typeof then === 'function') {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
then.call(
source,
(source) => { onLoaded.call(this, source) },
(err) => { throw err }
(source) => { onLoaded.call(this, null, source, payload) },
(err) => { onLoaded.call(this, err, null, payload) }
)
} else {
onLoaded.call(this, source as InstantiatedSource)
onLoaded.call(this, null, source as InstantiatedSource, payload)
}
} else if (type === 'start') {
handleAfterLoad.call(this, e, () => {
notifyPthreadCreateResult(payload.sab, 1)
this.napiModule!.startThread(payload.tid, payload.arg)
})
} else if (type === 'async-worker-init') {
Expand All @@ -77,17 +84,45 @@ function handleAfterLoad (this: MessageHandler, e: any, f: (e: any) => void): vo
}
}

function onLoaded (this: MessageHandler, source: InstantiatedSource): void {
interface LoadPayload {
wasmModule: WebAssembly.Module
wasmMemory: WebAssembly.Memory
sab?: Int32Array
}

function notifyPthreadCreateResult (sab: Int32Array | undefined, result: number): void {
if (sab) {
Atomics.store(sab, 0, result)
Atomics.notify(sab, 0)
}
}

function onLoaded (this: MessageHandler, err: Error | null, source: InstantiatedSource | null, payload: LoadPayload): void {
if (err) {
notifyPthreadCreateResult(payload.sab, 2)
throw err
}

if (source == null) {
notifyPthreadCreateResult(payload.sab, 2)
throw new TypeError('onLoad should return an object')
}

const instance = source.instance
const napiModule = source.napiModule

if (!instance) throw new TypeError('onLoad should return an object which includes "instance"')
if (!napiModule) throw new TypeError('onLoad should return an object which includes "napiModule"')
if (!napiModule.childThread) throw new Error('napiModule should be created with `childThread: true`')
if (!instance) {
notifyPthreadCreateResult(payload.sab, 2)
throw new TypeError('onLoad should return an object which includes "instance"')
}
if (!napiModule) {
notifyPthreadCreateResult(payload.sab, 2)
throw new TypeError('onLoad should return an object which includes "napiModule"')
}
if (!napiModule.childThread) {
notifyPthreadCreateResult(payload.sab, 2)
throw new Error('napiModule should be created with `childThread: true`')
}

this.instance = instance
this.napiModule = napiModule
Expand Down
113 changes: 75 additions & 38 deletions packages/emnapi/src/core/init.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

import { makeDynCall, to64 } from 'emscripten:parse-tools'

type SharedInt32Array = Int32Array

export interface InitOptions {
instance: WebAssembly.Instance
module: WebAssembly.Module
Expand Down Expand Up @@ -36,6 +38,7 @@ declare const process: any
export var ENVIRONMENT_IS_NODE = typeof process === 'object' && process !== null && typeof process.versions === 'object' && process.versions !== null && typeof process.versions.node === 'string'
export var ENVIRONMENT_IS_PTHREAD = Boolean(options.childThread)
export var reuseWorker = Boolean(options.reuseWorker)
export var waitThreadStart = Boolean(options.waitThreadStart)

export var wasmInstance: WebAssembly.Instance
export var wasmModule: WebAssembly.Module
Expand Down Expand Up @@ -251,7 +254,32 @@ function terminateWorker (worker: any): void {
}
}

function cleanThread (worker: any, tid: number, force?: boolean): void {
if (!force && reuseWorker) {
PThread.returnWorkerToPool(worker)
} else {
delete PThread.pthreads[tid]
const index = PThread.runningWorkers.indexOf(worker)
if (index !== -1) {
PThread.runningWorkers.splice(index, 1)
}
terminateWorker(worker)
delete worker.__emnapi_tid
}
}

function checkSharedWasmMemory (): void {
if (typeof SharedArrayBuffer === 'undefined' || !(wasmMemory.buffer instanceof SharedArrayBuffer)) {
throw new Error(
'Multithread features require shared wasm memory. ' +
'Try to compile with `-matomics -mbulk-memory` and use `--import-memory --shared-memory` during linking'
)
}
}

function spawnThread (startArg: number, errorOrTid: number): number {
checkSharedWasmMemory()

const isNewABI = errorOrTid !== undefined
if (!isNewABI) {
errorOrTid = _malloc(to64('8'))
Expand Down Expand Up @@ -284,12 +312,44 @@ function spawnThread (startArg: number, errorOrTid: number): number {
return isError ? -result : result
}

let sab: Int32Array | undefined
if (waitThreadStart) {
sab = new Int32Array(new SharedArrayBuffer(4))
Atomics.store(sab, 0, 0)
}

let worker: any
const tid = PThread.nextWorkerID + 43
try {
worker = PThread.getNewWorker()
worker = PThread.getNewWorker(sab)
if (!worker) {
throw new Error('failed to get new worker')
}

const WASI_THREADS_MAX_TID = 0x1FFFFFFF
PThread.nextWorkerID = (PThread.nextWorkerID + 1) % (WASI_THREADS_MAX_TID - 42)
PThread.pthreads[tid] = worker
worker.__emnapi_tid = tid
if (ENVIRONMENT_IS_NODE) {
worker.ref()
}
worker.postMessage({
__emnapi__: {
type: 'start',
payload: {
tid,
arg: startArg,
sab
}
}
})
if (waitThreadStart) {
Atomics.wait(sab!, 0, 0)
const r = Atomics.load(sab!, 0)
if (r === 2) {
throw new Error('failed to start pthread')
}
}
} catch (e) {
const EAGAIN = 6

Expand All @@ -305,31 +365,19 @@ function spawnThread (startArg: number, errorOrTid: number): number {
return -EAGAIN
}

const tid = PThread.nextWorkerID + 43

Atomics.store(struct, 0, 0)
Atomics.store(struct, 1, tid)
Atomics.notify(struct, 1)

const WASI_THREADS_MAX_TID = 0x1FFFFFFF
PThread.nextWorkerID = (PThread.nextWorkerID + 1) % (WASI_THREADS_MAX_TID - 42)
PThread.pthreads[tid] = worker
worker.__emnapi_tid = tid
PThread.runningWorkers.push(worker)
if (ENVIRONMENT_IS_NODE) {
worker.ref()
if (!waitThreadStart) {
worker.whenLoaded.catch((err: any) => {
delete worker.whenLoaded
cleanThread(worker, tid, true)
throw err
})
}

worker.postMessage({
__emnapi__: {
type: 'start',
payload: {
tid,
arg: startArg
}
}
})

if (isNewABI) {
return 0
}
Expand Down Expand Up @@ -376,7 +424,7 @@ export var PThread = {
worker.unref()
}
},
loadWasmModuleToWorker: (worker: any) => {
loadWasmModuleToWorker: (worker: any, sab?: SharedInt32Array) => {
if (worker.whenLoaded) return worker.whenLoaded
worker.whenLoaded = new Promise<any>((resolve, reject) => {
worker.onmessage = function (e: any) {
Expand All @@ -395,14 +443,7 @@ export var PThread = {
} else if (type === 'spawn-thread') {
spawnThread(payload.startArg, payload.errorOrTid)
} else if (type === 'cleanup-thread') {
if (reuseWorker) {
PThread.returnWorkerToPool(worker)
} else {
delete PThread.pthreads[payload.tid]
PThread.runningWorkers.splice(PThread.runningWorkers.indexOf(worker), 1)
terminateWorker(worker)
delete worker.__emnapi_tid
}
cleanThread(worker, payload.tid)
}
}
}
Expand Down Expand Up @@ -437,17 +478,13 @@ export var PThread = {
type: 'load',
payload: {
wasmModule,
wasmMemory
wasmMemory,
sab
}
}
})
} catch (err) {
if (typeof SharedArrayBuffer === 'undefined' || !(wasmMemory.buffer instanceof SharedArrayBuffer)) {
throw new Error(
'Multithread features require shared wasm memory. ' +
'Try to compile with `-matomics -mbulk-memory` and use `--import-memory --shared-memory` during linking'
)
}
checkSharedWasmMemory()
throw err
}
})
Expand All @@ -461,16 +498,16 @@ export var PThread = {
PThread.unusedWorkers.push(worker)
return worker
},
getNewWorker () {
getNewWorker (sab?: SharedInt32Array) {
if (reuseWorker) {
if (PThread.unusedWorkers.length === 0) {
const worker = PThread.allocateUnusedWorker()
PThread.loadWasmModuleToWorker(worker)
PThread.loadWasmModuleToWorker(worker, sab)
}
return PThread.unusedWorkers.pop()
}
const worker = PThread.allocateUnusedWorker()
PThread.loadWasmModuleToWorker(worker)
PThread.loadWasmModuleToWorker(worker, sab)
return PThread.unusedWorkers.pop()
}
}
1 change: 1 addition & 0 deletions packages/emnapi/src/core/scope.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ declare interface CreateOptions {
childThread?: boolean
reuseWorker?: boolean
asyncWorkPoolSize?: number
waitThreadStart?: boolean
onCreateWorker?: () => any
print?: (str: string) => void
printErr?: (str: string) => void
Expand Down
1 change: 1 addition & 0 deletions packages/test/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ function loadPath (request, options) {
: -RUNTIME_UV_THREADPOOL_SIZE,
filename: request,
reuseWorker: true,
waitThreadStart: true,
onCreateWorker () {
return new Worker(join(__dirname, './worker.js'), {
env: process.env,
Expand Down
Loading