Skip to content

Commit

Permalink
fix: make cache ops independent of parent context (#4423)
Browse files Browse the repository at this point in the history
  • Loading branch information
istae authored Oct 24, 2023
1 parent 06bc50e commit 7501877
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 15 deletions.
10 changes: 5 additions & 5 deletions pkg/retrieval/retrieval.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,8 +418,8 @@ func (s *Service) closestPeer(addr swarm.Address, skipPeers []swarm.Address, all
return closest, nil
}

func (s *Service) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (err error) {
ctx, cancel := context.WithTimeout(ctx, retrieveChunkTimeout)
func (s *Service) handler(p2pctx context.Context, p p2p.Peer, stream p2p.Stream) (err error) {
ctx, cancel := context.WithTimeout(p2pctx, retrieveChunkTimeout)
defer cancel()

w, r := protobuf.NewWriterAndReader(stream)
Expand Down Expand Up @@ -486,11 +486,11 @@ func (s *Service) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (e

// cache the request last, so that putting to the localstore does not slow down the request flow
if s.caching && forwarded {
err = s.storer.Cache().Put(ctx, chunk)
if err != nil {
return fmt.Errorf("retrieve cache put: %w", err)
if err := s.storer.Cache().Put(p2pctx, chunk); err != nil {
s.logger.Debug("retrieve cache put", "error", err)
}
}

return nil
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/storer/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ func TestCompact(t *testing.T) {
t.Cleanup(unsub)
<-c

time.Sleep(time.Second)

if err := st.Close(); err != nil {
t.Fatal(err)
}
Expand Down
11 changes: 10 additions & 1 deletion pkg/storer/internal/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,16 @@ func (c *Cache) MoveFromReserve(
ctx context.Context,
store internal.Storage,
addrs ...swarm.Address,
) error {
) (err error) {

defer func() {
if err != nil {
for _, addr := range addrs {
err = errors.Join(store.ChunkStore().Delete(context.Background(), addr))
}
}
}()

batch, err := store.IndexStore().Batch(ctx)
if err != nil {
return fmt.Errorf("failed creating batch: %w", err)
Expand Down
19 changes: 10 additions & 9 deletions pkg/storer/internal/reserve/reserve.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,15 +363,16 @@ func (r *Reserve) EvictBatchBin(
if err := batch.Commit(); err != nil {
return err
}
if err := r.cacheCb(ctx, store, moveToCache...); err != nil {
r.logger.Error(err, "evict and move to cache")
for _, rItem := range moveToCache {
err = store.ChunkStore().Delete(ctx, rItem)
if err != nil {
return err

go func(addrs []swarm.Address) {
_ = txExecutor.Execute(ctx, func(store internal.Storage) error {
if err := r.cacheCb(ctx, store, addrs...); err != nil {
r.logger.Error(err, "evict and move to cache")
}
}
}
return nil
})
}(moveToCache)

return nil
})
if err != nil {
Expand Down Expand Up @@ -405,7 +406,7 @@ func (r *Reserve) DeleteChunk(
}
if err := r.cacheCb(ctx, store, item.Address); err != nil {
r.logger.Error(err, "delete and move to cache")
return store.ChunkStore().Delete(ctx, item.Address)
return err
}
return nil
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/storer/internal/reserve/reserve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"errors"
"math/rand"
"testing"
"time"

"github.com/ethersphere/bee/pkg/log"
"github.com/ethersphere/bee/pkg/postage"
Expand Down Expand Up @@ -275,6 +276,8 @@ func TestEvict(t *testing.T) {
t.Fatalf("got %d, want %d", totalEvicted, chunksPerBatch)
}

time.Sleep(time.Second)

for i, ch := range chunks {
binID := i%chunksPerBatch + 1
b := swarm.Proximity(baseAddr.Bytes(), ch.Address().Bytes())
Expand Down

0 comments on commit 7501877

Please sign in to comment.