From b0e4aed1e120875e34787c453d82d543cd217aa6 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 17 Nov 2024 11:30:38 -0700 Subject: [PATCH] remove local file assumption --- Cargo.lock | 1 - Cargo.toml | 1 - src/shuffle/mod.rs | 4 +++- src/shuffle/reader.rs | 21 +++++++++------------ src/shuffle/writer.rs | 22 ++++++++++++++-------- 5 files changed, 26 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 44bbb95..41e4643 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1154,7 +1154,6 @@ dependencies = [ "datafusion", "datafusion-proto", "futures", - "glob", "log", "object_store", "pretty_assertions", diff --git a/Cargo.toml b/Cargo.toml index faa6721..17128b7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,7 +33,6 @@ bytes = "1.8.0" datafusion = { version = "42.0.0", features = ["pyarrow", "avro"] } datafusion-proto = "42.0.0" futures = "0.3" -glob = "0.3.1" log = "0.4" object_store = { version = "0.11.1", features = ["aws"] } prost = "0.13" diff --git a/src/shuffle/mod.rs b/src/shuffle/mod.rs index 3c9dbe0..4b3c1b1 100644 --- a/src/shuffle/mod.rs +++ b/src/shuffle/mod.rs @@ -107,12 +107,14 @@ pub(crate) fn create_object_store() -> Result> { println!("Warning! AWS_ACCESS_KEY_ID and/or AWS_SECRET_ACCESS_KEY are not defined"); } + // TODO configs let bucket_name = "dfray"; let region = "us-east-1"; + let endpoint = "http://127.0.0.1:9000"; Ok(Arc::new( AmazonS3Builder::new() - .with_endpoint("http://127.0.0.1:9000") + .with_endpoint(endpoint) .with_allow_http(true) .with_region(region) .with_bucket_name(bucket_name) diff --git a/src/shuffle/reader.rs b/src/shuffle/reader.rs index e733225..deb7b4d 100644 --- a/src/shuffle/reader.rs +++ b/src/shuffle/reader.rs @@ -29,7 +29,6 @@ use datafusion::physical_plan::{ SendableRecordBatchStream, }; use futures::{Stream, StreamExt}; -use glob::glob; use log::debug; use object_store::{path::Path as ObjectStorePath, ObjectStore}; use std::any::Any; @@ -115,22 +114,20 @@ impl ExecutionPlan for ShuffleReaderExec { partition: usize, _context: Arc, ) -> Result { - let pattern = format!( - "/{}/shuffle_{}_*_{partition}.arrow", - self.shuffle_dir, self.stage_id - ); - let mut streams: Vec = vec![]; - for entry in glob(&pattern).expect("Failed to read glob pattern") { - let file = entry.unwrap(); + let num_input_parts = 8; + + for input_part in 0..num_input_parts { + let file = format!( + "/{}/shuffle_{}_{}_{partition}.arrow", + self.shuffle_dir, self.stage_id, input_part + ); debug!( "ShuffleReaderExec partition {} reading from stage {} file {}", - partition, - self.stage_id, - file.display() + partition, self.stage_id, file ); - let stream = LocalShuffleStream::new(file); + let stream = LocalShuffleStream::new(PathBuf::from(&file)); if self.schema != stream.schema() { return Err(DataFusionError::Internal( "Not all shuffle files have the same schema".to_string(), diff --git a/src/shuffle/writer.rs b/src/shuffle/writer.rs index 3cc93a1..6283dec 100644 --- a/src/shuffle/writer.rs +++ b/src/shuffle/writer.rs @@ -332,19 +332,19 @@ impl IPCWriter { self.writer.finish()?; if self.num_batches > 0 { - println!( - "Uploading shuffle file {} containing {} bytes", - &self.path.display(), - self.num_bytes - ); - // upload to object storage let mut file = File::open(&self.path)?; let object_store_path = ObjectStorePath::from_filesystem_path(&self.path)?; // TODO make threshold configurable - if self.num_bytes > 5 * 1024 * 1024 { + if self.num_bytes > 10 * 1024 * 1024 { + println!( + "Uploading shuffle file {} containing {} bytes (put_multipart)", + &self.path.display(), + self.num_bytes + ); + // use multipart put for larger files const CHUNK_SIZE: usize = 16 * 1024; let mut buffer = [0u8; CHUNK_SIZE]; @@ -359,7 +359,13 @@ impl IPCWriter { } let _put_result = writer.complete().await?; } else { - let mut buffer = Vec::new(); + println!( + "Uploading shuffle file {} containing {} bytes (put)", + &self.path.display(), + self.num_bytes + ); + + let mut buffer = Vec::with_capacity(self.num_bytes); file.read_to_end(&mut buffer)?; self.object_store .put(&object_store_path, PutPayload::from(buffer))