Add StreamProvider for configuring StreamTable (apache#10600)
* Start setting up new StreamTable config

* Cleanup

* Cleanup

* Fix some tests

* Cleanup

* Start adding example

* Feedback
matthewmturner authored and findepi committed Jul 16, 2024
1 parent 24facbf commit d825dba
Showing 8 changed files with 311 additions and 50 deletions.
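At a glance, the pieces added here compose as follows: a `FileStreamProvider` (an implementation of the new `StreamProvider` trait) describes how to read a stream from a file or FIFO, a `StreamConfig` wraps the provider with table-level settings such as ordering, and a `StreamTable` exposes the result as a `TableProvider`. A condensed sketch of the wiring, drawn from the example added below (the function name is illustrative; the calls match the diff):

use std::path::PathBuf;
use std::sync::Arc;

use arrow_schema::SchemaRef;
use datafusion::datasource::stream::{FileStreamProvider, StreamConfig, StreamTable};
use datafusion::datasource::TableProvider;
use datafusion_expr::Expr;

// Wire a file/FIFO path into the provider -> config -> table chain
fn stream_table(
    schema: SchemaRef,
    path: PathBuf,
    order: Vec<Vec<Expr>>,
) -> Arc<dyn TableProvider> {
    let source = FileStreamProvider::new_file(schema, path).with_header(true);
    let config = StreamConfig::new(Arc::new(source)).with_order(order);
    Arc::new(StreamTable::new(Arc::new(config)))
}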
3 changes: 3 additions & 0 deletions datafusion-examples/Cargo.toml
@@ -80,3 +80,6 @@
tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] }
tonic = "0.11"
url = { workspace = true }
uuid = "1.7"

[target.'cfg(not(target_os = "windows"))'.dev-dependencies]
nix = { version = "0.28.0", features = ["fs"] }
1 change: 1 addition & 0 deletions datafusion-examples/README.md
@@ -56,6 +56,7 @@
cargo run --example csv_sql
- [`dataframe_output.rs`](examples/dataframe_output.rs): Examples of methods which write data out from a DataFrame
- [`deserialize_to_struct.rs`](examples/deserialize_to_struct.rs): Convert query results into rust structs using serde
- [`expr_api.rs`](examples/expr_api.rs): Create, execute, simplify and analyze `Expr`s
- [`file_stream_provider.rs`](examples/file_stream_provider.rs): Run a query on a `FileStreamProvider`, which implements the `StreamProvider` trait for reading from and writing to arbitrary stream sources/sinks
- [`flight_sql_server.rs`](examples/flight/flight_sql_server.rs): Run DataFusion as a standalone process and execute SQL queries from JDBC clients
- [`function_factory.rs`](examples/function_factory.rs): Register `CREATE FUNCTION` handler to implement SQL macros
- [`make_date.rs`](examples/make_date.rs): Examples of using the make_date function
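Like the other examples in this list, the new one is run with cargo. Note (from the Cargo.toml change above) that it relies on the `nix` crate for FIFO creation, so it is only built on non-Windows targets:

cargo run --example file_stream_provider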
186 changes: 186 additions & 0 deletions datafusion-examples/examples/file_stream_provider.rs
@@ -0,0 +1,186 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use datafusion::assert_batches_eq;
use datafusion_common::instant::Instant;
use std::fs::{File, OpenOptions};
use std::io::Write;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

use arrow::datatypes::{DataType, Field, Schema};
use arrow_schema::SchemaRef;
use futures::StreamExt;
use nix::sys::stat;
use nix::unistd;
use tempfile::TempDir;
use tokio::task::JoinSet;

use datafusion::datasource::stream::{FileStreamProvider, StreamConfig, StreamTable};
use datafusion::datasource::TableProvider;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_common::{exec_err, Result};
use datafusion_expr::Expr;

// Target number of rows per record batch
const TEST_BATCH_SIZE: usize = 5;
// Number of data lines written to the FIFO
const TEST_DATA_SIZE: usize = 5;

/// Makes a `TableProvider` for a FIFO file, using a `StreamTable` backed by a
/// `FileStreamProvider` (an implementation of the `StreamProvider` trait)
fn fifo_table(
    schema: SchemaRef,
    path: impl Into<PathBuf>,
    sort: Vec<Vec<Expr>>,
) -> Arc<dyn TableProvider> {
    let source = FileStreamProvider::new_file(schema, path.into())
        .with_batch_size(TEST_BATCH_SIZE)
        .with_header(true);
    let config = StreamConfig::new(Arc::new(source)).with_order(sort);
    Arc::new(StreamTable::new(Arc::new(config)))
}

fn create_fifo_file(tmp_dir: &TempDir, file_name: &str) -> Result<PathBuf> {
    let file_path = tmp_dir.path().join(file_name);
    // Simulate an unbounded data source via a FIFO (named pipe) file
    if let Err(e) = unistd::mkfifo(&file_path, stat::Mode::S_IRWXU) {
        exec_err!("{}", e)
    } else {
        Ok(file_path)
    }
}

fn write_to_fifo(
    mut file: &File,
    line: &str,
    ref_time: Instant,
    broken_pipe_timeout: Duration,
) -> Result<()> {
    // We need to handle the broken pipe error until the reader is ready, which
    // is why a timeout limits how long we wait for the reader. If the error is
    // anything other than a broken pipe, we fail immediately.
    while let Err(e) = file.write_all(line.as_bytes()) {
        // Raw OS error 32 is EPIPE (broken pipe) on POSIX systems
        if e.raw_os_error().unwrap() == 32 {
            let interval = Instant::now().duration_since(ref_time);
            if interval < broken_pipe_timeout {
                thread::sleep(Duration::from_millis(100));
                continue;
            }
        }
        return exec_err!("{}", e);
    }
    Ok(())
}

fn create_writing_thread(
    file_path: PathBuf,
    maybe_header: Option<String>,
    lines: Vec<String>,
    waiting_lock: Arc<AtomicBool>,
    wait_until: usize,
    tasks: &mut JoinSet<()>,
) {
    // Time limit on retrying BrokenPipe errors before giving up
    let broken_pipe_timeout = Duration::from_secs(10);
    let sa = file_path.clone();
    // Spawn a new thread to write to the FIFO file
    #[allow(clippy::disallowed_methods)] // spawn allowed only in tests
    tasks.spawn_blocking(move || {
        let file = OpenOptions::new().write(true).open(sa).unwrap();
        // Reference time to use when deciding to fail the test
        let execution_start = Instant::now();
        if let Some(header) = maybe_header {
            write_to_fifo(&file, &header, execution_start, broken_pipe_timeout).unwrap();
        }
        for (cnt, line) in lines.iter().enumerate() {
            // While the reader holds `waiting_lock`, pause before writing any
            // line past index `wait_until`
            while waiting_lock.load(Ordering::SeqCst) && cnt > wait_until {
                thread::sleep(Duration::from_millis(50));
            }
            write_to_fifo(&file, line, execution_start, broken_pipe_timeout).unwrap();
        }
        drop(file);
    });
}

/// This example demonstrates registering a FIFO file as a `StreamTable` via
/// `FileStreamProvider`, running a streaming SQL query against it, and
/// fetching the results
#[tokio::main]
async fn main() -> Result<()> {
    // Create session context
    let config = SessionConfig::new()
        .with_batch_size(TEST_BATCH_SIZE)
        .with_collect_statistics(false)
        .with_target_partitions(1);
    let ctx = SessionContext::new_with_config(config);
    let tmp_dir = TempDir::new()?;
    let fifo_path = create_fifo_file(&tmp_dir, "fifo_unbounded.csv")?;

    let mut tasks: JoinSet<()> = JoinSet::new();
    // Gates the writer in `create_writing_thread`; since `wait_until` is set
    // to TEST_DATA_SIZE below, the writer never actually pauses here
    let waiting = Arc::new(AtomicBool::new(true));

    let data_iter = 0..TEST_DATA_SIZE;
    let lines = data_iter
        .map(|i| format!("{},{}\n", i, i + 1))
        .collect::<Vec<_>>();

    create_writing_thread(
        fifo_path.clone(),
        Some("a1,a2\n".to_owned()),
        lines.clone(),
        waiting.clone(),
        TEST_DATA_SIZE,
        &mut tasks,
    );

    // Create schema
    let schema = Arc::new(Schema::new(vec![
        Field::new("a1", DataType::UInt32, false),
        Field::new("a2", DataType::UInt32, false),
    ]));

    // Specify the ordering:
    let order = vec![vec![datafusion_expr::col("a1").sort(true, false)]];

    let provider = fifo_table(schema.clone(), fifo_path, order.clone());
    ctx.register_table("fifo", provider)?;

    let df = ctx.sql("SELECT * FROM fifo").await.unwrap();
    let mut stream = df.execute_stream().await.unwrap();

    // Read the first batch from the stream; with TEST_BATCH_SIZE rows per
    // batch, it holds all TEST_DATA_SIZE lines written to the FIFO
    let mut batches = Vec::new();
    if let Some(Ok(batch)) = stream.next().await {
        batches.push(batch)
    }

    let expected = vec![
        "+----+----+",
        "| a1 | a2 |",
        "+----+----+",
        "| 0  | 1  |",
        "| 1  | 2  |",
        "| 2  | 3  |",
        "| 3  | 4  |",
        "| 4  | 5  |",
        "+----+----+",
    ];

    assert_batches_eq!(&expected, &batches);

    Ok(())
}