From 750187793ed691a43a0cb93fa8e8aabb557827f8 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Tue, 24 Oct 2023 21:56:26 +0300 Subject: [PATCH] fix: make cache ops independent of parent context (#4423) --- pkg/retrieval/retrieval.go | 10 +++++----- pkg/storer/compact_test.go | 2 ++ pkg/storer/internal/cache/cache.go | 11 ++++++++++- pkg/storer/internal/reserve/reserve.go | 19 ++++++++++--------- pkg/storer/internal/reserve/reserve_test.go | 3 +++ 5 files changed, 30 insertions(+), 15 deletions(-) diff --git a/pkg/retrieval/retrieval.go b/pkg/retrieval/retrieval.go index 033805ee312..22564ff5745 100644 --- a/pkg/retrieval/retrieval.go +++ b/pkg/retrieval/retrieval.go @@ -418,8 +418,8 @@ func (s *Service) closestPeer(addr swarm.Address, skipPeers []swarm.Address, all return closest, nil } -func (s *Service) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (err error) { - ctx, cancel := context.WithTimeout(ctx, retrieveChunkTimeout) +func (s *Service) handler(p2pctx context.Context, p p2p.Peer, stream p2p.Stream) (err error) { + ctx, cancel := context.WithTimeout(p2pctx, retrieveChunkTimeout) defer cancel() w, r := protobuf.NewWriterAndReader(stream) @@ -486,11 +486,11 @@ func (s *Service) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (e // cache the request last, so that putting to the localstore does not slow down the request flow if s.caching && forwarded { - err = s.storer.Cache().Put(ctx, chunk) - if err != nil { - return fmt.Errorf("retrieve cache put: %w", err) + if err := s.storer.Cache().Put(p2pctx, chunk); err != nil { + s.logger.Debug("retrieve cache put", "error", err) } } + return nil } diff --git a/pkg/storer/compact_test.go b/pkg/storer/compact_test.go index 8737654547a..eb41c13a24f 100644 --- a/pkg/storer/compact_test.go +++ b/pkg/storer/compact_test.go @@ -64,6 +64,8 @@ func TestCompact(t *testing.T) { t.Cleanup(unsub) <-c + time.Sleep(time.Second) + if err := st.Close(); err != nil { t.Fatal(err) } diff --git a/pkg/storer/internal/cache/cache.go b/pkg/storer/internal/cache/cache.go index c2bd9eb7b6e..53e40d6d821 100644 --- a/pkg/storer/internal/cache/cache.go +++ b/pkg/storer/internal/cache/cache.go @@ -376,7 +376,16 @@ func (c *Cache) MoveFromReserve( ctx context.Context, store internal.Storage, addrs ...swarm.Address, -) error { +) (err error) { + + defer func() { + if err != nil { + for _, addr := range addrs { + err = errors.Join(store.ChunkStore().Delete(context.Background(), addr)) + } + } + }() + batch, err := store.IndexStore().Batch(ctx) if err != nil { return fmt.Errorf("failed creating batch: %w", err) diff --git a/pkg/storer/internal/reserve/reserve.go b/pkg/storer/internal/reserve/reserve.go index 6931d51f4dd..4e04df245ae 100644 --- a/pkg/storer/internal/reserve/reserve.go +++ b/pkg/storer/internal/reserve/reserve.go @@ -363,15 +363,16 @@ func (r *Reserve) EvictBatchBin( if err := batch.Commit(); err != nil { return err } - if err := r.cacheCb(ctx, store, moveToCache...); err != nil { - r.logger.Error(err, "evict and move to cache") - for _, rItem := range moveToCache { - err = store.ChunkStore().Delete(ctx, rItem) - if err != nil { - return err + + go func(addrs []swarm.Address) { + _ = txExecutor.Execute(ctx, func(store internal.Storage) error { + if err := r.cacheCb(ctx, store, addrs...); err != nil { + r.logger.Error(err, "evict and move to cache") } - } - } + return nil + }) + }(moveToCache) + return nil }) if err != nil { @@ -405,7 +406,7 @@ func (r *Reserve) DeleteChunk( } if err := r.cacheCb(ctx, store, item.Address); err != nil { r.logger.Error(err, "delete and move to cache") - return store.ChunkStore().Delete(ctx, item.Address) + return err } return nil } diff --git a/pkg/storer/internal/reserve/reserve_test.go b/pkg/storer/internal/reserve/reserve_test.go index 457493d65c4..596552ad4dc 100644 --- a/pkg/storer/internal/reserve/reserve_test.go +++ b/pkg/storer/internal/reserve/reserve_test.go @@ -10,6 +10,7 @@ import ( "errors" "math/rand" "testing" + "time" "github.com/ethersphere/bee/pkg/log" "github.com/ethersphere/bee/pkg/postage" @@ -275,6 +276,8 @@ func TestEvict(t *testing.T) { t.Fatalf("got %d, want %d", totalEvicted, chunksPerBatch) } + time.Sleep(time.Second) + for i, ch := range chunks { binID := i%chunksPerBatch + 1 b := swarm.Proximity(baseAddr.Bytes(), ch.Address().Bytes())