From 927e03ff8d1e29f240ab2101f7e0aa2da65405e3 Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Mon, 25 Dec 2023 17:20:14 +0300 Subject: [PATCH 1/8] Streaming CLI support --- datafusion-cli/Cargo.lock | 1 + datafusion-cli/Cargo.toml | 3 +- datafusion-cli/src/exec.rs | 60 ++++++++------- datafusion-cli/src/main.rs | 20 ++--- datafusion-cli/src/print_format.rs | 75 +++++++++++++++++-- datafusion-cli/src/print_options.rs | 36 ++++++++- .../core/src/datasource/physical_plan/mod.rs | 15 ++++ 7 files changed, 163 insertions(+), 47 deletions(-) diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 9f75013c86dc..8e9bbd8a0dfd 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1160,6 +1160,7 @@ dependencies = [ "datafusion-common", "dirs", "env_logger", + "futures", "mimalloc", "object_store", "parking_lot", diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index f57097683698..4774eb504bef 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -38,6 +38,7 @@ datafusion = { path = "../datafusion/core", version = "34.0.0", features = ["avr datafusion-common = { path = "../datafusion/common" } dirs = "4.0.0" env_logger = "0.9" +futures = "0.3" mimalloc = { version = "0.1", default-features = false } object_store = { version = "0.8.0", features = ["aws", "gcp"] } parking_lot = { version = "0.12" } @@ -51,4 +52,4 @@ url = "2.2" assert_cmd = "2.0" ctor = "0.2.0" predicates = "3.0" -rstest = "0.17" +rstest = "0.17" \ No newline at end of file diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs index 8af534cd1375..c854e85ddc0c 100644 --- a/datafusion-cli/src/exec.rs +++ b/datafusion-cli/src/exec.rs @@ -17,6 +17,11 @@ //! Execution functions +use std::io::prelude::*; +use std::io::BufReader; +use std::time::Instant; +use std::{fs::File, sync::Arc}; + use crate::{ command::{Command, OutputFormat}, helper::{unescape_input, CliHelper}, @@ -26,21 +31,19 @@ use crate::{ }, print_options::{MaxRows, PrintOptions}, }; -use datafusion::common::plan_datafusion_err; + +use datafusion::common::{exec_datafusion_err, plan_datafusion_err}; +use datafusion::datasource::listing::ListingTableUrl; +use datafusion::datasource::physical_plan::is_plan_streaming; +use datafusion::error::{DataFusionError, Result}; +use datafusion::logical_expr::{CreateExternalTable, DdlStatement, LogicalPlan}; +use datafusion::physical_plan::{collect, execute_stream}; +use datafusion::prelude::SessionContext; use datafusion::sql::{parser::DFParser, sqlparser::dialect::dialect_from_str}; -use datafusion::{ - datasource::listing::ListingTableUrl, - error::{DataFusionError, Result}, - logical_expr::{CreateExternalTable, DdlStatement}, -}; -use datafusion::{logical_expr::LogicalPlan, prelude::SessionContext}; + use object_store::ObjectStore; use rustyline::error::ReadlineError; use rustyline::Editor; -use std::io::prelude::*; -use std::io::BufReader; -use std::time::Instant; -use std::{fs::File, sync::Arc}; use url::Url; /// run and execute SQL statements and commands, against a context with the given print options @@ -228,17 +231,24 @@ async fn exec_and_print( create_external_table(ctx, cmd).await?; } let df = ctx.execute_logical_plan(plan).await?; - let results = df.collect().await?; + let physical_plan = df.clone().create_physical_plan().await?; - let print_options = if should_ignore_maxrows { - PrintOptions { - maxrows: MaxRows::Unlimited, - ..print_options.clone() - } + if is_plan_streaming(&physical_plan)? { + let stream = execute_stream(physical_plan, task_ctx.clone())?; + print_options.print_stream(stream, now).await?; } else { - print_options.clone() - }; - print_options.print_batches(&results, now)?; + let print_options = if should_ignore_maxrows { + PrintOptions { + maxrows: MaxRows::Unlimited, + ..print_options.clone() + } + } else { + print_options.clone() + }; + + let results = collect(physical_plan, task_ctx.clone()).await?; + print_options.print_batches(&results, now)?; + } } Ok(()) @@ -272,10 +282,7 @@ async fn create_external_table( .object_store_registry .get_store(url) .map_err(|_| { - DataFusionError::Execution(format!( - "Unsupported object store scheme: {}", - scheme - )) + exec_datafusion_err!("Unsupported object store scheme: {}", scheme) })? } }; @@ -290,8 +297,9 @@ mod tests { use std::str::FromStr; use super::*; - use datafusion::common::plan_err; - use datafusion_common::{file_options::StatementOptions, FileTypeWriterOptions}; + + use datafusion_common::file_options::StatementOptions; + use datafusion_common::{plan_err, FileTypeWriterOptions}; async fn create_external_table_test(location: &str, sql: &str) -> Result<()> { let ctx = SessionContext::new(); diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index 8b74a797b57b..da14b1356d70 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -15,7 +15,12 @@ // specific language governing permissions and limitations // under the License. -use clap::Parser; +use std::collections::HashMap; +use std::env; +use std::path::Path; +use std::str::FromStr; +use std::sync::{Arc, OnceLock}; + use datafusion::error::{DataFusionError, Result}; use datafusion::execution::context::SessionConfig; use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool}; @@ -29,12 +34,9 @@ use datafusion_cli::{ print_options::{MaxRows, PrintOptions}, DATAFUSION_CLI_VERSION, }; + +use clap::Parser; use mimalloc::MiMalloc; -use std::collections::HashMap; -use std::env; -use std::path::Path; -use std::str::FromStr; -use std::sync::{Arc, OnceLock}; #[global_allocator] static GLOBAL: MiMalloc = MiMalloc; @@ -111,7 +113,7 @@ struct Args { )] rc: Option>, - #[clap(long, arg_enum, default_value_t = PrintFormat::Table)] + #[clap(long, arg_enum, default_value_t = PrintFormat::Automatic)] format: PrintFormat, #[clap( @@ -331,10 +333,10 @@ fn extract_memory_pool_size(size: &str) -> Result { #[cfg(test)] mod tests { - use datafusion::assert_batches_eq; - use super::*; + use datafusion::assert_batches_eq; + fn assert_conversion(input: &str, expected: Result) { let result = extract_memory_pool_size(input); match expected { diff --git a/datafusion-cli/src/print_format.rs b/datafusion-cli/src/print_format.rs index 0738bf6f9b47..2f5e40424dd5 100644 --- a/datafusion-cli/src/print_format.rs +++ b/datafusion-cli/src/print_format.rs @@ -16,14 +16,17 @@ // under the License. //! Print format variants + +use std::str::FromStr; + use crate::print_options::MaxRows; + use arrow::csv::writer::WriterBuilder; use arrow::json::{ArrayWriter, LineDelimitedWriter}; +use arrow::record_batch::RecordBatch; use arrow::util::pretty::pretty_format_batches_with_options; -use datafusion::arrow::record_batch::RecordBatch; use datafusion::common::format::DEFAULT_FORMAT_OPTIONS; use datafusion::error::{DataFusionError, Result}; -use std::str::FromStr; /// Allow records to be printed in different formats #[derive(Debug, PartialEq, Eq, clap::ArgEnum, Clone)] @@ -33,6 +36,7 @@ pub enum PrintFormat { Table, Json, NdJson, + Automatic, } impl FromStr for PrintFormat { @@ -55,6 +59,18 @@ macro_rules! batches_to_json { }}; } +macro_rules! stream_to_json { + ($WRITER: ident, $batch: expr) => {{ + let mut bytes = vec![]; + { + let mut writer = $WRITER::new(&mut bytes); + writer.write($batch)?; + writer.finish()?; + } + String::from_utf8(bytes).map_err(|e| DataFusionError::External(Box::new(e)))? + }}; +} + fn print_batches_with_sep(batches: &[RecordBatch], delimiter: u8) -> Result { let mut bytes = vec![]; { @@ -66,9 +82,26 @@ fn print_batches_with_sep(batches: &[RecordBatch], delimiter: u8) -> Result Result { + let mut bytes = vec![]; + { + let builder = WriterBuilder::new() + .with_header(with_header) + .with_delimiter(delimiter); + let mut writer = builder.build(&mut bytes); + writer.write(batch)?; + } + if bytes.last() == Some(&b'\n') { + bytes.pop(); + } + String::from_utf8(bytes).map_err(|e| DataFusionError::External(Box::new(e))) } fn keep_only_maxrows(s: &str, maxrows: usize) -> String { @@ -84,7 +117,6 @@ fn keep_only_maxrows(s: &str, maxrows: usize) -> String { let mut result = lines[0..(maxrows + 3)].to_vec(); // Keep top border and `maxrows` lines result.extend(vec![dotted_line; 3]); // Append ... lines result.push(last_line.clone()); - result.join("\n") } @@ -146,7 +178,7 @@ impl PrintFormat { match self { Self::Csv => println!("{}", print_batches_with_sep(batches, b',')?), Self::Tsv => println!("{}", print_batches_with_sep(batches, b'\t')?), - Self::Table => { + Self::Table | Self::Automatic => { if maxrows == MaxRows::Limited(0) { return Ok(()); } @@ -159,14 +191,41 @@ impl PrintFormat { } Ok(()) } + + pub fn print_stream(&self, batch: &RecordBatch, with_header: bool) -> Result<()> { + if batch.num_rows() == 0 { + return Ok(()); + } + + match self { + Self::Csv | Self::Automatic => { + println!("{}", print_stream_with_sep(batch, b',', with_header)?) + } + Self::Tsv => { + println!("{}", print_stream_with_sep(batch, b'\t', with_header)?) + } + Self::Table => { + return Err(DataFusionError::External( + "PrintFormat::Table is not implemented".to_string().into(), + )) + } + Self::Json => println!("{}", stream_to_json!(ArrayWriter, batch)), + Self::NdJson => { + println!("{}", stream_to_json!(LineDelimitedWriter, batch)) + } + } + Ok(()) + } } #[cfg(test)] mod tests { + use std::sync::Arc; + use super::*; + use arrow::array::Int32Array; use arrow::datatypes::{DataType, Field, Schema}; - use std::sync::Arc; #[test] fn test_print_batches_with_sep() { diff --git a/datafusion-cli/src/print_options.rs b/datafusion-cli/src/print_options.rs index 0a6c8d4c36fc..68f0cec972e2 100644 --- a/datafusion-cli/src/print_options.rs +++ b/datafusion-cli/src/print_options.rs @@ -15,13 +15,19 @@ // specific language governing permissions and limitations // under the License. -use crate::print_format::PrintFormat; -use datafusion::arrow::record_batch::RecordBatch; -use datafusion::error::Result; use std::fmt::{Display, Formatter}; +use std::pin::Pin; use std::str::FromStr; use std::time::Instant; +use crate::print_format::PrintFormat; + +use arrow::record_batch::RecordBatch; +use datafusion::error::Result; +use datafusion::physical_plan::RecordBatchStream; + +use futures::StreamExt; + #[derive(Debug, Clone, PartialEq, Copy)] pub enum MaxRows { /// show all rows in the output @@ -103,4 +109,28 @@ impl PrintOptions { Ok(()) } + + #[allow(clippy::println_empty_string)] + pub async fn print_stream( + &self, + mut stream: Pin>, + query_start_time: Instant, + ) -> Result<()> { + let mut row_count = 0_usize; + let mut with_header = true; + + while let Some(Ok(batch)) = stream.next().await { + row_count += batch.num_rows(); + self.format.print_stream(&batch, with_header)?; + with_header = false; + } + println!(""); + + let timing_info = get_timing_info_str(row_count, self.maxrows, query_start_time); + if !self.quiet { + println!("{timing_info}"); + } + + Ok(()) + } } diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index 4a6ebeab09e1..5583991355c6 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -69,6 +69,7 @@ use arrow::{ use datafusion_common::{file_options::FileTypeWriterOptions, plan_err}; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::PhysicalSortExpr; +use datafusion_physical_plan::ExecutionPlan; use log::debug; use object_store::path::Path; @@ -507,6 +508,20 @@ fn get_projected_output_ordering( all_orderings } +/// Get output (un)boundedness information for the given `plan`. +pub fn is_plan_streaming(plan: &Arc) -> Result { + if plan.children().is_empty() { + plan.unbounded_output(&[]) + } else { + let children_unbounded_output = plan + .children() + .iter() + .map(is_plan_streaming) + .collect::>>(); + plan.unbounded_output(&children_unbounded_output?) + } +} + #[cfg(test)] mod tests { use arrow_array::cast::AsArray; From 8c2583d5e113850063dc2b10c9ad21be808ae4d1 Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Mon, 25 Dec 2023 19:07:49 +0300 Subject: [PATCH 2/8] Update Cargo.toml --- Cargo.toml | 2 +- datafusion-cli/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a698fbf471f9..4ee29ea6298c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,7 +36,7 @@ arrow = { version = "49.0.0", features = ["prettyprint"] } arrow-array = { version = "49.0.0", default-features = false, features = ["chrono-tz"] } arrow-buffer = { version = "49.0.0", default-features = false } arrow-flight = { version = "49.0.0", features = ["flight-sql-experimental"] } -arrow-ipc = { version = "49.0.0", default-features = false, features=["lz4"] } +arrow-ipc = { version = "49.0.0", default-features = false, features = ["lz4"] } arrow-ord = { version = "49.0.0", default-features = false } arrow-schema = { version = "49.0.0", default-features = false } async-trait = "0.1.73" diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index 4774eb504bef..e1ddba4cad1a 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -52,4 +52,4 @@ url = "2.2" assert_cmd = "2.0" ctor = "0.2.0" predicates = "3.0" -rstest = "0.17" \ No newline at end of file +rstest = "0.17" From b35bc73a3d3bbd8238b5575151addc3d99ef1732 Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Wed, 27 Dec 2023 15:25:26 +0300 Subject: [PATCH 3/8] Remove duplications --- datafusion-cli/src/exec.rs | 12 +- datafusion-cli/src/print_format.rs | 331 ++++++++++------------------ datafusion-cli/src/print_options.rs | 41 +++- 3 files changed, 146 insertions(+), 238 deletions(-) diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs index c854e85ddc0c..30bc0c4592ec 100644 --- a/datafusion-cli/src/exec.rs +++ b/datafusion-cli/src/exec.rs @@ -201,7 +201,6 @@ async fn exec_and_print( sql: String, ) -> Result<()> { let now = Instant::now(); - let sql = unescape_input(&sql)?; let task_ctx = ctx.task_ctx(); let dialect = &task_ctx.session_config().options().sql_parser.dialect; @@ -216,7 +215,6 @@ async fn exec_and_print( for statement in statements { let mut plan = ctx.state().statement_to_plan(statement).await?; - // For plans like `Explain` ignore `MaxRows` option and always display all rows let should_ignore_maxrows = matches!( plan, LogicalPlan::Explain(_) @@ -224,14 +222,12 @@ async fn exec_and_print( | LogicalPlan::Analyze(_) ); - // Note that cmd is a mutable reference so that create_external_table function can remove all - // datafusion-cli specific options before passing through to datafusion. Otherwise, datafusion - // will raise Configuration errors. if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan { create_external_table(ctx, cmd).await?; } + let df = ctx.execute_logical_plan(plan).await?; - let physical_plan = df.clone().create_physical_plan().await?; + let physical_plan = df.create_physical_plan().await?; if is_plan_streaming(&physical_plan)? { let stream = execute_stream(physical_plan, task_ctx.clone())?; @@ -240,10 +236,10 @@ async fn exec_and_print( let print_options = if should_ignore_maxrows { PrintOptions { maxrows: MaxRows::Unlimited, - ..print_options.clone() + ..*print_options } } else { - print_options.clone() + *print_options }; let results = collect(physical_plan, task_ctx.clone()).await?; diff --git a/datafusion-cli/src/print_format.rs b/datafusion-cli/src/print_format.rs index 2f5e40424dd5..26521a06c43b 100644 --- a/datafusion-cli/src/print_format.rs +++ b/datafusion-cli/src/print_format.rs @@ -26,10 +26,10 @@ use arrow::json::{ArrayWriter, LineDelimitedWriter}; use arrow::record_batch::RecordBatch; use arrow::util::pretty::pretty_format_batches_with_options; use datafusion::common::format::DEFAULT_FORMAT_OPTIONS; -use datafusion::error::{DataFusionError, Result}; +use datafusion::error::Result; /// Allow records to be printed in different formats -#[derive(Debug, PartialEq, Eq, clap::ArgEnum, Clone)] +#[derive(Debug, PartialEq, Eq, clap::ArgEnum, Clone, Copy)] pub enum PrintFormat { Csv, Tsv, @@ -48,196 +48,134 @@ impl FromStr for PrintFormat { } macro_rules! batches_to_json { - ($WRITER: ident, $batches: expr) => {{ - let mut bytes = vec![]; + ($WRITER: ident, $writer: expr, $batches: expr) => {{ { - let mut writer = $WRITER::new(&mut bytes); - $batches.iter().try_for_each(|batch| writer.write(batch))?; - writer.finish()?; - } - String::from_utf8(bytes).map_err(|e| DataFusionError::External(Box::new(e)))? - }}; -} - -macro_rules! stream_to_json { - ($WRITER: ident, $batch: expr) => {{ - let mut bytes = vec![]; - { - let mut writer = $WRITER::new(&mut bytes); - writer.write($batch)?; - writer.finish()?; + if !$batches.is_empty() { + let mut json_writer = $WRITER::new(&mut *$writer); + for batch in $batches { + json_writer.write(batch)?; + } + json_writer.finish()?; + writeln!($writer)?; + } } - String::from_utf8(bytes).map_err(|e| DataFusionError::External(Box::new(e)))? + Ok(()) as Result<()> }}; } -fn print_batches_with_sep(batches: &[RecordBatch], delimiter: u8) -> Result { - let mut bytes = vec![]; - { - let builder = WriterBuilder::new() - .with_header(true) - .with_delimiter(delimiter); - let mut writer = builder.build(&mut bytes); - for batch in batches { - writer.write(batch)?; - } - } - String::from_utf8(bytes).map_err(|e| DataFusionError::External(Box::new(e))) -} - -fn print_stream_with_sep( - batch: &RecordBatch, +fn print_batches_with_sep( + writer: &mut W, + batches: &[RecordBatch], delimiter: u8, with_header: bool, -) -> Result { - let mut bytes = vec![]; - { - let builder = WriterBuilder::new() - .with_header(with_header) - .with_delimiter(delimiter); - let mut writer = builder.build(&mut bytes); - writer.write(batch)?; +) -> Result<()> { + let builder = WriterBuilder::new() + .with_header(with_header) + .with_delimiter(delimiter); + let mut csv_writer = builder.build(writer); + + for batch in batches { + csv_writer.write(batch)?; } - if bytes.last() == Some(&b'\n') { - bytes.pop(); - } - String::from_utf8(bytes).map_err(|e| DataFusionError::External(Box::new(e))) -} - -fn keep_only_maxrows(s: &str, maxrows: usize) -> String { - let lines: Vec = s.lines().map(String::from).collect(); - assert!(lines.len() >= maxrows + 4); // 4 lines for top and bottom border - - let last_line = &lines[lines.len() - 1]; // bottom border line - - let spaces = last_line.len().saturating_sub(4); - let dotted_line = format!("| .{:( + writer: &mut W, batches: &[RecordBatch], maxrows: MaxRows, -) -> Result { +) -> Result<()> { match maxrows { MaxRows::Limited(maxrows) => { - // Only format enough batches for maxrows + // Filter batches to meet the maxrows condition let mut filtered_batches = Vec::new(); - let mut batches = batches; - let row_count: usize = batches.iter().map(|b| b.num_rows()).sum(); - if row_count > maxrows { - let mut accumulated_rows = 0; - - for batch in batches { + let mut row_count: usize = 0; + for batch in batches { + if row_count + batch.num_rows() > maxrows { + // If adding this batch exceeds maxrows, slice the batch + let limit = maxrows - row_count; + let sliced_batch = batch.slice(0, limit); + filtered_batches.push(sliced_batch); + break; + } else { filtered_batches.push(batch.clone()); - if accumulated_rows + batch.num_rows() > maxrows { - break; - } - accumulated_rows += batch.num_rows(); + row_count += batch.num_rows(); } - - batches = &filtered_batches; - } - - let mut formatted = format!( - "{}", - pretty_format_batches_with_options(batches, &DEFAULT_FORMAT_OPTIONS)?, - ); - - if row_count > maxrows { - formatted = keep_only_maxrows(&formatted, maxrows); } - Ok(formatted) + // Formatting and writing to the writer + let formatted = pretty_format_batches_with_options( + &filtered_batches, + &DEFAULT_FORMAT_OPTIONS, + )?; + write!(writer, "{}", formatted)?; } MaxRows::Unlimited => { - // maxrows not specified, print all rows - Ok(format!( - "{}", - pretty_format_batches_with_options(batches, &DEFAULT_FORMAT_OPTIONS)?, - )) + // Format all rows and write to the writer + let formatted = + pretty_format_batches_with_options(batches, &DEFAULT_FORMAT_OPTIONS)?; + write!(writer, "{}", formatted)?; } } + + Ok(()) } impl PrintFormat { - /// print the batches to stdout using the specified format - /// `maxrows` option is only used for `Table` format: - /// If `maxrows` is Some(n), then at most n rows will be displayed - /// If `maxrows` is None, then every row will be displayed - pub fn print_batches(&self, batches: &[RecordBatch], maxrows: MaxRows) -> Result<()> { - if batches.is_empty() { + /// Print the batches to a writer using the specified format + pub fn print_batches_to_writer( + &self, + writer: &mut W, + batches: &[RecordBatch], + maxrows: MaxRows, + with_header: bool, + ) -> Result<()> { + if batches.is_empty() || batches[0].num_rows() == 0 { return Ok(()); } match self { - Self::Csv => println!("{}", print_batches_with_sep(batches, b',')?), - Self::Tsv => println!("{}", print_batches_with_sep(batches, b'\t')?), + Self::Csv => print_batches_with_sep(writer, batches, b',', with_header), + Self::Tsv => print_batches_with_sep(writer, batches, b'\t', with_header), Self::Table | Self::Automatic => { if maxrows == MaxRows::Limited(0) { return Ok(()); } - println!("{}", format_batches_with_maxrows(batches, maxrows)?,) - } - Self::Json => println!("{}", batches_to_json!(ArrayWriter, batches)), - Self::NdJson => { - println!("{}", batches_to_json!(LineDelimitedWriter, batches)) - } - } - Ok(()) - } - - pub fn print_stream(&self, batch: &RecordBatch, with_header: bool) -> Result<()> { - if batch.num_rows() == 0 { - return Ok(()); - } - - match self { - Self::Csv | Self::Automatic => { - println!("{}", print_stream_with_sep(batch, b',', with_header)?) - } - Self::Tsv => { - println!("{}", print_stream_with_sep(batch, b'\t', with_header)?) - } - Self::Table => { - return Err(DataFusionError::External( - "PrintFormat::Table is not implemented".to_string().into(), - )) - } - Self::Json => println!("{}", stream_to_json!(ArrayWriter, batch)), - Self::NdJson => { - println!("{}", stream_to_json!(LineDelimitedWriter, batch)) + format_batches_with_maxrows(writer, batches, maxrows) } + Self::Json => batches_to_json!(ArrayWriter, writer, batches), + Self::NdJson => batches_to_json!(LineDelimitedWriter, writer, batches), } - Ok(()) } } #[cfg(test)] mod tests { + use std::io::{Cursor, Read, Write}; use std::sync::Arc; use super::*; - use arrow::array::Int32Array; use arrow::datatypes::{DataType, Field, Schema}; + use datafusion::error::Result; + use datafusion_common::DataFusionError; #[test] fn test_print_batches_with_sep() { + let mut buffer = Cursor::new(Vec::new()); let batches = vec![]; - assert_eq!("", print_batches_with_sep(&batches, b',').unwrap()); + print_batches_with_sep(&mut buffer, &batches, b',', true).unwrap(); + buffer.set_position(0); + let mut contents = String::new(); + buffer.read_to_string(&mut contents).unwrap(); + assert_eq!(contents, ""); let schema = Arc::new(Schema::new(vec![ Field::new("a", DataType::Int32, false), Field::new("b", DataType::Int32, false), Field::new("c", DataType::Int32, false), ])); - let batch = RecordBatch::try_new( schema, vec![ @@ -247,110 +185,67 @@ mod tests { ], ) .unwrap(); - let batches = vec![batch]; - let r = print_batches_with_sep(&batches, b',').unwrap(); - assert_eq!("a,b,c\n1,4,7\n2,5,8\n3,6,9\n", r); + let mut buffer = Cursor::new(Vec::new()); + print_batches_with_sep(&mut buffer, &batches, b',', true).unwrap(); + buffer.set_position(0); + let mut contents = String::new(); + buffer.read_to_string(&mut contents).unwrap(); + assert_eq!(contents, "a,b,c\n1,4,7\n2,5,8\n3,6,9\n"); } #[test] - fn test_print_batches_to_json_empty() -> Result<()> { + fn test_print_batches_to_json_empty() -> Result<(), DataFusionError> { + let mut buffer = Cursor::new(Vec::new()); let batches = vec![]; - let r = batches_to_json!(ArrayWriter, &batches); - assert_eq!("", r); - - let r = batches_to_json!(LineDelimitedWriter, &batches); - assert_eq!("", r); - - let schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::Int32, false), - Field::new("b", DataType::Int32, false), - Field::new("c", DataType::Int32, false), - ])); - let batch = RecordBatch::try_new( - schema, - vec![ - Arc::new(Int32Array::from(vec![1, 2, 3])), - Arc::new(Int32Array::from(vec![4, 5, 6])), - Arc::new(Int32Array::from(vec![7, 8, 9])), - ], - ) - .unwrap(); + // Test with ArrayWriter + batches_to_json!(ArrayWriter, &mut buffer, &batches)?; + buffer.set_position(0); + let mut contents = String::new(); + buffer.read_to_string(&mut contents)?; + assert_eq!(contents, ""); // Expecting a newline for empty batches + + // Test with LineDelimitedWriter + let mut buffer = Cursor::new(Vec::new()); // Re-initialize buffer + batches_to_json!(LineDelimitedWriter, &mut buffer, &batches)?; + buffer.set_position(0); + contents.clear(); + buffer.read_to_string(&mut contents)?; + assert_eq!(contents, ""); // Expecting a newline for empty batches - let batches = vec![batch]; - let r = batches_to_json!(ArrayWriter, &batches); - assert_eq!("[{\"a\":1,\"b\":4,\"c\":7},{\"a\":2,\"b\":5,\"c\":8},{\"a\":3,\"b\":6,\"c\":9}]", r); - - let r = batches_to_json!(LineDelimitedWriter, &batches); - assert_eq!("{\"a\":1,\"b\":4,\"c\":7}\n{\"a\":2,\"b\":5,\"c\":8}\n{\"a\":3,\"b\":6,\"c\":9}\n", r); Ok(()) } #[test] fn test_format_batches_with_maxrows() -> Result<()> { let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); - let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]) .unwrap(); - #[rustfmt::skip] - let all_rows_expected = [ - "+---+", - "| a |", - "+---+", - "| 1 |", - "| 2 |", - "| 3 |", - "+---+", - ].join("\n"); + let all_rows_expected = "+---+\n| a |\n+---+\n| 1 |\n| 2 |\n| 3 |\n+---+"; // Note the newline at the end + let one_row_expected = "+---+\n| a |\n+---+\n| 1 |\n+---+"; // Newline at the end - #[rustfmt::skip] - let one_row_expected = [ - "+---+", - "| a |", - "+---+", - "| 1 |", - "| . |", - "| . |", - "| . |", - "+---+", - ].join("\n"); + let mut buffer = Cursor::new(Vec::new()); - #[rustfmt::skip] - let multi_batches_expected = [ - "+---+", - "| a |", - "+---+", - "| 1 |", - "| 2 |", - "| 3 |", - "| 1 |", - "| 2 |", - "| . |", - "| . |", - "| . |", - "+---+", - ].join("\n"); + // Writing with unlimited rows + format_batches_with_maxrows(&mut buffer, &[batch.clone()], MaxRows::Unlimited)?; + buffer.set_position(0); + let mut contents = String::new(); + buffer.read_to_string(&mut contents)?; + assert_eq!(contents, all_rows_expected); - let no_limit = format_batches_with_maxrows(&[batch.clone()], MaxRows::Unlimited)?; - assert_eq!(all_rows_expected, no_limit); + // Reset buffer and contents for the next test + buffer.set_position(0); + buffer.get_mut().clear(); + contents.clear(); - let maxrows_less_than_actual = - format_batches_with_maxrows(&[batch.clone()], MaxRows::Limited(1))?; - assert_eq!(one_row_expected, maxrows_less_than_actual); - let maxrows_more_than_actual = - format_batches_with_maxrows(&[batch.clone()], MaxRows::Limited(5))?; - assert_eq!(all_rows_expected, maxrows_more_than_actual); - let maxrows_equals_actual = - format_batches_with_maxrows(&[batch.clone()], MaxRows::Limited(3))?; - assert_eq!(all_rows_expected, maxrows_equals_actual); - let multi_batches = format_batches_with_maxrows( - &[batch.clone(), batch.clone(), batch.clone()], - MaxRows::Limited(5), - )?; - assert_eq!(multi_batches_expected, multi_batches); + // Writing with limited rows + format_batches_with_maxrows(&mut buffer, &[batch.clone()], MaxRows::Limited(1))?; + buffer.set_position(0); + buffer.read_to_string(&mut contents)?; + assert_eq!(contents, one_row_expected); Ok(()) } diff --git a/datafusion-cli/src/print_options.rs b/datafusion-cli/src/print_options.rs index 68f0cec972e2..30d9331cfe51 100644 --- a/datafusion-cli/src/print_options.rs +++ b/datafusion-cli/src/print_options.rs @@ -16,6 +16,7 @@ // under the License. use std::fmt::{Display, Formatter}; +use std::io::Write; use std::pin::Pin; use std::str::FromStr; use std::time::Instant; @@ -26,6 +27,7 @@ use arrow::record_batch::RecordBatch; use datafusion::error::Result; use datafusion::physical_plan::RecordBatchStream; +use datafusion_common::DataFusionError; use futures::StreamExt; #[derive(Debug, Clone, PartialEq, Copy)] @@ -63,7 +65,7 @@ impl Display for MaxRows { } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Copy)] pub struct PrintOptions { pub format: PrintFormat, pub quiet: bool, @@ -82,7 +84,7 @@ fn get_timing_info_str( }; format!( - "{} {} in set{}. Query took {:.3} seconds.\n", + "\n{} {} in set{}. Query took {:.3} seconds.\n", row_count, row_word, nrows_shown_msg, @@ -91,44 +93,59 @@ fn get_timing_info_str( } impl PrintOptions { - /// print the batches to stdout using the specified format + /// Print the batches to stdout using the specified format pub fn print_batches( &self, batches: &[RecordBatch], query_start_time: Instant, ) -> Result<()> { - let row_count: usize = batches.iter().map(|b| b.num_rows()).sum(); - // Elapsed time should not count time for printing batches - let timing_info = get_timing_info_str(row_count, self.maxrows, query_start_time); + let stdout = std::io::stdout(); + let mut writer = stdout.lock(); - self.format.print_batches(batches, self.maxrows)?; + self.format + .print_batches_to_writer(&mut writer, batches, self.maxrows, true)?; + let row_count: usize = batches.iter().map(|b| b.num_rows()).sum(); + let timing_info = get_timing_info_str(row_count, self.maxrows, query_start_time); if !self.quiet { - println!("{timing_info}"); + writeln!(writer, "\n{timing_info}")?; } Ok(()) } - #[allow(clippy::println_empty_string)] + /// Print the stream to stdout using the specified format pub async fn print_stream( &self, mut stream: Pin>, query_start_time: Instant, ) -> Result<()> { + if self.format == PrintFormat::Table { + return Err(DataFusionError::External( + "PrintFormat::Table is not implemented".to_string().into(), + )); + }; + + let stdout = std::io::stdout(); + let mut writer = stdout.lock(); + let mut row_count = 0_usize; let mut with_header = true; while let Some(Ok(batch)) = stream.next().await { row_count += batch.num_rows(); - self.format.print_stream(&batch, with_header)?; + self.format.print_batches_to_writer( + &mut writer, + &vec![batch], + MaxRows::Unlimited, + with_header, + )?; with_header = false; } - println!(""); let timing_info = get_timing_info_str(row_count, self.maxrows, query_start_time); if !self.quiet { - println!("{timing_info}"); + writeln!(writer, "\n{timing_info}")?; } Ok(()) From cf26827247b88acd2fa0cd25cd486832e36080e6 Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Wed, 27 Dec 2023 15:40:31 +0300 Subject: [PATCH 4/8] Clean up --- datafusion-cli/src/exec.rs | 11 +++++++---- datafusion-cli/src/main.rs | 1 - datafusion-cli/src/print_format.rs | 4 +--- datafusion-cli/src/print_options.rs | 6 +++--- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs index 30bc0c4592ec..775db102c45d 100644 --- a/datafusion-cli/src/exec.rs +++ b/datafusion-cli/src/exec.rs @@ -128,7 +128,7 @@ pub async fn exec_from_repl( ))); rl.load_history(".history").ok(); - let mut print_options = print_options.clone(); + let mut print_options = *print_options; loop { match rl.readline("❯ ") { @@ -215,6 +215,7 @@ async fn exec_and_print( for statement in statements { let mut plan = ctx.state().statement_to_plan(statement).await?; + // For plans like `Explain` ignore `MaxRows` option and always display all rows let should_ignore_maxrows = matches!( plan, LogicalPlan::Explain(_) @@ -222,6 +223,9 @@ async fn exec_and_print( | LogicalPlan::Analyze(_) ); + // Note that cmd is a mutable reference so that create_external_table function can remove all + // datafusion-cli specific options before passing through to datafusion. Otherwise, datafusion + // will raise Configuration errors. if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan { create_external_table(ctx, cmd).await?; } @@ -293,9 +297,8 @@ mod tests { use std::str::FromStr; use super::*; - - use datafusion_common::file_options::StatementOptions; - use datafusion_common::{plan_err, FileTypeWriterOptions}; + use datafusion::common::plan_err; + use datafusion_common::{file_options::StatementOptions, FileTypeWriterOptions}; async fn create_external_table_test(location: &str, sql: &str) -> Result<()> { let ctx = SessionContext::new(); diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index da14b1356d70..563d172f2c95 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -334,7 +334,6 @@ fn extract_memory_pool_size(size: &str) -> Result { #[cfg(test)] mod tests { use super::*; - use datafusion::assert_batches_eq; fn assert_conversion(input: &str, expected: Result) { diff --git a/datafusion-cli/src/print_format.rs b/datafusion-cli/src/print_format.rs index 26521a06c43b..9c4b2738b7ed 100644 --- a/datafusion-cli/src/print_format.rs +++ b/datafusion-cli/src/print_format.rs @@ -104,7 +104,6 @@ fn format_batches_with_maxrows( } } - // Formatting and writing to the writer let formatted = pretty_format_batches_with_options( &filtered_batches, &DEFAULT_FORMAT_OPTIONS, @@ -112,7 +111,6 @@ fn format_batches_with_maxrows( write!(writer, "{}", formatted)?; } MaxRows::Unlimited => { - // Format all rows and write to the writer let formatted = pretty_format_batches_with_options(batches, &DEFAULT_FORMAT_OPTIONS)?; write!(writer, "{}", formatted)?; @@ -124,7 +122,7 @@ fn format_batches_with_maxrows( impl PrintFormat { /// Print the batches to a writer using the specified format - pub fn print_batches_to_writer( + pub fn print_batches( &self, writer: &mut W, batches: &[RecordBatch], diff --git a/datafusion-cli/src/print_options.rs b/datafusion-cli/src/print_options.rs index 30d9331cfe51..204682044dd0 100644 --- a/datafusion-cli/src/print_options.rs +++ b/datafusion-cli/src/print_options.rs @@ -103,7 +103,7 @@ impl PrintOptions { let mut writer = stdout.lock(); self.format - .print_batches_to_writer(&mut writer, batches, self.maxrows, true)?; + .print_batches(&mut writer, batches, self.maxrows, true)?; let row_count: usize = batches.iter().map(|b| b.num_rows()).sum(); let timing_info = get_timing_info_str(row_count, self.maxrows, query_start_time); @@ -134,9 +134,9 @@ impl PrintOptions { while let Some(Ok(batch)) = stream.next().await { row_count += batch.num_rows(); - self.format.print_batches_to_writer( + self.format.print_batches( &mut writer, - &vec![batch], + &[batch], MaxRows::Unlimited, with_header, )?; From ca87d69a7673d158568291474d9890a3c6d85c52 Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Wed, 27 Dec 2023 16:34:29 +0300 Subject: [PATCH 5/8] Stream test will be added --- datafusion-cli/src/exec.rs | 20 +++++++++++++------- datafusion-cli/src/print_format.rs | 6 ++++-- datafusion-cli/src/print_options.rs | 22 +++++++++++++++++----- datafusion-cli/tests/cli_integration.rs | 8 ++++---- 4 files changed, 38 insertions(+), 18 deletions(-) diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs index 775db102c45d..6709e2dc9741 100644 --- a/datafusion-cli/src/exec.rs +++ b/datafusion-cli/src/exec.rs @@ -22,6 +22,7 @@ use std::io::BufReader; use std::time::Instant; use std::{fs::File, sync::Arc}; +use crate::print_format::PrintFormat; use crate::{ command::{Command, OutputFormat}, helper::{unescape_input, CliHelper}, @@ -237,13 +238,18 @@ async fn exec_and_print( let stream = execute_stream(physical_plan, task_ctx.clone())?; print_options.print_stream(stream, now).await?; } else { - let print_options = if should_ignore_maxrows { - PrintOptions { - maxrows: MaxRows::Unlimited, - ..*print_options - } - } else { - *print_options + let print_options = PrintOptions { + maxrows: if should_ignore_maxrows { + MaxRows::Unlimited + } else { + print_options.maxrows + }, + format: if print_options.format == PrintFormat::Automatic { + PrintFormat::Table + } else { + print_options.format + }, + quiet: print_options.quiet, }; let results = collect(physical_plan, task_ctx.clone()).await?; diff --git a/datafusion-cli/src/print_format.rs b/datafusion-cli/src/print_format.rs index 9c4b2738b7ed..7dcdeeea1acc 100644 --- a/datafusion-cli/src/print_format.rs +++ b/datafusion-cli/src/print_format.rs @@ -134,9 +134,11 @@ impl PrintFormat { } match self { - Self::Csv => print_batches_with_sep(writer, batches, b',', with_header), + Self::Csv | Self::Automatic => { + print_batches_with_sep(writer, batches, b',', with_header) + } Self::Tsv => print_batches_with_sep(writer, batches, b'\t', with_header), - Self::Table | Self::Automatic => { + Self::Table => { if maxrows == MaxRows::Limited(0) { return Ok(()); } diff --git a/datafusion-cli/src/print_options.rs b/datafusion-cli/src/print_options.rs index 204682044dd0..3b3e822d33bd 100644 --- a/datafusion-cli/src/print_options.rs +++ b/datafusion-cli/src/print_options.rs @@ -84,7 +84,7 @@ fn get_timing_info_str( }; format!( - "\n{} {} in set{}. Query took {:.3} seconds.\n", + "{} {} in set{}. Query took {:.3} seconds.\n", row_count, row_word, nrows_shown_msg, @@ -106,9 +106,19 @@ impl PrintOptions { .print_batches(&mut writer, batches, self.maxrows, true)?; let row_count: usize = batches.iter().map(|b| b.num_rows()).sum(); - let timing_info = get_timing_info_str(row_count, self.maxrows, query_start_time); + let timing_info = get_timing_info_str( + row_count, + if self.format == PrintFormat::Table { + self.maxrows + } else { + MaxRows::Unlimited + }, + query_start_time, + ); + + writeln!(writer, "\n")?; if !self.quiet { - writeln!(writer, "\n{timing_info}")?; + writeln!(writer, "{timing_info}")?; } Ok(()) @@ -143,9 +153,11 @@ impl PrintOptions { with_header = false; } - let timing_info = get_timing_info_str(row_count, self.maxrows, query_start_time); + let timing_info = + get_timing_info_str(row_count, MaxRows::Unlimited, query_start_time); + writeln!(writer, "\n")?; if !self.quiet { - writeln!(writer, "\n{timing_info}")?; + writeln!(writer, "{timing_info}")?; } Ok(()) diff --git a/datafusion-cli/tests/cli_integration.rs b/datafusion-cli/tests/cli_integration.rs index 119a0aa39d3c..9aa273e51758 100644 --- a/datafusion-cli/tests/cli_integration.rs +++ b/datafusion-cli/tests/cli_integration.rs @@ -31,19 +31,19 @@ fn init() { #[rstest] #[case::exec_from_commands( ["--command", "select 1", "--format", "json", "-q"], - "[{\"Int64(1)\":1}]\n" + "[{\"Int64(1)\":1}]\n\n\n" )] #[case::exec_multiple_statements( ["--command", "select 1; select 2;", "--format", "json", "-q"], - "[{\"Int64(1)\":1}]\n[{\"Int64(2)\":2}]\n" + "[{\"Int64(1)\":1}]\n\n\n[{\"Int64(2)\":2}]\n\n\n" )] #[case::exec_from_files( ["--file", "tests/data/sql.txt", "--format", "json", "-q"], - "[{\"Int64(1)\":1}]\n" + "[{\"Int64(1)\":1}]\n\n\n" )] #[case::set_batch_size( ["--command", "show datafusion.execution.batch_size", "--format", "json", "-q", "-b", "1"], - "[{\"name\":\"datafusion.execution.batch_size\",\"value\":\"1\"}]\n" + "[{\"name\":\"datafusion.execution.batch_size\",\"value\":\"1\"}]\n\n\n" )] #[test] fn cli_quick_test<'a>( From 929288142db58e614e0d7dc152b8307da31cdb82 Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Wed, 27 Dec 2023 17:18:22 +0300 Subject: [PATCH 6/8] Update print_format.rs --- datafusion-cli/src/print_format.rs | 70 +++++++++++++++++++++++------- 1 file changed, 55 insertions(+), 15 deletions(-) diff --git a/datafusion-cli/src/print_format.rs b/datafusion-cli/src/print_format.rs index 7dcdeeea1acc..1404b9370f91 100644 --- a/datafusion-cli/src/print_format.rs +++ b/datafusion-cli/src/print_format.rs @@ -198,21 +198,47 @@ mod tests { fn test_print_batches_to_json_empty() -> Result<(), DataFusionError> { let mut buffer = Cursor::new(Vec::new()); let batches = vec![]; - - // Test with ArrayWriter batches_to_json!(ArrayWriter, &mut buffer, &batches)?; buffer.set_position(0); let mut contents = String::new(); buffer.read_to_string(&mut contents)?; - assert_eq!(contents, ""); // Expecting a newline for empty batches + assert_eq!(contents, ""); - // Test with LineDelimitedWriter - let mut buffer = Cursor::new(Vec::new()); // Re-initialize buffer + let mut buffer = Cursor::new(Vec::new()); batches_to_json!(LineDelimitedWriter, &mut buffer, &batches)?; buffer.set_position(0); contents.clear(); buffer.read_to_string(&mut contents)?; - assert_eq!(contents, ""); // Expecting a newline for empty batches + assert_eq!(contents, ""); + + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + Field::new("c", DataType::Int32, false), + ])); + let batch = RecordBatch::try_new( + schema, + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(Int32Array::from(vec![4, 5, 6])), + Arc::new(Int32Array::from(vec![7, 8, 9])), + ], + )?; + let batches = vec![batch]; + + let mut buffer = Cursor::new(Vec::new()); + batches_to_json!(ArrayWriter, &mut buffer, &batches)?; + buffer.set_position(0); + contents.clear(); + buffer.read_to_string(&mut contents)?; + assert_eq!(contents, "[{\"a\":1,\"b\":4,\"c\":7},{\"a\":2,\"b\":5,\"c\":8},{\"a\":3,\"b\":6,\"c\":9}]\n"); + + let mut buffer = Cursor::new(Vec::new()); + batches_to_json!(LineDelimitedWriter, &mut buffer, &batches)?; + buffer.set_position(0); + contents.clear(); + buffer.read_to_string(&mut contents)?; + assert_eq!(contents, "{\"a\":1,\"b\":4,\"c\":7}\n{\"a\":2,\"b\":5,\"c\":8}\n{\"a\":3,\"b\":6,\"c\":9}\n\n"); Ok(()) } @@ -224,28 +250,42 @@ mod tests { RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]) .unwrap(); - let all_rows_expected = "+---+\n| a |\n+---+\n| 1 |\n| 2 |\n| 3 |\n+---+"; // Note the newline at the end - let one_row_expected = "+---+\n| a |\n+---+\n| 1 |\n+---+"; // Newline at the end - + let all_rows_expected = "+---+\n| a |\n+---+\n| 1 |\n| 2 |\n| 3 |\n+---+"; + let one_row_expected = "+---+\n| a |\n+---+\n| 1 |\n+---+"; + let multi_batches_expected = + "+---+\n| a |\n+---+\n| 1 |\n| 2 |\n| 3 |\n| 1 |\n| 2 |\n+---+"; let mut buffer = Cursor::new(Vec::new()); - // Writing with unlimited rows format_batches_with_maxrows(&mut buffer, &[batch.clone()], MaxRows::Unlimited)?; buffer.set_position(0); let mut contents = String::new(); buffer.read_to_string(&mut contents)?; assert_eq!(contents, all_rows_expected); - // Reset buffer and contents for the next test + let mut buffer = Cursor::new(Vec::new()); + format_batches_with_maxrows(&mut buffer, &[batch.clone()], MaxRows::Limited(1))?; + buffer.set_position(0); + contents.clear(); + buffer.read_to_string(&mut contents)?; + assert_eq!(contents, one_row_expected); + + let mut buffer = Cursor::new(Vec::new()); + format_batches_with_maxrows(&mut buffer, &[batch.clone()], MaxRows::Limited(5))?; buffer.set_position(0); - buffer.get_mut().clear(); contents.clear(); + buffer.read_to_string(&mut contents)?; + assert_eq!(contents, all_rows_expected); - // Writing with limited rows - format_batches_with_maxrows(&mut buffer, &[batch.clone()], MaxRows::Limited(1))?; + let mut buffer = Cursor::new(Vec::new()); + format_batches_with_maxrows( + &mut buffer, + &[batch.clone(), batch.clone()], + MaxRows::Limited(5), + )?; buffer.set_position(0); + contents.clear(); buffer.read_to_string(&mut contents)?; - assert_eq!(contents, one_row_expected); + assert_eq!(contents, multi_batches_expected); Ok(()) } From 2ef4931e4d5096d68629a3c657959c864233042e Mon Sep 17 00:00:00 2001 From: Mehmet Ozan Kabak Date: Thu, 28 Dec 2023 00:03:32 +0300 Subject: [PATCH 7/8] Address feedback --- datafusion-cli/src/exec.rs | 31 ++-- datafusion-cli/src/print_format.rs | 216 +++++++++++++++--------- datafusion-cli/src/print_options.rs | 7 +- datafusion-cli/tests/cli_integration.rs | 8 +- 4 files changed, 156 insertions(+), 106 deletions(-) diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs index 6709e2dc9741..ba9aa2e69aa6 100644 --- a/datafusion-cli/src/exec.rs +++ b/datafusion-cli/src/exec.rs @@ -129,8 +129,6 @@ pub async fn exec_from_repl( ))); rl.load_history(".history").ok(); - let mut print_options = *print_options; - loop { match rl.readline("❯ ") { Ok(line) if line.starts_with('\\') => { @@ -142,9 +140,7 @@ pub async fn exec_from_repl( Command::OutputFormat(subcommand) => { if let Some(subcommand) = subcommand { if let Ok(command) = subcommand.parse::() { - if let Err(e) = - command.execute(&mut print_options).await - { + if let Err(e) = command.execute(print_options).await { eprintln!("{e}") } } else { @@ -158,7 +154,7 @@ pub async fn exec_from_repl( } } _ => { - if let Err(e) = cmd.execute(ctx, &mut print_options).await { + if let Err(e) = cmd.execute(ctx, print_options).await { eprintln!("{e}") } } @@ -169,7 +165,7 @@ pub async fn exec_from_repl( } Ok(line) => { rl.add_history_entry(line.trim_end())?; - match exec_and_print(ctx, &print_options, line).await { + match exec_and_print(ctx, print_options, line).await { Ok(_) => {} Err(err) => eprintln!("{err}"), } @@ -238,20 +234,13 @@ async fn exec_and_print( let stream = execute_stream(physical_plan, task_ctx.clone())?; print_options.print_stream(stream, now).await?; } else { - let print_options = PrintOptions { - maxrows: if should_ignore_maxrows { - MaxRows::Unlimited - } else { - print_options.maxrows - }, - format: if print_options.format == PrintFormat::Automatic { - PrintFormat::Table - } else { - print_options.format - }, - quiet: print_options.quiet, - }; - + let mut print_options = print_options.clone(); + if should_ignore_maxrows { + print_options.maxrows = MaxRows::Unlimited; + } + if print_options.format == PrintFormat::Automatic { + print_options.format = PrintFormat::Table; + } let results = collect(physical_plan, task_ctx.clone()).await?; print_options.print_batches(&results, now)?; } diff --git a/datafusion-cli/src/print_format.rs b/datafusion-cli/src/print_format.rs index 1404b9370f91..94345d070fdf 100644 --- a/datafusion-cli/src/print_format.rs +++ b/datafusion-cli/src/print_format.rs @@ -56,13 +56,20 @@ macro_rules! batches_to_json { json_writer.write(batch)?; } json_writer.finish()?; - writeln!($writer)?; + json_finish!($WRITER, $writer); } } Ok(()) as Result<()> }}; } +macro_rules! json_finish { + (ArrayWriter, $writer: expr) => {{ + writeln!($writer)?; + }}; + (LineDelimitedWriter, $writer: expr) => {{}}; +} + fn print_batches_with_sep( writer: &mut W, batches: &[RecordBatch], @@ -81,6 +88,23 @@ fn print_batches_with_sep( Ok(()) } +fn keep_only_maxrows(s: &str, maxrows: usize) -> String { + let lines: Vec = s.lines().map(String::from).collect(); + + assert!(lines.len() >= maxrows + 4); // 4 lines for top and bottom border + + let last_line = &lines[lines.len() - 1]; // bottom border line + + let spaces = last_line.len().saturating_sub(4); + let dotted_line = format!("| .{:( writer: &mut W, batches: &[RecordBatch], @@ -91,12 +115,14 @@ fn format_batches_with_maxrows( // Filter batches to meet the maxrows condition let mut filtered_batches = Vec::new(); let mut row_count: usize = 0; + let mut over_limit = false; for batch in batches { if row_count + batch.num_rows() > maxrows { // If adding this batch exceeds maxrows, slice the batch let limit = maxrows - row_count; let sliced_batch = batch.slice(0, limit); filtered_batches.push(sliced_batch); + over_limit = true; break; } else { filtered_batches.push(batch.clone()); @@ -108,7 +134,13 @@ fn format_batches_with_maxrows( &filtered_batches, &DEFAULT_FORMAT_OPTIONS, )?; - write!(writer, "{}", formatted)?; + if over_limit { + let mut formatted_str = format!("{}", formatted); + formatted_str = keep_only_maxrows(&formatted_str, maxrows); + write!(writer, "{}", formatted_str)?; + } else { + write!(writer, "{}", formatted)?; + } } MaxRows::Unlimited => { let formatted = @@ -156,19 +188,28 @@ mod tests { use std::sync::Arc; use super::*; + use arrow::array::Int32Array; use arrow::datatypes::{DataType, Field, Schema}; use datafusion::error::Result; - use datafusion_common::DataFusionError; - #[test] - fn test_print_batches_with_sep() { + fn run_test(batches: &[RecordBatch], test_fn: F) -> Result + where + F: Fn(&mut Cursor>, &[RecordBatch]) -> Result<()>, + { let mut buffer = Cursor::new(Vec::new()); - let batches = vec![]; - print_batches_with_sep(&mut buffer, &batches, b',', true).unwrap(); + test_fn(&mut buffer, batches)?; buffer.set_position(0); let mut contents = String::new(); - buffer.read_to_string(&mut contents).unwrap(); + buffer.read_to_string(&mut contents)?; + Ok(contents) + } + + #[test] + fn test_print_batches_with_sep() -> Result<()> { + let contents = run_test(&[], |buffer, batches| { + print_batches_with_sep(buffer, batches, b',', true) + })?; assert_eq!(contents, ""); let schema = Arc::new(Schema::new(vec![ @@ -183,32 +224,26 @@ mod tests { Arc::new(Int32Array::from(vec![4, 5, 6])), Arc::new(Int32Array::from(vec![7, 8, 9])), ], - ) - .unwrap(); - let batches = vec![batch]; - let mut buffer = Cursor::new(Vec::new()); - print_batches_with_sep(&mut buffer, &batches, b',', true).unwrap(); - buffer.set_position(0); - let mut contents = String::new(); - buffer.read_to_string(&mut contents).unwrap(); + )?; + + let contents = run_test(&[batch], |buffer, batches| { + print_batches_with_sep(buffer, batches, b',', true) + })?; assert_eq!(contents, "a,b,c\n1,4,7\n2,5,8\n3,6,9\n"); + + Ok(()) } #[test] - fn test_print_batches_to_json_empty() -> Result<(), DataFusionError> { - let mut buffer = Cursor::new(Vec::new()); - let batches = vec![]; - batches_to_json!(ArrayWriter, &mut buffer, &batches)?; - buffer.set_position(0); - let mut contents = String::new(); - buffer.read_to_string(&mut contents)?; + fn test_print_batches_to_json_empty() -> Result<()> { + let contents = run_test(&[], |buffer, batches| { + batches_to_json!(ArrayWriter, buffer, batches) + })?; assert_eq!(contents, ""); - let mut buffer = Cursor::new(Vec::new()); - batches_to_json!(LineDelimitedWriter, &mut buffer, &batches)?; - buffer.set_position(0); - contents.clear(); - buffer.read_to_string(&mut contents)?; + let contents = run_test(&[], |buffer, batches| { + batches_to_json!(LineDelimitedWriter, buffer, batches) + })?; assert_eq!(contents, ""); let schema = Arc::new(Schema::new(vec![ @@ -226,19 +261,15 @@ mod tests { )?; let batches = vec![batch]; - let mut buffer = Cursor::new(Vec::new()); - batches_to_json!(ArrayWriter, &mut buffer, &batches)?; - buffer.set_position(0); - contents.clear(); - buffer.read_to_string(&mut contents)?; + let contents = run_test(&batches, |buffer, batches| { + batches_to_json!(ArrayWriter, buffer, batches) + })?; assert_eq!(contents, "[{\"a\":1,\"b\":4,\"c\":7},{\"a\":2,\"b\":5,\"c\":8},{\"a\":3,\"b\":6,\"c\":9}]\n"); - let mut buffer = Cursor::new(Vec::new()); - batches_to_json!(LineDelimitedWriter, &mut buffer, &batches)?; - buffer.set_position(0); - contents.clear(); - buffer.read_to_string(&mut contents)?; - assert_eq!(contents, "{\"a\":1,\"b\":4,\"c\":7}\n{\"a\":2,\"b\":5,\"c\":8}\n{\"a\":3,\"b\":6,\"c\":9}\n\n"); + let contents = run_test(&batches, |buffer, batches| { + batches_to_json!(LineDelimitedWriter, buffer, batches) + })?; + assert_eq!(contents, "{\"a\":1,\"b\":4,\"c\":7}\n{\"a\":2,\"b\":5,\"c\":8}\n{\"a\":3,\"b\":6,\"c\":9}\n"); Ok(()) } @@ -246,46 +277,77 @@ mod tests { #[test] fn test_format_batches_with_maxrows() -> Result<()> { let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); - let batch = - RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]) - .unwrap(); - - let all_rows_expected = "+---+\n| a |\n+---+\n| 1 |\n| 2 |\n| 3 |\n+---+"; - let one_row_expected = "+---+\n| a |\n+---+\n| 1 |\n+---+"; - let multi_batches_expected = - "+---+\n| a |\n+---+\n| 1 |\n| 2 |\n| 3 |\n| 1 |\n| 2 |\n+---+"; - let mut buffer = Cursor::new(Vec::new()); - - format_batches_with_maxrows(&mut buffer, &[batch.clone()], MaxRows::Unlimited)?; - buffer.set_position(0); - let mut contents = String::new(); - buffer.read_to_string(&mut contents)?; - assert_eq!(contents, all_rows_expected); - - let mut buffer = Cursor::new(Vec::new()); - format_batches_with_maxrows(&mut buffer, &[batch.clone()], MaxRows::Limited(1))?; - buffer.set_position(0); - contents.clear(); - buffer.read_to_string(&mut contents)?; - assert_eq!(contents, one_row_expected); - - let mut buffer = Cursor::new(Vec::new()); - format_batches_with_maxrows(&mut buffer, &[batch.clone()], MaxRows::Limited(5))?; - buffer.set_position(0); - contents.clear(); - buffer.read_to_string(&mut contents)?; - assert_eq!(contents, all_rows_expected); + let batch = RecordBatch::try_new( + schema, + vec![Arc::new(Int32Array::from(vec![1, 2, 3]))], + )?; - let mut buffer = Cursor::new(Vec::new()); - format_batches_with_maxrows( - &mut buffer, - &[batch.clone(), batch.clone()], - MaxRows::Limited(5), + #[rustfmt::skip] + let all_rows_expected = [ + "+---+", + "| a |", + "+---+", + "| 1 |", + "| 2 |", + "| 3 |", + "+---+", + ].join("\n"); + + #[rustfmt::skip] + let one_row_expected = [ + "+---+", + "| a |", + "+---+", + "| 1 |", + "| . |", + "| . |", + "| . |", + "+---+", + ].join("\n"); + + #[rustfmt::skip] + let multi_batches_expected = [ + "+---+", + "| a |", + "+---+", + "| 1 |", + "| 2 |", + "| 3 |", + "| 1 |", + "| 2 |", + "| . |", + "| . |", + "| . |", + "+---+", + ].join("\n"); + + let no_limit = run_test(&[batch.clone()], |buffer, batches| { + format_batches_with_maxrows(buffer, batches, MaxRows::Unlimited) + })?; + assert_eq!(no_limit, all_rows_expected); + + let maxrows_less_than_actual = run_test(&[batch.clone()], |buffer, batches| { + format_batches_with_maxrows(buffer, batches, MaxRows::Limited(1)) + })?; + assert_eq!(maxrows_less_than_actual, one_row_expected); + + let maxrows_more_than_actual = run_test(&[batch.clone()], |buffer, batches| { + format_batches_with_maxrows(buffer, batches, MaxRows::Limited(5)) + })?; + assert_eq!(maxrows_more_than_actual, all_rows_expected); + + let maxrows_equals_actual = run_test(&[batch.clone()], |buffer, batches| { + format_batches_with_maxrows(buffer, batches, MaxRows::Limited(3)) + })?; + assert_eq!(maxrows_equals_actual, all_rows_expected); + + let multi_batches = run_test( + &[batch.clone(), batch.clone(), batch.clone()], + |buffer, batches| { + format_batches_with_maxrows(buffer, batches, MaxRows::Limited(5)) + }, )?; - buffer.set_position(0); - contents.clear(); - buffer.read_to_string(&mut contents)?; - assert_eq!(contents, multi_batches_expected); + assert_eq!(multi_batches, multi_batches_expected); Ok(()) } diff --git a/datafusion-cli/src/print_options.rs b/datafusion-cli/src/print_options.rs index 3b3e822d33bd..b8594352b585 100644 --- a/datafusion-cli/src/print_options.rs +++ b/datafusion-cli/src/print_options.rs @@ -24,10 +24,10 @@ use std::time::Instant; use crate::print_format::PrintFormat; use arrow::record_batch::RecordBatch; +use datafusion::common::DataFusionError; use datafusion::error::Result; use datafusion::physical_plan::RecordBatchStream; -use datafusion_common::DataFusionError; use futures::StreamExt; #[derive(Debug, Clone, PartialEq, Copy)] @@ -65,7 +65,7 @@ impl Display for MaxRows { } } -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone)] pub struct PrintOptions { pub format: PrintFormat, pub quiet: bool, @@ -116,7 +116,6 @@ impl PrintOptions { query_start_time, ); - writeln!(writer, "\n")?; if !self.quiet { writeln!(writer, "{timing_info}")?; } @@ -155,7 +154,7 @@ impl PrintOptions { let timing_info = get_timing_info_str(row_count, MaxRows::Unlimited, query_start_time); - writeln!(writer, "\n")?; + if !self.quiet { writeln!(writer, "{timing_info}")?; } diff --git a/datafusion-cli/tests/cli_integration.rs b/datafusion-cli/tests/cli_integration.rs index 9aa273e51758..119a0aa39d3c 100644 --- a/datafusion-cli/tests/cli_integration.rs +++ b/datafusion-cli/tests/cli_integration.rs @@ -31,19 +31,19 @@ fn init() { #[rstest] #[case::exec_from_commands( ["--command", "select 1", "--format", "json", "-q"], - "[{\"Int64(1)\":1}]\n\n\n" + "[{\"Int64(1)\":1}]\n" )] #[case::exec_multiple_statements( ["--command", "select 1; select 2;", "--format", "json", "-q"], - "[{\"Int64(1)\":1}]\n\n\n[{\"Int64(2)\":2}]\n\n\n" + "[{\"Int64(1)\":1}]\n[{\"Int64(2)\":2}]\n" )] #[case::exec_from_files( ["--file", "tests/data/sql.txt", "--format", "json", "-q"], - "[{\"Int64(1)\":1}]\n\n\n" + "[{\"Int64(1)\":1}]\n" )] #[case::set_batch_size( ["--command", "show datafusion.execution.batch_size", "--format", "json", "-q", "-b", "1"], - "[{\"name\":\"datafusion.execution.batch_size\",\"value\":\"1\"}]\n\n\n" + "[{\"name\":\"datafusion.execution.batch_size\",\"value\":\"1\"}]\n" )] #[test] fn cli_quick_test<'a>( From 20407ac956e408dba304d6fe373e5a4168c45003 Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Thu, 28 Dec 2023 10:49:30 +0300 Subject: [PATCH 8/8] Final fix --- datafusion-cli/src/print_format.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/datafusion-cli/src/print_format.rs b/datafusion-cli/src/print_format.rs index 94345d070fdf..ea418562495d 100644 --- a/datafusion-cli/src/print_format.rs +++ b/datafusion-cli/src/print_format.rs @@ -137,15 +137,15 @@ fn format_batches_with_maxrows( if over_limit { let mut formatted_str = format!("{}", formatted); formatted_str = keep_only_maxrows(&formatted_str, maxrows); - write!(writer, "{}", formatted_str)?; + writeln!(writer, "{}", formatted_str)?; } else { - write!(writer, "{}", formatted)?; + writeln!(writer, "{}", formatted)?; } } MaxRows::Unlimited => { let formatted = pretty_format_batches_with_options(batches, &DEFAULT_FORMAT_OPTIONS)?; - write!(writer, "{}", formatted)?; + writeln!(writer, "{}", formatted)?; } } @@ -290,7 +290,7 @@ mod tests { "| 1 |", "| 2 |", "| 3 |", - "+---+", + "+---+\n", ].join("\n"); #[rustfmt::skip] @@ -302,7 +302,7 @@ mod tests { "| . |", "| . |", "| . |", - "+---+", + "+---+\n", ].join("\n"); #[rustfmt::skip] @@ -318,7 +318,7 @@ mod tests { "| . |", "| . |", "| . |", - "+---+", + "+---+\n", ].join("\n"); let no_limit = run_test(&[batch.clone()], |buffer, batches| {