Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Epoch Interface Refactor: Split into TentativeEpoch and CommittedEpoch #6941

Open
wants to merge 6 commits into
base: feature/epoch-interface-refactor
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions cmd/bootstrap/transit/cmd/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ func snapshot(cmd *cobra.Command, args []string) {
}

// check if given NodeID is part of the current or next epoch
currentIdentities, err := snapshot.Epochs().Current().InitialIdentities()
currentEpoch := snapshot.Epochs().Current()
currentIdentities, err := currentEpoch.InitialIdentities()
if err != nil {
log.Fatal().Err(err).Msg("could not get initial identities from current epoch")
}
Expand All @@ -89,7 +90,8 @@ func snapshot(cmd *cobra.Command, args []string) {
return
}

nextIdentities, err := snapshot.Epochs().Next().InitialIdentities()
nextEpoch := snapshot.Epochs().NextCommitted()
nextIdentities, err := nextEpoch.InitialIdentities()
if err != nil {
log.Fatal().Err(err).Msg("could not get initial identities from next epoch")
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/util/cmd/epochs/cmd/reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func extractResetEpochArgs(snapshot *inmem.Snapshot) []cadence.Value {
// ^ ^ ^-dkgPhase2FinalView
// | `-dkgPhase1FinalView
// `-stakingEndView
func getStakingAuctionEndView(epoch protocol.Epoch) (uint64, error) {
func getStakingAuctionEndView(epoch protocol.CommittedEpoch) (uint64, error) {
dkgPhase1FinalView, err := epoch.DKGPhase1FinalView()
if err != nil {
return 0, err
Expand Down
2 changes: 1 addition & 1 deletion consensus/hotstuff/committees/cluster_committee.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func NewClusterCommittee(
state protocol.State,
payloads storage.ClusterPayloads,
cluster protocol.Cluster,
epoch protocol.Epoch,
epoch protocol.CommittedEpoch,
me flow.Identifier,
) (*Cluster, error) {
selection, err := leader.SelectionForCluster(cluster, epoch)
Expand Down
10 changes: 5 additions & 5 deletions consensus/hotstuff/committees/consensus_committee.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (e *epochInfo) recomputeLeaderSelectionForExtendedViewRange(extension flow.
// newEpochInfo retrieves the committee information and computes leader selection.
// This can be cached and used for all by-view queries for this epoch.
// No errors are expected during normal operation.
func newEpochInfo(epoch protocol.Epoch) (*epochInfo, error) {
func newEpochInfo(epoch protocol.CommittedEpoch) (*epochInfo, error) {
randomSeed, err := epoch.RandomSource()
if err != nil {
return nil, fmt.Errorf("could not get epoch random source: %w", err)
Expand Down Expand Up @@ -148,7 +148,7 @@ func NewConsensusCommittee(state protocol.State, me flow.Identifier) (*Consensus
final := state.Final()

// pre-compute leader selection for all presently relevant committed epochs
epochs := make([]protocol.Epoch, 0, 3)
epochs := make([]protocol.CommittedEpoch, 0, 3)

// we prepare the previous epoch, if one exists
exists, err := protocol.PreviousEpochExists(final)
Expand All @@ -168,7 +168,7 @@ func NewConsensusCommittee(state protocol.State, me flow.Identifier) (*Consensus
return nil, fmt.Errorf("could not check epoch phase: %w", err)
}
if phase == flow.EpochPhaseCommitted {
epochs = append(epochs, final.Epochs().Next())
epochs = append(epochs, final.Epochs().NextCommitted())
}

for _, epoch := range epochs {
Expand Down Expand Up @@ -384,7 +384,7 @@ func (c *Consensus) handleEpochExtended(epochCounter uint64, extension flow.Epoc
// When the next epoch is committed, we compute leader selection for the epoch and cache it.
// No errors are expected during normal operation.
func (c *Consensus) handleEpochCommittedPhaseStarted(refBlock *flow.Header) error {
epoch := c.state.AtHeight(refBlock.Height).Epochs().Next()
epoch := c.state.AtHeight(refBlock.Height).Epochs().NextCommitted()
_, err := c.prepareEpoch(epoch)
if err != nil {
return fmt.Errorf("could not cache data for committed next epoch: %w", err)
Expand Down Expand Up @@ -417,7 +417,7 @@ func (c *Consensus) epochInfoByView(view uint64) (*epochInfo, error) {
// Calling prepareEpoch multiple times for the same epoch returns cached epoch information.
// Input must be a committed epoch.
// No errors are expected during normal operation.
func (c *Consensus) prepareEpoch(epoch protocol.Epoch) (*epochInfo, error) {
func (c *Consensus) prepareEpoch(epoch protocol.CommittedEpoch) (*epochInfo, error) {
counter, err := epoch.Counter()
if err != nil {
return nil, fmt.Errorf("could not get counter for epoch to prepare: %w", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (suite *ConsensusSuite) CreateAndStartCommittee() {

// CommitEpoch adds the epoch to the protocol state and mimics the protocol state
// behaviour when committing an epoch, by sending the protocol event to the committee.
func (suite *ConsensusSuite) CommitEpoch(epoch protocol.Epoch) {
func (suite *ConsensusSuite) CommitEpoch(epoch protocol.CommittedEpoch) {
firstBlockOfCommittedPhase := unittest.BlockHeaderFixture()
suite.state.On("AtHeight", firstBlockOfCommittedPhase.Height).Return(suite.snapshot)
suite.epochs.Add(epoch)
Expand Down
2 changes: 1 addition & 1 deletion consensus/hotstuff/committees/leader/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
// committee in the given epoch. A cluster containing nodes with zero `InitialWeight`
// is an accepted input as long as there are nodes with positive weights. Zero-weight nodes
// have zero probability of being selected as leaders in accordance with their weight.
func SelectionForCluster(cluster protocol.Cluster, epoch protocol.Epoch) (*LeaderSelection, error) {
func SelectionForCluster(cluster protocol.Cluster, epoch protocol.CommittedEpoch) (*LeaderSelection, error) {
// sanity check to ensure the cluster and epoch match
counter, err := epoch.Counter()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion consensus/hotstuff/committees/leader/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

// SelectionForConsensusFromEpoch is a ...
func SelectionForConsensusFromEpoch(epoch protocol.Epoch) (*LeaderSelection, error) {
func SelectionForConsensusFromEpoch(epoch protocol.CommittedEpoch) (*LeaderSelection, error) {

identities, err := epoch.InitialIdentities()
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions consensus/hotstuff/cruisectl/block_time_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type epochTiming struct {
}

// newEpochTiming queries the timing information from the given `epoch` and returns it as a new `epochTiming` instance.
func newEpochTiming(epoch protocol.Epoch) (*epochTiming, error) {
func newEpochTiming(epoch protocol.CommittedEpoch) (*epochTiming, error) {
firstView, err := epoch.FirstView()
if err != nil {
return nil, fmt.Errorf("could not retrieve epoch's first view: %w", err)
Expand Down Expand Up @@ -191,7 +191,7 @@ func (ctl *BlockTimeController) initEpochTiming() error {
return fmt.Errorf("could not check snapshot phase: %w", err)
}
if phase == flow.EpochPhaseCommitted {
ctl.nextEpochTiming, err = newEpochTiming(finalSnapshot.Epochs().Next())
ctl.nextEpochTiming, err = newEpochTiming(finalSnapshot.Epochs().NextCommitted())
if err != nil {
return fmt.Errorf("failed to retrieve the next epoch's timing information: %w", err)
}
Expand Down Expand Up @@ -459,7 +459,7 @@ func (ctl *BlockTimeController) processEpochExtended(first *flow.Header) error {
func (ctl *BlockTimeController) processEpochCommittedPhaseStarted(first *flow.Header) error {
var err error
snapshot := ctl.state.AtHeight(first.Height)
ctl.nextEpochTiming, err = newEpochTiming(snapshot.Epochs().Next())
ctl.nextEpochTiming, err = newEpochTiming(snapshot.Epochs().NextCommitted())
if err != nil {
return fmt.Errorf("failed to retrieve the next epoch's timing information: %w", err)
}
Expand Down
5 changes: 3 additions & 2 deletions consensus/hotstuff/cruisectl/block_time_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (bs *BlockTimeControllerSuite) AssertCorrectInitialization() {

// if next epoch is committed, final view should be set
if phase := bs.epochs.Phase(); phase == flow.EpochPhaseCommitted {
finalView, err := bs.epochs.Next().FinalView()
finalView, err := bs.epochs.NextCommitted().FinalView()
require.NoError(bs.T(), err)
require.NotNil(bs.T(), bs.ctl.nextEpochTiming)
assert.Equal(bs.T(), finalView, bs.ctl.nextEpochTiming.finalView)
Expand Down Expand Up @@ -232,8 +232,9 @@ func (bs *BlockTimeControllerSuite) TestOnEpochExtended() {
FirstView: setupFixture.FirstView,
FinalView: setupFixture.FinalView,
}
commitFixture := unittest.EpochCommitFixture()

epoch := inmem.NewSetupEpoch(setupFixture, []flow.EpochExtension{extension})
epoch := inmem.NewCommittedEpoch(setupFixture, []flow.EpochExtension{extension}, commitFixture)
bs.epochs.Add(epoch)
bs.epochs.Transition()

Expand Down
4 changes: 2 additions & 2 deletions consensus/integration/nodes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ type epochInfo struct {
}

// buildEpochLookupList is a helper function which builds an auxiliary structure of epochs sorted by counter
func buildEpochLookupList(epochs ...protocol.Epoch) []epochInfo {
func buildEpochLookupList(epochs ...protocol.CommittedEpoch) []epochInfo {
infos := make([]epochInfo, 0)
for _, epoch := range epochs {
finalView, err := epoch.FinalView()
Expand Down Expand Up @@ -193,7 +193,7 @@ func createNodes(t *testing.T, participants *ConsensusParticipants, rootSnapshot
require.NoError(t, err)

epochViewLookup := buildEpochLookupList(rootSnapshot.Epochs().Current(),
rootSnapshot.Epochs().Next())
rootSnapshot.Epochs().NextCommitted())

epochLookup := &mockmodule.EpochLookup{}
epochLookup.On("EpochForView", mock.Anything).Return(
Expand Down
6 changes: 3 additions & 3 deletions engine/collection/epochmgr/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func (e *Engine) Done() <-chan struct{} {
// the given epoch, using the configured factory.
// Error returns:
// - ErrNotAuthorizedForEpoch if this node is not authorized in the epoch.
func (e *Engine) createEpochComponents(epoch protocol.Epoch) (*EpochComponents, error) {
func (e *Engine) createEpochComponents(epoch protocol.CommittedEpoch) (*EpochComponents, error) {
counter, err := epoch.Counter()
if err != nil {
return nil, fmt.Errorf("could not get epoch counter: %w", err)
Expand Down Expand Up @@ -343,7 +343,7 @@ func (e *Engine) handleEpochEvents(ctx irrecoverable.SignalerContext, ready comp
ctx.Throw(err)
}
case firstBlock := <-e.epochSetupPhaseStartedEvents:
nextEpoch := e.state.AtBlockID(firstBlock.ID()).Epochs().Next()
nextEpoch := e.state.AtBlockID(firstBlock.ID()).Epochs().NextUnsafe()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should only occur at the start of the EpochSetup phase

e.onEpochSetupPhaseStarted(ctx, nextEpoch)
case epochCounter := <-e.epochStopEvents:
err := e.stopEpochComponents(epochCounter)
Expand Down Expand Up @@ -454,7 +454,7 @@ func (e *Engine) prepareToStopEpochComponents(epochCounter, epochMaxHeight uint6
// setup phase, or when the node is restarted during the epoch setup phase. It
// kicks off setup tasks for the phase, in particular submitting a vote for the
// next epoch's root cluster QC.
func (e *Engine) onEpochSetupPhaseStarted(ctx irrecoverable.SignalerContext, nextEpoch protocol.Epoch) {
func (e *Engine) onEpochSetupPhaseStarted(ctx irrecoverable.SignalerContext, nextEpoch protocol.TentativeEpoch) {
ctxWithCancel, cancel := context.WithCancel(ctx)
defer cancel()

Expand Down
42 changes: 26 additions & 16 deletions engine/collection/epochmgr/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,25 +110,35 @@ type Suite struct {
func (suite *Suite) MockFactoryCreate(arg any) {
suite.factory.On("Create", arg).
Run(func(args mock.Arguments) {
epoch, ok := args.Get(0).(realprotocol.Epoch)
epoch, ok := args.Get(0).(realprotocol.CommittedEpoch)
suite.Require().Truef(ok, "invalid type %T", args.Get(0))
counter, err := epoch.Counter()
suite.Require().Nil(err)
suite.components[counter] = newMockComponents(suite.T())
}).
Return(
func(epoch realprotocol.Epoch) realcluster.State { return suite.ComponentsForEpoch(epoch).state },
func(epoch realprotocol.Epoch) component.Component { return suite.ComponentsForEpoch(epoch).prop },
func(epoch realprotocol.Epoch) realmodule.ReadyDoneAware { return suite.ComponentsForEpoch(epoch).sync },
func(epoch realprotocol.Epoch) realmodule.HotStuff { return suite.ComponentsForEpoch(epoch).hotstuff },
func(epoch realprotocol.Epoch) hotstuff.VoteAggregator {
func(epoch realprotocol.CommittedEpoch) realcluster.State {
return suite.ComponentsForEpoch(epoch).state
},
func(epoch realprotocol.CommittedEpoch) component.Component {
return suite.ComponentsForEpoch(epoch).prop
},
func(epoch realprotocol.CommittedEpoch) realmodule.ReadyDoneAware {
return suite.ComponentsForEpoch(epoch).sync
},
func(epoch realprotocol.CommittedEpoch) realmodule.HotStuff {
return suite.ComponentsForEpoch(epoch).hotstuff
},
func(epoch realprotocol.CommittedEpoch) hotstuff.VoteAggregator {
return suite.ComponentsForEpoch(epoch).voteAggregator
},
func(epoch realprotocol.Epoch) hotstuff.TimeoutAggregator {
func(epoch realprotocol.CommittedEpoch) hotstuff.TimeoutAggregator {
return suite.ComponentsForEpoch(epoch).timeoutAggregator
},
func(epoch realprotocol.Epoch) component.Component { return suite.ComponentsForEpoch(epoch).messageHub },
func(epoch realprotocol.Epoch) error { return nil },
func(epoch realprotocol.CommittedEpoch) component.Component {
return suite.ComponentsForEpoch(epoch).messageHub
},
func(epoch realprotocol.CommittedEpoch) error { return nil },
).Maybe()
}

Expand Down Expand Up @@ -239,7 +249,7 @@ func (suite *Suite) AssertEpochStopped(counter uint64) {
components.sync.AssertCalled(suite.T(), "Done")
}

func (suite *Suite) ComponentsForEpoch(epoch realprotocol.Epoch) *mockComponents {
func (suite *Suite) ComponentsForEpoch(epoch realprotocol.CommittedEpoch) *mockComponents {
counter, err := epoch.Counter()
suite.Require().Nil(err, "cannot get counter")
components, ok := suite.components[counter]
Expand All @@ -252,12 +262,12 @@ func (suite *Suite) ComponentsForEpoch(epoch realprotocol.Epoch) *mockComponents
func (suite *Suite) MockAsUnauthorizedNode(forEpoch uint64) {

// mock as unauthorized for given epoch only
unauthorizedMatcher := func(epoch realprotocol.Epoch) bool {
unauthorizedMatcher := func(epoch realprotocol.CommittedEpoch) bool {
counter, err := epoch.Counter()
require.NoError(suite.T(), err)
return counter == forEpoch
}
authorizedMatcher := func(epoch realprotocol.Epoch) bool { return !unauthorizedMatcher(epoch) }
authorizedMatcher := func(epoch realprotocol.CommittedEpoch) bool { return !unauthorizedMatcher(epoch) }

suite.factory = epochmgr.NewEpochComponentsFactory(suite.T())
suite.factory.
Expand All @@ -280,7 +290,7 @@ func (suite *Suite) TestRestartInSetupPhase() {
suite.phase = flow.EpochPhaseSetup
// should call voter with next epoch
var called = make(chan struct{})
suite.voter.On("Vote", mock.Anything, suite.epochQuery.Next()).
suite.voter.On("Vote", mock.Anything, suite.epochQuery.NextUnsafe()).
Return(nil).
Run(func(args mock.Arguments) {
close(called)
Expand Down Expand Up @@ -418,7 +428,7 @@ func (suite *Suite) TestStartAsUnauthorizedNode() {
suite.phase = flow.EpochPhaseSetup
// should call voter with next epoch
var called = make(chan struct{})
suite.voter.On("Vote", mock.Anything, suite.epochQuery.Next()).
suite.voter.On("Vote", mock.Anything, suite.epochQuery.NextUnsafe()).
Return(nil).
Run(func(args mock.Arguments) {
close(called)
Expand All @@ -444,7 +454,7 @@ func (suite *Suite) TestRespondToPhaseChange() {
suite.phase = flow.EpochPhaseStaking
// should call voter with next epoch
var called = make(chan struct{})
suite.voter.On("Vote", mock.Anything, suite.epochQuery.Next()).
suite.voter.On("Vote", mock.Anything, suite.epochQuery.NextUnsafe()).
Return(nil).
Run(func(args mock.Arguments) {
close(called)
Expand Down Expand Up @@ -543,7 +553,7 @@ func (suite *Suite) TestStopQcVoting() {
suite.engineEventsDistributor.On("ActiveClustersChanged", mock.AnythingOfType("flow.ChainIDList")).Once()

receivedCancelSignal := make(chan struct{})
suite.voter.On("Vote", mock.Anything, suite.epochQuery.Next()).
suite.voter.On("Vote", mock.Anything, suite.epochQuery.NextUnsafe()).
Return(nil).
Run(func(args mock.Arguments) {
ctx := args.Get(0).(context.Context)
Expand Down
2 changes: 1 addition & 1 deletion engine/collection/epochmgr/factories/epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func NewEpochComponentsFactory(
}

func (factory *EpochComponentsFactory) Create(
epoch protocol.Epoch,
epoch protocol.CommittedEpoch,
) (
state cluster.State,
compliance component.Component,
Expand Down
2 changes: 1 addition & 1 deletion engine/collection/epochmgr/factories/hotstuff.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func NewHotStuffFactory(
}

func (f *HotStuffFactory) CreateModules(
epoch protocol.Epoch,
epoch protocol.CommittedEpoch,
cluster protocol.Cluster,
clusterState cluster.State,
headers storage.Headers,
Expand Down
2 changes: 1 addition & 1 deletion engine/collection/epochmgr/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type EpochComponentsFactory interface {
// a given epoch counter.
//
// Must return ErrNotAuthorizedForEpoch if this node is not authorized in the epoch.
Create(epoch protocol.Epoch) (
Create(epoch protocol.CommittedEpoch) (
state cluster.State,
proposal component.Component,
sync module.ReadyDoneAware,
Expand Down
Loading
Loading