Skip to content

Commit

Permalink
fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
xinlifoobar committed Jun 22, 2024
1 parent 1f25eaa commit 81da1ae
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 84 deletions.
34 changes: 13 additions & 21 deletions datafusion-examples/examples/parse_sql_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,9 @@
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use arrow::{
array::{Float64Array, Int64Array, RecordBatch},
datatypes::{DataType, Field, Schema},
};
use arrow::datatypes::{DataType, Field, Schema};
use datafusion::{
assert_batches_eq,
error::Result,
prelude::{ParquetReadOptions, SessionContext},
};
Expand Down Expand Up @@ -116,27 +112,23 @@ async fn query_parquet_demo() -> Result<()> {
vec![df.parse_sql_expr("double_col")?],
vec![df.parse_sql_expr("SUM(int_col) as sum_int_col")?],
)?
// Directly parse the SQL text into a sort expression is not supported
// Directly parsing the SQL text into a sort expression is not supported yet, so
// construct it programmatically
.sort(vec![col("double_col").sort(false, false)])?
.limit(0, Some(1))?;

let result = df.collect().await?;

assert_eq!(result.len(), 1);
let expected = format!(
"{:?}",
RecordBatch::try_new(
Arc::new(Schema::new(vec![
Field::new("double_col", DataType::Float64, true),
Field::new("sum(?table?.int_col)", DataType::Int64, true),
])),
vec![
Arc::new(Float64Array::from(vec![10.1])),
Arc::new(Int64Array::from(vec![4])),
],
)?
assert_batches_eq!(
&[
"+------------+----------------------+",
"| double_col | sum(?table?.int_col) |",
"+------------+----------------------+",
"| 10.1 | 4 |",
"+------------+----------------------+",
],
&result
);
assert_eq!(format!("{:?}", result[0]), expected);

Ok(())
}
Expand Down
33 changes: 15 additions & 18 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,25 +182,22 @@ impl DataFrame {
///
/// # Example: Parsing SQL queries
/// ```
/// use arrow::datatypes::{DataType, Field, Schema};
/// use datafusion::prelude::*;
/// use datafusion_common::{DFSchema, Result};
///
/// #[tokio::main]
/// async fn main() -> Result<()> {
/// // datafusion will parse number as i64 first.
/// let sql = "a > 1 and b in (1, 10)";
/// let expected = col("a").gt(lit(1 as i64))
/// .and(col("b").in_list(vec![lit(1 as i64), lit(10 as i64)], false));
///
/// let ctx = SessionContext::new();
/// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
/// let expr = df.parse_sql_expr(sql)?;
///
/// assert_eq!(expected, expr);
/// # use arrow::datatypes::{DataType, Field, Schema};
/// # use datafusion::prelude::*;
/// # use datafusion_common::{DFSchema, Result};
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// // DataFusion parses numbers as i64 first.
/// let sql = "a > 1 and b in (1, 10)";
/// let expected = col("a").gt(lit(1 as i64))
/// .and(col("b").in_list(vec![lit(1 as i64), lit(10 as i64)], false));
/// let ctx = SessionContext::new();
/// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
/// let expr = df.parse_sql_expr(sql)?;
/// assert_eq!(expected, expr);
///
/// Ok(())
/// }
/// # Ok(())
/// # }
/// ```
pub fn parse_sql_expr(&self, sql: &str) -> Result<Expr> {
let df_schema = self.schema();
Expand Down
39 changes: 18 additions & 21 deletions datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use datafusion_common::{
};
use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::expressions::{MinAccumulator,MaxAccumulator};
use datafusion_physical_expr::expressions::{MaxAccumulator, MinAccumulator};
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
use datafusion_physical_plan::metrics::MetricsSet;

Expand Down Expand Up @@ -424,16 +424,15 @@ pub async fn statistics_from_parquet_meta(
};

let (mut max_accs, mut min_accs) = create_max_min_accs(&table_schema);
let mut null_counts_array =
vec![Precision::Exact(0); table_schema.fields().len()];
summarize_min_max_null_counts(
&mut min_accs,
&mut max_accs,
&mut null_counts_array,
0,
&stats_converter,
row_groups_metadata,
)?;
let mut null_counts_array = vec![Precision::Exact(0); table_schema.fields().len()];
summarize_min_max_null_counts(
&mut min_accs,
&mut max_accs,
&mut null_counts_array,
0,
&stats_converter,
row_groups_metadata,
)?;

fields_iter.enumerate().for_each(|(idx, field)| {
let _ = StatisticsConverter::try_new_from_arrow_schema_index(
Expand Down Expand Up @@ -469,29 +468,27 @@ pub async fn statistics_from_parquet_meta(
}

fn summarize_min_max_null_counts(
min_accs: & mut [Option<MinAccumulator>],
max_accs: & mut [Option<MaxAccumulator>],
null_counts_array: & mut [Precision<usize>],
min_accs: &mut [Option<MinAccumulator>],
max_accs: &mut [Option<MaxAccumulator>],
null_counts_array: &mut [Precision<usize>],
arrow_schema_index: usize,
stats_converter: &StatisticsConverter,
row_groups_metadata: &[RowGroupMetaData],
) -> Result<()> {
let max_values = stats_converter.row_group_maxes(row_groups_metadata)?;
let min_values = stats_converter.row_group_mins(row_groups_metadata)?;
let null_counts =
stats_converter.row_group_null_counts(row_groups_metadata)?;
let null_counts = stats_converter.row_group_null_counts(row_groups_metadata)?;

if let Some(max_acc) = & mut max_accs[arrow_schema_index] {
if let Some(max_acc) = &mut max_accs[arrow_schema_index] {
max_acc.update_batch(&[max_values])?;
}

if let Some(min_acc) = & mut min_accs[arrow_schema_index] {
if let Some(min_acc) = &mut min_accs[arrow_schema_index] {
min_acc.update_batch(&[min_values])?;
}

null_counts_array[arrow_schema_index] = Precision::Exact(
sum(&null_counts).unwrap_or_default() as usize,
);
null_counts_array[arrow_schema_index] =
Precision::Exact(sum(&null_counts).unwrap_or_default() as usize);

Ok(())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub(crate) fn from_bytes_to_f16(b: &[u8]) -> Option<f16> {
// Copy from arrow-rs
// https://github.com/apache/arrow-rs/blob/198af7a3f4aa20f9bd003209d9f04b0f37bb120e/parquet/src/arrow/buffer/bit_util.rs#L54
// Convert the byte slice to a fixed-length byte array of length N.
pub fn sign_extend_be<const N: usize>(b: &[u8]) -> [u8; N] {
fn sign_extend_be<const N: usize>(b: &[u8]) -> [u8; N] {
assert!(b.len() <= N, "Array too large, expected less than {N}");
let is_negative = (b[0] & 128u8) == 128u8;
let mut result = if is_negative { [255u8; N] } else { [0u8; N] };
Expand Down
39 changes: 17 additions & 22 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -491,30 +491,25 @@ impl SessionContext {
/// # Example: Parsing SQL queries
///
/// ```
/// use arrow::datatypes::{DataType, Field, Schema};
/// use datafusion::prelude::*;
/// use datafusion_common::{DFSchema, Result};
///
/// #[tokio::main]
/// async fn main() -> Result<()> {
/// // datafusion will parse number as i64 first.
/// let sql = "a > 10";
/// let expected = col("a").gt(lit(10 as i64));
///
/// // provide type information that `a` is an Int32
/// let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
/// let df_schema = DFSchema::try_from(schema).unwrap();
///
/// let expr = SessionContext::new()
/// .parse_sql_expr(sql, &df_schema)?;
///
/// assert_eq!(expected, expr);
///
/// Ok(())
/// }
/// # use arrow::datatypes::{DataType, Field, Schema};
/// # use datafusion::prelude::*;
/// # use datafusion_common::{DFSchema, Result};
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// // DataFusion parses numbers as i64 first.
/// let sql = "a > 10";
/// let expected = col("a").gt(lit(10 as i64));
/// // provide type information that `a` is an Int32
/// let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
/// let df_schema = DFSchema::try_from(schema).unwrap();
/// let expr = SessionContext::new()
/// .parse_sql_expr(sql, &df_schema)?;
/// assert_eq!(expected, expr);
/// # Ok(())
/// # }
/// ```
pub fn parse_sql_expr(&self, sql: &str, df_schema: &DFSchema) -> Result<Expr> {
self.state().create_logical_expr(sql, df_schema)
self.state.read().create_logical_expr(sql, df_schema)
}

/// Execute the [`LogicalPlan`], return a [`DataFrame`]. This API
Expand Down
6 changes: 5 additions & 1 deletion datafusion/core/src/execution/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,8 @@ impl SessionState {
}

/// Parse a SQL string into a sqlparser-rs AST [`SQLExpr`].
///
/// See [`Self::create_logical_expr`] for parsing SQL to [`Expr`].
pub fn sql_to_expr(
&self,
sql: &str,
Expand Down Expand Up @@ -586,7 +588,9 @@ impl SessionState {
Ok(plan)
}

/// Creates a datafusion style AST [`Expr`] from a SQL string..
/// Creates a datafusion style AST [`Expr`] from a SQL string.
///
/// See the example on [`SessionContext::parse_sql_expr`].
pub fn create_logical_expr(
&self,
sql: &str,
Expand Down

0 comments on commit 81da1ae

Please sign in to comment.