diff --git a/.github/workflows/go-ci.yml b/.github/workflows/go-ci.yml index c1727901..8f2d92fb 100644 --- a/.github/workflows/go-ci.yml +++ b/.github/workflows/go-ci.yml @@ -6,17 +6,54 @@ on: branches: - main pull_request: + jobs: + setup: + name: Setup + runs-on: ubuntu-latest + outputs: + go-version: ${{ steps.go-version.outputs.go-version }} + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Read .go-version file + id: go-version + run: | + echo "go-version=$(cat .go-version)" >> $GITHUB_OUTPUT + + lint: + needs: [setup] + name: Lint + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + + - uses: actions/setup-go@v4 + with: + go-version: ${{ needs.setup.outputs.go-version }} + + - name: golangci-lint + uses: golangci/golangci-lint-action@v3.7.0 + with: + args: --timeout 10m + version: v1.55 + skip-pkg-cache: true + skip-build-cache: true build: + needs: [setup, lint] + name: Build runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 - name: Set up Go - uses: actions/setup-go@v3 + uses: actions/setup-go@v4 with: - go-version: 1.21 + go-version: ${{ needs.setup.outputs.go-version }} - name: Build run: go build -v ./... diff --git a/.go-version b/.go-version new file mode 100644 index 00000000..d2ab029d --- /dev/null +++ b/.go-version @@ -0,0 +1 @@ +1.21 diff --git a/.golangci.yml b/.golangci.yml new file mode 100644 index 00000000..975a818f --- /dev/null +++ b/.golangci.yml @@ -0,0 +1,72 @@ +run: + timeout: 5m + +linters: + enable: + - bodyclose + # - depguard as of v1.54.2, the default config throws errors on our repo + - dogsled + - dupl + - errcheck + # - funlen + # - gochecknoglobals + # - gochecknoinits + - goconst + - gocritic + # - gocyclo + # - godox + - gofmt + - goimports + # - golint - deprecated since v1.41. revive will be used instead + - revive + - gosec + - gosimple + - govet + - ineffassign + # - interfacer + - lll + - misspell + # - maligned + - nakedret + - prealloc + # - scopelint - deprecated since v1.39. exportloopref will be used instead + - exportloopref + - staticcheck + - stylecheck + - typecheck + - unconvert + # - unparam + - unused + # - whitespace + # - wsl + # - gocognit + - nolintlint + - asciicheck + +issues: + exclude-rules: + - path: _test\.go + linters: + - gosec + - govet + - linters: + - lll + source: "https://" + max-same-issues: 50 + +linters-settings: + dogsled: + max-blank-identifiers: 3 + golint: + min-confidence: 0 + maligned: + suggest-new: true + misspell: + locale: US + goimports: + local-prefixes: github.com/celestiaorg/go-header + dupl: + threshold: 200 + govet: + enable: + - fieldalignment diff --git a/headertest/dummy_header.go b/headertest/dummy_header.go index e8480dde..b1fb7d46 100644 --- a/headertest/dummy_header.go +++ b/headertest/dummy_header.go @@ -17,13 +17,15 @@ import ( var ErrDummyVerify = errors.New("dummy verify error") type DummyHeader struct { + Timestamp time.Time + Chainid string PreviousHash header.Hash - HeightI uint64 - Timestamp time.Time hash header.Hash + HeightI uint64 + // VerifyFailure allows for testing scenarios where a header would fail // verification. When set to true, it forces a failure. VerifyFailure bool diff --git a/headertest/store.go b/headertest/store.go index 7a663360..cffcefab 100644 --- a/headertest/store.go +++ b/headertest/store.go @@ -23,7 +23,7 @@ func NewDummyStore(t *testing.T) *Store[*DummyHeader] { } // NewStore creates a generic mock store supporting different type of Headers based on Generator. -func NewStore[H header.Header[H]](t *testing.T, gen Generator[H], numHeaders int) *Store[H] { +func NewStore[H header.Header[H]](_ *testing.T, gen Generator[H], numHeaders int) *Store[H] { store := &Store[H]{ Headers: make(map[uint64]H), HeadHeight: 0, @@ -43,14 +43,14 @@ func NewStore[H header.Header[H]](t *testing.T, gen Generator[H], numHeaders int func (m *Store[H]) Init(context.Context, H) error { return nil } func (m *Store[H]) Height() uint64 { - return uint64(m.HeadHeight) + return m.HeadHeight } func (m *Store[H]) Head(context.Context, ...header.HeadOption[H]) (H, error) { return m.Headers[m.HeadHeight], nil } -func (m *Store[H]) Get(ctx context.Context, hash header.Hash) (H, error) { +func (m *Store[H]) Get(_ context.Context, hash header.Hash) (H, error) { for _, header := range m.Headers { if bytes.Equal(header.Hash(), hash) { return header, nil @@ -60,7 +60,7 @@ func (m *Store[H]) Get(ctx context.Context, hash header.Hash) (H, error) { return zero, header.ErrNotFound } -func (m *Store[H]) GetByHeight(ctx context.Context, height uint64) (H, error) { +func (m *Store[H]) GetByHeight(_ context.Context, height uint64) (H, error) { return m.Headers[height], nil } @@ -74,7 +74,7 @@ func (m *Store[H]) GetRangeByHeight(ctx context.Context, fromHead H, to uint64) return m.getRangeByHeight(ctx, from, to) } -func (m *Store[H]) getRangeByHeight(ctx context.Context, from, to uint64) ([]H, error) { +func (m *Store[H]) getRangeByHeight(_ context.Context, from, to uint64) ([]H, error) { amount := to - from headers := make([]H, amount) @@ -99,7 +99,7 @@ func (m *Store[H]) HasAt(_ context.Context, height uint64) bool { return height != 0 && m.HeadHeight >= height } -func (m *Store[H]) Append(ctx context.Context, headers ...H) error { +func (m *Store[H]) Append(_ context.Context, headers ...H) error { for _, header := range headers { m.Headers[header.Height()] = header // set head diff --git a/headertest/subscriber.go b/headertest/subscriber.go index 64e92b98..b64e3b99 100644 --- a/headertest/subscriber.go +++ b/headertest/subscriber.go @@ -22,7 +22,7 @@ func (mhs *Subscriber[H]) Subscribe() (header.Subscription[H], error) { return mhs, nil } -func (mhs *Subscriber[H]) NextHeader(ctx context.Context) (H, error) { +func (mhs *Subscriber[H]) NextHeader(_ context.Context) (H, error) { defer func() { if len(mhs.Headers) > 1 { // pop the already-returned header diff --git a/p2p/exchange.go b/p2p/exchange.go index 607bfb0f..331f9fbc 100644 --- a/p2p/exchange.go +++ b/p2p/exchange.go @@ -34,8 +34,8 @@ var maxUntrustedHeadRequests = 4 // Exchange enables sending outbound HeaderRequests to the network as well as // handling inbound HeaderRequests from the network. type Exchange[H header.Header[H]] struct { - ctx context.Context cancel context.CancelFunc + ctx context.Context protocolID protocol.ID host host.Host diff --git a/p2p/exchange_metrics.go b/p2p/exchange_metrics.go index 82456c2f..b642e867 100644 --- a/p2p/exchange_metrics.go +++ b/p2p/exchange_metrics.go @@ -31,17 +31,20 @@ type exchangeMetrics struct { responseSizeInst metric.Int64Histogram responseTimeInst metric.Float64Histogram - trackerPeersNum atomic.Int64 trackedPeersNumInst metric.Int64ObservableGauge trackedPeersNumReg metric.Registration - disconnectedPeersNum atomic.Int64 disconnectedPeersNumInst metric.Int64ObservableGauge disconnectedPeersNumReg metric.Registration - blockedPeersNum atomic.Int64 blockedPeersNumInst metric.Int64ObservableGauge blockedPeersNumReg metric.Registration + + trackerPeersNum atomic.Int64 + + disconnectedPeersNum atomic.Int64 + + blockedPeersNum atomic.Int64 } func newExchangeMetrics() (m *exchangeMetrics, err error) { diff --git a/p2p/exchange_test.go b/p2p/exchange_test.go index 38298e0e..74c43c1a 100644 --- a/p2p/exchange_test.go +++ b/p2p/exchange_test.go @@ -51,10 +51,10 @@ func TestExchange_RequestHead(t *testing.T) { }) tests := []struct { - requestFromTrusted bool lastHeader *headertest.DummyHeader - expectedHeight uint64 expectedHash header.Hash + expectedHeight uint64 + requestFromTrusted bool }{ // routes to trusted peer only { @@ -607,7 +607,12 @@ func quicHosts(t *testing.T, n int) []libhost.Host { return hosts } -func client(ctx context.Context, t *testing.T, host libhost.Host, trusted []peer.ID) *Exchange[*headertest.DummyHeader] { +func client( + ctx context.Context, + t *testing.T, + host libhost.Host, + trusted []peer.ID, +) *Exchange[*headertest.DummyHeader] { client, err := NewExchange[*headertest.DummyHeader](host, trusted, nil) require.NoError(t, err) @@ -621,7 +626,12 @@ func client(ctx context.Context, t *testing.T, host libhost.Host, trusted []peer return client } -func server(ctx context.Context, t *testing.T, host libhost.Host, store header.Store[*headertest.DummyHeader]) *ExchangeServer[*headertest.DummyHeader] { +func server( + ctx context.Context, + t *testing.T, + host libhost.Host, + store header.Store[*headertest.DummyHeader], +) *ExchangeServer[*headertest.DummyHeader] { server, err := NewExchangeServer[*headertest.DummyHeader](host, store) require.NoError(t, err) err = server.Start(ctx) @@ -643,7 +653,10 @@ func (t *timedOutStore) HasAt(_ context.Context, _ uint64) bool { return true } -func (t *timedOutStore) Head(context.Context, ...header.HeadOption[*headertest.DummyHeader]) (*headertest.DummyHeader, error) { +func (t *timedOutStore) Head( + context.Context, + ...header.HeadOption[*headertest.DummyHeader], +) (*headertest.DummyHeader, error) { time.Sleep(t.timeout) return nil, header.ErrNoHead } diff --git a/p2p/options.go b/p2p/options.go index b1b4b9cf..fd16ac4e 100644 --- a/p2p/options.go +++ b/p2p/options.go @@ -18,6 +18,9 @@ type Option[T parameters] func(*T) // ServerParameters is the set of parameters that must be configured for the exchange. type ServerParameters struct { + // networkID is a network that will be used to create a protocol.ID + // Is empty by default + networkID string // WriteDeadline sets the timeout for sending messages to the stream WriteDeadline time.Duration // ReadDeadline sets the timeout for reading messages from the stream @@ -25,9 +28,6 @@ type ServerParameters struct { // RangeRequestTimeout defines a timeout after which the session will try to re-request headers // from another peer. RangeRequestTimeout time.Duration - // networkID is a network that will be used to create a protocol.ID - // Is empty by default - networkID string // metrics is a flag that enables metrics collection. metrics bool } @@ -57,7 +57,7 @@ func (p *ServerParameters) Validate() error { func WithMetrics[T parameters]() Option[T] { return func(p *T) { - switch t := any(p).(type) { //nolint:gocritic + switch t := any(p).(type) { case *ServerParameters: t.metrics = true case *ClientParameters: @@ -123,19 +123,19 @@ func WithParams[T parameters](params T) Option[T] { // ClientParameters is the set of parameters that must be configured for the exchange. type ClientParameters struct { + // pidstore is an optional interface used to periodically dump peers + pidstore PeerIDStore + // networkID is a network that will be used to create a protocol.ID + networkID string + // chainID is an identifier of the chain. + chainID string // MaxHeadersPerRangeRequest defines the max amount of headers that can be requested per 1 request. MaxHeadersPerRangeRequest uint64 // RangeRequestTimeout defines a timeout after which the session will try to re-request headers // from another peer. RangeRequestTimeout time.Duration - // networkID is a network that will be used to create a protocol.ID - networkID string - // chainID is an identifier of the chain. - chainID string // metrics is a flag that enables metrics collection. metrics bool - // pidstore is an optional interface used to periodically dump peers - pidstore PeerIDStore } // DefaultClientParameters returns the default params to configure the store. @@ -178,8 +178,7 @@ func WithMaxHeadersPerRangeRequest[T ClientParameters](amount uint64) Option[T] // `chainID` parameter. func WithChainID[T ClientParameters](chainID string) Option[T] { return func(p *T) { - switch t := any(p).(type) { //nolint:gocritic - case *ClientParameters: + if t, ok := any(p).(*ClientParameters); ok { t.chainID = chainID } } @@ -189,8 +188,7 @@ func WithChainID[T ClientParameters](chainID string) Option[T] { // inside the peerTracker. func WithPeerIDStore[T ClientParameters](pidstore PeerIDStore) Option[T] { return func(p *T) { - switch t := any(p).(type) { //nolint:gocritic - case *ClientParameters: + if t, ok := any(p).(*ClientParameters); ok { t.pidstore = pidstore } } diff --git a/p2p/peer_stats.go b/p2p/peer_stats.go index 0277fcd4..dc94488a 100644 --- a/p2p/peer_stats.go +++ b/p2p/peer_stats.go @@ -10,14 +10,16 @@ import ( ) // peerStat represents a peer's average statistics. +// +//nolint:govet type peerStat struct { sync.RWMutex - peerID peer.ID - // score is the average speed per single request - peerScore float32 // pruneDeadline specifies when disconnected peer will be removed if // it does not return online. pruneDeadline time.Time + peerID peer.ID + // score is the average speed per single request + peerScore float32 } // updateStats recalculates peer.score by averaging the last score @@ -100,10 +102,10 @@ func (ps *peerStats) Pop() any { type peerQueue struct { ctx context.Context - statsLk sync.RWMutex - stats peerStats - havePeer chan struct{} + stats peerStats + + statsLk sync.RWMutex } func newPeerQueue(ctx context.Context, stats []*peerStat) *peerQueue { diff --git a/p2p/peer_tracker.go b/p2p/peer_tracker.go index 33fffc91..67ca1d32 100644 --- a/p2p/peer_tracker.go +++ b/p2p/peer_tracker.go @@ -27,12 +27,19 @@ var ( gcCycle = time.Minute * 5 ) +//nolint:govet type peerTracker struct { - host host.Host + peerLk sync.RWMutex + host host.Host + + // an optional interface used to periodically dump + // good peers during garbage collection + pidstore PeerIDStore + + ctx context.Context connGater *conngater.BasicConnectionGater metrics *exchangeMetrics - peerLk sync.RWMutex // trackedPeers contains active peers that we can request to. // we cache the peer once they disconnect, // so we can guarantee that peerQueue will only contain active peers @@ -41,11 +48,6 @@ type peerTracker struct { // online until pruneDeadline, it will be removed and its score will be lost disconnectedPeers map[libpeer.ID]*peerStat - // an optional interface used to periodically dump - // good peers during garbage collection - pidstore PeerIDStore - - ctx context.Context cancel context.CancelFunc // done is used to gracefully stop the peerTracker. // It allows to wait until track() and gc() will be stopped. diff --git a/p2p/server.go b/p2p/server.go index 327b9158..08df2030 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -242,10 +242,10 @@ func (serv *ExchangeServer[H]) handleRequest(from, to uint64) ([]H, error) { log.Debugw("server: serving partial range", "prevMaxHeight", to, - "newMaxHeight", uint64(head.Height())+1, + "newMaxHeight", head.Height()+1, ) // change `to` height to return a partial range - to = uint64(head.Height()) + 1 + to = head.Height() + 1 } headersByRange, err := serv.store.GetRange(ctx, from, to) diff --git a/p2p/session.go b/p2p/session.go index 37f1e258..2943a64f 100644 --- a/p2p/session.go +++ b/p2p/session.go @@ -29,21 +29,22 @@ func withValidation[H header.Header[H]](from H) option[H] { // session aims to divide a range of headers // into several smaller requests among different peers. type session[H header.Header[H]] struct { - host host.Host - protocolID protocol.ID - queue *peerQueue + host host.Host + + // Otherwise, it will be nil. + // `from` is set when additional validation for range is needed. + from H + + ctx context.Context + queue *peerQueue // peerTracker contains discovered peers with records that describes their activity. peerTracker *peerTracker metrics *exchangeMetrics - // Otherwise, it will be nil. - // `from` is set when additional validation for range is needed. - from H + cancel context.CancelFunc + reqCh chan *p2p_pb.HeaderRequest + protocolID protocol.ID requestTimeout time.Duration - - ctx context.Context - cancel context.CancelFunc - reqCh chan *p2p_pb.HeaderRequest } func newSession[H header.Header[H]]( diff --git a/p2p/subscriber.go b/p2p/subscriber.go index c5f70dcc..eded8539 100644 --- a/p2p/subscriber.go +++ b/p2p/subscriber.go @@ -24,12 +24,11 @@ type SubscriberParams struct { // Subscriber manages the lifecycle and relationship of header Module // with the "header-sub" gossipsub topic. type Subscriber[H header.Header[H]] struct { + metrics *subscriberMetrics + pubsub *pubsub.PubSub + topic *pubsub.Topic + msgID pubsub.MsgIdFunction pubsubTopicID string - - metrics *subscriberMetrics - pubsub *pubsub.PubSub - topic *pubsub.Topic - msgID pubsub.MsgIdFunction } // WithSubscriberMetrics enables metrics collection for the Subscriber. diff --git a/p2p/subscriber_metrics.go b/p2p/subscriber_metrics.go index 164ff371..1ed56034 100644 --- a/p2p/subscriber_metrics.go +++ b/p2p/subscriber_metrics.go @@ -20,12 +20,14 @@ type subscriberMetrics struct { messageNumInst metric.Int64Counter messageSizeInst metric.Int64Histogram - messageTimeLast atomic.Pointer[time.Time] messageTimeInst metric.Float64Histogram - subscriptionNum atomic.Int64 subscriptionNumInst metric.Int64ObservableGauge subscriptionNumReg metric.Registration + + messageTimeLast atomic.Pointer[time.Time] + + subscriptionNum atomic.Int64 } func newSubscriberMetrics() (m *subscriberMetrics, err error) { diff --git a/p2p/subscription_test.go b/p2p/subscription_test.go index e9e53700..81430872 100644 --- a/p2p/subscription_test.go +++ b/p2p/subscription_test.go @@ -30,7 +30,11 @@ func TestSubscriber(t *testing.T) { require.NoError(t, err) // create sub-service lifecycles for header service 1 - p2pSub1, err := NewSubscriber[*headertest.DummyHeader](pubsub1, pubsub.DefaultMsgIdFn, WithSubscriberNetworkID(networkID)) + p2pSub1, err := NewSubscriber[*headertest.DummyHeader]( + pubsub1, + pubsub.DefaultMsgIdFn, + WithSubscriberNetworkID(networkID), + ) require.NoError(t, err) err = p2pSub1.Start(context.Background()) require.NoError(t, err) @@ -41,7 +45,11 @@ func TestSubscriber(t *testing.T) { require.NoError(t, err) // create sub-service lifecycles for header service 2 - p2pSub2, err := NewSubscriber[*headertest.DummyHeader](pubsub2, pubsub.DefaultMsgIdFn, WithSubscriberNetworkID(networkID)) + p2pSub2, err := NewSubscriber[*headertest.DummyHeader]( + pubsub2, + pubsub.DefaultMsgIdFn, + WithSubscriberNetworkID(networkID), + ) require.NoError(t, err) err = p2pSub2.Start(context.Background()) require.NoError(t, err) diff --git a/store/batch.go b/store/batch.go index 7785953f..db50c2bb 100644 --- a/store/batch.go +++ b/store/batch.go @@ -13,6 +13,8 @@ import ( // unlike the Store which keeps 'hash -> header' and 'height -> hash'. // The approach simplifies implementation for the batch and // makes it better optimized for the GetByHeight case which is what we need. +// +//nolint:govet type batch[H header.Header[H]] struct { lk sync.RWMutex heights map[string]uint64 diff --git a/store/heightsub.go b/store/heightsub.go index a69f28f6..d72c7d47 100644 --- a/store/heightsub.go +++ b/store/heightsub.go @@ -14,11 +14,11 @@ var errElapsedHeight = errors.New("elapsed height") // heightSub provides a minimalistic mechanism to wait till header for a height becomes available. type heightSub[H header.Header[H]] struct { + heightReqs map[uint64][]chan H // height refers to the latest locally available header height // that has been fully verified and inserted into the subjective chain height atomic.Uint64 heightReqsLk sync.Mutex - heightReqs map[uint64][]chan H } // newHeightSub instantiates new heightSub. @@ -84,7 +84,11 @@ func (hs *heightSub[H]) Pub(headers ...H) { height := hs.Height() from, to := headers[0].Height(), headers[ln-1].Height() if height+1 != from && height != 0 { // height != 0 is needed to enable init from any height and not only 1 - log.Fatalf("PLEASE FILE A BUG REPORT: headers given to the heightSub are in the wrong order: expected %d, got %d", height+1, from) + log.Fatalf( + "PLEASE FILE A BUG REPORT: headers given to the heightSub are in the wrong order: expected %d, got %d", + height+1, + from, + ) return } hs.SetHeight(to) diff --git a/store/init.go b/store/init.go index 5d26a97b..ae35b588 100644 --- a/store/init.go +++ b/store/init.go @@ -9,7 +9,12 @@ import ( // Init ensures a Store is initialized. If it is not already initialized, // it initializes the Store by requesting the header with the given hash. -func Init[H header.Header[H]](ctx context.Context, store header.Store[H], ex header.Exchange[H], hash header.Hash) error { +func Init[H header.Header[H]]( + ctx context.Context, + store header.Store[H], + ex header.Exchange[H], + hash header.Hash, +) error { _, err := store.Head(ctx) switch { default: diff --git a/store/metrics.go b/store/metrics.go index e5f14211..ee72132f 100644 --- a/store/metrics.go +++ b/store/metrics.go @@ -13,7 +13,6 @@ import ( var meter = otel.Meter("header/store") type metrics struct { - headHeight atomic.Int64 headHeightInst metric.Int64ObservableGauge headHeightReg metric.Registration @@ -21,6 +20,7 @@ type metrics struct { readTimeInst metric.Float64Histogram writesQueueBlockedInst metric.Int64Counter + headHeight atomic.Int64 } func newMetrics() (m *metrics, err error) { diff --git a/store/options.go b/store/options.go index b0c01d62..95da0c41 100644 --- a/store/options.go +++ b/store/options.go @@ -12,6 +12,11 @@ type Option func(*Parameters) // Parameters is the set of parameters that must be configured for the store. type Parameters struct { + + // storePrefix defines the prefix used to wrap the store + // OPTIONAL + storePrefix datastore.Key + // StoreCacheSize defines the maximum amount of entries in the Header Store cache. StoreCacheSize int @@ -22,10 +27,6 @@ type Parameters struct { // Headers are written in batches not to thrash the underlying Datastore with writes. WriteBatchSize int - // storePrefix defines the prefix used to wrap the store - // OPTIONAL - storePrefix datastore.Key - // metrics is a flag that enables metrics collection metrics bool } diff --git a/store/testing.go b/store/testing.go index b415bcea..3b64c060 100644 --- a/store/testing.go +++ b/store/testing.go @@ -13,7 +13,11 @@ import ( ) // NewTestStore creates initialized and started in memory header Store which is useful for testing. -func NewTestStore(ctx context.Context, t *testing.T, head *headertest.DummyHeader) header.Store[*headertest.DummyHeader] { +func NewTestStore( + ctx context.Context, + t *testing.T, + head *headertest.DummyHeader, +) header.Store[*headertest.DummyHeader] { store, err := NewStoreWithHead(ctx, sync.MutexWrap(datastore.NewMapDatastore()), head) require.NoError(t, err) diff --git a/sync/metrics.go b/sync/metrics.go index 1f2c99f7..a4466175 100644 --- a/sync/metrics.go +++ b/sync/metrics.go @@ -11,8 +11,8 @@ import ( var meter = otel.Meter("header/sync") type metrics struct { - totalSynced atomic.Int64 totalSyncedGauge metric.Float64ObservableGauge + totalSynced atomic.Int64 } func newMetrics() (*metrics, error) { diff --git a/sync/ranges.go b/sync/ranges.go index 4329ab0c..3b5cf2cf 100644 --- a/sync/ranges.go +++ b/sync/ranges.go @@ -9,9 +9,12 @@ import ( // ranges keeps non-overlapping and non-adjacent header ranges which are used to cache headers (in // ascending order). This prevents unnecessary / duplicate network requests for additional headers // during sync. +// +// @ramin: allowing this one to place the syncRWMutex at bottom of +// struct as the alignment allows 32bytes -> 8 type ranges[H header.Header[H]] struct { - lk sync.RWMutex ranges []*headerRange[H] + lk sync.RWMutex } // Head returns the highest Header in all ranges if any. @@ -87,14 +90,14 @@ func (rs *ranges[H]) First() (*headerRange[H], bool) { } type headerRange[H header.Header[H]] struct { - lk sync.RWMutex headers []H start uint64 + lk sync.RWMutex } func newRange[H header.Header[H]](h H) *headerRange[H] { return &headerRange[H]{ - start: uint64(h.Height()), + start: h.Height(), headers: []H{h}, } } @@ -142,7 +145,7 @@ func (r *headerRange[H]) Remove(end uint64) { amnt := r.rangeAmount(end) r.headers = r.headers[amnt:] if len(r.headers) != 0 { - r.start = uint64(r.headers[0].Height()) + r.start = r.headers[0].Height() } } diff --git a/sync/sync.go b/sync/sync.go index 1dc53c27..4beaf763 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -30,27 +30,31 @@ var log = logging.Logger("header/sync") // - Sets as the new Subjective Head, which // - if there is a gap between the previous and the new Subjective Head // - Triggers s.syncLoop and saves the Subjective Head in the pending so s.syncLoop can access it +// +//nolint:govet type Syncer[H header.Header[H]] struct { - sub header.Subscriber[H] // to subscribe for new Network Heads - store syncStore[H] // to store all the headers to - getter syncGetter[H] // to fetch headers from - metrics *metrics - // stateLk protects state which represents the current or latest sync stateLk sync.RWMutex state State - // signals to start syncing - triggerSync chan struct{} - // pending keeps ranges of valid new network headers awaiting to be appended to store - pending ranges[H] // incomingMu ensures only one incoming network head candidate is processed at the time incomingMu sync.Mutex + store syncStore[H] // to store all the headers to + sub header.Subscriber[H] // to subscribe for new Network Heads + getter syncGetter[H] // to fetch headers from + metrics *metrics + // controls lifecycle for syncLoop ctx context.Context cancel context.CancelFunc + // signals to start syncing + triggerSync chan struct{} + + // pending keeps ranges of valid new network headers awaiting to be appended to store + pending ranges[H] + Params *Parameters } @@ -126,15 +130,15 @@ func (s *Syncer[H]) SyncWait(ctx context.Context) error { // State collects all the information about a sync. type State struct { + Start time.Time `json:"start"` + End time.Time `json:"end"` + Error string `json:"error,omitempty"` // the error that might happen within a sync + FromHash header.Hash `json:"from_hash"` + ToHash header.Hash `json:"to_hash"` ID uint64 `json:"id"` // incrementing ID of a sync Height uint64 `json:"height"` // height at the moment when State is requested for a sync FromHeight uint64 `json:"from_height"` // the starting point of a sync ToHeight uint64 `json:"to_height"` // the ending point of a sync - FromHash header.Hash `json:"from_hash"` - ToHash header.Hash `json:"to_hash"` - Start time.Time `json:"start"` - End time.Time `json:"end"` - Error string `json:"error,omitempty"` // the error that might happen within a sync } // Finished returns true if sync is done, false otherwise. @@ -158,7 +162,7 @@ func (s *Syncer[H]) State() State { head, err := s.store.Head(s.ctx) if err == nil { - state.Height = uint64(head.Height()) + state.Height = head.Height() } else if state.Error == "" { // don't ignore the error if we can show it in the state state.Error = err.Error() @@ -239,8 +243,8 @@ func (s *Syncer[H]) sync(ctx context.Context) { func (s *Syncer[H]) doSync(ctx context.Context, fromHead, toHead H) (err error) { s.stateLk.Lock() s.state.ID++ - s.state.FromHeight = uint64(fromHead.Height()) + 1 - s.state.ToHeight = uint64(toHead.Height()) + s.state.FromHeight = fromHead.Height() + 1 + s.state.ToHeight = toHead.Height() s.state.FromHash = fromHead.Hash() s.state.ToHash = toHead.Hash() s.state.Start = time.Now() diff --git a/sync/sync_getter.go b/sync/sync_getter.go index 267240c5..b14e9cb5 100644 --- a/sync/sync_getter.go +++ b/sync/sync_getter.go @@ -9,6 +9,8 @@ import ( ) // syncGetter is a Getter wrapper that ensure only one Head call happens at the time +// +//nolint:govet type syncGetter[H header.Header[H]] struct { getterLk sync.RWMutex isGetterLk atomic.Bool diff --git a/sync/sync_getter_test.go b/sync/sync_getter_test.go index 16ddc196..2722e3c3 100644 --- a/sync/sync_getter_test.go +++ b/sync/sync_getter_test.go @@ -59,14 +59,21 @@ func (f *fakeGetter[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (h return } -func (f *fakeGetter[H]) Get(ctx context.Context, hash header.Hash) (H, error) { +func (f *fakeGetter[H]) Get( + _ context.Context, + _ header.Hash, +) (H, error) { panic("implement me") } -func (f *fakeGetter[H]) GetByHeight(ctx context.Context, u uint64) (H, error) { +func (f *fakeGetter[H]) GetByHeight(_ context.Context, _ uint64) (H, error) { panic("implement me") } -func (f *fakeGetter[H]) GetRangeByHeight(ctx context.Context, from H, to uint64) ([]H, error) { +func (f *fakeGetter[H]) GetRangeByHeight( + _ context.Context, + _ H, + _ uint64, +) ([]H, error) { panic("implement me") } diff --git a/sync/sync_head_test.go b/sync/sync_head_test.go index 29b75ff4..bdfb3f76 100644 --- a/sync/sync_head_test.go +++ b/sync/sync_head_test.go @@ -114,7 +114,10 @@ func newWrappedGetter(ex header.Exchange[*headertest.DummyHeader]) *wrappedGette } } -func (t *wrappedGetter) Head(ctx context.Context, options ...header.HeadOption[*headertest.DummyHeader]) (*headertest.DummyHeader, error) { +func (t *wrappedGetter) Head( + ctx context.Context, + options ...header.HeadOption[*headertest.DummyHeader], +) (*headertest.DummyHeader, error) { params := header.HeadParams[*headertest.DummyHeader]{} for _, opt := range options { opt(¶ms) @@ -125,21 +128,27 @@ func (t *wrappedGetter) Head(ctx context.Context, options ...header.HeadOption[* return t.ex.Head(ctx, options...) } -func (t *wrappedGetter) Get(ctx context.Context, hash header.Hash) (*headertest.DummyHeader, error) { - //TODO implement me +func (t *wrappedGetter) Get( + _ context.Context, + _ header.Hash, +) (*headertest.DummyHeader, error) { + // TODO implement me panic("implement me") } -func (t *wrappedGetter) GetByHeight(ctx context.Context, u uint64) (*headertest.DummyHeader, error) { - //TODO implement me +func (t *wrappedGetter) GetByHeight( + _ context.Context, + _ uint64, +) (*headertest.DummyHeader, error) { + // TODO implement me panic("implement me") } func (t *wrappedGetter) GetRangeByHeight( - ctx context.Context, - from *headertest.DummyHeader, - to uint64, + _ context.Context, + _ *headertest.DummyHeader, + _ uint64, ) ([]*headertest.DummyHeader, error) { - //TODO implement me + // TODO implement me panic("implement me") } diff --git a/sync/sync_test.go b/sync/sync_test.go index dc108cc5..585b9b78 100644 --- a/sync/sync_test.go +++ b/sync/sync_test.go @@ -54,9 +54,9 @@ func TestSyncSimpleRequestingHead(t *testing.T) { assert.Empty(t, syncer.pending.Head()) state := syncer.State() - assert.Equal(t, uint64(exp.Height()), state.Height) + assert.Equal(t, exp.Height(), state.Height) assert.Equal(t, uint64(2), state.FromHeight) - assert.Equal(t, uint64(exp.Height()), state.ToHeight) + assert.Equal(t, exp.Height(), state.ToHeight) assert.True(t, state.Finished(), state) } @@ -144,9 +144,9 @@ func TestSyncCatchUp(t *testing.T) { assert.Empty(t, syncer.pending.Head()) state := syncer.State() - assert.Equal(t, uint64(exp.Height()+1), state.Height) + assert.Equal(t, exp.Height()+1, state.Height) assert.Equal(t, uint64(2), state.FromHeight) - assert.Equal(t, uint64(exp.Height()+1), state.ToHeight) + assert.Equal(t, exp.Height()+1, state.ToHeight) assert.True(t, state.Finished(), state) } @@ -290,7 +290,7 @@ func TestSyncerIncomingDuplicate(t *testing.T) { // TestSync_InvalidSyncTarget tests the possible case that a sync target // passes non-adjacent verification but is actually invalid once it is processed -// via VerifyAdjacent during sync. The expected behaviour is that the syncer would +// via VerifyAdjacent during sync. The expected behavior is that the syncer would // discard the invalid sync target and listen for a new sync target from headersub // and sync the valid chain. func TestSync_InvalidSyncTarget(t *testing.T) { @@ -300,7 +300,7 @@ func TestSync_InvalidSyncTarget(t *testing.T) { suite := headertest.NewTestSuite(t) head := suite.Head() - // create a local store which is initialised at genesis height + // create a local store which is initialized at genesis height localStore := store.NewTestStore(ctx, t, head) // create a peer which is already on height 100 remoteStore := headertest.NewStore[*headertest.DummyHeader](t, suite, 100) @@ -341,7 +341,7 @@ func TestSync_InvalidSyncTarget(t *testing.T) { cancel() // ensure that syncer still expects to sync to the bad sync target's // height - require.Equal(t, uint64(maliciousHeader.Height()), syncer.State().ToHeight) + require.Equal(t, maliciousHeader.Height(), syncer.State().ToHeight) // ensure syncer could only sync up to one header below the bad sync target h, err := localStore.Head(ctx) require.NoError(t, err) @@ -370,7 +370,7 @@ func TestSync_InvalidSyncTarget(t *testing.T) { // ensure that maliciousHeader height was re-requested and a good one was // found - rerequested, err := localStore.GetByHeight(ctx, uint64(maliciousHeader.Height())) + rerequested, err := localStore.GetByHeight(ctx, maliciousHeader.Height()) require.NoError(t, err) require.False(t, rerequested.VerifyFailure) diff --git a/verify.go b/verify.go index 0c1f3e9e..a2ec58ce 100644 --- a/verify.go +++ b/verify.go @@ -59,12 +59,23 @@ func verify[H Header[H]](trstd, untrstd H, heightThreshold uint64) error { } if untrstd.Time().Before(trstd.Time()) { - return fmt.Errorf("%w: timestamp '%s' < current '%s'", ErrUnorderedTime, formatTime(untrstd.Time()), formatTime(trstd.Time())) + return fmt.Errorf( + "%w: timestamp '%s' < current '%s'", + ErrUnorderedTime, + formatTime(untrstd.Time()), + formatTime(trstd.Time()), + ) } now := time.Now() if untrstd.Time().After(now.Add(clockDrift)) { - return fmt.Errorf("%w: timestamp '%s' > now '%s', clock_drift '%v'", ErrFromFuture, formatTime(untrstd.Time()), formatTime(now), clockDrift) + return fmt.Errorf( + "%w: timestamp '%s' > now '%s', clock_drift '%v'", + ErrFromFuture, + formatTime(untrstd.Time()), + formatTime(now), + clockDrift, + ) } known := untrstd.Height() <= trstd.Height() @@ -76,7 +87,13 @@ func verify[H Header[H]](trstd, untrstd H, heightThreshold uint64) error { // yet taken as sync target adequateHeight := untrstd.Height()-trstd.Height() < heightThreshold if !adequateHeight { - return fmt.Errorf("%w: '%d' - current '%d' >= threshold '%d'", ErrHeightFromFuture, untrstd.Height(), trstd.Height(), heightThreshold) + return fmt.Errorf( + "%w: '%d' - current '%d' >= threshold '%d'", + ErrHeightFromFuture, + untrstd.Height(), + trstd.Height(), + heightThreshold, + ) } return nil