Introduce ObjectStore methods that take Session data #7135

Closed
waynr opened this issue Feb 14, 2025 · 11 comments
Labels
question Further information is requested

Comments

@waynr

waynr commented Feb 14, 2025

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

In influxdata/influxdb#25911, we are discussing ways to record trace spans that distinguish between two key code paths in an implementation of ObjectStore that either retrieves objects from an object store or from a local in-memory cache. The problem we have right now is that our ObjectStore implementation has no way to receive span info passed in from the calling context.

Describe the solution you'd like

This issue is proposing the following:

  • Add a new Session trait to the object_store crate similar to what exists in datafusion::catalog
    • This would have a single method that returns a session config similar to datafusion::prelude::SessionConfig
      • This session config would have similar set_extension, with_extension, and get_extension methods that allow storing and retrieving Arc<dyn Any + Send + Sync + 'static> instances by type ID
  • Add a new set of methods to the ObjectStore trait that take &dyn Session as a parameter
    • eg fn get_with_session(&self, session: &dyn Session, location: &Path)
    • these methods could have default impls that delegate to the corresponding existing method (eg get_with_session would just ignore the session parameter and call get by default) to avoid forcing existing implementers to change; see the sketch below
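
To make that a bit more concrete, here is a rough sketch of the shape I have in mind -- none of this is real object_store API, and all the names are illustrative:

use std::any::{Any, TypeId};
use std::collections::HashMap;
use std::sync::Arc;

// Type-keyed extension map, mirroring the extensions on
// datafusion::prelude::SessionConfig.
#[derive(Default, Clone)]
pub struct SessionConfig {
    extensions: HashMap<TypeId, Arc<dyn Any + Send + Sync + 'static>>,
}

impl SessionConfig {
    pub fn with_extension<T: Any + Send + Sync + 'static>(mut self, ext: Arc<T>) -> Self {
        self.extensions.insert(TypeId::of::<T>(), ext);
        self
    }

    pub fn get_extension<T: Any + Send + Sync + 'static>(&self) -> Option<Arc<T>> {
        self.extensions
            .get(&TypeId::of::<T>())
            .and_then(|ext| Arc::clone(ext).downcast::<T>().ok())
    }
}

// The proposed Session trait: a single method exposing the config.
pub trait Session: Send + Sync {
    fn config(&self) -> &SessionConfig;
}

// Standing in for the real ObjectStore trait, just to show the delegation
// pattern: the *_with_session variants get default impls that ignore the
// session, so existing implementations would not have to change.
pub trait Store {
    fn get(&self, location: &str) -> Vec<u8>;

    fn get_with_session(&self, _session: &dyn Session, location: &str) -> Vec<u8> {
        self.get(location)
    }
}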

This would allow me to do some refactoring in https://github.com/influxdata/influxdb/ and https://github.com/influxdata/influxdb3_core/ that would result in passing a &dyn Session carrying a properly-parented child span to a custom *_with_session implementation in impl ObjectStore for MemCachedObjectStore -- resulting in trace spans that are properly contextualized within the hierarchy of a given query and that also differentiate between cached and object store parquet file retrievals.

Describe alternatives you've considered

I've considered:

  • Using simple metrics to capture the high-level (ie not contextualized within a trace span hierarchy) difference between object store and cached parquet file retrievals.
    • Doesn't help us identify when poor query performance is caused by cache misses
  • Setting up MemcachedObjectStore with its own root hierarchy of spans
    • Also doesn't help us identify when poor query performance is caused by cache misses

Additional context

waynr added the enhancement (Any new improvement worthy of a entry in the changelog) label Feb 14, 2025
@tustvold
Contributor

tustvold commented Feb 14, 2025

I wonder if instead of altering the ObjectStore trait, you could instead construct a "fresh" MemCachedObjectStore for each "session", with this ObjectStore impl containing both the shared and session local states internally.

This would not only avoid changes to object_store, breaking or otherwise, but also allow maximum flexibility on what state comprises a session, and avoid needing to propagate this session state through every ObjectStore interaction.
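
Roughly something like this (the names are placeholders, just to sketch the shape of what I mean):

use std::sync::Arc;

use object_store::ObjectStore;

// Shared, process-wide state: the actual cache contents.
struct SharedCache { /* ... */ }

// Session-local state, e.g. whatever span/trace context the query carries.
struct QuerySpan { /* ... */ }

// Constructed fresh for each "session"/query, wrapping the shared pieces.
struct SessionScopedStore {
    inner: Arc<dyn ObjectStore>,
    cache: Arc<SharedCache>,
    span: QuerySpan,
}

impl SessionScopedStore {
    fn new(inner: Arc<dyn ObjectStore>, cache: Arc<SharedCache>, span: QuerySpan) -> Self {
        Self { inner, cache, span }
    }
}

// An `impl ObjectStore for SessionScopedStore` would then delegate to `inner`,
// consult `cache`, and record onto `span`, without object_store needing to
// know anything about sessions.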

@waynr
Author

waynr commented Feb 15, 2025

@tustvold I considered something like that, but the problem is that we pass the MemCachedObjectStore as Arc<dyn ObjectStore> to an iox_query::exec::Executor as well as registering it as an iox object store with the datafusion runtime.

While it's not totally clear to me whether the parquet file accesses that we care about are happening in those core crates or in the datafusion runtime using this Arc<dyn ObjectStore>, it does seem likely given the limited number of ways that it's used in the InfluxDB3 server setup.

It's also not clear to me whether we could get a session-scoped span threaded into an ObjectStore through the datafusion engine. But I have observed something similar happening for the TableProvider.scan method in a private git repo. So anywhere in datafusion where a method has access to a SessionConfig and makes calls on an Arc<dyn ObjectStore>, we could convert that SessionConfig into a session recognized by the ObjectStore trait and call the corresponding *_with_session variants.

@tustvold
Contributor

tustvold commented Feb 15, 2025

but the problem is that we pass the MemCachedObjectStore as Arc<dyn ObjectStore> to an iox_query::exec::Executor as well as registering it as an iox object store with the datafusion runtime.

I'm not familiar with how the OSS InfluxDb handles this, but the closed source Influx at least used to construct a SessionContext for each query. This would be an ideal opportunity to construct the instrumented MemCachedObjectStore and register it as Arc<dyn ObjectStore> in the relevant places.

I believe this should just be a relatively straightforward plumbing exercise.

TBC designing and then stabilising a session abstraction for object_store is a non-trivial piece of work, and I'd be very wary about it; it isn't immediately obvious to me that a generic abstraction is even possible

@waynr
Author

waynr commented Feb 15, 2025

I'm not familiar with how the OSS InfluxDb handles this, but the closed source Influx at least used to construct a SessionContext for each query. This would be an ideal opportunity to construct the instrumented MemCachedObjectStore

While I'm still learning my way around this code, this suggestion is somewhat at odds with my understanding of how spans ideally work. You're saying that at the bottom level of a query call stack we should create a SessionContext (or rather, a SpanContext, since what we're interested in is reporting spans to a trace collector like Jaeger), pass that in to an ObjectStore impl, and then initialize the datafusion and query execution runtimes per-query (whereas these things are currently initialized once at program startup).

The problem with this is that the span for any of the ObjectStore methods called this way would exist in its own separate tree of spans rather than being a child of the context that makes the method calls. I'm also doubtful that it would be possible or efficient to refactor Influxdb3 to do all this object store and datafusion runtime initialization on a per-query basis.

To be clear, when I'm talking about spans this is what I mean: https://www.jaegertracing.io/docs/2.3/terminology/#span

TBC designing and then stabilising a session abstraction for object_store is a non-trivial piece of work

Yeah, I realize it's a big ask. I definitely appreciate your time discussing this!

it isn't immediately obvious to me that a generic abstraction is even possible

I was trying to illustrate how it might be possible by using the existing datafusion::execution::SessionConfig custom configuration as an example. Spans are being passed to arbitrary implementations of the TableProvider trait using this approach in the closed source Influx.

@tustvold
Contributor

tustvold commented Feb 15, 2025

Influxdb3 to do all this object store and datafusion runtime initialization on a per-query basis

I believe it already does this, the SessionContext is constructed per query.

own separate tree of spans rather than being the child of the context which makes the method calls

I suspect this is inevitable unless tracing were integrated into DataFusion as a first-class concept. Currently the tracing for DataFusion is constructed based on the timing metrics collected during execution. This is itself an approximation, e.g. it aggregates across partitions, to keep trace size manageable.

I suspect if you wanted to parent the ObjectStore interactions with the corresponding DF spans, you would need to somehow integrate the ObjectStore interactions with this same metrics system...

Ultimately, having the ObjectStore spans as siblings of the TableProvider spans is maybe not ideal, but it would be better than nothing

@waynr
Author

waynr commented Feb 18, 2025

Currently the tracing for DataFusion is constructed based on the timing metrics collected during execution. This is itself an approximation, e.g. it aggregates across partitions, to keep trace size manageable.

I'm not entirely sure what you mean here; do you have a code example I could look at to get a better understanding? I hadn't seen any explicit tracing/span reporting anywhere in datafusion (but I also haven't spent much time digging around the code yet).

I suspect this is inevitable unless tracing were integrated into DataFusion as a first-class concept.

I think we can avoid tracing as a first-class concept in Datafusion and get properly-parented spans. One way to do that is what I've proposed in this issue, but I understand it's a big ask with some maybe unpalatable complexity.

Another way, which should work in the contexts I care about and which I was just chatting about with @crepererum this morning, would be to update GetOptions to accept custom metadata so that we can pass parent span IDs in as strings.
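
A rough sketch of that idea, assuming a hypothetical metadata field were added to GetOptions (it doesn't exist today):

use std::collections::HashMap;

// Hypothetical, simplified stand-in for object_store::GetOptions with a new
// free-form metadata field added.
#[derive(Default)]
pub struct GetOptions {
    // ... existing fields (if_match, range, etc.) ...
    pub metadata: HashMap<String, String>,
}

// The caller (somewhere with access to the query's span) threads the parent
// span id through as a plain string.
fn options_with_parent_span(parent_span_id: &str) -> GetOptions {
    let mut options = GetOptions::default();
    options
        .metadata
        .insert("trace.parent_span_id".to_string(), parent_span_id.to_string());
    options
}

// A store like MemCachedObjectStore could then look that key up in get_opts
// and attach its own child span; the builtin stores would simply ignore it.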

The only thing I'm not certain about with that approach is whether the parquet file accesses that we care about in influxdb3 are actually using get_range or get_ranges, which don't take GetOptions structs. Regardless, @tustvold would you like me to file a separate issue to propose that?

For what it's worth, I am also looking into trying out a per-query wrapper around the MemCachedObjectStore that I've mentioned previously, so that we can at least get query-level trace spans, but I'm not confident that it's possible without losing some of the benefits of the current process-global Arc<dyn ObjectStore> that we use to initialize various datafusion and iox components.

@tustvold
Contributor

tustvold commented Feb 18, 2025

I think we can avoid tracing as a first-class concept in Datafusion and get properly-parented spans. One way to do that is what I've proposed in this issue, but I understand it's a big ask with some maybe unpalatable complexity

I'm not sure what you mean by this, the DF spans are created after the fact based on its metrics, they don't exist at the time the calls are made to ObjectStore. I therefore don't see how you would correctly parent the ObjectStore spans into the DF tree, when the corresponding spans don't exist until the query has finished executing.

See here

@waynr
Author

waynr commented Feb 19, 2025

I'm not sure what you mean by this, the DF spans are created after the fact based on its metrics, they don't exist at the time the calls are made to ObjectStore.

I think we're talking about the same spans; it's just not obvious that the spans you're talking about are child spans of the query spans that I care about, so I'll try to show in a step-by-step fashion how they're related:

https://github.com/influxdata/influxdb/blob/8daccb7ee8f82ffae99b25236af9645fd60e448b/influxdb3_server/src/http.rs#L563

That root span gets passed to the Arc<dyn QueryExecutor>.query_sql method:

https://github.com/influxdata/influxdb/blob/8daccb7ee8f82ffae99b25236af9645fd60e448b/influxdb3_server/src/http.rs#L563

Then passed to the query_database_sql method:

https://github.com/influxdata/influxdb/blob/8daccb7ee8f82ffae99b25236af9645fd60e448b/influxdb3_server/src/query_executor/mod.rs#L145

query_database_sql builds a new IOxSessionContext:

https://github.com/influxdata/influxdb/blob/8daccb7ee8f82ffae99b25236af9645fd60e448b/influxdb3_server/src/query_executor/mod.rs#L286

Which attaches a Span:

Next it uses the IOxSessionContext to create a physical plan:

.... and so on. I don't think it's important that you look at all those links; I just wanted to illustrate that at any point in that hierarchy the span should already be threaded through in an opaque way -- without DF being made explicitly aware that there is a span being passed through. This is possible because of SessionConfig.extensions (with type HashMap<TypeId, Arc<dyn Any + Send + Sync + 'static>, BuildHasherDefault<IdHasher>>, aka AnyMap), which ends up with an instance of Span added.

It's this span, carried all the way through the DF engine call stack, that I would like to be able to pass to object store method calls so that MemCachedObjectStore can also attach spans to that span hierarchy.
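
To spell out the mechanism I mean (with QuerySpan standing in for the IOx Span type):

use std::sync::Arc;

use datafusion::prelude::SessionConfig;

// Placeholder for the IOx Span type; any Send + Sync + 'static value works.
#[derive(Debug)]
struct QuerySpan {
    name: String,
}

fn main() {
    // At query setup time the span is attached to the session config...
    let config = SessionConfig::new()
        .with_extension(Arc::new(QuerySpan { name: "sql query".into() }));

    // ...and anything downstream that can see the config (DF doesn't need to
    // know or care what's in there) can pull it back out by type.
    let span: Option<Arc<QuerySpan>> = config.get_extension::<QuerySpan>();
    assert_eq!(span.unwrap().name, "sql query");
}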

In fact, I see how the TracedStream you linked relates to this call stack. Starting shortly after the first link in my previous list, IOxSessionContext.execute_stream is called:

Which calls IOxSessionContext.execute_stream_partitioned:

Which creates a TracedStream:

That TracedStream inherits from the hierarchy of spans that I've been talking about:

Sorry if that's explaining a bunch of stuff you already know, I'm just trying to be as clear as possible about my reasoning.

I therefore don't see how you would correctly parent the ObjectStore spans into the DF tree, when the corresponding spans don't exist until the query has finished executing.

The part that's hard for me to understand, being so new to this code, is: where do the object store calls actually happen? That's something I haven't seen, but I think someone mentioned during a team sync this morning that I should look into types that wrap the object store for the sake of reading parquet files. That could be this ParquetFileReader type:

That appears to be used in the execute method of the impl ExecutionPlan for ParquetExec:

The TaskContext passed to the ExecutionPlan.execute method contains a SessionConfig:

If we were to go with the ObjectStore.get_ctx approach I've described in this issue then we might be able to clone the SessionConfig.extensions (described above) and use it to design a hypothetical new ObjectStore-specific context type with the same AnyMap type when constructing the ParquetOpener -- at that point we should be sufficiently close to the call site of ObjectStore methods specific to the currently-executing query:

We would also have to change ObjectStore method calls to their respective _ctx variants introduced by this proposal.
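
To make that concrete, the bridge at that point might look roughly like this (ObjectStoreContext and the *_ctx methods are hypothetical, and QuerySpan is the same placeholder as in my earlier snippet):

use std::any::{Any, TypeId};
use std::collections::HashMap;
use std::sync::Arc;

use datafusion::execution::TaskContext;

#[derive(Debug)]
struct QuerySpan {
    name: String,
}

// Hypothetical ObjectStore-specific context carrying the same kind of AnyMap
// as SessionConfig.extensions.
#[derive(Default)]
struct ObjectStoreContext {
    extensions: HashMap<TypeId, Arc<dyn Any + Send + Sync + 'static>>,
}

// Would run roughly where the ParquetOpener is constructed: copy the span (if
// any) out of the DataFusion session config into the context that the
// proposed *_ctx / *_with_session methods would accept.
fn object_store_ctx_from(task_ctx: &TaskContext) -> ObjectStoreContext {
    let mut ctx = ObjectStoreContext::default();
    if let Some(span) = task_ctx.session_config().get_extension::<QuerySpan>() {
        ctx.extensions.insert(TypeId::of::<QuerySpan>(), span);
    }
    ctx
}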

If we go with the other approach of using GetOptions, then it's not clear to me how we would get a parent span ID close to the point of ObjectStore usage without making datafusion aware of that happening.

@tustvold
Contributor

tustvold commented Feb 19, 2025

The part that's hard for me to understand being so new to this code is, where do the object store calls actually happen?

They happen as part of physical execution, the spans for which are created as I described above from the metrics set. The spans you are linking to above are from logical and physical planning, not execution.

Perhaps @crepererum can weigh in here, as this conversation is somewhat going in circles.

TBC adding some sort of dyn Any context type to *Options is probably fine and a useful extension point; doubling the API surface with a load of _ctx calls, less so.

@crepererum
Contributor

crepererum commented Feb 19, 2025

I had a chat w/ @tustvold and I agree w/ him that adding a new method doesn't sound good. Adding an extension point to GetOptions & Co (potentially PutOptions as well) sounds better. We should document that the builtin stores will basically ignore them. Also note that Any might not be the best type to keep (see #7152 (comment)), but maybe something like this:

use std::any::{Any, TypeId};
use std::collections::HashMap;

struct Extensions {
    inner: HashMap<TypeId, Box<dyn Extension>>,
}

impl Extensions {
    pub fn get<T>(&self) -> Option<&T>
    where
        T: Extension,
    {
        self.inner.get(&TypeId::of::<T>()).map(|ext| {
            ext.as_any()
                .downcast_ref()
                .expect("correct type IDs are enforced by `set`")
        })
    }

    /// Stores `ext`, returning the previously stored extension of the same type, if any.
    pub fn set<T>(&mut self, ext: T) -> Option<Box<dyn Extension>>
    where
        T: Extension,
    {
        self.inner.insert(TypeId::of::<T>(), Box::new(ext))
    }
}

impl PartialEq for Extensions {
    fn eq(&self, _other: &Self) -> bool {
        // ...
        todo!()
    }
}

// Note: the trait needs a name distinct from the `Extensions` container, and a
// `PartialEq<Self>` supertrait would make it unusable as `dyn Extension`, so
// equality across extensions would need a dyn-compatible method here instead.
trait Extension: std::fmt::Debug + 'static {
    fn as_any(&self) -> &dyn Any;
}

// other module
pub struct GetOptions {
    // current stuff
    // ...

    extensions: Extensions,
}

This is roughly modeled after datafusion::common::config::Extensions.
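
For instance, building on the sketch above (so not standalone), the tracing use case from this issue might look like:

// A concrete extension carrying the parent span id.
#[derive(Debug)]
struct ParentSpan {
    span_id: String,
}

impl Extension for ParentSpan {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

// Caller side, wherever the query's span is available:
//     options.extensions.set(ParentSpan { span_id: parent_id.to_string() });
//
// Store side, e.g. inside MemCachedObjectStore's get implementation:
//     if let Some(parent) = options.extensions.get::<ParentSpan>() {
//         // start a child span under parent.span_id
//     }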

@waynr
Author

waynr commented Feb 19, 2025

Adding an extension point to GetOptions & Co (potentially PutOptions as well) sounds better. We should document that the builtin stores will basically ignore them.

Okay, sounds good to me. I'll close this in favor of apache/arrow-rs-object-store#17
