feat: scalar regex match physical expr #12270

Open

wants to merge 35 commits into base: main

Changes from all commits (35 commits)
d357df7
feat: scalar regex match physical expr
zhuliquan Aug 31, 2024
02f58b2
bench: add scalar regex match benchmarks
zhuliquan Sep 17, 2024
2dcb317
feat: apply scalar_regex_match optimize to similar_to case
zhuliquan Sep 18, 2024
3a33a71
minor: regen datafusion protobuf
Sep 18, 2024
930b7a8
bench: improve scalar_regex_match
zhuliquan Nov 9, 2024
5f2b555
minor: update cargo.lock
zhuliquan Nov 9, 2024
f309341
fix: fix wrong merge conflict
zhuliquan Dec 3, 2024
a0ba73a
bench: init expr in scalar_regex_match bench iter
zhuliquan Dec 6, 2024
25d9f93
Merge branch 'apache:main' into feature-scalar_regexp_match_expr
zhuliquan Dec 6, 2024
7014ac3
bench: diff batch run over in scalar_regex_match
zhuliquan Dec 7, 2024
ef66c56
improve: improve performance of scalar_regex_match
zhuliquan Dec 8, 2024
13adab5
Minor: Comment temporary function for documentation migration (#13669)
comphead Dec 6, 2024
7cfaf1e
Minor: Rephrase MSRV policy to be more explanatory (#13668)
comphead Dec 6, 2024
98b7488
fix: repartitioned reads of CSV with custom line terminator (#13677)
korowa Dec 7, 2024
14dcf20
chore: macros crate cleanup (#13685)
findepi Dec 7, 2024
f6cafba
Refactor regexplike signature (#13394)
jiashenC Dec 8, 2024
cebf94f
Performance: enable array allocation reuse (`ScalarFunctionArgs` gets…
alamb Dec 8, 2024
6dd3f3a
Temporary fix for CI (#13689)
jonahgao Dec 8, 2024
de36fb6
refactor: use `LazyLock` in the `user_doc` macro (#13684)
jonahgao Dec 8, 2024
7728525
Unlock lexical-write-integer version. (#13693)
Alexhuszagh Dec 9, 2024
4884ac2
Minor: Use `div_ceil`
akurmustafa Dec 9, 2024
021a500
Fix hash join with sort push down (#13560)
haohuaijin Dec 9, 2024
ec5e038
Improve substr() performance by avoiding using owned string (#13688)
richox Dec 9, 2024
412d3f6
reinstate down_cast_any_ref (#13705)
andygrove Dec 9, 2024
dc17dd6
Optimize performance of `character_length` function (#13696)
tlm365 Dec 10, 2024
9b57875
Update prost-build requirement from =0.13.3 to =0.13.4 (#13698)
dependabot[bot] Dec 10, 2024
4a08545
Minor: Output elapsed time for sql logic test (#13718)
comphead Dec 10, 2024
d02d587
refactor: simplify the `make_udf_function` macro (#13712)
jonahgao Dec 11, 2024
0e41341
refactor: replace `Vec` with `IndexMap` for expression mappings in `P…
Weijun-H Dec 11, 2024
a8fc264
Handle alias when parsing sql(parse_sql_expr) (#12939)
Eason0729 Dec 11, 2024
a505610
Improve documentation for TableProvider (#13724)
alamb Dec 11, 2024
4fb668b
Reveal implementing type and return type in simple UDF implementation…
findepi Dec 11, 2024
1ab089e
minor: Extract tests for `EXTRACT` AND `date_part` to their own file …
alamb Dec 11, 2024
2b65fb3
Support unparsing `UNNEST` plan to `UNNEST` table factor SQL (#13660)
goldmedal Dec 11, 2024
79cb7d6
feat: new way to make bool_buffer in scalar_regex_match
zhuliquan Dec 15, 2024
11 changes: 7 additions & 4 deletions README.md
@@ -126,14 +126,17 @@ Optional features:

## Rust Version Compatibility Policy

DataFusion's Minimum Required Stable Rust Version (MSRV) policy is to support stable [4 latest
Rust versions](https://releases.rs) OR the stable minor Rust version as of 4 months, whichever is lower.
The Rust toolchain releases are tracked at [Rust Versions](https://releases.rs) and follow
[semantic versioning](https://semver.org/). A Rust toolchain release can be identified
by a version string like `1.80.0`, or more generally `major.minor.patch`.

DataFusion supports the last 4 stable Rust minor versions released and any such versions released within the last 4 months.

For example, given the releases `1.78.0`, `1.79.0`, `1.80.0`, `1.80.1` and `1.81.0`, DataFusion will support `1.78.0`, which is 3 minor versions prior to the most recent minor version `1.81`.

If a hotfix is released for the minimum supported Rust version (MSRV), the MSRV will be the minor version with all hotfixes, even if it surpasses the four-month window.
Note: If a Rust hotfix is released for the current MSRV, the MSRV will be updated to the specific minor version that includes all applicable hotfixes; this takes precedence over the other policies.

We enforce this policy using a [MSRV CI Check](https://github.com/search?q=repo%3Aapache%2Fdatafusion+rust-version+language%3ATOML+path%3A%2F%5ECargo.toml%2F&type=code)
DataFusion enforces its MSRV policy using a [MSRV CI Check](https://github.com/search?q=repo%3Aapache%2Fdatafusion+rust-version+language%3ATOML+path%3A%2F%5ECargo.toml%2F&type=code)

## DataFusion API evolution policy

Expand Down
2 changes: 1 addition & 1 deletion datafusion-cli/Cargo.lock

Some generated files are not rendered by default.

126 changes: 100 additions & 26 deletions datafusion-examples/examples/advanced_udf.rs
Expand Up @@ -27,9 +27,11 @@ use arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::logical_expr::Volatility;
use datafusion::prelude::*;
use datafusion_common::{internal_err, ScalarValue};
use datafusion_common::{exec_err, internal_err, ScalarValue};
use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
use datafusion_expr::{ColumnarValue, ScalarUDF, ScalarUDFImpl, Signature};
use datafusion_expr::{
ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature,
};

/// This example shows how to use the full ScalarUDFImpl API to implement a user
/// defined function. As in the `simple_udf.rs` example, this struct implements
@@ -83,23 +85,27 @@ impl ScalarUDFImpl for PowUdf {
Ok(DataType::Float64)
}

/// This is the function that actually calculates the results.
/// This function actually calculates the results of the scalar function.
///
/// This is the same way that functions provided with DataFusion are invoked,
/// which permits important special cases:
///
/// This is the same way that functions built into DataFusion are invoked,
/// which permits important special cases when one or both of the arguments
/// are single values (constants). For example `pow(a, 2)`
/// 1. When one or both of the arguments are single values (constants).
/// For example `pow(a, 2)`
/// 2. When the input arrays can be reused (avoid allocating a new output array)
///
/// However, it also means the implementation is more complex than when
/// using `create_udf`.
fn invoke_batch(
&self,
args: &[ColumnarValue],
_number_rows: usize,
) -> Result<ColumnarValue> {
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
// The other fields of the `args` struct are used for more specialized
// uses, and are not needed in this example
let ScalarFunctionArgs { mut args, .. } = args;
// DataFusion has arranged for the correct inputs to be passed to this
// function, but we check again to make sure
assert_eq!(args.len(), 2);
let (base, exp) = (&args[0], &args[1]);
// take ownership of arguments by popping in reverse order
let exp = args.pop().unwrap();
let base = args.pop().unwrap();
assert_eq!(base.data_type(), DataType::Float64);
assert_eq!(exp.data_type(), DataType::Float64);

@@ -118,7 +124,7 @@
) => {
// compute the output. Note DataFusion treats `None` as NULL.
let res = match (base, exp) {
(Some(base), Some(exp)) => Some(base.powf(*exp)),
(Some(base), Some(exp)) => Some(base.powf(exp)),
// one or both arguments were NULL
_ => None,
};
@@ -140,31 +146,33 @@
// kernel creates very fast "vectorized" code and
// handles things like null values for us.
let res: Float64Array =
compute::unary(base_array, |base| base.powf(*exp));
compute::unary(base_array, |base| base.powf(exp));
Arc::new(res)
}
};
Ok(ColumnarValue::Array(result_array))
}

// special case if the base is a constant (note this code is quite
// similar to the previous case, so we omit comments)
// special case if the base is a constant.
//
// Note this case is very similar to the previous case, so we could
// use the same pattern. However, for this case we demonstrate an
// even more advanced pattern to potentially avoid allocating a new array
(
ColumnarValue::Scalar(ScalarValue::Float64(base)),
ColumnarValue::Array(exp_array),
) => {
let res = match base {
None => new_null_array(exp_array.data_type(), exp_array.len()),
Some(base) => {
let exp_array = exp_array.as_primitive::<Float64Type>();
let res: Float64Array =
compute::unary(exp_array, |exp| base.powf(exp));
Arc::new(res)
}
Some(base) => maybe_pow_in_place(base, exp_array)?,
};
Ok(ColumnarValue::Array(res))
}
// Both arguments are arrays so we have to perform the calculation for every row
// Both arguments are arrays so we have to perform the calculation
// for every row
//
// Note this could also be done in place using `binary_mut` as
// is done in `maybe_pow_in_place` but here we use binary for simplicity
(ColumnarValue::Array(base_array), ColumnarValue::Array(exp_array)) => {
let res: Float64Array = compute::binary(
base_array.as_primitive::<Float64Type>(),
@@ -191,6 +199,52 @@ impl ScalarUDFImpl for PowUdf {
}
}

/// Evaluate `base ^ exp` *without* allocating a new array, if possible
fn maybe_pow_in_place(base: f64, exp_array: ArrayRef) -> Result<ArrayRef> {
// Calling `unary` creates a new array for the results. Avoiding
// allocations is a common optimization in performance critical code.
// arrow-rs allows this optimization via the `unary_mut`
// and `binary_mut` kernels in certain cases
//
// These kernels can only be used if there are no other references to
// the arrays (exp_array has to be the last remaining reference).
let owned_array = exp_array
// as in the previous example, we first downcast to &Float64Array
.as_primitive::<Float64Type>()
// non-obviously, we call clone here to get an owned `Float64Array`.
// Calling clone() is relatively inexpensive as it increments
// some ref counts but doesn't clone the data.
//
// Once we have the owned Float64Array we can drop the original
// exp_array (untyped) reference
.clone();

// We *MUST* drop the reference to `exp_array` explicitly so that
// owned_array is the only reference remaining in this function.
//
// Note that depending on the query there may still be other references
// to the underlying buffers, which would prevent reuse. The only way to
// know for sure is the result of `compute::unary_mut`
drop(exp_array);

// If we have the only reference, compute the result directly into the same
// allocation as was used for the input array
match compute::unary_mut(owned_array, |exp| base.powf(exp)) {
Err(_orig_array) => {
// unary_mut will return the original array if there are other
// references into the underlying buffer (and thus reuse is
// impossible)
//
// In a real implementation, this case should fall back to
// calling `unary` and allocate a new array; In this example
// we will return an error for demonstration purposes
exec_err!("Could not reuse array for maybe_pow_in_place")
}
// a result of OK means the operation was run successfully
Ok(res) => Ok(Arc::new(res)),
}
}

/// In this example we register `PowUdf` as a user defined function
/// and invoke it via the DataFrame API and SQL
#[tokio::main]
@@ -215,9 +269,29 @@ async fn main() -> Result<()> {
// print the results
df.show().await?;

// You can also invoke both pow(2, 10) and its alias my_pow(a, b) using SQL
let sql_df = ctx.sql("SELECT pow(2, 10), my_pow(a, b) FROM t").await?;
sql_df.show().await?;
// You can also invoke both pow(2, 10) and its alias my_pow(a, b) using SQL
ctx.sql("SELECT pow(2, 10), my_pow(a, b) FROM t")
.await?
.show()
.await?;

// You can also exercise the in-place path (`maybe_pow_in_place`) by passing
// a constant base and a column `a` as the exponent. If there is only a
// single reference to `a`, the code works well
ctx.sql("SELECT pow(2, a) FROM t").await?.show().await?;

// However, if there are multiple references to `a` in the evaluation,
// the array storage cannot be reused
let err = ctx
.sql("SELECT pow(2, a), pow(3, a) FROM t")
.await?
.show()
.await
.unwrap_err();
assert_eq!(
err.to_string(),
"Execution error: Could not reuse array for maybe_pow_in_place"
);

Ok(())
}
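
The error branch in `maybe_pow_in_place` above notes that a real implementation should fall back to allocating a new array rather than returning an error. Below is a minimal sketch of that fallback, assuming the same arrow kernels the example uses (the function name `pow_in_place_or_copy` is hypothetical):

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, AsArray, Float64Array};
use arrow::compute;
use arrow::datatypes::Float64Type;
use datafusion::error::Result;

/// Hypothetical variant of `maybe_pow_in_place` that falls back to `unary`
/// (allocating a new output array) when the input buffer cannot be reused.
fn pow_in_place_or_copy(base: f64, exp_array: ArrayRef) -> Result<ArrayRef> {
    // downcast and clone to get an owned Float64Array (cheap: ref-count bump)
    let owned_array = exp_array.as_primitive::<Float64Type>().clone();
    // drop the untyped reference so `owned_array` can be the last one standing
    drop(exp_array);

    let res: Float64Array =
        match compute::unary_mut(owned_array, |exp| base.powf(exp)) {
            // the input buffer was reused in place
            Ok(res) => res,
            // other references exist, so allocate a new array instead
            Err(orig_array) => compute::unary(&orig_array, |exp| base.powf(exp)),
        };
    Ok(Arc::new(res))
}
```

With this shape, the query `SELECT pow(2, a), pow(3, a) FROM t` from the example would succeed instead of erroring, at the cost of one extra allocation.
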
10 changes: 5 additions & 5 deletions datafusion-examples/examples/parse_sql_expr.rs
@@ -121,11 +121,11 @@ async fn query_parquet_demo() -> Result<()> {

assert_batches_eq!(
&[
"+------------+----------------------+",
"| double_col | sum(?table?.int_col) |",
"+------------+----------------------+",
"| 10.1 | 4 |",
"+------------+----------------------+",
"+------------+-------------+",
"| double_col | sum_int_col |",
"+------------+-------------+",
"| 10.1 | 4 |",
"+------------+-------------+",
],
&result
);
2 changes: 1 addition & 1 deletion datafusion-examples/examples/regexp.rs
@@ -148,7 +148,7 @@ async fn main() -> Result<()> {

// invalid flags will result in an error
let result = ctx
.sql(r"select regexp_like('\b4(?!000)\d\d\d\b', 4010, 'g')")
.sql(r"select regexp_like('\b4(?!000)\d\d\d\b', '4010', 'g')")
.await?
.collect()
.await;
14 changes: 13 additions & 1 deletion datafusion/catalog/src/table.rs
@@ -33,7 +33,19 @@ use datafusion_expr::{
};
use datafusion_physical_plan::ExecutionPlan;

/// Source table
/// A named table which can be queried.
///
/// Please see [`CatalogProvider`] for details of implementing a custom catalog.
///
/// [`TableProvider`] represents a source of data which can provide data as
/// Apache Arrow `RecordBatch`es. Implementations of this trait provide
/// important information for planning such as:
///
/// 1. [`Self::schema`]: The schema (columns and their types) of the table
/// 2. [`Self::supports_filters_pushdown`]: Should filters be pushed into this scan
/// 3. [`Self::scan`]: An [`ExecutionPlan`] that can read data
///
/// [`CatalogProvider`]: super::CatalogProvider
#[async_trait]
pub trait TableProvider: Debug + Sync + Send {
/// Returns the table provider as [`Any`](std::any::Any) so that it can be
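
To ground the expanded documentation above, here is a minimal sketch of a custom provider that serves a single in-memory batch. The type name `SingleBatchTable` is hypothetical, and exact import paths can vary across DataFusion versions:

```rust
use std::any::Any;
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::catalog::Session;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::Result;
use datafusion::logical_expr::Expr;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::ExecutionPlan;

/// Hypothetical provider that exposes one RecordBatch as a named table.
#[derive(Debug)]
struct SingleBatchTable {
    batch: RecordBatch,
}

#[async_trait]
impl TableProvider for SingleBatchTable {
    fn as_any(&self) -> &dyn Any {
        self
    }

    // 1. The schema of the table (taken from the batch itself)
    fn schema(&self) -> SchemaRef {
        self.batch.schema()
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    // 2. An ExecutionPlan that reads the data, honoring any projection
    async fn scan(
        &self,
        _state: &dyn Session,
        projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(MemoryExec::try_new(
            &[vec![self.batch.clone()]],
            self.schema(),
            projection.cloned(),
        )?))
    }
}
```

`supports_filters_pushdown` keeps its default here, so DataFusion applies all filters itself after the scan.
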
3 changes: 2 additions & 1 deletion datafusion/core/src/bin/print_functions_docs.rs
@@ -88,6 +88,7 @@ fn print_window_docs() -> Result<String> {
// the migration of UDF documentation generation from code based
// to attribute based
// To be removed
#[allow(dead_code)]
fn save_doc_code_text(documentation: &Documentation, name: &str) {
let attr_text = documentation.to_doc_attribute();

@@ -182,7 +183,7 @@
};

// Temporary for doc gen migration, see `save_doc_code_text` comments
save_doc_code_text(documentation, &name);
// save_doc_code_text(documentation, &name);

// first, the name, description and syntax example
let _ = write!(
4 changes: 3 additions & 1 deletion datafusion/core/src/datasource/physical_plan/csv.rs
@@ -612,11 +612,13 @@ impl FileOpener for CsvOpener {
}

let store = Arc::clone(&self.config.object_store);
let terminator = self.config.terminator;

Ok(Box::pin(async move {
// Current partition contains bytes [start_byte, end_byte) (might contain incomplete lines at boundaries)

let calculated_range = calculate_range(&file_meta, &store).await?;
let calculated_range =
calculate_range(&file_meta, &store, terminator).await?;

let range = match calculated_range {
RangeCalculation::Range(None) => None,
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/physical_plan/json.rs
@@ -273,7 +273,7 @@ impl FileOpener for JsonOpener {
let file_compression_type = self.file_compression_type.to_owned();

Ok(Box::pin(async move {
let calculated_range = calculate_range(&file_meta, &store).await?;
let calculated_range = calculate_range(&file_meta, &store, None).await?;

let range = match calculated_range {
RangeCalculation::Range(None) => None,
11 changes: 7 additions & 4 deletions datafusion/core/src/datasource/physical_plan/mod.rs
@@ -426,23 +426,25 @@ enum RangeCalculation {
async fn calculate_range(
file_meta: &FileMeta,
store: &Arc<dyn ObjectStore>,
terminator: Option<u8>,
) -> Result<RangeCalculation> {
let location = file_meta.location();
let file_size = file_meta.object_meta.size;
let newline = terminator.unwrap_or(b'\n');

match file_meta.range {
None => Ok(RangeCalculation::Range(None)),
Some(FileRange { start, end }) => {
let (start, end) = (start as usize, end as usize);

let start_delta = if start != 0 {
find_first_newline(store, location, start - 1, file_size).await?
find_first_newline(store, location, start - 1, file_size, newline).await?
} else {
0
};

let end_delta = if end != file_size {
find_first_newline(store, location, end - 1, file_size).await?
find_first_newline(store, location, end - 1, file_size, newline).await?
} else {
0
};
@@ -462,7 +464,7 @@
/// within an object, such as a file, in an object store.
///
/// This function scans the contents of the object starting from the specified `start` position
/// up to the `end` position, looking for the first occurrence of a newline (`'\n'`) character.
/// up to the `end` position, looking for the first occurrence of a newline character.
/// It returns the position of the first newline relative to the start of the range.
///
/// Returns a `Result` wrapping a `usize` that represents the position of the first newline character found within the specified range. If no newline is found, it returns the length of the scanned data, effectively indicating the end of the range.
Expand All @@ -474,6 +476,7 @@ async fn find_first_newline(
location: &Path,
start: usize,
end: usize,
newline: u8,
) -> Result<usize> {
let options = GetOptions {
range: Some(GetRange::Bounded(start..end)),
Expand All @@ -486,7 +489,7 @@ async fn find_first_newline(
let mut index = 0;

while let Some(chunk) = result_stream.next().await.transpose()? {
if let Some(position) = chunk.iter().position(|&byte| byte == b'\n') {
if let Some(position) = chunk.iter().position(|&byte| byte == newline) {
return Ok(index + position);
}

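
To make the boundary handling concrete, here is a small standalone sketch of the chunked scan that `find_first_newline` performs, with the terminator byte as a parameter (the name `first_terminator_position` is hypothetical):

```rust
/// Returns the offset of the first `terminator` byte across `chunks`, or the
/// total scanned length if none is found, mirroring `find_first_newline`.
fn first_terminator_position(chunks: &[&[u8]], terminator: u8) -> usize {
    let mut index = 0;
    for chunk in chunks {
        if let Some(position) = chunk.iter().position(|&byte| byte == terminator) {
            return index + position;
        }
        index += chunk.len();
    }
    index
}

fn main() {
    // A partition that starts mid-record skips ahead to the next boundary.
    // With a custom terminator of b';', the first boundary is at offset 5.
    let chunks = [b"abc".as_slice(), b"de;f".as_slice()];
    assert_eq!(first_terminator_position(&chunks, b';'), 5);
}
```
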