From 7e9e3aaa7f9403dbf2be5c41d58641c5c5e7eff4 Mon Sep 17 00:00:00 2001 From: joey Date: Wed, 21 Aug 2024 10:16:32 +0800 Subject: [PATCH] fix pipeline stuck in queue status because etcd server abnormal --- .../pipeline/providers/dbgc/definition_cleanup/provider.go | 2 +- internal/tools/pipeline/providers/leaderworker/heartbeat.go | 5 +++++ .../tools/pipeline/providers/leaderworker/impl_worker.go | 6 ++++++ 3 files changed, 12 insertions(+), 1 deletion(-) diff --git a/internal/tools/pipeline/providers/dbgc/definition_cleanup/provider.go b/internal/tools/pipeline/providers/dbgc/definition_cleanup/provider.go index 5966178e58d..1e2260b4c5e 100644 --- a/internal/tools/pipeline/providers/dbgc/definition_cleanup/provider.go +++ b/internal/tools/pipeline/providers/dbgc/definition_cleanup/provider.go @@ -69,7 +69,7 @@ func (p *provider) handleLogDir() error { return err } // dir is not exist - return os.Mkdir(p.Cfg.LogDir, 0755) + return os.MkdirAll(p.Cfg.LogDir, 0755) } func (p *provider) Init(ctx servicehub.Context) error { diff --git a/internal/tools/pipeline/providers/leaderworker/heartbeat.go b/internal/tools/pipeline/providers/leaderworker/heartbeat.go index 66f11f0ea76..5318a68a37b 100644 --- a/internal/tools/pipeline/providers/leaderworker/heartbeat.go +++ b/internal/tools/pipeline/providers/leaderworker/heartbeat.go @@ -20,6 +20,8 @@ import ( "strconv" "time" + "github.com/pkg/errors" + clientv3 "go.etcd.io/etcd/client/v3" "github.com/erda-project/erda/internal/tools/pipeline/providers/leaderworker/worker" @@ -53,6 +55,9 @@ func (p *provider) workerOnceReportHeartbeat(ctx context.Context, w worker.Worke // update lastProbeAt nowSec := time.Now().Round(0).Unix() if _, err := p.EtcdClient.Put(hctx, p.makeEtcdWorkerHeartbeatKey(w.GetID()), strutil.String(nowSec)); err != nil { + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { + panic(fmt.Errorf("failed to update last heartbeat time into etcd, workerID: %s, err: %v", w.GetID(), err)) + } return fmt.Errorf("failed to update last heartbeat time into etcd, workerID: %s, err: %v", w.GetID(), err) } p.Log.Debugf("worker heartbeat reported, workerID: %s", w.GetID()) diff --git a/internal/tools/pipeline/providers/leaderworker/impl_worker.go b/internal/tools/pipeline/providers/leaderworker/impl_worker.go index 39471f5ac7d..bfaba278fd2 100644 --- a/internal/tools/pipeline/providers/leaderworker/impl_worker.go +++ b/internal/tools/pipeline/providers/leaderworker/impl_worker.go @@ -16,8 +16,10 @@ package leaderworker import ( "context" + "fmt" "time" + "github.com/pkg/errors" clientv3 "go.etcd.io/etcd/client/v3" "github.com/erda-project/erda/internal/tools/pipeline/providers/leaderworker/lwctx" @@ -151,6 +153,10 @@ func (p *provider) workerIntervalCleanupOnDelete(ctx context.Context, ev Event) if err == nil { return } + // actively panic exit + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + panic(fmt.Errorf("actively panic exit, maybe the etcd server has been shut down, err: %v", err)) + } p.Log.Errorf("failed to do worker interval cleanup on delete(auto retry), step: delete heartbeat key, workerID: %s, err: %v", ev.WorkerID, err) time.Sleep(p.Cfg.Worker.RetryInterval) }