-
Notifications
You must be signed in to change notification settings - Fork 161
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Release reader immediately when shutting down a pipe #1208
base: main
Are you sure you want to change the base?
Release reader immediately when shutting down a pipe #1208
Conversation
Hmm, this means that the pipe could still cancel the readable after it has released its reader lock but before it settles the pipe promise. That might be quite unexpected: the user might have already acquired a new reader, and only that new reader should have the ability to cancel the stream. Possible solutions:
Option 2 is probably the easiest to implement, but it may require a note to explain why this weird "release + acquire" dance is necessary. Then again, we already mention that:
so user agents are free to implement these steps differently if they want. 🤷♂️ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It sounds like either option (1) or (2) is observably equivalent, right? My instinct would then be to go with whichever is easiest to spec (probably (2)?), and then explain that it's equivalent to the other one.
If we go with (2) and explain it's equivalent to (1), we're explaining the weird dance and saying that perhaps a more natural implementation would be (1), if you're less into code reuse than we are.
If we go with (1) and explain it's equivalent to (2), then we're explaining how despite doing some tricky direct manipulations of the internals, we're not actually doing anything which you couldn't already do with the public API.
Yes, they're equivalent. I'll go with option 2. 👍 |
5e5adad
to
15a9768
Compare
cf688a5
to
7008ee5
Compare
if (dest._state !== 'writable' || WritableStreamCloseQueuedOrInFlight(dest) === true) { | ||
return promiseResolvedWith(true); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This implements @domenic's suggestion from #1207 (comment).
I don't know if we need to update the spec text for this. It already specifies that these checks must happen before performing any reads and writes:
Shutdown must stop activity: if shuttingDown becomes true, the user agent must not initiate further reads from reader, and must only perform writes of already-read chunks, as described below. In particular, the user agent must check the below conditions before performing any reads or writes, since they might lead to immediate shutdown.
We should still add a test for this particular case (although that might not be easy looking at the discussion in #1207).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I personally don't think we need to update the spec text.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've updated the reference implementation to fix the issue from #1207 (comment). I still need to update the spec text, although this might be tricky because the pipeTo()
specification is a bit vague on how to implement the "if dest becomes errored" check. 😕
Also, is it okay to piggyback this fix onto this PR, or would you prefer I move it to a separate one? (The new fix does depend on the initial fix though, we can't land it entirely separately.)
@@ -228,7 +235,8 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC | |||
}); | |||
|
|||
// Errors must be propagated backward | |||
isOrBecomesErrored(dest, writer._closedPromise, storedError => { | |||
WritableStreamDefaultWriterIsOrBecomesErrored(writer, () => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This new helper allows attaching a synchronous callback for when dest becomes "erroring"
or "errored"
, following the discussion in #1207 (comment). I added a test for this in web-platform-tests/wpt@1646d65.
is or becomes "`errored`", then | ||
is or becomes "`erroring`" or "`errored`", then |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think it's useful to keep the pipe going when dest has already become "erroring"
? Any new writes will just error immediately, as per step 9 of WritableStreamDefaultWriterWrite.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed.
is or becomes "`errored`", then | ||
is or becomes "`erroring`" or "`errored`", then |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed.
if (dest._state !== 'writable' || WritableStreamCloseQueuedOrInFlight(dest) === true) { | ||
return promiseResolvedWith(true); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I personally don't think we need to update the spec text.
@@ -475,6 +483,20 @@ function WritableStreamDefaultWriterGetDesiredSize(writer) { | |||
return WritableStreamDefaultControllerGetDesiredSize(stream._controller); | |||
} | |||
|
|||
function WritableStreamDefaultWriterIsOrBecomesErrored(writer, errorListener) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe since this is not part of the standard, it should start with a lower-case letter?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. 👍
What do you suggest we do put in the spec text?
- We can add a note saying that "errors must be propagated backward" must be handled synchronously as soon as that condition becomes true, in other words that it's not enough to add an asynchronous callback to
writer.closed
.- On the other hand: since the condition is now "is or becomes erroring or errored", maybe it's already clear enough that
writer.closed
is not good enough?
- On the other hand: since the condition is now "is or becomes erroring or errored", maybe it's already clear enough that
- We can add a note in
WritableStreamStartErroring
below the "setstream.[[state]]
to"erroring"
" step to remind implementers that this is where that condition frompipeTo()
can become true. - Alternatively, we tell implementers to look at the reference implementation for an example on how to do it... 😛
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this very specific to the one change from "writable" to "erroring"? In the reference implementation you've added a much more general listener setup.
I would suggest a general note saying "for all the 'becomes' conditions in the above, they must be processed synchronously as part of the [[state]] update, before any other web developer code can run." And then, if we anticipate that only being impactful in the one transition, we could append the extra note: "NOTE: Currently this requirement only has observable consequences for [the transition for writable stream states from from "writable" to "erroring"], and others could be done as asynchronous listeners". Or, if we think we might expand this listener usage in the future, then we should probably omit that note.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this very specific to the one change from "writable" to "erroring"?
That's the most noticeable case, since it determines whether or not we may drop a chunk (i.e. we accidentally read a chunk that we can no longer write).
I'm not sure whether it matters for the state transitions of the readable end. There might be an edge case where two shutdown conditions become true at the same time, and then it matters which condition is handled first. For example:
readableController.error(error1); // pipeTo() should immediately call writer.abort(error1)
writableController.error(error2); // should be ignored, since writable is already erroring
// => pipeTo() rejects with error1
versus:
writableController.error(error2); // pipeTo() should immediately call reader.cancel(error2)
readableController.error(error1); // should be ignored, since readable is already closed
// => pipeTo() rejects with error2
If we were to use a synchronous reaction for the writable -> erroring
transition but an asynchronous reaction for the readable -> errored
transition, then the first snippet would also behave like the second one... I think. 😛
I'll try to whip up some more WPTs to double check.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here are two possible tests for this:
promise_test(async t => {
const rs = recordingReadableStream();
const ws = recordingWritableStream();
const pipeToPromise = rs.pipeTo(ws);
await flushAsyncEvents();
rs.controller.error(error1);
ws.controller.error(error2);
await promise_rejects_exactly(t, error1, pipeToPromise, 'pipeTo must reject with readable\'s error');
assert_array_equals(rs.eventsWithoutPulls, []);
assert_array_equals(ws.events, []);
await promise_rejects_exactly(t, error1, rs.getReader().closed);
await promise_rejects_exactly(t, error2, ws.getWriter().closed);
}, 'Piping: error the readable stream right before erroring the writable stream');
promise_test(async t => {
const rs = recordingReadableStream();
const ws = recordingWritableStream();
const pipeToPromise = rs.pipeTo(ws);
await flushAsyncEvents();
ws.controller.error(error1);
rs.controller.error(error2);
await promise_rejects_exactly(t, error1, pipeToPromise, 'pipeTo must reject with writable\'s error');
assert_array_equals(rs.eventsWithoutPulls, ['cancel', error1]);
assert_array_equals(ws.events, []);
await promise_rejects_exactly(t, error1, ws.getWriter().closed);
await rs.getReader().closed;
}, 'Piping: error the writable stream right before erroring the readable stream');
The behavior might be a bit surprising though. In the first test, ws is still writable when we call rs.controller.error()
, so we end up in:
uponFulfillment(waitForWritesToFinish(), doTheRest); |
This adds at least one microtask of delay (even if there are no pending writes), so we will not yet call ws.abort(error1)
. Instead, ws.controller.error(error2)
goes through, and the abort gets ignored later on.
However, in the second test, because ws immediately becomes errored, we don't wait for pending writes to complete and instead we synchronously call rs.cancel(error1)
. Therefore, rs.controller.error(error2)
gets ignored, and the stream ends up cancelled instead of errored.
The specification is a bit vague about this. It says:
Wait until every chunk that has been read has been written (i.e. the corresponding promises have settled).
It doesn't say how long this step can take. We may want to require that if there are no pending writes (i.e. we've never started any writes, or all writes have already settled), then this step must complete synchronously. Then, in the first test, we would call ws.abort(error1)
synchronously and prevent ws.controller.error(error2)
. However, that might be tricky to actually implement correctly... 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest limiting the sync part to as small as possible to fix the issue. This still leaves the problem of how to spec it. We've tried to give latitude for implementations to optimise in their own way, but we're increasingly constraining their behaviour. Transparent thread offloading etc. may become impossible. I'm worried about it but I don't have an answer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest limiting the sync part to as small as possible to fix the issue.
The sync part is already minimal. We have to go from "if source becomes errored" all the way to "perform WritableStreamAbort" in order to avoid ws.controller.error()
from affecting the result. Thus, the entirety of step 3 in "shutdown with an action" must become synchronous (only if there are no pending writes that need to be awaited).
Anyway, I found another way to fix it. We keep track of how many ReadableStreamDefaultReaderRead()
requests are still outstanding, and we only handle the source.[[state]] == "closed"
transition after all those requests are settled. See MattiasBuelens@3c8b3c2.
However, this test still fails. We do call dest.abort()
and source.cancel()
in the correct order, but it seems like underlyingSink.abort()
and underlyingSource.cancel()
are being called in the wrong order. When we call WritableStreamStartErroring, the writable controller is not yet started, so we postpone calling sink.abort()
until after sink.start()
resolves. Previously, the writable would already have been started while we were asynchronously waiting for the writes to finish (even if there were no pending writes).
Adding await flushAsyncEvents()
before calling pipeTo()
in that test restores the order and fixes the problem. Good enough? 🤷♂️
We've tried to give latitude for implementations to optimise in their own way, but we're increasingly constraining their behaviour. Transparent thread offloading etc. may become impossible. I'm worried about it but I don't have an answer.
I agree, the reference implementation is becoming increasingly complicated in order to deal with these edge cases. 😞
I'm wondering if it's even worth trying to spec these edge cases, or instead allow some wiggle room in how quickly pipeTo()
must respond to these state transitions. But then it would become impossible to test the behavior, or we'd have to allow multiple valid outcomes... 😕
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suspect most of the testable constraints we're imposing are in cases where the web developer controls one or both ends of the pipe, right? I'm not sure those are the ones we were planning to feasibly optimize, so starting to constrain them still seems like the right thing to do to me. But I might be missing something so please let me know.
On the larger problem, the root of the issue seems to be how imprecise "[[state]] is or becomes" is. Does that mean: (1) synchronously after the algorithm step which sets [[state]], probably interrupting other streams-implementation code; (2) synchronously after any streams-implementation code runs; (3) synchronously after any browser code runs; (4) asynchronously is OK to some degree?
My preference would be to try to resolve things like so:
- Decide whether we're OK constraining all observable behavior, or want to allow leeway. In particular when one or both ends of a pipe are web-developer-created streams, a good bit more becomes observable.
- Write tests reflecting the result of the previous decision. E.g. if we want to nail down all observable behavior, I think @MattiasBuelens has done a great job capturing as many scenarios as possible. (❤️!) We should analyze them for what reasonable expected behavior is, and assert that. (If we don't have strong feelings on reasonable expected behavior, then we can feel free to change the assertions in the next step.)
- Come up with some more-rigorous formulation of "[[state]] is or becomes" for the spec which meets the expectations of those tests. This probably will make the spec more complex, and more like the reference implementation. E.g. it could be adding promise handlers (probably in combination with something like MattiasBuelens@3c8b3c2), or having separate synchronous state-change steps. Given that this will only be used for pipeTo, we can probably consider spec strategies that aren't as detailed and algorithmic as the rest of the spec, but they do need to be clear and unambigious between the (1)-(4) above.
As an example of how to apply this process,
Adding await flushAsyncEvents() before calling pipeTo() in that test restores the order and fixes the problem. Good enough? 🤷♂️
My preference would be that, if we decide to constrain all observable behavior, we have both variants of the test, with the version without flushAsyncEvents() having the assert for the other order.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suspect most of the testable constraints we're imposing are in cases where the web developer controls one or both ends of the pipe, right? I'm not sure those are the ones we were planning to feasibly optimize, so starting to constrain them still seems like the right thing to do to me. But I might be missing something so please let me know.
Correct. Streams created by the user agent will use the exported algorithms, and I think it's safe to assume that those will be called in a separate task, outside of web author code.
On the larger problem, the root of the issue seems to be how imprecise "[[state]] is or becomes" is. Does that mean: (1) synchronously after the algorithm step which sets [[state]], probably interrupting other streams-implementation code; (2) synchronously after any streams-implementation code runs; (3) synchronously after any browser code runs; (4) asynchronously is OK to some degree?
(2) may be ill-specified, since there are cases where streams code calls into author code, which can then call back into streams code. We've even had cases in the past where streams code calls back into itself, e.g. #1172.
I still prefer (1), and that's what I've been implementing. Yes, we need to be very careful when speccing, but at least any problems that arise can be fixed within the streams implementation.
My preference would be that, if we decide to constrain all observable behavior, we have both variants of the test, with the version without flushAsyncEvents() having the assert for the other order.
That seems reasonable. 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Coming back to this:
Adding
await flushAsyncEvents()
before callingpipeTo()
in that test restores the order and fixes the problem. Good enough? 🤷♂️
It seems the main difference is that, when you call readable.cancel()
, we always synchronously call source.cancel()
regardless of whether source.start()
has already settled. On the other hand, when you call writable.abort()
, we first wait for sink.start()
to settle before we call sink.abort()
.
IIRC the reason for this difference is so you can do e.g. an async loop in source.start()
:
new ReadableStream({
async start(c) {
for (let i = 0; i < 10; i++) {
await new Promise(r => setTimeout(r, 1000));
c.enqueue("chunk");
}
c.close();
}
})
whereas for sink.start()
this doesn't make sense.
I guess, if we really wanted to, we could have the test check when writableController.signal
becomes aborted? That should happen synchronously regardless of whether sink.start()
has already settled.
…erroring/errored during pipe
I've generalized the implementation a bit. I added two new helpers:
The This should handle all possible synchronous state transitions, and check the shutdown conditions in the correct order as specified by the standard's text. (Perhaps we want to also use these helpers for |
@@ -475,6 +483,20 @@ function WritableStreamDefaultWriterGetDesiredSize(writer) { | |||
return WritableStreamDefaultControllerGetDesiredSize(stream._controller); | |||
} | |||
|
|||
function WritableStreamDefaultWriterIsOrBecomesErrored(writer, errorListener) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this very specific to the one change from "writable" to "erroring"? In the reference implementation you've added a much more general listener setup.
I would suggest a general note saying "for all the 'becomes' conditions in the above, they must be processed synchronously as part of the [[state]] update, before any other web developer code can run." And then, if we anticipate that only being impactful in the one transition, we could append the extra note: "NOTE: Currently this requirement only has observable consequences for [the transition for writable stream states from from "writable" to "erroring"], and others could be done as asynchronous listeners". Or, if we think we might expand this listener usage in the future, then we should probably omit that note.
In #1207, we raised concern about whether it's currently possible for
pipeTo()
to drop chunks when the pipe shuts down while it still has a pending read request.It turns out that this is indeed possible. The new WPT tests demonstrate at least one way this can happen: by aborting a pipe with an
AbortSignal
, and then enqueuing a chunk immediately after aborting the signal.This PR fixes this by releasing the pipe's reader immediately when starting the shutdown process, rather than when it finalizes the shutdown. This ensures that any pending reads that were started are immediately rejected, and the chunks stay in the source's queue.
This does have the side effect of releasing the reader before the pipe promise resolves. I don't know if this is acceptable.Fixed in #1208 (comment).(See WHATWG Working Mode: Changes for more details.)
Preview | Diff