diff --git a/cmd/blobstream/orchestrator/cmd.go b/cmd/blobstream/orchestrator/cmd.go
index bf4ea780..4d549f8a 100644
--- a/cmd/blobstream/orchestrator/cmd.go
+++ b/cmd/blobstream/orchestrator/cmd.go
@@ -122,7 +122,7 @@ func Start() *cobra.Command {
 			// creating the p2p querier
 			p2pQuerier := p2p.NewQuerier(dht, logger)
 
-			retrier := helpers.NewRetrier(logger, 6, time.Minute)
+			retrier := helpers.NewRetrier(logger, 5, 30*time.Second)
 
 			// creating the broadcaster
 			broadcaster := orchestrator.NewBroadcaster(p2pQuerier.BlobstreamDHT)
diff --git a/e2e/scripts/start_orchestrator_after_validator_created.sh b/e2e/scripts/start_orchestrator_after_validator_created.sh
index 8f8fe0bb..5188bdd3 100644
--- a/e2e/scripts/start_orchestrator_after_validator_created.sh
+++ b/e2e/scripts/start_orchestrator_after_validator_created.sh
@@ -54,7 +54,8 @@ then
     --grpc.insecure \
     --p2p.nickname=key \
     --p2p.listen-addr="${P2P_LISTEN}" \
-    --evm.passphrase=123
+    --evm.passphrase=123 \
+    --log.level debug
 else
   # to give time for the bootstrappers to be up
   sleep 5s
@@ -66,5 +67,6 @@
     --grpc.insecure \
     --p2p.listen-addr="${P2P_LISTEN}" \
     --p2p.bootstrappers="${P2P_BOOTSTRAPPERS}" \
-    --evm.passphrase=123
+    --evm.passphrase=123 \
+    --log.level debug
 fi
diff --git a/helpers/retrier.go b/helpers/retrier.go
index 3819fa44..9a05e82a 100644
--- a/helpers/retrier.go
+++ b/helpers/retrier.go
@@ -37,13 +37,13 @@ func (r Retrier) Retry(ctx context.Context, retryMethod func() error) error {
 		case <-ctx.Done():
			return ctx.Err()
 		case <-nextTick.C:
-			r.logger.Info("retrying", "retry_number", i, "retries_left", r.retriesNumber-i)
+			r.logger.Debug("retrying", "retry_number", i, "retries_left", r.retriesNumber-i)
 			err = retryMethod()
 			if err == nil {
 				r.logger.Info("succeeded", "retries_number", i)
 				return nil
 			}
-			r.logger.Error("failed attempt", "retry", i, "err", err)
+			r.logger.Debug("failed attempt", "retry", i, "err", err)
 		}
 	}
 	return err
diff --git a/orchestrator/orchestrator.go b/orchestrator/orchestrator.go
index 51966b72..6da2989a 100644
--- a/orchestrator/orchestrator.go
+++ b/orchestrator/orchestrator.go
@@ -27,6 +27,10 @@ import (
 	coretypes "github.com/tendermint/tendermint/types"
 )
 
+// RequeueWindow is the number of nonces that we want to re-enqueue if we can't process them even after retrying.
+// After this window has elapsed, the nonce is discarded.
+const RequeueWindow = 50
+
 type Orchestrator struct {
 	Logger tmlog.Logger // maybe use a more general interface
 
@@ -70,18 +74,16 @@ func (orch Orchestrator) Start(ctx context.Context) {
 	// used to send a signal when the nonces processor wants to notify the nonces enqueuing services to stop.
 	signalChan := make(chan struct{})
 
-	withCancel, cancel := context.WithCancel(ctx)
-
 	wg := &sync.WaitGroup{}
 
 	// go routine to listen for new attestation nonces
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		err := orch.StartNewEventsListener(withCancel, noncesQueue, signalChan)
+		err := orch.StartNewEventsListener(ctx, noncesQueue, signalChan)
 		if err != nil {
 			orch.Logger.Error("error listening to new attestations", "err", err)
-			cancel()
+			return
 		}
 		orch.Logger.Info("stopping listening to new attestations")
 	}()
@@ -90,10 +92,10 @@ func (orch Orchestrator) Start(ctx context.Context) {
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		err := orch.ProcessNonces(withCancel, noncesQueue, signalChan)
+		err := orch.ProcessNonces(ctx, noncesQueue, signalChan)
 		if err != nil {
 			orch.Logger.Error("error processing attestations", "err", err)
-			cancel()
+			return
 		}
 		orch.Logger.Info("stopping processing attestations")
 	}()
@@ -102,10 +104,10 @@ func (orch Orchestrator) Start(ctx context.Context) {
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		err := orch.EnqueueMissingEvents(withCancel, noncesQueue, signalChan)
+		err := orch.EnqueueMissingEvents(ctx, noncesQueue, signalChan)
 		if err != nil {
 			orch.Logger.Error("error enqueuing missing attestations", "err", err)
-			cancel()
+			return
 		}
 	}()
 
@@ -241,7 +243,7 @@ func (orch Orchestrator) EnqueueMissingEvents(
 
 func (orch Orchestrator) ProcessNonces(
 	ctx context.Context,
-	noncesQueue <-chan uint64,
+	noncesQueue chan uint64,
 	signalChan chan<- struct{},
 ) error {
 	for {
@@ -256,14 +258,29 @@ func (orch Orchestrator) ProcessNonces(
 				if err := orch.Retrier.Retry(ctx, func() error {
 					return orch.Process(ctx, nonce)
 				}); err != nil {
-					close(signalChan)
-					return err
+					orch.Logger.Error("error processing nonce even after retrying", "err", err.Error())
+					go orch.MaybeRequeue(ctx, noncesQueue, nonce)
 				}
 			}
 		}
 	}
 }
 
+// MaybeRequeue requeues the nonce to be re-processed later if it's still recent.
+func (orch Orchestrator) MaybeRequeue(ctx context.Context, noncesQueue chan<- uint64, nonce uint64) {
+	latestNonce, err := orch.AppQuerier.QueryLatestAttestationNonce(ctx)
+	if err != nil {
+		orch.Logger.Debug("error requeuing nonce", "nonce", nonce, "err", err.Error())
+		return
+	}
+	if latestNonce <= RequeueWindow || nonce >= latestNonce-RequeueWindow {
+		orch.Logger.Debug("requeuing nonce", "nonce", nonce)
+		noncesQueue <- nonce
+	} else {
+		orch.Logger.Debug("nonce is too old, will not retry it in the future", "nonce", nonce)
+	}
+}
+
 func (orch Orchestrator) Process(ctx context.Context, nonce uint64) error {
 	att, err := orch.AppQuerier.QueryAttestationByNonce(ctx, nonce)
 	if err != nil {
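
For illustration only, not part of the patch: a minimal, self-contained Go sketch of the requeue condition the new MaybeRequeue method applies, assuming RequeueWindow = 50 as introduced in orchestrator/orchestrator.go. The helper name shouldRequeue and the sample nonce values are hypothetical; the check itself mirrors the added code, so a failed nonce is put back on the queue only while it is within RequeueWindow of the latest attestation nonce, and is dropped otherwise.

package main

import "fmt"

// RequeueWindow mirrors the constant added in orchestrator/orchestrator.go.
const RequeueWindow = 50

// shouldRequeue is a hypothetical helper reproducing the condition used by
// MaybeRequeue: requeue while the chain still has fewer nonces than the
// window, or while the failed nonce is within the window of the latest one.
// The first clause also keeps the unsigned subtraction below from wrapping
// around when fewer than RequeueWindow attestations exist.
func shouldRequeue(latestNonce, nonce uint64) bool {
	return latestNonce <= RequeueWindow || nonce >= latestNonce-RequeueWindow
}

func main() {
	fmt.Println(shouldRequeue(30, 2))    // true: fewer nonces than the window exist yet
	fmt.Println(shouldRequeue(500, 460)) // true: within the last 50 nonces
	fmt.Println(shouldRequeue(500, 400)) // false: too old, dropped instead of requeued
}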