Skip to content

Commit

Permalink
Merge branch 'main' into implement-rightmark-join
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathanc-n authored Nov 4, 2024
2 parents a6c2e06 + 2482ff4 commit beda8d7
Show file tree
Hide file tree
Showing 137 changed files with 2,675 additions and 2,047 deletions.
6 changes: 3 additions & 3 deletions benchmarks/src/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::util::{AccessLogOpt, BenchmarkRun, CommonOpt};

use arrow::util::pretty;
use datafusion::common::Result;
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_expr::{LexOrdering, LexOrderingRef, PhysicalSortExpr};
use datafusion::physical_plan::collect;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::prelude::{SessionConfig, SessionContext};
Expand Down Expand Up @@ -170,13 +170,13 @@ impl RunOpt {

async fn exec_sort(
ctx: &SessionContext,
expr: &[PhysicalSortExpr],
expr: LexOrderingRef<'_>,
test_file: &TestParquetFile,
debug: bool,
) -> Result<(usize, std::time::Duration)> {
let start = Instant::now();
let scan = test_file.create_scan(ctx, None).await?;
let exec = Arc::new(SortExec::new(expr.to_owned(), scan));
let exec = Arc::new(SortExec::new(LexOrdering::new(expr.to_owned()), scan));
let task_ctx = ctx.task_ctx();
let result = collect(exec, task_ctx).await?;
let elapsed = start.elapsed();
Expand Down
1 change: 1 addition & 0 deletions datafusion/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub mod scalar;
pub mod stats;
pub mod test_util;
pub mod tree_node;
pub mod types;
pub mod utils;

/// Reexport arrow crate
Expand Down
20 changes: 20 additions & 0 deletions datafusion/common/src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,26 @@ impl Statistics {
self
}

/// Project the statistics to the given column indices.
///
/// For example, if we had statistics for columns `{"a", "b", "c"}`,
/// projecting to `vec![2, 1]` would return statistics for columns `{"c",
/// "b"}`.
pub fn project(mut self, projection: Option<&Vec<usize>>) -> Self {
let Some(projection) = projection else {
return self;
};

// todo: it would be nice to avoid cloning column statistics if
// possible (e.g. if the projection did not contain duplicates)
self.column_statistics = projection
.iter()
.map(|&i| self.column_statistics[i].clone())
.collect();

self
}

/// Calculates the statistics after `fetch` and `skip` operations apply.
/// Here, `self` denotes per-partition statistics. Use the `n_partitions`
/// parameter to compute global statistics in a multi-partition setting.
Expand Down
49 changes: 49 additions & 0 deletions datafusion/common/src/types/builtin.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::types::{LogicalTypeRef, NativeType};
use std::sync::{Arc, OnceLock};

macro_rules! singleton {
($name:ident, $getter:ident, $ty:ident) => {
// TODO: Use LazyLock instead of getter function when MSRV gets bumped
static $name: OnceLock<LogicalTypeRef> = OnceLock::new();

#[doc = "Getter for singleton instance of a logical type representing"]
#[doc = concat!("[`NativeType::", stringify!($ty), "`].")]
pub fn $getter() -> LogicalTypeRef {
Arc::clone($name.get_or_init(|| Arc::new(NativeType::$ty)))
}
};
}

singleton!(LOGICAL_NULL, logical_null, Null);
singleton!(LOGICAL_BOOLEAN, logical_boolean, Boolean);
singleton!(LOGICAL_INT8, logical_int8, Int8);
singleton!(LOGICAL_INT16, logical_int16, Int16);
singleton!(LOGICAL_INT32, logical_int32, Int32);
singleton!(LOGICAL_INT64, logical_int64, Int64);
singleton!(LOGICAL_UINT8, logical_uint8, UInt8);
singleton!(LOGICAL_UINT16, logical_uint16, UInt16);
singleton!(LOGICAL_UINT32, logical_uint32, UInt32);
singleton!(LOGICAL_UINT64, logical_uint64, UInt64);
singleton!(LOGICAL_FLOAT16, logical_float16, Float16);
singleton!(LOGICAL_FLOAT32, logical_float32, Float32);
singleton!(LOGICAL_FLOAT64, logical_float64, Float64);
singleton!(LOGICAL_DATE, logical_date, Date);
singleton!(LOGICAL_BINARY, logical_binary, Binary);
singleton!(LOGICAL_STRING, logical_string, String);
114 changes: 114 additions & 0 deletions datafusion/common/src/types/field.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow_schema::{Field, Fields, UnionFields};
use std::hash::{Hash, Hasher};
use std::{ops::Deref, sync::Arc};

use super::{LogicalTypeRef, NativeType};

/// A record of a logical type, its name and its nullability.
#[derive(Debug, Clone, Eq, PartialOrd, Ord)]
pub struct LogicalField {
pub name: String,
pub logical_type: LogicalTypeRef,
pub nullable: bool,
}

impl PartialEq for LogicalField {
fn eq(&self, other: &Self) -> bool {
self.name == other.name
&& self.logical_type.eq(&other.logical_type)
&& self.nullable == other.nullable
}
}

impl Hash for LogicalField {
fn hash<H: Hasher>(&self, state: &mut H) {
self.name.hash(state);
self.logical_type.hash(state);
self.nullable.hash(state);
}
}

impl From<&Field> for LogicalField {
fn from(value: &Field) -> Self {
Self {
name: value.name().clone(),
logical_type: Arc::new(NativeType::from(value.data_type().clone())),
nullable: value.is_nullable(),
}
}
}

/// A reference counted [`LogicalField`].
pub type LogicalFieldRef = Arc<LogicalField>;

/// A cheaply cloneable, owned collection of [`LogicalFieldRef`].
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct LogicalFields(Arc<[LogicalFieldRef]>);

impl Deref for LogicalFields {
type Target = [LogicalFieldRef];

fn deref(&self) -> &Self::Target {
self.0.as_ref()
}
}

impl From<&Fields> for LogicalFields {
fn from(value: &Fields) -> Self {
value
.iter()
.map(|field| Arc::new(LogicalField::from(field.as_ref())))
.collect()
}
}

impl FromIterator<LogicalFieldRef> for LogicalFields {
fn from_iter<T: IntoIterator<Item = LogicalFieldRef>>(iter: T) -> Self {
Self(iter.into_iter().collect())
}
}

/// A cheaply cloneable, owned collection of [`LogicalFieldRef`] and their
/// corresponding type ids.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct LogicalUnionFields(Arc<[(i8, LogicalFieldRef)]>);

impl Deref for LogicalUnionFields {
type Target = [(i8, LogicalFieldRef)];

fn deref(&self) -> &Self::Target {
self.0.as_ref()
}
}

impl From<&UnionFields> for LogicalUnionFields {
fn from(value: &UnionFields) -> Self {
value
.iter()
.map(|(i, field)| (i, Arc::new(LogicalField::from(field.as_ref()))))
.collect()
}
}

impl FromIterator<(i8, LogicalFieldRef)> for LogicalUnionFields {
fn from_iter<T: IntoIterator<Item = (i8, LogicalFieldRef)>>(iter: T) -> Self {
Self(iter.into_iter().collect())
}
}
128 changes: 128 additions & 0 deletions datafusion/common/src/types/logical.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use super::NativeType;
use crate::error::Result;
use arrow_schema::DataType;
use core::fmt;
use std::{cmp::Ordering, hash::Hash, sync::Arc};

/// Signature that uniquely identifies a type among other types.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum TypeSignature<'a> {
/// Represents a built-in native type.
Native(&'a NativeType),
/// Represents an arrow-compatible extension type.
/// (<https://arrow.apache.org/docs/format/Columnar.html#extension-types>)
///
/// The `name` should contain the same value as 'ARROW:extension:name'.
Extension {
name: &'a str,
parameters: &'a [TypeParameter<'a>],
},
}

#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum TypeParameter<'a> {
Type(TypeSignature<'a>),
Number(i128),
}

/// A reference counted [`LogicalType`].
pub type LogicalTypeRef = Arc<dyn LogicalType>;

/// Representation of a logical type with its signature and its native backing
/// type.
///
/// The logical type is meant to be used during the DataFusion logical planning
/// phase in order to reason about logical types without worrying about their
/// underlying physical implementation.
///
/// ### Extension types
///
/// [`LogicalType`] is a trait in order to allow the possibility of declaring
/// extension types:
///
/// ```
/// use datafusion_common::types::{LogicalType, NativeType, TypeSignature};
///
/// struct JSON {}
///
/// impl LogicalType for JSON {
/// fn native(&self) -> &NativeType {
/// &NativeType::String
/// }
///
/// fn signature(&self) -> TypeSignature<'_> {
/// TypeSignature::Extension {
/// name: "JSON",
/// parameters: &[],
/// }
/// }
/// }
/// ```
pub trait LogicalType: Sync + Send {
/// Get the native backing type of this logical type.
fn native(&self) -> &NativeType;
/// Get the unique type signature for this logical type. Logical types with identical
/// signatures are considered equal.
fn signature(&self) -> TypeSignature<'_>;

/// Get the default physical type to cast `origin` to in order to obtain a physical type
/// that is logically compatible with this logical type.
fn default_cast_for(&self, origin: &DataType) -> Result<DataType> {
self.native().default_cast_for(origin)
}
}

impl fmt::Debug for dyn LogicalType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_tuple("LogicalType")
.field(&self.signature())
.field(&self.native())
.finish()
}
}

impl PartialEq for dyn LogicalType {
fn eq(&self, other: &Self) -> bool {
self.signature().eq(&other.signature())
}
}

impl Eq for dyn LogicalType {}

impl PartialOrd for dyn LogicalType {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

impl Ord for dyn LogicalType {
fn cmp(&self, other: &Self) -> Ordering {
self.signature()
.cmp(&other.signature())
.then(self.native().cmp(other.native()))
}
}

impl Hash for dyn LogicalType {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.signature().hash(state);
self.native().hash(state);
}
}
26 changes: 26 additions & 0 deletions datafusion/common/src/types/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

mod builtin;
mod field;
mod logical;
mod native;

pub use builtin::*;
pub use field::*;
pub use logical::*;
pub use native::*;
Loading

0 comments on commit beda8d7

Please sign in to comment.