Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream ReadableStream instances #51

Closed
wants to merge 6 commits into from

Conversation

phryneas
Copy link

@phryneas phryneas commented Nov 8, 2024

This allows for ReadableStream instances to be passed through the stream.

@@ -238,22 +271,24 @@ export function encode(
if (Array.isArray(id)) {
controller.enqueue(
textEncoder.encode(
`${TYPE_ERROR}${deferredId}:[["${TYPE_PREVIOUS_RESOLVED}",${id[0]}]]\n`
`${TYPE_PROMISE}${TYPE_ERROR}${deferredId}:[["${TYPE_PREVIOUS_RESOLVED}",${id[0]}]]\n`
Copy link
Author

@phryneas phryneas Nov 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did change the existing transport format a bit here - this is not strictly necessary, but this way, we have less of an explosion of types & type cases, and a bit more consistency.

It went from
E5 to PE5 to signal that Promise 5 Errored.

Further down I introduced
TE5 to signal that sTream 5 Errored.

This "combination of characters" is kinda necessary as otherwise I would have to introduce another new signal for "stream done" as well as "stream errored" and re-use a few characters in ways that do not necessarily make sense

So instead, further down I introduce

  • T5 for "new value in sTream 5"
  • TO5 for "sTream 5 is dOne"
  • TE5 for "sTream 5 Errored"

if (done) {
controller.enqueue(
textEncoder.encode(
`${TYPE_STREAM}${TYPE_DONE}${streamId}:[]\n`
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
`${TYPE_STREAM}${TYPE_DONE}${streamId}:[]\n`
`${TYPE_STREAM}${TYPE_DONE}${streamId}\n`

This could be modified like this to save a few characters, but it seemed more consistent that way - I'm very open for input there :)

Comment on lines +146 to +155
const [left, right] = input.tee();
input.getReader = left.getReader.bind(left);
input.cancel = left.cancel.bind(left);
input.pipeThrough = left.pipeThrough.bind(left);
input.pipeTo = left.pipeTo.bind(left);
input.tee = left.tee.bind(left);
Object.defineProperty(input, "locked", {
get: () => left.locked,
});
streams[index] = right;
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could also be omitted if there were a consensus that encoding is a destructive action - React does so - but I honestly felt a bit more comfortable with this tee approach.

Suggested change
const [left, right] = input.tee();
input.getReader = left.getReader.bind(left);
input.cancel = left.cancel.bind(left);
input.pipeThrough = left.pipeThrough.bind(left);
input.pipeTo = left.pipeTo.bind(left);
input.tee = left.tee.bind(left);
Object.defineProperty(input, "locked", {
get: () => left.locked,
});
streams[index] = right;
streams[index] = input;

Comment on lines +655 to +656
const inputValue = await inputReader.read();
const decodedValue = await decodedReader.read();
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests currently test the "non-destructive" approach with tee, which makes it possible to still consume the original input stream.

@@ -88,8 +91,10 @@ async function decodeDeferred(
const line = read.value;
switch (line[0]) {
case TYPE_PROMISE: {
const isError = line[1] === TYPE_ERROR;
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The case TYPE_ERROR: was extremely similar, and falls into the TYPE_PROMISE case now that I slightly changed the format, so I could delete that second case with a few minor adjustments.

await decoded.done;
});

test("streams in a stream", async () => {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this works now 🤣

@phryneas
Copy link
Author

phryneas commented Nov 8, 2024

Okay, I think I'm happy with this. Ready for review/discussion :)

@phryneas phryneas marked this pull request as ready for review November 8, 2024 14:49
@remorses
Copy link

remorses commented Dec 3, 2024

I would love to use this in Remix to use streaming loaders and actions, my use case is running a very slow loader while showing feedback to the user (like percentage completed)

@phryneas
Copy link
Author

phryneas commented Jan 8, 2025

@remorses meanwhile you can use https://github.com/phryneas/promiscade to convert a ReadableStream into a cascade of promises and vice versa - streaming those with turbo-stream works fine.

@remorses
Copy link

remorses commented Jan 8, 2025

Thank you for letting me know!

@jacob-ebey
Copy link
Owner

jacob-ebey commented Jan 30, 2025

@phryneas I have a major overhaul coming very soon that will bring support for ReadableStream and AsyncIterable's. Thank you for the PR. I'll let you know when I have that up if you'd like to review it.

@jacob-ebey
Copy link
Owner

jacob-ebey commented Feb 1, 2025

@phryneas Here it is if you'd like to review / test it out. Additional test cases for what you are after would be appreciated. #59

@jacob-ebey
Copy link
Owner

v3 is released with support for ReadableStreams

@jacob-ebey jacob-ebey closed this Feb 5, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants