diff --git a/cmd/util/cmd/execution-state-extract/cmd.go b/cmd/util/cmd/execution-state-extract/cmd.go index 6db992b6337..1b19896e224 100644 --- a/cmd/util/cmd/execution-state-extract/cmd.go +++ b/cmd/util/cmd/execution-state-extract/cmd.go @@ -46,6 +46,7 @@ var ( flagOutputPayloadFileName string flagOutputPayloadByAddresses string flagMaxAccountSize uint64 + flagFilterUnreferencedSlabs bool ) var Cmd = &cobra.Command{ @@ -151,6 +152,9 @@ func init() { Cmd.Flags().Uint64Var(&flagMaxAccountSize, "max-account-size", 0, "max account size") + + Cmd.Flags().BoolVar(&flagFilterUnreferencedSlabs, "filter-unreferenced-slabs", false, + "filter unreferenced slabs") } func run(*cobra.Command, []string) { @@ -371,6 +375,7 @@ func run(*cobra.Command, []string) { Prune: flagPrune, MaxAccountSize: flagMaxAccountSize, VerboseErrorOutput: flagVerboseErrorOutput, + FilterUnreferencedSlabs: flagFilterUnreferencedSlabs, } if len(flagInputPayloadFileName) > 0 { diff --git a/cmd/util/cmd/execution-state-extract/execution_state_extract.go b/cmd/util/cmd/execution-state-extract/execution_state_extract.go index dfcacc0689c..bf1f6d131dd 100644 --- a/cmd/util/cmd/execution-state-extract/execution_state_extract.go +++ b/cmd/util/cmd/execution-state-extract/execution_state_extract.go @@ -372,7 +372,7 @@ func createTrieFromPayloads(logger zerolog.Logger, payloads []*ledger.Payload) ( func newMigrations( log zerolog.Logger, - dir string, + outputDir string, runMigrations bool, opts migrators.Options, ) []ledger.Migration { @@ -382,10 +382,11 @@ func newMigrations( log.Info().Msgf("initializing migrations") - rwf := reporters.NewReportFileWriterFactory(dir, log) + rwf := reporters.NewReportFileWriterFactory(outputDir, log) namedMigrations := migrators.NewCadence1Migrations( log, + outputDir, rwf, opts, ) diff --git a/cmd/util/ledger/migrations/cadence.go b/cmd/util/ledger/migrations/cadence.go index ab56c65578e..ce425c5b7d7 100644 --- a/cmd/util/ledger/migrations/cadence.go +++ 
b/cmd/util/ledger/migrations/cadence.go @@ -384,10 +384,12 @@ type Options struct { StagedContracts []StagedContract Prune bool MaxAccountSize uint64 + FilterUnreferencedSlabs bool } func NewCadence1Migrations( log zerolog.Logger, + outputDir string, rwf reporters.ReportWriterFactory, opts Options, ) []NamedMigration { @@ -412,6 +414,19 @@ func NewCadence1Migrations( ) } + if opts.FilterUnreferencedSlabs { + migrations = append(migrations, NamedMigration{ + Name: "filter-unreferenced-slabs-migration", + Migrate: NewAccountBasedMigration( + log, + opts.NWorker, + []AccountBasedMigration{ + NewFilterUnreferencedSlabsMigration(outputDir, rwf), + }, + ), + }) + } + if opts.Prune { migration := NewCadence1PruneMigration(opts.ChainID, log) if migration != nil { diff --git a/cmd/util/ledger/migrations/cadence_values_migration_test.go b/cmd/util/ledger/migrations/cadence_values_migration_test.go index 99e741605ce..80a2c1637a7 100644 --- a/cmd/util/ledger/migrations/cadence_values_migration_test.go +++ b/cmd/util/ledger/migrations/cadence_values_migration_test.go @@ -79,6 +79,7 @@ func TestCadenceValuesMigration(t *testing.T) { migrations := NewCadence1Migrations( logger, + t.TempDir(), rwf, Options{ NWorker: nWorker, @@ -698,6 +699,7 @@ func TestBootstrappedStateMigration(t *testing.T) { migrations := NewCadence1Migrations( logger, + t.TempDir(), rwf, Options{ NWorker: nWorker, @@ -823,6 +825,7 @@ func TestProgramParsingError(t *testing.T) { migrations := NewCadence1Migrations( logger, + t.TempDir(), rwf, Options{ NWorker: nWorker, @@ -948,6 +951,7 @@ func TestCoreContractUsage(t *testing.T) { migrations := NewCadence1Migrations( logger, + t.TempDir(), rwf, Options{ NWorker: nWorker, diff --git a/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration.go b/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration.go new file mode 100644 index 00000000000..e411d313fb3 --- /dev/null +++ b/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration.go @@ -0,0 
+1,215 @@ +package migrations + +import ( + "context" + "errors" + "fmt" + "path" + "sync" + "time" + + "github.com/onflow/atree" + "github.com/onflow/cadence/runtime" + "github.com/onflow/cadence/runtime/common" + "github.com/rs/zerolog" + + "github.com/onflow/flow-go/cmd/util/ledger/reporters" + "github.com/onflow/flow-go/cmd/util/ledger/util" + "github.com/onflow/flow-go/ledger" + "github.com/onflow/flow-go/ledger/common/convert" + "github.com/onflow/flow-go/model/flow" +) + +func StorageIDFromRegisterID(registerID flow.RegisterID) atree.StorageID { + storageID := atree.StorageID{ + Address: atree.Address([]byte(registerID.Owner)), + } + copy(storageID.Index[:], registerID.Key[1:]) + return storageID +} + +type FilterUnreferencedSlabsMigration struct { + log zerolog.Logger + rw reporters.ReportWriter + outputDir string + mutex sync.Mutex + filteredPayloads []*ledger.Payload + payloadsFile string +} + +var _ AccountBasedMigration = &FilterUnreferencedSlabsMigration{} + +const filterUnreferencedSlabsName = "filter-unreferenced-slabs" + +func NewFilterUnreferencedSlabsMigration( + outputDir string, + rwf reporters.ReportWriterFactory, +) *FilterUnreferencedSlabsMigration { + return &FilterUnreferencedSlabsMigration{ + outputDir: outputDir, + rw: rwf.ReportWriter(filterUnreferencedSlabsName), + filteredPayloads: make([]*ledger.Payload, 0, 50_000), + } +} + +func (m *FilterUnreferencedSlabsMigration) InitMigration( + log zerolog.Logger, + _ []*ledger.Payload, + _ int, +) error { + m.log = log. + With(). + Str("migration", filterUnreferencedSlabsName). 
+ Logger() + + return nil +} + +func (m *FilterUnreferencedSlabsMigration) MigrateAccount( + _ context.Context, + address common.Address, + oldPayloads []*ledger.Payload, +) ( + newPayloads []*ledger.Payload, + err error, +) { + migrationRuntime, err := NewAtreeRegisterMigratorRuntime(address, oldPayloads) + if err != nil { + return nil, fmt.Errorf("failed to create migrator runtime: %w", err) + } + + storage := migrationRuntime.Storage + + newPayloads = oldPayloads + + err = checkStorageHealth(address, storage, oldPayloads) + if err == nil { + return + } + + // The storage health check failed. + // This can happen if there are unreferenced root slabs. + // In this case, we filter out the unreferenced root slabs and all slabs they reference from the payloads. + + var unreferencedRootSlabsErr runtime.UnreferencedRootSlabsError + if !errors.As(err, &unreferencedRootSlabsErr) { + return nil, fmt.Errorf("storage health check failed: %w", err) + } + + m.log.Warn(). + Err(err). + Str("account", address.Hex()). + Msg("filtering unreferenced root slabs") + + // Create a set of unreferenced slabs: root slabs, and all slabs they reference. + + unreferencedSlabIDs := map[atree.StorageID]struct{}{} + for _, rootSlabID := range unreferencedRootSlabsErr.UnreferencedRootSlabIDs { + unreferencedSlabIDs[rootSlabID] = struct{}{} + + childReferences, _, err := storage.GetAllChildReferences(rootSlabID) + if err != nil { + return nil, fmt.Errorf( + "failed to get all child references for root slab %s: %w", + rootSlabID, + err, + ) + } + + for _, childSlabID := range childReferences { + unreferencedSlabIDs[childSlabID] = struct{}{} + } + } + + // Filter out unreferenced slabs. 
+ + newCount := len(oldPayloads) - len(unreferencedSlabIDs) + newPayloads = make([]*ledger.Payload, 0, newCount) + + filteredPayloads := make([]*ledger.Payload, 0, len(unreferencedSlabIDs)) + + for _, payload := range oldPayloads { + registerID, _, err := convert.PayloadToRegister(payload) + if err != nil { + return nil, fmt.Errorf("failed to convert payload to register: %w", err) + } + + // Filter unreferenced slabs. + if registerID.IsSlabIndex() { + storageID := StorageIDFromRegisterID(registerID) + if _, ok := unreferencedSlabIDs[storageID]; ok { + filteredPayloads = append(filteredPayloads, payload) + continue + } + } + + newPayloads = append(newPayloads, payload) + } + + m.rw.Write(unreferencedSlabs{ + Account: address, + PayloadCount: len(filteredPayloads), + }) + + m.mergeFilteredPayloads(filteredPayloads) + + // Do NOT report the health check error here. + // The health check error is only reported if it is not due to unreferenced slabs. + // If it is due to unreferenced slabs, we filter them out and continue. + + return newPayloads, nil +} + +func (m *FilterUnreferencedSlabsMigration) mergeFilteredPayloads(payloads []*ledger.Payload) { + m.mutex.Lock() + defer m.mutex.Unlock() + + m.filteredPayloads = append(m.filteredPayloads, payloads...) 
+} + +func (m *FilterUnreferencedSlabsMigration) Close() error { + // close the report writer so it flushes to file + m.rw.Close() + + err := m.writeFilteredPayloads() + if err != nil { + return fmt.Errorf("failed to write filtered payloads to file: %w", err) + } + + return nil +} + +func (m *FilterUnreferencedSlabsMigration) writeFilteredPayloads() error { + + m.payloadsFile = path.Join( + m.outputDir, + fmt.Sprintf("filtered_%d.payloads", int32(time.Now().Unix())), + ) + + writtenPayloadCount, err := util.CreatePayloadFile( + m.log, + m.payloadsFile, + m.filteredPayloads, + nil, + true, + ) + + if err != nil { + return fmt.Errorf("failed to write all filtered payloads to file: %w", err) + } + + if writtenPayloadCount != len(m.filteredPayloads) { + return fmt.Errorf( + "failed to write all filtered payloads to file: expected %d, got %d", + len(m.filteredPayloads), + writtenPayloadCount, + ) + } + + return nil +} + +type unreferencedSlabs struct { + Account common.Address `json:"account"` + PayloadCount int `json:"payload_count"` +} diff --git a/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration_test.go b/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration_test.go new file mode 100644 index 00000000000..2d7c76629a5 --- /dev/null +++ b/cmd/util/ledger/migrations/filter_unreferenced_slabs_migration_test.go @@ -0,0 +1,201 @@ +package migrations + +import ( + "context" + "encoding/binary" + "testing" + + "github.com/onflow/atree" + "github.com/onflow/cadence/runtime" + "github.com/onflow/cadence/runtime/common" + "github.com/onflow/cadence/runtime/interpreter" + "github.com/rs/zerolog" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/onflow/flow-go/cmd/util/ledger/util" + "github.com/onflow/flow-go/ledger" + "github.com/onflow/flow-go/ledger/common/convert" + "github.com/onflow/flow-go/model/flow" +) + +func TestFilterUnreferencedSlabs(t *testing.T) { + t.Parallel() + + // Arrange + + const chainID = 
flow.Emulator + chain := chainID.Chain() + + testFlowAddress, err := chain.AddressAtIndex(1_000_000) + require.NoError(t, err) + + testAddress := common.Address(testFlowAddress) + + payloads := map[flow.RegisterID]*ledger.Payload{} + + payloadsLedger := util.NewPayloadsLedger(payloads) + + storageIndices := map[string]uint64{} + payloadsLedger.AllocateStorageIndexFunc = func(owner []byte) (atree.StorageIndex, error) { + var index atree.StorageIndex + + storageIndices[string(owner)]++ + + binary.BigEndian.PutUint64( + index[:], + storageIndices[string(owner)], + ) + + return index, nil + } + + storage := runtime.NewStorage(payloadsLedger, nil) + + // {Int: Int} + dict1StaticType := interpreter.NewDictionaryStaticType( + nil, + interpreter.PrimitiveStaticTypeInt, + interpreter.PrimitiveStaticTypeInt, + ) + + inter, err := interpreter.NewInterpreter( + nil, + nil, + &interpreter.Config{ + Storage: storage, + }, + ) + require.NoError(t, err) + + dict1 := interpreter.NewDictionaryValueWithAddress( + inter, + interpreter.EmptyLocationRange, + dict1StaticType, + testAddress, + ) + + // Store another dictionary, with a nested array, in the account. + // It is not referenced through a storage map though.
+ + arrayStaticType := interpreter.NewVariableSizedStaticType(nil, interpreter.PrimitiveStaticTypeInt) + + dict2StaticType := interpreter.NewDictionaryStaticType( + nil, + interpreter.PrimitiveStaticTypeInt, + arrayStaticType, + ) + + dict2 := interpreter.NewDictionaryValueWithAddress( + inter, + interpreter.EmptyLocationRange, + dict2StaticType, + testAddress, + ) + + dict2.InsertWithoutTransfer( + inter, interpreter.EmptyLocationRange, + interpreter.NewUnmeteredIntValueFromInt64(2), + interpreter.NewArrayValue( + inter, + interpreter.EmptyLocationRange, + arrayStaticType, + testAddress, + interpreter.NewUnmeteredIntValueFromInt64(3), + ), + ) + + storageMap := storage.GetStorageMap( + testAddress, + common.PathDomainStorage.Identifier(), + true, + ) + + // Only insert first dictionary. + // Second dictionary is unreferenced. + + storageMap.SetValue( + inter, + interpreter.StringStorageMapKey("test"), + dict1, + ) + + err = storage.Commit(inter, false) + require.NoError(t, err) + + oldPayloads := make([]*ledger.Payload, 0, len(payloads)) + + for _, payload := range payloadsLedger.Payloads { + oldPayloads = append(oldPayloads, payload) + } + + const totalSlabCount = 5 + + require.Len(t, oldPayloads, totalSlabCount) + + // Act + + rwf := &testReportWriterFactory{} + migration := NewFilterUnreferencedSlabsMigration(t.TempDir(), rwf) + + log := zerolog.New(zerolog.NewTestWriter(t)) + + err = migration.InitMigration(log, nil, 0) + require.NoError(t, err) + + ctx := context.Background() + + newPayloads, err := migration.MigrateAccount(ctx, testAddress, oldPayloads) + require.NoError(t, err) + + err = migration.Close() + require.NoError(t, err) + + // Assert + + writer := rwf.reportWriters[filterUnreferencedSlabsName] + + expectedAddress := string(testAddress[:]) + expectedKeys := map[string]struct{}{ + string([]byte{flow.SlabIndexPrefix, 0, 0, 0, 0, 0, 0, 0, 2}): {}, + string([]byte{flow.SlabIndexPrefix, 0, 0, 0, 0, 0, 0, 0, 3}): {}, + } + + assert.Len(t, newPayloads, 
totalSlabCount-len(expectedKeys)) + + expectedFilteredPayloads := make([]*ledger.Payload, 0, len(expectedKeys)) + + for _, payload := range oldPayloads { + registerID, _, err := convert.PayloadToRegister(payload) + require.NoError(t, err) + + if registerID.Owner != expectedAddress { + continue + } + + if _, ok := expectedKeys[registerID.Key]; !ok { + continue + } + + expectedFilteredPayloads = append(expectedFilteredPayloads, payload) + } + + assert.Equal(t, + []any{ + unreferencedSlabs{ + Account: testAddress, + PayloadCount: len(expectedFilteredPayloads), + }, + }, + writer.entries, + ) + assert.Equal(t, + expectedFilteredPayloads, + migration.filteredPayloads, + ) + + readIsPartial, readFilteredPayloads, err := util.ReadPayloadFile(log, migration.payloadsFile) + require.NoError(t, err) + assert.True(t, readIsPartial) + assert.Equal(t, expectedFilteredPayloads, readFilteredPayloads) +} diff --git a/cmd/util/ledger/migrations/staged_contracts_migration_test.go b/cmd/util/ledger/migrations/staged_contracts_migration_test.go index 4467cf3e117..5d20d7bb17d 100644 --- a/cmd/util/ledger/migrations/staged_contracts_migration_test.go +++ b/cmd/util/ledger/migrations/staged_contracts_migration_test.go @@ -1797,6 +1797,7 @@ func TestConcurrentContractUpdate(t *testing.T) { migrations := NewCadence1Migrations( logger, + t.TempDir(), rwf, Options{ NWorker: nWorker, diff --git a/cmd/util/ledger/util/util.go b/cmd/util/ledger/util/util.go index 4b4cb67926e..5cb8feeeec5 100644 --- a/cmd/util/ledger/util/util.go +++ b/cmd/util/ledger/util/util.go @@ -18,6 +18,10 @@ import ( "github.com/onflow/flow-go/model/flow" ) +func newRegisterID(owner []byte, key []byte) flow.RegisterID { + return flow.NewRegisterID(flow.BytesToAddress(owner), string(key)) +} + type AccountsAtreeLedger struct { Accounts environment.Accounts } @@ -29,10 +33,9 @@ func NewAccountsAtreeLedger(accounts environment.Accounts) *AccountsAtreeLedger var _ atree.Ledger = &AccountsAtreeLedger{} func (a 
*AccountsAtreeLedger) GetValue(owner, key []byte) ([]byte, error) { + registerID := newRegisterID(owner, key) v, err := a.Accounts.GetValue( - flow.NewRegisterID( - flow.BytesToAddress(owner), - string(key))) + registerID) if err != nil { return nil, fmt.Errorf("getting value failed: %w", err) } @@ -40,11 +43,8 @@ func (a *AccountsAtreeLedger) GetValue(owner, key []byte) ([]byte, error) { } func (a *AccountsAtreeLedger) SetValue(owner, key, value []byte) error { - err := a.Accounts.SetValue( - flow.NewRegisterID( - flow.BytesToAddress(owner), - string(key)), - value) + registerID := newRegisterID(owner, key) + err := a.Accounts.SetValue(registerID, value) if err != nil { return fmt.Errorf("setting value failed: %w", err) } @@ -76,8 +76,11 @@ type PayloadsReadonlyLedger struct { SetValueFunc func(owner, key, value []byte) (err error) } +var _ atree.Ledger = &PayloadsReadonlyLedger{} + func (p *PayloadsReadonlyLedger) GetValue(owner, key []byte) (value []byte, err error) { - v, err := p.Snapshot.Get(flow.NewRegisterID(flow.BytesToAddress(owner), string(key))) + registerID := newRegisterID(owner, key) + v, err := p.Snapshot.Get(registerID) if err != nil { return nil, fmt.Errorf("getting value failed: %w", err) } @@ -93,7 +96,8 @@ func (p *PayloadsReadonlyLedger) SetValue(owner, key, value []byte) (err error) } func (p *PayloadsReadonlyLedger) ValueExists(owner, key []byte) (bool, error) { - exists := p.Snapshot.Exists(flow.NewRegisterID(flow.BytesToAddress(owner), string(key))) + registerID := newRegisterID(owner, key) + exists := p.Snapshot.Exists(registerID) return exists, nil } @@ -238,3 +242,48 @@ func registerIDKeyFromString(s string) (flow.RegisterID, common.Address) { type PayloadMetaInfo struct { Height, Version uint64 } + +// PayloadsLedger is a simple read/write in-memory atree.Ledger implementation +type PayloadsLedger struct { + Payloads map[flow.RegisterID]*ledger.Payload + + AllocateStorageIndexFunc func(owner []byte) (atree.StorageIndex, error) +} + 
+var _ atree.Ledger = &PayloadsLedger{} + +func NewPayloadsLedger(payloads map[flow.RegisterID]*ledger.Payload) *PayloadsLedger { + return &PayloadsLedger{ + Payloads: payloads, + } +} + +func (p *PayloadsLedger) GetValue(owner, key []byte) (value []byte, err error) { + registerID := newRegisterID(owner, key) + v, ok := p.Payloads[registerID] + if !ok { + return nil, nil + } + return v.Value(), nil +} + +func (p *PayloadsLedger) SetValue(owner, key, value []byte) (err error) { + registerID := newRegisterID(owner, key) + ledgerKey := convert.RegisterIDToLedgerKey(registerID) + p.Payloads[registerID] = ledger.NewPayload(ledgerKey, value) + return nil +} + +func (p *PayloadsLedger) ValueExists(owner, key []byte) (exists bool, err error) { + registerID := newRegisterID(owner, key) + _, ok := p.Payloads[registerID] + return ok, nil +} + +func (p *PayloadsLedger) AllocateStorageIndex(owner []byte) (atree.StorageIndex, error) { + if p.AllocateStorageIndexFunc != nil { + return p.AllocateStorageIndexFunc(owner) + } + + panic("AllocateStorageIndex not expected to be called") +} diff --git a/model/flow/ledger.go b/model/flow/ledger.go index 61c521bbef4..81efb66a559 100644 --- a/model/flow/ledger.go +++ b/model/flow/ledger.go @@ -141,6 +141,8 @@ func (id RegisterID) IsInternalState() bool { id.Key == AccountStatusKey } +const SlabIndexPrefix = '$' + // IsSlabIndex returns true if the key is a slab index for an account's ordered fields // map. // @@ -148,15 +150,15 @@ func (id RegisterID) IsInternalState() bool { // only to cadence. Cadence encodes this map into bytes and split the bytes // into slab chunks before storing the slabs into the ledger. func (id RegisterID) IsSlabIndex() bool { - return len(id.Key) == 9 && id.Key[0] == '$' + return len(id.Key) == 9 && id.Key[0] == SlabIndexPrefix } // String returns formatted string representation of the RegisterID. 
func (id RegisterID) String() string { formattedKey := "" if id.IsSlabIndex() { - i := uint64(binary.BigEndian.Uint64([]byte(id.Key[1:]))) - formattedKey = fmt.Sprintf("$%d", i) + i := binary.BigEndian.Uint64([]byte(id.Key[1:])) + formattedKey = fmt.Sprintf("%c%d", SlabIndexPrefix, i) } else { formattedKey = fmt.Sprintf("#%x", []byte(id.Key)) }