From d589180ee010d48006978a427b05071f7427ccf5 Mon Sep 17 00:00:00 2001 From: Redfire Date: Sun, 11 Aug 2024 23:54:55 +0800 Subject: [PATCH] Added Options for Readable Stream BYOB Reader Added Readable Stream Bindings --- .../globals/flow/streams/readable.js.flow | 108 ++++++++++++++++++ .../globals/typescript/streams/readable.d.ts | 104 +++++++++++++++++ ion/Cargo.toml | 1 - .../globals/streams/readable/controller.rs | 16 ++- .../src/globals/streams/readable/reader.rs | 54 ++++++++- .../src/globals/streams/readable/source.rs | 2 +- 6 files changed, 271 insertions(+), 14 deletions(-) create mode 100644 bindings/globals/flow/streams/readable.js.flow create mode 100644 bindings/globals/typescript/streams/readable.d.ts diff --git a/bindings/globals/flow/streams/readable.js.flow b/bindings/globals/flow/streams/readable.js.flow new file mode 100644 index 00000000..16518f49 --- /dev/null +++ b/bindings/globals/flow/streams/readable.js.flow @@ -0,0 +1,108 @@ +// @flow + +declare type UnderlyingSourceStartCallback = (controller: ReadableStreamController) => any; +declare type UnderlyingSourcePullCallback = (controller: ReadableStreamController) => Promise; +declare type UnderlyingSourceCancelCallback = (reason?: any) => Promise; + +declare type ReadableStreamType = "bytes"; + +declare interface UnderlyingSource { + start?: UnderlyingSourceStartCallback, + pull?: UnderlyingSourcePullCallback, + cancel?: UnderlyingSourceCancelCallback, + type?: ReadableStreamType, + autoAllocateChunkSize?: number, +} + +declare type QueueingStrategySize = (chunk: any) => number; + +declare interface QueueingStrategy { + highWaterMark?: number, + size: QueueingStrategySize, +} + +declare type ReadableStreamReaderMode = "byob"; + +declare interface ReadableStreamGetReaderOptions { + mode?: ReadableStreamReaderMode +} + +declare class ReadableStream { + constructor(underlyingSource?: UnderlyingSource, strategy?: QueueingStrategy): ReadableStream; + + get locked(): boolean; + + cancel(reason?: any): Promise; + + getReader(options?: ReadableStreamGetReaderOptions): ReadableStreamReader; + + tee(): [ReadableStream, ReadableStream]; +} + +declare interface ReadableStreamReadResult { + value: any, + done: boolean, +} + +declare type ReadableStreamGenericReader = { + get closed(): Promise, + cancel(reason?: any): Promise, +} + +declare class ReadableStreamDefaultReader implements ReadableStreamGenericReader { + constructor(stream: ReadableStream): ReadableStreamDefaultReader; + + read(): Promise; + + releaseLock(): void; +} + +declare var reader: ReadableStreamDefaultReader; + +declare interface ReadableStreamBYOBReaderReadOptions { + min?: number, +} + +declare class ReadableStreamBYOBReader implements ReadableStreamGenericReader { + constructor(stream: ReadableStream): ReadableStreamBYOBReader; + + read(view: ArrayBufferView, options?: ReadableStreamBYOBReaderReadOptions): Promise; + + releaseLock(): void; +} + +declare type ReadableStreamReader = ReadableStreamDefaultReader | ReadableStreamBYOBReader; + +declare class ReadableStreamDefaultController { + get desiredSize(): number | null; + + close(): void; + + enqueue(chunk?: any): void; + + error(e?: any): void; +} + + +declare class ReadableByteStreamController { + get byobRequest(): ReadableStreamBYOBRequest | null; + + get desiredSize(): number | null; + + close(): void; + + enqueue(chunk: ArrayBufferView): void; + + error(e?: any): void; +} + +declare type ReadableStreamController = ReadableStreamDefaultController | ReadableByteStreamController; + +declare class ReadableStreamBYOBRequest { + get view(): ArrayBufferView | null; + + respond(bytesWritten: number): void; + + respondInto(view: ArrayBufferView): void; +} + diff --git a/bindings/globals/typescript/streams/readable.d.ts b/bindings/globals/typescript/streams/readable.d.ts new file mode 100644 index 00000000..68fd8a71 --- /dev/null +++ b/bindings/globals/typescript/streams/readable.d.ts @@ -0,0 +1,104 @@ +declare type UnderlyingSourceStartCallback = (controller: ReadableStreamController) => any; +declare type UnderlyingSourcePullCallback = (controller: ReadableStreamController) => Promise; +declare type UnderlyingSourceCancelCallback = (reason?: any) => Promise; + +declare type ReadableStreamType = "bytes"; + +declare interface UnderlyingSource { + start?: UnderlyingSourceStartCallback, + pull?: UnderlyingSourcePullCallback, + cancel?: UnderlyingSourceCancelCallback, + type?: ReadableStreamType, + autoAllocateChunkSize?: number, +} + +declare type QueueingStrategySize = (chunk: any) => number; + +declare interface QueueingStrategy { + highWaterMark?: number, + size: QueueingStrategySize, +} + +declare type ReadableStreamReaderMode = "byob"; + +declare interface ReadableStreamGetReaderOptions { + mode?: ReadableStreamReaderMode +} + +declare class ReadableStream { + constructor(underlyingSource?: UnderlyingSource, strategy?: QueueingStrategy); + + get locked(): boolean; + + cancel(reason?: any): Promise; + + getReader(options?: ReadableStreamGetReaderOptions): ReadableStreamReader; + + tee(): [ReadableStream, ReadableStream]; +} + +declare interface ReadableStreamReadResult { + value: any, + done: boolean, +} + +declare abstract class ReadableStreamGenericReader { + get closed(): Promise; + + cancel(reason?: any): Promise; +} + +declare class ReadableStreamDefaultReader extends ReadableStreamGenericReader { + constructor(stream: ReadableStream); + + read(): Promise; + + releaseLock(): void; +} + +declare interface ReadableStreamBYOBReaderReadOptions { + min?: number, +} + +declare class ReadableStreamBYOBReader extends ReadableStreamGenericReader { + constructor(stream: ReadableStream); + + read(view: ArrayBufferView, options?: ReadableStreamBYOBReaderReadOptions): Promise; + + releaseLock(): void; +} + +declare type ReadableStreamReader = ReadableStreamDefaultReader | ReadableStreamBYOBReader; + +declare class ReadableStreamDefaultController { + get desiredSize(): number | null; + + close(): void; + + enqueue(chunk?: any): void; + + error(e?: any): void; +} + + +declare class ReadableByteStreamController { + get byobRequest(): ReadableStreamBYOBRequest | null; + + get desiredSize(): number | null; + + close(): void; + + enqueue(chunk: ArrayBufferView): void; + + error(e?: any): void; +} + +declare type ReadableStreamController = ReadableStreamDefaultController | ReadableByteStreamController; + +declare class ReadableStreamBYOBRequest { + get view(): ArrayBufferView | null; + + respond(bytesWritten: number): void; + + respondInto(view: ArrayBufferView): void; +} diff --git a/ion/Cargo.toml b/ion/Cargo.toml index d9319f13..d78ce84a 100644 --- a/ion/Cargo.toml +++ b/ion/Cargo.toml @@ -34,7 +34,6 @@ workspace = true optional = true [features] -default = [] debugmozjs = ["mozjs/debugmozjs"] macros = ["dep:ion-proc"] sourcemap = ["dep:sourcemap"] diff --git a/runtime/src/globals/streams/readable/controller.rs b/runtime/src/globals/streams/readable/controller.rs index b49ae295..918fc5cc 100644 --- a/runtime/src/globals/streams/readable/controller.rs +++ b/runtime/src/globals/streams/readable/controller.rs @@ -30,6 +30,7 @@ pub(crate) struct PullIntoDescriptor { pub(crate) offset: usize, pub(crate) length: usize, pub(crate) filled: usize, + pub(crate) min: usize, pub(crate) element: usize, pub(crate) constructor: unsafe extern "C" fn(*mut JSContext, Handle<*mut JSObject>, usize, i64) -> *mut JSObject, pub(crate) kind: ReaderKind, @@ -197,6 +198,7 @@ impl<'c> Controller<'c> { offset: 0, length: controller.auto_allocate_chunk_size, filled: 0, + min: 1, element: 1, constructor: type_to_constructor(Type::Uint8), kind: ReaderKind::Default, @@ -574,11 +576,10 @@ impl ByteStreamController { pub(crate) fn fill_pull_into_descriptor( &mut self, cx: &Context, descriptor: &mut PullIntoDescriptor, ) -> Result { - let aligned = descriptor.filled - descriptor.filled % descriptor.element; let max_copy = self.common.queue_size.min(descriptor.length - descriptor.filled); let max_aligned = descriptor.filled + max_copy - (descriptor.filled + max_copy) % descriptor.element; - let ready = max_aligned > aligned; + let ready = max_aligned > descriptor.min; let mut remaining = if ready { max_aligned - descriptor.filled @@ -708,13 +709,15 @@ impl ByteStreamController { match stream.state { State::Readable => { - let mut descriptor = self.pending_descriptors.pop_front().unwrap(); + let descriptor = self.pending_descriptors.front_mut().unwrap(); descriptor.filled += written; + let PullIntoDescriptor { filled, offset, length, min, .. } = *descriptor; + match kind { ReaderKind::None => { - if descriptor.filled > 0 { - self.enqueue_cloned_chunk(cx, &buffer, descriptor.offset, descriptor.length)?; + if filled > 0 { + self.enqueue_cloned_chunk(cx, &buffer, offset, length)?; } if let Some(Reader::Byob(reader)) = stream.native_reader(cx)? { @@ -722,10 +725,11 @@ impl ByteStreamController { } } _ => { - if descriptor.filled < descriptor.element { + if filled < min { return Ok(()); } + let mut descriptor = self.pending_descriptors.pop_front().unwrap(); let remainder = descriptor.filled % descriptor.element; if remainder > 0 { diff --git a/runtime/src/globals/streams/readable/reader.rs b/runtime/src/globals/streams/readable/reader.rs index b8a8db30..7600e153 100644 --- a/runtime/src/globals/streams/readable/reader.rs +++ b/runtime/src/globals/streams/readable/reader.rs @@ -6,6 +6,7 @@ use std::collections::VecDeque; +use mozjs::conversions::ConversionBehavior; use mozjs::jsapi::{Heap, JSObject}; use ion::{ClassDefinition, Context, Error, ErrorKind, Local, Object, Promise, Result, ResultExc, Value}; @@ -286,6 +287,18 @@ impl DefaultReader { } } +#[derive(FromValue)] +pub struct ByobReadOptions { + #[ion(convert = ConversionBehavior::EnforceRange, default = 1)] + min: u64, +} + +impl Default for ByobReadOptions { + fn default() -> ByobReadOptions { + ByobReadOptions { min: 1 } + } +} + #[js_class] #[ion(name = "ReadableStreamBYOBReader")] pub struct ByobReader { @@ -326,7 +339,7 @@ impl ByobReader { } pub(crate) fn read_internal<'cx>( - &mut self, cx: &'cx Context, view: ArrayBufferView, request: Request, + &mut self, cx: &'cx Context, view: ArrayBufferView, min: usize, request: Request, ) -> ResultExc> { let stream = self.common.stream(cx)?.unwrap(); let promise = Promise::from(cx.root(request.promise.get())).unwrap(); @@ -352,6 +365,7 @@ impl ByobReader { offset, length: length * element_size, filled: 0, + min: min * element_size, element: element_size, constructor, kind: ReaderKind::Byob, @@ -411,25 +425,53 @@ impl ByobReader { self.common.cancel(cx, reason) } - pub fn read<'cx>(&mut self, cx: &'cx Context, view: ArrayBufferView) -> ResultExc> { + pub fn read<'cx>( + &mut self, cx: &'cx Context, view: ArrayBufferView, Opt(options): Opt, + ) -> ResultExc> { let promise = Promise::new(cx); let request = Request::standard(promise.get()); if self.common.stream.is_some() { if view.is_empty() { - return Err(Error::new("View must not be empty.", ErrorKind::Type).into()); + promise.reject(cx, &Error::new("View must not be empty.", ErrorKind::Type).as_value(cx)); + return Ok(promise); } let buffer = view.buffer(cx); if buffer.is_empty() { - return Err(Error::new("Buffer must contain bytes.", ErrorKind::Type).into()); + promise.reject( + cx, + &Error::new("Buffer must contain bytes.", ErrorKind::Type).as_value(cx), + ); + return Ok(promise); } if buffer.is_detached() { - return Err(Error::new("ArrayBuffer must not be detached.", ErrorKind::Type).into()); + promise.reject( + cx, + &Error::new("ArrayBuffer must not be detached.", ErrorKind::Type).as_value(cx), + ); + return Ok(promise); + } + + let options = options.unwrap_or_default(); + if options.min == 0 { + promise.reject( + cx, + &Error::new("min must be greater than 0.", ErrorKind::Type).as_value(cx), + ); + return Ok(promise); + } + + if options.min > view.len() as u64 { + promise.reject( + cx, + &Error::new("min is greater than View Length", ErrorKind::Range).as_value(cx), + ); + return Ok(promise); } - self.read_internal(cx, view, request) + self.read_internal(cx, view, options.min as usize, request) } else { (request.error)( cx, diff --git a/runtime/src/globals/streams/readable/source.rs b/runtime/src/globals/streams/readable/source.rs index 7998dafd..f33eed13 100644 --- a/runtime/src/globals/streams/readable/source.rs +++ b/runtime/src/globals/streams/readable/source.rs @@ -400,7 +400,7 @@ impl StreamSource { let byob_request = ByobRequest::get_private(cx, &byob_request)?; let view = ArrayBufferView::from(cx.root(byob_request.get_view())).unwrap(); - reader.read_internal(cx, view, request)?; + reader.read_internal(cx, view, 1, request)?; } promise.resolve(cx, &Value::undefined_handle());