feat!: change catalog provider and schema provider methods to be asynchronous #13582
datafusion/catalog/src/catalog.rs
 pub trait CatalogProvider: Debug + Sync + Send {
     /// Returns the catalog provider as [`Any`]
     /// so that it can be downcast to a specific implementation.
     fn as_any(&self) -> &dyn Any;

     /// Retrieves the list of available schema names in this catalog.
-    fn schema_names(&self) -> Vec<String>;
+    async fn schema_names(&self) -> Vec<String>;
Perhaps this should/could return a stream?
`BoxStream<String>`? Or `BoxStream<Result<String>>`. In retrospect it seems that errors are likely to occur if we're using a real remote catalog.
Probably the latter, not sure about whether it should be `'_` or `'static`
I'll try and get away with `'_`. Otherwise the memory provider will need to clone its contents.
If callers need static they can Arc the provider, clone it, and wrap with the call.
The reason to perhaps prefer `'static` is python compatibility - apache/arrow-rs#6587
Ok. How about we retreat to:
async fn schema_names(&self) -> Result<Box<dyn Iterator<Item = &str>>>;
with the rationale that real world implementations should be caching results anyways. Note there is once again an implicit `'_` bound on the result but since we're returning a future and not a stream it's easily removed if needed.
If there is a case where there are so many tables / schemas / catalogs that caching the full list is too expensive then the planner is probably going to have other problems anyways. I'm guessing the planner is iterating through this list and constructing a hash map it uses to resolve lookups and so the thing is going to get cached one way or another.
That seems like the worst of both worlds 😅
What is the issue with a static stream? It's fairly extensively used in DF.
> What is the issue with a static stream? It's fairly extensively used in DF.
Just me being foolish. Having now gone full circle on the iterator approach I'm ready to do static stream again 😮💨
Ok. I have switched to BoxStream<'static, Result<String>>
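The `'_` versus `'static` trade-off discussed in this thread can be sketched without any async machinery. The block below is only an illustration: it uses boxed iterators as a std-only stand-in for `BoxStream`, and `MemoryCatalog` and both method names are hypothetical, not part of DataFusion. It shows why a `'static` result forces an in-memory provider to clone its names, while a borrowed result avoids the clone but cannot outlive the provider.

```rust
use std::collections::BTreeSet;

// Assumption: a plain String error keeps the sketch dependency-free.
type Result<T> = std::result::Result<T, String>;

/// Hypothetical in-memory provider that only stores schema names.
struct MemoryCatalog {
    schemas: BTreeSet<String>,
}

impl MemoryCatalog {
    fn new(names: &[&str]) -> Self {
        Self {
            schemas: names.iter().map(|n| n.to_string()).collect(),
        }
    }

    /// `'_` variant: no clone, but the iterator borrows from `self`.
    fn schema_names_borrowed(&self) -> Box<dyn Iterator<Item = Result<&str>> + '_> {
        Box::new(self.schemas.iter().map(|k| Ok::<&str, String>(k.as_str())))
    }

    /// `'static` variant: the names are cloned up front so the result
    /// owns its data and may outlive the provider.
    fn schema_names_static(&self) -> Box<dyn Iterator<Item = Result<String>> + 'static> {
        let names: Vec<String> = self.schemas.iter().cloned().collect();
        Box::new(names.into_iter().map(Ok::<String, String>))
    }
}
```

With the `'static` variant a caller can drop the provider and still drain the result; with the borrowed variant the compiler rejects that, which is the constraint that matters when the result has to cross an FFI or task boundary.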
datafusion/catalog/src/catalog.rs
         &self,
         name: String,
         catalog: Arc<dyn CatalogProvider>,
     ) -> Option<Arc<dyn CatalogProvider>>;

     /// Retrieves the list of available catalog names
-    fn catalog_names(&self) -> Vec<String>;
+    async fn catalog_names(&self) -> Vec<String>;
This could also be a stream
…ng>>. Add Result to a few APIs
I plan to review this PR soon
Thank you @westonpace -- this is an epic PR.
I personally think this API is a reasonable change, and I think @scsmithr noted that GlareDB forked the DataFusion planner and made it `async` for the same reason in his CMU talk (see #13525).
I basically have 2 concerns:
- Potential planning performance impact (I am running the sql planner benchmarks to see if it has a noticeable impact on planning performance).
- Potential major downstream API impact (aka the viral effects of `async` may cause downstream issues as well, but I am not sure).
Quoting from #13525:
Another approach that is taken by the SessionContext::sql Is:
- Does an initial pass through the parse tree to find all references (non async)
- Then fetch all references (can be async)
- Then does the planning (non async) with all the relevant references
Would you be willing to test / review an example of implementing a remote / async catalog with the existing APIs if I wrote it up?
Sure. I think the approach you've outlined is probably fine. If there is a method that can (synchronously or asynchronously) give me the referenced tables from an arbitrary query string then I'm all set. If you have an example of that (I'm just not aware of the best way to approach step 1) then I'd be happy to try it out.
In fact, if you can give some guidance on step 1 then I can commit to upstreaming a generic solution for the problem (if there's interest). I.e.:
Here is an example of how it is possible to run queries against a remote catalog using the existing CatalogProvider APIs:
@alamb so I think this PR might still be useful. 90% of the change here is not coming from the planning path but rather from the registration path. Your caching approach and PR work well for the planning path but not the registration path. This is not a problem for me personally. We (Lance) have our own API for registration tasks and don't use DF for this. Are there SQL-like paths for registration (I thought CREATE TABLE had a different trait for some reason)? Do you see any reason users might want to do catalog registration via DF instead of directly with the catalog? If so, maybe we move forward with this PR. On the other hand, maybe the DF "register xyz" methods are just utility methods for the case where a user has no catalog and is using DF as a temporary (since SessionContext isn't persisted) in-memory cache, and so there will never be a reason to add async.
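For readers following the thread, the three-step approach quoted from #13525 (find references, fetch them asynchronously, then plan synchronously against the fetched results) can be sketched roughly as below. This is a minimal std-only illustration, not DataFusion code: `RemoteCatalog`, `TableMeta`, `prefetch`, `plan`, and the tiny `block_on` executor are all hypothetical, and step 1 (extracting the referenced table names from the query) is assumed to have happened already, so the names are passed in directly.

```rust
use std::collections::HashMap;
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Hypothetical table metadata returned by a remote catalog.
#[derive(Debug)]
struct TableMeta {
    columns: Vec<String>,
}

/// Hypothetical remote catalog whose only lookup API is async.
struct RemoteCatalog;

impl RemoteCatalog {
    async fn fetch_table(&self, name: &str) -> Option<TableMeta> {
        // A real client would await a network call here.
        match name {
            "db.schm.tbl" => Some(TableMeta {
                columns: vec!["id".to_string(), "value".to_string()],
            }),
            _ => None,
        }
    }
}

/// Step 2 (async): fetch every referenced table once and cache the results.
async fn prefetch(catalog: &RemoteCatalog, refs: &[&str]) -> HashMap<String, TableMeta> {
    let mut cache = HashMap::new();
    for name in refs {
        if let Some(meta) = catalog.fetch_table(name).await {
            cache.insert((*name).to_string(), meta);
        }
    }
    cache
}

/// Step 3 (sync): planning only consults the pre-populated cache.
fn plan(cache: &HashMap<String, TableMeta>, table: &str) -> Result<String, String> {
    let meta = cache
        .get(table)
        .ok_or_else(|| format!("table not found: {table}"))?;
    Ok(format!("Scan {table} [{}]", meta.columns.join(", ")))
}

/// Wakes the blocked thread when the future can make progress.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal executor so the sketch runs without an async runtime.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}
```

The point of the shape is that only `prefetch` is async; `plan` stays synchronous, so the async -> sync -> async nesting never arises on the planning path. As noted in the comment above, this does not help the registration path.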
Which issue does this PR close?
Closes #10339
Rationale for this change
Many catalogs are remote (and/or disk based) and offer only asynchronous APIs. For example, Polaris, Unity, and Hive. Integrating with these catalogs is impossible since something like `ctx.sql("SELECT * FROM db.schm.tbl")` first enters an async context (`sql`), then a synchronous context (calling the catalog provider to resolve `db`), and then we need to go into an asynchronous context to interact with the catalog. This async -> sync -> async path is generally forbidden.
What changes are included in this PR?
The heart of the change is making all non-trivial methods async in `CatalogProvider`, `SchemaProvider`, and `CatalogProviderList`.
These changes had rather far-reaching ramifications discussed more below.
Are these changes tested?
Yes, in the sense that these traits were all tested previously. I did not add any new testing.
Are there any user-facing changes?
Yes, there are significant user-facing breaking changes beyond the obvious change that these public traits are now async.
Notable but expected
The following methods are now async and were previously sync
Perhaps surprising
The `SessionStateBuilder::build` and `SessionStateBuilder::new_from_existing` methods are now async and the `From` impls to go between `SessionState` and `SessionStateBuilder` were removed.
The `new_from_existing` change is because the method does a (now async) lookup into the existing catalog list (which may be remote) to determine if a default catalog exists so that it knows if it needs to create a new one or not.
The `build` change is because, if no default catalog exists, then a default catalog is created and registered with the (potentially remote) catalog list.
The `SessionContext::register_batch` and `SessionContext::register_table` methods are used frequently and it may make sense to think of them as synchronous since in-memory tables and batches are not typically thought of as something a "catalog" provides. It would be possible for `SessionContext` to have both "a catalog" (which is async) and "a collection of in-memory session-specific tables" (which is sync) and thus keep these methods synchronous. However, that felt like more complexity than justified by this change.
Caveats
In most cases I simply propagated the async. In some benchmarks I am using `now_or_never().expect()` to avoid the need to create a tokio runtime.
In a few of the `SessionContext` factory functions, where a custom catalog cannot possibly be provided (e.g. `SessionContext::default`), I use `now_or_never` since it is safe to assume the default in-memory catalog is synchronous.
Details for review
The only non-trivial implementation changes were in `SessionState`. There is a `RwLock` there and it required some fiddling (and a few Arc clones of the catalog list) to avoid holding the lock across an await boundary. This could potentially be simplified by changing the `RwLock` to an async lock but I'm not sure that's justified yet.
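The "clone the Arc, then await" pattern described here might look roughly like the following. This is a sketch under assumptions, not the PR's actual code: `CatalogList` and `State` are hypothetical stand-ins for the catalog list and `SessionState`, and the hand-rolled `now_or_never` function is a std-only imitation of the `now_or_never` helper mentioned under Caveats (poll once, return `None` if the future is not ready).

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::{Arc, RwLock};
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for an async catalog list.
struct CatalogList {
    names: Vec<String>,
}

impl CatalogList {
    async fn catalog_names(&self) -> Vec<String> {
        // An in-memory list answers immediately; a remote one would await I/O.
        self.names.clone()
    }
}

/// Hypothetical stand-in for session state guarded by a synchronous lock.
struct State {
    catalogs: RwLock<Arc<CatalogList>>,
}

impl State {
    async fn catalog_names(&self) -> Vec<String> {
        // Clone the Arc inside a short scope so the read guard is dropped
        // before the first `.await`.
        let list = {
            let guard = self.catalogs.read().unwrap();
            Arc::clone(&*guard)
        };
        list.catalog_names().await
    }
}

/// Waker that does nothing; enough for a single poll.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Poll a future exactly once: `Some(output)` if it completed without
/// suspending, `None` otherwise.
fn now_or_never<F: Future>(fut: F) -> Option<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(out) => Some(out),
        Poll::Pending => None,
    }
}
```

Because the in-memory list never suspends, a single poll is enough, which is the assumption behind using `now_or_never` in the default factory functions. Against a catalog that really awaits I/O the same call would return `None`, so the shortcut is only safe where a custom catalog cannot be supplied.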