From 2d7b09db959effb2831fb3a55a3fe52d8f71b01f Mon Sep 17 00:00:00 2001 From: Tom McPhail Date: Mon, 9 Oct 2023 13:11:00 +1000 Subject: [PATCH 1/6] Update workflows to remove old references --- .github/workflows/build-and-test.yml | 15 +-------------- errors.go | 1 + 2 files changed, 2 insertions(+), 14 deletions(-) create mode 100644 errors.go diff --git a/.github/workflows/build-and-test.yml b/.github/workflows/build-and-test.yml index 62069e6..efe55e9 100644 --- a/.github/workflows/build-and-test.yml +++ b/.github/workflows/build-and-test.yml @@ -5,29 +5,16 @@ on: jobs: build: - name: Build and Test ghe-visualizer + name: Build and Test logwhale runs-on: ubuntu-latest - env: - GOPROXY: https://goproxy.githubapp.com/mod,https://proxy.golang.org/,direct - GOPRIVATE: '' - GONOPROXY: '' - GONOSUMDB: github.com/github/* steps: - name: Checkout repository uses: actions/checkout@v4 - # From https://github.com/github/goproxy/blob/main/doc/user.md#set-up: - - name: Configure Go private module access - run: | - echo "machine goproxy.githubapp.com login nobody password ${{ secrets.GOPROXY_TOKEN }}" >> $HOME/.netrc - - uses: actions/setup-go@v4.1.0 with: go-version-file: 'go.mod' -# - name: Build packages -# run: make build - - name: Run tests run: make test \ No newline at end of file diff --git a/errors.go b/errors.go new file mode 100644 index 0000000..63d1b00 --- /dev/null +++ b/errors.go @@ -0,0 +1 @@ +package logwhale From 54b6e4ddc424bcb1403bf7a648264c8c79307867 Mon Sep 17 00:00:00 2001 From: Tom McPhail Date: Mon, 9 Oct 2023 20:48:40 +1000 Subject: [PATCH 2/6] Added new Error type for interrogation of state --- errors.go | 79 ++++++++++++++++++++++++++++++++++++++++ internal/version/VERSION | 2 +- logfile.go | 24 ++++++------ logmanager.go | 45 +++++++++-------------- logmanager_test.go | 3 +- options.go | 24 ++++++++++++ 6 files changed, 136 insertions(+), 41 deletions(-) create mode 100644 options.go diff --git a/errors.go b/errors.go index 63d1b00..6125c2e 100644 --- a/errors.go +++ b/errors.go @@ -1 +1,80 @@ package logwhale + +import ( + "fmt" + "strings" +) + +// ErrorState defines a set of concrete error states that can be encountered by the LogManager. +type ErrorState int + +// String() returns a string representation of the ErrorState. +func (es ErrorState) String() string { + switch es { + case ErrorStateUnknown: + return "Unknown" + case ErrorStateEndOfStream: + return "End of stream" + case ErrorStateFileRemoved: + return "File removed" + case ErrorStateCancelled: + return "Operation cancelled" + case ErrorStateFSWatcher: + return "FS watch process error" + case ErrorStateFilePath: + return "File path error" + case ErrorStateFileIO: + return "File IO error" + case ErrorStateInternal: + return "Internal exception" + default: + return "Unknown" + } +} + +const ( + ErrorStateUnknown ErrorState = iota + ErrorStateCancelled + ErrorStateFSWatcher + ErrorStateFilePath + ErrorStateFileRemoved + ErrorStateFileIO + ErrorStateEndOfStream + ErrorStateInternal +) + +type LogWhaleError struct { + State ErrorState + Msg string + Cause error +} + +func NewLogWhaleError(state ErrorState, msg string, cause error) *LogWhaleError { + return &LogWhaleError{ + State: state, + Msg: msg, + Cause: cause, + } +} + +// Error satisfies the error interface. +func (e *LogWhaleError) Error() string { + es := strings.Builder{} + es.WriteString(fmt.Sprintf("state: %s", e.State)) + + if len(e.Msg) != 0 { + es.WriteString(fmt.Sprintf(" msg: %s", e.Msg)) + } + + if e.Cause != nil { + es.WriteString(fmt.Sprintf(" cause: %s", e.Cause)) + + } + return es.String() +} + +// Unwrap satisfies the Wrapper interface. It allows the +// LogWhaleError to work with and errors.As. +func (e *LogWhaleError) Unwrap() error { + return e.Cause +} diff --git a/internal/version/VERSION b/internal/version/VERSION index 20bded3..141729a 100644 --- a/internal/version/VERSION +++ b/internal/version/VERSION @@ -1 +1 @@ -v0.2.0-prerelease +v0.3.0-prerelease diff --git a/logfile.go b/logfile.go index 7bd2357..0228bed 100644 --- a/logfile.go +++ b/logfile.go @@ -42,17 +42,17 @@ type logFile struct { } // newLogFile configures the logFile struct -func newLogFile(fp string) (*logFile, error) { +func newLogFile(fp string, bs int) (*logFile, error) { fi, _ := os.Stat(fp) if fi != nil && fi.IsDir() { - return nil, fmt.Errorf("provided filepath (%s) is a directory, must be a file", fp) + return nil, NewLogWhaleError(ErrorStateFilePath, fmt.Sprintf("filepath (%s) is a directory, must be a file", fp), nil) } lf := logFile{ filePath: fp, basepath: path.Dir(fp), created: true, - dataChan: make(chan []byte, logBufferLen), + dataChan: make(chan []byte, bs), errorChan: make(chan error, 1), lastWriteEvent: make(chan time.Time, 1), stateEvents: make(chan stateEventOp), @@ -87,7 +87,7 @@ func (lf *logFile) dataProcessor(ctx context.Context, ewCancelChan <-chan error, if err != nil { if !os.IsNotExist(err) { stopChan <- lf.filePath - lf.errorChan <- fmt.Errorf("unhandlable error encountered with path (%s): %w", lf.filePath, err) + lf.errorChan <- NewLogWhaleError(ErrorStateInternal, fmt.Sprintf("unhandlable error encountered with path (%s)", lf.filePath), err) return } lf.created = false @@ -95,7 +95,7 @@ func (lf *logFile) dataProcessor(ctx context.Context, ewCancelChan <-chan error, if fi != nil && fi.IsDir() { stopChan <- lf.filePath - lf.errorChan <- fmt.Errorf("provided filepath (%s) is a directory, must be a file", lf.filePath) + lf.errorChan <- NewLogWhaleError(ErrorStateFilePath, fmt.Sprintf("filepath (%s) is a directory, must be a file", lf.filePath), nil) return } @@ -110,7 +110,7 @@ func (lf *logFile) dataProcessor(ctx context.Context, ewCancelChan <-chan error, break } stopChan <- lf.filePath - lf.errorChan <- fmt.Errorf("unexpected state event: %s", so) + lf.errorChan <- NewLogWhaleError(ErrorStateInternal, fmt.Sprintf("unexpected state event while waiting for file creation: %s", so), nil) return } } @@ -119,7 +119,7 @@ func (lf *logFile) dataProcessor(ctx context.Context, ewCancelChan <-chan error, of, err := os.Open(lf.filePath) if err != nil { stopChan <- lf.filePath - lf.errorChan <- fmt.Errorf("unable to open file: %w", err) + lf.errorChan <- NewLogWhaleError(ErrorStateFileIO, fmt.Sprintf("unable to open file: %s", lf.filePath), err) return } defer of.Close() @@ -160,7 +160,7 @@ func (lf *logFile) dataProcessor(ctx context.Context, ewCancelChan <-chan error, lf.lastRead = time.Now() } else { stopChan <- lf.filePath - lf.errorChan <- fmt.Errorf("error reading file: %w", readErr) + lf.errorChan <- NewLogWhaleError(ErrorStateFileIO, fmt.Sprintf("error reading file: %s", lf.filePath), readErr) return } @@ -175,18 +175,18 @@ func (lf *logFile) dataProcessor(ctx context.Context, ewCancelChan <-chan error, continue case err := <-ewCancelChan: stopChan <- lf.filePath - lf.errorChan <- fmt.Errorf("cancellation requested: %w", err) + lf.errorChan <- NewLogWhaleError(ErrorStateCancelled, fmt.Sprintf("operation cancelled"), err) return case so := <-lf.stateEvents: if so == stateEventRemoved { - lf.errorChan <- fmt.Errorf("file removed, waiting for creation") + lf.errorChan <- NewLogWhaleError(ErrorStateFileRemoved, fmt.Sprintf("file removed, waiting for creation: %s", lf.filePath), nil) lf.created = false of.Close() continue creationLoop } stopChan <- lf.filePath - lf.errorChan <- fmt.Errorf("unexpected state event: %s", so) - return + lf.errorChan <- NewLogWhaleError(ErrorStateInternal, fmt.Sprintf("unexpected state event waiting for writing to resume: %s", so), nil) + continue } } } diff --git a/logmanager.go b/logmanager.go index 3517694..a346f60 100644 --- a/logmanager.go +++ b/logmanager.go @@ -10,9 +10,7 @@ import ( "github.com/fsnotify/fsnotify" ) -var ( - logBufferLen = 1024 // default buffer length for the log reader -) +var () // Option is a function that can be passed to NewLogManager to configure it. type Option func(*LogManager) error @@ -25,6 +23,8 @@ type logWatcher struct { // LogManager is used to watch one of more log files for changes and consume the data line by line to be passed // as a byte slice to a consumer channel. type LogManager struct { + bufferSize int // default buffer length for the log reader + closed bool // closed is a flag that indicates if the LogManager has been closed ctx context.Context ctxCancel context.CancelFunc @@ -45,16 +45,18 @@ type LogManager struct { func NewLogManager(ctx context.Context, options ...Option) (*LogManager, error) { watcher, err := fsnotify.NewWatcher() if err != nil { - return nil, fmt.Errorf("unable to create file watcher: %w", err) + return nil, NewLogWhaleError(ErrorStateInternal, fmt.Sprint("unable to create file watcher"), err) } // Context for LogManager cancellation if ctx == nil { - return nil, fmt.Errorf("LogManager requires a valid context") + return nil, NewLogWhaleError(ErrorStateInternal, fmt.Sprint("LogManager requires a valid context"), nil) } ctx, cancel := context.WithCancel(ctx) lm := &LogManager{ + bufferSize: 1024, + ctx: ctx, ctxCancel: cancel, fileWatcher: watcher, @@ -68,7 +70,7 @@ func NewLogManager(ctx context.Context, options ...Option) (*LogManager, error) } if err := lm.withOptions(options...); err != nil { - return nil, fmt.Errorf("unable to process options: %w", err) + return nil, NewLogWhaleError(ErrorStateInternal, fmt.Sprint("unable to process options"), err) } // Start Processing Events @@ -83,20 +85,20 @@ func NewLogManager(ctx context.Context, options ...Option) (*LogManager, error) // AddLogFile adds a log file to the LogManager and starts its data processor. func (lm *LogManager) AddLogFile(lp string) (<-chan []byte, <-chan error, error) { if lm.closed { - return nil, nil, fmt.Errorf("log manager closed") + return nil, nil, NewLogWhaleError(ErrorStateInternal, "log manager closed", nil) } // Clean up the file path lfp := path.Clean(lp) - lf, err := newLogFile(lfp) + lf, err := newLogFile(lfp, lm.bufferSize) if err != nil { - return nil, nil, fmt.Errorf("unable to configure log file: %w", err) + return nil, nil, NewLogWhaleError(ErrorStateInternal, "unable to configure log file", err) } lm.lwMutex.Lock() defer lm.lwMutex.Unlock() if _, exists := lm.logsWatched[lfp]; exists { - return nil, nil, fmt.Errorf("log file already being watched: %s", lfp) + return nil, nil, NewLogWhaleError(ErrorStateFSWatcher, fmt.Sprintf("filepath watch already exists: %s", lfp), nil) } // Create a context for the log file @@ -111,7 +113,7 @@ func (lm *LogManager) AddLogFile(lp string) (<-chan []byte, <-chan error, error) err := lm.fileWatcher.Add(lf.basepath) if err != nil { delete(lm.logsWatched, lfp) // Remove the log file from the logsWatched map - return nil, nil, fmt.Errorf("unable to watch base path: %w", err) + return nil, nil, NewLogWhaleError(ErrorStateFilePath, fmt.Sprintf("unable to watch base path: %s", lf.basepath), err) } lm.pathsWatched[lf.basepath] = 1 } @@ -125,7 +127,7 @@ func (lm *LogManager) AddLogFile(lp string) (<-chan []byte, <-chan error, error) // RemoveLogFile removes a log file from the LogManager and stops its data processor. func (lm *LogManager) RemoveLogFile(lp string) error { if lm.closed { - return fmt.Errorf("log manager closed") + return NewLogWhaleError(ErrorStateInternal, "log manager closed", nil) } // Clean up the file path @@ -136,7 +138,7 @@ func (lm *LogManager) RemoveLogFile(lp string) error { defer lm.lwMutex.Unlock() lw, exists := lm.logsWatched[lfp] if !exists { - return fmt.Errorf("log file (%s) not watched", lfp) + return NewLogWhaleError(ErrorStateFSWatcher, fmt.Sprintf("file path watch does not exist: %s", lfp), nil) } delete(lm.logsWatched, lfp) lw.cancel() @@ -144,14 +146,14 @@ func (lm *LogManager) RemoveLogFile(lp string) error { lm.pwMutex.Lock() defer lm.pwMutex.Unlock() if _, exists := lm.pathsWatched[bp]; !exists { - return fmt.Errorf("base path (%s) not watched", bp) + return NewLogWhaleError(ErrorStateFSWatcher, fmt.Sprintf("base path not watched: %s", bp), nil) } if lm.pathsWatched[bp] == 1 { delete(lm.pathsWatched, bp) err := lm.fileWatcher.Remove(bp) if err != nil { - return fmt.Errorf("unable to unwatch base path (%s): %w", bp, err) + return NewLogWhaleError(ErrorStateInternal, fmt.Sprintf("unable to unwatch base path: %s", bp), err) } } else { lm.pathsWatched[bp]-- @@ -227,7 +229,7 @@ func (lm *LogManager) eventWatcher() { continue } case err := <-lm.fileWatcher.Errors: - lm.evwCancelChan <- fmt.Errorf("file watcher error: %w", err) + lm.evwCancelChan <- err return } } @@ -253,14 +255,3 @@ func (lm *LogManager) Close() error { return nil } - -// WithOptions applies the given options to the LogManager -func (lm *LogManager) withOptions(opts ...Option) error { - for _, opt := range opts { - err := opt(lm) - if err != nil { - return fmt.Errorf("cannot apply Option: %w", err) - } - } - return nil -} diff --git a/logmanager_test.go b/logmanager_test.go index 868263d..a330545 100644 --- a/logmanager_test.go +++ b/logmanager_test.go @@ -2,6 +2,7 @@ package logwhale import ( "context" + "fmt" "os" "testing" "time" @@ -121,7 +122,7 @@ func (s *LogManagerTestSuite) TestRemoveLogFile() { time.Sleep(1 * time.Second) select { case err := <-errorChan: - s.Equal("file removed, waiting for creation", err.Error()) + s.Equal(fmt.Sprintf("state: File removed msg: file removed, waiting for creation: %s", of.Name()), err.Error()) } } diff --git a/options.go b/options.go new file mode 100644 index 0000000..2dfe8b3 --- /dev/null +++ b/options.go @@ -0,0 +1,24 @@ +package logwhale + +import "fmt" + +// WithOptions applies the given options to the LogManager +func (lm *LogManager) withOptions(opts ...Option) error { + for _, opt := range opts { + err := opt(lm) + if err != nil { + return fmt.Errorf("cannot apply Option: %w", err) + } + } + return nil +} + +func WithBufferSize(bs int) Option { + return func(lm *LogManager) error { + if bs <= 0 { + return fmt.Errorf("buffer size must be greater than 0") + } + lm.bufferSize = bs + return nil + } +} From ceb9bf61c7814234ed7e83be4798884742544c00 Mon Sep 17 00:00:00 2001 From: Tom McPhail Date: Mon, 9 Oct 2023 20:52:53 +1000 Subject: [PATCH 3/6] Add detail comments to AddLogFile --- logmanager.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/logmanager.go b/logmanager.go index a346f60..92e2996 100644 --- a/logmanager.go +++ b/logmanager.go @@ -83,6 +83,14 @@ func NewLogManager(ctx context.Context, options ...Option) (*LogManager, error) } // AddLogFile adds a log file to the LogManager and starts its data processor. +// Two channels are returned, for data and errors respectively and an error if the Add operation fails completely. +// +// The data channel will return a byte slice for each delimited line of the log file. The default delimeter is a newline. +// +// The error channel will return any errors encountered while processing the log file which can be both critical and non-critical. +// When a critical error is encountered, the data processor will stop and the log file will be removed from the LogManager. +// Non-critical errors will be logged and the data processor will continue. +// Both channels will be closed when all operations have completed or critical errors have been encountered. func (lm *LogManager) AddLogFile(lp string) (<-chan []byte, <-chan error, error) { if lm.closed { return nil, nil, NewLogWhaleError(ErrorStateInternal, "log manager closed", nil) From 00e96cf2c51630d5505aa1ca5f17dd835812ed07 Mon Sep 17 00:00:00 2001 From: Tom McPhail Date: Mon, 9 Oct 2023 20:58:06 +1000 Subject: [PATCH 4/6] Add GetLogFile convenience functions --- logmanager.go | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/logmanager.go b/logmanager.go index 92e2996..1bf13c0 100644 --- a/logmanager.go +++ b/logmanager.go @@ -170,6 +170,38 @@ func (lm *LogManager) RemoveLogFile(lp string) error { return nil } +// GetLogFile returns the data and error channels for a log file being watched by the LogManager. +func (lm *LogManager) GetLogFile(fp string) (<-chan []byte, <-chan error, error) { + if lm.closed { + return nil, nil, NewLogWhaleError(ErrorStateInternal, "log manager closed", nil) + } + + // Clean up the file path + lfp := path.Clean(fp) + + lm.lwMutex.Lock() + defer lm.lwMutex.Unlock() + lw, exists := lm.logsWatched[lfp] + if !exists { + return nil, nil, NewLogWhaleError(ErrorStateFSWatcher, fmt.Sprintf("file path watch does not exist: %s", lfp), nil) + } + + return lw.logfile.dataChan, lw.logfile.errorChan, nil +} + +// GetLogFiles returns a slice of log file paths being watched by the LogManager. +func (lm *LogManager) GetLogFiles() []string { + lm.lwMutex.RLock() + defer lm.lwMutex.RUnlock() + + logFiles := make([]string, 0, len(lm.logsWatched)) + for k := range lm.logsWatched { + logFiles = append(logFiles, k) + } + + return logFiles +} + func (lm *LogManager) logfileRemover() { go func() { for { From 056de6fd591b720a834c040bf4fbb69c6c92d891 Mon Sep 17 00:00:00 2001 From: Tom McPhail Date: Mon, 9 Oct 2023 21:11:54 +1000 Subject: [PATCH 5/6] Adjust tests --- logmanager_test.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/logmanager_test.go b/logmanager_test.go index a330545..1f24bd4 100644 --- a/logmanager_test.go +++ b/logmanager_test.go @@ -108,18 +108,28 @@ func (s *LogManagerTestSuite) TestRemoveLogFile() { defer of.Close() s.NoError(err) - _, errorChan, err := s.lm.AddLogFile(of.Name()) + dataChan, errorChan, err := s.lm.AddLogFile(of.Name()) s.NoError(err) _, err = of.WriteString("test line\n") s.NoError(err) + time.Sleep(100 * time.Millisecond) + select { + case data := <-dataChan: + s.Equal("test line", string(data)) + case err := <-errorChan: + s.Fail("unexpected error: %v", err) + default: + s.Fail("expected data, got none") + } + err = of.Close() s.NoError(err) err = os.Remove(of.Name()) s.NoError(err) - time.Sleep(1 * time.Second) + time.Sleep(100 * time.Millisecond) select { case err := <-errorChan: s.Equal(fmt.Sprintf("state: File removed msg: file removed, waiting for creation: %s", of.Name()), err.Error()) From d385cd16b02e3bdc9617d34ece9da1781f0a19c7 Mon Sep 17 00:00:00 2001 From: Tom McPhail Date: Mon, 9 Oct 2023 21:22:27 +1000 Subject: [PATCH 6/6] Additional file creation loop logic --- logfile.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/logfile.go b/logfile.go index 0228bed..3dd48f2 100644 --- a/logfile.go +++ b/logfile.go @@ -105,13 +105,18 @@ func (lf *logFile) dataProcessor(ctx context.Context, ewCancelChan <-chan error, case <-ctx.Done(): return case so := <-lf.stateEvents: - if so == stateEventCreated { + switch so { + case stateEventCreated: lf.created = true break + case stateEventRemoved: + lf.created = false + continue creationLoop + default: + stopChan <- lf.filePath + lf.errorChan <- NewLogWhaleError(ErrorStateInternal, fmt.Sprintf("unexpected state event while waiting for file creation: %s", so), nil) + return } - stopChan <- lf.filePath - lf.errorChan <- NewLogWhaleError(ErrorStateInternal, fmt.Sprintf("unexpected state event while waiting for file creation: %s", so), nil) - return } }