From d825dbaef34fb3ed2dabd5ba2d1e7457bb713404 Mon Sep 17 00:00:00 2001
From: Matthew Turner
Date: Thu, 6 Jun 2024 08:32:19 -0400
Subject: [PATCH] Add `StreamProvider` for configuring `StreamTable` (#10600)

* Start setting up new StreamTable config

* Cleanup

* Cleanup

* Fix some tests

* Cleanup

* Start adding example

* Feedback
---
 datafusion-examples/Cargo.toml                 |   3 +
 datafusion-examples/README.md                  |   1 +
 .../examples/file_stream_provider.rs           | 186 ++++++++++++++++++
 datafusion/core/src/datasource/stream.rs       | 140 +++++++++----
 .../core/src/physical_optimizer/test_utils.rs  |   5 +-
 datafusion/core/src/test_util/mod.rs           |   6 +-
 datafusion/core/tests/fifo/mod.rs              |   6 +-
 datafusion/core/tests/sql/joins.rs             |  14 +-
 8 files changed, 311 insertions(+), 50 deletions(-)
 create mode 100644 datafusion-examples/examples/file_stream_provider.rs

diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml
index 0074a2b8d40cf..0bcf7c1afc15f 100644
--- a/datafusion-examples/Cargo.toml
+++ b/datafusion-examples/Cargo.toml
@@ -80,3 +80,6 @@ tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] }
 tonic = "0.11"
 url = { workspace = true }
 uuid = "1.7"
+
+[target.'cfg(not(target_os = "windows"))'.dev-dependencies]
+nix = { version = "0.28.0", features = ["fs"] }
diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md
index a5395ea7aab33..c34f706adb827 100644
--- a/datafusion-examples/README.md
+++ b/datafusion-examples/README.md
@@ -56,6 +56,7 @@ cargo run --example csv_sql
 - [`dataframe_output.rs`](examples/dataframe_output.rs): Examples of methods which write data out from a DataFrame
 - [`deserialize_to_struct.rs`](examples/deserialize_to_struct.rs): Convert query results into rust structs using serde
 - [`expr_api.rs`](examples/expr_api.rs): Create, execute, simplify and analyze `Expr`s
+- [`file_stream_provider.rs`](examples/file_stream_provider.rs): Run a query on a `FileStreamProvider`, which implements the `StreamProvider` trait for reading from and writing to arbitrary stream sources and sinks
 - [`flight_sql_server.rs`](examples/flight/flight_sql_server.rs): Run DataFusion as a standalone process and execute SQL queries from JDBC clients
 - [`function_factory.rs`](examples/function_factory.rs): Register `CREATE FUNCTION` handler to implement SQL macros
 - [`make_date.rs`](examples/make_date.rs): Examples of using the make_date function
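The new example exercises both halves of the refactor: `FileStreamProvider` owns the transport-level options (schema, location, encoding, batch size, CSV header), while `StreamConfig` owns the table-level concerns (sort order, constraints). A minimal sketch of the construction flow, using illustrative names and a placeholder path (`/tmp/stream.csv` is not part of the patch):

```rust
use std::sync::Arc;

use arrow_schema::SchemaRef;
use datafusion::datasource::stream::{FileStreamProvider, StreamConfig, StreamTable};

/// Build a `StreamTable` with the new two-step configuration.
fn build_stream_table(schema: SchemaRef) -> Arc<StreamTable> {
    // Transport-specific options live on the provider ...
    let source = FileStreamProvider::new_file(schema, "/tmp/stream.csv".into())
        .with_batch_size(1024)
        .with_header(true);
    // ... while ordering and constraints live on the config.
    let config = StreamConfig::new(Arc::new(source));
    Arc::new(StreamTable::new(Arc::new(config)))
}
```

The full example below additionally registers the table with a `SessionContext` and feeds it from a FIFO writer thread.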
diff --git a/datafusion-examples/examples/file_stream_provider.rs b/datafusion-examples/examples/file_stream_provider.rs
new file mode 100644
index 0000000000000..4e79f9afc2ca1
--- /dev/null
+++ b/datafusion-examples/examples/file_stream_provider.rs
@@ -0,0 +1,186 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion::assert_batches_eq;
+use datafusion_common::instant::Instant;
+use std::fs::{File, OpenOptions};
+use std::io::Write;
+use std::path::PathBuf;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
+use std::thread;
+use std::time::Duration;
+
+use arrow::datatypes::{DataType, Field, Schema};
+use arrow_schema::SchemaRef;
+use futures::StreamExt;
+use nix::sys::stat;
+use nix::unistd;
+use tempfile::TempDir;
+use tokio::task::JoinSet;
+
+use datafusion::datasource::stream::{FileStreamProvider, StreamConfig, StreamTable};
+use datafusion::datasource::TableProvider;
+use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion_common::{exec_err, Result};
+use datafusion_expr::Expr;
+
+// Number of rows per batch read from the FIFO
+const TEST_BATCH_SIZE: usize = 5;
+// Number of data lines written to the FIFO
+const TEST_DATA_SIZE: usize = 5;
+
+/// Makes a TableProvider for a FIFO file using `StreamTable` with the `StreamProvider` trait
+fn fifo_table(
+    schema: SchemaRef,
+    path: impl Into<PathBuf>,
+    sort: Vec<Vec<Expr>>,
+) -> Arc<dyn TableProvider> {
+    let source = FileStreamProvider::new_file(schema, path.into())
+        .with_batch_size(TEST_BATCH_SIZE)
+        .with_header(true);
+    let config = StreamConfig::new(Arc::new(source)).with_order(sort);
+    Arc::new(StreamTable::new(Arc::new(config)))
+}
+
+fn create_fifo_file(tmp_dir: &TempDir, file_name: &str) -> Result<PathBuf> {
+    let file_path = tmp_dir.path().join(file_name);
+    // Simulate an infinite environment via a FIFO file
+    if let Err(e) = unistd::mkfifo(&file_path, stat::Mode::S_IRWXU) {
+        exec_err!("{}", e)
+    } else {
+        Ok(file_path)
+    }
+}
+
+fn write_to_fifo(
+    mut file: &File,
+    line: &str,
+    ref_time: Instant,
+    broken_pipe_timeout: Duration,
+) -> Result<()> {
+    // We need to handle the broken pipe error until the reader is ready. This
+    // is why we use a timeout to limit the wait duration for the reader.
+    // If the error is different than broken pipe, we fail immediately.
+    while let Err(e) = file.write_all(line.as_bytes()) {
+        if e.raw_os_error().unwrap() == 32 {
+            let interval = Instant::now().duration_since(ref_time);
+            if interval < broken_pipe_timeout {
+                thread::sleep(Duration::from_millis(100));
+                continue;
+            }
+        }
+        return exec_err!("{}", e);
+    }
+    Ok(())
+}
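Matching on raw OS error 32 works because 32 is `EPIPE` on Linux, but the same check can be expressed through `std::io::ErrorKind` without hard-coding an errno. A minimal sketch of the portable form (not part of the patch; behavior is equivalent on Unix):

```rust
use std::io::ErrorKind;

/// True when a write failed because the FIFO has no connected reader yet.
fn is_broken_pipe(e: &std::io::Error) -> bool {
    e.kind() == ErrorKind::BrokenPipe
}
```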
+fn create_writing_thread(
+    file_path: PathBuf,
+    maybe_header: Option<String>,
+    lines: Vec<String>,
+    waiting_lock: Arc<AtomicBool>,
+    wait_until: usize,
+    tasks: &mut JoinSet<()>,
+) {
+    // Timeout for a long period of BrokenPipe error
+    let broken_pipe_timeout = Duration::from_secs(10);
+    let sa = file_path.clone();
+    // Spawn a new thread to write to the FIFO file
+    #[allow(clippy::disallowed_methods)] // spawn allowed only in tests
+    tasks.spawn_blocking(move || {
+        let file = OpenOptions::new().write(true).open(sa).unwrap();
+        // Reference time to use when deciding to fail the test
+        let execution_start = Instant::now();
+        if let Some(header) = maybe_header {
+            write_to_fifo(&file, &header, execution_start, broken_pipe_timeout).unwrap();
+        }
+        for (cnt, line) in lines.iter().enumerate() {
+            while waiting_lock.load(Ordering::SeqCst) && cnt > wait_until {
+                thread::sleep(Duration::from_millis(50));
+            }
+            write_to_fifo(&file, line, execution_start, broken_pipe_timeout).unwrap();
+        }
+        drop(file);
+    });
+}
+
+/// This example demonstrates running a streaming query over a CSV-encoded FIFO
+/// file registered as a `StreamTable` and fetching the results
+#[tokio::main]
+async fn main() -> Result<()> {
+    // Create session context
+    let config = SessionConfig::new()
+        .with_batch_size(TEST_BATCH_SIZE)
+        .with_collect_statistics(false)
+        .with_target_partitions(1);
+    let ctx = SessionContext::new_with_config(config);
+    let tmp_dir = TempDir::new()?;
+    let fifo_path = create_fifo_file(&tmp_dir, "fifo_unbounded.csv")?;
+
+    let mut tasks: JoinSet<()> = JoinSet::new();
+    let waiting = Arc::new(AtomicBool::new(true));
+
+    let data_iter = 0..TEST_DATA_SIZE;
+    let lines = data_iter
+        .map(|i| format!("{},{}\n", i, i + 1))
+        .collect::<Vec<_>>();
+
+    create_writing_thread(
+        fifo_path.clone(),
+        Some("a1,a2\n".to_owned()),
+        lines.clone(),
+        waiting.clone(),
+        TEST_DATA_SIZE,
+        &mut tasks,
+    );
+
+    // Create schema
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("a1", DataType::UInt32, false),
+        Field::new("a2", DataType::UInt32, false),
+    ]));
+
+    // Specify the ordering:
+    let order = vec![vec![datafusion_expr::col("a1").sort(true, false)]];
+
+    let provider = fifo_table(schema.clone(), fifo_path, order.clone());
+    ctx.register_table("fifo", provider)?;
+
+    let df = ctx.sql("SELECT * FROM fifo").await.unwrap();
+    let mut stream = df.execute_stream().await.unwrap();
+
+    let mut batches = Vec::new();
+    if let Some(Ok(batch)) = stream.next().await {
+        batches.push(batch)
+    }
+
+    let expected = vec![
+        "+----+----+",
+        "| a1 | a2 |",
+        "+----+----+",
+        "| 0  | 1  |",
+        "| 1  | 2  |",
+        "| 2  | 3  |",
+        "| 3  | 4  |",
+        "| 4  | 5  |",
+        "+----+----+",
+    ];
+
+    assert_batches_eq!(&expected, &batches);
+
+    Ok(())
+}
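The `stream.rs` diff below removes `StreamConfig::new_file` in favor of the `FileStreamProvider` plus `StreamConfig::new` pair. For downstream code the rewrite is mechanical; a sketch of the before and after, with an illustrative batch size (the old form is quoted from the removed API):

```rust
use std::path::PathBuf;
use std::sync::Arc;

use arrow_schema::SchemaRef;
use datafusion::datasource::stream::{FileStreamProvider, StreamConfig, StreamTable};
use datafusion_expr::Expr;

// Before this patch (API removed below):
//     let config = StreamConfig::new_file(schema, path)
//         .with_batch_size(8192)
//         .with_header(true)
//         .with_order(order);
// After: transport options move to the provider; ordering stays on the config.
fn migrate(schema: SchemaRef, path: PathBuf, order: Vec<Vec<Expr>>) -> StreamTable {
    let source = FileStreamProvider::new_file(schema, path)
        .with_batch_size(8192)
        .with_header(true);
    let config = StreamConfig::new(Arc::new(source)).with_order(order);
    StreamTable::new(Arc::new(config))
}
```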
diff --git a/datafusion/core/src/datasource/stream.rs b/datafusion/core/src/datasource/stream.rs
index bcce3c1b64226..9cfdb7bb1168f 100644
--- a/datafusion/core/src/datasource/stream.rs
+++ b/datafusion/core/src/datasource/stream.rs
@@ -71,11 +71,13 @@ impl TableProviderFactory for StreamTableFactory {
         );
     };
 
-        let config = StreamConfig::new_file(schema, location.into())
+        let source = FileStreamProvider::new_file(schema, location.into())
             .with_encoding(encoding)
-            .with_order(cmd.order_exprs.clone())
             .with_batch_size(state.config().batch_size())
-            .with_header(header)
+            .with_header(header);
+
+        let config = StreamConfig::new(Arc::new(source))
+            .with_order(cmd.order_exprs.clone())
             .with_constraints(cmd.constraints.clone());
 
         Ok(Arc::new(StreamTable(Arc::new(config))))
@@ -103,19 +105,44 @@ impl FromStr for StreamEncoding {
     }
 }
 
-/// The configuration for a [`StreamTable`]
+/// The StreamProvider trait is used as a generic interface for reading from and writing to
+/// streaming data sources (such as FIFO, WebSocket, Kafka, etc.). Implementations are
+/// responsible for providing a `RecordBatchReader` and optionally a `RecordBatchWriter`.
+pub trait StreamProvider: std::fmt::Debug + Send + Sync {
+    /// Get a reference to the schema for this stream
+    fn schema(&self) -> &SchemaRef;
+    /// Provide a `RecordBatchReader`
+    fn reader(&self) -> Result<Box<dyn RecordBatchReader>>;
+    /// Provide a `RecordBatchWriter`
+    fn writer(&self) -> Result<Box<dyn RecordBatchWriter>> {
+        unimplemented!()
+    }
+    /// Display implementation when used as a DataSink
+    fn stream_write_display(
+        &self,
+        t: DisplayFormatType,
+        f: &mut Formatter,
+    ) -> std::fmt::Result;
+}
+
+/// Stream data from the file at `location`
+///
+/// * Data will be read sequentially from the provided `location`
+/// * New data will be appended to the end of the file
+///
+/// The encoding can be configured with [`Self::with_encoding`] and
+/// defaults to [`StreamEncoding::Csv`]
 #[derive(Debug)]
-pub struct StreamConfig {
-    schema: SchemaRef,
+pub struct FileStreamProvider {
     location: PathBuf,
-    batch_size: usize,
     encoding: StreamEncoding,
+    /// The schema for this file stream
+    pub schema: SchemaRef,
     header: bool,
-    order: Vec<Vec<Expr>>,
-    constraints: Constraints,
+    batch_size: usize,
 }
 
-impl StreamConfig {
+impl FileStreamProvider {
     /// Stream data from the file at `location`
     ///
     /// * Data will be read sequentially from the provided `location`
@@ -129,19 +156,11 @@ impl StreamConfig {
             location,
             batch_size: 1024,
             encoding: StreamEncoding::Csv,
-            order: vec![],
             header: false,
-            constraints: Constraints::empty(),
         }
     }
 
-    /// Specify a sort order for the stream
-    pub fn with_order(mut self, order: Vec<Vec<Expr>>) -> Self {
-        self.order = order;
-        self
-    }
-
-    /// Specify the batch size
+    /// Set the batch size (the number of rows to load at one time)
     pub fn with_batch_size(mut self, batch_size: usize) -> Self {
         self.batch_size = batch_size;
         self
@@ -158,11 +177,11 @@ impl StreamConfig {
         self.encoding = encoding;
         self
     }
+}
 
-    /// Assign constraints
-    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
-        self.constraints = constraints;
-        self
+impl StreamProvider for FileStreamProvider {
+    fn schema(&self) -> &SchemaRef {
+        &self.schema
     }
 
     fn reader(&self) -> Result<Box<dyn RecordBatchReader>> {
@@ -210,6 +229,58 @@ impl StreamConfig {
             }
         }
     }
+
+    fn stream_write_display(
+        &self,
+        _t: DisplayFormatType,
+        f: &mut Formatter,
+    ) -> std::fmt::Result {
+        f.debug_struct("StreamWrite")
+            .field("location", &self.location)
+            .field("batch_size", &self.batch_size)
+            .field("encoding", &self.encoding)
+            .field("header", &self.header)
+            .finish_non_exhaustive()
+    }
+}
+
+/// The configuration for a [`StreamTable`]
+#[derive(Debug)]
+pub struct StreamConfig {
+    source: Arc<dyn StreamProvider>,
+    order: Vec<Vec<Expr>>,
+    constraints: Constraints,
+}
+
+impl StreamConfig {
+    /// Create a new `StreamConfig` from a `StreamProvider`
+    pub fn new(source: Arc<dyn StreamProvider>) -> Self {
+        Self {
+            source,
+            order: vec![],
+            constraints: Constraints::empty(),
+        }
+    }
+
+    /// Specify a sort order for the stream
+    pub fn with_order(mut self, order: Vec<Vec<Expr>>) -> Self {
+        self.order = order;
+        self
+    }
+
+    /// Assign constraints
+    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
+        self.constraints = constraints;
+        self
+    }
+
+    fn reader(&self) -> Result<Box<dyn RecordBatchReader>> {
+        self.source.reader()
+    }
+
+    fn writer(&self) -> Result<Box<dyn RecordBatchWriter>> {
+        self.source.writer()
+    }
 }
 
 /// A [`TableProvider`] for an unbounded stream source
@@ -238,7 +309,7 @@ impl TableProvider for StreamTable {
     }
 
     fn schema(&self) -> SchemaRef {
-        self.0.schema.clone()
+        self.0.source.schema().clone()
     }
 
     fn constraints(&self) -> Option<&Constraints> {
@@ -258,14 +329,14 @@ impl TableProvider for StreamTable {
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let projected_schema = match projection {
             Some(p) => {
-                let projected = self.0.schema.project(p)?;
+                let projected = self.0.source.schema().project(p)?;
                 create_ordering(&projected, &self.0.order)?
             }
-            None => create_ordering(self.0.schema.as_ref(), &self.0.order)?,
+            None => create_ordering(self.0.source.schema(), &self.0.order)?,
         };
 
         Ok(Arc::new(StreamingTableExec::try_new(
-            self.0.schema.clone(),
+            self.0.source.schema().clone(),
             vec![Arc::new(StreamRead(self.0.clone())) as _],
             projection,
             projected_schema,
@@ -282,7 +353,7 @@ impl TableProvider for StreamTable {
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let ordering = match self.0.order.first() {
             Some(x) => {
-                let schema = self.0.schema.as_ref();
+                let schema = self.0.source.schema();
                 let orders = create_ordering(schema, std::slice::from_ref(x))?;
                 let ordering = orders.into_iter().next().unwrap();
                 Some(ordering.into_iter().map(Into::into).collect())
@@ -293,7 +364,7 @@ impl TableProvider for StreamTable {
         Ok(Arc::new(DataSinkExec::new(
             input,
             Arc::new(StreamWrite(self.0.clone())),
-            self.0.schema.clone(),
+            self.0.source.schema().clone(),
             ordering,
         )))
     }
@@ -303,12 +374,12 @@ struct StreamRead(Arc<StreamConfig>);
 
 impl PartitionStream for StreamRead {
     fn schema(&self) -> &SchemaRef {
-        &self.0.schema
+        self.0.source.schema()
     }
 
     fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
         let config = self.0.clone();
-        let schema = self.0.schema.clone();
+        let schema = self.0.source.schema().clone();
         let mut builder = RecordBatchReceiverStreamBuilder::new(schema, 2);
         let tx = builder.tx();
         builder.spawn_blocking(move || {
@@ -329,12 +400,7 @@ struct StreamWrite(Arc<StreamConfig>);
 
 impl DisplayAs for StreamWrite {
     fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
-        f.debug_struct("StreamWrite")
-            .field("location", &self.0.location)
-            .field("batch_size", &self.0.batch_size)
-            .field("encoding", &self.0.encoding)
-            .field("header", &self.0.header)
-            .finish_non_exhaustive()
+        self.0.source.stream_write_display(_t, f)
     }
 }
 
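With `StreamProvider` public, `StreamConfig` needs only an `Arc<dyn StreamProvider>`, so sources other than files can back a `StreamTable`. A hedged sketch of a read-only provider that replays in-memory batches; `InMemoryStreamProvider` is hypothetical (not part of this patch), and the reader is built on `arrow_array`'s `RecordBatchIterator`:

```rust
use std::fmt::Formatter;
use std::sync::Arc;

use arrow_array::{RecordBatch, RecordBatchIterator, RecordBatchReader};
use arrow_schema::SchemaRef;
use datafusion::datasource::stream::{StreamConfig, StreamProvider, StreamTable};
use datafusion::physical_plan::DisplayFormatType;
use datafusion_common::Result;

/// Hypothetical provider that replays a fixed set of in-memory batches.
#[derive(Debug)]
struct InMemoryStreamProvider {
    schema: SchemaRef,
    batches: Vec<RecordBatch>,
}

impl StreamProvider for InMemoryStreamProvider {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    fn reader(&self) -> Result<Box<dyn RecordBatchReader>> {
        // Each scan replays the captured batches from the beginning.
        let batches = self.batches.clone().into_iter().map(Ok);
        Ok(Box::new(RecordBatchIterator::new(batches, self.schema.clone())))
    }

    // `writer` keeps the default `unimplemented!()` body: this source is read-only.

    fn stream_write_display(
        &self,
        _t: DisplayFormatType,
        f: &mut Formatter,
    ) -> std::fmt::Result {
        f.debug_struct("InMemoryStreamWrite").finish_non_exhaustive()
    }
}

/// The provider wires into `StreamTable` exactly like `FileStreamProvider`.
fn in_memory_stream_table(schema: SchemaRef, batches: Vec<RecordBatch>) -> StreamTable {
    let source = InMemoryStreamProvider { schema, batches };
    StreamTable::new(Arc::new(StreamConfig::new(Arc::new(source))))
}
```

A provider that also implements `writer` additionally becomes a valid `INSERT INTO` target through the `insert_into` path above, which wraps it in `DataSinkExec`.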
diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs b/datafusion/core/src/physical_optimizer/test_utils.rs
index cfd0312f813d8..5895c39a5f87d 100644
--- a/datafusion/core/src/physical_optimizer/test_utils.rs
+++ b/datafusion/core/src/physical_optimizer/test_utils.rs
@@ -21,7 +21,7 @@ use std::sync::Arc;
 
 use crate::datasource::listing::PartitionedFile;
 use crate::datasource::physical_plan::{FileScanConfig, ParquetExec};
-use crate::datasource::stream::{StreamConfig, StreamTable};
+use crate::datasource::stream::{FileStreamProvider, StreamConfig, StreamTable};
 use crate::error::Result;
 use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
 use crate::physical_plan::coalesce_batches::CoalesceBatchesExec;
@@ -62,7 +62,8 @@ async fn register_current_csv(
 
     match infinite {
         true => {
-            let config = StreamConfig::new_file(schema, path.into());
+            let source = FileStreamProvider::new_file(schema, path.into());
+            let config = StreamConfig::new(Arc::new(source));
             ctx.register_table(table_name, Arc::new(StreamTable::new(Arc::new(config))))?;
         }
         false => {
diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs
index 7aec66825de3b..e876cfe46547f 100644
--- a/datafusion/core/src/test_util/mod.rs
+++ b/datafusion/core/src/test_util/mod.rs
@@ -31,7 +31,7 @@ use std::task::{Context, Poll};
 
 use crate::dataframe::DataFrame;
 use crate::datasource::provider::TableProviderFactory;
-use crate::datasource::stream::{StreamConfig, StreamTable};
+use crate::datasource::stream::{FileStreamProvider, StreamConfig, StreamTable};
 use crate::datasource::{empty::EmptyTable, provider_as_source, TableProvider};
 use crate::error::Result;
 use crate::execution::context::{SessionState, TaskContext};
@@ -355,8 +355,8 @@ pub fn register_unbounded_file_with_ordering(
     table_name: &str,
     file_sort_order: Vec<Vec<Expr>>,
 ) -> Result<()> {
-    let config =
-        StreamConfig::new_file(schema, file_path.into()).with_order(file_sort_order);
+    let source = FileStreamProvider::new_file(schema, file_path.into());
+    let config = StreamConfig::new(Arc::new(source)).with_order(file_sort_order);
 
     // Register table:
     ctx.register_table(table_name, Arc::new(StreamTable::new(Arc::new(config))))?;
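Callers of this helper are unaffected; only the construction inside changes. A sketch of registering an unbounded, ordered CSV through it (the table name and schema here are illustrative, and the helper's leading parameters, which the hunk does not show, are assumed to be the session context, schema, and file path):

```rust
use std::path::Path;
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use datafusion::prelude::SessionContext;
use datafusion::test_util::register_unbounded_file_with_ordering;
use datafusion_common::Result;
use datafusion_expr::col;

fn register_example(ctx: &SessionContext, path: &Path) -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a1", DataType::UInt32, false),
        Field::new("a2", DataType::UInt32, false),
    ]));
    // Keep batches sorted by a1, matching the data the tests write to the FIFO.
    let order = vec![vec![col("a1").sort(true, false)]];
    register_unbounded_file_with_ordering(ctx, schema, path, "unbounded_csv", order)
}
```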
diff --git a/datafusion/core/tests/fifo/mod.rs b/datafusion/core/tests/fifo/mod.rs
index a63240d03d941..2e21abffab875 100644
--- a/datafusion/core/tests/fifo/mod.rs
+++ b/datafusion/core/tests/fifo/mod.rs
@@ -39,7 +39,7 @@ mod unix_test {
     use tempfile::TempDir;
     use tokio::task::{spawn_blocking, JoinHandle};
 
-    use datafusion::datasource::stream::{StreamConfig, StreamTable};
+    use datafusion::datasource::stream::{FileStreamProvider, StreamConfig, StreamTable};
     use datafusion::datasource::TableProvider;
     use datafusion::{
         prelude::{CsvReadOptions, SessionConfig, SessionContext},
@@ -54,10 +54,10 @@ mod unix_test {
         path: impl Into<PathBuf>,
         sort: Vec<Vec<Expr>>,
     ) -> Arc<dyn TableProvider> {
-        let config = StreamConfig::new_file(schema, path.into())
-            .with_order(sort)
+        let source = FileStreamProvider::new_file(schema, path.into())
             .with_batch_size(TEST_BATCH_SIZE)
             .with_header(true);
+        let config = StreamConfig::new(Arc::new(source)).with_order(sort);
         Arc::new(StreamTable::new(Arc::new(config)))
     }
 
diff --git a/datafusion/core/tests/sql/joins.rs b/datafusion/core/tests/sql/joins.rs
index f7d5205db0d3f..fad9b94b01120 100644
--- a/datafusion/core/tests/sql/joins.rs
+++ b/datafusion/core/tests/sql/joins.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use datafusion::datasource::stream::{StreamConfig, StreamTable};
+use datafusion::datasource::stream::{FileStreamProvider, StreamConfig, StreamTable};
 use datafusion::test_util::register_unbounded_file_with_ordering;
 
 use super::*;
@@ -166,12 +166,14 @@ async fn join_change_in_planner_without_sort() -> Result<()> {
         Field::new("a1", DataType::UInt32, false),
         Field::new("a2", DataType::UInt32, false),
     ]));
-    let left = StreamConfig::new_file(schema.clone(), left_file_path);
+    let left_source = FileStreamProvider::new_file(schema.clone(), left_file_path);
+    let left = StreamConfig::new(Arc::new(left_source));
     ctx.register_table("left", Arc::new(StreamTable::new(Arc::new(left))))?;
     let right_file_path = tmp_dir.path().join("right.csv");
     File::create(right_file_path.clone())?;
-    let right = StreamConfig::new_file(schema, right_file_path);
+    let right_source = FileStreamProvider::new_file(schema, right_file_path);
+    let right = StreamConfig::new(Arc::new(right_source));
     ctx.register_table("right", Arc::new(StreamTable::new(Arc::new(right))))?;
     let sql = "SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10";
     let dataframe = ctx.sql(sql).await?;
@@ -216,11 +218,13 @@ async fn join_change_in_planner_without_sort_not_allowed() -> Result<()> {
         Field::new("a1", DataType::UInt32, false),
         Field::new("a2", DataType::UInt32, false),
     ]));
-    let left = StreamConfig::new_file(schema.clone(), left_file_path);
+    let left_source = FileStreamProvider::new_file(schema.clone(), left_file_path);
+    let left = StreamConfig::new(Arc::new(left_source));
     ctx.register_table("left", Arc::new(StreamTable::new(Arc::new(left))))?;
     let right_file_path = tmp_dir.path().join("right.csv");
     File::create(right_file_path.clone())?;
-    let right = StreamConfig::new_file(schema.clone(), right_file_path);
+    let right_source = FileStreamProvider::new_file(schema.clone(), right_file_path);
+    let right = StreamConfig::new(Arc::new(right_source));
     ctx.register_table("right", Arc::new(StreamTable::new(Arc::new(right))))?;
     let df = ctx.sql("SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10").await?;
     match df.create_physical_plan().await {