From 12cb6d94d255814bc41027d69808f49c1225c9a9 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Tue, 5 Dec 2023 00:18:25 +0100 Subject: [PATCH] feat: better requeue mechanism using a separate nonce --- orchestrator/orchestrator.go | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/orchestrator/orchestrator.go b/orchestrator/orchestrator.go index b3b97f40..a5264432 100644 --- a/orchestrator/orchestrator.go +++ b/orchestrator/orchestrator.go @@ -31,6 +31,9 @@ import ( // After this window is elapsed, the nonce is discarded. const RequeueWindow = 50 +// The queue channel's size +const queueSize = 1000 + type Orchestrator struct { Logger tmlog.Logger // maybe use a more general interface @@ -68,8 +71,11 @@ func New( func (orch Orchestrator) Start(ctx context.Context) { // contains the nonces that will be signed by the orchestrator. - noncesQueue := make(chan uint64, 100) + noncesQueue := make(chan uint64, queueSize) defer close(noncesQueue) + // contains the failed nonces to be re-processed. + failedNoncesQueue := make(chan uint64, queueSize) + defer close(failedNoncesQueue) // used to send a signal when the nonces processor wants to notify the nonces enqueuing services to stop. signalChan := make(chan struct{}) @@ -92,7 +98,7 @@ func (orch Orchestrator) Start(ctx context.Context) { wg.Add(1) go func() { defer wg.Done() - err := orch.ProcessNonces(ctx, noncesQueue, signalChan) + err := orch.ProcessNonces(ctx, noncesQueue, failedNoncesQueue, signalChan) if err != nil { orch.Logger.Error("error processing attestations", "err", err) return @@ -245,13 +251,21 @@ func (orch Orchestrator) EnqueueMissingEvents( func (orch Orchestrator) ProcessNonces( ctx context.Context, noncesQueue chan uint64, + requeueQueue chan uint64, signalChan chan<- struct{}, ) error { + ticker := time.NewTicker(time.Hour) for { select { case <-ctx.Done(): close(signalChan) return ErrSignalChanNotif + case <-ticker.C: + if len(requeueQueue) < 0 && len(noncesQueue) < queueSize { + nonce := <-requeueQueue + noncesQueue <- nonce + orch.Logger.Debug("failed nonce added to the nonces queue to be processed", "nonce", nonce) + } case nonce := <-noncesQueue: orch.Logger.Info("processing nonce", "nonce", nonce) if err := orch.Process(ctx, nonce); err != nil { @@ -260,7 +274,7 @@ func (orch Orchestrator) ProcessNonces( return orch.Process(ctx, nonce) }); err != nil { orch.Logger.Error("error processing nonce even after retrying", "err", err.Error()) - go orch.MaybeRequeue(ctx, noncesQueue, nonce) + go orch.MaybeRequeue(ctx, requeueQueue, nonce) } } } @@ -268,15 +282,15 @@ func (orch Orchestrator) ProcessNonces( } // MaybeRequeue requeue the nonce to be re-processed subsequently if it's recent. -func (orch Orchestrator) MaybeRequeue(ctx context.Context, noncesQueue chan<- uint64, nonce uint64) { +func (orch Orchestrator) MaybeRequeue(ctx context.Context, requeueQueue chan<- uint64, nonce uint64) { latestNonce, err := orch.AppQuerier.QueryLatestAttestationNonce(ctx) if err != nil { orch.Logger.Debug("error requeuing nonce", "nonce", nonce, "err", err.Error()) return } if latestNonce <= RequeueWindow || nonce >= latestNonce-RequeueWindow { - orch.Logger.Debug("requeuing nonce", "nonce", nonce) - noncesQueue <- nonce + orch.Logger.Debug("adding failed nonce to requeue queue", "nonce", nonce) + requeueQueue <- nonce } else { orch.Logger.Debug("nonce is too old, will not retry it in the future", "nonce", nonce) }