Skip to content

Commit

Permalink
Reading stats tables reuses schemas (#1637)
Browse files: browse the repository at this point in the history
  • Loading branch information
robert3005 authored Dec 10, 2024
1 parent 0d018de commit c2746a5
Show file tree
Hide file tree
Showing 13 changed files with 37 additions and 44 deletions.
2 changes: 1 addition & 1 deletion bench-vortex/benches/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ fn tpc_h_l_comment(c: &mut Criterion) {
.map(|chunk| {
StructArray::try_from(chunk)
.unwrap()
.project(&[Field::Name("l_comment".to_string())])
.project(&[Field::from("l_comment")])
.unwrap()
.into_array()
})
Expand Down
6 changes: 2 additions & 4 deletions bench-vortex/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,7 @@ pub struct CompressionRunResults {
pub async fn execute_query(ctx: &SessionContext, query: &str) -> anyhow::Result<Vec<RecordBatch>> {
let plan = ctx.sql(query).await?;
let (state, plan) = plan.into_parts();
let optimized = state.optimize(&plan)?;
let physical_plan = state.create_physical_plan(&optimized).await?;
let physical_plan = state.create_physical_plan(&plan).await?;
let result = collect(physical_plan.clone(), state.task_ctx()).await?;
Ok(result)
}
Expand All @@ -264,8 +263,7 @@ pub async fn physical_plan(
) -> anyhow::Result<Arc<dyn ExecutionPlan>> {
let plan = ctx.sql(query).await?;
let (state, plan) = plan.into_parts();
let optimized = state.optimize(&plan)?;
Ok(state.create_physical_plan(&optimized).await?)
Ok(state.create_physical_plan(&plan).await?)
}

#[derive(Clone, Debug)]
Expand Down
4 changes: 1 addition & 3 deletions pyvortex/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,7 @@ pub async fn read_dtype_from_reader<T: VortexReadAt + Unpin>(reader: T) -> Vorte
fn projection_from_python(columns: Option<Vec<Bound<PyAny>>>) -> PyResult<Projection> {
fn field_from_pyany(field: &Bound<PyAny>) -> PyResult<Field> {
if field.clone().is_instance_of::<PyString>() {
Ok(Field::Name(
field.downcast::<PyString>()?.to_str()?.to_string(),
))
Ok(Field::from(field.downcast::<PyString>()?.to_str()?))
} else if field.is_instance_of::<PyLong>() {
Ok(Field::Index(field.extract()?))
} else {
Expand Down
2 changes: 1 addition & 1 deletion pyvortex/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ pub fn column<'py>(name: &Bound<'py, PyString>) -> PyResult<Bound<'py, PyExpr>>
Bound::new(
py,
PyExpr {
inner: Column::new_expr(Field::Name(name)),
inner: Column::new_expr(Field::from(name)),
},
)
}
Expand Down
2 changes: 1 addition & 1 deletion vortex-array/src/array/struct_/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ impl StructArray {
Field::Name(n) => self
.names()
.iter()
.position(|name| name.as_ref() == n)
.position(|name| name == n)
.ok_or_else(|| vortex_err!("Unknown field {n}"))?,
Field::Index(i) => *i,
};
Expand Down
5 changes: 3 additions & 2 deletions vortex-dtype/src/field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use core::fmt;
use std::fmt::{Display, Formatter};
use std::sync::Arc;

use itertools::Itertools;

Expand All @@ -13,7 +14,7 @@ use itertools::Itertools;
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum Field {
/// A field selector by name
Name(String),
Name(Arc<str>),
/// A field selector by index (position)
Index(usize),
}
Expand All @@ -26,7 +27,7 @@ impl From<&str> for Field {

impl From<String> for Field {
fn from(value: String) -> Self {
Field::Name(value)
Field::Name(value.into())
}
}

Expand Down
2 changes: 1 addition & 1 deletion vortex-dtype/src/serde/flatbuffers/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub fn resolve_field<'a, 'b: 'a>(fb: fb::Struct_<'b>, field: &'a Field) -> Vorte
.ok_or_else(|| vortex_err!("Missing field names"))?;
names
.iter()
.position(|name| name == n)
.position(|name| name == &**n)
.ok_or_else(|| vortex_err!("Unknown field name {n}"))
}
Field::Index(i) => Ok(*i),
Expand Down
25 changes: 9 additions & 16 deletions vortex-expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ mod tests {

#[test]
fn basic_expr_split_test() {
let lhs = Column::new_expr(Field::Name("a".to_string()));
let lhs = Column::new_expr(Field::from("a"));
let rhs = Literal::new_expr(1.into());
let expr = BinaryExpr::new_expr(lhs, Operator::Eq, rhs);
let conjunction = split_conjunction(&expr);
Expand All @@ -100,7 +100,7 @@ mod tests {

#[test]
fn basic_conjunction_split_test() {
let lhs = Column::new_expr(Field::Name("a".to_string()));
let lhs = Column::new_expr(Field::from("a"));
let rhs = Literal::new_expr(1.into());
let expr = BinaryExpr::new_expr(lhs, Operator::And, rhs);
let conjunction = split_conjunction(&expr);
Expand All @@ -109,16 +109,13 @@ mod tests {

#[test]
fn expr_display() {
assert_eq!(
Column::new_expr(Field::Name("a".to_string())).to_string(),
"$a"
);
assert_eq!(Column::new_expr(Field::from("a")).to_string(), "$a");
assert_eq!(Column::new_expr(Field::Index(1)).to_string(), "[1]");
assert_eq!(Identity.to_string(), "[]");
assert_eq!(Identity.to_string(), "[]");

let col1: Arc<dyn VortexExpr> = Column::new_expr(Field::Name("col1".to_string()));
let col2: Arc<dyn VortexExpr> = Column::new_expr(Field::Name("col2".to_string()));
let col1: Arc<dyn VortexExpr> = Column::new_expr(Field::from("col1"));
let col2: Arc<dyn VortexExpr> = Column::new_expr(Field::from("col2"));
assert_eq!(
BinaryExpr::new_expr(col1.clone(), Operator::And, col2.clone()).to_string(),
"($col1 and $col2)"
Expand Down Expand Up @@ -165,21 +162,17 @@ mod tests {
assert_eq!(Not::new_expr(col1.clone()).to_string(), "!$col1");

assert_eq!(
Select::include(vec![Field::Name("col1".to_string())]).to_string(),
Select::include(vec![Field::from("col1")]).to_string(),
"Include($col1)"
);
assert_eq!(
Select::include(vec![
Field::Name("col1".to_string()),
Field::Name("col2".to_string())
])
.to_string(),
Select::include(vec![Field::from("col1"), Field::from("col2")]).to_string(),
"Include($col1,$col2)"
);
assert_eq!(
Select::exclude(vec![
Field::Name("col1".to_string()),
Field::Name("col2".to_string()),
Field::from("col1"),
Field::from("col2"),
Field::Index(1),
])
.to_string(),
Expand Down
2 changes: 1 addition & 1 deletion vortex-expr/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl VortexExpr for Select {
let normalized_exclusion = e
.iter()
.map(|ef| match ef {
Field::Name(n) => Ok(n.as_str()),
Field::Name(n) => Ok(&**n),
Field::Index(i) => st
.names()
.get(*i)
Expand Down
8 changes: 4 additions & 4 deletions vortex-file/src/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ pub enum FieldOrIdentity {
}

pub(crate) fn stat_column_field(field: &Field, stat: Stat) -> Field {
Field::Name(stat_column_name_string(field, stat))
Field::from(stat_column_name_string(field, stat))
}

pub(crate) fn stat_column_name_string(field: &Field, stat: Stat) -> String {
Expand All @@ -406,7 +406,7 @@ pub(crate) fn stat_column_name_string(field: &Field, stat: Stat) -> String {

impl FieldOrIdentity {
pub(crate) fn stat_column_field(&self, stat: Stat) -> Field {
Field::Name(self.stat_column_name_string(stat))
Field::from(self.stat_column_name_string(stat))
}

pub(crate) fn stat_column_name_string(&self, stat: Stat) -> String {
Expand Down Expand Up @@ -768,13 +768,13 @@ mod tests {

let expected_expr = BinaryExpr::new_expr(
BinaryExpr::new_expr(
Column::new_expr(Field::Name("min".to_string())),
Column::new_expr(Field::from("min")),
Operator::Gte,
Literal::new_expr(10.into()),
),
Operator::Or,
BinaryExpr::new_expr(
Column::new_expr(Field::Name("max".to_string())),
Column::new_expr(Field::from("max")),
Operator::Lte,
Literal::new_expr(50.into()),
),
Expand Down
5 changes: 3 additions & 2 deletions vortex-file/src/read/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ impl LazyDType {
Field::Name(n) => sdt
.names()
.iter()
.position(|name| name.as_ref() == n.as_str())
.position(|name| name == n)
.ok_or_else(|| vortex_err!("Can't find {n} in the type")),
Field::Index(i) => Ok(*i),
}
Expand Down Expand Up @@ -279,7 +279,8 @@ impl RelativeLayoutCache {
}

pub fn relative(&self, id: LayoutPartId, dtype: Arc<LazyDType>) -> Self {
let mut new_path = self.path.clone();
let mut new_path = Vec::with_capacity(self.path.len() + 1);
new_path.clone_from(&self.path);
new_path.push(id);
Self {
root: self.root.clone(),
Expand Down
14 changes: 8 additions & 6 deletions vortex-file/src/read/layouts/chunked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use vortex_array::array::ChunkedArray;
use vortex_array::compute::{scalar_at, take};
use vortex_array::stats::{stats_from_bitset_bytes, ArrayStatistics as _, Stat};
use vortex_array::{ArrayDType, ArrayData, IntoArrayData};
use vortex_dtype::field::Field;
use vortex_dtype::{DType, Nullability, StructDType};
use vortex_error::{
vortex_bail, vortex_err, vortex_panic, VortexExpect as _, VortexResult, VortexUnwrap,
Expand Down Expand Up @@ -79,17 +80,18 @@ impl ChunkedLayoutBuilder<'_> {
.children()
.ok_or_else(|| vortex_err!("Must have children if layout has metadata"))?
.get(0);
let stats_dtype = stats_table_dtype(&set_stats, self.message_cache.dtype().value()?);
let DType::Struct(ref s, _) = stats_dtype else {
vortex_bail!("Chunked layout stats must be a Struct, got {stats_dtype}")
};
Some(self.layout_builder.read_layout(
metadata_fb,
Scan::new(Some(Arc::new(Select::include(
set_stats.iter().map(|s| s.to_string().into()).collect(),
s.names().iter().map(|s| Field::Name(s.clone())).collect(),
)))),
self.message_cache.relative(
METADATA_LAYOUT_PART_ID,
Arc::new(LazyDType::from_dtype(stats_table_dtype(
&set_stats,
self.message_cache.dtype().value()?,
))),
Arc::new(LazyDType::from_dtype(stats_dtype)),
),
)?)
} else {
Expand Down Expand Up @@ -138,7 +140,7 @@ fn stats_table_dtype(stats: &[Stat], dtype: &DType) -> DType {
let dtypes = stats.iter().map(|s| s.dtype(dtype).as_nullable()).collect();

DType::Struct(
StructDType::new(stats.iter().map(|s| s.to_string().into()).collect(), dtypes),
StructDType::new(stats.iter().map(|s| s.name().into()).collect(), dtypes),
Nullability::NonNullable,
)
}
Expand Down
4 changes: 2 additions & 2 deletions vortex-file/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ async fn test_read_projection() {
assert_eq!(actual, strings_expected);

let array = VortexReadBuilder::new(written.clone(), LayoutDeserializer::default())
.with_projection(Projection::Flat(vec![Field::Name("strings".to_string())]))
.with_projection(Projection::Flat(vec![Field::from("strings")]))
.build()
.await
.unwrap()
Expand Down Expand Up @@ -261,7 +261,7 @@ async fn test_read_projection() {
assert_eq!(actual, numbers_expected);

let array = VortexReadBuilder::new(written.clone(), LayoutDeserializer::default())
.with_projection(Projection::Flat(vec![Field::Name("numbers".to_string())]))
.with_projection(Projection::Flat(vec![Field::from("numbers")]))
.build()
.await
.unwrap()
Expand Down

0 comments on commit c2746a5

Please sign in to comment.