diff --git a/cmd/util/cmd/checkpoint-list-tries/cmd.go b/cmd/util/cmd/checkpoint-list-tries/cmd.go index 26e5ca01c8b..830075bc5c8 100644 --- a/cmd/util/cmd/checkpoint-list-tries/cmd.go +++ b/cmd/util/cmd/checkpoint-list-tries/cmd.go @@ -29,7 +29,7 @@ func init() { func run(*cobra.Command, []string) { log.Info().Msgf("loading checkpoint %v", flagCheckpoint) - tries, err := wal.LoadCheckpoint(flagCheckpoint, &log.Logger) + tries, err := wal.LoadCheckpoint(flagCheckpoint, log.Logger) if err != nil { log.Fatal().Err(err).Msg("error while loading checkpoint") } diff --git a/cmd/util/cmd/execution-state-extract/execution_state_extract.go b/cmd/util/cmd/execution-state-extract/execution_state_extract.go index dbcb82d53e5..37f0fee3073 100644 --- a/cmd/util/cmd/execution-state-extract/execution_state_extract.go +++ b/cmd/util/cmd/execution-state-extract/execution_state_extract.go @@ -9,6 +9,7 @@ import ( "github.com/rs/zerolog" "go.uber.org/atomic" + migrators "github.com/onflow/flow-go/cmd/util/ledger/migrations" "github.com/onflow/flow-go/cmd/util/ledger/reporters" "github.com/onflow/flow-go/ledger" "github.com/onflow/flow-go/ledger/common/hash" @@ -82,10 +83,15 @@ func extractExecutionState( <-compactor.Done() }() - var migrations []ledger.Migration + var migrations = []ledger.Migration{ + migrators.MigrateAtreeRegisters( + log, + reporters.NewReportFileWriterFactory(dir, log), + nWorker), + } newState := ledger.State(targetHash) - // migrate the trie if there migrations + // migrate the trie if there are migrations newTrie, err := led.MigrateAt( newState, migrations, @@ -97,7 +103,8 @@ func extractExecutionState( } // create reporter - reporter := reporters.NewExportReporter(log, + reporter := reporters.NewExportReporter( + log, func() flow.StateCommitment { return targetHash }, ) diff --git a/cmd/util/ledger/migrations/account_based_migration.go b/cmd/util/ledger/migrations/account_based_migration.go index 0172e04737f..cb71887e934 100644 --- a/cmd/util/ledger/migrations/account_based_migration.go +++ b/cmd/util/ledger/migrations/account_based_migration.go @@ -1,189 +1,316 @@ package migrations import ( + "container/heap" + "context" "fmt" + "io" + "sync" + "time" - "github.com/rs/zerolog/log" + "github.com/rs/zerolog" + "github.com/onflow/cadence/runtime/common" + + "github.com/onflow/flow-go/cmd/util/ledger/util" "github.com/onflow/flow-go/ledger" "github.com/onflow/flow-go/model/flow" - "github.com/onflow/flow-go/module/util" + moduleUtil "github.com/onflow/flow-go/module/util" ) -// PayloadToAccount takes a payload and return: -// - (address, true, nil) if the payload is for an account, the account address is returned -// - ("", false, nil) if the payload is not for an account -// - ("", false, err) if running into any exception -func PayloadToAccount(p ledger.Payload) (string, bool, error) { - k, err := p.Key() - if err != nil { - return "", false, fmt.Errorf("could not find key for payload: %w", err) - } - id, err := KeyToRegisterID(k) - if err != nil { - return "", false, fmt.Errorf("error converting key to register ID") - } - if len([]byte(id.Owner)) != flow.AddressLength { - return "", false, nil - } - return id.Owner, true, nil +// AccountMigrator takes all the Payloads that belong to the given account +// and return the migrated Payloads +type AccountMigrator interface { + MigratePayloads(ctx context.Context, address common.Address, payloads []ledger.Payload) ([]ledger.Payload, error) } -// PayloadGroup groups payloads by account. -// For global payloads, it's stored under NonAccountPayloads field -type PayloadGroup struct { - NonAccountPayloads []ledger.Payload - Accounts map[string][]ledger.Payload -} +// AccountMigratorFactory creates an AccountMigrator +type AccountMigratorFactory func(allPayloads []ledger.Payload, nWorker int) (AccountMigrator, error) -// PayloadGrouping is a reducer function that adds the given payload to the corresponding -// group under its account -func PayloadGrouping(groups *PayloadGroup, payload ledger.Payload) (*PayloadGroup, error) { - address, isAccount, err := PayloadToAccount(payload) - if err != nil { - return nil, err - } - - if isAccount { - groups.Accounts[address] = append(groups.Accounts[address], payload) - } else { - groups.NonAccountPayloads = append(groups.NonAccountPayloads, payload) +func CreateAccountBasedMigration( + log zerolog.Logger, + migratorFactory AccountMigratorFactory, + nWorker int, +) func(payloads []ledger.Payload) ([]ledger.Payload, error) { + return func(payloads []ledger.Payload) ([]ledger.Payload, error) { + return MigrateByAccount( + log, + migratorFactory, + payloads, + nWorker, + ) } - - return groups, nil } -// AccountMigrator takes all the payloads that belong to the given account -// and return the migrated payloads -type AccountMigrator interface { - MigratePayloads(account string, payloads []ledger.Payload) ([]ledger.Payload, error) -} - -// MigrateByAccount teaks a migrator function and all the payloads, and return the migrated payloads -func MigrateByAccount(migrator AccountMigrator, allPayloads []ledger.Payload, nWorker int) ( - []ledger.Payload, error) { - groups := &PayloadGroup{ +// MigrateByAccount teaks a migrator function and all the Payloads, and return the migrated Payloads +func MigrateByAccount( + log zerolog.Logger, + migratorFactory AccountMigratorFactory, + allPayloads []ledger.Payload, + nWorker int, +) ( + []ledger.Payload, + error, +) { + groups := &payloadGroup{ NonAccountPayloads: make([]ledger.Payload, 0), - Accounts: make(map[string][]ledger.Payload), + Accounts: make(map[common.Address][]ledger.Payload), } - log.Info().Msgf("start grouping for a total of %v payloads", len(allPayloads)) + log.Info().Msgf("start grouping for a total of %v Payloads", len(allPayloads)) var err error - logGrouping := util.LogProgress("grouping payload", len(allPayloads), &log.Logger) + logGrouping := moduleUtil.LogProgress("grouping payload", len(allPayloads), log) for i, payload := range allPayloads { - groups, err = PayloadGrouping(groups, payload) + groups, err = payloadGrouping(groups, payload) if err != nil { return nil, err } logGrouping(i) } - log.Info().Msgf("finish grouping for payloads by account: %v groups in total, %v NonAccountPayloads", + log.Info().Msgf("finish grouping for Payloads by account: %v groups in total, %v NonAccountPayloads", len(groups.Accounts), len(groups.NonAccountPayloads)) - // migrate the payloads under accounts - migrated, err := MigrateGroupConcurrently(migrator, groups.Accounts, nWorker) + migrator, err := migratorFactory(allPayloads, nWorker) + if err != nil { + return nil, fmt.Errorf("could not create account migrator: %w", err) + } + + log.Info(). + Str("migrator", fmt.Sprintf("%T", migrator)). + Int("nWorker", nWorker). + Msgf("created migrator") + + defer func() { + // close the migrator if it's a Closer + if migrator, ok := migrator.(io.Closer); ok { + if err := migrator.Close(); err != nil { + log.Error().Err(err).Msg("error closing migrator") + } + } + }() + + // migrate the Payloads under accounts + migrated, err := MigrateGroupConcurrently(log, migrator, groups.Accounts, nWorker) if err != nil { return nil, fmt.Errorf("could not migrate group: %w", err) } - log.Info().Msgf("finished migrating payloads for %v account", len(groups.Accounts)) + log.Info().Msgf("finished migrating Payloads for %v account", len(groups.Accounts)) // add the non accounts which don't need to be migrated migrated = append(migrated, groups.NonAccountPayloads...) - log.Info().Msgf("finished migrating all account based payloads, total migrated payloads: %v", len(migrated)) + log.Info().Msgf("finished migrating all account based Payloads, total migrated Payloads: %v", len(migrated)) return migrated, nil } -// MigrateGroupSequentially migrate the payloads in the given payloadsByAccount map which +// MigrateGroupConcurrently migrate the Payloads in the given payloadsByAccount map which // using the migrator -func MigrateGroupSequentially( +// It's similar to MigrateGroupSequentially, except it will migrate different groups concurrently +func MigrateGroupConcurrently( + log zerolog.Logger, migrator AccountMigrator, - payloadsByAccount map[string][]ledger.Payload, -) ( - []ledger.Payload, error) { + payloadsByAccount map[common.Address][]ledger.Payload, + nWorker int, +) ([]ledger.Payload, error) { + + const logTopNDurations = 20 + + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + jobs := make(chan jobMigrateAccountGroup, len(payloadsByAccount)) + + wg := sync.WaitGroup{} + wg.Add(nWorker) + resultCh := make(chan *migrationResult, len(payloadsByAccount)) + for i := 0; i < int(nWorker); i++ { + go func() { + + defer wg.Done() + + for { + select { + case <-ctx.Done(): + return + case job, ok := <-jobs: + if !ok { + return + } + start := time.Now() + + accountMigrated, err := migrator.MigratePayloads(ctx, job.Address, job.Payloads) + + resultCh <- &migrationResult{ + migrationDuration: migrationDuration{ + Address: job.Address, + Duration: time.Since(start), + }, + Migrated: accountMigrated, + Err: err, + } + } + } + }() + } - logAccount := util.LogProgress("processing account group", len(payloadsByAccount), &log.Logger) + go func() { + for address, payloads := range payloadsByAccount { + select { + case <-ctx.Done(): + return + case jobs <- jobMigrateAccountGroup{ + Address: address, + Payloads: payloads, + }: + } + } + }() + + // read job results + logAccount := moduleUtil.LogProgress("processing account group", len(payloadsByAccount), log) - i := 0 migrated := make([]ledger.Payload, 0) - for address, payloads := range payloadsByAccount { - accountMigrated, err := migrator.MigratePayloads(address, payloads) + + durations := &migrationDurations{} + var err error + for i := 0; i < len(payloadsByAccount); i++ { + result := <-resultCh + err = result.Err if err != nil { - return nil, fmt.Errorf("could not migrate for account address %v: %w", address, err) + cancel() + log.Error(). + Err(err). + Msg("error migrating account") + break + } + + if durations.Len() < logTopNDurations || result.Duration > (*durations)[0].Duration { + if durations.Len() == logTopNDurations { + heap.Pop(durations) // remove the element with the smallest duration + } + heap.Push(durations, result.migrationDuration) } + accountMigrated := result.Migrated migrated = append(migrated, accountMigrated...) logAccount(i) - i++ + } + close(jobs) + + // make sure to exit all workers before returning from this function + // so that the migrator can be closed properly + wg.Wait() + + log.Info(). + Array("top_longest_migrations", durations.Array()). + Msgf("Top longest migrations") + + if err != nil { + return nil, fmt.Errorf("fail to migrate payload: %w", err) } return migrated, nil } type jobMigrateAccountGroup struct { - Account string + Address common.Address Payloads []ledger.Payload } type migrationResult struct { + migrationDuration + Migrated []ledger.Payload Err error } -// MigrateGroupConcurrently migrate the payloads in the given payloadsByAccount map which -// using the migrator -// It's similar to MigrateGroupSequentially, except it will migrate different groups concurrently -func MigrateGroupConcurrently( - migrator AccountMigrator, - payloadsByAccount map[string][]ledger.Payload, - nWorker int, -) ( - []ledger.Payload, error) { +// payloadToAccount takes a payload and return: +// - (address, true, nil) if the payload is for an account, the account address is returned +// - ("", false, nil) if the payload is not for an account +// - ("", false, err) if running into any exception +func payloadToAccount(p ledger.Payload) (common.Address, bool, error) { + k, err := p.Key() + if err != nil { + return common.Address{}, false, fmt.Errorf("could not find key for payload: %w", err) + } - jobs := make(chan jobMigrateAccountGroup, len(payloadsByAccount)) - go func() { - for account, payloads := range payloadsByAccount { - jobs <- jobMigrateAccountGroup{ - Account: account, - Payloads: payloads, - } - } - close(jobs) - }() + id, err := util.KeyToRegisterID(k) + if err != nil { + return common.Address{}, false, fmt.Errorf("error converting key to register ID") + } - resultCh := make(chan *migrationResult) - for i := 0; i < int(nWorker); i++ { - go func() { - for job := range jobs { - accountMigrated, err := migrator.MigratePayloads(job.Account, job.Payloads) - resultCh <- &migrationResult{ - Migrated: accountMigrated, - Err: err, - } - } - }() + if len([]byte(id.Owner)) != flow.AddressLength { + return common.Address{}, false, nil } - // read job results - logAccount := util.LogProgress("processing account group", len(payloadsByAccount), &log.Logger) + address, err := common.BytesToAddress([]byte(id.Owner)) + if err != nil { + return common.Address{}, false, fmt.Errorf("invalid account address: %w", err) + } - migrated := make([]ledger.Payload, 0) + return address, true, nil +} - for i := 0; i < len(payloadsByAccount); i++ { - result := <-resultCh - if result.Err != nil { - return nil, fmt.Errorf("fail to migrate payload: %w", result.Err) - } +// payloadGroup groups Payloads by account. +// For global Payloads, it's stored under NonAccountPayloads field +type payloadGroup struct { + NonAccountPayloads []ledger.Payload + Accounts map[common.Address][]ledger.Payload +} - accountMigrated := result.Migrated - migrated = append(migrated, accountMigrated...) - logAccount(i) +// payloadGrouping is a reducer function that adds the given payload to the corresponding +// group under its account +func payloadGrouping(groups *payloadGroup, payload ledger.Payload) (*payloadGroup, error) { + address, isAccount, err := payloadToAccount(payload) + if err != nil { + return nil, err } - return migrated, nil + if isAccount { + groups.Accounts[address] = append(groups.Accounts[address], payload) + } else { + groups.NonAccountPayloads = append(groups.NonAccountPayloads, payload) + } + + return groups, nil +} + +type migrationDuration struct { + Address common.Address + Duration time.Duration +} + +// implement heap methods for the timer results +type migrationDurations []migrationDuration + +func (h *migrationDurations) Len() int { return len(*h) } +func (h *migrationDurations) Less(i, j int) bool { + return (*h)[i].Duration < (*h)[j].Duration +} +func (h *migrationDurations) Swap(i, j int) { + (*h)[i], (*h)[j] = (*h)[j], (*h)[i] +} +func (h *migrationDurations) Push(x interface{}) { + *h = append(*h, x.(migrationDuration)) +} +func (h *migrationDurations) Pop() interface{} { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} + +func (h *migrationDurations) Array() zerolog.LogArrayMarshaler { + array := zerolog.Arr() + for _, result := range *h { + array = array.Str(fmt.Sprintf("%s: %s", result.Address.Hex(), result.Duration.String())) + } + return array } diff --git a/cmd/util/ledger/migrations/account_migration.go b/cmd/util/ledger/migrations/account_migration.go index 51c123712f1..bba8dddc1dd 100644 --- a/cmd/util/ledger/migrations/account_migration.go +++ b/cmd/util/ledger/migrations/account_migration.go @@ -1,21 +1,32 @@ package migrations import ( + "context" "fmt" + "github.com/rs/zerolog" "github.com/rs/zerolog/log" + "github.com/onflow/cadence/runtime/common" + + "github.com/onflow/flow-go/cmd/util/ledger/util" "github.com/onflow/flow-go/fvm/environment" "github.com/onflow/flow-go/ledger" "github.com/onflow/flow-go/model/flow" ) -func MigrateAccountUsage(payloads []ledger.Payload, nWorker int) ([]ledger.Payload, error) { - return MigrateByAccount(AccountUsageMigrator{}, payloads, nWorker) +func MigrateAccountUsage(log zerolog.Logger, nWorker int) func([]ledger.Payload) ([]ledger.Payload, error) { + return CreateAccountBasedMigration( + log, + func(allPayloads []ledger.Payload, nWorker int) (AccountMigrator, error) { + return AccountUsageMigrator{}, nil + }, + nWorker, + ) } func payloadSize(key ledger.Key, payload ledger.Payload) (uint64, error) { - id, err := KeyToRegisterID(key) + id, err := util.KeyToRegisterID(key) if err != nil { return 0, err } @@ -31,7 +42,7 @@ type AccountUsageMigrator struct{} // AccountUsageMigrator iterate through each payload, and calculate the storage usage // and update the accoutns status with the updated storage usage -func (m AccountUsageMigrator) MigratePayloads(account string, payloads []ledger.Payload) ([]ledger.Payload, error) { +func (m AccountUsageMigrator) MigratePayloads(ctx context.Context, address common.Address, payloads []ledger.Payload) ([]ledger.Payload, error) { var status *environment.AccountStatus var statusIndex int totalSize := uint64(0) @@ -62,7 +73,7 @@ func (m AccountUsageMigrator) MigratePayloads(account string, payloads []ledger. } if status == nil { - return nil, fmt.Errorf("could not find account status for account %v", account) + return nil, fmt.Errorf("could not find account status for account %v", address.Hex()) } // update storage used diff --git a/cmd/util/ledger/migrations/atree_register_migration.go b/cmd/util/ledger/migrations/atree_register_migration.go new file mode 100644 index 00000000000..206f09c7b0b --- /dev/null +++ b/cmd/util/ledger/migrations/atree_register_migration.go @@ -0,0 +1,455 @@ +package migrations + +import ( + "context" + "fmt" + "github.com/onflow/atree" + "github.com/onflow/cadence/runtime" + "github.com/onflow/cadence/runtime/common" + "github.com/onflow/cadence/runtime/interpreter" + "github.com/rs/zerolog" + "io" + + "github.com/onflow/flow-go/cmd/util/ledger/reporters" + "github.com/onflow/flow-go/cmd/util/ledger/util" + "github.com/onflow/flow-go/fvm/environment" + "github.com/onflow/flow-go/fvm/storage/derived" + "github.com/onflow/flow-go/fvm/storage/logical" + "github.com/onflow/flow-go/fvm/storage/state" + "github.com/onflow/flow-go/fvm/tracing" + "github.com/onflow/flow-go/ledger" + "github.com/onflow/flow-go/model/flow" +) + +func MigrateAtreeRegisters( + log zerolog.Logger, + rwf reporters.ReportWriterFactory, + nWorker int, +) func([]ledger.Payload) ([]ledger.Payload, error) { + return CreateAccountBasedMigration( + log, + func(allRegisters []ledger.Payload, nWorker int) (AccountMigrator, error) { + return NewMigrator( + log.With().Str("component", "atree-register-migrator").Logger(), + rwf, + allRegisters, + ) + }, + nWorker, + ) +} + +// AtreeRegisterMigrator is a migrator that converts the storage of an account from the +// old atree format to the new atree format. +// Account "storage used" should be correctly updated after the migration. +type AtreeRegisterMigrator struct { + log zerolog.Logger + + snapshot *util.PayloadSnapshot + + rw reporters.ReportWriter + sampler zerolog.Logger +} + +var _ AccountMigrator = (*AtreeRegisterMigrator)(nil) +var _ io.Closer = (*AtreeRegisterMigrator)(nil) + +func NewMigrator( + log zerolog.Logger, + rwf reporters.ReportWriterFactory, + allRegisters []ledger.Payload, +) (*AtreeRegisterMigrator, error) { + // creating a snapshot of all the registers will take a while + snapshot, err := util.NewPayloadSnapshot(allRegisters) + if err != nil { + return nil, fmt.Errorf("failed to create payload snapshot: %w", err) + } + log.Info().Msgf("created snapshot") + + migrator := &AtreeRegisterMigrator{ + snapshot: snapshot, + + log: log, + + rw: rwf.ReportWriter("atree-register-migrator"), + } + + return migrator, nil +} + +func (m *AtreeRegisterMigrator) Close() error { + m.rw.Close() + return nil +} + +func (m *AtreeRegisterMigrator) MigratePayloads( + _ context.Context, + address common.Address, + payloads []ledger.Payload, +) ([]ledger.Payload, error) { + // don't migrate the zero address + // these are the non-account registers and are not atrees + if address == common.ZeroAddress { + return payloads, nil + } + + err := m.checkStorageHealth(address, payloads) + if err != nil { + return nil, fmt.Errorf("storage health issues for address %s: %w", address.Hex(), err) + } + + // Do the storage conversion + changes, err := m.migrateAccountStorage(address) + if err != nil { + return nil, fmt.Errorf("failed to convert storage for address %s: %w", address.Hex(), err) + } + + return m.validateChangesAndCreateNewRegisters(payloads, address, changes) +} + +func (m *AtreeRegisterMigrator) migrateAccountStorage( + address common.Address, +) (map[flow.RegisterID]flow.RegisterValue, error) { + + // create all the runtime components we need for the migration + r, err := m.newMigratorRuntime() + if err != nil { + return nil, fmt.Errorf("failed to create migrator runtime: %w", err) + } + + // iterate through all domains and migrate them + for _, domain := range domains { + err := m.convertStorageDomain(address, r.Storage, r.Interpreter, domain) + if err != nil { + return nil, fmt.Errorf("failed to convert storage domain %s : %w", domain, err) + } + } + + // commit the storage changes + err = r.Storage.Commit(r.Interpreter, true) + if err != nil { + return nil, fmt.Errorf("failed to commit storage: %w", err) + } + + // finalize the transaction + result, err := r.TransactionState.FinalizeMainTransaction() + if err != nil { + return nil, fmt.Errorf("failed to finalize main transaction: %w", err) + } + + return result.WriteSet, nil +} + +func (m *AtreeRegisterMigrator) convertStorageDomain( + address common.Address, + storage *runtime.Storage, + inter *interpreter.Interpreter, + domain string, +) error { + storageMap := storage.GetStorageMap(address, domain, false) + if storageMap == nil { + // no storage for this domain + return nil + } + + iterator := storageMap.Iterator(util.NopMemoryGauge{}) + keys := make([]interpreter.StringStorageMapKey, 0) + // to be safe avoid modifying the map while iterating + for { + key := iterator.NextKey() + if key == nil { + break + } + + stringKey, ok := key.(interpreter.StringAtreeValue) + if !ok { + return fmt.Errorf("invalid key type %T, expected interpreter.StringAtreeValue", key) + } + + keys = append(keys, interpreter.StringStorageMapKey(stringKey)) + } + + for _, key := range keys { + err := func() error { + var value interpreter.Value + + err := capturePanic(func() { + value = storageMap.ReadValue(util.NopMemoryGauge{}, key) + }) + if err != nil { + return fmt.Errorf("failed to read value for key %s: %w", key, err) + } + + err = capturePanic(func() { + // force the value to be read entirely + value = value.Clone(inter) + }) + if err != nil { + return fmt.Errorf("failed to clone value for key %s: %w", key, err) + } + + err = capturePanic(func() { + // set value will first purge the old value + storageMap.SetValue(inter, key, value) + }) + if err != nil { + return fmt.Errorf("failed to set value for key %s: %w", key, err) + } + + return nil + }() + if err != nil { + m.log.Debug().Err(err).Msgf("failed to migrate key %s", key) + + m.rw.Write(migrationError{ + Address: address.Hex(), + Key: string(key), + Kind: "migration_failure", + Msg: err.Error(), + }) + } + } + + return nil +} + +func (m *AtreeRegisterMigrator) newMigratorRuntime() ( + *migratorRuntime, + error, +) { + // the registers will be read concurrently by multiple workers, but won't be modified + transactionState := state.NewTransactionState(m.snapshot, state.DefaultParameters()) + accounts := environment.NewAccounts(transactionState) + + derivedBlockData := derived.NewEmptyDerivedBlockData(logical.EndOfBlockExecutionTime) + + programs := + environment.NewPrograms( + tracing.NewMockTracerSpan(), + util.NopMeter{}, + environment.NoopMetricsReporter{}, + struct { + state.NestedTransactionPreparer + derived.DerivedTransactionPreparer + }{ + NestedTransactionPreparer: transactionState, + // TODO: reuse this + DerivedTransactionPreparer: derivedBlockData.NewSnapshotReadDerivedTransactionData(), + }, + accounts, + ) + + accountsAtreeLedger := util.NewAccountsAtreeLedger(accounts) + storage := runtime.NewStorage(accountsAtreeLedger, util.NopMemoryGauge{}) + + env := runtime.NewBaseInterpreterEnvironment(runtime.Config{ + AccountLinkingEnabled: true, + // Attachments are enabled everywhere except for Mainnet + AttachmentsEnabled: true, + // Capability Controllers are enabled everywhere except for Mainnet + CapabilityControllersEnabled: true, + }) + + ri := util.MigrationRuntimeInterface{ + Accounts: accounts, + Programs: programs, + } + + env.Configure( + ri, + runtime.NewCodesAndPrograms(), + storage, + runtime.NewCoverageReport(), + ) + + inter, err := interpreter.NewInterpreter( + nil, + nil, + env.InterpreterConfig) + if err != nil { + return nil, err + } + + return &migratorRuntime{ + TransactionState: transactionState, + Interpreter: inter, + Storage: storage, + }, nil +} + +type migratorRuntime struct { + TransactionState state.NestedTransactionPreparer + Interpreter *interpreter.Interpreter + Storage *runtime.Storage +} + +func (m *AtreeRegisterMigrator) validateChangesAndCreateNewRegisters( + payloads []ledger.Payload, + address common.Address, + changes map[flow.RegisterID]flow.RegisterValue, +) ([]ledger.Payload, error) { + originalPayloadsSnapshot, err := util.NewPayloadSnapshot(payloads) + if err != nil { + return nil, fmt.Errorf("failed to create payload snapshot: %w", err) + } + originalPayloads := originalPayloadsSnapshot.Payloads + newPayloads := make([]ledger.Payload, 0, len(originalPayloads)) + + for id, value := range changes { + // delete all values that were changed from the original payloads + delete(originalPayloads, id) + + if len(value) == 0 { + // value was deleted + continue + } + ownerAddress, err := common.BytesToAddress([]byte(id.Owner)) + if err != nil { + return nil, fmt.Errorf("failed to convert owner address: %w", err) + } + + if ownerAddress.Hex() != address.Hex() { + // something was changed that does not belong to this account. Log it. + m.log.Error(). + Str("key", id.String()). + Str("owner_address", ownerAddress.Hex()). + Str("account", address.Hex()). + Msg("key is part of the change set, but is for a different account") + + return nil, fmt.Errorf("register for a different account was produced during migration") + } + + newPayloads = append(newPayloads, *ledger.NewPayload(util.RegisterIDToKey(id), value)) + } + + // add all values that were not changed + for id, value := range originalPayloads { + if len(value) == 0 { + // this is strange, but we don't want to add empty values. Log it. + m.log.Warn().Msgf("empty value for key %s", id) + continue + } + if id.IsInternalState() { + // this is expected. Move it to the new payload + newPayloads = append(newPayloads, *ledger.NewPayload(util.RegisterIDToKey(id), value)) + continue + } + + isADomainKey := false + for _, domain := range domains { + if id.Key == domain { + isADomainKey = true + break + } + } + if isADomainKey { + // TODO: check if this is really expected + // this is expected. Move it to the new payload + newPayloads = append(newPayloads, *ledger.NewPayload(util.RegisterIDToKey(id), value)) + continue + } + + // something was not moved. Log it. + m.log.Debug(). + Str("key", id.String()). + Str("account", address.Hex()). + Str("value", fmt.Sprintf("%x", value)). + Msg("Key was not migrated") + m.rw.Write(migrationError{ + Address: address.Hex(), + Key: id.String(), + Kind: "not_migrated", + Msg: fmt.Sprintf("%x", value), + }) + } + return newPayloads, nil +} + +func (m *AtreeRegisterMigrator) checkStorageHealth( + address common.Address, + payloads []ledger.Payload, +) error { + snapshot, err := util.NewPayloadSnapshot(payloads) + if err != nil { + return fmt.Errorf("failed to create payload snapshot: %w", err) + } + + transactionState := state.NewTransactionState(snapshot, state.DefaultParameters()) + accounts := environment.NewAccounts(transactionState) + + accountsAtreeLedger := util.NewAccountsAtreeLedger(accounts) + storage := runtime.NewStorage(accountsAtreeLedger, util.NopMemoryGauge{}) + + rootSlabs, err := atree.CheckStorageHealth(storage, -1) + if err != nil { + m.log.Info(). + Err(err). + Str("account", address.Hex()). + Msg("Account storage health issue") + m.rw.Write(migrationError{ + Address: address.Hex(), + Key: "", + Kind: "storage_health_problem_1", + Msg: err.Error(), + }) + } + + if len(rootSlabs) > 4 { + m.log.Info(). + Err(err). + Str("account", address.Hex()). + Msg("To many root slabs") + m.rw.Write(migrationError{ + Address: address.Hex(), + Key: "", + Kind: "storage_health_problem_2", + Msg: err.Error(), + }) + } + + err = storage.CheckHealth() + if err != nil { + m.log.Info(). + Err(err). + Str("account", address.Hex()). + Msg("Account storage health issue") + m.rw.Write(migrationError{ + Address: address.Hex(), + Key: "", + Kind: "storage_health_problem_3", + Msg: err.Error(), + }) + } + return nil + +} + +// capturePanic captures panics and converts them to errors +// this is needed for some cadence functions that panic on error +func capturePanic(f func()) (err error) { + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("panic: %v", r) + } + }() + + f() + + return +} + +// convert all domains +var domains = []string{ + common.PathDomainStorage.Identifier(), + common.PathDomainPrivate.Identifier(), + common.PathDomainPublic.Identifier(), + runtime.StorageDomainContract, +} + +// migrationError is a struct for reporting errors +type migrationError struct { + Address string + Key string + Kind string + Msg string +} diff --git a/cmd/util/ledger/migrations/atree_register_migration_test.go b/cmd/util/ledger/migrations/atree_register_migration_test.go new file mode 100644 index 00000000000..b5a020f12ca --- /dev/null +++ b/cmd/util/ledger/migrations/atree_register_migration_test.go @@ -0,0 +1,132 @@ +package migrations_test + +import ( + "testing" + + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" + + "github.com/onflow/flow-go/cmd/util/ledger/migrations" + "github.com/onflow/flow-go/cmd/util/ledger/reporters" + "github.com/onflow/flow-go/cmd/util/ledger/util" + "github.com/onflow/flow-go/ledger" + "github.com/onflow/flow-go/ledger/common/pathfinder" + "github.com/onflow/flow-go/ledger/complete" + "github.com/onflow/flow-go/ledger/complete/wal" + "github.com/onflow/flow-go/module/metrics" +) + +func TestAtreeRegisterMigration(t *testing.T) { + log := zerolog.New(zerolog.NewTestWriter(t)) + dir := t.TempDir() + // Localnet v0.31 was used to produce an execution state that can be used for the tests. + t.Run( + "test v0.31 state", + testWithExistingState( + + log, + "test-data/bootstrapped_v0.31", + migrations.MigrateAtreeRegisters(log, reporters.NewReportFileWriterFactory(dir, log), 2), + func(t *testing.T, oldPayloads []ledger.Payload, newPayloads []ledger.Payload) { + + newSnapshot, err := util.NewPayloadSnapshot(oldPayloads) + require.NoError(t, err) + + for _, payload := range oldPayloads { + key, err := payload.Key() + require.NoError(t, err) + regId, err := util.KeyToRegisterID(key) + require.NoError(t, err) + value, err := newSnapshot.Get(regId) + require.NoError(t, err) + + // TODO: currently the migration does not change the payload values because + // the atree version is not changed. This should be changed in the future. + require.Equal(t, []byte(payload.Value()), value) + } + + // commented out helper code to dump the payloads to csv files for manual inspection + //dumpPayloads := func(n string, payloads []ledger.Payload) { + // f, err := os.Create(n) + // require.NoError(t, err) + // + // defer f.Close() + // + // for _, payload := range payloads { + // key, err := payload.Key() + // require.NoError(t, err) + // _, err = f.WriteString(fmt.Sprintf("%x,%s\n", key.String(), payload.Value())) + // require.NoError(t, err) + // } + //} + //dumpPayloads("old.csv", oldPayloads) + //dumpPayloads("new.csv", newPayloads) + + }, + ), + ) + +} + +func testWithExistingState( + log zerolog.Logger, + inputDir string, + migration ledger.Migration, + f func( + t *testing.T, + oldPayloads []ledger.Payload, + newPayloads []ledger.Payload, + ), +) func(t *testing.T) { + return func(t *testing.T) { + diskWal, err := wal.NewDiskWAL( + log, + nil, + metrics.NewNoopCollector(), + inputDir, + complete.DefaultCacheSize, + pathfinder.PathByteSize, + wal.SegmentSize, + ) + require.NoError(t, err) + + led, err := complete.NewLedger( + diskWal, + complete.DefaultCacheSize, + &metrics.NoopCollector{}, + log, + complete.DefaultPathFinderVersion) + require.NoError(t, err) + + var oldPayloads []ledger.Payload + + // we sandwitch the migration between two identity migrations + // so we can capture the Payloads before and after the migration + var mig = []ledger.Migration{ + func(payloads []ledger.Payload) ([]ledger.Payload, error) { + oldPayloads = make([]ledger.Payload, len(payloads)) + copy(oldPayloads, payloads) + return payloads, nil + }, + + migration, + + func(newPayloads []ledger.Payload) ([]ledger.Payload, error) { + + f(t, oldPayloads, newPayloads) + + return newPayloads, nil + }, + } + + newState, err := led.MostRecentTouchedState() + require.NoError(t, err) + + _, err = led.MigrateAt( + newState, + mig, + complete.DefaultPathFinderVersion, + ) + require.NoError(t, err) + } +} diff --git a/cmd/util/ledger/migrations/storage_fees_migration.go b/cmd/util/ledger/migrations/storage_fees_migration.go index d55a725d90b..5e12ef95182 100644 --- a/cmd/util/ledger/migrations/storage_fees_migration.go +++ b/cmd/util/ledger/migrations/storage_fees_migration.go @@ -1,6 +1,7 @@ package migrations import ( + "github.com/onflow/flow-go/cmd/util/ledger/util" fvm "github.com/onflow/flow-go/fvm/environment" "github.com/onflow/flow-go/ledger" "github.com/onflow/flow-go/ledger/common/utils" @@ -30,7 +31,7 @@ func StorageFeesMigration(payload []ledger.Payload) ([]ledger.Payload, error) { u = u + uint64(storageUsedByStorageUsed) newPayload = append(newPayload, *ledger.NewPayload( - registerIDToKey(id), + util.RegisterIDToKey(id), utils.Uint64ToBinary(u), )) } @@ -42,7 +43,7 @@ func incrementStorageUsed(p ledger.Payload, used map[string]uint64) error { if err != nil { return err } - id, err := KeyToRegisterID(k) + id, err := util.KeyToRegisterID(k) if err != nil { return err } diff --git a/cmd/util/ledger/migrations/test-data/bootstrapped_v0.31/.gitignore b/cmd/util/ledger/migrations/test-data/bootstrapped_v0.31/.gitignore new file mode 100644 index 00000000000..0342671c3bc --- /dev/null +++ b/cmd/util/ledger/migrations/test-data/bootstrapped_v0.31/.gitignore @@ -0,0 +1,4 @@ +* + +!.gitignore +!00000000 diff --git a/cmd/util/ledger/migrations/test-data/bootstrapped_v0.31/00000000 b/cmd/util/ledger/migrations/test-data/bootstrapped_v0.31/00000000 new file mode 100644 index 00000000000..48a6440b89f Binary files /dev/null and b/cmd/util/ledger/migrations/test-data/bootstrapped_v0.31/00000000 differ diff --git a/cmd/util/ledger/reporters/fungible_token_tracker.go b/cmd/util/ledger/reporters/fungible_token_tracker.go index 0bb1db764bd..bb438921847 100644 --- a/cmd/util/ledger/reporters/fungible_token_tracker.go +++ b/cmd/util/ledger/reporters/fungible_token_tracker.go @@ -13,7 +13,7 @@ import ( "github.com/onflow/cadence/runtime/common" "github.com/onflow/cadence/runtime/interpreter" - "github.com/onflow/flow-go/cmd/util/ledger/migrations" + "github.com/onflow/flow-go/cmd/util/ledger/util" "github.com/onflow/flow-go/fvm" "github.com/onflow/flow-go/fvm/environment" "github.com/onflow/flow-go/fvm/storage/state" @@ -146,7 +146,7 @@ func (r *FungibleTokenTracker) worker( state.DefaultParameters()) accounts := environment.NewAccounts(txnState) storage := cadenceRuntime.NewStorage( - &migrations.AccountsAtreeLedger{Accounts: accounts}, + &util.AccountsAtreeLedger{Accounts: accounts}, nil, ) diff --git a/cmd/util/ledger/util/migration_runtime_interface.go b/cmd/util/ledger/util/migration_runtime_interface.go new file mode 100644 index 00000000000..ed3561bcbcf --- /dev/null +++ b/cmd/util/ledger/util/migration_runtime_interface.go @@ -0,0 +1,307 @@ +package util + +import ( + "fmt" + "time" + + "go.opentelemetry.io/otel/attribute" + + "github.com/onflow/atree" + "github.com/onflow/cadence" + "github.com/onflow/cadence/runtime" + "github.com/onflow/cadence/runtime/common" + "github.com/onflow/cadence/runtime/interpreter" + + "github.com/onflow/flow-go/fvm/environment" + "github.com/onflow/flow-go/model/flow" +) + +type MigrationRuntimeInterface struct { + Accounts environment.Accounts + Programs *environment.Programs + + // GetOrLoadProgramFunc allows for injecting extra logic + GetOrLoadProgramFunc func(location runtime.Location, load func() (*interpreter.Program, error)) (*interpreter.Program, error) +} + +func newMigrationRuntimeInterface( + Accounts environment.Accounts, + Programs *environment.Programs, +) MigrationRuntimeInterface { + + mri := MigrationRuntimeInterface{ + Accounts: Accounts, + Programs: Programs, + } + + return mri +} + +func (m MigrationRuntimeInterface) ResolveLocation( + identifiers []runtime.Identifier, + location runtime.Location, +) ([]runtime.ResolvedLocation, error) { + + addressLocation, isAddress := location.(common.AddressLocation) + + // if the location is not an address location, e.g. an identifier location (`import Crypto`), + // then return a single resolved location which declares all identifiers. + if !isAddress { + return []runtime.ResolvedLocation{ + { + Location: location, + Identifiers: identifiers, + }, + }, nil + } + + // if the location is an address, + // and no specific identifiers where requested in the import statement, + // then fetch all identifiers at this address + if len(identifiers) == 0 { + address := flow.Address(addressLocation.Address) + + contractNames, err := m.Accounts.GetContractNames(address) + if err != nil { + return nil, fmt.Errorf("ResolveLocation failed: %w", err) + } + + // if there are no contractNames deployed, + // then return no resolved locations + if len(contractNames) == 0 { + return nil, nil + } + + identifiers = make([]runtime.Identifier, len(contractNames)) + + for i := range identifiers { + identifiers[i] = runtime.Identifier{ + Identifier: contractNames[i], + } + } + } + + // return one resolved location per identifier. + // each resolved location is an address contract location + resolvedLocations := make([]runtime.ResolvedLocation, len(identifiers)) + for i := range resolvedLocations { + identifier := identifiers[i] + resolvedLocations[i] = runtime.ResolvedLocation{ + Location: common.AddressLocation{ + Address: addressLocation.Address, + Name: identifier.Identifier, + }, + Identifiers: []runtime.Identifier{identifier}, + } + } + + return resolvedLocations, nil +} + +func (m MigrationRuntimeInterface) GetCode(location runtime.Location) ([]byte, error) { + contractLocation, ok := location.(common.AddressLocation) + if !ok { + return nil, fmt.Errorf("GetCode failed: expected AddressLocation") + } + + add, err := m.Accounts.GetContract(contractLocation.Name, flow.Address(contractLocation.Address)) + if err != nil { + return nil, fmt.Errorf("GetCode failed: %w", err) + } + + return add, nil +} + +func (m MigrationRuntimeInterface) GetAccountContractCode( + l common.AddressLocation, +) (code []byte, err error) { + return m.Accounts.GetContract(l.Name, flow.Address(l.Address)) +} + +func (m MigrationRuntimeInterface) GetOrLoadProgram(location runtime.Location, load func() (*interpreter.Program, error)) (*interpreter.Program, error) { + if m.GetOrLoadProgramFunc != nil { + return m.GetOrLoadProgramFunc(location, load) + } + + return m.Programs.GetOrLoadProgram(location, load) +} + +func (m MigrationRuntimeInterface) MeterMemory(_ common.MemoryUsage) error { + return nil +} + +func (m MigrationRuntimeInterface) MeterComputation(_ common.ComputationKind, _ uint) error { + return nil +} + +func (m MigrationRuntimeInterface) GetValue(_, _ []byte) (value []byte, err error) { + panic("unexpected GetValue call") +} + +func (m MigrationRuntimeInterface) SetValue(_, _, _ []byte) (err error) { + panic("unexpected SetValue call") +} + +func (m MigrationRuntimeInterface) CreateAccount(_ runtime.Address) (address runtime.Address, err error) { + panic("unexpected CreateAccount call") +} + +func (m MigrationRuntimeInterface) AddEncodedAccountKey(_ runtime.Address, _ []byte) error { + panic("unexpected AddEncodedAccountKey call") +} + +func (m MigrationRuntimeInterface) RevokeEncodedAccountKey(_ runtime.Address, _ int) (publicKey []byte, err error) { + panic("unexpected RevokeEncodedAccountKey call") +} + +func (m MigrationRuntimeInterface) AddAccountKey(_ runtime.Address, _ *runtime.PublicKey, _ runtime.HashAlgorithm, _ int) (*runtime.AccountKey, error) { + panic("unexpected AddAccountKey call") +} + +func (m MigrationRuntimeInterface) GetAccountKey(_ runtime.Address, _ int) (*runtime.AccountKey, error) { + panic("unexpected GetAccountKey call") +} + +func (m MigrationRuntimeInterface) RevokeAccountKey(_ runtime.Address, _ int) (*runtime.AccountKey, error) { + panic("unexpected RevokeAccountKey call") +} + +func (m MigrationRuntimeInterface) UpdateAccountContractCode(_ common.AddressLocation, _ []byte) (err error) { + panic("unexpected UpdateAccountContractCode call") +} + +func (m MigrationRuntimeInterface) RemoveAccountContractCode(common.AddressLocation) (err error) { + panic("unexpected RemoveAccountContractCode call") +} + +func (m MigrationRuntimeInterface) GetSigningAccounts() ([]runtime.Address, error) { + panic("unexpected GetSigningAccounts call") +} + +func (m MigrationRuntimeInterface) ProgramLog(_ string) error { + panic("unexpected ProgramLog call") +} + +func (m MigrationRuntimeInterface) EmitEvent(_ cadence.Event) error { + panic("unexpected EmitEvent call") +} + +func (m MigrationRuntimeInterface) ValueExists(_, _ []byte) (exists bool, err error) { + panic("unexpected ValueExists call") +} + +func (m MigrationRuntimeInterface) GenerateUUID() (uint64, error) { + panic("unexpected GenerateUUID call") +} + +func (m MigrationRuntimeInterface) GetComputationLimit() uint64 { + panic("unexpected GetComputationLimit call") +} + +func (m MigrationRuntimeInterface) SetComputationUsed(_ uint64) error { + panic("unexpected SetComputationUsed call") +} + +func (m MigrationRuntimeInterface) DecodeArgument(_ []byte, _ cadence.Type) (cadence.Value, error) { + panic("unexpected DecodeArgument call") +} + +func (m MigrationRuntimeInterface) GetCurrentBlockHeight() (uint64, error) { + panic("unexpected GetCurrentBlockHeight call") +} + +func (m MigrationRuntimeInterface) GetBlockAtHeight(_ uint64) (block runtime.Block, exists bool, err error) { + panic("unexpected GetBlockAtHeight call") +} + +func (m MigrationRuntimeInterface) UnsafeRandom() (uint64, error) { + panic("unexpected UnsafeRandom call") +} + +func (m MigrationRuntimeInterface) VerifySignature(_ []byte, _ string, _ []byte, _ []byte, _ runtime.SignatureAlgorithm, _ runtime.HashAlgorithm) (bool, error) { + panic("unexpected VerifySignature call") +} + +func (m MigrationRuntimeInterface) Hash(_ []byte, _ string, _ runtime.HashAlgorithm) ([]byte, error) { + panic("unexpected Hash call") +} + +func (m MigrationRuntimeInterface) GetAccountBalance(_ common.Address) (value uint64, err error) { + panic("unexpected GetAccountBalance call") +} + +func (m MigrationRuntimeInterface) GetAccountAvailableBalance(_ common.Address) (value uint64, err error) { + panic("unexpected GetAccountAvailableBalance call") +} + +func (m MigrationRuntimeInterface) GetStorageUsed(_ runtime.Address) (value uint64, err error) { + panic("unexpected GetStorageUsed call") +} + +func (m MigrationRuntimeInterface) GetStorageCapacity(_ runtime.Address) (value uint64, err error) { + panic("unexpected GetStorageCapacity call") +} + +func (m MigrationRuntimeInterface) ImplementationDebugLog(_ string) error { + panic("unexpected ImplementationDebugLog call") +} + +func (m MigrationRuntimeInterface) ValidatePublicKey(_ *runtime.PublicKey) error { + panic("unexpected ValidatePublicKey call") +} + +func (m MigrationRuntimeInterface) GetAccountContractNames(_ runtime.Address) ([]string, error) { + panic("unexpected GetAccountContractNames call") +} + +func (m MigrationRuntimeInterface) AllocateStorageIndex(_ []byte) (atree.StorageIndex, error) { + panic("unexpected AllocateStorageIndex call") +} + +func (m MigrationRuntimeInterface) ComputationUsed() (uint64, error) { + panic("unexpected ComputationUsed call") +} + +func (m MigrationRuntimeInterface) MemoryUsed() (uint64, error) { + panic("unexpected MemoryUsed call") +} + +func (m MigrationRuntimeInterface) InteractionUsed() (uint64, error) { + panic("unexpected InteractionUsed call") +} + +func (m MigrationRuntimeInterface) SetInterpreterSharedState(_ *interpreter.SharedState) { + panic("unexpected SetInterpreterSharedState call") +} + +func (m MigrationRuntimeInterface) GetInterpreterSharedState() *interpreter.SharedState { + panic("unexpected GetInterpreterSharedState call") +} + +func (m MigrationRuntimeInterface) AccountKeysCount(_ runtime.Address) (uint64, error) { + panic("unexpected AccountKeysCount call") +} + +func (m MigrationRuntimeInterface) BLSVerifyPOP(_ *runtime.PublicKey, _ []byte) (bool, error) { + panic("unexpected BLSVerifyPOP call") +} + +func (m MigrationRuntimeInterface) BLSAggregateSignatures(_ [][]byte) ([]byte, error) { + panic("unexpected BLSAggregateSignatures call") +} + +func (m MigrationRuntimeInterface) BLSAggregatePublicKeys(_ []*runtime.PublicKey) (*runtime.PublicKey, error) { + panic("unexpected BLSAggregatePublicKeys call") +} + +func (m MigrationRuntimeInterface) ResourceOwnerChanged(_ *interpreter.Interpreter, _ *interpreter.CompositeValue, _ common.Address, _ common.Address) { + panic("unexpected ResourceOwnerChanged call") +} + +func (m MigrationRuntimeInterface) GenerateAccountID(_ common.Address) (uint64, error) { + panic("unexpected GenerateAccountID call") +} + +func (m MigrationRuntimeInterface) RecordTrace(_ string, _ runtime.Location, _ time.Duration, _ []attribute.KeyValue) { + panic("unexpected RecordTrace call") +} diff --git a/cmd/util/ledger/util/nop_meter.go b/cmd/util/ledger/util/nop_meter.go new file mode 100644 index 00000000000..4304b1ec9cb --- /dev/null +++ b/cmd/util/ledger/util/nop_meter.go @@ -0,0 +1,44 @@ +package util + +import ( + "github.com/onflow/cadence/runtime/common" + + "github.com/onflow/flow-go/fvm/environment" + "github.com/onflow/flow-go/fvm/meter" +) + +type NopMeter struct{} + +func (n NopMeter) MeterComputation(_ common.ComputationKind, _ uint) error { + return nil +} + +func (n NopMeter) ComputationUsed() (uint64, error) { + return 0, nil +} + +func (n NopMeter) ComputationIntensities() meter.MeteredComputationIntensities { + return meter.MeteredComputationIntensities{} +} + +func (n NopMeter) MeterMemory(_ common.MemoryUsage) error { + return nil +} + +func (n NopMeter) MemoryUsed() (uint64, error) { + return 0, nil +} + +func (n NopMeter) MeterEmittedEvent(_ uint64) error { + return nil +} + +func (n NopMeter) TotalEmittedEventBytes() uint64 { + return 0 +} + +func (n NopMeter) InteractionUsed() (uint64, error) { + return 0, nil +} + +var _ environment.Meter = NopMeter{} diff --git a/cmd/util/ledger/migrations/utils.go b/cmd/util/ledger/util/util.go similarity index 68% rename from cmd/util/ledger/migrations/utils.go rename to cmd/util/ledger/util/util.go index 506efe61db0..a55513769eb 100644 --- a/cmd/util/ledger/migrations/utils.go +++ b/cmd/util/ledger/util/util.go @@ -1,12 +1,14 @@ -package migrations +package util import ( "fmt" "github.com/onflow/atree" + "github.com/onflow/cadence/runtime/common" "github.com/onflow/flow-go/engine/execution/state" "github.com/onflow/flow-go/fvm/environment" + "github.com/onflow/flow-go/fvm/storage/snapshot" "github.com/onflow/flow-go/ledger" "github.com/onflow/flow-go/model/flow" ) @@ -24,7 +26,7 @@ func KeyToRegisterID(key ledger.Key) (flow.RegisterID, error) { ), nil } -func registerIDToKey(registerID flow.RegisterID) ledger.Key { +func RegisterIDToKey(registerID flow.RegisterID) ledger.Key { newKey := ledger.Key{} newKey.KeyParts = []ledger.KeyPart{ { @@ -89,3 +91,41 @@ func (a *AccountsAtreeLedger) AllocateStorageIndex(owner []byte) (atree.StorageI } return v, nil } + +type PayloadSnapshot struct { + Payloads map[flow.RegisterID]flow.RegisterValue +} + +var _ snapshot.StorageSnapshot = (*PayloadSnapshot)(nil) + +func NewPayloadSnapshot(payloads []ledger.Payload) (*PayloadSnapshot, error) { + l := &PayloadSnapshot{ + Payloads: make(map[flow.RegisterID][]byte, len(payloads)), + } + for _, payload := range payloads { + key, err := payload.Key() + if err != nil { + return nil, err + } + id, err := KeyToRegisterID(key) + if err != nil { + return nil, err + } + l.Payloads[id] = payload.Value() + } + return l, nil +} + +func (p PayloadSnapshot) Get(id flow.RegisterID) (flow.RegisterValue, error) { + value := p.Payloads[id] + return value, nil +} + +// NopMemoryGauge is a no-op implementation of the MemoryGauge interface +type NopMemoryGauge struct{} + +func (n NopMemoryGauge) MeterMemory(common.MemoryUsage) error { + return nil +} + +var _ common.MemoryGauge = (*NopMemoryGauge)(nil) diff --git a/go.mod b/go.mod index d5890cfa995..6d6db1b8375 100644 --- a/go.mod +++ b/go.mod @@ -51,7 +51,7 @@ require ( github.com/multiformats/go-multiaddr-dns v0.3.1 github.com/multiformats/go-multihash v0.2.3 github.com/onflow/atree v0.6.0 - github.com/onflow/cadence v0.39.14 + github.com/onflow/cadence v0.39.15-0.20230823153602-c1edd8a467e2 github.com/onflow/flow v0.3.4 github.com/onflow/flow-core-contracts/lib/go/contracts v1.2.4-0.20230703193002-53362441b57d github.com/onflow/flow-core-contracts/lib/go/templates v1.2.3 diff --git a/go.sum b/go.sum index 73eadfddbf8..9e11a49fe44 100644 --- a/go.sum +++ b/go.sum @@ -1240,8 +1240,8 @@ github.com/onflow/atree v0.1.0-beta1.0.20211027184039-559ee654ece9/go.mod h1:+6x github.com/onflow/atree v0.6.0 h1:j7nQ2r8npznx4NX39zPpBYHmdy45f4xwoi+dm37Jk7c= github.com/onflow/atree v0.6.0/go.mod h1:gBHU0M05qCbv9NN0kijLWMgC47gHVNBIp4KmsVFi0tc= github.com/onflow/cadence v0.20.1/go.mod h1:7mzUvPZUIJztIbr9eTvs+fQjWWHTF8veC+yk4ihcNIA= -github.com/onflow/cadence v0.39.14 h1:YoR3YFUga49rqzVY1xwI6I2ZDBmvwGh13jENncsleC8= -github.com/onflow/cadence v0.39.14/go.mod h1:OIJLyVBPa339DCBQXBfGaorT4tBjQh9gSKe+ZAIyyh0= +github.com/onflow/cadence v0.39.15-0.20230823153602-c1edd8a467e2 h1:p2f+A94QRsuEXeQRX9cwdEywNneJ6Rlw6LYq0ao5PyA= +github.com/onflow/cadence v0.39.15-0.20230823153602-c1edd8a467e2/go.mod h1:OIJLyVBPa339DCBQXBfGaorT4tBjQh9gSKe+ZAIyyh0= github.com/onflow/flow v0.3.4 h1:FXUWVdYB90f/rjNcY0Owo30gL790tiYff9Pb/sycXYE= github.com/onflow/flow v0.3.4/go.mod h1:lzyAYmbu1HfkZ9cfnL5/sjrrsnJiUU8fRL26CqLP7+c= github.com/onflow/flow-core-contracts/lib/go/contracts v1.2.4-0.20230703193002-53362441b57d h1:B7PdhdUNkve5MVrekWDuQf84XsGBxNZ/D3x+QQ8XeVs= diff --git a/insecure/go.mod b/insecure/go.mod index b727c8ecf6c..0d52624926a 100644 --- a/insecure/go.mod +++ b/insecure/go.mod @@ -180,7 +180,7 @@ require ( github.com/multiformats/go-multistream v0.4.1 // indirect github.com/multiformats/go-varint v0.0.7 // indirect github.com/onflow/atree v0.6.0 // indirect - github.com/onflow/cadence v0.39.14 // indirect + github.com/onflow/cadence v0.39.15-0.20230823153602-c1edd8a467e2 // indirect github.com/onflow/flow-core-contracts/lib/go/contracts v1.2.4-0.20230703193002-53362441b57d // indirect github.com/onflow/flow-core-contracts/lib/go/templates v1.2.3 // indirect github.com/onflow/flow-ft/lib/go/contracts v0.7.0 // indirect diff --git a/insecure/go.sum b/insecure/go.sum index 308ac3ab9f3..8d0660ffb87 100644 --- a/insecure/go.sum +++ b/insecure/go.sum @@ -1216,8 +1216,8 @@ github.com/onflow/atree v0.1.0-beta1.0.20211027184039-559ee654ece9/go.mod h1:+6x github.com/onflow/atree v0.6.0 h1:j7nQ2r8npznx4NX39zPpBYHmdy45f4xwoi+dm37Jk7c= github.com/onflow/atree v0.6.0/go.mod h1:gBHU0M05qCbv9NN0kijLWMgC47gHVNBIp4KmsVFi0tc= github.com/onflow/cadence v0.20.1/go.mod h1:7mzUvPZUIJztIbr9eTvs+fQjWWHTF8veC+yk4ihcNIA= -github.com/onflow/cadence v0.39.14 h1:YoR3YFUga49rqzVY1xwI6I2ZDBmvwGh13jENncsleC8= -github.com/onflow/cadence v0.39.14/go.mod h1:OIJLyVBPa339DCBQXBfGaorT4tBjQh9gSKe+ZAIyyh0= +github.com/onflow/cadence v0.39.15-0.20230823153602-c1edd8a467e2 h1:p2f+A94QRsuEXeQRX9cwdEywNneJ6Rlw6LYq0ao5PyA= +github.com/onflow/cadence v0.39.15-0.20230823153602-c1edd8a467e2/go.mod h1:OIJLyVBPa339DCBQXBfGaorT4tBjQh9gSKe+ZAIyyh0= github.com/onflow/flow-core-contracts/lib/go/contracts v1.2.4-0.20230703193002-53362441b57d h1:B7PdhdUNkve5MVrekWDuQf84XsGBxNZ/D3x+QQ8XeVs= github.com/onflow/flow-core-contracts/lib/go/contracts v1.2.4-0.20230703193002-53362441b57d/go.mod h1:xAiV/7TKhw863r6iO3CS5RnQ4F+pBY1TxD272BsILlo= github.com/onflow/flow-core-contracts/lib/go/templates v1.2.3 h1:X25A1dNajNUtE+KoV76wQ6BR6qI7G65vuuRXxDDqX7E= @@ -1228,13 +1228,13 @@ github.com/onflow/flow-go-sdk v0.24.0/go.mod h1:IoptMLPyFXWvyd9yYA6/4EmSeeozl6nJ github.com/onflow/flow-go-sdk v0.41.9 h1:cyplhhhc0RnfOAan2t7I/7C9g1hVGDDLUhWj6ZHAkk4= github.com/onflow/flow-go-sdk v0.41.9/go.mod h1:e9Q5TITCy7g08lkdQJxP8fAKBnBoC5FjALvUKr36j4I= github.com/onflow/flow-go/crypto v0.21.3/go.mod h1:vI6V4CY3R6c4JKBxdcRiR/AnjBfL8OSD97bJc60cLuQ= -github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20230628215638-83439d22e0ce h1:YQKijiQaq8SF1ayNqp3VVcwbBGXSnuHNHq4GQmVGybE= -github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20230628215638-83439d22e0ce/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= github.com/onflow/flow-go/crypto v0.24.9 h1:0EQp+kSZYJepMIiSypfJVe7tzsPcb6UXOdOtsTCDhBs= github.com/onflow/flow-go/crypto v0.24.9/go.mod h1:fqCzkIBBMRRkciVrvW21rECKq1oD7Q6u+bCI78lfNX0= github.com/onflow/flow-nft/lib/go/contracts v1.1.0 h1:rhUDeD27jhLwOqQKI/23008CYfnqXErrJvc4EFRP2a0= github.com/onflow/flow-nft/lib/go/contracts v1.1.0/go.mod h1:YsvzYng4htDgRB9sa9jxdwoTuuhjK8WYWXTyLkIigZY= github.com/onflow/flow/protobuf/go/flow v0.2.2/go.mod h1:gQxYqCfkI8lpnKsmIjwtN2mV/N2PIwc1I+RUK4HPIc8= +github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20230628215638-83439d22e0ce h1:YQKijiQaq8SF1ayNqp3VVcwbBGXSnuHNHq4GQmVGybE= +github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20230628215638-83439d22e0ce/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= github.com/onflow/go-bitswap v0.0.0-20230703214630-6d3db958c73d h1:QcOAeEyF3iAUHv21LQ12sdcsr0yFrJGoGLyCAzYYtvI= github.com/onflow/go-bitswap v0.0.0-20230703214630-6d3db958c73d/go.mod h1:GCPpiyRoHncdqPj++zPr9ZOYBX4hpJ0pYZRYqSE8VKk= github.com/onflow/sdks v0.5.0 h1:2HCRibwqDaQ1c9oUApnkZtEAhWiNY2GTpRD5+ftdkN8= diff --git a/integration/go.mod b/integration/go.mod index e47e8bac3c3..3daf2018d37 100644 --- a/integration/go.mod +++ b/integration/go.mod @@ -17,7 +17,7 @@ require ( github.com/ipfs/go-datastore v0.6.0 github.com/ipfs/go-ds-badger2 v0.1.3 github.com/ipfs/go-ipfs-blockstore v1.3.0 - github.com/onflow/cadence v0.39.14 + github.com/onflow/cadence v0.39.15-0.20230823153602-c1edd8a467e2 github.com/onflow/flow-core-contracts/lib/go/contracts v1.2.4-0.20230703193002-53362441b57d github.com/onflow/flow-core-contracts/lib/go/templates v1.2.3 github.com/onflow/flow-emulator v0.53.0 diff --git a/integration/go.sum b/integration/go.sum index 4aac8d7305d..e3542bd0b34 100644 --- a/integration/go.sum +++ b/integration/go.sum @@ -1354,8 +1354,8 @@ github.com/onflow/atree v0.1.0-beta1.0.20211027184039-559ee654ece9/go.mod h1:+6x github.com/onflow/atree v0.6.0 h1:j7nQ2r8npznx4NX39zPpBYHmdy45f4xwoi+dm37Jk7c= github.com/onflow/atree v0.6.0/go.mod h1:gBHU0M05qCbv9NN0kijLWMgC47gHVNBIp4KmsVFi0tc= github.com/onflow/cadence v0.20.1/go.mod h1:7mzUvPZUIJztIbr9eTvs+fQjWWHTF8veC+yk4ihcNIA= -github.com/onflow/cadence v0.39.14 h1:YoR3YFUga49rqzVY1xwI6I2ZDBmvwGh13jENncsleC8= -github.com/onflow/cadence v0.39.14/go.mod h1:OIJLyVBPa339DCBQXBfGaorT4tBjQh9gSKe+ZAIyyh0= +github.com/onflow/cadence v0.39.15-0.20230823153602-c1edd8a467e2 h1:p2f+A94QRsuEXeQRX9cwdEywNneJ6Rlw6LYq0ao5PyA= +github.com/onflow/cadence v0.39.15-0.20230823153602-c1edd8a467e2/go.mod h1:OIJLyVBPa339DCBQXBfGaorT4tBjQh9gSKe+ZAIyyh0= github.com/onflow/flow-core-contracts/lib/go/contracts v1.2.4-0.20230703193002-53362441b57d h1:B7PdhdUNkve5MVrekWDuQf84XsGBxNZ/D3x+QQ8XeVs= github.com/onflow/flow-core-contracts/lib/go/contracts v1.2.4-0.20230703193002-53362441b57d/go.mod h1:xAiV/7TKhw863r6iO3CS5RnQ4F+pBY1TxD272BsILlo= github.com/onflow/flow-core-contracts/lib/go/templates v1.2.3 h1:X25A1dNajNUtE+KoV76wQ6BR6qI7G65vuuRXxDDqX7E= diff --git a/ledger/complete/checkpoint_benchmark_test.go b/ledger/complete/checkpoint_benchmark_test.go index 177804be5a7..5d5ae9f726c 100644 --- a/ledger/complete/checkpoint_benchmark_test.go +++ b/ledger/complete/checkpoint_benchmark_test.go @@ -58,7 +58,7 @@ func benchmarkStoreCheckpoint(b *testing.B, version int, concurrent bool) { }() // Load checkpoint - tries, err := wal.LoadCheckpoint(*checkpointFile, &log) + tries, err := wal.LoadCheckpoint(*checkpointFile, log) if err != nil { b.Fatalf("cannot load checkpoint: %s", err) } @@ -69,7 +69,7 @@ func benchmarkStoreCheckpoint(b *testing.B, version int, concurrent bool) { // Serialize checkpoint V5. switch version { case 5: - err = wal.StoreCheckpointV5(outputDir, fileName, &log, tries...) + err = wal.StoreCheckpointV5(outputDir, fileName, log, tries...) case 6: if concurrent { err = wal.StoreCheckpointV6Concurrently(tries, outputDir, fileName, &log) @@ -102,7 +102,7 @@ func BenchmarkLoadCheckpoint(b *testing.B) { b.ResetTimer() // Load checkpoint - _, err = wal.LoadCheckpoint(*checkpointFile, &log) + _, err = wal.LoadCheckpoint(*checkpointFile, log) b.StopTimer() elapsed := time.Since(start) diff --git a/ledger/complete/wal/checkpoint_v5_test.go b/ledger/complete/wal/checkpoint_v5_test.go index 9721a50d04e..4422d3376c0 100644 --- a/ledger/complete/wal/checkpoint_v5_test.go +++ b/ledger/complete/wal/checkpoint_v5_test.go @@ -15,12 +15,12 @@ func TestCopyCheckpointFileV5(t *testing.T) { tries := createSimpleTrie(t) fileName := "checkpoint" logger := unittest.Logger() - require.NoErrorf(t, StoreCheckpointV5(dir, fileName, &logger, tries...), "fail to store checkpoint") + require.NoErrorf(t, StoreCheckpointV5(dir, fileName, logger, tries...), "fail to store checkpoint") to := filepath.Join(dir, "newfolder") newPaths, err := CopyCheckpointFile(fileName, dir, to) require.NoError(t, err) log.Info().Msgf("copied to :%v", newPaths) - decoded, err := LoadCheckpoint(filepath.Join(to, fileName), &logger) + decoded, err := LoadCheckpoint(filepath.Join(to, fileName), logger) require.NoErrorf(t, err, "fail to read checkpoint %v/%v", dir, fileName) requireTriesEqual(t, tries, decoded) }) diff --git a/ledger/complete/wal/checkpoint_v6_leaf_reader.go b/ledger/complete/wal/checkpoint_v6_leaf_reader.go index 77dbc0716b5..169bf895cbd 100644 --- a/ledger/complete/wal/checkpoint_v6_leaf_reader.go +++ b/ledger/complete/wal/checkpoint_v6_leaf_reader.go @@ -29,7 +29,7 @@ func nodeToLeaf(leaf *node.Node) *LeafNode { // OpenAndReadLeafNodesFromCheckpointV6 takes a channel for pushing the leaf nodes that are read from // the given checkpoint file specified by dir and fileName. // It returns when finish reading the checkpoint file and the input channel can be closed. -func OpenAndReadLeafNodesFromCheckpointV6(allLeafNodesCh chan<- *LeafNode, dir string, fileName string, logger *zerolog.Logger) (errToReturn error) { +func OpenAndReadLeafNodesFromCheckpointV6(allLeafNodesCh chan<- *LeafNode, dir string, fileName string, logger zerolog.Logger) (errToReturn error) { // we are the only sender of the channel, closing it after done defer func() { close(allLeafNodesCh) @@ -68,7 +68,7 @@ func OpenAndReadLeafNodesFromCheckpointV6(allLeafNodesCh chan<- *LeafNode, dir s return nil } -func readCheckpointSubTrieLeafNodes(leafNodesCh chan<- *LeafNode, dir string, fileName string, index int, checksum uint32, logger *zerolog.Logger) error { +func readCheckpointSubTrieLeafNodes(leafNodesCh chan<- *LeafNode, dir string, fileName string, index int, checksum uint32, logger zerolog.Logger) error { return processCheckpointSubTrie(dir, fileName, index, checksum, logger, func(reader *Crc32Reader, nodesCount uint64) error { scratch := make([]byte, 1024*4) // must not be less than 1024 diff --git a/ledger/complete/wal/checkpoint_v6_reader.go b/ledger/complete/wal/checkpoint_v6_reader.go index 98a9b2f4b77..63791e0f46c 100644 --- a/ledger/complete/wal/checkpoint_v6_reader.go +++ b/ledger/complete/wal/checkpoint_v6_reader.go @@ -31,7 +31,7 @@ var ErrEOFNotReached = errors.New("expect to reach EOF, but actually didn't") // it returns (nil, os.ErrNotExist) if a certain file is missing, use (os.IsNotExist to check) // it returns (nil, ErrEOFNotReached) if a certain part file is malformed // it returns (nil, err) if running into any exception -func readCheckpointV6(headerFile *os.File, logger *zerolog.Logger) ([]*trie.MTrie, error) { +func readCheckpointV6(headerFile *os.File, logger zerolog.Logger) ([]*trie.MTrie, error) { // the full path of header file headerPath := headerFile.Name() dir, fileName := filepath.Split(headerPath) @@ -53,7 +53,7 @@ func readCheckpointV6(headerFile *os.File, logger *zerolog.Logger) ([]*trie.MTri // TODO making number of goroutine configable for reading subtries, which can help us // test the code on machines that don't have as much RAM as EN by using fewer goroutines. - subtrieNodes, err := readSubTriesConcurrently(dir, fileName, subtrieChecksums, &lg) + subtrieNodes, err := readSubTriesConcurrently(dir, fileName, subtrieChecksums, lg) if err != nil { return nil, fmt.Errorf("could not read subtrie from dir: %w", err) } @@ -61,7 +61,7 @@ func readCheckpointV6(headerFile *os.File, logger *zerolog.Logger) ([]*trie.MTri lg.Info().Uint32("topsum", topTrieChecksum). Msg("finish reading all v6 subtrie files, start reading top level tries") - tries, err := readTopLevelTries(dir, fileName, subtrieNodes, topTrieChecksum, &lg) + tries, err := readTopLevelTries(dir, fileName, subtrieNodes, topTrieChecksum, lg) if err != nil { return nil, fmt.Errorf("could not read top level nodes or tries: %w", err) } @@ -83,7 +83,7 @@ func readCheckpointV6(headerFile *os.File, logger *zerolog.Logger) ([]*trie.MTri } // OpenAndReadCheckpointV6 open the checkpoint file and read it with readCheckpointV6 -func OpenAndReadCheckpointV6(dir string, fileName string, logger *zerolog.Logger) ( +func OpenAndReadCheckpointV6(dir string, fileName string, logger zerolog.Logger) ( tries []*trie.MTrie, errToReturn error, ) { @@ -127,7 +127,7 @@ func filePathPattern(dir string, fileName string) string { // readCheckpointHeader takes a file path and returns subtrieChecksums and topTrieChecksum // any error returned are exceptions -func readCheckpointHeader(filepath string, logger *zerolog.Logger) ( +func readCheckpointHeader(filepath string, logger zerolog.Logger) ( checksumsOfSubtries []uint32, checksumOfTopTrie uint32, errToReturn error, @@ -278,7 +278,7 @@ type resultReadSubTrie struct { Err error } -func readSubTriesConcurrently(dir string, fileName string, subtrieChecksums []uint32, logger *zerolog.Logger) ([][]*node.Node, error) { +func readSubTriesConcurrently(dir string, fileName string, subtrieChecksums []uint32, logger zerolog.Logger) ([][]*node.Node, error) { numOfSubTries := len(subtrieChecksums) jobs := make(chan jobReadSubtrie, numOfSubTries) @@ -325,7 +325,7 @@ func readSubTriesConcurrently(dir string, fileName string, subtrieChecksums []ui return nodesGroups, nil } -func readCheckpointSubTrie(dir string, fileName string, index int, checksum uint32, logger *zerolog.Logger) ( +func readCheckpointSubTrie(dir string, fileName string, index int, checksum uint32, logger zerolog.Logger) ( []*node.Node, error, ) { @@ -372,7 +372,7 @@ func processCheckpointSubTrie( fileName string, index int, checksum uint32, - logger *zerolog.Logger, + logger zerolog.Logger, processNode func(*Crc32Reader, uint64) error, ) ( errToReturn error, @@ -498,7 +498,7 @@ func readSubTriesFooter(f *os.File) (uint64, uint32, error) { // 5. node count // 6. trie count // 7. checksum -func readTopLevelTries(dir string, fileName string, subtrieNodes [][]*node.Node, topTrieChecksum uint32, logger *zerolog.Logger) ( +func readTopLevelTries(dir string, fileName string, subtrieNodes [][]*node.Node, topTrieChecksum uint32, logger zerolog.Logger) ( rootTries []*trie.MTrie, errToReturn error, ) { diff --git a/ledger/complete/wal/checkpoint_v6_test.go b/ledger/complete/wal/checkpoint_v6_test.go index fb98777e0ec..78a52624bbc 100644 --- a/ledger/complete/wal/checkpoint_v6_test.go +++ b/ledger/complete/wal/checkpoint_v6_test.go @@ -170,7 +170,7 @@ func TestEncodeSubTrie(t *testing.T) { for index, roots := range subtrieRoots { unittest.RunWithTempDir(t, func(dir string) { uniqueIndices, nodeCount, checksum, err := storeCheckpointSubTrie( - index, roots, estimatedSubtrieNodeCount, dir, file, &logger) + index, roots, estimatedSubtrieNodeCount, dir, file, logger) require.NoError(t, err) // subtrie roots might have duplciates, that why we group the them, @@ -205,7 +205,7 @@ func TestEncodeSubTrie(t *testing.T) { uniqueIndices, nodeCount, checksum) // all the nodes - nodes, err := readCheckpointSubTrie(dir, file, index, checksum, &logger) + nodes, err := readCheckpointSubTrie(dir, file, index, checksum, logger) require.NoError(t, err) for _, root := range roots { @@ -263,7 +263,7 @@ func TestWriteAndReadCheckpointV6EmptyTrie(t *testing.T) { fileName := "checkpoint-empty-trie" logger := unittest.Logger() require.NoErrorf(t, StoreCheckpointV6Concurrently(tries, dir, fileName, &logger), "fail to store checkpoint") - decoded, err := OpenAndReadCheckpointV6(dir, fileName, &logger) + decoded, err := OpenAndReadCheckpointV6(dir, fileName, logger) require.NoErrorf(t, err, "fail to read checkpoint %v/%v", dir, fileName) requireTriesEqual(t, tries, decoded) }) @@ -275,7 +275,7 @@ func TestWriteAndReadCheckpointV6SimpleTrie(t *testing.T) { fileName := "checkpoint" logger := unittest.Logger() require.NoErrorf(t, StoreCheckpointV6Concurrently(tries, dir, fileName, &logger), "fail to store checkpoint") - decoded, err := OpenAndReadCheckpointV6(dir, fileName, &logger) + decoded, err := OpenAndReadCheckpointV6(dir, fileName, logger) require.NoErrorf(t, err, "fail to read checkpoint %v/%v", dir, fileName) requireTriesEqual(t, tries, decoded) }) @@ -287,7 +287,7 @@ func TestWriteAndReadCheckpointV6MultipleTries(t *testing.T) { fileName := "checkpoint-multi-file" logger := unittest.Logger() require.NoErrorf(t, StoreCheckpointV6Concurrently(tries, dir, fileName, &logger), "fail to store checkpoint") - decoded, err := OpenAndReadCheckpointV6(dir, fileName, &logger) + decoded, err := OpenAndReadCheckpointV6(dir, fileName, logger) require.NoErrorf(t, err, "fail to read checkpoint %v/%v", dir, fileName) requireTriesEqual(t, tries, decoded) }) @@ -322,7 +322,7 @@ func TestWriteAndReadCheckpointV6LeafEmptyTrie(t *testing.T) { bufSize := 10 leafNodesCh := make(chan *LeafNode, bufSize) go func() { - err := OpenAndReadLeafNodesFromCheckpointV6(leafNodesCh, dir, fileName, &logger) + err := OpenAndReadLeafNodesFromCheckpointV6(leafNodesCh, dir, fileName, logger) require.NoErrorf(t, err, "fail to read checkpoint %v/%v", dir, fileName) }() for range leafNodesCh { @@ -340,7 +340,7 @@ func TestWriteAndReadCheckpointV6LeafSimpleTrie(t *testing.T) { bufSize := 1 leafNodesCh := make(chan *LeafNode, bufSize) go func() { - err := OpenAndReadLeafNodesFromCheckpointV6(leafNodesCh, dir, fileName, &logger) + err := OpenAndReadLeafNodesFromCheckpointV6(leafNodesCh, dir, fileName, logger) require.NoErrorf(t, err, "fail to read checkpoint %v/%v", dir, fileName) }() resultPayloads := make([]ledger.Payload, 0) @@ -363,7 +363,7 @@ func TestWriteAndReadCheckpointV6LeafMultipleTries(t *testing.T) { bufSize := 5 leafNodesCh := make(chan *LeafNode, bufSize) go func() { - err := OpenAndReadLeafNodesFromCheckpointV6(leafNodesCh, dir, fileName, &logger) + err := OpenAndReadLeafNodesFromCheckpointV6(leafNodesCh, dir, fileName, logger) require.NoErrorf(t, err, "fail to read checkpoint %v/%v", dir, fileName) }() resultPayloads := make([]ledger.Payload, 0) @@ -422,7 +422,7 @@ func compareFiles(file1, file2 string) error { return nil } -func storeCheckpointV5(tries []*trie.MTrie, dir string, fileName string, logger *zerolog.Logger) error { +func storeCheckpointV5(tries []*trie.MTrie, dir string, fileName string, logger zerolog.Logger) error { return StoreCheckpointV5(dir, fileName, logger, tries...) } @@ -432,8 +432,8 @@ func TestWriteAndReadCheckpointV5(t *testing.T) { fileName := "checkpoint1" logger := unittest.Logger() - require.NoErrorf(t, storeCheckpointV5(tries, dir, fileName, &logger), "fail to store checkpoint") - decoded, err := LoadCheckpoint(filepath.Join(dir, fileName), &logger) + require.NoErrorf(t, storeCheckpointV5(tries, dir, fileName, logger), "fail to store checkpoint") + decoded, err := LoadCheckpoint(filepath.Join(dir, fileName), logger) require.NoErrorf(t, err, "fail to load checkpoint") requireTriesEqual(t, tries, decoded) }) @@ -448,12 +448,12 @@ func TestWriteAndReadCheckpointV6ThenBackToV5(t *testing.T) { // store tries into v6 then read back, then store into v5 require.NoErrorf(t, StoreCheckpointV6Concurrently(tries, dir, "checkpoint-v6", &logger), "fail to store checkpoint") - decoded, err := OpenAndReadCheckpointV6(dir, "checkpoint-v6", &logger) + decoded, err := OpenAndReadCheckpointV6(dir, "checkpoint-v6", logger) require.NoErrorf(t, err, "fail to read checkpoint %v/checkpoint-v6", dir) - require.NoErrorf(t, storeCheckpointV5(decoded, dir, "checkpoint-v6-v5", &logger), "fail to store checkpoint") + require.NoErrorf(t, storeCheckpointV5(decoded, dir, "checkpoint-v6-v5", logger), "fail to store checkpoint") // store tries directly into v5 checkpoint - require.NoErrorf(t, storeCheckpointV5(tries, dir, "checkpoint-v5", &logger), "fail to store checkpoint") + require.NoErrorf(t, storeCheckpointV5(tries, dir, "checkpoint-v5", logger), "fail to store checkpoint") // compare the two v5 checkpoint files should be identical require.NoError(t, compareFiles( @@ -511,7 +511,7 @@ func TestAllPartFileExist(t *testing.T) { err = os.Remove(fileToDelete) require.NoError(t, err, "fail to remove part file") - _, err = OpenAndReadCheckpointV6(dir, fileName, &logger) + _, err = OpenAndReadCheckpointV6(dir, fileName, logger) require.ErrorIs(t, err, os.ErrNotExist, "wrong error type returned") } }) @@ -541,7 +541,7 @@ func TestAllPartFileExistLeafReader(t *testing.T) { bufSize := 10 leafNodesCh := make(chan *LeafNode, bufSize) - err = OpenAndReadLeafNodesFromCheckpointV6(leafNodesCh, dir, fileName, &logger) + err = OpenAndReadLeafNodesFromCheckpointV6(leafNodesCh, dir, fileName, logger) require.ErrorIs(t, err, os.ErrNotExist, "wrong error type returned") } }) @@ -585,7 +585,7 @@ func TestCopyCheckpointFileV6(t *testing.T) { newPaths, err := CopyCheckpointFile(fileName, dir, to) require.NoError(t, err) log.Info().Msgf("copied to :%v", newPaths) - decoded, err := OpenAndReadCheckpointV6(to, fileName, &logger) + decoded, err := OpenAndReadCheckpointV6(to, fileName, logger) require.NoErrorf(t, err, "fail to read checkpoint %v/%v", dir, fileName) requireTriesEqual(t, tries, decoded) }) diff --git a/ledger/complete/wal/checkpoint_v6_writer.go b/ledger/complete/wal/checkpoint_v6_writer.go index 7b138a61085..ea2021f2156 100644 --- a/ledger/complete/wal/checkpoint_v6_writer.go +++ b/ledger/complete/wal/checkpoint_v6_writer.go @@ -103,7 +103,7 @@ func storeCheckpointV6( subTrieRootAndTopLevelTrieCount(tries), outputDir, outputFile, - &lg, + lg, nWorker, ) if err != nil { @@ -113,12 +113,12 @@ func storeCheckpointV6( lg.Info().Msgf("subtrie have been stored. sub trie node count: %v", subTriesNodeCount) topTrieChecksum, err := storeTopLevelNodesAndTrieRoots( - tries, subTrieRootIndices, subTriesNodeCount, outputDir, outputFile, &lg) + tries, subTrieRootIndices, subTriesNodeCount, outputDir, outputFile, lg) if err != nil { return fmt.Errorf("could not store top level tries: %w", err) } - err = storeCheckpointHeader(subTrieChecksums, topTrieChecksum, outputDir, outputFile, &lg) + err = storeCheckpointHeader(subTrieChecksums, topTrieChecksum, outputDir, outputFile, lg) if err != nil { return fmt.Errorf("could not store checkpoint header: %w", err) } @@ -136,7 +136,7 @@ func storeCheckpointHeader( topTrieChecksum uint32, outputDir string, outputFile string, - logger *zerolog.Logger, + logger zerolog.Logger, ) ( errToReturn error, ) { @@ -207,7 +207,7 @@ func storeTopLevelNodesAndTrieRoots( subTriesNodeCount uint64, outputDir string, outputFile string, - logger *zerolog.Logger, + logger zerolog.Logger, ) ( checksumOfTopTriePartFile uint32, errToReturn error, @@ -319,7 +319,7 @@ func storeSubTrieConcurrently( subAndTopNodeCount int, // useful for preallocating memory for the node indices map to be returned outputDir string, outputFile string, - logger *zerolog.Logger, + logger zerolog.Logger, nWorker uint, ) ( map[*node.Node]uint64, // node indices @@ -399,13 +399,13 @@ func storeSubTrieConcurrently( return results, nodeCounter, checksums, nil } -func createWriterForTopTries(dir string, file string, logger *zerolog.Logger) (io.WriteCloser, error) { +func createWriterForTopTries(dir string, file string, logger zerolog.Logger) (io.WriteCloser, error) { _, topTriesFileName := filePathTopTries(dir, file) return createClosableWriter(dir, topTriesFileName, logger) } -func createWriterForSubtrie(dir string, file string, logger *zerolog.Logger, index int) (io.WriteCloser, error) { +func createWriterForSubtrie(dir string, file string, logger zerolog.Logger, index int) (io.WriteCloser, error) { _, subTriesFileName, err := filePathSubTries(dir, file, index) if err != nil { return nil, err @@ -414,7 +414,7 @@ func createWriterForSubtrie(dir string, file string, logger *zerolog.Logger, ind return createClosableWriter(dir, subTriesFileName, logger) } -func createClosableWriter(dir string, fileName string, logger *zerolog.Logger) (io.WriteCloser, error) { +func createClosableWriter(dir string, fileName string, logger zerolog.Logger) (io.WriteCloser, error) { fullPath := path.Join(dir, fileName) if utilsio.FileExists(fullPath) { return nil, fmt.Errorf("checkpoint part file %v already exists", fullPath) @@ -447,7 +447,7 @@ func storeCheckpointSubTrie( estimatedSubtrieNodeCount int, // for estimate the amount of memory to be preallocated outputDir string, outputFile string, - logger *zerolog.Logger, + logger zerolog.Logger, ) ( rootNodesOfAllSubtries map[*node.Node]uint64, // the stored position of each unique root node totalSubtrieNodeCount uint64, diff --git a/ledger/complete/wal/checkpointer.go b/ledger/complete/wal/checkpointer.go index 6b9239f1c22..5e58ff95235 100644 --- a/ledger/complete/wal/checkpointer.go +++ b/ledger/complete/wal/checkpointer.go @@ -267,7 +267,7 @@ func NumberToFilename(n int) string { } func (c *Checkpointer) CheckpointWriter(to int) (io.WriteCloser, error) { - return CreateCheckpointWriterForFile(c.dir, NumberToFilename(to), &c.wal.log) + return CreateCheckpointWriterForFile(c.dir, NumberToFilename(to), c.wal.log) } func (c *Checkpointer) Dir() string { @@ -275,7 +275,7 @@ func (c *Checkpointer) Dir() string { } // CreateCheckpointWriterForFile returns a file writer that will write to a temporary file and then move it to the checkpoint folder by renaming it. -func CreateCheckpointWriterForFile(dir, filename string, logger *zerolog.Logger) (io.WriteCloser, error) { +func CreateCheckpointWriterForFile(dir, filename string, logger zerolog.Logger) (io.WriteCloser, error) { fullname := path.Join(dir, filename) @@ -312,7 +312,7 @@ func CreateCheckpointWriterForFile(dir, filename string, logger *zerolog.Logger) // as for each node, the children have been previously encountered. // TODO: evaluate alternatives to CRC32 since checkpoint file is many GB in size. // TODO: add concurrency if the performance gains are enough to offset complexity. -func StoreCheckpointV5(dir string, fileName string, logger *zerolog.Logger, tries ...*trie.MTrie) ( +func StoreCheckpointV5(dir string, fileName string, logger zerolog.Logger, tries ...*trie.MTrie) ( // error // Note, the above code, which didn't define the name "err" for the returned error, would be wrong, // beause err needs to be defined in order to be updated by the defer function @@ -428,7 +428,7 @@ func StoreCheckpointV5(dir string, fileName string, logger *zerolog.Logger, trie // Index 0 is a special case with nil node. traversedSubtrieNodes[nil] = 0 - logging := logProgress(fmt.Sprintf("storing %v-th sub trie roots", i), estimatedSubtrieNodeCount, &log.Logger) + logging := logProgress(fmt.Sprintf("storing %v-th sub trie roots", i), estimatedSubtrieNodeCount, log.Logger) for _, root := range subTrieRoot { // Empty trie is always added to forest as starting point and // empty trie's root is nil. It remains in the forest until evicted @@ -516,7 +516,7 @@ func StoreCheckpointV5(dir string, fileName string, logger *zerolog.Logger, trie return nil } -func logProgress(msg string, estimatedSubtrieNodeCount int, logger *zerolog.Logger) func(nodeCounter uint64) { +func logProgress(msg string, estimatedSubtrieNodeCount int, logger zerolog.Logger) func(nodeCounter uint64) { lg := util.LogProgress(msg, estimatedSubtrieNodeCount, logger) return func(index uint64) { lg(int(index)) @@ -601,12 +601,12 @@ func getNodesAtLevel(root *node.Node, level uint) []*node.Node { func (c *Checkpointer) LoadCheckpoint(checkpoint int) ([]*trie.MTrie, error) { filepath := path.Join(c.dir, NumberToFilename(checkpoint)) - return LoadCheckpoint(filepath, &c.wal.log) + return LoadCheckpoint(filepath, c.wal.log) } func (c *Checkpointer) LoadRootCheckpoint() ([]*trie.MTrie, error) { filepath := path.Join(c.dir, bootstrap.FilenameWALRootCheckpoint) - return LoadCheckpoint(filepath, &c.wal.log) + return LoadCheckpoint(filepath, c.wal.log) } func (c *Checkpointer) HasRootCheckpoint() (bool, error) { @@ -628,7 +628,7 @@ func (c *Checkpointer) RemoveCheckpoint(checkpoint int) error { return deleteCheckpointFiles(c.dir, name) } -func LoadCheckpoint(filepath string, logger *zerolog.Logger) ( +func LoadCheckpoint(filepath string, logger zerolog.Logger) ( tries []*trie.MTrie, errToReturn error) { file, err := os.Open(filepath) @@ -648,7 +648,7 @@ func LoadCheckpoint(filepath string, logger *zerolog.Logger) ( return readCheckpoint(file, logger) } -func readCheckpoint(f *os.File, logger *zerolog.Logger) ([]*trie.MTrie, error) { +func readCheckpoint(f *os.File, logger zerolog.Logger) ([]*trie.MTrie, error) { // Read header: magic (2 bytes) + version (2 bytes) header := make([]byte, headerSize) @@ -888,7 +888,7 @@ func readCheckpointV4(f *os.File) ([]*trie.MTrie, error) { // readCheckpointV5 decodes checkpoint file (version 5) and returns a list of tries. // Checkpoint file header (magic and version) are verified by the caller. -func readCheckpointV5(f *os.File, logger *zerolog.Logger) ([]*trie.MTrie, error) { +func readCheckpointV5(f *os.File, logger zerolog.Logger) ([]*trie.MTrie, error) { logger.Info().Msgf("reading v5 checkpoint file") // Scratch buffer is used as temporary buffer that reader can read into. @@ -1006,7 +1006,7 @@ func readCheckpointV5(f *os.File, logger *zerolog.Logger) ([]*trie.MTrie, error) // causes two checkpoint files to be cached for each checkpointing, eventually // caching hundreds of GB. // CAUTION: no-op when GOOS != linux. -func evictFileFromLinuxPageCache(f *os.File, fsync bool, logger *zerolog.Logger) error { +func evictFileFromLinuxPageCache(f *os.File, fsync bool, logger zerolog.Logger) error { err := fadviseNoLinuxPageCache(f.Fd(), fsync) if err != nil { return err diff --git a/ledger/complete/wal/checkpointer_test.go b/ledger/complete/wal/checkpointer_test.go index 40ec3ff5925..a0a828748d3 100644 --- a/ledger/complete/wal/checkpointer_test.go +++ b/ledger/complete/wal/checkpointer_test.go @@ -531,12 +531,12 @@ func Test_StoringLoadingCheckpoints(t *testing.T) { fullpath := path.Join(dir, "temp-checkpoint") - err = realWAL.StoreCheckpointV5(dir, "temp-checkpoint", &logger, updatedTrie) + err = realWAL.StoreCheckpointV5(dir, "temp-checkpoint", logger, updatedTrie) require.NoError(t, err) t.Run("works without data modification", func(t *testing.T) { logger := unittest.Logger() - tries, err := realWAL.LoadCheckpoint(fullpath, &logger) + tries, err := realWAL.LoadCheckpoint(fullpath, logger) require.NoError(t, err) require.Equal(t, 1, len(tries)) require.Equal(t, updatedTrie, tries[0]) @@ -554,7 +554,7 @@ func Test_StoringLoadingCheckpoints(t *testing.T) { require.NoError(t, err) logger := unittest.Logger() - tries, err := realWAL.LoadCheckpoint(fullpath, &logger) + tries, err := realWAL.LoadCheckpoint(fullpath, logger) require.Error(t, err) require.Nil(t, tries) require.Contains(t, err.Error(), "checksum") diff --git a/ledger/complete/wal/checkpointer_versioning_test.go b/ledger/complete/wal/checkpointer_versioning_test.go index 58c85a3d2dc..af2d6ab4acd 100644 --- a/ledger/complete/wal/checkpointer_versioning_test.go +++ b/ledger/complete/wal/checkpointer_versioning_test.go @@ -20,7 +20,7 @@ func TestLoadCheckpointV1(t *testing.T) { } logger := zerolog.Nop() - tries, err := LoadCheckpoint("test_data/checkpoint.v1", &logger) + tries, err := LoadCheckpoint("test_data/checkpoint.v1", logger) require.NoError(t, err) require.Equal(t, len(expectedRootHash), len(tries)) @@ -40,7 +40,7 @@ func TestLoadCheckpointV3(t *testing.T) { } logger := zerolog.Nop() - tries, err := LoadCheckpoint("test_data/checkpoint.v3", &logger) + tries, err := LoadCheckpoint("test_data/checkpoint.v3", logger) require.NoError(t, err) require.Equal(t, len(expectedRootHash), len(tries)) @@ -60,7 +60,7 @@ func TestLoadCheckpointV4(t *testing.T) { } logger := zerolog.Nop() - tries, err := LoadCheckpoint("test_data/checkpoint.v4", &logger) + tries, err := LoadCheckpoint("test_data/checkpoint.v4", logger) require.NoError(t, err) require.Equal(t, len(expectedRootHash), len(tries)) diff --git a/ledger/complete/wal/syncrename.go b/ledger/complete/wal/syncrename.go index 140d4534006..28a0e47cfea 100644 --- a/ledger/complete/wal/syncrename.go +++ b/ledger/complete/wal/syncrename.go @@ -21,7 +21,7 @@ type WriterSeekerCloser interface { // to target one as the last step. This help avoid situation when writing is // interrupted and unusable file but with target name exists. type SyncOnCloseRenameFile struct { - logger *zerolog.Logger + logger zerolog.Logger file *os.File targetName string savedError error // savedError is the first error returned from Write. Close() renames temp file to target file only if savedError is nil. diff --git a/ledger/complete/wal/syncrename_test.go b/ledger/complete/wal/syncrename_test.go index 406905a631b..c8ee860f487 100644 --- a/ledger/complete/wal/syncrename_test.go +++ b/ledger/complete/wal/syncrename_test.go @@ -34,7 +34,7 @@ func Test_RenameHappensAfterClosing(t *testing.T) { file: file, targetName: fullFileName, Writer: writer, - logger: &logger, + logger: logger, } sampleBytes := []byte{2, 1, 3, 7} diff --git a/module/util/log.go b/module/util/log.go index 45807b9757d..d949e042168 100644 --- a/module/util/log.go +++ b/module/util/log.go @@ -1,6 +1,9 @@ package util import ( + "sync" + "time" + "github.com/rs/zerolog" ) @@ -8,18 +11,91 @@ import ( // it prints the progress from 0% to 100% to indicate the index from 0 to (total - 1) has been // processed. // useful to report the progress of processing the index from 0 to (total - 1) -func LogProgress(msg string, total int, logger *zerolog.Logger) func(currentIndex int) { - logThreshold := float64(0) +func LogProgress(msg string, total int, log zerolog.Logger) func(currentIndex int) { + nth := uint32(total / 10) // sample every 10% by default + if nth == 0 { + nth = 1 + } + + sampler := log.Sample(newProgressLogsSampler(nth, 60*time.Second)) + + start := time.Now() return func(currentIndex int) { percentage := float64(100) if total > 0 { percentage = (float64(currentIndex+1) / float64(total)) * 100. // currentIndex+1 assuming zero based indexing } - // report every 10 percent - if percentage >= logThreshold { - logger.Info().Msgf("%s progress: %v percent", msg, logThreshold) - logThreshold += 10 + etaString := "unknown" + if percentage > 0 { + eta := time.Duration(float64(time.Since(start)) / percentage * (100 - percentage)) + etaString = eta.String() + + } + + if currentIndex+1 != total { + sampler.Info().Msgf("%s progress %d/%d (%.1f%%) eta %s", msg, currentIndex+1, total, percentage, etaString) + } else { + log.Info().Msgf("%s progress %d/%d (%.1f%%) total time %s", msg, currentIndex+1, total, percentage, time.Since(start)) } } } + +type TimedSampler struct { + start time.Time + Duration time.Duration + mu sync.Mutex +} + +var _ zerolog.Sampler = (*TimedSampler)(nil) + +func NewTimedSampler(duration time.Duration) *TimedSampler { + return &TimedSampler{ + start: time.Now(), + Duration: duration, + mu: sync.Mutex{}, + } +} + +func (s *TimedSampler) Sample(_ zerolog.Level) bool { + s.mu.Lock() + defer s.mu.Unlock() + + if time.Since(s.start) > s.Duration { + s.start = time.Now() + return true + } + return false +} + +func (s *TimedSampler) Reset() { + s.mu.Lock() + defer s.mu.Unlock() + + s.start = time.Now() +} + +type progressLogsSampler struct { + basicSampler *zerolog.BasicSampler + timedSampler *TimedSampler +} + +var _ zerolog.Sampler = (*progressLogsSampler)(nil) + +// newProgressLogsSampler returns a sampler that samples every nth log +// and also samples a log if the last log was more than duration ago +func newProgressLogsSampler(nth uint32, duration time.Duration) zerolog.Sampler { + return &progressLogsSampler{ + basicSampler: &zerolog.BasicSampler{N: nth}, + timedSampler: NewTimedSampler(duration), + } +} + +func (s *progressLogsSampler) Sample(lvl zerolog.Level) bool { + sample := s.basicSampler.Sample(lvl) + if sample { + s.timedSampler.Reset() + return true + } + return s.timedSampler.Sample(lvl) +} diff --git a/module/util/log_test.go b/module/util/log_test.go index 9d1d4851dcd..4593f575343 100644 --- a/module/util/log_test.go +++ b/module/util/log_test.go @@ -2,6 +2,7 @@ package util import ( "bytes" + "fmt" "testing" "github.com/rs/zerolog" @@ -12,48 +13,46 @@ func TestLogProgress40(t *testing.T) { buf := bytes.NewBufferString("") lg := zerolog.New(buf) total := 40 - logger := LogProgress("test", total, &lg) + logger := LogProgress("test", total, lg) for i := 0; i < total; i++ { logger(i) } - expectedLogs := - `{"level":"info","message":"test progress: 0 percent"} -{"level":"info","message":"test progress: 10 percent"} -{"level":"info","message":"test progress: 20 percent"} -{"level":"info","message":"test progress: 30 percent"} -{"level":"info","message":"test progress: 40 percent"} -{"level":"info","message":"test progress: 50 percent"} -{"level":"info","message":"test progress: 60 percent"} -{"level":"info","message":"test progress: 70 percent"} -{"level":"info","message":"test progress: 80 percent"} -{"level":"info","message":"test progress: 90 percent"} -{"level":"info","message":"test progress: 100 percent"} -` - require.Equal(t, expectedLogs, buf.String()) + expectedLogs := []string{ + `test progress 1/40 (2.5%)`, + `test progress 5/40 (12.5%)`, + `test progress 9/40 (22.5%)`, + `test progress 13/40 (32.5%)`, + `test progress 17/40 (42.5%)`, + `test progress 21/40 (52.5%)`, + `test progress 25/40 (62.5%)`, + `test progress 29/40 (72.5%)`, + `test progress 33/40 (82.5%)`, + `test progress 37/40 (92.5%)`, + `test progress 40/40 (100.0%)`, + } + + for _, log := range expectedLogs { + require.Contains(t, buf.String(), log, total) + } } func TestLogProgress1000(t *testing.T) { for total := 11; total < 1000; total++ { buf := bytes.NewBufferString("") lg := zerolog.New(buf) - logger := LogProgress("test", total, &lg) + logger := LogProgress("test", total, lg) for i := 0; i < total; i++ { logger(i) } - expectedLogs := `{"level":"info","message":"test progress: 0 percent"} -{"level":"info","message":"test progress: 10 percent"} -{"level":"info","message":"test progress: 20 percent"} -{"level":"info","message":"test progress: 30 percent"} -{"level":"info","message":"test progress: 40 percent"} -{"level":"info","message":"test progress: 50 percent"} -{"level":"info","message":"test progress: 60 percent"} -{"level":"info","message":"test progress: 70 percent"} -{"level":"info","message":"test progress: 80 percent"} -{"level":"info","message":"test progress: 90 percent"} -{"level":"info","message":"test progress: 100 percent"} -` - require.Equal(t, expectedLogs, buf.String(), total) + expectedLogs := []string{ + fmt.Sprintf(`test progress 1/%d`, total), + fmt.Sprintf(`test progress %d/%d (100.0%%)`, total, total), + } + + for _, log := range expectedLogs { + require.Contains(t, buf.String(), log, total) + } } }