Skip to content

Commit

Permalink
Merge remote-tracking branch 'chainlink/develop' into main-repo-merge
Browse files Browse the repository at this point in the history
  • Loading branch information
mateusz-sekara committed Sep 29, 2023
2 parents 49016fb + 5233de3 commit e8bed46
Show file tree
Hide file tree
Showing 36 changed files with 1,878 additions and 314 deletions.
1 change: 1 addition & 0 deletions contracts/scripts/native_solc_compile_all_shared
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ compileContract () {

compileContract shared/token/ERC677/BurnMintERC677.sol
compileContract shared/token/ERC677/LinkToken.sol
compileContract shared/mocks/WERC20Mock.sol
compileContract vendor/openzeppelin-solidity/v4.8.0/contracts/token/ERC20/ERC20.sol
2 changes: 1 addition & 1 deletion core/chains/evm/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func newChain(ctx context.Context, cfg *evmconfig.ChainScoped, nodes []*toml.Nod
if opts.GenLogPoller != nil {
logPoller = opts.GenLogPoller(chainID)
} else {
logPoller = logpoller.NewObservedLogPoller(logpoller.NewORM(chainID, db, l, cfg.Database()), client, l, cfg.EVM().LogPollInterval(), int64(cfg.EVM().FinalityDepth()), int64(cfg.EVM().LogBackfillBatchSize()), int64(cfg.EVM().RPCDefaultBatchSize()), int64(cfg.EVM().LogKeepBlocksDepth()))
logPoller = logpoller.NewLogPoller(logpoller.NewObservedORM(chainID, db, l, cfg.Database()), client, l, cfg.EVM().LogPollInterval(), int64(cfg.EVM().FinalityDepth()), int64(cfg.EVM().LogBackfillBatchSize()), int64(cfg.EVM().RPCDefaultBatchSize()), int64(cfg.EVM().LogKeepBlocksDepth()))
}
}

Expand Down
2 changes: 1 addition & 1 deletion core/chains/evm/logpoller/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type TestHarness struct {
Lggr logger.Logger
// Chain2/ORM2 is just a dummy second chain, doesn't have a client.
ChainID, ChainID2 *big.Int
ORM, ORM2 *logpoller.ORM
ORM, ORM2 *logpoller.DbORM
LogPoller logpoller.LogPollerTest
Client *backends.SimulatedBackend
Owner *bind.TransactOpts
Expand Down
30 changes: 15 additions & 15 deletions core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ var (
type logPoller struct {
utils.StartStopOnce
ec Client
orm *ORM
orm ORM
lggr logger.Logger
pollPeriod time.Duration // poll period set by block production rate
finalityDepth int64 // finality depth is taken to mean that block (head - finality) is finalized
Expand Down Expand Up @@ -119,7 +119,7 @@ type logPoller struct {
//
// How fast that can be done depends largely on network speed and DB, but even for the fastest
// support chain, polygon, which has 2s block times, we need RPCs roughly with <= 500ms latency
func NewLogPoller(orm *ORM, ec Client, lggr logger.Logger, pollPeriod time.Duration,
func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, pollPeriod time.Duration,
finalityDepth int64, backfillBatchSize int64, rpcBatchSize int64, keepBlocksDepth int64) *logPoller {

return &logPoller{
Expand Down Expand Up @@ -676,7 +676,7 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error {
}

lp.lggr.Debugw("Backfill found logs", "from", from, "to", to, "logs", len(gethLogs), "blocks", blocks)
err = lp.orm.q.WithOpts(pg.WithParentCtx(ctx)).Transaction(func(tx pg.Queryer) error {
err = lp.orm.Q().WithOpts(pg.WithParentCtx(ctx)).Transaction(func(tx pg.Queryer) error {
return lp.orm.InsertLogs(convertLogs(gethLogs, blocks, lp.lggr, lp.ec.ConfiguredChainID()), pg.WithQueryer(tx))
})
if err != nil {
Expand Down Expand Up @@ -747,7 +747,7 @@ func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, curren
// the canonical set per read. Typically, if an application took action on a log
// it would be saved elsewhere e.g. evm.txes, so it seems better to just support the fast reads.
// Its also nicely analogous to reading from the chain itself.
err2 = lp.orm.q.WithOpts(pg.WithParentCtx(ctx)).Transaction(func(tx pg.Queryer) error {
err2 = lp.orm.Q().WithOpts(pg.WithParentCtx(ctx)).Transaction(func(tx pg.Queryer) error {
// These deletes are bounded by reorg depth, so they are
// fast and should not slow down the log readers.
err3 := lp.orm.DeleteBlocksAfter(blockAfterLCA.Number, pg.WithQueryer(tx))
Expand Down Expand Up @@ -844,7 +844,7 @@ func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int
return
}
lp.lggr.Debugw("Unfinalized log query", "logs", len(logs), "currentBlockNumber", currentBlockNumber, "blockHash", currentBlock.Hash, "timestamp", currentBlock.Timestamp.Unix())
err = lp.orm.q.WithOpts(pg.WithParentCtx(ctx)).Transaction(func(tx pg.Queryer) error {
err = lp.orm.Q().WithOpts(pg.WithParentCtx(ctx)).Transaction(func(tx pg.Queryer) error {
if err2 := lp.orm.InsertBlock(h, currentBlockNumber, currentBlock.Timestamp, pg.WithQueryer(tx)); err2 != nil {
return err2
}
Expand Down Expand Up @@ -937,11 +937,11 @@ func (lp *logPoller) pruneOldBlocks(ctx context.Context) error {
// Logs returns logs matching topics and address (exactly) in the given block range,
// which are canonical at time of query.
func (lp *logPoller) Logs(start, end int64, eventSig common.Hash, address common.Address, qopts ...pg.QOpt) ([]Log, error) {
return lp.orm.SelectLogsByBlockRangeFilter(start, end, address, eventSig, qopts...)
return lp.orm.SelectLogs(start, end, address, eventSig, qopts...)
}

func (lp *logPoller) LogsWithSigs(start, end int64, eventSigs []common.Hash, address common.Address, qopts ...pg.QOpt) ([]Log, error) {
return lp.orm.SelectLogsWithSigsByBlockRangeFilter(start, end, address, eventSigs, qopts...)
return lp.orm.SelectLogsWithSigs(start, end, address, eventSigs, qopts...)
}

func (lp *logPoller) LogsCreatedAfter(eventSig common.Hash, address common.Address, after time.Time, confs int, qopts ...pg.QOpt) ([]Log, error) {
Expand All @@ -955,7 +955,7 @@ func (lp *logPoller) IndexedLogs(eventSig common.Hash, address common.Address, t

// IndexedLogsByBlockRange finds all the logs that have a topic value in topicValues at index topicIndex within the block range
func (lp *logPoller) IndexedLogsByBlockRange(start, end int64, eventSig common.Hash, address common.Address, topicIndex int, topicValues []common.Hash, qopts ...pg.QOpt) ([]Log, error) {
return lp.orm.SelectIndexedLogsByBlockRangeFilter(start, end, address, eventSig, topicIndex, topicValues, qopts...)
return lp.orm.SelectIndexedLogsByBlockRange(start, end, address, eventSig, topicIndex, topicValues, qopts...)
}

func (lp *logPoller) IndexedLogsCreatedAfter(eventSig common.Hash, address common.Address, topicIndex int, topicValues []common.Hash, after time.Time, confs int, qopts ...pg.QOpt) ([]Log, error) {
Expand All @@ -968,28 +968,28 @@ func (lp *logPoller) IndexedLogsByTxHash(eventSig common.Hash, txHash common.Has

// LogsDataWordGreaterThan note index is 0 based.
func (lp *logPoller) LogsDataWordGreaterThan(eventSig common.Hash, address common.Address, wordIndex int, wordValueMin common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) {
return lp.orm.SelectDataWordGreaterThan(address, eventSig, wordIndex, wordValueMin, confs, qopts...)
return lp.orm.SelectLogsDataWordGreaterThan(address, eventSig, wordIndex, wordValueMin, confs, qopts...)
}

// LogsDataWordRange note index is 0 based.
func (lp *logPoller) LogsDataWordRange(eventSig common.Hash, address common.Address, wordIndex int, wordValueMin, wordValueMax common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) {
return lp.orm.SelectDataWordRange(address, eventSig, wordIndex, wordValueMin, wordValueMax, confs, qopts...)
return lp.orm.SelectLogsDataWordRange(address, eventSig, wordIndex, wordValueMin, wordValueMax, confs, qopts...)
}

// IndexedLogsTopicGreaterThan finds all the logs that have a topic value greater than topicValueMin at index topicIndex.
// Only works for integer topics.
func (lp *logPoller) IndexedLogsTopicGreaterThan(eventSig common.Hash, address common.Address, topicIndex int, topicValueMin common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) {
return lp.orm.SelectIndexLogsTopicGreaterThan(address, eventSig, topicIndex, topicValueMin, confs, qopts...)
return lp.orm.SelectIndexedLogsTopicGreaterThan(address, eventSig, topicIndex, topicValueMin, confs, qopts...)
}

// LogsUntilBlockHashDataWordGreaterThan note index is 0 based.
// If the blockhash is not found (i.e. a stale fork) it will error.
func (lp *logPoller) LogsUntilBlockHashDataWordGreaterThan(eventSig common.Hash, address common.Address, wordIndex int, wordValueMin common.Hash, untilBlockHash common.Hash, qopts ...pg.QOpt) ([]Log, error) {
return lp.orm.SelectUntilBlockHashDataWordGreaterThan(address, eventSig, wordIndex, wordValueMin, untilBlockHash, qopts...)
return lp.orm.SelectLogsUntilBlockHashDataWordGreaterThan(address, eventSig, wordIndex, wordValueMin, untilBlockHash, qopts...)
}

func (lp *logPoller) IndexedLogsTopicRange(eventSig common.Hash, address common.Address, topicIndex int, topicValueMin common.Hash, topicValueMax common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) {
return lp.orm.SelectIndexLogsTopicRange(address, eventSig, topicIndex, topicValueMin, topicValueMax, confs, qopts...)
return lp.orm.SelectIndexedLogsTopicRange(address, eventSig, topicIndex, topicValueMin, topicValueMax, confs, qopts...)
}

// LatestBlock returns the latest block the log poller is on. It tracks blocks to be able
Expand All @@ -1009,15 +1009,15 @@ func (lp *logPoller) BlockByNumber(n int64, qopts ...pg.QOpt) (*LogPollerBlock,

// LatestLogByEventSigWithConfs finds the latest log that has confs number of blocks on top of the log.
func (lp *logPoller) LatestLogByEventSigWithConfs(eventSig common.Hash, address common.Address, confs int, qopts ...pg.QOpt) (*Log, error) {
return lp.orm.SelectLatestLogEventSigWithConfs(eventSig, address, confs, qopts...)
return lp.orm.SelectLatestLogByEventSigWithConfs(eventSig, address, confs, qopts...)
}

func (lp *logPoller) LatestLogEventSigsAddrsWithConfs(fromBlock int64, eventSigs []common.Hash, addresses []common.Address, confs int, qopts ...pg.QOpt) ([]Log, error) {
return lp.orm.SelectLatestLogEventSigsAddrsWithConfs(fromBlock, addresses, eventSigs, confs, qopts...)
}

func (lp *logPoller) LatestBlockByEventSigsAddrsWithConfs(fromBlock int64, eventSigs []common.Hash, addresses []common.Address, confs int, qopts ...pg.QOpt) (int64, error) {
return lp.orm.SelectLatestBlockNumberEventSigsAddrsWithConfs(fromBlock, eventSigs, addresses, confs, qopts...)
return lp.orm.SelectLatestBlockByEventSigsAddrsWithConfs(fromBlock, eventSigs, addresses, confs, qopts...)
}

// GetBlocksRange tries to get the specified block numbers from the log pollers
Expand Down
2 changes: 1 addition & 1 deletion core/chains/evm/logpoller/log_poller_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ var (
)

// Validate that filters stored in log_filters_table match the filters stored in memory
func validateFiltersTable(t *testing.T, lp *logPoller, orm *ORM) {
func validateFiltersTable(t *testing.T, lp *logPoller, orm *DbORM) {
filters, err := orm.LoadFilters()
require.NoError(t, err)
require.Equal(t, len(filters), len(lp.filters))
Expand Down
8 changes: 4 additions & 4 deletions core/chains/evm/logpoller/log_poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func logRuntime(t testing.TB, start time.Time) {
t.Log("runtime", time.Since(start))
}

func populateDatabase(t testing.TB, o *logpoller.ORM, chainID *big.Int) (common.Hash, common.Address, common.Address) {
func populateDatabase(t testing.TB, o *logpoller.DbORM, chainID *big.Int) (common.Hash, common.Address, common.Address) {
event1 := EmitterABI.Events["Log1"].ID
address1 := common.HexToAddress("0x2ab9a2Dc53736b361b72d900CdF9F78F9406fbbb")
address2 := common.HexToAddress("0x6E225058950f237371261C985Db6bDe26df2200E")
Expand Down Expand Up @@ -110,7 +110,7 @@ func TestPopulateLoadedDB(t *testing.T) {

func() {
defer logRuntime(t, time.Now())
_, err1 := o.SelectLogsByBlockRangeFilter(750000, 800000, address1, event1)
_, err1 := o.SelectLogs(750000, 800000, address1, event1)
require.NoError(t, err1)
}()
func() {
Expand All @@ -123,7 +123,7 @@ func TestPopulateLoadedDB(t *testing.T) {
require.NoError(t, o.InsertBlock(common.HexToHash("0x10"), 1000000, time.Now()))
func() {
defer logRuntime(t, time.Now())
lgs, err1 := o.SelectDataWordRange(address1, event1, 0, logpoller.EvmWord(500000), logpoller.EvmWord(500020), 0)
lgs, err1 := o.SelectLogsDataWordRange(address1, event1, 0, logpoller.EvmWord(500000), logpoller.EvmWord(500020), 0)
require.NoError(t, err1)
// 10 since every other log is for address1
assert.Equal(t, 10, len(lgs))
Expand All @@ -138,7 +138,7 @@ func TestPopulateLoadedDB(t *testing.T) {

func() {
defer logRuntime(t, time.Now())
lgs, err1 := o.SelectIndexLogsTopicRange(address1, event1, 1, logpoller.EvmWord(500000), logpoller.EvmWord(500020), 0)
lgs, err1 := o.SelectIndexedLogsTopicRange(address1, event1, 1, logpoller.EvmWord(500000), logpoller.EvmWord(500020), 0)
require.NoError(t, err1)
assert.Equal(t, 10, len(lgs))
}()
Expand Down
Loading

0 comments on commit e8bed46

Please sign in to comment.