Skip to content

Commit

Permalink
datafusion: use buffered file
Browse files Browse the repository at this point in the history
  • Loading branch information
casperhart authored and tmontaigu committed Nov 5, 2023
1 parent b016dca commit 99568c8
Showing 1 changed file with 44 additions and 51 deletions.
95 changes: 44 additions & 51 deletions src/datafusion.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{FieldType, FieldValue, Reader};
use crate::{file::BufReadWriteFile, FieldType, FieldValue, File as DbaseFile};
use async_trait::async_trait;
use datafusion::arrow::array::{
ArrayBuilder, ArrayRef, BooleanBuilder, Date32Builder, Float32Builder, Float64Builder,
Expand All @@ -8,7 +8,7 @@ use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::provider::TableProviderFactory;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::{DataFusionError, Result};
use datafusion::error::Result;
use datafusion::execution::context::{SessionState, TaskContext};
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::memory::MemoryStream;
Expand All @@ -20,47 +20,40 @@ use datafusion::prelude::*;
use datafusion_expr::CreateExternalTable;
use std::any::Any;
use std::fmt::{Debug, Formatter};
use std::io::BufReader;

use std::path::Path;
use std::sync::{Arc, Mutex};

pub struct DbaseTable {
path: String,
reader: Arc<Mutex<Reader<BufReader<std::fs::File>>>>,
file: Arc<Mutex<DbaseFile<BufReadWriteFile>>>,
}

impl Clone for DbaseTable {
fn clone(&self) -> Self {
return DbaseTable {
path: self.path.clone(),
reader: self.reader.clone(),
file: self.file.clone(),
};
}
}

impl DbaseTable {
pub fn new<P: AsRef<Path> + Debug>(path: P) -> Self {
let reader = Reader::from_path(&path)
let file = DbaseFile::open_read_only(&path)
.expect(format!("Could not find file {:?} or corresponding memo file", &path).as_str());
return DbaseTable {
path: path
.as_ref()
.to_str()
.expect("Path contains non-unicode characters")
.to_string(),
reader: Arc::new(Mutex::new(reader)),
file: Arc::new(Mutex::new(file)),
};
}

pub fn num_records(&self) -> usize {
return self
.reader
.lock()
.unwrap()
.header()
.num_records
.try_into()
.unwrap();
return self.file.lock().unwrap().num_records();
}

pub(crate) async fn create_physical_plan(
Expand All @@ -85,12 +78,11 @@ impl TableProvider for DbaseTable {
}

fn schema(&self) -> SchemaRef {
let reader = self.reader.lock().unwrap();
let dbase_fields = reader.fields();
let dbase_file = self.file.lock().unwrap();
let dbase_fields = dbase_file.fields();

let arrow_fields: Vec<_> = dbase_fields
.into_iter()
.filter(|x| x.name() != "DeletionFlag")
.map(|field| {
let ftype = match field.field_type {
FieldType::Character => DataType::Utf8,
Expand Down Expand Up @@ -214,6 +206,7 @@ impl ExecutionPlan for DbaseExec {
let schema = self.schema();
let mut column_builders = Vec::<Box<dyn ArrayBuilder>>::new();
let num_records = self.table.num_records();
let mut dbase_file = self.table.file.lock().unwrap();

let schema_fields = schema.fields();

Expand Down Expand Up @@ -242,109 +235,109 @@ impl ExecutionPlan for DbaseExec {
};
}

let mut reader = self.table.reader.lock().unwrap();
reader.seek(0).unwrap();

let dbase_field_names: Vec<String> = reader
let dbase_fields: Vec<_> = dbase_file
.fields()
.iter()
.map(|x| x.name().to_owned())
.map(|x| dbase_file.field_index(x.name()).unwrap())
.collect();

let records = reader.iter_records();
let mut records = dbase_file.records();

for (l, record) in records.into_iter().enumerate() {
if l >= self.limit {
let mut i = 0;
while let Some(mut record) = records.next() {
if record.is_deleted().unwrap() {
continue;
}
if i >= self.limit {
break;
}
i += 1;

let r = record.map_err(|e| DataFusionError::Execution(e.to_string()))?;

for i in 0..self.projections.len() {
match r.get(&dbase_field_names[self.projections[i]]).unwrap() {
for (j, &proj) in self.projections.iter().enumerate() {
match record.field(dbase_fields[proj]).unwrap().read().unwrap() {
FieldValue::Character(c) => match c {
Some(c) => column_builders[i]
Some(c) => column_builders[j]
.as_any_mut()
.downcast_mut::<StringBuilder>()
.unwrap()
.append_value(c.to_string()),
None => column_builders[i]
None => column_builders[j]
.as_any_mut()
.downcast_mut::<StringBuilder>()
.unwrap()
.append_null(),
},
FieldValue::Currency(f) => column_builders[i]
FieldValue::Currency(f) => column_builders[j]
.as_any_mut()
.downcast_mut::<Float64Builder>()
.unwrap()
.append_value(*f),
.append_value(f),
FieldValue::Date(d) => match d {
Some(d) => column_builders[i]
Some(d) => column_builders[j]
.as_any_mut()
.downcast_mut::<Date32Builder>()
.unwrap()
.append_value(d.to_unix_days()),

None => column_builders[i]
None => column_builders[j]
.as_any_mut()
.downcast_mut::<Date32Builder>()
.unwrap()
.append_null(),
},
FieldValue::DateTime(d) => match d {
d => column_builders[i]
d => column_builders[j]
.as_any_mut()
.downcast_mut::<Int64Builder>()
.unwrap()
.append_value(d.to_unix_timestamp()),
},
FieldValue::Double(d) => column_builders[i]
FieldValue::Double(d) => column_builders[j]
.as_any_mut()
.downcast_mut::<Float64Builder>()
.unwrap()
.append_value(*d),
.append_value(d),
FieldValue::Float(f) => match f {
Some(f) => column_builders[i]
Some(f) => column_builders[j]
.as_any_mut()
.downcast_mut::<Float32Builder>()
.unwrap()
.append_value(*f),
None => column_builders[i]
.append_value(f),
None => column_builders[j]
.as_any_mut()
.downcast_mut::<Float32Builder>()
.unwrap()
.append_null(),
},
FieldValue::Integer(v) => column_builders[i]
FieldValue::Integer(v) => column_builders[j]
.as_any_mut()
.downcast_mut::<Int32Builder>()
.unwrap()
.append_value(*v),
.append_value(v),
FieldValue::Logical(l) => match l {
Some(l) => column_builders[i]
Some(l) => column_builders[j]
.as_any_mut()
.downcast_mut::<BooleanBuilder>()
.unwrap()
.append_value(*l),
None => column_builders[i]
.append_value(l),
None => column_builders[j]
.as_any_mut()
.downcast_mut::<BooleanBuilder>()
.unwrap()
.append_null(),
},
FieldValue::Memo(m) => column_builders[i]
FieldValue::Memo(m) => column_builders[j]
.as_any_mut()
.downcast_mut::<StringBuilder>()
.unwrap()
.append_value(m.escape_default().to_string()),
FieldValue::Numeric(n) => match n {
Some(n) => column_builders[i]
Some(n) => column_builders[j]
.as_any_mut()
.downcast_mut::<Float64Builder>()
.unwrap()
.append_value(*n),
None => column_builders[i]
.append_value(n),
None => column_builders[j]
.as_any_mut()
.downcast_mut::<Float64Builder>()
.unwrap()
Expand Down

0 comments on commit 99568c8

Please sign in to comment.