From 0f12e512f16ba5e650183aaf64a2ff2f4023441e Mon Sep 17 00:00:00 2001 From: ramin Date: Wed, 22 Nov 2023 13:13:24 +0000 Subject: [PATCH] fix alignment of all structs --- headertest/dummy_header.go | 6 ++++-- p2p/exchange.go | 16 +++++++--------- p2p/exchange_metrics.go | 9 ++++++--- p2p/exchange_test.go | 4 ++-- p2p/options.go | 18 +++++++++--------- p2p/peer_stats.go | 14 +++++++------- p2p/peer_tracker.go | 16 +++++++++------- p2p/session.go | 21 +++++++++++---------- p2p/subscriber.go | 9 ++++----- p2p/subscriber_metrics.go | 6 ++++-- store/batch.go | 2 +- store/heightsub.go | 2 +- store/metrics.go | 2 +- store/options.go | 9 +++++---- sync/metrics.go | 2 +- sync/ranges.go | 4 ++-- sync/sync.go | 38 ++++++++++++++++++++------------------ sync/sync_getter.go | 2 +- 18 files changed, 95 insertions(+), 85 deletions(-) diff --git a/headertest/dummy_header.go b/headertest/dummy_header.go index e8480dde..b1fb7d46 100644 --- a/headertest/dummy_header.go +++ b/headertest/dummy_header.go @@ -17,13 +17,15 @@ import ( var ErrDummyVerify = errors.New("dummy verify error") type DummyHeader struct { + Timestamp time.Time + Chainid string PreviousHash header.Hash - HeightI uint64 - Timestamp time.Time hash header.Hash + HeightI uint64 + // VerifyFailure allows for testing scenarios where a header would fail // verification. When set to true, it forces a failure. VerifyFailure bool diff --git a/p2p/exchange.go b/p2p/exchange.go index 607bfb0f..15f5c4aa 100644 --- a/p2p/exchange.go +++ b/p2p/exchange.go @@ -34,17 +34,15 @@ var maxUntrustedHeadRequests = 4 // Exchange enables sending outbound HeaderRequests to the network as well as // handling inbound HeaderRequests from the network. type Exchange[H header.Header[H]] struct { - ctx context.Context - cancel context.CancelFunc - - protocolID protocol.ID - host host.Host + host host.Host + ctx context.Context + peerTracker *peerTracker + metrics *exchangeMetrics + cancel context.CancelFunc trustedPeers func() peer.IDSlice - peerTracker *peerTracker - metrics *exchangeMetrics - - Params ClientParameters + protocolID protocol.ID + Params ClientParameters } func NewExchange[H header.Header[H]]( diff --git a/p2p/exchange_metrics.go b/p2p/exchange_metrics.go index 82456c2f..b642e867 100644 --- a/p2p/exchange_metrics.go +++ b/p2p/exchange_metrics.go @@ -31,17 +31,20 @@ type exchangeMetrics struct { responseSizeInst metric.Int64Histogram responseTimeInst metric.Float64Histogram - trackerPeersNum atomic.Int64 trackedPeersNumInst metric.Int64ObservableGauge trackedPeersNumReg metric.Registration - disconnectedPeersNum atomic.Int64 disconnectedPeersNumInst metric.Int64ObservableGauge disconnectedPeersNumReg metric.Registration - blockedPeersNum atomic.Int64 blockedPeersNumInst metric.Int64ObservableGauge blockedPeersNumReg metric.Registration + + trackerPeersNum atomic.Int64 + + disconnectedPeersNum atomic.Int64 + + blockedPeersNum atomic.Int64 } func newExchangeMetrics() (m *exchangeMetrics, err error) { diff --git a/p2p/exchange_test.go b/p2p/exchange_test.go index a7b72f11..74c43c1a 100644 --- a/p2p/exchange_test.go +++ b/p2p/exchange_test.go @@ -51,10 +51,10 @@ func TestExchange_RequestHead(t *testing.T) { }) tests := []struct { - requestFromTrusted bool lastHeader *headertest.DummyHeader - expectedHeight uint64 expectedHash header.Hash + expectedHeight uint64 + requestFromTrusted bool }{ // routes to trusted peer only { diff --git a/p2p/options.go b/p2p/options.go index 1e3f8ad0..fd16ac4e 100644 --- a/p2p/options.go +++ b/p2p/options.go @@ -18,6 +18,9 @@ type Option[T parameters] func(*T) // ServerParameters is the set of parameters that must be configured for the exchange. type ServerParameters struct { + // networkID is a network that will be used to create a protocol.ID + // Is empty by default + networkID string // WriteDeadline sets the timeout for sending messages to the stream WriteDeadline time.Duration // ReadDeadline sets the timeout for reading messages from the stream @@ -25,9 +28,6 @@ type ServerParameters struct { // RangeRequestTimeout defines a timeout after which the session will try to re-request headers // from another peer. RangeRequestTimeout time.Duration - // networkID is a network that will be used to create a protocol.ID - // Is empty by default - networkID string // metrics is a flag that enables metrics collection. metrics bool } @@ -123,19 +123,19 @@ func WithParams[T parameters](params T) Option[T] { // ClientParameters is the set of parameters that must be configured for the exchange. type ClientParameters struct { + // pidstore is an optional interface used to periodically dump peers + pidstore PeerIDStore + // networkID is a network that will be used to create a protocol.ID + networkID string + // chainID is an identifier of the chain. + chainID string // MaxHeadersPerRangeRequest defines the max amount of headers that can be requested per 1 request. MaxHeadersPerRangeRequest uint64 // RangeRequestTimeout defines a timeout after which the session will try to re-request headers // from another peer. RangeRequestTimeout time.Duration - // networkID is a network that will be used to create a protocol.ID - networkID string - // chainID is an identifier of the chain. - chainID string // metrics is a flag that enables metrics collection. metrics bool - // pidstore is an optional interface used to periodically dump peers - pidstore PeerIDStore } // DefaultClientParameters returns the default params to configure the store. diff --git a/p2p/peer_stats.go b/p2p/peer_stats.go index 0277fcd4..fac733a8 100644 --- a/p2p/peer_stats.go +++ b/p2p/peer_stats.go @@ -11,13 +11,13 @@ import ( // peerStat represents a peer's average statistics. type peerStat struct { - sync.RWMutex - peerID peer.ID - // score is the average speed per single request - peerScore float32 // pruneDeadline specifies when disconnected peer will be removed if // it does not return online. pruneDeadline time.Time + peerID peer.ID + sync.RWMutex + // score is the average speed per single request + peerScore float32 } // updateStats recalculates peer.score by averaging the last score @@ -100,10 +100,10 @@ func (ps *peerStats) Pop() any { type peerQueue struct { ctx context.Context - statsLk sync.RWMutex - stats peerStats - havePeer chan struct{} + stats peerStats + + statsLk sync.RWMutex } func newPeerQueue(ctx context.Context, stats []*peerStat) *peerQueue { diff --git a/p2p/peer_tracker.go b/p2p/peer_tracker.go index 33fffc91..02feaeac 100644 --- a/p2p/peer_tracker.go +++ b/p2p/peer_tracker.go @@ -28,11 +28,16 @@ var ( ) type peerTracker struct { - host host.Host + host host.Host + + // an optional interface used to periodically dump + // good peers during garbage collection + pidstore PeerIDStore + + ctx context.Context connGater *conngater.BasicConnectionGater metrics *exchangeMetrics - peerLk sync.RWMutex // trackedPeers contains active peers that we can request to. // we cache the peer once they disconnect, // so we can guarantee that peerQueue will only contain active peers @@ -41,15 +46,12 @@ type peerTracker struct { // online until pruneDeadline, it will be removed and its score will be lost disconnectedPeers map[libpeer.ID]*peerStat - // an optional interface used to periodically dump - // good peers during garbage collection - pidstore PeerIDStore - - ctx context.Context cancel context.CancelFunc // done is used to gracefully stop the peerTracker. // It allows to wait until track() and gc() will be stopped. done chan struct{} + + peerLk sync.RWMutex } func newPeerTracker( diff --git a/p2p/session.go b/p2p/session.go index 37f1e258..2943a64f 100644 --- a/p2p/session.go +++ b/p2p/session.go @@ -29,21 +29,22 @@ func withValidation[H header.Header[H]](from H) option[H] { // session aims to divide a range of headers // into several smaller requests among different peers. type session[H header.Header[H]] struct { - host host.Host - protocolID protocol.ID - queue *peerQueue + host host.Host + + // Otherwise, it will be nil. + // `from` is set when additional validation for range is needed. + from H + + ctx context.Context + queue *peerQueue // peerTracker contains discovered peers with records that describes their activity. peerTracker *peerTracker metrics *exchangeMetrics - // Otherwise, it will be nil. - // `from` is set when additional validation for range is needed. - from H + cancel context.CancelFunc + reqCh chan *p2p_pb.HeaderRequest + protocolID protocol.ID requestTimeout time.Duration - - ctx context.Context - cancel context.CancelFunc - reqCh chan *p2p_pb.HeaderRequest } func newSession[H header.Header[H]]( diff --git a/p2p/subscriber.go b/p2p/subscriber.go index c5f70dcc..eded8539 100644 --- a/p2p/subscriber.go +++ b/p2p/subscriber.go @@ -24,12 +24,11 @@ type SubscriberParams struct { // Subscriber manages the lifecycle and relationship of header Module // with the "header-sub" gossipsub topic. type Subscriber[H header.Header[H]] struct { + metrics *subscriberMetrics + pubsub *pubsub.PubSub + topic *pubsub.Topic + msgID pubsub.MsgIdFunction pubsubTopicID string - - metrics *subscriberMetrics - pubsub *pubsub.PubSub - topic *pubsub.Topic - msgID pubsub.MsgIdFunction } // WithSubscriberMetrics enables metrics collection for the Subscriber. diff --git a/p2p/subscriber_metrics.go b/p2p/subscriber_metrics.go index 164ff371..1ed56034 100644 --- a/p2p/subscriber_metrics.go +++ b/p2p/subscriber_metrics.go @@ -20,12 +20,14 @@ type subscriberMetrics struct { messageNumInst metric.Int64Counter messageSizeInst metric.Int64Histogram - messageTimeLast atomic.Pointer[time.Time] messageTimeInst metric.Float64Histogram - subscriptionNum atomic.Int64 subscriptionNumInst metric.Int64ObservableGauge subscriptionNumReg metric.Registration + + messageTimeLast atomic.Pointer[time.Time] + + subscriptionNum atomic.Int64 } func newSubscriberMetrics() (m *subscriberMetrics, err error) { diff --git a/store/batch.go b/store/batch.go index 7785953f..11191f6d 100644 --- a/store/batch.go +++ b/store/batch.go @@ -14,9 +14,9 @@ import ( // The approach simplifies implementation for the batch and // makes it better optimized for the GetByHeight case which is what we need. type batch[H header.Header[H]] struct { - lk sync.RWMutex heights map[string]uint64 headers []H + lk sync.RWMutex } // newBatch creates the batch with the given pre-allocated size. diff --git a/store/heightsub.go b/store/heightsub.go index 074ed644..d72c7d47 100644 --- a/store/heightsub.go +++ b/store/heightsub.go @@ -14,11 +14,11 @@ var errElapsedHeight = errors.New("elapsed height") // heightSub provides a minimalistic mechanism to wait till header for a height becomes available. type heightSub[H header.Header[H]] struct { + heightReqs map[uint64][]chan H // height refers to the latest locally available header height // that has been fully verified and inserted into the subjective chain height atomic.Uint64 heightReqsLk sync.Mutex - heightReqs map[uint64][]chan H } // newHeightSub instantiates new heightSub. diff --git a/store/metrics.go b/store/metrics.go index e5f14211..ee72132f 100644 --- a/store/metrics.go +++ b/store/metrics.go @@ -13,7 +13,6 @@ import ( var meter = otel.Meter("header/store") type metrics struct { - headHeight atomic.Int64 headHeightInst metric.Int64ObservableGauge headHeightReg metric.Registration @@ -21,6 +20,7 @@ type metrics struct { readTimeInst metric.Float64Histogram writesQueueBlockedInst metric.Int64Counter + headHeight atomic.Int64 } func newMetrics() (m *metrics, err error) { diff --git a/store/options.go b/store/options.go index b0c01d62..95da0c41 100644 --- a/store/options.go +++ b/store/options.go @@ -12,6 +12,11 @@ type Option func(*Parameters) // Parameters is the set of parameters that must be configured for the store. type Parameters struct { + + // storePrefix defines the prefix used to wrap the store + // OPTIONAL + storePrefix datastore.Key + // StoreCacheSize defines the maximum amount of entries in the Header Store cache. StoreCacheSize int @@ -22,10 +27,6 @@ type Parameters struct { // Headers are written in batches not to thrash the underlying Datastore with writes. WriteBatchSize int - // storePrefix defines the prefix used to wrap the store - // OPTIONAL - storePrefix datastore.Key - // metrics is a flag that enables metrics collection metrics bool } diff --git a/sync/metrics.go b/sync/metrics.go index 1f2c99f7..a4466175 100644 --- a/sync/metrics.go +++ b/sync/metrics.go @@ -11,8 +11,8 @@ import ( var meter = otel.Meter("header/sync") type metrics struct { - totalSynced atomic.Int64 totalSyncedGauge metric.Float64ObservableGauge + totalSynced atomic.Int64 } func newMetrics() (*metrics, error) { diff --git a/sync/ranges.go b/sync/ranges.go index 5e07ac23..b0fc485b 100644 --- a/sync/ranges.go +++ b/sync/ranges.go @@ -10,8 +10,8 @@ import ( // ascending order). This prevents unnecessary / duplicate network requests for additional headers // during sync. type ranges[H header.Header[H]] struct { - lk sync.RWMutex ranges []*headerRange[H] + lk sync.RWMutex } // Head returns the highest Header in all ranges if any. @@ -87,9 +87,9 @@ func (rs *ranges[H]) First() (*headerRange[H], bool) { } type headerRange[H header.Header[H]] struct { - lk sync.RWMutex headers []H start uint64 + lk sync.RWMutex } func newRange[H header.Header[H]](h H) *headerRange[H] { diff --git a/sync/sync.go b/sync/sync.go index 8538c046..d2fe473e 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -31,27 +31,29 @@ var log = logging.Logger("header/sync") // - if there is a gap between the previous and the new Subjective Head // - Triggers s.syncLoop and saves the Subjective Head in the pending so s.syncLoop can access it type Syncer[H header.Header[H]] struct { - sub header.Subscriber[H] // to subscribe for new Network Heads - store syncStore[H] // to store all the headers to - getter syncGetter[H] // to fetch headers from - metrics *metrics + store syncStore[H] // to store all the headers to + sub header.Subscriber[H] // to subscribe for new Network Heads - // stateLk protects state which represents the current or latest sync - stateLk sync.RWMutex - state State + // controls lifecycle for syncLoop + ctx context.Context + metrics *metrics // signals to start syncing triggerSync chan struct{} + cancel context.CancelFunc + + Params *Parameters + getter syncGetter[H] // to fetch headers from + // pending keeps ranges of valid new network headers awaiting to be appended to store pending ranges[H] - // incomingMu ensures only one incoming network head candidate is processed at the time - incomingMu sync.Mutex - // controls lifecycle for syncLoop - ctx context.Context - cancel context.CancelFunc + state State - Params *Parameters + // stateLk protects state which represents the current or latest sync + stateLk sync.RWMutex + // incomingMu ensures only one incoming network head candidate is processed at the time + incomingMu sync.Mutex } // NewSyncer creates a new instance of Syncer. @@ -126,15 +128,15 @@ func (s *Syncer[H]) SyncWait(ctx context.Context) error { // State collects all the information about a sync. type State struct { + Start time.Time `json:"start"` + End time.Time `json:"end"` + Error string `json:"error,omitempty"` // the error that might happen within a sync + FromHash header.Hash `json:"from_hash"` + ToHash header.Hash `json:"to_hash"` ID uint64 `json:"id"` // incrementing ID of a sync Height uint64 `json:"height"` // height at the moment when State is requested for a sync FromHeight uint64 `json:"from_height"` // the starting point of a sync ToHeight uint64 `json:"to_height"` // the ending point of a sync - FromHash header.Hash `json:"from_hash"` - ToHash header.Hash `json:"to_hash"` - Start time.Time `json:"start"` - End time.Time `json:"end"` - Error string `json:"error,omitempty"` // the error that might happen within a sync } // Finished returns true if sync is done, false otherwise. diff --git a/sync/sync_getter.go b/sync/sync_getter.go index 267240c5..faf08829 100644 --- a/sync/sync_getter.go +++ b/sync/sync_getter.go @@ -10,9 +10,9 @@ import ( // syncGetter is a Getter wrapper that ensure only one Head call happens at the time type syncGetter[H header.Header[H]] struct { + header.Getter[H] getterLk sync.RWMutex isGetterLk atomic.Bool - header.Getter[H] } // Lock locks the getter for single user.