From 558653d1797031739db2bb479370b25e50f6a1c2 Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Sun, 16 Jan 2022 00:32:33 +0100 Subject: [PATCH 1/4] Test whether pipe stops pulling on abort --- streams/piping/abort.any.js | 57 +++++++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/streams/piping/abort.any.js b/streams/piping/abort.any.js index bb62dc8c41448e..165f5b85391b6b 100644 --- a/streams/piping/abort.any.js +++ b/streams/piping/abort.any.js @@ -385,3 +385,60 @@ promise_test(t => { assert_array_equals(rs.events, ['pull'], 'cancel should not have been called'); }); }, 'abort should do nothing after the writable is errored'); + +promise_test(async t => { + let resolvePullCalled; + const pullCalledPromise = new Promise(resolve => { + resolvePullCalled = resolve; + }); + const rs = recordingReadableStream({ + pull: t.step_func(() => { + resolvePullCalled(); + }) + }, { highWaterMark: 0 }); + + let resolveWriteCalled; + const writeCalledPromise = new Promise(resolve => { + resolveWriteCalled = resolve; + }); + let resolveWrite; + const ws = recordingWritableStream({ + write: t.step_func(() => { + resolveWriteCalled(); + return new Promise(resolve => { + resolveWrite = resolve; + }); + }), + }, { highWaterMark: Infinity }); + + rs.controller.enqueue('a'); + assert_array_equals(rs.events, [], 'pull() has not yet been called'); + + const abortController = new AbortController(); + const signal = abortController.signal; + const pipeToPromise = rs.pipeTo(ws, { signal, preventCancel: true, preventAbort: true }); + + // The pipe must start writing the first chunk. + await writeCalledPromise; + assert_array_equals(ws.events, ['write', 'a'], 'write() must have been called once'); + // The pipe must immediately try to read the next chunk, since the destination desired more chunks. + await pullCalledPromise; + assert_array_equals(rs.events, ['pull'], 'pull() must have been called once'); + + // Abort the pipe. + // Any chunks enqueued after aborting must not be piped to the destination. + abortController.abort(error1); + rs.controller.enqueue('b'); + await flushAsyncEvents(); + + // Finish the current write, allowing the pipe to complete. + resolveWrite(); + + await promise_rejects_exactly(t, error1, pipeToPromise, 'pipeTo() should reject with abort reason'); + assert_array_equals(rs.events, ['pull'], 'pull() must have been called once'); + assert_array_equals(ws.events, ['write', 'a'], 'write() must have been called once'); + + const reader = rs.getReader(); + const result = await reader.read(); + assert_object_equals(result, { value: 'b', done: false }, 'first read after pipeTo() should be correct'); +}, 'abort should stop pulling from source; preventCancel = true, preventAbort = true'); From 2aeaaf146d07af07ee18019e216d5dbf9e8daeab Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Sun, 16 Jan 2022 00:38:29 +0100 Subject: [PATCH 2/4] Add similar test with preventAbort = false --- streams/piping/abort.any.js | 57 +++++++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/streams/piping/abort.any.js b/streams/piping/abort.any.js index 165f5b85391b6b..f66510bfb6bca0 100644 --- a/streams/piping/abort.any.js +++ b/streams/piping/abort.any.js @@ -386,6 +386,63 @@ promise_test(t => { }); }, 'abort should do nothing after the writable is errored'); +promise_test(async t => { + let resolvePullCalled; + const pullCalledPromise = new Promise(resolve => { + resolvePullCalled = resolve; + }); + const rs = recordingReadableStream({ + pull: t.step_func(() => { + resolvePullCalled(); + }) + }, { highWaterMark: 0 }); + + let resolveWriteCalled; + const writeCalledPromise = new Promise(resolve => { + resolveWriteCalled = resolve; + }); + let resolveWrite; + const ws = recordingWritableStream({ + write: t.step_func(() => { + resolveWriteCalled(); + return new Promise(resolve => { + resolveWrite = resolve; + }); + }), + }, { highWaterMark: Infinity }); + + rs.controller.enqueue('a'); + assert_array_equals(rs.events, [], 'pull() has not yet been called'); + + const abortController = new AbortController(); + const signal = abortController.signal; + const pipeToPromise = rs.pipeTo(ws, { signal, preventCancel: true }); + + // The pipe must start writing the first chunk. + await writeCalledPromise; + assert_array_equals(ws.events, ['write', 'a'], 'write() must have been called once'); + // The pipe must immediately try to read the next chunk, since the destination desired more chunks. + await pullCalledPromise; + assert_array_equals(rs.events, ['pull'], 'pull() must have been called once'); + + // Abort the pipe. + // Any chunks enqueued after aborting must not be piped to the destination. + abortController.abort(error1); + rs.controller.enqueue('b'); + await flushAsyncEvents(); + + // Finish the current write, allowing the pipe to complete. + resolveWrite(); + + await promise_rejects_exactly(t, error1, pipeToPromise, 'pipeTo() should reject with abort reason'); + assert_array_equals(rs.events, ['pull'], 'pull() must have been called once'); + assert_array_equals(ws.events, ['write', 'a', 'abort', error1], 'write() and abort() must have been called'); + + const reader = rs.getReader(); + const result = await reader.read(); + assert_object_equals(result, { value: 'b', done: false }, 'first read after pipeTo() should be correct'); +}, 'abort should stop pulling from source; preventCancel = true'); + promise_test(async t => { let resolvePullCalled; const pullCalledPromise = new Promise(resolve => { From 9c259c33236adb4464bbe57383064e8fb22a0a6d Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Sun, 16 Jan 2022 01:21:15 +0100 Subject: [PATCH 3/4] Prevent test from getting stuck --- streams/piping/abort.any.js | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/streams/piping/abort.any.js b/streams/piping/abort.any.js index f66510bfb6bca0..f67f915eb9ae47 100644 --- a/streams/piping/abort.any.js +++ b/streams/piping/abort.any.js @@ -402,12 +402,13 @@ promise_test(async t => { resolveWriteCalled = resolve; }); let resolveWrite; + const writePromise = new Promise(resolve => { + resolveWrite = resolve; + }); const ws = recordingWritableStream({ write: t.step_func(() => { resolveWriteCalled(); - return new Promise(resolve => { - resolveWrite = resolve; - }); + return writePromise; }), }, { highWaterMark: Infinity }); @@ -438,6 +439,7 @@ promise_test(async t => { assert_array_equals(rs.events, ['pull'], 'pull() must have been called once'); assert_array_equals(ws.events, ['write', 'a', 'abort', error1], 'write() and abort() must have been called'); + rs.controller.close(); const reader = rs.getReader(); const result = await reader.read(); assert_object_equals(result, { value: 'b', done: false }, 'first read after pipeTo() should be correct'); @@ -459,12 +461,13 @@ promise_test(async t => { resolveWriteCalled = resolve; }); let resolveWrite; + const writePromise = new Promise(resolve => { + resolveWrite = resolve; + }); const ws = recordingWritableStream({ write: t.step_func(() => { resolveWriteCalled(); - return new Promise(resolve => { - resolveWrite = resolve; - }); + return writePromise; }), }, { highWaterMark: Infinity }); @@ -495,6 +498,7 @@ promise_test(async t => { assert_array_equals(rs.events, ['pull'], 'pull() must have been called once'); assert_array_equals(ws.events, ['write', 'a'], 'write() must have been called once'); + rs.controller.close(); const reader = rs.getReader(); const result = await reader.read(); assert_object_equals(result, { value: 'b', done: false }, 'first read after pipeTo() should be correct'); From 1646d657bf258f850afb5f22860ec02dccbc6b7a Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Thu, 20 Jan 2022 23:30:08 +0100 Subject: [PATCH 4/4] Test whether erroring immediately before enqueuing works --- .../piping/error-propagation-backward.any.js | 54 +++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/streams/piping/error-propagation-backward.any.js b/streams/piping/error-propagation-backward.any.js index 6dc203066e3d7e..962c03181d72bc 100644 --- a/streams/piping/error-propagation-backward.any.js +++ b/streams/piping/error-propagation-backward.any.js @@ -628,3 +628,57 @@ promise_test(t => { }); }, 'Errors must be propagated backward: erroring via the controller errors once pending write completes'); + +promise_test(async t => { + let resolvePullCalled; + const pullCalledPromise = new Promise(resolve => { + resolvePullCalled = resolve; + }); + const rs = recordingReadableStream({ + pull: t.step_func(() => { + resolvePullCalled(); + }) + }, { highWaterMark: 0 }); + + let resolveWriteCalled; + const writeCalledPromise = new Promise(resolve => { + resolveWriteCalled = resolve; + }); + let resolveWrite; + const writePromise = new Promise(resolve => { + resolveWrite = resolve; + }); + const ws = recordingWritableStream({ + write: t.step_func(() => { + resolveWriteCalled(); + return writePromise; + }), + }, { highWaterMark: Infinity }); + + rs.controller.enqueue('a'); + assert_array_equals(rs.events, [], 'pull() has not yet been called'); + + const pipePromise = rs.pipeTo(ws, { preventCancel: true }); + + // The pipe must start writing the first chunk. + await writeCalledPromise; + assert_array_equals(ws.events, ['write', 'a'], 'write() must have been called once'); + // The pipe must immediately try to read the next chunk, since the destination desired more chunks. + await pullCalledPromise; + assert_array_equals(rs.events, ['pull'], 'pull() must have been called once'); + + // Error the destination. + // Any chunks enqueued after erroring must not be piped to the destination. + ws.controller.error(error1); + rs.controller.enqueue('b'); + resolveWrite(); + + await promise_rejects_exactly(t, error1, pipePromise, 'pipeTo() should reject with writable error'); + assert_array_equals(rs.events, ['pull'], 'pull() must have been called once'); + assert_array_equals(ws.events, ['write', 'a'], 'write() must have been called once'); + + rs.controller.close(); + const reader = rs.getReader(); + const result = await reader.read(); + assert_object_equals(result, { value: 'b', done: false }, 'first read after pipeTo() should be correct'); +}, 'Errors must be propagated backward: becomes errored immediately before source receives a chunk; preventCancel = true');