This repository was archived by the owner on Apr 15, 2024. It is now read-only.

feat: better requeue mechanism using a separate nonce channel #643

Merged · 6 commits · Dec 6, 2023
Changes from 5 commits
29 changes: 23 additions & 6 deletions orchestrator/orchestrator.go
@@ -31,6 +31,9 @@ import (
// After this window has elapsed, the nonce is discarded.
const RequeueWindow = 50

// The queue channel's size
const queueSize = 1000

type Orchestrator struct {
Logger tmlog.Logger // maybe use a more general interface

@@ -68,8 +71,11 @@ func New(

func (orch Orchestrator) Start(ctx context.Context) {
// contains the nonces that will be signed by the orchestrator.
noncesQueue := make(chan uint64, 100)
noncesQueue := make(chan uint64, queueSize)
defer close(noncesQueue)
// contains the failed nonces to be re-processed.
failedNoncesQueue := make(chan uint64, queueSize)
defer close(failedNoncesQueue)

// used to send a signal when the nonces processor wants to notify the nonces enqueuing services to stop.
signalChan := make(chan struct{})
@@ -92,7 +98,7 @@ func (orch Orchestrator) Start(ctx context.Context) {
wg.Add(1)
go func() {
defer wg.Done()
err := orch.ProcessNonces(ctx, noncesQueue, signalChan)
err := orch.ProcessNonces(ctx, noncesQueue, failedNoncesQueue, signalChan)
if err != nil {
orch.Logger.Error("error processing attestations", "err", err)
return
@@ -245,13 +251,24 @@ func (orch Orchestrator) EnqueueMissingEvents(
func (orch Orchestrator) ProcessNonces(
ctx context.Context,
noncesQueue chan uint64,
requeueQueue chan uint64,
signalChan chan<- struct{},
) error {
ticker := time.NewTicker(time.Hour)
for {
select {
case <-ctx.Done():
close(signalChan)
return ErrSignalChanNotif
case <-ticker.C:
if len(requeueQueue) > 0 && len(noncesQueue) < queueSize {
// The goroutine is used to avoid blocking
go func() {
nonce := <-requeueQueue
Member: can we just add this as a different case in the select/for loop?

Member Author: I don't want this to be blocking, that's why I added the condition. Also, we don't want to requeue every time, so that we don't keep retrying the failing nonce indefinitely, but instead do it once an hour.

Member (@evan-forbes, Dec 6, 2023): mind explaining how it will block? Won't it only get triggered if something is pushed through the channel to the select statement? If it will block when noncesQueue is full, then imo we should either increase that buffer or empty the channel faster. I could be missing something ofc too.

Member: perhaps the emptying and filling of the channel should not be in the same select statement?

Member Author: If the nonces queue is full and we run:

noncesQueue <- nonce

it will block, and at the same time it won't be able to process nonces because it will be stuck here, hence the use of the goroutine.

Also, we're using the ticker so that the nonces don't get re-processed immediately every time they fail; this gives them an hour of delay before they get re-processed.

> perhaps the emptying and filling of the channel should not be in the same select statement?

We could, and it would be cleaner, but I didn't want to add that extra complexity.
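
To make the blocking concern concrete, here is a minimal standalone Go sketch (not code from this PR) showing that a send on a full buffered channel blocks until a receiver drains it, which is why the diff wraps the requeue send in a goroutine. The channel name and sizes here are illustrative only.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// A buffered channel that is already full.
	queue := make(chan int, 1)
	queue <- 1

	// A direct `queue <- 2` here would block the main goroutine until the
	// buffer has room. Wrapping the send in a goroutine keeps the caller
	// (e.g. a select loop) responsive while the send waits.
	go func() {
		queue <- 2 // blocks inside this goroutine, not in the main loop
		fmt.Println("requeued once the channel had room")
	}()

	// Simulate the processing loop draining the channel a bit later.
	time.Sleep(100 * time.Millisecond)
	fmt.Println("processed:", <-queue)
	fmt.Println("processed:", <-queue)
	time.Sleep(100 * time.Millisecond)
}
```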

Member: why not just put the nonce back in the nonces queue when we attempt to retry, instead of putting it in a separate channel that eventually gets added to the nonce channel?

Or, if we need the extra protection for some reason, we can have a single goroutine that connects the two channels.

This could just be me being too nitpicky, but spinning up a goroutine to connect two channels on a timer feels like there's room for improvement.
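
For reference, a rough sketch of what the reviewer appears to suggest: one long-lived goroutine that bridges the requeue channel back into the nonces queue on a timer, instead of spawning a goroutine per tick. This is not code from the PR; the function name bridgeRequeue and the package clause are assumptions, and only the channel names mirror the diff above.

```go
package orchestrator

import (
	"context"
	"time"
)

// bridgeRequeue is a hypothetical helper (not part of this PR) that moves at
// most one failed nonce per tick from requeueQueue back into noncesQueue,
// so only this goroutine ever blocks when noncesQueue is full.
func bridgeRequeue(ctx context.Context, requeueQueue <-chan uint64, noncesQueue chan<- uint64) {
	ticker := time.NewTicker(time.Hour)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			select {
			case nonce := <-requeueQueue:
				noncesQueue <- nonce
			default:
				// Nothing to requeue on this tick.
			}
		}
	}
}
```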

noncesQueue <- nonce
orch.Logger.Debug("failed nonce added to the nonces queue to be processed", "nonce", nonce)
}()
}
case nonce := <-noncesQueue:
orch.Logger.Info("processing nonce", "nonce", nonce)
if err := orch.Process(ctx, nonce); err != nil {
@@ -260,23 +277,23 @@ func (orch Orchestrator) ProcessNonces(
return orch.Process(ctx, nonce)
}); err != nil {
orch.Logger.Error("error processing nonce even after retrying", "err", err.Error())
go orch.MaybeRequeue(ctx, noncesQueue, nonce)
go orch.MaybeRequeue(ctx, requeueQueue, nonce)
}
}
}
}
}

// MaybeRequeue requeues the nonce to be re-processed later if it's recent.
func (orch Orchestrator) MaybeRequeue(ctx context.Context, noncesQueue chan<- uint64, nonce uint64) {
func (orch Orchestrator) MaybeRequeue(ctx context.Context, requeueQueue chan<- uint64, nonce uint64) {
latestNonce, err := orch.AppQuerier.QueryLatestAttestationNonce(ctx)
if err != nil {
orch.Logger.Debug("error requeuing nonce", "nonce", nonce, "err", err.Error())
return
}
if latestNonce <= RequeueWindow || nonce >= latestNonce-RequeueWindow {
orch.Logger.Debug("requeuing nonce", "nonce", nonce)
noncesQueue <- nonce
orch.Logger.Debug("adding failed nonce to requeue queue", "nonce", nonce)
requeueQueue <- nonce
} else {
orch.Logger.Debug("nonce is too old, will not retry it in the future", "nonce", nonce)
}