Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a migration which detects and filters out unreferenced slabs #5653

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ func createTrieFromPayloads(logger zerolog.Logger, payloads []*ledger.Payload) (

func newMigrations(
log zerolog.Logger,
dir string,
outputDir string,
runMigrations bool,
opts migrators.Options,
) []ledger.Migration {
Expand All @@ -382,10 +382,11 @@ func newMigrations(

log.Info().Msgf("initializing migrations")

rwf := reporters.NewReportFileWriterFactory(dir, log)
rwf := reporters.NewReportFileWriterFactory(outputDir, log)

namedMigrations := migrators.NewCadence1Migrations(
log,
outputDir,
rwf,
opts,
)
Expand Down
3 changes: 2 additions & 1 deletion cmd/util/ledger/migrations/cadence.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@ type Options struct {

func NewCadence1Migrations(
log zerolog.Logger,
outputDir string,
rwf reporters.ReportWriterFactory,
opts Options,
) []NamedMigration {
Expand Down Expand Up @@ -420,7 +421,7 @@ func NewCadence1Migrations(
log,
opts.NWorker,
[]AccountBasedMigration{
NewFilterUnreferencedSlabsMigration(rwf),
NewFilterUnreferencedSlabsMigration(outputDir, rwf),
},
),
})
Expand Down
4 changes: 4 additions & 0 deletions cmd/util/ledger/migrations/cadence_values_migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func TestCadenceValuesMigration(t *testing.T) {

migrations := NewCadence1Migrations(
logger,
t.TempDir(),
rwf,
Options{
NWorker: nWorker,
Expand Down Expand Up @@ -698,6 +699,7 @@ func TestBootstrappedStateMigration(t *testing.T) {

migrations := NewCadence1Migrations(
logger,
t.TempDir(),
rwf,
Options{
NWorker: nWorker,
Expand Down Expand Up @@ -823,6 +825,7 @@ func TestProgramParsingError(t *testing.T) {

migrations := NewCadence1Migrations(
logger,
t.TempDir(),
rwf,
Options{
NWorker: nWorker,
Expand Down Expand Up @@ -948,6 +951,7 @@ func TestCoreContractUsage(t *testing.T) {

migrations := NewCadence1Migrations(
logger,
t.TempDir(),
rwf,
Options{
NWorker: nWorker,
Expand Down
76 changes: 69 additions & 7 deletions cmd/util/ledger/migrations/filter_unreferenced_slabs_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@ import (
"context"
"errors"
"fmt"
"path"
"sync"
"time"

"github.com/onflow/atree"
"github.com/onflow/cadence/runtime"
"github.com/onflow/cadence/runtime/common"
"github.com/rs/zerolog"

"github.com/onflow/flow-go/cmd/util/ledger/reporters"
"github.com/onflow/flow-go/cmd/util/ledger/util"
"github.com/onflow/flow-go/ledger"
"github.com/onflow/flow-go/ledger/common/convert"
"github.com/onflow/flow-go/model/flow"
Expand All @@ -25,19 +29,26 @@ func StorageIDFromRegisterID(registerID flow.RegisterID) atree.StorageID {
}

type FilterUnreferencedSlabsMigration struct {
log zerolog.Logger
rw reporters.ReportWriter
log zerolog.Logger
rw reporters.ReportWriter
outputDir string
mutex sync.Mutex
filteredAccounts []common.Address
turbolent marked this conversation as resolved.
Show resolved Hide resolved
filteredPayloads []*ledger.Payload
turbolent marked this conversation as resolved.
Show resolved Hide resolved
payloadsFile string
}

var _ AccountBasedMigration = &FilterUnreferencedSlabsMigration{}

const filterUnreferencedSlabsName = "filter-unreferenced-slabs"

func NewFilterUnreferencedSlabsMigration(
outputDir string,
rwf reporters.ReportWriterFactory,
) *FilterUnreferencedSlabsMigration {
return &FilterUnreferencedSlabsMigration{
rw: rwf.ReportWriter(filterUnreferencedSlabsName),
outputDir: outputDir,
rw: rwf.ReportWriter(filterUnreferencedSlabsName),
}
}

Expand Down Expand Up @@ -136,25 +147,76 @@ func (m *FilterUnreferencedSlabsMigration) MigrateAccount(
}

m.rw.Write(unreferencedSlabs{
Account: address,
Payloads: filteredPayloads,
Account: address,
PayloadCount: len(filteredPayloads),
})

m.mergeFilteredPayloads(
address,
filteredPayloads,
)

// Do NOT report the health check error here.
// The health check error is only reported if it is not due to unreferenced slabs.
// If it is due to unreferenced slabs, we filter them out and continue.

return newPayloads, nil
}

func (m *FilterUnreferencedSlabsMigration) mergeFilteredPayloads(
address common.Address,
payloads []*ledger.Payload,
) {
m.mutex.Lock()
defer m.mutex.Unlock()

m.filteredAccounts = append(m.filteredAccounts, address)
m.filteredPayloads = append(m.filteredPayloads, payloads...)
}

func (m *FilterUnreferencedSlabsMigration) Close() error {
// close the report writer so it flushes to file
m.rw.Close()

err := m.writeFilteredPayloads()
if err != nil {
return fmt.Errorf("failed to write filtered payloads to file: %w", err)
}

return nil
}

func (m *FilterUnreferencedSlabsMigration) writeFilteredPayloads() error {

m.payloadsFile = path.Join(
m.outputDir,
fmt.Sprintf("filtered_%d.payloads", int32(time.Now().Unix())),
)

writtenPayloadCount, err := util.CreatePayloadFile(
m.log,
m.payloadsFile,
m.filteredPayloads,
m.filteredAccounts,
true,
)
turbolent marked this conversation as resolved.
Show resolved Hide resolved

if err != nil {
return fmt.Errorf("failed to write all filtered payloads to file: %w", err)
}

if writtenPayloadCount != len(m.filteredPayloads) {
return fmt.Errorf(
"failed to write all filtered payloads to file: expected %d, got %d",
len(m.filteredPayloads),
writtenPayloadCount,
)
}

return nil
}

type unreferencedSlabs struct {
Account common.Address `json:"account"`
Payloads []*ledger.Payload `json:"payloads"`
Account common.Address `json:"account"`
PayloadCount int `json:"payload_count"`
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func TestFilterUnreferencedSlabs(t *testing.T) {
// Act

rwf := &testReportWriterFactory{}
migration := NewFilterUnreferencedSlabsMigration(rwf)
migration := NewFilterUnreferencedSlabsMigration(t.TempDir(), rwf)

log := zerolog.New(zerolog.NewTestWriter(t))

Expand All @@ -148,6 +148,9 @@ func TestFilterUnreferencedSlabs(t *testing.T) {
newPayloads, err := migration.MigrateAccount(ctx, testAddress, oldPayloads)
require.NoError(t, err)

err = migration.Close()
require.NoError(t, err)

// Assert

writer := rwf.reportWriters[filterUnreferencedSlabsName]
Expand Down Expand Up @@ -180,10 +183,23 @@ func TestFilterUnreferencedSlabs(t *testing.T) {
assert.Equal(t,
[]any{
unreferencedSlabs{
Account: testAddress,
Payloads: expectedFilteredPayloads,
Account: testAddress,
PayloadCount: len(expectedFilteredPayloads),
},
},
writer.entries,
)
assert.Equal(t,
expectedFilteredPayloads,
migration.filteredPayloads,
)
assert.Equal(t,
[]common.Address{testAddress},
migration.filteredAccounts,
)

readIsPartial, readFilteredPayloads, err := util.ReadPayloadFile(log, migration.payloadsFile)
require.NoError(t, err)
assert.True(t, readIsPartial)
assert.Equal(t, expectedFilteredPayloads, readFilteredPayloads)
}
Original file line number Diff line number Diff line change
Expand Up @@ -1797,6 +1797,7 @@ func TestConcurrentContractUpdate(t *testing.T) {

migrations := NewCadence1Migrations(
logger,
t.TempDir(),
rwf,
Options{
NWorker: nWorker,
Expand Down
Loading