Skip to content

Commit

Permalink
Update datafusion
Browse files Browse the repository at this point in the history
  • Loading branch information
casperhart authored and tmontaigu committed Sep 15, 2023
1 parent 7db5b38 commit 110f423
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 10 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ byteorder = "1.4.3"
time = { version = "0.3", features = ["std"] }
serde = { version = "1.0.102", optional = true }
yore = { version = "1.0.1", optional = true }
datafusion = { version = "20", optional = true }
datafusion-expr = { version = "20", optional = true }
datafusion = { version = "31", optional = true }
datafusion-expr = { version = "31", optional = true }
async-trait = { version = "0.1", optional = true }

[dev-dependencies]
Expand Down
34 changes: 26 additions & 8 deletions src/datafusion.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
use crate::{FieldType, FieldValue, Record};
use crate::{FieldType, FieldValue, Reader, Record};
use async_trait::async_trait;
use datafusion::datasource::datasource::TableProviderFactory;

use crate::Reader;
use datafusion::arrow::array::{
ArrayBuilder, ArrayRef, BooleanBuilder, Date32Builder, Float32Builder, Float64Builder,
Int32Builder, Int64Builder, StringBuilder,
};
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::provider::TableProviderFactory;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::Result;
use datafusion::execution::context::{SessionState, TaskContext};
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::memory::MemoryStream;
use datafusion::physical_plan::{
project_schema, ExecutionPlan, SendableRecordBatchStream, Statistics,
project_schema, DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream,
Statistics,
};
use datafusion::prelude::*;
use datafusion_expr::CreateExternalTable;
Expand All @@ -26,21 +25,29 @@ use std::path::Path;
use std::sync::{Arc, Mutex};

/// A dBase file exposed as a DataFusion table.
///
/// Cloning is shallow with respect to the reader: every clone holds the same
/// `Arc<Mutex<..>>`, so all clones go through one shared, lock-protected
/// reader instead of opening the file again. The derived `Clone` performs
/// exactly the field-by-field clone the previous hand-written impl did.
#[derive(Clone)]
pub struct DbaseTable {
    /// Path the table was opened from, stored as text (shown in plan display).
    path: String,
    /// Shared, mutex-guarded handle to the underlying dBase reader.
    reader: Arc<Mutex<Reader<BufReader<std::fs::File>>>>,
}

impl DbaseTable {
pub fn new<P: AsRef<Path>>(path: P) -> Self {
let reader = Reader::from_path(path).unwrap();
pub fn new<P: AsRef<Path> + Debug>(path: P) -> Self {
let reader = Reader::from_path(&path)
.expect(format!("Could not find file {:?} or corresponding memo file", &path).as_str());
return DbaseTable {
path: path
.as_ref()
.to_str()
.expect("Path contains non-unicode characters")
.to_string(),
reader: Arc::new(Mutex::new(reader)),
};
}
Expand Down Expand Up @@ -81,7 +88,7 @@ impl TableProvider for DbaseTable {
let reader = self.reader.lock().unwrap();
let dbase_fields = reader.fields();

let arrow_fields = dbase_fields
let arrow_fields: Vec<_> = dbase_fields
.into_iter()
.filter(|x| x.name() != "DeletionFlag")
.map(|field| {
Expand Down Expand Up @@ -160,6 +167,17 @@ impl Debug for DbaseExec {
}
}

impl DisplayAs for DbaseExec {
    /// Writes the one-line description of this node: the literal prefix
    /// `DbaseExec: ` followed by the debug rendering of the table's path.
    ///
    /// Both display formats handled here produce the same text.
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter<'_>) -> std::fmt::Result {
        let path = &self.table.path;
        // The match stays exhaustive without a catch-all so that a new
        // `DisplayFormatType` variant is a compile error, not a silent no-op.
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "DbaseExec: {:?}", path)
            }
        }
    }
}

impl ExecutionPlan for DbaseExec {
fn as_any(&self) -> &dyn Any {
self
Expand Down

0 comments on commit 110f423

Please sign in to comment.