Skip to content
This repository has been archived by the owner on Apr 15, 2024. It is now read-only.

Commit

Permalink
feat: orchestrator not failing and requeing failed nonces
Browse files Browse the repository at this point in the history
  • Loading branch information
rach-id committed Nov 30, 2023
1 parent 071cba2 commit e9a53f9
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 16 deletions.
2 changes: 1 addition & 1 deletion cmd/blobstream/orchestrator/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func Start() *cobra.Command {

// creating the p2p querier
p2pQuerier := p2p.NewQuerier(dht, logger)
retrier := helpers.NewRetrier(logger, 6, time.Minute)
retrier := helpers.NewRetrier(logger, 5, 30*time.Second)

defer func() {
for _, f := range stopFuncs {
Expand Down
6 changes: 4 additions & 2 deletions e2e/scripts/start_orchestrator_after_validator_created.sh
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ then
--grpc.insecure \
--p2p.nickname=key \
--p2p.listen-addr="${P2P_LISTEN}" \
--evm.passphrase=123
--evm.passphrase=123 \
--log.level debug
else
# to give time for the bootstrappers to be up
sleep 5s
Expand All @@ -66,5 +67,6 @@ else
--grpc.insecure \
--p2p.listen-addr="${P2P_LISTEN}" \
--p2p.bootstrappers="${P2P_BOOTSTRAPPERS}" \
--evm.passphrase=123
--evm.passphrase=123 \
--log.level debug
fi
4 changes: 2 additions & 2 deletions helpers/retrier.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ func (r Retrier) Retry(ctx context.Context, retryMethod func() error) error {
case <-ctx.Done():
return ctx.Err()
case <-nextTick.C:
r.logger.Info("retrying", "retry_number", i, "retries_left", r.retriesNumber-i)
r.logger.Debug("retrying", "retry_number", i, "retries_left", r.retriesNumber-i)
err = retryMethod()
if err == nil {
r.logger.Info("succeeded", "retries_number", i)
return nil
}
r.logger.Error("failed attempt", "retry", i, "err", err)
r.logger.Debug("failed attempt", "retry", i, "err", err)
}
}
return err
Expand Down
39 changes: 28 additions & 11 deletions orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ import (
coretypes "github.com/tendermint/tendermint/types"
)

// RequeueWindow the number of nonces that we want to re-enqueue if we can't process them even after retry.
// After this window is elapsed, the nonce is discarded.
const RequeueWindow = 50

type Orchestrator struct {
Logger tmlog.Logger // maybe use a more general interface

Expand Down Expand Up @@ -69,18 +73,16 @@ func (orch Orchestrator) Start(ctx context.Context) {
// used to send a signal when the nonces processor wants to notify the nonces enqueuing services to stop.
signalChan := make(chan struct{})

withCancel, cancel := context.WithCancel(ctx)

wg := &sync.WaitGroup{}

// go routine to listen for new attestation nonces
wg.Add(1)
go func() {
defer wg.Done()
err := orch.StartNewEventsListener(withCancel, noncesQueue, signalChan)
err := orch.StartNewEventsListener(ctx, noncesQueue, signalChan)
if err != nil {
orch.Logger.Error("error listening to new attestations", "err", err)
cancel()
return
}
orch.Logger.Error("stopping listening to new attestations")
}()
Expand All @@ -89,10 +91,10 @@ func (orch Orchestrator) Start(ctx context.Context) {
wg.Add(1)
go func() {
defer wg.Done()
err := orch.ProcessNonces(withCancel, noncesQueue, signalChan)
err := orch.ProcessNonces(ctx, noncesQueue, signalChan)
if err != nil {
orch.Logger.Error("error processing attestations", "err", err)
cancel()
return
}
orch.Logger.Error("stopping processing attestations")
}()
Expand All @@ -101,10 +103,10 @@ func (orch Orchestrator) Start(ctx context.Context) {
wg.Add(1)
go func() {
defer wg.Done()
err := orch.EnqueueMissingEvents(withCancel, noncesQueue, signalChan)
err := orch.EnqueueMissingEvents(ctx, noncesQueue, signalChan)
if err != nil {
orch.Logger.Error("error enqueuing missing attestations", "err", err)
cancel()
return
}
}()

Expand Down Expand Up @@ -237,7 +239,7 @@ func (orch Orchestrator) EnqueueMissingEvents(

func (orch Orchestrator) ProcessNonces(
ctx context.Context,
noncesQueue <-chan uint64,
noncesQueue chan uint64,
signalChan chan<- struct{},
) error {
for {
Expand All @@ -252,14 +254,29 @@ func (orch Orchestrator) ProcessNonces(
if err := orch.Retrier.Retry(ctx, func() error {
return orch.Process(ctx, nonce)
}); err != nil {
close(signalChan)
return err
orch.Logger.Error("error processing nonce even after retrying", "err", err.Error())
go orch.MaybeRequeue(ctx, noncesQueue, nonce)
}
}
}
}
}

// MaybeRequeue requeue the nonce to be re-processed subsequently if it's recent.
func (orch Orchestrator) MaybeRequeue(ctx context.Context, noncesQueue chan<- uint64, nonce uint64) {
latestNonce, err := orch.AppQuerier.QueryLatestAttestationNonce(ctx)
if err != nil {
orch.Logger.Debug("error requeuing nonce", "nonce", nonce, "err", err.Error())
return
}
if latestNonce <= RequeueWindow || nonce >= latestNonce-RequeueWindow {
orch.Logger.Debug("requeuing nonce", "nonce", nonce)
noncesQueue <- nonce
} else {
orch.Logger.Debug("nonce is too old, will not retry it in the future", "nonce", nonce)
}
}

func (orch Orchestrator) Process(ctx context.Context, nonce uint64) error {
att, err := orch.AppQuerier.QueryAttestationByNonce(ctx, nonce)
if err != nil {
Expand Down

0 comments on commit e9a53f9

Please sign in to comment.