Skip to content

Commit 07022a5

Browse files
authored
Proxy WebSocket communication through the main thread (#576)
* Add Worker proxy support for webR channels * Add `async: false` option for webR Worker proxy * Update NEWS.md * Fix proxy worker under Node * Add tests for proxy worker * Don't add main thread proxies in PostMessage mode
1 parent 2c1a8be commit 07022a5

File tree

14 files changed

+385
-23
lines changed

14 files changed

+385
-23
lines changed

NEWS.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66

77
* Added CSS media query for system dark mode (#572).
88

9+
* Proxy Web Worker communication through the main thread when using the `SharedArrayBuffer` communication channel (#576).
10+
911
# webR 0.5.5
1012

1113
## New features

src/.eslintrc.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ module.exports = {
2424
"esbuild.js",
2525
"jest.config.js",
2626
"tests/webr.config.js",
27+
"tests/scripts/proxy-worker.worker.js",
2728
"tests/packages.config.js"
2829
],
2930
plugins: ["@typescript-eslint", "jest", "jsdoc", "react"],
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
const { parentPort } = require('node:worker_threads');
2+
3+
parentPort.onmessage = () => {
4+
parentPort.postMessage("pong");
5+
};
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import { WebR } from '../../../webR/webr-main';
2+
import { ChannelType } from '../../../webR/chan/channel-common';
3+
4+
const webR = new WebR({
5+
channelType: ChannelType.SharedArrayBuffer,
6+
baseUrl: '../dist/',
7+
RArgs: ['--quiet'],
8+
});
9+
10+
describe('Communicate over Worker proxy', () => {
11+
test('Initialises successfully', async () => {
12+
await expect(webR.init()).resolves.not.toThrow();
13+
});
14+
15+
test('Use Worker from webR worker thread', async () => {
16+
await webR.init();
17+
const res = (await webR.evalRString(`
18+
webr::eval_js('
19+
var worker = new Worker("./tests/scripts/proxy-worker.worker.js");
20+
worker.onmessage = (ev) => globalThis.gotReply = ev.data;
21+
worker.postMessage("ping");
22+
globalThis.gotReply = null;
23+
')
24+
25+
while (is.na(webr::eval_js("globalThis.gotReply"))) {
26+
Sys.sleep(1)
27+
}
28+
29+
webr::eval_js("worker.terminate()")
30+
webr::eval_js("globalThis.gotReply")
31+
`));
32+
expect(res).toEqual("pong");
33+
});
34+
});
35+
36+
afterAll(() => {
37+
return webR.close();
38+
});

src/webR/chan/channel-postmessage.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,10 @@ export class PostMessageChannelWorker {
142142
#parked = new Map<string, ResolveFn<Message>>();
143143
#dispatch: (msg: Message) => void = () => 0;
144144
#promptDepth = 0;
145+
146+
// Main thread proxies only work with SharedBufferChannelWorker for now
147+
WebSocketProxy = IN_NODE ? undefined : WebSocket;
148+
WorkerProxy = IN_NODE ? undefined : Worker;
145149

146150
constructor() {
147151
this.#ep = (IN_NODE ? require('worker_threads').parentPort : globalThis) as Endpoint;

src/webR/chan/channel-shared.ts

Lines changed: 51 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { promiseHandles, newCrossOriginWorker, isCrossOrigin } from '../utils';
2-
import { EventMessage, Message, Response, SyncRequest, WebSocketCloseMessage, WebSocketMessage, WebSocketOpenMessage } from './message';
2+
import { EventMessage, Message, PostMessageWorkerMessage, Response, SyncRequest, WebSocketCloseMessage, WebSocketMessage, WebSocketOpenMessage, WorkerErrorMessage, WorkerMessage, WorkerMessageErrorMessage } from './message';
33
import { Endpoint } from './task-common';
44
import { syncResponse } from './task-main';
55
import { ChannelMain, ChannelWorker } from './channel';
@@ -145,6 +145,25 @@ export class SharedBufferChannelMain extends ChannelMain {
145145
await syncResponse(worker, reqData, { type: 'eval-response', data });
146146
break;
147147
}
148+
case 'post-message-worker': {
149+
const message = payload.data as PostMessageWorkerMessage['data'];
150+
message.handles = promiseHandles();
151+
this.systemQueue.put({ type: 'postMessageWorker', data: message });
152+
153+
if (message.async) {
154+
await syncResponse(worker, reqData, { type: 'post-message-response' });
155+
} else {
156+
message.handles.promise.then(
157+
(value) => {
158+
void syncResponse(worker, reqData, { type: 'post-message-response', data: { result: value } });
159+
},
160+
(error) => {
161+
void syncResponse(worker, reqData, { type: 'post-message-response', data: { error: String(error) } });
162+
}
163+
);
164+
}
165+
break;
166+
}
148167
default:
149168
throw new WebRChannelError(`Unsupported request type '${payload.type}'.`);
150169
}
@@ -162,11 +181,14 @@ export class SharedBufferChannelMain extends ChannelMain {
162181

163182
import { setEventBuffer, setEventsHandler, SyncTask } from './task-worker';
164183
import { Module } from '../emscripten';
165-
import { WebSocketProxy, WebSocketProxyFactory } from './websocket';
184+
import { WebSocketProxy, WebSocketProxyFactory } from './proxy-websocket';
185+
import { WorkerProxy, WorkerProxyFactory } from './proxy-worker';
166186

167187
export class SharedBufferChannelWorker implements ChannelWorker {
168188
WebSocketProxy: typeof WebSocket;
169-
proxies: Map<string, WebSocketProxy>;
189+
WorkerProxy: typeof Worker;
190+
ws: Map<string, WebSocketProxy>;
191+
workers: Map<string, WorkerProxy>;
170192
#ep: Endpoint;
171193
#dispatch: (msg: Message) => void = () => 0;
172194
#eventBuffer = new Int32Array(new SharedArrayBuffer(4));
@@ -178,23 +200,26 @@ export class SharedBufferChannelWorker implements ChannelWorker {
178200
setEventBuffer(this.#eventBuffer.buffer);
179201
setEventsHandler(() => this.handleEvents());
180202

203+
// Event functionality to be handled via proxy to main thread
181204
this.WebSocketProxy = WebSocketProxyFactory.proxy(this);
182-
this.proxies = new Map();
205+
this.ws = new Map();
206+
this.WorkerProxy = WorkerProxyFactory.proxy(this);
207+
this.workers = new Map();
183208
}
184209

185210
resolve() {
186211
this.write({ type: 'resolve', data: this.#eventBuffer.buffer });
187212
}
188213

189-
write(msg: Message, transfer?: [Transferable]) {
214+
write(msg: Message, transfer?: Transferable[]) {
190215
this.#ep.postMessage(msg, transfer);
191216
}
192217

193-
writeSystem(msg: Message, transfer?: [Transferable]) {
218+
writeSystem(msg: Message, transfer?: Transferable[]) {
194219
this.#ep.postMessage({ type: 'system', data: msg }, transfer);
195220
}
196221

197-
syncRequest(msg: Message, transfer?: [Transferable]): Message {
222+
syncRequest(msg: Message, transfer?: Transferable[]): Message {
198223
const task = new SyncTask(this.#ep, msg, transfer);
199224
return task.syncify() as Message;
200225
}
@@ -240,22 +265,37 @@ export class SharedBufferChannelWorker implements ChannelWorker {
240265
break;
241266
case 'websocket-open': {
242267
const message = response.data.msg as WebSocketOpenMessage;
243-
this.proxies.get(message.data.uuid)?._accept();
268+
this.ws.get(message.data.uuid)?._accept();
244269
break;
245270
}
246271
case 'websocket-message': {
247272
const message = response.data.msg as WebSocketMessage;
248-
this.proxies.get(message.data.uuid)?._recieve(message.data.data);
273+
this.ws.get(message.data.uuid)?._recieve(message.data.data);
249274
break;
250275
}
251276
case 'websocket-close': {
252277
const message = response.data.msg as WebSocketCloseMessage;
253-
this.proxies.get(message.data.uuid)?._close(message.data.code, message.data.reason);
278+
this.ws.get(message.data.uuid)?._close(message.data.code, message.data.reason);
254279
break;
255280
}
256281
case 'websocket-error': {
257282
const message = response.data.msg as WebSocketMessage;
258-
this.proxies.get(message.data.uuid)?._error();
283+
this.ws.get(message.data.uuid)?._error();
284+
break;
285+
}
286+
case 'worker-message': {
287+
const message = response.data.msg as WorkerMessage;
288+
this.workers.get(message.data.uuid)?._message(message.data.data);
289+
break;
290+
}
291+
case 'worker-messageerror': {
292+
const message = response.data.msg as WorkerMessageErrorMessage;
293+
this.workers.get(message.data.uuid)?._messageerror(message.data.data);
294+
break;
295+
}
296+
case 'worker-error': {
297+
const message = response.data.msg as WorkerErrorMessage;
298+
this.workers.get(message.data.uuid)?._error();
259299
break;
260300
}
261301
default:

src/webR/chan/channel.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,8 @@ export abstract class ChannelMain {
101101
}
102102

103103
export interface ChannelWorker {
104+
WebSocketProxy: typeof WebSocket | undefined;
105+
WorkerProxy: typeof Worker | undefined;
104106
resolve(): void;
105107
write(msg: Message, transfer?: [Transferable]): void;
106108
writeSystem(msg: Message, transfer?: [Transferable]): void;
@@ -112,7 +114,6 @@ export interface ChannelWorker {
112114
inputOrDispatch: () => number;
113115
setDispatchHandler: (dispatch: (msg: Message) => void) => void;
114116
resolveRequest: (msg: Message) => void;
115-
WebSocketProxy?: typeof WebSocket;
116117
}
117118

118119
/**

src/webR/chan/message.ts

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
* WebR communication channel messaging and request types.
33
* @module Message
44
*/
5+
import { PromiseHandles } from '../utils';
56
import { generateUUID, transfer, UUID } from './task-common';
67

78
/** A webR communication channel message. */
@@ -153,6 +154,74 @@ export interface WebSocketCloseMessage {
153154
};
154155
}
155156

157+
/** A webR communication channel `proxyWorker` message.
158+
* @internal
159+
*/
160+
export interface ProxyWorkerMessage {
161+
type: 'proxyWorker';
162+
data: {
163+
uuid: string;
164+
url: string;
165+
options?: WorkerOptions;
166+
};
167+
}
168+
169+
/** A webR communication channel `postMessageWorker` message.
170+
* @internal
171+
*/
172+
export interface PostMessageWorkerMessage {
173+
type: 'postMessageWorker';
174+
data: {
175+
uuid: string;
176+
data: unknown;
177+
async: boolean;
178+
transfer?: Transferable[];
179+
handles?: PromiseHandles<unknown>;
180+
};
181+
}
182+
183+
/** A webR communication channel `terminateWorker` message.
184+
* @internal
185+
*/
186+
export interface TerminateWorkerMessage {
187+
type: 'terminateWorker';
188+
data: {
189+
uuid: string;
190+
};
191+
}
192+
193+
/** A webR communication channel `worker-message` message.
194+
* @internal
195+
*/
196+
export interface WorkerMessage {
197+
type: 'worker-message';
198+
data: {
199+
uuid: string;
200+
data: any;
201+
};
202+
}
203+
204+
/** A webR communication channel `worker-messageerror` message.
205+
* @internal
206+
*/
207+
export interface WorkerMessageErrorMessage {
208+
type: 'worker-messageerror';
209+
data: {
210+
uuid: string;
211+
data: any;
212+
};
213+
}
214+
215+
/** A webR communication channel `worker-error` message.
216+
* @internal
217+
*/
218+
export interface WorkerErrorMessage {
219+
type: 'worker-error';
220+
data: {
221+
uuid: string;
222+
};
223+
}
224+
156225
/** A webR communication channel sync-request.
157226
* @internal
158227
*/

src/webR/chan/websocket.ts renamed to src/webR/chan/proxy-websocket.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ export class WebSocketProxyFactory {
9797
type: 'proxyWebSocket',
9898
data: { uuid: this.uuid, url: this.url, protocol: this.protocol }
9999
});
100-
chan.proxies.set(this.uuid, this);
100+
chan.ws.set(this.uuid, this);
101101
}
102102

103103
send(data: string | ArrayBufferLike | Blob | ArrayBufferView): void {
@@ -129,7 +129,7 @@ export class WebSocketProxyFactory {
129129
const ev = new CloseEvent('close', { code, reason });
130130
this.dispatchEvent(ev);
131131
this.onclose?.(ev);
132-
chan.proxies.delete(this.uuid);
132+
chan.ws.delete(this.uuid);
133133
}
134134

135135
_error(): void {

0 commit comments

Comments
 (0)