From 9226b2e76b994cbf65132650d114fb2435718144 Mon Sep 17 00:00:00 2001 From: CHAMI Rachid Date: Wed, 6 Dec 2023 14:48:35 +0100 Subject: [PATCH] feat: better requeue mechanism using a separate nonce channel (#643) * feat: better requeue mechanism using a separate nonce * Fix: correct condition * Fix: correct go routine --- orchestrator/orchestrator.go | 29 +++++++++++++++++++++++------ 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/orchestrator/orchestrator.go b/orchestrator/orchestrator.go index b3b97f40..7013a904 100644 --- a/orchestrator/orchestrator.go +++ b/orchestrator/orchestrator.go @@ -31,6 +31,9 @@ import ( // After this window is elapsed, the nonce is discarded. const RequeueWindow = 50 +// The queue channel's size +const queueSize = 1000 + type Orchestrator struct { Logger tmlog.Logger // maybe use a more general interface @@ -68,8 +71,11 @@ func New( func (orch Orchestrator) Start(ctx context.Context) { // contains the nonces that will be signed by the orchestrator. - noncesQueue := make(chan uint64, 100) + noncesQueue := make(chan uint64, queueSize) defer close(noncesQueue) + // contains the failed nonces to be re-processed. + failedNoncesQueue := make(chan uint64, queueSize) + defer close(failedNoncesQueue) // used to send a signal when the nonces processor wants to notify the nonces enqueuing services to stop. signalChan := make(chan struct{}) @@ -92,7 +98,7 @@ func (orch Orchestrator) Start(ctx context.Context) { wg.Add(1) go func() { defer wg.Done() - err := orch.ProcessNonces(ctx, noncesQueue, signalChan) + err := orch.ProcessNonces(ctx, noncesQueue, failedNoncesQueue, signalChan) if err != nil { orch.Logger.Error("error processing attestations", "err", err) return @@ -245,13 +251,24 @@ func (orch Orchestrator) EnqueueMissingEvents( func (orch Orchestrator) ProcessNonces( ctx context.Context, noncesQueue chan uint64, + requeueQueue chan uint64, signalChan chan<- struct{}, ) error { + ticker := time.NewTicker(time.Hour) for { select { case <-ctx.Done(): close(signalChan) return ErrSignalChanNotif + case <-ticker.C: + if len(requeueQueue) > 0 && len(noncesQueue) < queueSize { + // The use of the go routine is to avoid blocking + go func() { + nonce := <-requeueQueue + noncesQueue <- nonce + orch.Logger.Debug("failed nonce added to the nonces queue to be processed", "nonce", nonce) + }() + } case nonce := <-noncesQueue: orch.Logger.Info("processing nonce", "nonce", nonce) if err := orch.Process(ctx, nonce); err != nil { @@ -260,7 +277,7 @@ func (orch Orchestrator) ProcessNonces( return orch.Process(ctx, nonce) }); err != nil { orch.Logger.Error("error processing nonce even after retrying", "err", err.Error()) - go orch.MaybeRequeue(ctx, noncesQueue, nonce) + go orch.MaybeRequeue(ctx, requeueQueue, nonce) } } } @@ -268,15 +285,15 @@ func (orch Orchestrator) ProcessNonces( } // MaybeRequeue requeue the nonce to be re-processed subsequently if it's recent. -func (orch Orchestrator) MaybeRequeue(ctx context.Context, noncesQueue chan<- uint64, nonce uint64) { +func (orch Orchestrator) MaybeRequeue(ctx context.Context, requeueQueue chan<- uint64, nonce uint64) { latestNonce, err := orch.AppQuerier.QueryLatestAttestationNonce(ctx) if err != nil { orch.Logger.Debug("error requeuing nonce", "nonce", nonce, "err", err.Error()) return } if latestNonce <= RequeueWindow || nonce >= latestNonce-RequeueWindow { - orch.Logger.Debug("requeuing nonce", "nonce", nonce) - noncesQueue <- nonce + orch.Logger.Debug("adding failed nonce to requeue queue", "nonce", nonce) + requeueQueue <- nonce } else { orch.Logger.Debug("nonce is too old, will not retry it in the future", "nonce", nonce) }