From 2e668372ac54e71fd357feba427ffacf0613bda2 Mon Sep 17 00:00:00 2001 From: Chunkai Yang Date: Wed, 8 May 2024 09:47:00 -0400 Subject: [PATCH] CCIP price cache use DB timestamp (#13133) * use statement timestamp * expiration uses db timestamp * changeset --- .changeset/poor-gorillas-give.md | 5 +++++ core/services/ccip/mocks/orm.go | 22 ++++++++++------------ core/services/ccip/orm.go | 26 ++++++++++++-------------- core/services/ccip/orm_test.go | 22 ++++++++++++---------- 4 files changed, 39 insertions(+), 36 deletions(-) create mode 100644 .changeset/poor-gorillas-give.md diff --git a/.changeset/poor-gorillas-give.md b/.changeset/poor-gorillas-give.md new file mode 100644 index 00000000000..67128f654bf --- /dev/null +++ b/.changeset/poor-gorillas-give.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +#changed CCIP price cache to use DB timestamp diff --git a/core/services/ccip/mocks/orm.go b/core/services/ccip/mocks/orm.go index b9afc6c8695..ea6b07dc637 100644 --- a/core/services/ccip/mocks/orm.go +++ b/core/services/ccip/mocks/orm.go @@ -8,8 +8,6 @@ import ( ccip "github.com/smartcontractkit/chainlink/v2/core/services/ccip" mock "github.com/stretchr/testify/mock" - - time "time" ) // ORM is an autogenerated mock type for the ORM type @@ -17,17 +15,17 @@ type ORM struct { mock.Mock } -// ClearGasPricesByDestChain provides a mock function with given fields: ctx, destChainSelector, to -func (_m *ORM) ClearGasPricesByDestChain(ctx context.Context, destChainSelector uint64, to time.Time) error { - ret := _m.Called(ctx, destChainSelector, to) +// ClearGasPricesByDestChain provides a mock function with given fields: ctx, destChainSelector, expireSec +func (_m *ORM) ClearGasPricesByDestChain(ctx context.Context, destChainSelector uint64, expireSec int) error { + ret := _m.Called(ctx, destChainSelector, expireSec) if len(ret) == 0 { panic("no return value specified for ClearGasPricesByDestChain") } var r0 error - if rf, ok := ret.Get(0).(func(context.Context, uint64, time.Time) error); ok { - r0 = rf(ctx, destChainSelector, to) + if rf, ok := ret.Get(0).(func(context.Context, uint64, int) error); ok { + r0 = rf(ctx, destChainSelector, expireSec) } else { r0 = ret.Error(0) } @@ -35,17 +33,17 @@ func (_m *ORM) ClearGasPricesByDestChain(ctx context.Context, destChainSelector return r0 } -// ClearTokenPricesByDestChain provides a mock function with given fields: ctx, destChainSelector, to -func (_m *ORM) ClearTokenPricesByDestChain(ctx context.Context, destChainSelector uint64, to time.Time) error { - ret := _m.Called(ctx, destChainSelector, to) +// ClearTokenPricesByDestChain provides a mock function with given fields: ctx, destChainSelector, expireSec +func (_m *ORM) ClearTokenPricesByDestChain(ctx context.Context, destChainSelector uint64, expireSec int) error { + ret := _m.Called(ctx, destChainSelector, expireSec) if len(ret) == 0 { panic("no return value specified for ClearTokenPricesByDestChain") } var r0 error - if rf, ok := ret.Get(0).(func(context.Context, uint64, time.Time) error); ok { - r0 = rf(ctx, destChainSelector, to) + if rf, ok := ret.Get(0).(func(context.Context, uint64, int) error); ok { + r0 = rf(ctx, destChainSelector, expireSec) } else { r0 = ret.Error(0) } diff --git a/core/services/ccip/orm.go b/core/services/ccip/orm.go index 8af7762b18d..6c21520d435 100644 --- a/core/services/ccip/orm.go +++ b/core/services/ccip/orm.go @@ -40,8 +40,8 @@ type ORM interface { InsertGasPricesForDestChain(ctx context.Context, destChainSelector uint64, jobId int32, gasPrices []GasPriceUpdate) error InsertTokenPricesForDestChain(ctx context.Context, destChainSelector uint64, jobId int32, tokenPrices []TokenPriceUpdate) error - ClearGasPricesByDestChain(ctx context.Context, destChainSelector uint64, to time.Time) error - ClearTokenPricesByDestChain(ctx context.Context, destChainSelector uint64, to time.Time) error + ClearGasPricesByDestChain(ctx context.Context, destChainSelector uint64, expireSec int) error + ClearTokenPricesByDestChain(ctx context.Context, destChainSelector uint64, expireSec int) error } type orm struct { @@ -99,7 +99,6 @@ func (o *orm) InsertGasPricesForDestChain(ctx context.Context, destChainSelector return nil } - now := time.Now() insertData := make([]map[string]interface{}, 0, len(gasPrices)) for _, price := range gasPrices { insertData = append(insertData, map[string]interface{}{ @@ -107,12 +106,12 @@ func (o *orm) InsertGasPricesForDestChain(ctx context.Context, destChainSelector "job_id": jobId, "source_chain_selector": price.SourceChainSelector, "gas_price": price.GasPrice, - "created_at": now, }) } + // using statement_timestamp() to make testing easier stmt := `INSERT INTO ccip.observed_gas_prices (chain_selector, job_id, source_chain_selector, gas_price, created_at) - VALUES (:chain_selector, :job_id, :source_chain_selector, :gas_price, :created_at);` + VALUES (:chain_selector, :job_id, :source_chain_selector, :gas_price, statement_timestamp());` _, err := o.ds.NamedExecContext(ctx, stmt, insertData) if err != nil { err = fmt.Errorf("error inserting gas prices for job %d: %w", jobId, err) @@ -126,7 +125,6 @@ func (o *orm) InsertTokenPricesForDestChain(ctx context.Context, destChainSelect return nil } - now := time.Now() insertData := make([]map[string]interface{}, 0, len(tokenPrices)) for _, price := range tokenPrices { insertData = append(insertData, map[string]interface{}{ @@ -134,12 +132,12 @@ func (o *orm) InsertTokenPricesForDestChain(ctx context.Context, destChainSelect "job_id": jobId, "token_addr": price.TokenAddr, "token_price": price.TokenPrice, - "created_at": now, }) } + // using statement_timestamp() to make testing easier stmt := `INSERT INTO ccip.observed_token_prices (chain_selector, job_id, token_addr, token_price, created_at) - VALUES (:chain_selector, :job_id, :token_addr, :token_price, :created_at);` + VALUES (:chain_selector, :job_id, :token_addr, :token_price, statement_timestamp());` _, err := o.ds.NamedExecContext(ctx, stmt, insertData) if err != nil { err = fmt.Errorf("error inserting token prices for job %d: %w", jobId, err) @@ -148,16 +146,16 @@ func (o *orm) InsertTokenPricesForDestChain(ctx context.Context, destChainSelect return err } -func (o *orm) ClearGasPricesByDestChain(ctx context.Context, destChainSelector uint64, to time.Time) error { - stmt := `DELETE FROM ccip.observed_gas_prices WHERE chain_selector = $1 AND created_at < $2` +func (o *orm) ClearGasPricesByDestChain(ctx context.Context, destChainSelector uint64, expireSec int) error { + stmt := `DELETE FROM ccip.observed_gas_prices WHERE chain_selector = $1 AND created_at < (statement_timestamp() - $2 * interval '1 second')` - _, err := o.ds.ExecContext(ctx, stmt, destChainSelector, to) + _, err := o.ds.ExecContext(ctx, stmt, destChainSelector, expireSec) return err } -func (o *orm) ClearTokenPricesByDestChain(ctx context.Context, destChainSelector uint64, to time.Time) error { - stmt := `DELETE FROM ccip.observed_token_prices WHERE chain_selector = $1 AND created_at < $2` +func (o *orm) ClearTokenPricesByDestChain(ctx context.Context, destChainSelector uint64, expireSec int) error { + stmt := `DELETE FROM ccip.observed_token_prices WHERE chain_selector = $1 AND created_at < (statement_timestamp() - $2 * interval '1 second')` - _, err := o.ds.ExecContext(ctx, stmt, destChainSelector, to) + _, err := o.ds.ExecContext(ctx, stmt, destChainSelector, expireSec) return err } diff --git a/core/services/ccip/orm_test.go b/core/services/ccip/orm_test.go index 741cf4b5b38..7b7b8d82710 100644 --- a/core/services/ccip/orm_test.go +++ b/core/services/ccip/orm_test.go @@ -212,7 +212,8 @@ func TestORM_InsertAndDeleteGasPrices(t *testing.T) { assert.NoError(t, err) } - interimTimeStamp := time.Now() + sleepSec := 2 + time.Sleep(time.Duration(sleepSec) * time.Second) // insert for the 2nd time after interimTimeStamp for _, updatesPerSelector := range updates { @@ -222,13 +223,13 @@ func TestORM_InsertAndDeleteGasPrices(t *testing.T) { assert.Equal(t, 2*numSourceChainSelectors*numUpdatesPerSourceSelector, getGasTableRowCount(t, db)) - // clear by interimTimeStamp should delete rows inserted before it - err := orm.ClearGasPricesByDestChain(ctx, destSelector, interimTimeStamp) + // clear by sleepSec should delete rows inserted before it + err := orm.ClearGasPricesByDestChain(ctx, destSelector, sleepSec) assert.NoError(t, err) assert.Equal(t, numSourceChainSelectors*numUpdatesPerSourceSelector, getGasTableRowCount(t, db)) - // clear by Now() should delete all rows - err = orm.ClearGasPricesByDestChain(ctx, destSelector, time.Now()) + // clear by 0 expiration seconds should delete all rows + err = orm.ClearGasPricesByDestChain(ctx, destSelector, 0) assert.NoError(t, err) assert.Equal(t, 0, getGasTableRowCount(t, db)) } @@ -324,7 +325,8 @@ func TestORM_InsertAndDeleteTokenPrices(t *testing.T) { assert.NoError(t, err) } - interimTimeStamp := time.Now() + sleepSec := 2 + time.Sleep(time.Duration(sleepSec) * time.Second) // insert for the 2nd time after interimTimeStamp for _, updatesPerAddr := range updates { @@ -334,13 +336,13 @@ func TestORM_InsertAndDeleteTokenPrices(t *testing.T) { assert.Equal(t, 2*numAddresses*numUpdatesPerAddress, getTokenTableRowCount(t, db)) - // clear by interimTimeStamp should delete rows inserted before it - err := orm.ClearTokenPricesByDestChain(ctx, destSelector, interimTimeStamp) + // clear by sleepSec should delete rows inserted before it + err := orm.ClearTokenPricesByDestChain(ctx, destSelector, sleepSec) assert.NoError(t, err) assert.Equal(t, numAddresses*numUpdatesPerAddress, getTokenTableRowCount(t, db)) - // clear by Now() should delete all rows - err = orm.ClearTokenPricesByDestChain(ctx, destSelector, time.Now()) + // clear by 0 expiration seconds should delete all rows + err = orm.ClearTokenPricesByDestChain(ctx, destSelector, 0) assert.NoError(t, err) assert.Equal(t, 0, getTokenTableRowCount(t, db)) }