diff --git a/store/migration/manager.go b/store/migration/manager.go new file mode 100644 index 000000000000..48537664a3f4 --- /dev/null +++ b/store/migration/manager.go @@ -0,0 +1,61 @@ +package migration + +import ( + "golang.org/x/sync/errgroup" + + "cosmossdk.io/log" + "cosmossdk.io/store/v2" + "cosmossdk.io/store/v2/snapshots" +) + +const ( + // defaultChannelBufferSize is the default buffer size for the migration stream. + defaultChannelBufferSize = 1024 + // defaultStorageBufferSize is the default buffer size for the storage snapshotter. + defaultStorageBufferSize = 1024 +) + +// Manager manages the migration of the whole state from store/v1 to store/v2. +type Manager struct { + logger log.Logger + snapshotsManager *snapshots.Manager + + storageSnapshotter snapshots.StorageSnapshotter + commitSnapshotter snapshots.CommitSnapshotter +} + +// NewManager returns a new Manager. +func NewManager(sm *snapshots.Manager, ss snapshots.StorageSnapshotter, cs snapshots.CommitSnapshotter, logger log.Logger) *Manager { + return &Manager{ + logger: logger, + snapshotsManager: sm, + storageSnapshotter: ss, + commitSnapshotter: cs, + } +} + +// Migrate migrates the whole state at the given height to the new store/v2. +func (m *Manager) Migrate(height uint64) error { + // create the migration stream and snapshot, + // which acts as protoio.Reader and snapshots.WriteCloser. + ms := NewMigrationStream(defaultChannelBufferSize) + + if err := m.snapshotsManager.CreateMigration(height, ms); err != nil { + return err + } + + // restore the snapshot + chStorage := make(chan *store.KVPair, defaultStorageBufferSize) + + eg := new(errgroup.Group) + eg.Go(func() error { + return m.storageSnapshotter.Restore(height, chStorage) + }) + eg.Go(func() error { + defer close(chStorage) + _, err := m.commitSnapshotter.Restore(height, 0, ms, chStorage) + return err + }) + + return eg.Wait() +} diff --git a/store/migration/manager_test.go b/store/migration/manager_test.go new file mode 100644 index 000000000000..5cda14a2e3f6 --- /dev/null +++ b/store/migration/manager_test.go @@ -0,0 +1,105 @@ +package migration + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + "cosmossdk.io/log" + "cosmossdk.io/store/v2" + "cosmossdk.io/store/v2/commitment" + "cosmossdk.io/store/v2/commitment/iavl" + dbm "cosmossdk.io/store/v2/db" + "cosmossdk.io/store/v2/snapshots" + "cosmossdk.io/store/v2/storage" + "cosmossdk.io/store/v2/storage/pebbledb" +) + +var storeKeys = []string{"store1", "store2"} + +func setupMigrationManager(t *testing.T) (*Manager, *commitment.CommitStore) { + t.Helper() + + db := dbm.NewMemDB() + multiTrees := make(map[string]commitment.Tree) + for _, storeKey := range storeKeys { + prefixDB := dbm.NewPrefixDB(db, []byte(storeKey)) + multiTrees[storeKey] = iavl.NewIavlTree(prefixDB, log.NewNopLogger(), iavl.DefaultConfig()) + } + + commitStore, err := commitment.NewCommitStore(multiTrees, db, log.NewNopLogger()) + require.NoError(t, err) + + snapshotsStore, err := snapshots.NewStore(db, t.TempDir()) + require.NoError(t, err) + + snapshotsManager := snapshots.NewManager(snapshotsStore, snapshots.NewSnapshotOptions(1500, 2), commitStore, nil, nil, log.NewNopLogger()) + + storageDB, err := pebbledb.New(t.TempDir()) + require.NoError(t, err) + newStorageStore := storage.NewStorageStore(storageDB) // for store/v2 + + db1 := dbm.NewMemDB() + multiTrees1 := make(map[string]commitment.Tree) + for _, storeKey := range storeKeys { + prefixDB := dbm.NewPrefixDB(db1, []byte(storeKey)) + multiTrees1[storeKey] = iavl.NewIavlTree(prefixDB, log.NewNopLogger(), iavl.DefaultConfig()) + } + + newCommitStore, err := commitment.NewCommitStore(multiTrees1, db1, log.NewNopLogger()) // for store/v2 + require.NoError(t, err) + + return NewManager(snapshotsManager, newStorageStore, newCommitStore, log.NewNopLogger()), commitStore +} + +func TestMigrateState(t *testing.T) { + m, orgCommitStore := setupMigrationManager(t) + + // apply changeset + toVersion := uint64(100) + keyCount := 10 + for version := uint64(1); version <= toVersion; version++ { + cs := store.NewChangeset() + for _, storeKey := range storeKeys { + for i := 0; i < keyCount; i++ { + cs.Add(storeKey, []byte(fmt.Sprintf("key-%d-%d", version, i)), []byte(fmt.Sprintf("value-%d-%d", version, i))) + } + } + require.NoError(t, orgCommitStore.WriteBatch(cs)) + _, err := orgCommitStore.Commit(version) + require.NoError(t, err) + } + + err := m.Migrate(toVersion - 1) + require.NoError(t, err) + + // check the migrated state + for version := uint64(1); version < toVersion; version++ { + for _, storeKey := range storeKeys { + for i := 0; i < keyCount; i++ { + val, err := m.commitSnapshotter.(*commitment.CommitStore).Get(storeKey, toVersion-1, []byte(fmt.Sprintf("key-%d-%d", version, i))) + require.NoError(t, err) + require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", version, i)), val) + } + } + } + // check the latest state + val, err := m.commitSnapshotter.(*commitment.CommitStore).Get("store1", toVersion-1, []byte("key-100-1")) + require.NoError(t, err) + require.Nil(t, val) + val, err = m.commitSnapshotter.(*commitment.CommitStore).Get("store2", toVersion-1, []byte("key-100-0")) + require.NoError(t, err) + require.Nil(t, val) + + // check the storage + for version := uint64(1); version < toVersion; version++ { + for _, storeKey := range storeKeys { + for i := 0; i < keyCount; i++ { + val, err := m.storageSnapshotter.(*storage.StorageStore).Get(storeKey, toVersion-1, []byte(fmt.Sprintf("key-%d-%d", version, i))) + require.NoError(t, err) + require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", version, i)), val) + } + } + } +} diff --git a/store/migration/stream.go b/store/migration/stream.go new file mode 100644 index 000000000000..aead5df348f3 --- /dev/null +++ b/store/migration/stream.go @@ -0,0 +1,79 @@ +package migration + +import ( + "fmt" + "io" + "sync/atomic" + + protoio "github.com/cosmos/gogoproto/io" + "github.com/cosmos/gogoproto/proto" + + "cosmossdk.io/store/v2/snapshots" + snapshotstypes "cosmossdk.io/store/v2/snapshots/types" +) + +var ( + _ snapshots.WriteCloser = (*MigrationStream)(nil) + _ protoio.ReadCloser = (*MigrationStream)(nil) +) + +// MigrationStream is a stream for migrating the whole IAVL state as a snapshot. +// It's used to sync the whole state from the store/v1 to store/v2. +// The main idea is to use the same snapshotter interface without writing to disk. +type MigrationStream struct { + chBuffer chan proto.Message + err atomic.Value // atomic error +} + +// NewMigrationStream returns a new MigrationStream. +func NewMigrationStream(chBufferSize int) *MigrationStream { + return &MigrationStream{ + chBuffer: make(chan proto.Message, chBufferSize), + } +} + +// WriteMsg implements protoio.Write interface. +func (ms *MigrationStream) WriteMsg(msg proto.Message) error { + ms.chBuffer <- msg + return nil +} + +// CloseWithError implements snapshots.WriteCloser interface. +func (ms *MigrationStream) CloseWithError(err error) { + ms.err.Store(err) + close(ms.chBuffer) +} + +// ReadMsg implements the protoio.Read interface. +// +// NOTE: It we follow the pattern of snapshot.Restore, however, the migration is done in memory. +// It doesn't require any deserialization -- just passing the pointer to the . +func (ms *MigrationStream) ReadMsg(msg proto.Message) error { + // msg should be a pointer to the same type as the one written to the stream + snapshotsItem, ok := msg.(*snapshotstypes.SnapshotItem) + if !ok { + return fmt.Errorf("unexpected message type: %T", msg) + } + + // It doesn't require any deserialization, just a type assertion. + item := <-ms.chBuffer + if item == nil { + return io.EOF + } + + *snapshotsItem = *(item.(*snapshotstypes.SnapshotItem)) + + // check if there is an error from the writer. + err := ms.err.Load() + if err != nil { + return err.(error) + } + + return nil +} + +// Close implements io.Closer interface. +func (ms *MigrationStream) Close() error { + close(ms.chBuffer) + return nil +} diff --git a/store/snapshots/manager.go b/store/snapshots/manager.go index 1c2d4ec65a31..ad29da179671 100644 --- a/store/snapshots/manager.go +++ b/store/snapshots/manager.go @@ -233,6 +233,30 @@ func (m *Manager) createSnapshot(height uint64, ch chan<- io.ReadCloser) { } } +// CreateMigration creates a migration snapshot and writes it to the given writer. +// It is used to migrate the state from the original store to the store/v2. +func (m *Manager) CreateMigration(height uint64, protoWriter WriteCloser) error { + if m == nil { + return errorsmod.Wrap(store.ErrLogic, "Snapshot Manager is nil") + } + + err := m.begin(opSnapshot) + if err != nil { + return err + } + defer m.end() + + go func() { + if err := m.commitSnapshotter.Snapshot(height, protoWriter); err != nil { + protoWriter.CloseWithError(err) + return + } + _ = protoWriter.Close() // always return nil + }() + + return nil +} + // List lists snapshots, mirroring ABCI ListSnapshots. It can be concurrent with other operations. func (m *Manager) List() ([]*types.Snapshot, error) { return m.store.List() diff --git a/store/snapshots/stream.go b/store/snapshots/stream.go index e010f9224468..4662d138b233 100644 --- a/store/snapshots/stream.go +++ b/store/snapshots/stream.go @@ -19,6 +19,13 @@ const ( snapshotCompressionLevel = 7 ) +type WriteCloser interface { + protoio.WriteCloser + + // CloseWithError closes the writer and sends an error to the reader. + CloseWithError(err error) +} + // StreamWriter set up a stream pipeline to serialize snapshot nodes: // Exported Items -> delimited Protobuf -> zlib -> buffer -> chunkWriter -> chan io.ReadCloser type StreamWriter struct {