Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CCIP-1716 Adding retention to filters used by LogPoller #530

Merged
merged 10 commits into from
Mar 20, 2024
19 changes: 19 additions & 0 deletions core/services/ocr2/plugins/ccip/internal/ccipdata/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ccipdata

import (
"fmt"
"time"

"github.com/ethereum/go-ethereum/core/types"

Expand All @@ -18,6 +19,24 @@ const (
V1_5_0 = "1.5.0-dev"
)

const (
// CommitExecLogsRetention defines the duration for which logs critical for Commit/Exec plugins processing are retained.
// Although Exec relies on permissionlessExecThreshold which is lower than 24hours for picking eligible CommitRoots,
// Commit still can reach to older logs because it filters them by sequence numbers. For instance, in case of RMN curse on chain,
// we might have logs waiting in OnRamp to be committed first. When outage takes days we still would
// be able to bring back processing without replaying any logs from chain. You can read that param as
// "how long CCIP can be down and still be able to process all the messages after getting back to life".
// Breaching this threshold would require replaying chain using LogPoller from the beginning of the outage.
CommitExecLogsRetention = 30 * 24 * time.Hour // 30 days
mateusz-sekara marked this conversation as resolved.
Show resolved Hide resolved
// CacheEvictionLogsRetention defines the duration for which logs used for caching on-chain data are kept.
// Restarting node clears the cache entirely and rebuilds it from scratch by fetching data from chain,
// so we don't need to keep these logs for very long. All events relying on cache.NewLogpollerEventsBased should use this retention.
CacheEvictionLogsRetention = 7 * 24 * time.Hour // 7 days
// PriceUpdatesLogsRetention defines the duration for which logs with price updates are kept.
// These logs are emitted whenever the token price or gas price is updated and Commit scans very small time windows (e.g. 2 hours)
PriceUpdatesLogsRetention = 1 * 24 * time.Hour // 1 day
)

type Event[T any] struct {
Data T
cciptypes.TxMeta
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func NewUSDCReader(lggr logger.Logger, jobID string, transmitter common.Address,
Name: logpoller.FilterName(MESSAGE_SENT_FILTER_NAME, jobID, transmitter.Hex()),
EventSigs: []common.Hash{eventSig},
Addresses: []common.Address{transmitter},
Retention: CommitExecLogsRetention,
},
transmitterAddress: transmitter,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/pkg/errors"

"github.com/smartcontractkit/chainlink-common/pkg/config"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
Expand Down Expand Up @@ -386,6 +387,7 @@ func NewCommitStore(lggr logger.Logger, addr common.Address, ec client.Client, l
Name: logpoller.FilterName(EXEC_REPORT_ACCEPTS, addr.String()),
EventSigs: []common.Hash{eventSig},
Addresses: []common.Address{addr},
Retention: ccipdata.CommitExecLogsRetention,
},
}
return &CommitStore{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -607,10 +607,6 @@ func (o *OffRamp) DecodeExecutionReport(report []byte) (cciptypes.ExecReport, er
return DecodeExecReport(o.ExecutionReportArgs, report)
}

func (o *OffRamp) TokenEvents() []common.Hash {
return offRamp_poolAddedPoolRemovedEvents
}

func (o *OffRamp) RegisterFilters(qopts ...pg.QOpt) error {
return logpollerutil.RegisterLpFilters(o.lp, o.filters, qopts...)
}
Expand All @@ -628,16 +624,19 @@ func NewOffRamp(lggr logger.Logger, addr common.Address, ec client.Client, lp lo
Name: logpoller.FilterName(EXEC_EXECUTION_STATE_CHANGES, addr.String()),
EventSigs: []common.Hash{ExecutionStateChangedEvent},
Addresses: []common.Address{addr},
Retention: ccipdata.CommitExecLogsRetention,
},
{
Name: logpoller.FilterName(EXEC_TOKEN_POOL_ADDED, addr.String()),
EventSigs: []common.Hash{PoolAddedEvent},
Addresses: []common.Address{addr},
Retention: ccipdata.CacheEvictionLogsRetention,
},
{
Name: logpoller.FilterName(EXEC_TOKEN_POOL_REMOVED, addr.String()),
EventSigs: []common.Hash{PoolRemovedEvent},
Addresses: []common.Address{addr},
Retention: ccipdata.CacheEvictionLogsRetention,
},
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ func NewOnRamp(lggr logger.Logger, sourceSelector, destSelector uint64, onRampAd
Name: logpoller.FilterName(ccipdata.COMMIT_CCIP_SENDS, onRampAddress),
EventSigs: []common.Hash{eventSig},
Addresses: []common.Address{onRampAddress},
Retention: ccipdata.CommitExecLogsRetention,
},
}
return &OnRamp{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,19 @@ func NewPriceRegistry(lggr logger.Logger, priceRegistryAddr common.Address, lp l
Name: logpoller.FilterName(ccipdata.COMMIT_PRICE_UPDATES, priceRegistryAddr.String()),
EventSigs: []common.Hash{UsdPerUnitGasUpdated, usdPerTokenUpdated},
Addresses: []common.Address{priceRegistryAddr},
Retention: ccipdata.PriceUpdatesLogsRetention,
},
{
Name: logpoller.FilterName(ccipdata.FEE_TOKEN_ADDED, priceRegistryAddr.String()),
EventSigs: []common.Hash{feeTokenAdded},
Addresses: []common.Address{priceRegistryAddr},
Retention: ccipdata.CacheEvictionLogsRetention,
},
{
Name: logpoller.FilterName(ccipdata.FEE_TOKEN_REMOVED, priceRegistryAddr.String()),
EventSigs: []common.Hash{feeTokenRemoved},
Addresses: []common.Address{priceRegistryAddr},
Retention: ccipdata.CacheEvictionLogsRetention,
}}
if registerFilters {
err = logpollerutil.RegisterLpFilters(lp, filters)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/pkg/errors"

"github.com/smartcontractkit/chainlink-common/pkg/config"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
Expand Down Expand Up @@ -412,6 +413,7 @@ func NewCommitStore(lggr logger.Logger, addr common.Address, ec client.Client, l
Name: logpoller.FilterName(v1_0_0.EXEC_REPORT_ACCEPTS, addr.String()),
EventSigs: []common.Hash{eventSig},
Addresses: []common.Address{addr},
Retention: ccipdata.CommitExecLogsRetention,
},
}
lggr.Infow("Initializing CommitStore with estimator", "estimator", estimator)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ func NewOnRamp(lggr logger.Logger, sourceSelector, destSelector uint64, onRampAd
Name: logpoller.FilterName(ccipdata.COMMIT_CCIP_SENDS, onRampAddress),
EventSigs: []common.Hash{CCIPSendRequestEventSig},
Addresses: []common.Address{onRampAddress},
Retention: ccipdata.CommitExecLogsRetention,
},
}
return &OnRamp{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ func NewOnRamp(lggr logger.Logger, sourceSelector, destSelector uint64, onRampAd
Name: logpoller.FilterName(ccipdata.COMMIT_CCIP_SENDS, onRampAddress),
EventSigs: []common.Hash{CCIPSendRequestEventSig},
Addresses: []common.Address{onRampAddress},
Retention: ccipdata.CommitExecLogsRetention,
},
}
return &OnRamp{
Expand Down
Loading