Skip to content

Commit 1d36241

Browse files
committed
feat(graalvm): cleanup webstreams api surface
Signed-off-by: Dario Valdespino <[email protected]>
1 parent bd884e0 commit 1d36241

31 files changed

+443
-911
lines changed

packages/graalvm/src/main/kotlin/elide/runtime/gvm/internals/intrinsics/js/JsPromiseImpl.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ internal class JsPromiseImpl<T> private constructor(token: Token) : CompletableJ
6464
private val onReject: MutableList<(Any?) -> Unit> = mutableListOf()
6565

6666
/** The promise is considered as 'closed' once the [token] is no longer [Pending]. */
67-
override val isClosed: Boolean get() = token.get() != Pending
67+
override val isDone: Boolean get() = token.get() != Pending
6868

6969
@Suppress("UNCHECKED_CAST")
7070
override fun then(onFulfilled: (T) -> Unit, onCatch: ((Any?) -> Unit)?): JsPromise<T> = apply {
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package elide.runtime.gvm.internals.intrinsics.js.webstreams
2+
3+
import java.util.concurrent.atomic.AtomicInteger
4+
import elide.runtime.intrinsics.js.ReadableStream
5+
import elide.runtime.intrinsics.js.stream.QueueingStrategy
6+
import elide.runtime.intrinsics.js.stream.ReadableStreamSource
7+
8+
9+
/**
10+
* Base abstract class for the native `ReadableStream` standard implementation.
11+
*
12+
* Unlike in the WHATWG reference, the default and BYOB stream components are separated into specialized classes to
13+
* remove the need for casting and type checks on every operation.
14+
*
15+
* This class defines the shared public API for streams, according to the
16+
* [spec](https://streams.spec.whatwg.org/#rs-class-definition).
17+
*/
18+
public sealed class AbstractReadableStream : ReadableStream {
19+
/** Thread-safe state flag for the stream. */
20+
protected val streamState: AtomicInteger = AtomicInteger(STREAM_READABLE)
21+
22+
/** The source this stream pulls data from. */
23+
protected abstract val source: ReadableStreamSource
24+
25+
/** The queuing strategy used by the stream's controller to pull data from the [source]. */
26+
protected abstract val strategy: QueueingStrategy
27+
28+
public companion object {
29+
// Stream state values
30+
31+
/** The stream is readable and fully functional. */
32+
internal const val STREAM_READABLE = 0
33+
34+
/** The stream has been closed normally without an exception. */
35+
internal const val STREAM_CLOSED = 1
36+
37+
/** The stream has been closed exceptionally. */
38+
internal const val STREAM_ERRORED = 2
39+
40+
// Controller state values
41+
42+
/** The underlying source has not been initialized yet.*/
43+
internal const val CONTROLLER_UNINITIALIZED = 0
44+
45+
/** The controller is ready, the source has completed startup. */
46+
internal const val CONTROLLER_STARTED = 1
47+
48+
/** The controller is pulling a new value from the source. */
49+
internal const val CONTROLLER_PULLING = 2
50+
51+
/** The controller is pulling and will pull again when the current pull completes. */
52+
internal const val CONTROLLER_PULL_AGAIN = 3
53+
54+
/** The controller is being closed, but must finish other operations before finishing. */
55+
internal const val CONTROLLER_CLOSING = 4
56+
57+
/** The controller is closed and can no longer be used. */
58+
internal const val CONTROLLER_CLOSED = 5
59+
60+
public fun from(
61+
source: ReadableStreamSource,
62+
strategy: QueueingStrategy = QueueingStrategy.Default
63+
): ReadableStream = when (source.type) {
64+
ReadableStream.Type.Default -> ReadableDefaultStream(source, strategy)
65+
ReadableStream.Type.BYOB -> TODO("BYOB streams are not yet supported")
66+
}
67+
}
68+
}
69+

packages/graalvm/src/main/kotlin/elide/runtime/gvm/internals/intrinsics/js/webstreams/readable/ReadableDefaultStream.kt renamed to packages/graalvm/src/main/kotlin/elide/runtime/gvm/internals/intrinsics/js/webstreams/ReadableDefaultStream.kt

Lines changed: 40 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
1-
package elide.runtime.gvm.internals.intrinsics.js.webstreams.readable
1+
package elide.runtime.gvm.internals.intrinsics.js.webstreams
22

33
import com.google.common.util.concurrent.AtomicDouble
44
import java.util.concurrent.ConcurrentLinkedQueue
55
import java.util.concurrent.atomic.AtomicInteger
66
import java.util.concurrent.atomic.AtomicReference
7-
import elide.runtime.gvm.internals.intrinsics.js.webstreams.readable.ReadableDefaultStream.ReadResult
8-
import elide.runtime.intrinsics.js.CompletableJsPromise
9-
import elide.runtime.intrinsics.js.JsPromise
7+
import elide.runtime.intrinsics.js.*
8+
import elide.runtime.intrinsics.js.ReadableStream.ReadResult
109
import elide.runtime.intrinsics.js.err.TypeError
10+
import elide.runtime.intrinsics.js.stream.*
1111
import elide.vm.annotations.Polyglot
1212

1313
/** Shorthand for a completable promise used to define read queues. */
@@ -19,39 +19,34 @@ private typealias DefaultReadRequest = CompletableJsPromise<ReadResult>
1919
*/
2020
internal class ReadableDefaultStream internal constructor(
2121
override val source: ReadableStreamSource,
22-
override val strategy: ReadableStreamQueuingStrategy,
23-
) : ReadableStream() {
24-
/**
25-
* Encapsulates the result of a read; [done] indicates whether the [chunk] is the final value that will be available
26-
* from the source.
27-
*/
28-
data class ReadResult(val chunk: Any?, val done: Boolean)
29-
22+
override val strategy: QueueingStrategy = QueueingStrategy.Default,
23+
) : AbstractReadableStream() {
3024
/** An arbitrary [chunk] value with an attached [size], as calculated by the stream's queueing strategy. */
3125
data class SizedChunk(val chunk: Any?, val size: Double)
3226

3327
/**
3428
* [`ReadableStreamDefaultController`](https://streams.spec.whatwg.org/#rs-default-controller-class) spec
3529
* implementation, including all the exposed fields and methods in the spec.
3630
*/
37-
private inner class ReadableStreamDefaultController : ReadableStreamController {
31+
private inner class ControllerImpl : ReadableStreamDefaultController {
3832
/** Atomic controller state flag. */
3933
private val controllerState = AtomicInteger(CONTROLLER_STARTED)
4034

41-
/** Desired total size for the inbound queue, used for backpressure control. */
42-
val desiredSize: Double?
35+
override val desiredSize: Double?
4336
@Polyglot get() = when (streamState.get()) {
4437
STREAM_CLOSED -> 0.0
4538
STREAM_READABLE -> strategy.highWaterMark() - sourceChunksSize.get()
4639
else -> null
4740
}
4841

49-
init {
50-
// setup the source
42+
fun setup() {
43+
// set up the source
5144
runCatching { source.start(this) }
52-
.onFailure(::error)
45+
.onFailure {
46+
error(it)
47+
}
5348
.onSuccess {
54-
controllerState.set(CONTROLLER_STARTED)
49+
controllerState.compareAndSet(CONTROLLER_UNINITIALIZED, CONTROLLER_STARTED)
5550
if (shouldPull()) pull()
5651
}
5752
}
@@ -75,32 +70,20 @@ internal class ReadableDefaultStream internal constructor(
7570
return source.cancel(reason)
7671
}
7772

78-
/**
79-
* Close the controller and the associated stream. If there are undelivered chunks, the stream will not be closed
80-
* until they are claimed.
81-
*/
82-
@Polyglot fun close() {
73+
@Polyglot override fun close() {
8374
if (controllerState.getAndSet(CONTROLLER_CLOSING) == CONTROLLER_CLOSING) return
8475
// don't close the stream if there are undelivered elements
8576
if (sourceChunks.isEmpty()) this@ReadableDefaultStream.close()
8677
}
8778

88-
/**
89-
* Shut down the controller and associated stream with the given error [reason]. Unlike [close], this method does
90-
* not wait for undelivered elements to be claimed, all unread data will be lost.
91-
*/
92-
@Polyglot fun error(reason: Any? = null) {
79+
@Polyglot override fun error(reason: Any?) {
9380
if (streamState.get() != STREAM_READABLE) return
9481
sourceChunks.clear()
9582
sourceChunksSize.set(0.0)
9683
this@ReadableDefaultStream.error(reason)
9784
}
9885

99-
/**
100-
* Enqueue a new chunk, making it available immediately for readers. If any pending read requests are found, the
101-
* chunk will be delivered directly without using the queue.
102-
*/
103-
@Polyglot fun enqueue(chunk: Any? = null) {
86+
@Polyglot override fun enqueue(chunk: Any?) {
10487
// check the controller isn't closing or closed
10588
if (controllerState.get() == CONTROLLER_CLOSING || streamState.get() != STREAM_READABLE) {
10689
throw TypeError.create("Controller is closing or stream is not readable")
@@ -166,16 +149,14 @@ internal class ReadableDefaultStream internal constructor(
166149
}
167150
}
168151

169-
170152
/**
171153
* [`ReadableStreamDefaultReader`](https://streams.spec.whatwg.org/#default-reader-class) spec
172154
* implementation, including all the exposed fields and methods in the spec.
173155
*/
174-
private inner class ReadableStreamDefaultReader : ReadableStreamReader {
175-
/** A promised that completes when the reader is closed, and rejects when the reader errors. */
176-
@Polyglot val closed: CompletableJsPromise<Unit> = CompletableJsPromise()
156+
private inner class ReadableStreamDefaultReaderImpl : ReadableStreamDefaultReader {
157+
@Polyglot override val closed: CompletableJsPromise<Unit> = JsPromise()
177158

178-
init {
159+
fun setup() {
179160
// early closure
180161
when (streamState.get()) {
181162
STREAM_CLOSED -> closed.resolve(Unit)
@@ -184,16 +165,17 @@ internal class ReadableDefaultStream internal constructor(
184165
}
185166
}
186167

187-
/** Read a chunk from the stream, returning a promise that is fulfilled with the result. */
188-
@Polyglot fun read(): JsPromise<ReadResult> {
189-
if (closed.isDone) throw TypeError.create("Reader has been closed")
168+
@Polyglot override fun read(): JsPromise<ReadResult> {
169+
if (reader.get() !== this) throw TypeError.create("Reader is detached")
190170
return when (streamState.get()) {
191171
STREAM_READABLE -> {
192172
// consume queued chunks if available; if the stream was closed after polling, mark as 'done'
193-
controller.poll()?.let { return JsPromise.resolved(ReadResult(it, streamState.get() != STREAM_READABLE)) }
173+
controller.poll()?.let {
174+
return JsPromise.resolved(ReadResult(it.chunk, streamState.get() != STREAM_READABLE))
175+
}
194176

195177
// enqueue read request and pull
196-
CompletableJsPromise<ReadResult>().also { request ->
178+
JsPromise<ReadResult>().also { request ->
197179
readQueue.add(request)
198180
controller.pullIfNeeded()
199181
}
@@ -204,18 +186,13 @@ internal class ReadableDefaultStream internal constructor(
204186
}
205187
}
206188

207-
/**
208-
* Release this reader's lock on the stream, allowing a new reader to be acquired and invalidating this instance.
209-
* Pending reads will still complete normally.
210-
*/
211-
@Polyglot fun releaseLock() {
189+
@Polyglot override fun releaseLock() {
212190
if (closed.isDone) throw TypeError.create("Reader has already been released")
213191
closed.reject(TypeError.create("Reader lock was released"))
214192
reader.set(null)
215193
}
216194

217-
/** Cancel the stream for this reader. */
218-
@Polyglot fun cancel() {
195+
@Polyglot override fun cancel() {
219196
this@ReadableDefaultStream.cancel()
220197
}
221198
}
@@ -235,17 +212,17 @@ internal class ReadableDefaultStream internal constructor(
235212
*/
236213
private val readQueue: ConcurrentLinkedQueue<DefaultReadRequest> = ConcurrentLinkedQueue()
237214

238-
/** Handle used by the [source] to push new chunks and control the stream. */
239-
private val controller = ReadableStreamDefaultController()
215+
/** Stored cause for the stream's failure. */
216+
private val errorCause = AtomicReference<Any>()
240217

241218
/**
242219
* Handle used by consumers to read chunks; if not `null`, the stream is considered 'locked' and must be released
243220
* before a new reader can be acquired.
244221
*/
245-
private val reader = AtomicReference<ReadableStreamDefaultReader>()
222+
private val reader = AtomicReference<ReadableStreamDefaultReaderImpl>()
246223

247-
/** Stored cause for the stream's failure. */
248-
private val errorCause = AtomicReference<Any>()
224+
/** Handle used by the [source] to push new chunks and control the stream. */
225+
private val controller = ControllerImpl().also { it.setup() }
249226

250227
@get:Polyglot override val locked: Boolean get() = reader.get() != null
251228

@@ -282,7 +259,13 @@ internal class ReadableDefaultStream internal constructor(
282259
}
283260

284261
@Polyglot override fun getReader(options: Any?): ReadableStreamReader {
285-
if (reader.getAndUpdate { it ?: ReadableStreamDefaultReader() } != null) throw TypeError.create("Stream is locked")
262+
if (reader.getAndUpdate { current ->
263+
current ?: ReadableStreamDefaultReaderImpl().also { it.setup() }
264+
} != null) throw TypeError.create("Stream is locked")
286265
return reader.get()
287266
}
267+
268+
override fun pipeThrough(transform: TransformStream, options: Any?): ReadableStream = TODO("Not yet implemented")
269+
override fun pipeTo(destination: WritableStream, options: Any?): JsPromise<Unit> = TODO("Not yet implemented")
270+
override fun tee(): Array<ReadableStream> = TODO("Not yet implemented")
288271
}

packages/graalvm/src/main/kotlin/elide/runtime/gvm/internals/intrinsics/js/webstreams/ReadableStreamIntrinsic.kt

Lines changed: 0 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -12,64 +12,12 @@
1212
*/
1313
package elide.runtime.gvm.internals.intrinsics.js.webstreams
1414

15-
import java.io.InputStream
16-
import java.io.Reader
17-
import java.nio.ByteBuffer
1815
import elide.runtime.gvm.api.Intrinsic
1916
import elide.runtime.gvm.internals.intrinsics.js.AbstractJsIntrinsic
2017
import elide.runtime.intrinsics.GuestIntrinsic
21-
import elide.runtime.intrinsics.js.ReadableStream
2218

2319
/** Implementation of readable streams (via the Web Streams standard). */
2420
@Intrinsic(global = "ReadableStream") internal class ReadableStreamIntrinsic : AbstractJsIntrinsic() {
25-
/**
26-
* TBD.
27-
*/
28-
internal class ReadableStreamImpl : ReadableStream {
29-
30-
}
31-
32-
/**
33-
* TBD.
34-
*/
35-
internal companion object Factory : ReadableStream.Factory<ReadableStreamImpl> {
36-
/**
37-
* TBD.
38-
*/
39-
override fun empty(): ReadableStreamImpl {
40-
TODO("Not yet implemented")
41-
}
42-
43-
/**
44-
* TBD.
45-
*/
46-
override fun wrap(input: InputStream): ReadableStreamImpl {
47-
TODO("Not yet implemented")
48-
}
49-
50-
/**
51-
* TBD.
52-
*/
53-
override fun wrap(reader: Reader): ReadableStreamImpl {
54-
TODO("Not yet implemented")
55-
}
56-
57-
/**
58-
* TBD.
59-
*/
60-
override fun wrap(bytes: ByteArray): ReadableStreamImpl {
61-
// @TODO
62-
return ReadableStreamImpl()
63-
}
64-
65-
/**
66-
* TBD.
67-
*/
68-
override fun wrap(buffer: ByteBuffer): ReadableStreamImpl {
69-
TODO("Not yet implemented")
70-
}
71-
}
72-
7321
override fun install(bindings: GuestIntrinsic.MutableIntrinsicBindings) {
7422
// not yet implemented
7523
}

0 commit comments

Comments
 (0)