Skip to content

Commit

Permalink
Improve stream (#15)
Browse files Browse the repository at this point in the history
* first draft for a non blocking async stream

* fixed a bug and a test

* fixed an issue with colummn ordering in zarr chunks

* linter fix

* addressing PR comments
  • Loading branch information
maximedion2 authored Apr 8, 2024
1 parent bca8335 commit 43a5edb
Show file tree
Hide file tree
Showing 12 changed files with 497 additions and 66 deletions.
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ crc32c = { version = "0.6.5" }
object_store = { version = "0.9" }
futures = { version = "0.3" }
futures-util = { version = "0.3.30" }
tokio = { version = "1.0" }
tokio = { version = "1.0", features = ["full"] }
dyn-clone = { version = "1.0.16" }
arrow = { version = "50.0.0" }
arrow-array = { version = "50.0.0" }
Expand All @@ -38,3 +38,6 @@ all = ["datafusion"]
[dev-dependencies]
arrow-cast = { version = "50.0.0", features = ["prettyprint"] }
chrono = { version = "0.4" }

[[bin]]
name = "async-benchmark"
336 changes: 287 additions & 49 deletions src/async_reader/mod.rs

Large diffs are not rendered by default.

51 changes: 45 additions & 6 deletions src/async_reader/zarr_read_async.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,27 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use async_trait::async_trait;
use futures_util::{pin_mut, StreamExt};
use object_store::{path::Path, ObjectStore};
use std::collections::HashMap;
use std::sync::Arc;

use crate::reader::metadata::ChunkSeparator;
use crate::reader::{ZarrError, ZarrResult};
use crate::reader::{ZarrInMemoryChunk, ZarrStoreMetadata};

Expand All @@ -20,6 +39,7 @@ pub trait ZarrReadAsync<'a> {
position: &'a [usize],
cols: &'a [String],
real_dims: Vec<usize>,
separators: HashMap<String, ChunkSeparator>,
) -> ZarrResult<ZarrInMemoryChunk>;
}

Expand Down Expand Up @@ -73,13 +93,27 @@ impl<'a> ZarrReadAsync<'a> for ZarrPath {
position: &'a [usize],
cols: &'a [String],
real_dims: Vec<usize>,
separators: HashMap<String, ChunkSeparator>,
) -> ZarrResult<ZarrInMemoryChunk> {
let mut chunk = ZarrInMemoryChunk::new(real_dims);
for var in cols {
let s: Vec<String> = position.iter().map(|i| i.to_string()).collect();
let s = s.join(".");

let p = self.location.child(var.to_string()).child(s);
let separator = separators
.get(var.as_str())
.ok_or(ZarrError::InvalidMetadata(
"Could not find separator for column".to_string(),
))?;

let p = match separator {
ChunkSeparator::Period => self.location.child(var.to_string()).child(s.join(".")),
ChunkSeparator::Slash => {
let mut partial_path = self.location.child(var.to_string()).child("c");
for idx in s {
partial_path = partial_path.child(idx);
}
partial_path
}
};
let data = self.store.get(&p).await?.bytes().await?;
chunk.add_array(var.to_string(), data.to_vec());
}
Expand Down Expand Up @@ -149,7 +183,12 @@ mod zarr_read_async_tests {
// test read from an array where the data is just raw bytes
let pos = vec![1, 2];
let chunk = store
.get_zarr_chunk(&pos, meta.get_columns(), meta.get_real_dims(&pos))
.get_zarr_chunk(
&pos,
meta.get_columns(),
meta.get_real_dims(&pos),
meta.get_separators(),
)
.await
.unwrap();
assert_eq!(
Expand All @@ -165,7 +204,7 @@ mod zarr_read_async_tests {
let col_proj = ZarrProjection::skip(vec!["float_data".to_string()]);
let cols = col_proj.apply_selection(meta.get_columns()).unwrap();
let chunk = store
.get_zarr_chunk(&pos, &cols, meta.get_real_dims(&pos))
.get_zarr_chunk(&pos, &cols, meta.get_real_dims(&pos), meta.get_separators())
.await
.unwrap();
assert_eq!(
Expand All @@ -177,7 +216,7 @@ mod zarr_read_async_tests {
let col_proj = ZarrProjection::keep(vec!["float_data".to_string()]);
let cols = col_proj.apply_selection(meta.get_columns()).unwrap();
let chunk = store
.get_zarr_chunk(&pos, &cols, meta.get_real_dims(&pos))
.get_zarr_chunk(&pos, &cols, meta.get_real_dims(&pos), meta.get_separators())
.await
.unwrap();
assert_eq!(
Expand Down
45 changes: 45 additions & 0 deletions src/bin/async-benchmark.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow_zarr::async_reader::{ZarrPath, ZarrRecordBatchStreamBuilderNonBlocking};
use futures::TryStreamExt;
use object_store::{local::LocalFileSystem, path::Path};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Instant;

fn get_v2_test_data_path(zarr_store: String) -> ZarrPath {
let p = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("test-data/data/zarr/v2_data")
.join(zarr_store);
ZarrPath::new(
Arc::new(LocalFileSystem::new()),
Path::from_absolute_path(p).unwrap(),
)
}

#[tokio::main]
async fn main() {
let zp = get_v2_test_data_path("lat_lon_example.zarr".to_string());
let stream_builder = ZarrRecordBatchStreamBuilderNonBlocking::new(zp);

let stream = stream_builder.build().await.unwrap();
let now = Instant::now();
let _: Vec<_> = stream.try_collect().await.unwrap();

println!("{:?}", now.elapsed());
}
2 changes: 1 addition & 1 deletion src/datafusion/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ mod tests {
let schema = first_batch.schema();

let names = schema.fields().iter().map(|f| f.name()).collect::<Vec<_>>();
assert_eq!(names, vec!["lon", "lat"]);
assert_eq!(names, vec!["lat", "lon"]);

Ok(())
}
Expand Down
17 changes: 17 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

pub mod async_reader;
pub mod reader;

Expand Down
17 changes: 17 additions & 0 deletions src/reader/codecs.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::reader::errors::throw_invalid_meta;
use crate::reader::{ZarrError, ZarrResult};
use arrow_array::*;
Expand Down
17 changes: 17 additions & 0 deletions src/reader/errors.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow_schema::ArrowError;
#[cfg(feature = "datafusion")]
use datafusion::error::DataFusionError;
Expand Down
17 changes: 17 additions & 0 deletions src/reader/filters.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow_array::{BooleanArray, RecordBatch};
use arrow_schema::ArrowError;
use dyn_clone::{clone_trait_object, DynClone};
Expand Down
21 changes: 21 additions & 0 deletions src/reader/metadata.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::reader::codecs::{
BloscOptions, CodecType, CompressorName, Endianness, IndexLocation, ShardingOptions,
ShuffleOptions, ZarrCodec, ZarrDataType, PY_UNICODE_SIZE,
Expand Down Expand Up @@ -675,6 +692,10 @@ impl ZarrStoreMetadata {
}
}

// the sort below is important, because within a zarr store, the different arrays are
// not ordered, so there is no predefined order for the different columns. we effectively
// define one here, by ordering the columns alphabetically.
self.columns.sort();
Ok(())
}

Expand Down
10 changes: 2 additions & 8 deletions src/reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,12 +192,7 @@ impl<T: ZarrIterator> ZarrRecordBatchReader<T> {
let mut arrs: Vec<ArrayRef> = Vec::with_capacity(self.meta.get_num_columns());
let mut fields: Vec<FieldRef> = Vec::with_capacity(self.meta.get_num_columns());

// the sort below is important, because within a zarr store, the different arrays are
// not ordered, so there is no predefined order for the different columns. we effectively
// define one here, my ordering the columns alphabetically.
let mut cols = chunk.get_cols_in_chunk();
cols.sort();

let cols = chunk.get_cols_in_chunk();
for col in cols {
let data = chunk.take_array(&col)?;
let (arr, field) = self.unpack_array_chunk(
Expand Down Expand Up @@ -438,9 +433,8 @@ mod zarr_reader_tests {
fn validate_names_and_types(targets: &HashMap<String, DataType>, rec: &RecordBatch) {
let mut target_cols: Vec<&String> = targets.keys().collect();
let schema = rec.schema();
let mut from_rec: Vec<&String> = schema.fields.iter().map(|f| f.name()).collect();
let from_rec: Vec<&String> = schema.fields.iter().map(|f| f.name()).collect();

from_rec.sort();
target_cols.sort();
assert_eq!(from_rec, target_cols);

Expand Down
25 changes: 24 additions & 1 deletion src/reader/zarr_read.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use itertools::Itertools;
use std::collections::{HashMap, HashSet};
use std::fs::{read, read_to_string};
Expand Down Expand Up @@ -45,7 +62,13 @@ impl ZarrInMemoryChunk {
}

pub(crate) fn get_cols_in_chunk(&self) -> Vec<String> {
self.data.keys().map(|s| s.to_string()).collect_vec()
let mut cols = self.data.keys().map(|s| s.to_string()).collect_vec();

// the sort below is important, because within a zarr store, the different arrays are
// not ordered, so there is no predefined order for the different columns. we effectively
// define one here, by ordering the columns alphabetically.
cols.sort();
cols
}

pub(crate) fn get_real_dims(&self) -> &Vec<usize> {
Expand Down

0 comments on commit 43a5edb

Please sign in to comment.