Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TT-1140] Universal log processor + modified log verifier #947

Merged
merged 3 commits into from
May 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions logstream/logprocessor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package logstream

import (
"encoding/gob"
"fmt"
"io"
"os"
"regexp"
"strings"
"syscall"

"github.com/pkg/errors"
)

// LogProcessor reads logs previously captured by a LogStream and folds them
// into a single accumulated value of type ReturnType (e.g. a match counter).
type LogProcessor[ReturnType any] struct {
	logStream *LogStream // provides the container log consumers whose temp files are read
}

// NewLogProcessor creates a new LogProcessor instance bound to the given LogStream.
func NewLogProcessor[ReturnType any](logStream *LogStream) *LogProcessor[ReturnType] {
	processor := &LogProcessor[ReturnType]{}
	processor.logStream = logStream
	return processor
}

type LogProcessorFn[ReturnType any] func(content LogContent, returnValue *ReturnType) error

// ProcessContainerLogs reads the logs of a container and processes them with the provided function.
// It locates the consumer registered for containerName (stripping a single leading "/" that Docker
// adds to container names), duplicates the consumer's temp-file descriptor so reading does not move
// the writer's cursor, and gob-decodes each LogContent entry, passing it to processFn together with
// an accumulator of type ReturnType. Returns a pointer to the accumulated value, or a zero value
// and an error if the consumer is missing, the file cannot be read, decoding fails, or processFn
// returns an error.
func (l *LogProcessor[ReturnType]) ProcessContainerLogs(containerName string, processFn func(content LogContent, returnValue *ReturnType) error) (*ReturnType, error) {
	containerName = strings.Replace(containerName, "/", "", 1)
	var consumer *ContainerLogConsumer
	for _, c := range l.logStream.consumers {
		if c.name == containerName {
			consumer = c
			break
		}
	}

	if consumer == nil {
		return new(ReturnType), fmt.Errorf("no consumer found for container %s", containerName)
	}

	// Duplicate the file descriptor for independent reading, so we don't mess up writing the file by moving the cursor
	fd, err := syscall.Dup(int(consumer.tempFile.Fd()))
	if err != nil {
		return new(ReturnType), err
	}
	readFile := os.NewFile(uintptr(fd), "name_doesnt_matter.txt")
	// Close the duplicated descriptor when done; without this every call leaks one fd.
	// nolint
	defer readFile.Close()
	_, err = readFile.Seek(0, 0)
	if err != nil {
		return new(ReturnType), err
	}

	decoder := gob.NewDecoder(readFile)
	var returnValue ReturnType

	for {
		var log LogContent
		decodeErr := decoder.Decode(&log)
		if decodeErr == nil {
			processErr := processFn(log, &returnValue)
			if processErr != nil {
				l.logStream.log.Error().
					Err(processErr).
					Str("Container", consumer.name).
					Msg("Failed to process log")
				return new(ReturnType), processErr
			}
		} else if errors.Is(decodeErr, io.EOF) {
			// EOF means we have consumed every stored entry; the accumulator is complete.
			l.logStream.log.Debug().
				Str("Container", consumer.name).
				Str("Processing result", fmt.Sprint(returnValue)).
				Msg("Finished scanning logs")
			break
		} else {
			l.logStream.log.Error().
				Err(decodeErr).
				Str("Container", consumer.name).
				Msg("Failed to decode log")
			return new(ReturnType), decodeErr
		}
	}

	return &returnValue, nil
}

// GetRegexMatchingProcessor creates a LogProcessor that counts the number of logs matching a regex pattern. Function returns
// the LogProcessor, the processing function, and an error if the regex pattern is invalid.
func GetRegexMatchingProcessor(logStream *LogStream, pattern string) (*LogProcessor[int], LogProcessorFn[int], error) {
	matcher, compileErr := regexp.Compile(pattern)
	if compileErr != nil {
		return nil, nil, compileErr
	}

	// Increment the accumulator once for every log line whose content matches the pattern.
	countMatches := func(logContent LogContent, matchCount *int) error {
		if matcher.MatchString(string(logContent.Content)) {
			*matchCount++
		}
		return nil
	}

	return NewLogProcessor[int](logStream), countMatches, nil
}
11 changes: 11 additions & 0 deletions logstream/logstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,17 @@ func (m *LogStream) SaveLogLocationInTestSummary() {
})
}

// GetLogLocation returns the location reported by the last log target handler,
// or an empty string if no handler reported a string location. (The original
// comment was copy-pasted from SaveLogTargetsLocations and did not describe
// this function.)
func (m *LogStream) GetLogLocation() string {
	var logLocation string
	m.SaveLogTargetsLocations(func(testName string, name string, location interface{}) error {
		// Use a checked assertion: a non-string location is ignored instead of panicking.
		if loc, ok := location.(string); ok {
			logLocation = loc
		}
		return nil
	})

	return logLocation
}

// SaveLogTargetsLocations saves all log targets given writer
func (m *LogStream) SaveLogTargetsLocations(writer LogWriter) {
for _, handler := range m.logTargetHandlers {
Expand Down
76 changes: 59 additions & 17 deletions testreporters/reporter_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func WriteTeardownLogs(
log.Warn().Err(err).Msg("Error trying to collect pod logs")
return err
}
logFiles, err := findAllLogFilesToScan(logsPath)
logFiles, err := FindAllLogFilesToScan(logsPath, "node.log")
if err != nil {
log.Warn().Err(err).Msg("Error looking for pod logs")
return err
Expand All @@ -61,7 +61,7 @@ func WriteTeardownLogs(
for _, f := range logFiles {
file := f
verifyLogsGroup.Go(func() error {
return verifyLogFile(file, failingLogLevel)
return VerifyLogFile(file, failingLogLevel, 1)
})
}
assert.NoError(t, verifyLogsGroup.Wait(), "Found a concerning log")
Expand Down Expand Up @@ -91,8 +91,8 @@ func SendReport(t *testing.T, namespace string, logsPath string, optionalTestRep
return nil
}

// findAllLogFilesToScan walks through log files pulled from all pods, and gets all chainlink node logs
func findAllLogFilesToScan(directoryPath string) (logFilesToScan []*os.File, err error) {
// FindAllLogFilesToScan walks through log files pulled from all pods, and gets all chainlink node logs
func FindAllLogFilesToScan(directoryPath string, partialFilename string) (logFilesToScan []*os.File, err error) {
logFilePaths := []string{}
err = filepath.Walk(directoryPath, func(path string, info fs.FileInfo, err error) error {
if err != nil {
Expand All @@ -105,7 +105,7 @@ func findAllLogFilesToScan(directoryPath string) (logFilesToScan []*os.File, err
})

for _, filePath := range logFilePaths {
if strings.Contains(filePath, "node.log") {
if strings.Contains(filePath, partialFilename) {
logFileToScan, err := os.Open(filePath)
if err != nil {
return nil, err
Expand All @@ -116,23 +116,45 @@ func findAllLogFilesToScan(directoryPath string) (logFilesToScan []*os.File, err
return logFilesToScan, err
}

// allowedLogMessage is a log message that might be thrown by a Chainlink node during a test, but is not a concern
type allowedLogMessage struct {
message string
reason string
level zapcore.Level
// LogAllowed signals whether an allowed log message should be printed to the
// console when it is encountered during log verification.
type LogAllowed = bool

const (
	LogAllowed_Yes LogAllowed = true  // warn on the console when the allowed message is found
	LogAllowed_No  LogAllowed = false // silently ignore the allowed message
)

// AllowedLogMessage is a log message that might be thrown by a Chainlink node during a test, but is not a concern
type AllowedLogMessage struct {
	message      string        // substring to look for in a log line's message field
	reason       string        // why this message is acceptable during a test
	level        zapcore.Level // level at which the message is expected to appear
	logWhenFound LogAllowed    // whether to warn on the console when the message is found
}

var allowedLogMessages = []allowedLogMessage{
// NewAllowedLogMessage creates a new AllowedLogMessage. If logWhenFound is true, the log message will be printed to the
// console when found in the log file with Warn level (this can get noisy).
func NewAllowedLogMessage(message string, reason string, level zapcore.Level, logWhenFound LogAllowed) AllowedLogMessage {
	allowed := AllowedLogMessage{}
	allowed.message = message
	allowed.reason = reason
	allowed.level = level
	allowed.logWhenFound = logWhenFound
	return allowed
}

// defaultAllowedLogMessages lists log messages that are always tolerated during verification.
var defaultAllowedLogMessages = []AllowedLogMessage{
	NewAllowedLogMessage(
		"No EVM primary nodes available: 0/1 nodes are alive",
		"Sometimes geth gets unlucky in the start up process and the Chainlink node starts before geth is ready",
		zapcore.DPanicLevel,
		LogAllowed_No,
	),
}

// verifyLogFile verifies that a log file
func verifyLogFile(file *os.File, failingLogLevel zapcore.Level) error {
// VerifyLogFile verifies that a log file does not contain any logs at a level higher than the failingLogLevel. If it does,
// it will return an error. It also allows for a list of AllowedLogMessages to be passed in, which will be ignored if found
// in the log file. The failureThreshold is the number of logs at the failingLogLevel or higher that can be found before
// the function returns an error.
func VerifyLogFile(file *os.File, failingLogLevel zapcore.Level, failureThreshold uint, allowedMessages ...AllowedLogMessage) error {
// nolint
defer file.Close()

Expand All @@ -142,6 +164,11 @@ func verifyLogFile(file *os.File, failingLogLevel zapcore.Level) error {
)
scanner := bufio.NewScanner(file)
scanner.Split(bufio.ScanLines)

allAllowedMessages := append(defaultAllowedLogMessages, allowedMessages...)

var logsFound uint

for scanner.Scan() {
jsonLogLine := scanner.Text()
if !strings.HasPrefix(jsonLogLine, "{") { // don't bother with non-json lines
Expand Down Expand Up @@ -170,21 +197,36 @@ func verifyLogFile(file *os.File, failingLogLevel zapcore.Level) error {
}
}

if zapLevel > failingLogLevel {
if zapLevel >= failingLogLevel {
logErr := fmt.Errorf("found log at level '%s', failing any log level higher than %s: %s", logLevel, zapLevel.String(), jsonLogLine)
if failureThreshold > 1 {
logErr = fmt.Errorf("found too many logs at level '%s' or above; failure threshold of %d reached; last error found: %s", logLevel, failureThreshold, jsonLogLine)
}
logMessage, hasMessage := jsonMapping["msg"]
if !hasMessage {
return logErr
logsFound++
if logsFound >= failureThreshold {
return logErr
}
continue
}
for _, allowedLog := range allowedLogMessages {
if strings.Contains(logMessage.(string), allowedLog.message) {

for _, allowedLog := range allAllowedMessages {
if strings.Contains(logMessage.(string), allowedLog.message) && allowedLog.logWhenFound {
log.Warn().
Str("Reason", allowedLog.reason).
Str("Level", allowedLog.level.CapitalString()).
Str("Msg", logMessage.(string)).
Msg("Found allowed log message, ignoring")

continue
}
}

logsFound++
if logsFound >= failureThreshold {
return logErr
}
}
}
return nil
Expand Down
Loading