From c2f492af2ce6deba2c5ac1a3e049af4360fcd0a9 Mon Sep 17 00:00:00 2001 From: Thomas FOUBERT Date: Sun, 30 Jun 2024 22:20:46 -0400 Subject: [PATCH] corrected pr comments + enhanced code and logic --- cmd/cmd.go | 11 +++- cmd/utils.go | 17 ++---- config/config.go | 2 +- internal/pkg/crawl/crawl.go | 10 ++-- internal/pkg/crawl/finish.go | 3 +- internal/pkg/frontier/frontier.go | 6 +- internal/pkg/log/elasticsearch.go | 29 ++++++++++ internal/pkg/log/file.go | 52 +++++++++++++++++ internal/pkg/log/log.go | 92 ++++++++----------------------- internal/pkg/log/misc.go | 26 +++++++++ internal/pkg/log/multi_handler.go | 58 +++++++++++++++++++ internal/pkg/log/rotate.go | 41 +++----------- 12 files changed, 224 insertions(+), 123 deletions(-) create mode 100644 internal/pkg/log/file.go create mode 100644 internal/pkg/log/misc.go create mode 100644 internal/pkg/log/multi_handler.go diff --git a/cmd/cmd.go b/cmd/cmd.go index f2a39ed3..5ffb177d 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -298,6 +298,13 @@ var GlobalFlags = []cli.Flag{ Usage: "If turned on, the crawler will send back URLs that hit a rate limit to crawl HQ.", Destination: &config.App.Flags.HQRateLimitingSendBack, }, + // Logging flags + &cli.StringFlag{ + Name: "log-file-output-dir", + Usage: "Directory to write log files to.", + Value: "jobs", + Destination: &config.App.Flags.LogFileOutputDir, + }, &cli.StringFlag{ Name: "es-url", Usage: "comma-separated ElasticSearch URL to use for indexing crawl logs.", @@ -315,8 +322,8 @@ var GlobalFlags = []cli.Flag{ }, &cli.StringFlag{ Name: "es-index-prefix", - Usage: "ElasticSearch index prefix to use for indexing crawl logs. Default is : `zeno-`", - Value: "zeno-", + Usage: "ElasticSearch index prefix to use for indexing crawl logs. Default is : `zeno`, without `-`", + Value: "zeno", Destination: &config.App.Flags.ElasticSearchIndexPrefix, }, &cli.StringSliceFlag{ diff --git a/cmd/utils.go b/cmd/utils.go index f255b1c0..155949b3 100644 --- a/cmd/utils.go +++ b/cmd/utils.go @@ -37,8 +37,12 @@ func InitCrawlWithCMD(flags config.Flags) *crawl.Crawl { } } + logFileOutput := &log.Logfile{ + Dir: strings.TrimRight(flags.LogFileOutputDir, "/"), + Prefix: "zeno", + } customLogger, err := log.New(log.Config{ - FileOutput: "zeno.log", + FileOutput: logFileOutput, FileLevel: slog.LevelDebug, StdoutLevel: slog.LevelInfo, RotateLogFile: true, @@ -51,16 +55,6 @@ func InitCrawlWithCMD(flags config.Flags) *crawl.Crawl { } c.Log = customLogger - go func() { - errChan := c.Log.Errors() - for { - select { - case err := <-errChan: - fmt.Fprintf(os.Stderr, "Logging error: %v\n", err) - } - } - }() - // Statistics counters c.CrawledSeeds = new(ratecounter.Counter) c.CrawledAssets = new(ratecounter.Counter) @@ -71,6 +65,7 @@ func InitCrawlWithCMD(flags config.Flags) *crawl.Crawl { // Frontier c.Frontier = new(frontier.Frontier) + c.Frontier.Log = c.Log // If the job name isn't specified, we generate a random name if flags.Job == "" { diff --git a/config/config.go b/config/config.go index f8b696f1..112d0e18 100644 --- a/config/config.go +++ b/config/config.go @@ -62,12 +62,12 @@ type Flags struct { DisableAssetsCapture bool CertValidation bool - CloudflareStream bool ElasticSearchURLs string ElasticSearchUsername string ElasticSearchPassword string ElasticSearchIndexPrefix string ExcludedStrings cli.StringSlice + LogFileOutputDir string } type Application struct { diff --git a/internal/pkg/crawl/crawl.go b/internal/pkg/crawl/crawl.go index 608380cb..802dfb1d 100644 --- a/internal/pkg/crawl/crawl.go +++ b/internal/pkg/crawl/crawl.go @@ -201,7 +201,7 @@ func (c *Crawl) Start() (err error) { c.Client, err = warc.NewWARCWritingHTTPClient(HTTPClientSettings) if err != nil { - logrus.Fatalf("Unable to init WARC writing HTTP client: %s", err) + c.Log.Fatal("Unable to init WARC writing HTTP client", "error", err) } go func() { @@ -211,7 +211,7 @@ func (c *Crawl) Start() (err error) { }() c.Client.Timeout = time.Duration(c.HTTPTimeout) * time.Second - logrus.Infof("HTTP client timeout set to %d seconds", c.HTTPTimeout) + c.Log.Info("HTTP client timeout set", "timeout", c.HTTPTimeout) if c.Proxy != "" { proxyHTTPClientSettings := HTTPClientSettings @@ -229,7 +229,7 @@ func (c *Crawl) Start() (err error) { }() } - logrus.Info("WARC writer initialized") + c.Log.Info("WARC writer initialized") // Process responsible for slowing or pausing the crawl // when the WARC writing queue gets too big @@ -280,13 +280,13 @@ func (c *Crawl) Start() (err error) { go c.HQWebsocket() } else { // Push the seed list to the queue - logrus.Info("Pushing seeds in the local queue..") + c.Log.Info("Pushing seeds in the local queue..") for _, item := range c.SeedList { item := item c.Frontier.PushChan <- &item } c.SeedList = nil - logrus.Info("All seeds are now in queue, crawling will start") + c.Log.Info("All seeds are now in queue, crawling will start") } // Start the background process that will catch when there diff --git a/internal/pkg/crawl/finish.go b/internal/pkg/crawl/finish.go index 0b85fde7..92dc9420 100644 --- a/internal/pkg/crawl/finish.go +++ b/internal/pkg/crawl/finish.go @@ -99,7 +99,8 @@ func (crawl *Crawl) finish() { crawl.Log.Warn("Finished!") crawl.Log.Warn("Shutting down the logger, bai bai") - crawl.Log.Stop() + crawl.Log.StopRotation() + crawl.Log.StopErrorLog() os.Exit(0) } diff --git a/internal/pkg/frontier/frontier.go b/internal/pkg/frontier/frontier.go index a9e73c02..15525188 100644 --- a/internal/pkg/frontier/frontier.go +++ b/internal/pkg/frontier/frontier.go @@ -5,6 +5,7 @@ import ( "sync" "github.com/beeker1121/goque" + "github.com/internetarchive/Zeno/internal/pkg/log" "github.com/internetarchive/Zeno/internal/pkg/utils" "github.com/paulbellamy/ratecounter" "github.com/philippgille/gokv/leveldb" @@ -42,6 +43,7 @@ type Frontier struct { UseSeencheck bool Seencheck *Seencheck LoggingChan chan *FrontierLogMessage + Log *log.Logger } type FrontierLogMessage struct { @@ -70,7 +72,7 @@ func (f *Frontier) Init(jobPath string, loggingChan chan *FrontierLogMessage, wo f.QueueCount = new(ratecounter.Counter) f.QueueCount.Incr(int64(f.Queue.Length())) - logrus.Info("persistent queue initialized") + f.Log.Info("persistent queue initialized") // Initialize the seencheck f.UseSeencheck = useSeencheck @@ -82,7 +84,7 @@ func (f *Frontier) Init(jobPath string, loggingChan chan *FrontierLogMessage, wo return err } - logrus.Info("seencheck initialized") + f.Log.Info("seencheck initialized") } f.FinishingQueueReader = new(utils.TAtomBool) diff --git a/internal/pkg/log/elasticsearch.go b/internal/pkg/log/elasticsearch.go index f414a557..3bac1877 100644 --- a/internal/pkg/log/elasticsearch.go +++ b/internal/pkg/log/elasticsearch.go @@ -28,6 +28,7 @@ type ElasticsearchHandler struct { level slog.Level attrs []slog.Attr groups []string + config *ElasticsearchConfig } // Handle is responsible for passing the log record to all underlying handlers. @@ -146,3 +147,31 @@ func (h *ElasticsearchHandler) createIndex() error { return nil } + +// Rotate implements the rotation for the Elasticsearch handler. +// It updates the index name to use the current date and creates the new index if it doesn't exist. +func (h *ElasticsearchHandler) Rotate() error { + newIndex := fmt.Sprintf("%s-%s", h.config.IndexPrefix, time.Now().Format("2006.01.02")) + + // If the index name hasn't changed, no need to rotate + if newIndex == h.index { + return nil + } + + // Update the index name + h.index = newIndex + + // Create the new index + err := h.createIndex() + if err != nil { + return fmt.Errorf("failed to create new Elasticsearch index during rotation: %w", err) + } + + return nil +} + +// NextRotation calculates the next rotation time, which is the start of the next day +func (h *ElasticsearchHandler) NextRotation() time.Time { + now := time.Now() + return time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()).Add(24 * time.Hour) +} diff --git a/internal/pkg/log/file.go b/internal/pkg/log/file.go new file mode 100644 index 00000000..fb8527b7 --- /dev/null +++ b/internal/pkg/log/file.go @@ -0,0 +1,52 @@ +package log + +import ( + "fmt" + "log/slog" + "os" + "time" +) + +type fileHandler struct { + slog.Handler + filename string + file *os.File + rotationInterval time.Duration + lastRotation time.Time + level slog.Level + logfile *Logfile +} + +type Logfile struct { + Dir string + Prefix string +} + +func (h *fileHandler) Rotate() error { + if h.file != nil { + h.file.Close() + } + + h.filename = h.logfile.Filename() + + file, err := os.OpenFile(h.filename, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) + if err != nil { + return fmt.Errorf("failed to open new log file: %w", err) + } + + h.file = file + h.Handler = slog.NewJSONHandler(file, &slog.HandlerOptions{ + Level: h.level, + }) + + h.lastRotation = time.Now() + return nil +} + +func (h *fileHandler) NextRotation() time.Time { + return h.lastRotation.Add(h.rotationInterval) +} + +func (s *Logfile) Filename() string { + return fmt.Sprintf("%s/%s.%s.log", s.Dir, s.Prefix, time.Now().Format("2006.01.02-15h")) +} diff --git a/internal/pkg/log/log.go b/internal/pkg/log/log.go index 0d20f916..bde46a9c 100644 --- a/internal/pkg/log/log.go +++ b/internal/pkg/log/log.go @@ -6,6 +6,7 @@ import ( "fmt" "log/slog" "os" + "path/filepath" "sync" "time" @@ -23,17 +24,13 @@ type Logger struct { slogger *slog.Logger mu sync.Mutex stopRotation chan struct{} + stopErrorLog chan struct{} errorChan chan error } -// multiHandler implements slog.Handler interface for multiple outputs -type multiHandler struct { - handlers []slog.Handler -} - // Config holds the configuration for the logger type Config struct { - FileOutput string + FileOutput *Logfile FileLevel slog.Level StdoutLevel slog.Level RotateLogFile bool @@ -61,17 +58,24 @@ func New(cfg Config) (*Logger, error) { handlers = append(handlers, stdoutHandler) // Create file handler if FileOutput is specified - if cfg.FileOutput != "" { - file, err := os.OpenFile(cfg.FileOutput, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) + if cfg.FileOutput != nil { + // Create directories if they don't exist + err := os.MkdirAll(filepath.Dir(cfg.FileOutput.Filename()), 0755) + if err != nil { + return nil, err + } + + // Open log file + file, err := os.OpenFile(cfg.FileOutput.Filename(), os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) if err != nil { return nil, err } fileHandler := &fileHandler{ - Handler: slog.NewJSONHandler(file, &slog.HandlerOptions{Level: cfg.FileLevel}), - filename: cfg.FileOutput, - file: file, - interval: 6 * time.Hour, - lastRotation: time.Now(), + Handler: slog.NewJSONHandler(file, &slog.HandlerOptions{Level: cfg.FileLevel}), + filename: cfg.FileOutput.Filename(), + file: file, + rotationInterval: 6 * time.Hour, + lastRotation: time.Now(), } handlers = append(handlers, fileHandler) } @@ -88,10 +92,11 @@ func New(cfg Config) (*Logger, error) { } esHandler := &ElasticsearchHandler{ client: esClient, - index: fmt.Sprintf("zeno-%s", time.Now().Format("2006.01.02")), + index: fmt.Sprintf("%s-%s", cfg.ElasticsearchConfig.IndexPrefix, time.Now().Format("2006.01.02")), level: cfg.ElasticsearchConfig.Level, attrs: []slog.Attr{}, groups: []string{}, + config: cfg.ElasticsearchConfig, } if err := esHandler.createIndex(); err != nil { return nil, fmt.Errorf("failed to create Elasticsearch index: %w", err) @@ -106,9 +111,10 @@ func New(cfg Config) (*Logger, error) { slogger := slog.New(mh) logger := &Logger{ - handler: mh, - slogger: slogger, - errorChan: make(chan error, 10), + handler: mh, + slogger: slogger, + errorChan: make(chan error, 10), + stopErrorLog: make(chan struct{}), } // Start rotation goroutine @@ -127,7 +133,7 @@ func New(cfg Config) (*Logger, error) { func Default() *Logger { once.Do(func() { logger, err := New(Config{ - FileOutput: "zeno.log", + FileOutput: &Logfile{Dir: "jobs", Prefix: "zeno"}, FileLevel: slog.LevelInfo, StdoutLevel: slog.LevelInfo, }) @@ -215,53 +221,3 @@ func (l *Logger) Fatal(msg string, args ...any) { l.log(slog.LevelError, msg, args...) os.Exit(1) } - -//------------------------------------------------------------------------------------- -// Following methods are used to implement the slog.Handler interface for multiHandler -//------------------------------------------------------------------------------------- - -// Enabled checks if any of the underlying handlers are enabled for a given log level. -// It's used internally to determine if a log message should be processed by a given handler -func (h *multiHandler) Enabled(ctx context.Context, level slog.Level) bool { - for _, handler := range h.handlers { - if handler.Enabled(ctx, level) { - return true - } - } - return false -} - -// Handle is responsible for passing the log record to all underlying handlers. -// It's called internally when a log message needs to be written. -func (h *multiHandler) Handle(ctx context.Context, r slog.Record) error { - var errs []error - for _, handler := range h.handlers { - if err := handler.Handle(ctx, r); err != nil { - errs = append(errs, fmt.Errorf("handler error: %w", err)) - } - } - if len(errs) > 0 { - return fmt.Errorf("multiple handler errors: %v", errs) - } - return nil -} - -// WithAttrs creates a new handler with additional attributes. -// It's used internally when the logger is asked to include additional context with all subsequent log messages. -func (h *multiHandler) WithAttrs(attrs []slog.Attr) slog.Handler { - handlers := make([]slog.Handler, len(h.handlers)) - for i, handler := range h.handlers { - handlers[i] = handler.WithAttrs(attrs) - } - return &multiHandler{handlers: handlers} -} - -// WithGroups creates a new handler with a new group added to the attribute grouping hierarchy. -// It's used internally when the logger is asked to group a set of attributes together. -func (h *multiHandler) WithGroup(name string) slog.Handler { - handlers := make([]slog.Handler, len(h.handlers)) - for i, handler := range h.handlers { - handlers[i] = handler.WithGroup(name) - } - return &multiHandler{handlers: handlers} -} diff --git a/internal/pkg/log/misc.go b/internal/pkg/log/misc.go new file mode 100644 index 00000000..d5541dd3 --- /dev/null +++ b/internal/pkg/log/misc.go @@ -0,0 +1,26 @@ +package log + +import ( + "fmt" + "os" +) + +// WatchErrors watches for errors in the logger and prints them to stderr. +func (l *Logger) WatchErrors() { + go func() { + errChan := l.Errors() + for { + select { + case <-l.stopErrorLog: + return + case err := <-errChan: + fmt.Fprintf(os.Stderr, "Logging error: %v\n", err) + } + } + }() +} + +// StopErrorLog stops the error logger. +func (l *Logger) StopErrorLog() { + close(l.stopErrorLog) +} diff --git a/internal/pkg/log/multi_handler.go b/internal/pkg/log/multi_handler.go new file mode 100644 index 00000000..9e0a9104 --- /dev/null +++ b/internal/pkg/log/multi_handler.go @@ -0,0 +1,58 @@ +package log + +import ( + "context" + "fmt" + "log/slog" +) + +// multiHandler implements slog.Handler interface for multiple outputs +type multiHandler struct { + handlers []slog.Handler +} + +// Enabled checks if any of the underlying handlers are enabled for a given log level. +// It's used internally to determine if a log message should be processed by a given handler +func (h *multiHandler) Enabled(ctx context.Context, level slog.Level) bool { + for _, handler := range h.handlers { + if handler.Enabled(ctx, level) { + return true + } + } + return false +} + +// Handle is responsible for passing the log record to all underlying handlers. +// It's called internally when a log message needs to be written. +func (h *multiHandler) Handle(ctx context.Context, r slog.Record) error { + var errs []error + for _, handler := range h.handlers { + if err := handler.Handle(ctx, r); err != nil { + errs = append(errs, fmt.Errorf("handler error: %w", err)) + } + } + if len(errs) > 0 { + return fmt.Errorf("multiple handler errors: %v", errs) + } + return nil +} + +// WithAttrs creates a new handler with additional attributes. +// It's used internally when the logger is asked to include additional context with all subsequent log messages. +func (h *multiHandler) WithAttrs(attrs []slog.Attr) slog.Handler { + handlers := make([]slog.Handler, len(h.handlers)) + for i, handler := range h.handlers { + handlers[i] = handler.WithAttrs(attrs) + } + return &multiHandler{handlers: handlers} +} + +// WithGroups creates a new handler with a new group added to the attribute grouping hierarchy. +// It's used internally when the logger is asked to group a set of attributes together. +func (h *multiHandler) WithGroup(name string) slog.Handler { + handlers := make([]slog.Handler, len(h.handlers)) + for i, handler := range h.handlers { + handlers[i] = handler.WithGroup(name) + } + return &multiHandler{handlers: handlers} +} diff --git a/internal/pkg/log/rotate.go b/internal/pkg/log/rotate.go index 079c17e2..f9f3dd7c 100644 --- a/internal/pkg/log/rotate.go +++ b/internal/pkg/log/rotate.go @@ -3,41 +3,15 @@ package log import ( "fmt" "log/slog" - "os" "time" ) -// ... (previous Logger, multiHandler, logEntry, ElasticsearchHandler definitions remain the same) - -type rotateableHandler interface { +type rotatableHandler interface { slog.Handler Rotate() error NextRotation() time.Time } -type fileHandler struct { - slog.Handler - filename string - file *os.File - interval time.Duration - lastRotation time.Time -} - -func (h *fileHandler) Rotate() error { - // ... (previous Rotate implementation remains the same) - h.lastRotation = time.Now() - return nil -} - -func (h *fileHandler) NextRotation() time.Time { - return h.lastRotation.Add(h.interval) -} - -func (h *ElasticsearchHandler) NextRotation() time.Time { - now := time.Now() - return time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()).Add(24 * time.Hour) -} - func (l *Logger) startRotation() { l.stopRotation = make(chan struct{}) go func() { @@ -53,13 +27,15 @@ func (l *Logger) startRotation() { }() } +// nextRotation returns the earliest next rotation time +// of all rotatable handlers func (l *Logger) nextRotation() time.Time { l.mu.Lock() defer l.mu.Unlock() var earliest time.Time for _, h := range l.handler.handlers { - if rh, ok := h.(rotateableHandler); ok { + if rh, ok := h.(rotatableHandler); ok { next := rh.NextRotation() if earliest.IsZero() || next.Before(earliest) { earliest = next @@ -69,13 +45,14 @@ func (l *Logger) nextRotation() time.Time { return earliest } +// rotate rotates func (l *Logger) rotate() { l.mu.Lock() defer l.mu.Unlock() now := time.Now() for _, h := range l.handler.handlers { - if rh, ok := h.(rotateableHandler); ok { + if rh, ok := h.(rotatableHandler); ok { if now.After(rh.NextRotation()) || now.Equal(rh.NextRotation()) { if err := rh.Rotate(); err != nil { fmt.Printf("Error rotating handler: %v\n", err) @@ -85,9 +62,7 @@ func (l *Logger) rotate() { } } -// Stop stops the rotation goroutine -func (l *Logger) Stop() { +// StopRotation stops the rotation goroutine +func (l *Logger) StopRotation() { close(l.stopRotation) } - -// ... (rest of the code remains the same)