Optimize migration by adding ability to read or extract payloads from state #5386

Merged
merged 14 commits on Feb 22, 2024
116 changes: 97 additions & 19 deletions cmd/util/cmd/execution-state-extract/cmd.go
@@ -3,10 +3,13 @@ package extract
import (
"encoding/hex"
"path"
"strings"

"github.com/rs/zerolog/log"
"github.com/spf13/cobra"

runtimeCommon "github.com/onflow/cadence/runtime/common"

"github.com/onflow/flow-go/cmd/util/cmd/common"
"github.com/onflow/flow-go/model/bootstrap"
"github.com/onflow/flow-go/model/flow"
@@ -26,6 +29,8 @@ var (
flagNoReport bool
flagValidateMigration bool
flagLogVerboseValidationError bool
flagInputPayload bool
flagOutputPayloadByAddresses string
)

var Cmd = &cobra.Command{
@@ -68,6 +73,19 @@ func init() {
Cmd.Flags().BoolVar(&flagLogVerboseValidationError, "log-verbose-validation-error", false,
"log entire Cadence values on validation error (atree migration)")

Cmd.Flags().StringVar(
&flagOutputPayloadByAddresses,
"extract-payloads-by-address",
"",
"extract payloads of specified addresses (comma separated list of hex-encoded addresses or \"all\"", // empty string ignores this flag
)

Cmd.Flags().BoolVar(
&flagInputPayload,
"use-payload-as-input",
false,
"use payload file instead of checkpoint file as input",
)
}

func run(*cobra.Command, []string) {
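Taken together, the two new flags support the two-step workflow discussed in the review thread further down: step one extracts payloads from a checkpoint into a file, step two consumes that payload file as input. A hypothetical pair of invocations follows (the subcommand path and the directory flag names are assumptions inferred from the variable names in this diff; only --block-hash, --state-commitment, and the two new flags are confirmed by it):

# Step 1: extract all payloads from the execution state at a known commitment.
util execution-state-extract \
	--execution-state-dir /path/to/execution-state \
	--output-dir /path/to/payloads-out \
	--state-commitment <hex-commitment> \
	--extract-payloads-by-address all

# Step 2: rerun extraction using the payload file as input instead of a checkpoint.
util execution-state-extract \
	--execution-state-dir /path/to/payloads-out \
	--output-dir /path/to/final-output \
	--use-payload-as-input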
@@ -112,20 +130,65 @@ func run(*cobra.Command, []string) {
log.Info().Msgf("extracting state by state commitment: %x", stateCommitment)
}

if len(flagBlockHash) == 0 && len(flagStateCommitment) == 0 {
log.Fatal().Msg("no --block-hash or --state-commitment was specified")
if len(flagBlockHash) == 0 && len(flagStateCommitment) == 0 && !flagInputPayload {
log.Fatal().Msg("no --block-hash or --state-commitment or --use-payload-as-input was specified")
}

log.Info().Msgf("Extracting state from %s, exporting root checkpoint to %s, version: %v",
flagExecutionStateDir,
path.Join(flagOutputDir, bootstrap.FilenameWALRootCheckpoint),
6,
)
exportPayloads := len(flagOutputPayloadByAddresses) > 0

var exportedAddresses []runtimeCommon.Address

if exportPayloads {

addresses := strings.Split(flagOutputPayloadByAddresses, ",")

if len(addresses) == 1 && strings.TrimSpace(addresses[0]) == "all" {
// Extract payloads of the entire state.
log.Info().Msgf("Extracting state from %s, exporting all payloads to %s",
flagExecutionStateDir,
path.Join(flagOutputDir, FilenamePayloads),
)
} else {
// Extract payloads of specified accounts
for _, hexAddr := range addresses {
b, err := hex.DecodeString(strings.TrimSpace(hexAddr))
if err != nil {
log.Fatal().Err(err).Msgf("cannot hex decode address %s for payload export", strings.TrimSpace(hexAddr))
}

addr, err := runtimeCommon.BytesToAddress(b)
if err != nil {
log.Fatal().Err(err).Msgf("cannot decode address %x for payload export", b)
}

exportedAddresses = append(exportedAddresses, addr)
}

log.Info().Msgf("Extracting state from %s, exporting payloads by addresses %v to %s",
flagExecutionStateDir,
flagOutputPayloadByAddresses,
path.Join(flagOutputDir, FilenamePayloads),
)
}

log.Info().Msgf("Block state commitment: %s from %v, output dir: %s",
hex.EncodeToString(stateCommitment[:]),
flagExecutionStateDir,
flagOutputDir)
} else {
log.Info().Msgf("Extracting state from %s, exporting root checkpoint to %s, version: %v",
flagExecutionStateDir,
path.Join(flagOutputDir, bootstrap.FilenameWALRootCheckpoint),
6,
)
}

if flagInputPayload {
log.Info().Msgf("Payload input from %v, output dir: %s",
flagExecutionStateDir,
flagOutputDir)
} else {
log.Info().Msgf("Block state commitment: %s from %v, output dir: %s",
hex.EncodeToString(stateCommitment[:]),
flagExecutionStateDir,
flagOutputDir)
}

// err := ensureCheckpointFileExist(flagExecutionStateDir)
// if err != nil {
@@ -148,14 +211,29 @@ func run(*cobra.Command, []string) {
log.Warn().Msgf("atree migration has verbose validation error logging enabled which may increase size of log")
}

err := extractExecutionState(
log.Logger,
flagExecutionStateDir,
stateCommitment,
flagOutputDir,
flagNWorker,
!flagNoMigration,
)
var err error
if flagInputPayload {
err = extractExecutionStateFromPayloads(
log.Logger,
flagExecutionStateDir,
flagOutputDir,
flagNWorker,
!flagNoMigration,
exportPayloads,
exportedAddresses,
)
} else {
err = extractExecutionState(
log.Logger,
flagExecutionStateDir,
stateCommitment,
flagOutputDir,
flagNWorker,
!flagNoMigration,
exportPayloads,
exportedAddresses,
)
}

if err != nil {
log.Fatal().Err(err).Msgf("error extracting the execution state: %s", err.Error())
199 changes: 175 additions & 24 deletions cmd/util/cmd/execution-state-extract/execution_state_extract.go
@@ -5,7 +5,9 @@ import (
"fmt"
"math"
"os"
"time"

"github.com/onflow/cadence/runtime/common"
"github.com/rs/zerolog"
"go.uber.org/atomic"

@@ -34,6 +36,8 @@ func extractExecutionState(
outputDir string,
nWorker int, // number of concurrent workers to migrate payloads
runMigrations bool,
exportPayloads bool,
exportPayloadsByAddresses []common.Address,
) error {

log.Info().Msg("init WAL")
@@ -84,30 +88,7 @@
<-compactor.Done()
}()

var migrations []ledger.Migration

if runMigrations {
rwf := reporters.NewReportFileWriterFactory(dir, log)

migrations = []ledger.Migration{
migrators.CreateAccountBasedMigration(
log,
nWorker,
[]migrators.AccountBasedMigration{
migrators.NewAtreeRegisterMigrator(
rwf,
flagValidateMigration,
flagLogVerboseValidationError,
),

&migrators.DeduplicateContractNamesMigration{},

// This will fix storage used discrepancies caused by the
// DeduplicateContractNamesMigration.
&migrators.AccountUsageMigrator{},
}),
}
}
migrations := newMigrations(log, dir, nWorker, runMigrations)

newState := ledger.State(targetHash)

@@ -134,6 +115,19 @@
log.Error().Err(err).Msgf("can not generate report for migrated state: %v", newMigratedState)
}

if exportPayloads {
payloads := newTrie.AllPayloads()

exportedPayloadCount, err := createPayloadFile(log, outputDir, payloads, exportPayloadsByAddresses)
if err != nil {
return fmt.Errorf("cannot generate payloads file: %w", err)
}

log.Info().Msgf("Exported %d payloads out of %d payloads", exportedPayloadCount, len(payloads))

return nil
}

migratedState, err := createCheckpoint(
newTrie,
log,
@@ -191,3 +185,160 @@ func writeStatusFile(fileName string, e error) error {
err := os.WriteFile(fileName, checkpointStatusJson, 0644)
return err
}

func extractExecutionStateFromPayloads(
log zerolog.Logger,
dir string,
outputDir string,
nWorker int, // number of concurrent workers to migrate payloads
runMigrations bool,
exportPayloads bool,
exportPayloadsByAddresses []common.Address,
) error {

payloads, err := readPayloadFile(log, dir)
if err != nil {
return err
}

log.Info().Msgf("read %d payloads\n", len(payloads))

migrations := newMigrations(log, dir, nWorker, runMigrations)

payloads, err = migratePayloads(log, payloads, migrations)
if err != nil {
return err
}

if exportPayloads {
exportedPayloadCount, err := createPayloadFile(log, outputDir, payloads, exportPayloadsByAddresses)
if err != nil {
return fmt.Errorf("cannot generate payloads file: %w", err)
}

log.Info().Msgf("Exported %d payloads out of %d payloads", exportedPayloadCount, len(payloads))
Member commented:

We break the state extraction into two steps: the first step outputs all payloads into a file after the first migration, and the second step reads the payloads back from that file, runs the remaining migrations, and extracts the state.

I wonder how we can make the process verifiable in case someone tries to reproduce it and wants to validate each intermediate step. Basically, we need a way to guarantee and prove that the payloads did not change between being written out and being read back in for the remaining migrations and state extraction.

One idea I had is to compute some hash (or checksum) of all the payloads before writing them to disk, and validate it after reading and decoding the payloads.

Thoughts?

Member (Author) replied:

> I wonder how we can make the process verifiable in case someone tries to reproduce it and wants to validate each intermediate step. ...

Making this verifiable "in case someone is trying to reproduce the same process" is not the primary use case now (this PR is to help speed up development/testing), but it makes sense to add a hash for other use cases. 👍

> One idea I had is to compute some hash (or checksum) of all the payloads before writing them to disk, and validate it after reading and decoding the payloads.

Due to parallelism, the payloads within the output file may not be in the same sequence each time this program is run. In practice, the payload file would still produce the same checkpoint when used as input, as long as the sequence of payloads inside is the only thing that changed and the contents of all payloads are unchanged.

I'll merge this PR and open a separate PR to address the "reproduce the same process" use case by:

  • making the payloads appear in a deterministic sequence in the output payload file
  • computing a hash over all the payloads and adding it to the payload file
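For illustration, the order-independent checksum idea floated above could sort a stable serialization of each payload before hashing, so write order does not matter. This is only a sketch, not code from this PR; it assumes the payloads have already been encoded to raw byte slices:

import (
	"bytes"
	"crypto/sha256"
	"encoding/binary"
	"sort"
)

// payloadChecksum returns a digest over a set of serialized payloads that
// is independent of their order: entries are sorted bytewise and
// length-prefixed before hashing, so permuting the input leaves the
// checksum unchanged while any content change alters it.
func payloadChecksum(encodedPayloads [][]byte) [32]byte {
	sorted := make([][]byte, len(encodedPayloads))
	copy(sorted, encodedPayloads)
	sort.Slice(sorted, func(i, j int) bool {
		return bytes.Compare(sorted[i], sorted[j]) < 0
	})

	h := sha256.New()
	var lenBuf [8]byte
	for _, p := range sorted {
		// Length-prefix each entry so concatenation boundaries are
		// unambiguous (["ab", "c"] must not collide with ["a", "bc"]).
		binary.BigEndian.PutUint64(lenBuf[:], uint64(len(p)))
		h.Write(lenBuf[:])
		h.Write(p)
	}

	var sum [32]byte
	copy(sum[:], h.Sum(nil))
	return sum
}

Under this scheme the writer would store the digest alongside the payload file, and the reader would recompute it after decoding and fail fast on a mismatch.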


return nil
}

newTrie, err := createTrieFromPayloads(log, payloads)
if err != nil {
return err
}

migratedState, err := createCheckpoint(
newTrie,
log,
outputDir,
bootstrap.FilenameWALRootCheckpoint,
)
if err != nil {
return fmt.Errorf("cannot generate the output checkpoint: %w", err)
}

log.Info().Msgf(
"New state commitment for the exported state is: %s (base64: %s)",
migratedState.String(),
migratedState.Base64(),
)

return nil
}

func migratePayloads(logger zerolog.Logger, payloads []*ledger.Payload, migrations []ledger.Migration) ([]*ledger.Payload, error) {

if len(migrations) == 0 {
return payloads, nil
}

var err error
payloadCount := len(payloads)

// migrate payloads
for i, migrate := range migrations {
logger.Info().Msgf("migration %d/%d is underway", i, len(migrations))

start := time.Now()
payloads, err = migrate(payloads)
elapsed := time.Since(start)

if err != nil {
return nil, fmt.Errorf("error applying migration (%d): %w", i, err)
}

newPayloadCount := len(payloads)

if payloadCount != newPayloadCount {
logger.Warn().
Int("migration_step", i).
Int("expected_size", payloadCount).
Int("outcome_size", newPayloadCount).
Msg("payload counts has changed during migration, make sure this is expected.")
}
logger.Info().Str("timeTaken", elapsed.String()).Msgf("migration %d is done", i)

payloadCount = newPayloadCount
}

return payloads, nil
}
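As the loop above shows, a ledger.Migration is applied as a plain function from payloads to payloads. For orientation, a trivial migration compatible with migratePayloads might look like the following (a sketch inferred from the call site, handy as a placeholder when wiring up the pipeline in tests):

// identityMigration is a no-op ledger.Migration: it returns the payloads
// unchanged. Real migrations rewrite, add, or drop payloads, which is why
// migratePayloads warns when the payload count changes.
func identityMigration(payloads []*ledger.Payload) ([]*ledger.Payload, error) {
	return payloads, nil
}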

func createTrieFromPayloads(logger zerolog.Logger, payloads []*ledger.Payload) (*trie.MTrie, error) {
// get paths
paths, err := pathfinder.PathsFromPayloads(payloads, complete.DefaultPathFinderVersion)
if err != nil {
return nil, fmt.Errorf("cannot export checkpoint, can't construct paths: %w", err)
}

logger.Info().Msgf("constructing a new trie with migrated payloads (count: %d)...", len(payloads))

emptyTrie := trie.NewEmptyMTrie()

derefPayloads := make([]ledger.Payload, len(payloads))
for i, p := range payloads {
derefPayloads[i] = *p
}

// no need to prune the data since it has already been pruned through migrations
applyPruning := false
newTrie, _, err := trie.NewTrieWithUpdatedRegisters(emptyTrie, paths, derefPayloads, applyPruning)
if err != nil {
return nil, fmt.Errorf("constructing updated trie failed: %w", err)
}

return newTrie, nil
}

func newMigrations(
log zerolog.Logger,
dir string,
nWorker int, // number of concurrent workers to migrate payloads
runMigrations bool,
) []ledger.Migration {
if runMigrations {
rwf := reporters.NewReportFileWriterFactory(dir, log)

migrations := []ledger.Migration{
migrators.CreateAccountBasedMigration(
log,
nWorker,
[]migrators.AccountBasedMigration{
migrators.NewAtreeRegisterMigrator(
rwf,
flagValidateMigration,
flagLogVerboseValidationError,
),

&migrators.DeduplicateContractNamesMigration{},

// This will fix storage used discrepancies caused by the
// DeduplicateContractNamesMigration.
&migrators.AccountUsageMigrator{},
}),
}

return migrations
}

return nil
}