-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP] Refactor storage factories to hold one configuration #6156
base: main
Are you sure you want to change the base?
Changes from 4 commits
92cc3f4
8a548a1
dd04a13
209b834
89c7c70
c6b6274
92e8070
535aa2c
76a3528
50795b0
a4d41b9
11a560d
8c8e8ae
bbcdbd7
6c1fd95
43487cd
c65bd21
7f87200
7d65eb8
c385416
4194aab
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -36,16 +36,15 @@ import ( | |||
) | ||||
|
||||
const ( | ||||
primaryNamespace = "es" | ||||
archiveNamespace = "es-archive" | ||||
PrimaryNamespace = "es" | ||||
ArchiveNamespace = "es-archive" | ||||
) | ||||
|
||||
var ( // interface comformance checks | ||||
_ storage.Factory = (*Factory)(nil) | ||||
_ storage.ArchiveFactory = (*Factory)(nil) | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. need to be very careful about removing ArchiveFactory interface, because query service uses it via runtime cast, so unless we have integration tests (many of them disable archive tests as I recall) you can introduce a breaking change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @yurishkuro yep - noted. I was thinking to avoid breaking changes we can remove the ArchiveFactory within this PR and use an archive storage factory wherever needed. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. https://github.com/jaegertracing/jaeger/blob/main/plugin/storage/integration/elasticsearch_test.go#L165 - ES doesn't skip the archive test |
||||
_ io.Closer = (*Factory)(nil) | ||||
_ plugin.Configurable = (*Factory)(nil) | ||||
_ storage.Purger = (*Factory)(nil) | ||||
_ storage.Factory = (*Factory)(nil) | ||||
_ io.Closer = (*Factory)(nil) | ||||
_ plugin.Configurable = (*Factory)(nil) | ||||
_ storage.Purger = (*Factory)(nil) | ||||
) | ||||
|
||||
// Factory implements storage.Factory for Elasticsearch backend. | ||||
|
@@ -58,26 +57,25 @@ type Factory struct { | |||
|
||||
newClientFn func(c *config.Configuration, logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error) | ||||
|
||||
primaryConfig *config.Configuration | ||||
archiveConfig *config.Configuration | ||||
config *config.Configuration | ||||
|
||||
primaryClient atomic.Pointer[es.Client] | ||||
archiveClient atomic.Pointer[es.Client] | ||||
client atomic.Pointer[es.Client] | ||||
|
||||
watchers []*fswatcher.FSWatcher | ||||
} | ||||
|
||||
// NewFactory creates a new Factory. | ||||
func NewFactory() *Factory { | ||||
func NewFactory(namespace string) *Factory { | ||||
return &Factory{ | ||||
Options: NewOptions(primaryNamespace, archiveNamespace), | ||||
Options: NewOptions(namespace), | ||||
newClientFn: config.NewClient, | ||||
tracer: otel.GetTracerProvider(), | ||||
} | ||||
} | ||||
|
||||
func NewFactoryWithConfig( | ||||
cfg config.Configuration, | ||||
namespace string, | ||||
mahadzaryab1 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||
metricsFactory metrics.Factory, | ||||
logger *zap.Logger, | ||||
) (*Factory, error) { | ||||
|
@@ -88,19 +86,12 @@ func NewFactoryWithConfig( | |||
defaultConfig := DefaultConfig() | ||||
cfg.ApplyDefaults(&defaultConfig) | ||||
|
||||
archive := make(map[string]*namespaceConfig) | ||||
archive[archiveNamespace] = &namespaceConfig{ | ||||
Configuration: cfg, | ||||
namespace: archiveNamespace, | ||||
} | ||||
|
||||
f := NewFactory() | ||||
f := NewFactory(namespace) | ||||
f.configureFromOptions(&Options{ | ||||
Primary: namespaceConfig{ | ||||
Configuration: cfg, | ||||
namespace: primaryNamespace, | ||||
namespace: namespace, | ||||
}, | ||||
others: archive, | ||||
}) | ||||
err := f.Initialize(metricsFactory, logger) | ||||
if err != nil { | ||||
|
@@ -123,96 +114,55 @@ func (f *Factory) InitFromViper(v *viper.Viper, _ *zap.Logger) { | |||
// configureFromOptions configures factory from Options struct. | ||||
func (f *Factory) configureFromOptions(o *Options) { | ||||
f.Options = o | ||||
f.primaryConfig = f.Options.GetPrimary() | ||||
f.archiveConfig = f.Options.Get(archiveNamespace) | ||||
f.config = f.Options.GetPrimary() | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||||
} | ||||
|
||||
// Initialize implements storage.Factory. | ||||
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error { | ||||
f.metricsFactory, f.logger = metricsFactory, logger | ||||
|
||||
primaryClient, err := f.newClientFn(f.primaryConfig, logger, metricsFactory) | ||||
primaryClient, err := f.newClientFn(f.config, logger, metricsFactory) | ||||
if err != nil { | ||||
return fmt.Errorf("failed to create primary Elasticsearch client: %w", err) | ||||
} | ||||
f.primaryClient.Store(&primaryClient) | ||||
f.client.Store(&primaryClient) | ||||
|
||||
if f.primaryConfig.Authentication.BasicAuthentication.PasswordFilePath != "" { | ||||
primaryWatcher, err := fswatcher.New([]string{f.primaryConfig.Authentication.BasicAuthentication.PasswordFilePath}, f.onPrimaryPasswordChange, f.logger) | ||||
if f.config.Authentication.BasicAuthentication.PasswordFilePath != "" { | ||||
primaryWatcher, err := fswatcher.New([]string{f.config.Authentication.BasicAuthentication.PasswordFilePath}, f.onPrimaryPasswordChange, f.logger) | ||||
if err != nil { | ||||
return fmt.Errorf("failed to create watcher for primary ES client's password: %w", err) | ||||
} | ||||
f.watchers = append(f.watchers, primaryWatcher) | ||||
} | ||||
|
||||
if f.archiveConfig.Enabled { | ||||
archiveClient, err := f.newClientFn(f.archiveConfig, logger, metricsFactory) | ||||
if err != nil { | ||||
return fmt.Errorf("failed to create archive Elasticsearch client: %w", err) | ||||
} | ||||
f.archiveClient.Store(&archiveClient) | ||||
|
||||
if f.archiveConfig.Authentication.BasicAuthentication.PasswordFilePath != "" { | ||||
archiveWatcher, err := fswatcher.New([]string{f.archiveConfig.Authentication.BasicAuthentication.PasswordFilePath}, f.onArchivePasswordChange, f.logger) | ||||
if err != nil { | ||||
return fmt.Errorf("failed to create watcher for archive ES client's password: %w", err) | ||||
} | ||||
f.watchers = append(f.watchers, archiveWatcher) | ||||
} | ||||
} | ||||
|
||||
return nil | ||||
} | ||||
|
||||
func (f *Factory) getPrimaryClient() es.Client { | ||||
if c := f.primaryClient.Load(); c != nil { | ||||
return *c | ||||
} | ||||
return nil | ||||
} | ||||
|
||||
func (f *Factory) getArchiveClient() es.Client { | ||||
if c := f.archiveClient.Load(); c != nil { | ||||
if c := f.client.Load(); c != nil { | ||||
return *c | ||||
} | ||||
return nil | ||||
} | ||||
|
||||
// CreateSpanReader implements storage.Factory | ||||
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) { | ||||
return createSpanReader(f.getPrimaryClient, f.primaryConfig, false, f.metricsFactory, f.logger, f.tracer) | ||||
return createSpanReader(f.getPrimaryClient, f.config, f.metricsFactory, f.logger, f.tracer) | ||||
} | ||||
|
||||
// CreateSpanWriter implements storage.Factory | ||||
func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { | ||||
return createSpanWriter(f.getPrimaryClient, f.primaryConfig, false, f.metricsFactory, f.logger) | ||||
return createSpanWriter(f.getPrimaryClient, f.config, false, f.metricsFactory, f.logger) | ||||
} | ||||
|
||||
// CreateDependencyReader implements storage.Factory | ||||
func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) { | ||||
return createDependencyReader(f.getPrimaryClient, f.primaryConfig, f.logger) | ||||
} | ||||
|
||||
// CreateArchiveSpanReader implements storage.ArchiveFactory | ||||
func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) { | ||||
if !f.archiveConfig.Enabled { | ||||
return nil, nil | ||||
} | ||||
return createSpanReader(f.getArchiveClient, f.archiveConfig, true, f.metricsFactory, f.logger, f.tracer) | ||||
} | ||||
|
||||
// CreateArchiveSpanWriter implements storage.ArchiveFactory | ||||
func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) { | ||||
if !f.archiveConfig.Enabled { | ||||
return nil, nil | ||||
} | ||||
return createSpanWriter(f.getArchiveClient, f.archiveConfig, true, f.metricsFactory, f.logger) | ||||
return createDependencyReader(f.getPrimaryClient, f.config, f.logger) | ||||
} | ||||
|
||||
func createSpanReader( | ||||
clientFn func() es.Client, | ||||
cfg *config.Configuration, | ||||
archive bool, | ||||
mFactory metrics.Factory, | ||||
logger *zap.Logger, | ||||
tp trace.TracerProvider, | ||||
|
@@ -229,7 +179,7 @@ func createSpanReader( | |||
ServiceIndex: cfg.Indices.Services, | ||||
TagDotReplacement: cfg.Tags.DotReplacement, | ||||
UseReadWriteAliases: cfg.UseReadWriteAliases, | ||||
Archive: archive, | ||||
Archive: cfg.IsArchive, | ||||
RemoteReadClusters: cfg.RemoteReadClusters, | ||||
Logger: logger, | ||||
MetricsFactory: mFactory, | ||||
|
@@ -287,16 +237,16 @@ func (f *Factory) CreateSamplingStore(int /* maxBuckets */) (samplingstore.Store | |||
params := esSampleStore.Params{ | ||||
Client: f.getPrimaryClient, | ||||
Logger: f.logger, | ||||
IndexPrefix: f.primaryConfig.Indices.IndexPrefix, | ||||
IndexDateLayout: f.primaryConfig.Indices.Sampling.DateLayout, | ||||
IndexRolloverFrequency: config.RolloverFrequencyAsNegativeDuration(f.primaryConfig.Indices.Sampling.RolloverFrequency), | ||||
Lookback: f.primaryConfig.AdaptiveSamplingLookback, | ||||
MaxDocCount: f.primaryConfig.MaxDocCount, | ||||
IndexPrefix: f.config.Indices.IndexPrefix, | ||||
IndexDateLayout: f.config.Indices.Sampling.DateLayout, | ||||
IndexRolloverFrequency: config.RolloverFrequencyAsNegativeDuration(f.config.Indices.Sampling.RolloverFrequency), | ||||
Lookback: f.config.AdaptiveSamplingLookback, | ||||
MaxDocCount: f.config.MaxDocCount, | ||||
} | ||||
store := esSampleStore.NewSamplingStore(params) | ||||
|
||||
if f.primaryConfig.CreateIndexTemplates && !f.primaryConfig.UseILM { | ||||
mappingBuilder := mappingBuilderFromConfig(f.primaryConfig) | ||||
if f.config.CreateIndexTemplates && !f.config.UseILM { | ||||
mappingBuilder := mappingBuilderFromConfig(f.config) | ||||
samplingMapping, err := mappingBuilder.GetSamplingMappings() | ||||
if err != nil { | ||||
return nil, err | ||||
|
@@ -344,19 +294,12 @@ func (f *Factory) Close() error { | |||
errs = append(errs, w.Close()) | ||||
} | ||||
errs = append(errs, f.getPrimaryClient().Close()) | ||||
if client := f.getArchiveClient(); client != nil { | ||||
errs = append(errs, client.Close()) | ||||
} | ||||
|
||||
return errors.Join(errs...) | ||||
} | ||||
|
||||
func (f *Factory) onPrimaryPasswordChange() { | ||||
f.onClientPasswordChange(f.primaryConfig, &f.primaryClient) | ||||
} | ||||
|
||||
func (f *Factory) onArchivePasswordChange() { | ||||
f.onClientPasswordChange(f.archiveConfig, &f.archiveClient) | ||||
f.onClientPasswordChange(f.config, &f.client) | ||||
} | ||||
|
||||
func (f *Factory) onClientPasswordChange(cfg *config.Configuration, client *atomic.Pointer[es.Client]) { | ||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -118,7 +118,7 @@ func (*Factory) getFactoryOfType(factoryType string) (storage.Factory, error) { | |
case cassandraStorageType: | ||
return cassandra.NewFactory(), nil | ||
case elasticsearchStorageType, opensearchStorageType: | ||
return es.NewFactory(), nil | ||
return es.NewFactory(es.PrimaryNamespace), nil | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @yurishkuro can you take another look at the changes? I minimized the code movements to simply remove the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. see #6156 (comment) The query service instantiates just a single factory and casts it to ArchiveFactory. With your changes (which are in the right direction, but of insufficient scope) it never gets a chance to create archive CLI flags, because that has to happen via different factories. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @yurishkuro is the archive factory only needed for the query service? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes |
||
case memoryStorageType: | ||
return memory.NewFactory(), nil | ||
case kafkaStorageType: | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this still used? Let's add some comments, because this is not the first time I have this question, and the flag doesn't really make sense to me.