Review Part 1
ozankabak committed Feb 21, 2024
1 parent 8d8cb8b commit c8cece8
Showing 58 changed files with 672 additions and 926 deletions.
15 changes: 7 additions & 8 deletions datafusion-cli/src/exec.rs
@@ -34,13 +34,13 @@ use crate::{
 use datafusion::common::plan_datafusion_err;
 use datafusion::datasource::listing::ListingTableUrl;
 use datafusion::error::{DataFusionError, Result};
+use datafusion::logical_expr::dml::CopyTo;
 use datafusion::logical_expr::{CreateExternalTable, DdlStatement, LogicalPlan};
 use datafusion::physical_plan::{collect, execute_stream};
 use datafusion::prelude::SessionContext;
-use datafusion::sql::{parser::DFParser, sqlparser::dialect::dialect_from_str};
+use datafusion::sql::parser::{DFParser, Statement};
+use datafusion::sql::sqlparser::dialect::dialect_from_str;
 
-use datafusion::logical_expr::dml::CopyTo;
-use datafusion::sql::parser::Statement;
 use rustyline::error::ReadlineError;
 use rustyline::Editor;
 use tokio::signal;
@@ -231,7 +231,7 @@ async fn exec_and_print(
     let df = ctx.execute_logical_plan(plan).await?;
     let physical_plan = df.create_physical_plan().await?;
 
-    if physical_plan.unbounded_output().is_unbounded() {
+    if physical_plan.execution_mode().is_unbounded() {
         let stream = execute_stream(physical_plan, task_ctx.clone())?;
         print_options.print_stream(stream, now).await?;
     } else {
@@ -305,10 +305,9 @@ mod tests {
     use std::str::FromStr;
 
     use super::*;
-    use datafusion::common::plan_err;
-    use datafusion_common::{
-        file_options::StatementOptions, FileType, FileTypeWriterOptions,
-    };
+
+    use datafusion::common::{plan_err, FileType, FileTypeWriterOptions};
+    use datafusion_common::file_options::StatementOptions;
 
     async fn create_external_table_test(location: &str, sql: &str) -> Result<()> {
         let ctx = SessionContext::new();
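The exec_and_print hunk above is where the new accessor surfaces in the CLI: the plan is asked for its ExecutionMode instead of the old unbounded_output(). Below is a minimal sketch of the same dispatch outside the CLI; run_plan is a hypothetical helper, while execution_mode(), is_unbounded(), execute_stream, and collect are the calls used in the hunk.

use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::{collect, execute_stream, ExecutionPlan};
use futures::StreamExt;

// Hypothetical helper: consume a plan according to its ExecutionMode.
async fn run_plan(plan: Arc<dyn ExecutionPlan>, task_ctx: Arc<TaskContext>) -> Result<()> {
    if plan.execution_mode().is_unbounded() {
        // An unbounded (streaming) plan never finishes, so it must be
        // consumed incrementally; collecting it would never return.
        let mut stream = execute_stream(plan, task_ctx)?;
        while let Some(batch) = stream.next().await {
            println!("{} rows", batch?.num_rows());
        }
    } else {
        // A bounded plan can be materialized in full.
        let batches = collect(plan, task_ctx).await?;
        println!("{} batches", batches.len());
    }
    Ok(())
}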
16 changes: 5 additions & 11 deletions datafusion-examples/examples/custom_datasource.rs
@@ -31,7 +31,7 @@ use datafusion::execution::context::{SessionState, TaskContext};
 use datafusion::physical_plan::memory::MemoryStream;
 use datafusion::physical_plan::{
     project_schema, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan,
-    PlanPropertiesCache, SendableRecordBatchStream,
+    Partitioning, PlanPropertiesCache, SendableRecordBatchStream,
 };
 use datafusion::prelude::*;
 use datafusion_expr::{Expr, LogicalPlanBuilder};
@@ -209,17 +209,11 @@ impl CustomExec {
     }
 
     fn with_cache(mut self) -> Self {
-        let mut new_cache = self.cache;
+        self.cache = self
+            .cache
+            .with_partitioning(Partitioning::UnknownPartitioning(1))
+            .with_exec_mode(ExecutionMode::Bounded);
 
-        // Output Partitioning
-        let output_partitioning =
-            datafusion::physical_plan::Partitioning::UnknownPartitioning(1);
-        new_cache = new_cache.with_partitioning(output_partitioning);
-
-        // Execution Mode
-        new_cache = new_cache.with_exec_mode(ExecutionMode::Bounded);
-
-        self.cache = new_cache;
         self
     }
 }
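The chained with_partitioning/with_exec_mode style above and the three-argument PlanPropertiesCache::new used by the built-in operators later in this commit are two routes to the same properties. Here is a minimal sketch of the constructor form for the same single-partition, bounded case; make_cache is an illustrative name, not commit code, and assumes only that a schema is at hand.

use datafusion::arrow::datatypes::SchemaRef;
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::{ExecutionMode, Partitioning, PlanPropertiesCache};

// Illustrative one-shot equivalent of CustomExec::with_cache above.
fn make_cache(schema: SchemaRef) -> PlanPropertiesCache {
    PlanPropertiesCache::new(
        EquivalenceProperties::new(schema),   // no known orderings or equivalences
        Partitioning::UnknownPartitioning(1), // a single output partition
        ExecutionMode::Bounded,               // finite input, so bounded output
    )
}

The builder reads well when refining an existing cache; the constructor keeps all three properties visible in one expression, which is the shape the scan operators below converge on.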
8 changes: 3 additions & 5 deletions datafusion/core/src/dataframe/mod.rs
@@ -28,10 +28,8 @@ use crate::arrow::record_batch::RecordBatch;
 use crate::arrow::util::pretty;
 use crate::datasource::{provider_as_source, MemTable, TableProvider};
 use crate::error::Result;
-use crate::execution::{
-    context::{SessionState, TaskContext},
-    FunctionRegistry,
-};
+use crate::execution::context::{SessionState, TaskContext};
+use crate::execution::FunctionRegistry;
 use crate::logical_expr::utils::find_window_exprs;
 use crate::logical_expr::{
     col, Expr, JoinType, LogicalPlan, LogicalPlanBuilder, Partitioning, TableType,
@@ -40,6 +38,7 @@ use crate::physical_plan::{
     collect, collect_partitioned, execute_stream, execute_stream_partitioned,
     ExecutionPlan, SendableRecordBatchStream,
 };
+use crate::prelude::SessionContext;
 
 use arrow::array::{Array, ArrayRef, Int64Array, StringArray};
 use arrow::compute::{cast, concat};
@@ -58,7 +57,6 @@ use datafusion_expr::{
     TableProviderFilterPushDown, UNNAMED_TABLE,
 };
 
-use crate::prelude::SessionContext;
 use async_trait::async_trait;
 
 /// Contains options that control how data is
16 changes: 6 additions & 10 deletions datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -20,6 +20,7 @@
 use std::any::Any;
 use std::sync::Arc;
 
+use super::FileGroupPartitioner;
 use crate::datasource::listing::PartitionedFile;
 use crate::datasource::physical_plan::{
     FileMeta, FileOpenFuture, FileOpener, FileScanConfig,
@@ -42,8 +43,6 @@ use futures::StreamExt;
 use itertools::Itertools;
 use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore};
 
-use super::FileGroupPartitioner;
-
 /// Execution plan for scanning Arrow data source
 #[derive(Debug, Clone)]
 #[allow(dead_code)]
@@ -89,14 +88,11 @@ impl ArrowExec {
             &self.projected_output_ordering,
         );
 
-        // Output Partitioning
-        let output_partitioning = self.output_partitioning_helper();
-
-        // Execution Mode
-        let exec_mode = ExecutionMode::Bounded;
-
-        self.cache =
-            PlanPropertiesCache::new(eq_properties, output_partitioning, exec_mode);
+        self.cache = PlanPropertiesCache::new(
+            eq_properties,
+            self.output_partitioning_helper(), // Output Partitioning
+            ExecutionMode::Bounded,            // Execution Mode
+        );
         self
     }
 
21 changes: 9 additions & 12 deletions datafusion/core/src/datasource/physical_plan/avro.rs
@@ -24,14 +24,13 @@ use super::FileScanConfig;
 use crate::error::Result;
 use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use crate::physical_plan::{
-    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
-    Statistics,
+    DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning,
+    PlanPropertiesCache, SendableRecordBatchStream, Statistics,
 };
 
 use arrow::datatypes::SchemaRef;
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
-use datafusion_physical_plan::{ExecutionMode, PlanPropertiesCache};
 
 /// Execution plan for scanning Avro data source
 #[derive(Debug, Clone)]
@@ -73,16 +72,13 @@ impl AvroExec {
             self.schema(),
             &self.projected_output_ordering,
         );
+        let n_partitions = self.base_config.file_groups.len();
 
-        // Output Partitioning
-        let output_partitioning =
-            Partitioning::UnknownPartitioning(self.base_config.file_groups.len());
-
-        // Execution Mode
-        let exec_mode = ExecutionMode::Bounded;
-
-        self.cache =
-            PlanPropertiesCache::new(eq_properties, output_partitioning, exec_mode);
+        self.cache = PlanPropertiesCache::new(
+            eq_properties,
+            Partitioning::UnknownPartitioning(n_partitions), // Output Partitioning
+            ExecutionMode::Bounded,                          // Execution Mode
+        );
         self
     }
 }
@@ -168,6 +164,7 @@ mod private {
     use crate::datasource::avro_to_arrow::Reader as AvroReader;
     use crate::datasource::physical_plan::file_stream::{FileOpenFuture, FileOpener};
     use crate::datasource::physical_plan::FileMeta;
+
     use bytes::Buf;
     use futures::StreamExt;
     use object_store::{GetResultPayload, ObjectStore};
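In the AvroExec hunk, the partition count is now computed up front as n_partitions, which makes the rule explicit: one output partition per file group. The same rule as a standalone sketch; scan_partitioning and its parameter are illustrative names, not commit code.

use datafusion::physical_plan::Partitioning;

// Illustrative: the scan operators in this commit declare one output
// partition per file group, since each group is read as a single stream.
fn scan_partitioning<F>(file_groups: &[Vec<F>]) -> Partitioning {
    Partitioning::UnknownPartitioning(file_groups.len())
}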
20 changes: 8 additions & 12 deletions datafusion/core/src/datasource/physical_plan/csv.rs
@@ -32,18 +32,17 @@ use crate::datasource::physical_plan::FileMeta;
 use crate::error::{DataFusionError, Result};
 use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use crate::physical_plan::{
-    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
-    Statistics,
+    DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning,
+    PlanPropertiesCache, SendableRecordBatchStream, Statistics,
 };
 
 use arrow::csv;
 use arrow::datatypes::SchemaRef;
+use datafusion_common::config::ConfigOptions;
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
-use datafusion_physical_plan::{ExecutionMode, PlanPropertiesCache};
 
 use bytes::{Buf, Bytes};
-use datafusion_common::config::ConfigOptions;
 use futures::{ready, StreamExt, TryStreamExt};
 use object_store::{GetOptions, GetResultPayload, ObjectStore};
 use tokio::io::AsyncWriteExt;
@@ -128,14 +127,11 @@ impl CsvExec {
             &self.projected_output_ordering,
         );
 
-        // Output Partitioning
-        let output_partitioning = self.output_partitioning_helper();
-
-        // Execution Mode
-        let exec_mode = ExecutionMode::Bounded;
-
-        self.cache =
-            PlanPropertiesCache::new(eq_properties, output_partitioning, exec_mode);
+        self.cache = PlanPropertiesCache::new(
+            eq_properties,
+            self.output_partitioning_helper(), // Output Partitioning
+            ExecutionMode::Bounded,            // Execution Mode
+        );
         self
     }
 
5 changes: 2 additions & 3 deletions datafusion/core/src/datasource/physical_plan/file_stream.rs
@@ -28,9 +28,8 @@ use std::task::{Context, Poll};
 use std::time::Instant;
 
 use crate::datasource::listing::PartitionedFile;
-use crate::datasource::physical_plan::{
-    FileMeta, FileScanConfig, PartitionColumnProjector,
-};
+use crate::datasource::physical_plan::file_scan_config::PartitionColumnProjector;
+use crate::datasource::physical_plan::{FileMeta, FileScanConfig};
 use crate::error::Result;
 use crate::physical_plan::metrics::{
     BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, Time,
43 changes: 20 additions & 23 deletions datafusion/core/src/datasource/physical_plan/json.rs
@@ -32,8 +32,8 @@ use crate::datasource::physical_plan::FileMeta;
 use crate::error::{DataFusionError, Result};
 use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use crate::physical_plan::{
-    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
-    Statistics,
+    DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning,
+    PlanPropertiesCache, SendableRecordBatchStream, Statistics,
 };
 
 use arrow::json::ReaderBuilder;
@@ -42,10 +42,8 @@ use datafusion_execution::TaskContext;
 use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
 
 use bytes::{Buf, Bytes};
-use datafusion_physical_plan::{ExecutionMode, PlanPropertiesCache};
 use futures::{ready, StreamExt, TryStreamExt};
-use object_store::{self, GetOptions};
-use object_store::{GetResultPayload, ObjectStore};
+use object_store::{self, GetOptions, GetResultPayload, ObjectStore};
 use tokio::io::AsyncWriteExt;
 use tokio::task::JoinSet;
 
@@ -97,14 +95,11 @@ impl NdJsonExec {
             &self.projected_output_ordering,
         );
 
-        // Output Partitioning
-        let output_partitioning = self.output_partitioning_helper();
-
-        // Execution Mode
-        let exec_mode = ExecutionMode::Bounded;
-
-        self.cache =
-            PlanPropertiesCache::new(eq_properties, output_partitioning, exec_mode);
+        self.cache = PlanPropertiesCache::new(
+            eq_properties,
+            self.output_partitioning_helper(), // Output Partitioning
+            ExecutionMode::Bounded,            // Execution Mode
+        );
         self
     }
 
@@ -378,32 +373,34 @@ pub async fn plan_to_json(
 
 #[cfg(test)]
 mod tests {
-    use arrow::array::Array;
-    use arrow::datatypes::{Field, SchemaBuilder};
-    use futures::StreamExt;
-    use object_store::local::LocalFileSystem;
+    use std::fs;
+    use std::path::Path;
+
+    use super::*;
     use crate::assert_batches_eq;
     use crate::dataframe::DataFrameWriteOptions;
     use crate::datasource::file_format::file_compression_type::FileTypeExt;
     use crate::datasource::file_format::{json::JsonFormat, FileFormat};
     use crate::datasource::listing::PartitionedFile;
     use crate::datasource::object_store::ObjectStoreUrl;
     use crate::execution::context::SessionState;
-    use crate::prelude::NdJsonReadOptions;
-    use crate::prelude::*;
+    use crate::prelude::{
+        CsvReadOptions, NdJsonReadOptions, SessionConfig, SessionContext,
+    };
    use crate::test::partitioned_file_groups;
 
+    use arrow::array::Array;
+    use arrow::datatypes::{Field, SchemaBuilder};
     use datafusion_common::cast::{as_int32_array, as_int64_array, as_string_array};
     use datafusion_common::FileType;
+
+    use futures::StreamExt;
     use object_store::chunked::ChunkedStore;
+    use object_store::local::LocalFileSystem;
     use rstest::*;
-    use std::fs;
-    use std::path::Path;
     use tempfile::TempDir;
     use url::Url;
 
-    use super::*;
-
     const TEST_DATA_BASE: &str = "tests/data";
 
     async fn prepare_store(
14 changes: 6 additions & 8 deletions datafusion/core/src/datasource/physical_plan/mod.rs
@@ -27,7 +27,6 @@ mod json;
 #[cfg(feature = "parquet")]
 pub mod parquet;
 pub use file_groups::FileGroupPartitioner;
-use futures::StreamExt;
 
 pub(crate) use self::csv::plan_to_csv;
 pub use self::csv::{CsvConfig, CsvExec, CsvOpener};
@@ -37,7 +36,6 @@ pub use self::parquet::{ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory};
 
 pub use arrow_file::ArrowExec;
 pub use avro::AvroExec;
-use file_scan_config::PartitionColumnProjector;
 pub use file_scan_config::{
     wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig,
 };
@@ -72,9 +70,9 @@ use datafusion_common::{file_options::FileTypeWriterOptions, plan_err};
 use datafusion_physical_expr::expressions::Column;
 use datafusion_physical_expr::PhysicalSortExpr;
 
+use futures::StreamExt;
 use log::debug;
-use object_store::ObjectMeta;
-use object_store::{path::Path, GetOptions, GetRange, ObjectStore};
+use object_store::{path::Path, GetOptions, GetRange, ObjectMeta, ObjectStore};
 
 /// The base configurations to provide when creating a physical plan for
 /// writing to any given file format.
@@ -589,18 +587,18 @@ async fn find_first_newline(
 
 #[cfg(test)]
 mod tests {
+    use super::*;
+    use crate::physical_plan::{DefaultDisplay, VerboseDisplay};
+
     use arrow_array::cast::AsArray;
     use arrow_array::types::{Float32Type, Float64Type, UInt32Type};
     use arrow_array::{
         BinaryArray, BooleanArray, Float32Array, Int32Array, Int64Array, StringArray,
         UInt64Array,
     };
     use arrow_schema::Field;
-    use chrono::Utc;
-
-    use crate::physical_plan::{DefaultDisplay, VerboseDisplay};
-
-    use super::*;
+    use chrono::Utc;
 
     #[test]
     fn schema_mapping_map_batch() {
Expand Down
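Viewed together, the import hunks in this commit apply one grouping convention across files: std imports first, then crate and super imports, then arrow and DataFusion sibling crates, then external third-party crates, with a blank line between groups.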
(The remaining 49 changed files are not shown.)