feat: stream data back for CSV and JSON queries
This commit allows us to stream data back for CSV- and JSON-formatted
queries. Prior to this we would buffer up all of the data in memory
before sending it back. Now we only buffer one RecordBatch at a time,
reducing memory overhead.
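
As a rough sketch of that per-batch approach (the helper name
json_lines_body is made up for illustration; Body::wrap_stream,
futures::stream::poll_fn, and the arrow writers are the same ones the
diff below uses), each poll serializes exactly one RecordBatch and
hands the resulting bytes to hyper:

use std::pin::Pin;
use std::task::Poll;

use bytes::Bytes;
use datafusion::execution::RecordBatchStream;
use futures::StreamExt;
use hyper::Body;

// Sketch only: turn a RecordBatchStream into a chunked hyper Body where
// each chunk is one RecordBatch serialized as newline-delimited JSON.
fn json_lines_body(mut stream: Pin<Box<dyn RecordBatchStream + Send>>) -> Body {
    Body::wrap_stream(futures::stream::poll_fn(move |ctx| {
        match stream.poll_next_unpin(ctx) {
            Poll::Ready(Some(Ok(batch))) => {
                // Only this single batch is held in memory while serializing.
                let mut writer = arrow_json::LineDelimitedWriter::new(Vec::new());
                writer.write(&batch).unwrap();
                writer.finish().unwrap();
                Poll::Ready(Some(Ok(Bytes::from(writer.into_inner()))))
            }
            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Pending => Poll::Pending,
        }
    }))
}

The CSV and JSON array formats need more care, since the header row and
the surrounding brackets must only be written once across all chunks,
which is where the state machines below come in.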

Note that due to the way the writer APIs work, and how Body works in
hyper 0.14, we can't use a streaming body that we can write to. This
in turn means we have to use a manually written Future state machine,
which works but is far from ideal.
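
For illustration, a minimal sketch of that state-machine shape (the
names CsvChunks and csv_body are invented for this example; the real
implementation is the CsvFuture in the diff below): the future records
whether the CSV header has been written yet and is re-polled once per
chunk through futures::stream::poll_fn:

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use bytes::Bytes;
use datafusion::error::DataFusionError;
use datafusion::execution::RecordBatchStream;
use futures::{FutureExt, StreamExt};
use hyper::Body;

// Hand-rolled future: every completion yields one CSV chunk, writing
// the header row only for the first chunk.
struct CsvChunks {
    wrote_header: bool,
    stream: Pin<Box<dyn RecordBatchStream + Send>>,
}

impl Future for CsvChunks {
    type Output = Option<Result<Bytes, DataFusionError>>;

    fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.stream.poll_next_unpin(ctx) {
            Poll::Ready(Some(Ok(batch))) => {
                let mut writer = arrow_csv::WriterBuilder::new()
                    .with_header(!self.wrote_header)
                    .build(Vec::new());
                self.wrote_header = true;
                writer.write(&batch).unwrap();
                Poll::Ready(Some(Ok(Bytes::from(writer.into_inner()))))
            }
            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Pending => Poll::Pending,
        }
    }
}

fn csv_body(stream: Pin<Box<dyn RecordBatchStream + Send>>) -> Body {
    // The future is re-polled for every chunk by wrapping it in a
    // poll_fn stream, which Body::wrap_stream then drives.
    let mut future = CsvChunks { wrote_header: false, stream };
    Body::wrap_stream(futures::stream::poll_fn(move |ctx| future.poll_unpin(ctx)))
}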

Note that this does not make the pretty and parquet formats
streamable. I'm attempting to get the pretty format to stream, but I
don't think it or parquet are as likely to be streamed back to the
user. In general we might want to discourage these formats from being
used.
mgattozzi committed Jan 28, 2025
1 parent 705a165 commit 1ac4698
Showing 4 changed files with 178 additions and 47 deletions.
24 changes: 24 additions & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions Cargo.toml
@@ -61,6 +61,7 @@ chrono = "0.4"
cron = "0.15"
clap = { version = "4", features = ["derive", "env", "string"] }
clru = "0.6.2"
comfy-table = "7.1.3"
crc32fast = "1.2.0"
criterion = { version = "0.5", features = ["html_reports"] }
crossbeam-channel = "0.5.11"
1 change: 1 addition & 0 deletions influxdb3_server/Cargo.toml
@@ -56,6 +56,7 @@ async-trait.workspace = true
base64.workspace = true
bytes.workspace = true
chrono.workspace = true
comfy-table.workspace = true
csv.workspace = true
datafusion.workspace = true
flate2.workspace = true
199 changes: 152 additions & 47 deletions influxdb3_server/src/http.rs
@@ -11,6 +11,7 @@ use datafusion::error::DataFusionError;
use datafusion::execution::memory_pool::UnboundedMemoryPool;
use datafusion::execution::RecordBatchStream;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::FutureExt;
use futures::{StreamExt, TryStreamExt};
use hashbrown::HashMap;
use hyper::header::ACCEPT;
@@ -46,6 +47,7 @@ use serde::Deserialize;
use serde::Serialize;
use std::convert::Infallible;
use std::fmt::Debug;
use std::future::Future;
use std::pin::Pin;
use std::str::Utf8Error;
use std::string::FromUtf8Error;
@@ -1491,70 +1493,173 @@ async fn record_batch_stream_to_body(
mut stream: Pin<Box<dyn RecordBatchStream + Send>>,
format: QueryFormat,
) -> Result<Body, Error> {
fn to_json(batches: Vec<RecordBatch>) -> Result<Bytes> {
let mut writer = arrow_json::ArrayWriter::new(Vec::new());
for batch in batches {
writer.write(&batch)?;
}

writer.finish()?;

Ok(Bytes::from(writer.into_inner()))
}

fn to_csv(batches: Vec<RecordBatch>) -> Result<Bytes> {
let mut writer = arrow_csv::writer::Writer::new(Vec::new());
for batch in batches {
writer.write(&batch)?;
}

Ok(Bytes::from(writer.into_inner()))
}

fn to_pretty(batches: Vec<RecordBatch>) -> Result<Bytes> {
Ok(Bytes::from(format!(
"{}",
pretty::pretty_format_batches(&batches)?
)))
}

fn to_parquet(batches: Vec<RecordBatch>) -> Result<Bytes> {
let mut bytes = Vec::new();
let mem_pool = Arc::new(UnboundedMemoryPool::default());
let mut writer =
TrackedMemoryArrowWriter::try_new(&mut bytes, batches[0].schema(), mem_pool)?;
for batch in batches {
writer.write(batch)?;
}
writer.close()?;
Ok(Bytes::from(bytes))
}

match format {
QueryFormat::Pretty => {
let batches = stream.try_collect::<Vec<RecordBatch>>().await?;
to_pretty(batches).map(Body::from)
Ok(Body::from(Bytes::from(format!(
"{}",
pretty::pretty_format_batches(&batches)?
))))
}
QueryFormat::Parquet => {
let batches = stream.try_collect::<Vec<RecordBatch>>().await?;
to_parquet(batches).map(Body::from)
// Grab the first batch so that we can get the schema
let Some(batch) = stream.next().await.transpose()? else {
return Ok(Body::empty());
};
let schema = batch.schema();

let mut bytes = Vec::new();
let mem_pool = Arc::new(UnboundedMemoryPool::default());
let mut writer = TrackedMemoryArrowWriter::try_new(&mut bytes, schema, mem_pool)?;

// Write the first batch we got and then continue writing batches
writer.write(batch)?;
while let Some(batch) = stream.next().await.transpose()? {
writer.write(batch)?;
}
writer.close()?;
Ok(Body::from(Bytes::from(bytes)))
}
QueryFormat::Csv => {
let batches = stream.try_collect::<Vec<RecordBatch>>().await?;
to_csv(batches).map(Body::from)
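// Hand-written future state machine for CSV: each completed poll yields
// one chunk, and the header row is only included in the first chunk.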
struct CsvFuture {
first_poll: bool,
stream: Pin<Box<dyn RecordBatchStream + Send>>,
}

impl Future for CsvFuture {
type Output = Option<Result<Bytes, DataFusionError>>;

fn poll(
mut self: Pin<&mut Self>,
ctx: &mut std::task::Context<'_>,
) -> Poll<Self::Output> {
match self.stream.poll_next_unpin(ctx) {
Poll::Ready(Some(batch)) => {
let batch = match batch {
Ok(batch) => batch,
Err(e) => return Poll::Ready(Some(Err(e))),
};
let mut writer = if self.first_poll {
self.first_poll = false;
arrow_csv::Writer::new(Vec::new())
} else {
arrow_csv::WriterBuilder::new()
.with_header(false)
.build(Vec::new())
};
writer.write(&batch).unwrap();
Poll::Ready(Some(Ok(Bytes::from(writer.into_inner()))))
}
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}

let mut future = CsvFuture {
first_poll: true,
stream,
};
Ok(Body::wrap_stream(futures::stream::poll_fn(move |ctx| {
future.poll_unpin(ctx)
})))
}
QueryFormat::Json => {
let batches = stream.try_collect::<Vec<RecordBatch>>().await?;
to_json(batches).map(Body::from)
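// Streams the JSON array across chunks: the first chunk opens the
// array, later chunks append rows, and when the stream ends we emit the
// closing bracket (or "[]" if nothing was ever written).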
struct JsonFuture {
state: State,
stream: Pin<Box<dyn RecordBatchStream + Send>>,
}

enum State {
FirstPoll,
Body,
Done,
}

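// Format for chunks after the first: write a comma between rows and no
// surrounding brackets; the opening bracket comes from the first chunk
// and the closing bracket is appended when the stream ends.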
#[derive(Default, Debug)]
struct JsonMiddle;

impl arrow_json::writer::JsonFormat for JsonMiddle {
fn start_row<W: std::io::Write>(
&self,
writer: &mut W,
is_first_row: bool,
) -> std::result::Result<(), arrow_schema::ArrowError> {
if !is_first_row {
writer.write_all(b",")?;
}

Ok(())
}
}

impl Future for JsonFuture {
type Output = Option<Result<Bytes, DataFusionError>>;

fn poll(
mut self: Pin<&mut Self>,
ctx: &mut std::task::Context<'_>,
) -> Poll<Self::Output> {
match self.stream.poll_next_unpin(ctx) {
Poll::Ready(Some(batch)) => {
let batch = match batch {
Ok(batch) => batch,
Err(e) => return Poll::Ready(Some(Err(e))),
};
match self.state {
State::FirstPoll => {
let mut writer = arrow_json::ArrayWriter::new(Vec::new());
writer.write(&batch).unwrap();
self.state = State::Body;
Poll::Ready(Some(Ok(Bytes::from(writer.into_inner()))))
}
State::Body => {
let mut writer = arrow_json::WriterBuilder::new()
.build::<Vec<_>, JsonMiddle>(Vec::new());
writer.write(&batch).unwrap();
Poll::Ready(Some(Ok(Bytes::from(writer.into_inner()))))
}
_ => unreachable!(),
}
}
Poll::Ready(None) => {
match self.state {
// We've never written anything so just close the
// stream with an empty array
State::FirstPoll => {
self.state = State::Done;
Poll::Ready(Some(Ok(Bytes::from("[]"))))
}
// We have written to this before so we should
// write the final bytes to close the object
State::Body => {
self.state = State::Done;
Poll::Ready(Some(Ok(Bytes::from("]"))))
}
State::Done => Poll::Ready(None),
}
}
Poll::Pending => Poll::Pending,
}
}
}

let mut future = JsonFuture {
state: State::FirstPoll,
stream,
};
Ok(Body::wrap_stream(futures::stream::poll_fn(move |ctx| {
future.poll_unpin(ctx)
})))
}
QueryFormat::JsonLines => {
let stream = futures::stream::poll_fn(move |ctx| match stream.poll_next_unpin(ctx) {
Poll::Ready(Some(batch)) => {
let batch = match batch {
Ok(batch) => batch,
Err(e) => return Poll::Ready(Some(Err(e))),
};
let mut writer = arrow_json::LineDelimitedWriter::new(Vec::new());
writer.write(&batch).unwrap();
writer.finish().unwrap();
Poll::Ready(Some(Ok(Bytes::from(writer.into_inner()))))
