From 414fc3b9d19b1d53b26175c3d7f8bde3e1ffc4ce Mon Sep 17 00:00:00 2001 From: mrekucci Date: Tue, 10 Oct 2023 19:26:52 +0400 Subject: [PATCH] feat: add logs to the txstore and txchunkstore recovery method (#4387) --- pkg/storage/leveldbstore/recovery.go | 13 +++++++++++++ pkg/storer/internal/chunkstore/recovery.go | 22 ++++++++++++++++++++++ 2 files changed, 35 insertions(+) diff --git a/pkg/storage/leveldbstore/recovery.go b/pkg/storage/leveldbstore/recovery.go index b99ecf2df6a..f52701275f0 100644 --- a/pkg/storage/leveldbstore/recovery.go +++ b/pkg/storage/leveldbstore/recovery.go @@ -7,6 +7,7 @@ package leveldbstore import ( "fmt" + "github.com/ethersphere/bee/pkg/log" "github.com/ethersphere/bee/pkg/storage" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/opt" @@ -35,13 +36,18 @@ func (p *pendingTx) Unmarshal(bytes []byte) error { // Recover attempts to recover from a previous // crash by reverting all uncommitted transactions. func (s *TxStore) Recover() error { + logger := log.NewLogger("node").WithName("tx_store_recovery").Register() // "node" - copies the node.LoggerName in order to avoid circular import. + batch := new(leveldb.Batch) + logger.Info("checking for uncommitted transactions") err := s.Iterate(storage.Query{ Factory: func() storage.Item { return new(pendingTx) }, ItemProperty: storage.QueryItem, }, func(r storage.Result) (bool, error) { + logger.Info("uncommitted transaction found", "id", r.ID) if err := r.Entry.(*pendingTx).val.Replay(batch); err != nil { + logger.Debug("unable to replay uncommitted transaction", "id", r.ID, "err", err) return true, fmt.Errorf("unable to replay batch for %s: %w", r.ID, err) } batch.Delete(id(r.ID)) @@ -51,8 +57,15 @@ func (s *TxStore) Recover() error { return fmt.Errorf("leveldbstore: recovery: iteration failed: %w", err) } + if batch.Len() == 0 { + logger.Info("no uncommitted transactions found") + return nil + } + + logger.Info("reversing uncommitted transactions", "count", batch.Len()) if err := s.BatchedStore.(*Store).db.Write(batch, &opt.WriteOptions{Sync: true}); err != nil { return fmt.Errorf("leveldbstore: recovery: unable to write batch: %w", err) } + logger.Info("recovery successful") return nil } diff --git a/pkg/storer/internal/chunkstore/recovery.go b/pkg/storer/internal/chunkstore/recovery.go index 2a948a43ed9..d595f519e52 100644 --- a/pkg/storer/internal/chunkstore/recovery.go +++ b/pkg/storer/internal/chunkstore/recovery.go @@ -8,6 +8,7 @@ import ( "context" "fmt" + "github.com/ethersphere/bee/pkg/log" "github.com/ethersphere/bee/pkg/sharky" "github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage/storageutil" @@ -63,15 +64,23 @@ func (p *pendingTx) String() string { // Recover attempts to recover from a previous crash // by reverting all uncommitted transactions. func (cs *TxChunkStoreWrapper) Recover() error { + logger := log.NewLogger("node").WithName("tx_chunkstore_recovery").Register() // "node" - copies the node.LoggerName in order to avoid circular import. + if rr, ok := cs.txStore.(storage.Recoverer); ok { if err := rr.Recover(); err != nil { return fmt.Errorf("chunkstore: recovery: %w", err) } } + + var found bool + + logger.Info("checking for uncommitted transactions") err := cs.txStore.Iterate(storage.Query{ Factory: func() storage.Item { return new(pendingTx) }, ItemProperty: storage.QueryItem, }, func(r storage.Result) (bool, error) { + found = true + item := r.Entry.(*pendingTx) item.key = r.ID @@ -81,15 +90,22 @@ func (cs *TxChunkStoreWrapper) Recover() error { } ctx := context.Background() + logger.Info("sharky unreleased location found", "count", len(locations), "id", r.ID) for _, location := range locations { + logger.Debug("releasing location", "location", location) if err := cs.txSharky.Sharky.Release(ctx, location); err != nil { + logger.Debug("unable to release location", "location", location, "err", err) return true, fmt.Errorf("unable to release location %v for %s: %w", location, r.ID, err) } } + logger.Info("sharky unreleased location released", "id", r.ID) + logger.Info("cleaning uncommitted transaction log", "id", r.ID) if err := cs.txStore.Delete(r.Entry); err != nil { + logger.Debug("unable to delete unreleased location", "id", r.ID, "err", err) return true, fmt.Errorf("unable to delete %s: %w", r.ID, err) } + logger.Info("uncommitted transaction log cleaned", "id", r.ID) return false, nil }) @@ -97,5 +113,11 @@ func (cs *TxChunkStoreWrapper) Recover() error { return fmt.Errorf("chunkstore: recovery: iteration failed: %w", err) } + if found { + logger.Info("recovery successful") + } else { + logger.Info("no uncommitted transactions found") + } + return nil }