From 12cb6d94d255814bc41027d69808f49c1225c9a9 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Tue, 5 Dec 2023 00:18:25 +0100 Subject: [PATCH 1/3] feat: better requeue mechanism using a separate nonce --- orchestrator/orchestrator.go | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/orchestrator/orchestrator.go b/orchestrator/orchestrator.go index b3b97f40..a5264432 100644 --- a/orchestrator/orchestrator.go +++ b/orchestrator/orchestrator.go @@ -31,6 +31,9 @@ import ( // After this window is elapsed, the nonce is discarded. const RequeueWindow = 50 +// The queue channel's size +const queueSize = 1000 + type Orchestrator struct { Logger tmlog.Logger // maybe use a more general interface @@ -68,8 +71,11 @@ func New( func (orch Orchestrator) Start(ctx context.Context) { // contains the nonces that will be signed by the orchestrator. - noncesQueue := make(chan uint64, 100) + noncesQueue := make(chan uint64, queueSize) defer close(noncesQueue) + // contains the failed nonces to be re-processed. + failedNoncesQueue := make(chan uint64, queueSize) + defer close(failedNoncesQueue) // used to send a signal when the nonces processor wants to notify the nonces enqueuing services to stop. signalChan := make(chan struct{}) @@ -92,7 +98,7 @@ func (orch Orchestrator) Start(ctx context.Context) { wg.Add(1) go func() { defer wg.Done() - err := orch.ProcessNonces(ctx, noncesQueue, signalChan) + err := orch.ProcessNonces(ctx, noncesQueue, failedNoncesQueue, signalChan) if err != nil { orch.Logger.Error("error processing attestations", "err", err) return @@ -245,13 +251,21 @@ func (orch Orchestrator) EnqueueMissingEvents( func (orch Orchestrator) ProcessNonces( ctx context.Context, noncesQueue chan uint64, + requeueQueue chan uint64, signalChan chan<- struct{}, ) error { + ticker := time.NewTicker(time.Hour) for { select { case <-ctx.Done(): close(signalChan) return ErrSignalChanNotif + case <-ticker.C: + if len(requeueQueue) < 0 && len(noncesQueue) < queueSize { + nonce := <-requeueQueue + noncesQueue <- nonce + orch.Logger.Debug("failed nonce added to the nonces queue to be processed", "nonce", nonce) + } case nonce := <-noncesQueue: orch.Logger.Info("processing nonce", "nonce", nonce) if err := orch.Process(ctx, nonce); err != nil { @@ -260,7 +274,7 @@ func (orch Orchestrator) ProcessNonces( return orch.Process(ctx, nonce) }); err != nil { orch.Logger.Error("error processing nonce even after retrying", "err", err.Error()) - go orch.MaybeRequeue(ctx, noncesQueue, nonce) + go orch.MaybeRequeue(ctx, requeueQueue, nonce) } } } @@ -268,15 +282,15 @@ func (orch Orchestrator) ProcessNonces( } // MaybeRequeue requeue the nonce to be re-processed subsequently if it's recent. -func (orch Orchestrator) MaybeRequeue(ctx context.Context, noncesQueue chan<- uint64, nonce uint64) { +func (orch Orchestrator) MaybeRequeue(ctx context.Context, requeueQueue chan<- uint64, nonce uint64) { latestNonce, err := orch.AppQuerier.QueryLatestAttestationNonce(ctx) if err != nil { orch.Logger.Debug("error requeuing nonce", "nonce", nonce, "err", err.Error()) return } if latestNonce <= RequeueWindow || nonce >= latestNonce-RequeueWindow { - orch.Logger.Debug("requeuing nonce", "nonce", nonce) - noncesQueue <- nonce + orch.Logger.Debug("adding failed nonce to requeue queue", "nonce", nonce) + requeueQueue <- nonce } else { orch.Logger.Debug("nonce is too old, will not retry it in the future", "nonce", nonce) } From 0aaea3930ed8d0674ab601aef15cb525669384f1 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Tue, 5 Dec 2023 00:28:26 +0100 Subject: [PATCH 2/3] Fix: correct condition --- orchestrator/orchestrator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/orchestrator/orchestrator.go b/orchestrator/orchestrator.go index a5264432..bf8865f1 100644 --- a/orchestrator/orchestrator.go +++ b/orchestrator/orchestrator.go @@ -261,7 +261,7 @@ func (orch Orchestrator) ProcessNonces( close(signalChan) return ErrSignalChanNotif case <-ticker.C: - if len(requeueQueue) < 0 && len(noncesQueue) < queueSize { + if len(requeueQueue) > 0 && len(noncesQueue) < queueSize { nonce := <-requeueQueue noncesQueue <- nonce orch.Logger.Debug("failed nonce added to the nonces queue to be processed", "nonce", nonce) From 1860596f9fef21bc5ff502961b70b50158abc5a5 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Tue, 5 Dec 2023 00:34:08 +0100 Subject: [PATCH 3/3] Fix: correct go routine --- orchestrator/orchestrator.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/orchestrator/orchestrator.go b/orchestrator/orchestrator.go index bf8865f1..7013a904 100644 --- a/orchestrator/orchestrator.go +++ b/orchestrator/orchestrator.go @@ -262,9 +262,12 @@ func (orch Orchestrator) ProcessNonces( return ErrSignalChanNotif case <-ticker.C: if len(requeueQueue) > 0 && len(noncesQueue) < queueSize { - nonce := <-requeueQueue - noncesQueue <- nonce - orch.Logger.Debug("failed nonce added to the nonces queue to be processed", "nonce", nonce) + // The use of the go routine is to avoid blocking + go func() { + nonce := <-requeueQueue + noncesQueue <- nonce + orch.Logger.Debug("failed nonce added to the nonces queue to be processed", "nonce", nonce) + }() } case nonce := <-noncesQueue: orch.Logger.Info("processing nonce", "nonce", nonce)