Skip to content

Commit

Permalink
Merge pull request #5533 from onflow/bastian/improve-migrations-3
Browse files Browse the repository at this point in the history
[Cadence 1.0] Improve migrations
  • Loading branch information
turbolent authored Mar 12, 2024
2 parents 8aebe84 + cf34d51 commit a0b44e9
Show file tree
Hide file tree
Showing 11 changed files with 663 additions and 126 deletions.
6 changes: 6 additions & 0 deletions cmd/util/cmd/execution-state-extract/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ var (
flagInputPayloadFileName string
flagOutputPayloadFileName string
flagOutputPayloadByAddresses string
flagMaxAccountSize uint64
)

var Cmd = &cobra.Command{
Expand Down Expand Up @@ -132,6 +133,9 @@ func init() {
"",
"extract payloads of addresses (comma separated hex-encoded addresses) to file specified by output-payload-filename",
)

Cmd.Flags().Uint64Var(&flagMaxAccountSize, "max-account-size", 0,
"max account size")
}

func run(*cobra.Command, []string) {
Expand Down Expand Up @@ -346,6 +350,7 @@ func run(*cobra.Command, []string) {
exportedAddresses,
flagSortPayloads,
flagPrune,
flagMaxAccountSize,
)
} else {
err = extractExecutionState(
Expand All @@ -365,6 +370,7 @@ func run(*cobra.Command, []string) {
exportedAddresses,
flagSortPayloads,
flagPrune,
flagMaxAccountSize,
)
}

Expand Down
53 changes: 7 additions & 46 deletions cmd/util/cmd/execution-state-extract/execution_state_extract.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/onflow/flow-go/cmd/util/ledger/reporters"
"github.com/onflow/flow-go/cmd/util/ledger/util"
"github.com/onflow/flow-go/ledger"
"github.com/onflow/flow-go/ledger/common/convert"
"github.com/onflow/flow-go/ledger/common/hash"
"github.com/onflow/flow-go/ledger/common/pathfinder"
"github.com/onflow/flow-go/ledger/complete"
Expand Down Expand Up @@ -48,6 +47,7 @@ func extractExecutionState(
exportPayloadsByAddresses []common.Address,
sortPayloads bool,
prune bool,
maxAccountSize uint64,
) error {

log.Info().Msg("init WAL")
Expand Down Expand Up @@ -119,6 +119,7 @@ func extractExecutionState(
burnerContractChange,
stagedContracts,
prune,
maxAccountSize,
)

newState := ledger.State(targetHash)
Expand Down Expand Up @@ -220,20 +221,6 @@ func writeStatusFile(fileName string, e error) error {
return err
}

func ByteCountIEC(b int64) string {
const unit = 1024
if b < unit {
return fmt.Sprintf("%d B", b)
}
div, exp := int64(unit), 0
for n := b / unit; n >= unit; n /= unit {
div *= unit
exp++
}
return fmt.Sprintf("%.1f %ciB",
float64(b)/float64(div), "KMGTPE"[exp])
}

func extractExecutionStateFromPayloads(
log zerolog.Logger,
dir string,
Expand All @@ -251,6 +238,7 @@ func extractExecutionStateFromPayloads(
exportPayloadsByAddresses []common.Address,
sortPayloads bool,
prune bool,
maxAccountSize uint64,
) error {

inputPayloadsFromPartialState, payloads, err := util.ReadPayloadFile(log, inputPayloadFile)
Expand All @@ -260,36 +248,6 @@ func extractExecutionStateFromPayloads(

log.Info().Msgf("read %d payloads", len(payloads))

if log.Debug().Enabled() {

type accountInfo struct {
count int
size uint64
}
payloadCountByAddress := make(map[string]accountInfo)

for _, payload := range payloads {
registerID, payloadValue, err := convert.PayloadToRegister(payload)
if err != nil {
return fmt.Errorf("cannot convert payload to register: %w", err)
}
owner := registerID.Owner
accountInfo := payloadCountByAddress[owner]
accountInfo.count++
accountInfo.size += uint64(len(payloadValue))
payloadCountByAddress[owner] = accountInfo
}

for address, info := range payloadCountByAddress {
log.Debug().Msgf(
"address %x has %d payloads and a total size of %s",
address,
info.count,
ByteCountIEC(int64(info.size)),
)
}
}

migrations := newMigrations(
log,
dir,
Expand All @@ -302,6 +260,7 @@ func extractExecutionStateFromPayloads(
burnerContractChange,
stagedContracts,
prune,
maxAccountSize,
)

payloads, err = migratePayloads(log, payloads, migrations)
Expand Down Expand Up @@ -393,7 +352,7 @@ func migratePayloads(logger zerolog.Logger, payloads []*ledger.Payload, migratio

// migrate payloads
for i, migrate := range migrations {
logger.Info().Msgf("migration %d/%d is underway", i, len(migrations))
logger.Info().Msgf("migration %d/%d is underway", i+1, len(migrations))

start := time.Now()
payloads, err = migrate(payloads)
Expand Down Expand Up @@ -458,6 +417,7 @@ func newMigrations(
burnerContractChange migrators.BurnerContractChange,
stagedContracts []migrators.StagedContract,
prune bool,
maxAccountSize uint64,
) []ledger.Migration {
if !runMigrations {
return nil
Expand All @@ -478,6 +438,7 @@ func newMigrations(
burnerContractChange,
stagedContracts,
prune,
maxAccountSize,
)

migrations := make([]ledger.Migration, 0, len(namedMigrations))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func TestExtractExecutionState(t *testing.T) {
nil,
false,
false,
0,
)
require.Error(t, err)
})
Expand Down
24 changes: 2 additions & 22 deletions cmd/util/ledger/migrations/account_based_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func withMigrations(
Type("migration", migrator).
Msg("closing migration")
if cerr := migrator.Close(); cerr != nil {
log.Error().Err(cerr).Msg("error closing migration")
log.Err(cerr).Msg("error closing migration")
if err == nil {
// only set the error if it's not already set
// so that we don't overwrite the original error
Expand Down Expand Up @@ -315,27 +315,7 @@ func MigrateGroupConcurrently(
return migrated, nil
}

var knownProblematicAccounts = map[common.Address]string{
// Testnet accounts with broken contracts
mustHexToAddress("434a1f199a7ae3ba"): "Broken contract FanTopPermission",
mustHexToAddress("454c9991c2b8d947"): "Broken contract Test",
mustHexToAddress("48602d8056ff9d93"): "Broken contract FanTopPermission",
mustHexToAddress("5d63c34d7f05e5a4"): "Broken contract FanTopPermission",
mustHexToAddress("5e3448b3cffb97f2"): "Broken contract FanTopPermission",
mustHexToAddress("7d8c7e050c694eaa"): "Broken contract Test",
mustHexToAddress("ba53f16ede01972d"): "Broken contract FanTopPermission",
mustHexToAddress("c843c1f5a4805c3a"): "Broken contract FanTopPermission",
mustHexToAddress("48d3be92e6e4a973"): "Broken contract FanTopPermission",
// Mainnet account
}

func mustHexToAddress(hex string) common.Address {
address, err := common.HexToAddress(hex)
if err != nil {
panic(err)
}
return address
}
var knownProblematicAccounts = map[common.Address]string{}

type jobMigrateAccountGroup struct {
Address common.Address
Expand Down
99 changes: 99 additions & 0 deletions cmd/util/ledger/migrations/account_size_filter_migration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package migrations

import (
"fmt"

"github.com/rs/zerolog"

"github.com/onflow/flow-go/ledger"
"github.com/onflow/flow-go/ledger/common/convert"
)

func NewAccountSizeFilterMigration(
maxAccountSize uint64,
exceptions map[string]struct{},
log zerolog.Logger,
) ledger.Migration {

if maxAccountSize == 0 {
return nil
}

return func(payloads []*ledger.Payload) ([]*ledger.Payload, error) {

type accountInfo struct {
count int
size uint64
}
payloadCountByAddress := make(map[string]accountInfo)

for _, payload := range payloads {
registerID, payloadValue, err := convert.PayloadToRegister(payload)
if err != nil {
return nil, fmt.Errorf("cannot convert payload to register: %w", err)
}

owner := registerID.Owner
accountInfo := payloadCountByAddress[owner]
accountInfo.count++
accountInfo.size += uint64(len(payloadValue))
payloadCountByAddress[owner] = accountInfo
}

if log.Debug().Enabled() {
for address, info := range payloadCountByAddress {
log.Debug().Msgf(
"address %x has %d payloads and a total size of %s",
address,
info.count,
ByteCountIEC(int64(info.size)),
)

if _, ok := exceptions[address]; !ok && info.size > maxAccountSize {
log.Warn().Msgf(
"dropping payloads of account %x. size of payloads %s exceeds max size %s",
address,
ByteCountIEC(int64(info.size)),
ByteCountIEC(int64(maxAccountSize)),
)
}
}
}

filteredPayloads := make([]*ledger.Payload, 0, int(0.8*float32(len(payloads))))

for _, payload := range payloads {
registerID, _, err := convert.PayloadToRegister(payload)
if err != nil {
return nil, fmt.Errorf("cannot convert payload to register: %w", err)
}

owner := registerID.Owner

if _, ok := exceptions[owner]; !ok {
info := payloadCountByAddress[owner]
if info.size > maxAccountSize {
continue
}
}

filteredPayloads = append(filteredPayloads, payload)
}

return filteredPayloads, nil
}
}

func ByteCountIEC(b int64) string {
const unit = 1024
if b < unit {
return fmt.Sprintf("%d B", b)
}
div, exp := int64(unit), 0
for n := b / unit; n >= unit; n /= unit {
div *= unit
exp++
}
return fmt.Sprintf("%.1f %ciB",
float64(b)/float64(div), "KMGTPE"[exp])
}
Loading

0 comments on commit a0b44e9

Please sign in to comment.