diff --git a/logstream/logprocessor.go b/logstream/logprocessor.go
new file mode 100644
index 000000000..f2819d8c2
--- /dev/null
+++ b/logstream/logprocessor.go
@@ -0,0 +1,106 @@
+package logstream
+
+import (
+	"encoding/gob"
+	"fmt"
+	"io"
+	"os"
+	"regexp"
+	"strings"
+	"syscall"
+
+	"github.com/pkg/errors"
+)
+
+type LogProcessor[ReturnType any] struct {
+	logStream *LogStream
+}
+
+// NewLogProcessor creates a new LogProcessor instance
+func NewLogProcessor[ReturnType any](logStream *LogStream) *LogProcessor[ReturnType] {
+	return &LogProcessor[ReturnType]{
+		logStream: logStream,
+	}
+}
+
+type LogProcessorFn[ReturnType any] func(content LogContent, returnValue *ReturnType) error
+
+// ProcessContainerLogs reads the logs of a container and processes them with the provided function
+func (l *LogProcessor[ReturnType]) ProcessContainerLogs(containerName string, processFn func(content LogContent, returnValue *ReturnType) error) (*ReturnType, error) {
+	containerName = strings.Replace(containerName, "/", "", 1)
+	var consumer *ContainerLogConsumer
+	for _, c := range l.logStream.consumers {
+		if c.name == containerName {
+			consumer = c
+			break
+		}
+	}
+
+	if consumer == nil {
+		return new(ReturnType), fmt.Errorf("no consumer found for container %s", containerName)
+	}
+
+	// Duplicate the file descriptor for independent reading, so we don't mess up writing the file by moving the cursor
+	fd, err := syscall.Dup(int(consumer.tempFile.Fd()))
+	if err != nil {
+		return new(ReturnType), err
+	}
+	readFile := os.NewFile(uintptr(fd), "name_doesnt_matter.txt")
+	_, err = readFile.Seek(0, 0)
+	if err != nil {
+		return new(ReturnType), err
+	}
+
+	decoder := gob.NewDecoder(readFile)
+	var returnValue ReturnType
+
+	for {
+		var log LogContent
+		decodeErr := decoder.Decode(&log)
+		if decodeErr == nil {
+			processErr := processFn(log, &returnValue)
+			if processErr != nil {
+				l.logStream.log.Error().
+					Err(processErr).
+					Str("Container", consumer.name).
+					Msg("Failed to process log")
+				return new(ReturnType), processErr
+			}
+		} else if errors.Is(decodeErr, io.EOF) {
+			l.logStream.log.Debug().
+				Str("Container", consumer.name).
+				Str("Processing result", fmt.Sprint(returnValue)).
+				Msg("Finished scanning logs")
+			break
+		} else {
+			l.logStream.log.Error().
+				Err(decodeErr).
+				Str("Container", consumer.name).
+				Msg("Failed to decode log")
+			return new(ReturnType), decodeErr
+		}
+	}
+
+	return &returnValue, nil
+}
+
+// GetRegexMatchingProcessor creates a LogProcessor that counts the number of logs matching a regex pattern. Function returns
+// the LogProcessor, the processing function, and an error if the regex pattern is invalid.
+func GetRegexMatchingProcessor(logStream *LogStream, pattern string) (*LogProcessor[int], LogProcessorFn[int], error) {
+	re, err := regexp.Compile(pattern)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	logProcessor := NewLogProcessor[int](logStream)
+
+	processFn := func(logContent LogContent, returnValue *int) error {
+		if re.MatchString(string(logContent.Content)) {
+			newVal := *returnValue + 1
+			*returnValue = newVal
+		}
+		return nil
+	}
+
+	return logProcessor, processFn, nil
+}
diff --git a/logstream/logstream.go b/logstream/logstream.go
index 72406cdf6..8579a0313 100644
--- a/logstream/logstream.go
+++ b/logstream/logstream.go
@@ -381,6 +381,17 @@ func (m *LogStream) SaveLogLocationInTestSummary() {
 	})
 }
 
+// GetLogLocation returns the location of the logs saved by the log targets
+func (m *LogStream) GetLogLocation() string {
+	var logLocation string
+	m.SaveLogTargetsLocations(func(testName string, name string, location interface{}) error {
+		logLocation = location.(string)
+		return nil
+	})
+
+	return logLocation
+}
+
 // SaveLogTargetsLocations saves all log targets given writer
 func (m *LogStream) SaveLogTargetsLocations(writer LogWriter) {
 	for _, handler := range m.logTargetHandlers {
diff --git a/testreporters/reporter_model.go b/testreporters/reporter_model.go
index 419d5f251..2e6347bd2 100644
--- a/testreporters/reporter_model.go
+++ b/testreporters/reporter_model.go
@@ -52,7 +52,7 @@ func WriteTeardownLogs(
 		log.Warn().Err(err).Msg("Error trying to collect pod logs")
 		return err
 	}
-	logFiles, err := findAllLogFilesToScan(logsPath)
+	logFiles, err := FindAllLogFilesToScan(logsPath, "node.log")
 	if err != nil {
 		log.Warn().Err(err).Msg("Error looking for pod logs")
 		return err
@@ -61,7 +61,7 @@ func WriteTeardownLogs(
 	for _, f := range logFiles {
 		file := f
 		verifyLogsGroup.Go(func() error {
-			return verifyLogFile(file, failingLogLevel)
+			return VerifyLogFile(file, failingLogLevel, 1)
 		})
 	}
 	assert.NoError(t, verifyLogsGroup.Wait(), "Found a concerning log")
@@ -91,8 +91,8 @@ func SendReport(t *testing.T, namespace string, logsPath string, optionalTestRep
 	return nil
 }
 
-// findAllLogFilesToScan walks through log files pulled from all pods, and gets all chainlink node logs
-func findAllLogFilesToScan(directoryPath string) (logFilesToScan []*os.File, err error) {
+// FindAllLogFilesToScan walks through log files pulled from all pods, and gets all chainlink node logs
+func FindAllLogFilesToScan(directoryPath string, partialFilename string) (logFilesToScan []*os.File, err error) {
 	logFilePaths := []string{}
 	err = filepath.Walk(directoryPath, func(path string, info fs.FileInfo, err error) error {
 		if err != nil {
@@ -105,7 +105,7 @@ func findAllLogFilesToScan(directoryPath string) (logFilesToScan []*os.File, err
 	})
 
 	for _, filePath := range logFilePaths {
-		if strings.Contains(filePath, "node.log") {
+		if strings.Contains(filePath, partialFilename) {
 			logFileToScan, err := os.Open(filePath)
 			if err != nil {
 				return nil, err
@@ -116,14 +116,33 @@ func findAllLogFilesToScan(directoryPath string) (logFilesToScan []*os.File, err
 	return logFilesToScan, err
 }
 
-// allowedLogMessage is a log message that might be thrown by a Chainlink node during a test, but is not a concern
-type allowedLogMessage struct {
-	message string
-	reason  string
-	level   zapcore.Level
+type LogAllowed = bool
+
+const (
+	LogAllowed_Yes LogAllowed = true
+	LogAllowed_No  LogAllowed = false
+)
+
+// AllowedLogMessage is a log message that might be thrown by a Chainlink node during a test, but is not a concern
+type AllowedLogMessage struct {
+	message      string
+	reason       string
+	level        zapcore.Level
+	logWhenFound LogAllowed
 }
 
-var allowedLogMessages = []allowedLogMessage{
+// NewAllowedLogMessage creates a new AllowedLogMessage. If logWhenFound is true, the log message will be printed to the
+// console at Warn level when it is found in the log file (this can get noisy).
+func NewAllowedLogMessage(message string, reason string, level zapcore.Level, logWhenFound LogAllowed) AllowedLogMessage {
+	return AllowedLogMessage{
+		message:      message,
+		reason:       reason,
+		level:        level,
+		logWhenFound: logWhenFound,
+	}
+}
+
+var defaultAllowedLogMessages = []AllowedLogMessage{
 	{
 		message: "No EVM primary nodes available: 0/1 nodes are alive",
 		reason:  "Sometimes geth gets unlucky in the start up process and the Chainlink node starts before geth is ready",
@@ -131,8 +150,11 @@
 	},
 }
 
-// verifyLogFile verifies that a log file
-func verifyLogFile(file *os.File, failingLogLevel zapcore.Level) error {
+// VerifyLogFile verifies that a log file does not contain any logs at or above the failingLogLevel. If it does,
+// it will return an error. It also accepts a list of AllowedLogMessages, which are ignored if found in the log file.
+// The failureThreshold is the number of logs at or above the failingLogLevel that must be found before the function
+// returns an error.
+func VerifyLogFile(file *os.File, failingLogLevel zapcore.Level, failureThreshold uint, allowedMessages ...AllowedLogMessage) error {
 	// nolint
 	defer file.Close()
 
@@ -142,6 +164,11 @@ func verifyLogFile(file *os.File, failingLogLevel zapcore.Level) error {
 	)
 	scanner := bufio.NewScanner(file)
 	scanner.Split(bufio.ScanLines)
+
+	allAllowedMessages := append(defaultAllowedLogMessages, allowedMessages...)
+
+	var logsFound uint
+
 	for scanner.Scan() {
 		jsonLogLine := scanner.Text()
 		if !strings.HasPrefix(jsonLogLine, "{") { // don't bother with non-json lines
@@ -170,21 +197,44 @@ func verifyLogFile(file *os.File, failingLogLevel zapcore.Level) error {
 			}
 		}
 
-		if zapLevel > failingLogLevel {
+		if zapLevel >= failingLogLevel {
 			logErr := fmt.Errorf("found log at level '%s', failing any log level higher than %s: %s", logLevel, zapLevel.String(), jsonLogLine)
+			if failureThreshold > 1 {
+				logErr = fmt.Errorf("found too many logs at level '%s' or above; failure threshold of %d reached; last error found: %s", logLevel, failureThreshold, jsonLogLine)
+			}
 			logMessage, hasMessage := jsonMapping["msg"]
 			if !hasMessage {
-				return logErr
+				logsFound++
+				if logsFound >= failureThreshold {
+					return logErr
+				}
+				continue
 			}
-			for _, allowedLog := range allowedLogMessages {
-				if strings.Contains(logMessage.(string), allowedLog.message) {
-					log.Warn().
-						Str("Reason", allowedLog.reason).
-						Str("Level", allowedLog.level.CapitalString()).
-						Str("Msg", logMessage.(string)).
-						Msg("Found allowed log message, ignoring")
+
+			// Allowed messages are skipped and do not count towards the failure threshold
+			isAllowed := false
+			for _, allowedLog := range allAllowedMessages {
+				if strings.Contains(logMessage.(string), allowedLog.message) {
+					if allowedLog.logWhenFound {
+						log.Warn().
+							Str("Reason", allowedLog.reason).
+							Str("Level", allowedLog.level.CapitalString()).
+							Str("Msg", logMessage.(string)).
+							Msg("Found allowed log message, ignoring")
+					}
+					isAllowed = true
+					break
 				}
 			}
+
+			if isAllowed {
+				continue
+			}
+
+			logsFound++
+			if logsFound >= failureThreshold {
+				return logErr
+			}
 		}
 	}
 	return nil
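
A minimal usage sketch of the new LogProcessor API follows, assuming an already-initialized *logstream.LogStream that is consuming the named container's logs and an import path matching this repository's module layout; the helper name, container name, and pattern are illustrative only.

package example_test

import (
	"testing"

	"github.com/smartcontractkit/chainlink-testing-framework/logstream"
)

// assertPatternSeen is a hypothetical helper: it counts the log lines of the given
// container that match the regex pattern and fails the test if none are found.
// It assumes `ls` is already consuming that container's logs.
func assertPatternSeen(t *testing.T, ls *logstream.LogStream, containerName, pattern string) {
	processor, processFn, err := logstream.GetRegexMatchingProcessor(ls, pattern)
	if err != nil {
		t.Fatalf("invalid regex pattern %q: %v", pattern, err)
	}

	// ProcessContainerLogs replays the consumer's buffered log entries through
	// processFn, which increments the counter for every matching line.
	count, err := processor.ProcessContainerLogs(containerName, processFn)
	if err != nil {
		t.Fatalf("failed to process logs for container %s: %v", containerName, err)
	}

	if *count == 0 {
		t.Errorf("expected at least one log line matching %q in container %s", pattern, containerName)
	}
}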