diff --git a/core/services/ccip/mocks/orm.go b/core/services/ccip/mocks/orm.go index 8a987c21602..35ca2353bfc 100644 --- a/core/services/ccip/mocks/orm.go +++ b/core/services/ccip/mocks/orm.go @@ -23,102 +23,6 @@ func (_m *ORM) EXPECT() *ORM_Expecter { return &ORM_Expecter{mock: &_m.Mock} } -// ClearGasPricesByDestChain provides a mock function with given fields: ctx, destChainSelector, expireSec -func (_m *ORM) ClearGasPricesByDestChain(ctx context.Context, destChainSelector uint64, expireSec int) error { - ret := _m.Called(ctx, destChainSelector, expireSec) - - if len(ret) == 0 { - panic("no return value specified for ClearGasPricesByDestChain") - } - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, uint64, int) error); ok { - r0 = rf(ctx, destChainSelector, expireSec) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// ORM_ClearGasPricesByDestChain_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ClearGasPricesByDestChain' -type ORM_ClearGasPricesByDestChain_Call struct { - *mock.Call -} - -// ClearGasPricesByDestChain is a helper method to define mock.On call -// - ctx context.Context -// - destChainSelector uint64 -// - expireSec int -func (_e *ORM_Expecter) ClearGasPricesByDestChain(ctx interface{}, destChainSelector interface{}, expireSec interface{}) *ORM_ClearGasPricesByDestChain_Call { - return &ORM_ClearGasPricesByDestChain_Call{Call: _e.mock.On("ClearGasPricesByDestChain", ctx, destChainSelector, expireSec)} -} - -func (_c *ORM_ClearGasPricesByDestChain_Call) Run(run func(ctx context.Context, destChainSelector uint64, expireSec int)) *ORM_ClearGasPricesByDestChain_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(uint64), args[2].(int)) - }) - return _c -} - -func (_c *ORM_ClearGasPricesByDestChain_Call) Return(_a0 error) *ORM_ClearGasPricesByDestChain_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *ORM_ClearGasPricesByDestChain_Call) RunAndReturn(run func(context.Context, uint64, int) error) *ORM_ClearGasPricesByDestChain_Call { - _c.Call.Return(run) - return _c -} - -// ClearTokenPricesByDestChain provides a mock function with given fields: ctx, destChainSelector, expireSec -func (_m *ORM) ClearTokenPricesByDestChain(ctx context.Context, destChainSelector uint64, expireSec int) error { - ret := _m.Called(ctx, destChainSelector, expireSec) - - if len(ret) == 0 { - panic("no return value specified for ClearTokenPricesByDestChain") - } - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, uint64, int) error); ok { - r0 = rf(ctx, destChainSelector, expireSec) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// ORM_ClearTokenPricesByDestChain_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ClearTokenPricesByDestChain' -type ORM_ClearTokenPricesByDestChain_Call struct { - *mock.Call -} - -// ClearTokenPricesByDestChain is a helper method to define mock.On call -// - ctx context.Context -// - destChainSelector uint64 -// - expireSec int -func (_e *ORM_Expecter) ClearTokenPricesByDestChain(ctx interface{}, destChainSelector interface{}, expireSec interface{}) *ORM_ClearTokenPricesByDestChain_Call { - return &ORM_ClearTokenPricesByDestChain_Call{Call: _e.mock.On("ClearTokenPricesByDestChain", ctx, destChainSelector, expireSec)} -} - -func (_c *ORM_ClearTokenPricesByDestChain_Call) Run(run func(ctx context.Context, destChainSelector uint64, expireSec int)) *ORM_ClearTokenPricesByDestChain_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(uint64), args[2].(int)) - }) - return _c -} - -func (_c *ORM_ClearTokenPricesByDestChain_Call) Return(_a0 error) *ORM_ClearTokenPricesByDestChain_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *ORM_ClearTokenPricesByDestChain_Call) RunAndReturn(run func(context.Context, uint64, int) error) *ORM_ClearTokenPricesByDestChain_Call { - _c.Call.Return(run) - return _c -} - // GetGasPricesByDestChain provides a mock function with given fields: ctx, destChainSelector func (_m *ORM) GetGasPricesByDestChain(ctx context.Context, destChainSelector uint64) ([]ccip.GasPrice, error) { ret := _m.Called(ctx, destChainSelector) @@ -237,17 +141,17 @@ func (_c *ORM_GetTokenPricesByDestChain_Call) RunAndReturn(run func(context.Cont return _c } -// InsertGasPricesForDestChain provides a mock function with given fields: ctx, destChainSelector, jobId, gasPrices -func (_m *ORM) InsertGasPricesForDestChain(ctx context.Context, destChainSelector uint64, jobId int32, gasPrices []ccip.GasPriceUpdate) error { - ret := _m.Called(ctx, destChainSelector, jobId, gasPrices) +// InsertGasPricesForDestChain provides a mock function with given fields: ctx, destChainSelector, gasPrices +func (_m *ORM) InsertGasPricesForDestChain(ctx context.Context, destChainSelector uint64, gasPrices []ccip.GasPriceUpdate) error { + ret := _m.Called(ctx, destChainSelector, gasPrices) if len(ret) == 0 { panic("no return value specified for InsertGasPricesForDestChain") } var r0 error - if rf, ok := ret.Get(0).(func(context.Context, uint64, int32, []ccip.GasPriceUpdate) error); ok { - r0 = rf(ctx, destChainSelector, jobId, gasPrices) + if rf, ok := ret.Get(0).(func(context.Context, uint64, []ccip.GasPriceUpdate) error); ok { + r0 = rf(ctx, destChainSelector, gasPrices) } else { r0 = ret.Error(0) } @@ -263,15 +167,14 @@ type ORM_InsertGasPricesForDestChain_Call struct { // InsertGasPricesForDestChain is a helper method to define mock.On call // - ctx context.Context // - destChainSelector uint64 -// - jobId int32 // - gasPrices []ccip.GasPriceUpdate -func (_e *ORM_Expecter) InsertGasPricesForDestChain(ctx interface{}, destChainSelector interface{}, jobId interface{}, gasPrices interface{}) *ORM_InsertGasPricesForDestChain_Call { - return &ORM_InsertGasPricesForDestChain_Call{Call: _e.mock.On("InsertGasPricesForDestChain", ctx, destChainSelector, jobId, gasPrices)} +func (_e *ORM_Expecter) InsertGasPricesForDestChain(ctx interface{}, destChainSelector interface{}, gasPrices interface{}) *ORM_InsertGasPricesForDestChain_Call { + return &ORM_InsertGasPricesForDestChain_Call{Call: _e.mock.On("InsertGasPricesForDestChain", ctx, destChainSelector, gasPrices)} } -func (_c *ORM_InsertGasPricesForDestChain_Call) Run(run func(ctx context.Context, destChainSelector uint64, jobId int32, gasPrices []ccip.GasPriceUpdate)) *ORM_InsertGasPricesForDestChain_Call { +func (_c *ORM_InsertGasPricesForDestChain_Call) Run(run func(ctx context.Context, destChainSelector uint64, gasPrices []ccip.GasPriceUpdate)) *ORM_InsertGasPricesForDestChain_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(uint64), args[2].(int32), args[3].([]ccip.GasPriceUpdate)) + run(args[0].(context.Context), args[1].(uint64), args[2].([]ccip.GasPriceUpdate)) }) return _c } @@ -281,22 +184,22 @@ func (_c *ORM_InsertGasPricesForDestChain_Call) Return(_a0 error) *ORM_InsertGas return _c } -func (_c *ORM_InsertGasPricesForDestChain_Call) RunAndReturn(run func(context.Context, uint64, int32, []ccip.GasPriceUpdate) error) *ORM_InsertGasPricesForDestChain_Call { +func (_c *ORM_InsertGasPricesForDestChain_Call) RunAndReturn(run func(context.Context, uint64, []ccip.GasPriceUpdate) error) *ORM_InsertGasPricesForDestChain_Call { _c.Call.Return(run) return _c } -// InsertTokenPricesForDestChain provides a mock function with given fields: ctx, destChainSelector, jobId, tokenPrices -func (_m *ORM) InsertTokenPricesForDestChain(ctx context.Context, destChainSelector uint64, jobId int32, tokenPrices []ccip.TokenPriceUpdate) error { - ret := _m.Called(ctx, destChainSelector, jobId, tokenPrices) +// InsertTokenPricesForDestChain provides a mock function with given fields: ctx, destChainSelector, tokenPrices +func (_m *ORM) InsertTokenPricesForDestChain(ctx context.Context, destChainSelector uint64, tokenPrices []ccip.TokenPriceUpdate) error { + ret := _m.Called(ctx, destChainSelector, tokenPrices) if len(ret) == 0 { panic("no return value specified for InsertTokenPricesForDestChain") } var r0 error - if rf, ok := ret.Get(0).(func(context.Context, uint64, int32, []ccip.TokenPriceUpdate) error); ok { - r0 = rf(ctx, destChainSelector, jobId, tokenPrices) + if rf, ok := ret.Get(0).(func(context.Context, uint64, []ccip.TokenPriceUpdate) error); ok { + r0 = rf(ctx, destChainSelector, tokenPrices) } else { r0 = ret.Error(0) } @@ -312,15 +215,14 @@ type ORM_InsertTokenPricesForDestChain_Call struct { // InsertTokenPricesForDestChain is a helper method to define mock.On call // - ctx context.Context // - destChainSelector uint64 -// - jobId int32 // - tokenPrices []ccip.TokenPriceUpdate -func (_e *ORM_Expecter) InsertTokenPricesForDestChain(ctx interface{}, destChainSelector interface{}, jobId interface{}, tokenPrices interface{}) *ORM_InsertTokenPricesForDestChain_Call { - return &ORM_InsertTokenPricesForDestChain_Call{Call: _e.mock.On("InsertTokenPricesForDestChain", ctx, destChainSelector, jobId, tokenPrices)} +func (_e *ORM_Expecter) InsertTokenPricesForDestChain(ctx interface{}, destChainSelector interface{}, tokenPrices interface{}) *ORM_InsertTokenPricesForDestChain_Call { + return &ORM_InsertTokenPricesForDestChain_Call{Call: _e.mock.On("InsertTokenPricesForDestChain", ctx, destChainSelector, tokenPrices)} } -func (_c *ORM_InsertTokenPricesForDestChain_Call) Run(run func(ctx context.Context, destChainSelector uint64, jobId int32, tokenPrices []ccip.TokenPriceUpdate)) *ORM_InsertTokenPricesForDestChain_Call { +func (_c *ORM_InsertTokenPricesForDestChain_Call) Run(run func(ctx context.Context, destChainSelector uint64, tokenPrices []ccip.TokenPriceUpdate)) *ORM_InsertTokenPricesForDestChain_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(uint64), args[2].(int32), args[3].([]ccip.TokenPriceUpdate)) + run(args[0].(context.Context), args[1].(uint64), args[2].([]ccip.TokenPriceUpdate)) }) return _c } @@ -330,7 +232,7 @@ func (_c *ORM_InsertTokenPricesForDestChain_Call) Return(_a0 error) *ORM_InsertT return _c } -func (_c *ORM_InsertTokenPricesForDestChain_Call) RunAndReturn(run func(context.Context, uint64, int32, []ccip.TokenPriceUpdate) error) *ORM_InsertTokenPricesForDestChain_Call { +func (_c *ORM_InsertTokenPricesForDestChain_Call) RunAndReturn(run func(context.Context, uint64, []ccip.TokenPriceUpdate) error) *ORM_InsertTokenPricesForDestChain_Call { _c.Call.Return(run) return _c } diff --git a/core/services/ccip/orm.go b/core/services/ccip/orm.go index d074ea7473e..b513b13cc50 100644 --- a/core/services/ccip/orm.go +++ b/core/services/ccip/orm.go @@ -36,11 +36,8 @@ type ORM interface { GetGasPricesByDestChain(ctx context.Context, destChainSelector uint64) ([]GasPrice, error) GetTokenPricesByDestChain(ctx context.Context, destChainSelector uint64) ([]TokenPrice, error) - InsertGasPricesForDestChain(ctx context.Context, destChainSelector uint64, jobId int32, gasPrices []GasPriceUpdate) error - InsertTokenPricesForDestChain(ctx context.Context, destChainSelector uint64, jobId int32, tokenPrices []TokenPriceUpdate) error - - ClearGasPricesByDestChain(ctx context.Context, destChainSelector uint64, expireSec int) error - ClearTokenPricesByDestChain(ctx context.Context, destChainSelector uint64, expireSec int) error + InsertGasPricesForDestChain(ctx context.Context, destChainSelector uint64, gasPrices []GasPriceUpdate) error + InsertTokenPricesForDestChain(ctx context.Context, destChainSelector uint64, tokenPrices []TokenPriceUpdate) error } type orm struct { @@ -62,8 +59,7 @@ func NewORM(ds sqlutil.DataSource) (ORM, error) { func (o *orm) GetGasPricesByDestChain(ctx context.Context, destChainSelector uint64) ([]GasPrice, error) { var gasPrices []GasPrice stmt := ` - SELECT DISTINCT ON (source_chain_selector) - source_chain_selector, gas_price, created_at + SELECT source_chain_selector, gas_price, created_at FROM ccip.observed_gas_prices WHERE chain_selector = $1 ORDER BY source_chain_selector, created_at DESC; @@ -79,11 +75,10 @@ func (o *orm) GetGasPricesByDestChain(ctx context.Context, destChainSelector uin func (o *orm) GetTokenPricesByDestChain(ctx context.Context, destChainSelector uint64) ([]TokenPrice, error) { var tokenPrices []TokenPrice stmt := ` - SELECT DISTINCT ON (token_addr) - token_addr, token_price, created_at + SELECT token_addr, token_price, created_at FROM ccip.observed_token_prices WHERE chain_selector = $1 - ORDER BY token_addr, created_at DESC; + ORDER BY token_addr; ` err := o.ds.SelectContext(ctx, &tokenPrices, stmt, destChainSelector) if err != nil { @@ -93,68 +88,64 @@ func (o *orm) GetTokenPricesByDestChain(ctx context.Context, destChainSelector u return tokenPrices, nil } -func (o *orm) InsertGasPricesForDestChain(ctx context.Context, destChainSelector uint64, jobId int32, gasPrices []GasPriceUpdate) error { +func (o *orm) InsertGasPricesForDestChain(ctx context.Context, destChainSelector uint64, gasPrices []GasPriceUpdate) error { if len(gasPrices) == 0 { return nil } - insertData := make([]map[string]interface{}, 0, len(gasPrices)) - for _, price := range gasPrices { + uniqueGasUpdates := make(map[string]GasPriceUpdate) + for _, gasPrice := range gasPrices { + key := fmt.Sprintf("%d-%d", gasPrice.SourceChainSelector, destChainSelector) + uniqueGasUpdates[key] = gasPrice + } + + insertData := make([]map[string]interface{}, 0, len(uniqueGasUpdates)) + for _, price := range uniqueGasUpdates { insertData = append(insertData, map[string]interface{}{ "chain_selector": destChainSelector, - "job_id": jobId, "source_chain_selector": price.SourceChainSelector, "gas_price": price.GasPrice, }) } - // using statement_timestamp() to make testing easier - stmt := `INSERT INTO ccip.observed_gas_prices (chain_selector, job_id, source_chain_selector, gas_price, created_at) - VALUES (:chain_selector, :job_id, :source_chain_selector, :gas_price, statement_timestamp());` + stmt := `INSERT INTO ccip.observed_gas_prices (chain_selector, source_chain_selector, gas_price, created_at) + VALUES (:chain_selector, :source_chain_selector, :gas_price, statement_timestamp()) + ON CONFLICT (chain_selector, source_chain_selector) + DO UPDATE SET gas_price = EXCLUDED.gas_price, created_at = EXCLUDED.created_at;` _, err := o.ds.NamedExecContext(ctx, stmt, insertData) if err != nil { - err = fmt.Errorf("error inserting gas prices for job %d: %w", jobId, err) + err = fmt.Errorf("error inserting gas prices %w", err) } - return err } -func (o *orm) InsertTokenPricesForDestChain(ctx context.Context, destChainSelector uint64, jobId int32, tokenPrices []TokenPriceUpdate) error { +func (o *orm) InsertTokenPricesForDestChain(ctx context.Context, destChainSelector uint64, tokenPrices []TokenPriceUpdate) error { if len(tokenPrices) == 0 { return nil } - insertData := make([]map[string]interface{}, 0, len(tokenPrices)) - for _, price := range tokenPrices { + uniqueTokenPrices := make(map[string]TokenPriceUpdate) + for _, tokenPrice := range tokenPrices { + key := fmt.Sprintf("%s-%d", tokenPrice.TokenAddr, destChainSelector) + uniqueTokenPrices[key] = tokenPrice + } + + insertData := make([]map[string]interface{}, 0, len(uniqueTokenPrices)) + for _, price := range uniqueTokenPrices { insertData = append(insertData, map[string]interface{}{ "chain_selector": destChainSelector, - "job_id": jobId, "token_addr": price.TokenAddr, "token_price": price.TokenPrice, }) } - // using statement_timestamp() to make testing easier - stmt := `INSERT INTO ccip.observed_token_prices (chain_selector, job_id, token_addr, token_price, created_at) - VALUES (:chain_selector, :job_id, :token_addr, :token_price, statement_timestamp());` + stmt := `INSERT INTO ccip.observed_token_prices (chain_selector, token_addr, token_price, created_at) + VALUES (:chain_selector, :token_addr, :token_price, statement_timestamp()) + ON CONFLICT (chain_selector, token_addr) + DO UPDATE SET token_price = EXCLUDED.token_price, created_at = EXCLUDED.created_at;` _, err := o.ds.NamedExecContext(ctx, stmt, insertData) if err != nil { - err = fmt.Errorf("error inserting token prices for job %d: %w", jobId, err) + err = fmt.Errorf("error inserting token prices %w", err) } - - return err -} - -func (o *orm) ClearGasPricesByDestChain(ctx context.Context, destChainSelector uint64, expireSec int) error { - stmt := `DELETE FROM ccip.observed_gas_prices WHERE chain_selector = $1 AND created_at < (statement_timestamp() - $2 * interval '1 second')` - - _, err := o.ds.ExecContext(ctx, stmt, destChainSelector, expireSec) - return err -} - -func (o *orm) ClearTokenPricesByDestChain(ctx context.Context, destChainSelector uint64, expireSec int) error { - stmt := `DELETE FROM ccip.observed_token_prices WHERE chain_selector = $1 AND created_at < (statement_timestamp() - $2 * interval '1 second')` - - _, err := o.ds.ExecContext(ctx, stmt, destChainSelector, expireSec) return err } diff --git a/core/services/ccip/orm_test.go b/core/services/ccip/orm_test.go index 7b7b8d82710..f7b8a6d5a4f 100644 --- a/core/services/ccip/orm_test.go +++ b/core/services/ccip/orm_test.go @@ -145,9 +145,9 @@ func TestORM_InsertAndGetGasPrices(t *testing.T) { for selector, updatesPerSelector := range updates { lastIndex := len(updatesPerSelector) - 1 - err := orm.InsertGasPricesForDestChain(ctx, destSelector, int32(i), updatesPerSelector[:lastIndex]) + err := orm.InsertGasPricesForDestChain(ctx, destSelector, updatesPerSelector[:lastIndex]) assert.NoError(t, err) - err = orm.InsertGasPricesForDestChain(ctx, destSelector, int32(i), updatesPerSelector[lastIndex:]) + err = orm.InsertGasPricesForDestChain(ctx, destSelector, updatesPerSelector[lastIndex:]) assert.NoError(t, err) expectedPrices[selector] = updatesPerSelector[lastIndex] @@ -156,7 +156,7 @@ func TestORM_InsertAndGetGasPrices(t *testing.T) { // verify number of rows inserted numRows := getGasTableRowCount(t, db) - assert.Equal(t, numJobs*numSourceChainSelectors*numUpdatesPerSourceSelector, numRows) + assert.Equal(t, numSourceChainSelectors, numRows) prices, err := orm.GetGasPricesByDestChain(ctx, destSelector) assert.NoError(t, err) @@ -176,9 +176,9 @@ func TestORM_InsertAndGetGasPrices(t *testing.T) { expectedPrices[selector] = updatesPerSelector[0] } - err = orm.InsertGasPricesForDestChain(ctx, destSelector, 1, combinedUpdates) + err = orm.InsertGasPricesForDestChain(ctx, destSelector, combinedUpdates) assert.NoError(t, err) - assert.Equal(t, numJobs*numSourceChainSelectors*numUpdatesPerSourceSelector+numSourceChainSelectors, getGasTableRowCount(t, db)) + assert.Equal(t, numSourceChainSelectors, getGasTableRowCount(t, db)) prices, err = orm.GetGasPricesByDestChain(ctx, destSelector) assert.NoError(t, err) @@ -208,7 +208,7 @@ func TestORM_InsertAndDeleteGasPrices(t *testing.T) { } for _, updatesPerSelector := range updates { - err := orm.InsertGasPricesForDestChain(ctx, destSelector, 1, updatesPerSelector) + err := orm.InsertGasPricesForDestChain(ctx, destSelector, updatesPerSelector) assert.NoError(t, err) } @@ -217,21 +217,11 @@ func TestORM_InsertAndDeleteGasPrices(t *testing.T) { // insert for the 2nd time after interimTimeStamp for _, updatesPerSelector := range updates { - err := orm.InsertGasPricesForDestChain(ctx, destSelector, 1, updatesPerSelector) + err := orm.InsertGasPricesForDestChain(ctx, destSelector, updatesPerSelector) assert.NoError(t, err) } - assert.Equal(t, 2*numSourceChainSelectors*numUpdatesPerSourceSelector, getGasTableRowCount(t, db)) - - // clear by sleepSec should delete rows inserted before it - err := orm.ClearGasPricesByDestChain(ctx, destSelector, sleepSec) - assert.NoError(t, err) - assert.Equal(t, numSourceChainSelectors*numUpdatesPerSourceSelector, getGasTableRowCount(t, db)) - - // clear by 0 expiration seconds should delete all rows - err = orm.ClearGasPricesByDestChain(ctx, destSelector, 0) - assert.NoError(t, err) - assert.Equal(t, 0, getGasTableRowCount(t, db)) + assert.Equal(t, numSourceChainSelectors, getGasTableRowCount(t, db)) } func TestORM_InsertAndGetTokenPrices(t *testing.T) { @@ -258,9 +248,9 @@ func TestORM_InsertAndGetTokenPrices(t *testing.T) { for addr, updatesPerAddr := range updates { lastIndex := len(updatesPerAddr) - 1 - err := orm.InsertTokenPricesForDestChain(ctx, destSelector, int32(i), updatesPerAddr[:lastIndex]) + err := orm.InsertTokenPricesForDestChain(ctx, destSelector, updatesPerAddr[:lastIndex]) assert.NoError(t, err) - err = orm.InsertTokenPricesForDestChain(ctx, destSelector, int32(i), updatesPerAddr[lastIndex:]) + err = orm.InsertTokenPricesForDestChain(ctx, destSelector, updatesPerAddr[lastIndex:]) assert.NoError(t, err) expectedPrices[addr] = updatesPerAddr[lastIndex] @@ -269,7 +259,7 @@ func TestORM_InsertAndGetTokenPrices(t *testing.T) { // verify number of rows inserted numRows := getTokenTableRowCount(t, db) - assert.Equal(t, numJobs*numAddresses*numUpdatesPerAddress, numRows) + assert.Equal(t, numAddresses, numRows) prices, err := orm.GetTokenPricesByDestChain(ctx, destSelector) assert.NoError(t, err) @@ -289,9 +279,9 @@ func TestORM_InsertAndGetTokenPrices(t *testing.T) { expectedPrices[addr] = updatesPerAddr[0] } - err = orm.InsertTokenPricesForDestChain(ctx, destSelector, 1, combinedUpdates) + err = orm.InsertTokenPricesForDestChain(ctx, destSelector, combinedUpdates) assert.NoError(t, err) - assert.Equal(t, numJobs*numAddresses*numUpdatesPerAddress+numAddresses, getTokenTableRowCount(t, db)) + assert.Equal(t, numAddresses, getTokenTableRowCount(t, db)) prices, err = orm.GetTokenPricesByDestChain(ctx, destSelector) assert.NoError(t, err) @@ -321,7 +311,7 @@ func TestORM_InsertAndDeleteTokenPrices(t *testing.T) { } for _, updatesPerAddr := range updates { - err := orm.InsertTokenPricesForDestChain(ctx, destSelector, 1, updatesPerAddr) + err := orm.InsertTokenPricesForDestChain(ctx, destSelector, updatesPerAddr) assert.NoError(t, err) } @@ -330,19 +320,9 @@ func TestORM_InsertAndDeleteTokenPrices(t *testing.T) { // insert for the 2nd time after interimTimeStamp for _, updatesPerAddr := range updates { - err := orm.InsertTokenPricesForDestChain(ctx, destSelector, 1, updatesPerAddr) + err := orm.InsertTokenPricesForDestChain(ctx, destSelector, updatesPerAddr) assert.NoError(t, err) } - assert.Equal(t, 2*numAddresses*numUpdatesPerAddress, getTokenTableRowCount(t, db)) - - // clear by sleepSec should delete rows inserted before it - err := orm.ClearTokenPricesByDestChain(ctx, destSelector, sleepSec) - assert.NoError(t, err) - assert.Equal(t, numAddresses*numUpdatesPerAddress, getTokenTableRowCount(t, db)) - - // clear by 0 expiration seconds should delete all rows - err = orm.ClearTokenPricesByDestChain(ctx, destSelector, 0) - assert.NoError(t, err) - assert.Equal(t, 0, getTokenTableRowCount(t, db)) + assert.Equal(t, numAddresses, getTokenTableRowCount(t, db)) } diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service.go b/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service.go index 7d7d5bda3ad..23ea3dcbd00 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service.go @@ -46,19 +46,14 @@ const ( // 10 min provides sufficient buffer for the Commit plugin to withstand transient price update outages, while // surfacing price update outages quickly enough. priceExpireSec = 600 - // Cleanups are called every 10 minutes. For a given job, on average we may expect 3 token prices and 1 gas price. - // 10 minutes should result in 40 rows being cleaned up per job, it is not a heavy load on DB, so there is no need - // to run cleanup more frequently. We shouldn't clean up less frequently than `priceExpireSec`. - priceCleanupInterval = 600 * time.Second // Prices are refreshed every 1 minute, they are sufficiently accurate, and consistent with Commit OCR round time. priceUpdateInterval = 60 * time.Second ) type priceService struct { - priceExpireSec int - cleanupInterval time.Duration - updateInterval time.Duration + priceExpireSec int + updateInterval time.Duration lggr logger.Logger orm cciporm.ORM @@ -93,9 +88,8 @@ func NewPriceService( ctx, cancel := context.WithCancel(context.Background()) pw := &priceService{ - priceExpireSec: priceExpireSec, - cleanupInterval: utils.WithJitter(priceCleanupInterval), // use WithJitter to avoid multiple services impacting DB at same time - updateInterval: utils.WithJitter(priceUpdateInterval), + priceExpireSec: priceExpireSec, + updateInterval: utils.WithJitter(priceUpdateInterval), lggr: lggr, orm: orm, @@ -134,7 +128,6 @@ func (p *priceService) Close() error { } func (p *priceService) run() { - cleanupTicker := time.NewTicker(p.cleanupInterval) updateTicker := time.NewTicker(p.updateInterval) go func() { @@ -144,11 +137,6 @@ func (p *priceService) run() { select { case <-p.backgroundCtx.Done(): return - case <-cleanupTicker.C: - err := p.runCleanup(p.backgroundCtx) - if err != nil { - p.lggr.Errorw("Error when cleaning up in-db prices in the background", "err", err) - } case <-updateTicker.C: err := p.runUpdate(p.backgroundCtx) if err != nil { @@ -220,28 +208,6 @@ func (p *priceService) GetGasAndTokenPrices(ctx context.Context, destChainSelect return gasPrices, tokenPrices, nil } -func (p *priceService) runCleanup(ctx context.Context) error { - eg := new(errgroup.Group) - - eg.Go(func() error { - err := p.orm.ClearGasPricesByDestChain(ctx, p.destChainSelector, p.priceExpireSec) - if err != nil { - return fmt.Errorf("error clearing gas prices: %w", err) - } - return nil - }) - - eg.Go(func() error { - err := p.orm.ClearTokenPricesByDestChain(ctx, p.destChainSelector, p.priceExpireSec) - if err != nil { - return fmt.Errorf("error clearing token prices: %w", err) - } - return nil - }) - - return eg.Wait() -} - func (p *priceService) runUpdate(ctx context.Context) error { // Protect against concurrent updates of `gasPriceEstimator` and `destPriceRegistryReader` // Price updates happen infrequently - once every `priceUpdateInterval` seconds. @@ -359,7 +325,7 @@ func (p *priceService) writePricesToDB( if sourceGasPriceUSD != nil { eg.Go(func() error { - return p.orm.InsertGasPricesForDestChain(ctx, p.destChainSelector, p.jobId, []cciporm.GasPriceUpdate{ + return p.orm.InsertGasPricesForDestChain(ctx, p.destChainSelector, []cciporm.GasPriceUpdate{ { SourceChainSelector: p.sourceChainSelector, GasPrice: assets.NewWei(sourceGasPriceUSD), @@ -384,7 +350,7 @@ func (p *priceService) writePricesToDB( }) eg.Go(func() error { - return p.orm.InsertTokenPricesForDestChain(ctx, p.destChainSelector, p.jobId, tokenPrices) + return p.orm.InsertTokenPricesForDestChain(ctx, p.destChainSelector, tokenPrices) }) } diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service_test.go b/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service_test.go index 0bea8af9a19..872d933a944 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service_test.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service_test.go @@ -30,81 +30,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/prices" ) -func TestPriceService_priceCleanup(t *testing.T) { - lggr := logger.TestLogger(t) - jobId := int32(1) - destChainSelector := uint64(12345) - sourceChainSelector := uint64(67890) - - testCases := []struct { - name string - gasPriceError bool - tokenPriceError bool - expectedErr bool - }{ - { - name: "ORM called successfully", - gasPriceError: false, - tokenPriceError: false, - expectedErr: false, - }, - { - name: "gasPrice clear failed", - gasPriceError: true, - tokenPriceError: false, - expectedErr: true, - }, - { - name: "tokenPrice clear failed", - gasPriceError: false, - tokenPriceError: true, - expectedErr: true, - }, - { - name: "both ORM calls failed", - gasPriceError: true, - tokenPriceError: true, - expectedErr: true, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - ctx := tests.Context(t) - - var gasPricesError error - var tokenPricesError error - if tc.gasPriceError { - gasPricesError = fmt.Errorf("gas prices error") - } - if tc.tokenPriceError { - tokenPricesError = fmt.Errorf("token prices error") - } - - mockOrm := ccipmocks.NewORM(t) - mockOrm.On("ClearGasPricesByDestChain", ctx, destChainSelector, priceExpireSec).Return(gasPricesError).Once() - mockOrm.On("ClearTokenPricesByDestChain", ctx, destChainSelector, priceExpireSec).Return(tokenPricesError).Once() - - priceService := NewPriceService( - lggr, - mockOrm, - jobId, - destChainSelector, - sourceChainSelector, - "", - nil, - nil, - ).(*priceService) - err := priceService.runCleanup(ctx) - if tc.expectedErr { - assert.Error(t, err) - } else { - assert.NoError(t, err) - } - }) - } -} - func TestPriceService_priceWrite(t *testing.T) { lggr := logger.TestLogger(t) jobId := int32(1) @@ -712,12 +637,9 @@ func TestPriceService_priceWriteAndCleanupInBackground(t *testing.T) { ).(*priceService) updateInterval := 2000 * time.Millisecond - cleanupInterval := 3000 * time.Millisecond // run write task every 2 second priceService.updateInterval = updateInterval - // run cleanup every 3 seconds - priceService.cleanupInterval = cleanupInterval // expire all prices during every cleanup priceService.priceExpireSec = 0 @@ -745,7 +667,6 @@ func TestPriceService_priceWriteAndCleanupInBackground(t *testing.T) { }, testutils.WaitTimeout(t), testutils.TestInterval) assert.NoError(t, priceService.Close()) - assert.NoError(t, priceService.runCleanup(ctx)) // after stopping PriceService and runCleanup, no more updates are inserted for i := 0; i < 5; i++ { diff --git a/core/store/migrate/migrations/0249_ccip_token_prices_fix.sql b/core/store/migrate/migrations/0249_ccip_token_prices_fix.sql new file mode 100644 index 00000000000..ebadf544684 --- /dev/null +++ b/core/store/migrate/migrations/0249_ccip_token_prices_fix.sql @@ -0,0 +1,49 @@ +-- +goose Up + +-- We need to re-create tables from scratch because of the unique constraint on tokens and chains selectors +DROP TABLE ccip.observed_token_prices; +DROP TABLE ccip.observed_gas_prices; + +CREATE TABLE ccip.observed_token_prices +( + chain_selector NUMERIC(20, 0) NOT NULL, + token_addr BYTEA NOT NULL, + token_price NUMERIC(78, 0) NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + CONSTRAINT unique_token_chain_selector UNIQUE (token_addr, chain_selector) +); + +CREATE TABLE ccip.observed_gas_prices +( + chain_selector NUMERIC(20, 0) NOT NULL, + source_chain_selector NUMERIC(20, 0) NOT NULL, + gas_price NUMERIC(78, 0) NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + CONSTRAINT unique_source_dest_chain UNIQUE (source_chain_selector, chain_selector) +); + +-- +goose Down +DROP TABLE ccip.observed_token_prices; +DROP TABLE ccip.observed_gas_prices; + +-- Restore state from migration 0236_ccip_prices_cache.sql +CREATE TABLE ccip.observed_gas_prices +( + chain_selector NUMERIC(20, 0) NOT NULL, + job_id INTEGER NOT NULL, + source_chain_selector NUMERIC(20, 0) NOT NULL, + gas_price NUMERIC(78, 0) NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE TABLE ccip.observed_token_prices +( + chain_selector NUMERIC(20, 0) NOT NULL, + job_id INTEGER NOT NULL, + token_addr BYTEA NOT NULL, + token_price NUMERIC(78, 0) NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX idx_ccip_gas_prices_chain_gas_price_timestamp ON ccip.observed_gas_prices (chain_selector, source_chain_selector, created_at DESC); +CREATE INDEX idx_ccip_token_prices_token_price_timestamp ON ccip.observed_token_prices (chain_selector, token_addr, created_at DESC);