Skip to content
This repository has been archived by the owner on Apr 15, 2024. It is now read-only.

Commit

Permalink
feat: better requeue mechanism using a separate nonce
Browse files Browse the repository at this point in the history
  • Loading branch information
rach-id committed Dec 4, 2023
1 parent d3131ed commit 12cb6d9
Showing 1 changed file with 20 additions and 6 deletions.
26 changes: 20 additions & 6 deletions orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ import (
// After this window is elapsed, the nonce is discarded.
const RequeueWindow = 50

// The queue channel's size
const queueSize = 1000

type Orchestrator struct {
Logger tmlog.Logger // maybe use a more general interface

Expand Down Expand Up @@ -68,8 +71,11 @@ func New(

func (orch Orchestrator) Start(ctx context.Context) {
// contains the nonces that will be signed by the orchestrator.
noncesQueue := make(chan uint64, 100)
noncesQueue := make(chan uint64, queueSize)
defer close(noncesQueue)
// contains the failed nonces to be re-processed.
failedNoncesQueue := make(chan uint64, queueSize)
defer close(failedNoncesQueue)

// used to send a signal when the nonces processor wants to notify the nonces enqueuing services to stop.
signalChan := make(chan struct{})
Expand All @@ -92,7 +98,7 @@ func (orch Orchestrator) Start(ctx context.Context) {
wg.Add(1)
go func() {
defer wg.Done()
err := orch.ProcessNonces(ctx, noncesQueue, signalChan)
err := orch.ProcessNonces(ctx, noncesQueue, failedNoncesQueue, signalChan)
if err != nil {
orch.Logger.Error("error processing attestations", "err", err)
return
Expand Down Expand Up @@ -245,13 +251,21 @@ func (orch Orchestrator) EnqueueMissingEvents(
func (orch Orchestrator) ProcessNonces(
ctx context.Context,
noncesQueue chan uint64,
requeueQueue chan uint64,
signalChan chan<- struct{},
) error {
ticker := time.NewTicker(time.Hour)
for {
select {
case <-ctx.Done():
close(signalChan)
return ErrSignalChanNotif
case <-ticker.C:
if len(requeueQueue) < 0 && len(noncesQueue) < queueSize {

Check failure on line 264 in orchestrator/orchestrator.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint

SA4024: builtin function len does not return negative values (staticcheck)
nonce := <-requeueQueue
noncesQueue <- nonce
orch.Logger.Debug("failed nonce added to the nonces queue to be processed", "nonce", nonce)
}
case nonce := <-noncesQueue:
orch.Logger.Info("processing nonce", "nonce", nonce)
if err := orch.Process(ctx, nonce); err != nil {
Expand All @@ -260,23 +274,23 @@ func (orch Orchestrator) ProcessNonces(
return orch.Process(ctx, nonce)
}); err != nil {
orch.Logger.Error("error processing nonce even after retrying", "err", err.Error())
go orch.MaybeRequeue(ctx, noncesQueue, nonce)
go orch.MaybeRequeue(ctx, requeueQueue, nonce)
}
}
}
}
}

// MaybeRequeue requeue the nonce to be re-processed subsequently if it's recent.
func (orch Orchestrator) MaybeRequeue(ctx context.Context, noncesQueue chan<- uint64, nonce uint64) {
func (orch Orchestrator) MaybeRequeue(ctx context.Context, requeueQueue chan<- uint64, nonce uint64) {
latestNonce, err := orch.AppQuerier.QueryLatestAttestationNonce(ctx)
if err != nil {
orch.Logger.Debug("error requeuing nonce", "nonce", nonce, "err", err.Error())
return
}
if latestNonce <= RequeueWindow || nonce >= latestNonce-RequeueWindow {
orch.Logger.Debug("requeuing nonce", "nonce", nonce)
noncesQueue <- nonce
orch.Logger.Debug("adding failed nonce to requeue queue", "nonce", nonce)
requeueQueue <- nonce
} else {
orch.Logger.Debug("nonce is too old, will not retry it in the future", "nonce", nonce)
}
Expand Down

0 comments on commit 12cb6d9

Please sign in to comment.