Skip to content

Commit

Permalink
fix(pullsync): swallow and log process want reserve get errs (#4339)
Browse files Browse the repository at this point in the history
  • Loading branch information
istae authored Sep 25, 2023
1 parent 9ce6f3c commit 4ac8d9f
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 17 deletions.
26 changes: 20 additions & 6 deletions pkg/pullsync/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ import (
)

type metrics struct {
Offered prometheus.Counter // number of chunks offered
Wanted prometheus.Counter // number of chunks wanted
Delivered prometheus.Counter // number of chunk deliveries
Sent prometheus.Counter // number of chunks sent
DuplicateRuid prometheus.Counter // number of duplicate RUID requests we got
LastReceived *prometheus.CounterVec // last timestamp of the received chunks per bin
Offered prometheus.Counter // number of chunks offered
Wanted prometheus.Counter // number of chunks wanted
MissingChunks prometheus.Counter // number of reserve get errs
ReceivedZeroAddress prometheus.Counter // number of delivered chunks with invalid address
Delivered prometheus.Counter // number of chunk deliveries
Sent prometheus.Counter // number of chunks sent
DuplicateRuid prometheus.Counter // number of duplicate RUID requests we got
LastReceived *prometheus.CounterVec // last timestamp of the received chunks per bin
}

func newMetrics() metrics {
Expand All @@ -34,6 +36,18 @@ func newMetrics() metrics {
Name: "chunks_wanted",
Help: "Total chunks wanted.",
}),
MissingChunks: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "missing_chunks",
Help: "Total reserve get errors.",
}),
ReceivedZeroAddress: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "received_zero_address",
Help: "Total chunks delivered with zero address and no chunk data.",
}),
Delivered: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Expand Down
31 changes: 21 additions & 10 deletions pkg/pullsync/pullsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,12 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start
}

addr := swarm.NewAddress(delivery.Address)
if addr.Equal(swarm.ZeroAddress) {
s.logger.Debug("received zero address chunk", "peer_address", peer)
s.metrics.ReceivedZeroAddress.Inc()
continue
}

newChunk := swarm.NewChunk(addr, delivery.Data)

stamp := new(postage.Stamp)
Expand Down Expand Up @@ -343,18 +349,19 @@ func (s *Syncer) handler(streamCtx context.Context, p p2p.Peer, stream p2p.Strea

chs, err := s.processWant(ctx, offer, &want)
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
s.intervalsSF.Forget(sfKey(uint8(rn.Bin), rn.Start))
}
return fmt.Errorf("process want: %w", err)
}

for _, v := range chs {
stamp, err := v.Stamp().MarshalBinary()
if err != nil {
return fmt.Errorf("serialise stamp: %w", err)
for _, c := range chs {
var stamp []byte
if c.Stamp() != nil {
stamp, err = c.Stamp().MarshalBinary()
if err != nil {
return fmt.Errorf("serialise stamp: %w", err)
}
}
deliver := pb.Delivery{Address: v.Address().Bytes(), Data: v.Data(), Stamp: stamp}

deliver := pb.Delivery{Address: c.Address().Bytes(), Data: c.Data(), Stamp: stamp}
if err := w.WriteMsgWithContext(ctx, &deliver); err != nil {
return fmt.Errorf("write delivery: %w", err)
}
Expand Down Expand Up @@ -467,9 +474,13 @@ func (s *Syncer) processWant(ctx context.Context, o *pb.Offer, w *pb.Want) ([]sw
for i := 0; i < len(o.Chunks); i++ {
if bv.Get(i) {
ch := o.Chunks[i]
c, err := s.store.ReserveGet(ctx, swarm.NewAddress(ch.Address), ch.BatchID)
addr := swarm.NewAddress(ch.Address)
c, err := s.store.ReserveGet(ctx, addr, ch.BatchID)
if err != nil {
return nil, err
s.logger.Debug("processing want: unable to find chunk", "chunk_address", addr, "batch_id", ch.BatchID)
chunks = append(chunks, swarm.NewChunk(swarm.ZeroAddress, nil))
s.metrics.MissingChunks.Inc()
continue
}
chunks = append(chunks, c)
}
Expand Down
24 changes: 24 additions & 0 deletions pkg/pullsync/pullsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,30 @@ func TestIncoming_UnsolicitedChunk(t *testing.T) {
}
}

func TestMissingChunk(t *testing.T) {
t.Parallel()

var (
zeroChunk = swarm.NewChunk(swarm.ZeroAddress, nil)
topMost = uint64(4)
ps, _ = newPullSync(t, nil, 5, mock.WithSubscribeResp(results, nil), mock.WithChunks([]swarm.Chunk{zeroChunk}...))
recorder = streamtest.New(streamtest.WithProtocols(ps.Protocol()))
psClient, _ = newPullSync(t, recorder, 0)
)

topmost, count, err := psClient.Sync(context.Background(), swarm.ZeroAddress, 0, 0)
if err != nil {
t.Fatal(err)
}

if topmost != topMost {
t.Fatalf("got offer topmost %d but want %d", topmost, topMost)
}
if count != 0 {
t.Fatalf("got count %d but want %d", count, 0)
}
}

func TestGetCursors(t *testing.T) {
t.Parallel()

Expand Down
6 changes: 5 additions & 1 deletion pkg/storer/mock/mockreserve.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@ func WithChunks(chs ...swarm.Chunk) Option {
return optionFunc(func(p *ReserveStore) {
for _, c := range chs {
c := c
p.chunks[c.Address().String()+string(c.Stamp().BatchID())] = c
if c.Stamp() != nil {
p.chunks[c.Address().String()+string(c.Stamp().BatchID())] = c
} else {
p.chunks[c.Address().String()] = c
}
}
})
}
Expand Down

0 comments on commit 4ac8d9f

Please sign in to comment.