-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce the DynamicFileCatalog
in datafusion-catalog
#11035
Changes from 2 commits
37b5526
ad1a854
97ea11c
2729c49
6f86577
c91cdc6
3306df6
d82b273
c0491d5
a60eeea
9fa01aa
2ab3639
a8ee733
7faab9f
e1f3908
cf73ba2
c641e6b
0806263
f4d24e6
4b71e59
9964150
da1e5d3
ed670fe
ea5816e
fb8b9e0
fa73ae7
04cc155
1ede35e
51b1d41
75b0b84
3e8d094
4eb8ca5
9913405
5d861b8
ea1c075
16be2e7
db90c28
9353123
daa7ed8
e4a2174
506d1d6
72ce464
fb1b6ce
8f0952d
f062fec
b1baa84
76d7fee
f0f070b
6b77b6b
4e51a77
fafc9dc
a3a4f4d
7dc238f
f7b4b8c
25d0ff6
a78bd3c
edeff33
b1a922c
e5ab14d
87d7503
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
use datafusion::error::Result; | ||
use datafusion::prelude::*; | ||
|
||
/// This example demonstrates executing a simple query against an Arrow data source (CSV) and | ||
/// fetching results | ||
#[tokio::main] | ||
async fn main() -> Result<()> { | ||
// create local execution context | ||
let ctx = SessionContext::new(); | ||
|
||
let testdata = datafusion::test_util::arrow_test_data(); | ||
let path = &format!("file:///{testdata}/csv/aggregate_test_100.csv"); | ||
// execute the query | ||
let df = ctx | ||
.sql( | ||
format!( | ||
r#"SELECT column_1, MIN(column_12), MAX(column_12) | ||
FROM '{}' | ||
WHERE column_11 > 0.1 AND column_11 < 0.9 | ||
GROUP BY column_1"#, | ||
path | ||
) | ||
.as_str(), | ||
) | ||
.await?; | ||
|
||
// print the results | ||
df.show().await?; | ||
Ok(()) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
use std::any::Any; | ||
use std::sync::{Arc, Weak}; | ||
|
||
use async_trait::async_trait; | ||
use dirs::home_dir; | ||
use parking_lot::{Mutex, RwLock}; | ||
|
||
use datafusion_common::plan_datafusion_err; | ||
|
||
use crate::catalog::schema::SchemaProvider; | ||
use crate::datasource::listing::{ListingTable, ListingTableConfig, ListingTableUrl}; | ||
use crate::datasource::TableProvider; | ||
use crate::error::Result; | ||
use crate::execution::context::SessionState; | ||
|
||
/// Wraps another schema provider | ||
pub struct DynamicFileSchemaProvider { | ||
inner: Arc<dyn SchemaProvider>, | ||
state_store: StateStore, | ||
} | ||
|
||
impl DynamicFileSchemaProvider { | ||
pub fn new(inner: Arc<dyn SchemaProvider>) -> Self { | ||
Self { | ||
inner, | ||
state_store: StateStore::new(), | ||
} | ||
} | ||
|
||
pub fn with_state(&self, state: Weak<RwLock<SessionState>>) { | ||
self.state_store.with_state(state); | ||
} | ||
} | ||
|
||
#[async_trait] | ||
impl SchemaProvider for DynamicFileSchemaProvider { | ||
fn as_any(&self) -> &dyn Any { | ||
self | ||
} | ||
|
||
fn table_names(&self) -> Vec<String> { | ||
self.inner.table_names() | ||
} | ||
|
||
fn register_table( | ||
&self, | ||
name: String, | ||
table: Arc<dyn TableProvider>, | ||
) -> Result<Option<Arc<dyn TableProvider>>> { | ||
self.inner.register_table(name, table) | ||
} | ||
|
||
async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> { | ||
let inner_table = self.inner.table(name).await?; | ||
if inner_table.is_some() { | ||
return Ok(inner_table); | ||
} | ||
let optimized_url = substitute_tilde(name.to_owned()); | ||
let table_url = ListingTableUrl::parse(optimized_url.as_str())?; | ||
let state = &self | ||
.state_store | ||
.get_state() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
.upgrade() | ||
.ok_or_else(|| plan_datafusion_err!("locking error"))? | ||
.read() | ||
.clone(); | ||
let cfg = ListingTableConfig::new(table_url.clone()) | ||
.infer(state) | ||
.await?; | ||
|
||
Ok(Some(Arc::new(ListingTable::try_new(cfg)?))) | ||
} | ||
|
||
fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> { | ||
self.inner.deregister_table(name) | ||
} | ||
|
||
fn table_exist(&self, name: &str) -> bool { | ||
self.inner.table_exist(name) | ||
} | ||
} | ||
fn substitute_tilde(cur: String) -> String { | ||
if let Some(usr_dir_path) = home_dir() { | ||
if let Some(usr_dir) = usr_dir_path.to_str() { | ||
if cur.starts_with('~') && !usr_dir.is_empty() { | ||
return cur.replacen('~', usr_dir, 1); | ||
} | ||
} | ||
} | ||
cur | ||
} | ||
|
||
pub struct StateStore { | ||
state: Arc<Mutex<Option<Weak<RwLock<SessionState>>>>>, | ||
} | ||
|
||
impl StateStore { | ||
pub fn new() -> Self { | ||
Self { | ||
state: Arc::new(Mutex::new(None)), | ||
} | ||
} | ||
|
||
pub fn with_state(&self, state: Weak<RwLock<SessionState>>) { | ||
let mut lock = self.state.lock(); | ||
*lock = Some(state); | ||
} | ||
|
||
pub fn get_state(&self) -> Weak<RwLock<SessionState>> { | ||
self.state.lock().clone().unwrap() | ||
} | ||
} | ||
|
||
impl Default for StateStore { | ||
fn default() -> Self { | ||
Self::new() | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -72,6 +72,7 @@ use object_store::ObjectStore; | |
use parking_lot::RwLock; | ||
use url::Url; | ||
|
||
use crate::catalog::dynamic_file_schema::DynamicFileSchemaProvider; | ||
pub use datafusion_execution::config::SessionConfig; | ||
pub use datafusion_execution::TaskContext; | ||
pub use datafusion_expr::execution_props::ExecutionProps; | ||
|
@@ -305,10 +306,18 @@ impl SessionContext { | |
|
||
/// Creates a new `SessionContext` using the provided [`SessionState`] | ||
pub fn new_with_state(state: SessionState) -> Self { | ||
let state_ref = Arc::new(RwLock::new(state.clone())); | ||
state | ||
.schema_for_ref("datafusion.public.xx") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It just a workaround to get |
||
.unwrap() | ||
.as_any() | ||
.downcast_ref::<DynamicFileSchemaProvider>() | ||
.unwrap() | ||
.with_state(Arc::downgrade(&state_ref)); | ||
Self { | ||
session_id: state.session_id().to_string(), | ||
session_id: state_ref.clone().read().session_id().to_string(), | ||
session_start_time: Utc::now(), | ||
state: Arc::new(RwLock::new(state)), | ||
state: state_ref, | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The column is actually `c1`. There are some issues with getting the CSV header automatically.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, I found that this is the default behavior for `ListingTable`. The dynamic query in datafusion-cli behaves the same way, so I think we don't need to change it in this PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This behavior is controlled by a config option in the current session context. We can create the session context with that option set to enable header scanning.