feat: stream data back for CSV and JSON queries #25927
Conversation
I left a couple questions inline, but otherwise this looks good to me so I'm leaving a ✔️.
influxdb3_server/src/http.rs (Outdated)

```rust
        .with_header(false)
        .build(Vec::new())
};
writer.write(&batch).unwrap();
```
Prior to this PR, and still in the fully-buffered branches of the format match statement, it looks like `writer.write(&batch)?` is being called -- is it not possible to do that here because of the `Output` type of this future?
Would this work, instead of the `unwrap()`?

```rust
if let Err(e) = writer.write(&batch) {
    return Poll::Ready(Some(Err(e)));
}
```
Ah yeah, but I think it would need `.into()` called: https://docs.rs/datafusion/latest/datafusion/error/enum.DataFusionError.html#impl-From%3CError%3E-for-DataFusionError-1, right? I was initially assuming there was no `impl From<std::io::Error> for DataFusionError` and that it would need to be converted to the `Error` defined in this module.
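A minimal sketch of what that would look like, assuming the `From<std::io::Error>` impl linked above covers the writer's error type:

```rust
// Hedged sketch: relies on the linked `impl From<io::Error> for
// DataFusionError`, so the writer's error converts in place.
if let Err(e) = writer.write(&batch) {
    return Poll::Ready(Some(Err(e.into()))); // io::Error -> DataFusionError
}
```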
This ended up working out well with the `into()`.

As for the `writer.write(&batch)`: we can't use `?` anymore because you now need to return a `Poll` type. `async` is essentially doing all of this under the hood, but since we're writing the future ourselves by hand, we have to hand-wrap these errors. There is no `From` impl to make `?` work for `Poll`, as far as I'm aware.
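To illustrate the shape of the problem, here is a minimal sketch (not the PR's actual code; `write_batch` and `take_buffered_bytes` are hypothetical stand-ins for the CSV/JSON writer state):

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

use arrow::record_batch::RecordBatch;
use bytes::Bytes;
use datafusion::error::DataFusionError;
use futures::Stream;

struct EncodedStream<S> {
    inner: S,
}

impl<S> Stream for EncodedStream<S>
where
    S: Stream<Item = Result<RecordBatch, DataFusionError>> + Unpin,
{
    type Item = Result<Bytes, DataFusionError>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match Pin::new(&mut self.inner).poll_next(cx) {
            Poll::Ready(Some(Ok(batch))) => {
                // `writer.write(&batch)?` can't be used here: `?` would try
                // to return a `Result`, but this function must return
                // `Poll<Option<Result<_>>>`, so errors are wrapped by hand.
                if let Err(e) = write_batch(&batch) {
                    return Poll::Ready(Some(Err(e.into())));
                }
                Poll::Ready(Some(Ok(take_buffered_bytes())))
            }
            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Pending => Poll::Pending,
        }
    }
}

// Hypothetical helpers standing in for the writer and its output buffer.
fn write_batch(_batch: &RecordBatch) -> std::io::Result<()> {
    Ok(())
}
fn take_buffered_bytes() -> Bytes {
    Bytes::new()
}
```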
I left a couple of comments. FWIW, the v1 `/query` API implementation uses a streamed response via a dedicated type (influxdb3_server/src/http/v1.rs, lines 81 to 83 at 56ca85e):

```rust
let stream = QueryResponseStream::new(0, stream, chunk_size, format, epoch, group_by)
    .map_err(QueryError)?;
let body = Body::wrap_stream(stream);
```
Not sure if that is at all helpful.
```rust
let mut bytes = Vec::new();
let mem_pool = Arc::new(UnboundedMemoryPool::default());
let mut writer = TrackedMemoryArrowWriter::try_new(&mut bytes, schema, mem_pool)?;

// Write the first batch we got and then continue writing batches
writer.write(batch)?;
while let Some(batch) = stream.next().await.transpose()? {
    writer.write(batch)?;
}
writer.close()?;
Ok(Body::from(Bytes::from(bytes)))
```
So, this still buffers the whole record batch stream into memory. I am not so sure how to avoid that for Parquet; I reckon you wouldn't be able to take a similar approach to the one you've used for JSON/CSV, since (I assume) the parquet writer needs to live for the lifetime of the entire stream.

We do, however, use the `AsyncArrowWriter` for writing parquet in a streaming fashion in the compactor in enterprise, so I wonder if we could adopt a similar approach here?
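For reference, a hedged sketch of that kind of approach (not the enterprise compactor's code; exact `AsyncArrowWriter` signatures vary across parquet crate versions):

```rust
use arrow::record_batch::RecordBatch;
use arrow_schema::SchemaRef;
use futures::{Stream, StreamExt};
use parquet::arrow::AsyncArrowWriter;
use parquet::errors::Result;
use tokio::io::AsyncWrite;

// Encode batches to parquet as they arrive instead of buffering the whole
// result set; the writer lives for the lifetime of the stream.
async fn stream_to_parquet<W, S>(sink: W, schema: SchemaRef, mut batches: S) -> Result<()>
where
    W: AsyncWrite + Unpin + Send,
    S: Stream<Item = Result<RecordBatch>> + Unpin,
{
    let mut writer = AsyncArrowWriter::try_new(sink, schema, None)?;
    while let Some(batch) = batches.next().await.transpose()? {
        writer.write(&batch).await?;
    }
    writer.close().await?;
    Ok(())
}
```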
Unfortunately no, because there's no way to write to a `Body` with a writer. If there was, I wouldn't be writing these wild state machines by hand. I think for tables and parquet we'll just have to leave them be; at least the main formats we expect people to stream are done.
Do you know if this is a limitation of hyper `0.14` (that we could solve by going to `1.x`), or something that would need to be solved upstream in `parquet`/`arrow` regardless of hyper version?

I think that streaming for `--format=parquet` will need to be addressed at some point. I agree for tables though, that is purely for use in the CLI.
Force-pushed from 1ac4698 to 72c1847:
This commit allows us to stream data back for CSV and JSON formatted queries. Prior to this we would buffer up all of the data in memory before sending it back. Now we only buffer one RecordBatch at a time, reducing memory overhead.

Note that due to the way the APIs for the writers work, and how Body in hyper 0.14 works, we can't use a streaming body that we can write to. This in turn means we have to use a manually written Future state machine, which works but is far from ideal.

Note this does not make the pretty and parquet formats streamable. I'm attempting to get the pretty one to be streamable, but I don't think that it and parquet are as likely to be streamed back to the user. In general we might want to discourage these formats from being used.
Force-pushed from 72c1847 to 09133b9.
I think this looks good, though I do think we need to address streaming parquet format at some point.
@hiltontj I think so too, but I think it'll require some upstream work in Arrow to work with hyper's `Body`, which in 1.0 is a trait we can implement, so that writers can produce `Frame`s in order to stream the data. I'm not sure how to square the circle otherwise from my attempts.
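For context, a minimal sketch of what hyper 1.x makes possible (assuming the `http-body` 1.0 `Body` trait; the buffering field here is hypothetical):

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

use bytes::Bytes;
use http_body::{Body, Frame};

// In hyper 1.x, Body is a trait, so a custom type can hand encoded
// chunks to the connection as Frames as a writer produces them.
struct WriterBody {
    // Hypothetical buffer that an arrow writer would flush into.
    buffered: Option<Bytes>,
}

impl Body for WriterBody {
    type Data = Bytes;
    type Error = std::io::Error;

    fn poll_frame(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
        // Yield whatever the writer has flushed so far, then end the body.
        match self.buffered.take() {
            Some(bytes) => Poll::Ready(Some(Ok(Frame::data(bytes)))),
            None => Poll::Ready(None),
        }
    }
}
```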
Related: #25955
In #25927 we missed that JSON queries were broken despite having some tests use the format. This fixes JSON queries such that they now properly contain a comma between RecordBatches. This commit also includes tests for the formats that now stream data back (CSV, JSON, and JSON Lines) so that we won't run into this issue again.