Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow stream.{read,write}s of length 0 to query/signal readiness #444

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion design/mvp/Async.md
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,11 @@ These built-ins can either return immediately if >0 elements were able to be
written or read immediately (without blocking) or return a sentinel "blocked"
value indicating that the read or write will execute concurrently. The readable
and writable ends of streams and futures can then be [waited](#waiting) on to
make progress.
make progress. Notification of progress signals *completion* of a read or write
(i.e., the bytes have already been copied into the buffer). Additionally,
*readiness* (to perform a read or write in the future) can be queried and
signalled by performing a `0`-length read or write (see the [Stream State]
section in the Canonical ABI explainer for details).

The `T` element type of streams and futures is optional, such that `future` and
`stream` can be written in WIT without a trailing `<T>`. In this case, the
Expand Down
57 changes: 42 additions & 15 deletions design/mvp/CanonicalABI.md
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,8 @@ class BufferGuestImpl(Buffer):
length: int

def __init__(self, t, cx, ptr, length):
trap_if(length == 0 or length > Buffer.MAX_LENGTH)
if t:
trap_if(length > Buffer.MAX_LENGTH)
if t and length > 0:
trap_if(ptr != align_to(ptr, alignment(t)))
trap_if(ptr + length * elem_size(t) > len(cx.opts.memory))
self.cx = cx
Expand Down Expand Up @@ -1104,9 +1104,12 @@ class ReadableStreamGuestImpl(ReadableStream):
self.reset_pending()

def reset_pending(self):
self.pending_buffer = None
self.pending_on_partial_copy = None
self.pending_on_copy_done = None
self.set_pending(None, None, None)

def set_pending(self, buffer, on_partial_copy, on_copy_done):
self.pending_buffer = buffer
self.pending_on_partial_copy = on_partial_copy
self.pending_on_copy_done = on_copy_done
```
The `impl` field records the component instance that created this stream and is
used by `lower_stream` below.
Expand Down Expand Up @@ -1168,20 +1171,44 @@ but in the opposite direction. Both are implemented by a single underlying
if self.closed_:
return 'done'
elif not self.pending_buffer:
self.pending_buffer = buffer
self.pending_on_partial_copy = on_partial_copy
self.pending_on_copy_done = on_copy_done
self.set_pending(buffer, on_partial_copy, on_copy_done)
return 'blocked'
else:
ncopy = min(src.remain(), dst.remain())
assert(ncopy > 0)
dst.write(src.read(ncopy))
if self.pending_buffer.remain() > 0:
self.pending_on_partial_copy(self.reset_pending)
if buffer.remain() > 0:
dst.write(src.read(min(src.remain(), dst.remain())))
if self.pending_buffer.remain() > 0:
self.pending_on_partial_copy(self.reset_pending)
else:
self.reset_and_notify_pending()
return 'done'
else:
self.reset_and_notify_pending()
return 'done'
```
if buffer.remain() > 0 or buffer is dst:
self.reset_and_notify_pending()
self.set_pending(buffer, on_partial_copy, on_copy_done)
return 'blocked'
else:
return 'done'
```
The meaning of a `read` or `write` when the length is `0` is that the caller is
querying the "readiness" of the other side. When a `0`-length read/write
rendezvous with a non-`0`-length read/write, only the `0`-length read/write
completes; the non-`0`-length read/write is kept pending (and ready for a
subsequent rendezvous).

In the corner case where a `0`-length read *and* write rendezvous, only the
*writer* is notified of readiness. To avoid livelock, the Canonical ABI
requires that a writer *must* (eventually) follow a completed `0`-length write
with a non-`0`-length write that is allowed to block (allowing the reader end
to run and rendezvous with its own non-`0`-length read). To implement a
traditional `O_NONBLOCK` `write()` or `sendmsg()` API, a writer can use a
buffering scheme in which, after `select()` (or a similar API) signals a file
descriptor is ready to write, the next `O_NONBLOCK` `write()`/`sendmsg()` on
that file descriptor copies to an internal buffer and suceeds, issuing an
`async` `stream.write` in the background and waiting for completion before
signalling readiness again. Note that buffering only occurs when streaming
between two components using non-blocking I/O; if either side is the host or a
component using blocking or completion-based I/O, no buffering is necessary.

Given the above, we can define the `{Readable,Writable}StreamEnd` classes that
are actually stored in the `waitables` table. The classes are almost entirely
Expand Down
36 changes: 22 additions & 14 deletions design/mvp/canonical-abi/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,8 @@ class BufferGuestImpl(Buffer):
length: int

def __init__(self, t, cx, ptr, length):
trap_if(length == 0 or length > Buffer.MAX_LENGTH)
if t:
trap_if(length > Buffer.MAX_LENGTH)
if t and length > 0:
trap_if(ptr != align_to(ptr, alignment(t)))
trap_if(ptr + length * elem_size(t) > len(cx.opts.memory))
self.cx = cx
Expand Down Expand Up @@ -660,9 +660,12 @@ def __init__(self, t, inst):
self.reset_pending()

def reset_pending(self):
self.pending_buffer = None
self.pending_on_partial_copy = None
self.pending_on_copy_done = None
self.set_pending(None, None, None)

def set_pending(self, buffer, on_partial_copy, on_copy_done):
self.pending_buffer = buffer
self.pending_on_partial_copy = on_partial_copy
self.pending_on_copy_done = on_copy_done

def reset_and_notify_pending(self):
pending_on_copy_done = self.pending_on_copy_done
Expand Down Expand Up @@ -696,19 +699,24 @@ def copy(self, buffer, on_partial_copy, on_copy_done, src, dst):
if self.closed_:
return 'done'
elif not self.pending_buffer:
self.pending_buffer = buffer
self.pending_on_partial_copy = on_partial_copy
self.pending_on_copy_done = on_copy_done
self.set_pending(buffer, on_partial_copy, on_copy_done)
return 'blocked'
else:
ncopy = min(src.remain(), dst.remain())
assert(ncopy > 0)
dst.write(src.read(ncopy))
if self.pending_buffer.remain() > 0:
self.pending_on_partial_copy(self.reset_pending)
if buffer.remain() > 0:
dst.write(src.read(min(src.remain(), dst.remain())))
if self.pending_buffer.remain() > 0:
self.pending_on_partial_copy(self.reset_pending)
else:
self.reset_and_notify_pending()
return 'done'
else:
self.reset_and_notify_pending()
return 'done'
if buffer.remain() > 0 or buffer is dst:
self.reset_and_notify_pending()
self.set_pending(buffer, on_partial_copy, on_copy_done)
return 'blocked'
else:
return 'done'

class StreamEnd(Waitable):
stream: ReadableStream
Expand Down
25 changes: 23 additions & 2 deletions design/mvp/canonical-abi/run_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -1457,8 +1457,19 @@ async def core_func1(task, args):
assert(mem1[retp+0] == wsi)
assert(mem1[retp+4] == 4)

[ret] = await canon_stream_write(U8Type(), opts1, task, wsi, 12345, 0)
assert(ret == definitions.BLOCKED)

fut4.set_result(None)

[event] = await canon_waitable_set_wait(False, mem1, task, seti, retp)
assert(event == EventCode.STREAM_WRITE)
assert(mem1[retp+0] == wsi)
assert(mem1[retp+4] == 0)

[ret] = await canon_stream_write(U8Type(), opts1, task, wsi, 12345, 0)
assert(ret == 0)

[errctxi] = await canon_error_context_new(opts1, task, 0, 0)
[] = await canon_stream_close_writable(U8Type(), task, wsi, errctxi)
[] = await canon_waitable_set_drop(task, seti)
Expand Down Expand Up @@ -1498,6 +1509,9 @@ async def core_func2(task, args):
fut2.set_result(None)
await task.on_block(fut3)

[ret] = await canon_stream_read(U8Type(), opts2, task, rsi, 12345, 0)
assert(ret == 0)

mem2[0:8] = bytes(8)
[ret] = await canon_stream_read(U8Type(), opts2, task, rsi, 0, 2)
assert(ret == 2)
Expand All @@ -1508,9 +1522,16 @@ async def core_func2(task, args):

await task.on_block(fut4)

[ret] = await canon_stream_read(U8Type(), opts2, task, rsi, 0, 2)
[ret] = await canon_stream_read(U8Type(), opts2, task, rsi, 12345, 0)
assert(ret == definitions.BLOCKED)

[event] = await canon_waitable_set_wait(False, mem2, task, seti, retp)
assert(event == EventCode.STREAM_READ)
assert(mem2[retp+0] == rsi)
p2 = int.from_bytes(mem2[retp+4 : retp+8], 'little', signed=False)
errctxi = 1
assert(ret == (definitions.CLOSED | errctxi))
assert(p2 == (definitions.CLOSED | errctxi))

[] = await canon_stream_close_readable(U8Type(), task, rsi, 0)
[] = await canon_waitable_set_drop(task, seti)
[] = await canon_error_context_debug_message(opts2, task, errctxi, 0)
Expand Down