Skip to content

Commit

Permalink
feat: add logs to the txstore and txchunkstore recovery method (#4387)
Browse files Browse the repository at this point in the history
  • Loading branch information
mrekucci authored Oct 10, 2023
1 parent 9d14736 commit 414fc3b
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 0 deletions.
13 changes: 13 additions & 0 deletions pkg/storage/leveldbstore/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package leveldbstore
import (
"fmt"

"github.com/ethersphere/bee/pkg/log"
"github.com/ethersphere/bee/pkg/storage"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"
Expand Down Expand Up @@ -35,13 +36,18 @@ func (p *pendingTx) Unmarshal(bytes []byte) error {
// Recover attempts to recover from a previous
// crash by reverting all uncommitted transactions.
func (s *TxStore) Recover() error {
logger := log.NewLogger("node").WithName("tx_store_recovery").Register() // "node" - copies the node.LoggerName in order to avoid circular import.

batch := new(leveldb.Batch)

logger.Info("checking for uncommitted transactions")
err := s.Iterate(storage.Query{
Factory: func() storage.Item { return new(pendingTx) },
ItemProperty: storage.QueryItem,
}, func(r storage.Result) (bool, error) {
logger.Info("uncommitted transaction found", "id", r.ID)
if err := r.Entry.(*pendingTx).val.Replay(batch); err != nil {
logger.Debug("unable to replay uncommitted transaction", "id", r.ID, "err", err)
return true, fmt.Errorf("unable to replay batch for %s: %w", r.ID, err)
}
batch.Delete(id(r.ID))
Expand All @@ -51,8 +57,15 @@ func (s *TxStore) Recover() error {
return fmt.Errorf("leveldbstore: recovery: iteration failed: %w", err)
}

if batch.Len() == 0 {
logger.Info("no uncommitted transactions found")
return nil
}

logger.Info("reversing uncommitted transactions", "count", batch.Len())
if err := s.BatchedStore.(*Store).db.Write(batch, &opt.WriteOptions{Sync: true}); err != nil {
return fmt.Errorf("leveldbstore: recovery: unable to write batch: %w", err)
}
logger.Info("recovery successful")
return nil
}
22 changes: 22 additions & 0 deletions pkg/storer/internal/chunkstore/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"fmt"

"github.com/ethersphere/bee/pkg/log"
"github.com/ethersphere/bee/pkg/sharky"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/storageutil"
Expand Down Expand Up @@ -63,15 +64,23 @@ func (p *pendingTx) String() string {
// Recover attempts to recover from a previous crash
// by reverting all uncommitted transactions.
func (cs *TxChunkStoreWrapper) Recover() error {
logger := log.NewLogger("node").WithName("tx_chunkstore_recovery").Register() // "node" - copies the node.LoggerName in order to avoid circular import.

if rr, ok := cs.txStore.(storage.Recoverer); ok {
if err := rr.Recover(); err != nil {
return fmt.Errorf("chunkstore: recovery: %w", err)
}
}

var found bool

logger.Info("checking for uncommitted transactions")
err := cs.txStore.Iterate(storage.Query{
Factory: func() storage.Item { return new(pendingTx) },
ItemProperty: storage.QueryItem,
}, func(r storage.Result) (bool, error) {
found = true

item := r.Entry.(*pendingTx)
item.key = r.ID

Expand All @@ -81,21 +90,34 @@ func (cs *TxChunkStoreWrapper) Recover() error {
}

ctx := context.Background()
logger.Info("sharky unreleased location found", "count", len(locations), "id", r.ID)
for _, location := range locations {
logger.Debug("releasing location", "location", location)
if err := cs.txSharky.Sharky.Release(ctx, location); err != nil {
logger.Debug("unable to release location", "location", location, "err", err)
return true, fmt.Errorf("unable to release location %v for %s: %w", location, r.ID, err)
}
}
logger.Info("sharky unreleased location released", "id", r.ID)

logger.Info("cleaning uncommitted transaction log", "id", r.ID)
if err := cs.txStore.Delete(r.Entry); err != nil {
logger.Debug("unable to delete unreleased location", "id", r.ID, "err", err)
return true, fmt.Errorf("unable to delete %s: %w", r.ID, err)
}
logger.Info("uncommitted transaction log cleaned", "id", r.ID)

return false, nil
})
if err != nil {
return fmt.Errorf("chunkstore: recovery: iteration failed: %w", err)
}

if found {
logger.Info("recovery successful")
} else {
logger.Info("no uncommitted transactions found")
}

return nil
}

0 comments on commit 414fc3b

Please sign in to comment.