feat: support setting time range in Copy From statement (#4405)
* feat: support setting time range in Copy From statement

* test: add batch_filter_test

* fix: ts data type inconsistent error

* test: add sqlness test for copy from with statement

* fix: sqlness result error

* fix: cr comments
poltao authored Jul 29, 2024
1 parent 53fc14a commit 1138f32
Showing 13 changed files with 291 additions and 93 deletions.
2 changes: 1 addition & 1 deletion src/common/query/src/logical_plan.rs
@@ -25,7 +25,7 @@ use datafusion::logical_expr::{LogicalPlan, LogicalPlanBuilder};
use datafusion_common::Column;
use datafusion_expr::col;
use datatypes::prelude::ConcreteDataType;
pub use expr::build_filter_from_timestamp;
pub use expr::{build_filter_from_timestamp, build_same_type_ts_filter};

pub use self::accumulator::{Accumulator, AggregateFunctionCreator, AggregateFunctionCreatorRef};
pub use self::udaf::AggregateFunction;
29 changes: 29 additions & 0 deletions src/common/query/src/logical_plan/expr.rs
@@ -18,6 +18,35 @@ use common_time::Timestamp;
use datafusion_common::{Column, ScalarValue};
use datafusion_expr::expr::Expr;
use datafusion_expr::{and, binary_expr, Operator};
use datatypes::data_type::DataType;
use datatypes::schema::ColumnSchema;
use datatypes::value::Value;

/// Builds a filter for a timestamp column, casting the time range bounds to
/// the column's own timestamp type.
/// Returns [None] if the time range is [None] or spans the full time range.
pub fn build_same_type_ts_filter(
ts_schema: &ColumnSchema,
time_range: Option<TimestampRange>,
) -> Option<Expr> {
let ts_type = ts_schema.data_type.clone();
let time_range = time_range?;
let start = time_range
.start()
.and_then(|start| ts_type.try_cast(Value::Timestamp(start)));
let end = time_range
.end()
.and_then(|end| ts_type.try_cast(Value::Timestamp(end)));

let time_range = match (start, end) {
(Some(Value::Timestamp(start)), Some(Value::Timestamp(end))) => {
TimestampRange::new(start, end)
}
(Some(Value::Timestamp(start)), None) => Some(TimestampRange::from_start(start)),
(None, Some(Value::Timestamp(end))) => Some(TimestampRange::until_end(end, false)),
_ => return None,
};
build_filter_from_timestamp(&ts_schema.name, time_range.as_ref())
}

/// Builds an `Expr` that filters the timestamp column by the given timestamp range.
/// Returns [None] if the time range is [None] or spans the full time range.
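A minimal usage sketch of the new helper follows. The `common_query::logical_plan` re-export is taken from this diff; the `common_time` import paths, constructor names, and the second-to-millisecond cast behavior are assumptions, not code from the commit:

use common_query::logical_plan::build_same_type_ts_filter;
use common_time::range::TimestampRange;
use common_time::Timestamp;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;

// A millisecond-precision timestamp column, as a table schema would declare it.
let ts_col = ColumnSchema::new(
    "ts",
    ConcreteDataType::timestamp_millisecond_datatype(),
    false,
);
// Second-precision bounds; the helper casts them so the filter literals
// match the column's millisecond type.
let range = TimestampRange::new(Timestamp::new_second(1), Timestamp::new_second(2));
let filter = build_same_type_ts_filter(&ts_col, range);
assert!(filter.is_some()); // roughly `ts >= 1000 AND ts < 2000` in milliseconds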
37 changes: 35 additions & 2 deletions src/common/recordbatch/src/adapter.rs
@@ -22,19 +22,25 @@ use std::task::{Context, Poll};
use datafusion::arrow::compute::cast;
use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef;
use datafusion::error::Result as DfResult;
use datafusion::execution::context::ExecutionProps;
use datafusion::logical_expr::utils::conjunction;
use datafusion::logical_expr::Expr;
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_plan::metrics::{BaselineMetrics, MetricValue};
use datafusion::physical_plan::{
accept, displayable, ExecutionPlan, ExecutionPlanVisitor,
accept, displayable, ExecutionPlan, ExecutionPlanVisitor, PhysicalExpr,
RecordBatchStream as DfRecordBatchStream,
};
use datafusion_common::arrow::error::ArrowError;
use datafusion_common::DataFusionError;
use datafusion_common::{DataFusionError, ToDFSchema};
use datatypes::arrow::array::Array;
use datatypes::schema::{Schema, SchemaRef};
use futures::ready;
use pin_project::pin_project;
use snafu::ResultExt;

use crate::error::{self, Result};
use crate::filter::batch_filter;
use crate::{
DfRecordBatch, DfSendableRecordBatchStream, OrderOption, RecordBatch, RecordBatchStream,
SendableRecordBatchStream, Stream,
@@ -50,6 +56,7 @@ pub struct RecordBatchStreamTypeAdapter<T, E> {
stream: T,
projected_schema: DfSchemaRef,
projection: Vec<usize>,
predicate: Option<Arc<dyn PhysicalExpr>>,
phantom: PhantomData<E>,
}

@@ -69,9 +76,28 @@
stream,
projected_schema,
projection,
predicate: None,
phantom: Default::default(),
}
}

pub fn with_filter(mut self, filters: Vec<Expr>) -> Result<Self> {
let filters = if let Some(expr) = conjunction(filters) {
let df_schema = self
.projected_schema
.clone()
.to_dfschema_ref()
.context(error::PhysicalExprSnafu)?;

let filters = create_physical_expr(&expr, &df_schema, &ExecutionProps::new())
.context(error::PhysicalExprSnafu)?;
Some(filters)
} else {
None
};
self.predicate = filters;
Ok(self)
}
}

impl<T, E> DfRecordBatchStream for RecordBatchStreamTypeAdapter<T, E>
@@ -99,6 +125,8 @@ where

let projected_schema = this.projected_schema.clone();
let projection = this.projection.clone();
let predicate = this.predicate.clone();

let batch = batch.map(|b| {
b.and_then(|b| {
let projected_column = b.project(&projection)?;
@@ -121,6 +149,11 @@
}
}
let record_batch = DfRecordBatch::try_new(projected_schema, columns)?;
let record_batch = if let Some(predicate) = predicate {
batch_filter(&record_batch, &predicate)?
} else {
record_batch
};
Ok(record_batch)
})
});
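The planning steps inside `with_filter` can be exercised on their own. A self-contained sketch, assuming only the DataFusion APIs imported in this hunk (the schema and filter expressions are made up for illustration):

use std::sync::Arc;

use datafusion::execution::context::ExecutionProps;
use datafusion::logical_expr::utils::conjunction;
use datafusion::logical_expr::{col, lit};
use datafusion::physical_expr::create_physical_expr;
use datafusion_common::ToDFSchema;
use datatypes::arrow::datatypes::{DataType, Field, Schema};

// AND the pushed-down filters together, then plan the combined expression
// against the projected schema, mirroring `with_filter` above.
let schema = Arc::new(Schema::new(vec![Field::new("ts", DataType::UInt64, false)]));
let filters = vec![col("ts").gt(lit(1_u64)), col("ts").lt(lit(100_u64))];
if let Some(expr) = conjunction(filters) {
    let df_schema = schema.to_dfschema_ref().unwrap();
    let predicate = create_physical_expr(&expr, &df_schema, &ExecutionProps::new()).unwrap();
    // `predicate` is the Arc<dyn PhysicalExpr> the adapter stores and later
    // applies to every polled batch via `batch_filter`.
    println!("{predicate}");
}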
11 changes: 10 additions & 1 deletion src/common/recordbatch/src/error.rs
@@ -73,6 +73,14 @@ pub enum Error {
location: Location,
},

#[snafu(display("Create physical expr error"))]
PhysicalExpr {
#[snafu(source)]
error: datafusion::error::DataFusionError,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Fail to format record batch"))]
Format {
#[snafu(source)]
@@ -167,7 +175,8 @@ impl ErrorExt for Error {
| Error::PollStream { .. }
| Error::Format { .. }
| Error::ToArrowScalar { .. }
| Error::ProjectArrowRecordBatch { .. } => StatusCode::Internal,
| Error::ProjectArrowRecordBatch { .. }
| Error::PhysicalExpr { .. } => StatusCode::Internal,

Error::ArrowCompute { .. } => StatusCode::IllegalState,

74 changes: 71 additions & 3 deletions src/common/recordbatch/src/filter.rs
@@ -14,11 +14,18 @@

//! Util record batch stream wrapper that can perform precise filter.
use std::sync::Arc;

use datafusion::error::Result as DfResult;
use datafusion::logical_expr::{Expr, Literal, Operator};
use datafusion::physical_plan::PhysicalExpr;
use datafusion_common::arrow::array::{ArrayRef, Datum, Scalar};
use datafusion_common::arrow::buffer::BooleanBuffer;
use datafusion_common::arrow::compute::kernels::cmp;
use datafusion_common::ScalarValue;
use datafusion_common::cast::{as_boolean_array, as_null_array};
use datafusion_common::{internal_err, DataFusionError, ScalarValue};
use datatypes::arrow::array::{Array, BooleanArray, RecordBatch};
use datatypes::arrow::compute::filter_record_batch;
use datatypes::vectors::VectorRef;
use snafu::ResultExt;

@@ -144,13 +151,43 @@ impl SimpleFilterEvaluator {
}
}

/// Evaluates the predicate against the input [RecordBatch] and returns a new,
/// filtered [RecordBatch].
/// Copied from `datafusion::physical_plan::filter`.
pub fn batch_filter(
batch: &RecordBatch,
predicate: &Arc<dyn PhysicalExpr>,
) -> DfResult<RecordBatch> {
predicate
.evaluate(batch)
.and_then(|v| v.into_array(batch.num_rows()))
.and_then(|array| {
let filter_array = match as_boolean_array(&array) {
Ok(boolean_array) => Ok(boolean_array.clone()),
Err(_) => {
let Ok(null_array) = as_null_array(&array) else {
return internal_err!(
"Cannot create filter_array from non-boolean predicates"
);
};

// if the predicate is null, then the result is also null
Ok::<BooleanArray, DataFusionError>(BooleanArray::new_null(null_array.len()))
}
}?;
Ok(filter_record_batch(batch, &filter_array)?)
})
}

#[cfg(test)]
mod test {

use std::sync::Arc;

use datafusion::logical_expr::BinaryExpr;
use datafusion_common::Column;
use datafusion::execution::context::ExecutionProps;
use datafusion::logical_expr::{col, lit, BinaryExpr};
use datafusion::physical_expr::create_physical_expr;
use datafusion_common::{Column, DFSchema};
use datatypes::arrow::datatypes::{DataType, Field, Schema};

use super::*;

@@ -281,4 +318,35 @@
let result = evaluator.evaluate_scalar(&input_3).unwrap();
assert!(!result);
}

#[test]
fn batch_filter_test() {
let expr = col("ts").gt(lit(123456u64));
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("ts", DataType::UInt64, false),
]);
let df_schema = DFSchema::try_from(schema.clone()).unwrap();
let props = ExecutionProps::new();
let physical_expr = create_physical_expr(&expr, &df_schema, &props).unwrap();
let batch = RecordBatch::try_new(
Arc::new(schema),
vec![
Arc::new(datatypes::arrow::array::Int32Array::from(vec![4, 5, 6])),
Arc::new(datatypes::arrow::array::UInt64Array::from(vec![
123456, 123457, 123458,
])),
],
)
.unwrap();
let new_batch = batch_filter(&batch, &physical_expr).unwrap();
assert_eq!(new_batch.num_rows(), 2);
let first_column_values = new_batch
.column(0)
.as_any()
.downcast_ref::<datatypes::arrow::array::Int32Array>()
.unwrap();
let expected = datatypes::arrow::array::Int32Array::from(vec![5, 6]);
assert_eq!(first_column_values, &expected);
}
}
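The non-boolean branch of `batch_filter` above is easy to miss: a predicate that evaluates to NULL selects no rows. A sketch using the same crates as the test module (the schema and values are made up):

use std::sync::Arc;

use datafusion::execution::context::ExecutionProps;
use datafusion::logical_expr::lit;
use datafusion::physical_expr::create_physical_expr;
use datafusion_common::{DFSchema, ScalarValue};
use datatypes::arrow::array::{Int32Array, RecordBatch};
use datatypes::arrow::datatypes::{DataType, Field, Schema};

// An all-NULL predicate becomes BooleanArray::new_null(len), and
// filter_record_batch treats NULL like false, dropping every row.
let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
let df_schema = DFSchema::try_from(schema.clone()).unwrap();
let predicate =
    create_physical_expr(&lit(ScalarValue::Null), &df_schema, &ExecutionProps::new()).unwrap();
let batch = RecordBatch::try_new(
    Arc::new(schema),
    vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
)
.unwrap();
assert_eq!(batch_filter(&batch, &predicate).unwrap().num_rows(), 0);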
10 changes: 9 additions & 1 deletion src/operator/src/error.rs
@@ -754,6 +754,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Create physical expr error"))]
PhysicalExpr {
#[snafu(source)]
error: common_recordbatch::error::Error,
#[snafu(implicit)]
location: Location,
},
}

pub type Result<T> = std::result::Result<T, Error>;
@@ -788,7 +795,8 @@ impl ErrorExt for Error {
| Error::ViewColumnsMismatch { .. }
| Error::InvalidViewStmt { .. }
| Error::ConvertIdentifier { .. }
| Error::InvalidPartition { .. } => StatusCode::InvalidArguments,
| Error::InvalidPartition { .. }
| Error::PhysicalExpr { .. } => StatusCode::InvalidArguments,

Error::TableAlreadyExists { .. } | Error::ViewAlreadyExists { .. } => {
StatusCode::TableAlreadyExists
51 changes: 31 additions & 20 deletions src/operator/src/statement/copy_table_from.rs
@@ -36,6 +36,7 @@ use datafusion::parquet::arrow::arrow_reader::ArrowReaderMetadata;
use datafusion::parquet::arrow::ParquetRecordBatchStreamBuilder;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_common::Statistics;
use datafusion_expr::Expr;
use datatypes::arrow::compute::can_cast_types;
use datatypes::arrow::datatypes::{Schema, SchemaRef};
use datatypes::vectors::Helper;
@@ -225,6 +226,7 @@ impl StatementExecutor {
object_store: &ObjectStore,
file_metadata: &FileMetadata,
projection: Vec<usize>,
filters: Vec<Expr>,
) -> Result<DfSendableRecordBatchStream> {
match file_metadata {
FileMetadata::Csv {
Expand Down Expand Up @@ -252,11 +254,11 @@ impl StatementExecutor {
)
.await?;

Ok(Box::pin(RecordBatchStreamTypeAdapter::new(
projected_schema,
stream,
Some(projection),
)))
Ok(Box::pin(
RecordBatchStreamTypeAdapter::new(projected_schema, stream, Some(projection))
.with_filter(filters)
.context(error::PhysicalExprSnafu)?,
))
}
FileMetadata::Json {
format,
@@ -286,11 +288,11 @@
)
.await?;

Ok(Box::pin(RecordBatchStreamTypeAdapter::new(
projected_schema,
stream,
Some(projection),
)))
Ok(Box::pin(
RecordBatchStreamTypeAdapter::new(projected_schema, stream, Some(projection))
.with_filter(filters)
.context(error::PhysicalExprSnafu)?,
))
}
FileMetadata::Parquet { metadata, path, .. } => {
let meta = object_store
@@ -317,11 +319,11 @@
.project(&projection)
.context(error::ProjectSchemaSnafu)?,
);
Ok(Box::pin(RecordBatchStreamTypeAdapter::new(
projected_schema,
stream,
Some(projection),
)))
Ok(Box::pin(
RecordBatchStreamTypeAdapter::new(projected_schema, stream, Some(projection))
.with_filter(filters)
.context(error::PhysicalExprSnafu)?,
))
}
FileMetadata::Orc { path, .. } => {
let meta = object_store
@@ -345,11 +347,11 @@
.context(error::ProjectSchemaSnafu)?,
);

Ok(Box::pin(RecordBatchStreamTypeAdapter::new(
projected_schema,
stream,
Some(projection),
)))
Ok(Box::pin(
RecordBatchStreamTypeAdapter::new(projected_schema, stream, Some(projection))
.with_filter(filters)
.context(error::PhysicalExprSnafu)?,
))
}
}
}
@@ -370,6 +372,14 @@
let (object_store, entries) = self.list_copy_from_entries(&req).await?;
let mut files = Vec::with_capacity(entries.len());
let table_schema = table.schema().arrow_schema().clone();
let filters = table
.schema()
.timestamp_column()
.and_then(|c| {
common_query::logical_plan::build_same_type_ts_filter(c, req.timestamp_range)
})
.into_iter()
.collect::<Vec<_>>();

for entry in entries.iter() {
if entry.metadata().mode() != EntryMode::FILE {
Expand Down Expand Up @@ -414,6 +424,7 @@ impl StatementExecutor {
&object_store,
&file_metadata,
file_schema_projection,
filters.clone(),
)
.await?;
