From 85527b81bb9c190bf51b1e4b4f35f9e07b39320f Mon Sep 17 00:00:00 2001 From: Faye Amacker <33205765+fxamacker@users.noreply.github.com> Date: Tue, 13 Feb 2024 10:01:50 -0600 Subject: [PATCH 01/10] Add ability to read or extract payloads from state Added two flags to execution state extraction program: --extract-payloads-by-address produces a file of payloads for specified accounts or for all accounts instead of checkpoint files --use-payload-as-input uses payload file as input instead of checkpoint files The two new flags don't affect migration and other existing functionaly of state extraction program. These two options only affect input and output of state extraction program. In other words, this can be used to extract migrated payloads or extract as-is payloads for specified accounts. --- cmd/util/cmd/execution-state-extract/cmd.go | 116 ++++++- .../execution_state_extract.go | 199 +++++++++-- .../execution_state_extract_test.go | 327 +++++++++++++++++- .../export_payloads.go | 205 +++++++++++ 4 files changed, 797 insertions(+), 50 deletions(-) create mode 100644 cmd/util/cmd/execution-state-extract/export_payloads.go diff --git a/cmd/util/cmd/execution-state-extract/cmd.go b/cmd/util/cmd/execution-state-extract/cmd.go index 55728b428a8..55e3432ba9c 100644 --- a/cmd/util/cmd/execution-state-extract/cmd.go +++ b/cmd/util/cmd/execution-state-extract/cmd.go @@ -3,10 +3,13 @@ package extract import ( "encoding/hex" "path" + "strings" "github.com/rs/zerolog/log" "github.com/spf13/cobra" + runtimeCommon "github.com/onflow/cadence/runtime/common" + "github.com/onflow/flow-go/cmd/util/cmd/common" "github.com/onflow/flow-go/model/bootstrap" "github.com/onflow/flow-go/model/flow" @@ -26,6 +29,8 @@ var ( flagNoReport bool flagValidateMigration bool flagLogVerboseValidationError bool + flagInputPayload bool + flagOutputPayloadByAddresses string ) var Cmd = &cobra.Command{ @@ -68,6 +73,19 @@ func init() { Cmd.Flags().BoolVar(&flagLogVerboseValidationError, "log-verbose-validation-error", false, "log entire Cadence values on validation error (atree migration)") + Cmd.Flags().StringVar( + &flagOutputPayloadByAddresses, + "extract-payloads-by-address", + "", + "extract payloads of specified addresses (comma separated list of hex-encoded addresses or \"all\"", // empty string ignores this flag + ) + + Cmd.Flags().BoolVar( + &flagInputPayload, + "use-payload-as-input", + false, + "use payload file instead of checkpoint file as input", + ) } func run(*cobra.Command, []string) { @@ -112,20 +130,65 @@ func run(*cobra.Command, []string) { log.Info().Msgf("extracting state by state commitment: %x", stateCommitment) } - if len(flagBlockHash) == 0 && len(flagStateCommitment) == 0 { - log.Fatal().Msg("no --block-hash or --state-commitment was specified") + if len(flagBlockHash) == 0 && len(flagStateCommitment) == 0 && !flagInputPayload { + log.Fatal().Msg("no --block-hash or --state-commitment or --use-payload-as-input was specified") } - log.Info().Msgf("Extracting state from %s, exporting root checkpoint to %s, version: %v", - flagExecutionStateDir, - path.Join(flagOutputDir, bootstrap.FilenameWALRootCheckpoint), - 6, - ) + exportPayloads := len(flagOutputPayloadByAddresses) > 0 + + var exportedAddresses []runtimeCommon.Address + + if exportPayloads { + + addresses := strings.Split(flagOutputPayloadByAddresses, ",") + + if len(addresses) == 1 && strings.TrimSpace(addresses[0]) == "all" { + // Extract payloads of the entire state. + log.Info().Msgf("Extracting state from %s, exporting all payloads to %s", + flagExecutionStateDir, + path.Join(flagOutputDir, FilenamePayloads), + ) + } else { + // Extract payloads of specified accounts + for _, hexAddr := range addresses { + b, err := hex.DecodeString(strings.TrimSpace(hexAddr)) + if err != nil { + log.Fatal().Err(err).Msgf("cannot hex decode address %s for payload export", strings.TrimSpace(hexAddr)) + } + + addr, err := runtimeCommon.BytesToAddress(b) + if err != nil { + log.Fatal().Err(err).Msgf("cannot decode address %x for payload export", b) + } + + exportedAddresses = append(exportedAddresses, addr) + } + + log.Info().Msgf("Extracting state from %s, exporting payloads by addresses %v to %s", + flagExecutionStateDir, + flagOutputPayloadByAddresses, + path.Join(flagOutputDir, FilenamePayloads), + ) + } - log.Info().Msgf("Block state commitment: %s from %v, output dir: %s", - hex.EncodeToString(stateCommitment[:]), - flagExecutionStateDir, - flagOutputDir) + } else { + log.Info().Msgf("Extracting state from %s, exporting root checkpoint to %s, version: %v", + flagExecutionStateDir, + path.Join(flagOutputDir, bootstrap.FilenameWALRootCheckpoint), + 6, + ) + } + + if flagInputPayload { + log.Info().Msgf("Payload input from %v, output dir: %s", + flagExecutionStateDir, + flagOutputDir) + } else { + log.Info().Msgf("Block state commitment: %s from %v, output dir: %s", + hex.EncodeToString(stateCommitment[:]), + flagExecutionStateDir, + flagOutputDir) + } // err := ensureCheckpointFileExist(flagExecutionStateDir) // if err != nil { @@ -148,14 +211,29 @@ func run(*cobra.Command, []string) { log.Warn().Msgf("atree migration has verbose validation error logging enabled which may increase size of log") } - err := extractExecutionState( - log.Logger, - flagExecutionStateDir, - stateCommitment, - flagOutputDir, - flagNWorker, - !flagNoMigration, - ) + var err error + if flagInputPayload { + err = extractExecutionStateFromPayloads( + log.Logger, + flagExecutionStateDir, + flagOutputDir, + flagNWorker, + !flagNoMigration, + exportPayloads, + exportedAddresses, + ) + } else { + err = extractExecutionState( + log.Logger, + flagExecutionStateDir, + stateCommitment, + flagOutputDir, + flagNWorker, + !flagNoMigration, + exportPayloads, + exportedAddresses, + ) + } if err != nil { log.Fatal().Err(err).Msgf("error extracting the execution state: %s", err.Error()) diff --git a/cmd/util/cmd/execution-state-extract/execution_state_extract.go b/cmd/util/cmd/execution-state-extract/execution_state_extract.go index 90bcd70533d..dc552682886 100644 --- a/cmd/util/cmd/execution-state-extract/execution_state_extract.go +++ b/cmd/util/cmd/execution-state-extract/execution_state_extract.go @@ -5,7 +5,9 @@ import ( "fmt" "math" "os" + "time" + "github.com/onflow/cadence/runtime/common" "github.com/rs/zerolog" "go.uber.org/atomic" @@ -34,6 +36,8 @@ func extractExecutionState( outputDir string, nWorker int, // number of concurrent worker to migation payloads runMigrations bool, + exportPayloads bool, + exportPayloadsByAddresses []common.Address, ) error { log.Info().Msg("init WAL") @@ -84,30 +88,7 @@ func extractExecutionState( <-compactor.Done() }() - var migrations []ledger.Migration - - if runMigrations { - rwf := reporters.NewReportFileWriterFactory(dir, log) - - migrations = []ledger.Migration{ - migrators.CreateAccountBasedMigration( - log, - nWorker, - []migrators.AccountBasedMigration{ - migrators.NewAtreeRegisterMigrator( - rwf, - flagValidateMigration, - flagLogVerboseValidationError, - ), - - &migrators.DeduplicateContractNamesMigration{}, - - // This will fix storage used discrepancies caused by the - // DeduplicateContractNamesMigration. - &migrators.AccountUsageMigrator{}, - }), - } - } + migrations := newMigrations(log, dir, nWorker, runMigrations) newState := ledger.State(targetHash) @@ -134,6 +115,19 @@ func extractExecutionState( log.Error().Err(err).Msgf("can not generate report for migrated state: %v", newMigratedState) } + if exportPayloads { + payloads := newTrie.AllPayloads() + + exportedPayloadCount, err := createPayloadFile(log, outputDir, payloads, exportPayloadsByAddresses) + if err != nil { + return fmt.Errorf("cannot generate payloads file: %w", err) + } + + log.Info().Msgf("Exported %d payloads out of %d payloads", exportedPayloadCount, len(payloads)) + + return nil + } + migratedState, err := createCheckpoint( newTrie, log, @@ -191,3 +185,160 @@ func writeStatusFile(fileName string, e error) error { err := os.WriteFile(fileName, checkpointStatusJson, 0644) return err } + +func extractExecutionStateFromPayloads( + log zerolog.Logger, + dir string, + outputDir string, + nWorker int, // number of concurrent worker to migation payloads + runMigrations bool, + exportPayloads bool, + exportPayloadsByAddresses []common.Address, +) error { + + payloads, err := readPayloadFile(log, dir) + if err != nil { + return err + } + + log.Info().Msgf("read %d payloads\n", len(payloads)) + + migrations := newMigrations(log, dir, nWorker, runMigrations) + + payloads, err = migratePayloads(log, payloads, migrations) + if err != nil { + return err + } + + if exportPayloads { + exportedPayloadCount, err := createPayloadFile(log, outputDir, payloads, exportPayloadsByAddresses) + if err != nil { + return fmt.Errorf("cannot generate payloads file: %w", err) + } + + log.Info().Msgf("Exported %d payloads out of %d payloads", exportedPayloadCount, len(payloads)) + + return nil + } + + newTrie, err := createTrieFromPayloads(log, payloads) + if err != nil { + return err + } + + migratedState, err := createCheckpoint( + newTrie, + log, + outputDir, + bootstrap.FilenameWALRootCheckpoint, + ) + if err != nil { + return fmt.Errorf("cannot generate the output checkpoint: %w", err) + } + + log.Info().Msgf( + "New state commitment for the exported state is: %s (base64: %s)", + migratedState.String(), + migratedState.Base64(), + ) + + return nil +} + +func migratePayloads(logger zerolog.Logger, payloads []*ledger.Payload, migrations []ledger.Migration) ([]*ledger.Payload, error) { + + if len(migrations) == 0 { + return payloads, nil + } + + var err error + payloadCount := len(payloads) + + // migrate payloads + for i, migrate := range migrations { + logger.Info().Msgf("migration %d/%d is underway", i, len(migrations)) + + start := time.Now() + payloads, err = migrate(payloads) + elapsed := time.Since(start) + + if err != nil { + return nil, fmt.Errorf("error applying migration (%d): %w", i, err) + } + + newPayloadCount := len(payloads) + + if payloadCount != newPayloadCount { + logger.Warn(). + Int("migration_step", i). + Int("expected_size", payloadCount). + Int("outcome_size", newPayloadCount). + Msg("payload counts has changed during migration, make sure this is expected.") + } + logger.Info().Str("timeTaken", elapsed.String()).Msgf("migration %d is done", i) + + payloadCount = newPayloadCount + } + + return payloads, nil +} + +func createTrieFromPayloads(logger zerolog.Logger, payloads []*ledger.Payload) (*trie.MTrie, error) { + // get paths + paths, err := pathfinder.PathsFromPayloads(payloads, complete.DefaultPathFinderVersion) + if err != nil { + return nil, fmt.Errorf("cannot export checkpoint, can't construct paths: %w", err) + } + + logger.Info().Msgf("constructing a new trie with migrated payloads (count: %d)...", len(payloads)) + + emptyTrie := trie.NewEmptyMTrie() + + derefPayloads := make([]ledger.Payload, len(payloads)) + for i, p := range payloads { + derefPayloads[i] = *p + } + + // no need to prune the data since it has already been prunned through migrations + applyPruning := false + newTrie, _, err := trie.NewTrieWithUpdatedRegisters(emptyTrie, paths, derefPayloads, applyPruning) + if err != nil { + return nil, fmt.Errorf("constructing updated trie failed: %w", err) + } + + return newTrie, nil +} + +func newMigrations( + log zerolog.Logger, + dir string, + nWorker int, // number of concurrent worker to migation payloads + runMigrations bool, +) []ledger.Migration { + if runMigrations { + rwf := reporters.NewReportFileWriterFactory(dir, log) + + migrations := []ledger.Migration{ + migrators.CreateAccountBasedMigration( + log, + nWorker, + []migrators.AccountBasedMigration{ + migrators.NewAtreeRegisterMigrator( + rwf, + flagValidateMigration, + flagLogVerboseValidationError, + ), + + &migrators.DeduplicateContractNamesMigration{}, + + // This will fix storage used discrepancies caused by the + // DeduplicateContractNamesMigration. + &migrators.AccountUsageMigrator{}, + }), + } + + return migrations + } + + return nil +} diff --git a/cmd/util/cmd/execution-state-extract/execution_state_extract_test.go b/cmd/util/cmd/execution-state-extract/execution_state_extract_test.go index 2f91ea7d603..39b05ad557f 100644 --- a/cmd/util/cmd/execution-state-extract/execution_state_extract_test.go +++ b/cmd/util/cmd/execution-state-extract/execution_state_extract_test.go @@ -2,13 +2,17 @@ package extract import ( "crypto/rand" + "encoding/hex" "math" + "strings" "testing" "github.com/rs/zerolog" "github.com/stretchr/testify/require" "go.uber.org/atomic" + runtimeCommon "github.com/onflow/cadence/runtime/common" + "github.com/onflow/flow-go/cmd/util/cmd/common" "github.com/onflow/flow-go/ledger" "github.com/onflow/flow-go/ledger/common/pathfinder" @@ -66,6 +70,8 @@ func TestExtractExecutionState(t *testing.T) { outdir, 10, false, + false, + nil, ) require.Error(t, err) }) @@ -96,7 +102,7 @@ func TestExtractExecutionState(t *testing.T) { var stateCommitment = f.InitialState() - //saved data after updates + // saved data after updates keysValuesByCommit := make(map[string]map[string]keyPair) commitsByBlocks := make(map[flow.Identifier]ledger.State) blocksInOrder := make([]flow.Identifier, size) @@ -108,7 +114,7 @@ func TestExtractExecutionState(t *testing.T) { require.NoError(t, err) stateCommitment, _, err = f.Set(update) - //stateCommitment, err = f.UpdateRegisters(keys, values, stateCommitment) + // stateCommitment, err = f.UpdateRegisters(keys, values, stateCommitment) require.NoError(t, err) // generate random block and map it to state commitment @@ -135,13 +141,13 @@ func TestExtractExecutionState(t *testing.T) { err = db.Close() require.NoError(t, err) - //for blockID, stateCommitment := range commitsByBlocks { + // for blockID, stateCommitment := range commitsByBlocks { for i, blockID := range blocksInOrder { stateCommitment := commitsByBlocks[blockID] - //we need fresh output dir to prevent contamination + // we need fresh output dir to prevent contamination unittest.RunWithTempDir(t, func(outdir string) { Cmd.SetArgs([]string{ @@ -182,7 +188,7 @@ func TestExtractExecutionState(t *testing.T) { require.NoError(t, err) registerValues, err := storage.Get(query) - //registerValues, err := mForest.Read([]byte(stateCommitment), keys) + // registerValues, err := mForest.Read([]byte(stateCommitment), keys) require.NoError(t, err) for i, key := range keys { @@ -190,7 +196,7 @@ func TestExtractExecutionState(t *testing.T) { require.Equal(t, data[key.String()].value, registerValue) } - //make sure blocks after this one are not in checkpoint + // make sure blocks after this one are not in checkpoint // ie - extraction stops after hitting right hash for j := i + 1; j < len(blocksInOrder); j++ { @@ -207,6 +213,312 @@ func TestExtractExecutionState(t *testing.T) { }) } +// TestExtractPayloadsFromExecutionState tests state extraction with checkpoint as input and payload as output. +func TestExtractPayloadsFromExecutionState(t *testing.T) { + + metr := &metrics.NoopCollector{} + + t.Run("all payloads", func(t *testing.T) { + withDirs(t, func(_, execdir, outdir string) { + + const ( + checkpointDistance = math.MaxInt // A large number to prevent checkpoint creation. + checkpointsToKeep = 1 + ) + + size := 10 + + diskWal, err := wal.NewDiskWAL(zerolog.Nop(), nil, metrics.NewNoopCollector(), execdir, size, pathfinder.PathByteSize, wal.SegmentSize) + require.NoError(t, err) + f, err := complete.NewLedger(diskWal, size*10, metr, zerolog.Nop(), complete.DefaultPathFinderVersion) + require.NoError(t, err) + compactor, err := complete.NewCompactor(f, diskWal, zerolog.Nop(), uint(size), checkpointDistance, checkpointsToKeep, atomic.NewBool(false)) + require.NoError(t, err) + <-compactor.Ready() + + var stateCommitment = f.InitialState() + + // Save generated data after updates + keysValues := make(map[string]keyPair) + + for i := 0; i < size; i++ { + keys, values := getSampleKeyValues(i) + + update, err := ledger.NewUpdate(stateCommitment, keys, values) + require.NoError(t, err) + + stateCommitment, _, err = f.Set(update) + require.NoError(t, err) + + for j, key := range keys { + keysValues[key.String()] = keyPair{ + key: key, + value: values[j], + } + } + } + + <-f.Done() + <-compactor.Done() + + tries, err := f.Tries() + require.NoError(t, err) + + err = wal.StoreCheckpointV6SingleThread(tries, execdir, "checkpoint.00000001", zerolog.Nop()) + require.NoError(t, err) + + // Export all payloads + Cmd.SetArgs([]string{ + "--execution-state-dir", execdir, + "--output-dir", outdir, + "--state-commitment", hex.EncodeToString(stateCommitment[:]), + "--no-migration", + "--no-report", + "--extract-payloads-by-address", "all", + "--chain", flow.Emulator.Chain().String()}) + + err = Cmd.Execute() + require.NoError(t, err) + + // Verify exported payloads. + payloadsFromFile, err := readPayloadFile(zerolog.Nop(), outdir) + require.NoError(t, err) + require.Equal(t, len(keysValues), len(payloadsFromFile)) + + for _, payloadFromFile := range payloadsFromFile { + k, err := payloadFromFile.Key() + require.NoError(t, err) + + kv, exist := keysValues[k.String()] + require.True(t, exist) + require.Equal(t, kv.value, payloadFromFile.Value()) + } + }) + }) + + t.Run("some payloads", func(t *testing.T) { + withDirs(t, func(_, execdir, outdir string) { + const ( + checkpointDistance = math.MaxInt // A large number to prevent checkpoint creation. + checkpointsToKeep = 1 + ) + + size := 10 + + diskWal, err := wal.NewDiskWAL(zerolog.Nop(), nil, metrics.NewNoopCollector(), execdir, size, pathfinder.PathByteSize, wal.SegmentSize) + require.NoError(t, err) + f, err := complete.NewLedger(diskWal, size*10, metr, zerolog.Nop(), complete.DefaultPathFinderVersion) + require.NoError(t, err) + compactor, err := complete.NewCompactor(f, diskWal, zerolog.Nop(), uint(size), checkpointDistance, checkpointsToKeep, atomic.NewBool(false)) + require.NoError(t, err) + <-compactor.Ready() + + var stateCommitment = f.InitialState() + + // Save generated data after updates + keysValues := make(map[string]keyPair) + + for i := 0; i < size; i++ { + keys, values := getSampleKeyValues(i) + + update, err := ledger.NewUpdate(stateCommitment, keys, values) + require.NoError(t, err) + + stateCommitment, _, err = f.Set(update) + require.NoError(t, err) + + for j, key := range keys { + keysValues[key.String()] = keyPair{ + key: key, + value: values[j], + } + } + } + + <-f.Done() + <-compactor.Done() + + tries, err := f.Tries() + require.NoError(t, err) + + err = wal.StoreCheckpointV6SingleThread(tries, execdir, "checkpoint.00000001", zerolog.Nop()) + require.NoError(t, err) + + const selectedAddressCount = 10 + selectedAddresses := make(map[string]struct{}) + selectedKeysValues := make(map[string]keyPair) + for k, kv := range keysValues { + owner := kv.key.KeyParts[0].Value + if len(owner) != runtimeCommon.AddressLength { + continue + } + + address, err := runtimeCommon.BytesToAddress(owner) + require.NoError(t, err) + + if len(selectedAddresses) < selectedAddressCount { + selectedAddresses[address.Hex()] = struct{}{} + } + + if _, exist := selectedAddresses[address.Hex()]; exist { + selectedKeysValues[k] = kv + } + } + + addresses := make([]string, 0, len(selectedAddresses)) + for address := range selectedAddresses { + addresses = append(addresses, address) + } + + // Export selected payloads + Cmd.SetArgs([]string{ + "--execution-state-dir", execdir, + "--output-dir", outdir, + "--state-commitment", hex.EncodeToString(stateCommitment[:]), + "--no-migration", + "--no-report", + "--extract-payloads-by-address", strings.Join(addresses, ","), + "--chain", flow.Emulator.Chain().String()}) + + err = Cmd.Execute() + require.NoError(t, err) + + // Verify exported payloads. + payloadsFromFile, err := readPayloadFile(zerolog.Nop(), outdir) + require.NoError(t, err) + require.Equal(t, len(selectedKeysValues), len(payloadsFromFile)) + + for _, payloadFromFile := range payloadsFromFile { + k, err := payloadFromFile.Key() + require.NoError(t, err) + + kv, exist := selectedKeysValues[k.String()] + require.True(t, exist) + require.Equal(t, kv.value, payloadFromFile.Value()) + } + }) + }) +} + +// TestExtractStateFromPayloads tests state extraction with payload as input. +func TestExtractStateFromPayloads(t *testing.T) { + + t.Run("create checkpoint", func(t *testing.T) { + withDirs(t, func(_, execdir, outdir string) { + size := 10 + + // Generate some data + keysValues := make(map[string]keyPair) + var payloads []*ledger.Payload + + for i := 0; i < size; i++ { + keys, values := getSampleKeyValues(i) + + for j, key := range keys { + keysValues[key.String()] = keyPair{ + key: key, + value: values[j], + } + + payloads = append(payloads, ledger.NewPayload(key, values[j])) + } + } + + numOfPayloadWritten, err := createPayloadFile(zerolog.Nop(), execdir, payloads, nil) + require.NoError(t, err) + require.Equal(t, len(payloads), numOfPayloadWritten) + + // Export checkpoint file + Cmd.SetArgs([]string{ + "--execution-state-dir", execdir, + "--output-dir", outdir, + "--no-migration", + "--no-report", + "--use-payload-as-input", + "--extract-payloads-by-address", "", + "--chain", flow.Emulator.Chain().String()}) + + err = Cmd.Execute() + require.NoError(t, err) + + tries, err := wal.OpenAndReadCheckpointV6(outdir, "root.checkpoint", zerolog.Nop()) + require.NoError(t, err) + require.Equal(t, 1, len(tries)) + + // Verify exported checkpoint + payloadsFromFile := tries[0].AllPayloads() + require.NoError(t, err) + require.Equal(t, len(keysValues), len(payloadsFromFile)) + + for _, payloadFromFile := range payloadsFromFile { + k, err := payloadFromFile.Key() + require.NoError(t, err) + + kv, exist := keysValues[k.String()] + require.True(t, exist) + + require.Equal(t, kv.value, payloadFromFile.Value()) + } + }) + + }) + + t.Run("create payloads", func(t *testing.T) { + withDirs(t, func(_, execdir, outdir string) { + size := 10 + + // Generate some data + keysValues := make(map[string]keyPair) + var payloads []*ledger.Payload + + for i := 0; i < size; i++ { + keys, values := getSampleKeyValues(i) + + for j, key := range keys { + keysValues[key.String()] = keyPair{ + key: key, + value: values[j], + } + + payloads = append(payloads, ledger.NewPayload(key, values[j])) + } + } + + numOfPayloadWritten, err := createPayloadFile(zerolog.Nop(), execdir, payloads, nil) + require.NoError(t, err) + require.Equal(t, len(payloads), numOfPayloadWritten) + + // Export all payloads + Cmd.SetArgs([]string{ + "--execution-state-dir", execdir, + "--output-dir", outdir, + "--no-migration", + "--no-report", + "--use-payload-as-input", + "--extract-payloads-by-address", "all", + "--chain", flow.Emulator.Chain().String()}) + + err = Cmd.Execute() + require.NoError(t, err) + + // Verify exported payloads. + payloadsFromFile, err := readPayloadFile(zerolog.Nop(), outdir) + require.NoError(t, err) + require.Equal(t, len(keysValues), len(payloadsFromFile)) + + for _, payloadFromFile := range payloadsFromFile { + k, err := payloadFromFile.Key() + require.NoError(t, err) + + kv, exist := keysValues[k.String()] + require.True(t, exist) + + require.Equal(t, kv.value, payloadFromFile.Value()) + } + }) + }) +} + func getSampleKeyValues(i int) ([]ledger.Key, []ledger.Value) { switch i { case 0: @@ -226,7 +538,8 @@ func getSampleKeyValues(i int) ([]ledger.Key, []ledger.Value) { keys := make([]ledger.Key, 0) values := make([]ledger.Value, 0) for j := 0; j < 10; j++ { - address := make([]byte, 32) + // address := make([]byte, 32) + address := make([]byte, 8) _, err := rand.Read(address) if err != nil { panic(err) diff --git a/cmd/util/cmd/execution-state-extract/export_payloads.go b/cmd/util/cmd/execution-state-extract/export_payloads.go new file mode 100644 index 00000000000..68325dac3de --- /dev/null +++ b/cmd/util/cmd/execution-state-extract/export_payloads.go @@ -0,0 +1,205 @@ +package extract + +import ( + "bufio" + "bytes" + "fmt" + "io" + "os" + "path/filepath" + + "github.com/fxamacker/cbor/v2" + "github.com/rs/zerolog" + + "github.com/onflow/cadence/runtime/common" + + "github.com/onflow/flow-go/ledger" +) + +const ( + FilenamePayloads = "root.payloads" + + defaultBufioWriteSize = 1024 * 32 + defaultBufioReadSize = 1024 * 32 + + payloadEncodingVersion = 1 +) + +func createPayloadFile( + logger zerolog.Logger, + outputDir string, + payloads []*ledger.Payload, + addresses []common.Address, +) (int, error) { + payloadFile := filepath.Join(outputDir, FilenamePayloads) + + f, err := os.Create(payloadFile) + if err != nil { + return 0, fmt.Errorf("can't create %s: %w", payloadFile, err) + } + defer f.Close() + + writer := bufio.NewWriterSize(f, defaultBufioWriteSize) + if err != nil { + return 0, fmt.Errorf("can't create bufio writer for %s: %w", payloadFile, err) + } + defer writer.Flush() + + includeAllPayloads := len(addresses) == 0 + + if includeAllPayloads { + return writeAllPayloads(logger, writer, payloads) + } + + return writeSelectedPayloads(logger, writer, payloads, addresses) +} + +func writeAllPayloads(logger zerolog.Logger, w io.Writer, payloads []*ledger.Payload) (int, error) { + logger.Info().Msgf("writing %d payloads to file", len(payloads)) + + enc := cbor.NewEncoder(w) + + // Encode number of payloads + err := enc.Encode(len(payloads)) + if err != nil { + return 0, fmt.Errorf("failed to encode number of payloads %d in CBOR: %w", len(payloads), err) + } + + var payloadScratchBuffer [1024 * 2]byte + for _, p := range payloads { + + buf := ledger.EncodeAndAppendPayloadWithoutPrefix(payloadScratchBuffer[:0], p, payloadEncodingVersion) + + // Encode payload + err = enc.Encode(buf) + if err != nil { + return 0, err + } + } + + return len(payloads), nil +} + +func writeSelectedPayloads(logger zerolog.Logger, w io.Writer, payloads []*ledger.Payload, addresses []common.Address) (int, error) { + var includedPayloadCount int + + includedFlags := make([]bool, len(payloads)) + for i, p := range payloads { + include, err := includePayloadByAddresses(p, addresses) + if err != nil { + return 0, err + } + + includedFlags[i] = include + + if include { + includedPayloadCount++ + } + } + + logger.Info().Msgf("writing %d payloads to file", includedPayloadCount) + + enc := cbor.NewEncoder(w) + + // Encode number of payloads + err := enc.Encode(includedPayloadCount) + if err != nil { + return 0, fmt.Errorf("failed to encode number of payloads %d in CBOR: %w", includedPayloadCount, err) + } + + var payloadScratchBuffer [1024 * 2]byte + for i, included := range includedFlags { + if !included { + continue + } + + p := payloads[i] + + buf := ledger.EncodeAndAppendPayloadWithoutPrefix(payloadScratchBuffer[:0], p, payloadEncodingVersion) + + // Encode payload + err = enc.Encode(buf) + if err != nil { + return 0, err + } + } + + return includedPayloadCount, nil +} + +func includePayloadByAddresses(payload *ledger.Payload, addresses []common.Address) (bool, error) { + if len(addresses) == 0 { + // Include all payloads + return true, nil + } + + for _, address := range addresses { + k, err := payload.Key() + if err != nil { + return false, fmt.Errorf("failed to get key from payload: %w", err) + } + + owner := k.KeyParts[0].Value + if bytes.Equal(owner, address[:]) { + return true, nil + } + } + + return false, nil +} + +func readPayloadFile(logger zerolog.Logger, inputDir string) ([]*ledger.Payload, error) { + payloadFile := filepath.Join(inputDir, FilenamePayloads) + + if _, err := os.Stat(payloadFile); os.IsNotExist(err) { + return nil, fmt.Errorf("%s doesn't exist", payloadFile) + } + + f, err := os.Open(payloadFile) + if err != nil { + return nil, fmt.Errorf("failed to open %s: %w", payloadFile, err) + } + defer f.Close() + + r := bufio.NewReaderSize(f, defaultBufioReadSize) + if err != nil { + return nil, fmt.Errorf("failed to create bufio reader for %s: %w", payloadFile, err) + } + + dec := cbor.NewDecoder(r) + + // Decode number of payloads + var payloadCount int + err = dec.Decode(&payloadCount) + if err != nil { + return nil, fmt.Errorf("failed to decode number of payload in CBOR: %w", err) + } + + logger.Info().Msgf("reading %d payloads from file", payloadCount) + + payloads := make([]*ledger.Payload, 0, payloadCount) + + for { + var rawPayload []byte + err := dec.Decode(&rawPayload) + if err == io.EOF { + break + } + if err != nil { + return nil, fmt.Errorf("failed to decode payload in CBOR: %w", err) + } + + payload, err := ledger.DecodePayloadWithoutPrefix(rawPayload, false, payloadEncodingVersion) + if err != nil { + return nil, fmt.Errorf("failed to decode payload 0x%x: %w", rawPayload, err) + } + + payloads = append(payloads, payload) + } + + if payloadCount != len(payloads) { + return nil, fmt.Errorf("failed to decode %s: expect %d payloads, got %d payloads", payloadFile, payloadCount, len(payloads)) + } + + return payloads, nil +} From 9b1fc05aec9b651d99603de1c75409d390a10917 Mon Sep 17 00:00:00 2001 From: Faye Amacker <33205765+fxamacker@users.noreply.github.com> Date: Tue, 13 Feb 2024 11:45:44 -0600 Subject: [PATCH 02/10] Add t.Parallel() to test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bastian Müller --- .../cmd/execution-state-extract/execution_state_extract_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/util/cmd/execution-state-extract/execution_state_extract_test.go b/cmd/util/cmd/execution-state-extract/execution_state_extract_test.go index 39b05ad557f..2bdd4e6b083 100644 --- a/cmd/util/cmd/execution-state-extract/execution_state_extract_test.go +++ b/cmd/util/cmd/execution-state-extract/execution_state_extract_test.go @@ -215,7 +215,7 @@ func TestExtractExecutionState(t *testing.T) { // TestExtractPayloadsFromExecutionState tests state extraction with checkpoint as input and payload as output. func TestExtractPayloadsFromExecutionState(t *testing.T) { - + t.Parallel() metr := &metrics.NoopCollector{} t.Run("all payloads", func(t *testing.T) { From 5d5eb09c0f7289647effe987893826bf80138edd Mon Sep 17 00:00:00 2001 From: Faye Amacker <33205765+fxamacker@users.noreply.github.com> Date: Tue, 13 Feb 2024 12:04:38 -0600 Subject: [PATCH 03/10] Remove t.Parallel() to clean output dir correctly --- .../cmd/execution-state-extract/execution_state_extract_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/cmd/util/cmd/execution-state-extract/execution_state_extract_test.go b/cmd/util/cmd/execution-state-extract/execution_state_extract_test.go index 2bdd4e6b083..3e2441d853a 100644 --- a/cmd/util/cmd/execution-state-extract/execution_state_extract_test.go +++ b/cmd/util/cmd/execution-state-extract/execution_state_extract_test.go @@ -215,7 +215,6 @@ func TestExtractExecutionState(t *testing.T) { // TestExtractPayloadsFromExecutionState tests state extraction with checkpoint as input and payload as output. func TestExtractPayloadsFromExecutionState(t *testing.T) { - t.Parallel() metr := &metrics.NoopCollector{} t.Run("all payloads", func(t *testing.T) { From 392b8423e506d2d773fea80a99068ecc8d44c660 Mon Sep 17 00:00:00 2001 From: Faye Amacker <33205765+fxamacker@users.noreply.github.com> Date: Tue, 13 Feb 2024 16:53:10 -0600 Subject: [PATCH 04/10] Refactor & add flags for payloads in state extraction Refactored payload file related functionality to be more reusable. Added flags: --input-payload-filename --output-payload-filename --- cmd/util/cmd/execution-state-extract/cmd.go | 152 +++++++++++------- .../execution_state_extract.go | 26 ++- .../execution_state_extract_test.go | 50 ++++-- .../util/payload_file.go} | 13 +- 4 files changed, 157 insertions(+), 84 deletions(-) rename cmd/util/{cmd/execution-state-extract/export_payloads.go => ledger/util/payload_file.go} (93%) diff --git a/cmd/util/cmd/execution-state-extract/cmd.go b/cmd/util/cmd/execution-state-extract/cmd.go index 55e3432ba9c..760c7ff0975 100644 --- a/cmd/util/cmd/execution-state-extract/cmd.go +++ b/cmd/util/cmd/execution-state-extract/cmd.go @@ -2,6 +2,8 @@ package extract import ( "encoding/hex" + "fmt" + "os" "path" "strings" @@ -29,7 +31,8 @@ var ( flagNoReport bool flagValidateMigration bool flagLogVerboseValidationError bool - flagInputPayload bool + flagInputPayloadFileName string + flagOutputPayloadFileName string flagOutputPayloadByAddresses string ) @@ -74,17 +77,27 @@ func init() { "log entire Cadence values on validation error (atree migration)") Cmd.Flags().StringVar( - &flagOutputPayloadByAddresses, - "extract-payloads-by-address", + &flagInputPayloadFileName, + "input-payload-filename", "", - "extract payloads of specified addresses (comma separated list of hex-encoded addresses or \"all\"", // empty string ignores this flag + "input payload file", ) - Cmd.Flags().BoolVar( - &flagInputPayload, - "use-payload-as-input", - false, - "use payload file instead of checkpoint file as input", + Cmd.Flags().StringVar( + &flagOutputPayloadFileName, + "output-payload-filename", + "", + "output payload file", + ) + + Cmd.Flags().StringVar( + // Extract payloads of specified addresses (comma separated list of hex-encoded addresses) + // to file specified by --output-payload-filename. + // If no address is specified (empty string) then this flag is ignored. + &flagOutputPayloadByAddresses, + "extract-payloads-by-address", + "", + "extract payloads of addresses (comma separated hex-encoded addresses) to file specified by output-payload-filename", ) } @@ -96,6 +109,18 @@ func run(*cobra.Command, []string) { return } + if len(flagBlockHash) == 0 && len(flagStateCommitment) == 0 && len(flagInputPayloadFileName) == 0 { + log.Fatal().Msg("--block-hash or --state-commitment or --input-payload-filename must be specified") + } + + if len(flagInputPayloadFileName) > 0 && (len(flagBlockHash) > 0 || len(flagStateCommitment) > 0) { + log.Fatal().Msg("--input-payload-filename cannot be used with --block-hash or --state-commitment") + } + + if len(flagOutputPayloadFileName) == 0 && len(flagOutputPayloadByAddresses) > 0 { + log.Fatal().Msg("--extract-payloads-by-address requires --output-payload-filename to be specified") + } + if len(flagBlockHash) > 0 { blockID, err := flow.HexStringToIdentifier(flagBlockHash) if err != nil { @@ -130,64 +155,37 @@ func run(*cobra.Command, []string) { log.Info().Msgf("extracting state by state commitment: %x", stateCommitment) } - if len(flagBlockHash) == 0 && len(flagStateCommitment) == 0 && !flagInputPayload { - log.Fatal().Msg("no --block-hash or --state-commitment or --use-payload-as-input was specified") + if len(flagInputPayloadFileName) > 0 { + if _, err := os.Stat(flagInputPayloadFileName); os.IsNotExist(err) { + log.Fatal().Msgf("payload input file %s doesn't exist", flagInputPayloadFileName) + } } - exportPayloads := len(flagOutputPayloadByAddresses) > 0 + if len(flagOutputPayloadFileName) > 0 { + if _, err := os.Stat(flagOutputPayloadFileName); os.IsExist(err) { + log.Fatal().Msgf("payload output file %s exists", flagOutputPayloadFileName) + } + } var exportedAddresses []runtimeCommon.Address - if exportPayloads { + if len(flagOutputPayloadByAddresses) > 0 { addresses := strings.Split(flagOutputPayloadByAddresses, ",") - if len(addresses) == 1 && strings.TrimSpace(addresses[0]) == "all" { - // Extract payloads of the entire state. - log.Info().Msgf("Extracting state from %s, exporting all payloads to %s", - flagExecutionStateDir, - path.Join(flagOutputDir, FilenamePayloads), - ) - } else { - // Extract payloads of specified accounts - for _, hexAddr := range addresses { - b, err := hex.DecodeString(strings.TrimSpace(hexAddr)) - if err != nil { - log.Fatal().Err(err).Msgf("cannot hex decode address %s for payload export", strings.TrimSpace(hexAddr)) - } - - addr, err := runtimeCommon.BytesToAddress(b) - if err != nil { - log.Fatal().Err(err).Msgf("cannot decode address %x for payload export", b) - } - - exportedAddresses = append(exportedAddresses, addr) + for _, hexAddr := range addresses { + b, err := hex.DecodeString(strings.TrimSpace(hexAddr)) + if err != nil { + log.Fatal().Err(err).Msgf("cannot hex decode address %s for payload export", strings.TrimSpace(hexAddr)) } - log.Info().Msgf("Extracting state from %s, exporting payloads by addresses %v to %s", - flagExecutionStateDir, - flagOutputPayloadByAddresses, - path.Join(flagOutputDir, FilenamePayloads), - ) - } - - } else { - log.Info().Msgf("Extracting state from %s, exporting root checkpoint to %s, version: %v", - flagExecutionStateDir, - path.Join(flagOutputDir, bootstrap.FilenameWALRootCheckpoint), - 6, - ) - } + addr, err := runtimeCommon.BytesToAddress(b) + if err != nil { + log.Fatal().Err(err).Msgf("cannot decode address %x for payload export", b) + } - if flagInputPayload { - log.Info().Msgf("Payload input from %v, output dir: %s", - flagExecutionStateDir, - flagOutputDir) - } else { - log.Info().Msgf("Block state commitment: %s from %v, output dir: %s", - hex.EncodeToString(stateCommitment[:]), - flagExecutionStateDir, - flagOutputDir) + exportedAddresses = append(exportedAddresses, addr) + } } // err := ensureCheckpointFileExist(flagExecutionStateDir) @@ -211,15 +209,51 @@ func run(*cobra.Command, []string) { log.Warn().Msgf("atree migration has verbose validation error logging enabled which may increase size of log") } + var inputMsg string + if len(flagInputPayloadFileName) > 0 { + // Input is payloads + inputMsg = fmt.Sprintf("reading payloads from %s", flagInputPayloadFileName) + } else { + // Input is execution state + inputMsg = fmt.Sprintf("reading block state commitment %s from %s", + hex.EncodeToString(stateCommitment[:]), + flagExecutionStateDir, + ) + } + + var outputMsg string + if len(flagOutputPayloadFileName) > 0 { + // Output is payload file + if len(exportedAddresses) == 0 { + outputMsg = fmt.Sprintf("exporting all payloads to %s", flagOutputPayloadFileName) + } else { + outputMsg = fmt.Sprintf( + "exporting payloads by addresses %v to %s", + flagOutputPayloadByAddresses, + flagOutputPayloadFileName, + ) + } + } else { + // Output is checkpoint files + outputMsg = fmt.Sprintf( + "exporting root checkpoint to %s, version: %d", + path.Join(flagOutputDir, bootstrap.FilenameWALRootCheckpoint), + 6, + ) + } + + log.Info().Msgf("%s, %s", inputMsg, outputMsg) + var err error - if flagInputPayload { + if len(flagInputPayloadFileName) > 0 { err = extractExecutionStateFromPayloads( log.Logger, flagExecutionStateDir, flagOutputDir, flagNWorker, !flagNoMigration, - exportPayloads, + flagInputPayloadFileName, + flagOutputPayloadFileName, exportedAddresses, ) } else { @@ -230,7 +264,7 @@ func run(*cobra.Command, []string) { flagOutputDir, flagNWorker, !flagNoMigration, - exportPayloads, + flagOutputPayloadFileName, exportedAddresses, ) } diff --git a/cmd/util/cmd/execution-state-extract/execution_state_extract.go b/cmd/util/cmd/execution-state-extract/execution_state_extract.go index dc552682886..4fa7697f44a 100644 --- a/cmd/util/cmd/execution-state-extract/execution_state_extract.go +++ b/cmd/util/cmd/execution-state-extract/execution_state_extract.go @@ -13,6 +13,7 @@ import ( migrators "github.com/onflow/flow-go/cmd/util/ledger/migrations" "github.com/onflow/flow-go/cmd/util/ledger/reporters" + "github.com/onflow/flow-go/cmd/util/ledger/util" "github.com/onflow/flow-go/ledger" "github.com/onflow/flow-go/ledger/common/hash" "github.com/onflow/flow-go/ledger/common/pathfinder" @@ -36,7 +37,7 @@ func extractExecutionState( outputDir string, nWorker int, // number of concurrent worker to migation payloads runMigrations bool, - exportPayloads bool, + outputPayloadFile string, exportPayloadsByAddresses []common.Address, ) error { @@ -115,10 +116,16 @@ func extractExecutionState( log.Error().Err(err).Msgf("can not generate report for migrated state: %v", newMigratedState) } + exportPayloads := len(outputPayloadFile) > 0 if exportPayloads { payloads := newTrie.AllPayloads() - exportedPayloadCount, err := createPayloadFile(log, outputDir, payloads, exportPayloadsByAddresses) + exportedPayloadCount, err := util.CreatePayloadFile( + log, + outputPayloadFile, + payloads, + exportPayloadsByAddresses, + ) if err != nil { return fmt.Errorf("cannot generate payloads file: %w", err) } @@ -192,16 +199,17 @@ func extractExecutionStateFromPayloads( outputDir string, nWorker int, // number of concurrent worker to migation payloads runMigrations bool, - exportPayloads bool, + inputPayloadFile string, + outputPayloadFile string, exportPayloadsByAddresses []common.Address, ) error { - payloads, err := readPayloadFile(log, dir) + payloads, err := util.ReadPayloadFile(log, inputPayloadFile) if err != nil { return err } - log.Info().Msgf("read %d payloads\n", len(payloads)) + log.Info().Msgf("read %d payloads", len(payloads)) migrations := newMigrations(log, dir, nWorker, runMigrations) @@ -210,8 +218,14 @@ func extractExecutionStateFromPayloads( return err } + exportPayloads := len(outputPayloadFile) > 0 if exportPayloads { - exportedPayloadCount, err := createPayloadFile(log, outputDir, payloads, exportPayloadsByAddresses) + exportedPayloadCount, err := util.CreatePayloadFile( + log, + outputPayloadFile, + payloads, + exportPayloadsByAddresses, + ) if err != nil { return fmt.Errorf("cannot generate payloads file: %w", err) } diff --git a/cmd/util/cmd/execution-state-extract/execution_state_extract_test.go b/cmd/util/cmd/execution-state-extract/execution_state_extract_test.go index 3e2441d853a..d193a0169ef 100644 --- a/cmd/util/cmd/execution-state-extract/execution_state_extract_test.go +++ b/cmd/util/cmd/execution-state-extract/execution_state_extract_test.go @@ -4,6 +4,7 @@ import ( "crypto/rand" "encoding/hex" "math" + "path/filepath" "strings" "testing" @@ -14,6 +15,7 @@ import ( runtimeCommon "github.com/onflow/cadence/runtime/common" "github.com/onflow/flow-go/cmd/util/cmd/common" + "github.com/onflow/flow-go/cmd/util/ledger/util" "github.com/onflow/flow-go/ledger" "github.com/onflow/flow-go/ledger/common/pathfinder" "github.com/onflow/flow-go/ledger/complete" @@ -70,7 +72,7 @@ func TestExtractExecutionState(t *testing.T) { outdir, 10, false, - false, + "", nil, ) require.Error(t, err) @@ -217,6 +219,8 @@ func TestExtractExecutionState(t *testing.T) { func TestExtractPayloadsFromExecutionState(t *testing.T) { metr := &metrics.NoopCollector{} + const payloadFileName = "root.payload" + t.Run("all payloads", func(t *testing.T) { withDirs(t, func(_, execdir, outdir string) { @@ -225,6 +229,8 @@ func TestExtractPayloadsFromExecutionState(t *testing.T) { checkpointsToKeep = 1 ) + outputPayloadFileName := filepath.Join(outdir, payloadFileName) + size := 10 diskWal, err := wal.NewDiskWAL(zerolog.Nop(), nil, metrics.NewNoopCollector(), execdir, size, pathfinder.PathByteSize, wal.SegmentSize) @@ -273,14 +279,14 @@ func TestExtractPayloadsFromExecutionState(t *testing.T) { "--state-commitment", hex.EncodeToString(stateCommitment[:]), "--no-migration", "--no-report", - "--extract-payloads-by-address", "all", + "--output-payload-filename", outputPayloadFileName, "--chain", flow.Emulator.Chain().String()}) err = Cmd.Execute() require.NoError(t, err) // Verify exported payloads. - payloadsFromFile, err := readPayloadFile(zerolog.Nop(), outdir) + payloadsFromFile, err := util.ReadPayloadFile(zerolog.Nop(), outputPayloadFileName) require.NoError(t, err) require.Equal(t, len(keysValues), len(payloadsFromFile)) @@ -302,6 +308,8 @@ func TestExtractPayloadsFromExecutionState(t *testing.T) { checkpointsToKeep = 1 ) + outputPayloadFileName := filepath.Join(outdir, payloadFileName) + size := 10 diskWal, err := wal.NewDiskWAL(zerolog.Nop(), nil, metrics.NewNoopCollector(), execdir, size, pathfinder.PathByteSize, wal.SegmentSize) @@ -376,6 +384,7 @@ func TestExtractPayloadsFromExecutionState(t *testing.T) { "--state-commitment", hex.EncodeToString(stateCommitment[:]), "--no-migration", "--no-report", + "--output-payload-filename", outputPayloadFileName, "--extract-payloads-by-address", strings.Join(addresses, ","), "--chain", flow.Emulator.Chain().String()}) @@ -383,7 +392,7 @@ func TestExtractPayloadsFromExecutionState(t *testing.T) { require.NoError(t, err) // Verify exported payloads. - payloadsFromFile, err := readPayloadFile(zerolog.Nop(), outdir) + payloadsFromFile, err := util.ReadPayloadFile(zerolog.Nop(), outputPayloadFileName) require.NoError(t, err) require.Equal(t, len(selectedKeysValues), len(payloadsFromFile)) @@ -402,10 +411,14 @@ func TestExtractPayloadsFromExecutionState(t *testing.T) { // TestExtractStateFromPayloads tests state extraction with payload as input. func TestExtractStateFromPayloads(t *testing.T) { + const payloadFileName = "root.payload" + t.Run("create checkpoint", func(t *testing.T) { withDirs(t, func(_, execdir, outdir string) { size := 10 + inputPayloadFileName := filepath.Join(execdir, payloadFileName) + // Generate some data keysValues := make(map[string]keyPair) var payloads []*ledger.Payload @@ -423,7 +436,12 @@ func TestExtractStateFromPayloads(t *testing.T) { } } - numOfPayloadWritten, err := createPayloadFile(zerolog.Nop(), execdir, payloads, nil) + numOfPayloadWritten, err := util.CreatePayloadFile( + zerolog.Nop(), + inputPayloadFileName, + payloads, + nil, + ) require.NoError(t, err) require.Equal(t, len(payloads), numOfPayloadWritten) @@ -433,7 +451,9 @@ func TestExtractStateFromPayloads(t *testing.T) { "--output-dir", outdir, "--no-migration", "--no-report", - "--use-payload-as-input", + "--state-commitment", "", + "--input-payload-filename", inputPayloadFileName, + "--output-payload-filename", "", "--extract-payloads-by-address", "", "--chain", flow.Emulator.Chain().String()}) @@ -464,6 +484,9 @@ func TestExtractStateFromPayloads(t *testing.T) { t.Run("create payloads", func(t *testing.T) { withDirs(t, func(_, execdir, outdir string) { + inputPayloadFileName := filepath.Join(execdir, payloadFileName) + outputPayloadFileName := filepath.Join(outdir, "selected.payload") + size := 10 // Generate some data @@ -483,7 +506,12 @@ func TestExtractStateFromPayloads(t *testing.T) { } } - numOfPayloadWritten, err := createPayloadFile(zerolog.Nop(), execdir, payloads, nil) + numOfPayloadWritten, err := util.CreatePayloadFile( + zerolog.Nop(), + inputPayloadFileName, + payloads, + nil, + ) require.NoError(t, err) require.Equal(t, len(payloads), numOfPayloadWritten) @@ -493,15 +521,17 @@ func TestExtractStateFromPayloads(t *testing.T) { "--output-dir", outdir, "--no-migration", "--no-report", - "--use-payload-as-input", - "--extract-payloads-by-address", "all", + "--state-commitment", "", + "--input-payload-filename", inputPayloadFileName, + "--output-payload-filename", outputPayloadFileName, + "--extract-payloads-by-address", "", "--chain", flow.Emulator.Chain().String()}) err = Cmd.Execute() require.NoError(t, err) // Verify exported payloads. - payloadsFromFile, err := readPayloadFile(zerolog.Nop(), outdir) + payloadsFromFile, err := util.ReadPayloadFile(zerolog.Nop(), outputPayloadFileName) require.NoError(t, err) require.Equal(t, len(keysValues), len(payloadsFromFile)) diff --git a/cmd/util/cmd/execution-state-extract/export_payloads.go b/cmd/util/ledger/util/payload_file.go similarity index 93% rename from cmd/util/cmd/execution-state-extract/export_payloads.go rename to cmd/util/ledger/util/payload_file.go index 68325dac3de..ef226d89f86 100644 --- a/cmd/util/cmd/execution-state-extract/export_payloads.go +++ b/cmd/util/ledger/util/payload_file.go @@ -1,4 +1,4 @@ -package extract +package util import ( "bufio" @@ -6,7 +6,6 @@ import ( "fmt" "io" "os" - "path/filepath" "github.com/fxamacker/cbor/v2" "github.com/rs/zerolog" @@ -17,21 +16,18 @@ import ( ) const ( - FilenamePayloads = "root.payloads" - defaultBufioWriteSize = 1024 * 32 defaultBufioReadSize = 1024 * 32 payloadEncodingVersion = 1 ) -func createPayloadFile( +func CreatePayloadFile( logger zerolog.Logger, - outputDir string, + payloadFile string, payloads []*ledger.Payload, addresses []common.Address, ) (int, error) { - payloadFile := filepath.Join(outputDir, FilenamePayloads) f, err := os.Create(payloadFile) if err != nil { @@ -148,8 +144,7 @@ func includePayloadByAddresses(payload *ledger.Payload, addresses []common.Addre return false, nil } -func readPayloadFile(logger zerolog.Logger, inputDir string) ([]*ledger.Payload, error) { - payloadFile := filepath.Join(inputDir, FilenamePayloads) +func ReadPayloadFile(logger zerolog.Logger, payloadFile string) ([]*ledger.Payload, error) { if _, err := os.Stat(payloadFile); os.IsNotExist(err) { return nil, fmt.Errorf("%s doesn't exist", payloadFile) From 74776662f6cc3413c7dbb284847d7de32f7efb26 Mon Sep 17 00:00:00 2001 From: Faye Amacker <33205765+fxamacker@users.noreply.github.com> Date: Tue, 13 Feb 2024 17:04:07 -0600 Subject: [PATCH 05/10] Optimize payload filtering in state extraction --- cmd/util/ledger/util/payload_file.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/cmd/util/ledger/util/payload_file.go b/cmd/util/ledger/util/payload_file.go index ef226d89f86..6524cce8261 100644 --- a/cmd/util/ledger/util/payload_file.go +++ b/cmd/util/ledger/util/payload_file.go @@ -129,13 +129,14 @@ func includePayloadByAddresses(payload *ledger.Payload, addresses []common.Addre return true, nil } - for _, address := range addresses { - k, err := payload.Key() - if err != nil { - return false, fmt.Errorf("failed to get key from payload: %w", err) - } + k, err := payload.Key() + if err != nil { + return false, fmt.Errorf("failed to get key from payload: %w", err) + } + + owner := k.KeyParts[0].Value - owner := k.KeyParts[0].Value + for _, address := range addresses { if bytes.Equal(owner, address[:]) { return true, nil } From 67ec2e2d24900af8fa74d02255f6086081970e85 Mon Sep 17 00:00:00 2001 From: Faye Amacker <33205765+fxamacker@users.noreply.github.com> Date: Tue, 13 Feb 2024 17:29:01 -0600 Subject: [PATCH 06/10] Add tests for payload file related functionality --- cmd/util/ledger/util/payload_file_test.go | 272 ++++++++++++++++++++++ 1 file changed, 272 insertions(+) create mode 100644 cmd/util/ledger/util/payload_file_test.go diff --git a/cmd/util/ledger/util/payload_file_test.go b/cmd/util/ledger/util/payload_file_test.go new file mode 100644 index 00000000000..d37da30444f --- /dev/null +++ b/cmd/util/ledger/util/payload_file_test.go @@ -0,0 +1,272 @@ +package util_test + +import ( + "bytes" + "crypto/rand" + "path/filepath" + "testing" + + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" + + "github.com/onflow/cadence/runtime/common" + + "github.com/onflow/flow-go/cmd/util/ledger/util" + "github.com/onflow/flow-go/ledger" + "github.com/onflow/flow-go/utils/unittest" +) + +type keyPair struct { + key ledger.Key + value ledger.Value +} + +func TestPayloadFile(t *testing.T) { + + const fileName = "root.payload" + + t.Run("without filter", func(t *testing.T) { + unittest.RunWithTempDir(t, func(datadir string) { + size := 10 + + payloadFileName := filepath.Join(datadir, fileName) + + // Generate some data + keysValues := make(map[string]keyPair) + var payloads []*ledger.Payload + + for i := 0; i < size; i++ { + keys, values := getSampleKeyValues(i) + + for j, key := range keys { + keysValues[key.String()] = keyPair{ + key: key, + value: values[j], + } + + payloads = append(payloads, ledger.NewPayload(key, values[j])) + } + } + + numOfPayloadWritten, err := util.CreatePayloadFile( + zerolog.Nop(), + payloadFileName, + payloads, + nil, + ) + require.NoError(t, err) + require.Equal(t, len(payloads), numOfPayloadWritten) + + payloadsFromFile, err := util.ReadPayloadFile(zerolog.Nop(), payloadFileName) + require.NoError(t, err) + require.Equal(t, len(payloads), len(payloadsFromFile)) + + for _, payloadFromFile := range payloadsFromFile { + k, err := payloadFromFile.Key() + require.NoError(t, err) + + kv, exist := keysValues[k.String()] + require.True(t, exist) + + require.Equal(t, kv.value, payloadFromFile.Value()) + } + }) + }) + + t.Run("with filter", func(t *testing.T) { + unittest.RunWithTempDir(t, func(datadir string) { + size := 10 + + payloadFileName := filepath.Join(datadir, fileName) + + // Generate some data + keysValues := make(map[string]keyPair) + var payloads []*ledger.Payload + + for i := 0; i < size; i++ { + keys, values := getSampleKeyValues(i) + + for j, key := range keys { + keysValues[key.String()] = keyPair{ + key: key, + value: values[j], + } + + payloads = append(payloads, ledger.NewPayload(key, values[j])) + } + } + + const selectedAddressCount = 10 + selectedAddresses := make(map[common.Address]struct{}) + selectedKeysValues := make(map[string]keyPair) + for k, kv := range keysValues { + owner := kv.key.KeyParts[0].Value + if len(owner) != common.AddressLength { + continue + } + + address, err := common.BytesToAddress(owner) + require.NoError(t, err) + + if len(selectedAddresses) < selectedAddressCount { + selectedAddresses[address] = struct{}{} + } + + if _, exist := selectedAddresses[address]; exist { + selectedKeysValues[k] = kv + } + } + + addresses := make([]common.Address, 0, len(selectedAddresses)) + for address := range selectedAddresses { + addresses = append(addresses, address) + } + + numOfPayloadWritten, err := util.CreatePayloadFile( + zerolog.Nop(), + payloadFileName, + payloads, + addresses, + ) + require.NoError(t, err) + require.Equal(t, len(selectedKeysValues), numOfPayloadWritten) + + payloadsFromFile, err := util.ReadPayloadFile(zerolog.Nop(), payloadFileName) + require.NoError(t, err) + require.Equal(t, len(selectedKeysValues), len(payloadsFromFile)) + + for _, payloadFromFile := range payloadsFromFile { + k, err := payloadFromFile.Key() + require.NoError(t, err) + + kv, exist := selectedKeysValues[k.String()] + require.True(t, exist) + + require.Equal(t, kv.value, payloadFromFile.Value()) + } + }) + }) + + t.Run("no payloads found with filter", func(t *testing.T) { + emptyAddress := common.Address{} + + unittest.RunWithTempDir(t, func(datadir string) { + size := 10 + + payloadFileName := filepath.Join(datadir, fileName) + + // Generate some data + keysValues := make(map[string]keyPair) + var payloads []*ledger.Payload + + for i := 0; i < size; i++ { + keys, values := getSampleKeyValues(i) + + for j, key := range keys { + if bytes.Equal(key.KeyParts[0].Value, emptyAddress[:]) { + continue + } + keysValues[key.String()] = keyPair{ + key: key, + value: values[j], + } + + payloads = append(payloads, ledger.NewPayload(key, values[j])) + } + } + + numOfPayloadWritten, err := util.CreatePayloadFile( + zerolog.Nop(), + payloadFileName, + payloads, + []common.Address{emptyAddress}, + ) + require.NoError(t, err) + require.Equal(t, 0, numOfPayloadWritten) + + payloadsFromFile, err := util.ReadPayloadFile(zerolog.Nop(), payloadFileName) + require.NoError(t, err) + require.Equal(t, 0, len(payloadsFromFile)) + }) + }) +} + +func getSampleKeyValues(i int) ([]ledger.Key, []ledger.Value) { + switch i { + case 0: + return []ledger.Key{getKey("", "uuid"), getKey("", "account_address_state")}, + []ledger.Value{[]byte{'1'}, []byte{'A'}} + case 1: + return []ledger.Key{getKey("ADDRESS", "public_key_count"), + getKey("ADDRESS", "public_key_0"), + getKey("ADDRESS", "exists"), + getKey("ADDRESS", "storage_used")}, + []ledger.Value{[]byte{1}, []byte("PUBLICKEYXYZ"), []byte{1}, []byte{100}} + case 2: + // TODO change the contract_names to CBOR encoding + return []ledger.Key{getKey("ADDRESS", "contract_names"), getKey("ADDRESS", "code.mycontract")}, + []ledger.Value{[]byte("mycontract"), []byte("CONTRACT Content")} + default: + keys := make([]ledger.Key, 0) + values := make([]ledger.Value, 0) + for j := 0; j < 10; j++ { + // address := make([]byte, 32) + address := make([]byte, 8) + _, err := rand.Read(address) + if err != nil { + panic(err) + } + keys = append(keys, getKey(string(address), "test")) + values = append(values, getRandomCadenceValue()) + } + return keys, values + } +} + +func getKey(owner, key string) ledger.Key { + return ledger.Key{KeyParts: []ledger.KeyPart{ + {Type: uint16(0), Value: []byte(owner)}, + {Type: uint16(2), Value: []byte(key)}, + }, + } +} + +func getRandomCadenceValue() ledger.Value { + + randomPart := make([]byte, 10) + _, err := rand.Read(randomPart) + if err != nil { + panic(err) + } + valueBytes := []byte{ + // magic prefix + 0x0, 0xca, 0xde, 0x0, 0x4, + // tag + 0xd8, 132, + // array, 5 items follow + 0x85, + + // tag + 0xd8, 193, + // UTF-8 string, length 4 + 0x64, + // t, e, s, t + 0x74, 0x65, 0x73, 0x74, + + // nil + 0xf6, + + // positive integer 1 + 0x1, + + // array, 0 items follow + 0x80, + + // UTF-8 string, length 10 + 0x6a, + 0x54, 0x65, 0x73, 0x74, 0x53, 0x74, 0x72, 0x75, 0x63, 0x74, + } + + valueBytes = append(valueBytes, randomPart...) + return ledger.Value(valueBytes) +} From 48edc602cff221067aad53e0bb7e1f0622c2f9a2 Mon Sep 17 00:00:00 2001 From: Faye Amacker <33205765+fxamacker@users.noreply.github.com> Date: Tue, 13 Feb 2024 17:55:52 -0600 Subject: [PATCH 07/10] Add utility to extract payloads by addresses This utility can be used to create a subset of execution state which can save time during development, testing, and support/troubleshooting. --- .../cmd/extract-payloads-by-address/cmd.go | 262 ++++++++++++++++++ .../extract_payloads_test.go | 241 ++++++++++++++++ 2 files changed, 503 insertions(+) create mode 100644 cmd/util/cmd/extract-payloads-by-address/cmd.go create mode 100644 cmd/util/cmd/extract-payloads-by-address/extract_payloads_test.go diff --git a/cmd/util/cmd/extract-payloads-by-address/cmd.go b/cmd/util/cmd/extract-payloads-by-address/cmd.go new file mode 100644 index 00000000000..acf54c07b49 --- /dev/null +++ b/cmd/util/cmd/extract-payloads-by-address/cmd.go @@ -0,0 +1,262 @@ +package extractpayloads + +import ( + "bufio" + "bytes" + "encoding/binary" + "encoding/hex" + "fmt" + "io" + "os" + "strings" + + "github.com/fxamacker/cbor/v2" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" + "github.com/spf13/cobra" + + "github.com/onflow/cadence/runtime/common" + + "github.com/onflow/flow-go/ledger" +) + +const ( + defaultBufioWriteSize = 1024 * 32 + defaultBufioReadSize = 1024 * 32 + + payloadEncodingVersion = 1 +) + +var ( + flagInputPayloadFileName string + flagOutputPayloadFileName string + flagAddresses string +) + +var Cmd = &cobra.Command{ + Use: "extract-payload-by-address", + Short: "Read payload file and generate payload file containing payloads with specified addresses", + Run: run, +} + +func init() { + Cmd.Flags().StringVar( + &flagInputPayloadFileName, + "input-filename", + "", + "Input payload file name") + _ = Cmd.MarkFlagRequired("input-filename") + + Cmd.Flags().StringVar( + &flagOutputPayloadFileName, + "output-filename", + "", + "Output payload file name") + _ = Cmd.MarkFlagRequired("output-filename") + + Cmd.Flags().StringVar( + &flagAddresses, + "addresses", + "", + "extract payloads of addresses (comma separated hex-encoded addresses) to file specified by output-payload-filename", + ) + _ = Cmd.MarkFlagRequired("addresses") +} + +func run(*cobra.Command, []string) { + + if _, err := os.Stat(flagInputPayloadFileName); os.IsNotExist(err) { + log.Fatal().Msgf("Input file %s doesn't exist", flagInputPayloadFileName) + } + + if _, err := os.Stat(flagOutputPayloadFileName); os.IsExist(err) { + log.Fatal().Msgf("Output file %s exists", flagOutputPayloadFileName) + } + + addresses, err := parseAddresses(strings.Split(flagAddresses, ",")) + if err != nil { + log.Fatal().Err(err) + } + + log.Info().Msgf( + "extracting payloads with address %v from %s to %s", + addresses, + flagInputPayloadFileName, + flagOutputPayloadFileName, + ) + + numOfPayloadWritten, err := extractPayloads(log.Logger, flagInputPayloadFileName, flagOutputPayloadFileName, addresses) + if err != nil { + log.Fatal().Err(err) + } + + err = overwritePayloadCountInFile(flagOutputPayloadFileName, numOfPayloadWritten) + if err != nil { + log.Fatal().Err(err) + } +} + +func overwritePayloadCountInFile(output string, numOfPayloadWritten int) error { + in, err := os.OpenFile(output, os.O_RDWR, 0644) + if err != nil { + return fmt.Errorf("failed to open %s to write payload count: %w", output, err) + } + defer in.Close() + + var data [9]byte + data[0] = 0x1b + binary.BigEndian.PutUint64(data[1:], uint64(numOfPayloadWritten)) + + n, err := in.WriteAt(data[:], 0) + if err != nil { + return fmt.Errorf("failed to overwrite number of payloads in %s: %w", output, err) + } + if n != len(data) { + return fmt.Errorf("failed to overwrite number of payloads in %s: wrote %d bytes, expect %d bytes", output, n, len(data)) + } + + return nil +} + +func extractPayloads(log zerolog.Logger, input, output string, addresses []common.Address) (int, error) { + in, err := os.Open(input) + if err != nil { + return 0, fmt.Errorf("failed to open %s: %w", input, err) + } + defer in.Close() + + reader := bufio.NewReaderSize(in, defaultBufioReadSize) + if err != nil { + return 0, fmt.Errorf("failed to create bufio reader for %s: %w", input, err) + } + + out, err := os.Create(output) + if err != nil { + return 0, fmt.Errorf("failed to open %s: %w", output, err) + } + defer out.Close() + + writer := bufio.NewWriterSize(out, defaultBufioWriteSize) + if err != nil { + return 0, fmt.Errorf("failed to create bufio writer for %s: %w", output, err) + } + defer writer.Flush() + + // Preserve 9-bytes header for number of payloads. + var head [9]byte + _, err = writer.Write(head[:]) + if err != nil { + return 0, fmt.Errorf("failed to write header for %s: %w", output, err) + } + + // Need to flush buffer before encoding payloads. + writer.Flush() + + enc := cbor.NewEncoder(writer) + + const logIntervalForPayloads = 1_000_000 + count := 0 + err = readPayloadFile(log, reader, func(rawPayload []byte) error { + + payload, err := ledger.DecodePayloadWithoutPrefix(rawPayload, false, payloadEncodingVersion) + if err != nil { + return fmt.Errorf("failed to decode payload 0x%x: %w", rawPayload, err) + } + + k, err := payload.Key() + if err != nil { + return err + } + + owner := k.KeyParts[0].Value + + include := false + for _, address := range addresses { + if bytes.Equal(owner, address[:]) { + include = true + break + } + } + + if include { + err = enc.Encode(rawPayload) + if err != nil { + return fmt.Errorf("failed to encode payload: %w", err) + } + + count++ + if count%logIntervalForPayloads == 0 { + log.Info().Msgf("wrote %d payloads", count) + } + } + + return nil + }) + if err != nil { + return 0, err + } + + log.Info().Msgf("wrote %d payloads", count) + return count, nil +} + +func parseAddresses(hexAddresses []string) ([]common.Address, error) { + if len(hexAddresses) == 0 { + return nil, fmt.Errorf("at least one address must be provided") + } + + addresses := make([]common.Address, len(hexAddresses)) + for i, hexAddr := range hexAddresses { + b, err := hex.DecodeString(strings.TrimSpace(hexAddr)) + if err != nil { + return nil, fmt.Errorf("address is not hex encoded %s: %w", strings.TrimSpace(hexAddr), err) + } + + addr, err := common.BytesToAddress(b) + if err != nil { + return nil, fmt.Errorf("cannot decode address %x", b) + } + + addresses[i] = addr + } + + return addresses, nil +} + +func readPayloadFile(log zerolog.Logger, r io.Reader, processPayload func([]byte) error) error { + dec := cbor.NewDecoder(r) + + var payloadCount int + err := dec.Decode(&payloadCount) + if err != nil { + return err + } + + log.Info().Msgf("Processing input file with %d payloads", payloadCount) + + const logIntervalForPayloads = 1_000_000 + count := 0 + for { + var rawPayload []byte + err = dec.Decode(&rawPayload) + if err == io.EOF { + break + } + if err != nil { + return err + } + + err = processPayload(rawPayload) + if err != nil { + return err + } + + count++ + if count%logIntervalForPayloads == 0 { + log.Info().Msgf("processed %d payloads", count) + } + } + + log.Info().Msgf("processed %d payloads", count) + return nil +} diff --git a/cmd/util/cmd/extract-payloads-by-address/extract_payloads_test.go b/cmd/util/cmd/extract-payloads-by-address/extract_payloads_test.go new file mode 100644 index 00000000000..443fed54518 --- /dev/null +++ b/cmd/util/cmd/extract-payloads-by-address/extract_payloads_test.go @@ -0,0 +1,241 @@ +package extractpayloads + +import ( + "bytes" + "crypto/rand" + "encoding/hex" + "path/filepath" + "strings" + "testing" + + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" + + "github.com/onflow/cadence/runtime/common" + + "github.com/onflow/flow-go/cmd/util/ledger/util" + "github.com/onflow/flow-go/ledger" + "github.com/onflow/flow-go/utils/unittest" +) + +type keyPair struct { + key ledger.Key + value ledger.Value +} + +func TestExtractPayloads(t *testing.T) { + + t.Run("some payloads", func(t *testing.T) { + + unittest.RunWithTempDir(t, func(datadir string) { + + inputFile := filepath.Join(datadir, "input.payload") + outputFile := filepath.Join(datadir, "output.payload") + + size := 10 + + // Generate some data + keysValues := make(map[string]keyPair) + var payloads []*ledger.Payload + + for i := 0; i < size; i++ { + keys, values := getSampleKeyValues(i) + + for j, key := range keys { + keysValues[key.String()] = keyPair{ + key: key, + value: values[j], + } + + payloads = append(payloads, ledger.NewPayload(key, values[j])) + } + } + + numOfPayloadWritten, err := util.CreatePayloadFile(zerolog.Nop(), inputFile, payloads, nil) + require.NoError(t, err) + require.Equal(t, len(payloads), numOfPayloadWritten) + + const selectedAddressCount = 10 + selectedAddresses := make(map[string]struct{}) + selectedKeysValues := make(map[string]keyPair) + for k, kv := range keysValues { + owner := kv.key.KeyParts[0].Value + if len(owner) != common.AddressLength { + continue + } + + address, err := common.BytesToAddress(owner) + require.NoError(t, err) + + if len(selectedAddresses) < selectedAddressCount { + selectedAddresses[address.Hex()] = struct{}{} + } + + if _, exist := selectedAddresses[address.Hex()]; exist { + selectedKeysValues[k] = kv + } + } + + addresses := make([]string, 0, len(selectedAddresses)) + for address := range selectedAddresses { + addresses = append(addresses, address) + } + + // Export selected payloads + Cmd.SetArgs([]string{ + "--input-filename", inputFile, + "--output-filename", outputFile, + "--addresses", strings.Join(addresses, ","), + }) + + err = Cmd.Execute() + require.NoError(t, err) + + // Verify exported payloads. + payloadsFromFile, err := util.ReadPayloadFile(zerolog.Nop(), outputFile) + require.NoError(t, err) + require.Equal(t, len(selectedKeysValues), len(payloadsFromFile)) + + for _, payloadFromFile := range payloadsFromFile { + k, err := payloadFromFile.Key() + require.NoError(t, err) + + kv, exist := selectedKeysValues[k.String()] + require.True(t, exist) + require.Equal(t, kv.value, payloadFromFile.Value()) + } + }) + }) + + t.Run("no payloads", func(t *testing.T) { + + emptyAddress := common.Address{} + + unittest.RunWithTempDir(t, func(datadir string) { + + inputFile := filepath.Join(datadir, "input.payload") + outputFile := filepath.Join(datadir, "output.payload") + + size := 10 + + // Generate some data + keysValues := make(map[string]keyPair) + var payloads []*ledger.Payload + + for i := 0; i < size; i++ { + keys, values := getSampleKeyValues(i) + + for j, key := range keys { + if bytes.Equal(key.KeyParts[0].Value, emptyAddress[:]) { + continue + } + keysValues[key.String()] = keyPair{ + key: key, + value: values[j], + } + + payloads = append(payloads, ledger.NewPayload(key, values[j])) + } + } + + numOfPayloadWritten, err := util.CreatePayloadFile(zerolog.Nop(), inputFile, payloads, nil) + require.NoError(t, err) + require.Equal(t, len(payloads), numOfPayloadWritten) + + // Export selected payloads + Cmd.SetArgs([]string{ + "--input-filename", inputFile, + "--output-filename", outputFile, + "--addresses", hex.EncodeToString(emptyAddress[:]), + }) + + err = Cmd.Execute() + require.NoError(t, err) + + // Verify exported payloads. + payloadsFromFile, err := util.ReadPayloadFile(zerolog.Nop(), outputFile) + require.NoError(t, err) + require.Equal(t, 0, len(payloadsFromFile)) + }) + }) +} + +func getSampleKeyValues(i int) ([]ledger.Key, []ledger.Value) { + switch i { + case 0: + return []ledger.Key{getKey("", "uuid"), getKey("", "account_address_state")}, + []ledger.Value{[]byte{'1'}, []byte{'A'}} + case 1: + return []ledger.Key{getKey("ADDRESS", "public_key_count"), + getKey("ADDRESS", "public_key_0"), + getKey("ADDRESS", "exists"), + getKey("ADDRESS", "storage_used")}, + []ledger.Value{[]byte{1}, []byte("PUBLICKEYXYZ"), []byte{1}, []byte{100}} + case 2: + // TODO change the contract_names to CBOR encoding + return []ledger.Key{getKey("ADDRESS", "contract_names"), getKey("ADDRESS", "code.mycontract")}, + []ledger.Value{[]byte("mycontract"), []byte("CONTRACT Content")} + default: + keys := make([]ledger.Key, 0) + values := make([]ledger.Value, 0) + for j := 0; j < 10; j++ { + // address := make([]byte, 32) + address := make([]byte, 8) + _, err := rand.Read(address) + if err != nil { + panic(err) + } + keys = append(keys, getKey(string(address), "test")) + values = append(values, getRandomCadenceValue()) + } + return keys, values + } +} + +func getKey(owner, key string) ledger.Key { + return ledger.Key{KeyParts: []ledger.KeyPart{ + {Type: uint16(0), Value: []byte(owner)}, + {Type: uint16(2), Value: []byte(key)}, + }, + } +} + +func getRandomCadenceValue() ledger.Value { + + randomPart := make([]byte, 10) + _, err := rand.Read(randomPart) + if err != nil { + panic(err) + } + valueBytes := []byte{ + // magic prefix + 0x0, 0xca, 0xde, 0x0, 0x4, + // tag + 0xd8, 132, + // array, 5 items follow + 0x85, + + // tag + 0xd8, 193, + // UTF-8 string, length 4 + 0x64, + // t, e, s, t + 0x74, 0x65, 0x73, 0x74, + + // nil + 0xf6, + + // positive integer 1 + 0x1, + + // array, 0 items follow + 0x80, + + // UTF-8 string, length 10 + 0x6a, + 0x54, 0x65, 0x73, 0x74, 0x53, 0x74, 0x72, 0x75, 0x63, 0x74, + } + + valueBytes = append(valueBytes, randomPart...) + return ledger.Value(valueBytes) +} From 75e10997e1b8b74d4b6a648412bbc59c6d8a1479 Mon Sep 17 00:00:00 2001 From: Faye Amacker <33205765+fxamacker@users.noreply.github.com> Date: Tue, 13 Feb 2024 18:28:43 -0600 Subject: [PATCH 08/10] Refactor to use named magic number for CBOR data --- cmd/util/cmd/extract-payloads-by-address/cmd.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cmd/util/cmd/extract-payloads-by-address/cmd.go b/cmd/util/cmd/extract-payloads-by-address/cmd.go index acf54c07b49..3e384bf5d05 100644 --- a/cmd/util/cmd/extract-payloads-by-address/cmd.go +++ b/cmd/util/cmd/extract-payloads-by-address/cmd.go @@ -103,8 +103,10 @@ func overwritePayloadCountInFile(output string, numOfPayloadWritten int) error { } defer in.Close() + const cbor8BytesPositiveIntegerIndicator = 0x1b + var data [9]byte - data[0] = 0x1b + data[0] = cbor8BytesPositiveIntegerIndicator binary.BigEndian.PutUint64(data[1:], uint64(numOfPayloadWritten)) n, err := in.WriteAt(data[:], 0) From ee2ca4c433719f66ac28abba05ac7facc0ff12ba Mon Sep 17 00:00:00 2001 From: Faye Amacker <33205765+fxamacker@users.noreply.github.com> Date: Fri, 16 Feb 2024 08:54:28 -0600 Subject: [PATCH 09/10] Add suggested comments from feedback Co-authored-by: Leo Zhang --- cmd/util/cmd/execution-state-extract/cmd.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/cmd/util/cmd/execution-state-extract/cmd.go b/cmd/util/cmd/execution-state-extract/cmd.go index 760c7ff0975..5cfeea0312f 100644 --- a/cmd/util/cmd/execution-state-extract/cmd.go +++ b/cmd/util/cmd/execution-state-extract/cmd.go @@ -76,6 +76,12 @@ func init() { Cmd.Flags().BoolVar(&flagLogVerboseValidationError, "log-verbose-validation-error", false, "log entire Cadence values on validation error (atree migration)") + // If specified, the state will consist of payloads from the given input payload file. + // If not specified, then the state will be extracted from the latest checkpoint file. + // This flag can be used to reduce total duration of migrations when state extraction involves + // multiple migrations because it helps avoid repeatedly reading from checkpoint file to rebuild trie. + // The input payload file must be created by state extraction running with either + // flagOutputPayloadFileName or flagOutputPayloadByAddresses. Cmd.Flags().StringVar( &flagInputPayloadFileName, "input-payload-filename", @@ -117,6 +123,7 @@ func run(*cobra.Command, []string) { log.Fatal().Msg("--input-payload-filename cannot be used with --block-hash or --state-commitment") } + // When flagOutputPayloadByAddresses is specified, flagOutputPayloadFileName is required. if len(flagOutputPayloadFileName) == 0 && len(flagOutputPayloadByAddresses) > 0 { log.Fatal().Msg("--extract-payloads-by-address requires --output-payload-filename to be specified") } @@ -242,7 +249,7 @@ func run(*cobra.Command, []string) { ) } - log.Info().Msgf("%s, %s", inputMsg, outputMsg) + log.Info().Msgf("state extraction plan: %s, %s", inputMsg, outputMsg) var err error if len(flagInputPayloadFileName) > 0 { From 3c2d3134b70411a50b9feb705d105015e2e4d300 Mon Sep 17 00:00:00 2001 From: Faye Amacker <33205765+fxamacker@users.noreply.github.com> Date: Wed, 21 Feb 2024 17:45:44 -0600 Subject: [PATCH 10/10] Use latest NewCompactor API --- .../execution-state-extract/execution_state_extract_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/util/cmd/execution-state-extract/execution_state_extract_test.go b/cmd/util/cmd/execution-state-extract/execution_state_extract_test.go index bc52133f1b9..882c88df898 100644 --- a/cmd/util/cmd/execution-state-extract/execution_state_extract_test.go +++ b/cmd/util/cmd/execution-state-extract/execution_state_extract_test.go @@ -237,7 +237,7 @@ func TestExtractPayloadsFromExecutionState(t *testing.T) { require.NoError(t, err) f, err := complete.NewLedger(diskWal, size*10, metr, zerolog.Nop(), complete.DefaultPathFinderVersion) require.NoError(t, err) - compactor, err := complete.NewCompactor(f, diskWal, zerolog.Nop(), uint(size), checkpointDistance, checkpointsToKeep, atomic.NewBool(false)) + compactor, err := complete.NewCompactor(f, diskWal, zerolog.Nop(), uint(size), checkpointDistance, checkpointsToKeep, atomic.NewBool(false), &metrics.NoopCollector{}) require.NoError(t, err) <-compactor.Ready() @@ -316,7 +316,7 @@ func TestExtractPayloadsFromExecutionState(t *testing.T) { require.NoError(t, err) f, err := complete.NewLedger(diskWal, size*10, metr, zerolog.Nop(), complete.DefaultPathFinderVersion) require.NoError(t, err) - compactor, err := complete.NewCompactor(f, diskWal, zerolog.Nop(), uint(size), checkpointDistance, checkpointsToKeep, atomic.NewBool(false)) + compactor, err := complete.NewCompactor(f, diskWal, zerolog.Nop(), uint(size), checkpointDistance, checkpointsToKeep, atomic.NewBool(false), &metrics.NoopCollector{}) require.NoError(t, err) <-compactor.Ready()