Skip to content

Commit

Permalink
Add logs handler and Loki application plugins (#28)
Browse files Browse the repository at this point in the history
Changelog:

* Adds dummy-logs transport plugin.
* Adds a handler for logs.
   - This new handler is able to handle all kinds of logs
     depending on the content of the configuration.
* Adds loki application
* Adds Message field to Event bus
  • Loading branch information
vyzigold authored Mar 8, 2021
1 parent aca5970 commit 897df52
Show file tree
Hide file tree
Showing 9 changed files with 451 additions and 1 deletion.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ go 1.14
require (
collectd.org v0.5.0
github.com/elastic/go-elasticsearch/v7 v7.5.1-0.20201228183019-1cbb255902f5
github.com/go-ini/ini v1.62.0 // indirect
github.com/go-openapi/errors v0.19.9
github.com/go-playground/universal-translator v0.17.0 // indirect
github.com/google/uuid v1.0.0
github.com/infrawatch/apputils v0.0.0-20201208221556-d59b03ddde31
github.com/infrawatch/apputils v0.0.0-20210218211331-9f6eb5097d89
github.com/json-iterator/go v1.1.10
github.com/leodido/go-urn v1.2.0 // indirect
github.com/pkg/errors v0.9.1
Expand Down
1 change: 1 addition & 0 deletions pkg/data/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type Event struct {
Severity EventSeverity
Labels map[string]interface{}
Annotations map[string]interface{}
Message string
}

//---------------------------------- metrics ----------------------------------
Expand Down
90 changes: 90 additions & 0 deletions plugins/application/loki/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package main

import (
"bytes"
"context"
"time"

"github.com/infrawatch/apputils/connector"
"github.com/infrawatch/apputils/logging"
"github.com/infrawatch/sg-core/pkg/application"
"github.com/infrawatch/sg-core/pkg/config"
"github.com/infrawatch/sg-core/pkg/data"
"github.com/pkg/errors"

"github.com/infrawatch/sg-core/plugins/application/loki/pkg/lib"
)

type LokiConfig struct {
Connection string `validate:"required"`
BatchSize int64
MaxWaitTime time.Duration
}

//Loki plugin for forwarding logs to loki
type Loki struct {
config *LokiConfig
client *connector.LokiConnector
logger *logging.Logger
logChannel chan interface{}
}

//New constructor
func New(logger *logging.Logger) application.Application {
return &Loki {
logger: logger,
logChannel: make(chan interface{}, 100),
}
}

// ReceiveEvent ...
func (l *Loki) ReceiveEvent(log data.Event) {
switch log.Type {
case data.LOG:
lokiLog, err := lib.CreateLokiLog(log)
if err != nil {
l.logger.Metadata(logging.Metadata{"plugin": "loki", "log": log, "error": err})
l.logger.Error("failed to parse the data in event bus - disregarding")
return
}
l.logChannel <- lokiLog
default:
l.logger.Metadata(logging.Metadata{"plugin": "loki", "event": log})
l.logger.Error("received event data (instead of log data) in event bus - disregarding")
}
}

//Run run loki application plugin
func (l *Loki) Run(ctx context.Context, done chan bool) {
l.logger.Metadata(logging.Metadata{"plugin": "loki", "url": l.config.Connection})
l.logger.Debug("storing logs to loki.")
l.client.Start(nil, l.logChannel)

<-ctx.Done()
l.client.Disconnect()

l.logger.Metadata(logging.Metadata{"plugin": "loki"})
l.logger.Info("exited")
}

//Config implements application.Application
func (l *Loki) Config(c []byte) error {
l.config = &LokiConfig {
Connection: "",
BatchSize: 20,
MaxWaitTime: 100,
}
err := config.ParseConfig(bytes.NewReader(c), l.config)
if err != nil {
return err
}

l.client, err = connector.CreateLokiConnector(l.logger,
l.config.Connection,
l.config.MaxWaitTime,
l.config.BatchSize)
if err != nil {
return errors.Wrap(err, "failed to connect to Loki host")
}
return nil
}
41 changes: 41 additions & 0 deletions plugins/application/loki/pkg/lib/loki.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package lib

import (
"time"
"fmt"

"github.com/infrawatch/apputils/connector"
"github.com/infrawatch/sg-core/pkg/data"
)

type LokiConfig struct {
Connection string `validate:"required"`
BatchSize int
MaxWaitTime int
}

// Creates labels used by Loki.
func createLabels(rawLabels map[string]interface{}) (map[string]string, error) {
result := make(map[string]string)
assimilateMap(rawLabels, &result)
if len(result) == 0 {
return nil, fmt.Errorf("unable to create log labels")
}
return result, nil
}

func CreateLokiLog(log data.Event) (connector.LokiLog, error) {
labels, err := createLabels(log.Labels)
if err != nil {
return connector.LokiLog{}, err
}

output := connector.LokiLog {
LogMessage: log.Message,
Timestamp: time.Duration(time.Duration(log.Time) * time.Second),
Labels: labels,
}
return output, nil
}


39 changes: 39 additions & 0 deletions plugins/application/loki/pkg/lib/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package lib

import (
"fmt"
"log"
"strings"
)

//assimilateMap recursively saves content of the given map to destination map of strings
func assimilateMap(theMap map[string]interface{}, destination *map[string]string) {
defer func() { //recover from any panic
if r := recover(); r != nil {
log.Printf("Panic:recovered in assimilateMap %v\n", r)
}
}()
for key, val := range theMap {
switch value := val.(type) {
case map[string]interface{}:
// go one level deeper in the map
assimilateMap(value, destination)
case []interface{}:
// transform slice value to comma separated list and assimilate it
aList := make([]string, 0, len(value))
for _, item := range value {
if itm, ok := item.(string); ok {
aList = append(aList, itm)
}
}
(*destination)[key] = strings.Join(aList, ",")
case float64, float32:
(*destination)[key] = fmt.Sprintf("%f", value)
case int:
(*destination)[key] = fmt.Sprintf("%d", value)
default:
// assimilate KV pair
(*destination)[key] = value.(string)
}
}
}
2 changes: 2 additions & 0 deletions plugins/application/print/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type eventOutput struct {
Index string
Type string
Publisher string
Severity data.EventSeverity
Labels map[string]interface{}
Annotations map[string]interface{}
}
Expand Down Expand Up @@ -99,6 +100,7 @@ func (p *Print) Run(ctx context.Context, done chan bool) {
Index: event.Index,
Type: event.Type.String(),
Publisher: event.Publisher,
Severity: event.Severity,
Labels: event.Labels,
Annotations: event.Annotations,
}
Expand Down
182 changes: 182 additions & 0 deletions plugins/handler/logs/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package main

import (
"context"
"fmt"
"time"
"bytes"
"strconv"
"encoding/json"

"github.com/infrawatch/sg-core/pkg/bus"
"github.com/infrawatch/sg-core/pkg/config"
"github.com/infrawatch/sg-core/pkg/data"
"github.com/infrawatch/sg-core/pkg/handler"
"github.com/infrawatch/sg-core/plugins/handler/logs/pkg/lib"
)

type SyslogSeverity int

const (
EMERGENCY SyslogSeverity = iota
ALERT
CRITICAL
ERROR
WARNING
NOTICE
INFORMATIONAL
DEBUG
UNKNOWN
)

func (rs SyslogSeverity) toEventSeverity() data.EventSeverity {
return []data.EventSeverity{data.CRITICAL,
data.CRITICAL,
data.CRITICAL,
data.CRITICAL,
data.WARNING,
data.INFO,
data.INFO,
data.INFO,
data.UNKNOWN,
}[rs]
}

type logConfig struct {
MessageField string `validate:"required"`
TimestampField string `validate:"required"`
HostnameField string `validate:"required"`
SeverityField string
}

type logHandler struct {
totalLogsReceived uint64
config logConfig
}

func (l *logHandler) parse(log []byte) (data.Event, error) {
parsedLog := data.Event{}
logFields := make(map[string]interface{})
err := json.Unmarshal(log, &logFields)
if err != nil {
return parsedLog, err
}

msg, ok := logFields[l.config.MessageField].(string)
if !ok {
return parsedLog, fmt.Errorf("unable to find a log message under field called: %s", l.config.MessageField)
}

severity := UNKNOWN
severitystring, ok := logFields[l.config.SeverityField].(string)
if ok {
s, err := strconv.Atoi(severitystring)
if err == nil {
severity = SyslogSeverity(s)
}
}
eventSeverity := severity.toEventSeverity()

hostname, ok := logFields[l.config.HostnameField].(string)
if !ok {
return parsedLog, fmt.Errorf("unable to find the hostname under field called: %s", l.config.HostnameField)
}

timestring, ok := logFields[l.config.TimestampField].(string)
if !ok {
return parsedLog, fmt.Errorf("unable to find the timestamp under field called: %s", l.config.TimestampField)
}
t, err := lib.TimeFromFormat(timestring)
if err != nil {
return parsedLog, err
}

timestamp := float64(t.Unix())
year, month, day := t.Date()

index := fmt.Sprintf("logs-%s-%d-%d-%d", hostname, year, month, day)


// remove message and timestamp from labels (leave the rest)
delete(logFields, l.config.MessageField)
delete(logFields, l.config.TimestampField)

parsedLog = data.Event {
Index: index,
Time: timestamp,
Type: data.LOG,
Publisher: hostname,
Severity: eventSeverity,
Labels: logFields,
Message: msg,
}

return parsedLog, nil
}

//Handle implements the data.EventsHandler interface
func (l *logHandler) Handle(msg []byte, reportErrors bool, mpf bus.MetricPublishFunc, epf bus.EventPublishFunc) error {
var err error
l.totalLogsReceived++

log, err := l.parse(msg)
if err == nil {
epf(
log,
)
} else {
if reportErrors {
epf(data.Event{
Index: l.Identify(),
Type: data.ERROR,
Severity: data.CRITICAL,
Time: 0.0,
Labels: map[string]interface{}{
"error": err.Error(),
"context": string(msg),
"message": "failed to parse log - disregarding",
},
Annotations: map[string]interface{}{
"description": "internal smartgateway log handler error",
},
})
}
}

return err
}

//Run send internal metrics to bus
func (l *logHandler) Run(ctx context.Context, mpf bus.MetricPublishFunc, epf bus.EventPublishFunc) {
for {
select {
case <-ctx.Done():
goto done
case <-time.After(time.Second):
mpf(
"sg_total_logs_received",
0,
data.COUNTER,
0,
float64(l.totalLogsReceived),
[]string{"source"},
[]string{"SG"},
)
}
}
done:
}

func (l *logHandler) Identify() string {
return "log"
}

//New create new logHandler object
func New() handler.Handler {
return &logHandler{}
}

func (l *logHandler) Config(c []byte) error {
l.config = logConfig{}
return config.ParseConfig(bytes.NewReader(c), &l.config)
}
Loading

0 comments on commit 897df52

Please sign in to comment.