
Commit

remove local file assumption
andygrove committed Nov 17, 2024
1 parent eb89ddf commit b0e4aed
Showing 5 changed files with 26 additions and 23 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion Cargo.toml
@@ -33,7 +33,6 @@ bytes = "1.8.0"
 datafusion = { version = "42.0.0", features = ["pyarrow", "avro"] }
 datafusion-proto = "42.0.0"
 futures = "0.3"
-glob = "0.3.1"
 log = "0.4"
 object_store = { version = "0.11.1", features = ["aws"] }
 prost = "0.13"
4 changes: 3 additions & 1 deletion src/shuffle/mod.rs
@@ -107,12 +107,14 @@ pub(crate) fn create_object_store() -> Result<Arc<dyn ObjectStore>> {
         println!("Warning! AWS_ACCESS_KEY_ID and/or AWS_SECRET_ACCESS_KEY are not defined");
     }
 
+    // TODO configs
     let bucket_name = "dfray";
     let region = "us-east-1";
+    let endpoint = "http://127.0.0.1:9000";
 
     Ok(Arc::new(
         AmazonS3Builder::new()
-            .with_endpoint("http://127.0.0.1:9000")
+            .with_endpoint(endpoint)
            .with_allow_http(true)
            .with_region(region)
            .with_bucket_name(bucket_name)
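The bucket name, region, and endpoint introduced above are still hard-coded (hence the "TODO configs" comment). As a rough sketch only, not part of this commit, the same builder chain could take its values from environment variables; the DFRAY_* variable names below are invented for illustration, and credential handling is delegated to AmazonS3Builder::from_env, which reads the AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY variables mentioned in the warning above.

use std::env;
use std::sync::Arc;

use datafusion::error::Result;
use object_store::aws::AmazonS3Builder;
use object_store::ObjectStore;

// Hypothetical variant of create_object_store(): same builder calls as the
// hunk above, but the hard-coded values become environment-variable defaults.
pub(crate) fn create_object_store_from_env() -> Result<Arc<dyn ObjectStore>> {
    let bucket_name = env::var("DFRAY_BUCKET").unwrap_or_else(|_| "dfray".to_string());
    let region = env::var("DFRAY_REGION").unwrap_or_else(|_| "us-east-1".to_string());
    let endpoint =
        env::var("DFRAY_ENDPOINT").unwrap_or_else(|_| "http://127.0.0.1:9000".to_string());

    Ok(Arc::new(
        // from_env() picks up AWS credentials from the environment; the
        // explicit settings below mirror the diff above.
        AmazonS3Builder::from_env()
            .with_endpoint(endpoint)
            .with_allow_http(true)
            .with_region(region)
            .with_bucket_name(bucket_name)
            .build()?,
    ))
}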
21 changes: 9 additions & 12 deletions src/shuffle/reader.rs
@@ -29,7 +29,6 @@ use datafusion::physical_plan::{
     SendableRecordBatchStream,
 };
 use futures::{Stream, StreamExt};
-use glob::glob;
 use log::debug;
 use object_store::{path::Path as ObjectStorePath, ObjectStore};
 use std::any::Any;
@@ -115,22 +114,20 @@ impl ExecutionPlan for ShuffleReaderExec {
         partition: usize,
         _context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
-        let pattern = format!(
-            "/{}/shuffle_{}_*_{partition}.arrow",
-            self.shuffle_dir, self.stage_id
-        );
-
         let mut streams: Vec<SendableRecordBatchStream> = vec![];
-        for entry in glob(&pattern).expect("Failed to read glob pattern") {
-            let file = entry.unwrap();
+        let num_input_parts = 8;
+
+        for input_part in 0..num_input_parts {
+            let file = format!(
+                "/{}/shuffle_{}_{}_{partition}.arrow",
+                self.shuffle_dir, self.stage_id, input_part
+            );
             debug!(
                 "ShuffleReaderExec partition {} reading from stage {} file {}",
-                partition,
-                self.stage_id,
-                file.display()
+                partition, self.stage_id, file
             );
 
-            let stream = LocalShuffleStream::new(file);
+            let stream = LocalShuffleStream::new(PathBuf::from(&file));
             if self.schema != stream.schema() {
                 return Err(DataFusionError::Internal(
                     "Not all shuffle files have the same schema".to_string(),
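Instead of globbing the local filesystem, the reader now derives every shuffle file name deterministically from the shuffle directory, stage id, writer input partition, and its own output partition (with the number of input partitions hard-coded to 8 for now). A small standalone sketch of that naming scheme follows; the helper function itself is illustrative and not part of the commit.

// Hypothetical helper mirroring the file-naming scheme used in the new loop:
// one file per writer input partition for a given stage and output partition.
fn shuffle_file_names(
    shuffle_dir: &str,
    stage_id: usize,
    partition: usize,
    num_input_parts: usize,
) -> Vec<String> {
    (0..num_input_parts)
        .map(|input_part| {
            format!("/{shuffle_dir}/shuffle_{stage_id}_{input_part}_{partition}.arrow")
        })
        .collect()
}

// Example: shuffle_file_names("tmp/shuffles", 3, 0, 8) produces
// "/tmp/shuffles/shuffle_3_0_0.arrow" through "/tmp/shuffles/shuffle_3_7_0.arrow".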
22 changes: 14 additions & 8 deletions src/shuffle/writer.rs
@@ -332,19 +332,19 @@ impl IPCWriter {
         self.writer.finish()?;
 
         if self.num_batches > 0 {
-            println!(
-                "Uploading shuffle file {} containing {} bytes",
-                &self.path.display(),
-                self.num_bytes
-            );
-
             // upload to object storage
             let mut file = File::open(&self.path)?;
 
             let object_store_path = ObjectStorePath::from_filesystem_path(&self.path)?;
 
             // TODO make threshold configurable
-            if self.num_bytes > 5 * 1024 * 1024 {
+            if self.num_bytes > 10 * 1024 * 1024 {
+                println!(
+                    "Uploading shuffle file {} containing {} bytes (put_multipart)",
+                    &self.path.display(),
+                    self.num_bytes
+                );
+
                 // use multipart put for larger files
                 const CHUNK_SIZE: usize = 16 * 1024;
                 let mut buffer = [0u8; CHUNK_SIZE];
@@ -359,7 +359,13 @@ impl IPCWriter {
                 }
                 let _put_result = writer.complete().await?;
             } else {
-                let mut buffer = Vec::new();
+                println!(
+                    "Uploading shuffle file {} containing {} bytes (put)",
+                    &self.path.display(),
+                    self.num_bytes
+                );
+
+                let mut buffer = Vec::with_capacity(self.num_bytes);
                 file.read_to_end(&mut buffer)?;
                 self.object_store
                     .put(&object_store_path, PutPayload::from(buffer))
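The writer now logs which upload path it takes and switches from a single put to put_multipart above a 10 MiB threshold, streaming the file in 16 KiB chunks. Below is a minimal sketch of just that chunked-read loop, assuming a generic upload_chunk callback in place of the multipart writer's put_part call, which sits outside this hunk.

use std::fs::File;
use std::io::Read;
use std::path::Path;

// Hypothetical illustration of the chunked read used on the multipart path:
// read the finished IPC file CHUNK_SIZE bytes at a time and hand each chunk
// to an upload callback (in the real code, the multipart upload writer).
fn stream_file_in_chunks(
    path: &Path,
    mut upload_chunk: impl FnMut(&[u8]) -> std::io::Result<()>,
) -> std::io::Result<()> {
    const CHUNK_SIZE: usize = 16 * 1024;
    let mut file = File::open(path)?;
    let mut buffer = [0u8; CHUNK_SIZE];
    loop {
        let n = file.read(&mut buffer)?;
        if n == 0 {
            break; // end of file
        }
        upload_chunk(&buffer[..n])?;
    }
    Ok(())
}

Reading in fixed-size chunks keeps memory bounded for large shuffle files, while the small-file path above reads the whole file into a single preallocated buffer before a single put.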
