Merge remote-tracking branch 'upstream/master' into fix-list-sort

tustvold committed Aug 29, 2023
2 parents a584248 + cb793a5 commit 8dafbf3
Showing 61 changed files with 1,370 additions and 1,436 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/docs.yml
@@ -49,7 +49,7 @@ jobs:
- name: Install python dev
run: |
apt update
apt install -y libpython3.9-dev
apt install -y libpython3.11-dev
- name: Setup Rust toolchain
uses: ./.github/actions/setup-builder
with:
5 changes: 1 addition & 4 deletions arrow-array/src/array/dictionary_array.rs
@@ -800,10 +800,7 @@ pub struct TypedDictionaryArray<'a, K: ArrowDictionaryKeyType, V> {
// Manually implement `Clone` to avoid `V: Clone` type constraint
impl<'a, K: ArrowDictionaryKeyType, V> Clone for TypedDictionaryArray<'a, K, V> {
fn clone(&self) -> Self {
Self {
dictionary: self.dictionary,
values: self.values,
}
*self
}
}
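For context, the `*self` body in this hunk (and the identical change to `TypedRunArray` below) only compiles because the wrapper type also implements `Copy`; both fields are shared references, so a bitwise copy is always possible regardless of `V`. A minimal sketch of the pattern, using a hypothetical `TypedWrapper` rather than the actual arrow-array types:

/// A reference-holding wrapper, similar in shape to `TypedDictionaryArray`.
struct TypedWrapper<'a, V> {
    name: &'a str,
    values: &'a V,
}

// Writing `Clone`/`Copy` by hand avoids the `V: Clone` / `V: Copy` bounds
// that `#[derive(Clone, Copy)]` would add; the fields are references and
// are therefore always trivially copyable.
impl<'a, V> Clone for TypedWrapper<'a, V> {
    fn clone(&self) -> Self {
        *self // relies on the `Copy` impl below
    }
}

impl<'a, V> Copy for TypedWrapper<'a, V> {}

fn main() {
    struct NotClone; // deliberately neither Clone nor Copy
    let v = NotClone;
    let a = TypedWrapper { name: "demo", values: &v };
    let b = a; // bitwise copy; `a` is still usable afterwards
    assert_eq!(a.name, b.name);
    let _c = a.clone();
}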

9 changes: 4 additions & 5 deletions arrow-array/src/array/primitive_array.rs
@@ -1580,7 +1580,7 @@ mod tests {
assert_eq!(3, arr.len());
assert_eq!(0, arr.offset());
assert_eq!(0, arr.null_count());
let formatted = vec!["00:00:00.001", "10:30:00.005", "23:59:59.210"];
let formatted = ["00:00:00.001", "10:30:00.005", "23:59:59.210"];
for (i, formatted) in formatted.iter().enumerate().take(3) {
// check that we can't create dates or datetimes from time instances
assert_eq!(None, arr.value_as_datetime(i));
@@ -1604,7 +1604,7 @@
assert_eq!(3, arr.len());
assert_eq!(0, arr.offset());
assert_eq!(0, arr.null_count());
let formatted = vec!["00:00:00.001", "10:30:00.005", "23:59:59.210"];
let formatted = ["00:00:00.001", "10:30:00.005", "23:59:59.210"];
for (i, item) in formatted.iter().enumerate().take(3) {
// check that we can't create dates or datetimes from time instances
assert_eq!(None, arr.value_as_datetime(i));
@@ -2219,7 +2219,7 @@ mod tests {

#[test]
fn test_decimal_from_iter_values() {
let array = Decimal128Array::from_iter_values(vec![-100, 0, 101].into_iter());
let array = Decimal128Array::from_iter_values(vec![-100, 0, 101]);
assert_eq!(array.len(), 3);
assert_eq!(array.data_type(), &DataType::Decimal128(38, 10));
assert_eq!(-100_i128, array.value(0));
@@ -2419,8 +2419,7 @@ mod tests {
expected = "Trying to access an element at index 4 from a PrimitiveArray of length 3"
)]
fn test_fixed_size_binary_array_get_value_index_out_of_bound() {
let array = Decimal128Array::from_iter_values(vec![-100, 0, 101].into_iter());

let array = Decimal128Array::from(vec![-100, 0, 101]);
array.value(4);
}

7 changes: 2 additions & 5 deletions arrow-array/src/array/run_array.rs
@@ -537,10 +537,7 @@ pub struct TypedRunArray<'a, R: RunEndIndexType, V> {
// Manually implement `Clone` to avoid `V: Clone` type constraint
impl<'a, R: RunEndIndexType, V> Clone for TypedRunArray<'a, R, V> {
fn clone(&self) -> Self {
Self {
run_array: self.run_array,
values: self.values,
}
*self
}
}

@@ -1093,7 +1090,7 @@ mod tests {
let values = Int32Array::from(vec![Some(0), None, Some(1), None]);
let array = RunArray::try_new(&run, &values).unwrap();

let expected = vec![
let expected = [
true, true, true, false, false, false, true, true, true, false, false, false,
];

4 changes: 2 additions & 2 deletions arrow-array/src/array/string_array.rs
@@ -324,14 +324,14 @@ mod tests {

#[test]
fn test_string_array_from_iter_values() {
let data = vec!["hello", "hello2"];
let data = ["hello", "hello2"];
let array1 = StringArray::from_iter_values(data.iter());

assert_eq!(array1.value(0), "hello");
assert_eq!(array1.value(1), "hello2");

// Also works with String types.
let data2: Vec<String> = vec!["goodbye".into(), "goodbye2".into()];
let data2 = ["goodbye".to_string(), "goodbye2".to_string()];
let array2 = StringArray::from_iter_values(data2.iter());

assert_eq!(array2.value(0), "goodbye");
2 changes: 1 addition & 1 deletion arrow-array/src/record_batch.rs
@@ -757,7 +757,7 @@ mod tests {
))))
.add_child_data(a2_child.into_data())
.len(2)
.add_buffer(Buffer::from(vec![0i32, 3, 4].to_byte_slice()))
.add_buffer(Buffer::from([0i32, 3, 4].to_byte_slice()))
.build()
.unwrap();
let a2: ArrayRef = Arc::new(ListArray::from(a2));
4 changes: 2 additions & 2 deletions arrow-array/src/run_iterator.rs
@@ -237,7 +237,7 @@ mod tests {
Some(72),
];
let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
builder.extend(input_vec.clone().into_iter());
builder.extend(input_vec.iter().copied());
let ree_array = builder.finish();
let ree_array = ree_array.downcast::<Int32Array>().unwrap();

@@ -261,7 +261,7 @@
Some(72),
];
let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
builder.extend(input_vec.into_iter());
builder.extend(input_vec);
let ree_array = builder.finish();
let ree_array = ree_array.downcast::<Int32Array>().unwrap();

2 changes: 1 addition & 1 deletion arrow-array/src/trusted_len.rs
@@ -63,7 +63,7 @@ mod tests {

#[test]
fn trusted_len_unzip_good() {
let vec = vec![Some(1u32), None];
let vec = [Some(1u32), None];
let (null, buffer) = unsafe { trusted_len_unzip(vec.iter()) };
assert_eq!(null.as_slice(), &[0b00000001]);
assert_eq!(buffer.as_slice(), &[1u8, 0, 0, 0, 0, 0, 0, 0]);
2 changes: 1 addition & 1 deletion arrow-buffer/src/native.rs
@@ -222,7 +222,7 @@ pub trait ToByteSlice {
impl<T: ArrowNativeType> ToByteSlice for [T] {
#[inline]
fn to_byte_slice(&self) -> &[u8] {
let raw_ptr = self.as_ptr() as *const T as *const u8;
let raw_ptr = self.as_ptr() as *const u8;
unsafe { std::slice::from_raw_parts(raw_ptr, std::mem::size_of_val(self)) }
}
}
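The change above only drops a redundant `as *const T` cast; behaviour is unchanged. For reference, a small usage sketch of the trait (assuming `arrow-buffer` as a dependency and that `ToByteSlice` is importable from the crate root; the values are illustrative):

use arrow_buffer::ToByteSlice;

fn main() {
    let values: [i32; 3] = [0, 3, 4];

    // Reinterprets the `i32` slice as its raw in-memory bytes
    // (4 bytes per element, native byte order).
    let bytes = values.to_byte_slice();

    assert_eq!(bytes.len(), std::mem::size_of_val(&values)); // 12 bytes
    assert_eq!(&bytes[..4], &[0, 0, 0, 0]); // first element is zero
}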
2 changes: 1 addition & 1 deletion arrow-buffer/src/util/bit_chunk_iterator.rs
@@ -157,7 +157,7 @@ impl<'a> UnalignedBitChunk<'a> {
self.prefix
.into_iter()
.chain(self.chunks.iter().cloned())
.chain(self.suffix.into_iter())
.chain(self.suffix)
}

/// Counts the number of ones
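The `.chain(self.suffix)` form above works because `Option<T>` implements `IntoIterator` (yielding zero or one item), so the explicit `.into_iter()` is unnecessary. A standalone illustration of the same pattern:

fn main() {
    let prefix: Option<u64> = Some(1);
    let chunks = vec![2u64, 3];
    let suffix: Option<u64> = None;

    // `chain` accepts anything that is `IntoIterator`, including `Option`.
    let all: Vec<u64> = prefix
        .into_iter()
        .chain(chunks.iter().cloned())
        .chain(suffix)
        .collect();

    assert_eq!(all, vec![1, 2, 3]);
}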
47 changes: 11 additions & 36 deletions arrow-cast/src/cast.rs
@@ -3428,50 +3428,24 @@ mod tests {

macro_rules! generate_cast_test_case {
($INPUT_ARRAY: expr, $OUTPUT_TYPE_ARRAY: ident, $OUTPUT_TYPE: expr, $OUTPUT_VALUES: expr) => {
let output = $OUTPUT_TYPE_ARRAY::from($OUTPUT_VALUES)
.with_data_type($OUTPUT_TYPE.clone());

// assert cast type
let input_array_type = $INPUT_ARRAY.data_type();
assert!(can_cast_types(input_array_type, $OUTPUT_TYPE));
let casted_array = cast($INPUT_ARRAY, $OUTPUT_TYPE).unwrap();
let result_array = casted_array
.as_any()
.downcast_ref::<$OUTPUT_TYPE_ARRAY>()
.unwrap();
assert_eq!($OUTPUT_TYPE, result_array.data_type());
assert_eq!(result_array.len(), $OUTPUT_VALUES.len());
for (i, x) in $OUTPUT_VALUES.iter().enumerate() {
match x {
Some(x) => {
assert!(!result_array.is_null(i));
assert_eq!(result_array.value(i), *x);
}
None => {
assert!(result_array.is_null(i));
}
}
}
let result = cast($INPUT_ARRAY, $OUTPUT_TYPE).unwrap();
assert_eq!($OUTPUT_TYPE, result.data_type());
assert_eq!(result.as_ref(), &output);

let cast_option = CastOptions {
safe: false,
format_options: FormatOptions::default(),
};
let casted_array_with_option =
let result =
cast_with_options($INPUT_ARRAY, $OUTPUT_TYPE, &cast_option).unwrap();
let result_array = casted_array_with_option
.as_any()
.downcast_ref::<$OUTPUT_TYPE_ARRAY>()
.unwrap();
assert_eq!($OUTPUT_TYPE, result_array.data_type());
assert_eq!(result_array.len(), $OUTPUT_VALUES.len());
for (i, x) in $OUTPUT_VALUES.iter().enumerate() {
match x {
Some(x) => {
assert_eq!(result_array.value(i), *x);
}
None => {
assert!(result_array.is_null(i));
}
}
}
assert_eq!($OUTPUT_TYPE, result.data_type());
assert_eq!(result.as_ref(), &output);
};
}
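The rewritten macro drops the per-element loops and instead compares the casted result against an expected array in a single `assert_eq!`, relying on `dyn Array` being comparable against a concrete array type. A rough standalone sketch of the same idea, assuming `arrow-array`, `arrow-cast`, and `arrow-schema` as dependencies (an Int32 to Int64 cast chosen purely for illustration):

use std::sync::Arc;

use arrow_array::{Array, ArrayRef, Int32Array, Int64Array};
use arrow_cast::cast;
use arrow_schema::DataType;

fn main() {
    let input: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]));
    let expected = Int64Array::from(vec![Some(1), None, Some(3)]);

    let result = cast(&input, &DataType::Int64).unwrap();

    // Comparing the `dyn Array` result directly against a typed array
    // replaces the old loop over `$OUTPUT_VALUES`, including the null checks.
    assert_eq!(result.data_type(), &DataType::Int64);
    assert_eq!(result.as_ref(), &expected);
}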

@@ -5997,7 +5971,7 @@ mod tests {

#[test]
fn test_str_to_str_casts() {
for data in vec![
for data in [
vec![Some("foo"), Some("bar"), Some("ham")],
vec![Some("foo"), None, Some("bar")],
] {
@@ -8934,6 +8908,7 @@ mod tests {
};

#[test]
#[allow(clippy::assertions_on_constants)]
fn test_const_options() {
assert!(CAST_OPTIONS.safe)
}
14 changes: 5 additions & 9 deletions arrow-cast/src/parse.rs
@@ -1003,15 +1003,11 @@ impl Interval {
fn parse(value: &str, config: &IntervalParseConfig) -> Result<Self, ArrowError> {
let components = parse_interval_components(value, config)?;

let result = components.into_iter().fold(
Ok(Self::default()),
|result, (amount, unit)| match result {
Ok(result) => result.add(amount, unit),
Err(e) => Err(e),
},
)?;

Ok(result)
components
.into_iter()
.try_fold(Self::default(), |result, (amount, unit)| {
result.add(amount, unit)
})
}

/// Interval addition following Postgres behavior. Fractional units will be spilled into smaller units.
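The rewrite replaces a manual fold over a `Result` accumulator with `Iterator::try_fold`, which threads the accumulator through and short-circuits on the first `Err`. A standalone sketch of the pattern, with a hypothetical `add` step standing in for `Interval::add`:

#[derive(Debug, Default, PartialEq)]
struct Acc {
    days: i32,
}

// Fallible accumulation step, analogous to `Interval::add`.
fn add(acc: Acc, amount: i32, unit_days: i32) -> Result<Acc, String> {
    if amount < 0 {
        return Err(format!("negative amount: {amount}"));
    }
    Ok(Acc {
        days: acc.days + amount * unit_days,
    })
}

fn main() {
    let components = vec![(2, 30), (3, 1)];

    // `try_fold` stops at the first `Err`, so the closure never has to
    // match on a previously accumulated `Result` as the old fold did.
    let result = components
        .into_iter()
        .try_fold(Acc::default(), |acc, (amount, unit)| add(acc, amount, unit));

    assert_eq!(result, Ok(Acc { days: 63 }));
}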
2 changes: 1 addition & 1 deletion arrow-cast/src/pretty.rs
@@ -848,7 +848,7 @@ mod tests {
let mut buf = String::new();
write!(&mut buf, "{}", pretty_format_batches(&[batch]).unwrap()).unwrap();

let s = vec![
let s = [
"+---+-----+",
"| a | b |",
"+---+-----+",
3 changes: 3 additions & 0 deletions arrow-flight/Cargo.toml
@@ -67,6 +67,9 @@ cli = ["arrow-cast/prettyprint", "clap", "tracing-log", "tracing-subscriber", "t
[dev-dependencies]
arrow-cast = { workspace = true, features = ["prettyprint"] }
assert_cmd = "2.0.8"
http = "0.2.9"
http-body = "0.4.5"
pin-project-lite = "0.2"
tempfile = "3.3"
tokio-stream = { version = "0.1", features = ["net"] }
tower = "0.4.13"
20 changes: 9 additions & 11 deletions arrow-flight/src/client.rs
@@ -18,9 +18,9 @@
use std::task::Poll;

use crate::{
decode::FlightRecordBatchStream, flight_service_client::FlightServiceClient, Action,
ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo,
HandshakeRequest, PutResult, Ticket,
decode::FlightRecordBatchStream, flight_service_client::FlightServiceClient,
trailers::extract_lazy_trailers, Action, ActionType, Criteria, Empty, FlightData,
FlightDescriptor, FlightInfo, HandshakeRequest, PutResult, Ticket,
};
use arrow_schema::Schema;
use bytes::Bytes;
@@ -204,16 +204,14 @@ impl FlightClient {
pub async fn do_get(&mut self, ticket: Ticket) -> Result<FlightRecordBatchStream> {
let request = self.make_request(ticket);

let response_stream = self
.inner
.do_get(request)
.await?
.into_inner()
.map_err(FlightError::Tonic);
let (md, response_stream, _ext) = self.inner.do_get(request).await?.into_parts();
let (response_stream, trailers) = extract_lazy_trailers(response_stream);

Ok(FlightRecordBatchStream::new_from_flight_data(
response_stream,
))
response_stream.map_err(FlightError::Tonic),
)
.with_headers(md)
.with_trailers(trailers))
}

/// Make a `GetFlightInfo` call to the server with the provided
44 changes: 42 additions & 2 deletions arrow-flight/src/decode.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::{utils::flight_data_to_arrow_batch, FlightData};
use crate::{trailers::LazyTrailers, utils::flight_data_to_arrow_batch, FlightData};
use arrow_array::{ArrayRef, RecordBatch};
use arrow_buffer::Buffer;
use arrow_schema::{Schema, SchemaRef};
@@ -24,6 +24,7 @@ use futures::{ready, stream::BoxStream, Stream, StreamExt};
use std::{
collections::HashMap, convert::TryFrom, fmt::Debug, pin::Pin, sync::Arc, task::Poll,
};
use tonic::metadata::MetadataMap;

use crate::error::{FlightError, Result};

@@ -82,13 +83,23 @@ use crate::error::{FlightError, Result};
/// ```
#[derive(Debug)]
pub struct FlightRecordBatchStream {
/// Optional grpc header metadata.
headers: MetadataMap,

/// Optional grpc trailer metadata.
trailers: Option<LazyTrailers>,

inner: FlightDataDecoder,
}

impl FlightRecordBatchStream {
/// Create a new [`FlightRecordBatchStream`] from a decoded stream
pub fn new(inner: FlightDataDecoder) -> Self {
Self { inner }
Self {
inner,
headers: MetadataMap::default(),
trailers: None,
}
}

/// Create a new [`FlightRecordBatchStream`] from a stream of [`FlightData`]
@@ -98,9 +109,37 @@
{
Self {
inner: FlightDataDecoder::new(inner),
headers: MetadataMap::default(),
trailers: None,
}
}

/// Record response headers.
pub fn with_headers(self, headers: MetadataMap) -> Self {
Self { headers, ..self }
}

/// Record response trailers.
pub fn with_trailers(self, trailers: LazyTrailers) -> Self {
Self {
trailers: Some(trailers),
..self
}
}

/// Headers attached to this stream.
pub fn headers(&self) -> &MetadataMap {
&self.headers
}

/// Trailers attached to this stream.
///
/// Note that this will return `None` until the entire stream has been consumed,
/// i.e. until `next()` has returned `None`; only then are any available trailers returned.
pub fn trailers(&self) -> Option<MetadataMap> {
self.trailers.as_ref().and_then(|trailers| trailers.get())
}

/// Has a message defining the schema been received yet?
#[deprecated = "use schema().is_some() instead"]
pub fn got_schema(&self) -> bool {
@@ -117,6 +156,7 @@
self.inner
}
}

impl futures::Stream for FlightRecordBatchStream {
type Item = Result<RecordBatch>;
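Combined with the `do_get` change in `client.rs` above, a rough usage sketch of the new accessors (assuming an already-connected `FlightClient` and a server that actually sends trailers; the ticket contents are illustrative):

use arrow_flight::{error::Result, FlightClient, Ticket};
use futures::StreamExt;

async fn fetch_with_metadata(client: &mut FlightClient) -> Result<()> {
    let mut stream = client.do_get(Ticket::new("example-ticket")).await?;

    // Response headers are available as soon as the call returns.
    println!("headers: {:?}", stream.headers());

    // Trailers are not available until the stream has been fully drained.
    assert!(stream.trailers().is_none());
    let mut batches = 0;
    while let Some(batch) = stream.next().await {
        let _batch = batch?;
        batches += 1;
    }
    println!("received {batches} batches");
    println!("trailers: {:?}", stream.trailers());

    Ok(())
}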
