From 4aa86aacff5c2210c982b54e272c20113b192c26 Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Thu, 20 Feb 2025 20:45:46 -0800 Subject: [PATCH] wip: simple buffered streams --- kernel/src/engine/default/json.rs | 45 +++++++++++++++++++++++-------- 1 file changed, 34 insertions(+), 11 deletions(-) diff --git a/kernel/src/engine/default/json.rs b/kernel/src/engine/default/json.rs index ab296e12a..1bafcd334 100644 --- a/kernel/src/engine/default/json.rs +++ b/kernel/src/engine/default/json.rs @@ -2,19 +2,21 @@ use std::io::BufReader; use std::ops::Range; -use std::sync::Arc; +use std::sync::{mpsc, Arc}; use std::task::{ready, Poll}; use arrow_json::ReaderBuilder; use arrow_schema::SchemaRef as ArrowSchemaRef; use bytes::{Buf, Bytes}; +use futures::stream; use futures::{StreamExt, TryStreamExt}; use object_store::path::Path; use object_store::{DynObjectStore, GetResultPayload}; use url::Url; use super::executor::TaskExecutor; -use super::file_stream::{FileOpenFuture, FileOpener, FileStream}; +use super::file_stream::{FileOpenFuture, FileOpener}; +use crate::engine::arrow_data::ArrowEngineData; use crate::engine::arrow_utils::parse_json as arrow_parse_json; use crate::engine::arrow_utils::to_json_bytes; use crate::schema::SchemaRef; @@ -40,8 +42,8 @@ impl DefaultJsonHandler { Self { store, task_executor, - readahead: 10, - batch_size: 1024, + readahead: 1000, + batch_size: 1024 * 128, } } @@ -81,15 +83,36 @@ impl JsonHandler for DefaultJsonHandler { return Ok(Box::new(std::iter::empty())); } + println!("Reading {} files", files.len()); + let schema: ArrowSchemaRef = Arc::new(physical_schema.as_ref().try_into()?); let file_opener = JsonOpener::new(self.batch_size, schema.clone(), self.store.clone()); - FileStream::new_async_read_iterator( - self.task_executor.clone(), - schema, - Box::new(file_opener), - files, - self.readahead, - ) + + let (tx, rx) = mpsc::sync_channel(self.readahead); + let files = files.to_vec(); + let readahead = self.readahead; + + 
self.task_executor.spawn(async move { + let file_futures: Vec<_> = files + .iter() + .map(|file| file_opener.open(file.clone(), None)) + .collect::<DeltaResult<Vec<_>>>() + .expect("Error creating file futures"); + + let mut stream = stream::iter(file_futures) + .buffered(readahead) + .try_flatten() + .map_ok(|record_batch| { + Box::new(ArrowEngineData::new(record_batch)) as Box<dyn EngineData> + }); + + while let Some(item) = stream.next().await { + // check err? + let _ = tx.send(item); + } + }); + + Ok(Box::new(rx.into_iter())) } // note: for now we just buffer all the data and write it out all at once