feat!: change catalog provider and schema provider methods to be asynchronous #13582

Open: 4 commits to be merged into base: main

Member

@westonpace westonpace commented Nov 27, 2024

Which issue does this PR close?

Closes #10339

Rationale for this change

Many catalogs are remote (and/or disk based) and offer only asynchronous APIs, for example Polaris, Unity, and Hive. Integrating with these catalogs is impossible. A call like ctx.sql("SELECT * FROM db.schm.tbl") first enters an async context (sql), then a synchronous context (calling the catalog provider to resolve db), and then needs to re-enter an asynchronous context to interact with the catalog. This async -> sync -> async path is generally forbidden.

What changes are included in this PR?

The heart of the change is making all non-trivial methods async in CatalogProvider, SchemaProvider, and CatalogProviderList.

These changes had rather far-reaching ramifications discussed more below.

Are these changes tested?

Yes, in the sense that these traits were all tested previously. I did not add any new testing.

Are there any user-facing changes?

Yes, there are significant user-facing breaking changes beyond the obvious change that these public traits are now async.

Notable but expected

The following methods are now async and were previously sync

SessionContext::register_catalog
SessionContext::catalog_names
SessionContext::catalog
SessionContext::register_table
SessionContext::deregister_table
SessionContext::table_exist

Perhaps surprising

The SessionStateBuilder::build and SessionStateBuilder::new_from_existing methods are now async, and the From impls that converted between SessionState and SessionStateBuilder were removed.

The new_from_existing change is because the method does a (now async) lookup into the existing catalog list (which may be remote) to determine if a default catalog exists so that it knows if it needs to create a new one or not.

The build change is because, if no default catalog exists, then a default catalog is created and registered with the (potentially remote) catalog list.

The SessionContext::register_batch and SessionContext::register_table methods are used frequently and it may make sense to think of them as synchronous since in-memory tables and batches are not typically thought of as something a "catalog" provides. It would be possible for SessionContext to have both "a catalog" (which is async) and "a collection of in-memory session-specific tables" (which is sync) and thus keep these methods synchronous. However, that felt like more complexity than justified by this change.

Caveats

In most cases I simply propagated the async. In some benchmarks I am using now_or_never().expect() to avoid the need to create a tokio runtime.

In a few of the SessionContext factory functions, where a custom catalog cannot possibly be provided (e.g. SessionContext::default) I use now_or_never since it is safe to assume the default in-memory catalog is synchronous.
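To illustrate why now_or_never is only safe for the in-memory case, here is a minimal std-only sketch. The `now_or_never` function below is a hand-written stand-in for the `futures` crate helper of the same name, and `MemoryCatalog`, `RemoteCatalog`, and `YieldOnce` are hypothetical types invented for this example, not DataFusion types.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough for a single poll.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Stand-in for `FutureExt::now_or_never`: poll exactly once and return
/// `Some(output)` only if the future completed without waiting.
fn now_or_never<F: Future>(fut: F) -> Option<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(out) => Some(out),
        Poll::Pending => None,
    }
}

/// Hypothetical in-memory catalog: the async method never actually waits.
struct MemoryCatalog {
    schemas: Vec<String>,
}
impl MemoryCatalog {
    async fn schema_names(&self) -> Vec<String> {
        self.schemas.clone()
    }
}

/// Future that is `Pending` on its first poll, like a remote call that is
/// still waiting on the network.
struct YieldOnce {
    yielded: bool,
}
impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Hypothetical remote catalog: must wait at least once before answering.
struct RemoteCatalog;
impl RemoteCatalog {
    async fn schema_names(&self) -> Vec<String> {
        YieldOnce { yielded: false }.await;
        vec!["remote_schema".to_string()]
    }
}
```

With the in-memory catalog a single poll yields `Some(names)`. With the remote one it yields `None`, so a following `expect()` would panic, which is why this shortcut only fits call sites where a custom catalog cannot be supplied.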

Details for review

The only non-trivial implementation changes were in SessionState. There is a RwLock there and it required some fiddling (and a few Arc clones of the catalog list) to avoid holding the lock across an await boundary. This could potentially be simplified by changing the RwLock to an async lock, but I'm not sure that's justified yet.
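For reviewers, the locking pattern looks roughly like the sketch below. It is a simplified illustration, not the code in this PR: `MemoryCatalogList` and this cut-down `SessionState` are hypothetical stand-ins, and `block_on` is a toy busy-polling executor included only so the example runs without a runtime.

```rust
use std::future::Future;
use std::sync::{Arc, RwLock};
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical catalog list with an async listing method.
struct MemoryCatalogList {
    names: Vec<String>,
}
impl MemoryCatalogList {
    async fn catalog_names(&self) -> Vec<String> {
        self.names.clone()
    }
}

/// Cut-down stand-in for a session state that guards its catalog list
/// with a synchronous `RwLock`.
struct SessionState {
    catalog_list: RwLock<Arc<MemoryCatalogList>>,
}

impl SessionState {
    async fn catalog_names(&self) -> Vec<String> {
        // Clone the Arc inside a short scope so the read guard is dropped
        // before the first `.await`. Holding a std guard across an await
        // makes the future non-Send and can block other lock users.
        let list = {
            let guard = self.catalog_list.read().unwrap();
            Arc::clone(&*guard)
        };
        list.catalog_names().await
    }
}

/// Toy executor: busy-polls the future until it completes.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

The cost of this approach is one extra Arc clone per call. An async lock would remove the scoped block but adds an await to every access.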

@github-actions github-actions bot added the core (Core DataFusion crate), sqllogictest (SQL Logic Tests (.slt)), substrait, catalog (Related to the catalog crate), and proto (Related to proto crate) labels Nov 27, 2024
@westonpace westonpace changed the title feat: change catalog provider and schema provider methods to be asynchronous feat!: change catalog provider and schema provider methods to be asynchronous Nov 27, 2024
pub trait CatalogProvider: Debug + Sync + Send {
    /// Returns the catalog provider as [`Any`]
    /// so that it can be downcast to a specific implementation.
    fn as_any(&self) -> &dyn Any;

    /// Retrieves the list of available schema names in this catalog.
-   fn schema_names(&self) -> Vec<String>;
+   async fn schema_names(&self) -> Vec<String>;
Contributor

Perhaps this should/could return a stream?

Member Author

BoxStream<String>? Or BoxStream<Result<String>>. In retrospect it seems that errors are likely to occur if we're using a real remote catalog.

Contributor

Probably the latter, not sure about whether it should be '_ or 'static

Member Author

I'll try and get away with '_. Otherwise the memory provider will need to clone its contents.

If callers need static they can Arc the provider, clone it, and wrap with the call.
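The lifetime trade-off being discussed can be sketched with plain iterators standing in for streams, since the borrowing question is the same. `MemoryProvider` and both method names are hypothetical and exist only for this illustration.

```rust
use std::sync::Arc;

/// Hypothetical in-memory provider that owns its list of names.
struct MemoryProvider {
    names: Vec<String>,
}

impl MemoryProvider {
    /// `'_` version: the result borrows from `&self`, so nothing is cloned,
    /// but it cannot outlive the provider.
    fn names_borrowed(&self) -> Box<dyn Iterator<Item = &str> + '_> {
        Box::new(self.names.iter().map(|s| s.as_str()))
    }

    /// `'static` version: the caller hands over an `Arc` of the provider.
    /// The iterator keeps the provider alive and yields owned strings.
    fn names_static(self: Arc<Self>) -> Box<dyn Iterator<Item = String> + 'static> {
        let mut index = 0;
        Box::new(std::iter::from_fn(move || {
            let item = self.names.get(index).cloned();
            index += 1;
            item
        }))
    }
}
```

A caller that needs `'static` would write `Arc::clone(&provider).names_static()`. The result stays valid after the original handle is dropped, at the price of cloning each name.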

Contributor

The reason to perhaps prefer 'static is python compatibility - apache/arrow-rs#6587

Member Author

@westonpace westonpace Nov 27, 2024

Ok. How about we retreat to:

async fn schema_names(&self) -> Result<Box<dyn Iterator<Item = &str>>>;

with the rationale that real world implementations should be caching results anyways. Note there is once again an implicit '_ bound on the result but since we're returning a future and not a stream it's easily removed if needed.

Member Author

If there is a case where there are so many tables / schemas / catalogs that caching the full list is too expensive then the planner is probably going to have other problems anyways. I'm guessing the planner is iterating through this list and constructing a hash map it uses to resolve lookups and so the thing is going to get cached one way or another.

Contributor

That seems like the worst of both worlds 😅

What is the issue with a static stream? It's fairly extensively used in DF.

Member Author

What is the issue with a static stream? It's fairly extensively used in DF.

Just me being foolish. Having now gone full circle on the iterator approach I'm ready to do static stream again 😮‍💨

Member Author

Ok. I have switched to BoxStream<'static, Result<String>>

        &self,
        name: String,
        catalog: Arc<dyn CatalogProvider>,
    ) -> Option<Arc<dyn CatalogProvider>>;

    /// Retrieves the list of available catalog names
-   fn catalog_names(&self) -> Vec<String>;
+   async fn catalog_names(&self) -> Vec<String>;
Contributor

This could also be a stream

@github-actions github-actions bot added the documentation Improvements or additions to documentation label Nov 27, 2024
Contributor

alamb commented Dec 6, 2024

I plan to review this PR soon

Contributor

@alamb alamb left a comment

Thank you @westonpace -- this is an epic PR.

I personally think this API is a reasonable change, and I think @scsmithr noted in his CMU talk that GlareDB forked the DataFusion planner and made it async for the same reason (see #13525)

I basically have 2 concerns

  1. Potential planning performance impact (I am running the sql planner benchmarks to see if it has a noticeable impact on planning performance).
  2. Potential major downstream API impact (aka the viral effects of async may cause downstream issues as well, but I am not sure)

Quoting from #13525:

Another approach, the one taken by SessionContext::sql, is:

  1. Does an initial pass through the parse tree to find all references (non async)
  2. Then fetch all references (can be async)
  3. Then does the planning (non async) with all the relevant references

Would you be willing to test / review an example of implementing a remote / async catalog with the existing APIs if I wrote it up?

Member Author

westonpace commented Dec 9, 2024

Would you be willing to test / review an example of implementing a remote / async catalog with the existing APIs if I wrote it up?

Sure. I think the approach you've outlined is probably fine. If there is a method that can (synchronously or asynchronously) give me the referenced tables from an arbitrary query string then I'm all set.

If you have an example of that (I'm just not aware of the best way to approach step 1) then I'd be happy to try it out.

Member Author

westonpace commented Dec 9, 2024

In fact, if you can give some guidance on step 1 then I can commit to upstreaming a generic solution for the problem (if there's interest). I.e.:

pub trait AsyncSchemaProvider {...}

pub fn resolve(schema_provider: Arc<dyn AsyncSchemaProvider>, query: &str) -> Arc<dyn SchemaProvider> {
  // Do step 1 from above
  // Use `schema_provider` to load the schemas / tables
  // Create a "cached schema provider" from the loaded schemas / tables
}
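A rough, std-only sketch of how the three steps could fit together follows. Everything in it is an assumption made for illustration: `find_table_references` is a toy keyword scan standing in for a real walk of the parsed statement, `TableInfo`, `CachedSchemaProvider`, and `FakeRemote` are hypothetical types, the async trait is spelled with boxed futures, `resolve` is written as an async function, and `block_on` is a toy busy-polling executor.

```rust
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Step 1 (sync), toy version: take the token after each FROM or JOIN.
fn find_table_references(query: &str) -> Vec<String> {
    let tokens: Vec<&str> = query.split_whitespace().collect();
    let mut refs: Vec<String> = Vec::new();
    for pair in tokens.windows(2) {
        let keyword = pair[0].to_ascii_uppercase();
        if keyword == "FROM" || keyword == "JOIN" {
            let name = pair[1]
                .trim_end_matches(|c: char| c == ';' || c == ',')
                .to_string();
            if !refs.contains(&name) {
                refs.push(name);
            }
        }
    }
    refs
}

/// Hypothetical table description returned by the remote catalog.
#[derive(Clone, Debug, PartialEq)]
struct TableInfo {
    columns: Vec<String>,
}

type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;

/// Hypothetical async provider, written with boxed futures.
trait AsyncSchemaProvider {
    fn table<'a>(&'a self, name: &'a str) -> BoxFuture<'a, Option<TableInfo>>;
}

/// Synchronous provider holding only the tables one query needs.
struct CachedSchemaProvider {
    tables: HashMap<String, TableInfo>,
}
impl CachedSchemaProvider {
    fn table(&self, name: &str) -> Option<&TableInfo> {
        self.tables.get(name)
    }
}

/// Step 2 (async): fetch every referenced table, then hand back a sync
/// provider that step 3 (planning) can use without awaiting.
async fn resolve(provider: &dyn AsyncSchemaProvider, query: &str) -> CachedSchemaProvider {
    let mut tables = HashMap::new();
    for name in find_table_references(query) {
        let found = provider.table(&name).await;
        if let Some(info) = found {
            tables.insert(name, info);
        }
    }
    CachedSchemaProvider { tables }
}

/// Fake remote catalog that only knows about `orders`.
struct FakeRemote;
impl AsyncSchemaProvider for FakeRemote {
    fn table<'a>(&'a self, name: &'a str) -> BoxFuture<'a, Option<TableInfo>> {
        Box::pin(async move {
            if name == "orders" {
                Some(TableInfo { columns: vec!["id".to_string()] })
            } else {
                None
            }
        })
    }
}

/// Toy executor: busy-polls the future until it completes.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

The point of the shape is that only `resolve` is async. The planner would see `CachedSchemaProvider`, which answers lookups synchronously from the pre-fetched map.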

Contributor

alamb commented Dec 10, 2024

In fact, if you can give some guidance on step 1 then I can commit to upstreaming a generic solution for the problem (if there's interest). I.e.:

Contributor

alamb commented Dec 10, 2024

Here is an example of how it is possible to run queries against a remote catalog using the existing CatalogProvider APIs:

@westonpace
Member Author

@alamb so I think this PR might still be useful. 90% of the change here is not coming from the planning path but rather from the registration path.

Your caching approach and PR works well for the planning path but not the registration path.

This is not a problem for me personally. We (Lance) have our own API for registration tasks and don't use DF for this. Are there SQL-like paths for registration (I thought CREATE TABLE had a different trait for some reason)? Do you see any reason users might want to do catalog registration via DF instead of directly with the catalog? If so, maybe we move forward with this PR.

On the other hand, maybe the DF "register xyz" methods are just utility methods for the case where a user has no catalog and is using DF as a temporary (since SessionContext isn't persisted) in-memory cache, and so there will never be a reason to add async.

Labels: catalog (Related to the catalog crate), core (Core DataFusion crate), documentation (Improvements or additions to documentation), proto (Related to proto crate), sqllogictest (SQL Logic Tests (.slt)), substrait
Successfully merging this pull request may close these issues.

Make all SchemaProvider trait APIs async
3 participants