diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 2cf2d8e0..e1db92d8 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -7,31 +7,44 @@ jobs: golangci: name: Linting runs-on: ubuntu-20.04 + container: 'quay.io/plmr/sg-core-ci' steps: - uses: actions/checkout@v2 + - uses: actions/setup-go@v2 + with: + go-version: '1.16' + - name: tidy + run: go mod tidy - name: golangci-lint uses: golangci/golangci-lint-action@v2 with: + # Caching conflicts happen in GHA, so just disable for now + skip-pkg-cache: true + skip-build-cache: true # Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version. version: v1.33 - test-framework: - name: Base testing + name: Unit tests runs-on: ubuntu-20.04 - + container: 'quay.io/plmr/sg-core-ci' steps: - - name: Set up Go - uses: actions/setup-go@v1 + - uses: actions/checkout@v2 + - uses: actions/setup-go@v2 with: - go-version: '1.14' - - name: Checkout code - uses: actions/checkout@v2 + go-version: '1.16' + - name: tidy + run: go mod tidy - name: Run unit tests and code coverage run: go test -v -coverprofile=profile.cov ./... - name: Send coverage uses: shogo82148/actions-goveralls@v1 with: path-to-profile: profile.cov + image-build: + name: Image build + runs-on: ubuntu-20.04 + steps: + - uses: actions/checkout@v2 - name: Verify image builds run: | docker build --tag infrawatch/sg-core:latest --file build/Dockerfile . diff --git a/.github/workflows/updates.yml b/.github/workflows/updates.yml index 13d6cca6..3a42e21e 100644 --- a/.github/workflows/updates.yml +++ b/.github/workflows/updates.yml @@ -1,5 +1,8 @@ name: Coveralls Badge -on: pull_request +on: + pull_request: + types: + - opened jobs: coveralls_badge: @@ -26,4 +29,4 @@ jobs: body: `[![Coverage Status](https://coveralls.io/repos/github/${context.repo.owner}/${context.repo.repo}/badge.svg?branch=${BRANCH_NAME})](https://coveralls.io/github/${context.repo.owner}/${context.repo.repo}?branch=${BRANCH_NAME})` }) env: - BRANCH_NAME: ${{ github.event.pull_request.head.ref }} \ No newline at end of file + BRANCH_NAME: ${{ github.event.pull_request.head.ref }} diff --git a/.golangci.yaml b/.golangci.yaml new file mode 100644 index 00000000..b56b6d55 --- /dev/null +++ b/.golangci.yaml @@ -0,0 +1,58 @@ +run: + skip-dirs: + - plugins/transport/dummy-alertmanager + - plugins/transport/dummy-events + - plugins/transport/dummy-metrics + - plugins/transport/dummy-logs + - plugins/application/print + - devenv +issues: + exclude-rules: + - linters: + - errcheck + text: "[a-zA-Z]+.[a-zA-Z]+.(Error|Info|Debug|Warn)" # from logger + - text: "[A-Z]+" #omit enums + linters: + - deadcode + - text: New + linters: + - deadcode + +linters: + disable-all: true + enable: + - bodyclose + - deadcode + - depguard + - dogsled + - dupl + - errcheck + # - exhaustive + # - gochecknoinits + - goconst + - gocritic + - gocyclo + - gofmt + - goimports + - golint + - goprintffuncname + - gosec + - gosimple + - govet + - ineffassign + - misspell + - nakedret + - noctx + - nolintlint + - rowserrcheck + - scopelint + - staticcheck + - structcheck + - stylecheck + - typecheck + # - unused + - unconvert + - unparam + - varcheck + # - whitespace + diff --git a/build/Dockerfile b/build/Dockerfile index 338486fc..f581dd33 100644 --- a/build/Dockerfile +++ b/build/Dockerfile @@ -10,7 +10,7 @@ RUN dnf install golang git -y --setopt=tsflags=nodocs RUN CONTAINER_BUILD=true ./build.sh # --- end build, create smart gateway layer --- -FROM registry.access.redhat.com/ubi8 +FROM registry.access.redhat.com/ubi8-minimal LABEL io.k8s.display-name="Smart Gateway" \ io.k8s.description="A component of the Service Telemetry Framework on the server side that ingests data from AMQP 1.x and provides a metrics scrape endpoint for Prometheus, and forwards events to ElasticSearch" \ diff --git a/cmd/main.go b/cmd/main.go index ffe2cc76..199b9441 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -18,7 +18,7 @@ import ( func main() { configPath := flag.String("config", "/etc/sg-core.conf.yaml", "configuration file path") cpuprofile := flag.String("cpuprofile", "", "write cpu profile to file") - //memprofile := flag.String("memprofile", "", "write cpu profile to file") + // memprofile := flag.String("memprofile", "", "write cpu profile to file") flag.Usage = func() { fmt.Printf("Usage: %s [OPTIONS]\n\nAvailable options:\n", os.Args[0]) flag.PrintDefaults() @@ -40,7 +40,11 @@ func main() { logger.Metadata(logging.Metadata{"error": err}) logger.Error("failed to start cpu profile") } - pprof.StartCPUProfile(f) + err = pprof.StartCPUProfile(f) + if err != nil { + logger.Metadata(logging.Metadata{"error": err}) + logger.Error("failed to start cpu profile") + } defer pprof.StopCPUProfile() } @@ -107,9 +111,9 @@ func main() { ctx, cancelCtx := context.WithCancel(context.Background()) wg := new(sync.WaitGroup) - //run main processes + // run main processes - pluginDone := make(chan bool) //notified if a plugin stops execution before main or interrupt Received + pluginDone := make(chan bool) // notified if a plugin stops execution before main or interrupt Received interrupt := make(chan bool) manager.RunTransports(ctx, wg, pluginDone, configuration.HandlerErrors) manager.RunApplications(ctx, wg, pluginDone) diff --git a/cmd/manager/manager.go b/cmd/manager/manager.go index 076a1432..08edb602 100644 --- a/cmd/manager/manager.go +++ b/cmd/manager/manager.go @@ -17,9 +17,9 @@ import ( "gopkg.in/yaml.v2" ) -//errors +// errors var ( - //ErrAppNotReceiver return if application plugin does not implement any receiver. In this case, it will receive no messages from the internal buses + // ErrAppNotReceiver return if application plugin does not implement any receiver. In this case, it will receive no messages from the internal buses ErrAppNotReceiver = errors.New("application plugin does not implement either application.MetricReceiver or application.EventReceiver") ) var ( @@ -39,17 +39,17 @@ func init() { pluginPath = "/usr/lib64/sg-core" } -//SetPluginDir set directory path containing plugin binaries +// SetPluginDir set directory path containing plugin binaries func SetPluginDir(path string) { pluginPath = path } -//SetLogger set logger +// SetLogger set logger func SetLogger(l *logging.Logger) { logger = l } -//InitTransport load tranpsort binary and initialize with config +// InitTransport load tranpsort binary and initialize with config func InitTransport(name string, config interface{}) error { n, err := initPlugin(name) if err != nil { @@ -75,7 +75,7 @@ func InitTransport(name string, config interface{}) error { return nil } -//InitApplication initialize application plugin with configuration +// InitApplication initialize application plugin with configuration func InitApplication(name string, config interface{}) error { n, err := initPlugin(name) if err != nil { @@ -121,7 +121,7 @@ func InitApplication(name string, config interface{}) error { return nil } -//SetTransportHandlers load handlers binaries for transport +// SetTransportHandlers load handlers binaries for transport func SetTransportHandlers(name string, handlerBlocks []struct { Name string `validate:"required"` Config interface{} @@ -156,7 +156,7 @@ func SetTransportHandlers(name string, handlerBlocks []struct { return nil } -//RunTransports spins off tranpsort + handler processes +// RunTransports spins off tranpsort + handler processes func RunTransports(ctx context.Context, wg *sync.WaitGroup, done chan bool, report bool) { for name, t := range transports { for _, h := range handlers[name] { @@ -183,7 +183,7 @@ func RunTransports(ctx context.Context, wg *sync.WaitGroup, done chan bool, repo } } -//RunApplications spins off application processes +// RunApplications spins off application processes func RunApplications(ctx context.Context, wg *sync.WaitGroup, done chan bool) { for _, a := range applications { wg.Add(1) diff --git a/generator/amqp_snd_th.c b/generator/amqp_snd_th.c index 9850e612..0be52c17 100644 --- a/generator/amqp_snd_th.c +++ b/generator/amqp_snd_th.c @@ -156,7 +156,7 @@ static void send_message(app_data_t *app, pn_link_t *sender, pn_rwbytes_t *data) } static bool send_burst(app_data_t *app, pn_event_t *event) { -// pn_link_t *sender = pn_event_link(event); +// pn_link_t *sender = pn_event_link(event); pn_link_t *sender = app->sender; int credits = pn_link_credit(sender); @@ -210,7 +210,7 @@ static bool handle(app_data_t *app, pn_event_t *event) { if (app->verbose > 1) { printf("PN_LINK_FLOW %d\n", pn_link_credit(sender)); } - //printf("link_credits: %d, sent: %ld\n",pn_link_credit(sender), app->amqp_sent ); + // printf("link_credits: %d, sent: %ld\n",pn_link_credit(sender), app->amqp_sent ); exit_code = send_burst(app, event); break; } @@ -265,7 +265,7 @@ static bool handle(app_data_t *app, pn_event_t *event) { } pn_connection_t *c = pn_event_connection(event); pn_connection_set_container(c, app->container_id); - //pn_connection_open(c); + // pn_connection_open(c); pn_session_t *s = pn_session(c); pn_session_open(s); diff --git a/go.mod b/go.mod index b493b868..fa5ddd6e 100644 --- a/go.mod +++ b/go.mod @@ -4,20 +4,21 @@ go 1.14 require ( collectd.org v0.5.0 - github.com/elastic/go-elasticsearch/v7 v7.5.1-0.20201228183019-1cbb255902f5 - github.com/go-ini/ini v1.62.0 // indirect - github.com/go-openapi/errors v0.19.9 + github.com/elastic/go-elasticsearch/v7 v7.11.0 + github.com/go-openapi/errors v0.20.0 github.com/go-playground/universal-translator v0.17.0 // indirect - github.com/google/uuid v1.0.0 - github.com/infrawatch/apputils v0.0.0-20210218211331-9f6eb5097d89 + github.com/google/uuid v1.2.0 + github.com/infrawatch/apputils v0.0.0-20210308125437-598b321f4335 github.com/json-iterator/go v1.1.10 - github.com/leodido/go-urn v1.2.0 // indirect + github.com/leodido/go-urn v1.2.1 // indirect github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.9.0 - github.com/stretchr/testify v1.6.1 - gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect + github.com/stretchr/testify v1.7.0 + golang.org/x/sys v0.0.0-20210309074719-68d13333faf2 // indirect + golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect gopkg.in/go-playground/assert.v1 v1.2.1 gopkg.in/go-playground/validator.v9 v9.31.0 + gopkg.in/ini.v1 v1.62.0 // indirect gopkg.in/yaml.v2 v2.4.0 - gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 + gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b ) diff --git a/pkg/application/application.go b/pkg/application/application.go index b2d21a39..6e657400 100644 --- a/pkg/application/application.go +++ b/pkg/application/application.go @@ -7,7 +7,7 @@ import ( "github.com/infrawatch/sg-core/pkg/data" ) -//package application defines the interfaces for interacting with application plugins +// package application defines the interfaces for interacting with application plugins // Application describes application plugin interfaces. // Configuration bytes are passed into the Config() function as a sequence of bytes in yaml format. It is recommended to use the config.ParseConfig() method to parse the input. This is a convenience method that uses the validations library to validate the input and provide specific feedback. @@ -17,25 +17,25 @@ type Application interface { Run(context.Context, chan bool) } -//MetricReceiver Receives metrics from the internal metrics bus +// MetricReceiver Receives metrics from the internal metrics bus type MetricReceiver interface { Application - // The ReceiveMetric function will be called every time a Metric is Received on the internal metrics bus. Each part of the metric is passed in as an argument to the function in the following order: name, epoch time, metric type, interval, value, label keys, label values. - //The last two arguments are gauranteed to be the same size and map index to index. Implementors of this function should run as quickly as possible as metrics can be very high volume. It is recommended to cache metrics in a data.Metric{} object to be utilized by the application plugin later. + // The ReceiveMetric function will be called every time a Metric is Received on the internal metrics bus. Each part of the metric is passed in as an argument to the function in the following order: name, epoch time, metric type, interval, value, label keys, label values. + // The last two arguments are guaranteed to be the same size and map index to index. Implementors of this function should run as quickly as possible as metrics can be very high volume. It is recommended to cache metrics in a data.Metric{} object to be utilized by the application plugin later. ReceiveMetric( - string, //name - float64, //epoch time - data.MetricType, //type - time.Duration, //interval - float64, //value - []string, //labelKeys - []string, //labelValues + string, // name + float64, // epoch time + data.MetricType, // type + time.Duration, // interval + float64, // value + []string, // labelKeys + []string, // labelValues ) } -//EventReceiver Receive events from the internal event bus +// EventReceiver Receive events from the internal event bus type EventReceiver interface { Application - //ReceiveEvent is called whenever an event is broadcast on the event bus. + // ReceiveEvent is called whenever an event is broadcast on the event bus. ReceiveEvent(data.Event) } diff --git a/pkg/bus/bus.go b/pkg/bus/bus.go index 90fa8674..1775c59d 100644 --- a/pkg/bus/bus.go +++ b/pkg/bus/bus.go @@ -7,26 +7,26 @@ import ( "github.com/infrawatch/sg-core/pkg/data" ) -//EventReceiveFunc callback type for receiving events from the event bus +// EventReceiveFunc callback type for receiving events from the event bus type EventReceiveFunc func(data.Event) -//EventPublishFunc function to for publishing to the event bus +// EventPublishFunc function to for publishing to the event bus type EventPublishFunc func(data.Event) -//EventBus bus for data.Event type +// EventBus bus for data.Event type type EventBus struct { subscribers []EventReceiveFunc rw sync.RWMutex } -//Subscribe subscribe to bus +// Subscribe subscribe to bus func (eb *EventBus) Subscribe(rf EventReceiveFunc) { eb.rw.Lock() defer eb.rw.Unlock() eb.subscribers = append(eb.subscribers, rf) } -//Publish publish to bus +// Publish publish to bus func (eb *EventBus) Publish(e data.Event) { eb.rw.RLock() @@ -42,23 +42,23 @@ func (eb *EventBus) Publish(e data.Event) { // arguments are name, timestamp, metric type, interval, value, labels type MetricReceiveFunc func(string, float64, data.MetricType, time.Duration, float64, []string, []string) -//MetricPublishFunc function type for publishing to the metric bus +// MetricPublishFunc function type for publishing to the metric bus type MetricPublishFunc func(string, float64, data.MetricType, time.Duration, float64, []string, []string) -//MetricBus bus for data.Metric type +// MetricBus bus for data.Metric type type MetricBus struct { sync.RWMutex subscribers []MetricReceiveFunc } -//Subscribe subscribe to bus +// Subscribe subscribe to bus func (mb *MetricBus) Subscribe(rf MetricReceiveFunc) { mb.Lock() defer mb.Unlock() mb.subscribers = append(mb.subscribers, rf) } -//Publish publish to bus +// Publish publish to bus func (mb *MetricBus) Publish(name string, time float64, mType data.MetricType, interval time.Duration, value float64, labelKeys []string, labelVals []string) { mb.RLock() for _, rf := range mb.subscribers { diff --git a/pkg/concurrent/concurrent.go b/pkg/concurrent/concurrent.go index 2760287d..bc751138 100644 --- a/pkg/concurrent/concurrent.go +++ b/pkg/concurrent/concurrent.go @@ -2,13 +2,13 @@ package concurrent import "sync" -//Map thread safe map type +// Map thread safe map type type Map struct { sync.RWMutex - Items map[string]interface{} //Optimize with unsafe types? + Items map[string]interface{} // Optimize with unsafe types? } -//NewMap map constructor +// NewMap map constructor func NewMap() *Map { return &Map{ RWMutex: sync.RWMutex{}, @@ -16,14 +16,14 @@ func NewMap() *Map { } } -//Set set index in map +// Set set index in map func (m *Map) Set(key string, value interface{}) { m.Lock() m.Items[key] = value - m.Unlock() //do not use defer() as it is too slow + m.Unlock() // do not use defer() as it is too slow } -//Contains return true if key exists +// Contains return true if key exists func (m *Map) Contains(key string) bool { m.RLock() _, ok := m.Items[key] @@ -31,7 +31,7 @@ func (m *Map) Contains(key string) bool { return ok } -//Len return number of map indexes +// Len return number of map indexes func (m *Map) Len() int { m.RLock() l := len(m.Items) @@ -39,7 +39,7 @@ func (m *Map) Len() int { return l } -//Get get item with key. Returns nil if does not exist +// Get get item with key. Returns nil if does not exist func (m *Map) Get(key string) interface{} { m.RLock() val := m.Items[key] @@ -47,20 +47,20 @@ func (m *Map) Get(key string) interface{} { return val } -//Delete delete index in map +// Delete delete index in map func (m *Map) Delete(key string) { m.Lock() delete(m.Items, key) m.Unlock() } -//MapItem key value pair for use in iteration +// MapItem key value pair for use in iteration type MapItem struct { Key string Value interface{} } -//Iter iterate with range keyword +// Iter iterate with range keyword func (m *Map) Iter() <-chan MapItem { c := make(chan MapItem) diff --git a/pkg/config/config.go b/pkg/config/config.go index 77aec14c..06abe263 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -12,7 +12,7 @@ import ( "gopkg.in/yaml.v2" ) -//ParseConfig parses and validates input into config object +// ParseConfig parses and validates input into config object func ParseConfig(r io.Reader, config interface{}) error { validate := validator.New() configBytes, err := ioutil.ReadAll(r) diff --git a/pkg/data/data.go b/pkg/data/data.go index 183b4afe..6f75eb59 100644 --- a/pkg/data/data.go +++ b/pkg/data/data.go @@ -6,7 +6,7 @@ import ( // package data defines the data descriptions for objects used in the internal buses -//----------------------------------- events ---------------------------------- +// ----------------------------------- events ---------------------------------- func (mt MetricType) String() string { return []string{"untyped", "counter", "gauge"}[mt] @@ -16,16 +16,16 @@ func (mt MetricType) String() string { type EventType int const ( - // ERROR event contains handler failure data and should be handled on application level + // ERROR event contains handler failure data and should be handled on application level ERROR EventType = iota - // EVENT contains regular event data + // EVENT contains regular event data EVENT - // LOG event contains log record + // LOG event contains log record LOG - // RESULT event contains data about result of check execution - // perfomed by any supported client side agent (collectd-sensubility, sg-agent) + // RESULT event contains data about result of check execution + // perfomed by any supported client side agent (collectd-sensubility, sg-agent) RESULT - // TASK contains request of performing some task, for example scheduler app asking transport to send message + // TASK contains request of performing some task, for example scheduler app asking transport to send message TASK ) @@ -37,13 +37,13 @@ func (et EventType) String() string { type EventSeverity int const ( - //UNKNOWN ... default + // UNKNOWN ... default UNKNOWN EventSeverity = iota - //INFO ... + // INFO ... INFO - //WARNING ... + // WARNING ... WARNING - //CRITICAL ... + // CRITICAL ... CRITICAL ) @@ -63,17 +63,17 @@ type Event struct { Message string } -//---------------------------------- metrics ---------------------------------- +// ---------------------------------- metrics ---------------------------------- // MetricType follows standard metric conventions from prometheus type MetricType int const ( - //UNTYPED ... + // UNTYPED ... UNTYPED MetricType = iota - //COUNTER only increases in value + // COUNTER only increases in value COUNTER - //GAUGE can increase or decrease in value + // GAUGE can increase or decrease in value GAUGE ) diff --git a/pkg/handler/handler.go b/pkg/handler/handler.go index fe3b0095..08a1ae6e 100644 --- a/pkg/handler/handler.go +++ b/pkg/handler/handler.go @@ -8,17 +8,17 @@ import ( // package handler contains the interface description for handler plugins -//Handler mangle messages to place on metric bus +// Handler mangle messages to place on metric bus type Handler interface { - //Run should only be used to send metrics or events apart from those being parsed from the transport. For example, this process could send metrics tracking the number of arrived messages and send them to the bus on a time delayed interval + // Run should only be used to send metrics or events apart from those being parsed from the transport. For example, this process could send metrics tracking the number of arrived messages and send them to the bus on a time delayed interval Run(context.Context, bus.MetricPublishFunc, bus.EventPublishFunc) - //Returns identification string for a handler + // Returns identification string for a handler Identify() string - //Handle parse incoming messages from the transport and write resulting metrics or events to the corresponding bus. Handlers MUST ensure that labelValues and labelKeys for metrics are always submitted int the same order + // Handle parse incoming messages from the transport and write resulting metrics or events to the corresponding bus. Handlers MUST ensure that labelValues and labelKeys for metrics are always submitted int the same order Handle([]byte, bool, bus.MetricPublishFunc, bus.EventPublishFunc) error - //Config a yaml object from the config file associated with this plugin is passed into this function. The plugin is responsible for handling this data + // Config a yaml object from the config file associated with this plugin is passed into this function. The plugin is responsible for handling this data Config([]byte) error } diff --git a/pkg/transport/transport.go b/pkg/transport/transport.go index cbdff8ac..520f1f28 100644 --- a/pkg/transport/transport.go +++ b/pkg/transport/transport.go @@ -8,13 +8,13 @@ import ( // package transport defines the interfaces for interacting with transport // plugins -//Mode indicates if transport is setup to receive or write +// Mode indicates if transport is setup to receive or write type Mode int const ( - //WRITE ... + // WRITE ... WRITE = iota - //READ ... + // READ ... READ ) @@ -25,21 +25,21 @@ var ( } ) -//String get string representation of mode +// String get string representation of mode func (m *Mode) String() string { return [...]string{"WRITE", "READ"}[*m] } -//FromString get mode from string +// FromString get mode from string func (m *Mode) FromString(s string) { *m = modStr[strings.ToLower(s)] } -//WriteFn func type for writing from transport to handlers +// WriteFn func type for writing from transport to handlers type WriteFn func([]byte) -//Transport type listens on one interface and delivers data to core -//TODO: listen for events internally +// Transport type listens on one interface and delivers data to core +// TODO: listen for events internally type Transport interface { Config([]byte) error Run(context.Context, WriteFn, chan bool) diff --git a/plugins/application/alertmanager/main.go b/plugins/application/alertmanager/main.go index 3f0d1135..09224d15 100644 --- a/plugins/application/alertmanager/main.go +++ b/plugins/application/alertmanager/main.go @@ -20,14 +20,14 @@ const ( appname = "alertmanager" ) -//AlertManager plugin suites for reporting alerts for Prometheus' alert manager +// AlertManager plugin suites for reporting alerts for Prometheus' alert manager type AlertManager struct { configuration lib.AppConfig logger *logging.Logger dump chan lib.PrometheusAlert } -//New constructor +// New constructor func New(logger *logging.Logger) application.Application { return &AlertManager{ configuration: lib.AppConfig{ @@ -39,23 +39,24 @@ func New(logger *logging.Logger) application.Application { } } -//ReceiveEvent is called whenever an event is broadcast on the event bus. The order of arguments +// ReceiveEvent is called whenever an event is broadcast on the event bus. The order of arguments func (am *AlertManager) ReceiveEvent(event data.Event) { switch event.Type { case data.ERROR: - //TODO: error handling + // TODO: error handling case data.EVENT: // generate alert am.dump <- lib.GenerateAlert(am.configuration.GeneratorURL, event) case data.RESULT: - //TODO: result type handling + // TODO: result type handling case data.LOG: - //TODO: log handling + // TODO: log handling + case data.TASK: } } -//Run implements main process of the application +// Run implements main process of the application func (am *AlertManager) Run(ctx context.Context, done chan bool) { wg := sync.WaitGroup{} @@ -65,7 +66,7 @@ func (am *AlertManager) Run(ctx context.Context, done chan bool) { goto done case dumped := <-am.dump: wg.Add(1) - go func(url string, dumped lib.PrometheusAlert, logger *logging.Logger, wg *sync.WaitGroup) { + go func(dumped lib.PrometheusAlert, wg *sync.WaitGroup) { defer wg.Done() alert, err := json.Marshal(dumped) if err != nil { @@ -81,6 +82,7 @@ func (am *AlertManager) Run(ctx context.Context, done chan bool) { am.logger.Metadata(logging.Metadata{"plugin": appname, "error": err}) am.logger.Error("failed to create http request") } + req = req.WithContext(ctx) req.Header.Set("X-Custom-Header", "smartgateway") req.Header.Set("Content-Type", "application/json") @@ -89,21 +91,19 @@ func (am *AlertManager) Run(ctx context.Context, done chan bool) { if err != nil { am.logger.Metadata(logging.Metadata{"plugin": appname, "error": err, "alert": buff.String()}) am.logger.Error("failed to report alert to AlertManager") - } else { + } else if resp.StatusCode != http.StatusOK { // https://github.com/prometheus/alertmanager/blob/master/api/v2/openapi.yaml#L170 - if resp.StatusCode != http.StatusOK { - body, _ := ioutil.ReadAll(resp.Body) - resp.Body.Close() - am.logger.Metadata(logging.Metadata{ - "plugin": appname, - "status": resp.Status, - "header": resp.Header, - "body": string(body)}) - am.logger.Error("failed to report alert to AlertManager") - } + body, _ := ioutil.ReadAll(resp.Body) + resp.Body.Close() + am.logger.Metadata(logging.Metadata{ + "plugin": appname, + "status": resp.Status, + "header": resp.Header, + "body": string(body)}) + am.logger.Error("failed to report alert to AlertManager") } } - }(am.configuration.AlertManagerURL, dumped, am.logger, &wg) + }(dumped, &wg) } } @@ -113,7 +113,7 @@ done: am.logger.Info("exited") } -//Config implements application.Application +// Config implements application.Application func (am *AlertManager) Config(c []byte) error { am.configuration = lib.AppConfig{ AlertManagerURL: "http://localhost", diff --git a/plugins/application/alertmanager/pkg/lib/alert.go b/plugins/application/alertmanager/pkg/lib/alert.go index 24803185..0d0c1362 100644 --- a/plugins/application/alertmanager/pkg/lib/alert.go +++ b/plugins/application/alertmanager/pkg/lib/alert.go @@ -5,7 +5,7 @@ import ( "strings" ) -//PrometheusAlert represents data structure used for sending alerts to Prometheus Alert Manager +// PrometheusAlert represents data structure used for sending alerts to Prometheus Alert Manager type PrometheusAlert struct { Labels map[string]string `json:"labels"` Annotations map[string]string `json:"annotations"` @@ -14,7 +14,7 @@ type PrometheusAlert struct { GeneratorURL string `json:"generatorURL"` } -//SetName generates unique name and description for the alert and creates new key/value pair for it in Labels +// SetName generates unique name and description for the alert and creates new key/value pair for it in Labels func (alert *PrometheusAlert) SetName() { if _, ok := alert.Labels["name"]; !ok { keys := make([]string, 0, len(alert.Labels)) @@ -36,7 +36,7 @@ func (alert *PrometheusAlert) SetName() { } } -//SetSummary generates summary annotation in case it is empty +// SetSummary generates summary annotation in case it is empty func (alert *PrometheusAlert) SetSummary() { generate := false if _, ok := alert.Annotations["summary"]; ok { diff --git a/plugins/application/alertmanager/pkg/lib/config.go b/plugins/application/alertmanager/pkg/lib/config.go index 8b67683b..59a9e047 100644 --- a/plugins/application/alertmanager/pkg/lib/config.go +++ b/plugins/application/alertmanager/pkg/lib/config.go @@ -1,6 +1,6 @@ package lib -//AppConfig ... +// AppConfig ... type AppConfig struct { AlertManagerURL string `yaml:"alertManagerUrl"` GeneratorURL string `yaml:"generatorUrl"` diff --git a/plugins/application/alertmanager/pkg/lib/generators.go b/plugins/application/alertmanager/pkg/lib/generators.go index 204752fd..a4b1d1d8 100644 --- a/plugins/application/alertmanager/pkg/lib/generators.go +++ b/plugins/application/alertmanager/pkg/lib/generators.go @@ -7,20 +7,10 @@ import ( ) const ( - alertSource = "SmartGateway" - isoTimeLayout = "2006-01-02 15:04:05.000000" - unknownSeverity = "unknown" + alertSource = "SmartGateway" ) -var ( - collectdAlertSeverity = map[string]string{ - "OKAY": "info", - "WARNING": "warning", - "FAILURE": "critical", - } -) - -//GenerateAlert generate prometheus alert from event +// GenerateAlert generate prometheus alert from event func GenerateAlert(generatorURL string, event data.Event) PrometheusAlert { alert := PrometheusAlert{ @@ -44,9 +34,3 @@ func GenerateAlert(generatorURL string, event data.Event) PrometheusAlert { alert.SetSummary() return alert } - -func timeFromEpoch(epoch float64) string { - whole := int64(epoch) - t := time.Unix(whole, int64((float64(whole)-epoch)*1000000000)) - return t.Format(time.RFC3339) -} diff --git a/plugins/application/alertmanager/pkg/lib/utils.go b/plugins/application/alertmanager/pkg/lib/utils.go index 9cf81623..b268363c 100644 --- a/plugins/application/alertmanager/pkg/lib/utils.go +++ b/plugins/application/alertmanager/pkg/lib/utils.go @@ -6,9 +6,9 @@ import ( "strings" ) -//assimilateMap recursively saves content of the given map to destination map of strings +// assimilateMap recursively saves content of the given map to destination map of strings func assimilateMap(theMap map[string]interface{}, destination *map[string]string) { - defer func() { //recover from any panic + defer func() { // recover from any panic if r := recover(); r != nil { log.Printf("Panic:recovered in assimilateMap %v\n", r) } diff --git a/plugins/application/elasticsearch/main.go b/plugins/application/elasticsearch/main.go index 7fcd7916..9c7addae 100644 --- a/plugins/application/elasticsearch/main.go +++ b/plugins/application/elasticsearch/main.go @@ -16,22 +16,20 @@ import ( ) const ( - appname = "elasticsearch" - genericSuffix = "_generic" - eventRecordFormat = `{"event_type":"%s","generated":"%s","severity":"%s","labels":%s,"annotations":%s}` + appname = "elasticsearch" ) var ( json = jsoniter.ConfigCompatibleWithStandardLibrary ) -//wrapper object for elasitcsearch index +// wrapper object for elasitcsearch index type esIndex struct { index string record []string } -//used to marshal event into es usable json +// used to marshal event into es usable json type record struct { EventType string `json:"event_type"` Generated string `json:"generated"` @@ -40,7 +38,7 @@ type record struct { Annotations map[string]interface{} `json:"annotations"` } -//Elasticsearch plugin saves events to Elasticsearch database +// Elasticsearch plugin saves events to Elasticsearch database type Elasticsearch struct { configuration *lib.AppConfig logger *logging.Logger @@ -49,7 +47,7 @@ type Elasticsearch struct { dump chan esIndex } -//New constructor +// New constructor func New(logger *logging.Logger) application.Application { return &Elasticsearch{ logger: logger, @@ -58,11 +56,11 @@ func New(logger *logging.Logger) application.Application { } } -//ReceiveEvent receive event from event bus +// ReceiveEvent receive event from event bus func (es *Elasticsearch) ReceiveEvent(event data.Event) { switch event.Type { case data.ERROR: - //TODO: error handling + // TODO: error handling case data.EVENT: // buffer or index record var recordList []string @@ -91,14 +89,15 @@ func (es *Elasticsearch) ReceiveEvent(event data.Event) { } es.dump <- esIndex{index: event.Index, record: recordList} case data.RESULT: - //TODO: result + // TODO: result case data.LOG: - //TODO: log + // TODO: log + case data.TASK: } } -//Run plugin process +// Run plugin process func (es *Elasticsearch) Run(ctx context.Context, done chan bool) { es.logger.Metadata(logging.Metadata{"plugin": appname, "url": es.configuration.HostURL}) es.logger.Info("storing events to Elasticsearch.") @@ -134,7 +133,7 @@ done: es.logger.Info("exited") } -//Config implements application.Application +// Config implements application.Application func (es *Elasticsearch) Config(c []byte) error { es.configuration = &lib.AppConfig{ HostURL: "", diff --git a/plugins/application/elasticsearch/pkg/lib/client.go b/plugins/application/elasticsearch/pkg/lib/client.go index a6351111..1ef7be83 100644 --- a/plugins/application/elasticsearch/pkg/lib/client.go +++ b/plugins/application/elasticsearch/pkg/lib/client.go @@ -15,18 +15,18 @@ import ( // ElasticSearch client implementation using official library from ElasticClient -//Client holds cluster connection configuration +// Client holds cluster connection configuration type Client struct { conn *esv7.Client } -//NewElasticClient constructor +// NewElasticClient constructor func NewElasticClient(cfg *AppConfig) (*Client, error) { client := &Client{} return client, client.Connect(cfg) } -//createTLSConfig creates appropriate TLS configuration with enabled cert-based authentication +// createTLSConfig creates appropriate TLS configuration with enabled cert-based authentication func createTLSConfig(serverName string, certFile string, keyFile string, caFile string) (*tls.Config, error) { cert, err := tls.LoadX509KeyPair(certFile, keyFile) if err != nil { @@ -41,6 +41,7 @@ func createTLSConfig(serverName string, certFile string, keyFile string, caFile certPool.AppendCertsFromPEM(ca) tlsConfig := &tls.Config{ + MinVersion: tls.VersionTLS13, Certificates: []tls.Certificate{cert}, RootCAs: certPool, } @@ -53,7 +54,7 @@ func createTLSConfig(serverName string, certFile string, keyFile string, caFile return tlsConfig, nil } -//Connect initiates connection with ES host and tests the connection +// Connect initiates connection with ES host and tests the connection func (esc *Client) Connect(cfg *AppConfig) error { var err error @@ -84,7 +85,7 @@ func (esc *Client) Connect(cfg *AppConfig) error { return err } -//IndicesExists returns true if given indices exists, otherwise return false +// IndicesExists returns true if given indices exists, otherwise return false func (esc *Client) IndicesExists(indices []string) (bool, error) { res, err := esc.conn.Indices.Exists(indices) if err != nil { @@ -96,7 +97,7 @@ func (esc *Client) IndicesExists(indices []string) (bool, error) { return false, nil } -//IndicesDelete ... +// IndicesDelete ... func (esc *Client) IndicesDelete(indices []string) error { res, err := esc.conn.Indices.Delete(indices) if err != nil { @@ -109,7 +110,7 @@ func (esc *Client) IndicesDelete(indices []string) error { return nil } -//IndicesCreate ... +// IndicesCreate ... func (esc *Client) IndicesCreate(indices []string) error { for _, index := range indices { res, err := esc.conn.Indices.Create(index) @@ -124,7 +125,7 @@ func (esc *Client) IndicesCreate(indices []string) error { return nil } -//Index saves given documents under given index +// Index saves given documents under given index func (esc *Client) Index(index string, documents []string, bulk bool) error { if !bulk { for _, doc := range documents { diff --git a/plugins/application/elasticsearch/pkg/lib/config.go b/plugins/application/elasticsearch/pkg/lib/config.go index 1d309382..1c68e0d5 100644 --- a/plugins/application/elasticsearch/pkg/lib/config.go +++ b/plugins/application/elasticsearch/pkg/lib/config.go @@ -1,6 +1,6 @@ package lib -//AppConfig holds configuration for Elasticsearch client +// AppConfig holds configuration for Elasticsearch client type AppConfig struct { HostURL string `yaml:"hostURL"` UseTLS bool `yaml:"useTLS"` diff --git a/plugins/application/loki/main.go b/plugins/application/loki/main.go index bc924798..2ea830c6 100644 --- a/plugins/application/loki/main.go +++ b/plugins/application/loki/main.go @@ -21,17 +21,17 @@ type LokiConfig struct { MaxWaitTime time.Duration } -//Loki plugin for forwarding logs to loki +// Loki plugin for forwarding logs to loki type Loki struct { - config *LokiConfig - client *connector.LokiConnector - logger *logging.Logger + config *LokiConfig + client *connector.LokiConnector + logger *logging.Logger logChannel chan interface{} } -//New constructor +// New constructor func New(logger *logging.Logger) application.Application { - return &Loki { + return &Loki{ logger: logger, logChannel: make(chan interface{}, 100), } @@ -54,7 +54,7 @@ func (l *Loki) ReceiveEvent(log data.Event) { } } -//Run run loki application plugin +// Run run loki application plugin func (l *Loki) Run(ctx context.Context, done chan bool) { l.logger.Metadata(logging.Metadata{"plugin": "loki", "url": l.config.Connection}) l.logger.Debug("storing logs to loki.") @@ -67,9 +67,9 @@ func (l *Loki) Run(ctx context.Context, done chan bool) { l.logger.Info("exited") } -//Config implements application.Application +// Config implements application.Application func (l *Loki) Config(c []byte) error { - l.config = &LokiConfig { + l.config = &LokiConfig{ Connection: "", BatchSize: 20, MaxWaitTime: 100, @@ -80,9 +80,9 @@ func (l *Loki) Config(c []byte) error { } l.client, err = connector.CreateLokiConnector(l.logger, - l.config.Connection, - l.config.MaxWaitTime, - l.config.BatchSize) + l.config.Connection, + l.config.MaxWaitTime, + l.config.BatchSize) if err != nil { return errors.Wrap(err, "failed to connect to Loki host") } diff --git a/plugins/application/loki/pkg/lib/loki.go b/plugins/application/loki/pkg/lib/loki.go index 1bc926bf..e3ded70e 100644 --- a/plugins/application/loki/pkg/lib/loki.go +++ b/plugins/application/loki/pkg/lib/loki.go @@ -1,8 +1,8 @@ package lib import ( - "time" "fmt" + "time" "github.com/infrawatch/apputils/connector" "github.com/infrawatch/sg-core/pkg/data" @@ -30,12 +30,10 @@ func CreateLokiLog(log data.Event) (connector.LokiLog, error) { return connector.LokiLog{}, err } - output := connector.LokiLog { + output := connector.LokiLog{ LogMessage: log.Message, - Timestamp: time.Duration(time.Duration(log.Time) * time.Second), - Labels: labels, + Timestamp: time.Duration(log.Time) * time.Second, + Labels: labels, } return output, nil } - - diff --git a/plugins/application/loki/pkg/lib/utils.go b/plugins/application/loki/pkg/lib/utils.go index 9cf81623..b268363c 100644 --- a/plugins/application/loki/pkg/lib/utils.go +++ b/plugins/application/loki/pkg/lib/utils.go @@ -6,9 +6,9 @@ import ( "strings" ) -//assimilateMap recursively saves content of the given map to destination map of strings +// assimilateMap recursively saves content of the given map to destination map of strings func assimilateMap(theMap map[string]interface{}, destination *map[string]string) { - defer func() { //recover from any panic + defer func() { // recover from any panic if r := recover(); r != nil { log.Printf("Panic:recovered in assimilateMap %v\n", r) } diff --git a/plugins/application/print/main.go b/plugins/application/print/main.go index 28ee72e2..20215687 100644 --- a/plugins/application/print/main.go +++ b/plugins/application/print/main.go @@ -28,7 +28,7 @@ type eventOutput struct { Annotations map[string]interface{} } -//Print plugin suites for logging both internal buses to a file. +// Print plugin suites for logging both internal buses to a file. type Print struct { configuration configT logger *logging.Logger @@ -36,7 +36,7 @@ type Print struct { mChan chan data.Metric } -//New constructor +// New constructor func New(logger *logging.Logger) application.Application { return &Print{ configuration: configT{ @@ -68,7 +68,7 @@ func (p *Print) ReceiveMetric(name string, t float64, mType data.MetricType, int p.mChan <- metric } -//Run run scrape endpoint +// Run run scrape endpoint func (p *Print) Run(ctx context.Context, done chan bool) { metrF, err := os.OpenFile(p.configuration.MetricOutput, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) @@ -125,7 +125,7 @@ done: p.logger.Info("exited") } -//Config implements application.Application +// Config implements application.Application func (p *Print) Config(c []byte) error { err := config.ParseConfig(bytes.NewReader(c), &p.configuration) if err != nil { diff --git a/plugins/application/prometheus/main.go b/plugins/application/prometheus/main.go index 488c3b6a..2a6c8753 100644 --- a/plugins/application/prometheus/main.go +++ b/plugins/application/prometheus/main.go @@ -98,7 +98,7 @@ type metricProcess struct { scrapped bool } -//PromCollector implements prometheus.Collector for incoming metrics. Metrics +// PromCollector implements prometheus.Collector for incoming metrics. Metrics // with differing label dimensions must create separate PromCollectors. type PromCollector struct { logger *logWrapper @@ -108,15 +108,16 @@ type PromCollector struct { cacheindexbuilder strings.Builder } -//NewPromCollector PromCollector constructor +// NewPromCollector PromCollector constructor func NewPromCollector(l *logWrapper, dimensions int, withtimestamp bool) *PromCollector { return &PromCollector{ - logger: l, - dimensions: dimensions, + logger: l, + dimensions: dimensions, + withtimestamp: withtimestamp, } } -//Describe implements prometheus.Collector +// Describe implements prometheus.Collector func (pc *PromCollector) Describe(ch chan<- *prometheus.Desc) { pc.mProc.Range(func(mName interface{}, itf interface{}) bool { ch <- itf.(*metricProcess).description @@ -124,11 +125,11 @@ func (pc *PromCollector) Describe(ch chan<- *prometheus.Desc) { }) } -//Collect implements prometheus.Collector +// Collect implements prometheus.Collector func (pc *PromCollector) Collect(ch chan<- prometheus.Metric) { - //fmt.Printf("\nScrapping collector of size %d with %d metrics:\n", pc.dimensions, syncMapLen(&pc.mProc)) + // fmt.Printf("\nScrapping collector of size %d with %d metrics:\n", pc.dimensions, syncMapLen(&pc.mProc)) pc.mProc.Range(func(mName interface{}, itf interface{}) bool { - //fmt.Println(mName) + // fmt.Println(mName) mProc := itf.(*metricProcess) mProc.scrapped = true pMetric, err := prometheus.NewConstMetric(mProc.description, typeToPromType[mProc.metric.Type], mProc.metric.Value, mProc.metric.LabelVals...) @@ -149,12 +150,12 @@ func (pc *PromCollector) Collect(ch chan<- prometheus.Metric) { }) } -//Dimensions return dimension size of labels in collector +// Dimensions return dimension size of labels in collector func (pc *PromCollector) Dimensions() int { return pc.dimensions } -//UpdateMetrics update metrics in collector +// UpdateMetrics update metrics in collector func (pc *PromCollector) UpdateMetrics(name string, time float64, typ data.MetricType, interval time.Duration, value float64, labelKeys []string, labelVals []string, ep *expiryProc) { var mProc *metricProcess pc.cacheindexbuilder.Grow(len(name)) @@ -206,20 +207,20 @@ func (pc *PromCollector) UpdateMetrics(name string, time float64, typ data.Metri pc.cacheindexbuilder.Reset() } -//Prometheus plugin for interfacing with Prometheus. Metrics with the same dimensions +// Prometheus plugin for interfacing with Prometheus. Metrics with the same dimensions // are included in the same collectors even if the labels are different type Prometheus struct { configuration configT logger *logWrapper - collectors sync.Map //collectors mapped according to label dimensions - metricExpiryProcs sync.Map //stores expiry processes based for each metric interval + collectors sync.Map // collectors mapped according to label dimensions + metricExpiryProcs sync.Map // stores expiry processes based for each metric interval collectorExpiryProc *expiryProc registry *prometheus.Registry ctx context.Context sync.RWMutex } -//New constructor +// New constructor func New(l *logging.Logger) application.Application { return &Prometheus{ configuration: configT{ @@ -236,7 +237,7 @@ func New(l *logging.Logger) application.Application { } } -//ReceiveMetric callback function for recieving metric from the bus +// ReceiveMetric callback function for receiving metric from the bus func (p *Prometheus) ReceiveMetric(name string, t float64, typ data.MetricType, interval time.Duration, value float64, labelKeys []string, labelVals []string) { p.Lock() labelLen := len(labelKeys) @@ -281,12 +282,12 @@ func (p *Prometheus) ReceiveMetric(name string, t float64, typ data.MetricType, p.Unlock() } -//Run run scrape endpoint +// Run run scrape endpoint func (p *Prometheus) Run(ctx context.Context, done chan bool) { p.ctx = ctx p.registry = prometheus.NewRegistry() - //Set up Metric Exporter + // Set up Metric Exporter handler := http.NewServeMux() handler.Handle("/metrics", promhttp.HandlerFor(p.registry, promhttp.HandlerOpts{})) handler.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { @@ -302,7 +303,7 @@ func (p *Prometheus) Run(ctx context.Context, done chan bool) { } }) - //run exporter for prometheus to scrape + // run exporter for prometheus to scrape metricsURL := fmt.Sprintf("%s:%d", p.configuration.Host, p.configuration.Port) srv := &http.Server{Addr: metricsURL} @@ -320,7 +321,7 @@ func (p *Prometheus) Run(ctx context.Context, done chan bool) { p.logger.Infof("metric server at : %s", metricsURL) - //run collector expiry process + // run collector expiry process go p.collectorExpiryProc.run(ctx) <-ctx.Done() @@ -336,7 +337,7 @@ func (p *Prometheus) Run(ctx context.Context, done chan bool) { p.logger.Infof("exited") } -//Config implements application.Application +// Config implements application.Application func (p *Prometheus) Config(c []byte) error { p.configuration = configT{} err := config.ParseConfig(bytes.NewReader(c), &p.configuration) diff --git a/plugins/handler/ceilometer-metrics/main.go b/plugins/handler/ceilometer-metrics/main.go index 4732cd24..b8f2c385 100644 --- a/plugins/handler/ceilometer-metrics/main.go +++ b/plugins/handler/ceilometer-metrics/main.go @@ -13,7 +13,7 @@ import ( ) const ( - metricTimeout = 100 //TODO - further research on best interval to use here + metricTimeout = 100 // TODO - further research on best interval to use here ) var ( @@ -80,7 +80,7 @@ func (c *ceilometerMetricHandler) Handle(blob []byte, reportErrs bool, mpf bus.M if err != nil { c.totalDecodeErrors++ if reportErrs { - epf(data.Event{ //THIS IS EXTREMELY SLOW + epf(data.Event{ // THIS IS EXTREMELY SLOW Index: c.Identify(), Type: data.ERROR, Severity: data.CRITICAL, @@ -106,11 +106,11 @@ func (c *ceilometerMetricHandler) Handle(blob []byte, reportErrs bool, mpf bus.M t = 0.0 } - mType := ceilTypeToMetricType[m.CounterType] //zero value is UNTYPED + mType := ceilTypeToMetricType[m.CounterType] // zero value is UNTYPED cNameShards := strings.Split(m.CounterName, ".") - labelKeys, labelVals := genLabels(&m, msg.Publisher, cNameShards) - err = validateMetric(&m, cNameShards) + labelKeys, labelVals := genLabels(m, msg.Publisher, cNameShards) + err = validateMetric(m, cNameShards) if err != nil { c.totalDecodeErrors++ if reportErrs { @@ -132,7 +132,7 @@ func (c *ceilometerMetricHandler) Handle(blob []byte, reportErrs bool, mpf bus.M } c.totalMetricsDecoded++ mpf( - genName(&m, cNameShards), + genName(cNameShards), t, mType, time.Second*metricTimeout, @@ -156,7 +156,7 @@ func validateMessage(msg *ceilometer.Message) error { return nil } -func validateMetric(m *ceilometer.Metric, cNameShards []string) error { +func validateMetric(m ceilometer.Metric, cNameShards []string) error { if len(cNameShards) < 1 { return errors.New("missing 'counter_name' in metric payload") } @@ -184,14 +184,14 @@ func validateMetric(m *ceilometer.Metric, cNameShards []string) error { return nil } -func genName(m *ceilometer.Metric, cNameShards []string) string { +func genName(cNameShards []string) string { nameParts := []string{"ceilometer"} nameParts = append(nameParts, cNameShards...) return strings.Join(nameParts, "_") } -func genLabels(m *ceilometer.Metric, publisher string, cNameShards []string) ([]string, []string) { - labelKeys := make([]string, 8) //TODO: set to persistant var +func genLabels(m ceilometer.Metric, publisher string, cNameShards []string) ([]string, []string) { + labelKeys := make([]string, 8) // TODO: set to persistent var labelVals := make([]string, 8) plugin := cNameShards[0] pluginVal := m.ResourceID @@ -201,7 +201,7 @@ func genLabels(m *ceilometer.Metric, publisher string, cNameShards []string) ([] labelKeys[0] = plugin labelVals[0] = pluginVal - //TODO: should we instead do plugin: , plugin_id: ? + // TODO: should we instead do plugin: , plugin_id: ? labelKeys[1] = "publisher" labelVals[1] = publisher @@ -241,7 +241,7 @@ func (c *ceilometerMetricHandler) Config(blob []byte) error { return nil } -//New ceilometer metric handler constructor +// New ceilometer metric handler constructor func New() handler.Handler { return &ceilometerMetricHandler{ ceilo: ceilometer.New(), diff --git a/plugins/handler/ceilometer-metrics/main_test.go b/plugins/handler/ceilometer-metrics/main_test.go index 7a89ee88..3eee68d0 100644 --- a/plugins/handler/ceilometer-metrics/main_test.go +++ b/plugins/handler/ceilometer-metrics/main_test.go @@ -16,7 +16,7 @@ var ( metricsUT []data.Metric ) -//CeilometerMetricTemplate holds correct parsings for comparing against parsed results +// CeilometerMetricTemplate holds correct parsings for comparing against parsed results type CeilometerMetricTestTemplate struct { TestInput jsoniter.RawMessage `json:"testInput"` ValidatedResults []data.Metric `json:"validatedResults"` @@ -31,7 +31,9 @@ func ceilometerMetricTestTemplateFromJSON(jsonData jsoniter.RawMessage) (*Ceilom return &testData, nil } -func EventReceive(data.Event) +func EventReceive(data.Event) { + +} func MetricReceive(name string, mTime float64, mType data.MetricType, interval time.Duration, value float64, labelKeys []string, labelVals []string) { metricsUT = append(metricsUT, data.Metric{ @@ -54,7 +56,7 @@ func TestCeilometerIncoming(t *testing.T) { } tests := make(map[string]jsoniter.RawMessage) - err = json.Unmarshal([]byte(testData), &tests) + err = json.Unmarshal(testData, &tests) if err != nil { t.Errorf("failed to unmarshal test data: %s", err.Error()) } diff --git a/plugins/handler/ceilometer-metrics/pkg/ceilometer/ceilometer.go b/plugins/handler/ceilometer-metrics/pkg/ceilometer/ceilometer.go index f434dd9f..9d596a3d 100644 --- a/plugins/handler/ceilometer-metrics/pkg/ceilometer/ceilometer.go +++ b/plugins/handler/ceilometer-metrics/pkg/ceilometer/ceilometer.go @@ -14,12 +14,12 @@ var ( json = jsoniter.ConfigCompatibleWithStandardLibrary ) -//Metedata represents metadataof a metric from ceilometer +// Metedata represents metadataof a metric from ceilometer type metadata struct { Host string } -//Metric represents a single metric from ceilometer for unmarshalling +// Metric represents a single metric from ceilometer for unmarshalling type Metric struct { Source string CounterName string `json:"counter_name"` @@ -33,32 +33,32 @@ type Metric struct { ResourceMetadata metadata `json:"resource_metadata"` } -//Message struct represents an incoming ceilometer metrics message +// Message struct represents an incoming ceilometer metrics message type Message struct { Publisher string `json:"publisher_id"` Payload []Metric `json:"payload"` } -//OsloSchema initial OsloSchema +// OsloSchema initial OsloSchema type OsloSchema struct { Request struct { OsloMessage string `json:"oslo.message"` } } -//Ceilometer instance for parsing and handling ceilometer metric messages +// Ceilometer instance for parsing and handling ceilometer metric messages type Ceilometer struct { schema OsloSchema } -//New Ceilometer constructor +// New Ceilometer constructor func New() *Ceilometer { return &Ceilometer{ schema: OsloSchema{}, } } -//ParseInputJSON parse blob into list of metrics +// ParseInputJSON parse blob into list of metrics func (c *Ceilometer) ParseInputJSON(blob []byte) (*Message, error) { msg := &Message{} err := json.Unmarshal(blob, &c.schema) @@ -73,7 +73,7 @@ func (c *Ceilometer) ParseInputJSON(blob []byte) (*Message, error) { return msg, nil } -//sanitize remove extraneous characters +// sanitize remove extraneous characters func (c *Ceilometer) sanitize() string { sanitized := rexForNestedQuote.ReplaceAllString(c.schema.Request.OsloMessage, `"`) diff --git a/plugins/handler/collectd-metrics/main.go b/plugins/handler/collectd-metrics/main.go index 52c10cc1..7587e18b 100644 --- a/plugins/handler/collectd-metrics/main.go +++ b/plugins/handler/collectd-metrics/main.go @@ -21,7 +21,7 @@ var ( ) type collectdMetricsHandler struct { - totalMetricsDecoded uint64 //total number of collectd metrics decoded from messages + totalMetricsDecoded uint64 // total number of collectd metrics decoded from messages totalMessagesReceived uint64 totalDecodeErrors uint64 } @@ -91,7 +91,7 @@ func (c *collectdMetricsHandler) Handle(blob []byte, reportErrors bool, pf bus.M } for _, cdmetric := range *cdmetrics { - err = c.writeMetrics(&cdmetric, pf) + err = c.writeMetrics(cdmetric, pf) if err != nil { c.totalDecodeErrors++ if reportErrors { @@ -118,8 +118,8 @@ func (c *collectdMetricsHandler) Identify() string { return "collectd-metrics" } -func (c *collectdMetricsHandler) writeMetrics(cdmetric *collectd.Metric, pf bus.MetricPublishFunc) error { - if !validateMetric(cdmetric) { +func (c *collectdMetricsHandler) writeMetrics(cdmetric collectd.Metric, pf bus.MetricPublishFunc) error { + if !validateMetric(&cdmetric) { return errors.New(0, "") } pluginInstance := cdmetric.PluginInstance @@ -137,7 +137,7 @@ func (c *collectdMetricsHandler) writeMetrics(cdmetric *collectd.Metric, pf bus. mType = data.UNTYPED } pf( - genMetricName(cdmetric, index), + genMetricName(&cdmetric, index), cdmetric.Time.Float(), mType, time.Duration(cdmetric.Interval)*time.Second, @@ -189,7 +189,7 @@ func genMetricName(cdmetric *collectd.Metric, index int) (name string) { return } -//New create new collectdMetricsHandler object +// New create new collectdMetricsHandler object func New() handler.Handler { return &collectdMetricsHandler{} } diff --git a/plugins/handler/collectd-metrics/main_test.go b/plugins/handler/collectd-metrics/main_test.go index daf2745e..610a1639 100644 --- a/plugins/handler/collectd-metrics/main_test.go +++ b/plugins/handler/collectd-metrics/main_test.go @@ -34,7 +34,9 @@ var ( metricsUT []data.Metric ) -func EventReceive(data.Event) +func EventReceive(data.Event) { + +} func MetricReceive(name string, mTime float64, mType data.MetricType, interval time.Duration, value float64, labelKeys []string, labelVals []string) { metricsUT = append(metricsUT, data.Metric{ @@ -48,7 +50,7 @@ func MetricReceive(name string, mTime float64, mType data.MetricType, interval t }) } -//Use this to update messages in metric-tests-expected.json if behavior should change +// Use this to update messages in metric-tests-expected.json if behavior should change // func TestPrintMsgs(t *testing.T) { // metricHandler := New().(*collectdMetricsHandler) @@ -77,7 +79,7 @@ func TestMessageParsing(t *testing.T) { t.Run("Invalid Messages", func(t *testing.T) { for _, blob := range testMsgsInvalid { metricHandler.totalDecodeErrors = 0 - metricHandler.Handle([]byte(blob), false, MetricReceive, EventReceive) + _ = metricHandler.Handle([]byte(blob), false, MetricReceive, EventReceive) assert.Equal(t, uint64(1), metricHandler.totalDecodeErrors) } }) diff --git a/plugins/handler/collectd-metrics/pkg/collectd/collectd.go b/plugins/handler/collectd-metrics/pkg/collectd/collectd.go index db6f3919..ba4c985b 100644 --- a/plugins/handler/collectd-metrics/pkg/collectd/collectd.go +++ b/plugins/handler/collectd-metrics/pkg/collectd/collectd.go @@ -24,12 +24,10 @@ type Metric struct { Meta collectdMeta `json:"meta,omitempty"` } -//ParseInputByte ... +// ParseInputByte ... func ParseInputByte(jsonBlob []byte) (*[]Metric, error) { collect := []Metric{} - //var json = jsoniter.ConfigCompatibleWithStandardLibrary.BorrowIterator(jsonBlob) var json = jsoniter.ConfigFastest.BorrowIterator(jsonBlob) - //defer jsoniter.ConfigCompatibleWithStandardLibrary.ReturnIterator(json) json.ReadVal(&collect) // err := json.Unmarshal(jsonBlob, &collect) if json.Error != nil { diff --git a/plugins/handler/events/ceilometer/ceilometer.go b/plugins/handler/events/ceilometer/ceilometer.go index 7576492b..711453ce 100644 --- a/plugins/handler/events/ceilometer/ceilometer.go +++ b/plugins/handler/events/ceilometer/ceilometer.go @@ -14,9 +14,9 @@ import ( var ( // Regular expression for sanitizing received data rexForNestedQuote = regexp.MustCompile(`\\\"`) - //json parser + // json parser json = jsoniter.ConfigCompatibleWithStandardLibrary - //severity converter. A DNE returns data.INFO + // severity converter. A DNE returns data.INFO ceilometerAlertSeverity = map[string]data.EventSeverity{ "audit": data.INFO, "info": data.INFO, @@ -96,16 +96,19 @@ func (om *osloMessage) fromBytes(blob []byte) error { return json.Unmarshal(blob, om) } -//Ceilometer holds parsed ceilometer event data and provides methods for retrieving that data +// Ceilometer holds parsed ceilometer event data and provides methods for retrieving that data // in a standardizes format type Ceilometer struct { osloMessage osloMessage } -//Parse parse ceilometer message data +// Parse parse ceilometer message data func (c *Ceilometer) Parse(blob []byte) error { rm := rawMessage{} - json.Unmarshal(blob, &rm) + err := json.Unmarshal(blob, &rm) + if err != nil { + return err + } rm.sanitizeMessage() c.osloMessage = osloMessage{} @@ -124,7 +127,7 @@ func (c *Ceilometer) name(index int) string { return buildName(fmt.Sprintf("%s%s", source, genericSuffix)) } -//PublishEvents iterate through events in payload calling publish function on each iteration +// PublishEvents iterate through events in payload calling publish function on each iteration func (c *Ceilometer) PublishEvents(epf bus.EventPublishFunc) error { for idx, event := range c.osloMessage.Payload { ts, err := event.traitsFormatted() @@ -149,7 +152,7 @@ func (c *Ceilometer) PublishEvents(epf bus.EventPublishFunc) error { } func (c *Ceilometer) getTimeAsEpoch(payload osloPayload) float64 { - //order of precedence: payload timestamp, message timestamp, zero + // order of precedence: payload timestamp, message timestamp, zero if payload.Generated != "" { return float64(lib.EpochFromFormat(payload.Generated)) @@ -168,7 +171,7 @@ func buildName(eventType string) string { if len(etParts) > 1 { output = strings.Join(etParts[:len(etParts)-1], "_") } - strings.ReplaceAll(output, "-", "_") + output = strings.ReplaceAll(output, "-", "_") // ensure index name is prefixed with source name if !strings.HasPrefix(output, fmt.Sprintf("%s_", source)) { diff --git a/plugins/handler/events/collectd/collectd.go b/plugins/handler/events/collectd/collectd.go index 61932249..0ac10d03 100644 --- a/plugins/handler/events/collectd/collectd.go +++ b/plugins/handler/events/collectd/collectd.go @@ -12,7 +12,7 @@ import ( "github.com/pkg/errors" ) -//collectd contains objects for handling collectd events +// collectd contains objects for handling collectd events var ( // Regular expression for sanitizing received data @@ -32,32 +32,25 @@ var ( const source string = "collectd" -type msgType int - -const ( - collectd msgType = iota - sensubility -) - type eventMessage struct { Labels map[string]interface{} Annotations map[string]interface{} StartsAt string `json:"startsAt"` } -//Collectd type for handling collectd event messages +// Collectd type for handling collectd event messages type Collectd struct { events []data.Event } -//PublishEvents write events to publish func +// PublishEvents write events to publish func func (c *Collectd) PublishEvents(epf bus.EventPublishFunc) { for _, e := range c.events { epf(e) } } -//Parse parse event message +// Parse parse event message func (c *Collectd) Parse(blob []byte) error { message := []eventMessage{} err := json.UnmarshalFromString(sanitize(blob), &message) @@ -70,13 +63,13 @@ func (c *Collectd) Parse(blob []byte) error { var name string name, ok := eMsg.Labels["alertname"].(string) if !ok { - //sensubility + // sensubility v, ok := eMsg.Labels["check"].(string) if ok { name = strings.ReplaceAll(v, "-", "_") } } - //gets rid of last term showing type like "gauge" + // gets rid of last term showing type like "gauge" if index := strings.LastIndex(name, "_"); index > len("collectd_") { name = name[0:index] } @@ -139,7 +132,7 @@ func sanitize(jsondata []byte) string { output = rexForVes.ReplaceAllLiteralString(output, fmt.Sprintf(`"ves":{%s}`, substr)) // messages from collectd-sensubility don't contain array, so add surrounding brackets if rexForArray.FindString(output) == "" { - output = fmt.Sprintf("[%s]", string(output)) + output = fmt.Sprintf("[%s]", output) } } return output diff --git a/plugins/handler/events/handlers/handlers.go b/plugins/handler/events/handlers/handlers.go index 20461473..09b06c6b 100644 --- a/plugins/handler/events/handlers/handlers.go +++ b/plugins/handler/events/handlers/handlers.go @@ -27,7 +27,7 @@ func collectdEventHandler(blob []byte, epf bus.EventPublishFunc) error { return nil } -//EventHandlers handle messages according to the expected data source and write parsed events to the events bus +// EventHandlers handle messages according to the expected data source and write parsed events to the events bus var EventHandlers = map[string]func([]byte, bus.EventPublishFunc) error{ "ceilometer": ceilometerEventHandler, "collectd": collectdEventHandler, diff --git a/plugins/handler/events/main.go b/plugins/handler/events/main.go index 9e3c16b1..a510a106 100644 --- a/plugins/handler/events/main.go +++ b/plugins/handler/events/main.go @@ -14,27 +14,20 @@ import ( "github.com/infrawatch/sg-core/plugins/handler/events/pkg/lib" ) -//EventsHandler is processing event messages +// EventsHandler is processing event messages type EventsHandler struct { eventsReceived map[string]uint64 configuration *lib.HandlerConfig } -//ProcessingError contains processing error data -type ProcessingError struct { - Error string `json:"error"` - Context string `json:"context"` - Message string `json:"message"` -} - -//Handle implements the data.EventsHandler interface +// Handle implements the data.EventsHandler interface func (eh *EventsHandler) Handle(msg []byte, reportErrors bool, sendMetric bus.MetricPublishFunc, sendEvent bus.EventPublishFunc) error { source := lib.DataSource(0) if eh.configuration.StrictSource != "" { source.SetFromString(eh.configuration.StrictSource) } else { - // if strict source is not set then handler is processing channel with multiple data sources - // and has to be recognized from message format + // if strict source is not set then handler is processing channel with multiple data sources + // and has to be recognized from message format source.SetFromMessage(msg) } @@ -65,7 +58,7 @@ func (eh *EventsHandler) Handle(msg []byte, reportErrors bool, sendMetric bus.Me return err } -//Run send internal metrics to bus +// Run send internal metrics to bus func (eh *EventsHandler) Run(ctx context.Context, sendMetric bus.MetricPublishFunc, sendEvent bus.EventPublishFunc) { for { select { @@ -99,18 +92,18 @@ func (eh *EventsHandler) Run(ctx context.Context, sendMetric bus.MetricPublishFu done: } -//Identify returns handler's name +// Identify returns handler's name func (eh *EventsHandler) Identify() string { return "events" } -//Config ... +// Config ... func (eh *EventsHandler) Config(blob []byte) error { eh.configuration = &lib.HandlerConfig{StrictSource: ""} return config.ParseConfig(bytes.NewReader(blob), eh.configuration) } -//New create new eventsHandler object +// New create new eventsHandler object func New() handler.Handler { return &EventsHandler{eventsReceived: make(map[string]uint64)} } diff --git a/plugins/handler/events/pkg/lib/config.go b/plugins/handler/events/pkg/lib/config.go index dd421c6d..9db5313c 100644 --- a/plugins/handler/events/pkg/lib/config.go +++ b/plugins/handler/events/pkg/lib/config.go @@ -1,6 +1,6 @@ package lib -//HandlerConfig contains validateable configuration +// HandlerConfig contains validateable configuration type HandlerConfig struct { StrictSource string `yaml:"strictSource" validate:"oneof=generic collectd ceilometer"` } diff --git a/plugins/handler/events/pkg/lib/source.go b/plugins/handler/events/pkg/lib/source.go index 385b9bfc..5c4451d7 100644 --- a/plugins/handler/events/pkg/lib/source.go +++ b/plugins/handler/events/pkg/lib/source.go @@ -5,11 +5,11 @@ import ( ) var ( - //Ceilometer data parsers + // Ceilometer data parsers rexForOsloMessage = regexp.MustCompile(`\\*"oslo.message\\*"\s*:\s*\\*"({.*})\\*"`) rexForPayload = regexp.MustCompile(`\\+"payload\\+"\s*:\s*\[(.*)\]`) rexForEventType = regexp.MustCompile(`\\+"event_type\\+"\s*:\s*\\*"`) - //collectd data parsers + // collectd data parsers rexForLabelsField = regexp.MustCompile(`\\?"labels\\?"\w?:\w?\{`) rexForAnnotationField = regexp.MustCompile(`\\?"annotations\\?"\w?:\w?\{`) ) @@ -24,10 +24,7 @@ func recognizeCeilometer(jsondata []byte) bool { return false } match = rexForPayload.FindSubmatchIndex(jsondata) - if match == nil { - return false - } - return true + return match != nil } func recognizeCollectd(jsondata []byte) bool { @@ -41,15 +38,15 @@ var recognizers = map[string](func([]byte) bool){ "ceilometer": recognizeCeilometer, } -//DataSource indentifies a format of incoming data in the message bus channel. +// DataSource indentifies a format of incoming data in the message bus channel. type DataSource int -//ListAll returns slice of supported data sources in form of human readable names. +// ListAll returns slice of supported data sources in form of human readable names. func (src DataSource) ListAll() []string { return []string{"ceilometer", "collectd", "generic"} } -//SetFromString resets value according to given human readable identification. Returns false if invalid identification was given. +// SetFromString resets value according to given human readable identification. Returns false if invalid identification was given. func (src *DataSource) SetFromString(name string) bool { for index, value := range src.ListAll() { if name == value { @@ -60,7 +57,7 @@ func (src *DataSource) SetFromString(name string) bool { return false } -//SetFromMessage resets value according to given message data format +// SetFromMessage resets value according to given message data format func (src *DataSource) SetFromMessage(jsondata []byte) { for source, rec := range recognizers { if rec(jsondata) { @@ -73,7 +70,7 @@ func (src *DataSource) SetFromMessage(jsondata []byte) { src.SetFromString("generic") } -//String returns human readable data type identification. +// String returns human readable data type identification. func (src DataSource) String() string { return (src.ListAll())[src] } diff --git a/plugins/handler/events/pkg/lib/time.go b/plugins/handler/events/pkg/lib/time.go index 50d1182f..d18a162a 100644 --- a/plugins/handler/events/pkg/lib/time.go +++ b/plugins/handler/events/pkg/lib/time.go @@ -7,7 +7,7 @@ var ( rFC3339 = "2006-01-02T15:04:05.000000" ) -//EpochFromFormat get epoch time from one of select time string formats +// EpochFromFormat get epoch time from one of select time string formats func EpochFromFormat(ts string) int64 { for _, layout := range []string{rFC3339, time.RFC3339, time.RFC3339Nano, time.ANSIC, isoTimeLayout} { stamp, err := time.Parse(layout, ts) diff --git a/plugins/handler/logs/main.go b/plugins/handler/logs/main.go index 49bc61e1..bad613a4 100644 --- a/plugins/handler/logs/main.go +++ b/plugins/handler/logs/main.go @@ -1,12 +1,12 @@ package main import ( + "bytes" "context" + "encoding/json" "fmt" - "time" - "bytes" "strconv" - "encoding/json" + "time" "github.com/infrawatch/sg-core/pkg/bus" "github.com/infrawatch/sg-core/pkg/config" @@ -31,15 +31,15 @@ const ( func (rs SyslogSeverity) toEventSeverity() data.EventSeverity { return []data.EventSeverity{data.CRITICAL, - data.CRITICAL, - data.CRITICAL, - data.CRITICAL, - data.WARNING, - data.INFO, - data.INFO, - data.INFO, - data.UNKNOWN, - }[rs] + data.CRITICAL, + data.CRITICAL, + data.CRITICAL, + data.WARNING, + data.INFO, + data.INFO, + data.INFO, + data.UNKNOWN, + }[rs] } type logConfig struct { @@ -96,25 +96,24 @@ func (l *logHandler) parse(log []byte) (data.Event, error) { index := fmt.Sprintf("logs-%s-%d-%d-%d", hostname, year, month, day) - // remove message and timestamp from labels (leave the rest) delete(logFields, l.config.MessageField) delete(logFields, l.config.TimestampField) - parsedLog = data.Event { - Index: index, - Time: timestamp, - Type: data.LOG, + parsedLog = data.Event{ + Index: index, + Time: timestamp, + Type: data.LOG, Publisher: hostname, - Severity: eventSeverity, - Labels: logFields, - Message: msg, + Severity: eventSeverity, + Labels: logFields, + Message: msg, } return parsedLog, nil } -//Handle implements the data.EventsHandler interface +// Handle implements the data.EventsHandler interface func (l *logHandler) Handle(msg []byte, reportErrors bool, mpf bus.MetricPublishFunc, epf bus.EventPublishFunc) error { var err error l.totalLogsReceived++ @@ -124,29 +123,27 @@ func (l *logHandler) Handle(msg []byte, reportErrors bool, mpf bus.MetricPublish epf( log, ) - } else { - if reportErrors { - epf(data.Event{ - Index: l.Identify(), - Type: data.ERROR, - Severity: data.CRITICAL, - Time: 0.0, - Labels: map[string]interface{}{ - "error": err.Error(), - "context": string(msg), - "message": "failed to parse log - disregarding", - }, - Annotations: map[string]interface{}{ - "description": "internal smartgateway log handler error", - }, - }) - } + } else if reportErrors { + epf(data.Event{ + Index: l.Identify(), + Type: data.ERROR, + Severity: data.CRITICAL, + Time: 0.0, + Labels: map[string]interface{}{ + "error": err.Error(), + "context": string(msg), + "message": "failed to parse log - disregarding", + }, + Annotations: map[string]interface{}{ + "description": "internal smartgateway log handler error", + }, + }) } return err } -//Run send internal metrics to bus +// Run send internal metrics to bus func (l *logHandler) Run(ctx context.Context, mpf bus.MetricPublishFunc, epf bus.EventPublishFunc) { for { select { @@ -171,7 +168,7 @@ func (l *logHandler) Identify() string { return "log" } -//New create new logHandler object +// New create new logHandler object func New() handler.Handler { return &logHandler{} } diff --git a/plugins/handler/logs/pkg/lib/time.go b/plugins/handler/logs/pkg/lib/time.go index 5250d1e8..6a9e680d 100644 --- a/plugins/handler/logs/pkg/lib/time.go +++ b/plugins/handler/logs/pkg/lib/time.go @@ -1,8 +1,8 @@ package lib import ( - "time" "fmt" + "time" ) var ( @@ -10,7 +10,7 @@ var ( rFC3339 = "2006-01-02T15:04:05.000000" ) -//TimeFromFormat get time from one of select time string formats +// TimeFromFormat get time from one of select time string formats func TimeFromFormat(ts string) (time.Time, error) { for _, layout := range []string{rFC3339, time.RFC3339, time.RFC3339Nano, time.ANSIC, isoTimeLayout} { stamp, err := time.Parse(layout, ts) @@ -18,5 +18,5 @@ func TimeFromFormat(ts string) (time.Time, error) { return stamp, nil } } - return time.Now(), fmt.Errorf("unable to parse timestamp.") + return time.Now(), fmt.Errorf("unable to parse timestamp") } diff --git a/plugins/transport/dummy-alertmanager/main.go b/plugins/transport/dummy-alertmanager/main.go index 18022ad6..bbaea935 100644 --- a/plugins/transport/dummy-alertmanager/main.go +++ b/plugins/transport/dummy-alertmanager/main.go @@ -23,13 +23,13 @@ type configT struct { Output string } -//DummyAM listens on given port and prints all HTTP requests +// DummyAM listens on given port and prints all HTTP requests type DummyAM struct { conf configT logger *logging.Logger } -//Run implements type Transport +// Run implements type Transport func (dam *DummyAM) Run(ctx context.Context, w transport.WriteFn, done chan bool) { // print all received requests http.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) { @@ -68,11 +68,11 @@ func (dam *DummyAM) Run(ctx context.Context, w transport.WriteFn, done chan bool dam.logger.Info("exited") } -//Listen ... +// Listen ... func (dam *DummyAM) Listen(e data.Event) { } -//Config load configurations +// Config load configurations func (dam *DummyAM) Config(c []byte) error { err := config.ParseConfig(bytes.NewReader(c), &dam.conf) if err != nil { @@ -81,7 +81,7 @@ func (dam *DummyAM) Config(c []byte) error { return nil } -//New create new socket transport +// New create new socket transport func New(l *logging.Logger) transport.Transport { return &DummyAM{ conf: configT{ diff --git a/plugins/transport/dummy-events/main.go b/plugins/transport/dummy-events/main.go index 2ee9fc59..14642506 100644 --- a/plugins/transport/dummy-events/main.go +++ b/plugins/transport/dummy-events/main.go @@ -66,12 +66,12 @@ type configT struct { Interval int } -//DummyEvents plugin struct +// DummyEvents plugin struct type DummyEvents struct { c configT } -//Run implements type Transport +// Run implements type Transport func (de *DummyEvents) Run(ctx context.Context, wrFn transport.WriteFn, done chan bool) { for { @@ -95,12 +95,12 @@ func (de *DummyEvents) Run(ctx context.Context, wrFn transport.WriteFn, done cha done: } -//Listen ... +// Listen ... func (de *DummyEvents) Listen(e data.Event) { } -//Config load configurations +// Config load configurations func (de *DummyEvents) Config(c []byte) error { de.c = configT{ Ceilometer: true, @@ -114,7 +114,7 @@ func (de *DummyEvents) Config(c []byte) error { return nil } -//New create new socket transport +// New create new socket transport func New(l *logging.Logger) transport.Transport { return &DummyEvents{} } diff --git a/plugins/transport/dummy-logs/main.go b/plugins/transport/dummy-logs/main.go index bc15c07a..94d69282 100644 --- a/plugins/transport/dummy-logs/main.go +++ b/plugins/transport/dummy-logs/main.go @@ -21,12 +21,12 @@ var logMessages = []string{ `", "host":"localhost", "severity":"1", "facility":"authpriv", "tag":"sudo[803493]:", "source":"sudo", "message":" jarda : 1 incorrect password attempt ; TTY=pts\/1 ; PWD=\/home\/jarda\/go\/src\/github.com\/vyzigold\/sg-core\/plugins\/application\/loki ; USER=root ; COMMAND=\/usr\/bin\/ls", "file":"", "cloud": "cloud1", "region": ""}`, } -//DummyLogs plugin struct +// DummyLogs plugin struct type DummyLogs struct { logger *logging.Logger } -//Run implements type Transport +// Run implements type Transport func (dl *DummyLogs) Run(ctx context.Context, wrFn transport.WriteFn, done chan bool) { for { @@ -54,17 +54,17 @@ func (dl *DummyLogs) Run(ctx context.Context, wrFn transport.WriteFn, done chan done: } -//Listen ... +// Listen ... func (dl *DummyLogs) Listen(e data.Event) { } -//Config load configurations +// Config load configurations func (dl *DummyLogs) Config(c []byte) error { return nil } -//New create new socket transport +// New create new socket transport func New(l *logging.Logger) transport.Transport { return &DummyLogs{ logger: l, diff --git a/plugins/transport/dummy-metrics/main.go b/plugins/transport/dummy-metrics/main.go index 9360582a..b4d766b1 100644 --- a/plugins/transport/dummy-metrics/main.go +++ b/plugins/transport/dummy-metrics/main.go @@ -83,12 +83,12 @@ type configT struct { Interval int } -//DummyMetrics basic struct +// DummyMetrics basic struct type DummyMetrics struct { c configT } -//Run implements type Transport +// Run implements type Transport func (dm *DummyMetrics) Run(ctx context.Context, w transport.WriteFn, done chan bool) { for { @@ -109,12 +109,12 @@ func (dm *DummyMetrics) Run(ctx context.Context, w transport.WriteFn, done chan done: } -//Listen ... +// Listen ... func (dm *DummyMetrics) Listen(e data.Event) { } -//Config load configurations +// Config load configurations func (dm *DummyMetrics) Config(c []byte) error { dm.c = configT{ Ceilometer: true, @@ -128,7 +128,7 @@ func (dm *DummyMetrics) Config(c []byte) error { return nil } -//New create new socket transport +// New create new socket transport func New(l *logging.Logger) transport.Transport { return &DummyMetrics{} } diff --git a/plugins/transport/socket/main.go b/plugins/transport/socket/main.go index 9e38f253..c8621549 100644 --- a/plugins/transport/socket/main.go +++ b/plugins/transport/socket/main.go @@ -54,13 +54,13 @@ func (lw *logWrapper) Warnf(format string, a ...interface{}) { lw.l.Warn(fmt.Sprintf(format, a...)) } -//Socket basic struct +// Socket basic struct type Socket struct { conf configT logger *logWrapper } -//Run implements type Transport +// Run implements type Transport func (s *Socket) Run(ctx context.Context, w transport.WriteFn, done chan bool) { msgBuffer := make([]byte, maxBufferSize) @@ -81,7 +81,7 @@ func (s *Socket) Run(ctx context.Context, w transport.WriteFn, done chan bool) { go func() { for { n, err := pc.Read(msgBuffer) - //fmt.Printf("received message: %s\n", string(msgBuffer)) + // fmt.Printf("received message: %s\n", string(msgBuffer)) if err != nil || n < 1 { if err != nil { @@ -110,12 +110,12 @@ Done: s.logger.Infof("exited") } -//Listen ... +// Listen ... func (s *Socket) Listen(e data.Event) { fmt.Printf("Received event: %v\n", e) } -//Config load configurations +// Config load configurations func (s *Socket) Config(c []byte) error { s.conf = configT{} err := config.ParseConfig(bytes.NewReader(c), &s.conf) @@ -125,7 +125,7 @@ func (s *Socket) Config(c []byte) error { return nil } -//New create new socket transport +// New create new socket transport func New(l *logging.Logger) transport.Transport { return &Socket{ logger: &logWrapper{