Skip to content

Commit

Permalink
Merge branch 'refs/heads/apache_main' into chore/single-source-exec
Browse files Browse the repository at this point in the history
# Conflicts:
#	datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
#	datafusion/physical-plan/src/memory.rs
  • Loading branch information
mertak-synnada committed Jan 10, 2025
2 parents 8040147 + 295ffb4 commit f540df0
Show file tree
Hide file tree
Showing 42 changed files with 2,115 additions and 444 deletions.
8 changes: 4 additions & 4 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

194 changes: 44 additions & 150 deletions datafusion-examples/examples/remote_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,47 +32,39 @@
use arrow::array::record_batch;
use arrow_schema::{Field, Fields, Schema, SchemaRef};
use async_trait::async_trait;
use datafusion::catalog::{SchemaProvider, TableProvider};
use datafusion::common::DataFusionError;
use datafusion::catalog::TableProvider;
use datafusion::common::Result;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::physical_plan::memory::MemorySourceConfig;
use datafusion::physical_plan::source::DataSourceExec;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::{DataFrame, SessionContext};
use datafusion_catalog::Session;
use datafusion_common::{
assert_batches_eq, internal_datafusion_err, plan_err, HashMap, TableReference,
};
use datafusion_catalog::{AsyncSchemaProvider, Session};
use datafusion_common::{assert_batches_eq, internal_datafusion_err, plan_err};
use datafusion_expr::{Expr, TableType};
use futures::TryStreamExt;
use std::any::Any;
use std::sync::{Arc, Mutex};
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<()> {
// As always, we create a session context to interact with DataFusion
let ctx = SessionContext::new();

// Make a connection to the remote catalog, asynchronously, and configure it
let remote_catalog_interface = RemoteCatalogInterface::connect().await?;
let remote_catalog_interface = Arc::new(RemoteCatalogInterface::connect().await?);

// Register a SchemaProvider for tables in a schema named "remote_schema".
//
// This will let DataFusion query tables such as
// `datafusion.remote_schema.remote_table`
let remote_schema: Arc<dyn SchemaProvider> =
Arc::new(RemoteSchema::new(Arc::new(remote_catalog_interface)));
ctx.catalog("datafusion")
.ok_or_else(|| internal_datafusion_err!("default catalog was not installed"))?
.register_schema("remote_schema", Arc::clone(&remote_schema))?;
// Create an adapter to provide the AsyncSchemaProvider interface to DataFusion
// based on our remote catalog interface
let remote_catalog_adapter = RemoteCatalogDatafusionAdapter(remote_catalog_interface);

// Here is a query that selects data from a table in the remote catalog.
let sql = "SELECT * from remote_schema.remote_table";

// The `SessionContext::sql` interface is async, but it does not
// support asynchronous access to catalogs, so the following query errors.
// support asynchronous access to catalogs, so we cannot register our schema provider
// directly and the following query fails to find our table.
let results = ctx.sql(sql).await;
assert_eq!(
results.unwrap_err().to_string(),
Expand All @@ -92,27 +84,26 @@ async fn main() -> Result<()> {
// `remote_schema.remote_table`)
let references = state.resolve_table_references(&statement)?;

// Call `load_tables` to load information from the remote catalog for each
// of the referenced tables. Best practice is to fetch the the information
// for all tables required by the query once (rather than one per table) to
// minimize network overhead
let table_names = references.iter().filter_map(|r| {
if refers_to_schema("datafusion", "remote_schema", r) {
Some(r.table())
} else {
None
}
});
remote_schema
.as_any()
.downcast_ref::<RemoteSchema>()
.expect("correct types")
.load_tables(table_names)
// Now we can asynchronously resolve the table references to get a cached catalog
// that we can use for our query
let resolved_catalog = remote_catalog_adapter
.resolve(&references, state.config(), "datafusion", "remote_schema")
.await?;

// Now continue planing the query after having fetched the remote table and
// it can run as normal
let plan = state.statement_to_plan(statement).await?;
// This resolved catalog only makes sense for this query and so we create a clone
// of the session context with the resolved catalog
let query_ctx = ctx.clone();

query_ctx
.catalog("datafusion")
.ok_or_else(|| internal_datafusion_err!("default catalog was not installed"))?
.register_schema("remote_schema", resolved_catalog)?;

// We can now continue planning the query with this new query-specific context that
// contains our cached catalog
let query_state = query_ctx.state();

let plan = query_state.statement_to_plan(statement).await?;
let results = DataFrame::new(state, plan).collect().await?;
assert_batches_eq!(
[
Expand Down Expand Up @@ -146,9 +137,9 @@ impl RemoteCatalogInterface {
}

/// Fetches information for a specific table
pub async fn table_info(&self, name: &str) -> Result<SchemaRef> {
pub async fn table_info(&self, name: &str) -> Result<Option<SchemaRef>> {
if name != "remote_table" {
return plan_err!("Remote table not found: {}", name);
return Ok(None);
}

// In this example, we'll model a remote table with columns "id" and
Expand All @@ -160,7 +151,7 @@ impl RemoteCatalogInterface {
Field::new("id", arrow::datatypes::DataType::Int32, false),
Field::new("name", arrow::datatypes::DataType::Utf8, false),
]));
Ok(Arc::new(schema))
Ok(Some(Arc::new(schema)))
}

/// Fetches data for a table from a remote data source
Expand All @@ -187,95 +178,22 @@ impl RemoteCatalogInterface {
}
}

/// Implements the DataFusion Catalog API interface for tables
/// Implements an async version of the DataFusion SchemaProvider API for tables
/// stored in a remote catalog.
#[derive(Debug)]
struct RemoteSchema {
/// Connection with the remote catalog
remote_catalog_interface: Arc<RemoteCatalogInterface>,
/// Local cache of tables that have been preloaded from the remote
/// catalog
tables: Mutex<HashMap<String, Arc<dyn TableProvider>>>,
}

impl RemoteSchema {
/// Create a new RemoteSchema
pub fn new(remote_catalog_interface: Arc<RemoteCatalogInterface>) -> Self {
Self {
remote_catalog_interface,
tables: Mutex::new(HashMap::new()),
}
}

/// Load information for the specified tables from the remote source into
/// the local cached copy.
pub async fn load_tables(
&self,
references: impl IntoIterator<Item = &str>,
) -> Result<()> {
for table_name in references {
if !self.table_exist(table_name) {
// Fetch information about the table from the remote catalog
//
// Note that a real remote catalog interface could return more
// information, but at the minimum, DataFusion requires the
// table's schema for planing.
let schema = self.remote_catalog_interface.table_info(table_name).await?;
let remote_table = RemoteTable::new(
Arc::clone(&self.remote_catalog_interface),
table_name,
schema,
);

// Add the table to our local cached list
self.tables
.lock()
.expect("mutex invalid")
.insert(table_name.to_string(), Arc::new(remote_table));
};
}
Ok(())
}
}
struct RemoteCatalogDatafusionAdapter(Arc<RemoteCatalogInterface>);

/// Implement the DataFusion Catalog API for [`RemoteSchema`]
#[async_trait]
impl SchemaProvider for RemoteSchema {
fn as_any(&self) -> &dyn Any {
self
}

fn table_names(&self) -> Vec<String> {
// Note this API is not async so we can't directly call the RemoteCatalogInterface
// instead we use the cached list of loaded tables
self.tables
.lock()
.expect("mutex valid")
.keys()
.cloned()
.collect()
}

// While this API is actually `async` and thus could consult a remote
// catalog directly it is more efficient to use a local cached copy instead,
// which is what we model in this example
async fn table(
&self,
name: &str,
) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
// Look for any pre-loaded tables
let table = self
.tables
.lock()
.expect("mutex valid")
.get(name)
.map(Arc::clone);
Ok(table)
}

fn table_exist(&self, name: &str) -> bool {
// Look for any pre-loaded tables, note this function is also `async`
self.tables.lock().expect("mutex valid").contains_key(name)
impl AsyncSchemaProvider for RemoteCatalogDatafusionAdapter {
async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
// Fetch information about the table from the remote catalog
//
// Note that a real remote catalog interface could return more
// information, but at the minimum, DataFusion requires the
// table's schema for planing.
Ok(self.0.table_info(name).await?.map(|schema| {
Arc::new(RemoteTable::new(Arc::clone(&self.0), name, schema))
as Arc<dyn TableProvider>
}))
}
}

Expand Down Expand Up @@ -345,27 +263,3 @@ impl TableProvider for RemoteTable {
Ok(Arc::new(DataSourceExec::new(source)))
}
}

/// Return true if this `table_reference` might be for a table in the specified
/// catalog and schema.
fn refers_to_schema(
catalog_name: &str,
schema_name: &str,
table_reference: &TableReference,
) -> bool {
// Check the references are in the correct catalog and schema
// references like foo.bar.baz
if let Some(catalog) = table_reference.catalog() {
if catalog != catalog_name {
return false;
}
}
// references like bar.baz
if let Some(schema) = table_reference.schema() {
if schema != schema_name {
return false;
}
}

true
}
3 changes: 3 additions & 0 deletions datafusion/catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,8 @@ datafusion-expr = { workspace = true }
datafusion-physical-plan = { workspace = true }
parking_lot = { workspace = true }

[dev-dependencies]
tokio = { workspace = true }

[lints]
workspace = true
Loading

0 comments on commit f540df0

Please sign in to comment.