Skip to content

Commit

Permalink
Merge pull request #264 from ethstorage/bugfix
Browse files Browse the repository at this point in the history
Fix crashs: add lock ReportPeerSummary and check Peer is null in getIdlePeerForTask
  • Loading branch information
ping-ke authored Apr 8, 2024
2 parents 838d112 + 7325934 commit 0610992
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 13 deletions.
11 changes: 0 additions & 11 deletions ethstorage/p2p/protocol/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package protocol

import (
"context"
"fmt"
"math/big"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -57,16 +56,6 @@ func (p *Peer) Shards() map[common.Address][]uint64 {
return p.shards
}

// SetShards this should only be set when doing handshake.
func (p *Peer) SetShards(shards map[common.Address][]uint64) error {
// shards can only be set once.
if p.shards != nil {
return fmt.Errorf("peer shards has been set multi times")
}
p.shards = shards
return nil
}

// IsShardExist checks whether one specific shard is supported by this peer.
func (p *Peer) IsShardExist(contract common.Address, shardId uint64) bool {
if ids, ok := p.shards[contract]; ok {
Expand Down
9 changes: 7 additions & 2 deletions ethstorage/p2p/protocol/syncclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -848,8 +848,8 @@ func (s *SyncClient) getIdlePeerForTask(t *task) *Peer {
if _, ok := t.statelessPeers[id]; ok {
continue
}
p := s.peers[id]
if p.IsShardExist(t.Contract, t.ShardId) {
p, ok := s.peers[id]
if ok && p.IsShardExist(t.Contract, t.ShardId) {
return p
}
}
Expand Down Expand Up @@ -1200,6 +1200,7 @@ func (s *SyncClient) ReportPeerSummary() {
select {
case <-ticker.C:
inbound, outbound := 0, 0
s.lock.Lock()
for _, p := range s.peers {
if p.direction == network.DirInbound {
inbound++
Expand All @@ -1208,6 +1209,7 @@ func (s *SyncClient) ReportPeerSummary() {
}
}
log.Info("P2P Summary", "activePeers", len(s.peers), "inbound", inbound, "outbound", outbound)
s.lock.Unlock()
case <-s.resCtx.Done():
log.Info("P2P summary stop")
return
Expand All @@ -1217,6 +1219,9 @@ func (s *SyncClient) ReportPeerSummary() {
}

func (s *SyncClient) needThisPeer(contractShards map[common.Address][]uint64) bool {
if contractShards == nil {
return false
}
for contract, shards := range contractShards {
for _, shard := range shards {
for _, t := range s.tasks {
Expand Down

0 comments on commit 0610992

Please sign in to comment.