Skip to content

Commit

Permalink
wip: simple buffered streams
Browse files Browse the repository at this point in the history
  • Loading branch information
zachschuermann committed Feb 21, 2025
1 parent 72b585d commit 4aa86aa
Showing 1 changed file with 34 additions and 11 deletions.
45 changes: 34 additions & 11 deletions kernel/src/engine/default/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,21 @@
use std::io::BufReader;
use std::ops::Range;
use std::sync::Arc;
use std::sync::{mpsc, Arc};
use std::task::{ready, Poll};

use arrow_json::ReaderBuilder;
use arrow_schema::SchemaRef as ArrowSchemaRef;
use bytes::{Buf, Bytes};
use futures::stream;
use futures::{StreamExt, TryStreamExt};
use object_store::path::Path;
use object_store::{DynObjectStore, GetResultPayload};
use url::Url;

use super::executor::TaskExecutor;
use super::file_stream::{FileOpenFuture, FileOpener, FileStream};
use super::file_stream::{FileOpenFuture, FileOpener};
use crate::engine::arrow_data::ArrowEngineData;
use crate::engine::arrow_utils::parse_json as arrow_parse_json;
use crate::engine::arrow_utils::to_json_bytes;
use crate::schema::SchemaRef;
Expand All @@ -40,8 +42,8 @@ impl<E: TaskExecutor> DefaultJsonHandler<E> {
Self {
store,
task_executor,
readahead: 10,
batch_size: 1024,
readahead: 1000,
batch_size: 1024 * 128,
}
}

Expand Down Expand Up @@ -81,15 +83,36 @@ impl<E: TaskExecutor> JsonHandler for DefaultJsonHandler<E> {
return Ok(Box::new(std::iter::empty()));
}

println!("Reading {} files", files.len());

let schema: ArrowSchemaRef = Arc::new(physical_schema.as_ref().try_into()?);
let file_opener = JsonOpener::new(self.batch_size, schema.clone(), self.store.clone());
FileStream::new_async_read_iterator(
self.task_executor.clone(),
schema,
Box::new(file_opener),
files,
self.readahead,
)

let (tx, rx) = mpsc::sync_channel(self.readahead);
let files = files.to_vec();
let readahead = self.readahead;

self.task_executor.spawn(async move {
let file_futures: Vec<_> = files
.iter()
.map(|file| file_opener.open(file.clone(), None))
.collect::<DeltaResult<Vec<_>>>()
.expect("Error creating file futures");

let mut stream = stream::iter(file_futures)
.buffered(readahead)
.try_flatten()
.map_ok(|record_batch| {
Box::new(ArrowEngineData::new(record_batch)) as Box<dyn EngineData>
});

while let Some(item) = stream.next().await {
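// TODO: check for errors here? The result of `send` is currently discarded; it is
// an `Err` only once the receiving end of the channel has been dropped.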
let _ = tx.send(item);
}
});

Ok(Box::new(rx.into_iter()))
}

// note: for now we just buffer all the data and write it out all at once
Expand Down

0 comments on commit 4aa86aa

Please sign in to comment.