diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md index a3f100cbcb53..b5f82b4d5140 100644 --- a/datafusion-examples/README.md +++ b/datafusion-examples/README.md @@ -54,6 +54,7 @@ cargo run --example dataframe - [`catalog.rs`](examples/catalog.rs): Register the table into a custom catalog - [`composed_extension_codec`](examples/composed_extension_codec.rs): Example of using multiple extension codecs for serialization / deserialization - [`csv_sql_streaming.rs`](examples/csv_sql_streaming.rs): Build and run a streaming query plan from a SQL statement against a local CSV file +- [`csv_json_opener.rs`](examples/csv_json_opener.rs): Use low level `FileOpener` APIs to read CSV/JSON into Arrow `RecordBatch`es - [`custom_datasource.rs`](examples/custom_datasource.rs): Run queries against a custom datasource (TableProvider) - [`custom_file_format.rs`](examples/custom_file_format.rs): Write data to a custom file format - [`dataframe-to-s3.rs`](examples/external_dependency/dataframe-to-s3.rs): Run a query using a DataFrame against a parquet file from s3 and writing back to s3 diff --git a/datafusion-examples/examples/csv_opener.rs b/datafusion-examples/examples/csv_json_opener.rs similarity index 50% rename from datafusion-examples/examples/csv_opener.rs rename to datafusion-examples/examples/csv_json_opener.rs index e7b7ead109bc..334e4c83404f 100644 --- a/datafusion-examples/examples/csv_opener.rs +++ b/datafusion-examples/examples/csv_json_opener.rs @@ -15,28 +15,36 @@ // specific language governing permissions and limitations // under the License. -use std::{sync::Arc, vec}; +use std::sync::Arc; +use arrow_schema::{DataType, Field, Schema}; use datafusion::{ assert_batches_eq, datasource::{ file_format::file_compression_type::FileCompressionType, listing::PartitionedFile, object_store::ObjectStoreUrl, - physical_plan::{CsvConfig, CsvOpener, FileScanConfig, FileStream}, + physical_plan::{CsvConfig, CsvOpener, FileScanConfig, FileStream, JsonOpener}, }, error::Result, physical_plan::metrics::ExecutionPlanMetricsSet, test_util::aggr_test_schema, }; - use futures::StreamExt; -use object_store::local::LocalFileSystem; +use object_store::{local::LocalFileSystem, memory::InMemory, ObjectStore}; -/// This example demonstrates a scanning against an Arrow data source (CSV) and -/// fetching results +/// This example demonstrates using the low level [`FileStream`] / [`FileOpener`] APIs to directly +/// read data from (CSV/JSON) into Arrow RecordBatches. +/// +/// If you want to query data in CSV or JSON files, see the [`dataframe.rs`] and [`sql_query.rs`] examples #[tokio::main] async fn main() -> Result<()> { + csv_opener().await?; + json_opener().await?; + Ok(()) +} + +async fn csv_opener() -> Result<()> { let object_store = Arc::new(LocalFileSystem::new()); let schema = aggr_test_schema(); @@ -59,18 +67,17 @@ async fn main() -> Result<()> { let path = std::path::Path::new(&path).canonicalize()?; - let scan_config = - FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema.clone()) - .with_projection(Some(vec![12, 0])) - .with_limit(Some(5)) - .with_file(PartitionedFile::new(path.display().to_string(), 10)); - - let result = - FileStream::new(&scan_config, 0, opener, &ExecutionPlanMetricsSet::new()) - .unwrap() - .map(|b| b.unwrap()) - .collect::>() - .await; + let scan_config = FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema) + .with_projection(Some(vec![12, 0])) + .with_limit(Some(5)) + .with_file(PartitionedFile::new(path.display().to_string(), 10)); + + let mut result = vec![]; + let mut stream = + FileStream::new(&scan_config, 0, opener, &ExecutionPlanMetricsSet::new())?; + while let Some(batch) = stream.next().await.transpose()? { + result.push(batch); + } assert_batches_eq!( &[ "+--------------------------------+----+", @@ -87,3 +94,54 @@ async fn main() -> Result<()> { ); Ok(()) } + +async fn json_opener() -> Result<()> { + let object_store = InMemory::new(); + let path = object_store::path::Path::from("demo.json"); + let data = bytes::Bytes::from( + r#"{"num":5,"str":"test"} + {"num":2,"str":"hello"} + {"num":4,"str":"foo"}"#, + ); + + object_store.put(&path, data.into()).await?; + + let schema = Arc::new(Schema::new(vec![ + Field::new("num", DataType::Int64, false), + Field::new("str", DataType::Utf8, false), + ])); + + let projected = Arc::new(schema.clone().project(&[1, 0])?); + + let opener = JsonOpener::new( + 8192, + projected, + FileCompressionType::UNCOMPRESSED, + Arc::new(object_store), + ); + + let scan_config = FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema) + .with_projection(Some(vec![1, 0])) + .with_limit(Some(5)) + .with_file(PartitionedFile::new(path.to_string(), 10)); + + let mut stream = + FileStream::new(&scan_config, 0, opener, &ExecutionPlanMetricsSet::new())?; + let mut result = vec![]; + while let Some(batch) = stream.next().await.transpose()? { + result.push(batch); + } + assert_batches_eq!( + &[ + "+-------+-----+", + "| str | num |", + "+-------+-----+", + "| test | 5 |", + "| hello | 2 |", + "| foo | 4 |", + "+-------+-----+", + ], + &result + ); + Ok(()) +} diff --git a/datafusion-examples/examples/json_opener.rs b/datafusion-examples/examples/json_opener.rs deleted file mode 100644 index 7bc431c5c5ee..000000000000 --- a/datafusion-examples/examples/json_opener.rs +++ /dev/null @@ -1,88 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::{sync::Arc, vec}; - -use arrow_schema::{DataType, Field, Schema}; -use datafusion::{ - assert_batches_eq, - datasource::{ - file_format::file_compression_type::FileCompressionType, - listing::PartitionedFile, - object_store::ObjectStoreUrl, - physical_plan::{FileScanConfig, FileStream, JsonOpener}, - }, - error::Result, - physical_plan::metrics::ExecutionPlanMetricsSet, -}; - -use futures::StreamExt; -use object_store::ObjectStore; - -/// This example demonstrates a scanning against an Arrow data source (JSON) and -/// fetching results -#[tokio::main] -async fn main() -> Result<()> { - let object_store = object_store::memory::InMemory::new(); - let path = object_store::path::Path::from("demo.json"); - let data = bytes::Bytes::from( - r#"{"num":5,"str":"test"} - {"num":2,"str":"hello"} - {"num":4,"str":"foo"}"#, - ); - object_store.put(&path, data.into()).await.unwrap(); - - let schema = Arc::new(Schema::new(vec![ - Field::new("num", DataType::Int64, false), - Field::new("str", DataType::Utf8, false), - ])); - - let projected = Arc::new(schema.clone().project(&[1, 0])?); - - let opener = JsonOpener::new( - 8192, - projected, - FileCompressionType::UNCOMPRESSED, - Arc::new(object_store), - ); - - let scan_config = - FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema.clone()) - .with_projection(Some(vec![1, 0])) - .with_limit(Some(5)) - .with_file(PartitionedFile::new(path.to_string(), 10)); - - let result = - FileStream::new(&scan_config, 0, opener, &ExecutionPlanMetricsSet::new()) - .unwrap() - .map(|b| b.unwrap()) - .collect::>() - .await; - assert_batches_eq!( - &[ - "+-------+-----+", - "| str | num |", - "+-------+-----+", - "| test | 5 |", - "| hello | 2 |", - "| foo | 4 |", - "+-------+-----+", - ], - &result - ); - Ok(()) -}