diff --git a/integrations/object_store/Cargo.toml b/integrations/object_store/Cargo.toml index d7c78cd1598e..47e02079d8be 100644 --- a/integrations/object_store/Cargo.toml +++ b/integrations/object_store/Cargo.toml @@ -58,3 +58,5 @@ tokio = { version = "1", features = ["fs", "macros", "rt-multi-thread"] } anyhow = "1.0.86" libtest-mimic = "0.7.3" uuid = "1.11.0" +datafusion = "44.0.0" +url = "2.5.2" \ No newline at end of file diff --git a/integrations/object_store/README.md b/integrations/object_store/README.md index 115488f3b8b2..6f30e457bd69 100644 --- a/integrations/object_store/README.md +++ b/integrations/object_store/README.md @@ -21,13 +21,21 @@ This crate can help you to access 30 more storage services with the same object_ ## Examples +`opendal_store_opendal` depends on the `opendal` crate. Please make sure to always use the latest versions of both. + +latest `object_store_opendal` ![Crate](https://img.shields.io/crates/v/object_store_opendal.svg) + +latest `opendal` ![Crate](https://img.shields.io/crates/v/opendal.svg) + +### 1. using `object_store` API to access S3 + Add the following dependencies to your `Cargo.toml` with correct version: ```toml [dependencies] object_store = "0.11.0" -object_store_opendal = "0.47.0" -opendal = { version = "0.49.0", features = ["services-s3"] } +object_store_opendal = "xxx" # see the latest version above +opendal = { version = "xxx", features = ["services-s3"] } # see the latest version above ``` Build `OpendalStore` via `opendal::Operator`: @@ -78,6 +86,77 @@ async fn main() { } ``` +### 2. querying data in a S3 bucket using DataFusion + +Add the following dependencies to your `Cargo.toml` with correct version: + +```toml +[dependencies] +object_store = "0.11.0" +object_store_opendal = "xxx" # see the latest version above +opendal = { version = "xxx", features = ["services-s3"] } # see the latest version above +datafusion = "44.0.0" +url = "2.5.2" +``` + +Build `OpendalStore` via `opendal::Operator` and register it to `DataFusion`: + +```rust +use datafusion::error::DataFusionError; +use datafusion::error::Result; +use datafusion::prelude::*; +use opendal::services::S3; +use opendal::Operator; +use std::sync::Arc; +use url::Url; + + +#[tokio::main] +async fn main() -> Result<()> { + let ctx = SessionContext::new(); + + // Configure OpenDAL for S3 + let region = "my_region"; + let bucket_name = "my_bucket"; + let builder = S3::default() + .endpoint("my_endpoint") + .bucket(bucket_name) + .region(region) + .access_key_id("my_access_key") + .secret_access_key("my_secret_key"); + let op = Operator::new(builder) + .map_err(|err| DataFusionError::External(Box::new(err)))? + .finish(); + let store = object_store_opendal::OpendalStore::new(op); + + // Register the object store + let path = format!("s3://{bucket_name}"); + let s3_url = Url::parse(&path).unwrap(); + ctx.register_object_store(&s3_url, Arc::new(store)); + + // Register CSV file as a table + let path = format!("s3://{bucket_name}/csv/data.csv"); + ctx.register_csv("trips", &path, CsvReadOptions::default()) + .await?; + + // Execute the query + let df = ctx.sql("SELECT * FROM trips LIMIT 10").await?; + // Print the results + df.show().await?; + + // Dynamic query using the file path directly + let ctx = ctx.enable_url_table(); + let df = ctx + .sql(format!(r#"SELECT * FROM '{}' LIMIT 10"#, &path).as_str()) + .await?; + // Print the results + df.show().await?; + + Ok(()) +} +``` + + ## WASM support To build with `wasm32-unknown-unknown` target, you need to enable the `send_wrapper` feature: diff --git a/integrations/object_store/examples/datafusion.rs b/integrations/object_store/examples/datafusion.rs new file mode 100644 index 000000000000..e12a3f6477ca --- /dev/null +++ b/integrations/object_store/examples/datafusion.rs @@ -0,0 +1,52 @@ +use datafusion::error::DataFusionError; +use datafusion::error::Result; +use datafusion::prelude::*; +use opendal::services::S3; +use opendal::Operator; +use std::sync::Arc; +use url::Url; + +/// This example demonstrates querying data in a S3 bucket using DataFusion and `object_store_opendal` +#[tokio::main] +async fn main() -> Result<()> { + let ctx = SessionContext::new(); + + // Configure OpenDAL for S3 + let region = "my_region"; + let bucket_name = "my_bucket"; + let builder = S3::default() + .endpoint("my_endpoint") + .bucket(bucket_name) + .region(region) + .access_key_id("my_access_key") + .secret_access_key("my_secret_key"); + let op = Operator::new(builder) + .map_err(|err| DataFusionError::External(Box::new(err)))? + .finish(); + let store = object_store_opendal::OpendalStore::new(op); + + // Register the object store + let path = format!("s3://{bucket_name}"); + let s3_url = Url::parse(&path).unwrap(); + ctx.register_object_store(&s3_url, Arc::new(store)); + + // Register CSV file as a table + let path = format!("s3://{bucket_name}/csv/data.csv"); + ctx.register_csv("trips", &path, CsvReadOptions::default()) + .await?; + + // Execute the query + let df = ctx.sql("SELECT * FROM trips LIMIT 10").await?; + // Print the results + df.show().await?; + + // Dynamic query using the file path directly + let ctx = ctx.enable_url_table(); + let df = ctx + .sql(format!(r#"SELECT * FROM '{}' LIMIT 10"#, &path).as_str()) + .await?; + // Print the results + df.show().await?; + + Ok(()) +}