Skip to content

Commit

Permalink
Return all logs even if seen before (#10473)
Browse files Browse the repository at this point in the history
  • Loading branch information
ferglor authored Sep 4, 2023
1 parent 200970a commit 2afd9c2
Showing 1 changed file with 43 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,52 +170,54 @@ func (c *TransmitEventProvider) processLogs(latestBlock int64, logs ...logpoller
// ensure we don't have duplicates
continue
}
if _, ok := c.cache.get(ocr2keepers.BlockNumber(log.BlockNumber), k); ok {
// ensure we return only unseen logs
continue
}
l, err := c.parseLog(c.registry, log)
if err != nil {
c.logger.Debugw("failed to parse log", "err", err)
continue
}
id := l.Id()
upkeepId := &ocr2keepers.UpkeepIdentifier{}
ok := upkeepId.FromBigInt(id)

transmitEvent, ok := c.cache.get(ocr2keepers.BlockNumber(log.BlockNumber), k)
if !ok {
return nil, core.ErrInvalidUpkeepID
l, err := c.parseLog(c.registry, log)
if err != nil {
c.logger.Debugw("failed to parse log", "err", err)
continue
}
id := l.Id()
upkeepId := &ocr2keepers.UpkeepIdentifier{}
ok := upkeepId.FromBigInt(id)
if !ok {
return nil, core.ErrInvalidUpkeepID
}
triggerW, err := core.UnpackTrigger(id, l.Trigger())
if err != nil {
return nil, fmt.Errorf("%w: failed to unpack trigger", err)
}
trigger := ocr2keepers.NewTrigger(
ocr2keepers.BlockNumber(triggerW.BlockNum),
triggerW.BlockHash,
)
switch core.GetUpkeepType(*upkeepId) {
case ocr2keepers.LogTrigger:
trigger.LogTriggerExtension = &ocr2keepers.LogTriggerExtension{}
trigger.LogTriggerExtension.TxHash = triggerW.TxHash
trigger.LogTriggerExtension.Index = triggerW.LogIndex
default:
}
workID := core.UpkeepWorkID(*upkeepId, trigger)
transmitEvent = ocr2keepers.TransmitEvent{
Type: l.TransmitEventType(),
TransmitBlock: ocr2keepers.BlockNumber(l.BlockNumber),
TransactionHash: l.TxHash,
WorkID: workID,
UpkeepID: *upkeepId,
CheckBlock: trigger.BlockNumber,
}
}
triggerW, err := core.UnpackTrigger(id, l.Trigger())
if err != nil {
return nil, fmt.Errorf("%w: failed to unpack trigger", err)
}
trigger := ocr2keepers.NewTrigger(
ocr2keepers.BlockNumber(triggerW.BlockNum),
triggerW.BlockHash,
)
switch core.GetUpkeepType(*upkeepId) {
case ocr2keepers.LogTrigger:
trigger.LogTriggerExtension = &ocr2keepers.LogTriggerExtension{}
trigger.LogTriggerExtension.TxHash = triggerW.TxHash
trigger.LogTriggerExtension.Index = triggerW.LogIndex
default:
}
workID := core.UpkeepWorkID(*upkeepId, trigger)
e := ocr2keepers.TransmitEvent{
Type: l.TransmitEventType(),
TransmitBlock: ocr2keepers.BlockNumber(l.BlockNumber),
Confirmations: latestBlock - l.BlockNumber,
TransactionHash: l.TxHash,
WorkID: workID,
UpkeepID: *upkeepId,
CheckBlock: trigger.BlockNumber,
}
vals = append(vals, e)
visited[k] = e

transmitEvent.Confirmations = latestBlock - int64(transmitEvent.TransmitBlock)

vals = append(vals, transmitEvent)
visited[k] = transmitEvent
}

// adding to the cache only after we've processed all the logs
// the next time we call processLogs we don't want to return these logs
// the next time we call processLogs we don't want to process these logs
for k, e := range visited {
c.cache.add(k, e)
}
Expand Down

0 comments on commit 2afd9c2

Please sign in to comment.