diff --git a/internal/pkg/crawl/worker.go b/internal/pkg/crawl/worker.go index 832f29dc..722e5f97 100644 --- a/internal/pkg/crawl/worker.go +++ b/internal/pkg/crawl/worker.go @@ -1,9 +1,11 @@ package crawl import ( + "log/slog" "sync" "time" + "github.com/davecgh/go-spew/spew" "github.com/google/uuid" "github.com/internetarchive/Zeno/internal/pkg/frontier" "github.com/internetarchive/Zeno/internal/pkg/log" @@ -158,3 +160,26 @@ func (wp *WorkerPool) NewWorker(crawlParameters *Crawl) *Worker { return worker } + +// WatchHang is a function that checks if a worker is hanging based on the last time it was seen +func (w *Worker) WatchHang() { + w.logger.Info("Starting worker hang watcher") + for { + tryLockCounter := 0 + time.Sleep(5 * time.Second) + for !w.TryLock() { + if tryLockCounter > 10 && w.state.status != completed { + w.logger.Error("Worker is hanging") + spew.Fprint(w.pool.Crawl.Log.Writer(slog.LevelError), w) // This will dump the worker state to the log, this is NOT a good idea in production but it's useful for debugging + return + } + tryLockCounter++ + time.Sleep(1 * time.Second) + } + if w.state.status != idle && time.Since(w.state.lastSeen) > 10*time.Second { + w.logger.Warn("Worker is hanging, stopping it") + w.Stop() + } + w.Unlock() + } +} diff --git a/internal/pkg/crawl/worker_pool.go b/internal/pkg/crawl/worker_pool.go index bd236777..e31e535f 100644 --- a/internal/pkg/crawl/worker_pool.go +++ b/internal/pkg/crawl/worker_pool.go @@ -33,6 +33,7 @@ func (wp *WorkerPool) Start() { worker := wp.NewWorker(wp.Crawl) wp.Crawl.Log.Info("Starting worker", "worker", worker.ID) go worker.Run() + go worker.WatchHang() } go wp.WorkerWatcher() }