diff --git a/src/datafusion.rs b/src/datafusion.rs index b79d7aa..3777625 100644 --- a/src/datafusion.rs +++ b/src/datafusion.rs @@ -1,4 +1,4 @@ -use crate::{FieldType, FieldValue, Reader}; +use crate::{file::BufReadWriteFile, FieldType, FieldValue, File as DbaseFile}; use async_trait::async_trait; use datafusion::arrow::array::{ ArrayBuilder, ArrayRef, BooleanBuilder, Date32Builder, Float32Builder, Float64Builder, @@ -8,7 +8,7 @@ use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion::arrow::record_batch::RecordBatch; use datafusion::datasource::provider::TableProviderFactory; use datafusion::datasource::{TableProvider, TableType}; -use datafusion::error::{DataFusionError, Result}; +use datafusion::error::Result; use datafusion::execution::context::{SessionState, TaskContext}; use datafusion::physical_plan::expressions::PhysicalSortExpr; use datafusion::physical_plan::memory::MemoryStream; @@ -20,27 +20,27 @@ use datafusion::prelude::*; use datafusion_expr::CreateExternalTable; use std::any::Any; use std::fmt::{Debug, Formatter}; -use std::io::BufReader; + use std::path::Path; use std::sync::{Arc, Mutex}; pub struct DbaseTable { path: String, - reader: Arc>>>, + file: Arc>>, } impl Clone for DbaseTable { fn clone(&self) -> Self { return DbaseTable { path: self.path.clone(), - reader: self.reader.clone(), + file: self.file.clone(), }; } } impl DbaseTable { pub fn new + Debug>(path: P) -> Self { - let reader = Reader::from_path(&path) + let file = DbaseFile::open_read_only(&path) .expect(format!("Could not find file {:?} or corresponding memo file", &path).as_str()); return DbaseTable { path: path @@ -48,19 +48,12 @@ impl DbaseTable { .to_str() .expect("Path contains non-unicode characters") .to_string(), - reader: Arc::new(Mutex::new(reader)), + file: Arc::new(Mutex::new(file)), }; } pub fn num_records(&self) -> usize { - return self - .reader - .lock() - .unwrap() - .header() - .num_records - .try_into() - .unwrap(); + return self.file.lock().unwrap().num_records(); } pub(crate) async fn create_physical_plan( @@ -85,12 +78,11 @@ impl TableProvider for DbaseTable { } fn schema(&self) -> SchemaRef { - let reader = self.reader.lock().unwrap(); - let dbase_fields = reader.fields(); + let dbase_file = self.file.lock().unwrap(); + let dbase_fields = dbase_file.fields(); let arrow_fields: Vec<_> = dbase_fields .into_iter() - .filter(|x| x.name() != "DeletionFlag") .map(|field| { let ftype = match field.field_type { FieldType::Character => DataType::Utf8, @@ -214,6 +206,7 @@ impl ExecutionPlan for DbaseExec { let schema = self.schema(); let mut column_builders = Vec::>::new(); let num_records = self.table.num_records(); + let mut dbase_file = self.table.file.lock().unwrap(); let schema_fields = schema.fields(); @@ -242,109 +235,109 @@ impl ExecutionPlan for DbaseExec { }; } - let mut reader = self.table.reader.lock().unwrap(); - reader.seek(0).unwrap(); - - let dbase_field_names: Vec = reader + let dbase_fields: Vec<_> = dbase_file .fields() .iter() - .map(|x| x.name().to_owned()) + .map(|x| dbase_file.field_index(x.name()).unwrap()) .collect(); - let records = reader.iter_records(); + let mut records = dbase_file.records(); - for (l, record) in records.into_iter().enumerate() { - if l >= self.limit { + let mut i = 0; + while let Some(mut record) = records.next() { + if record.is_deleted().unwrap() { + continue; + } + if i >= self.limit { break; } + i += 1; - let r = record.map_err(|e| DataFusionError::Execution(e.to_string()))?; - - for i in 0..self.projections.len() { - match r.get(&dbase_field_names[self.projections[i]]).unwrap() { + for (j, &proj) in self.projections.iter().enumerate() { + match record.field(dbase_fields[proj]).unwrap().read().unwrap() { FieldValue::Character(c) => match c { - Some(c) => column_builders[i] + Some(c) => column_builders[j] .as_any_mut() .downcast_mut::() .unwrap() .append_value(c.to_string()), - None => column_builders[i] + None => column_builders[j] .as_any_mut() .downcast_mut::() .unwrap() .append_null(), }, - FieldValue::Currency(f) => column_builders[i] + FieldValue::Currency(f) => column_builders[j] .as_any_mut() .downcast_mut::() .unwrap() - .append_value(*f), + .append_value(f), FieldValue::Date(d) => match d { - Some(d) => column_builders[i] + Some(d) => column_builders[j] .as_any_mut() .downcast_mut::() .unwrap() .append_value(d.to_unix_days()), - None => column_builders[i] + None => column_builders[j] .as_any_mut() .downcast_mut::() .unwrap() .append_null(), }, FieldValue::DateTime(d) => match d { - d => column_builders[i] + d => column_builders[j] .as_any_mut() .downcast_mut::() .unwrap() .append_value(d.to_unix_timestamp()), }, - FieldValue::Double(d) => column_builders[i] + FieldValue::Double(d) => column_builders[j] .as_any_mut() .downcast_mut::() .unwrap() - .append_value(*d), + .append_value(d), FieldValue::Float(f) => match f { - Some(f) => column_builders[i] + Some(f) => column_builders[j] .as_any_mut() .downcast_mut::() .unwrap() - .append_value(*f), - None => column_builders[i] + .append_value(f), + None => column_builders[j] .as_any_mut() .downcast_mut::() .unwrap() .append_null(), }, - FieldValue::Integer(v) => column_builders[i] + FieldValue::Integer(v) => column_builders[j] .as_any_mut() .downcast_mut::() .unwrap() - .append_value(*v), + .append_value(v), FieldValue::Logical(l) => match l { - Some(l) => column_builders[i] + Some(l) => column_builders[j] .as_any_mut() .downcast_mut::() .unwrap() - .append_value(*l), - None => column_builders[i] + .append_value(l), + None => column_builders[j] .as_any_mut() .downcast_mut::() .unwrap() .append_null(), }, - FieldValue::Memo(m) => column_builders[i] + FieldValue::Memo(m) => column_builders[j] .as_any_mut() .downcast_mut::() .unwrap() .append_value(m.escape_default().to_string()), FieldValue::Numeric(n) => match n { - Some(n) => column_builders[i] + Some(n) => column_builders[j] .as_any_mut() .downcast_mut::() .unwrap() - .append_value(*n), - None => column_builders[i] + .append_value(n), + None => column_builders[j] .as_any_mut() .downcast_mut::() .unwrap()