diff --git a/ion/src/format/typedarray.rs b/ion/src/format/typedarray.rs index fd641ee0..dacff6a1 100644 --- a/ion/src/format/typedarray.rs +++ b/ion/src/format/typedarray.rs @@ -124,6 +124,10 @@ where ") [".color(colour) )?; + if elements.is_empty() { + return "]".color(colour).fmt(f); + } + let indent = indent_str((self.cfg.indentation + self.cfg.depth + 1) as usize); f.write_str(NEWLINE)?; indent.fmt(f)?; @@ -142,7 +146,7 @@ where } f.write_str(NEWLINE)?; - "}".color(colour).fmt(f)?; + "]".color(colour).fmt(f)?; Ok(()) } diff --git a/runtime/src/globals/streams/readable/controller.rs b/runtime/src/globals/streams/readable/controller.rs index ceecc6b3..b49ae295 100644 --- a/runtime/src/globals/streams/readable/controller.rs +++ b/runtime/src/globals/streams/readable/controller.rs @@ -40,7 +40,7 @@ impl PullIntoDescriptor { ArrayBuffer::from(unsafe { Local::from_heap(&self.buffer) }).unwrap() } - pub(crate) fn construct<'cx>(&self, cx: &'cx Context) -> ResultExc> { + pub(crate) fn construct<'cx>(&self, cx: &'cx Context) -> Result> { let view = unsafe { (self.constructor)( cx.as_ptr(), @@ -52,10 +52,10 @@ impl PullIntoDescriptor { if !view.is_null() { Ok(ArrayBufferView::from(cx.root(view)).unwrap()) - } else if let Some(exception) = Exception::new(cx)? { + } else if let Some(Exception::Error(exception)) = Exception::new(cx)? { Err(exception) } else { - Err(Error::new("Failed to Initialise Array Buffer", ErrorKind::Normal).into()) + Err(Error::new("Failed to Initialise Array Buffer", None)) } } @@ -94,7 +94,7 @@ pub enum Controller<'c> { ByteStream(&'c mut ByteStreamController), } -impl Controller<'_> { +impl<'c> Controller<'c> { pub(crate) fn common_mut(&mut self) -> &mut CommonController { match self { Controller::Default(controller) => &mut controller.common, @@ -102,6 +102,20 @@ impl Controller<'_> { } } + pub(crate) fn into_default(self) -> Option<&'c mut DefaultController> { + match self { + Controller::Default(controller) => Some(controller), + Controller::ByteStream(_) => None, + } + } + + pub(crate) fn into_byte_stream(self) -> Option<&'c mut ByteStreamController> { + match self { + Controller::ByteStream(controller) => Some(controller), + Controller::Default(_) => None, + } + } + pub fn cancel<'cx: 'v, 'v>(&mut self, cx: &'cx Context, reason: Option>) -> ResultExc> { match self { Controller::Default(controller) => controller.reset_queue(cx), @@ -396,10 +410,6 @@ pub(crate) trait ControllerInternals: ClassDefinition { Ok(()) } } - - fn error(&mut self, cx: &Context, error: Option) -> Result<()> { - self.error_internal(cx, &error.unwrap_or_else(Value::undefined_handle)) - } } #[js_class] @@ -438,7 +448,7 @@ impl DefaultController { self.common.desired_size(cx) } - pub fn close(&mut self, cx: &Context) -> Result<()> { + pub fn close(&mut self, cx: &Context) -> ResultExc<()> { let stream = self.common.stream(cx)?; if self.common.can_close_or_enqueue(stream) { if self.queue.is_empty() { @@ -449,7 +459,7 @@ impl DefaultController { stream.close(cx) } else { - Err(Error::new("Cannot Close Stream", ErrorKind::Type)) + Err(Error::new("Cannot Close Stream", ErrorKind::Type).into()) } } @@ -503,7 +513,7 @@ impl DefaultController { } pub fn error(&mut self, cx: &Context, error: Option) -> Result<()> { - ControllerInternals::error(self, cx, error) + self.error_internal(cx, &error.unwrap_or_else(Value::undefined_handle)) } } @@ -528,7 +538,7 @@ impl ControllerInternals for DefaultController { pub struct ByteStreamController { pub(crate) common: CommonController, pub(crate) auto_allocate_chunk_size: usize, - byob_request: Option>>, + pub(crate) byob_request: Option>>, pub(crate) pending_descriptors: VecDeque, pub(crate) queue: VecDeque<(Box>, usize, usize)>, } @@ -563,7 +573,7 @@ impl ByteStreamController { pub(crate) fn fill_pull_into_descriptor( &mut self, cx: &Context, descriptor: &mut PullIntoDescriptor, - ) -> ResultExc { + ) -> Result { let aligned = descriptor.filled - descriptor.filled % descriptor.element; let max_copy = self.common.queue_size.min(descriptor.length - descriptor.filled); let max_aligned = descriptor.filled + max_copy - (descriptor.filled + max_copy) % descriptor.element; @@ -585,8 +595,13 @@ impl ByteStreamController { len = *length; let chunk = ArrayBuffer::from(unsafe { Local::from_heap(chunk) }).unwrap(); let buffer = descriptor.buffer(); - if !buffer.copy_data_to(cx, &chunk, copy, descriptor.offset + descriptor.filled, *offset) { - return Err(Exception::new(cx)?.unwrap()); + if !chunk.copy_data_to(cx, &buffer, *offset, descriptor.offset + descriptor.filled, copy) { + let error = if let Some(Exception::Error(error)) = Exception::new(cx)? { + error + } else { + Error::new("Failed to copy data to descriptor buffer.", None) + }; + return Err(error); } } @@ -620,13 +635,17 @@ impl ByteStreamController { pub(crate) fn enqueue_cloned_chunk( &mut self, cx: &Context, buffer: &ArrayBuffer, offset: usize, length: usize, - ) -> ResultExc<()> { + ) -> Result<()> { let buffer = match buffer.clone(cx, offset, length) { Some(buffer) => buffer, None => { - let exception = Exception::new(cx)?.unwrap(); - self.error_internal(cx, &exception.as_value(cx))?; - return Err(exception); + let error = if let Some(Exception::Error(error)) = Exception::new(cx)? { + error + } else { + Error::new("Failed to clone ArrayBuffer", None) + }; + self.error_internal(cx, &error.as_value(cx))?; + return Err(error); } }; @@ -656,6 +675,139 @@ impl ByteStreamController { Ok(()) } + pub(crate) fn respond(&mut self, cx: &Context, written: usize) -> ResultExc<()> { + let descriptor = self.pending_descriptors.front().unwrap(); + let stream = self.common.stream(cx)?; + match stream.state { + State::Readable => { + if written == 0 { + return Err(Error::new("Readable Stream must be written to.", ErrorKind::Type).into()); + } + if descriptor.filled + written > descriptor.length { + return Err( + Error::new("Buffer of BYOB Request View has been overwritten.", ErrorKind::Range).into(), + ); + } + } + State::Closed => { + if written != 0 { + return Err(Error::new("Closed Stream must not be written to.", ErrorKind::Type).into()); + } + } + State::Errored => return Err(Error::new("Errored Stream cannot have BYOB Request", ErrorKind::Type).into()), + } + + let (buffer, kind) = { + let descriptor = self.pending_descriptors.get_mut(0).unwrap(); + let buffer = descriptor.buffer().transfer(cx)?; + descriptor.buffer.set(buffer.get()); + (buffer, descriptor.kind) + }; + + self.invalidate_byob_request(cx)?; + + match stream.state { + State::Readable => { + let mut descriptor = self.pending_descriptors.pop_front().unwrap(); + descriptor.filled += written; + + match kind { + ReaderKind::None => { + if descriptor.filled > 0 { + self.enqueue_cloned_chunk(cx, &buffer, descriptor.offset, descriptor.length)?; + } + + if let Some(Reader::Byob(reader)) = stream.native_reader(cx)? { + self.process_descriptors(cx, reader, stream.state)?; + } + } + _ => { + if descriptor.filled < descriptor.element { + return Ok(()); + } + + let remainder = descriptor.filled % descriptor.element; + + if remainder > 0 { + self.enqueue_cloned_chunk( + cx, + &buffer, + descriptor.offset + descriptor.filled - remainder, + remainder, + )?; + + descriptor.filled -= remainder; + } + + if let Some(Reader::Byob(reader)) = stream.native_reader(cx)? { + descriptor.commit(cx, reader, stream.state)?; + self.process_descriptors(cx, reader, stream.state)?; + } + } + } + } + State::Closed => match stream.native_reader(cx)? { + Some(Reader::Byob(reader)) => { + while !reader.common.requests.is_empty() { + let mut descriptor = self.pending_descriptors.pop_front().unwrap(); + descriptor.commit(cx, reader, State::Closed)?; + } + } + _ => { + self.pending_descriptors.pop_front(); + } + }, + State::Errored => unreachable!(), + } + + self.pull_if_needed(cx) + } + + pub(crate) fn respond_with_new_view(&mut self, cx: &Context, view: ArrayBufferView) -> ResultExc<()> { + let buffer = view.buffer(cx); + + if buffer.is_detached() { + return Err(Error::new("View Buffer cannot be detached.", ErrorKind::Type).into()); + } + + let stream = self.common.stream(cx)?; + match stream.state { + State::Readable => { + if view.is_empty() { + return Err(Error::new("View must have a non-zero length", ErrorKind::Type).into()); + } + } + State::Closed => { + if !view.is_empty() { + return Err(Error::new( + "View for a Closed Readable Stream must have a zero length", + ErrorKind::Type, + ) + .into()); + } + } + State::Errored => unreachable!(), + } + + let offset = view.offset(); + let descriptor = self.pending_descriptors.get_mut(0).unwrap(); + + if descriptor.offset + descriptor.filled != offset { + return Err(Error::new("View Offset must be the same as descriptor.", ErrorKind::Range).into()); + } + if descriptor.length != view.len() { + return Err(Error::new("View Length must be the same as descriptor.", ErrorKind::Range).into()); + } + if descriptor.filled + view.len() > descriptor.length { + return Err(Error::new("View cannot overfill descriptor", ErrorKind::Range).into()); + } + + let len = view.len(); + let buffer = buffer.transfer(cx)?; + descriptor.buffer.set(buffer.get()); + self.respond(cx, len) + } + #[ion(get)] pub fn get_desired_size(&self, cx: &Context) -> Result { self.common.desired_size(cx) @@ -688,7 +840,7 @@ impl ByteStreamController { } } - pub fn close(&mut self, cx: &Context) -> Result<()> { + pub fn close(&mut self, cx: &Context) -> ResultExc<()> { let stream = self.common.stream(cx)?; if self.common.can_close_or_enqueue(stream) { if self.common.queue_size > 0 { @@ -699,14 +851,14 @@ impl ByteStreamController { if descriptor.filled % descriptor.element > 0 { let error = Error::new("Pending Pull-Into Not Empty", ErrorKind::Type); self.error_internal(cx, &error.as_value(cx))?; - return Err(error); + return Err(error.into()); } } self.common.source.clear_algorithms(); stream.close(cx) } else { - Err(Error::new("Cannot Close Byte Stream Controller", ErrorKind::Type)) + Err(Error::new("Cannot Close Byte Stream Controller", ErrorKind::Type).into()) } } @@ -722,15 +874,20 @@ impl ByteStreamController { let stream = self.common.stream(cx)?; if self.common.can_close_or_enqueue(stream) { - let buffer = buffer.transfer(cx)?; let offset = chunk.offset(); + let length = chunk.len(); + let buffer = buffer.transfer(cx)?; let mut shift = false; - if let Some(descriptor) = self.pending_descriptors.front() { + if !self.pending_descriptors.is_empty() { + self.invalidate_byob_request(cx)?; + let descriptor = self.pending_descriptors.front().unwrap(); let buffer = descriptor.buffer().transfer(cx)?; descriptor.buffer.set(buffer.get()); - if descriptor.kind == ReaderKind::None && descriptor.filled > 0 { - self.enqueue_cloned_chunk(cx, &buffer, descriptor.offset, descriptor.length)?; + if descriptor.kind == ReaderKind::None { + if descriptor.filled > 0 { + self.enqueue_cloned_chunk(cx, &buffer, descriptor.offset, descriptor.length)?; + } shift = true; } } @@ -749,7 +906,7 @@ impl ByteStreamController { self.pending_descriptors.pop_front(); let array = - Uint8Array::with_array_buffer(cx, &buffer, offset, chunk.len()).unwrap().as_value(cx); + Uint8Array::with_array_buffer(cx, &buffer, offset, length).unwrap().as_value(cx); (request.chunk)(cx, &promise, &array); complete = true; @@ -772,19 +929,19 @@ impl ByteStreamController { } if !complete { - self.queue.push_back((Heap::boxed(buffer.get()), offset, chunk.len())); - self.common.queue_size += chunk.len(); + self.queue.push_back((Heap::boxed(buffer.get()), offset, length)); + self.common.queue_size += length; } } Some(Reader::Byob(reader)) => { - self.queue.push_back((Heap::boxed(buffer.get()), offset, chunk.len())); - self.common.queue_size += chunk.len(); + self.queue.push_back((Heap::boxed(buffer.get()), offset, length)); + self.common.queue_size += length; self.process_descriptors(cx, reader, stream.state)?; } None => { - self.queue.push_back((Heap::boxed(buffer.get()), offset, chunk.len())); - self.common.queue_size += chunk.len(); + self.queue.push_back((Heap::boxed(buffer.get()), offset, length)); + self.common.queue_size += length; } } self.pull_if_needed(cx) @@ -794,7 +951,7 @@ impl ByteStreamController { } pub fn error(&mut self, cx: &Context, error: Option) -> Result<()> { - ControllerInternals::error(self, cx, error) + self.error_internal(cx, &error.unwrap_or_else(Value::undefined_handle)) } } @@ -829,104 +986,13 @@ impl ByobRequest { )) } - pub(crate) fn respond_internal(&mut self, cx: &Context, written: usize) -> ResultExc<()> { - let controller = self.controller.take().unwrap(); - let controller = Object::from(unsafe { Local::from_heap(&controller) }); - let controller = ByteStreamController::get_mut_private(cx, &controller)?; - let descriptor = controller.pending_descriptors.front().unwrap(); - let stream = controller.common.stream(cx)?; - match stream.state { - State::Readable => { - if written == 0 { - return Err(Error::new("Readable Stream must be written to.", ErrorKind::Type).into()); - } - if descriptor.filled + written > descriptor.length { - return Err( - Error::new("Buffer of BYOB Request View has been overwritten.", ErrorKind::Range).into(), - ); - } - } - State::Closed => { - if written != 0 { - return Err(Error::new("Closed Stream must not be written to.", ErrorKind::Type).into()); - } - } - State::Errored => return Err(Error::new("Errored Stream cannot have BYOB Request", ErrorKind::Type).into()), - } - - let (buffer, kind) = { - let descriptor = controller.pending_descriptors.get_mut(0).unwrap(); - let buffer = descriptor.buffer().transfer(cx)?; - descriptor.buffer.set(buffer.get()); - (buffer, descriptor.kind) - }; - - controller.invalidate_byob_request(cx)?; - - match stream.state { - State::Readable => { - let mut descriptor = controller.pending_descriptors.pop_front().unwrap(); - descriptor.filled += written; - - match kind { - ReaderKind::None => { - if descriptor.filled > 0 { - controller.enqueue_cloned_chunk(cx, &buffer, descriptor.offset, descriptor.length)?; - } - - if let Some(Reader::Byob(reader)) = stream.native_reader(cx)? { - controller.process_descriptors(cx, reader, stream.state)?; - } - } - _ => { - if descriptor.filled < descriptor.element { - return Ok(()); - } - - let remainder = descriptor.filled % descriptor.element; - - if remainder > 0 { - controller.enqueue_cloned_chunk( - cx, - &buffer, - descriptor.offset + descriptor.filled - remainder, - remainder, - )?; - - descriptor.filled -= remainder; - } - - if let Some(Reader::Byob(reader)) = stream.native_reader(cx)? { - descriptor.commit(cx, reader, stream.state)?; - controller.process_descriptors(cx, reader, stream.state)?; - } - } - } - } - State::Closed => match stream.native_reader(cx)? { - Some(Reader::Byob(reader)) => { - while !reader.common.requests.is_empty() { - let mut descriptor = controller.pending_descriptors.pop_front().unwrap(); - descriptor.commit(cx, reader, State::Closed)?; - } - } - _ => { - controller.pending_descriptors.pop_front(); - } - }, - State::Errored => unreachable!(), - } - - controller.pull_if_needed(cx) - } - #[ion(get)] pub fn get_view(&self) -> *mut JSObject { self.view.as_ref().map(|view| view.get()).unwrap_or_else(ptr::null_mut) } pub fn respond(&mut self, cx: &Context, #[ion(convert = ConversionBehavior::Clamp)] written: u64) -> ResultExc<()> { - if self.controller.is_some() { + if let Some(controller) = self.controller.take() { let view = unsafe { Local::from_heap(self.view.as_ref().unwrap()) }; let view = ArrayBufferView::from(view).unwrap(); let buffer = view.buffer(cx); @@ -939,7 +1005,9 @@ impl ByobRequest { return Err(Error::new("View Buffer cannot be detached.", ErrorKind::Type).into()); } - self.respond_internal(cx, written as usize) + let controller = Object::from(unsafe { Local::from_heap(&controller) }); + let controller = ByteStreamController::get_mut_private(cx, &controller)?; + controller.respond(cx, written as usize) } else { Err(Error::new("BYOB Request has already been invalidated.", ErrorKind::Type).into()) } @@ -947,54 +1015,12 @@ impl ByobRequest { #[ion(name = "respondWithNewView")] pub fn respond_with_new_view(&mut self, cx: &Context, view: ArrayBufferView) -> ResultExc<()> { - let len = if let Some(controller) = &self.controller { - let controller = Object::from(unsafe { Local::from_heap(controller) }); + if let Some(controller) = self.controller.take() { + let controller = Object::from(unsafe { Local::from_heap(&controller) }); let controller = ByteStreamController::get_mut_private(cx, &controller)?; - let buffer = view.buffer(cx); - - if buffer.is_detached() { - return Err(Error::new("View Buffer cannot be detached.", ErrorKind::Type).into()); - } - - let stream = controller.common.stream(cx)?; - match stream.state { - State::Readable => { - if view.is_empty() { - return Err(Error::new("View must have a non-zero length", ErrorKind::Type).into()); - } - } - State::Closed => { - if !view.is_empty() { - return Err(Error::new( - "View for a Closed Readable Stream must have a zero length", - ErrorKind::Type, - ) - .into()); - } - } - State::Errored => unreachable!(), - } - - let offset = view.offset(); - let descriptor = controller.pending_descriptors.get_mut(0).unwrap(); - - if descriptor.offset + descriptor.filled != offset { - return Err(Error::new("View Offset must be the same as descriptor.", ErrorKind::Range).into()); - } - if descriptor.length != view.len() { - return Err(Error::new("View Length must be the same as descriptor.", ErrorKind::Range).into()); - } - if descriptor.filled + view.len() > descriptor.length { - return Err(Error::new("View cannot overfill descriptor", ErrorKind::Range).into()); - } - - let len = view.len(); - let buffer = buffer.transfer(cx)?; - descriptor.buffer.set(buffer.get()); - len + controller.respond_with_new_view(cx, view) } else { - return Err(Error::new("BYOB Request has already been invalidated.", ErrorKind::Type).into()); - }; - self.respond_internal(cx, len) + Err(Error::new("BYOB Request has already been invalidated.", ErrorKind::Type).into()) + } } } diff --git a/runtime/src/globals/streams/readable/mod.rs b/runtime/src/globals/streams/readable/mod.rs index dde5b4f6..9eb157d9 100644 --- a/runtime/src/globals/streams/readable/mod.rs +++ b/runtime/src/globals/streams/readable/mod.rs @@ -18,8 +18,8 @@ use ion::conversions::{ConversionBehavior, FromValue, ToValue}; use ion::function::Opt; pub use reader::{ByobReader, DefaultReader}; use reader::{Reader, ReaderKind}; +use source::{forward_reader_error, TeeBytesState, TeeDefaultState}; pub use source::StreamSource; -use source::TeeState; mod controller; mod reader; @@ -249,9 +249,9 @@ impl ReadableStream { pub(crate) fn tee_internal<'cx>(&mut self, cx: &'cx Context, clone_branch_2: bool) -> [Object<'cx>; 2] { match self.controller_kind { ControllerKind::Default => { - fn create_branch(cx: &Context, state: Rc, second: bool) -> Object { + fn create_branch(cx: &Context, state: Rc, second: bool) -> Object { let branch = Object::from(cx.root(ReadableStream::new_raw_object(cx))); - let source = StreamSource::Tee(state, second); + let source = StreamSource::TeeDefault(state, second); let controller = DefaultController { common: CommonController::new(&branch, source, 1.0), size: None, @@ -271,22 +271,58 @@ impl ReadableStream { branch } - let state = Rc::new(TeeState::new(cx, self, clone_branch_2)); + let state = Rc::new(TeeDefaultState::new(cx, self, clone_branch_2)); let branch1 = create_branch(cx, Rc::clone(&state), false); let branch2 = create_branch(cx, Rc::clone(&state), true); - state.branch[0].set(branch1.handle().get()); - state.branch[1].set(branch2.handle().get()); + state.common.branch[0].set(branch1.handle().get()); + state.common.branch[1].set(branch2.handle().get()); + + [branch1, branch2] + } + ControllerKind::ByteStream => { + fn create_branch(cx: &Context, state: Rc, second: bool) -> Object { + let branch = Object::from(cx.root(ReadableStream::new_raw_object(cx))); + let source = StreamSource::TeeBytes(state, second); + let controller = ByteStreamController { + common: CommonController::new(&branch, source, 1.0), + auto_allocate_chunk_size: 0, + byob_request: None, + pending_descriptors: VecDeque::default(), + queue: VecDeque::default(), + }; + let controller = Heap::boxed(ByteStreamController::new_object(cx, Box::new(controller))); + + unsafe { + let controller = Object::from(Local::from_heap(&controller)); + ByteStreamController::get_mut_private_unchecked(&controller).start(cx, None).unwrap(); + } + + let stream = ReadableStream::new(ControllerKind::ByteStream, controller); + unsafe { + ReadableStream::set_private(branch.handle().get(), Box::new(stream)); + } + branch + } + + let state = Rc::new(TeeBytesState::new(cx, self)); + let branch1 = create_branch(cx, Rc::clone(&state), false); + let branch2 = create_branch(cx, Rc::clone(&state), true); + + state.common.branch[0].set(branch1.handle().get()); + state.common.branch[1].set(branch2.handle().get()); + + let reader = self.native_reader(cx).unwrap().unwrap().into_default().unwrap(); + forward_reader_error(cx, &reader.common.closed(), Rc::clone(&state)).unwrap(); [branch1, branch2] } - ControllerKind::ByteStream => todo!(), } } - pub(crate) fn close(&mut self, cx: &Context) -> Result<()> { + pub(crate) fn close(&mut self, cx: &Context) -> ResultExc<()> { if self.state != State::Readable { - return Err(Error::new("Cannot Close Stream", None)); + return Err(Error::new("Cannot Close Stream", None).into()); } self.state = State::Closed; diff --git a/runtime/src/globals/streams/readable/reader.rs b/runtime/src/globals/streams/readable/reader.rs index aa9e928b..6075639d 100644 --- a/runtime/src/globals/streams/readable/reader.rs +++ b/runtime/src/globals/streams/readable/reader.rs @@ -15,10 +15,10 @@ use ion::function::Opt; use ion::typedarray::{ArrayBufferView, type_to_constructor, type_to_element_size}; use crate::globals::streams::readable::{ReadableStream, State}; -use crate::globals::streams::readable::controller::{Controller, ControllerInternals, ControllerKind, PullIntoDescriptor}; +use crate::globals::streams::readable::controller::{ControllerInternals, ControllerKind, PullIntoDescriptor}; pub type ChunkErrorClosure = dyn Fn(&Context, &Promise, &Value); -pub type CloseClosure = dyn Fn(&Context, &Promise, Option<&Value>) -> Result<()>; +pub type CloseClosure = dyn Fn(&Context, &Promise, Option<&Value>) -> ResultExc<()>; #[derive(Traceable)] pub struct Request { @@ -86,13 +86,27 @@ pub enum Reader<'r> { } impl<'r> Reader<'r> { - pub fn common(&self) -> &CommonReader { + pub(crate) fn common(&self) -> &CommonReader { match self { Reader::Default(reader) => &reader.common, Reader::Byob(reader) => &reader.common, } } + pub(crate) fn into_default(self) -> Option<&'r mut DefaultReader> { + match self { + Reader::Default(reader) => Some(reader), + Reader::Byob(_) => None, + } + } + + pub(crate) fn into_byob(self) -> Option<&'r mut ByobReader> { + match self { + Reader::Byob(reader) => Some(reader), + Reader::Default(_) => None, + } + } + pub fn requests_closed(self) -> (&'r mut VecDeque, Promise<'r>) { let common = match self { Reader::Default(reader) => &mut reader.common, @@ -236,15 +250,6 @@ impl DefaultReader { }) } - pub fn cancel<'cx>(&self, cx: &'cx Context, reason: Opt) -> ResultExc> { - self.common.cancel(cx, reason) - } - - pub fn read<'cx>(&mut self, cx: &'cx Context) -> ResultExc> { - let promise = Promise::new(cx); - self.read_internal(cx, Request::standard(promise.get())) - } - pub(crate) fn read_internal<'cx>(&mut self, cx: &'cx Context, request: Request) -> ResultExc> { let promise = Promise::from(cx.root(request.promise.get())).unwrap(); if let Some(stream) = self.common.stream(cx)? { @@ -261,6 +266,15 @@ impl DefaultReader { Ok(promise) } + pub fn cancel<'cx>(&self, cx: &'cx Context, reason: Opt) -> ResultExc> { + self.common.cancel(cx, reason) + } + + pub fn read<'cx>(&mut self, cx: &'cx Context) -> ResultExc> { + let promise = Promise::new(cx); + self.read_internal(cx, Request::standard(promise.get())) + } + #[ion(name = "releaseLock")] pub fn release_lock(&mut self, cx: &Context) -> Result<()> { self.common.release_lock(cx) @@ -311,6 +325,88 @@ impl ByobReader { }) } + pub(crate) fn read_internal<'cx>( + &mut self, cx: &'cx Context, view: ArrayBufferView, request: Request, + ) -> ResultExc> { + let stream = self.common.stream(cx)?.unwrap(); + let promise = Promise::from(cx.root(request.promise.get())).unwrap(); + + stream.disturbed = true; + if stream.state == State::Errored { + (request.error)(cx, &promise, &stream.stored_error()); + return Ok(promise); + } + + let (constructor, element_size) = { + let ty = view.view_type(); + (type_to_constructor(ty), type_to_element_size(ty)) + }; + + let offset = view.offset(); + let length = view.len(); + let buffer = view.buffer(cx); + match buffer.transfer(cx) { + Ok(buffer) => { + let mut descriptor = PullIntoDescriptor { + buffer: Heap::boxed(buffer.get()), + offset, + length: length * element_size, + filled: 0, + element: element_size, + constructor, + kind: ReaderKind::Byob, + }; + + let controller = stream.native_controller(cx)?.into_byte_stream().unwrap(); + if !controller.pending_descriptors.is_empty() { + controller.pending_descriptors.push_back(descriptor); + + if stream.state == State::Readable { + self.common.requests.push_back(request); + } + return Ok(promise); + } else if stream.state == State::Closed { + let empty = descriptor.construct(cx)?.as_value(cx); + (request.close)(cx, &promise, Some(&empty))?; + return Ok(promise); + } else if controller.common.queue_size > 0 { + if controller.fill_pull_into_descriptor(cx, &mut descriptor)? { + let buffer = buffer.transfer(cx)?; + descriptor.buffer.set(buffer.get()); + let view = descriptor.construct(cx)?.as_value(cx); + + if controller.common.queue_size == 0 && controller.common.close_requested { + controller.close(cx)?; + } else { + controller.pull_if_needed(cx)?; + } + + (request.chunk)(cx, &promise, &view); + return Ok(promise); + } else if controller.common.close_requested { + let error = Error::new("Stream closed by request.", ErrorKind::Type); + // TODO: ByteStreamController Error + (request.error)(cx, &promise, &error.as_value(cx)); + return Ok(promise); + } + } + + controller.pending_descriptors.push_back(descriptor); + if stream.state == State::Readable { + self.common.requests.push_back(request); + } + + let stream = self.common.stream(cx)?.unwrap(); + let controller = stream.native_controller(cx)?.into_byte_stream().unwrap(); + controller.pull_if_needed(cx)?; + } + Err(error) => { + (request.error)(cx, &promise, &error.as_value(cx)); + } + } + Ok(promise) + } + pub fn cancel<'cx>(&self, cx: &'cx Context, reason: Opt) -> ResultExc> { self.common.cancel(cx, reason) } @@ -318,8 +414,7 @@ impl ByobReader { pub fn read<'cx>(&mut self, cx: &'cx Context, view: ArrayBufferView) -> ResultExc> { let promise = Promise::new(cx); let request = Request::standard(promise.get()); - - if let Some(stream) = self.common.stream(cx)? { + if self.common.stream.is_some() { if view.is_empty() { return Err(Error::new("View must not be empty.", ErrorKind::Type).into()); } @@ -334,85 +429,15 @@ impl ByobReader { return Err(Error::new("ArrayBuffer must not be detached.", ErrorKind::Type).into()); } - stream.disturbed = true; - if stream.state == State::Errored { - (request.error)(cx, &promise, &stream.stored_error()); - return Ok(promise); - } - - let (constructor, element_size) = { - let ty = view.view_type(); - (type_to_constructor(ty), type_to_element_size(ty)) - }; - - let offset = view.offset(); - let length = view.len(); - match buffer.transfer(cx) { - Ok(buffer) => { - let mut descriptor = PullIntoDescriptor { - buffer: Heap::boxed(buffer.get()), - offset, - length: length * element_size, - filled: 0, - element: element_size, - constructor, - kind: ReaderKind::Byob, - }; - - if let Controller::ByteStream(controller) = stream.native_controller(cx)? { - if !controller.pending_descriptors.is_empty() { - controller.pending_descriptors.push_back(descriptor); - - if stream.state == State::Readable { - self.common.requests.push_back(request); - } - return Ok(promise); - } else if stream.state == State::Closed { - let empty = descriptor.construct(cx)?.as_value(cx); - (request.close)(cx, &promise, Some(&empty))?; - return Ok(promise); - } else if controller.common.queue_size > 0 { - if controller.fill_pull_into_descriptor(cx, &mut descriptor)? { - let buffer = buffer.transfer(cx)?; - descriptor.buffer.set(buffer.get()); - let view = descriptor.construct(cx)?.as_value(cx); - - if controller.common.queue_size == 0 && controller.common.close_requested { - controller.close(cx)?; - } else { - controller.pull_if_needed(cx)?; - } - - (request.chunk)(cx, &promise, &view); - return Ok(promise); - } else if controller.common.close_requested { - let error = Error::new("Stream closed by request.", ErrorKind::Type); - // TODO: ByteStreamController Error - (request.error)(cx, &promise, &error.as_value(cx)); - return Ok(promise); - } - } - - controller.pending_descriptors.push_back(descriptor); - controller.pull_if_needed(cx)?; - - if stream.state == State::Readable { - self.common.requests.push_back(request); - } - } - } - Err(error) => { - (request.error)(cx, &promise, &error.as_value(cx)); - } - } + self.read_internal(cx, view, request) } else { (request.error)( cx, &promise, &Error::new("Reader has already been released.", ErrorKind::Type).as_value(cx), ); + Ok(Promise::new(cx)) } - Ok(promise) } #[ion(name = "releaseLock")] diff --git a/runtime/src/globals/streams/readable/source.rs b/runtime/src/globals/streams/readable/source.rs index 6480d627..58c4b88f 100644 --- a/runtime/src/globals/streams/readable/source.rs +++ b/runtime/src/globals/streams/readable/source.rs @@ -12,15 +12,15 @@ use mozjs::gc::HandleObject; use mozjs::jsapi::{Heap, JSFunction, JSObject}; use mozjs::jsval::{JSVal, UndefinedValue}; -use ion::{ClassDefinition, Context, Function, Local, Object, Promise, ResultExc, TracedHeap, Value}; +use ion::{ClassDefinition, Context, Exception, Function, Local, Object, Promise, Result, ResultExc, TracedHeap, Value}; use ion::class::NativeObject; use ion::conversions::{FromValue, ToValue}; use ion::function::Opt; -use ion::typedarray::ArrayBuffer; +use ion::typedarray::{ArrayBuffer, ArrayBufferView, Uint8Array}; -use crate::globals::streams::readable::controller::Controller; -use crate::globals::streams::readable::ReadableStream; -use crate::globals::streams::readable::reader::{Reader, Request}; +use crate::globals::streams::readable::{ByobRequest, ByteStreamController, ReadableStream, ReaderOptions}; +use crate::globals::streams::readable::controller::ControllerInternals; +use crate::globals::streams::readable::reader::{ReaderKind, Request}; #[derive(Traceable)] pub enum StreamSource { @@ -32,7 +32,8 @@ pub enum StreamSource { }, Bytes(#[trace(no_trace)] Option), BytesBuf(#[trace(no_trace)] Option>), - Tee(Rc, bool), + TeeDefault(Rc, bool), + TeeBytes(Rc, bool), } impl StreamSource { @@ -70,13 +71,13 @@ impl StreamSource { buf.advance(chunk.len()); Ok(Some(Promise::resolved(cx, &buffer.as_value(cx)))) } - StreamSource::Tee(state, second) => { - if state.reading.get() { + StreamSource::TeeDefault(state, second) => { + if state.common.reading.get() { state.read_again.set(true); return Ok(Some(Promise::resolved(cx, &Value::undefined_handle()))); } - state.reading.set(true); + state.common.reading.set(true); let state1 = Rc::clone(state); let state2 = Rc::clone(state); @@ -97,59 +98,276 @@ impl StreamSource { // TODO: CloneForBranch2 - if !state.cancelled[0].get() { - let branch = state.branch(cx, false)?; - if let Controller::Default(controller) = branch.native_controller(cx)? { - controller.enqueue_internal(cx, &chunk)?; - } + if !state.common.cancelled[0].get() { + let branch = state.common.branch(cx, false)?; + let controller = branch.native_controller(cx)?.into_default().unwrap(); + controller.enqueue_internal(cx, &chunk)?; } - if !state.cancelled[1].get() { - let branch = state.branch(cx, true)?; - if let Controller::Default(controller) = branch.native_controller(cx)? { - controller.enqueue_internal(cx, &chunk)?; - } + if !state.common.cancelled[1].get() { + let branch = state.common.branch(cx, true)?; + let controller = branch.native_controller(cx)?.into_default().unwrap(); + controller.enqueue_internal(cx, &chunk)?; } - state.reading.set(false); + state.common.reading.set(false); if state.read_again.get() { - let branch = state.branch(cx, second)?; - if let Controller::Default(controller) = branch.native_controller(cx)? { - controller.common.source.pull(cx, controller.reflector().get())?; - } + let branch = state.common.branch(cx, second)?; + let controller = branch.native_controller(cx)?.into_default().unwrap(); + controller.common.source.pull(cx, controller.reflector().get())?; } Ok(Value::undefined_handle()) }); }), close: Box::new(move |cx, _, _| { - state2.reading.set(false); + state2.common.reading.set(false); - if !state2.cancelled[0].get() { - let branch = state2.branch(cx, false)?; - if let Controller::Default(controller) = branch.native_controller(cx)? { - controller.close(cx)?; - } + if !state2.common.cancelled[0].get() { + let branch = state2.common.branch(cx, false)?; + branch.native_controller(cx)?.into_default().unwrap().close(cx)?; } - if !state2.cancelled[1].get() { - let branch = state2.branch(cx, true)?; - if let Controller::Default(controller) = branch.native_controller(cx)? { - controller.close(cx)?; - } - } - - if !state2.cancelled[0].get() || !state2.cancelled[1].get() { - let promise = Promise::from(unsafe { Local::from_heap(&state2.cancel_promise) }).unwrap(); - promise.resolve(cx, &Value::undefined_handle()); + if !state2.common.cancelled[1].get() { + let branch = state2.common.branch(cx, true)?; + branch.native_controller(cx)?.into_default().unwrap().close(cx)?; } + state2.common.cancel(cx, &Value::undefined_handle()); Ok(()) }), error: Box::new(move |_, _, _| { - state3.reading.set(false); + state3.common.reading.set(false); }), }; - if let Some(Reader::Default(reader)) = state.stream(cx)?.native_reader(cx)? { + let reader = state.common.stream(cx)?.native_reader(cx)?.unwrap().into_default().unwrap(); + reader.read_internal(cx, request)?; + + promise.resolve(cx, &Value::undefined_handle()); + Ok(Some(promise)) + } + StreamSource::TeeBytes(state, second) => { + if state.common.reading.get() { + state.read_again[usize::from(*second)].set(true); + return Ok(Some(Promise::resolved(cx, &Value::undefined_handle()))); + } + + state.common.reading.set(true); + + let stream = state.common.stream(cx)?; + let controller = stream.native_controller(cx)?.into_byte_stream().unwrap(); + let byob_request = controller.get_byob_request(cx); + + let promise = Promise::new(cx); + if byob_request.is_null() { + if stream.reader_kind == ReaderKind::Byob { + { + let reader = stream.native_reader(cx)?.unwrap().into_byob().unwrap(); + assert!(reader.common.requests.is_empty()); + reader.release_lock(cx)?; + } + stream.get_reader(cx, Opt(None))?; + + let reader = stream.native_reader(cx)?.unwrap().into_default().unwrap(); + forward_reader_error(cx, &reader.common.closed(), Rc::clone(state))?; + } + + let state1 = Rc::clone(state); + let state2 = Rc::clone(state); + let state3 = Rc::clone(state); + + let request = Request { + promise: Heap::boxed(promise.get()), + chunk: Box::new(move |cx, _, chunk| { + let promise = Promise::resolved(cx, &Value::undefined_handle()); + let chunk = TracedHeap::new(chunk.get()); + let state = Rc::clone(&state1); + + promise.then(cx, move |cx, _| { + state.read_again[0].set(false); + state.read_again[1].set(false); + + let chunk = Value::from(chunk.to_local()); + let chunk = ArrayBufferView::from_value(cx, &chunk, true, ())?; + let controller1 = + state.common.branch(cx, false)?.native_controller(cx)?.into_byte_stream().unwrap(); + let controller2 = + state.common.branch(cx, true)?.native_controller(cx)?.into_byte_stream().unwrap(); + + match (state.common.cancelled[0].get(), state.common.cancelled[1].get()) { + (false, false) => { + let chunk2 = chunk + .buffer(cx) + .clone(cx, chunk.offset(), chunk.len()) + .and_then(|buffer| { + Uint8Array::with_array_buffer(cx, &buffer, 0, buffer.len()) + }) + .map(|array| ArrayBufferView::from(array.into_local()).unwrap()); + + if let Some(chunk2) = chunk2 { + controller1.enqueue(cx, chunk)?; + controller2.enqueue(cx, chunk2)?; + } else { + let exception = Exception::new(cx).unwrap().as_value(cx); + controller1.error_internal(cx, &exception)?; + controller2.error_internal(cx, &exception)?; + state.common.cancel(cx, &Value::undefined_handle()); + } + + state.common.reading.set(false); + } + (false, true) => controller1.enqueue(cx, chunk)?, + (true, false) => controller2.enqueue(cx, chunk)?, + _ => {} + } + + state.common.reading.set(false); + if state.read_again[0].get() { + controller1.common.source.pull(cx, controller1.reflector().get())?; + } else if state.read_again[0].get() { + controller2.common.source.pull(cx, controller2.reflector().get())?; + } + + Ok(Value::undefined_handle()) + }); + }), + close: Box::new(move |cx, _, _| { + state2.common.reading.set(false); + + let controller1 = + state2.common.branch(cx, false)?.native_controller(cx)?.into_byte_stream().unwrap(); + let controller2 = + state2.common.branch(cx, true)?.native_controller(cx)?.into_byte_stream().unwrap(); + + if !state2.common.cancelled[0].get() { + controller1.close(cx)?; + } + if !state2.common.cancelled[1].get() { + controller2.close(cx)?; + } + + if !controller1.pending_descriptors.is_empty() { + controller1.respond(cx, 0)?; + } + if !controller2.pending_descriptors.is_empty() { + controller2.respond(cx, 0)?; + } + + state2.common.cancel(cx, &Value::undefined_handle()); + Ok(()) + }), + error: Box::new(move |_, _, _| { + state3.common.reading.set(false); + }), + }; + + let reader = state.common.stream(cx)?.native_reader(cx)?.unwrap().into_default().unwrap(); reader.read_internal(cx, request)?; + } else { + if stream.reader_kind == ReaderKind::Default { + { + let reader = stream.native_reader(cx)?.unwrap().into_default().unwrap(); + assert!(reader.common.requests.is_empty()); + reader.release_lock(cx)?; + } + stream.get_reader(cx, Opt(Some(ReaderOptions { mode: Some(String::from("byob")) })))?; + + let reader = stream.native_reader(cx)?.unwrap().into_byob().unwrap(); + forward_reader_error(cx, &reader.common.closed(), Rc::clone(state))?; + } + + let state1 = Rc::clone(state); + let state2 = Rc::clone(state); + let state3 = Rc::clone(state); + let second = *second; + + let request = Request { + promise: Heap::boxed(promise.get()), + chunk: Box::new(move |cx, _, chunk| { + let promise = Promise::resolved(cx, &Value::undefined_handle()); + let chunk = TracedHeap::new(chunk.get()); + let state = Rc::clone(&state1); + + promise.then(cx, move |cx, _| { + state.read_again[0].set(false); + state.read_again[1].set(false); + + let chunk = Value::from(chunk.to_local()); + let chunk = ArrayBufferView::from_value(cx, &chunk, true, ())?; + + let byob_controller = + state.common.branch(cx, second)?.native_controller(cx)?.into_byte_stream().unwrap(); + let other_controller = state + .common + .branch(cx, !second)? + .native_controller(cx)? + .into_byte_stream() + .unwrap(); + + if !state.common.cancelled[usize::from(!second)].get() { + let chunk2 = chunk + .buffer(cx) + .clone(cx, chunk.offset(), chunk.len()) + .and_then(|buffer| Uint8Array::with_array_buffer(cx, &buffer, 0, buffer.len())) + .map(|array| ArrayBufferView::from(array.into_local()).unwrap()); + + if let Some(chunk2) = chunk2 { + other_controller.enqueue(cx, chunk2)?; + } else { + let exception = Exception::new(cx).unwrap().as_value(cx); + byob_controller.error_internal(cx, &exception)?; + other_controller.error_internal(cx, &exception)?; + state.common.cancel(cx, &Value::undefined_handle()); + } + } + if !state.common.cancelled[usize::from(second)].get() { + byob_controller.respond_with_new_view(cx, chunk)?; + } + + state.common.reading.set(false); + if state.read_again[usize::from(second)].get() { + byob_controller.common.source.pull(cx, byob_controller.reflector().get())?; + } else if state.read_again[usize::from(!second)].get() { + other_controller.common.source.pull(cx, other_controller.reflector().get())?; + } + + Ok(Value::undefined_handle()) + }); + }), + close: Box::new(move |cx, _, _| { + state2.common.reading.set(false); + + let controller1 = + state2.common.branch(cx, false)?.native_controller(cx)?.into_byte_stream().unwrap(); + let controller2 = + state2.common.branch(cx, true)?.native_controller(cx)?.into_byte_stream().unwrap(); + + if !state2.common.cancelled[0].get() { + controller1.close(cx)?; + } + if !state2.common.cancelled[1].get() { + controller2.close(cx)?; + } + + if !controller1.pending_descriptors.is_empty() { + controller1.respond(cx, 0)?; + } + if !controller2.pending_descriptors.is_empty() { + controller2.respond(cx, 0)?; + } + + state2.common.cancel(cx, &Value::undefined_handle()); + Ok(()) + }), + error: Box::new(move |_, _, _| { + state3.common.reading.set(false); + }), + }; + + let reader = state.common.stream(cx)?.native_reader(cx)?.unwrap().into_byob().unwrap(); + let byob_request = Object::from(cx.root(byob_request)); + let byob_request = ByobRequest::get_private(cx, &byob_request)?; + + let view = ArrayBufferView::from(cx.root(byob_request.get_view())).unwrap(); + reader.read_internal(cx, view, request)?; } promise.resolve(cx, &Value::undefined_handle()); @@ -174,22 +392,22 @@ impl StreamSource { promise.resolve(cx, &Value::undefined_handle()); } } - StreamSource::Tee(state, second) => { + StreamSource::TeeDefault(state, second) => { let reason = reason.unwrap_or_else(Value::undefined_handle); let branch = usize::from(*second); - state.cancelled[branch].set(true); - state.reason[branch].set(reason.get()); + state.common.cancelled[branch].set(true); + state.common.reason[branch].set(reason.get()); - if state.cancelled[usize::from(!*second)].get() { - let composite = [state.reason[0].get(), state.reason[1].get()].as_value(cx); - let result = state.stream(cx)?.cancel(cx, Opt(Some(composite)))?; + if state.common.cancelled[usize::from(!*second)].get() { + let composite = [state.common.reason[0].get(), state.common.reason[1].get()].as_value(cx); + let result = state.common.stream(cx)?.cancel(cx, Opt(Some(composite)))?; - let cancel_promise = Promise::from(unsafe { Local::from_heap(&state.cancel_promise) }).unwrap(); + let cancel_promise = state.common.cancel_promise(); cancel_promise.resolve(cx, &result.as_value(cx)); } - promise.handle_mut().set(state.cancel_promise.get()); + promise.handle_mut().set(state.common.cancel_promise.get()); } _ => {} } @@ -215,43 +433,94 @@ impl StreamSource { } #[derive(Traceable)] -pub struct TeeState { - pub(crate) reading: Cell, - pub(crate) read_again: Cell, - pub(crate) cancelled: [Cell; 2], +pub(crate) struct TeeCommonState { + stream: Box>, + pub(crate) branch: [Box>; 2], - pub(crate) clone_branch_2: bool, - pub(crate) stream: Box>, + reading: Cell, + cancelled: [Cell; 2], - pub(crate) branch: [Box>; 2], - pub(crate) reason: [Box>; 2], - pub(crate) cancel_promise: Box>, + reason: [Box>; 2], + cancel_promise: Box>, } -impl TeeState { - pub(crate) fn new(cx: &Context, stream: &ReadableStream, clone_branch_2: bool) -> TeeState { +impl TeeCommonState { + pub(crate) fn new(cx: &Context, stream: &ReadableStream) -> TeeCommonState { let promise = Promise::new(cx); - TeeState { + TeeCommonState { + stream: Heap::boxed(stream.reflector.get()), + branch: [Box::default(), Box::default()], + reading: Cell::new(false), - read_again: Cell::new(false), cancelled: [Cell::new(false), Cell::new(false)], - clone_branch_2, - stream: Heap::boxed(stream.reflector.get()), - - branch: [Box::default(), Box::default()], reason: [Heap::boxed(UndefinedValue()), Heap::boxed(UndefinedValue())], cancel_promise: Heap::boxed(promise.get()), } } - pub(crate) fn stream(&self, cx: &Context) -> ion::Result<&mut ReadableStream> { + pub(crate) fn stream(&self, cx: &Context) -> Result<&mut ReadableStream> { let stream = Object::from(unsafe { Local::from_heap(&self.stream) }); ReadableStream::get_mut_private(cx, &stream) } - pub(crate) fn branch(&self, cx: &Context, second: bool) -> ion::Result<&mut ReadableStream> { + pub(crate) fn branch(&self, cx: &Context, second: bool) -> Result<&mut ReadableStream> { let stream = Object::from(unsafe { Local::from_heap(&self.branch[usize::from(second)]) }); ReadableStream::get_mut_private(cx, &stream) } + + pub(crate) fn cancel_promise(&self) -> Promise { + Promise::from(unsafe { Local::from_heap(&self.cancel_promise) }).unwrap() + } + + pub(crate) fn cancel(&self, cx: &Context, value: &Value) { + if !self.cancelled[0].get() || !self.cancelled[1].get() { + self.cancel_promise().resolve(cx, value); + } + } +} + +#[derive(Traceable)] +pub struct TeeDefaultState { + pub(crate) common: TeeCommonState, + clone_branch_2: bool, + read_again: Cell, +} + +impl TeeDefaultState { + pub(crate) fn new(cx: &Context, stream: &ReadableStream, clone_branch_2: bool) -> TeeDefaultState { + TeeDefaultState { + common: TeeCommonState::new(cx, stream), + clone_branch_2, + read_again: Cell::new(false), + } + } +} + +#[derive(Traceable)] +pub struct TeeBytesState { + pub(crate) common: TeeCommonState, + read_again: [Cell; 2], +} + +impl TeeBytesState { + pub(crate) fn new(cx: &Context, stream: &ReadableStream) -> TeeBytesState { + TeeBytesState { + common: TeeCommonState::new(cx, stream), + read_again: [Cell::new(false), Cell::new(false)], + } + } +} + +pub(crate) fn forward_reader_error(cx: &Context, closed_promise: &Promise, state: Rc) -> Result<()> { + let controller1 = TracedHeap::new(state.common.branch(cx, false)?.controller.get()); + let controller2 = TracedHeap::new(state.common.branch(cx, true)?.controller.get()); + + closed_promise.catch(cx, move |cx, reason| { + ByteStreamController::from_traced_heap(cx, &controller1)?.error_internal(cx, reason)?; + ByteStreamController::from_traced_heap(cx, &controller2)?.error_internal(cx, reason)?; + state.common.cancel(cx, &Value::undefined_handle()); + Ok(Value::undefined_handle()) + }); + Ok(()) }