Skip to content

Commit

Permalink
initial functional pass
Browse files Browse the repository at this point in the history
  • Loading branch information
fearful-symmetry committed Sep 20, 2023
1 parent bbb8072 commit 53b0948
Show file tree
Hide file tree
Showing 24 changed files with 966 additions and 469 deletions.
9 changes: 6 additions & 3 deletions libbeat/cmd/export/ilm_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

"github.com/elastic/beats/v7/libbeat/cmd/instance"
"github.com/elastic/beats/v7/libbeat/idxmgmt"
"github.com/elastic/beats/v7/libbeat/idxmgmt/ilm"
"github.com/elastic/beats/v7/libbeat/idxmgmt/lifecycle"
)

// GenGetILMPolicyCmd is the command used to export the ilm policy.
Expand All @@ -35,14 +35,17 @@ func GenGetILMPolicyCmd(settings instance.Settings) *cobra.Command {
dir, _ := cmd.Flags().GetString("dir")

if settings.ILM == nil {
settings.ILM = ilm.StdSupport
settings.ILM = lifecycle.StdSupport
}
b, err := instance.NewInitializedBeat(settings)
if err != nil {
fatalfInitCmd(err)
}

clientHandler := idxmgmt.NewFileClientHandler(newIdxmgmtClient(dir, version))
clientHandler, err := idxmgmt.NewFileClientHandler(newIdxmgmtClient(dir, version), b.Info, b.Config.LifecycleConfig)
if err != nil {
fatalf("error creating file handler: %s", err)
}
idxManager := b.IdxSupporter.Manager(clientHandler, idxmgmt.BeatsAssets(b.Fields))
if err := idxManager.Setup(idxmgmt.LoadModeDisabled, idxmgmt.LoadModeForce); err != nil {
fatalf("Error exporting ilm-policy: %+v.", err)
Expand Down
11 changes: 7 additions & 4 deletions libbeat/cmd/export/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

"github.com/elastic/beats/v7/libbeat/cmd/instance"
"github.com/elastic/beats/v7/libbeat/idxmgmt"
"github.com/elastic/beats/v7/libbeat/idxmgmt/ilm"
"github.com/elastic/beats/v7/libbeat/idxmgmt/lifecycle"
)

// GenTemplateConfigCmd is the command used to export the elasticsearch template.
Expand All @@ -36,18 +36,21 @@ func GenTemplateConfigCmd(settings instance.Settings) *cobra.Command {
noILM, _ := cmd.Flags().GetBool("noilm")

if noILM {
settings.ILM = ilm.NoopSupport
settings.ILM = lifecycle.NoopSupport
}
if settings.ILM == nil {
settings.ILM = ilm.StdSupport
settings.ILM = lifecycle.StdSupport
}

b, err := instance.NewInitializedBeat(settings)
if err != nil {
fatalfInitCmd(err)
}

clientHandler := idxmgmt.NewFileClientHandler(newIdxmgmtClient(dir, version))
clientHandler, err := idxmgmt.NewFileClientHandler(newIdxmgmtClient(dir, version), b.Info, b.Config.LifecycleConfig)
if err != nil {
fatalf("error creating file handler: %s", err)
}
idxManager := b.IdxSupporter.Manager(clientHandler, idxmgmt.BeatsAssets(b.Fields))
if err := idxManager.Setup(idxmgmt.LoadModeForce, idxmgmt.LoadModeDisabled); err != nil {
fatalf("Error exporting template: %+v.", err)
Expand Down
18 changes: 16 additions & 2 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/features"
"github.com/elastic/beats/v7/libbeat/idxmgmt"
"github.com/elastic/beats/v7/libbeat/idxmgmt/lifecycle"
"github.com/elastic/beats/v7/libbeat/instrumentation"
"github.com/elastic/beats/v7/libbeat/kibana"
"github.com/elastic/beats/v7/libbeat/management"
Expand Down Expand Up @@ -133,6 +134,9 @@ type beatConfig struct {
// monitoring settings
MonitoringBeatConfig monitoring.BeatConfig `config:",inline"`

// ILM settings
LifecycleConfig lifecycle.LifecycleConfig `config:",inline"`

// central management settings
Management *config.C `config:"management"`

Expand Down Expand Up @@ -671,7 +675,13 @@ func (b *Beat) Setup(settings Settings, bt beat.Creator, setup SetupSettings) er
if setup.IndexManagement || setup.ILMPolicy {
loadILM = idxmgmt.LoadModeEnabled
}
m := b.IdxSupporter.Manager(idxmgmt.NewESClientHandler(esClient), idxmgmt.BeatsAssets(b.Fields))

mgmtHandler, err := idxmgmt.NewESClientHandler(esClient, b.Info, b.Config.LifecycleConfig)
if err != nil {
return fmt.Errorf("error creating index management handler: %w", err)
}

m := b.IdxSupporter.Manager(mgmtHandler, idxmgmt.BeatsAssets(b.Fields))
if ok, warn := m.VerifySetup(loadTemplate, loadILM); !ok {
fmt.Println(warn)
}
Expand Down Expand Up @@ -1065,7 +1075,11 @@ func (b *Beat) registerESIndexManagement() error {

func (b *Beat) indexSetupCallback() elasticsearch.ConnectCallback {
return func(esClient *eslegclient.Connection) error {
m := b.IdxSupporter.Manager(idxmgmt.NewESClientHandler(esClient), idxmgmt.BeatsAssets(b.Fields))
mgmtHandler, err := idxmgmt.NewESClientHandler(esClient, b.Info, b.Config.LifecycleConfig)
if err != nil {
return fmt.Errorf("error creating index management handler: %w", err)
}
m := b.IdxSupporter.Manager(mgmtHandler, idxmgmt.BeatsAssets(b.Fields))
return m.Setup(idxmgmt.LoadModeEnabled, idxmgmt.LoadModeEnabled)
}
}
Expand Down
4 changes: 2 additions & 2 deletions libbeat/cmd/instance/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/idxmgmt"
"github.com/elastic/beats/v7/libbeat/idxmgmt/ilm"
"github.com/elastic/beats/v7/libbeat/idxmgmt/lifecycle"
"github.com/elastic/beats/v7/libbeat/monitoring/report"
"github.com/elastic/beats/v7/libbeat/publisher/processing"
)
Expand All @@ -42,7 +42,7 @@ type Settings struct {

// load custom index manager. The config object will be the Beats root configuration.
IndexManagement idxmgmt.SupportFactory
ILM ilm.SupportFactory
ILM lifecycle.SupportFactory

Processing processing.SupportFactory

Expand Down
8 changes: 8 additions & 0 deletions libbeat/common/fmtstr/formatevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,11 @@ func (fs *EventFormatString) Unpack(v interface{}) error {
return nil
}

// IsInitialized returns true if the underlying event formatted is prepared to format an event
func (fs *EventFormatString) IsInitialized() bool {
return fs.formatter != nil
}

// NumFields returns number of unique event fields used by the format string.
func (fs *EventFormatString) NumFields() int {
return len(fs.fields)
Expand All @@ -190,6 +195,9 @@ func (fs *EventFormatString) Fields() []string {
// Run executes the format string returning a new expanded string or an error
// if execution or event field expansion fails.
func (fs *EventFormatString) Run(event *beat.Event) (string, error) {
if !fs.IsInitialized() {
return "", fmt.Errorf("event formatter is nil")
}
ctx := newEventCtx(len(fs.fields))
defer releaseCtx(ctx)

Expand Down
21 changes: 16 additions & 5 deletions libbeat/idxmgmt/client_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
package idxmgmt

import (
"github.com/elastic/beats/v7/libbeat/idxmgmt/ilm"
"fmt"

"github.com/elastic/beats/v7/libbeat/beat"
ilm "github.com/elastic/beats/v7/libbeat/idxmgmt/lifecycle"
"github.com/elastic/beats/v7/libbeat/template"
"github.com/elastic/elastic-agent-libs/version"
)
Expand Down Expand Up @@ -56,12 +59,20 @@ func NewClientHandler(ilm ilm.ClientHandler, template template.Loader) ClientHan

// NewESClientHandler returns a new ESLoader instance,
// initialized with an ilm and template client handler based on the passed in client.
func NewESClientHandler(c ESClient) ClientHandler {
return NewClientHandler(ilm.NewESClientHandler(c), template.NewESLoader(c))
func NewESClientHandler(c ESClient, info beat.Info, cfg ilm.LifecycleConfig) (ClientHandler, error) {
esHandler, err := ilm.NewESClientHandler(c, info, cfg)
if err != nil {
return nil, fmt.Errorf("error creating ES handler: %w", err)
}
return NewClientHandler(esHandler, template.NewESLoader(c)), nil
}

// NewFileClientHandler returns a new ESLoader instance,
// initialized with an ilm and template client handler based on the passed in client.
func NewFileClientHandler(c FileClient) ClientHandler {
return NewClientHandler(ilm.NewFileClientHandler(c), template.NewFileLoader(c))
func NewFileClientHandler(c FileClient, info beat.Info, cfg ilm.LifecycleConfig) (ClientHandler, error) {
mgmt, err := ilm.NewFileClientHandler(c, info, cfg)
if err != nil {
return nil, fmt.Errorf("error creating client handler: %w", err)
}
return NewClientHandler(mgmt, template.NewFileLoader(c)), nil
}
41 changes: 33 additions & 8 deletions libbeat/idxmgmt/idxmgmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import (
"fmt"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/idxmgmt/ilm"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/idxmgmt/lifecycle"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/template"
"github.com/elastic/elastic-agent-libs/config"
Expand Down Expand Up @@ -99,20 +100,44 @@ func DefaultSupport(log *logp.Logger, info beat.Info, configRoot *config.C) (Sup

// MakeDefaultSupport creates some default index management support, with a
// custom ILM support implementation.
func MakeDefaultSupport(ilmSupport ilm.SupportFactory) SupportFactory {
func MakeDefaultSupport(ilmSupport lifecycle.SupportFactory) SupportFactory {
if ilmSupport == nil {
ilmSupport = ilm.DefaultSupport
ilmSupport = lifecycle.DefaultSupport
}

return func(log *logp.Logger, info beat.Info, configRoot *config.C) (Supporter, error) {
const logName = "index-management"

cfg := struct {
ILM *config.C `config:"setup.ilm"`
Template *config.C `config:"setup.template"`
Output config.Namespace `config:"output"`
Migration *config.C `config:"migration.6_to_7"`
// first fetch the ES output, check if we're running against serverless, use that to set a default config
outCfg := struct {
Output config.Namespace `config:"output"`
}{}
if configRoot != nil {
err := configRoot.Unpack(&outCfg)
if err != nil {
return nil, fmt.Errorf("error unpacking output config while making index support: %w", err)
}
}

defaultLifecycle := lifecycle.DefaultILMConfig(info)
if outCfg.Output.IsSet() && outCfg.Output.Name() == "elasticsearch" {
esClient, err := eslegclient.NewConnectedClient(outCfg.Output.Config(), info.Beat)
if err != nil {
return nil, fmt.Errorf("error creating ES client while setting up index support: %w", err)
}
if esClient.IsServerless() {
defaultLifecycle = lifecycle.DefaultDSLConfig(info)
}
}

cfg := struct {
ILM lifecycle.LifecycleConfig `config:",inline"`
Template *config.C `config:"setup.template"`
Output config.Namespace `config:"output"`
Migration *config.C `config:"migration.6_to_7"`
}{
ILM: defaultLifecycle,
}
if configRoot != nil {
if err := configRoot.Unpack(&cfg); err != nil {
return nil, err
Expand Down
146 changes: 0 additions & 146 deletions libbeat/idxmgmt/ilm/client_handler.go

This file was deleted.

Loading

0 comments on commit 53b0948

Please sign in to comment.