Skip to content

Commit

Permalink
MERC 3653 channel definition cache orm cleanup deletes (#14389)
Browse files Browse the repository at this point in the history
* Cleanup on job delete for LLO

* Fix lint

---------

Co-authored-by: ad0ll <[email protected]>
  • Loading branch information
samsondav and ad0ll authored Sep 16, 2024
1 parent 6814bce commit 4be0665
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 2 deletions.
28 changes: 28 additions & 0 deletions core/services/llo/cleanup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package llo

import (
"context"
"fmt"

"github.com/ethereum/go-ethereum/common"

"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
"github.com/smartcontractkit/chainlink/v2/core/services/llo/mercurytransmitter"
)

func Cleanup(ctx context.Context, lp LogPoller, addr common.Address, donID uint32, ds sqlutil.DataSource, chainSelector uint64) error {
if (addr != common.Address{} && donID > 0) {
if err := lp.UnregisterFilter(ctx, filterName(addr, donID)); err != nil {
return fmt.Errorf("failed to unregister filter: %w", err)
}
orm := NewORM(ds, chainSelector)
if err := orm.CleanupChannelDefinitions(ctx, addr, donID); err != nil {
return fmt.Errorf("failed to cleanup channel definitions: %w", err)
}
}
torm := mercurytransmitter.NewORM(ds, donID)
if err := torm.Cleanup(ctx); err != nil {
return fmt.Errorf("failed to cleanup transmitter: %w", err)
}
return nil
}
102 changes: 102 additions & 0 deletions core/services/llo/cleanup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package llo

import (
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"
"github.com/smartcontractkit/libocr/offchainreporting2plus/types"
ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"

llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
"github.com/smartcontractkit/chainlink/v2/core/services/llo/mercurytransmitter"
)

func makeSampleTransmission(seqNr uint64, sURL string) *mercurytransmitter.Transmission {
return &mercurytransmitter.Transmission{
ServerURL: sURL,
ConfigDigest: types.ConfigDigest{0x0, 0x9, 0x57, 0xdd, 0x2f, 0x63, 0x56, 0x69, 0x34, 0xfd, 0xc2, 0xe1, 0xcd, 0xc1, 0xe, 0x3e, 0x25, 0xb9, 0x26, 0x5a, 0x16, 0x23, 0x91, 0xa6, 0x53, 0x16, 0x66, 0x59, 0x51, 0x0, 0x28, 0x7c},
SeqNr: seqNr,
Report: ocr3types.ReportWithInfo[llotypes.ReportInfo]{
Report: ocrtypes.Report{0x0, 0x3, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x66, 0xde, 0xf5, 0xba, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x66, 0xde, 0xf5, 0xba, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1e, 0x8e, 0x95, 0xcf, 0xb5, 0xd8, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1a, 0xd0, 0x1c, 0x67, 0xa9, 0xcf, 0xb3, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x66, 0xdf, 0x3, 0xca, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1b, 0x1c, 0x93, 0x6d, 0xa4, 0xf2, 0x17, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1b, 0x14, 0x8d, 0x9a, 0xc1, 0xd9, 0x6f, 0xc0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1b, 0x40, 0x5c, 0xcf, 0xa1, 0xbc, 0x63, 0xc0, 0x0},
Info: llotypes.ReportInfo{
LifeCycleStage: llotypes.LifeCycleStage("production"),
ReportFormat: llotypes.ReportFormatEVMPremiumLegacy,
},
},
Sigs: []types.AttributedOnchainSignature{types.AttributedOnchainSignature{Signature: []uint8{0x9d, 0xab, 0x8f, 0xa7, 0xca, 0x7, 0x62, 0x57, 0xf7, 0x11, 0x2c, 0xb7, 0xf3, 0x49, 0x37, 0x12, 0xbd, 0xe, 0x14, 0x27, 0xfc, 0x32, 0x5c, 0xec, 0xa6, 0xb9, 0x7f, 0xf9, 0xd7, 0x7b, 0xa6, 0x36, 0x30, 0x9d, 0x84, 0x29, 0xbf, 0xd4, 0xeb, 0xc5, 0xc9, 0x29, 0xef, 0xdd, 0xd3, 0x2f, 0xa6, 0x25, 0x63, 0xda, 0xd9, 0x2c, 0xa1, 0x4a, 0xba, 0x75, 0xb2, 0x85, 0x25, 0x8f, 0x2b, 0x84, 0xcd, 0x99, 0x1}, Signer: 0x1}, types.AttributedOnchainSignature{Signature: []uint8{0x9a, 0x47, 0x4a, 0x3, 0x1a, 0x95, 0xcf, 0x46, 0x10, 0xaf, 0xcc, 0x90, 0x49, 0xb2, 0xce, 0xbf, 0x63, 0xaa, 0xc7, 0x25, 0x4d, 0x2a, 0x8, 0x36, 0xda, 0xd5, 0x9f, 0x9d, 0x63, 0x69, 0x22, 0xb3, 0x36, 0xd9, 0x6e, 0xf, 0xae, 0x7b, 0xd1, 0x61, 0x59, 0xf, 0x36, 0x4a, 0x22, 0xec, 0xde, 0x45, 0x32, 0xe0, 0x5b, 0x5c, 0xe3, 0x14, 0x29, 0x4, 0x60, 0x7b, 0xce, 0xa3, 0x89, 0x6b, 0xbb, 0xe0, 0x0}, Signer: 0x3}},
}
}

func Test_Cleanup(t *testing.T) {
ctx := testutils.Context(t)

lp := &mockLogPoller{}
ds := pgtest.NewSqlxDB(t)

addr1 := common.Address{1, 2, 3}
addr2 := common.Address{4, 5, 6}
donID1 := uint32(1)
donID2 := uint32(2)
chainSelector := uint64(3)

// add some channel definitions
cdcorm := NewORM(ds, chainSelector)
{
err := cdcorm.StoreChannelDefinitions(ctx, addr1, donID1, 1, llotypes.ChannelDefinitions{}, 1)
require.NoError(t, err)
err = cdcorm.StoreChannelDefinitions(ctx, addr2, donID2, 1, llotypes.ChannelDefinitions{}, 1)
require.NoError(t, err)
}

// add some transmissions

torm1 := mercurytransmitter.NewORM(ds, donID1)
srvURL1 := "http://example.com/foo"
srvURL2 := "http://example.test/bar"
{
err := torm1.Insert(ctx, []*mercurytransmitter.Transmission{makeSampleTransmission(1, srvURL1), makeSampleTransmission(1, srvURL2)})
require.NoError(t, err)
}

torm2 := mercurytransmitter.NewORM(ds, donID2)
{
err := torm2.Insert(ctx, []*mercurytransmitter.Transmission{makeSampleTransmission(2, srvURL1), makeSampleTransmission(2, srvURL2)})
require.NoError(t, err)
}

err := Cleanup(ctx, lp, addr1, donID1, ds, chainSelector)
require.NoError(t, err)

t.Run("unregisters filter", func(t *testing.T) {
assert.Equal(t, []string{"OCR3 LLO ChannelDefinitionCachePoller - 0x0102030000000000000000000000000000000000:1"}, lp.unregisteredFilterNames)
})
t.Run("removes channel definitions", func(t *testing.T) {
pd, err := cdcorm.LoadChannelDefinitions(ctx, addr1, donID1)
require.NoError(t, err)
assert.Nil(t, pd)
pd, err = cdcorm.LoadChannelDefinitions(ctx, addr2, donID2)
require.NoError(t, err)
assert.NotNil(t, pd)
})
t.Run("removes transmissions", func(t *testing.T) {
trs, err := torm1.Get(ctx, srvURL1)
require.NoError(t, err)
assert.Len(t, trs, 0)
trs, err = torm1.Get(ctx, srvURL2)
require.NoError(t, err)
assert.Len(t, trs, 0)

trs, err = torm2.Get(ctx, srvURL1)
require.NoError(t, err)
assert.Len(t, trs, 1)
trs, err = torm2.Get(ctx, srvURL2)
require.NoError(t, err)
assert.Len(t, trs, 1)
})
}
9 changes: 9 additions & 0 deletions core/services/llo/mercurytransmitter/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type ORM interface {
Delete(ctx context.Context, hashes [][32]byte) error
Get(ctx context.Context, serverURL string) ([]*Transmission, error)
Prune(ctx context.Context, serverURL string, maxSize int) error
Cleanup(ctx context.Context) error
}

type orm struct {
Expand Down Expand Up @@ -194,3 +195,11 @@ func (o *orm) Prune(ctx context.Context, serverURL string, maxSize int) error {
}
return nil
}

func (o *orm) Cleanup(ctx context.Context) error {
_, err := o.ds.ExecContext(ctx, `DELETE FROM llo_mercury_transmit_queue WHERE don_id = $1`, o.donID)
if err != nil {
return fmt.Errorf("llo orm: failed to cleanup transmissions: %w", err)
}
return nil
}
8 changes: 6 additions & 2 deletions core/services/llo/onchain_channel_definition_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,15 @@ func init() {
}

type ChannelDefinitionCacheORM interface {
// TODO: What about delete/cleanup?
// https://smartcontract-it.atlassian.net/browse/MERC-3653
LoadChannelDefinitions(ctx context.Context, addr common.Address, donID uint32) (pd *PersistedDefinitions, err error)
StoreChannelDefinitions(ctx context.Context, addr common.Address, donID, version uint32, dfns llotypes.ChannelDefinitions, blockNum int64) (err error)
CleanupChannelDefinitions(ctx context.Context, addr common.Address, donID uint32) error
}

var _ llotypes.ChannelDefinitionCache = &channelDefinitionCache{}

type LogPoller interface {
UnregisterFilter(ctx context.Context, filterName string) error
RegisterFilter(ctx context.Context, filter logpoller.Filter) error
LatestBlock(ctx context.Context) (logpoller.LogPollerBlock, error)
LogsWithSigs(ctx context.Context, start, end int64, eventSigs []common.Hash, address common.Address) ([]logpoller.Log, error)
Expand Down Expand Up @@ -115,6 +115,10 @@ type HTTPClient interface {
Do(req *http.Request) (*http.Response, error)
}

func filterName(addr common.Address, donID uint32) string {
return logpoller.FilterName("OCR3 LLO ChannelDefinitionCachePoller", addr.String(), fmt.Sprintf("%d", donID))
}

func NewChannelDefinitionCache(lggr logger.Logger, orm ChannelDefinitionCacheORM, client HTTPClient, lp logpoller.LogPoller, addr common.Address, donID uint32, fromBlock int64, options ...Option) llotypes.ChannelDefinitionCache {
filterName := logpoller.FilterName("OCR3 LLO ChannelDefinitionCachePoller", addr.String(), donID)
cdc := &channelDefinitionCache{
Expand Down
15 changes: 15 additions & 0 deletions core/services/llo/onchain_channel_definition_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ type mockLogPoller struct {
latestBlockErr error
logsWithSigs []logpoller.Log
logsWithSigsErr error

unregisteredFilterNames []string
}

func (m *mockLogPoller) RegisterFilter(ctx context.Context, filter logpoller.Filter) error {
Expand All @@ -40,6 +42,10 @@ func (m *mockLogPoller) LatestBlock(ctx context.Context) (logpoller.LogPollerBlo
func (m *mockLogPoller) LogsWithSigs(ctx context.Context, start, end int64, eventSigs []common.Hash, address common.Address) ([]logpoller.Log, error) {
return m.logsWithSigs, m.logsWithSigsErr
}
func (m *mockLogPoller) UnregisterFilter(ctx context.Context, name string) error {
m.unregisteredFilterNames = append(m.unregisteredFilterNames, name)
return nil
}

var _ HTTPClient = &mockHTTPClient{}

Expand Down Expand Up @@ -76,6 +82,10 @@ func (m *mockORM) StoreChannelDefinitions(ctx context.Context, addr common.Addre
return m.err
}

func (m *mockORM) CleanupChannelDefinitions(ctx context.Context, addr common.Address, donID uint32) (err error) {
panic("not implemented")
}

func makeLog(t *testing.T, donID, version uint32, url string, sha [32]byte) logpoller.Log {
data := makeLogData(t, donID, version, url, sha)
return logpoller.Log{EventSig: topicNewChannelDefinition, Topics: [][]byte{topicNewChannelDefinition[:], makeDonIDTopic(donID)}, Data: data}
Expand Down Expand Up @@ -436,3 +446,8 @@ func Test_ChannelDefinitionCache(t *testing.T) {
})
})
}

func Test_filterName(t *testing.T) {
s := filterName(common.Address{1, 2, 3}, 654)
assert.Equal(t, "OCR3 LLO ChannelDefinitionCachePoller - 0x0102030000000000000000000000000000000000:654", s)
}
8 changes: 8 additions & 0 deletions core/services/llo/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,11 @@ WHERE EXCLUDED.version > channel_definitions.version
}
return nil
}

func (o *orm) CleanupChannelDefinitions(ctx context.Context, addr common.Address, donID uint32) error {
_, err := o.ds.ExecContext(ctx, "DELETE FROM channel_definitions WHERE chain_selector = $1 AND addr = $2 AND don_id = $3", o.chainSelector, addr, donID)
if err != nil {
return fmt.Errorf("failed to CleanupChannelDefinitions; %w", err)
}
return nil
}
12 changes: 12 additions & 0 deletions core/services/ocr2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,18 @@ func (d *Delegate) cleanupEVM(ctx context.Context, jb job.Job, relayID types.Rel
d.lggr.Errorw("failed to unregister ccip exec plugin filters", "err", err2, "spec", spec)
}
return nil
case types.LLO:
var pluginCfg lloconfig.PluginConfig
err = json.Unmarshal(spec.PluginConfig.Bytes(), &pluginCfg)
if err != nil {
return err
}
var chainSelector uint64
chainSelector, err = chainselectors.SelectorFromChainId(chain.ID().Uint64())
if err != nil {
return err
}
return llo.Cleanup(ctx, lp, pluginCfg.ChannelDefinitionsContractAddress, pluginCfg.DonID, d.ds, chainSelector)
default:
return nil
}
Expand Down

0 comments on commit 4be0665

Please sign in to comment.