diff --git a/core/services/relay/evm/mercury/transmitter.go b/core/services/relay/evm/mercury/transmitter.go index 88c3113abc6..557210e58a5 100644 --- a/core/services/relay/evm/mercury/transmitter.go +++ b/core/services/relay/evm/mercury/transmitter.go @@ -33,6 +33,7 @@ import ( var ( maxTransmitQueueSize = 10_000 + maxDeleteQueueSize = 10_000 transmitTimeout = 5 * time.Second ) @@ -60,6 +61,24 @@ var ( }, []string{"feedID"}, ) + transmitQueueDeleteErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "mercury_transmit_queue_delete_error_count", + Help: "Running count of DB errors when trying to delete an item from the queue DB", + }, + []string{"feedID"}, + ) + transmitQueueInsertErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "mercury_transmit_queue_insert_error_count", + Help: "Running count of DB errors when trying to insert an item into the queue DB", + }, + []string{"feedID"}, + ) + transmitQueuePushErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "mercury_transmit_queue_push_error_count", + Help: "Running count of DB errors when trying to push an item onto the queue", + }, + []string{"feedID"}, + ) transmitServerErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "mercury_transmit_server_error_count", Help: "Number of errored transmissions that failed due to an error returned by the mercury server", @@ -99,9 +118,14 @@ type mercuryTransmitter struct { queue *TransmitQueue wg sync.WaitGroup - transmitSuccessCount prometheus.Counter - transmitDuplicateCount prometheus.Counter - transmitConnectionErrorCount prometheus.Counter + deleteQueue chan *pb.TransmitRequest + + transmitSuccessCount prometheus.Counter + transmitDuplicateCount prometheus.Counter + transmitConnectionErrorCount prometheus.Counter + transmitQueueDeleteErrorCount prometheus.Counter + transmitQueueInsertErrorCount prometheus.Counter + transmitQueuePushErrorCount prometheus.Counter } var PayloadTypes = getPayloadTypes() @@ -139,9 
+163,13 @@ func NewTransmitter(lggr logger.Logger, cfgTracker ConfigTracker, rpcClient wsrp make(chan (struct{})), nil, sync.WaitGroup{}, + make(chan *pb.TransmitRequest, maxDeleteQueueSize), transmitSuccessCount.WithLabelValues(feedIDHex), transmitDuplicateCount.WithLabelValues(feedIDHex), transmitConnectionErrorCount.WithLabelValues(feedIDHex), + transmitQueueDeleteErrorCount.WithLabelValues(feedIDHex), + transmitQueueInsertErrorCount.WithLabelValues(feedIDHex), + transmitQueuePushErrorCount.WithLabelValues(feedIDHex), } } @@ -164,6 +192,8 @@ func (mt *mercuryTransmitter) Start(ctx context.Context) (err error) { return err } mt.wg.Add(1) + go mt.runDeleteQueueLoop() + mt.wg.Add(1) go mt.runQueueLoop() return nil }) @@ -192,6 +222,46 @@ func (mt *mercuryTransmitter) HealthReport() map[string]error { return report } +func (mt *mercuryTransmitter) runDeleteQueueLoop() { + defer mt.wg.Done() + runloopCtx, cancel := mt.stopCh.Ctx(context.Background()) + defer cancel() + + // Exponential backoff for very rarely occurring errors (DB disconnect etc) + b := backoff.Backoff{ + Min: 1 * time.Second, + Max: 120 * time.Second, + Factor: 2, + Jitter: true, + } + + for { + select { + case req := <-mt.deleteQueue: + for { + if err := mt.persistenceManager.Delete(runloopCtx, req); err != nil { + mt.lggr.Errorw("Failed to delete transmit request record", "error", err, "req", req) + mt.transmitQueueDeleteErrorCount.Inc() + select { + case <-time.After(b.Duration()): + // Wait a backoff duration before trying to delete again + continue + case <-mt.stopCh: + // abort and return immediately on stop even if items remain in queue + return + } + } + break + } + // success + b.Reset() + case <-mt.stopCh: + // abort and return immediately on stop even if items remain in queue + return + } + } +} + func (mt *mercuryTransmitter) runQueueLoop() { defer mt.wg.Done() // Exponential backoff with very short retry interval (since latency is a priority) @@ -253,9 +323,10 @@ func (mt 
*mercuryTransmitter) runQueueLoop() { } } - if err := mt.persistenceManager.Delete(runloopCtx, t.Req); err != nil { - mt.lggr.Errorw("Failed to delete transmit request record", "error", err, "reportCtx", t.ReportCtx) - return + select { + case mt.deleteQueue <- t.Req: + default: + mt.lggr.Criticalw("Delete queue is full", "reportCtx", t.ReportCtx) } } } @@ -288,9 +359,11 @@ func (mt *mercuryTransmitter) Transmit(ctx context.Context, reportCtx ocrtypes.R mt.lggr.Tracew("Transmit enqueue", "req", req, "report", report, "reportCtx", reportCtx, "signatures", signatures) if err := mt.persistenceManager.Insert(ctx, req, reportCtx); err != nil { + mt.transmitQueueInsertErrorCount.Inc() return err } if ok := mt.queue.Push(req, reportCtx); !ok { + mt.transmitQueuePushErrorCount.Inc() return errors.New("transmit queue is closed") } return nil diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index a9f9d080f42..a10f9dd1c6d 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -12,6 +12,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Added a new, optional WebServer authentication option that supports LDAP as a user identity provider. This enables user login access and user roles to be managed and provisioned via a centralized remote server that supports the LDAP protocol, which can be helpful when running multiple nodes. See the documentation for more information and config setup instructions. There is a new `[WebServer].AuthenticationMethod` config option, when set to `ldap` requires the new `[WebServer.LDAP]` config section to be defined, see the reference `docs/core.toml`. +- New Prometheus metrics for mercury: + `mercury_transmit_queue_delete_error_count` + `mercury_transmit_queue_insert_error_count` + `mercury_transmit_queue_push_error_count` + Node operators (NOPs) should consider alerting on these. ### Changed