Skip to content

Commit

Permalink
fix alignment of all structs
Browse files Browse the repository at this point in the history
  • Loading branch information
ramin committed Nov 22, 2023
1 parent 5d26d56 commit 0f12e51
Show file tree
Hide file tree
Showing 18 changed files with 95 additions and 85 deletions.
6 changes: 4 additions & 2 deletions headertest/dummy_header.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ import (
var ErrDummyVerify = errors.New("dummy verify error")

type DummyHeader struct {
Timestamp time.Time

Chainid string
PreviousHash header.Hash
HeightI uint64
Timestamp time.Time

hash header.Hash

HeightI uint64

// VerifyFailure allows for testing scenarios where a header would fail
// verification. When set to true, it forces a failure.
VerifyFailure bool
Expand Down
16 changes: 7 additions & 9 deletions p2p/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,15 @@ var maxUntrustedHeadRequests = 4
// Exchange enables sending outbound HeaderRequests to the network as well as
// handling inbound HeaderRequests from the network.
type Exchange[H header.Header[H]] struct {
ctx context.Context
cancel context.CancelFunc

protocolID protocol.ID
host host.Host
host host.Host
ctx context.Context
peerTracker *peerTracker
metrics *exchangeMetrics

cancel context.CancelFunc
trustedPeers func() peer.IDSlice
peerTracker *peerTracker
metrics *exchangeMetrics

Params ClientParameters
protocolID protocol.ID
Params ClientParameters
}

func NewExchange[H header.Header[H]](
Expand Down
9 changes: 6 additions & 3 deletions p2p/exchange_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,20 @@ type exchangeMetrics struct {
responseSizeInst metric.Int64Histogram
responseTimeInst metric.Float64Histogram

trackerPeersNum atomic.Int64
trackedPeersNumInst metric.Int64ObservableGauge
trackedPeersNumReg metric.Registration

disconnectedPeersNum atomic.Int64
disconnectedPeersNumInst metric.Int64ObservableGauge
disconnectedPeersNumReg metric.Registration

blockedPeersNum atomic.Int64
blockedPeersNumInst metric.Int64ObservableGauge
blockedPeersNumReg metric.Registration

trackerPeersNum atomic.Int64

disconnectedPeersNum atomic.Int64

blockedPeersNum atomic.Int64
}

func newExchangeMetrics() (m *exchangeMetrics, err error) {
Expand Down
4 changes: 2 additions & 2 deletions p2p/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ func TestExchange_RequestHead(t *testing.T) {
})

tests := []struct {
requestFromTrusted bool
lastHeader *headertest.DummyHeader
expectedHeight uint64
expectedHash header.Hash
expectedHeight uint64
requestFromTrusted bool
}{
// routes to trusted peer only
{
Expand Down
18 changes: 9 additions & 9 deletions p2p/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@ type Option[T parameters] func(*T)

// ServerParameters is the set of parameters that must be configured for the exchange.
type ServerParameters struct {
// networkID is a network that will be used to create a protocol.ID
// Is empty by default
networkID string
// WriteDeadline sets the timeout for sending messages to the stream
WriteDeadline time.Duration
// ReadDeadline sets the timeout for reading messages from the stream
ReadDeadline time.Duration
// RangeRequestTimeout defines a timeout after which the session will try to re-request headers
// from another peer.
RangeRequestTimeout time.Duration
// networkID is a network that will be used to create a protocol.ID
// Is empty by default
networkID string
// metrics is a flag that enables metrics collection.
metrics bool
}
Expand Down Expand Up @@ -123,19 +123,19 @@ func WithParams[T parameters](params T) Option[T] {

// ClientParameters is the set of parameters that must be configured for the exchange.
type ClientParameters struct {
// pidstore is an optional interface used to periodically dump peers
pidstore PeerIDStore
// networkID is a network that will be used to create a protocol.ID
networkID string
// chainID is an identifier of the chain.
chainID string
// MaxHeadersPerRangeRequest defines the max amount of headers that can be requested per 1 request.
MaxHeadersPerRangeRequest uint64
// RangeRequestTimeout defines a timeout after which the session will try to re-request headers
// from another peer.
RangeRequestTimeout time.Duration
// networkID is a network that will be used to create a protocol.ID
networkID string
// chainID is an identifier of the chain.
chainID string
// metrics is a flag that enables metrics collection.
metrics bool
// pidstore is an optional interface used to periodically dump peers
pidstore PeerIDStore
}

// DefaultClientParameters returns the default params to configure the store.
Expand Down
14 changes: 7 additions & 7 deletions p2p/peer_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ import (

// peerStat represents a peer's average statistics.
type peerStat struct {
sync.RWMutex
peerID peer.ID
// score is the average speed per single request
peerScore float32
// pruneDeadline specifies when disconnected peer will be removed if
// it does not return online.
pruneDeadline time.Time
peerID peer.ID
sync.RWMutex
// score is the average speed per single request
peerScore float32
}

// updateStats recalculates peer.score by averaging the last score
Expand Down Expand Up @@ -100,10 +100,10 @@ func (ps *peerStats) Pop() any {
type peerQueue struct {
ctx context.Context

statsLk sync.RWMutex
stats peerStats

havePeer chan struct{}
stats peerStats

statsLk sync.RWMutex
}

func newPeerQueue(ctx context.Context, stats []*peerStat) *peerQueue {
Expand Down
16 changes: 9 additions & 7 deletions p2p/peer_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,16 @@ var (
)

type peerTracker struct {
host host.Host
host host.Host

// an optional interface used to periodically dump
// good peers during garbage collection
pidstore PeerIDStore

ctx context.Context
connGater *conngater.BasicConnectionGater
metrics *exchangeMetrics

peerLk sync.RWMutex
// trackedPeers contains active peers that we can request to.
// we cache the peer once they disconnect,
// so we can guarantee that peerQueue will only contain active peers
Expand All @@ -41,15 +46,12 @@ type peerTracker struct {
// online until pruneDeadline, it will be removed and its score will be lost
disconnectedPeers map[libpeer.ID]*peerStat

// an optional interface used to periodically dump
// good peers during garbage collection
pidstore PeerIDStore

ctx context.Context
cancel context.CancelFunc
// done is used to gracefully stop the peerTracker.
// It allows to wait until track() and gc() will be stopped.
done chan struct{}

peerLk sync.RWMutex
}

func newPeerTracker(
Expand Down
21 changes: 11 additions & 10 deletions p2p/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,22 @@ func withValidation[H header.Header[H]](from H) option[H] {
// session aims to divide a range of headers
// into several smaller requests among different peers.
type session[H header.Header[H]] struct {
host host.Host
protocolID protocol.ID
queue *peerQueue
host host.Host

// Otherwise, it will be nil.
// `from` is set when additional validation for range is needed.
from H

ctx context.Context
queue *peerQueue
// peerTracker contains discovered peers with records that describes their activity.
peerTracker *peerTracker
metrics *exchangeMetrics

// Otherwise, it will be nil.
// `from` is set when additional validation for range is needed.
from H
cancel context.CancelFunc
reqCh chan *p2p_pb.HeaderRequest
protocolID protocol.ID
requestTimeout time.Duration

ctx context.Context
cancel context.CancelFunc
reqCh chan *p2p_pb.HeaderRequest
}

func newSession[H header.Header[H]](
Expand Down
9 changes: 4 additions & 5 deletions p2p/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,11 @@ type SubscriberParams struct {
// Subscriber manages the lifecycle and relationship of header Module
// with the "header-sub" gossipsub topic.
type Subscriber[H header.Header[H]] struct {
metrics *subscriberMetrics
pubsub *pubsub.PubSub
topic *pubsub.Topic
msgID pubsub.MsgIdFunction
pubsubTopicID string

metrics *subscriberMetrics
pubsub *pubsub.PubSub
topic *pubsub.Topic
msgID pubsub.MsgIdFunction
}

// WithSubscriberMetrics enables metrics collection for the Subscriber.
Expand Down
6 changes: 4 additions & 2 deletions p2p/subscriber_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ type subscriberMetrics struct {
messageNumInst metric.Int64Counter
messageSizeInst metric.Int64Histogram

messageTimeLast atomic.Pointer[time.Time]
messageTimeInst metric.Float64Histogram

subscriptionNum atomic.Int64
subscriptionNumInst metric.Int64ObservableGauge
subscriptionNumReg metric.Registration

messageTimeLast atomic.Pointer[time.Time]

subscriptionNum atomic.Int64
}

func newSubscriberMetrics() (m *subscriberMetrics, err error) {
Expand Down
2 changes: 1 addition & 1 deletion store/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import (
// The approach simplifies implementation for the batch and
// makes it better optimized for the GetByHeight case which is what we need.
type batch[H header.Header[H]] struct {
lk sync.RWMutex
heights map[string]uint64
headers []H
lk sync.RWMutex
}

// newBatch creates the batch with the given pre-allocated size.
Expand Down
2 changes: 1 addition & 1 deletion store/heightsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ var errElapsedHeight = errors.New("elapsed height")

// heightSub provides a minimalistic mechanism to wait till header for a height becomes available.
type heightSub[H header.Header[H]] struct {
heightReqs map[uint64][]chan H
// height refers to the latest locally available header height
// that has been fully verified and inserted into the subjective chain
height atomic.Uint64
heightReqsLk sync.Mutex
heightReqs map[uint64][]chan H
}

// newHeightSub instantiates new heightSub.
Expand Down
2 changes: 1 addition & 1 deletion store/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ import (
var meter = otel.Meter("header/store")

type metrics struct {
headHeight atomic.Int64
headHeightInst metric.Int64ObservableGauge
headHeightReg metric.Registration

flushTimeInst metric.Float64Histogram
readTimeInst metric.Float64Histogram

writesQueueBlockedInst metric.Int64Counter
headHeight atomic.Int64
}

func newMetrics() (m *metrics, err error) {
Expand Down
9 changes: 5 additions & 4 deletions store/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ type Option func(*Parameters)

// Parameters is the set of parameters that must be configured for the store.
type Parameters struct {

// storePrefix defines the prefix used to wrap the store
// OPTIONAL
storePrefix datastore.Key

// StoreCacheSize defines the maximum amount of entries in the Header Store cache.
StoreCacheSize int

Expand All @@ -22,10 +27,6 @@ type Parameters struct {
// Headers are written in batches not to thrash the underlying Datastore with writes.
WriteBatchSize int

// storePrefix defines the prefix used to wrap the store
// OPTIONAL
storePrefix datastore.Key

// metrics is a flag that enables metrics collection
metrics bool
}
Expand Down
2 changes: 1 addition & 1 deletion sync/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
var meter = otel.Meter("header/sync")

type metrics struct {
totalSynced atomic.Int64
totalSyncedGauge metric.Float64ObservableGauge
totalSynced atomic.Int64
}

func newMetrics() (*metrics, error) {
Expand Down
4 changes: 2 additions & 2 deletions sync/ranges.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
// ascending order). This prevents unnecessary / duplicate network requests for additional headers
// during sync.
type ranges[H header.Header[H]] struct {
lk sync.RWMutex
ranges []*headerRange[H]
lk sync.RWMutex
}

// Head returns the highest Header in all ranges if any.
Expand Down Expand Up @@ -87,9 +87,9 @@ func (rs *ranges[H]) First() (*headerRange[H], bool) {
}

type headerRange[H header.Header[H]] struct {
lk sync.RWMutex
headers []H
start uint64
lk sync.RWMutex
}

func newRange[H header.Header[H]](h H) *headerRange[H] {
Expand Down
Loading

0 comments on commit 0f12e51

Please sign in to comment.