diff --git a/runtime/src/globals/streams/readable/source.rs b/runtime/src/globals/streams/readable/source.rs index 81253677..10a4280c 100644 --- a/runtime/src/globals/streams/readable/source.rs +++ b/runtime/src/globals/streams/readable/source.rs @@ -21,7 +21,10 @@ use ion::clone::StructuredCloneBuffer; use ion::conversions::{FromValue, ToValue}; use ion::function::Opt; use ion::typedarray::{ArrayBuffer, ArrayBufferView, Uint8Array}; -use ion::{ClassDefinition, Context, Exception, Function, Local, Object, Promise, Result, ResultExc, TracedHeap, Value}; +use ion::{ + ClassDefinition, Context, Exception, Function, JSIterator, Local, Object, Promise, Result, ResultExc, TracedHeap, + Value, +}; #[derive(Traceable)] pub enum StreamSource { @@ -33,6 +36,7 @@ pub enum StreamSource { }, Bytes(#[trace(no_trace)] Option), BytesBuf(#[trace(no_trace)] Option>), + Iterator(#[trace(no_trace)] Box, Option>>), TeeDefault(Rc, bool), TeeBytes(Rc, bool), } @@ -72,6 +76,10 @@ impl StreamSource { buf.advance(chunk.len()); Ok(Some(Promise::resolved(cx, &buffer.as_value(cx)))) } + StreamSource::Iterator(iterator, Some(data)) => { + let data = Value::from(unsafe { Local::from_heap(data) }); + Ok(iterator.next_value(cx, &data).map(|value| Promise::resolved(cx, &value))) + } StreamSource::TeeDefault(state, second) => { if state.common.reading.get() { state.read_again.set(true); @@ -461,6 +469,9 @@ impl StreamSource { StreamSource::BytesBuf(buf) => { *buf = None; } + StreamSource::Iterator(_, data) => { + *data = None; + } _ => {} } }