Commit

Addressed PR review comments + enhanced code and logic
equals215 committed Jul 1, 2024
1 parent f1de101 commit c2f492a
Showing 12 changed files with 224 additions and 123 deletions.
11 changes: 9 additions & 2 deletions cmd/cmd.go
@@ -298,6 +298,13 @@ var GlobalFlags = []cli.Flag{
Usage: "If turned on, the crawler will send back URLs that hit a rate limit to crawl HQ.",
Destination: &config.App.Flags.HQRateLimitingSendBack,
},
// Logging flags
&cli.StringFlag{
Name: "log-file-output-dir",
Usage: "Directory to write log files to.",
Value: "jobs",
Destination: &config.App.Flags.LogFileOutputDir,
},
&cli.StringFlag{
Name: "es-url",
Usage: "comma-separated ElasticSearch URL to use for indexing crawl logs.",
@@ -315,8 +322,8 @@ var GlobalFlags = []cli.Flag{
},
&cli.StringFlag{
Name: "es-index-prefix",
Usage: "ElasticSearch index prefix to use for indexing crawl logs. Default is : `zeno-`",
Value: "zeno-",
Usage: "ElasticSearch index prefix to use for indexing crawl logs. Default is : `zeno`, without `-`",
Value: "zeno",
Destination: &config.App.Flags.ElasticSearchIndexPrefix,
},
&cli.StringSliceFlag{
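
The default prefix changes from `zeno-` to `zeno` because the Elasticsearch handler (see its Rotate method further down this page) joins the prefix and the date with its own dash; keeping the trailing dash would have produced double-dashed index names. A minimal sketch of that naming, assuming only the fmt.Sprintf pattern visible in the handler:

package main

import (
	"fmt"
	"time"
)

func main() {
	date := time.Now().Format("2006.01.02") // e.g. 2024.07.01

	// New default: the handler inserts the dash itself.
	fmt.Printf("%s-%s\n", "zeno", date) // zeno-2024.07.01

	// The old default "zeno-" would have doubled it.
	fmt.Printf("%s-%s\n", "zeno-", date) // zeno--2024.07.01
}
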
17 changes: 6 additions & 11 deletions cmd/utils.go
@@ -37,8 +37,12 @@ func InitCrawlWithCMD(flags config.Flags) *crawl.Crawl {
}
}

logFileOutput := &log.Logfile{
Dir: strings.TrimRight(flags.LogFileOutputDir, "/"),
Prefix: "zeno",
}
customLogger, err := log.New(log.Config{
FileOutput: "zeno.log",
FileOutput: logFileOutput,
FileLevel: slog.LevelDebug,
StdoutLevel: slog.LevelInfo,
RotateLogFile: true,
@@ -51,16 +55,6 @@ func InitCrawlWithCMD(flags config.Flags) *crawl.Crawl {
}
c.Log = customLogger

go func() {
errChan := c.Log.Errors()
for {
select {
case err := <-errChan:
fmt.Fprintf(os.Stderr, "Logging error: %v\n", err)
}
}
}()

// Statistics counters
c.CrawledSeeds = new(ratecounter.Counter)
c.CrawledAssets = new(ratecounter.Counter)
@@ -71,6 +65,7 @@ func InitCrawlWithCMD(flags config.Flags) *crawl.Crawl {

// Frontier
c.Frontier = new(frontier.Frontier)
c.Frontier.Log = c.Log

// If the job name isn't specified, we generate a random name
if flags.Job == "" {
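
The strings.TrimRight call normalizes the log directory before it reaches the log package: the file handler later joins the directory and prefix with a slash (see Logfile.Filename() in internal/pkg/log/file.go further down), so a trailing slash in the flag value would otherwise leak into the path. A small illustrative sketch of the difference:

package main

import (
	"fmt"
	"strings"
)

func main() {
	dir := "jobs/" // e.g. a value passed via --log-file-output-dir

	// Joining without trimming leaves a double slash in the path.
	fmt.Printf("%s/%s.log\n", dir, "zeno") // jobs//zeno.log

	// With the TrimRight applied in InitCrawlWithCMD, the path is clean.
	fmt.Printf("%s/%s.log\n", strings.TrimRight(dir, "/"), "zeno") // jobs/zeno.log
}
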
2 changes: 1 addition & 1 deletion config/config.go
@@ -62,12 +62,12 @@ type Flags struct {
DisableAssetsCapture bool
CertValidation bool

CloudflareStream bool
ElasticSearchURLs string
ElasticSearchUsername string
ElasticSearchPassword string
ElasticSearchIndexPrefix string
ExcludedStrings cli.StringSlice
LogFileOutputDir string
}

type Application struct {
10 changes: 5 additions & 5 deletions internal/pkg/crawl/crawl.go
@@ -201,7 +201,7 @@ func (c *Crawl) Start() (err error) {

c.Client, err = warc.NewWARCWritingHTTPClient(HTTPClientSettings)
if err != nil {
logrus.Fatalf("Unable to init WARC writing HTTP client: %s", err)
c.Log.Fatal("Unable to init WARC writing HTTP client", "error", err)
}

go func() {
@@ -211,7 +211,7 @@ func (c *Crawl) Start() (err error) {
}()

c.Client.Timeout = time.Duration(c.HTTPTimeout) * time.Second
logrus.Infof("HTTP client timeout set to %d seconds", c.HTTPTimeout)
c.Log.Info("HTTP client timeout set", "timeout", c.HTTPTimeout)

if c.Proxy != "" {
proxyHTTPClientSettings := HTTPClientSettings
@@ -229,7 +229,7 @@ func (c *Crawl) Start() (err error) {
}()
}

logrus.Info("WARC writer initialized")
c.Log.Info("WARC writer initialized")

// Process responsible for slowing or pausing the crawl
// when the WARC writing queue gets too big
@@ -280,13 +280,13 @@ func (c *Crawl) Start() (err error) {
go c.HQWebsocket()
} else {
// Push the seed list to the queue
logrus.Info("Pushing seeds in the local queue..")
c.Log.Info("Pushing seeds in the local queue..")
for _, item := range c.SeedList {
item := item
c.Frontier.PushChan <- &item
}
c.SeedList = nil
logrus.Info("All seeds are now in queue, crawling will start")
c.Log.Info("All seeds are now in queue, crawling will start")
}

// Start the background process that will catch when there
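
The crawl package drops logrus' printf-style calls in favour of the structured logger, passing context as key-value attributes rather than formatting it into the message. Below is a standalone sketch of the same pattern using the standard library log/slog; Zeno's internal log package is not imported here, so this is only illustrative:

package main

import "log/slog"

func main() {
	timeout := 30

	// Before: message and data are fused into one formatted string.
	// logrus.Infof("HTTP client timeout set to %d seconds", timeout)

	// After: the message stays constant and the data travels as attributes,
	// which keeps logs machine-parseable (JSON file output, Elasticsearch indexing).
	slog.Info("HTTP client timeout set", "timeout", timeout)
}
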
3 changes: 2 additions & 1 deletion internal/pkg/crawl/finish.go
@@ -99,7 +99,8 @@ func (crawl *Crawl) finish() {
crawl.Log.Warn("Finished!")

crawl.Log.Warn("Shutting down the logger, bai bai")
crawl.Log.Stop()
crawl.Log.StopRotation()
crawl.Log.StopErrorLog()

os.Exit(0)
}
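
The single Stop() call is split into StopRotation() and StopErrorLog(), which suggests the logger runs at least two background loops that shut down independently. A rough sketch of how such a split shutdown could be wired; the type, field, and channel names below are assumptions for illustration, not Zeno's actual internals:

package main

import "time"

// miniLogger is a hypothetical logger with two independent background loops,
// mirroring the split StopRotation/StopErrorLog shutdown used in finish().
type miniLogger struct {
	stopRotation chan struct{}
	stopErrors   chan struct{}
}

func (l *miniLogger) StopRotation() { close(l.stopRotation) } // ends the rotation loop
func (l *miniLogger) StopErrorLog() { close(l.stopErrors) }   // ends the error-draining loop

func (l *miniLogger) rotationLoop() {
	ticker := time.NewTicker(time.Hour)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			// rotate file/index handlers here
		case <-l.stopRotation:
			return
		}
	}
}

func (l *miniLogger) errorLoop(errs <-chan error) {
	for {
		select {
		case <-errs:
			// surface handler errors somewhere (stderr, metrics, ...)
		case <-l.stopErrors:
			return
		}
	}
}

func main() {
	l := &miniLogger{stopRotation: make(chan struct{}), stopErrors: make(chan struct{})}
	errs := make(chan error)
	go l.rotationLoop()
	go l.errorLoop(errs)

	// ... crawl runs ...

	l.StopRotation()
	l.StopErrorLog()
	time.Sleep(10 * time.Millisecond) // toy example: give the loops a moment to exit
}
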
6 changes: 4 additions & 2 deletions internal/pkg/frontier/frontier.go
@@ -5,6 +5,7 @@ import (
"sync"

"github.com/beeker1121/goque"
"github.com/internetarchive/Zeno/internal/pkg/log"
"github.com/internetarchive/Zeno/internal/pkg/utils"
"github.com/paulbellamy/ratecounter"
"github.com/philippgille/gokv/leveldb"
@@ -42,6 +43,7 @@ type Frontier struct {
UseSeencheck bool
Seencheck *Seencheck
LoggingChan chan *FrontierLogMessage
Log *log.Logger
}

type FrontierLogMessage struct {
@@ -70,7 +72,7 @@ func (f *Frontier) Init(jobPath string, loggingChan chan *FrontierLogMessage, wo
f.QueueCount = new(ratecounter.Counter)
f.QueueCount.Incr(int64(f.Queue.Length()))

logrus.Info("persistent queue initialized")
f.Log.Info("persistent queue initialized")

// Initialize the seencheck
f.UseSeencheck = useSeencheck
Expand All @@ -82,7 +84,7 @@ func (f *Frontier) Init(jobPath string, loggingChan chan *FrontierLogMessage, wo
return err
}

logrus.Info("seencheck initialized")
f.Log.Info("seencheck initialized")
}

f.FinishingQueueReader = new(utils.TAtomBool)
29 changes: 29 additions & 0 deletions internal/pkg/log/elasticsearch.go
@@ -28,6 +28,7 @@ type ElasticsearchHandler struct {
level slog.Level
attrs []slog.Attr
groups []string
config *ElasticsearchConfig
}

// Handle is responsible for passing the log record to all underlying handlers.
@@ -146,3 +147,31 @@ func (h *ElasticsearchHandler) createIndex() error {

return nil
}

// Rotate implements the rotation for the Elasticsearch handler.
// It updates the index name to use the current date and creates the new index if it doesn't exist.
func (h *ElasticsearchHandler) Rotate() error {
newIndex := fmt.Sprintf("%s-%s", h.config.IndexPrefix, time.Now().Format("2006.01.02"))

// If the index name hasn't changed, no need to rotate
if newIndex == h.index {
return nil
}

// Update the index name
h.index = newIndex

// Create the new index
err := h.createIndex()
if err != nil {
return fmt.Errorf("failed to create new Elasticsearch index during rotation: %w", err)
}

return nil
}

// NextRotation calculates the next rotation time, which is the start of the next day
func (h *ElasticsearchHandler) NextRotation() time.Time {
now := time.Now()
return time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()).Add(24 * time.Hour)
}
52 changes: 52 additions & 0 deletions internal/pkg/log/file.go
@@ -0,0 +1,52 @@
package log

import (
"fmt"
"log/slog"
"os"
"time"
)

type fileHandler struct {
slog.Handler
filename string
file *os.File
rotationInterval time.Duration
lastRotation time.Time
level slog.Level
logfile *Logfile
}

type Logfile struct {
Dir string
Prefix string
}

func (h *fileHandler) Rotate() error {
if h.file != nil {
h.file.Close()
}

h.filename = h.logfile.Filename()

file, err := os.OpenFile(h.filename, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
if err != nil {
return fmt.Errorf("failed to open new log file: %w", err)
}

h.file = file
h.Handler = slog.NewJSONHandler(file, &slog.HandlerOptions{
Level: h.level,
})

h.lastRotation = time.Now()
return nil
}

func (h *fileHandler) NextRotation() time.Time {
return h.lastRotation.Add(h.rotationInterval)
}

func (s *Logfile) Filename() string {
return fmt.Sprintf("%s/%s.%s.log", s.Dir, s.Prefix, time.Now().Format("2006.01.02-15h"))
}
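
Filename() embeds the hour in the name (layout 2006.01.02-15h), so repeated rotations within the same hour reopen the same file while a new hour yields a new one. A quick sketch of the resulting path, reusing the same layout string:

package main

import (
	"fmt"
	"time"
)

func main() {
	dir, prefix := "jobs", "zeno"

	// Same layout string as Logfile.Filename(): the hour is part of the name.
	name := fmt.Sprintf("%s/%s.%s.log", dir, prefix, time.Now().Format("2006.01.02-15h"))
	fmt.Println(name) // e.g. jobs/zeno.2024.07.01-16h.log
}
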