Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into feature/regexp_count
Browse files Browse the repository at this point in the history
  • Loading branch information
Omega359 committed Oct 17, 2024
2 parents 97e61ae + 56946b4 commit 74b545a
Show file tree
Hide file tree
Showing 115 changed files with 6,081 additions and 2,461 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -574,9 +574,9 @@ jobs:
#
# To reproduce:
# 1. Install the version of Rust that is failing. Example:
# rustup install 1.78.0
# rustup install 1.79.0
# 2. Run the command that failed with that version. Example:
# cargo +1.78.0 check -p datafusion
# cargo +1.79.0 check -p datafusion
#
# To resolve, either:
# 1. Change your code to use older Rust features,
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ homepage = "https://datafusion.apache.org"
license = "Apache-2.0"
readme = "README.md"
repository = "https://github.com/apache/datafusion"
rust-version = "1.78"
rust-version = "1.79"
version = "42.0.0"

[workspace.dependencies]
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/src/bin/h2o.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use datafusion::datasource::listing::{
use datafusion::datasource::MemTable;
use datafusion::prelude::CsvReadOptions;
use datafusion::{arrow::util::pretty, error::Result, prelude::SessionContext};
use datafusion_benchmarks::BenchmarkRun;
use datafusion_benchmarks::util::BenchmarkRun;
use std::path::PathBuf;
use std::sync::Arc;
use structopt::StructOpt;
Expand Down
3 changes: 1 addition & 2 deletions benchmarks/src/clickbench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use std::path::Path;
use std::path::PathBuf;

use crate::util::{BenchmarkRun, CommonOpt};
use datafusion::{
error::{DataFusionError, Result},
prelude::SessionContext,
Expand All @@ -26,8 +27,6 @@ use datafusion_common::exec_datafusion_err;
use datafusion_common::instant::Instant;
use structopt::StructOpt;

use crate::{BenchmarkRun, CommonOpt};

/// Run the clickbench benchmark
///
/// The ClickBench[1] benchmarks are widely cited in the industry and
Expand Down
3 changes: 2 additions & 1 deletion benchmarks/src/imdb/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::path::PathBuf;
use std::sync::Arc;

use super::{get_imdb_table_schema, get_query_sql, IMDB_TABLES};
use crate::{BenchmarkRun, CommonOpt};
use crate::util::{BenchmarkRun, CommonOpt};

use arrow::record_batch::RecordBatch;
use arrow::util::pretty::{self, pretty_format_batches};
Expand Down Expand Up @@ -489,6 +489,7 @@ mod tests {

use super::*;

use crate::util::CommonOpt;
use datafusion::common::exec_err;
use datafusion::error::Result;
use datafusion_proto::bytes::{
Expand Down
3 changes: 1 addition & 2 deletions benchmarks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,4 @@ pub mod imdb;
pub mod parquet_filter;
pub mod sort;
pub mod tpch;
mod util;
pub use util::*;
pub mod util;
2 changes: 1 addition & 1 deletion benchmarks/src/parquet_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use std::path::PathBuf;

use crate::{AccessLogOpt, BenchmarkRun, CommonOpt};
use crate::util::{AccessLogOpt, BenchmarkRun, CommonOpt};

use arrow::util::pretty;
use datafusion::common::Result;
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/src/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use std::path::PathBuf;
use std::sync::Arc;

use crate::{AccessLogOpt, BenchmarkRun, CommonOpt};
use crate::util::{AccessLogOpt, BenchmarkRun, CommonOpt};

use arrow::util::pretty;
use datafusion::common::Result;
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/src/tpch/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::sync::Arc;
use super::{
get_query_sql, get_tbl_tpch_table_schema, get_tpch_table_schema, TPCH_TABLES,
};
use crate::{BenchmarkRun, CommonOpt};
use crate::util::{BenchmarkRun, CommonOpt};

use arrow::record_batch::RecordBatch;
use arrow::util::pretty::{self, pretty_format_batches};
Expand Down
2 changes: 1 addition & 1 deletion datafusion-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ license = "Apache-2.0"
homepage = "https://datafusion.apache.org"
repository = "https://github.com/apache/datafusion"
# Specify MSRV here as `cargo msrv` doesn't support workspace version
rust-version = "1.78"
rust-version = "1.79"
readme = "README.md"

[dependencies]
Expand Down
2 changes: 1 addition & 1 deletion datafusion-cli/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.

FROM rust:1.78-bookworm AS builder
FROM rust:1.79-bookworm AS builder

COPY . /usr/src/datafusion
COPY ./datafusion /usr/src/datafusion/datafusion
Expand Down
2 changes: 1 addition & 1 deletion datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ pub(crate) async fn register_object_store_and_config_extensions(
ctx.register_table_options_extension_from_scheme(scheme);

// Clone and modify the default table options based on the provided options
let mut table_options = ctx.session_state().default_table_options().clone();
let mut table_options = ctx.session_state().default_table_options();
if let Some(format) = format {
table_options.set_config_format(format);
}
Expand Down
10 changes: 5 additions & 5 deletions datafusion-cli/src/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ mod tests {

if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
ctx.register_table_options_extension_from_scheme(scheme);
let mut table_options = ctx.state().default_table_options().clone();
let mut table_options = ctx.state().default_table_options();
table_options.alter_with_string_hash_map(&cmd.options)?;
let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
let builder =
Expand Down Expand Up @@ -540,7 +540,7 @@ mod tests {

if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
ctx.register_table_options_extension_from_scheme(scheme);
let mut table_options = ctx.state().default_table_options().clone();
let mut table_options = ctx.state().default_table_options();
table_options.alter_with_string_hash_map(&cmd.options)?;
let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
let err = get_s3_object_store_builder(table_url.as_ref(), aws_options)
Expand All @@ -566,7 +566,7 @@ mod tests {

if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
ctx.register_table_options_extension_from_scheme(scheme);
let mut table_options = ctx.state().default_table_options().clone();
let mut table_options = ctx.state().default_table_options();
table_options.alter_with_string_hash_map(&cmd.options)?;
let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
// ensure this isn't an error
Expand Down Expand Up @@ -594,7 +594,7 @@ mod tests {

if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
ctx.register_table_options_extension_from_scheme(scheme);
let mut table_options = ctx.state().default_table_options().clone();
let mut table_options = ctx.state().default_table_options();
table_options.alter_with_string_hash_map(&cmd.options)?;
let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
let builder = get_oss_object_store_builder(table_url.as_ref(), aws_options)?;
Expand Down Expand Up @@ -631,7 +631,7 @@ mod tests {

if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
ctx.register_table_options_extension_from_scheme(scheme);
let mut table_options = ctx.state().default_table_options().clone();
let mut table_options = ctx.state().default_table_options();
table_options.alter_with_string_hash_map(&cmd.options)?;
let gcp_options = table_options.extensions.get::<GcpOptions>().unwrap();
let builder = get_gcs_object_store_builder(table_url.as_ref(), gcp_options)?;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ authors = { workspace = true }
# Specify MSRV here as `cargo msrv` doesn't support workspace version and fails with
# "Unable to find key 'package.rust-version' (or 'package.metadata.msrv') in 'arrow-datafusion/Cargo.toml'"
# https://github.com/foresterre/cargo-msrv/issues/590
rust-version = "1.78"
rust-version = "1.79"

[lints]
workspace = true
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/tests/fuzz_cases/sort_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ use test_utils::{batches_to_vec, partitions_to_sorted_vec};
const KB: usize = 1 << 10;
#[tokio::test]
#[cfg_attr(tarpaulin, ignore)]
async fn test_sort_1k_mem() {
for (batch_size, should_spill) in [(5, false), (20000, true), (1000000, true)] {
async fn test_sort_10k_mem() {
for (batch_size, should_spill) in [(5, false), (20000, true), (500000, true)] {
SortTest::new()
.with_int32_batches(batch_size)
.with_pool_size(10 * KB)
Expand Down
3 changes: 1 addition & 2 deletions datafusion/core/tests/fuzz_cases/window_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
use test_utils::add_empty_batches;

use datafusion::functions_window::row_number::row_number_udwf;
use datafusion_functions_window::dense_rank::dense_rank_udwf;
use datafusion_functions_window::rank::rank_udwf;
use datafusion_functions_window::rank::{dense_rank_udwf, rank_udwf};
use hashbrown::HashMap;
use rand::distributions::Alphanumeric;
use rand::rngs::StdRng;
Expand Down
16 changes: 8 additions & 8 deletions datafusion/expr-common/src/accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use std::fmt::Debug;
/// function])
///
/// * convert its internal state to a vector of aggregate values via
/// [`state`] and combine the state from multiple accumulators'
/// [`state`] and combine the state from multiple accumulators
/// via [`merge_batch`], as part of efficient multi-phase grouping.
///
/// [`GroupsAccumulator`]: crate::GroupsAccumulator
Expand Down Expand Up @@ -68,7 +68,7 @@ pub trait Accumulator: Send + Sync + Debug {
/// result in potentially non-deterministic behavior.
///
/// This function gets `&mut self` to allow for the accumulator to build
/// arrow compatible internal state that can be returned without copying
/// arrow-compatible internal state that can be returned without copying
/// when possible (for example distinct strings)
fn evaluate(&mut self) -> Result<ScalarValue>;

Expand All @@ -89,14 +89,14 @@ pub trait Accumulator: Send + Sync + Debug {
/// result in potentially non-deterministic behavior.
///
/// This function gets `&mut self` to allow for the accumulator to build
/// arrow compatible internal state that can be returned without copying
/// arrow-compatible internal state that can be returned without copying
/// when possible (for example distinct strings).
///
/// Intermediate state is used for "multi-phase" grouping in
/// DataFusion, where an aggregate is computed in parallel with
/// multiple `Accumulator` instances, as described below:
///
/// # MultiPhase Grouping
/// # Multi-Phase Grouping
///
/// ```text
/// ▲
Expand Down Expand Up @@ -140,9 +140,9 @@ pub trait Accumulator: Send + Sync + Debug {
/// to be summed together)
///
/// Some accumulators can return multiple values for their
/// intermediate states. For example average, tracks `sum` and
/// `n`, and this function should return
/// a vector of two values, sum and n.
/// intermediate states. For example, the average accumulator
/// tracks `sum` and `n`, and this function should return a vector
/// of two values, sum and n.
///
/// Note that [`ScalarValue::List`] can be used to pass multiple
/// values if the number of intermediate values is not known at
Expand Down Expand Up @@ -204,7 +204,7 @@ pub trait Accumulator: Send + Sync + Debug {
/// The final output is computed by repartitioning the result of
/// [`Self::state`] from each Partial aggregate and `hash(group keys)` so
/// that each distinct group key appears in exactly one of the
/// `AggregateMode::Final` GroupBy nodes. The output of the final nodes are
/// `AggregateMode::Final` GroupBy nodes. The outputs of the final nodes are
/// then unioned together to produce the overall final output.
///
/// Here is an example that shows the distribution of groups in the
Expand Down
16 changes: 13 additions & 3 deletions datafusion/expr-common/src/columnar_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@

//! [`ColumnarValue`] represents the result of evaluating an expression.
use arrow::array::ArrayRef;
use arrow::array::NullArray;
use arrow::array::{Array, ArrayRef, NullArray};
use arrow::compute::{kernels, CastOptions};
use arrow::datatypes::{DataType, TimeUnit};
use datafusion_common::format::DEFAULT_CAST_OPTIONS;
Expand Down Expand Up @@ -130,7 +129,7 @@ impl ColumnarValue {
})
}

/// null columnar values are implemented as a null array in order to pass batch
/// Null columnar values are implemented as a null array in order to pass batch
/// num_rows
pub fn create_null_array(num_rows: usize) -> Self {
ColumnarValue::Array(Arc::new(NullArray::new(num_rows)))
Expand Down Expand Up @@ -218,6 +217,17 @@ impl ColumnarValue {
}
}
}

/// Wraps `result` in a [`ColumnarValue`], choosing the variant from `args`.
///
/// Scalar UDF implementations can call this so that scalar-only inputs
/// produce a scalar output:
///
/// * If `result` holds exactly one element and every entry of `args` is
///   [`ColumnarValue::Scalar`], element 0 of `result` is extracted and
///   returned as [`ColumnarValue::Scalar`].
/// * In every other case `result` is returned unchanged as
///   [`ColumnarValue::Array`].
///
/// Note that an empty `args` slice trivially satisfies the "every entry is
/// scalar" condition.
///
/// # Errors
///
/// Propagates any error from [`ScalarValue::try_from_array`] while
/// extracting the single element.
pub fn from_args_and_result(args: &[Self], result: ArrayRef) -> Result<Self> {
    // The length check comes first so the argument scan is skipped for
    // multi-row results.
    let collapse_to_scalar =
        result.len() == 1 && args.iter().all(|arg| matches!(arg, Self::Scalar(_)));

    if !collapse_to_scalar {
        return Ok(Self::Array(result));
    }

    let scalar = ScalarValue::try_from_array(&result, 0)?;
    Ok(Self::Scalar(scalar))
}
}

#[cfg(test)]
Expand Down
9 changes: 4 additions & 5 deletions datafusion/expr-common/src/groups_accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,7 @@ pub trait GroupsAccumulator: Send {
///
/// * `values`: the input arguments to the accumulator
///
/// * `group_indices`: To which groups do the rows in `values`
/// belong, group id)
/// * `group_indices`: The group indices to which each row in `values` belongs.
///
/// * `opt_filter`: if present, only update aggregate state using
/// `values[i]` if `opt_filter[i]` is true
Expand Down Expand Up @@ -185,9 +184,9 @@ pub trait GroupsAccumulator: Send {
/// differ. See [`Self::state`] for more details on how state is
/// used and merged.
///
/// * `values`: arrays produced from calling `state` previously to the accumulator
/// * `values`: arrays produced from previously calling `state` on other accumulators.
///
/// Other arguments are the same as for [`Self::update_batch`];
/// Other arguments are the same as for [`Self::update_batch`].
fn merge_batch(
&mut self,
values: &[ArrayRef],
Expand All @@ -196,7 +195,7 @@ pub trait GroupsAccumulator: Send {
total_num_groups: usize,
) -> Result<()>;

/// Converts an input batch directly the intermediate aggregate state.
/// Converts an input batch directly to the intermediate aggregate state.
///
/// This is the equivalent of treating each input row as its own group. It
/// is invoked when the Partial phase of a multi-phase aggregation is not
Expand Down
4 changes: 2 additions & 2 deletions datafusion/expr-common/src/interval_arithmetic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1223,8 +1223,8 @@ pub fn satisfy_greater(
}
}

// Only the lower bound of left hand side and the upper bound of the right
// hand side can change after propagating the greater-than operation.
// Only the lower bound of the left-hand side and the upper bound of the right-hand
// side can change after propagating the greater-than operation.
let new_left_lower = if left.lower.is_null() || left.lower <= right.lower {
if strict {
next_value(right.lower.clone())
Expand Down
6 changes: 3 additions & 3 deletions datafusion/expr-common/src/signature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub const TIMEZONE_WILDCARD: &str = "+TZ";
/// valid length. It exists to avoid the need to enumerate all possible fixed size list lengths.
pub const FIXED_SIZE_LIST_WILDCARD: i32 = i32::MIN;

///A function's volatility, which defines the functions eligibility for certain optimizations
/// A function's volatility, which defines the function's eligibility for certain optimizations
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Hash)]
pub enum Volatility {
/// An immutable function will always return the same output when given the same
Expand Down Expand Up @@ -86,7 +86,7 @@ pub enum Volatility {
/// ```
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum TypeSignature {
/// One or more arguments of an common type out of a list of valid types.
/// One or more arguments of a common type out of a list of valid types.
///
/// # Examples
/// A function such as `concat` is `Variadic(vec![DataType::Utf8, DataType::LargeUtf8])`
Expand Down Expand Up @@ -127,7 +127,7 @@ pub enum TypeSignature {
Numeric(usize),
/// Fixed number of arguments of all the same string types.
/// The precedence of type from high to low is Utf8View, LargeUtf8 and Utf8.
/// Null is considerd as Utf8 by default
/// Null is considered as `Utf8` by default
/// Dictionary with string value type is also handled.
String(usize),
}
Expand Down
Loading

0 comments on commit 74b545a

Please sign in to comment.