Skip to content

Commit d02e83b

Browse files
committed
Allow stream.{read,write}s of length 0 to query/signal readiness
1 parent d43430d commit d02e83b

File tree

4 files changed

+79
-20
lines changed

4 files changed

+79
-20
lines changed

design/mvp/Async.md

+5-1
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,11 @@ These built-ins can either return immediately if >0 elements were able to be
325325
written or read immediately (without blocking) or return a sentinel "blocked"
326326
value indicating that the read or write will execute concurrently. The readable
327327
and writable ends of streams and futures can then be [waited](#waiting) on to
328-
make progress.
328+
make progress. Notification of progress signals *completion* of a read or write
329+
(i.e., the bytes have already been copied into the buffer). Additionally,
330+
*readiness* (to perform a read or write in the future) can be queried and
331+
signalled by performing a `0`-length read or write (see the [Stream State]
332+
section in the Canonical ABI explainer for details).
329333

330334
The `T` element type of streams and futures is optional, such that `future` and
331335
`stream` can be written in WIT without a trailing `<T>`. In this case, the

design/mvp/CanonicalABI.md

+30-9
Original file line numberDiff line numberDiff line change
@@ -363,8 +363,8 @@ class BufferGuestImpl(Buffer):
363363
length: int
364364

365365
def __init__(self, t, cx, ptr, length):
366-
trap_if(length == 0 or length > Buffer.MAX_LENGTH)
367-
if t:
366+
trap_if(length > Buffer.MAX_LENGTH)
367+
if t and length > 0:
368368
trap_if(ptr != align_to(ptr, alignment(t)))
369369
trap_if(ptr + length * elem_size(t) > len(cx.opts.memory))
370370
self.cx = cx
@@ -1178,14 +1178,35 @@ but in the opposite direction. Both are implemented by a single underlying
11781178
return 'blocked'
11791179
else:
11801180
ncopy = min(src.remain(), dst.remain())
1181-
assert(ncopy > 0)
1182-
dst.write(src.read(ncopy))
1183-
if self.pending_buffer.remain() > 0:
1184-
self.pending_on_partial_copy(self.reset_pending)
1181+
if ncopy > 0:
1182+
dst.write(src.read(ncopy))
1183+
if self.pending_buffer.remain() > 0:
1184+
self.pending_on_partial_copy(self.reset_pending)
1185+
else:
1186+
self.reset_and_notify_pending()
1187+
return 'done'
11851188
else:
1186-
self.reset_and_notify_pending()
1187-
return 'done'
1188-
```
1189+
if self.pending_buffer.remain() == 0:
1190+
self.reset_and_notify_pending()
1191+
if buffer.remain() == 0:
1192+
return 'done'
1193+
else:
1194+
self.pending_buffer = buffer
1195+
self.pending_on_partial_copy = on_partial_copy
1196+
self.pending_on_copy_done = on_copy_done
1197+
return 'blocked'
1198+
```
1199+
The meaning of a `read` or `write` when the length is `0` is that the caller is
1200+
signalling their "readiness" and wants to know when the other side is "ready".
1201+
When a non-`0`-length `read` or `write` rendezvous with a `0`-length `write` or
1202+
`read`, only the `0`-length operation completes, keeping the non-`0`-length
1203+
pending and immediately ready for a future rendezvous. In a rendezvous where
1204+
*both* the `read` and `write` are `0`-length, both operations complete. Thus,
1205+
"readiness" does not guarantee "the next operation is non-blocking" since after
1206+
both sides learn of readiness, one side must subsequently block with a pending
1207+
non-`0`-length operation for the other side to rendezvous with. Consequently,
1208+
components should always follow a successful `0`-length `read` or `write` with
1209+
a non-`0`-length `read` or `write`.
11891210

11901211
Given the above, we can define the `{Readable,Writable}StreamEnd` classes that
11911212
are actually stored in the `waitables` table. The classes are almost entirely

design/mvp/canonical-abi/definitions.py

+18-8
Original file line numberDiff line numberDiff line change
@@ -311,8 +311,8 @@ class BufferGuestImpl(Buffer):
311311
length: int
312312

313313
def __init__(self, t, cx, ptr, length):
314-
trap_if(length == 0 or length > Buffer.MAX_LENGTH)
315-
if t:
314+
trap_if(length > Buffer.MAX_LENGTH)
315+
if t and length > 0:
316316
trap_if(ptr != align_to(ptr, alignment(t)))
317317
trap_if(ptr + length * elem_size(t) > len(cx.opts.memory))
318318
self.cx = cx
@@ -702,13 +702,23 @@ def copy(self, buffer, on_partial_copy, on_copy_done, src, dst):
702702
return 'blocked'
703703
else:
704704
ncopy = min(src.remain(), dst.remain())
705-
assert(ncopy > 0)
706-
dst.write(src.read(ncopy))
707-
if self.pending_buffer.remain() > 0:
708-
self.pending_on_partial_copy(self.reset_pending)
705+
if ncopy > 0:
706+
dst.write(src.read(ncopy))
707+
if self.pending_buffer.remain() > 0:
708+
self.pending_on_partial_copy(self.reset_pending)
709+
else:
710+
self.reset_and_notify_pending()
711+
return 'done'
709712
else:
710-
self.reset_and_notify_pending()
711-
return 'done'
713+
if self.pending_buffer.remain() == 0:
714+
self.reset_and_notify_pending()
715+
if buffer.remain() == 0:
716+
return 'done'
717+
else:
718+
self.pending_buffer = buffer
719+
self.pending_on_partial_copy = on_partial_copy
720+
self.pending_on_copy_done = on_copy_done
721+
return 'blocked'
712722

713723
class StreamEnd(Waitable):
714724
stream: ReadableStream

design/mvp/canonical-abi/run_tests.py

+26-2
Original file line numberDiff line numberDiff line change
@@ -1457,8 +1457,19 @@ async def core_func1(task, args):
14571457
assert(mem1[retp+0] == wsi)
14581458
assert(mem1[retp+4] == 4)
14591459

1460+
[ret] = await canon_stream_write(U8Type(), opts1, task, wsi, 12345, 0)
1461+
assert(ret == definitions.BLOCKED)
1462+
14601463
fut4.set_result(None)
14611464

1465+
[event] = await canon_waitable_set_wait(False, mem1, task, seti, retp)
1466+
assert(event == EventCode.STREAM_WRITE)
1467+
assert(mem1[retp+0] == wsi)
1468+
assert(mem1[retp+4] == 0)
1469+
1470+
[ret] = await canon_stream_write(U8Type(), opts1, task, wsi, 12345, 0)
1471+
assert(ret == 0)
1472+
14621473
[errctxi] = await canon_error_context_new(opts1, task, 0, 0)
14631474
[] = await canon_stream_close_writable(U8Type(), task, wsi, errctxi)
14641475
[] = await canon_waitable_set_drop(task, seti)
@@ -1498,6 +1509,9 @@ async def core_func2(task, args):
14981509
fut2.set_result(None)
14991510
await task.on_block(fut3)
15001511

1512+
[ret] = await canon_stream_read(U8Type(), opts2, task, rsi, 12345, 0)
1513+
assert(ret == 0)
1514+
15011515
mem2[0:8] = bytes(8)
15021516
[ret] = await canon_stream_read(U8Type(), opts2, task, rsi, 0, 2)
15031517
assert(ret == 2)
@@ -1508,9 +1522,19 @@ async def core_func2(task, args):
15081522

15091523
await task.on_block(fut4)
15101524

1511-
[ret] = await canon_stream_read(U8Type(), opts2, task, rsi, 0, 2)
1525+
[ret] = await canon_stream_read(U8Type(), opts2, task, rsi, 12345, 0)
1526+
assert(ret == 0)
1527+
1528+
[ret] = await canon_stream_read(U8Type(), opts2, task, rsi, 12345, 0)
1529+
assert(ret == definitions.BLOCKED)
1530+
1531+
[event] = await canon_waitable_set_wait(False, mem2, task, seti, retp)
1532+
assert(event == EventCode.STREAM_READ)
1533+
assert(mem2[retp+0] == rsi)
1534+
p2 = int.from_bytes(mem2[retp+4 : retp+8], 'little', signed=False)
15121535
errctxi = 1
1513-
assert(ret == (definitions.CLOSED | errctxi))
1536+
assert(p2 == (definitions.CLOSED | errctxi))
1537+
15141538
[] = await canon_stream_close_readable(U8Type(), task, rsi)
15151539
[] = await canon_waitable_set_drop(task, seti)
15161540
[] = await canon_error_context_debug_message(opts2, task, errctxi, 0)

0 commit comments

Comments
 (0)