Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WiP: Move NiPoST state into db #5202

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 101 additions & 71 deletions activation/activation.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"sync/atomic"
"time"

"github.com/google/go-cmp/cmp"
"github.com/spacemeshos/post/shared"
"golang.org/x/sync/errgroup"

Expand All @@ -27,6 +28,7 @@ import (
"github.com/spacemeshos/go-spacemesh/signing"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/atxs"
"github.com/spacemeshos/go-spacemesh/sql/nipost"
)

// PoetConfig is the configuration to interact with the poet server.
Expand Down Expand Up @@ -285,6 +287,52 @@ func (b *Builder) StopSmeshing(deleteFiles bool) error {
}
}

func (b *Builder) MovePostToDb() error {
post, err := loadPost(b.nipostBuilder.DataDir())
switch {
case os.IsNotExist(err):
return nil // no post file, nothing to do
case err != nil:
return fmt.Errorf("loading post: %w", err)
default:
}

client, err := b.postService.Client(b.nodeID)
if err != nil {
return fmt.Errorf("getting post client: %w", err)
}

info, err := client.Info(context.Background())
if err != nil {
return fmt.Errorf("getting post info: %w", err)
}

ch := &types.NIPostChallenge{
InitialPost: post,
CommitmentATX: &info.CommitmentATX,
}
if err := nipost.AddChallenge(b.cdb, b.nodeID, ch); err != nil {
return fmt.Errorf("adding post to db: %w", err)
}
return discardPost(b.nipostBuilder.DataDir())
}

func (b *Builder) MoveNipostChallengeToDb() error {
ch, err := LoadNipostChallenge(b.nipostBuilder.DataDir())
switch {
case os.IsNotExist(err):
return nil // no challenge file, nothing to do
case err != nil:
return fmt.Errorf("loading nipost challenge: %w", err)
default:
}

if err := nipost.AddChallenge(b.cdb, b.nodeID, ch); err != nil {
return fmt.Errorf("adding challenge to db: %w", err)
}
return discardNipostChallenge(b.nipostBuilder.DataDir())
}

// SmesherID returns the ID of the smesher that created this activation.
func (b *Builder) SmesherID() types.NodeID {
return b.nodeID
Expand All @@ -295,11 +343,14 @@ func (b *Builder) generateInitialPost(ctx context.Context) error {
if _, err := b.cdb.GetLastAtx(b.nodeID); err == nil {
return nil
}
// ...and if we don't have an initial POST persisted already.
if post, err := loadPost(b.nipostBuilder.DataDir()); err == nil {
b.log.Info("loaded the initial post from disk")
b.initialPost = post
// TODO(mafa): initial post info?
// ...and if we haven't stored an initial post yet.
if state, err := nipost.ChallengeBySequence(b.cdb, b.nodeID, 0); err == nil {
b.log.Info("load initial post from db")
b.initialPost = state.InitialPost
b.initialPostInfo = &types.PostInfo{
NodeID: b.nodeID,
CommitmentATX: *state.CommitmentATX,
}
return nil
}

Expand All @@ -315,7 +366,14 @@ func (b *Builder) generateInitialPost(ctx context.Context) error {
public.PostSeconds.Set(float64(time.Since(startTime)))
b.log.Info("created the initial post")

if err := savePost(b.nipostBuilder.DataDir(), post); err != nil {
if err := nipost.AddChallenge(b.cdb, b.nodeID, &types.NIPostChallenge{
PublishEpoch: 0, // will be updated later
Sequence: 0,
PrevATXID: types.EmptyATXID, // initial has no previous ATX
PositioningATX: types.EmptyATXID, // will be updated later
InitialPost: post,
CommitmentATX: &postInfo.CommitmentATX,
}); err != nil {
b.log.With().Warning("failed to save initial post: %w", log.Err(err))
}
return nil
Expand Down Expand Up @@ -402,12 +460,8 @@ func (b *Builder) buildNIPostChallenge(ctx context.Context) (*types.NIPostChalle
}
current := b.currentEpoch()
prev, err := b.cdb.GetLastAtx(b.nodeID)
if err != nil {
if !errors.Is(err, sql.ErrNotFound) {
return nil, err
}
} else if prev.PublishEpoch == current+1 {
current += 1
if err == nil {
current = max(current, prev.PublishEpoch)
}

until := time.Until(b.poetRoundStart(current))
Expand All @@ -431,43 +485,49 @@ func (b *Builder) buildNIPostChallenge(ctx context.Context) (*types.NIPostChalle
}
}

if challenge, err := nipost.ChallengeByEpoch(b.cdb, b.nodeID, current+1); err == nil {
// use existing challenge
return challenge, nil
}

posAtx, err := b.GetPositioningAtx()
if err != nil {
return nil, fmt.Errorf("failed to get positioning ATX: %w", err)
}

challenge := &types.NIPostChallenge{
PublishEpoch: current + 1,
PositioningATX: posAtx,
}
if prevAtx, err := b.cdb.GetLastAtx(b.nodeID); err == nil {
challenge := &types.NIPostChallenge{
PublishEpoch: current + 1,
Sequence: prevAtx.Sequence + 1,

if prevAtx, err := b.cdb.GetLastAtx(b.nodeID); err != nil {
client, err := b.postService.Client(b.nodeID)
if err != nil {
return nil, fmt.Errorf("failed to fetch commitment ATX: %w", err)
PrevATXID: prevAtx.ID,
PositioningATX: posAtx,
}
if b.initialPostInfo == nil {
// This is a temporary workaround for the case where an initial post has been generated,
// persisted to and loaded from disk. In this case we don't have a post info object
// and need to fetch it from the post service
//
// In a future PR all data that is persisted to disk will instead be persisted to db and
// the initial post data will be extended with post info to not require this any more
info, err := client.Info(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get commitment ATX: %w", err)
if old, err := nipost.ChallengeBySequence(b.cdb, b.nodeID, challenge.Sequence); err == nil {
if !cmp.Equal(old, challenge) {
b.pendingATX = nil
err := nipost.UpdateChallengeBySequence(b.cdb, b.nodeID, challenge)
if err != nil {
return nil, fmt.Errorf("update nipost challenge: %w", err)
}
}
b.initialPostInfo = info
return challenge, nil
}
if err := nipost.AddChallenge(b.cdb, b.nodeID, challenge); err != nil {
return nil, fmt.Errorf("add nipost challenge: %w", err)
}
challenge.CommitmentATX = &b.initialPostInfo.CommitmentATX
challenge.InitialPost = b.initialPost
} else {
challenge.PrevATXID = prevAtx.ID
challenge.Sequence = prevAtx.Sequence + 1
return challenge, nil
}

if err = SaveNipostChallenge(b.nipostBuilder.DataDir(), challenge); err != nil {
return nil, err
challenge, err := nipost.ChallengeBySequence(b.cdb, b.nodeID, 0)
if err != nil {
return nil, fmt.Errorf("fetch nipost: %w", err)
}
challenge.PublishEpoch = current + 1
challenge.PositioningATX = posAtx
if err := nipost.UpdateChallengeBySequence(b.cdb, b.nodeID, challenge); err != nil {
b.pendingATX = nil
return nil, fmt.Errorf("update nipost challenge: %w", err)
}
return challenge, nil
}
Expand Down Expand Up @@ -519,40 +579,13 @@ func (b *Builder) Coinbase() types.Address {
return b.coinbaseAccount
}

func (b *Builder) loadChallenge() (*types.NIPostChallenge, error) {
nipost, err := LoadNipostChallenge(b.nipostBuilder.DataDir())
if err != nil {
return nil, err
}
if nipost.PublishEpoch < b.currentEpoch() {
b.log.With().Info("atx nipost challenge is stale - discarding it",
log.Stringer("publish_epoch", nipost.PublishEpoch),
log.Stringer("current_epoch", b.currentEpoch()),
)
if err = b.discardChallenge(); err != nil {
return nil, fmt.Errorf("%w: atx nipost challenge is stale", err)
}
return nil, errors.New("atx nipost challenge is stale")
}
return nipost, nil
}

// PublishActivationTx attempts to publish an atx, it returns an error if an atx cannot be created.
func (b *Builder) PublishActivationTx(ctx context.Context) error {
logger := b.log.WithContext(ctx)

challenge, err := b.loadChallenge()
challenge, err := b.buildNIPostChallenge(ctx)
if err != nil {
if !errors.Is(err, os.ErrNotExist) {
logger.With().Warning("failed to load atx challenge", log.Err(err))
}
logger.With().Info("building new atx challenge",
log.Stringer("current_epoch", b.currentEpoch()),
)
challenge, err = b.buildNIPostChallenge(ctx)
if err != nil {
return err
}
return err
}

logger.With().Info("atx challenge is ready",
Expand Down Expand Up @@ -668,10 +701,7 @@ func (b *Builder) currentEpoch() types.EpochID {

func (b *Builder) discardChallenge() error {
b.pendingATX = nil
if err := discardNipostChallenge(b.nipostBuilder.DataDir()); err != nil && !errors.Is(err, os.ErrNotExist) {
return err
}
return nil
return nipost.RemoveChallenge(b.cdb, b.nodeID)
}

func (b *Builder) broadcast(ctx context.Context, atx *types.ActivationTx) (int, error) {
Expand Down
Loading