diff --git a/design/mvp/Async.md b/design/mvp/Async.md index f80fb4ca..9fefb898 100644 --- a/design/mvp/Async.md +++ b/design/mvp/Async.md @@ -327,7 +327,11 @@ These built-ins can either return immediately if >0 elements were able to be written or read immediately (without blocking) or return a sentinel "blocked" value indicating that the read or write will execute concurrently. The readable and writable ends of streams and futures can then be [waited](#waiting) on to -make progress. +make progress. Notification of progress signals *completion* of a read or write +(i.e., the bytes have already been copied into the buffer). Additionally, +*readiness* (to perform a read or write in the future) can be queried and +signalled by performing a `0`-length read or write (see the [Stream State] +section in the Canonical ABI explainer for details). The `T` element type of streams and futures is optional, such that `future` and `stream` can be written in WIT without a trailing ``. In this case, the diff --git a/design/mvp/CanonicalABI.md b/design/mvp/CanonicalABI.md index 9b2b5523..d8909319 100644 --- a/design/mvp/CanonicalABI.md +++ b/design/mvp/CanonicalABI.md @@ -364,8 +364,8 @@ class BufferGuestImpl(Buffer): length: int def __init__(self, t, cx, ptr, length): - trap_if(length == 0 or length > Buffer.MAX_LENGTH) - if t: + trap_if(length > Buffer.MAX_LENGTH) + if t and length > 0: trap_if(ptr != align_to(ptr, alignment(t))) trap_if(ptr + length * elem_size(t) > len(cx.opts.memory)) self.cx = cx @@ -1104,9 +1104,12 @@ class ReadableStreamGuestImpl(ReadableStream): self.reset_pending() def reset_pending(self): - self.pending_buffer = None - self.pending_on_partial_copy = None - self.pending_on_copy_done = None + self.set_pending(None, None, None) + + def set_pending(self, buffer, on_partial_copy, on_copy_done): + self.pending_buffer = buffer + self.pending_on_partial_copy = on_partial_copy + self.pending_on_copy_done = on_copy_done ``` The `impl` field records the component instance that created this stream and is used by `lower_stream` below. @@ -1168,20 +1171,44 @@ but in the opposite direction. Both are implemented by a single underlying if self.closed_: return 'done' elif not self.pending_buffer: - self.pending_buffer = buffer - self.pending_on_partial_copy = on_partial_copy - self.pending_on_copy_done = on_copy_done + self.set_pending(buffer, on_partial_copy, on_copy_done) return 'blocked' else: - ncopy = min(src.remain(), dst.remain()) - assert(ncopy > 0) - dst.write(src.read(ncopy)) if self.pending_buffer.remain() > 0: - self.pending_on_partial_copy(self.reset_pending) + if buffer.remain() > 0: + dst.write(src.read(min(src.remain(), dst.remain()))) + if self.pending_buffer.remain() > 0: + self.pending_on_partial_copy(self.reset_pending) + else: + self.reset_and_notify_pending() + return 'done' else: - self.reset_and_notify_pending() - return 'done' -``` + if buffer.remain() > 0 or buffer is dst: + self.reset_and_notify_pending() + self.set_pending(buffer, on_partial_copy, on_copy_done) + return 'blocked' + else: + return 'done' +``` +The meaning of a `read` or `write` when the length is `0` is that the caller is +querying the "readiness" of the other side. When a `0`-length read/write +rendezvous with a non-`0`-length read/write, only the `0`-length read/write +completes; the non-`0`-length read/write is kept pending (and ready for a +subsequent rendezvous). + +In the corner case where a `0`-length read *and* write rendezvous, only the +*writer* is notified of readiness. To avoid livelock, the Canonical ABI +requires that a writer *must* (eventually) follow a completed `0`-length write +with a non-`0`-length write that is allowed to block (allowing the reader end +to run and rendezvous with its own non-`0`-length read). To implement a +traditional `O_NONBLOCK` `write()` or `sendmsg()` API, a writer can use a +buffering scheme in which, after `select()` (or a similar API) signals a file +descriptor is ready to write, the next `O_NONBLOCK` `write()`/`sendmsg()` on +that file descriptor copies to an internal buffer and suceeds, issuing an +`async` `stream.write` in the background and waiting for completion before +signalling readiness again. Note that buffering only occurs when streaming +between two components using non-blocking I/O; if either side is the host or a +component using blocking or completion-based I/O, no buffering is necessary. Given the above, we can define the `{Readable,Writable}StreamEnd` classes that are actually stored in the `waitables` table. The classes are almost entirely diff --git a/design/mvp/canonical-abi/definitions.py b/design/mvp/canonical-abi/definitions.py index 89a219f4..24744fe7 100644 --- a/design/mvp/canonical-abi/definitions.py +++ b/design/mvp/canonical-abi/definitions.py @@ -311,8 +311,8 @@ class BufferGuestImpl(Buffer): length: int def __init__(self, t, cx, ptr, length): - trap_if(length == 0 or length > Buffer.MAX_LENGTH) - if t: + trap_if(length > Buffer.MAX_LENGTH) + if t and length > 0: trap_if(ptr != align_to(ptr, alignment(t))) trap_if(ptr + length * elem_size(t) > len(cx.opts.memory)) self.cx = cx @@ -660,9 +660,12 @@ def __init__(self, t, inst): self.reset_pending() def reset_pending(self): - self.pending_buffer = None - self.pending_on_partial_copy = None - self.pending_on_copy_done = None + self.set_pending(None, None, None) + + def set_pending(self, buffer, on_partial_copy, on_copy_done): + self.pending_buffer = buffer + self.pending_on_partial_copy = on_partial_copy + self.pending_on_copy_done = on_copy_done def reset_and_notify_pending(self): pending_on_copy_done = self.pending_on_copy_done @@ -696,19 +699,24 @@ def copy(self, buffer, on_partial_copy, on_copy_done, src, dst): if self.closed_: return 'done' elif not self.pending_buffer: - self.pending_buffer = buffer - self.pending_on_partial_copy = on_partial_copy - self.pending_on_copy_done = on_copy_done + self.set_pending(buffer, on_partial_copy, on_copy_done) return 'blocked' else: - ncopy = min(src.remain(), dst.remain()) - assert(ncopy > 0) - dst.write(src.read(ncopy)) if self.pending_buffer.remain() > 0: - self.pending_on_partial_copy(self.reset_pending) + if buffer.remain() > 0: + dst.write(src.read(min(src.remain(), dst.remain()))) + if self.pending_buffer.remain() > 0: + self.pending_on_partial_copy(self.reset_pending) + else: + self.reset_and_notify_pending() + return 'done' else: - self.reset_and_notify_pending() - return 'done' + if buffer.remain() > 0 or buffer is dst: + self.reset_and_notify_pending() + self.set_pending(buffer, on_partial_copy, on_copy_done) + return 'blocked' + else: + return 'done' class StreamEnd(Waitable): stream: ReadableStream diff --git a/design/mvp/canonical-abi/run_tests.py b/design/mvp/canonical-abi/run_tests.py index c8340b89..ccc53d0d 100644 --- a/design/mvp/canonical-abi/run_tests.py +++ b/design/mvp/canonical-abi/run_tests.py @@ -1457,8 +1457,19 @@ async def core_func1(task, args): assert(mem1[retp+0] == wsi) assert(mem1[retp+4] == 4) + [ret] = await canon_stream_write(U8Type(), opts1, task, wsi, 12345, 0) + assert(ret == definitions.BLOCKED) + fut4.set_result(None) + [event] = await canon_waitable_set_wait(False, mem1, task, seti, retp) + assert(event == EventCode.STREAM_WRITE) + assert(mem1[retp+0] == wsi) + assert(mem1[retp+4] == 0) + + [ret] = await canon_stream_write(U8Type(), opts1, task, wsi, 12345, 0) + assert(ret == 0) + [errctxi] = await canon_error_context_new(opts1, task, 0, 0) [] = await canon_stream_close_writable(U8Type(), task, wsi, errctxi) [] = await canon_waitable_set_drop(task, seti) @@ -1498,6 +1509,9 @@ async def core_func2(task, args): fut2.set_result(None) await task.on_block(fut3) + [ret] = await canon_stream_read(U8Type(), opts2, task, rsi, 12345, 0) + assert(ret == 0) + mem2[0:8] = bytes(8) [ret] = await canon_stream_read(U8Type(), opts2, task, rsi, 0, 2) assert(ret == 2) @@ -1508,9 +1522,16 @@ async def core_func2(task, args): await task.on_block(fut4) - [ret] = await canon_stream_read(U8Type(), opts2, task, rsi, 0, 2) + [ret] = await canon_stream_read(U8Type(), opts2, task, rsi, 12345, 0) + assert(ret == definitions.BLOCKED) + + [event] = await canon_waitable_set_wait(False, mem2, task, seti, retp) + assert(event == EventCode.STREAM_READ) + assert(mem2[retp+0] == rsi) + p2 = int.from_bytes(mem2[retp+4 : retp+8], 'little', signed=False) errctxi = 1 - assert(ret == (definitions.CLOSED | errctxi)) + assert(p2 == (definitions.CLOSED | errctxi)) + [] = await canon_stream_close_readable(U8Type(), task, rsi, 0) [] = await canon_waitable_set_drop(task, seti) [] = await canon_error_context_debug_message(opts2, task, errctxi, 0)