
Commit

Merge branch 'main' into wiedld/refactor-sort-pushdown
wiedld committed Feb 26, 2025
2 parents 238cb46 + f51cd6e commit dffeaac
Showing 149 changed files with 8,691 additions and 4,933 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 1 addition & 2 deletions Cargo.toml
@@ -159,19 +159,18 @@ url = "2.5.4"
[profile.release]
codegen-units = 1
lto = true
strip = true

# the release profile takes a long time to build so we can use this profile during development to save time
# cargo build --profile release-nonlto
[profile.release-nonlto]
codegen-units = 16
debug = false
debug-assertions = false
incremental = false
inherits = "release"
lto = false
opt-level = 3
overflow-checks = false
panic = 'unwind'
rpath = false

[profile.ci]
73 changes: 37 additions & 36 deletions datafusion-cli/src/exec.rs
@@ -26,12 +26,6 @@ use crate::{
object_storage::get_object_store,
print_options::{MaxRows, PrintOptions},
};
use futures::StreamExt;
use std::collections::HashMap;
use std::fs::File;
use std::io::prelude::*;
use std::io::BufReader;

use datafusion::common::instant::Instant;
use datafusion::common::{plan_datafusion_err, plan_err};
use datafusion::config::ConfigFileType;
@@ -41,13 +35,15 @@ use datafusion::logical_expr::{DdlStatement, LogicalPlan};
use datafusion::physical_plan::execution_plan::EmissionType;
use datafusion::physical_plan::{execute_stream, ExecutionPlanProperties};
use datafusion::sql::parser::{DFParser, Statement};
use datafusion::sql::sqlparser::dialect::dialect_from_str;

use datafusion::execution::memory_pool::MemoryConsumer;
use datafusion::physical_plan::spill::get_record_batch_memory_size;
use datafusion::sql::sqlparser;
use datafusion::sql::sqlparser::dialect::dialect_from_str;
use rustyline::error::ReadlineError;
use rustyline::Editor;
use std::collections::HashMap;
use std::fs::File;
use std::io::prelude::*;
use std::io::BufReader;
use std::sync::Arc;
use tokio::signal;

/// run and execute SQL statements and commands, against a context with the given print options
@@ -222,26 +218,25 @@ pub(super) async fn exec_and_print(
plan_datafusion_err!(
"Unsupported SQL dialect: {dialect}. Available dialects: \
Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, \
MsSQL, ClickHouse, BigQuery, Ansi."
MsSQL, ClickHouse, BigQuery, Ansi, DuckDB, Databricks."
)
})?;

let statements = DFParser::parse_sql_with_dialect(&sql, dialect.as_ref())?;
for statement in statements {
let adjusted =
AdjustedPrintOptions::new(print_options.clone()).with_statement(&statement);

let plan = create_plan(ctx, statement).await?;
let adjusted = adjusted.with_plan(&plan);

let df = ctx.execute_logical_plan(plan).await?;
let physical_plan = df.create_physical_plan().await?;

// Track memory usage for the query result if it's bounded
let mut reservation =
MemoryConsumer::new("DataFusion-Cli").register(task_ctx.memory_pool());
let is_unbounded = physical_plan.boundedness().is_unbounded();
let mut stream = execute_stream(Arc::clone(&physical_plan), task_ctx.clone())?;

if physical_plan.boundedness().is_unbounded() {
// Both bounded and unbounded streams are streaming prints
if is_unbounded {
if physical_plan.pipeline_behavior() == EmissionType::Final {
return plan_err!(
"The given query can generate a valid result only once \
@@ -250,37 +245,43 @@
}
// As the input stream comes, we can generate results.
// However, memory safety is not guaranteed.
let stream = execute_stream(physical_plan, task_ctx.clone())?;
print_options.print_stream(stream, now).await?;
print_options
.print_stream(MaxRows::Unlimited, stream, now)
.await?;
} else {
// Bounded stream; collected results size is limited by the maxrows option
let schema = physical_plan.schema();
let mut stream = execute_stream(physical_plan, task_ctx.clone())?;
let mut results = vec![];
let mut row_count = 0_usize;
let max_rows = match print_options.maxrows {
MaxRows::Unlimited => usize::MAX,
MaxRows::Limited(n) => n,
};
while let Some(batch) = stream.next().await {
let batch = batch?;
let curr_num_rows = batch.num_rows();
// Stop collecting results if the number of rows exceeds the limit
// results batch should include the last batch that exceeds the limit
if row_count < max_rows + curr_num_rows {
// Try to grow the reservation to accommodate the batch in memory
reservation.try_grow(get_record_batch_memory_size(&batch))?;
results.push(batch);
}
row_count += curr_num_rows;
let stdout = std::io::stdout();
let mut writer = stdout.lock();

// If we don't want to print the table, we should use the streaming print same as above
if print_options.format != PrintFormat::Table
&& print_options.format != PrintFormat::Automatic
{
print_options
.print_stream(print_options.maxrows, stream, now)
.await?;
continue;
}

// into_inner will finalize the print options to table if it's automatic
adjusted
.into_inner()
.print_batches(schema, &results, now, row_count)?;
reservation.free();
.print_table_batch(
print_options,
schema,
&mut stream,
max_rows,
&mut writer,
now,
)
.await?;
}
}

Ok(())
}
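The control flow in the hunk above hinges on two `ExecutionPlanProperties` checks: `boundedness()` tells the CLI whether the result set can ever be fully buffered, and `pipeline_behavior()` rules out unbounded plans that would emit only a single final result. Below is a minimal sketch of that guard, assuming a `plan: &Arc<dyn ExecutionPlan>` is in scope; the helper name `must_stream` and the error text are illustrative, not the exact code from this commit.

```rust
use std::sync::Arc;

use datafusion::common::{plan_err, Result};
use datafusion::physical_plan::execution_plan::EmissionType;
use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};

/// Decide how query results can be delivered: `true` means they must be
/// streamed (unbounded source), `false` means they may be buffered.
fn must_stream(plan: &Arc<dyn ExecutionPlan>) -> Result<bool> {
    if plan.boundedness().is_unbounded() {
        // Plans that emit only after consuming all input (e.g. a final
        // aggregation) can never finish on an infinite source.
        if plan.pipeline_behavior() == EmissionType::Final {
            // Illustrative message, not the original wording.
            return plan_err!(
                "query emits a single final result, but its source is unbounded"
            );
        }
        return Ok(true);
    }
    Ok(false)
}
```

With this split, the unbounded branch prints incrementally via `print_stream`, while the bounded branch may still buffer batches subject to `MaxRows` and the memory pool (sketched after the file diff below).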

@@ -520,7 +521,7 @@ mod tests {
plan_datafusion_err!(
"Unsupported SQL dialect: {dialect}. Available dialects: \
Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, \
MsSQL, ClickHouse, BigQuery, Ansi."
MsSQL, ClickHouse, BigQuery, Ansi, DuckDB, Databricks."
)
})?;
for location in locations {
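On the bounded branch, both sides of the diff charge buffered result batches to the session memory pool: a named `MemoryConsumer` is registered against `task_ctx.memory_pool()`, and its reservation is grown by `get_record_batch_memory_size` before a batch is kept. A minimal sketch of that accounting pattern under the `MaxRows` cap, assuming an already-running `SendableRecordBatchStream`; the helper name `buffer_limited` is hypothetical.

```rust
use std::sync::Arc;

use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::Result;
use datafusion::execution::memory_pool::MemoryConsumer;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::spill::get_record_batch_memory_size;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;

/// Buffer up to roughly `max_rows` rows from `stream`, charging each kept
/// batch to the pool so oversized result sets fail fast instead of OOMing.
async fn buffer_limited(
    mut stream: SendableRecordBatchStream,
    task_ctx: &Arc<TaskContext>,
    max_rows: usize,
) -> Result<Vec<RecordBatch>> {
    let mut reservation =
        MemoryConsumer::new("DataFusion-Cli").register(task_ctx.memory_pool());
    let mut results = vec![];
    let mut row_count = 0_usize;
    while let Some(batch) = stream.next().await {
        let batch = batch?;
        let num_rows = batch.num_rows();
        // Keep the batch that crosses the limit so the printer can still
        // render a truncated table, then stop buffering further batches.
        if row_count < max_rows + num_rows {
            // Errors when the shared memory pool is exhausted.
            reservation.try_grow(get_record_batch_memory_size(&batch))?;
            results.push(batch);
        }
        row_count += num_rows;
    }
    // Dropping `reservation` returns the accounted bytes to the pool;
    // the old code freed it explicitly with `reservation.free()`.
    Ok(results)
}
```

The `row_count < max_rows + num_rows` test deliberately admits the first batch that crosses the limit, so the table printer can render exactly `max_rows` rows; the rewrite in this commit moves that printing into `print_table_batch`, which consumes the stream incrementally instead of collecting it first.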
