Skip to content

Commit

Permalink
Merge pull request #6452 from spacemeshos/node-split-smesher-states-poc
Browse files Browse the repository at this point in the history
node-split: add smeshing identities endpoint to smesher service PoC
  • Loading branch information
kacpersaw authored Nov 18, 2024
2 parents fac1e72 + 5e802e9 commit 79c0bd5
Show file tree
Hide file tree
Showing 12 changed files with 544 additions and 61 deletions.
27 changes: 26 additions & 1 deletion activation/activation.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ type Builder struct {
// states of each known identity
postStates PostStates

// identity states of each known identity
identitiesStates IdentityStates

// smeshingMutex protects methods like `StartSmeshing` and `StopSmeshing` from concurrent execution
// since they (can) modify the fields below.
smeshingMutex sync.Mutex
Expand Down Expand Up @@ -158,6 +161,12 @@ func WithPostStates(ps PostStates) BuilderOption {
}
}

func WithIdentityStates(is IdentityStates) BuilderOption {
return func(b *Builder) {
b.identitiesStates = is
}
}

func BuilderAtxVersions(v AtxVersions) BuilderOption {
return func(h *Builder) {
h.versions = append([]atxVersion{{0, types.AtxV1}}, v.asSlice()...)
Expand Down Expand Up @@ -191,6 +200,7 @@ func NewBuilder(
logger: log,
poetRetryInterval: defaultPoetRetryInterval,
postStates: NewPostStates(log),
identitiesStates: NewIdentityStateStorage(),
versions: []atxVersion{{0, types.AtxV1}},
posAtxFinder: positioningAtxFinder{
logger: log,
Expand Down Expand Up @@ -440,6 +450,8 @@ func (b *Builder) run(ctx context.Context, sig *signing.EdSigner) {

b.logger.Warn("failed to publish atx", zap.Error(err))

b.identitiesStates.Set(sig.NodeID(), nil, IdentityStateRetrying, err.Error())

poetErr := &PoetSvcUnstableError{}
switch {
case errors.Is(err, ErrATXChallengeExpired):
Expand Down Expand Up @@ -498,10 +510,18 @@ func (b *Builder) run(ctx context.Context, sig *signing.EdSigner) {

func (b *Builder) BuildNIPostChallenge(ctx context.Context, nodeID types.NodeID) (*types.NIPostChallenge, error) {
logger := b.logger.With(log.ZShortStringer("smesherID", nodeID))

atxSyncedCh := b.syncer.RegisterForATXSynced()
select {
case <-atxSyncedCh:
default:
b.identitiesStates.Set(nodeID, nil, IdentityStateWaitForATXSynced, "")
}

select {
case <-ctx.Done():
return nil, ctx.Err()
case <-b.syncer.RegisterForATXSynced():
case <-atxSyncedCh:
}

currentEpochId := b.layerClock.CurrentLayer().GetEpoch()
Expand Down Expand Up @@ -551,6 +571,7 @@ func (b *Builder) BuildNIPostChallenge(ctx context.Context, nodeID types.NodeID)
)
events.EmitPoetWaitRound(nodeID, currentEpochId, publishEpochId, wait)
events.EmitWaitingForPoETRegistrationWindow(nodeID, currentEpochId, publishEpochId, wait)
b.identitiesStates.Set(nodeID, &publishEpochId, IdentityStateWaitingForPoetRegistrationWindow, "")
select {
case <-ctx.Done():
return nil, ctx.Err()
Expand Down Expand Up @@ -715,6 +736,8 @@ func (b *Builder) PublishActivationTx(ctx context.Context, sig *signing.EdSigner
zap.Uint32("current_epoch", b.layerClock.CurrentLayer().GetEpoch().Uint32()),
zap.Object("challenge", challenge),
)
b.identitiesStates.Set(sig.NodeID(), &challenge.PublishEpoch, IdentityStatePoetChallengeReady, "")

targetEpoch := challenge.PublishEpoch.Add(1)
ctx, cancel := context.WithDeadline(ctx, b.layerClock.LayerToTime(targetEpoch.FirstLayer()))
defer cancel()
Expand All @@ -729,6 +752,7 @@ func (b *Builder) PublishActivationTx(ctx context.Context, sig *signing.EdSigner
zap.Uint32("current_layer", b.layerClock.CurrentLayer().Uint32()),
log.ZShortStringer("smesherID", sig.NodeID()),
)
b.identitiesStates.Set(sig.NodeID(), &challenge.PublishEpoch, IdentityStateATXReady, "")
select {
case <-ctx.Done():
return fmt.Errorf("wait for publication epoch: %w", ctx.Err())
Expand Down Expand Up @@ -774,6 +798,7 @@ func (b *Builder) PublishActivationTx(ctx context.Context, sig *signing.EdSigner
atx.ID(),
b.layerClock.LayerToTime(target.FirstLayer()),
)
b.identitiesStates.Set(sig.NodeID(), &challenge.PublishEpoch, IdentityStateATXBroadcasted, "")
return nil
}

Expand Down
132 changes: 132 additions & 0 deletions activation/identity_states.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package activation

import (
"errors"
"fmt"
"sync"
"time"

"github.com/spacemeshos/go-spacemesh/common/types"
)

var (
ErrIdentityStateUnknown = errors.New("identity state is unknown")
ErrInvalidIdentityStateSwitch = errors.New("invalid identity state switch")
)

type IdentityState int

const (
IdentityStateNotSet IdentityState = iota

IdentityStateWaitForATXSynced
IdentityStateRetrying

// poet.
IdentityStateWaitingForPoetRegistrationWindow
// building nipost challenge.
IdentityStatePoetChallengeReady
IdentityStatePoetRegistered
// 2w pass ...
IdentityStateWaitForPoetRoundEnd
IdentityStatePoetProofReceived

// post.
IdentityStateGeneratingPostProof
IdentityStatePostProofReady

// atx.
IdentityStateATXReady
IdentityStateATXBroadcasted
)

func (s IdentityState) String() string {
switch s {
case IdentityStateNotSet:
return "not set"
case IdentityStateWaitForATXSynced:
return "wait for atx synced"
case IdentityStateRetrying:
return "retrying"
case IdentityStatePoetChallengeReady:
return "poet challenge ready"
case IdentityStateWaitingForPoetRegistrationWindow:
return "waiting for poet registration window"
case IdentityStatePoetRegistered:
return "poet registered"
case IdentityStateWaitForPoetRoundEnd:
return "wait for poet round end"
case IdentityStatePoetProofReceived:
return "poet proof received"
case IdentityStateGeneratingPostProof:
return "generating post proof"
case IdentityStatePostProofReady:
return "post proof ready"
case IdentityStateATXReady:
return "atx ready"
case IdentityStateATXBroadcasted:
return "atx broadcasted"
default:
panic(fmt.Sprintf(ErrIdentityStateUnknown.Error()+" %d", s))
}
}

type IdentityStateInfo struct {
State IdentityState
PublishEpoch *types.EpochID
Message string
Time time.Time
}

type IdentityStateStorage struct {
mu sync.RWMutex
identities map[types.NodeID][]IdentityStateInfo
}

func NewIdentityStateStorage() *IdentityStateStorage {
return &IdentityStateStorage{
identities: make(map[types.NodeID][]IdentityStateInfo),
}
}

func (s *IdentityStateStorage) Set(
id types.NodeID,
publishEpoch *types.EpochID,
newState IdentityState,
message string,
) {
s.mu.Lock()
defer s.mu.Unlock()

if _, exists := s.identities[id]; !exists {
s.identities[id] = []IdentityStateInfo{}
}

if len(s.identities[id]) > 100 {
s.identities[id] = s.identities[id][1:]
}

s.identities[id] = append(s.identities[id], IdentityStateInfo{
State: newState,
PublishEpoch: publishEpoch,
Message: message,
Time: time.Now(),
})
}

func (s *IdentityStateStorage) Get(id types.NodeID) ([]IdentityStateInfo, error) {
s.mu.RLock()
defer s.mu.RUnlock()

state, exists := s.identities[id]
if !exists {
return nil, ErrIdentityStateUnknown
}
return state, nil
}

func (s *IdentityStateStorage) All() map[types.NodeID][]IdentityStateInfo {
s.mu.RLock()
defer s.mu.RUnlock()
return s.identities
}
6 changes: 6 additions & 0 deletions activation/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,3 +232,9 @@ type PostStates interface {
Set(id types.NodeID, state types.PostState)
Get() map[types.NodeID]types.PostState
}

type IdentityStates interface {
Set(id types.NodeID, publishEpoch *types.EpochID, newState IdentityState, message string)
Get(id types.NodeID) ([]IdentityStateInfo, error)
All() map[types.NodeID][]IdentityStateInfo
}
137 changes: 137 additions & 0 deletions activation/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 79c0bd5

Please sign in to comment.