Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Refactor storage factories to hold one configuration #6156

Draft
wants to merge 21 commits into
base: main
Choose a base branch
from

Conversation

mahadzaryab1
Copy link
Collaborator

Which problem is this PR solving?

Description of the changes

How was this change tested?

Checklist

pkg/es/config/flags.go Outdated Show resolved Hide resolved
pkg/es/config/flags.go Outdated Show resolved Hide resolved
pkg/es/config/flags.go Outdated Show resolved Hide resolved
@@ -118,7 +118,7 @@ func (*Factory) getFactoryOfType(factoryType string) (storage.Factory, error) {
case cassandraStorageType:
return cassandra.NewFactory(), nil
case elasticsearchStorageType, opensearchStorageType:
return es.NewFactory(), nil
return es.NewFactory(es.PrimaryNamespace), nil
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yurishkuro can you take another look at the changes? I minimized the code movements to simply remove the others field from Options. The problem now is that this isn't initializing the CLI flags for es-archive. Any thoughts on how to get around this?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see #6156 (comment)

The query service instantiates just a single factory and casts it to ArchiveFactory. With your changes (which are in the right direction, but of insufficient scope) it never gets a chance to create archive CLI flags, because that has to happen via different factories.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yurishkuro is the archive factory only needed for the query service?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

)

var ( // interface comformance checks
_ storage.Factory = (*Factory)(nil)
_ storage.ArchiveFactory = (*Factory)(nil)
Copy link
Member

@yurishkuro yurishkuro Nov 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to be very careful about removing ArchiveFactory interface, because query service uses it via runtime cast, so unless we have integration tests (many of them disable archive tests as I recall) you can introduce a breaking change

archiveFactory, ok := storageFactory.(storage.ArchiveFactory)

Copy link
Collaborator Author

@mahadzaryab1 mahadzaryab1 Nov 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yurishkuro yep - noted. I was thinking to avoid breaking changes we can remove the ArchiveFactory within this PR and use an archive storage factory wherever needed.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mahadzaryab1
Copy link
Collaborator Author

mahadzaryab1 commented Nov 3, 2024

@yurishkuro we've got 3 callsites for InitArchiveStorage that currently does the runtime cast. I had a couple of questions on how to move forward here and wanted to get your thoughts.

  1. https://github.com/jaegertracing/jaeger/blob/main/cmd/jaeger/internal/extension/jaegerquery/server.go#L137-L139. The previous check to get the traces archive factory should be sufficient. If it exists, we can set the archive span reader and archive span writer in query options. Does that make sense?
  2. func (qOpts *QueryOptions) BuildQueryServiceOptions(storageFactory storage.Factory, logger *zap.Logger) *querysvc.QueryServiceOptions {
    . This is called by cmd/query and cmd/all-in-one. Should we instantiate a new storage factory here that's the archive storage factory and pass that down to the function linked.
  3. https://github.com/jaegertracing/jaeger/blob/main/cmd/remote-storage/app/server.go#L81. This is used in the remote storage in cmd/remote-storage. Should we do the same thing here as suggested in 2?

@yurishkuro
Copy link
Member

I don't think you need to change (1), but we need to change InitArchiveStorage() not to cast but to use the storage directly

(2) yes

(3) I think you don't need to change it because if the caller needs an archive storage it should instantiate a different remote storage. As I understand it there's not a dedicated gRPC API for archive storage, which could go away the same way as ArchiveFactory.

@mahadzaryab1
Copy link
Collaborator Author

@yurishkuro Got it. For (2) - this is how the primary storage factory is initialized. How would we go about initializing the archive storage factory here to expose the CLI flags for storages that have archive flags (cassandra/es)

@mahadzaryab1
Copy link
Collaborator Author

@yurishkuro For v1, what do you think of passing the isArchive flag into https://github.com/jaegertracing/jaeger/blob/main/plugin/storage/factory.go#L116. This way, we can create a new archive storage using NewFactory, which we can pass down to es.NewFactory() and any other storage configs that need it.

Comment on lines +81 to +82
// TODO: what should we do here?
// _ = qOpts.InitArchiveStorage(f, logger)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yurishkuro any thoughts on how we should handle this case here? previously, this wouldn't initialize the archive storage if the factory didn't implement the ArchiveStorage interface but now it always will.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not be different from all other v1 binaries. If I run

SPAN_STORAGE_TYPE=cassandra go run ./cmd/remote-storage help

I see that it supports the same --cassandra.* and --cassandra-archive.* flags, so we have enough information to instantiate two separate factories, just like v1 all-in-one/query would have to do.

Signed-off-by: Mahad Zaryab <[email protected]>
Signed-off-by: Mahad Zaryab <[email protected]>
Copy link

codecov bot commented Nov 4, 2024

Codecov Report

Attention: Patch coverage is 98.75000% with 1 line in your changes missing coverage. Please review.

Project coverage is 96.44%. Comparing base (0a24f6d) to head (4194aab).
Report is 5 commits behind head on main.

Files with missing lines Patch % Lines
plugin/storage/es/factory.go 97.56% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #6156      +/-   ##
==========================================
- Coverage   96.47%   96.44%   -0.04%     
==========================================
  Files         354      354              
  Lines       20126    19997     -129     
==========================================
- Hits        19417    19286     -131     
- Misses        524      526       +2     
  Partials      185      185              
Flag Coverage Δ
badger_v1 8.42% <0.00%> (+0.10%) ⬆️
badger_v2 1.70% <0.00%> (+0.02%) ⬆️
cassandra-4.x-v1 14.15% <35.00%> (-0.24%) ⬇️
cassandra-4.x-v2 1.64% <0.00%> (+0.02%) ⬆️
cassandra-5.x-v1 14.15% <35.00%> (-0.24%) ⬇️
cassandra-5.x-v2 1.64% <0.00%> (+0.02%) ⬆️
elasticsearch-6.x-v1 18.18% <38.75%> (-0.43%) ⬇️
elasticsearch-7.x-v1 18.26% <38.75%> (-0.43%) ⬇️
elasticsearch-8.x-v1 18.43% <38.75%> (-0.42%) ⬇️
elasticsearch-8.x-v2 1.69% <0.00%> (+0.01%) ⬆️
grpc_v1 ?
grpc_v2 6.91% <3.75%> (-0.09%) ⬇️
kafka-v1 8.99% <0.00%> (+0.11%) ⬆️
kafka-v2 1.70% <0.00%> (+0.02%) ⬆️
memory_v2 1.70% <0.00%> (+0.02%) ⬆️
opensearch-1.x-v1 18.30% <38.75%> (-0.44%) ⬇️
opensearch-2.x-v1 18.31% <38.75%> (-0.43%) ⬇️
opensearch-2.x-v2 1.69% <0.00%> (+0.01%) ⬆️
tailsampling-processor 0.47% <0.00%> (+<0.01%) ⬆️
unittests 95.29% <83.75%> (-0.10%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@mahadzaryab1
Copy link
Collaborator Author

@yurishkuro For ES Archive, it looks like the archive flag is used in two places:

For Cassandra, its a bit more straightforward:

Do you have any thoughts on how we should proceed? Do we still want to expose an isArchive or setAsArchive flag?

@yurishkuro
Copy link
Member

The metrics namespacing for Cassandra can be easily done elsewhere, does not need to be based on isArchive. It's only needed there now because primary/archive distinction is made internally in the factory.

For ES:

  • "To choose the suffix for the index name" - not needed since the user can do that themselves. One significant change in v2 is that we cannot provide different defaults in the config for primary/archive, and having the same index prefix by mistake will be bad for the user. Maybe we can introduce additional validation for configs of the same type and catch that as a configuration error.
  • "to add sorting and a search after clause if we're not querying the archive index" - I don't understand the purpose of that difference. Any ideas? Would it hurt if the logic for archive was the same as for primary?
  • There is a 3rd, most important usage of isArchive - in the GetIndicesFn. That's the one where I wonder if we could replace isArchive with a different logic based on the lookback parameter.

@mahadzaryab1
Copy link
Collaborator Author

The metrics namespacing for Cassandra can be easily done elsewhere, does not need to be based on isArchive. It's only needed there now because primary/archive distinction is made internally in the factory.

With the setup of v2, how would we make that distinction?

For ES:

  • "To choose the suffix for the index name" - not needed since the user can do that themselves. One significant change in v2 is that we cannot provide different defaults in the config for primary/archive, and having the same index prefix by mistake will be bad for the user. Maybe we can introduce additional validation for configs of the same type and catch that as a configuration error.

@yurishkuro Okay I see. But if the configurations are being held in different factories - how would we perform validation there?

  • "to add sorting and a search after clause if we're not querying the archive index" - I don't understand the purpose of that difference. Any ideas? Would it hurt if the logic for archive was the same as for primary?

I was thinking the same as well. I'm guessing its an optimization to avoid sorting the archive storage which would be larger than the a non-archive storage. Here is the documentation for Search After. Would we have a performance degradation here if we enabled this for archive as well?

  • There is a 3rd, most important usage of isArchive - in the GetIndicesFn. That's the one where I wonder if we could replace isArchive with a different logic based on the lookback parameter.

Ah yes. I was only looking at the reader. Are you referring to getSpanAndServiceIndexFn? This looks to once again be creating a different suffix based on whether the storage is archive or not (https://github.com/jaegertracing/jaeger/blob/main/plugin/storage/es/spanstore/writer.go#L104-L111). Can we not use the same approach here as the first point for the ES reader?

@yurishkuro
Copy link
Member

yurishkuro commented Nov 8, 2024

With the setup of v2, how would we make that distinction?

the storage extension manages factories and knows storage names, it can use those names to bound MetricsFactory to have a specific label.

Okay I see. But if the configurations are being held in different factories - how would we perform validation there?

No, configuration are passed to factories, but held in a single place in storage extension, which can invoke additional validation here:

for storageName, cfg := range s.config.Backends {

Would we have a performance degradation here if we enabled this for archive as well?

but the thing is, we never search for traces in archive storage, we only retrieve trace by ID, so sorting in this case would only apply to the spans within a trace - yes, could have overhead for very large trace, but still small.

@yurishkuro
Copy link
Member

Are you referring to getSpanAndServiceIndexFn? This looks to once again be creating a different suffix

not just suffix, when it's primary storage with manually rotated indices they also have the date pattern in the name, but archive index never has that (because it doesn't grow large). One compromise we could do is recommend that users don't use archive storage with manually rotated indices, only with ILM. Unless there's another way that I am not seeing.

Btw, reader also has similar branching in index naming logic:

return addRemoteReadClusters(func(indexPrefix, _ /* indexDateLayout */ string, _ /* startTime */ time.Time, _ /* endTime */ time.Time, _ /* reduceDuration */ time.Duration) []string {

@mahadzaryab1
Copy link
Collaborator Author

With the setup of v2, how would we make that distinction?

the storage extension manages factories and knows storage names, it can use those names to bound MetricsFactory to have a specific label.

So this is where the Cassandra storage is initialized in the extension. Are you suggesting we can pass the label into the constructor here? If so, how would we make the distinction based on just the name?

Okay I see. But if the configurations are being held in different factories - how would we perform validation there?

No, configuration are passed to factories, but held in a single place in storage extension, which can invoke additional validation here:

Oh okay, I see. So whenever we're processing an ES config, we would go through all the other ones that exist and make sure that the index prefixes are not the same?

for storageName, cfg := range s.config.Backends {

Would we have a performance degradation here if we enabled this for archive as well?

but the thing is, we never search for traces in archive storage, we only retrieve trace by ID, so sorting in this case would only apply to the spans within a trace - yes, could have overhead for very large trace, but still small.

Sounds good. I can remove the indirection here then.

@mahadzaryab1
Copy link
Collaborator Author

Are you referring to getSpanAndServiceIndexFn? This looks to once again be creating a different suffix

not just suffix, when it's primary storage with manually rotated indices they also have the date pattern in the name, but archive index never has that (because it doesn't grow large). One compromise we could do is recommend that users don't use archive storage with manually rotated indices, only with ILM. Unless there's another way that I am not seeing.

Btw, reader also has similar branching in index naming logic:

return addRemoteReadClusters(func(indexPrefix, _ /* indexDateLayout */ string, _ /* startTime */ time.Time, _ /* endTime */ time.Time, _ /* reduceDuration */ time.Duration) []string {

How would making that recommendation simplify the archive branching for us?

@mahadzaryab1
Copy link
Collaborator Author

@yurishkuro Regarding getSourceFn, the es archive integration tests seem to fail when the SearchAfter clause is added as it is unable to find the trace.

Signed-off-by: Mahad Zaryab <[email protected]>
@@ -136,6 +136,8 @@ type Configuration struct {
Tags TagsAsFields `mapstructure:"tags_as_fields"`
// Enabled, if set to true, enables the namespace for storage pointed to by this configuration.
Enabled bool `mapstructure:"-"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this still used? Let's add some comments, because this is not the first time I have this question, and the flag doesn't really make sense to me.

}

// NewFactory creates a new Factory.
func NewFactory() *Factory {
func NewFactory(isArchive bool) *Factory {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: let's not pass a flag but have a 2nd constructor NewArchiveFactory. Because NewFactory(true) has poor readability.

@@ -49,9 +49,10 @@ const (
// (e.g. archive) may be underspecified and infer the rest of its parameters from primary.
type Options struct {
Primary NamespaceConfig `mapstructure:",squash"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Embed?

Suggested change
Primary NamespaceConfig `mapstructure:",squash"`
NamespaceConfig `mapstructure:",squash"`

@@ -123,96 +118,55 @@ func (f *Factory) InitFromViper(v *viper.Viper, _ *zap.Logger) {
// configureFromOptions configures factory from Options struct.
func (f *Factory) configureFromOptions(o *Options) {
f.Options = o
f.primaryConfig = f.Options.GetPrimary()
f.archiveConfig = f.Options.Get(archiveNamespace)
f.config = f.Options.GetPrimary()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GetPrimary is now meaningless, can we do something else?

@yurishkuro
Copy link
Member

Directionally this LGTM. Any blockers?

@mahadzaryab1
Copy link
Collaborator Author

Directionally this LGTM. Any blockers?

@yurishkuro The main blocker is that currently this implementation doesn't initialize the CLI flags for archive storage. We initialize the storage factory in https://github.com/jaegertracing/jaeger/blob/main/cmd/all-in-one/main.go#L58 which in this PR will only initialize the primary storages because the storage factories only hold one configuration. Any thoughts on how we should initialize the archive CLI flags?

@yurishkuro
Copy link
Member

v1 mains need to create two factories, primary and archive, and use both when registering CLI flags. It's worth having a helper function for that in storage/, because you would call this from all-in-one, query, and remote-storage. The helper also may need to be smart about which storages today support archive mode and which do not, and return nil/noop factory for archive for the latter

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants