Skip to content

Commit

Permalink
Merge branch 'apache_main' into feature/optimize-projections
Browse files: browse the repository at this point in the history
  • Loading branch information
berkaysynnada committed Apr 30, 2024
2 parents fb8bf5a + dd56837 commit 4315494
Show file tree
Hide file tree
Showing 21 changed files with 392 additions and 296 deletions.
18 changes: 9 additions & 9 deletions datafusion-cli/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

//! Command within CLI
use crate::exec::exec_from_lines;
use crate::exec::{exec_and_print, exec_from_lines};
use crate::functions::{display_all_functions, Function};
use crate::print_format::PrintFormat;
use crate::print_options::PrintOptions;
Expand Down Expand Up @@ -58,18 +58,18 @@ impl Command {
ctx: &mut SessionContext,
print_options: &mut PrintOptions,
) -> Result<()> {
let now = Instant::now();
match self {
Self::Help => print_options.print_batches(&[all_commands_info()], now),
Self::Help => {
let now = Instant::now();
let command_batch = all_commands_info();
print_options.print_batches(command_batch.schema(), &[command_batch], now)
}
Self::ListTables => {
let df = ctx.sql("SHOW TABLES").await?;
let batches = df.collect().await?;
print_options.print_batches(&batches, now)
exec_and_print(ctx, print_options, "SHOW TABLES".into()).await
}
Self::DescribeTableStmt(name) => {
let df = ctx.sql(&format!("SHOW COLUMNS FROM {}", name)).await?;
let batches = df.collect().await?;
print_options.print_batches(&batches, now)
exec_and_print(ctx, print_options, format!("SHOW COLUMNS FROM {}", name))
.await
}
Self::Include(filename) => {
if let Some(filename) = filename {
Expand Down
5 changes: 3 additions & 2 deletions datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ pub async fn exec_from_repl(
rl.save_history(".history")
}

async fn exec_and_print(
pub(super) async fn exec_and_print(
ctx: &mut SessionContext,
print_options: &PrintOptions,
sql: String,
Expand Down Expand Up @@ -235,8 +235,9 @@ async fn exec_and_print(
let stream = execute_stream(physical_plan, task_ctx.clone())?;
print_options.print_stream(stream, now).await?;
} else {
let schema = physical_plan.schema();
let results = collect(physical_plan, task_ctx.clone()).await?;
adjusted.into_inner().print_batches(&results, now)?;
adjusted.into_inner().print_batches(schema, &results, now)?;
}
}

Expand Down
130 changes: 105 additions & 25 deletions datafusion-cli/src/print_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::str::FromStr;
use crate::print_options::MaxRows;

use arrow::csv::writer::WriterBuilder;
use arrow::datatypes::SchemaRef;
use arrow::json::{ArrayWriter, LineDelimitedWriter};
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::pretty_format_batches_with_options;
Expand Down Expand Up @@ -157,6 +158,7 @@ impl PrintFormat {
pub fn print_batches<W: std::io::Write>(
&self,
writer: &mut W,
schema: SchemaRef,
batches: &[RecordBatch],
maxrows: MaxRows,
with_header: bool,
Expand All @@ -168,7 +170,7 @@ impl PrintFormat {
.cloned()
.collect();
if batches.is_empty() {
return Ok(());
return self.print_empty(writer, schema);
}

match self {
Expand All @@ -186,33 +188,69 @@ impl PrintFormat {
Self::NdJson => batches_to_json!(LineDelimitedWriter, writer, &batches),
}
}

/// Handle the case where the result batches contain no rows.
///
/// Only the `Table` format emits anything here, and only when the schema
/// has at least one field: an empty batch built from `schema` is rendered
/// so that the column headers are still shown. Every other case writes
/// nothing and succeeds.
fn print_empty<W: std::io::Write>(
    &self,
    writer: &mut W,
    schema: SchemaRef,
) -> Result<()> {
    let shows_header = matches!(self, Self::Table) && !schema.fields().is_empty();
    if !shows_header {
        return Ok(());
    }

    // Print column headers for Table format
    let header_only = RecordBatch::new_empty(schema);
    let rendered =
        pretty_format_batches_with_options(&[header_only], &DEFAULT_FORMAT_OPTIONS)?;
    writeln!(writer, "{}", rendered)?;
    Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};

#[test]
fn print_empty() {
for format in [
PrintFormat::Csv,
PrintFormat::Tsv,
PrintFormat::Table,
PrintFormat::Json,
PrintFormat::NdJson,
PrintFormat::Automatic,
] {
// no output for empty batches, even with header set
PrintBatchesTest::new()
.with_format(format)
.with_schema(three_column_schema())
.with_batches(vec![])
.with_expected(&[""])
.run();
}

// output column headers for empty batches when format is Table
#[rustfmt::skip]
let expected = &[
"+---+---+---+",
"| a | b | c |",
"+---+---+---+",
"+---+---+---+",
];
PrintBatchesTest::new()
.with_format(PrintFormat::Table)
.with_schema(three_column_schema())
.with_batches(vec![])
.with_expected(expected)
.run();
}

#[test]
Expand Down Expand Up @@ -385,6 +423,7 @@ mod tests {
for max_rows in [MaxRows::Unlimited, MaxRows::Limited(5), MaxRows::Limited(3)] {
PrintBatchesTest::new()
.with_format(PrintFormat::Table)
.with_schema(one_column_schema())
.with_batches(vec![one_column_batch()])
.with_maxrows(max_rows)
.with_expected(expected)
Expand Down Expand Up @@ -450,15 +489,15 @@ mod tests {
let empty_batch = RecordBatch::new_empty(batch.schema());

#[rustfmt::skip]
let expected =&[
"+---+",
"| a |",
"+---+",
"| 1 |",
"| 2 |",
"| 3 |",
"+---+",
];
let expected =&[
"+---+",
"| a |",
"+---+",
"| 1 |",
"| 2 |",
"| 3 |",
"+---+",
];

PrintBatchesTest::new()
.with_format(PrintFormat::Table)
Expand All @@ -468,14 +507,32 @@ mod tests {
}

#[test]
fn test_print_batches_empty_batches_no_header() {
fn test_print_batches_empty_batch() {
let empty_batch = RecordBatch::new_empty(one_column_batch().schema());

// empty batches should not print a header
let expected = &[""];
// Print column headers for empty batch when format is Table
#[rustfmt::skip]
let expected =&[
"+---+",
"| a |",
"+---+",
"+---+",
];

PrintBatchesTest::new()
.with_format(PrintFormat::Table)
.with_schema(one_column_schema())
.with_batches(vec![empty_batch])
.with_header(WithHeader::Yes)
.with_expected(expected)
.run();

// No output for empty batch when schema contains no columns
let empty_batch = RecordBatch::new_empty(Arc::new(Schema::empty()));
let expected = &[""];
PrintBatchesTest::new()
.with_format(PrintFormat::Table)
.with_schema(Arc::new(Schema::empty()))
.with_batches(vec![empty_batch])
.with_header(WithHeader::Yes)
.with_expected(expected)
Expand All @@ -485,6 +542,7 @@ mod tests {
#[derive(Debug)]
struct PrintBatchesTest {
format: PrintFormat,
schema: SchemaRef,
batches: Vec<RecordBatch>,
maxrows: MaxRows,
with_header: WithHeader,
Expand All @@ -504,6 +562,7 @@ mod tests {
fn new() -> Self {
Self {
format: PrintFormat::Table,
schema: Arc::new(Schema::empty()),
batches: vec![],
maxrows: MaxRows::Unlimited,
with_header: WithHeader::Ignored,
Expand All @@ -517,6 +576,12 @@ mod tests {
self
}

// set the schema
fn with_schema(mut self, schema: SchemaRef) -> Self {
self.schema = schema;
self
}

/// set the batches to convert
fn with_batches(mut self, batches: Vec<RecordBatch>) -> Self {
self.batches = batches;
Expand Down Expand Up @@ -573,21 +638,31 @@ mod tests {
fn output_with_header(&self, with_header: bool) -> String {
let mut buffer: Vec<u8> = vec![];
self.format
.print_batches(&mut buffer, &self.batches, self.maxrows, with_header)
.print_batches(
&mut buffer,
self.schema.clone(),
&self.batches,
self.maxrows,
with_header,
)
.unwrap();
String::from_utf8(buffer).unwrap()
}
}

/// Return a batch with three columns and three rows
fn three_column_batch() -> RecordBatch {
let schema = Arc::new(Schema::new(vec![
/// Return a schema with three columns
fn three_column_schema() -> SchemaRef {
Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
Field::new("c", DataType::Int32, false),
]));
]))
}

/// Return a batch with three columns and three rows
fn three_column_batch() -> RecordBatch {
RecordBatch::try_new(
schema,
three_column_schema(),
vec![
Arc::new(Int32Array::from(vec![1, 2, 3])),
Arc::new(Int32Array::from(vec![4, 5, 6])),
Expand All @@ -597,12 +672,17 @@ mod tests {
.unwrap()
}

/// Return a schema with a single `Int32` column named `a`
fn one_column_schema() -> SchemaRef {
    let column_a = Field::new("a", DataType::Int32, false);
    Arc::new(Schema::new(vec![column_a]))
}

/// return a batch with one column and three rows
fn one_column_batch() -> RecordBatch {
RecordBatch::try_from_iter(vec![(
"a",
Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
)])
RecordBatch::try_new(
one_column_schema(),
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
)
.unwrap()
}

Expand Down
7 changes: 5 additions & 2 deletions datafusion-cli/src/print_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,16 @@
// specific language governing permissions and limitations
// under the License.

use datafusion::common::instant::Instant;
use std::fmt::{Display, Formatter};
use std::io::Write;
use std::pin::Pin;
use std::str::FromStr;

use crate::print_format::PrintFormat;

use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion::common::instant::Instant;
use datafusion::common::DataFusionError;
use datafusion::error::Result;
use datafusion::physical_plan::RecordBatchStream;
Expand Down Expand Up @@ -98,14 +99,15 @@ impl PrintOptions {
/// Print the batches to stdout using the specified format
pub fn print_batches(
&self,
schema: SchemaRef,
batches: &[RecordBatch],
query_start_time: Instant,
) -> Result<()> {
let stdout = std::io::stdout();
let mut writer = stdout.lock();

self.format
.print_batches(&mut writer, batches, self.maxrows, true)?;
.print_batches(&mut writer, schema, batches, self.maxrows, true)?;

let row_count: usize = batches.iter().map(|b| b.num_rows()).sum();
let formatted_exec_details = get_execution_details_formatted(
Expand Down Expand Up @@ -148,6 +150,7 @@ impl PrintOptions {
row_count += batch.num_rows();
self.format.print_batches(
&mut writer,
batch.schema(),
&[batch],
MaxRows::Unlimited,
with_header,
Expand Down
16 changes: 9 additions & 7 deletions datafusion/common/src/dfschema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -810,9 +810,16 @@ impl From<&DFSchema> for Schema {
impl TryFrom<Schema> for DFSchema {
type Error = DataFusionError;
fn try_from(schema: Schema) -> Result<Self, Self::Error> {
Self::try_from(Arc::new(schema))
}
}

impl TryFrom<SchemaRef> for DFSchema {
type Error = DataFusionError;
fn try_from(schema: SchemaRef) -> Result<Self, Self::Error> {
let field_count = schema.fields.len();
let dfschema = Self {
inner: schema.into(),
inner: schema,
field_qualifiers: vec![None; field_count],
functional_dependencies: FunctionalDependencies::empty(),
};
Expand Down Expand Up @@ -856,12 +863,7 @@ impl ToDFSchema for Schema {

impl ToDFSchema for SchemaRef {
fn to_dfschema(self) -> Result<DFSchema> {
// Attempt to use the Schema directly if there are no other
// references, otherwise clone
match Self::try_unwrap(self) {
Ok(schema) => DFSchema::try_from(schema),
Err(schemaref) => DFSchema::try_from(schemaref.as_ref().clone()),
}
DFSchema::try_from(self)
}
}

Expand Down
Loading

0 comments on commit 4315494

Please sign in to comment.