Skip to content

Commit

Permalink
fix: move pullsync rate limiter and remove reserve epoch in migration (
Browse files Browse the repository at this point in the history
  • Loading branch information
istae authored Sep 9, 2024
1 parent 2d75623 commit d200c95
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 10 deletions.
20 changes: 10 additions & 10 deletions pkg/pullsync/pullsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ const (
DefaultMaxPage uint64 = 250
pageTimeout = time.Second
makeOfferTimeout = 15 * time.Minute
handleMaxChunksPerSecond = 100
handleMaxChunksPerSecond = 250
handleRequestsLimitRate = time.Second / handleMaxChunksPerSecond // handle max 100 chunks per second per peer
)

Expand Down Expand Up @@ -194,6 +194,15 @@ func (s *Syncer) handler(streamCtx context.Context, p p2p.Peer, stream p2p.Strea
return fmt.Errorf("process want: %w", err)
}

// slow down future requests
waitDur, err := s.limiter.Wait(streamCtx, p.Address.ByteString(), max(1, len(chs)))
if err != nil {
return fmt.Errorf("rate limiter: %w", err)
}
if waitDur > 0 {
s.logger.Debug("rate limited peer", "wait_duration", waitDur, "peer_address", p.Address)
}

for _, c := range chs {
var stamp []byte
if c.Stamp() != nil {
Expand All @@ -210,15 +219,6 @@ func (s *Syncer) handler(streamCtx context.Context, p p2p.Peer, stream p2p.Strea
s.metrics.Sent.Inc()
}

// slow down future requests
waitDur, err := s.limiter.Wait(streamCtx, p.Address.ByteString(), max(1, len(chs)))
if err != nil {
return fmt.Errorf("rate limiter: %w", err)
}
if waitDur > 0 {
s.logger.Debug("rate limited peer", "wait_duration", waitDur, "peer_address", p.Address)
}

return nil
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/storer/migration/step_06.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ func addStampHash(logger log.Logger, st transaction.Storage) (int64, int64, erro
return 0, 0, fmt.Errorf("pre-migration check: index counts do not match, %d vs %d. It's recommended that the repair-reserve cmd is run first", preBatchRadiusCnt, preChunkBinCnt)
}

// Delete epoch timestamp
err = st.Run(context.Background(), func(s transaction.Store) error {
return s.IndexStore().Delete(&reserve.EpochItem{})
})
if err != nil {
return 0, 0, err
}

itemC := make(chan *reserve.BatchRadiusItemV1)

errC := make(chan error, 1)
Expand Down
6 changes: 6 additions & 0 deletions pkg/storer/migration/step_06_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ func Test_Step_06(t *testing.T) {
err = localmigration.Step_06(store)()
require.NoError(t, err)

has, err := store.IndexStore().Has(&reserve.EpochItem{})
if has {
t.Fatal("epoch item should be deleted")
}
require.NoError(t, err)

checkBatchRadiusItems(t, store.IndexStore(), len(chunks), batchRadiusItems)
checkChunkBinItems(t, store.IndexStore(), len(chunks), chunkBinItems)
checkStampIndex(t, store.IndexStore(), len(chunks), stampIndexItems)
Expand Down

0 comments on commit d200c95

Please sign in to comment.