Skip to content

Commit

Permalink
Merge branch 'main' into chore/test-compound-field-access
Browse files Browse the repository at this point in the history
  • Loading branch information
goldmedal committed Dec 12, 2024
2 parents cafb862 + 44f4be2 commit 54fcb4b
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 75 deletions.
115 changes: 48 additions & 67 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion datafusion/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ force_hash_collisions = []

[dependencies]
ahash = { workspace = true }
apache-avro = { version = "0.16", default-features = false, features = [
apache-avro = { version = "0.17", default-features = false, features = [
"bzip",
"snappy",
"xz",
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ unicode_expressions = [
]

[dependencies]
apache-avro = { version = "0.16", optional = true }
apache-avro = { version = "0.17", optional = true }
arrow = { workspace = true }
arrow-array = { workspace = true }
arrow-ipc = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,11 @@ impl<R: Read> AvroArrowArrayReader<'_, R> {
}
AvroSchema::Array(schema) => {
let sub_parent_field_name = format!("{}.element", parent_field_name);
Self::child_schema_lookup(&sub_parent_field_name, schema, schema_lookup)?;
Self::child_schema_lookup(
&sub_parent_field_name,
&schema.items,
schema_lookup,
)?;
}
_ => (),
}
Expand Down
14 changes: 11 additions & 3 deletions datafusion/core/src/datasource/avro_to_arrow/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,15 @@ fn schema_to_field_with_props(
AvroSchema::Bytes => DataType::Binary,
AvroSchema::String => DataType::Utf8,
AvroSchema::Array(item_schema) => DataType::List(Arc::new(
schema_to_field_with_props(item_schema, Some("element"), false, None)?,
schema_to_field_with_props(&item_schema.items, Some("element"), false, None)?,
)),
AvroSchema::Map(value_schema) => {
let value_field =
schema_to_field_with_props(value_schema, Some("value"), false, None)?;
let value_field = schema_to_field_with_props(
&value_schema.types,
Some("value"),
false,
None,
)?;
DataType::Dictionary(
Box::new(DataType::Utf8),
Box::new(value_field.data_type().clone()),
Expand Down Expand Up @@ -144,14 +148,17 @@ fn schema_to_field_with_props(
AvroSchema::Decimal(DecimalSchema {
precision, scale, ..
}) => DataType::Decimal128(*precision as u8, *scale as i8),
AvroSchema::BigDecimal => DataType::LargeBinary,
AvroSchema::Uuid => DataType::FixedSizeBinary(16),
AvroSchema::Date => DataType::Date32,
AvroSchema::TimeMillis => DataType::Time32(TimeUnit::Millisecond),
AvroSchema::TimeMicros => DataType::Time64(TimeUnit::Microsecond),
AvroSchema::TimestampMillis => DataType::Timestamp(TimeUnit::Millisecond, None),
AvroSchema::TimestampMicros => DataType::Timestamp(TimeUnit::Microsecond, None),
AvroSchema::TimestampNanos => DataType::Timestamp(TimeUnit::Nanosecond, None),
AvroSchema::LocalTimestampMillis => todo!(),
AvroSchema::LocalTimestampMicros => todo!(),
AvroSchema::LocalTimestampNanos => todo!(),
AvroSchema::Duration => DataType::Duration(TimeUnit::Millisecond),
};

Expand Down Expand Up @@ -371,6 +378,7 @@ mod test {
aliases: Some(vec![alias("foofixed"), alias("barfixed")]),
size: 1,
doc: None,
default: None,
attributes: Default::default(),
});
let props = external_props(&fixed_schema);
Expand Down
27 changes: 25 additions & 2 deletions datafusion/physical-plan/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,9 @@ impl RecordBatchReceiverStream {

pin_project! {
/// Combines a [`Stream`] with a [`SchemaRef`] implementing
/// [`RecordBatchStream`] for the combination
/// [`SendableRecordBatchStream`] for the combination
///
/// See [`Self::new`] for an example
pub struct RecordBatchStreamAdapter<S> {
schema: SchemaRef,

Expand All @@ -347,7 +349,28 @@ pin_project! {
}

impl<S> RecordBatchStreamAdapter<S> {
/// Creates a new [`RecordBatchStreamAdapter`] from the provided schema and stream
/// Creates a new [`RecordBatchStreamAdapter`] from the provided schema and stream.
///
/// Note: to obtain a [`SendableRecordBatchStream`], pin the returned adapter
/// (for example with [`Box::pin`], as shown in the example below).
///
/// # Example
/// ```
/// # use arrow::array::record_batch;
/// # use datafusion_execution::SendableRecordBatchStream;
/// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
/// // Create stream of Result<RecordBatch>
/// let batch = record_batch!(
/// ("a", Int32, [1, 2, 3]),
/// ("b", Float64, [Some(4.0), None, Some(5.0)])
/// ).expect("created batch");
/// let schema = batch.schema();
/// let stream = futures::stream::iter(vec![Ok(batch)]);
/// // Convert the stream to a SendableRecordBatchStream
/// let adapter = RecordBatchStreamAdapter::new(schema, stream);
/// // Now you can use the adapter as a SendableRecordBatchStream
/// let batch_stream: SendableRecordBatchStream = Box::pin(adapter);
/// // ...
/// ```
pub fn new(schema: SchemaRef, stream: S) -> Self {
Self { schema, stream }
}
Expand Down

0 comments on commit 54fcb4b

Please sign in to comment.