From 2afd9c259658f2a4d857493a92059bfd16608603 Mon Sep 17 00:00:00 2001 From: ferglor <19188060+ferglor@users.noreply.github.com> Date: Mon, 4 Sep 2023 19:16:03 +0100 Subject: [PATCH] Return all logs even if seen before (#10473) --- .../evm21/transmit/event_provider.go | 84 ++++++++++--------- 1 file changed, 43 insertions(+), 41 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/transmit/event_provider.go b/core/services/ocr2/plugins/ocr2keeper/evm21/transmit/event_provider.go index eccc9304d99..a21cdd25eb0 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/transmit/event_provider.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/transmit/event_provider.go @@ -170,52 +170,54 @@ func (c *TransmitEventProvider) processLogs(latestBlock int64, logs ...logpoller // ensure we don't have duplicates continue } - if _, ok := c.cache.get(ocr2keepers.BlockNumber(log.BlockNumber), k); ok { - // ensure we return only unseen logs - continue - } - l, err := c.parseLog(c.registry, log) - if err != nil { - c.logger.Debugw("failed to parse log", "err", err) - continue - } - id := l.Id() - upkeepId := &ocr2keepers.UpkeepIdentifier{} - ok := upkeepId.FromBigInt(id) + + transmitEvent, ok := c.cache.get(ocr2keepers.BlockNumber(log.BlockNumber), k) if !ok { - return nil, core.ErrInvalidUpkeepID + l, err := c.parseLog(c.registry, log) + if err != nil { + c.logger.Debugw("failed to parse log", "err", err) + continue + } + id := l.Id() + upkeepId := &ocr2keepers.UpkeepIdentifier{} + ok := upkeepId.FromBigInt(id) + if !ok { + return nil, core.ErrInvalidUpkeepID + } + triggerW, err := core.UnpackTrigger(id, l.Trigger()) + if err != nil { + return nil, fmt.Errorf("%w: failed to unpack trigger", err) + } + trigger := ocr2keepers.NewTrigger( + ocr2keepers.BlockNumber(triggerW.BlockNum), + triggerW.BlockHash, + ) + switch core.GetUpkeepType(*upkeepId) { + case ocr2keepers.LogTrigger: + trigger.LogTriggerExtension = &ocr2keepers.LogTriggerExtension{} + trigger.LogTriggerExtension.TxHash = triggerW.TxHash + trigger.LogTriggerExtension.Index = triggerW.LogIndex + default: + } + workID := core.UpkeepWorkID(*upkeepId, trigger) + transmitEvent = ocr2keepers.TransmitEvent{ + Type: l.TransmitEventType(), + TransmitBlock: ocr2keepers.BlockNumber(l.BlockNumber), + TransactionHash: l.TxHash, + WorkID: workID, + UpkeepID: *upkeepId, + CheckBlock: trigger.BlockNumber, + } } - triggerW, err := core.UnpackTrigger(id, l.Trigger()) - if err != nil { - return nil, fmt.Errorf("%w: failed to unpack trigger", err) - } - trigger := ocr2keepers.NewTrigger( - ocr2keepers.BlockNumber(triggerW.BlockNum), - triggerW.BlockHash, - ) - switch core.GetUpkeepType(*upkeepId) { - case ocr2keepers.LogTrigger: - trigger.LogTriggerExtension = &ocr2keepers.LogTriggerExtension{} - trigger.LogTriggerExtension.TxHash = triggerW.TxHash - trigger.LogTriggerExtension.Index = triggerW.LogIndex - default: - } - workID := core.UpkeepWorkID(*upkeepId, trigger) - e := ocr2keepers.TransmitEvent{ - Type: l.TransmitEventType(), - TransmitBlock: ocr2keepers.BlockNumber(l.BlockNumber), - Confirmations: latestBlock - l.BlockNumber, - TransactionHash: l.TxHash, - WorkID: workID, - UpkeepID: *upkeepId, - CheckBlock: trigger.BlockNumber, - } - vals = append(vals, e) - visited[k] = e + + transmitEvent.Confirmations = latestBlock - int64(transmitEvent.TransmitBlock) + + vals = append(vals, transmitEvent) + visited[k] = transmitEvent } // adding to the cache only after we've processed all the logs - // the next time we call processLogs we don't want to return these logs + // the next time we call processLogs we don't want to process these logs for k, e := range visited { c.cache.add(k, e) }