Skip to content

Commit

Permalink
Refactor ES Factory To Only Hold One Type of Config
Browse files Browse the repository at this point in the history
Signed-off-by: Mahad Zaryab <[email protected]>
  • Loading branch information
mahadzaryab1 committed Nov 3, 2024
1 parent 3856d60 commit 63d9d38
Showing 1 changed file with 31 additions and 101 deletions.
132 changes: 31 additions & 101 deletions plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,36 +41,34 @@ const (
)

var ( // interface comformance checks
_ storage.Factory = (*Factory)(nil)
_ storage.ArchiveFactory = (*Factory)(nil)
_ io.Closer = (*Factory)(nil)
_ plugin.Configurable = (*Factory)(nil)
_ storage.Purger = (*Factory)(nil)
_ storage.Factory = (*Factory)(nil)
_ io.Closer = (*Factory)(nil)
_ plugin.Configurable = (*Factory)(nil)
_ storage.Purger = (*Factory)(nil)
)

// Factory implements storage.Factory for Elasticsearch backend.
type Factory struct {
Options *Options
// Options *Options

metricsFactory metrics.Factory
logger *zap.Logger
tracer trace.TracerProvider

newClientFn func(c *config.Configuration, logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error)

primaryConfig *config.Configuration
archiveConfig *config.Configuration
config *config.Configuration

primaryClient atomic.Pointer[es.Client]
archiveClient atomic.Pointer[es.Client]
client atomic.Pointer[es.Client]

watchers []*fswatcher.FSWatcher
}

// NewFactory creates a new Factory.
func NewFactory() *Factory {
dc := config.DefaultConfig()
return &Factory{
Options: NewOptions(primaryNamespace, archiveNamespace),
config: &dc,
newClientFn: config.NewClient,
tracer: otel.GetTracerProvider(),
}
Expand All @@ -85,23 +83,11 @@ func NewFactoryWithConfig(
return nil, err
}

defaultConfig := DefaultConfig()
defaultConfig := config.DefaultConfig()
cfg.ApplyDefaults(&defaultConfig)

archive := make(map[string]*namespaceConfig)
archive[archiveNamespace] = &namespaceConfig{
Configuration: cfg,
namespace: archiveNamespace,
}

f := NewFactory()
f.configureFromOptions(&Options{
Primary: namespaceConfig{
Configuration: cfg,
namespace: primaryNamespace,
},
others: archive,
})
f.config = &cfg
err := f.Initialize(metricsFactory, logger)
if err != nil {
return nil, err
Expand All @@ -111,108 +97,60 @@ func NewFactoryWithConfig(

// AddFlags implements plugin.Configurable
func (f *Factory) AddFlags(flagSet *flag.FlagSet) {
f.Options.AddFlags(flagSet)
f.config.AddFlags(flagSet)
}

// InitFromViper implements plugin.Configurable
func (f *Factory) InitFromViper(v *viper.Viper, _ *zap.Logger) {
f.Options.InitFromViper(v)
f.configureFromOptions(f.Options)
}

// configureFromOptions configures factory from Options struct.
func (f *Factory) configureFromOptions(o *Options) {
f.Options = o
f.primaryConfig = f.Options.GetPrimary()
f.archiveConfig = f.Options.Get(archiveNamespace)
f.config.InitFromViper(v)
}

// Initialize implements storage.Factory.
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error {
f.metricsFactory, f.logger = metricsFactory, logger

primaryClient, err := f.newClientFn(f.primaryConfig, logger, metricsFactory)
primaryClient, err := f.newClientFn(f.config, logger, metricsFactory)
if err != nil {
return fmt.Errorf("failed to create primary Elasticsearch client: %w", err)
}
f.primaryClient.Store(&primaryClient)
f.client.Store(&primaryClient)

if f.primaryConfig.Authentication.BasicAuthentication.PasswordFilePath != "" {
primaryWatcher, err := fswatcher.New([]string{f.primaryConfig.Authentication.BasicAuthentication.PasswordFilePath}, f.onPrimaryPasswordChange, f.logger)
if f.config.Authentication.BasicAuthentication.PasswordFilePath != "" {
primaryWatcher, err := fswatcher.New([]string{f.config.Authentication.BasicAuthentication.PasswordFilePath}, f.onPrimaryPasswordChange, f.logger)
if err != nil {
return fmt.Errorf("failed to create watcher for primary ES client's password: %w", err)
}
f.watchers = append(f.watchers, primaryWatcher)
}

if f.archiveConfig.Enabled {
archiveClient, err := f.newClientFn(f.archiveConfig, logger, metricsFactory)
if err != nil {
return fmt.Errorf("failed to create archive Elasticsearch client: %w", err)
}
f.archiveClient.Store(&archiveClient)

if f.archiveConfig.Authentication.BasicAuthentication.PasswordFilePath != "" {
archiveWatcher, err := fswatcher.New([]string{f.archiveConfig.Authentication.BasicAuthentication.PasswordFilePath}, f.onArchivePasswordChange, f.logger)
if err != nil {
return fmt.Errorf("failed to create watcher for archive ES client's password: %w", err)
}
f.watchers = append(f.watchers, archiveWatcher)
}
}

return nil
}

func (f *Factory) getPrimaryClient() es.Client {
if c := f.primaryClient.Load(); c != nil {
return *c
}
return nil
}

func (f *Factory) getArchiveClient() es.Client {
if c := f.archiveClient.Load(); c != nil {
if c := f.client.Load(); c != nil {
return *c
}
return nil
}

// CreateSpanReader implements storage.Factory
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
return createSpanReader(f.getPrimaryClient, f.primaryConfig, false, f.metricsFactory, f.logger, f.tracer)
return createSpanReader(f.getPrimaryClient, f.config, f.metricsFactory, f.logger, f.tracer)
}

// CreateSpanWriter implements storage.Factory
func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
return createSpanWriter(f.getPrimaryClient, f.primaryConfig, false, f.metricsFactory, f.logger)
return createSpanWriter(f.getPrimaryClient, f.config, f.metricsFactory, f.logger)
}

// CreateDependencyReader implements storage.Factory
func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
return createDependencyReader(f.getPrimaryClient, f.primaryConfig, f.logger)
}

// CreateArchiveSpanReader implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) {
if !f.archiveConfig.Enabled {
return nil, nil
}
return createSpanReader(f.getArchiveClient, f.archiveConfig, true, f.metricsFactory, f.logger, f.tracer)
}

// CreateArchiveSpanWriter implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
if !f.archiveConfig.Enabled {
return nil, nil
}
return createSpanWriter(f.getArchiveClient, f.archiveConfig, true, f.metricsFactory, f.logger)
return createDependencyReader(f.getPrimaryClient, f.config, f.logger)
}

func createSpanReader(
clientFn func() es.Client,
cfg *config.Configuration,
archive bool,
mFactory metrics.Factory,
logger *zap.Logger,
tp trace.TracerProvider,
Expand All @@ -229,7 +167,7 @@ func createSpanReader(
ServiceIndex: cfg.Indices.Services,
TagDotReplacement: cfg.Tags.DotReplacement,
UseReadWriteAliases: cfg.UseReadWriteAliases,
Archive: archive,
Archive: cfg.IsArchive,
RemoteReadClusters: cfg.RemoteReadClusters,
Logger: logger,
MetricsFactory: mFactory,
Expand All @@ -240,7 +178,6 @@ func createSpanReader(
func createSpanWriter(
clientFn func() es.Client,
cfg *config.Configuration,
archive bool,
mFactory metrics.Factory,
logger *zap.Logger,
) (spanstore.Writer, error) {
Expand All @@ -262,7 +199,7 @@ func createSpanWriter(
AllTagsAsFields: cfg.Tags.AllAsFields,
TagKeysAsFields: tags,
TagDotReplacement: cfg.Tags.DotReplacement,
Archive: archive,
Archive: cfg.IsArchive,
UseReadWriteAliases: cfg.UseReadWriteAliases,
Logger: logger,
MetricsFactory: mFactory,
Expand All @@ -287,16 +224,16 @@ func (f *Factory) CreateSamplingStore(int /* maxBuckets */) (samplingstore.Store
params := esSampleStore.Params{
Client: f.getPrimaryClient,
Logger: f.logger,
IndexPrefix: f.primaryConfig.Indices.IndexPrefix,
IndexDateLayout: f.primaryConfig.Indices.Sampling.DateLayout,
IndexRolloverFrequency: config.RolloverFrequencyAsNegativeDuration(f.primaryConfig.Indices.Sampling.RolloverFrequency),
Lookback: f.primaryConfig.AdaptiveSamplingLookback,
MaxDocCount: f.primaryConfig.MaxDocCount,
IndexPrefix: f.config.Indices.IndexPrefix,
IndexDateLayout: f.config.Indices.Sampling.DateLayout,
IndexRolloverFrequency: config.RolloverFrequencyAsNegativeDuration(f.config.Indices.Sampling.RolloverFrequency),
Lookback: f.config.AdaptiveSamplingLookback,
MaxDocCount: f.config.MaxDocCount,
}
store := esSampleStore.NewSamplingStore(params)

if f.primaryConfig.CreateIndexTemplates && !f.primaryConfig.UseILM {
mappingBuilder := mappingBuilderFromConfig(f.primaryConfig)
if f.config.CreateIndexTemplates && !f.config.UseILM {
mappingBuilder := mappingBuilderFromConfig(f.config)
samplingMapping, err := mappingBuilder.GetSamplingMappings()
if err != nil {
return nil, err
Expand Down Expand Up @@ -344,19 +281,12 @@ func (f *Factory) Close() error {
errs = append(errs, w.Close())
}
errs = append(errs, f.getPrimaryClient().Close())
if client := f.getArchiveClient(); client != nil {
errs = append(errs, client.Close())
}

return errors.Join(errs...)
}

func (f *Factory) onPrimaryPasswordChange() {
f.onClientPasswordChange(f.primaryConfig, &f.primaryClient)
}

func (f *Factory) onArchivePasswordChange() {
f.onClientPasswordChange(f.archiveConfig, &f.archiveClient)
f.onClientPasswordChange(f.config, &f.client)
}

func (f *Factory) onClientPasswordChange(cfg *config.Configuration, client *atomic.Pointer[es.Client]) {
Expand Down

0 comments on commit 63d9d38

Please sign in to comment.