From c2d12839b4eba513fa7234430619aaac108c4604 Mon Sep 17 00:00:00 2001 From: toyobayashi Date: Thu, 25 Apr 2024 18:28:05 +0800 Subject: [PATCH 1/2] feat: add new option to allow waiting thread start before thread-spawn return --- packages/core/src/emnapi/index.d.ts | 1 + packages/core/src/worker.ts | 51 +++++++++++-- packages/emnapi/src/core/init.ts | 113 ++++++++++++++++++---------- packages/emnapi/src/core/scope.d.ts | 1 + 4 files changed, 120 insertions(+), 46 deletions(-) diff --git a/packages/core/src/emnapi/index.d.ts b/packages/core/src/emnapi/index.d.ts index 135a83b1..e9c5b174 100644 --- a/packages/core/src/emnapi/index.d.ts +++ b/packages/core/src/emnapi/index.d.ts @@ -69,6 +69,7 @@ export declare type BaseCreateOptions = { nodeBinding?: NodeBinding reuseWorker?: boolean asyncWorkPoolSize?: number + waitThreadStart?: boolean onCreateWorker?: (info: CreateWorkerInfo) => any print?: (str: string) => void printErr?: (str: string) => void diff --git a/packages/core/src/worker.ts b/packages/core/src/worker.ts index 2d5098f8..2def93ef 100644 --- a/packages/core/src/worker.ts +++ b/packages/core/src/worker.ts @@ -40,20 +40,27 @@ export class MessageHandler { const onLoad = this.onLoad if (type === 'load') { if (this.instance !== undefined) return - const source = onLoad(payload) + let source: InstantiatedSource | Promise + try { + source = onLoad(payload) + } catch (err) { + onLoaded.call(this, err, null, payload) + return + } const then = source && 'then' in source ? source.then : undefined if (typeof then === 'function') { // eslint-disable-next-line @typescript-eslint/no-floating-promises then.call( source, - (source) => { onLoaded.call(this, source) }, - (err) => { throw err } + (source) => { onLoaded.call(this, null, source, payload) }, + (err) => { onLoaded.call(this, err, null, payload) } ) } else { - onLoaded.call(this, source as InstantiatedSource) + onLoaded.call(this, null, source as InstantiatedSource, payload) } } else if (type === 'start') { handleAfterLoad.call(this, e, () => { + notifyPthreadCreateResult(payload.sab, 1) this.napiModule!.startThread(payload.tid, payload.arg) }) } else if (type === 'async-worker-init') { @@ -77,17 +84,45 @@ function handleAfterLoad (this: MessageHandler, e: any, f: (e: any) => void): vo } } -function onLoaded (this: MessageHandler, source: InstantiatedSource): void { +interface LoadPayload { + wasmModule: WebAssembly.Module + wasmMemory: WebAssembly.Memory + sab?: Int32Array +} + +function notifyPthreadCreateResult (sab: Int32Array | undefined, result: number): void { + if (sab) { + Atomics.store(sab, 0, result) + Atomics.notify(sab, 0) + } +} + +function onLoaded (this: MessageHandler, err: Error | null, source: InstantiatedSource | null, payload: LoadPayload): void { + if (err) { + notifyPthreadCreateResult(payload.sab, 2) + throw err + } + if (source == null) { + notifyPthreadCreateResult(payload.sab, 2) throw new TypeError('onLoad should return an object') } const instance = source.instance const napiModule = source.napiModule - if (!instance) throw new TypeError('onLoad should return an object which includes "instance"') - if (!napiModule) throw new TypeError('onLoad should return an object which includes "napiModule"') - if (!napiModule.childThread) throw new Error('napiModule should be created with `childThread: true`') + if (!instance) { + notifyPthreadCreateResult(payload.sab, 2) + throw new TypeError('onLoad should return an object which includes "instance"') + } + if (!napiModule) { + notifyPthreadCreateResult(payload.sab, 2) + throw new TypeError('onLoad should return an object which includes "napiModule"') + } + if (!napiModule.childThread) { + notifyPthreadCreateResult(payload.sab, 2) + throw new Error('napiModule should be created with `childThread: true`') + } this.instance = instance this.napiModule = napiModule diff --git a/packages/emnapi/src/core/init.ts b/packages/emnapi/src/core/init.ts index 17036b14..ac35ecca 100644 --- a/packages/emnapi/src/core/init.ts +++ b/packages/emnapi/src/core/init.ts @@ -3,6 +3,8 @@ import { makeDynCall, to64 } from 'emscripten:parse-tools' +type SharedInt32Array = Int32Array + export interface InitOptions { instance: WebAssembly.Instance module: WebAssembly.Module @@ -36,6 +38,7 @@ declare const process: any export var ENVIRONMENT_IS_NODE = typeof process === 'object' && process !== null && typeof process.versions === 'object' && process.versions !== null && typeof process.versions.node === 'string' export var ENVIRONMENT_IS_PTHREAD = Boolean(options.childThread) export var reuseWorker = Boolean(options.reuseWorker) +export var waitThreadStart = Boolean(options.waitThreadStart) export var wasmInstance: WebAssembly.Instance export var wasmModule: WebAssembly.Module @@ -251,7 +254,32 @@ function terminateWorker (worker: any): void { } } +function cleanThread (worker: any, tid: number, force?: boolean): void { + if (!force && reuseWorker) { + PThread.returnWorkerToPool(worker) + } else { + delete PThread.pthreads[tid] + const index = PThread.runningWorkers.indexOf(worker) + if (index !== -1) { + PThread.runningWorkers.splice(index, 1) + } + terminateWorker(worker) + delete worker.__emnapi_tid + } +} + +function checkSharedWasmMemory (): void { + if (typeof SharedArrayBuffer === 'undefined' || !(wasmMemory.buffer instanceof SharedArrayBuffer)) { + throw new Error( + 'Multithread features require shared wasm memory. ' + + 'Try to compile with `-matomics -mbulk-memory` and use `--import-memory --shared-memory` during linking' + ) + } +} + function spawnThread (startArg: number, errorOrTid: number): number { + checkSharedWasmMemory() + const isNewABI = errorOrTid !== undefined if (!isNewABI) { errorOrTid = _malloc(to64('8')) @@ -284,12 +312,44 @@ function spawnThread (startArg: number, errorOrTid: number): number { return isError ? -result : result } + let sab: Int32Array | undefined + if (waitThreadStart) { + sab = new Int32Array(new SharedArrayBuffer(4)) + Atomics.store(sab, 0, 0) + } + let worker: any + const tid = PThread.nextWorkerID + 43 try { - worker = PThread.getNewWorker() + worker = PThread.getNewWorker(sab) if (!worker) { throw new Error('failed to get new worker') } + + const WASI_THREADS_MAX_TID = 0x1FFFFFFF + PThread.nextWorkerID = (PThread.nextWorkerID + 1) % (WASI_THREADS_MAX_TID - 42) + PThread.pthreads[tid] = worker + worker.__emnapi_tid = tid + if (ENVIRONMENT_IS_NODE) { + worker.ref() + } + worker.postMessage({ + __emnapi__: { + type: 'start', + payload: { + tid, + arg: startArg, + sab + } + } + }) + if (waitThreadStart) { + Atomics.wait(sab!, 0, 0) + const r = Atomics.load(sab!, 0) + if (r === 2) { + throw new Error('failed to start pthread') + } + } } catch (e) { const EAGAIN = 6 @@ -305,31 +365,19 @@ function spawnThread (startArg: number, errorOrTid: number): number { return -EAGAIN } - const tid = PThread.nextWorkerID + 43 - Atomics.store(struct, 0, 0) Atomics.store(struct, 1, tid) Atomics.notify(struct, 1) - const WASI_THREADS_MAX_TID = 0x1FFFFFFF - PThread.nextWorkerID = (PThread.nextWorkerID + 1) % (WASI_THREADS_MAX_TID - 42) - PThread.pthreads[tid] = worker - worker.__emnapi_tid = tid PThread.runningWorkers.push(worker) - if (ENVIRONMENT_IS_NODE) { - worker.ref() + if (!waitThreadStart) { + worker.whenLoaded.catch((err: any) => { + delete worker.whenLoaded + cleanThread(worker, tid, true) + throw err + }) } - worker.postMessage({ - __emnapi__: { - type: 'start', - payload: { - tid, - arg: startArg - } - } - }) - if (isNewABI) { return 0 } @@ -376,7 +424,7 @@ export var PThread = { worker.unref() } }, - loadWasmModuleToWorker: (worker: any) => { + loadWasmModuleToWorker: (worker: any, sab?: SharedInt32Array) => { if (worker.whenLoaded) return worker.whenLoaded worker.whenLoaded = new Promise((resolve, reject) => { worker.onmessage = function (e: any) { @@ -395,14 +443,7 @@ export var PThread = { } else if (type === 'spawn-thread') { spawnThread(payload.startArg, payload.errorOrTid) } else if (type === 'cleanup-thread') { - if (reuseWorker) { - PThread.returnWorkerToPool(worker) - } else { - delete PThread.pthreads[payload.tid] - PThread.runningWorkers.splice(PThread.runningWorkers.indexOf(worker), 1) - terminateWorker(worker) - delete worker.__emnapi_tid - } + cleanThread(worker, payload.tid) } } } @@ -437,17 +478,13 @@ export var PThread = { type: 'load', payload: { wasmModule, - wasmMemory + wasmMemory, + sab } } }) } catch (err) { - if (typeof SharedArrayBuffer === 'undefined' || !(wasmMemory.buffer instanceof SharedArrayBuffer)) { - throw new Error( - 'Multithread features require shared wasm memory. ' + - 'Try to compile with `-matomics -mbulk-memory` and use `--import-memory --shared-memory` during linking' - ) - } + checkSharedWasmMemory() throw err } }) @@ -461,16 +498,16 @@ export var PThread = { PThread.unusedWorkers.push(worker) return worker }, - getNewWorker () { + getNewWorker (sab?: SharedInt32Array) { if (reuseWorker) { if (PThread.unusedWorkers.length === 0) { const worker = PThread.allocateUnusedWorker() - PThread.loadWasmModuleToWorker(worker) + PThread.loadWasmModuleToWorker(worker, sab) } return PThread.unusedWorkers.pop() } const worker = PThread.allocateUnusedWorker() - PThread.loadWasmModuleToWorker(worker) + PThread.loadWasmModuleToWorker(worker, sab) return PThread.unusedWorkers.pop() } } diff --git a/packages/emnapi/src/core/scope.d.ts b/packages/emnapi/src/core/scope.d.ts index d823e976..dca5563c 100644 --- a/packages/emnapi/src/core/scope.d.ts +++ b/packages/emnapi/src/core/scope.d.ts @@ -5,6 +5,7 @@ declare interface CreateOptions { childThread?: boolean reuseWorker?: boolean asyncWorkPoolSize?: number + waitThreadStart?: boolean onCreateWorker?: () => any print?: (str: string) => void printErr?: (str: string) => void From aa7977092efe237d620a4b1a7371185673e544c6 Mon Sep 17 00:00:00 2001 From: toyobayashi Date: Thu, 25 Apr 2024 18:32:28 +0800 Subject: [PATCH 2/2] test in node --- packages/test/util.js | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/test/util.js b/packages/test/util.js index 5b6a6591..bdc3ad5c 100644 --- a/packages/test/util.js +++ b/packages/test/util.js @@ -50,6 +50,7 @@ function loadPath (request, options) { : -RUNTIME_UV_THREADPOOL_SIZE, filename: request, reuseWorker: true, + waitThreadStart: true, onCreateWorker () { return new Worker(join(__dirname, './worker.js'), { env: process.env,