Streaming CLI support #8651

Merged: 8 commits, Dec 28, 2023

Changes from 2 commits
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -36,7 +36,7 @@
 arrow = { version = "49.0.0", features = ["prettyprint"] }
 arrow-array = { version = "49.0.0", default-features = false, features = ["chrono-tz"] }
 arrow-buffer = { version = "49.0.0", default-features = false }
 arrow-flight = { version = "49.0.0", features = ["flight-sql-experimental"] }
-arrow-ipc = { version = "49.0.0", default-features = false, features=["lz4"] }
+arrow-ipc = { version = "49.0.0", default-features = false, features = ["lz4"] }
 arrow-ord = { version = "49.0.0", default-features = false }
 arrow-schema = { version = "49.0.0", default-features = false }
 async-trait = "0.1.73"
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

(Generated file; diff not rendered.)

1 change: 1 addition & 0 deletions datafusion-cli/Cargo.toml
@@ -38,6 +38,7 @@ datafusion = { path = "../datafusion/core", version = "34.0.0", features = ["avr
 datafusion-common = { path = "../datafusion/common" }
 dirs = "4.0.0"
 env_logger = "0.9"
+futures = "0.3"
 mimalloc = { version = "0.1", default-features = false }
 object_store = { version = "0.8.0", features = ["aws", "gcp"] }
 parking_lot = { version = "0.12" }
60 changes: 34 additions & 26 deletions datafusion-cli/src/exec.rs
@@ -17,6 +17,11 @@

 //! Execution functions

+use std::io::prelude::*;
+use std::io::BufReader;
+use std::time::Instant;
+use std::{fs::File, sync::Arc};
+
 use crate::{
     command::{Command, OutputFormat},
     helper::{unescape_input, CliHelper},
@@ -26,21 +31,19 @@ use crate::{
     },
     print_options::{MaxRows, PrintOptions},
 };
-use datafusion::common::plan_datafusion_err;

+use datafusion::common::{exec_datafusion_err, plan_datafusion_err};
+use datafusion::datasource::listing::ListingTableUrl;
+use datafusion::datasource::physical_plan::is_plan_streaming;
+use datafusion::error::{DataFusionError, Result};
+use datafusion::logical_expr::{CreateExternalTable, DdlStatement, LogicalPlan};
+use datafusion::physical_plan::{collect, execute_stream};
+use datafusion::prelude::SessionContext;
 use datafusion::sql::{parser::DFParser, sqlparser::dialect::dialect_from_str};
-use datafusion::{
-    datasource::listing::ListingTableUrl,
-    error::{DataFusionError, Result},
-    logical_expr::{CreateExternalTable, DdlStatement},
-};
-use datafusion::{logical_expr::LogicalPlan, prelude::SessionContext};

 use object_store::ObjectStore;
 use rustyline::error::ReadlineError;
 use rustyline::Editor;
-use std::io::prelude::*;
-use std::io::BufReader;
-use std::time::Instant;
-use std::{fs::File, sync::Arc};
 use url::Url;

 /// run and execute SQL statements and commands, against a context with the given print options
@@ -228,17 +231,24 @@ async fn exec_and_print(
             create_external_table(ctx, cmd).await?;
         }
         let df = ctx.execute_logical_plan(plan).await?;
-        let results = df.collect().await?;
+        let physical_plan = df.clone().create_physical_plan().await?;

-        let print_options = if should_ignore_maxrows {
-            PrintOptions {
-                maxrows: MaxRows::Unlimited,
-                ..print_options.clone()
-            }
-        } else {
-            print_options.clone()
-        };
-        print_options.print_batches(&results, now)?;
+        if is_plan_streaming(&physical_plan)? {
+            let stream = execute_stream(physical_plan, task_ctx.clone())?;
+            print_options.print_stream(stream, now).await?;
+        } else {
+            let print_options = if should_ignore_maxrows {
+                PrintOptions {
+                    maxrows: MaxRows::Unlimited,
+                    ..print_options.clone()
+                }
+            } else {
+                print_options.clone()
+            };
+
+            let results = collect(physical_plan, task_ctx.clone()).await?;
+            print_options.print_batches(&results, now)?;
+        }
     }

     Ok(())
@@ -272,10 +282,7 @@ async fn create_external_table(
                 .object_store_registry
                 .get_store(url)
                 .map_err(|_| {
-                    DataFusionError::Execution(format!(
-                        "Unsupported object store scheme: {}",
-                        scheme
-                    ))
+                    exec_datafusion_err!("Unsupported object store scheme: {}", scheme)
                 })?
         }
     };
@@ -290,8 +297,9 @@ mod tests {
     use std::str::FromStr;

     use super::*;
-    use datafusion::common::plan_err;
-    use datafusion_common::{file_options::StatementOptions, FileTypeWriterOptions};

+    use datafusion_common::file_options::StatementOptions;
+    use datafusion_common::{plan_err, FileTypeWriterOptions};

     async fn create_external_table_test(location: &str, sql: &str) -> Result<()> {
         let ctx = SessionContext::new();
20 changes: 11 additions & 9 deletions datafusion-cli/src/main.rs
@@ -15,7 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.

-use clap::Parser;
+use std::collections::HashMap;
+use std::env;
+use std::path::Path;
+use std::str::FromStr;
+use std::sync::{Arc, OnceLock};
+
 use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::context::SessionConfig;
 use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool};
@@ -29,12 +34,9 @@ use datafusion_cli::{
     print_options::{MaxRows, PrintOptions},
     DATAFUSION_CLI_VERSION,
 };

+use clap::Parser;
 use mimalloc::MiMalloc;
-use std::collections::HashMap;
-use std::env;
-use std::path::Path;
-use std::str::FromStr;
-use std::sync::{Arc, OnceLock};

 #[global_allocator]
 static GLOBAL: MiMalloc = MiMalloc;
@@ -111,7 +113,7 @@ struct Args {
     )]
     rc: Option<Vec<String>>,

-    #[clap(long, arg_enum, default_value_t = PrintFormat::Table)]
+    #[clap(long, arg_enum, default_value_t = PrintFormat::Automatic)]
     format: PrintFormat,

     #[clap(
@@ -331,10 +333,10 @@ fn extract_memory_pool_size(size: &str) -> Result<usize, String> {

 #[cfg(test)]
 mod tests {
-    use datafusion::assert_batches_eq;
-
     use super::*;

+    use datafusion::assert_batches_eq;
+
     fn assert_conversion(input: &str, expected: Result<usize, String>) {
         let result = extract_memory_pool_size(input);
         match expected {
75 changes: 67 additions & 8 deletions datafusion-cli/src/print_format.rs
@@ -16,14 +16,17 @@
 // under the License.

 //! Print format variants

+use std::str::FromStr;
+
 use crate::print_options::MaxRows;
+
 use arrow::csv::writer::WriterBuilder;
 use arrow::json::{ArrayWriter, LineDelimitedWriter};
+use arrow::record_batch::RecordBatch;
 use arrow::util::pretty::pretty_format_batches_with_options;
-use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::common::format::DEFAULT_FORMAT_OPTIONS;
 use datafusion::error::{DataFusionError, Result};
-use std::str::FromStr;

 /// Allow records to be printed in different formats
 #[derive(Debug, PartialEq, Eq, clap::ArgEnum, Clone)]
@@ -33,6 +36,7 @@ pub enum PrintFormat {
     Table,
     Json,
     NdJson,
+    Automatic,
 }

 impl FromStr for PrintFormat {
@@ -55,6 +59,18 @@ macro_rules! batches_to_json {
     }};
 }

+macro_rules! stream_to_json {
+    ($WRITER: ident, $batch: expr) => {{
+        let mut bytes = vec![];
+        {
+            let mut writer = $WRITER::new(&mut bytes);
+            writer.write($batch)?;
+            writer.finish()?;
+        }
+        String::from_utf8(bytes).map_err(|e| DataFusionError::External(Box::new(e)))?
+    }};
+}
+

 fn print_batches_with_sep(batches: &[RecordBatch], delimiter: u8) -> Result<String> {
     let mut bytes = vec![];
     {
@@ -66,9 +82,26 @@ fn print_batches_with_sep(batches: &[RecordBatch], delimiter: u8) -> Result<String> {
             writer.write(batch)?;
         }
     }
-    let formatted =
-        String::from_utf8(bytes).map_err(|e| DataFusionError::External(Box::new(e)))?;
-    Ok(formatted)
+    String::from_utf8(bytes).map_err(|e| DataFusionError::External(Box::new(e)))
+}
+
+fn print_stream_with_sep(
+    batch: &RecordBatch,
+    delimiter: u8,
+    with_header: bool,
+) -> Result<String> {
+    let mut bytes = vec![];
+    {
+        let builder = WriterBuilder::new()
+            .with_header(with_header)
+            .with_delimiter(delimiter);
+        let mut writer = builder.build(&mut bytes);
+        writer.write(batch)?;
+    }
+    if bytes.last() == Some(&b'\n') {
+        bytes.pop();
+    }
+    String::from_utf8(bytes).map_err(|e| DataFusionError::External(Box::new(e)))
 }

 fn keep_only_maxrows(s: &str, maxrows: usize) -> String {
@@ -84,7 +117,6 @@ fn keep_only_maxrows(s: &str, maxrows: usize) -> String {
     let mut result = lines[0..(maxrows + 3)].to_vec(); // Keep top border and `maxrows` lines
     result.extend(vec![dotted_line; 3]); // Append ... lines
     result.push(last_line.clone());
-
     result.join("\n")
 }

@@ -146,7 +178,7 @@ impl PrintFormat {
         match self {
             Self::Csv => println!("{}", print_batches_with_sep(batches, b',')?),
             Self::Tsv => println!("{}", print_batches_with_sep(batches, b'\t')?),
-            Self::Table => {
+            Self::Table | Self::Automatic => {
                 if maxrows == MaxRows::Limited(0) {
                     return Ok(());
                 }
@@ -159,14 +191,41 @@ impl PrintFormat {
         }
         Ok(())
     }
+
+    pub fn print_stream(&self, batch: &RecordBatch, with_header: bool) -> Result<()> {
Review comment (Contributor): This function is almost a copy/paste of print_batch -- as you say in your comments, the only difference is in Self::Table. I think it would be possible to reduce replication by changing all this code to be in terms of the output stream rather than creating strings.

Review comment (Contributor): Specifically, I think you could write the functions in terms of std::io::Write and then pass in https://doc.rust-lang.org/std/io/fn.stdout.html to avoid having to copy the code, and the code could avoid copying strings.
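A minimal sketch of that suggestion, assuming the same arrow 49 CSV WriterBuilder API this diff already uses; the helper name write_batch_with_sep is hypothetical and not part of the PR:

```rust
// Hypothetical helper illustrating the reviewer's suggestion (not in this
// PR): write through any `impl std::io::Write` instead of returning a
// String, so callers can pass `std::io::stdout()` directly.
use std::io::Write;

use arrow::csv::writer::WriterBuilder;
use arrow::record_batch::RecordBatch;
use datafusion::error::Result;

fn write_batch_with_sep<W: Write>(
    out: &mut W,
    batch: &RecordBatch,
    delimiter: u8,
    with_header: bool,
) -> Result<()> {
    // Same builder options as in print_stream_with_sep above, but the CSV
    // writer streams into `out` rather than into an intermediate Vec<u8>.
    let builder = WriterBuilder::new()
        .with_header(with_header)
        .with_delimiter(delimiter);
    let mut writer = builder.build(out);
    writer.write(batch)?;
    Ok(())
}

// Usage: write_batch_with_sep(&mut std::io::stdout().lock(), &batch, b',', true)?;
```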

+        if batch.num_rows() == 0 {
+            return Ok(());
+        }
+
+        match self {
+            Self::Csv | Self::Automatic => {
+                println!("{}", print_stream_with_sep(batch, b',', with_header)?)
+            }
+            Self::Tsv => {
+                println!("{}", print_stream_with_sep(batch, b'\t', with_header)?)
+            }
+            Self::Table => {
Review comment (Contributor): Another thought here is that when printing streams, it could just print out a new Table for each batch (rather than buffering all batches).
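A minimal sketch of that idea, reusing pretty_format_batches_with_options and DEFAULT_FORMAT_OPTIONS from the imports above; the helper name print_batch_as_table is hypothetical:

```rust
// Hypothetical helper (not in this PR): render one batch as its own bordered
// table, so PrintFormat::Table could print per batch instead of erroring.
// Table borders and headers repeat for every batch; that is the trade-off
// of not buffering the whole stream.
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::pretty_format_batches_with_options;
use datafusion::common::format::DEFAULT_FORMAT_OPTIONS;
use datafusion::error::Result;

fn print_batch_as_table(batch: &RecordBatch) -> Result<()> {
    let formatted = pretty_format_batches_with_options(
        &[batch.clone()],
        &DEFAULT_FORMAT_OPTIONS,
    )?;
    println!("{formatted}");
    Ok(())
}
```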

+                return Err(DataFusionError::External(
+                    "PrintFormat::Table is not implemented".to_string().into(),
+                ))
+            }
+            Self::Json => println!("{}", stream_to_json!(ArrayWriter, batch)),
+            Self::NdJson => {
+                println!("{}", stream_to_json!(LineDelimitedWriter, batch))
+            }
+        }
+        Ok(())
+    }
 }

 #[cfg(test)]
 mod tests {
+    use std::sync::Arc;
+
     use super::*;
+
     use arrow::array::Int32Array;
     use arrow::datatypes::{DataType, Field, Schema};
-    use std::sync::Arc;

     #[test]
     fn test_print_batches_with_sep() {
36 changes: 33 additions & 3 deletions datafusion-cli/src/print_options.rs
@@ -15,13 +15,19 @@
 // specific language governing permissions and limitations
 // under the License.

-use crate::print_format::PrintFormat;
-use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::error::Result;
 use std::fmt::{Display, Formatter};
+use std::pin::Pin;
 use std::str::FromStr;
 use std::time::Instant;
+
+use crate::print_format::PrintFormat;
+
+use arrow::record_batch::RecordBatch;
+use datafusion::error::Result;
+use datafusion::physical_plan::RecordBatchStream;
+
+use futures::StreamExt;

 #[derive(Debug, Clone, PartialEq, Copy)]
 pub enum MaxRows {
     /// show all rows in the output
@@ -103,4 +109,28 @@ impl PrintOptions {

         Ok(())
     }
+
+    #[allow(clippy::println_empty_string)]
+    pub async fn print_stream(
+        &self,
+        mut stream: Pin<Box<dyn RecordBatchStream>>,
+        query_start_time: Instant,
+    ) -> Result<()> {
+        let mut row_count = 0_usize;
+        let mut with_header = true;
Review comment (Contributor): I wonder if there is any way to test this behavior (specifically that the header is only printed out for the first batch) so that it isn't broken in subsequent refactorings.
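One possible shape for such a test, sketched against print_stream_with_sep (which returns a String, so no stdout capture is needed); the test name and its placement in print_format.rs's tests module are assumptions:

```rust
// Hypothetical test (not in this PR) pinning down the header behavior:
// with_header = true yields a CSV header line, with_header = false does not.
// Assumes it lives in print_format.rs's tests module, next to the existing
// imports there (Arc, Int32Array, Schema, Field, DataType).
#[test]
fn csv_header_printed_only_for_first_batch() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch =
        RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2]))])?;

    // First batch of a stream: header requested.
    assert_eq!(print_stream_with_sep(&batch, b',', true)?, "a\n1\n2");
    // Later batches: rows only.
    assert_eq!(print_stream_with_sep(&batch, b',', false)?, "1\n2");
    Ok(())
}
```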


+        while let Some(Ok(batch)) = stream.next().await {
+            row_count += batch.num_rows();
+            self.format.print_stream(&batch, with_header)?;
+            with_header = false;
+        }
+        println!("");
+
+        let timing_info = get_timing_info_str(row_count, self.maxrows, query_start_time);
+        if !self.quiet {
+            println!("{timing_info}");
+        }
+
+        Ok(())
+    }
 }
15 changes: 15 additions & 0 deletions datafusion/core/src/datasource/physical_plan/mod.rs
@@ -69,6 +69,7 @@ use arrow::{
 use datafusion_common::{file_options::FileTypeWriterOptions, plan_err};
 use datafusion_physical_expr::expressions::Column;
 use datafusion_physical_expr::PhysicalSortExpr;
+use datafusion_physical_plan::ExecutionPlan;

 use log::debug;
 use object_store::path::Path;
@@ -507,6 +508,20 @@ fn get_projected_output_ordering(
     all_orderings
 }

+/// Get output (un)boundedness information for the given `plan`.
+pub fn is_plan_streaming(plan: &Arc<dyn ExecutionPlan>) -> Result<bool> {
+    if plan.children().is_empty() {
+        plan.unbounded_output(&[])
+    } else {
+        let children_unbounded_output = plan
+            .children()
+            .iter()
+            .map(is_plan_streaming)
+            .collect::<Result<Vec<_>>>();
+        plan.unbounded_output(&children_unbounded_output?)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use arrow_array::cast::AsArray;
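For reference, a hedged sketch of exercising the new helper on a bounded plan; the use of MemoryExec and the function name are illustrative assumptions, not part of this diff:

```rust
// Hypothetical illustration (not in this PR): a MemoryExec reads a finite
// set of in-memory batches, so its unbounded_output is false and
// is_plan_streaming should report false for this (leaf) plan.
use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::physical_plan::is_plan_streaming;
use datafusion::error::Result;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::ExecutionPlan;

fn bounded_plan_is_not_streaming() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    // No partitions at all: trivially bounded.
    let plan: Arc<dyn ExecutionPlan> = Arc::new(MemoryExec::try_new(&[], schema, None)?);
    assert!(!is_plan_streaming(&plan)?);
    Ok(())
}
```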