Skip to content

Commit

Permalink
feat: Allow creating a ValuesExec from record batches (#7444)
Browse files Browse the repository at this point in the history
  • Loading branch information
scsmithr authored Sep 6, 2023
1 parent e83b75f commit 3bbd1f0
Showing 1 changed file with 68 additions and 1 deletion.
69 changes: 68 additions & 1 deletion datafusion/core/src/physical_plan/values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,35 @@ impl ValuesExec {
Ok(Self { schema, data })
}

/// Create a new plan using the provided schema and batches.
///
/// Errors if any of the batches don't match the provided schema, or if no
/// batches are provided.
pub fn try_new_from_batches(
schema: SchemaRef,
batches: Vec<RecordBatch>,
) -> Result<Self> {
if batches.is_empty() {
return plan_err!("Values list cannot be empty");
}

for batch in &batches {
let batch_schema = batch.schema();
if batch_schema != schema {
return plan_err!(
"Batch has invalid schema. Expected: {schema}, got: {batch_schema}"
);
}
}

Ok(ValuesExec {
schema,
data: batches,
})
}

/// provides the data
fn data(&self) -> Vec<RecordBatch> {
pub fn data(&self) -> Vec<RecordBatch> {
self.data.clone()
}
}
Expand Down Expand Up @@ -168,7 +195,10 @@ impl ExecutionPlan for ValuesExec {
#[cfg(test)]
mod tests {
use super::*;

use crate::test::create_vec_batches;
use crate::test_util;
use arrow_schema::{DataType, Field, Schema};

#[tokio::test]
async fn values_empty_case() -> Result<()> {
Expand All @@ -177,4 +207,41 @@ mod tests {
assert!(empty.is_err());
Ok(())
}

#[test]
fn new_exec_with_batches() {
let schema = Arc::new(Schema::new(vec![Field::new(
"col0",
DataType::UInt32,
false,
)]));
let batches = create_vec_batches(&schema, 10);
let _exec = ValuesExec::try_new_from_batches(schema, batches).unwrap();
}

#[test]
fn new_exec_with_batches_empty() {
let schema = Arc::new(Schema::new(vec![Field::new(
"col0",
DataType::UInt32,
false,
)]));
let _ = ValuesExec::try_new_from_batches(schema, Vec::new()).unwrap_err();
}

#[test]
fn new_exec_with_batches_invalid_schema() {
let schema = Arc::new(Schema::new(vec![Field::new(
"col0",
DataType::UInt32,
false,
)]));
let batches = create_vec_batches(&schema, 10);

let invalid_schema = Arc::new(Schema::new(vec![
Field::new("col0", DataType::UInt32, false),
Field::new("col1", DataType::Utf8, false),
]));
let _ = ValuesExec::try_new_from_batches(invalid_schema, batches).unwrap_err();
}
}

0 comments on commit 3bbd1f0

Please sign in to comment.