Skip to content

Commit

Permalink
x
Browse files Browse the repository at this point in the history
  • Loading branch information
isahers1 committed Dec 10, 2024
1 parent 8f6bf08 commit 4c5dd6c
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 0 deletions.
5 changes: 5 additions & 0 deletions js/src/types/stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
// Make this a type to override ReadableStream's async iterator type in case
// the popular web-streams-polyfill is imported - the supplied types
// in that case don't quite match.
export type IterableReadableStreamInterface<T> = ReadableStream<T> &
AsyncIterable<T>;
118 changes: 118 additions & 0 deletions js/src/utils/stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import type { IterableReadableStreamInterface } from "../types/stream.js"

// Re-exported for backwards compatibility
// Do NOT import this type from this file inside the project. Instead, always import from `types/stream.js`
export type { IterableReadableStreamInterface };

/*
* Support async iterator syntax for ReadableStreams in all environments.
* Source: https://github.com/MattiasBuelens/web-streams-polyfill/pull/122#issuecomment-1627354490
*/
export class IterableReadableStream<T>
extends ReadableStream<T>
implements IterableReadableStreamInterface<T>
{
public reader: ReadableStreamDefaultReader<T>;

ensureReader() {
if (!this.reader) {
this.reader = this.getReader();
}
}

async next(): Promise<IteratorResult<T>> {
this.ensureReader();
try {
const result = await this.reader.read();
if (result.done) {
this.reader.releaseLock(); // release lock when stream becomes closed
return {
done: true,
value: undefined,
};
} else {
return {
done: false,
value: result.value,
};
}
} catch (e) {
this.reader.releaseLock(); // release lock when stream becomes errored
throw e;
}
}

async return(): Promise<IteratorResult<T>> {
this.ensureReader();
// If wrapped in a Node stream, cancel is already called.
if (this.locked) {
const cancelPromise = this.reader.cancel(); // cancel first, but don't await yet
this.reader.releaseLock(); // release lock first
await cancelPromise; // now await it
}
return { done: true, value: undefined };
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
async throw(e: any): Promise<IteratorResult<T>> {
this.ensureReader();
if (this.locked) {
const cancelPromise = this.reader.cancel(); // cancel first, but don't await yet
this.reader.releaseLock(); // release lock first
await cancelPromise; // now await it
}
throw e;
}

[Symbol.asyncIterator]() {
return this;
}

// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore Not present in Node 18 types, required in latest Node 22
async [Symbol.asyncDispose]() {
await this.return();
}

static fromReadableStream<T>(stream: ReadableStream<T>) {
// From https://developer.mozilla.org/en-US/docs/Web/API/Streams_API/Using_readable_streams#reading_the_stream
const reader = stream.getReader();
return new IterableReadableStream<T>({
start(controller) {
return pump();
function pump(): Promise<T | undefined> {
return reader.read().then(({ done, value }) => {
// When no more data needs to be consumed, close the stream
if (done) {
controller.close();
return;
}
// Enqueue the next data chunk into our target stream
controller.enqueue(value);
return pump();
});
}
},
cancel() {
reader.releaseLock();
},
});
}

static fromAsyncGenerator<T>(generator: AsyncGenerator<T>) {
return new IterableReadableStream<T>({
async pull(controller) {
const { value, done } = await generator.next();
// When no more data needs to be consumed, close the stream
if (done) {
controller.close();
}
// Fix: `else if (value)` will hang the streaming when nullish value (e.g. empty string) is pulled
controller.enqueue(value);
},
async cancel(reason) {
await generator.return(reason);
},
});
}
}

0 comments on commit 4c5dd6c

Please sign in to comment.