Skip to content

Commit

Permalink
Added Options for Readable Stream BYOB Reader
Browse files Browse the repository at this point in the history
Added Readable Stream Bindings
  • Loading branch information
Redfire75369 committed Aug 11, 2024
1 parent 5084363 commit d589180
Show file tree
Hide file tree
Showing 6 changed files with 271 additions and 14 deletions.
108 changes: 108 additions & 0 deletions bindings/globals/flow/streams/readable.js.flow
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// @flow

declare type UnderlyingSourceStartCallback = (controller: ReadableStreamController) => any;
declare type UnderlyingSourcePullCallback = (controller: ReadableStreamController) => Promise<void>;
declare type UnderlyingSourceCancelCallback = (reason?: any) => Promise<void>;

declare type ReadableStreamType = "bytes";

declare interface UnderlyingSource {
start?: UnderlyingSourceStartCallback,
pull?: UnderlyingSourcePullCallback,
cancel?: UnderlyingSourceCancelCallback,
type?: ReadableStreamType,
autoAllocateChunkSize?: number,
}

declare type QueueingStrategySize = (chunk: any) => number;

declare interface QueueingStrategy {
highWaterMark?: number,
size: QueueingStrategySize,
}

declare type ReadableStreamReaderMode = "byob";

declare interface ReadableStreamGetReaderOptions {
mode?: ReadableStreamReaderMode
}

declare class ReadableStream {
constructor(underlyingSource?: UnderlyingSource, strategy?: QueueingStrategy): ReadableStream;

get locked(): boolean;

cancel(reason?: any): Promise<void>;

getReader(options?: ReadableStreamGetReaderOptions): ReadableStreamReader;

tee(): [ReadableStream, ReadableStream];
}

declare interface ReadableStreamReadResult {
value: any,
done: boolean,
}

declare type ReadableStreamGenericReader = {
get closed(): Promise<void>,
cancel(reason?: any): Promise<void>,
}

declare class ReadableStreamDefaultReader implements ReadableStreamGenericReader {
constructor(stream: ReadableStream): ReadableStreamDefaultReader;

read(): Promise<ReadableStreamReadResult>;

releaseLock(): void;
}

declare var reader: ReadableStreamDefaultReader;

declare interface ReadableStreamBYOBReaderReadOptions {
min?: number,
}

declare class ReadableStreamBYOBReader implements ReadableStreamGenericReader {
constructor(stream: ReadableStream): ReadableStreamBYOBReader;

read(view: ArrayBufferView, options?: ReadableStreamBYOBReaderReadOptions): Promise<ReadableStreamReadResult>;

releaseLock(): void;
}

declare type ReadableStreamReader = ReadableStreamDefaultReader | ReadableStreamBYOBReader;

declare class ReadableStreamDefaultController {
get desiredSize(): number | null;

close(): void;

enqueue(chunk?: any): void;

error(e?: any): void;
}


declare class ReadableByteStreamController {
get byobRequest(): ReadableStreamBYOBRequest | null;

get desiredSize(): number | null;

close(): void;

enqueue(chunk: ArrayBufferView): void;

error(e?: any): void;
}

declare type ReadableStreamController = ReadableStreamDefaultController | ReadableByteStreamController;

declare class ReadableStreamBYOBRequest {
get view(): ArrayBufferView | null;

respond(bytesWritten: number): void;

respondInto(view: ArrayBufferView): void;
}

104 changes: 104 additions & 0 deletions bindings/globals/typescript/streams/readable.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
declare type UnderlyingSourceStartCallback = (controller: ReadableStreamController) => any;
declare type UnderlyingSourcePullCallback = (controller: ReadableStreamController) => Promise<void>;
declare type UnderlyingSourceCancelCallback = (reason?: any) => Promise<void>;

declare type ReadableStreamType = "bytes";

declare interface UnderlyingSource {
start?: UnderlyingSourceStartCallback,
pull?: UnderlyingSourcePullCallback,
cancel?: UnderlyingSourceCancelCallback,
type?: ReadableStreamType,
autoAllocateChunkSize?: number,
}

declare type QueueingStrategySize = (chunk: any) => number;

declare interface QueueingStrategy {
highWaterMark?: number,
size: QueueingStrategySize,
}

declare type ReadableStreamReaderMode = "byob";

declare interface ReadableStreamGetReaderOptions {
mode?: ReadableStreamReaderMode
}

declare class ReadableStream {
constructor(underlyingSource?: UnderlyingSource, strategy?: QueueingStrategy);

get locked(): boolean;

cancel(reason?: any): Promise<void>;

getReader(options?: ReadableStreamGetReaderOptions): ReadableStreamReader;

tee(): [ReadableStream, ReadableStream];
}

declare interface ReadableStreamReadResult {
value: any,
done: boolean,
}

declare abstract class ReadableStreamGenericReader {
get closed(): Promise<void>;

cancel(reason?: any): Promise<void>;
}

declare class ReadableStreamDefaultReader extends ReadableStreamGenericReader {
constructor(stream: ReadableStream);

read(): Promise<ReadableStreamReadResult>;

releaseLock(): void;
}

declare interface ReadableStreamBYOBReaderReadOptions {
min?: number,
}

declare class ReadableStreamBYOBReader extends ReadableStreamGenericReader {
constructor(stream: ReadableStream);

read(view: ArrayBufferView, options?: ReadableStreamBYOBReaderReadOptions): Promise<ReadableStreamReadResult>;

releaseLock(): void;
}

declare type ReadableStreamReader = ReadableStreamDefaultReader | ReadableStreamBYOBReader;

declare class ReadableStreamDefaultController {
get desiredSize(): number | null;

close(): void;

enqueue(chunk?: any): void;

error(e?: any): void;
}


declare class ReadableByteStreamController {
get byobRequest(): ReadableStreamBYOBRequest | null;

get desiredSize(): number | null;

close(): void;

enqueue(chunk: ArrayBufferView): void;

error(e?: any): void;
}

declare type ReadableStreamController = ReadableStreamDefaultController | ReadableByteStreamController;

declare class ReadableStreamBYOBRequest {
get view(): ArrayBufferView | null;

respond(bytesWritten: number): void;

respondInto(view: ArrayBufferView): void;
}
1 change: 0 additions & 1 deletion ion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ workspace = true
optional = true

[features]
default = []
debugmozjs = ["mozjs/debugmozjs"]
macros = ["dep:ion-proc"]
sourcemap = ["dep:sourcemap"]
Expand Down
16 changes: 10 additions & 6 deletions runtime/src/globals/streams/readable/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub(crate) struct PullIntoDescriptor {
pub(crate) offset: usize,
pub(crate) length: usize,
pub(crate) filled: usize,
pub(crate) min: usize,
pub(crate) element: usize,
pub(crate) constructor: unsafe extern "C" fn(*mut JSContext, Handle<*mut JSObject>, usize, i64) -> *mut JSObject,
pub(crate) kind: ReaderKind,
Expand Down Expand Up @@ -197,6 +198,7 @@ impl<'c> Controller<'c> {
offset: 0,
length: controller.auto_allocate_chunk_size,
filled: 0,
min: 1,
element: 1,
constructor: type_to_constructor(Type::Uint8),
kind: ReaderKind::Default,
Expand Down Expand Up @@ -574,11 +576,10 @@ impl ByteStreamController {
pub(crate) fn fill_pull_into_descriptor(
&mut self, cx: &Context, descriptor: &mut PullIntoDescriptor,
) -> Result<bool> {
let aligned = descriptor.filled - descriptor.filled % descriptor.element;
let max_copy = self.common.queue_size.min(descriptor.length - descriptor.filled);
let max_aligned = descriptor.filled + max_copy - (descriptor.filled + max_copy) % descriptor.element;

let ready = max_aligned > aligned;
let ready = max_aligned > descriptor.min;

let mut remaining = if ready {
max_aligned - descriptor.filled
Expand Down Expand Up @@ -708,24 +709,27 @@ impl ByteStreamController {

match stream.state {
State::Readable => {
let mut descriptor = self.pending_descriptors.pop_front().unwrap();
let descriptor = self.pending_descriptors.front_mut().unwrap();
descriptor.filled += written;

let PullIntoDescriptor { filled, offset, length, min, .. } = *descriptor;

match kind {
ReaderKind::None => {
if descriptor.filled > 0 {
self.enqueue_cloned_chunk(cx, &buffer, descriptor.offset, descriptor.length)?;
if filled > 0 {
self.enqueue_cloned_chunk(cx, &buffer, offset, length)?;
}

if let Some(Reader::Byob(reader)) = stream.native_reader(cx)? {
self.process_descriptors(cx, reader, stream.state)?;
}
}
_ => {
if descriptor.filled < descriptor.element {
if filled < min {
return Ok(());
}

let mut descriptor = self.pending_descriptors.pop_front().unwrap();
let remainder = descriptor.filled % descriptor.element;

if remainder > 0 {
Expand Down
54 changes: 48 additions & 6 deletions runtime/src/globals/streams/readable/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use std::collections::VecDeque;

use mozjs::conversions::ConversionBehavior;
use mozjs::jsapi::{Heap, JSObject};

use ion::{ClassDefinition, Context, Error, ErrorKind, Local, Object, Promise, Result, ResultExc, Value};
Expand Down Expand Up @@ -286,6 +287,18 @@ impl DefaultReader {
}
}

#[derive(FromValue)]
pub struct ByobReadOptions {
#[ion(convert = ConversionBehavior::EnforceRange, default = 1)]
min: u64,
}

impl Default for ByobReadOptions {
fn default() -> ByobReadOptions {
ByobReadOptions { min: 1 }
}
}

#[js_class]
#[ion(name = "ReadableStreamBYOBReader")]
pub struct ByobReader {
Expand Down Expand Up @@ -326,7 +339,7 @@ impl ByobReader {
}

pub(crate) fn read_internal<'cx>(
&mut self, cx: &'cx Context, view: ArrayBufferView, request: Request,
&mut self, cx: &'cx Context, view: ArrayBufferView, min: usize, request: Request,
) -> ResultExc<Promise<'cx>> {
let stream = self.common.stream(cx)?.unwrap();
let promise = Promise::from(cx.root(request.promise.get())).unwrap();
Expand All @@ -352,6 +365,7 @@ impl ByobReader {
offset,
length: length * element_size,
filled: 0,
min: min * element_size,
element: element_size,
constructor,
kind: ReaderKind::Byob,
Expand Down Expand Up @@ -411,25 +425,53 @@ impl ByobReader {
self.common.cancel(cx, reason)
}

pub fn read<'cx>(&mut self, cx: &'cx Context, view: ArrayBufferView) -> ResultExc<Promise<'cx>> {
pub fn read<'cx>(
&mut self, cx: &'cx Context, view: ArrayBufferView, Opt(options): Opt<ByobReadOptions>,
) -> ResultExc<Promise<'cx>> {
let promise = Promise::new(cx);
let request = Request::standard(promise.get());
if self.common.stream.is_some() {
if view.is_empty() {
return Err(Error::new("View must not be empty.", ErrorKind::Type).into());
promise.reject(cx, &Error::new("View must not be empty.", ErrorKind::Type).as_value(cx));
return Ok(promise);
}

let buffer = view.buffer(cx);

if buffer.is_empty() {
return Err(Error::new("Buffer must contain bytes.", ErrorKind::Type).into());
promise.reject(
cx,
&Error::new("Buffer must contain bytes.", ErrorKind::Type).as_value(cx),
);
return Ok(promise);
}

if buffer.is_detached() {
return Err(Error::new("ArrayBuffer must not be detached.", ErrorKind::Type).into());
promise.reject(
cx,
&Error::new("ArrayBuffer must not be detached.", ErrorKind::Type).as_value(cx),
);
return Ok(promise);
}

let options = options.unwrap_or_default();
if options.min == 0 {
promise.reject(
cx,
&Error::new("min must be greater than 0.", ErrorKind::Type).as_value(cx),
);
return Ok(promise);
}

if options.min > view.len() as u64 {
promise.reject(
cx,
&Error::new("min is greater than View Length", ErrorKind::Range).as_value(cx),
);
return Ok(promise);
}

self.read_internal(cx, view, request)
self.read_internal(cx, view, options.min as usize, request)
} else {
(request.error)(
cx,
Expand Down
2 changes: 1 addition & 1 deletion runtime/src/globals/streams/readable/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ impl StreamSource {
let byob_request = ByobRequest::get_private(cx, &byob_request)?;

let view = ArrayBufferView::from(cx.root(byob_request.get_view())).unwrap();
reader.read_internal(cx, view, request)?;
reader.read_internal(cx, view, 1, request)?;
}

promise.resolve(cx, &Value::undefined_handle());
Expand Down

0 comments on commit d589180

Please sign in to comment.