From a0944f22fa7e835d3ca98b26ca0c2190c7ee8cb6 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Sat, 23 Sep 2023 00:06:46 +0300 Subject: [PATCH 1/5] fix(pullsync): swallow and log process want reserve get errs --- pkg/pullsync/pullsync.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/pkg/pullsync/pullsync.go b/pkg/pullsync/pullsync.go index 6c7b2403f2c..56f2c684038 100644 --- a/pkg/pullsync/pullsync.go +++ b/pkg/pullsync/pullsync.go @@ -215,6 +215,11 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start } addr := swarm.NewAddress(delivery.Address) + if addr.Equal(swarm.ZeroAddress) { + s.logger.Debug("received zero address chunk", "peer_address", peer) + continue + } + newChunk := swarm.NewChunk(addr, delivery.Data) stamp := new(postage.Stamp) @@ -343,9 +348,6 @@ func (s *Syncer) handler(streamCtx context.Context, p p2p.Peer, stream p2p.Strea chs, err := s.processWant(ctx, offer, &want) if err != nil { - if errors.Is(err, storage.ErrNotFound) { - s.intervalsSF.Forget(sfKey(uint8(rn.Bin), rn.Start)) - } return fmt.Errorf("process want: %w", err) } @@ -467,9 +469,12 @@ func (s *Syncer) processWant(ctx context.Context, o *pb.Offer, w *pb.Want) ([]sw for i := 0; i < len(o.Chunks); i++ { if bv.Get(i) { ch := o.Chunks[i] + addr := swarm.NewAddress(ch.Address) c, err := s.store.ReserveGet(ctx, swarm.NewAddress(ch.Address), ch.BatchID) if err != nil { - return nil, err + s.logger.Error(err, "processing want: unable to find chunk", "chunk_address", addr, "batch_id", ch.BatchID) + chunks = append(chunks, swarm.NewChunk(swarm.ZeroAddress, nil)) + continue } chunks = append(chunks, c) } From 71efe2978490729dda62fb6bbcdf4db166a5da5b Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Sat, 23 Sep 2023 00:08:56 +0300 Subject: [PATCH 2/5] fix: var --- pkg/pullsync/pullsync.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/pullsync/pullsync.go b/pkg/pullsync/pullsync.go index 56f2c684038..e3e2cba217b 100644 --- a/pkg/pullsync/pullsync.go +++ b/pkg/pullsync/pullsync.go @@ -470,7 +470,7 @@ func (s *Syncer) processWant(ctx context.Context, o *pb.Offer, w *pb.Want) ([]sw if bv.Get(i) { ch := o.Chunks[i] addr := swarm.NewAddress(ch.Address) - c, err := s.store.ReserveGet(ctx, swarm.NewAddress(ch.Address), ch.BatchID) + c, err := s.store.ReserveGet(ctx, addr, ch.BatchID) if err != nil { s.logger.Error(err, "processing want: unable to find chunk", "chunk_address", addr, "batch_id", ch.BatchID) chunks = append(chunks, swarm.NewChunk(swarm.ZeroAddress, nil)) From 78e8e179c502e32f1abcb5d8e3b869a47037fc45 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Sat, 23 Sep 2023 12:28:48 +0300 Subject: [PATCH 3/5] fix: metrics --- pkg/pullsync/metrics.go | 7 +++++++ pkg/pullsync/pullsync.go | 1 + 2 files changed, 8 insertions(+) diff --git a/pkg/pullsync/metrics.go b/pkg/pullsync/metrics.go index fd7aa8fb921..45511d1bf37 100644 --- a/pkg/pullsync/metrics.go +++ b/pkg/pullsync/metrics.go @@ -12,6 +12,7 @@ import ( type metrics struct { Offered prometheus.Counter // number of chunks offered Wanted prometheus.Counter // number of chunks wanted + MissingChunks prometheus.Counter // number of reserve get errs Delivered prometheus.Counter // number of chunk deliveries Sent prometheus.Counter // number of chunks sent DuplicateRuid prometheus.Counter // number of duplicate RUID requests we got @@ -34,6 +35,12 @@ func newMetrics() metrics { Name: "chunks_wanted", Help: "Total chunks wanted.", }), + MissingChunks: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "missing_chunks", + Help: "Total reserve get errors.", + }), Delivered: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: m.Namespace, Subsystem: subsystem, diff --git a/pkg/pullsync/pullsync.go b/pkg/pullsync/pullsync.go index e3e2cba217b..efefe984e0f 100644 --- a/pkg/pullsync/pullsync.go +++ b/pkg/pullsync/pullsync.go @@ -474,6 +474,7 @@ func (s *Syncer) processWant(ctx context.Context, o *pb.Offer, w *pb.Want) ([]sw if err != nil { s.logger.Error(err, "processing want: unable to find chunk", "chunk_address", addr, "batch_id", ch.BatchID) chunks = append(chunks, swarm.NewChunk(swarm.ZeroAddress, nil)) + s.metrics.MissingChunks.Inc() continue } chunks = append(chunks, c) From 947884d82a96fb78d8477368d63eb42f04a0ddd3 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Sat, 23 Sep 2023 13:41:59 +0300 Subject: [PATCH 4/5] fix: unit test --- pkg/pullsync/pullsync.go | 14 +++++++++----- pkg/pullsync/pullsync_test.go | 21 +++++++++++++++++++++ pkg/storer/mock/mockreserve.go | 6 +++++- 3 files changed, 35 insertions(+), 6 deletions(-) diff --git a/pkg/pullsync/pullsync.go b/pkg/pullsync/pullsync.go index efefe984e0f..17a5b9e7ec9 100644 --- a/pkg/pullsync/pullsync.go +++ b/pkg/pullsync/pullsync.go @@ -351,12 +351,16 @@ func (s *Syncer) handler(streamCtx context.Context, p p2p.Peer, stream p2p.Strea return fmt.Errorf("process want: %w", err) } - for _, v := range chs { - stamp, err := v.Stamp().MarshalBinary() - if err != nil { - return fmt.Errorf("serialise stamp: %w", err) + for _, c := range chs { + var stamp []byte + if c.Stamp() != nil { + stamp, err = c.Stamp().MarshalBinary() + if err != nil { + return fmt.Errorf("serialise stamp: %w", err) + } } - deliver := pb.Delivery{Address: v.Address().Bytes(), Data: v.Data(), Stamp: stamp} + + deliver := pb.Delivery{Address: c.Address().Bytes(), Data: c.Data(), Stamp: stamp} if err := w.WriteMsgWithContext(ctx, &deliver); err != nil { return fmt.Errorf("write delivery: %w", err) } diff --git a/pkg/pullsync/pullsync_test.go b/pkg/pullsync/pullsync_test.go index 2488ce72cce..e4e4c716bec 100644 --- a/pkg/pullsync/pullsync_test.go +++ b/pkg/pullsync/pullsync_test.go @@ -228,6 +228,27 @@ func TestIncoming_UnsolicitedChunk(t *testing.T) { } } +func TestMissingChunk(t *testing.T) { + t.Parallel() + + var ( + zeroChunk = swarm.NewChunk(swarm.ZeroAddress, nil) + topMost = uint64(4) + ps, _ = newPullSync(t, nil, 5, mock.WithSubscribeResp(results, nil), mock.WithChunks([]swarm.Chunk{zeroChunk}...)) + recorder = streamtest.New(streamtest.WithProtocols(ps.Protocol())) + psClient, _ = newPullSync(t, recorder, 0) + ) + + topmost, _, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 0, 0) + if err != nil { + t.Fatal(err) + } + + if topmost != topMost { + t.Fatalf("got offer topmost %d but want %d", topmost, topMost) + } +} + func TestGetCursors(t *testing.T) { t.Parallel() diff --git a/pkg/storer/mock/mockreserve.go b/pkg/storer/mock/mockreserve.go index 8825e8907f0..d6d70390242 100644 --- a/pkg/storer/mock/mockreserve.go +++ b/pkg/storer/mock/mockreserve.go @@ -34,7 +34,11 @@ func WithChunks(chs ...swarm.Chunk) Option { return optionFunc(func(p *ReserveStore) { for _, c := range chs { c := c - p.chunks[c.Address().String()+string(c.Stamp().BatchID())] = c + if c.Stamp() != nil { + p.chunks[c.Address().String()+string(c.Stamp().BatchID())] = c + } else { + p.chunks[c.Address().String()] = c + } } }) } From 0c159ab65890da25d398cf254b7e82ba932a9d39 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Mon, 25 Sep 2023 14:44:35 +0300 Subject: [PATCH 5/5] fix: metrics --- pkg/pullsync/metrics.go | 21 ++++++++++++++------- pkg/pullsync/pullsync.go | 3 ++- pkg/pullsync/pullsync_test.go | 5 ++++- 3 files changed, 20 insertions(+), 9 deletions(-) diff --git a/pkg/pullsync/metrics.go b/pkg/pullsync/metrics.go index 45511d1bf37..6097b946334 100644 --- a/pkg/pullsync/metrics.go +++ b/pkg/pullsync/metrics.go @@ -10,13 +10,14 @@ import ( ) type metrics struct { - Offered prometheus.Counter // number of chunks offered - Wanted prometheus.Counter // number of chunks wanted - MissingChunks prometheus.Counter // number of reserve get errs - Delivered prometheus.Counter // number of chunk deliveries - Sent prometheus.Counter // number of chunks sent - DuplicateRuid prometheus.Counter // number of duplicate RUID requests we got - LastReceived *prometheus.CounterVec // last timestamp of the received chunks per bin + Offered prometheus.Counter // number of chunks offered + Wanted prometheus.Counter // number of chunks wanted + MissingChunks prometheus.Counter // number of reserve get errs + ReceivedZeroAddress prometheus.Counter // number of delivered chunks with invalid address + Delivered prometheus.Counter // number of chunk deliveries + Sent prometheus.Counter // number of chunks sent + DuplicateRuid prometheus.Counter // number of duplicate RUID requests we got + LastReceived *prometheus.CounterVec // last timestamp of the received chunks per bin } func newMetrics() metrics { @@ -41,6 +42,12 @@ func newMetrics() metrics { Name: "missing_chunks", Help: "Total reserve get errors.", }), + ReceivedZeroAddress: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "received_zero_address", + Help: "Total chunks delivered with zero address and no chunk data.", + }), Delivered: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: m.Namespace, Subsystem: subsystem, diff --git a/pkg/pullsync/pullsync.go b/pkg/pullsync/pullsync.go index 17a5b9e7ec9..ba14a07b998 100644 --- a/pkg/pullsync/pullsync.go +++ b/pkg/pullsync/pullsync.go @@ -217,6 +217,7 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start addr := swarm.NewAddress(delivery.Address) if addr.Equal(swarm.ZeroAddress) { s.logger.Debug("received zero address chunk", "peer_address", peer) + s.metrics.ReceivedZeroAddress.Inc() continue } @@ -476,7 +477,7 @@ func (s *Syncer) processWant(ctx context.Context, o *pb.Offer, w *pb.Want) ([]sw addr := swarm.NewAddress(ch.Address) c, err := s.store.ReserveGet(ctx, addr, ch.BatchID) if err != nil { - s.logger.Error(err, "processing want: unable to find chunk", "chunk_address", addr, "batch_id", ch.BatchID) + s.logger.Debug("processing want: unable to find chunk", "chunk_address", addr, "batch_id", ch.BatchID) chunks = append(chunks, swarm.NewChunk(swarm.ZeroAddress, nil)) s.metrics.MissingChunks.Inc() continue diff --git a/pkg/pullsync/pullsync_test.go b/pkg/pullsync/pullsync_test.go index e4e4c716bec..70f958e63dd 100644 --- a/pkg/pullsync/pullsync_test.go +++ b/pkg/pullsync/pullsync_test.go @@ -239,7 +239,7 @@ func TestMissingChunk(t *testing.T) { psClient, _ = newPullSync(t, recorder, 0) ) - topmost, _, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 0, 0) + topmost, count, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 0, 0) if err != nil { t.Fatal(err) } @@ -247,6 +247,9 @@ func TestMissingChunk(t *testing.T) { if topmost != topMost { t.Fatalf("got offer topmost %d but want %d", topmost, topMost) } + if count != 0 { + t.Fatalf("got count %d but want %d", count, 0) + } } func TestGetCursors(t *testing.T) {