rewrite worker logic for it to be separated from crawl package soon™
equals215 committed Jul 16, 2024
1 parent e49838a commit 6906650
Showing 9 changed files with 196 additions and 169 deletions.
6 changes: 2 additions & 4 deletions cmd/utils.go
@@ -95,11 +95,9 @@ func InitCrawlWithCMD(flags config.Flags) *crawl.Crawl {

c.JobPath = path.Join("jobs", flags.Job)

c.Workers = flags.Workers
c.WorkerPool = make([]*crawl.Worker, 0)
c.WorkerStopTimeout = time.Second * 60 // Placeholder for WorkerStopTimeout
c.Workers = crawl.NewPool(uint(flags.Workers), time.Second*60, c)

c.MaxConcurrentAssets = flags.MaxConcurrentAssets
c.WorkerStopSignal = make(chan bool)

c.Seencheck = flags.Seencheck
c.HTTPTimeout = flags.HTTPTimeout
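
For readers following along, crawl.NewPool and the Workers field it fills are defined in the worker files that are not expanded on this page. The following is only a hedged sketch of what that pool type plausibly looks like, reconstructed from the call sites visible in this commit (the NewPool arguments above, plus Workers.Count, Workers.StopSignal, Workers.Start, Workers.EnsureFinished, Workers.GetWorkerStateFromPool and Workers.wpLen used further down); every other name in it is an assumption, not the actual implementation.

// Hypothetical sketch only: reconstructed from call sites in this commit,
// not the actual worker code (which is collapsed on this page).
package crawl

import (
	"sync"
	"time"
)

type WorkerPool struct {
	Crawl       *Crawl        // back-reference passed as the third NewPool argument
	Count       uint          // desired number of workers, read as Workers.Count
	StopTimeout time.Duration // how long EnsureFinished waits before giving up
	StopSignal  chan bool     // written to by finish() to ask the workers to stop

	wpMutex sync.RWMutex       // guards the map below
	wp      map[string]*Worker // workers keyed by string ID, matching the new string worker_id in the API
}

// NewPool builds the pool; shaped to match the call
// NewPool(uint(flags.Workers), time.Second*60, c) above.
func NewPool(count uint, stopTimeout time.Duration, c *Crawl) *WorkerPool {
	return &WorkerPool{
		Crawl:       c,
		Count:       count,
		StopTimeout: stopTimeout,
		StopSignal:  make(chan bool),
		wp:          make(map[string]*Worker, count),
	}
}
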
16 changes: 3 additions & 13 deletions internal/pkg/crawl/api.go
@@ -3,7 +3,6 @@ package crawl
import (
"encoding/json"
"os"
"strconv"
"strings"
"time"

@@ -22,7 +21,7 @@ type APIWorkersState struct {

// APIWorkerState represents the state of an API worker.
type APIWorkerState struct {
WorkerID uint `json:"worker_id"`
WorkerID string `json:"worker_id"`
Status string `json:"status"`
LastError string `json:"last_error"`
LastSeen string `json:"last_seen"`
@@ -53,22 +52,13 @@ func (crawl *Crawl) startAPI() {
http.HandleFunc("/metrics", setupPrometheus(crawl).ServeHTTP)

http.HandleFunc("/workers", func(w http.ResponseWriter, r *http.Request) {
workersState := crawl.GetWorkerState(-1)
workersState := crawl.Workers.GetWorkerStateFromPool("")
json.NewEncoder(w).Encode(workersState)
})

http.HandleFunc("/worker/", func(w http.ResponseWriter, r *http.Request) {
workerID := strings.TrimPrefix(r.URL.Path, "/worker/")
workerIDInt, err := strconv.Atoi(workerID)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(map[string]interface{}{
"error": "Unsupported worker ID",
})
return
}

workersState := crawl.GetWorkerState(workerIDInt)
workersState := crawl.Workers.GetWorkerStateFromPool(workerID)
if workersState == nil {
w.WriteHeader(http.StatusNotFound)
json.NewEncoder(w).Encode(map[string]interface{}{
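
The two handlers above replace the old integer-indexed GetWorkerState(-1) / GetWorkerState(id) calls with a single string-keyed lookup: an empty ID means "all workers", and an unknown ID yields nil, which the handler turns into a 404. Below is a minimal sketch of such a method, assuming the map-based pool sketched earlier and assuming the _getWorkerState helper removed from crawl.go further down was moved alongside it; the real implementation may differ.

// Hypothetical sketch: mirrors the removed Crawl.GetWorkerState, but keyed by string ID.
func (wp *WorkerPool) GetWorkerStateFromPool(id string) interface{} {
	wp.wpMutex.RLock()
	defer wp.wpMutex.RUnlock()

	// An empty ID returns the state of every worker in the pool.
	if id == "" {
		workersStatus := new(APIWorkersState)
		for _, worker := range wp.wp {
			workersStatus.Workers = append(workersStatus.Workers, _getWorkerState(worker))
		}
		return workersStatus
	}

	// Unknown IDs return nil so the /worker/ handler can answer 404.
	worker, ok := wp.wp[id]
	if !ok {
		return nil
	}
	return _getWorkerState(worker)
}
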
132 changes: 7 additions & 125 deletions internal/pkg/crawl/crawl.go
@@ -2,7 +2,6 @@
package crawl

import (
"fmt"
"net/http"
"sync"
"time"
@@ -41,10 +40,7 @@ type Crawl struct {
Frontier *frontier.Frontier

// Worker pool
WorkerMutex sync.RWMutex
WorkerPool []*Worker
WorkerStopSignal chan bool
WorkerStopTimeout time.Duration
Workers *WorkerPool

// Crawl settings
MaxConcurrentAssets int
@@ -70,7 +66,6 @@ type Crawl struct {
DomainsCrawl bool
Headless bool
Seencheck bool
Workers int
RandomLocalIP bool
MinSpaceRequired int

@@ -162,7 +157,7 @@ func (c *Crawl) Start() (err error) {
}
}()

c.Frontier.Init(c.JobPath, frontierLoggingChan, c.Workers, c.Seencheck)
c.Frontier.Init(c.JobPath, frontierLoggingChan, int(c.Workers.Count), c.Seencheck)
c.Frontier.Load()
c.Frontier.Start()

@@ -249,13 +244,9 @@ func (c *Crawl) Start() (err error) {
c.Client.Jar = cookieJar
}

// Fire up the desired amount of workers
for i := uint(0); i < uint(c.Workers); i++ {
worker := newWorker(c, i)
c.WorkerPool = append(c.WorkerPool, worker)
go worker.Run()
}
go c.WorkerWatcher()
// Start the worker pool by building all the workers and starting them
// Also starts all the background processes that will handle the workers
c.Workers.Start()

// Start the process responsible for printing live stats on the standard output
if c.LiveStats {
@@ -270,8 +261,8 @@ func (c *Crawl) Start() (err error) {
logrus.Panic(err)
}

c.HQProducerChannel = make(chan *frontier.Item, c.Workers)
c.HQFinishedChannel = make(chan *frontier.Item, c.Workers)
c.HQProducerChannel = make(chan *frontier.Item, c.Workers.Count)
c.HQFinishedChannel = make(chan *frontier.Item, c.Workers.Count)

c.HQChannelsWg.Add(2)
go c.HQConsumer()
Expand Down Expand Up @@ -301,112 +292,3 @@ func (c *Crawl) Start() (err error) {

return
}

// WorkerWatcher is a background process that watches over the workers
// and remove them from the pool when they are done
func (c *Crawl) WorkerWatcher() {
var toEnd = false

for {
select {

// Stop the workers when requested
case <-c.WorkerStopSignal:
for i, worker := range c.WorkerPool {
worker.Stop()
c.Log.Info("Stopping worker", "worker", i)
}
toEnd = true

// Check for finished workers and remove them from the pool
// End the watcher if a stop signal was received beforehand and all workers are completed
default:
c.WorkerMutex.Lock()
for i, worker := range c.WorkerPool {
if worker.state.status == completed {
// Remove the worker from the pool
c.Log.Info("Removing worker from pool", "worker", i)
c.WorkerPool = append(c.WorkerPool[:i], c.WorkerPool[i+1:]...)
c.Log.Info("Worker pool size reduced", "size", len(c.WorkerPool))
} else {
worker.Lock()
worker.id = uint(i)
worker.Unlock()
}
}

if toEnd && len(c.WorkerPool) == 0 {
c.WorkerMutex.Unlock()
c.Log.Info("All workers are completed, crawl/crawl.go:WorkerWatcher() is stopping")
close(c.WorkerStopSignal)
return // All workers are completed
}
c.WorkerMutex.Unlock()
}
}
}

// EnsureWorkersFinished waits for all workers to finish
func (c *Crawl) EnsureWorkersFinished() bool {
var workerPoolLen int
var timer = time.NewTimer(c.WorkerStopTimeout)

for {
c.WorkerMutex.RLock()
workerPoolLen = len(c.WorkerPool)
if workerPoolLen == 0 {
c.WorkerMutex.RUnlock()
return true
}
c.WorkerMutex.RUnlock()
select {
case <-timer.C:
c.Log.Warn(fmt.Sprintf("[WORKERS] Timeout reached. %d workers still running", workerPoolLen))
return false
default:
c.Log.Warn(fmt.Sprintf("[WORKERS] Waiting for %d workers to finish", workerPoolLen))
time.Sleep(time.Second * 5)
}
}
}

// GetWorkerState returns the state of a worker given its index in the worker pool
// if the provided index is -1 then the state of all workers is returned
func (c *Crawl) GetWorkerState(index int) interface{} {
c.WorkerMutex.RLock()
defer c.WorkerMutex.RUnlock()

if index == -1 {
var workersStatus = new(APIWorkersState)
for _, worker := range c.WorkerPool {
workersStatus.Workers = append(workersStatus.Workers, _getWorkerState(worker))
}
return workersStatus
}
if index >= len(c.WorkerPool) {
return nil
}
return _getWorkerState(c.WorkerPool[index])
}

func _getWorkerState(worker *Worker) *APIWorkerState {
lastErr := ""
isLocked := true

if worker.TryLock() {
isLocked = false
worker.Unlock()
}

if worker.state.lastError != nil {
lastErr = worker.state.lastError.Error()
}

return &APIWorkerState{
WorkerID: worker.id,
Status: worker.state.status.String(),
LastSeen: worker.state.lastSeen.Format(time.RFC3339),
LastError: lastErr,
Locked: isLocked,
}
}
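
The worker-spawning loop in Start() together with the WorkerWatcher, EnsureWorkersFinished and GetWorkerState methods removed above are what Workers.Start(), Workers.StopSignal, Workers.EnsureFinished() and Workers.GetWorkerStateFromPool() now stand in for. As one illustration, a pool-side EnsureFinished could look roughly like the removed method, with the pool length and stop timeout now owned by the pool itself. This is only a sketch under the same map-based assumptions as earlier, and it presumes a watcher goroutine (the pool's analogue of the removed WorkerWatcher) removes completed workers from the map; the actual code in the collapsed worker files may differ.

// Hypothetical sketch: the removed Crawl.EnsureWorkersFinished, rehomed on the pool.
// Assumes a watcher goroutine started by Start() removes completed workers from wp.wp,
// and that the surrounding file imports fmt and time as crawl.go did.
func (wp *WorkerPool) EnsureFinished() bool {
	timer := time.NewTimer(wp.StopTimeout)
	defer timer.Stop()

	for {
		wp.wpMutex.RLock()
		remaining := len(wp.wp)
		wp.wpMutex.RUnlock()

		if remaining == 0 {
			return true // every worker has completed and been removed from the pool
		}

		select {
		case <-timer.C:
			wp.Crawl.Log.Warn(fmt.Sprintf("[WORKERS] Timeout reached. %d workers still running", remaining))
			return false
		default:
			wp.Crawl.Log.Warn(fmt.Sprintf("[WORKERS] Waiting for %d workers to finish", remaining))
			time.Sleep(time.Second * 5)
		}
	}
}
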
4 changes: 2 additions & 2 deletions internal/pkg/crawl/finish.go
@@ -45,8 +45,8 @@ func (crawl *Crawl) finish() {
close(crawl.Frontier.PullChan)

crawl.Log.Warn("[WORKERS] Waiting for workers to finish")
crawl.WorkerStopSignal <- true
crawl.EnsureWorkersFinished()
crawl.Workers.StopSignal <- true
crawl.Workers.EnsureFinished()
crawl.Log.Warn("[WORKERS] All workers finished")

// When all workers are finished, we can safely close the HQ related channels
8 changes: 4 additions & 4 deletions internal/pkg/crawl/hq.go
@@ -84,7 +84,7 @@ func (c *Crawl) HQProducer() {
return
default:
mutex.Lock()
if (len(discoveredArray) >= int(math.Ceil(float64(c.Workers)/2)) || time.Since(HQLastSent) >= time.Second*10) && len(discoveredArray) > 0 {
if (len(discoveredArray) >= int(math.Ceil(float64(c.Workers.Count)/2)) || time.Since(HQLastSent) >= time.Second*10) && len(discoveredArray) > 0 {
for {
_, err := c.HQClient.Discovered(discoveredArray, "seed", false, false)
if err != nil {
@@ -152,7 +152,7 @@ func (c *Crawl) HQConsumer() {
// This is evaluated on every iteration on purpose,
// because the number of workers may change
// during the crawl in the future (to be implemented)
var HQBatchSize = int(math.Ceil(float64(c.Workers) / 2))
var HQBatchSize = int(math.Ceil(float64(c.Workers.Count) / 2))

if c.Finished.Get() {
c.Log.Error("crawl finished, stopping HQ consumer")
Expand All @@ -166,7 +166,7 @@ func (c *Crawl) HQConsumer() {
// If HQContinuousPull is set to true, we will pull URLs from HQ
// continuously, otherwise we will only pull URLs when needed
if !c.HQContinuousPull {
if c.ActiveWorkers.Value() >= int64(c.Workers-(c.Workers/10)) {
if c.ActiveWorkers.Value() >= int64(c.Workers.Count-(c.Workers.Count/10)) {
time.Sleep(time.Millisecond * 100)
continue
}
@@ -223,7 +223,7 @@ func (c *Crawl) HQFinisher() {
locallyCrawledTotal += int(finishedItem.LocallyCrawled)
finishedArray = append(finishedArray, gocrawlhq.URL{ID: finishedItem.ID, Value: utils.URLToString(finishedItem.URL)})

if len(finishedArray) == int(math.Ceil(float64(c.Workers)/2)) {
if len(finishedArray) == int(math.Ceil(float64(c.Workers.Count)/2)) {
for {
_, err := c.HQClient.Finished(finishedArray, locallyCrawledTotal)
if err != nil {
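
For a concrete sense of the batching above: with, say, Workers.Count set to 50 (an arbitrary figure for illustration), discovered URLs are flushed to HQ once ceil(50/2) = 25 have accumulated or 10 seconds have passed since the last send, and the consumer and finisher use the same ceil(Count/2) = 25 batch size. Only the source of the worker count changed in this commit, not the heuristic itself.
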
2 changes: 1 addition & 1 deletion internal/pkg/crawl/stats.go
@@ -36,7 +36,7 @@ func (c *Crawl) printLiveStats() {
stats.AddRow("", "")
stats.AddRow(" - Job:", c.Job)
stats.AddRow(" - State:", c.getCrawlState())
stats.AddRow(" - Active workers:", strconv.Itoa(int(c.ActiveWorkers.Value()))+"/"+strconv.Itoa(c.Workers))
stats.AddRow(" - Active workers:", strconv.Itoa(int(c.ActiveWorkers.Value()))+"/"+strconv.Itoa(c.Workers.wpLen()))
stats.AddRow(" - URI/s:", c.URIsPerSecond.Rate())
stats.AddRow(" - Queued:", c.Frontier.QueueCount.Value())
stats.AddRow(" - Crawled total:", crawledSeeds+crawledAssets)
4 changes: 2 additions & 2 deletions internal/pkg/crawl/utils.go
@@ -25,10 +25,10 @@ func (c *Crawl) crawlSpeedLimiter() {
maxConcurrentAssets := c.MaxConcurrentAssets

for {
if c.Client.WaitGroup.Size() > c.Workers*8 {
if c.Client.WaitGroup.Size() > int(c.Workers.Count)*8 {
c.Paused.Set(true)
c.Frontier.Paused.Set(true)
} else if c.Client.WaitGroup.Size() > c.Workers*4 {
} else if c.Client.WaitGroup.Size() > int(c.Workers.Count)*4 {
c.MaxConcurrentAssets = 1
c.Paused.Set(false)
c.Frontier.Paused.Set(false)
