Skip to content

Commit

Permalink
support for reloading
Browse files Browse the repository at this point in the history
  • Loading branch information
leehinman committed Oct 9, 2023
1 parent 753d82c commit 4b506ee
Show file tree
Hide file tree
Showing 24 changed files with 151 additions and 122 deletions.
4 changes: 2 additions & 2 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ is collected by it.
- Add support for AWS external IDs. {issue}36321[36321] {pull}36322[36322]
- [Enhanncement for host.ip and host.mac] Disabling netinfo.enabled option of add-host-metadata processor {pull}36506[36506]
Setting environmental variable ELASTIC_NETINFO:false in Elastic Agent pod will disable the netinfo.enabled option of add_host_metadata processor
- allow `queue` configuration settings to be set under the output. {issue}35615[35615] {pull}36693[36693]
- elasticsearch output now supports `idle_connection_timeout`. {issue}35615[35615] {pull}36693[36693]
- allow `queue` configuration settings to be set under the output. {issue}35615[35615] {pull}36788[36788]
- elasticsearch output now supports `idle_connection_timeout`. {issue}35615[35615] {pull}36788[36788]

*Auditbeat*

Expand Down
6 changes: 5 additions & 1 deletion libbeat/outputs/console/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

package console

import "github.com/elastic/beats/v7/libbeat/outputs/codec"
import (
"github.com/elastic/beats/v7/libbeat/outputs/codec"
"github.com/elastic/elastic-agent-libs/config"
)

type Config struct {
Codec codec.Config `config:"codec"`
Expand All @@ -26,6 +29,7 @@ type Config struct {
Pretty bool `config:"pretty"`

BatchSize int
Queue config.Namespace `config:"queue"`
}

var defaultConfig = Config{}
14 changes: 3 additions & 11 deletions libbeat/outputs/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"fmt"
"os"
"runtime"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/outputs"
Expand All @@ -43,13 +42,6 @@ type console struct {
index string
}

type consoleEvent struct {
Timestamp time.Time `json:"@timestamp" struct:"@timestamp"`

// Note: stdlib json doesn't support inlining :( -> use `codec: 2`, to generate proper event
Fields interface{} `struct:",inline"`
}

func init() {
outputs.RegisterType("console", makeConsole)
}
Expand Down Expand Up @@ -82,18 +74,18 @@ func makeConsole(
index := beat.Beat
c, err := newConsole(index, observer, enc)
if err != nil {
return outputs.Fail(fmt.Errorf("console output initialization failed with: %v", err))
return outputs.Fail(fmt.Errorf("console output initialization failed with: %w", err))
}

// check stdout actually being available
if runtime.GOOS != "windows" {
if _, err = c.out.Stat(); err != nil {
err = fmt.Errorf("console output initialization failed with: %v", err)
err = fmt.Errorf("console output initialization failed with: %w", err)
return outputs.Fail(err)
}
}

return outputs.Success(config.BatchSize, 0, c)
return outputs.Success(config.Queue, config.BatchSize, 0, c)
}

func newConsole(index string, observer outputs.Observer, codec codec.Codec) (*console, error) {
Expand Down
1 change: 1 addition & 0 deletions libbeat/outputs/elasticsearch/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type elasticsearchConfig struct {
AllowOlderVersion bool `config:"allow_older_versions"`

Transport httpcommon.HTTPTransportSettings `config:",inline"`
Queue config.Namespace `config:"queue"`
}

type Backoff struct {
Expand Down
34 changes: 17 additions & 17 deletions libbeat/outputs/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ func makeES(
return outputs.Fail(err)
}

config := defaultConfig
if err := cfg.Unpack(&config); err != nil {
esConfig := defaultConfig
if err := cfg.Unpack(&esConfig); err != nil {
return outputs.Fail(err)
}

policy, err := newNonIndexablePolicy(config.NonIndexablePolicy)
policy, err := newNonIndexablePolicy(esConfig.NonIndexablePolicy)
if err != nil {
log.Errorf("error while creating file identifier: %v", err)
return outputs.Fail(err)
Expand All @@ -65,12 +65,12 @@ func makeES(
return outputs.Fail(err)
}

if proxyURL := config.Transport.Proxy.URL; proxyURL != nil && !config.Transport.Proxy.Disable {
if proxyURL := esConfig.Transport.Proxy.URL; proxyURL != nil && !esConfig.Transport.Proxy.Disable {
log.Debugf("breaking down proxy URL. Scheme: '%s', host[:port]: '%s', path: '%s'", proxyURL.Scheme, proxyURL.Host, proxyURL.Path)
log.Infof("Using proxy URL: %s", proxyURL)
}

params := config.Params
params := esConfig.Params
if len(params) == 0 {
params = nil
}
Expand All @@ -84,7 +84,7 @@ func makeES(

clients := make([]outputs.NetworkClient, len(hosts))
for i, host := range hosts {
esURL, err := common.MakeURL(config.Protocol, config.Path, host, 9200)
esURL, err := common.MakeURL(esConfig.Protocol, esConfig.Path, host, 9200)
if err != nil {
log.Errorf("Invalid host param set: %s, Error: %+v", host, err)
return outputs.Fail(err)
Expand All @@ -95,17 +95,17 @@ func makeES(
ConnectionSettings: eslegclient.ConnectionSettings{
URL: esURL,
Beatname: beat.Beat,
Kerberos: config.Kerberos,
Username: config.Username,
Password: config.Password,
APIKey: config.APIKey,
Kerberos: esConfig.Kerberos,
Username: esConfig.Username,
Password: esConfig.Password,
APIKey: esConfig.APIKey,
Parameters: params,
Headers: config.Headers,
CompressionLevel: config.CompressionLevel,
Headers: esConfig.Headers,
CompressionLevel: esConfig.CompressionLevel,
Observer: observer,
EscapeHTML: config.EscapeHTML,
Transport: config.Transport,
IdleConnTimeout: config.Transport.IdleConnTimeout,
EscapeHTML: esConfig.EscapeHTML,
Transport: esConfig.Transport,
IdleConnTimeout: esConfig.Transport.IdleConnTimeout,
},
Index: index,
Pipeline: pipeline,
Expand All @@ -116,11 +116,11 @@ func makeES(
return outputs.Fail(err)
}

client = outputs.WithBackoff(client, config.Backoff.Init, config.Backoff.Max)
client = outputs.WithBackoff(client, esConfig.Backoff.Init, esConfig.Backoff.Max)
clients[i] = client
}

return outputs.SuccessNet(config.LoadBalance, config.BulkMaxSize, config.MaxRetries, clients)
return outputs.SuccessNet(esConfig.Queue, esConfig.LoadBalance, esConfig.BulkMaxSize, esConfig.MaxRetries, clients)
}

func buildSelectors(
Expand Down
24 changes: 13 additions & 11 deletions libbeat/outputs/fileout/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,31 @@ import (
"fmt"

"github.com/elastic/beats/v7/libbeat/outputs/codec"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/file"
)

type config struct {
Path string `config:"path"`
Filename string `config:"filename"`
RotateEveryKb uint `config:"rotate_every_kb" validate:"min=1"`
NumberOfFiles uint `config:"number_of_files"`
Codec codec.Config `config:"codec"`
Permissions uint32 `config:"permissions"`
RotateOnStartup bool `config:"rotate_on_startup"`
type fileOutConfig struct {
Path string `config:"path"`
Filename string `config:"filename"`
RotateEveryKb uint `config:"rotate_every_kb" validate:"min=1"`
NumberOfFiles uint `config:"number_of_files"`
Codec codec.Config `config:"codec"`
Permissions uint32 `config:"permissions"`
RotateOnStartup bool `config:"rotate_on_startup"`
Queue config.Namespace `config:"queue"`
}

func defaultConfig() config {
return config{
func defaultConfig() fileOutConfig {
return fileOutConfig{
NumberOfFiles: 7,
RotateEveryKb: 10 * 1024,
Permissions: 0600,
RotateOnStartup: true,
}
}

func (c *config) Validate() error {
func (c *fileOutConfig) Validate() error {
if c.NumberOfFiles < 2 || c.NumberOfFiles > file.MaxBackupsLimit {
return fmt.Errorf("the number_of_files to keep should be between 2 and %v",
file.MaxBackupsLimit)
Expand Down
10 changes: 5 additions & 5 deletions libbeat/outputs/fileout/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ func makeFileout(
observer outputs.Observer,
cfg *c.C,
) (outputs.Group, error) {
config := defaultConfig()
if err := cfg.Unpack(&config); err != nil {
foConfig := defaultConfig()
if err := cfg.Unpack(&foConfig); err != nil {
return outputs.Fail(err)
}

Expand All @@ -64,14 +64,14 @@ func makeFileout(
beat: beat,
observer: observer,
}
if err := fo.init(beat, config); err != nil {
if err := fo.init(beat, foConfig); err != nil {
return outputs.Fail(err)
}

return outputs.Success(-1, 0, fo)
return outputs.Success(foConfig.Queue, -1, 0, fo)
}

func (out *fileOutput) init(beat beat.Info, c config) error {
func (out *fileOutput) init(beat beat.Info, c fileOutConfig) error {
var path string
if c.Filename != "" {
path = filepath.Join(c.Path, c.Filename)
Expand Down
7 changes: 1 addition & 6 deletions libbeat/outputs/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type kafkaConfig struct {
Codec codec.Config `config:"codec"`
Sasl kafka.SaslConfig `config:"sasl"`
EnableFAST bool `config:"enable_krb5_fast"`
Queue config.Namespace `config:"queue"`
}

type metaConfig struct {
Expand All @@ -101,12 +102,6 @@ var compressionModes = map[string]sarama.CompressionCodec{
"snappy": sarama.CompressionSnappy,
}

const (
saslTypePlaintext = sarama.SASLTypePlaintext
saslTypeSCRAMSHA256 = sarama.SASLTypeSCRAMSHA256
saslTypeSCRAMSHA512 = sarama.SASLTypeSCRAMSHA512
)

func defaultConfig() kafkaConfig {
return kafkaConfig{
Hosts: nil,
Expand Down
12 changes: 6 additions & 6 deletions libbeat/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func makeKafka(
log := logp.NewLogger(logSelector)
log.Debug("initialize kafka output")

config, err := readConfig(cfg)
kConfig, err := readConfig(cfg)
if err != nil {
return outputs.Fail(err)
}
Expand All @@ -57,7 +57,7 @@ func makeKafka(
return outputs.Fail(err)
}

libCfg, err := newSaramaConfig(log, config)
libCfg, err := newSaramaConfig(log, kConfig)
if err != nil {
return outputs.Fail(err)
}
Expand All @@ -67,21 +67,21 @@ func makeKafka(
return outputs.Fail(err)
}

codec, err := codec.CreateEncoder(beat, config.Codec)
codec, err := codec.CreateEncoder(beat, kConfig.Codec)
if err != nil {
return outputs.Fail(err)
}

client, err := newKafkaClient(observer, hosts, beat.IndexPrefix, config.Key, topic, config.Headers, codec, libCfg)
client, err := newKafkaClient(observer, hosts, beat.IndexPrefix, kConfig.Key, topic, kConfig.Headers, codec, libCfg)
if err != nil {
return outputs.Fail(err)
}

retry := 0
if config.MaxRetries < 0 {
if kConfig.MaxRetries < 0 {
retry = -1
}
return outputs.Success(config.BulkMaxSize, retry, client)
return outputs.Success(kConfig.Queue, kConfig.BulkMaxSize, retry, client)
}

func buildTopicSelector(cfg *config.C) (outil.Selector, error) {
Expand Down
1 change: 1 addition & 0 deletions libbeat/outputs/logstash/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type Config struct {
Proxy transport.ProxyConfig `config:",inline"`
Backoff Backoff `config:"backoff"`
EscapeHTML bool `config:"escape_html"`
Queue config.Namespace `config:"queue"`
}

type Backoff struct {
Expand Down
18 changes: 9 additions & 9 deletions libbeat/outputs/logstash/logstash.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func makeLogstash(
observer outputs.Observer,
cfg *conf.C,
) (outputs.Group, error) {
config, err := readConfig(cfg, beat)
lsConfig, err := readConfig(cfg, beat)
if err != nil {
return outputs.Fail(err)
}
Expand All @@ -51,14 +51,14 @@ func makeLogstash(
return outputs.Fail(err)
}

tls, err := tlscommon.LoadTLSConfig(config.TLS)
tls, err := tlscommon.LoadTLSConfig(lsConfig.TLS)
if err != nil {
return outputs.Fail(err)
}

transp := transport.Config{
Timeout: config.Timeout,
Proxy: &config.Proxy,
Timeout: lsConfig.Timeout,
Proxy: &lsConfig.Proxy,
TLS: tls,
Stats: observer,
}
Expand All @@ -72,18 +72,18 @@ func makeLogstash(
return outputs.Fail(err)
}

if config.Pipelining > 0 {
client, err = newAsyncClient(beat, conn, observer, config)
if lsConfig.Pipelining > 0 {
client, err = newAsyncClient(beat, conn, observer, lsConfig)
} else {
client, err = newSyncClient(beat, conn, observer, config)
client, err = newSyncClient(beat, conn, observer, lsConfig)
}
if err != nil {
return outputs.Fail(err)
}

client = outputs.WithBackoff(client, config.Backoff.Init, config.Backoff.Max)
client = outputs.WithBackoff(client, lsConfig.Backoff.Init, lsConfig.Backoff.Max)
clients[i] = client
}

return outputs.SuccessNet(config.LoadBalance, config.BulkMaxSize, config.MaxRetries, clients)
return outputs.SuccessNet(lsConfig.Queue, lsConfig.LoadBalance, lsConfig.BulkMaxSize, lsConfig.MaxRetries, clients)
}
2 changes: 2 additions & 0 deletions libbeat/outputs/redis/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/elastic/beats/v7/libbeat/outputs/codec"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/transport"
"github.com/elastic/elastic-agent-libs/transport/tlscommon"
)
Expand All @@ -40,6 +41,7 @@ type redisConfig struct {
Db int `config:"db"`
DataType string `config:"datatype"`
Backoff backoff `config:"backoff"`
Queue config.Namespace `config:"queue"`
}

type backoff struct {
Expand Down
Loading

0 comments on commit 4b506ee

Please sign in to comment.