-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Streaming CLI support #8651
Streaming CLI support #8651
Changes from 2 commits
927e03f
8c2583d
b35bc73
cf26827
ca87d69
9292881
2ef4931
20407ac
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,14 +16,17 @@ | |
// under the License. | ||
|
||
//! Print format variants | ||
|
||
use std::str::FromStr; | ||
|
||
use crate::print_options::MaxRows; | ||
|
||
use arrow::csv::writer::WriterBuilder; | ||
use arrow::json::{ArrayWriter, LineDelimitedWriter}; | ||
use arrow::record_batch::RecordBatch; | ||
use arrow::util::pretty::pretty_format_batches_with_options; | ||
use datafusion::arrow::record_batch::RecordBatch; | ||
use datafusion::common::format::DEFAULT_FORMAT_OPTIONS; | ||
use datafusion::error::{DataFusionError, Result}; | ||
use std::str::FromStr; | ||
|
||
/// Allow records to be printed in different formats | ||
#[derive(Debug, PartialEq, Eq, clap::ArgEnum, Clone)] | ||
|
@@ -33,6 +36,7 @@ pub enum PrintFormat { | |
Table, | ||
Json, | ||
NdJson, | ||
Automatic, | ||
} | ||
|
||
impl FromStr for PrintFormat { | ||
|
@@ -55,6 +59,18 @@ macro_rules! batches_to_json { | |
}}; | ||
} | ||
|
||
/// Serializes one record batch with the given arrow JSON writer type and
/// evaluates to the output as a `String`.
///
/// `$WRITER` is the writer type name (constructed with `new`), `$batch` is the
/// batch expression handed to its `write` method. Any failure is propagated
/// with `?`, so the macro must be expanded inside a function returning `Result`.
macro_rules! stream_to_json {
    ($WRITER: ident, $batch: expr) => {{
        let mut buffer = Vec::new();
        // Keep the writer in its own scope so its mutable borrow of `buffer`
        // ends before the bytes are converted.
        {
            let mut json_writer = $WRITER::new(&mut buffer);
            json_writer.write($batch)?;
            json_writer.finish()?;
        }
        let text = String::from_utf8(buffer)
            .map_err(|e| DataFusionError::External(Box::new(e)))?;
        text
    }};
}
|
||
fn print_batches_with_sep(batches: &[RecordBatch], delimiter: u8) -> Result<String> { | ||
let mut bytes = vec![]; | ||
{ | ||
|
@@ -66,9 +82,26 @@ fn print_batches_with_sep(batches: &[RecordBatch], delimiter: u8) -> Result<Stri | |
writer.write(batch)?; | ||
} | ||
} | ||
let formatted = | ||
String::from_utf8(bytes).map_err(|e| DataFusionError::External(Box::new(e)))?; | ||
Ok(formatted) | ||
String::from_utf8(bytes).map_err(|e| DataFusionError::External(Box::new(e))) | ||
} | ||
|
||
fn print_stream_with_sep( | ||
batch: &RecordBatch, | ||
delimiter: u8, | ||
with_header: bool, | ||
) -> Result<String> { | ||
let mut bytes = vec![]; | ||
{ | ||
let builder = WriterBuilder::new() | ||
.with_header(with_header) | ||
.with_delimiter(delimiter); | ||
let mut writer = builder.build(&mut bytes); | ||
writer.write(batch)?; | ||
} | ||
if bytes.last() == Some(&b'\n') { | ||
bytes.pop(); | ||
} | ||
String::from_utf8(bytes).map_err(|e| DataFusionError::External(Box::new(e))) | ||
} | ||
|
||
fn keep_only_maxrows(s: &str, maxrows: usize) -> String { | ||
|
@@ -84,7 +117,6 @@ fn keep_only_maxrows(s: &str, maxrows: usize) -> String { | |
let mut result = lines[0..(maxrows + 3)].to_vec(); // Keep top border and `maxrows` lines | ||
result.extend(vec![dotted_line; 3]); // Append ... lines | ||
result.push(last_line.clone()); | ||
|
||
result.join("\n") | ||
} | ||
|
||
|
@@ -146,7 +178,7 @@ impl PrintFormat { | |
match self { | ||
Self::Csv => println!("{}", print_batches_with_sep(batches, b',')?), | ||
Self::Tsv => println!("{}", print_batches_with_sep(batches, b'\t')?), | ||
Self::Table => { | ||
Self::Table | Self::Automatic => { | ||
if maxrows == MaxRows::Limited(0) { | ||
return Ok(()); | ||
} | ||
|
@@ -159,14 +191,41 @@ impl PrintFormat { | |
} | ||
Ok(()) | ||
} | ||
|
||
pub fn print_stream(&self, batch: &RecordBatch, with_header: bool) -> Result<()> { | ||
if batch.num_rows() == 0 { | ||
return Ok(()); | ||
} | ||
|
||
match self { | ||
Self::Csv | Self::Automatic => { | ||
println!("{}", print_stream_with_sep(batch, b',', with_header)?) | ||
} | ||
Self::Tsv => { | ||
println!("{}", print_stream_with_sep(batch, b'\t', with_header)?) | ||
} | ||
Self::Table => { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Another thought here is that when printing streams, it could just print out a new Table for each batch (rather than buffering all batches). |
||
return Err(DataFusionError::External( | ||
"PrintFormat::Table is not implemented".to_string().into(), | ||
)) | ||
} | ||
Self::Json => println!("{}", stream_to_json!(ArrayWriter, batch)), | ||
Self::NdJson => { | ||
println!("{}", stream_to_json!(LineDelimitedWriter, batch)) | ||
} | ||
} | ||
Ok(()) | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
use std::sync::Arc; | ||
|
||
use super::*; | ||
|
||
use arrow::array::Int32Array; | ||
use arrow::datatypes::{DataType, Field, Schema}; | ||
use std::sync::Arc; | ||
|
||
#[test] | ||
fn test_print_batches_with_sep() { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,13 +15,19 @@ | |
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
use crate::print_format::PrintFormat; | ||
use datafusion::arrow::record_batch::RecordBatch; | ||
use datafusion::error::Result; | ||
use std::fmt::{Display, Formatter}; | ||
use std::pin::Pin; | ||
use std::str::FromStr; | ||
use std::time::Instant; | ||
|
||
use crate::print_format::PrintFormat; | ||
|
||
use arrow::record_batch::RecordBatch; | ||
use datafusion::error::Result; | ||
use datafusion::physical_plan::RecordBatchStream; | ||
|
||
use futures::StreamExt; | ||
|
||
#[derive(Debug, Clone, PartialEq, Copy)] | ||
pub enum MaxRows { | ||
/// show all rows in the output | ||
|
@@ -103,4 +109,28 @@ impl PrintOptions { | |
|
||
Ok(()) | ||
} | ||
|
||
#[allow(clippy::println_empty_string)] | ||
pub async fn print_stream( | ||
&self, | ||
mut stream: Pin<Box<dyn RecordBatchStream>>, | ||
query_start_time: Instant, | ||
) -> Result<()> { | ||
let mut row_count = 0_usize; | ||
let mut with_header = true; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if there is any way to test this behavior (specifically that the header is only printed out for the first batch) so that it isn't broken in subsequent refactorings |
||
|
||
while let Some(Ok(batch)) = stream.next().await { | ||
row_count += batch.num_rows(); | ||
self.format.print_stream(&batch, with_header)?; | ||
with_header = false; | ||
} | ||
println!(""); | ||
|
||
let timing_info = get_timing_info_str(row_count, self.maxrows, query_start_time); | ||
if !self.quiet { | ||
println!("{timing_info}"); | ||
} | ||
|
||
Ok(()) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function is almost a copy/paste of `print_batch` -- as you say in your comments, the only difference is in `Self::Table`. I think it would be possible to reduce replication by changing all this code to be in terms of the output stream rather than creating strings.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Specifically, I think you could write the functions in terms of `std::io::Write` and then pass in `std::io::stdout()` (https://doc.rust-lang.org/std/io/fn.stdout.html). That would avoid duplicating the code, and the code could also avoid copying strings.