diff --git a/pkg/pullsync/metrics.go b/pkg/pullsync/metrics.go index fd7aa8fb921..6097b946334 100644 --- a/pkg/pullsync/metrics.go +++ b/pkg/pullsync/metrics.go @@ -10,12 +10,14 @@ import ( ) type metrics struct { - Offered prometheus.Counter // number of chunks offered - Wanted prometheus.Counter // number of chunks wanted - Delivered prometheus.Counter // number of chunk deliveries - Sent prometheus.Counter // number of chunks sent - DuplicateRuid prometheus.Counter // number of duplicate RUID requests we got - LastReceived *prometheus.CounterVec // last timestamp of the received chunks per bin + Offered prometheus.Counter // number of chunks offered + Wanted prometheus.Counter // number of chunks wanted + MissingChunks prometheus.Counter // number of reserve get errs + ReceivedZeroAddress prometheus.Counter // number of delivered chunks with invalid address + Delivered prometheus.Counter // number of chunk deliveries + Sent prometheus.Counter // number of chunks sent + DuplicateRuid prometheus.Counter // number of duplicate RUID requests we got + LastReceived *prometheus.CounterVec // last timestamp of the received chunks per bin } func newMetrics() metrics { @@ -34,6 +36,18 @@ func newMetrics() metrics { Name: "chunks_wanted", Help: "Total chunks wanted.", }), + MissingChunks: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "missing_chunks", + Help: "Total reserve get errors.", + }), + ReceivedZeroAddress: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "received_zero_address", + Help: "Total chunks delivered with zero address and no chunk data.", + }), Delivered: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: m.Namespace, Subsystem: subsystem, diff --git a/pkg/pullsync/pullsync.go b/pkg/pullsync/pullsync.go index 6c7b2403f2c..ba14a07b998 100644 --- a/pkg/pullsync/pullsync.go +++ b/pkg/pullsync/pullsync.go @@ -215,6 +215,12 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start } addr := swarm.NewAddress(delivery.Address) + if addr.Equal(swarm.ZeroAddress) { + s.logger.Debug("received zero address chunk", "peer_address", peer) + s.metrics.ReceivedZeroAddress.Inc() + continue + } + newChunk := swarm.NewChunk(addr, delivery.Data) stamp := new(postage.Stamp) @@ -343,18 +349,19 @@ func (s *Syncer) handler(streamCtx context.Context, p p2p.Peer, stream p2p.Strea chs, err := s.processWant(ctx, offer, &want) if err != nil { - if errors.Is(err, storage.ErrNotFound) { - s.intervalsSF.Forget(sfKey(uint8(rn.Bin), rn.Start)) - } return fmt.Errorf("process want: %w", err) } - for _, v := range chs { - stamp, err := v.Stamp().MarshalBinary() - if err != nil { - return fmt.Errorf("serialise stamp: %w", err) + for _, c := range chs { + var stamp []byte + if c.Stamp() != nil { + stamp, err = c.Stamp().MarshalBinary() + if err != nil { + return fmt.Errorf("serialise stamp: %w", err) + } } - deliver := pb.Delivery{Address: v.Address().Bytes(), Data: v.Data(), Stamp: stamp} + + deliver := pb.Delivery{Address: c.Address().Bytes(), Data: c.Data(), Stamp: stamp} if err := w.WriteMsgWithContext(ctx, &deliver); err != nil { return fmt.Errorf("write delivery: %w", err) } @@ -467,9 +474,13 @@ func (s *Syncer) processWant(ctx context.Context, o *pb.Offer, w *pb.Want) ([]sw for i := 0; i < len(o.Chunks); i++ { if bv.Get(i) { ch := o.Chunks[i] - c, err := s.store.ReserveGet(ctx, swarm.NewAddress(ch.Address), ch.BatchID) + addr := swarm.NewAddress(ch.Address) + c, err := s.store.ReserveGet(ctx, addr, ch.BatchID) if err != nil { - return nil, err + s.logger.Debug("processing want: unable to find chunk", "chunk_address", addr, "batch_id", ch.BatchID) + chunks = append(chunks, swarm.NewChunk(swarm.ZeroAddress, nil)) + s.metrics.MissingChunks.Inc() + continue } chunks = append(chunks, c) } diff --git a/pkg/pullsync/pullsync_test.go b/pkg/pullsync/pullsync_test.go index 2488ce72cce..70f958e63dd 100644 --- a/pkg/pullsync/pullsync_test.go +++ b/pkg/pullsync/pullsync_test.go @@ -228,6 +228,30 @@ func TestIncoming_UnsolicitedChunk(t *testing.T) { } } +func TestMissingChunk(t *testing.T) { + t.Parallel() + + var ( + zeroChunk = swarm.NewChunk(swarm.ZeroAddress, nil) + topMost = uint64(4) + ps, _ = newPullSync(t, nil, 5, mock.WithSubscribeResp(results, nil), mock.WithChunks([]swarm.Chunk{zeroChunk}...)) + recorder = streamtest.New(streamtest.WithProtocols(ps.Protocol())) + psClient, _ = newPullSync(t, recorder, 0) + ) + + topmost, count, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 0, 0) + if err != nil { + t.Fatal(err) + } + + if topmost != topMost { + t.Fatalf("got offer topmost %d but want %d", topmost, topMost) + } + if count != 0 { + t.Fatalf("got count %d but want %d", count, 0) + } +} + func TestGetCursors(t *testing.T) { t.Parallel() diff --git a/pkg/storer/mock/mockreserve.go b/pkg/storer/mock/mockreserve.go index 8825e8907f0..d6d70390242 100644 --- a/pkg/storer/mock/mockreserve.go +++ b/pkg/storer/mock/mockreserve.go @@ -34,7 +34,11 @@ func WithChunks(chs ...swarm.Chunk) Option { return optionFunc(func(p *ReserveStore) { for _, c := range chs { c := c - p.chunks[c.Address().String()+string(c.Stamp().BatchID())] = c + if c.Stamp() != nil { + p.chunks[c.Address().String()+string(c.Stamp().BatchID())] = c + } else { + p.chunks[c.Address().String()] = c + } } }) }