feat: stream data back for CSV and JSON queries #25927

Merged (1 commit), Jan 31, 2025

Conversation

mgattozzi (Contributor):

This commit allows us to stream data back for CSV and JSON formatted queries. Prior to this we would buffer all of the data in memory before sending it back. Now we only buffer one RecordBatch at a time, reducing memory overhead.

Note that due to the way the writer APIs work and the way Body works in hyper 0.14, we can't use a streaming body that we can write to. This in turn means we have to hand-write a Future state machine, which works but is far from ideal.

Note this does not make the pretty and parquet formats streamable. I'm attempting to get the pretty format to be streamable, but I don't think it or parquet is as amenable to being streamed back to the user. In general we might want to discourage these formats from being used.
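
For illustration, a minimal sketch of the batch-at-a-time idea (the type and field names here are mine, not the PR's; the actual implementation is a hand-written Future state machine rather than this exact shape). A Stream pulls one RecordBatch at a time and serializes it to CSV bytes:

use std::pin::Pin;
use std::task::{Context, Poll};

use arrow::csv::WriterBuilder;
use arrow::record_batch::RecordBatch;
use bytes::Bytes;
use datafusion::error::DataFusionError;
use futures::{Stream, StreamExt};

// Hypothetical wrapper that serializes one RecordBatch per chunk
// instead of buffering the whole result set.
struct CsvChunkStream<S> {
    inner: S,
    wrote_header: bool,
}

impl<S> Stream for CsvChunkStream<S>
where
    S: Stream<Item = Result<RecordBatch, DataFusionError>> + Unpin,
{
    type Item = Result<Bytes, DataFusionError>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match self.inner.poll_next_unpin(cx) {
            Poll::Ready(Some(Ok(batch))) => {
                // Only the first chunk carries the CSV header row.
                let mut writer = WriterBuilder::new()
                    .with_header(!self.wrote_header)
                    .build(Vec::new());
                if let Err(e) = writer.write(&batch) {
                    // `?` doesn't work here: poll_next must return Poll.
                    return Poll::Ready(Some(Err(e.into())));
                }
                self.wrote_header = true;
                Poll::Ready(Some(Ok(Bytes::from(writer.into_inner()))))
            }
            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Pending => Poll::Pending,
        }
    }
}

Something of this shape can then back the HTTP response via hyper 0.14's Body::wrap_stream, as discussed further below.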

@waynr (Contributor) left a comment:

I left a couple questions inline, but otherwise this looks good to me so I'm leaving a ✔️.

influxdb3_server/src/http.rs (resolved)
.with_header(false)
.build(Vec::new())
};
writer.write(&batch).unwrap();
Contributor:

Prior to this PR, and still in the fully-buffered branches of the format match statement, it looks like writer.write(&batch)? is being called. Is it not possible to do that here because of the Output type of this future?

Contributor:

Would this work?

Suggested change:

-    writer.write(&batch).unwrap();
+    if let Err(e) = writer.write(&batch) {
+        return Poll::Ready(Some(Err(e)));
+    }

Contributor:

Ah yeah, but I think it would need .into() called: https://docs.rs/datafusion/latest/datafusion/error/enum.DataFusionError.html#impl-From%3CError%3E-for-DataFusionError-1, right? I was initially assuming there was no impl From<std::io::Error> for DataFusionError and that it would need to be converted to the Error defined in this module.

mgattozzi (Contributor, author):

This ended up working out well with the .into().

As for the writer.write(&batch): we can't use ? anymore because we now need to return a Poll type. async essentially does all of this under the hood, but since we're writing the future by hand, we have to wrap these errors ourselves.

There is no From impl that makes ? work for Poll, as far as I'm aware.

@hiltontj (Contributor) left a comment:

I left a couple of comments. FWIW, the v1 /query API implementation uses a streamed response via a dedicated type:

let stream = QueryResponseStream::new(0, stream, chunk_size, format, epoch, group_by)
.map_err(QueryError)?;
let body = Body::wrap_stream(stream);

Not sure if that is at all helpful.
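
For reference, Body::wrap_stream in hyper 0.14 (behind the "stream" feature) accepts any Stream of Result whose success type converts into Bytes and whose error type converts into a boxed error. A minimal, self-contained illustration:

use bytes::Bytes;
use futures::stream;
use hyper::Body;

fn streamed_body() -> Body {
    // Each item in the stream becomes one chunk of the response body.
    let chunks = stream::iter(vec![
        Ok::<_, std::io::Error>(Bytes::from_static(b"a,b\n")),
        Ok(Bytes::from_static(b"1,2\n")),
    ]);
    Body::wrap_stream(chunks)
}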

Cargo.toml (outdated, resolved)

influxdb3_server/src/http.rs (outdated, resolved)
Comment on lines +1511 to +1575
let mut bytes = Vec::new();
let mem_pool = Arc::new(UnboundedMemoryPool::default());
let mut writer = TrackedMemoryArrowWriter::try_new(&mut bytes, schema, mem_pool)?;

// Write the first batch we got and then continue writing batches
writer.write(batch)?;
while let Some(batch) = stream.next().await.transpose()? {
writer.write(batch)?;
}
writer.close()?;
Ok(Body::from(Bytes::from(bytes)))
Contributor:

So, this still buffers the whole record batch stream into memory. I'm not sure how to do this for Parquet; I reckon you couldn't take an approach similar to the JSON/CSV one, since (I assume) the parquet writer needs to live for the lifetime of the entire stream.

We do, however, use the AsyncArrowWriter for writing parquet in a streaming fashion in the compactor in enterprise, so I wonder if we could adopt a similar approach here?
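
A sketch of that approach, assuming a recent parquet crate where AsyncArrowWriter::try_new takes an async writer, a schema, and optional WriterProperties (the enterprise compactor code isn't shown here). Note this alone doesn't solve the response-body problem, since hyper 0.14's Body isn't a writer:

use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion::error::DataFusionError;
use futures::{Stream, StreamExt};
use parquet::arrow::AsyncArrowWriter;
use tokio::fs::File;

async fn write_parquet_streaming(
    mut stream: impl Stream<Item = Result<RecordBatch, DataFusionError>> + Unpin,
    schema: SchemaRef,
    file: File,
) -> Result<(), DataFusionError> {
    // The writer lives for the lifetime of the stream; batches are
    // written as they arrive rather than collected into memory first.
    let mut writer = AsyncArrowWriter::try_new(file, schema, None)?;
    while let Some(batch) = stream.next().await.transpose()? {
        writer.write(&batch).await?;
    }
    writer.close().await?;
    Ok(())
}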

mgattozzi (Contributor, author):

Unfortunately no, because there's no way to write to a Body with a writer; if there were, I wouldn't be writing these wild state machines by hand. I think for tables and parquet we'll just have to leave them be. The main formats we expect people to stream, at least, are done.

Contributor:

Do you know if this is a limitation of hyper 0.14 (one we could solve by moving to 1.x), or something that would need to be solved upstream in parquet/arrow regardless of the hyper version?

I think streaming for --format=parquet will need to be addressed at some point. I agree about tables, though; that format is purely for use in the CLI.

@mgattozzi requested review from waynr and hiltontj, January 31, 2025 16:53
@mgattozzi force-pushed the mgattozzi/streaming-query branch from 1ac4698 to 72c1847, January 31, 2025 16:53
@mgattozzi force-pushed the mgattozzi/streaming-query branch from 72c1847 to 09133b9, January 31, 2025 16:55
@mgattozzi (Contributor, author):

@waynr and @hiltontj I incorporated your suggestions. Let me know if this needs anything else!

@hiltontj (Contributor) left a comment:

I think this looks good, though I do think we need to address streaming the parquet format at some point.

@mgattozzi (Contributor, author):

@hiltontj I think so too, but I think it'll require some upstream work in Arrow to integrate with hyper's Body, which in 1.0 is a trait that can be implemented so that writers can produce Frames and stream the data. From my attempts, I'm not sure how to square the circle otherwise.
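
For context, a minimal hypothetical shape of what that enables: in hyper 1.x the response body is the http-body 1.0 Body trait, so a custom type can implement poll_frame and yield Frames as chunks become available.

use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};

use bytes::Bytes;
use http_body::{Body, Frame};

// Hypothetical body backed by already-serialized chunks; a real
// implementation would hold the RecordBatch stream and writer state.
struct ChunkedBody {
    chunks: VecDeque<Bytes>,
}

impl Body for ChunkedBody {
    type Data = Bytes;
    type Error = std::convert::Infallible;

    fn poll_frame(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
        match self.chunks.pop_front() {
            Some(chunk) => Poll::Ready(Some(Ok(Frame::data(chunk)))),
            None => Poll::Ready(None),
        }
    }
}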

@mgattozzi merged commit 20fdc7b into main, Jan 31, 2025
13 checks passed
@mgattozzi deleted the mgattozzi/streaming-query branch, January 31, 2025 18:25
@hiltontj commented Feb 3, 2025:

Related: #25955

mgattozzi added a commit that referenced this pull request on Feb 6, 2025:

In #25927 we missed that JSON queries were broken despite having some tests that use the format. This fixes JSON queries so that they now properly contain a comma between RecordBatches. This commit also includes tests for the formats that now stream data back (CSV, JSON, and JSON Lines) so that we won't run into this issue again.
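
To illustrate the framing bug that message describes (a hypothetical helper, not the actual fix): when each RecordBatch is serialized independently, the stream itself has to supply the array brackets and the commas between batches so the concatenated chunks form one valid JSON document.

use bytes::Bytes;

// Prefix each batch's serialized rows so the concatenated chunks form
// one JSON array: `[` before the first batch, `,` between batches
// (with a final `]` emitted once the stream is exhausted).
fn frame_json_chunk(rows: &str, first: &mut bool) -> Bytes {
    let prefix = if *first {
        *first = false;
        "["
    } else {
        ","
    };
    Bytes::from(format!("{prefix}{rows}"))
}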