From 41086399a7bf389128236db753ab51fe05651f45 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20M=C3=BCller?= Date: Tue, 9 Apr 2024 12:11:04 -0700 Subject: [PATCH 01/12] add a migration which detects and filters out unreferenced slabs --- .../filter_unreferenced_slabs_migration.go | 158 ++++++++++++++++++ 1 file changed, 158 insertions(+) create mode 100644 cmd/util/ledger/migrations/filter_unreferenced_slabs_migration.go diff --git a/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration.go b/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration.go new file mode 100644 index 00000000000..9f657aae542 --- /dev/null +++ b/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration.go @@ -0,0 +1,158 @@ +package migrations + +import ( + "context" + "errors" + "fmt" + + "github.com/onflow/atree" + "github.com/onflow/cadence/runtime" + "github.com/onflow/cadence/runtime/common" + "github.com/rs/zerolog" + + "github.com/onflow/flow-go/cmd/util/ledger/util" + "github.com/onflow/flow-go/ledger" + "github.com/onflow/flow-go/ledger/common/convert" + "github.com/onflow/flow-go/model/flow" +) + +func StorageIDFromRegisterID(registerID flow.RegisterID) atree.StorageID { + storageID := atree.StorageID{ + Address: atree.Address([]byte(registerID.Owner)), + } + copy(storageID.Index[:], registerID.Key[1:]) + return storageID +} + +// TODO: use version from atree register inlining (master) +func CheckAccountStorageHealth( + address common.Address, + payloads []*ledger.Payload, + storage *runtime.Storage, +) error { + // Retrieve all slabs before migration. + for _, payload := range payloads { + registerID, _, err := convert.PayloadToRegister(payload) + if err != nil { + return fmt.Errorf("failed to convert payload to register: %w", err) + } + + if !registerID.IsSlabIndex() { + continue + } + + storageID := StorageIDFromRegisterID(registerID) + + // Retrieve the slab. + _, _, err = storage.Retrieve(storageID) + if err != nil { + return fmt.Errorf("failed to retrieve slab %s: %w", storageID, err) + } + } + + // Load storage map. + for _, domain := range domains { + _ = storage.GetStorageMap(address, domain, false) + } + + return storage.CheckHealth() +} + +type FilterUnreferencedSlabsMigration struct { + log zerolog.Logger +} + +var _ AccountBasedMigration = &FilterUnreferencedSlabsMigration{} + +func (m *FilterUnreferencedSlabsMigration) InitMigration( + log zerolog.Logger, + _ []*ledger.Payload, + _ int, +) error { + m.log = log. + With(). + Str("migration", "filter-unreferenced-slabs"). + Logger() + + return nil +} + +func (m *FilterUnreferencedSlabsMigration) MigrateAccount( + _ context.Context, + address common.Address, + oldPayloads []*ledger.Payload, +) ([]*ledger.Payload, error) { + + checkPayloadsOwnership(oldPayloads, address, m.log) + + migrationRuntime, err := NewMigratorRuntime( + address, + oldPayloads, + util.RuntimeInterfaceConfig{}, + ) + if err != nil { + return nil, fmt.Errorf("failed to create migrator runtime: %w", err) + } + + storage := migrationRuntime.Storage + + newPayloads := oldPayloads + + err = CheckAccountStorageHealth(address, oldPayloads, storage) + if err != nil { + + // The storage health check failed. + // This can happen if there are unreferenced root slabs. + // In this case, we filter out the unreferenced root slabs from the payloads. + + var unreferencedRootSlabsErr runtime.UnreferencedRootSlabsError + if !errors.As(err, &unreferencedRootSlabsErr) { + return nil, fmt.Errorf("storage health check failed: %w", err) + } + + m.log.Warn(). + Err(err). + Str("account", address.Hex()). + Msg("filtering unreferenced root slabs") + + // Create a set of unreferenced root slabs. + + unreferencedRootSlabIDs := map[atree.StorageID]struct{}{} + for _, storageID := range unreferencedRootSlabsErr.UnreferencedRootSlabIDs { + unreferencedRootSlabIDs[storageID] = struct{}{} + } + + // Filter out unreferenced root slabs. + + newCount := len(oldPayloads) - len(unreferencedRootSlabIDs) + newPayloads = make([]*ledger.Payload, 0, newCount) + + filteredPayloads := make([]*ledger.Payload, 0, len(unreferencedRootSlabIDs)) + + for _, payload := range oldPayloads { + registerID, _, err := convert.PayloadToRegister(payload) + if err != nil { + return nil, fmt.Errorf("failed to convert payload to register: %w", err) + } + + // Filter unreferenced slabs. + if registerID.IsSlabIndex() { + storageID := StorageIDFromRegisterID(registerID) + if _, ok := unreferencedRootSlabIDs[storageID]; ok { + filteredPayloads = append(filteredPayloads, payload) + continue + } + } + + newPayloads = append(newPayloads, payload) + } + + // TODO: write filtered payloads to a separate file + } + + return newPayloads, nil +} + +func (m *FilterUnreferencedSlabsMigration) Close() error { + return nil +} From 2c4786917d9a61233414afd92d74e46593351484 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20M=C3=BCller?= Date: Tue, 9 Apr 2024 16:06:22 -0700 Subject: [PATCH 02/12] add test for unreferenced slabs filter migration --- .../filter_unreferenced_slabs_migration.go | 15 +- ...ilter_unreferenced_slabs_migration_test.go | 130 ++++++++++++++++++ cmd/util/ledger/util/util.go | 69 ++++++++-- 3 files changed, 197 insertions(+), 17 deletions(-) create mode 100644 cmd/util/ledger/migrations/filter_unreferenced_slabs_migration_test.go diff --git a/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration.go b/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration.go index 9f657aae542..813fdad81e1 100644 --- a/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration.go +++ b/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration.go @@ -10,7 +10,7 @@ import ( "github.com/onflow/cadence/runtime/common" "github.com/rs/zerolog" - "github.com/onflow/flow-go/cmd/util/ledger/util" + "github.com/onflow/flow-go/cmd/util/ledger/util/snapshot" "github.com/onflow/flow-go/ledger" "github.com/onflow/flow-go/ledger/common/convert" "github.com/onflow/flow-go/model/flow" @@ -51,7 +51,7 @@ func CheckAccountStorageHealth( } // Load storage map. - for _, domain := range domains { + for _, domain := range allStorageMapDomains { _ = storage.GetStorageMap(address, domain, false) } @@ -59,7 +59,8 @@ func CheckAccountStorageHealth( } type FilterUnreferencedSlabsMigration struct { - log zerolog.Logger + log zerolog.Logger + chainID flow.ChainID } var _ AccountBasedMigration = &FilterUnreferencedSlabsMigration{} @@ -83,12 +84,12 @@ func (m *FilterUnreferencedSlabsMigration) MigrateAccount( oldPayloads []*ledger.Payload, ) ([]*ledger.Payload, error) { - checkPayloadsOwnership(oldPayloads, address, m.log) - migrationRuntime, err := NewMigratorRuntime( - address, oldPayloads, - util.RuntimeInterfaceConfig{}, + m.chainID, + MigratorRuntimeConfig{}, + // TODO: + snapshot.SmallChangeSetSnapshot, ) if err != nil { return nil, fmt.Errorf("failed to create migrator runtime: %w", err) diff --git a/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration_test.go b/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration_test.go new file mode 100644 index 00000000000..5e60e70eda1 --- /dev/null +++ b/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration_test.go @@ -0,0 +1,130 @@ +package migrations + +import ( + "encoding/binary" + "testing" + + "github.com/onflow/atree" + "github.com/onflow/cadence/runtime" + "github.com/onflow/cadence/runtime/common" + "github.com/onflow/cadence/runtime/interpreter" + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" + + "github.com/onflow/flow-go/cmd/util/ledger/util" + "github.com/onflow/flow-go/ledger" + "github.com/onflow/flow-go/model/flow" +) + +func TestFilterUnreferencedSlabs(t *testing.T) { + t.Parallel() + + // Arrange + + const chainID = flow.Emulator + chain := chainID.Chain() + + testFlowAddress, err := chain.AddressAtIndex(1_000_000) + require.NoError(t, err) + + testAddress := common.Address(testFlowAddress) + + payloads := map[flow.RegisterID]*ledger.Payload{} + + payloadsLedger := util.NewPayloadsLedger(payloads) + + storageIndices := map[string]uint64{} + payloadsLedger.AllocateStorageIndexFunc = func(owner []byte) (atree.StorageIndex, error) { + var index atree.StorageIndex + + storageIndices[string(owner)]++ + + binary.BigEndian.PutUint64( + index[:], + storageIndices[string(owner)], + ) + + return index, nil + } + + storage := runtime.NewStorage(payloadsLedger, nil) + + // {Int: Int} + dictionaryStaticType := interpreter.NewDictionaryStaticType( + nil, + interpreter.PrimitiveStaticTypeInt, + interpreter.PrimitiveStaticTypeInt, + ) + + inter, err := interpreter.NewInterpreter( + nil, + nil, + &interpreter.Config{ + Storage: storage, + }, + ) + require.NoError(t, err) + + dict1 := interpreter.NewDictionaryValueWithAddress( + inter, + interpreter.EmptyLocationRange, + dictionaryStaticType, + testAddress, + ) + + // Storage another dictionary in the account. + // It is not referenced through a storage map though. + + interpreter.NewDictionaryValueWithAddress( + inter, + interpreter.EmptyLocationRange, + dictionaryStaticType, + testAddress, + ) + + storageMap := storage.GetStorageMap( + testAddress, + common.PathDomainStorage.Identifier(), + true, + ) + + // Only insert first dictionary. + // Second dictionary is unreferenced. + + storageMap.SetValue( + inter, + interpreter.StringStorageMapKey("test"), + dict1, + ) + + err = storage.Commit(inter, false) + require.NoError(t, err) + + oldPayloads := make([]*ledger.Payload, 0, len(payloads)) + + for _, payload := range payloadsLedger.Payloads { + oldPayloads = append(oldPayloads, payload) + } + + const totalSlabCount = 4 + + require.Len(t, oldPayloads, totalSlabCount) + + // Act + + migration := &FilterUnreferencedSlabsMigration{ + chainID: chainID, + } + + log := zerolog.New(zerolog.NewTestWriter(t)) + + err = migration.InitMigration(log, nil, 0) + require.NoError(t, err) + + newPayloads, err := migration.MigrateAccount(nil, testAddress, oldPayloads) + require.NoError(t, err) + + // Assert + + require.Len(t, newPayloads, totalSlabCount-1) +} diff --git a/cmd/util/ledger/util/util.go b/cmd/util/ledger/util/util.go index 4b4cb67926e..5cb8feeeec5 100644 --- a/cmd/util/ledger/util/util.go +++ b/cmd/util/ledger/util/util.go @@ -18,6 +18,10 @@ import ( "github.com/onflow/flow-go/model/flow" ) +func newRegisterID(owner []byte, key []byte) flow.RegisterID { + return flow.NewRegisterID(flow.BytesToAddress(owner), string(key)) +} + type AccountsAtreeLedger struct { Accounts environment.Accounts } @@ -29,10 +33,9 @@ func NewAccountsAtreeLedger(accounts environment.Accounts) *AccountsAtreeLedger var _ atree.Ledger = &AccountsAtreeLedger{} func (a *AccountsAtreeLedger) GetValue(owner, key []byte) ([]byte, error) { + registerID := newRegisterID(owner, key) v, err := a.Accounts.GetValue( - flow.NewRegisterID( - flow.BytesToAddress(owner), - string(key))) + registerID) if err != nil { return nil, fmt.Errorf("getting value failed: %w", err) } @@ -40,11 +43,8 @@ func (a *AccountsAtreeLedger) GetValue(owner, key []byte) ([]byte, error) { } func (a *AccountsAtreeLedger) SetValue(owner, key, value []byte) error { - err := a.Accounts.SetValue( - flow.NewRegisterID( - flow.BytesToAddress(owner), - string(key)), - value) + registerID := newRegisterID(owner, key) + err := a.Accounts.SetValue(registerID, value) if err != nil { return fmt.Errorf("setting value failed: %w", err) } @@ -76,8 +76,11 @@ type PayloadsReadonlyLedger struct { SetValueFunc func(owner, key, value []byte) (err error) } +var _ atree.Ledger = &PayloadsReadonlyLedger{} + func (p *PayloadsReadonlyLedger) GetValue(owner, key []byte) (value []byte, err error) { - v, err := p.Snapshot.Get(flow.NewRegisterID(flow.BytesToAddress(owner), string(key))) + registerID := newRegisterID(owner, key) + v, err := p.Snapshot.Get(registerID) if err != nil { return nil, fmt.Errorf("getting value failed: %w", err) } @@ -93,7 +96,8 @@ func (p *PayloadsReadonlyLedger) SetValue(owner, key, value []byte) (err error) } func (p *PayloadsReadonlyLedger) ValueExists(owner, key []byte) (bool, error) { - exists := p.Snapshot.Exists(flow.NewRegisterID(flow.BytesToAddress(owner), string(key))) + registerID := newRegisterID(owner, key) + exists := p.Snapshot.Exists(registerID) return exists, nil } @@ -238,3 +242,48 @@ func registerIDKeyFromString(s string) (flow.RegisterID, common.Address) { type PayloadMetaInfo struct { Height, Version uint64 } + +// PayloadsLedger is a simple read/write in-memory atree.Ledger implementation +type PayloadsLedger struct { + Payloads map[flow.RegisterID]*ledger.Payload + + AllocateStorageIndexFunc func(owner []byte) (atree.StorageIndex, error) +} + +var _ atree.Ledger = &PayloadsLedger{} + +func NewPayloadsLedger(payloads map[flow.RegisterID]*ledger.Payload) *PayloadsLedger { + return &PayloadsLedger{ + Payloads: payloads, + } +} + +func (p *PayloadsLedger) GetValue(owner, key []byte) (value []byte, err error) { + registerID := newRegisterID(owner, key) + v, ok := p.Payloads[registerID] + if !ok { + return nil, nil + } + return v.Value(), nil +} + +func (p *PayloadsLedger) SetValue(owner, key, value []byte) (err error) { + registerID := newRegisterID(owner, key) + ledgerKey := convert.RegisterIDToLedgerKey(registerID) + p.Payloads[registerID] = ledger.NewPayload(ledgerKey, value) + return nil +} + +func (p *PayloadsLedger) ValueExists(owner, key []byte) (exists bool, err error) { + registerID := newRegisterID(owner, key) + _, ok := p.Payloads[registerID] + return ok, nil +} + +func (p *PayloadsLedger) AllocateStorageIndex(owner []byte) (atree.StorageIndex, error) { + if p.AllocateStorageIndexFunc != nil { + return p.AllocateStorageIndexFunc(owner) + } + + panic("AllocateStorageIndex not expected to be called") +} From b3ed71dac625db7be8a67bddb92e6278670af6ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20M=C3=BCller?= Date: Wed, 10 Apr 2024 11:16:11 -0700 Subject: [PATCH 03/12] use existing account storage health check function --- .../filter_unreferenced_slabs_migration.go | 36 +------------------ 1 file changed, 1 insertion(+), 35 deletions(-) diff --git a/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration.go b/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration.go index 813fdad81e1..fae67ddea63 100644 --- a/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration.go +++ b/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration.go @@ -24,40 +24,6 @@ func StorageIDFromRegisterID(registerID flow.RegisterID) atree.StorageID { return storageID } -// TODO: use version from atree register inlining (master) -func CheckAccountStorageHealth( - address common.Address, - payloads []*ledger.Payload, - storage *runtime.Storage, -) error { - // Retrieve all slabs before migration. - for _, payload := range payloads { - registerID, _, err := convert.PayloadToRegister(payload) - if err != nil { - return fmt.Errorf("failed to convert payload to register: %w", err) - } - - if !registerID.IsSlabIndex() { - continue - } - - storageID := StorageIDFromRegisterID(registerID) - - // Retrieve the slab. - _, _, err = storage.Retrieve(storageID) - if err != nil { - return fmt.Errorf("failed to retrieve slab %s: %w", storageID, err) - } - } - - // Load storage map. - for _, domain := range allStorageMapDomains { - _ = storage.GetStorageMap(address, domain, false) - } - - return storage.CheckHealth() -} - type FilterUnreferencedSlabsMigration struct { log zerolog.Logger chainID flow.ChainID @@ -99,7 +65,7 @@ func (m *FilterUnreferencedSlabsMigration) MigrateAccount( newPayloads := oldPayloads - err = CheckAccountStorageHealth(address, oldPayloads, storage) + err = checkStorageHealth(address, storage, oldPayloads) if err != nil { // The storage health check failed. From cd62a34af9265da5a094036489ded720d6076a98 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20M=C3=BCller?= Date: Wed, 10 Apr 2024 12:01:58 -0700 Subject: [PATCH 04/12] improve slab index prefix --- model/flow/ledger.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/model/flow/ledger.go b/model/flow/ledger.go index 61c521bbef4..81efb66a559 100644 --- a/model/flow/ledger.go +++ b/model/flow/ledger.go @@ -141,6 +141,8 @@ func (id RegisterID) IsInternalState() bool { id.Key == AccountStatusKey } +const SlabIndexPrefix = '$' + // IsSlabIndex returns true if the key is a slab index for an account's ordered fields // map. // @@ -148,15 +150,15 @@ func (id RegisterID) IsInternalState() bool { // only to cadence. Cadence encodes this map into bytes and split the bytes // into slab chunks before storing the slabs into the ledger. func (id RegisterID) IsSlabIndex() bool { - return len(id.Key) == 9 && id.Key[0] == '$' + return len(id.Key) == 9 && id.Key[0] == SlabIndexPrefix } // String returns formatted string representation of the RegisterID. func (id RegisterID) String() string { formattedKey := "" if id.IsSlabIndex() { - i := uint64(binary.BigEndian.Uint64([]byte(id.Key[1:]))) - formattedKey = fmt.Sprintf("$%d", i) + i := binary.BigEndian.Uint64([]byte(id.Key[1:])) + formattedKey = fmt.Sprintf("%c%d", SlabIndexPrefix, i) } else { formattedKey = fmt.Sprintf("#%x", []byte(id.Key)) } From 07ac19b064f490bead00d52285965d38ac26244b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20M=C3=BCller?= Date: Wed, 10 Apr 2024 12:02:18 -0700 Subject: [PATCH 05/12] write filtered slabs to report --- .../filter_unreferenced_slabs_migration.go | 29 +++++++++++++- ...ilter_unreferenced_slabs_migration_test.go | 40 +++++++++++++++++-- 2 files changed, 63 insertions(+), 6 deletions(-) diff --git a/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration.go b/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration.go index fae67ddea63..763f7747931 100644 --- a/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration.go +++ b/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration.go @@ -10,6 +10,7 @@ import ( "github.com/onflow/cadence/runtime/common" "github.com/rs/zerolog" + "github.com/onflow/flow-go/cmd/util/ledger/reporters" "github.com/onflow/flow-go/cmd/util/ledger/util/snapshot" "github.com/onflow/flow-go/ledger" "github.com/onflow/flow-go/ledger/common/convert" @@ -27,10 +28,23 @@ func StorageIDFromRegisterID(registerID flow.RegisterID) atree.StorageID { type FilterUnreferencedSlabsMigration struct { log zerolog.Logger chainID flow.ChainID + rw reporters.ReportWriter } var _ AccountBasedMigration = &FilterUnreferencedSlabsMigration{} +const filterUnreferencedSlabsName = "filter-unreferenced-slabs" + +func NewFilterUnreferencedSlabsMigration( + chainID flow.ChainID, + rwf reporters.ReportWriterFactory, +) *FilterUnreferencedSlabsMigration { + return &FilterUnreferencedSlabsMigration{ + chainID: chainID, + rw: rwf.ReportWriter(filterUnreferencedSlabsName), + } +} + func (m *FilterUnreferencedSlabsMigration) InitMigration( log zerolog.Logger, _ []*ledger.Payload, @@ -38,7 +52,7 @@ func (m *FilterUnreferencedSlabsMigration) InitMigration( ) error { m.log = log. With(). - Str("migration", "filter-unreferenced-slabs"). + Str("migration", filterUnreferencedSlabsName). Logger() return nil @@ -114,12 +128,23 @@ func (m *FilterUnreferencedSlabsMigration) MigrateAccount( newPayloads = append(newPayloads, payload) } - // TODO: write filtered payloads to a separate file + m.rw.Write(unreferencedSlabs{ + Account: address, + Payloads: filteredPayloads, + }) } return newPayloads, nil } func (m *FilterUnreferencedSlabsMigration) Close() error { + // close the report writer so it flushes to file + m.rw.Close() + return nil } + +type unreferencedSlabs struct { + Account common.Address `json:"account"` + Payloads []*ledger.Payload `json:"payloads"` +} diff --git a/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration_test.go b/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration_test.go index 5e60e70eda1..166d4cadd45 100644 --- a/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration_test.go +++ b/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration_test.go @@ -9,10 +9,12 @@ import ( "github.com/onflow/cadence/runtime/common" "github.com/onflow/cadence/runtime/interpreter" "github.com/rs/zerolog" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/onflow/flow-go/cmd/util/ledger/util" "github.com/onflow/flow-go/ledger" + "github.com/onflow/flow-go/ledger/common/convert" "github.com/onflow/flow-go/model/flow" ) @@ -112,9 +114,8 @@ func TestFilterUnreferencedSlabs(t *testing.T) { // Act - migration := &FilterUnreferencedSlabsMigration{ - chainID: chainID, - } + rwf := &testReportWriterFactory{} + migration := NewFilterUnreferencedSlabsMigration(chainID, rwf) log := zerolog.New(zerolog.NewTestWriter(t)) @@ -126,5 +127,36 @@ func TestFilterUnreferencedSlabs(t *testing.T) { // Assert - require.Len(t, newPayloads, totalSlabCount-1) + assert.Len(t, newPayloads, totalSlabCount-1) + + writer := rwf.reportWriters[filterUnreferencedSlabsName] + + expectedFilteredPayloads := make([]*ledger.Payload, 0, 1) + + expectedAddress := string(testAddress[:]) + expectedKey := string([]byte{flow.SlabIndexPrefix, 0, 0, 0, 0, 0, 0, 0, 2}) + + for _, payload := range oldPayloads { + registerID, _, err := convert.PayloadToRegister(payload) + require.NoError(t, err) + + if registerID.Owner != expectedAddress || + registerID.Key != expectedKey { + + continue + } + + expectedFilteredPayloads = append(expectedFilteredPayloads, payload) + break + } + + assert.Equal(t, + []any{ + unreferencedSlabs{ + Account: testAddress, + Payloads: expectedFilteredPayloads, + }, + }, + writer.entries, + ) } From 081ee6c2146b4796ce2087b82c137f6eee774f3c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20M=C3=BCller?= Date: Fri, 12 Apr 2024 15:17:34 -0700 Subject: [PATCH 06/12] avoid nesting --- .../filter_unreferenced_slabs_migration.go | 103 +++++++++--------- ...ilter_unreferenced_slabs_migration_test.go | 2 +- 2 files changed, 51 insertions(+), 54 deletions(-) diff --git a/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration.go b/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration.go index 763f7747931..2cdec81144e 100644 --- a/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration.go +++ b/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration.go @@ -11,7 +11,6 @@ import ( "github.com/rs/zerolog" "github.com/onflow/flow-go/cmd/util/ledger/reporters" - "github.com/onflow/flow-go/cmd/util/ledger/util/snapshot" "github.com/onflow/flow-go/ledger" "github.com/onflow/flow-go/ledger/common/convert" "github.com/onflow/flow-go/model/flow" @@ -27,7 +26,6 @@ func StorageIDFromRegisterID(registerID flow.RegisterID) atree.StorageID { type FilterUnreferencedSlabsMigration struct { log zerolog.Logger - chainID flow.ChainID rw reporters.ReportWriter } @@ -36,11 +34,9 @@ var _ AccountBasedMigration = &FilterUnreferencedSlabsMigration{} const filterUnreferencedSlabsName = "filter-unreferenced-slabs" func NewFilterUnreferencedSlabsMigration( - chainID flow.ChainID, rwf reporters.ReportWriterFactory, ) *FilterUnreferencedSlabsMigration { return &FilterUnreferencedSlabsMigration{ - chainID: chainID, rw: rwf.ReportWriter(filterUnreferencedSlabsName), } } @@ -62,78 +58,79 @@ func (m *FilterUnreferencedSlabsMigration) MigrateAccount( _ context.Context, address common.Address, oldPayloads []*ledger.Payload, -) ([]*ledger.Payload, error) { - - migrationRuntime, err := NewMigratorRuntime( - oldPayloads, - m.chainID, - MigratorRuntimeConfig{}, - // TODO: - snapshot.SmallChangeSetSnapshot, - ) +) ( + newPayloads []*ledger.Payload, + err error, +) { + migrationRuntime, err := NewAtreeRegisterMigratorRuntime(address, oldPayloads) if err != nil { return nil, fmt.Errorf("failed to create migrator runtime: %w", err) } storage := migrationRuntime.Storage - newPayloads := oldPayloads + newPayloads = oldPayloads err = checkStorageHealth(address, storage, oldPayloads) - if err != nil { + if err == nil { + return + } - // The storage health check failed. - // This can happen if there are unreferenced root slabs. - // In this case, we filter out the unreferenced root slabs from the payloads. + // The storage health check failed. + // This can happen if there are unreferenced root slabs. + // In this case, we filter out the unreferenced root slabs from the payloads. - var unreferencedRootSlabsErr runtime.UnreferencedRootSlabsError - if !errors.As(err, &unreferencedRootSlabsErr) { - return nil, fmt.Errorf("storage health check failed: %w", err) - } + var unreferencedRootSlabsErr runtime.UnreferencedRootSlabsError + if !errors.As(err, &unreferencedRootSlabsErr) { + return nil, fmt.Errorf("storage health check failed: %w", err) + } - m.log.Warn(). - Err(err). - Str("account", address.Hex()). - Msg("filtering unreferenced root slabs") + m.log.Warn(). + Err(err). + Str("account", address.Hex()). + Msg("filtering unreferenced root slabs") - // Create a set of unreferenced root slabs. + // Create a set of unreferenced root slabs. - unreferencedRootSlabIDs := map[atree.StorageID]struct{}{} - for _, storageID := range unreferencedRootSlabsErr.UnreferencedRootSlabIDs { - unreferencedRootSlabIDs[storageID] = struct{}{} - } + unreferencedRootSlabIDs := map[atree.StorageID]struct{}{} + for _, storageID := range unreferencedRootSlabsErr.UnreferencedRootSlabIDs { + unreferencedRootSlabIDs[storageID] = struct{}{} + } - // Filter out unreferenced root slabs. + // Filter out unreferenced root slabs. - newCount := len(oldPayloads) - len(unreferencedRootSlabIDs) - newPayloads = make([]*ledger.Payload, 0, newCount) + newCount := len(oldPayloads) - len(unreferencedRootSlabIDs) + newPayloads = make([]*ledger.Payload, 0, newCount) - filteredPayloads := make([]*ledger.Payload, 0, len(unreferencedRootSlabIDs)) + filteredPayloads := make([]*ledger.Payload, 0, len(unreferencedRootSlabIDs)) - for _, payload := range oldPayloads { - registerID, _, err := convert.PayloadToRegister(payload) - if err != nil { - return nil, fmt.Errorf("failed to convert payload to register: %w", err) - } + for _, payload := range oldPayloads { + registerID, _, err := convert.PayloadToRegister(payload) + if err != nil { + return nil, fmt.Errorf("failed to convert payload to register: %w", err) + } - // Filter unreferenced slabs. - if registerID.IsSlabIndex() { - storageID := StorageIDFromRegisterID(registerID) - if _, ok := unreferencedRootSlabIDs[storageID]; ok { - filteredPayloads = append(filteredPayloads, payload) - continue - } + // Filter unreferenced slabs. + if registerID.IsSlabIndex() { + storageID := StorageIDFromRegisterID(registerID) + if _, ok := unreferencedRootSlabIDs[storageID]; ok { + filteredPayloads = append(filteredPayloads, payload) + continue } - - newPayloads = append(newPayloads, payload) } - m.rw.Write(unreferencedSlabs{ - Account: address, - Payloads: filteredPayloads, - }) + newPayloads = append(newPayloads, payload) } + m.rw.Write(unreferencedSlabs{ + Account: address, + Payloads: filteredPayloads, + }) + + // Do NOT report the health check error here. + // The health check error is only reported if it is not due to unreferenced slabs. + // If it is due to unreferenced slabs, we filter them out and continue. + return newPayloads, nil } diff --git a/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration_test.go b/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration_test.go index 166d4cadd45..f80b2fff1c5 100644 --- a/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration_test.go +++ b/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration_test.go @@ -115,7 +115,7 @@ func TestFilterUnreferencedSlabs(t *testing.T) { // Act rwf := &testReportWriterFactory{} - migration := NewFilterUnreferencedSlabsMigration(chainID, rwf) + migration := NewFilterUnreferencedSlabsMigration(rwf) log := zerolog.New(zerolog.NewTestWriter(t)) From b80a5dac5992316b322238625db18efec384cf88 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20M=C3=BCller?= Date: Fri, 12 Apr 2024 15:37:55 -0700 Subject: [PATCH 07/12] don't pass nil for context --- .../migrations/filter_unreferenced_slabs_migration_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration_test.go b/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration_test.go index f80b2fff1c5..0d681676e78 100644 --- a/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration_test.go +++ b/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration_test.go @@ -1,6 +1,7 @@ package migrations import ( + "context" "encoding/binary" "testing" @@ -122,7 +123,9 @@ func TestFilterUnreferencedSlabs(t *testing.T) { err = migration.InitMigration(log, nil, 0) require.NoError(t, err) - newPayloads, err := migration.MigrateAccount(nil, testAddress, oldPayloads) + ctx := context.Background() + + newPayloads, err := migration.MigrateAccount(ctx, testAddress, oldPayloads) require.NoError(t, err) // Assert From c2e87fd0285eec48a696f9a90273bf81a5866b90 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20M=C3=BCller?= Date: Fri, 19 Apr 2024 18:10:59 -0700 Subject: [PATCH 08/12] filter out all child slabs of unreferenced root slabs --- .../filter_unreferenced_slabs_migration.go | 37 ++++++++----- ...ilter_unreferenced_slabs_migration_test.go | 52 ++++++++++++++----- 2 files changed, 63 insertions(+), 26 deletions(-) diff --git a/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration.go b/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration.go index 2cdec81144e..124c85292b7 100644 --- a/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration.go +++ b/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration.go @@ -25,8 +25,8 @@ func StorageIDFromRegisterID(registerID flow.RegisterID) atree.StorageID { } type FilterUnreferencedSlabsMigration struct { - log zerolog.Logger - rw reporters.ReportWriter + log zerolog.Logger + rw reporters.ReportWriter } var _ AccountBasedMigration = &FilterUnreferencedSlabsMigration{} @@ -37,7 +37,7 @@ func NewFilterUnreferencedSlabsMigration( rwf reporters.ReportWriterFactory, ) *FilterUnreferencedSlabsMigration { return &FilterUnreferencedSlabsMigration{ - rw: rwf.ReportWriter(filterUnreferencedSlabsName), + rw: rwf.ReportWriter(filterUnreferencedSlabsName), } } @@ -78,7 +78,7 @@ func (m *FilterUnreferencedSlabsMigration) MigrateAccount( // The storage health check failed. // This can happen if there are unreferenced root slabs. - // In this case, we filter out the unreferenced root slabs from the payloads. + // In this case, we filter out the unreferenced root slabs and all slabs they reference from the payloads. var unreferencedRootSlabsErr runtime.UnreferencedRootSlabsError if !errors.As(err, &unreferencedRootSlabsErr) { @@ -90,19 +90,32 @@ func (m *FilterUnreferencedSlabsMigration) MigrateAccount( Str("account", address.Hex()). Msg("filtering unreferenced root slabs") - // Create a set of unreferenced root slabs. + // Create a set of unreferenced slabs: root slabs, and all slabs they reference. - unreferencedRootSlabIDs := map[atree.StorageID]struct{}{} - for _, storageID := range unreferencedRootSlabsErr.UnreferencedRootSlabIDs { - unreferencedRootSlabIDs[storageID] = struct{}{} + unreferencedSlabIDs := map[atree.StorageID]struct{}{} + for _, rootSlabID := range unreferencedRootSlabsErr.UnreferencedRootSlabIDs { + unreferencedSlabIDs[rootSlabID] = struct{}{} + + childReferences, _, err := storage.GetAllChildReferences(rootSlabID) + if err != nil { + return nil, fmt.Errorf( + "failed to get all child references for root slab %s: %w", + rootSlabID, + err, + ) + } + + for _, childSlabID := range childReferences { + unreferencedSlabIDs[childSlabID] = struct{}{} + } } - // Filter out unreferenced root slabs. + // Filter out unreferenced slabs. - newCount := len(oldPayloads) - len(unreferencedRootSlabIDs) + newCount := len(oldPayloads) - len(unreferencedSlabIDs) newPayloads = make([]*ledger.Payload, 0, newCount) - filteredPayloads := make([]*ledger.Payload, 0, len(unreferencedRootSlabIDs)) + filteredPayloads := make([]*ledger.Payload, 0, len(unreferencedSlabIDs)) for _, payload := range oldPayloads { registerID, _, err := convert.PayloadToRegister(payload) @@ -113,7 +126,7 @@ func (m *FilterUnreferencedSlabsMigration) MigrateAccount( // Filter unreferenced slabs. if registerID.IsSlabIndex() { storageID := StorageIDFromRegisterID(registerID) - if _, ok := unreferencedRootSlabIDs[storageID]; ok { + if _, ok := unreferencedSlabIDs[storageID]; ok { filteredPayloads = append(filteredPayloads, payload) continue } diff --git a/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration_test.go b/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration_test.go index 0d681676e78..b6a56ebfbd1 100644 --- a/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration_test.go +++ b/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration_test.go @@ -53,7 +53,7 @@ func TestFilterUnreferencedSlabs(t *testing.T) { storage := runtime.NewStorage(payloadsLedger, nil) // {Int: Int} - dictionaryStaticType := interpreter.NewDictionaryStaticType( + dict1StaticType := interpreter.NewDictionaryStaticType( nil, interpreter.PrimitiveStaticTypeInt, interpreter.PrimitiveStaticTypeInt, @@ -71,20 +71,40 @@ func TestFilterUnreferencedSlabs(t *testing.T) { dict1 := interpreter.NewDictionaryValueWithAddress( inter, interpreter.EmptyLocationRange, - dictionaryStaticType, + dict1StaticType, testAddress, ) - // Storage another dictionary in the account. + // Storage another dictionary, with a nested array, in the account. // It is not referenced through a storage map though. - interpreter.NewDictionaryValueWithAddress( + arrayStaticType := interpreter.NewVariableSizedStaticType(nil, interpreter.PrimitiveStaticTypeInt) + + dict2StaticType := interpreter.NewDictionaryStaticType( + nil, + interpreter.PrimitiveStaticTypeInt, + arrayStaticType, + ) + + dict2 := interpreter.NewDictionaryValueWithAddress( inter, interpreter.EmptyLocationRange, - dictionaryStaticType, + dict2StaticType, testAddress, ) + dict2.InsertWithoutTransfer( + inter, interpreter.EmptyLocationRange, + interpreter.NewUnmeteredIntValueFromInt64(2), + interpreter.NewArrayValue( + inter, + interpreter.EmptyLocationRange, + arrayStaticType, + testAddress, + interpreter.NewUnmeteredIntValueFromInt64(3), + ), + ) + storageMap := storage.GetStorageMap( testAddress, common.PathDomainStorage.Identifier(), @@ -109,7 +129,7 @@ func TestFilterUnreferencedSlabs(t *testing.T) { oldPayloads = append(oldPayloads, payload) } - const totalSlabCount = 4 + const totalSlabCount = 5 require.Len(t, oldPayloads, totalSlabCount) @@ -130,27 +150,31 @@ func TestFilterUnreferencedSlabs(t *testing.T) { // Assert - assert.Len(t, newPayloads, totalSlabCount-1) - writer := rwf.reportWriters[filterUnreferencedSlabsName] - expectedFilteredPayloads := make([]*ledger.Payload, 0, 1) - expectedAddress := string(testAddress[:]) - expectedKey := string([]byte{flow.SlabIndexPrefix, 0, 0, 0, 0, 0, 0, 0, 2}) + expectedKeys := map[string]struct{}{ + string([]byte{flow.SlabIndexPrefix, 0, 0, 0, 0, 0, 0, 0, 2}): {}, + string([]byte{flow.SlabIndexPrefix, 0, 0, 0, 0, 0, 0, 0, 3}): {}, + } + + assert.Len(t, newPayloads, totalSlabCount-len(expectedKeys)) + + expectedFilteredPayloads := make([]*ledger.Payload, 0, len(expectedKeys)) for _, payload := range oldPayloads { registerID, _, err := convert.PayloadToRegister(payload) require.NoError(t, err) - if registerID.Owner != expectedAddress || - registerID.Key != expectedKey { + if registerID.Owner != expectedAddress { + continue + } + if _, ok := expectedKeys[registerID.Key]; !ok { continue } expectedFilteredPayloads = append(expectedFilteredPayloads, payload) - break } assert.Equal(t, From 652410e652ba743e58eb2496ed1f6261ec7bce0a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20M=C3=BCller?= Date: Mon, 22 Apr 2024 09:56:26 -0700 Subject: [PATCH 09/12] add migration which filters unreferenced slabs to pipeline --- cmd/util/cmd/execution-state-extract/cmd.go | 5 +++++ cmd/util/ledger/migrations/cadence.go | 14 ++++++++++++++ 2 files changed, 19 insertions(+) diff --git a/cmd/util/cmd/execution-state-extract/cmd.go b/cmd/util/cmd/execution-state-extract/cmd.go index 6db992b6337..1b19896e224 100644 --- a/cmd/util/cmd/execution-state-extract/cmd.go +++ b/cmd/util/cmd/execution-state-extract/cmd.go @@ -46,6 +46,7 @@ var ( flagOutputPayloadFileName string flagOutputPayloadByAddresses string flagMaxAccountSize uint64 + flagFilterUnreferencedSlabs bool ) var Cmd = &cobra.Command{ @@ -151,6 +152,9 @@ func init() { Cmd.Flags().Uint64Var(&flagMaxAccountSize, "max-account-size", 0, "max account size") + + Cmd.Flags().BoolVar(&flagFilterUnreferencedSlabs, "filter-unreferenced-slabs", false, + "filter unreferenced slabs") } func run(*cobra.Command, []string) { @@ -371,6 +375,7 @@ func run(*cobra.Command, []string) { Prune: flagPrune, MaxAccountSize: flagMaxAccountSize, VerboseErrorOutput: flagVerboseErrorOutput, + FilterUnreferencedSlabs: flagFilterUnreferencedSlabs, } if len(flagInputPayloadFileName) > 0 { diff --git a/cmd/util/ledger/migrations/cadence.go b/cmd/util/ledger/migrations/cadence.go index ab56c65578e..2b441953a63 100644 --- a/cmd/util/ledger/migrations/cadence.go +++ b/cmd/util/ledger/migrations/cadence.go @@ -384,6 +384,7 @@ type Options struct { StagedContracts []StagedContract Prune bool MaxAccountSize uint64 + FilterUnreferencedSlabs bool } func NewCadence1Migrations( @@ -412,6 +413,19 @@ func NewCadence1Migrations( ) } + if opts.FilterUnreferencedSlabs { + migrations = append(migrations, NamedMigration{ + Name: "filter-unreferenced-slabs-migration", + Migrate: NewAccountBasedMigration( + log, + opts.NWorker, + []AccountBasedMigration{ + NewFilterUnreferencedSlabsMigration(rwf), + }, + ), + }) + } + if opts.Prune { migration := NewCadence1PruneMigration(opts.ChainID, log) if migration != nil { From 7e3d5981ab24cf31d1a367840fa36c42d30be037 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20M=C3=BCller?= Date: Tue, 23 Apr 2024 10:19:32 -0700 Subject: [PATCH 10/12] write filtered payloads to payloads file, only report filtering events/filtered accounts --- .../execution_state_extract.go | 5 +- cmd/util/ledger/migrations/cadence.go | 3 +- .../cadence_values_migration_test.go | 4 + .../filter_unreferenced_slabs_migration.go | 76 +++++++++++++++++-- ...ilter_unreferenced_slabs_migration_test.go | 22 +++++- .../staged_contracts_migration_test.go | 1 + 6 files changed, 98 insertions(+), 13 deletions(-) diff --git a/cmd/util/cmd/execution-state-extract/execution_state_extract.go b/cmd/util/cmd/execution-state-extract/execution_state_extract.go index dfcacc0689c..bf1f6d131dd 100644 --- a/cmd/util/cmd/execution-state-extract/execution_state_extract.go +++ b/cmd/util/cmd/execution-state-extract/execution_state_extract.go @@ -372,7 +372,7 @@ func createTrieFromPayloads(logger zerolog.Logger, payloads []*ledger.Payload) ( func newMigrations( log zerolog.Logger, - dir string, + outputDir string, runMigrations bool, opts migrators.Options, ) []ledger.Migration { @@ -382,10 +382,11 @@ func newMigrations( log.Info().Msgf("initializing migrations") - rwf := reporters.NewReportFileWriterFactory(dir, log) + rwf := reporters.NewReportFileWriterFactory(outputDir, log) namedMigrations := migrators.NewCadence1Migrations( log, + outputDir, rwf, opts, ) diff --git a/cmd/util/ledger/migrations/cadence.go b/cmd/util/ledger/migrations/cadence.go index 2b441953a63..ce425c5b7d7 100644 --- a/cmd/util/ledger/migrations/cadence.go +++ b/cmd/util/ledger/migrations/cadence.go @@ -389,6 +389,7 @@ type Options struct { func NewCadence1Migrations( log zerolog.Logger, + outputDir string, rwf reporters.ReportWriterFactory, opts Options, ) []NamedMigration { @@ -420,7 +421,7 @@ func NewCadence1Migrations( log, opts.NWorker, []AccountBasedMigration{ - NewFilterUnreferencedSlabsMigration(rwf), + NewFilterUnreferencedSlabsMigration(outputDir, rwf), }, ), }) diff --git a/cmd/util/ledger/migrations/cadence_values_migration_test.go b/cmd/util/ledger/migrations/cadence_values_migration_test.go index 99e741605ce..80a2c1637a7 100644 --- a/cmd/util/ledger/migrations/cadence_values_migration_test.go +++ b/cmd/util/ledger/migrations/cadence_values_migration_test.go @@ -79,6 +79,7 @@ func TestCadenceValuesMigration(t *testing.T) { migrations := NewCadence1Migrations( logger, + t.TempDir(), rwf, Options{ NWorker: nWorker, @@ -698,6 +699,7 @@ func TestBootstrappedStateMigration(t *testing.T) { migrations := NewCadence1Migrations( logger, + t.TempDir(), rwf, Options{ NWorker: nWorker, @@ -823,6 +825,7 @@ func TestProgramParsingError(t *testing.T) { migrations := NewCadence1Migrations( logger, + t.TempDir(), rwf, Options{ NWorker: nWorker, @@ -948,6 +951,7 @@ func TestCoreContractUsage(t *testing.T) { migrations := NewCadence1Migrations( logger, + t.TempDir(), rwf, Options{ NWorker: nWorker, diff --git a/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration.go b/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration.go index 124c85292b7..35bc9e45c04 100644 --- a/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration.go +++ b/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration.go @@ -4,6 +4,9 @@ import ( "context" "errors" "fmt" + "path" + "sync" + "time" "github.com/onflow/atree" "github.com/onflow/cadence/runtime" @@ -11,6 +14,7 @@ import ( "github.com/rs/zerolog" "github.com/onflow/flow-go/cmd/util/ledger/reporters" + "github.com/onflow/flow-go/cmd/util/ledger/util" "github.com/onflow/flow-go/ledger" "github.com/onflow/flow-go/ledger/common/convert" "github.com/onflow/flow-go/model/flow" @@ -25,8 +29,13 @@ func StorageIDFromRegisterID(registerID flow.RegisterID) atree.StorageID { } type FilterUnreferencedSlabsMigration struct { - log zerolog.Logger - rw reporters.ReportWriter + log zerolog.Logger + rw reporters.ReportWriter + outputDir string + mutex sync.Mutex + filteredAccounts []common.Address + filteredPayloads []*ledger.Payload + payloadsFile string } var _ AccountBasedMigration = &FilterUnreferencedSlabsMigration{} @@ -34,10 +43,12 @@ var _ AccountBasedMigration = &FilterUnreferencedSlabsMigration{} const filterUnreferencedSlabsName = "filter-unreferenced-slabs" func NewFilterUnreferencedSlabsMigration( + outputDir string, rwf reporters.ReportWriterFactory, ) *FilterUnreferencedSlabsMigration { return &FilterUnreferencedSlabsMigration{ - rw: rwf.ReportWriter(filterUnreferencedSlabsName), + outputDir: outputDir, + rw: rwf.ReportWriter(filterUnreferencedSlabsName), } } @@ -136,10 +147,15 @@ func (m *FilterUnreferencedSlabsMigration) MigrateAccount( } m.rw.Write(unreferencedSlabs{ - Account: address, - Payloads: filteredPayloads, + Account: address, + PayloadCount: len(filteredPayloads), }) + m.mergeFilteredPayloads( + address, + filteredPayloads, + ) + // Do NOT report the health check error here. // The health check error is only reported if it is not due to unreferenced slabs. // If it is due to unreferenced slabs, we filter them out and continue. @@ -147,14 +163,60 @@ func (m *FilterUnreferencedSlabsMigration) MigrateAccount( return newPayloads, nil } +func (m *FilterUnreferencedSlabsMigration) mergeFilteredPayloads( + address common.Address, + payloads []*ledger.Payload, +) { + m.mutex.Lock() + defer m.mutex.Unlock() + + m.filteredAccounts = append(m.filteredAccounts, address) + m.filteredPayloads = append(m.filteredPayloads, payloads...) +} + func (m *FilterUnreferencedSlabsMigration) Close() error { // close the report writer so it flushes to file m.rw.Close() + err := m.writeFilteredPayloads() + if err != nil { + return fmt.Errorf("failed to write filtered payloads to file: %w", err) + } + + return nil +} + +func (m *FilterUnreferencedSlabsMigration) writeFilteredPayloads() error { + + m.payloadsFile = path.Join( + m.outputDir, + fmt.Sprintf("filtered_%d.payloads", int32(time.Now().Unix())), + ) + + writtenPayloadCount, err := util.CreatePayloadFile( + m.log, + m.payloadsFile, + m.filteredPayloads, + m.filteredAccounts, + true, + ) + + if err != nil { + return fmt.Errorf("failed to write all filtered payloads to file: %w", err) + } + + if writtenPayloadCount != len(m.filteredPayloads) { + return fmt.Errorf( + "failed to write all filtered payloads to file: expected %d, got %d", + len(m.filteredPayloads), + writtenPayloadCount, + ) + } + return nil } type unreferencedSlabs struct { - Account common.Address `json:"account"` - Payloads []*ledger.Payload `json:"payloads"` + Account common.Address `json:"account"` + PayloadCount int `json:"payload_count"` } diff --git a/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration_test.go b/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration_test.go index b6a56ebfbd1..d0282374826 100644 --- a/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration_test.go +++ b/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration_test.go @@ -136,7 +136,7 @@ func TestFilterUnreferencedSlabs(t *testing.T) { // Act rwf := &testReportWriterFactory{} - migration := NewFilterUnreferencedSlabsMigration(rwf) + migration := NewFilterUnreferencedSlabsMigration(t.TempDir(), rwf) log := zerolog.New(zerolog.NewTestWriter(t)) @@ -148,6 +148,9 @@ func TestFilterUnreferencedSlabs(t *testing.T) { newPayloads, err := migration.MigrateAccount(ctx, testAddress, oldPayloads) require.NoError(t, err) + err = migration.Close() + require.NoError(t, err) + // Assert writer := rwf.reportWriters[filterUnreferencedSlabsName] @@ -180,10 +183,23 @@ func TestFilterUnreferencedSlabs(t *testing.T) { assert.Equal(t, []any{ unreferencedSlabs{ - Account: testAddress, - Payloads: expectedFilteredPayloads, + Account: testAddress, + PayloadCount: len(expectedFilteredPayloads), }, }, writer.entries, ) + assert.Equal(t, + expectedFilteredPayloads, + migration.filteredPayloads, + ) + assert.Equal(t, + []common.Address{testAddress}, + migration.filteredAccounts, + ) + + readIsPartial, readFilteredPayloads, err := util.ReadPayloadFile(log, migration.payloadsFile) + require.NoError(t, err) + assert.True(t, readIsPartial) + assert.Equal(t, expectedFilteredPayloads, readFilteredPayloads) } diff --git a/cmd/util/ledger/migrations/staged_contracts_migration_test.go b/cmd/util/ledger/migrations/staged_contracts_migration_test.go index 4467cf3e117..5d20d7bb17d 100644 --- a/cmd/util/ledger/migrations/staged_contracts_migration_test.go +++ b/cmd/util/ledger/migrations/staged_contracts_migration_test.go @@ -1797,6 +1797,7 @@ func TestConcurrentContractUpdate(t *testing.T) { migrations := NewCadence1Migrations( logger, + t.TempDir(), rwf, Options{ NWorker: nWorker, From f5a46ee35b4fb609642dc83bb071e7ffbc36578b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20M=C3=BCller?= Date: Tue, 23 Apr 2024 11:09:48 -0700 Subject: [PATCH 11/12] remove unnecessary filered accounts field --- .../filter_unreferenced_slabs_migration.go | 14 +++----------- .../filter_unreferenced_slabs_migration_test.go | 4 ---- 2 files changed, 3 insertions(+), 15 deletions(-) diff --git a/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration.go b/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration.go index 35bc9e45c04..ddf37a37153 100644 --- a/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration.go +++ b/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration.go @@ -33,7 +33,6 @@ type FilterUnreferencedSlabsMigration struct { rw reporters.ReportWriter outputDir string mutex sync.Mutex - filteredAccounts []common.Address filteredPayloads []*ledger.Payload payloadsFile string } @@ -151,10 +150,7 @@ func (m *FilterUnreferencedSlabsMigration) MigrateAccount( PayloadCount: len(filteredPayloads), }) - m.mergeFilteredPayloads( - address, - filteredPayloads, - ) + m.mergeFilteredPayloads(filteredPayloads) // Do NOT report the health check error here. // The health check error is only reported if it is not due to unreferenced slabs. @@ -163,14 +159,10 @@ func (m *FilterUnreferencedSlabsMigration) MigrateAccount( return newPayloads, nil } -func (m *FilterUnreferencedSlabsMigration) mergeFilteredPayloads( - address common.Address, - payloads []*ledger.Payload, -) { +func (m *FilterUnreferencedSlabsMigration) mergeFilteredPayloads(payloads []*ledger.Payload) { m.mutex.Lock() defer m.mutex.Unlock() - m.filteredAccounts = append(m.filteredAccounts, address) m.filteredPayloads = append(m.filteredPayloads, payloads...) } @@ -197,7 +189,7 @@ func (m *FilterUnreferencedSlabsMigration) writeFilteredPayloads() error { m.log, m.payloadsFile, m.filteredPayloads, - m.filteredAccounts, + nil, true, ) diff --git a/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration_test.go b/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration_test.go index d0282374826..2d7c76629a5 100644 --- a/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration_test.go +++ b/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration_test.go @@ -193,10 +193,6 @@ func TestFilterUnreferencedSlabs(t *testing.T) { expectedFilteredPayloads, migration.filteredPayloads, ) - assert.Equal(t, - []common.Address{testAddress}, - migration.filteredAccounts, - ) readIsPartial, readFilteredPayloads, err := util.ReadPayloadFile(log, migration.payloadsFile) require.NoError(t, err) From 52175eab577066169d9f24560b22a5a907701869 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20M=C3=BCller?= Date: Tue, 23 Apr 2024 11:10:06 -0700 Subject: [PATCH 12/12] pre-allocate filtered payloads --- .../ledger/migrations/filter_unreferenced_slabs_migration.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration.go b/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration.go index ddf37a37153..e411d313fb3 100644 --- a/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration.go +++ b/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration.go @@ -46,8 +46,9 @@ func NewFilterUnreferencedSlabsMigration( rwf reporters.ReportWriterFactory, ) *FilterUnreferencedSlabsMigration { return &FilterUnreferencedSlabsMigration{ - outputDir: outputDir, - rw: rwf.ReportWriter(filterUnreferencedSlabsName), + outputDir: outputDir, + rw: rwf.ReportWriter(filterUnreferencedSlabsName), + filteredPayloads: make([]*ledger.Payload, 0, 50_000), } }