From 110f423c8088d2f63ea8189d1d4e3a09e8b93a9c Mon Sep 17 00:00:00 2001 From: Casper Hart Date: Tue, 12 Sep 2023 20:11:03 -0400 Subject: [PATCH] Update datafusion --- Cargo.toml | 4 ++-- src/datafusion.rs | 34 ++++++++++++++++++++++++++-------- 2 files changed, 28 insertions(+), 10 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index edcd22f..e24b41a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,8 +15,8 @@ byteorder = "1.4.3" time = { version = "0.3", features = ["std"] } serde = { version = "1.0.102", optional = true } yore = { version = "1.0.1", optional = true } -datafusion = { version = "20", optional = true } -datafusion-expr = { version = "20", optional = true } +datafusion = { version = "31", optional = true } +datafusion-expr = { version = "31", optional = true } async-trait = { version = "0.1", optional = true } [dev-dependencies] diff --git a/src/datafusion.rs b/src/datafusion.rs index c03e532..ff95c9d 100644 --- a/src/datafusion.rs +++ b/src/datafusion.rs @@ -1,21 +1,20 @@ -use crate::{FieldType, FieldValue, Record}; +use crate::{FieldType, FieldValue, Reader, Record}; use async_trait::async_trait; -use datafusion::datasource::datasource::TableProviderFactory; - -use crate::Reader; use datafusion::arrow::array::{ ArrayBuilder, ArrayRef, BooleanBuilder, Date32Builder, Float32Builder, Float64Builder, Int32Builder, Int64Builder, StringBuilder, }; use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion::arrow::record_batch::RecordBatch; +use datafusion::datasource::provider::TableProviderFactory; use datafusion::datasource::{TableProvider, TableType}; use datafusion::error::Result; use datafusion::execution::context::{SessionState, TaskContext}; use datafusion::physical_plan::expressions::PhysicalSortExpr; use datafusion::physical_plan::memory::MemoryStream; use datafusion::physical_plan::{ - project_schema, ExecutionPlan, SendableRecordBatchStream, Statistics, + project_schema, DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream, + Statistics, }; use datafusion::prelude::*; use datafusion_expr::CreateExternalTable; @@ -26,21 +25,29 @@ use std::path::Path; use std::sync::{Arc, Mutex}; pub struct DbaseTable { + path: String, reader: Arc>>>, } impl Clone for DbaseTable { fn clone(&self) -> Self { return DbaseTable { + path: self.path.clone(), reader: self.reader.clone(), }; } } impl DbaseTable { - pub fn new>(path: P) -> Self { - let reader = Reader::from_path(path).unwrap(); + pub fn new + Debug>(path: P) -> Self { + let reader = Reader::from_path(&path) + .expect(format!("Could not find file {:?} or corresponding memo file", &path).as_str()); return DbaseTable { + path: path + .as_ref() + .to_str() + .expect("Path contains non-unicode characters") + .to_string(), reader: Arc::new(Mutex::new(reader)), }; } @@ -81,7 +88,7 @@ impl TableProvider for DbaseTable { let reader = self.reader.lock().unwrap(); let dbase_fields = reader.fields(); - let arrow_fields = dbase_fields + let arrow_fields: Vec<_> = dbase_fields .into_iter() .filter(|x| x.name() != "DeletionFlag") .map(|field| { @@ -160,6 +167,17 @@ impl Debug for DbaseExec { } } +impl DisplayAs for DbaseExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter<'_>) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "DbaseExec: {:?}", self.table.path)?; + } + } + Ok(()) + } +} + impl ExecutionPlan for DbaseExec { fn as_any(&self) -> &dyn Any { self