From 0a7fd5c92866fb977eecdb8dbf6137aac4bb181b Mon Sep 17 00:00:00 2001
From: Anatol <87016465+notanatol@users.noreply.github.com>
Date: Mon, 5 Feb 2024 07:14:11 -0600
Subject: [PATCH] fix: remove epoch migration code (#4563)
---
pkg/node/statestore.go | 35 +-
pkg/node/statestore_test.go | 81 ---
pkg/postage/batchstore/store_test.go | 2 +-
pkg/statestore/leveldb/export_test.go | 7 -
pkg/statestore/leveldb/leveldb.go | 49 +-
pkg/statestore/leveldb/leveldb_test.go | 21 -
pkg/statestore/leveldb/migration.go | 380 --------------
pkg/statestore/leveldb/migration_test.go | 335 ------------
pkg/statestore/mock/store.go | 7 -
.../storeadapter/storeadapter_test.go | 6 -
pkg/statestore/test/store.go | 2 +-
pkg/storer/epoch_migration.go | 493 ------------------
pkg/storer/epoch_migration_test.go | 332 ------------
pkg/storer/export_test.go | 9 -
pkg/storer/storer.go | 59 ---
15 files changed, 4 insertions(+), 1814 deletions(-)
delete mode 100644 pkg/node/statestore_test.go
delete mode 100644 pkg/statestore/leveldb/export_test.go
delete mode 100644 pkg/statestore/leveldb/migration.go
delete mode 100644 pkg/statestore/leveldb/migration_test.go
delete mode 100644 pkg/storer/epoch_migration.go
delete mode 100644 pkg/storer/epoch_migration_test.go
diff --git a/pkg/node/statestore.go b/pkg/node/statestore.go
index 5fdef7dc418..9579e1487ef 100644
--- a/pkg/node/statestore.go
+++ b/pkg/node/statestore.go
@@ -11,7 +11,6 @@ import (
"github.com/ethersphere/bee/pkg/log"
"github.com/ethersphere/bee/pkg/metrics"
- "github.com/ethersphere/bee/pkg/postage"
"github.com/ethersphere/bee/pkg/statestore/storeadapter"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/cache"
@@ -55,12 +54,7 @@ func InitStamperStore(logger log.Logger, dataDir string, stateStore storage.Stat
if err != nil {
return nil, err
}
- // TODO: remove migration after it has been a few months after the localstoreV2 release
- err = migrateStamperData(stateStore, stamperStore)
- if err != nil {
- stamperStore.Close()
- return nil, fmt.Errorf("migrating stamper data: %w", err)
- }
+
return stamperStore, nil
}
@@ -105,30 +99,3 @@ func setOverlay(s storage.StateStorer, overlay swarm.Address, nonce []byte) erro
s.Put(noncedOverlayKey, overlay),
)
}
-
-func migrateStamperData(stateStore storage.StateStorer, stamperStore storage.Store) error {
- var keys []string
- err := stateStore.Iterate("postage", func(key, value []byte) (bool, error) {
- keys = append(keys, string(key))
- st := &postage.StampIssuer{}
- if err := st.UnmarshalBinary(value); err != nil {
- return false, err
- }
- if err := stamperStore.Put(&postage.StampIssuerItem{
- Issuer: st,
- }); err != nil {
- return false, err
- }
- return false, nil
- })
- if err != nil {
- return err
- }
-
- for _, key := range keys {
- if err = stateStore.Delete(key); err != nil {
- return err
- }
- }
- return nil
-}
diff --git a/pkg/node/statestore_test.go b/pkg/node/statestore_test.go
deleted file mode 100644
index 55feb9e0e8a..00000000000
--- a/pkg/node/statestore_test.go
+++ /dev/null
@@ -1,81 +0,0 @@
-// Copyright 2023 The Swarm Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-package node
-
-import (
- "crypto/rand"
- "fmt"
- "math/big"
- "testing"
- "time"
-
- "github.com/ethersphere/bee/pkg/log"
- "github.com/ethersphere/bee/pkg/postage"
- "github.com/ethersphere/bee/pkg/storage"
-)
-
-func TestInitStamperStore(t *testing.T) {
- dataDir := t.TempDir()
- stateStore, _, err := InitStateStore(log.Noop, dataDir, 100_000)
- if err != nil {
- t.Fatal(err)
- }
-
- ids := make(map[string]int)
-
- // add 10 stamps to the state store
- for i := 0; i < 10; i++ {
- bID := make([]byte, 32)
- _, err = rand.Read(bID)
- if err != nil {
- t.Fatal(err)
- }
- si := postage.NewStampIssuer("", "", bID, big.NewInt(3), 11, 10, 1000, true)
- err = stateStore.Put(fmt.Sprintf("postage%s", string(si.ID())), si)
- if err != nil {
- t.Fatal(err)
- }
- ids[string(si.ID())] = 0
- }
-
- stamperStore, err := InitStamperStore(log.Noop, dataDir, stateStore)
- if err != nil {
- t.Fatal("init stamper store should migrate stamps from state store", err)
- }
-
- err = stamperStore.Iterate(
- storage.Query{
- Factory: func() storage.Item { return new(postage.StampIssuerItem) },
- }, func(result storage.Result) (bool, error) {
- issuer := result.Entry.(*postage.StampIssuerItem).Issuer
- ids[string(issuer.ID())]++
- return false, nil
- })
- if err != nil {
- t.Fatal(err)
- }
-
- var got int
- for _, v := range ids {
- if v > 0 {
- got++
- }
- }
- if got != 10 {
- t.Fatalf("want %d stamps. got %d", 10, got)
- }
-
- t.Cleanup(func() {
- err = stateStore.Close()
- if err != nil {
- t.Fatal(err)
- }
- err = stamperStore.Close()
- if err != nil {
- t.Fatal(err)
- }
- time.Sleep(1 * time.Second)
- })
-}
diff --git a/pkg/postage/batchstore/store_test.go b/pkg/postage/batchstore/store_test.go
index 2c13dd2054b..43e24014d33 100644
--- a/pkg/postage/batchstore/store_test.go
+++ b/pkg/postage/batchstore/store_test.go
@@ -222,7 +222,7 @@ func TestBatchStore_Reset(t *testing.T) {
// we expect one key in the statestore since the schema name
// will always be there.
- if c != 1 {
+ if c != 0 {
t.Fatalf("expected only one key in statestore, got %d", c)
}
}
diff --git a/pkg/statestore/leveldb/export_test.go b/pkg/statestore/leveldb/export_test.go
deleted file mode 100644
index 75486dc5af2..00000000000
--- a/pkg/statestore/leveldb/export_test.go
+++ /dev/null
@@ -1,7 +0,0 @@
-package leveldb
-
-var DbSchemaCurrent = dbSchemaCurrent
-
-func (s *Store) GetSchemaName() (string, error) {
- return s.getSchemaName()
-}
diff --git a/pkg/statestore/leveldb/leveldb.go b/pkg/statestore/leveldb/leveldb.go
index 56f2f9bc8c3..65d2cc471ac 100644
--- a/pkg/statestore/leveldb/leveldb.go
+++ b/pkg/statestore/leveldb/leveldb.go
@@ -25,8 +25,7 @@ import (
const loggerName = "leveldb"
var (
- _ storage.StateStorer = (*Store)(nil)
- _ storage.StateStorerCleaner = (*Store)(nil)
+ _ storage.StateStorer = (*Store)(nil)
)
// Store uses LevelDB to store values.
@@ -46,10 +45,6 @@ func NewInMemoryStateStore(l log.Logger) (*Store, error) {
logger: l.WithName(loggerName).Register(),
}
- if err := migrate(s); err != nil {
- return nil, err
- }
-
return s, nil
}
@@ -76,36 +71,9 @@ func NewStateStore(path string, l log.Logger) (*Store, error) {
logger: l,
}
- if err := migrate(s); err != nil {
- return nil, err
- }
-
return s, nil
}
-func migrate(s *Store) error {
- sn, err := s.getSchemaName()
- if err != nil {
- if !errors.Is(err, storage.ErrNotFound) {
- _ = s.Close()
- return fmt.Errorf("get schema name: %w", err)
- }
- // new statestore - put schema key with current name
- if err := s.putSchemaName(dbSchemaCurrent); err != nil {
- _ = s.Close()
- return fmt.Errorf("put schema name: %w", err)
- }
- sn = dbSchemaCurrent
- }
-
- if err = s.migrate(sn); err != nil {
- _ = s.Close()
- return fmt.Errorf("migrate: %w", err)
- }
-
- return nil
-}
-
// Get retrieves a value of the requested key. If no results are found,
// storage.ErrNotFound will be returned.
func (s *Store) Get(key string, i interface{}) error {
@@ -161,21 +129,6 @@ func (s *Store) Iterate(prefix string, iterFunc storage.StateIterFunc) (err erro
return iter.Error()
}
-func (s *Store) getSchemaName() (string, error) {
- name, err := s.db.Get([]byte(dbSchemaKey), nil)
- if err != nil {
- if errors.Is(err, leveldb.ErrNotFound) {
- return "", storage.ErrNotFound
- }
- return "", err
- }
- return string(name), nil
-}
-
-func (s *Store) putSchemaName(val string) error {
- return s.db.Put([]byte(dbSchemaKey), []byte(val), nil)
-}
-
// Close releases the resources used by the store.
func (s *Store) Close() error {
return s.db.Close()
diff --git a/pkg/statestore/leveldb/leveldb_test.go b/pkg/statestore/leveldb/leveldb_test.go
index 790e1debc0b..2e18b9f6b61 100644
--- a/pkg/statestore/leveldb/leveldb_test.go
+++ b/pkg/statestore/leveldb/leveldb_test.go
@@ -43,24 +43,3 @@ func TestPersistentStateStore(t *testing.T) {
return store
})
}
-
-func TestGetSchemaName(t *testing.T) {
- dir := t.TempDir()
-
- store, err := leveldb.NewStateStore(dir, log.Noop)
- if err != nil {
- t.Fatal(err)
- }
- t.Cleanup(func() {
- if err := store.Close(); err != nil {
- t.Fatal(err)
- }
- })
- n, err := store.GetSchemaName() // expect current
- if err != nil {
- t.Fatal(err)
- }
- if n != leveldb.DbSchemaCurrent {
- t.Fatalf("wanted current db schema but got '%s'", n)
- }
-}
diff --git a/pkg/statestore/leveldb/migration.go b/pkg/statestore/leveldb/migration.go
deleted file mode 100644
index 9fb57e9e27e..00000000000
--- a/pkg/statestore/leveldb/migration.go
+++ /dev/null
@@ -1,380 +0,0 @@
-// nolint: goheader
-// Copyright 2019 The Swarm Authors
-// This file is part of the Swarm library.
-//
-// The Swarm library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The Swarm library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the Swarm library. If not, see .
-
-package leveldb
-
-import (
- "errors"
- "fmt"
- "strings"
- "time"
-
- "github.com/ethereum/go-ethereum/common"
- "github.com/ethersphere/bee/pkg/storage"
-)
-
-var (
- errMissingCurrentSchema = errors.New("could not find current db schema")
- errMissingTargetSchema = errors.New("could not find target db schema")
-)
-
-const (
- dbSchemaKey = "statestore_schema"
- dbSchemaGrace = "grace"
- dbSchemaDrain = "drain"
- dbSchemaCleanInterval = "clean-interval"
- dbSchemaNoStamp = "no-stamp"
- dbSchemaFlushBlock = "flushblock"
- dbSchemaSwapAddr = "swapaddr"
- dBSchemaKademliaMetrics = "kademlia-metrics"
- dBSchemaBatchStore = "batchstore"
- dBSchemaBatchStoreV2 = "batchstoreV2"
- dBSchemaBatchStoreV3 = "batchstoreV3"
- dBSchemaBatchStoreV4 = "batchstoreV4"
- dBSchemaInterval = "interval"
- dBSchemaClearAddressBook = "address-book"
- dBSResetInterval = "interval-reset"
- dBSchemaBatchStoreV5 = "batchstoreV5"
-)
-
-var (
- dbSchemaCurrent = dBSchemaBatchStoreV5
-)
-
-type migration struct {
- name string // name of the schema
- fn func(s *Store) error // the migration function that needs to be performed in order to get to the current schema name
-}
-
-// schemaMigrations contains an ordered list of the database schemes, that is
-// in order to run data migrations in the correct sequence
-var schemaMigrations = []migration{
- {name: dbSchemaGrace, fn: func(s *Store) error { return nil }},
- {name: dbSchemaDrain, fn: migrateGrace},
- {name: dbSchemaCleanInterval, fn: migrateGrace},
- {name: dbSchemaNoStamp, fn: migrateStamp},
- {name: dbSchemaFlushBlock, fn: migrateFB},
- {name: dbSchemaSwapAddr, fn: migrateSwap},
- {name: dBSchemaKademliaMetrics, fn: migrateKademliaMetrics},
- {name: dBSchemaBatchStore, fn: migrateBatchstore},
- {name: dBSchemaBatchStoreV2, fn: migrateBatchstoreV2},
- {name: dBSchemaBatchStoreV3, fn: migrateBatchstore},
- {name: dBSchemaBatchStoreV4, fn: migrateBatchstore},
- {name: dBSchemaInterval, fn: noOpMigration},
- {name: dBSchemaClearAddressBook, fn: clearAddressBook},
- {name: dBSResetInterval, fn: clearIntervals},
- {name: dBSchemaBatchStoreV5, fn: migrateBatchstore},
-}
-
-func migrateFB(s *Store) error {
- collectedKeys, err := collectKeys(s, "blocklist-")
- if err != nil {
- return err
- }
- return deleteKeys(s, collectedKeys)
-}
-
-func migrateBatchstoreV2(s *Store) error {
- for _, pfx := range []string{"batchstore_", "verified_overlay_"} {
- collectedKeys, err := collectKeys(s, pfx)
- if err != nil {
- return err
- }
- if err := deleteKeys(s, collectedKeys); err != nil {
- return err
- }
- }
- return nil
-}
-
-func noOpMigration(s *Store) error {
- return nil
-}
-
-func clearAddressBook(s *Store) error {
- collectedKeys, err := collectKeys(s, "addressbook_entry_")
- if err != nil {
- return err
- }
- return deleteKeys(s, collectedKeys)
-}
-
-func clearIntervals(s *Store) error {
- collectedKeys, err := collectKeys(s, "sync|")
- if err != nil {
- return err
- }
- return deleteKeys(s, collectedKeys)
-}
-
-func migrateBatchstore(s *Store) error {
- collectedKeys, err := collectKeys(s, "batchstore_")
- if err != nil {
- return err
- }
- return deleteKeys(s, collectedKeys)
-}
-
-func migrateStamp(s *Store) error {
- for _, pfx := range []string{"postage", "batchstore", "addressbook_entry_"} {
- collectedKeys, err := collectKeys(s, pfx)
- if err != nil {
- return err
- }
- if err := deleteKeys(s, collectedKeys); err != nil {
- return err
- }
- }
-
- return nil
-}
-
-func migrateGrace(s *Store) error {
- var collectedKeys []string
- mgfn := func(k, v []byte) (bool, error) {
- stk := string(k)
- if strings.Contains(stk, "|") &&
- len(k) > 32 &&
- !strings.Contains(stk, "swap") &&
- !strings.Contains(stk, "peer") {
- s.logger.Debug("found key designated to deletion", "key", k)
- collectedKeys = append(collectedKeys, stk)
- }
-
- return false, nil
- }
-
- _ = s.Iterate("", mgfn)
-
- for _, v := range collectedKeys {
- err := s.Delete(v)
- if err != nil {
- s.logger.Debug("error deleting key", "key", v)
- continue
- }
- s.logger.Debug("deleted key", "key", v)
- }
- s.logger.Debug("keys deleted", "count", len(collectedKeys))
-
- return nil
-}
-
-func migrateSwap(s *Store) error {
- migratePrefix := func(prefix string) error {
- keys, err := collectKeys(s, prefix)
- if err != nil {
- return err
- }
-
- for _, key := range keys {
- split := strings.SplitAfter(key, prefix)
- if len(split) != 2 {
- return errors.New("no peer in key")
- }
-
- if len(split[1]) != 20 {
- s.logger.Debug("skipping already migrated key", "key", key)
- continue
- }
-
- addr := common.BytesToAddress([]byte(split[1]))
- fixed := fmt.Sprintf("%s%x", prefix, addr)
-
- var val string
- if err = s.Get(fixed, &val); err == nil {
- s.logger.Debug("skipping duplicate key", "key", key)
- if err = s.Delete(key); err != nil {
- return err
- }
- continue
- }
- if !errors.Is(err, storage.ErrNotFound) {
- return err
- }
-
- if err = s.Get(key, &val); err != nil {
- return err
- }
-
- if err = s.Put(fixed, val); err != nil {
- return err
- }
-
- if err = s.Delete(key); err != nil {
- return err
- }
- }
- return nil
- }
-
- if err := migratePrefix("swap_peer_chequebook_"); err != nil {
- return err
- }
-
- return migratePrefix("swap_beneficiary_peer_")
-}
-
-// migrateKademliaMetrics removes all old existing
-// kademlia metrics database content.
-func migrateKademliaMetrics(s *Store) error {
- for _, prefix := range []string{"peer-last-seen-timestamp", "peer-total-connection-duration"} {
- start := time.Now()
- s.logger.Debug("removing kademlia metrics", "metrics_prefix", prefix)
-
- keys, err := collectKeys(s, prefix)
- if err != nil {
- return err
- }
-
- if err := deleteKeys(s, keys); err != nil {
- return err
- }
-
- s.logger.Debug("removing kademlia metrics done", "metrics_prefix", prefix, "elapsed", time.Since(start))
- }
- return nil
-}
-
-func (s *Store) migrate(schemaName string) error {
- migrations, err := getMigrations(schemaName, dbSchemaCurrent, schemaMigrations, s)
- if err != nil {
- return fmt.Errorf("error getting migrations for current schema (%s): %w", schemaName, err)
- }
-
- // no migrations to run
- if migrations == nil {
- return nil
- }
-
- s.logger.Debug("statestore: need to run data migrations to schema", "migration_count", len(migrations), "schema_name", schemaName)
- for i := 0; i < len(migrations); i++ {
- err := migrations[i].fn(s)
- if err != nil {
- return err
- }
- err = s.putSchemaName(migrations[i].name) // put the name of the current schema
- if err != nil {
- return err
- }
- schemaName, err = s.getSchemaName()
- if err != nil {
- return err
- }
- s.logger.Debug("statestore: successfully ran migration", "migration_number", i, "schema_name", schemaName)
- }
- return nil
-}
-
-// getMigrations returns an ordered list of migrations that need be executed
-// with no errors in order to bring the statestore to the most up-to-date
-// schema definition
-func getMigrations(currentSchema, targetSchema string, allSchemeMigrations []migration, store *Store) (migrations []migration, err error) {
- foundCurrent := false
- foundTarget := false
- if currentSchema == dbSchemaCurrent {
- return nil, nil
- }
- for i, v := range allSchemeMigrations {
- switch v.name {
- case currentSchema:
- if foundCurrent {
- return nil, errors.New("found schema name for the second time when looking for migrations")
- }
- foundCurrent = true
- store.logger.Debug("statestore migration: migrating schema", "current_schema_name", currentSchema, "next_schema_name", dbSchemaCurrent, "total_migration_count", len(allSchemeMigrations)-i)
- continue // current schema migration should not be executed (already has been when schema was migrated to)
- case targetSchema:
- foundTarget = true
- }
- if foundCurrent {
- migrations = append(migrations, v)
- }
- }
- if !foundCurrent {
- return nil, errMissingCurrentSchema
- }
- if !foundTarget {
- return nil, errMissingTargetSchema
- }
- return migrations, nil
-}
-
-func collectKeysExcept(s *Store, prefixesToPreserve []string) (keys []string, err error) {
- if err := s.Iterate("", func(k, v []byte) (bool, error) {
- stk := string(k)
- has := false
- for _, v := range prefixesToPreserve {
- if strings.HasPrefix(stk, v) {
- has = true
- break
- }
- }
- if !has {
- keys = append(keys, stk)
- }
- return false, nil
- }); err != nil {
- return nil, err
- }
- return keys, nil
-}
-
-func collectKeys(s *Store, prefix string) (keys []string, err error) {
- if err := s.Iterate(prefix, func(k, v []byte) (bool, error) {
- stk := string(k)
- if strings.HasPrefix(stk, prefix) {
- keys = append(keys, stk)
- }
- return false, nil
- }); err != nil {
- return nil, err
- }
- return keys, nil
-}
-
-func deleteKeys(s *Store, keys []string) error {
- for _, v := range keys {
- err := s.Delete(v)
- if err != nil {
- return fmt.Errorf("error deleting key %s: %w", v, err)
- }
- }
- s.logger.Debug("keys deleted", "count", len(keys))
- return nil
-}
-
-// Nuke the store so that only the bare essential entries are
-// left. Careful!
-func (s *Store) Nuke() error {
- var (
- prefixesToPreserve = []string{
- "non-mineable-overlay",
- "overlayV2_nonce",
- "pseudosettle",
- "accounting",
- "swap",
- }
- keys []string
- err error
- )
-
- keys, err = collectKeysExcept(s, prefixesToPreserve)
- if err != nil {
- return fmt.Errorf("collect keys except: %w", err)
- }
- return deleteKeys(s, keys)
-}
diff --git a/pkg/statestore/leveldb/migration_test.go b/pkg/statestore/leveldb/migration_test.go
deleted file mode 100644
index 8ae436f0fe3..00000000000
--- a/pkg/statestore/leveldb/migration_test.go
+++ /dev/null
@@ -1,335 +0,0 @@
-// nolint: goheader
-// Copyright 2019 The Swarm Authors
-// This file is part of the Swarm library.
-//
-// The Swarm library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The Swarm library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the Swarm library. If not, see .
-
-package leveldb
-
-import (
- "errors"
- "fmt"
- "testing"
-
- "github.com/ethereum/go-ethereum/common"
- "github.com/ethersphere/bee/pkg/log"
- "github.com/ethersphere/bee/pkg/storage"
-)
-
-func TestOneMigration(t *testing.T) {
- defer func(v []migration, s string) {
- schemaMigrations = v
- dbSchemaCurrent = s
- }(schemaMigrations, dbSchemaCurrent)
-
- dbSchemaCode := "code"
- dbSchemaCurrent = dbSchemaCode
- dbSchemaNext := "dbSchemaNext"
-
- ran := false
- shouldNotRun := false
- schemaMigrations = []migration{
- {name: dbSchemaCode, fn: func(db *Store) error {
- shouldNotRun = true // this should not be executed
- return nil
- }},
- {name: dbSchemaNext, fn: func(db *Store) error {
- ran = true
- return nil
- }},
- }
-
- dir := t.TempDir()
- logger := log.Noop
-
- // start the fresh statestore with the sanctuary schema name
- db, err := NewStateStore(dir, logger)
- if err != nil {
- t.Fatal(err)
- }
-
- err = db.Close()
- if err != nil {
- t.Fatal(err)
- }
-
- dbSchemaCurrent = dbSchemaNext
-
- // start the existing statestore and expect the migration to run
- db, err = NewStateStore(dir, logger)
- if err != nil {
- t.Fatal(err)
- }
-
- schemaName, err := db.GetSchemaName()
- if err != nil {
- t.Fatal(err)
- }
-
- if schemaName != dbSchemaNext {
- t.Errorf("schema name mismatch. got '%s', want '%s'", schemaName, dbSchemaNext)
- }
-
- if !ran {
- t.Error("expected migration did not run")
- }
-
- if shouldNotRun {
- t.Error("migration ran but shouldnt have")
- }
-
- err = db.Close()
- if err != nil {
- t.Error(err)
- }
-}
-
-func TestManyMigrations(t *testing.T) {
- defer func(v []migration, s string) {
- schemaMigrations = v
- dbSchemaCurrent = s
- }(schemaMigrations, dbSchemaCurrent)
-
- dbSchemaCode := "code"
- dbSchemaCurrent = dbSchemaCode
-
- shouldNotRun := false
- executionOrder := []int{-1, -1, -1, -1}
-
- schemaMigrations = []migration{
- {name: dbSchemaCode, fn: func(db *Store) error {
- shouldNotRun = true // this should not be executed
- return nil
- }},
- {name: "keju", fn: func(db *Store) error {
- executionOrder[0] = 0
- return nil
- }},
- {name: "coconut", fn: func(db *Store) error {
- executionOrder[1] = 1
- return nil
- }},
- {name: "mango", fn: func(db *Store) error {
- executionOrder[2] = 2
- return nil
- }},
- {name: "salvation", fn: func(db *Store) error {
- executionOrder[3] = 3
- return nil
- }},
- }
-
- dir := t.TempDir()
- logger := log.Noop
-
- // start the fresh statestore with the sanctuary schema name
- db, err := NewStateStore(dir, logger)
- if err != nil {
- t.Fatal(err)
- }
-
- err = db.Close()
- if err != nil {
- t.Fatal(err)
- }
-
- dbSchemaCurrent = "salvation"
-
- // start the existing statestore and expect the migration to run
- db, err = NewStateStore(dir, logger)
- if err != nil {
- t.Fatal(err)
- }
-
- schemaName, err := db.GetSchemaName()
- if err != nil {
- t.Fatal(err)
- }
-
- if schemaName != "salvation" {
- t.Errorf("schema name mismatch. got '%s', want '%s'", schemaName, "salvation")
- }
-
- if shouldNotRun {
- t.Error("migration ran but shouldnt have")
- }
-
- for i, v := range executionOrder {
- if i != v && i != len(executionOrder)-1 {
- t.Errorf("migration did not run in sequence, slot %d value %d", i, v)
- }
- }
-
- err = db.Close()
- if err != nil {
- t.Error(err)
- }
-}
-
-// TestMigrationErrorFrom checks that local store boot should fail when the schema we're migrating from cannot be found
-func TestMigrationErrorFrom(t *testing.T) {
- defer func(v []migration, s string) {
- schemaMigrations = v
- dbSchemaCurrent = s
- }(schemaMigrations, dbSchemaCurrent)
-
- dbSchemaCurrent = "koo-koo-schema"
-
- shouldNotRun := false
- schemaMigrations = []migration{
- {name: "langur", fn: func(db *Store) error {
- shouldNotRun = true
- return nil
- }},
- {name: "coconut", fn: func(db *Store) error {
- shouldNotRun = true
- return nil
- }},
- {name: "chutney", fn: func(db *Store) error {
- shouldNotRun = true
- return nil
- }},
- }
- dir := t.TempDir()
- logger := log.Noop
-
- // start the fresh statestore with the sanctuary schema name
- db, err := NewStateStore(dir, logger)
- if err != nil {
- t.Fatal(err)
- }
-
- err = db.Close()
- if err != nil {
- t.Fatal(err)
- }
-
- dbSchemaCurrent = "foo"
-
- // start the existing statestore and expect the migration to run
- _, err = NewStateStore(dir, logger)
- if !errors.Is(err, errMissingCurrentSchema) {
- t.Fatalf("expected errCannotFindSchema but got %v", err)
- }
-
- if shouldNotRun {
- t.Error("migration ran but shouldnt have")
- }
-}
-
-// TestMigrationErrorTo checks that local store boot should fail when the schema we're migrating to cannot be found
-func TestMigrationErrorTo(t *testing.T) {
- defer func(v []migration, s string) {
- schemaMigrations = v
- dbSchemaCurrent = s
- }(schemaMigrations, dbSchemaCurrent)
-
- dbSchemaCurrent = "langur"
-
- shouldNotRun := false
- schemaMigrations = []migration{
- {name: "langur", fn: func(db *Store) error {
- shouldNotRun = true
- return nil
- }},
- {name: "coconut", fn: func(db *Store) error {
- shouldNotRun = true
- return nil
- }},
- {name: "chutney", fn: func(db *Store) error {
- shouldNotRun = true
- return nil
- }},
- }
- dir := t.TempDir()
- logger := log.Noop
-
- // start the fresh statestore with the sanctuary schema name
- db, err := NewStateStore(dir, logger)
- if err != nil {
- t.Fatal(err)
- }
-
- err = db.Close()
- if err != nil {
- t.Fatal(err)
- }
-
- dbSchemaCurrent = "foo"
-
- // start the existing statestore and expect the migration to run
- _, err = NewStateStore(dir, logger)
- if !errors.Is(err, errMissingTargetSchema) {
- t.Fatalf("expected errMissingTargetSchema but got %v", err)
- }
-
- if shouldNotRun {
- t.Error("migration ran but shouldnt have")
- }
-}
-
-func TestMigrationSwap(t *testing.T) {
- dir := t.TempDir()
- logger := log.Noop
-
- // start the fresh statestore with the sanctuary schema name
- db, err := NewStateStore(dir, logger)
- if err != nil {
- t.Fatal(err)
- }
- defer db.Close()
-
- address := common.HexToAddress("0xabcd")
- storedAddress := common.HexToAddress("0xffff")
-
- legacyKey1 := fmt.Sprintf("swap_peer_chequebook_%s", address[:])
- legacyKey2 := fmt.Sprintf("swap_beneficiary_peer_%s", address[:])
-
- if err = db.Put(legacyKey1, storedAddress); err != nil {
- t.Fatal(err)
- }
-
- if err = db.Put(legacyKey2, storedAddress); err != nil {
- t.Fatal(err)
- }
-
- if err = migrateSwap(db); err != nil {
- t.Fatal(err)
- }
-
- var retrievedAddress common.Address
- if err = db.Get("swap_peer_chequebook_000000000000000000000000000000000000abcd", &retrievedAddress); err != nil {
- t.Fatal(err)
- }
-
- if retrievedAddress != storedAddress {
- t.Fatalf("got wrong address. wanted %x, got %x", storedAddress, retrievedAddress)
- }
-
- if err = db.Get("swap_beneficiary_peer_000000000000000000000000000000000000abcd", &retrievedAddress); err != nil {
- t.Fatal(err)
- }
-
- if retrievedAddress != storedAddress {
- t.Fatalf("got wrong address. wanted %x, got %x", storedAddress, retrievedAddress)
- }
-
- if err = db.Get(legacyKey1, &retrievedAddress); !errors.Is(err, storage.ErrNotFound) {
- t.Fatalf("legacyKey1 not deleted. got error %v", err)
- }
-
- if err = db.Get(legacyKey2, &retrievedAddress); !errors.Is(err, storage.ErrNotFound) {
- t.Fatalf("legacyKey2 not deleted. got error %v", err)
- }
-}
diff --git a/pkg/statestore/mock/store.go b/pkg/statestore/mock/store.go
index c1150260c2e..964c5c4ef61 100644
--- a/pkg/statestore/mock/store.go
+++ b/pkg/statestore/mock/store.go
@@ -7,7 +7,6 @@ package mock
import (
"encoding"
"encoding/json"
- "fmt"
"strings"
"sync"
@@ -16,8 +15,6 @@ import (
var _ storage.StateStorer = (*store)(nil)
-const mockSchemaNameKey = "schema_name"
-
type store struct {
store map[string][]byte
mtx sync.RWMutex
@@ -28,10 +25,6 @@ func NewStateStore() storage.StateStorer {
store: make(map[string][]byte),
}
- if err := s.Put(mockSchemaNameKey, "mock_schema"); err != nil {
- panic(fmt.Errorf("put schema name: %w", err))
- }
-
return s
}
diff --git a/pkg/statestore/storeadapter/storeadapter_test.go b/pkg/statestore/storeadapter/storeadapter_test.go
index 7364a297ed4..5065a5158da 100644
--- a/pkg/statestore/storeadapter/storeadapter_test.go
+++ b/pkg/statestore/storeadapter/storeadapter_test.go
@@ -30,12 +30,6 @@ func TestStateStoreAdapter(t *testing.T) {
}
})
- // The test requires the state store to have
- // a schema, otherwise the delete test fails.
- if err := store.Put("test_schema", "name"); err != nil {
- t.Fatalf("unexpected error: %v", err)
- }
-
return store
})
diff --git a/pkg/statestore/test/store.go b/pkg/statestore/test/store.go
index 11c7acb9fe5..ee5050b8633 100644
--- a/pkg/statestore/test/store.go
+++ b/pkg/statestore/test/store.go
@@ -225,5 +225,5 @@ func testStoreIterator(t *testing.T, store storage.StateStorer, prefix string, s
func testEmpty(t *testing.T, store storage.StateStorer) {
t.Helper()
- testStoreIterator(t, store, "", 1) // 1 because of the schema entry.
+ testStoreIterator(t, store, "", 0)
}
diff --git a/pkg/storer/epoch_migration.go b/pkg/storer/epoch_migration.go
deleted file mode 100644
index f93ecafa486..00000000000
--- a/pkg/storer/epoch_migration.go
+++ /dev/null
@@ -1,493 +0,0 @@
-// Copyright 2023 The Swarm Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-package storer
-
-import (
- "context"
- "encoding/binary"
- "encoding/json"
- "fmt"
- "os"
- "path/filepath"
- "strings"
- "sync"
- "time"
-
- "github.com/ethersphere/bee/pkg/log"
- "github.com/ethersphere/bee/pkg/postage"
- "github.com/ethersphere/bee/pkg/sharky"
- "github.com/ethersphere/bee/pkg/shed"
- "github.com/ethersphere/bee/pkg/storage"
- "github.com/ethersphere/bee/pkg/storer/internal"
- "github.com/ethersphere/bee/pkg/storer/internal/chunkstore"
- pinstore "github.com/ethersphere/bee/pkg/storer/internal/pinning"
- "github.com/ethersphere/bee/pkg/swarm"
- "github.com/ethersphere/bee/pkg/traversal"
- "golang.org/x/sync/errgroup"
-)
-
-// epochKey implements storage.Item and is used to store the epoch in the
-// store. It is used to check if the epoch migration has already been
-// performed.
-type epochKey struct{}
-
-func (epochKey) Namespace() string { return "localstore" }
-
-func (epochKey) ID() string { return "epoch" }
-
-// this is a key-only item, so we don't need to marshal/unmarshal
-func (epochKey) Marshal() ([]byte, error) { return nil, nil }
-
-func (epochKey) Unmarshal([]byte) error { return nil }
-
-func (epochKey) Clone() storage.Item { return epochKey{} }
-
-func (epochKey) String() string { return "localstore-epoch" }
-
-var (
- _ internal.Storage = (*putOpStorage)(nil)
- _ chunkstore.Sharky = (*putOpStorage)(nil)
-)
-
-// putOpStorage implements the internal.Storage interface which is used by
-// the internal component stores to store chunks. It also implements the sharky interface
-// which uses the recovery mechanism to recover chunks without moving them.
-type putOpStorage struct {
- chunkstore.Sharky
-
- store storage.BatchedStore
- location sharky.Location
- recovery sharkyRecover
-}
-
-func (p *putOpStorage) IndexStore() storage.BatchedStore { return p.store }
-
-func (p *putOpStorage) ChunkStore() storage.ChunkStore {
- return chunkstore.New(p.store, p)
-}
-
-// Write implements the sharky.Store interface. It uses the sharky recovery mechanism
-// to recover chunks without moving them. The location returned is the same as the
-// one passed in. This is present in the old localstore indexes.
-func (p *putOpStorage) Write(_ context.Context, _ []byte) (sharky.Location, error) {
- return p.location, p.recovery.Add(p.location)
-}
-
-type reservePutter interface {
- Put(context.Context, internal.Storage, swarm.Chunk) (bool, error)
- AddSize(int)
- Size() int
-}
-
-type sharkyRecover interface {
- Add(sharky.Location) error
- Read(context.Context, sharky.Location, []byte) error
-}
-
-// epochMigration performs the initial migration if it hasnt been done already. It
-// reads the old indexes and writes them in the new format. It only migrates the
-// reserve and pinned chunks. It also creates the new epoch key in the store to
-// indicate that the migration has been performed. Due to a bug in the old localstore
-// pinned chunks are not always present in the pinned index. So we do a best-effort
-// migration of the pinning index. If the migration fails, the user can re-pin
-// the chunks using the stewardship endpoint if the stamps used to upload them are
-// still valid.
-func epochMigration(
- ctx context.Context,
- path string,
- stateStore storage.StateStorer,
- store storage.BatchedStore,
- reserve reservePutter,
- recovery sharkyRecover,
- logger log.Logger,
-) error {
- has, err := store.Has(epochKey{})
- if err != nil {
- return fmt.Errorf("has epoch key: %w", err)
- }
-
- if has {
- return nil
- }
-
- logger.Debug("started", "path", path, "start_time", time.Now())
-
- dbshed, err := shed.NewDB(path, nil)
- if err != nil {
- return fmt.Errorf("shed.NewDB: %w", err)
- }
-
- defer func() {
- if dbshed != nil {
- dbshed.Close()
- }
- }()
-
- pullIndex, retrievalDataIndex, err := initShedIndexes(dbshed, swarm.ZeroAddress)
- if err != nil {
- return fmt.Errorf("initShedIndexes: %w", err)
- }
-
- chunkCount, err := retrievalDataIndex.Count()
- if err != nil {
- return fmt.Errorf("retrievalDataIndex count: %w", err)
- }
-
- pullIdxCnt, _ := pullIndex.Count()
-
- logger.Debug("index counts", "retrieval index", chunkCount, "pull index", pullIdxCnt)
-
- e := &epochMigrator{
- stateStore: stateStore,
- store: store,
- recovery: recovery,
- reserve: reserve,
- pullIndex: pullIndex,
- retrievalDataIndex: retrievalDataIndex,
- logger: logger,
- }
-
- if e.reserve != nil && chunkCount > 0 {
- err = e.migrateReserve(ctx)
- if err != nil {
- return err
- }
- }
-
- if e.stateStore != nil && chunkCount > 0 {
- err = e.migratePinning(ctx)
- if err != nil {
- return err
- }
- }
-
- dbshed.Close()
- dbshed = nil
-
- matches, err := filepath.Glob(filepath.Join(path, "*"))
- if err != nil {
- return err
- }
-
- for _, m := range matches {
- if !strings.Contains(m, indexPath) && !strings.Contains(m, sharkyPath) {
- err = os.Remove(m)
- if err != nil {
- return err
- }
- }
- }
-
- return store.Put(epochKey{})
-}
-
-func initShedIndexes(dbshed *shed.DB, baseAddress swarm.Address) (pullIndex shed.Index, retrievalDataIndex shed.Index, err error) {
- // pull index allows history and live syncing per po bin
- pullIndex, err = dbshed.NewIndex("PO|BinID->Hash", shed.IndexFuncs{
- EncodeKey: func(fields shed.Item) (key []byte, err error) {
- key = make([]byte, 9)
- key[0] = swarm.Proximity(baseAddress.Bytes(), fields.Address)
- binary.BigEndian.PutUint64(key[1:9], fields.BinID)
- return key, nil
- },
- DecodeKey: func(key []byte) (e shed.Item, err error) {
- e.BinID = binary.BigEndian.Uint64(key[1:9])
- return e, nil
- },
- EncodeValue: func(fields shed.Item) (value []byte, err error) {
- value = make([]byte, 64) // 32 bytes address, 32 bytes batch id
- copy(value, fields.Address)
- copy(value[32:], fields.BatchID)
- return value, nil
- },
- DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
- e.Address = value[:32]
- e.BatchID = value[32:64]
- return e, nil
- },
- })
- if err != nil {
- return shed.Index{}, shed.Index{}, err
- }
-
- // Index storing actual chunk address, data and bin id.
- headerSize := 16 + postage.StampSize
- retrievalDataIndex, err = dbshed.NewIndex("Address->StoreTimestamp|BinID|BatchID|BatchIndex|Sig|Location", shed.IndexFuncs{
- EncodeKey: func(fields shed.Item) (key []byte, err error) {
- return fields.Address, nil
- },
- DecodeKey: func(key []byte) (e shed.Item, err error) {
- e.Address = key
- return e, nil
- },
- EncodeValue: func(fields shed.Item) (value []byte, err error) {
- b := make([]byte, headerSize)
- binary.BigEndian.PutUint64(b[:8], fields.BinID)
- binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp))
- stamp, err := postage.NewStamp(fields.BatchID, fields.Index, fields.Timestamp, fields.Sig).MarshalBinary()
- if err != nil {
- return nil, err
- }
- copy(b[16:], stamp)
- value = append(b, fields.Location...)
- return value, nil
- },
- DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
- e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[8:16]))
- e.BinID = binary.BigEndian.Uint64(value[:8])
- stamp := new(postage.Stamp)
- if err = stamp.UnmarshalBinary(value[16:headerSize]); err != nil {
- return e, err
- }
- e.BatchID = stamp.BatchID()
- e.Index = stamp.Index()
- e.Timestamp = stamp.Timestamp()
- e.Sig = stamp.Sig()
- e.Location = value[headerSize:]
- return e, nil
- },
- })
- if err != nil {
- return shed.Index{}, shed.Index{}, err
- }
-
- return pullIndex, retrievalDataIndex, nil
-}
-
-// epochMigrator is a helper struct for migrating epoch data. It is used to house
-// the main logic of the migration so that it can be tested. Also it houses the
-// dependencies of the migration logic.
-type epochMigrator struct {
- stateStore storage.StateStorer
- store storage.BatchedStore
- recovery sharkyRecover
- reserve reservePutter
- pullIndex shed.Index
- retrievalDataIndex shed.Index
- logger log.Logger
-}
-
-func (e *epochMigrator) migrateReserve(ctx context.Context) error {
- type putOp struct {
- pIdx shed.Item
- chunk swarm.Chunk
- loc sharky.Location
- }
-
- e.logger.Debug("migrating reserve contents")
-
- opChan := make(chan putOp, 4)
- eg, egCtx := errgroup.WithContext(ctx)
-
- for i := 0; i < 4; i++ {
- eg.Go(func() error {
- for {
- select {
- case <-egCtx.Done():
- return egCtx.Err()
- case op, more := <-opChan:
- if !more {
- return nil
- }
- pStorage := &putOpStorage{
- store: e.store,
- location: op.loc,
- recovery: e.recovery,
- }
-
- switch newIdx, err := e.reserve.Put(egCtx, pStorage, op.chunk); {
- case err != nil:
- return err
- case newIdx:
- e.reserve.AddSize(1)
- }
- }
- }
- })
- }
-
- err := func() error {
- defer close(opChan)
-
- return e.pullIndex.Iterate(func(i shed.Item) (stop bool, err error) {
- addr := swarm.NewAddress(i.Address)
-
- item, err := e.retrievalDataIndex.Get(i)
- if err != nil {
- e.logger.Debug("retrieval data index read failed", "chunk_address", addr, "error", err)
- return false, nil //continue
- }
-
- l, err := sharky.LocationFromBinary(item.Location)
- if err != nil {
- e.logger.Debug("location from binary failed", "chunk_address", addr, "error", err)
- return false, err
- }
-
- chData := make([]byte, l.Length)
- err = e.recovery.Read(ctx, l, chData)
- if err != nil {
- e.logger.Debug("reading location failed", "chunk_address", addr, "error", err)
- return false, nil // continue
- }
-
- ch := swarm.NewChunk(addr, chData).
- WithStamp(postage.NewStamp(item.BatchID, item.Index, item.Timestamp, item.Sig))
-
- select {
- case <-egCtx.Done():
- return true, egCtx.Err()
- case opChan <- putOp{pIdx: i, chunk: ch, loc: l}:
- }
- return false, nil
- }, nil)
- }()
- if err != nil {
- return err
- }
-
- if err := eg.Wait(); err != nil {
- return err
- }
-
- e.logger.Debug("migrating reserve contents done", "reserve_size", e.reserve.Size())
-
- return nil
-}
-
-const pinStorePrefix = "root-pin"
-
-func (e *epochMigrator) migratePinning(ctx context.Context) error {
- pinChan := make(chan swarm.Address, 4)
- eg, egCtx := errgroup.WithContext(ctx)
-
- pStorage := &putOpStorage{
- store: e.store,
- recovery: e.recovery,
- }
- var mu sync.Mutex // used to protect pStorage.location
-
- traverser := traversal.New(
- storage.GetterFunc(func(ctx context.Context, addr swarm.Address) (ch swarm.Chunk, err error) {
- i := shed.Item{
- Address: addr.Bytes(),
- }
- item, err := e.retrievalDataIndex.Get(i)
- if err != nil {
- return nil, err
- }
-
- l, err := sharky.LocationFromBinary(item.Location)
- if err != nil {
- return nil, err
- }
-
- chData := make([]byte, l.Length)
- err = e.recovery.Read(ctx, l, chData)
- if err != nil {
- return nil, err
- }
-
- return swarm.NewChunk(addr, chData), nil
- }),
- )
-
- e.logger.Debug("migrating pinning collections, if all the chunks in the collection" +
- " are not found locally, the collection will not be migrated. Users will have to" +
- " re-pin the content using the stewardship API. The migration will print out the failed" +
- " collections at the end.")
-
- for i := 0; i < 4; i++ {
- eg.Go(func() error {
- for {
- select {
- case <-egCtx.Done():
- return egCtx.Err()
- case addr, more := <-pinChan:
- if !more {
- return nil
- }
-
- pinningPutter, err := pinstore.NewCollection(pStorage)
- if err != nil {
- return err
- }
-
- traverserFn := func(chAddr swarm.Address) error {
- item, err := e.retrievalDataIndex.Get(shed.Item{Address: chAddr.Bytes()})
- if err != nil {
- return err
- }
-
- l, err := sharky.LocationFromBinary(item.Location)
- if err != nil {
- return err
- }
- ch := swarm.NewChunk(chAddr, nil)
-
- mu.Lock()
- pStorage.location = l
- err = pinningPutter.Put(egCtx, pStorage, pStorage.IndexStore(), ch)
- if err != nil {
- mu.Unlock()
- return err
- }
- mu.Unlock()
-
- return nil
- }
-
- err = func() error {
- if err := traverser.Traverse(egCtx, addr, traverserFn); err != nil {
- return err
- }
-
- if err := pinningPutter.Close(pStorage, pStorage.IndexStore(), addr); err != nil {
- return err
- }
- return nil
- }()
-
- _ = e.stateStore.Delete(fmt.Sprintf("%s-%s", pinStorePrefix, addr))
-
- // do not fail the entire migration if the collection is not migrated
- if err != nil {
- e.logger.Debug("pinning collection migration failed", "collection_root_address", addr, "error", err)
- } else {
- e.logger.Debug("pinning collection migration successful", "collection_root_address", addr)
- }
- }
- }
- })
- }
-
- err := func() error {
- defer close(pinChan)
-
- return e.stateStore.Iterate(pinStorePrefix, func(key, value []byte) (stop bool, err error) {
- var ref swarm.Address
- if err := json.Unmarshal(value, &ref); err != nil {
- return true, fmt.Errorf("pinning: unmarshal pin reference: %w", err)
- }
- select {
- case <-egCtx.Done():
- return true, egCtx.Err()
- case pinChan <- ref:
- }
- return false, nil
- })
- }()
- if err != nil {
- return err
- }
-
- if err := eg.Wait(); err != nil {
- return err
- }
-
- e.logger.Debug("migrating pinning collections done")
-
- return nil
-}
diff --git a/pkg/storer/epoch_migration_test.go b/pkg/storer/epoch_migration_test.go
deleted file mode 100644
index b51451466f2..00000000000
--- a/pkg/storer/epoch_migration_test.go
+++ /dev/null
@@ -1,332 +0,0 @@
-// Copyright 2023 The Swarm Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-package storer_test
-
-import (
- "bytes"
- "context"
- "crypto/rand"
- "fmt"
- "io"
- "io/fs"
- "os"
- "path"
- "path/filepath"
- "strings"
- "sync"
- "testing"
-
- "github.com/ethersphere/bee/pkg/file/splitter"
- "github.com/ethersphere/bee/pkg/log"
- postagetesting "github.com/ethersphere/bee/pkg/postage/testing"
- "github.com/ethersphere/bee/pkg/sharky"
- "github.com/ethersphere/bee/pkg/shed"
- mockstatestore "github.com/ethersphere/bee/pkg/statestore/mock"
- storage "github.com/ethersphere/bee/pkg/storage"
- "github.com/ethersphere/bee/pkg/storage/inmemstore"
- chunktest "github.com/ethersphere/bee/pkg/storage/testing"
- storer "github.com/ethersphere/bee/pkg/storer"
- "github.com/ethersphere/bee/pkg/storer/internal"
- pinstore "github.com/ethersphere/bee/pkg/storer/internal/pinning"
- "github.com/ethersphere/bee/pkg/swarm"
-)
-
-type dirFS struct {
- basedir string
-}
-
-func (d *dirFS) Open(path string) (fs.File, error) {
- return os.OpenFile(filepath.Join(d.basedir, path), os.O_RDWR|os.O_CREATE, 0644)
-}
-
-func createOldDataDir(t *testing.T, dataPath string, baseAddress swarm.Address, stateStore storage.StateStorer) {
- t.Helper()
-
- binIDs := map[uint8]int{}
-
- assignBinID := func(addr swarm.Address) int {
- po := swarm.Proximity(baseAddress.Bytes(), addr.Bytes())
- if _, ok := binIDs[po]; !ok {
- binIDs[po] = 1
- return 1
- }
- binIDs[po]++
- return binIDs[po]
- }
-
- err := os.Mkdir(filepath.Join(dataPath, "sharky"), 0777)
- if err != nil {
- t.Fatal(err)
- }
-
- sharkyStore, err := sharky.New(&dirFS{basedir: filepath.Join(dataPath, "sharky")}, 2, swarm.SocMaxChunkSize)
- if err != nil {
- t.Fatal(err)
- }
- defer sharkyStore.Close()
-
- shedDB, err := shed.NewDB(dataPath, nil)
- if err != nil {
- t.Fatal(err)
- }
- defer shedDB.Close()
-
- pIdx, rIdx, err := storer.InitShedIndexes(shedDB, baseAddress)
- if err != nil {
- t.Fatal(err)
- }
-
- reserveChunks := chunktest.GenerateTestRandomChunks(10)
-
- for _, c := range reserveChunks {
- loc, err := sharkyStore.Write(context.Background(), c.Data())
- if err != nil {
- t.Fatal(err)
- }
-
- locBuf, err := loc.MarshalBinary()
- if err != nil {
- t.Fatal(err)
- }
-
- binID := assignBinID(c.Address())
-
- err = pIdx.Put(shed.Item{
- Address: c.Address().Bytes(),
- BinID: uint64(binID),
- BatchID: c.Stamp().BatchID(),
- })
- if err != nil {
- t.Fatal(err)
- }
-
- err = rIdx.Put(shed.Item{
- Address: c.Address().Bytes(),
- BinID: uint64(binID),
- BatchID: c.Stamp().BatchID(),
- Index: c.Stamp().Index(),
- Timestamp: c.Stamp().Timestamp(),
- Sig: c.Stamp().Sig(),
- Location: locBuf,
- })
-
- if err != nil {
- t.Fatal(err)
- }
- }
-
- // create a pinning collection
- writer := splitter.NewSimpleSplitter(
- storage.PutterFunc(
- func(ctx context.Context, chunk swarm.Chunk) error {
- c := chunk.WithStamp(postagetesting.MustNewStamp())
-
- loc, err := sharkyStore.Write(context.Background(), c.Data())
- if err != nil {
- return err
- }
-
- locBuf, err := loc.MarshalBinary()
- if err != nil {
- return err
- }
-
- return rIdx.Put(shed.Item{
- Address: c.Address().Bytes(),
- BatchID: c.Stamp().BatchID(),
- Index: c.Stamp().Index(),
- Timestamp: c.Stamp().Timestamp(),
- Sig: c.Stamp().Sig(),
- Location: locBuf,
- })
- },
- ),
- )
-
- randData := make([]byte, 4096*20)
- _, err = rand.Read(randData)
- if err != nil {
- t.Fatal(err)
- }
-
- root, err := writer.Split(context.Background(), io.NopCloser(bytes.NewBuffer(randData)), 4096*20, false)
- if err != nil {
- t.Fatal(err)
- }
-
- err = stateStore.Put(fmt.Sprintf("root-pin-%s", root.String()), root)
- if err != nil {
- t.Fatal(err)
- }
-}
-
-type testSharkyRecovery struct {
- *sharky.Recovery
- mtx sync.Mutex
- addCalls int
-}
-
-func (t *testSharkyRecovery) Add(loc sharky.Location) error {
- t.mtx.Lock()
- t.addCalls++
- t.mtx.Unlock()
- return t.Recovery.Add(loc)
-}
-
-type testReservePutter struct {
- mtx sync.Mutex
- size int
- calls int
-}
-
-func (t *testReservePutter) Put(ctx context.Context, st internal.Storage, ch swarm.Chunk) (bool, error) {
- t.mtx.Lock()
- t.calls++
- t.mtx.Unlock()
- return true, st.ChunkStore().Put(ctx, ch)
-}
-
-func (t *testReservePutter) AddSize(size int) {
- t.mtx.Lock()
- t.size += size
- t.mtx.Unlock()
-}
-
-func (t *testReservePutter) Size() int {
- t.mtx.Lock()
- defer t.mtx.Unlock()
- return t.size
-}
-
-// TestEpochMigration_FLAKY is flaky on windows.
-func TestEpochMigration_FLAKY(t *testing.T) {
- t.Parallel()
-
- var (
- dataPath = t.TempDir()
- baseAddress = swarm.RandAddress(t)
- stateStore = mockstatestore.NewStateStore()
- reserve = &testReservePutter{}
- logBytes = bytes.NewBuffer(nil)
- logger = log.NewLogger("test", log.WithSink(logBytes))
- indexStore = inmemstore.New()
- )
-
- createOldDataDir(t, dataPath, baseAddress, stateStore)
-
- r, err := sharky.NewRecovery(path.Join(dataPath, "sharky"), 2, swarm.SocMaxChunkSize)
- if err != nil {
- t.Fatal(err)
- }
-
- sharkyRecovery := &testSharkyRecovery{Recovery: r}
-
- err = storer.EpochMigration(
- context.Background(),
- dataPath,
- stateStore,
- indexStore,
- reserve,
- sharkyRecovery,
- logger,
- )
- if err != nil {
- t.Fatal(err)
- }
-
- if !strings.Contains(logBytes.String(), "migrating pinning collections done") {
- t.Fatalf("expected log to contain 'migrating pinning collections done', got %s", logBytes.String())
- }
-
- if !strings.Contains(logBytes.String(), "migrating reserve contents done") {
- t.Fatalf("expected log to contain 'migrating pinning collections done', got %s", logBytes.String())
- }
-
- if sharkyRecovery.addCalls != 31 {
- t.Fatalf("expected 31 add calls, got %d", sharkyRecovery.addCalls)
- }
-
- if reserve.calls != 10 {
- t.Fatalf("expected 10 reserve calls, got %d", reserve.calls)
- }
-
- if reserve.size != 10 {
- t.Fatalf("expected 10 reserve size, got %d", reserve.size)
- }
-
- pins, err := pinstore.Pins(indexStore)
- if err != nil {
- t.Fatal(err)
- }
-
- if len(pins) != 1 {
- t.Fatalf("expected 1 pin, got %d", len(pins))
- }
-
- if !strings.Contains(logBytes.String(), pins[0].String()) {
- t.Fatalf("expected log to contain root pin reference, got %s", logBytes.String())
- }
-}
-
-func TestEpochMigrationLightNode(t *testing.T) {
- t.Parallel()
-
- var (
- dataPath = t.TempDir()
- baseAddress = swarm.RandAddress(t)
- stateStore = mockstatestore.NewStateStore()
- reserve storer.ReservePutter
- logBytes = bytes.NewBuffer(nil)
- logger = log.NewLogger("test", log.WithSink(logBytes))
- indexStore = inmemstore.New()
- )
-
- createOldDataDir(t, dataPath, baseAddress, stateStore)
-
- r, err := sharky.NewRecovery(path.Join(dataPath, "sharky"), 2, swarm.SocMaxChunkSize)
- if err != nil {
- t.Fatal(err)
- }
-
- sharkyRecovery := &testSharkyRecovery{Recovery: r}
-
- err = storer.EpochMigration(
- context.Background(),
- dataPath,
- stateStore,
- indexStore,
- reserve,
- sharkyRecovery,
- logger,
- )
- if err != nil {
- t.Fatal(err)
- }
-
- if !strings.Contains(logBytes.String(), "migrating pinning collections done") {
- t.Fatalf("expected log to contain 'migrating pinning collections done', got %s", logBytes.String())
- }
-
- if strings.Contains(logBytes.String(), "migrating reserve contents done") {
- t.Fatalf("expected log to not contain 'migrating reserve contents done', got %s", logBytes.String())
- }
-
- if sharkyRecovery.addCalls != 21 {
- t.Fatalf("expected 31 add calls, got %d", sharkyRecovery.addCalls)
- }
-
- pins, err := pinstore.Pins(indexStore)
- if err != nil {
- t.Fatal(err)
- }
-
- if len(pins) != 1 {
- t.Fatalf("expected 1 pin, got %d", len(pins))
- }
-
- if !strings.ContainsAny(logBytes.String(), pins[0].String()) {
- t.Fatalf("expected log to contain root pin reference, got %s", logBytes.String())
- }
-}
diff --git a/pkg/storer/export_test.go b/pkg/storer/export_test.go
index 7bdc2198e10..0613e467831 100644
--- a/pkg/storer/export_test.go
+++ b/pkg/storer/export_test.go
@@ -12,15 +12,6 @@ import (
"github.com/ethersphere/bee/pkg/storer/internal/reserve"
)
-var (
- InitShedIndexes = initShedIndexes
- EpochMigration = epochMigration
-)
-
-type (
- ReservePutter = reservePutter
-)
-
func (db *DB) Reserve() *reserve.Reserve {
return db.reserve
}
diff --git a/pkg/storer/storer.go b/pkg/storer/storer.go
index e7d973ea0b7..3ebb3a025d7 100644
--- a/pkg/storer/storer.go
+++ b/pkg/storer/storer.go
@@ -382,57 +382,6 @@ func initCache(ctx context.Context, capacity uint64, repo storage.Repository) (*
return c, commit()
}
-type noopRadiusSetter struct{}
-
-func (noopRadiusSetter) SetStorageRadius(uint8) {}
-
-func performEpochMigration(ctx context.Context, basePath string, opts *Options) (retErr error) {
- store, err := initStore(basePath, opts)
- if err != nil {
- return err
- }
- defer store.Close()
-
- sharkyBasePath := path.Join(basePath, sharkyPath)
- var sharkyRecover *sharky.Recovery
- // if this is a fresh node then perform an empty epoch migration
- if _, err := os.Stat(sharkyBasePath); err == nil {
- sharkyRecover, err = sharky.NewRecovery(sharkyBasePath, sharkyNoOfShards, swarm.SocMaxChunkSize)
- if err != nil {
- return err
- }
- defer sharkyRecover.Close()
- }
-
- logger := opts.Logger.WithName("epochmigration").Register()
-
- var rs reservePutter
-
- if opts.ReserveCapacity > 0 {
- rs, err = reserve.New(
- opts.Address,
- store,
- opts.ReserveCapacity,
- noopRadiusSetter{},
- logger,
- func(_ context.Context, _ internal.Storage, _ ...swarm.Address) error {
- return nil
- },
- )
- if err != nil {
- return err
- }
- }
-
- defer func() {
- if sharkyRecover != nil {
- retErr = errors.Join(retErr, sharkyRecover.Save())
- }
- }()
-
- return epochMigration(ctx, basePath, opts.StateStore, store, rs, sharkyRecover, logger)
-}
-
const lockKeyNewSession string = "new_session"
// Options provides a container to configure different things in the storer.
@@ -548,14 +497,6 @@ func New(ctx context.Context, dirPath string, opts *Options) (*DB, error) {
return nil, err
}
} else {
- // only perform migration if not done already
- if _, err := os.Stat(path.Join(dirPath, indexPath)); err != nil {
- err = performEpochMigration(ctx, dirPath, opts)
- if err != nil {
- return nil, err
- }
- }
-
repo, dbCloser, err = initDiskRepository(ctx, dirPath, locker, opts)
if err != nil {
return nil, err