[Merged by Bors] - Malfeasance2 fetcher and sync #6652

Closed
68 commits
61e36d2
Optimize MarriagesATX query
fasmat Jan 20, 2025
9b68d94
Add missing tests
fasmat Jan 20, 2025
bef01f4
Add malfeasance handler to fetcher
fasmat Jan 20, 2025
b2319d3
Prepare datastore for malfeasance proofs
fasmat Jan 21, 2025
36a813f
Add malfeasance v2 fetcher WiP
fasmat Jan 21, 2025
c944eac
Remove unneeded interface method
fasmat Jan 21, 2025
fc98d79
Merge remote-tracking branch 'origin/develop' into malfeasance-fetcher
fasmat Jan 24, 2025
7610f31
Deprecate function in CachedDB
fasmat Jan 24, 2025
f3426fe
Use statesql instead of cached DB for datastore
fasmat Jan 24, 2025
597dd08
Add proof function to publisher
fasmat Jan 24, 2025
2ba6c71
Make proof handling transactional
fasmat Jan 24, 2025
baea754
Add Malfeasance2 to datastore package
fasmat Jan 24, 2025
a936e10
Add more tests
fasmat Jan 24, 2025
fdb4571
Fix linter message
fasmat Jan 24, 2025
21f9a2f
Update and integrate new fetcher
fasmat Jan 24, 2025
baf5447
Ensure an already handled proof isn't stored again
fasmat Jan 24, 2025
6e253ed
More tests for handler
fasmat Jan 24, 2025
c5a67b0
Add fetcher for malicious IDs and more tests
fasmat Jan 24, 2025
41483bb
Add syncer code
fasmat Jan 24, 2025
4e74dd2
Merge remote-tracking branch 'origin/develop' into malfeasance-fetcher
fasmat Jan 24, 2025
3877360
Fix import errors
fasmat Jan 24, 2025
7393c44
Add more tests
fasmat Jan 24, 2025
b560110
Revert some changes
fasmat Jan 25, 2025
06d9865
fix failing tests
fasmat Jan 25, 2025
58f3e02
Check for malfeasance old and new
fasmat Jan 25, 2025
2f38268
Fix data race in test
fasmat Jan 25, 2025
aae80ac
Split up test into sections
fasmat Jan 27, 2025
d83dd3c
Fix missing logger config for malfeasance2
fasmat Jan 27, 2025
2f39fec
Merge remote-tracking branch 'origin/develop' into malfeasance-fetcher
fasmat Jan 28, 2025
c938443
Update systest for malfeasance
fasmat Jan 28, 2025
4ee8331
Update malfeasance stream and add more logging
fasmat Jan 28, 2025
6c0cdff
Use priv connection for malfeasance stream
fasmat Jan 28, 2025
15cf90d
Use waitgroup instead of t.Run
fasmat Jan 28, 2025
90b72a4
Activate malfeasance for v2
fasmat Jan 28, 2025
733e939
use different dirs for services
fasmat Jan 28, 2025
c6eb4e8
Add test for checkpoint
fasmat Jan 28, 2025
2b75bd1
Avoid port conflict
fasmat Jan 28, 2025
eb230fa
Ensure at least some labels are valid
fasmat Jan 28, 2025
2b89775
Separate tests
fasmat Jan 28, 2025
e227831
Cleanup
fasmat Jan 28, 2025
3d16a41
More cleanup
fasmat Jan 28, 2025
8c99ec6
Run sequentially instead of parallel
fasmat Jan 28, 2025
8058372
Fix wrong ATX version selection
fasmat Jan 29, 2025
cadc317
Fix assertions
fasmat Jan 29, 2025
5856c6a
Add more info to logs
fasmat Jan 29, 2025
a85a154
Allow type to be a string in malfeasance 2
fasmat Jan 29, 2025
4f30b2f
ATXID is short string
fasmat Jan 29, 2025
0ffed26
Publishing should be on the same timeout as streaming for the malfeas…
fasmat Jan 29, 2025
7c263da
Increase logging of systests
fasmat Jan 29, 2025
3181538
Increase probability to detect invalid post labels
fasmat Jan 29, 2025
abacfb2
Add more logging
fasmat Jan 29, 2025
21f08df
Avoid nested tx when publishing malfeasance proofs
fasmat Jan 29, 2025
a7c003f
Move check for identities existence into proof validation, allow inva…
fasmat Jan 30, 2025
cbe73fd
Fix failing tests
fasmat Jan 30, 2025
6aec6d9
Merge remote-tracking branch 'origin/develop' into malfeasance-fetcher
fasmat Jan 30, 2025
f3758d7
Update api dependency
fasmat Jan 31, 2025
8c357f8
Review feedback
fasmat Feb 3, 2025
a39c41c
Review feedback
fasmat Feb 3, 2025
7d2a564
Merge remote-tracking branch 'origin/develop' into malfeasance-fetcher
fasmat Feb 3, 2025
9514603
Add comment
fasmat Feb 3, 2025
b98d791
Check for malfeasance in single TX
fasmat Feb 3, 2025
0066ecf
Add a timeout to watcher
fasmat Feb 3, 2025
721b479
Update old smesher image
fasmat Feb 3, 2025
ba129ea
Fix linter error
fasmat Feb 3, 2025
8085763
Review fixes
fasmat Feb 4, 2025
706f5e4
Add TODO
fasmat Feb 4, 2025
0c4a9b1
Make flaky test less likely to fail
fasmat Feb 4, 2025
fcea6a0
Merge remote-tracking branch 'origin/develop' into malfeasance-fetcher
fasmat Feb 4, 2025
21 changes: 17 additions & 4 deletions activation/handler_v1.go
@@ -27,6 +27,7 @@
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/atxs"
"github.com/spacemeshos/go-spacemesh/sql/identities"
"github.com/spacemeshos/go-spacemesh/sql/malfeasance"
"github.com/spacemeshos/go-spacemesh/system"
)

@@ -222,12 +223,12 @@
watx.NumUnits,
PostSubset([]byte(h.local)), // use the local peer ID as seed for random subset
)
var invalidIdx *verifying.ErrInvalidIndex
if errors.As(err, &invalidIdx) {
var errInvalidIdx *verifying.ErrInvalidIndex
if errors.As(err, &errInvalidIdx) {
h.logger.Debug("ATX with invalid post index",
log.ZContext(ctx),
zap.Stringer("atx_id", watx.ID()),
zap.Int("index", invalidIdx.Index),
zap.Int("index", errInvalidIdx.Index),
)
malicious, err := identities.IsMalicious(h.cdb, watx.SmesherID)
if err != nil {
@@ -236,13 +237,20 @@
if malicious {
return nil, fmt.Errorf("smesher %s is known malfeasant", watx.SmesherID.ShortString())
}
malicious, err = malfeasance.IsMalicious(h.cdb, watx.SmesherID)
if err != nil {
return nil, fmt.Errorf("check if smesher is malicious: %w", err)
}
if malicious {
return nil, fmt.Errorf("smesher %s is known malfeasant", watx.SmesherID.ShortString())
}
proof := &mwire.MalfeasanceProof{
Layer: watx.PublishEpoch.FirstLayer(),
Proof: mwire.Proof{
Type: mwire.InvalidPostIndex,
Data: &mwire.InvalidPostIndexProof{
Atx: *watx,
InvalidIdx: uint32(invalidIdx.Index),
InvalidIdx: uint32(errInvalidIdx.Index),
},
},
}
@@ -489,6 +497,11 @@
if err != nil {
return fmt.Errorf("check if node is malicious: %w", err)
}
malicious2, err := malfeasance.IsMalicious(tx, atx.SmesherID)
if err != nil {
return fmt.Errorf("check if node is malicious: %w", err)
}
malicious = malicious || malicious2
if !malicious {
malicious, err = h.checkMalicious(ctx, tx, watx)
if err != nil {
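
Note on the `errors.As` pattern used in both handlers: the diff declares a pointer-typed target (`*verifying.ErrInvalidIndex`) and passes its address to `errors.As`, which suggests the error type implements `error` on a pointer receiver. The following is a minimal, self-contained sketch of that pattern. `ErrInvalidIndex`, `verify`, and `invalidIndex` here are hypothetical stand-ins written for illustration, not the actual `verifying` package API.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrInvalidIndex is a hypothetical stand-in for verifying.ErrInvalidIndex.
// Assumption: the real type implements error on a pointer receiver.
type ErrInvalidIndex struct{ Index int }

func (e *ErrInvalidIndex) Error() string {
	return fmt.Sprintf("invalid post index: %d", e.Index)
}

// verify is a hypothetical verifier: odd indices are reported as invalid,
// negative indices fail with an unrelated error, everything else passes.
func verify(index int) error {
	if index < 0 {
		return errors.New("verifier unavailable")
	}
	if index%2 == 1 {
		// Wrap the typed error so callers must unwrap it with errors.As.
		return fmt.Errorf("verifying post: %w", &ErrInvalidIndex{Index: index})
	}
	return nil
}

// invalidIndex reports the offending index if err wraps *ErrInvalidIndex.
func invalidIndex(err error) (int, bool) {
	var errInvalidIdx *ErrInvalidIndex
	if errors.As(err, &errInvalidIdx) {
		return errInvalidIdx.Index, true
	}
	return 0, false
}

func main() {
	for _, i := range []int{2, 7, -1} {
		err := verify(i)
		idx, ok := invalidIndex(err)
		fmt.Println(i, err, idx, ok)
	}
}
```

Only the wrapped `*ErrInvalidIndex` case yields `ok == true`; a nil error and an unrelated error both fall through, which mirrors how the handlers return early for any error that is not an invalid-index error.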
154 changes: 89 additions & 65 deletions activation/handler_v2.go
@@ -679,10 +679,15 @@
if err == nil {
return nil
}
errInvalid := &verifying.ErrInvalidIndex{}
if !errors.As(err, &errInvalid) {
errInvalidIdx := &verifying.ErrInvalidIndex{}
if !errors.As(err, &errInvalidIdx) {
return fmt.Errorf("validating post for ID %s: %w", nodeID.ShortString(), err)
}
h.logger.Debug("ATX with invalid post index",
log.ZContext(ctx),
zap.Stringer("atx_id", atx.ID()),
zap.Int("index", errInvalidIdx.Index),
)

// check if post contains at least one valid label
validIdx := 0
@@ -715,7 +720,7 @@
commitment,
nodeID,
nipostIndex,
uint32(errInvalid.Index),
uint32(errInvalidIdx.Index),
uint32(validIdx),
)
if err != nil {
@@ -724,40 +729,35 @@
if err := h.malPublisher.Publish(ctx, nodeID, proof); err != nil {
return fmt.Errorf("publishing malfeasance proof for invalid post: %w", err)
}
return fmt.Errorf("invalid post for ID %s: %w", nodeID.ShortString(), errInvalid)
return fmt.Errorf("invalid post for ID %s: %w", nodeID.ShortString(), errInvalidIdx)
}

func (h *HandlerV2) checkMalicious(ctx context.Context, tx sql.Transaction, atx *activationTx) (bool, error) {
malicious, err := malfeasance.IsMalicious(tx, atx.SmesherID)
if err != nil {
return malicious, fmt.Errorf("checking if node is malicious: %w", err)
}
if malicious {
return true, nil
}

malicious, err = h.checkDoubleMarry(ctx, tx, atx)
func (h *HandlerV2) checkMalicious(
ctx context.Context,
tx sql.Transaction,
watx *activationTx,
) (wire.Proof, types.NodeID, error) {
proof, nodeID, err := h.checkDoubleMarry(ctx, tx, watx)
if err != nil {
return malicious, fmt.Errorf("checking double marry: %w", err)
return nil, types.EmptyNodeID, fmt.Errorf("checking double marry: %w", err)
}
if malicious {
return true, nil
if proof != nil {
return proof, nodeID, nil
}

malicious, err = h.checkDoubleMerge(ctx, tx, atx)
proof, nodeID, err = h.checkDoubleMerge(ctx, tx, watx)
if err != nil {
return malicious, fmt.Errorf("checking double merge: %w", err)
return nil, types.EmptyNodeID, fmt.Errorf("checking double merge: %w", err)
}
if malicious {
return true, nil
if proof != nil {
return proof, nodeID, nil
}

malicious, err = h.checkPrevAtx(ctx, tx, atx)
proof, nodeID, err = h.checkPrevAtx(ctx, tx, watx)
if err != nil {
return malicious, fmt.Errorf("checking previous ATX: %w", err)
return nil, types.EmptyNodeID, fmt.Errorf("checking previous ATX: %w", err)
}

return malicious, err
return proof, nodeID, nil
}

func (h *HandlerV2) fetchWireAtx(
@@ -778,11 +778,15 @@
return atx, nil
}

func (h *HandlerV2) checkDoubleMarry(ctx context.Context, tx sql.Transaction, atx *activationTx) (bool, error) {
func (h *HandlerV2) checkDoubleMarry(
ctx context.Context,
tx sql.Transaction,
atx *activationTx,
) (wire.Proof, types.NodeID, error) {
for _, m := range atx.marriages {
info, err := marriage.FindByNodeID(tx, m.id)
if err != nil {
return false, fmt.Errorf("checking if ID is married: %w", err)
return nil, types.EmptyNodeID, fmt.Errorf("checking if ID is married: %w", err)
}
if info.ATX == atx.ID() {
continue
@@ -795,28 +799,32 @@
zap.Stringer("atx_id", info.ATX),
)
case err != nil:
return false, fmt.Errorf("fetching other ATX: %w", err)
return nil, types.EmptyNodeID, fmt.Errorf("fetching other ATX: %w", err)
}

proof, err := wire.NewDoubleMarryProof(tx, atx.ActivationTxV2, otherAtx, m.id)
if err != nil {
return true, fmt.Errorf("creating double marry proof: %w", err)
return nil, types.EmptyNodeID, fmt.Errorf("creating double marry proof: %w", err)
}
return true, h.malPublisher.Publish(ctx, m.id, proof)
return proof, m.id, nil
}
return false, nil
return nil, types.EmptyNodeID, nil
}

func (h *HandlerV2) checkDoubleMerge(ctx context.Context, tx sql.Transaction, atx *activationTx) (bool, error) {
func (h *HandlerV2) checkDoubleMerge(
ctx context.Context,
tx sql.Transaction,
atx *activationTx,
) (wire.Proof, types.NodeID, error) {
if atx.MarriageATX == nil {
return false, nil
return nil, types.EmptyNodeID, nil
}
ids, err := atxs.MergeConflict(tx, *atx.MarriageATX, atx.PublishEpoch)
switch {
case errors.Is(err, sql.ErrNotFound):
return false, nil
return nil, types.EmptyNodeID, nil
case err != nil:
return false, fmt.Errorf("searching for ATXs with the same marriage ATX: %w", err)
return nil, types.EmptyNodeID, fmt.Errorf("searching for ATXs with the same marriage ATX: %w", err)
}
otherIndex := slices.IndexFunc(ids, func(id types.ATXID) bool { return id != atx.ID() })
other := ids[otherIndex]
@@ -836,7 +844,7 @@
// see https://github.com/spacemeshos/go-spacemesh/issues/6434
otherAtx, err := h.fetchWireAtx(ctx, tx, other)
if err != nil {
return false, fmt.Errorf("fetching other ATX: %w", err)
return nil, types.EmptyNodeID, fmt.Errorf("fetching other ATX: %w", err)
}

// TODO(mafa): checkpoints need to include all marriage ATXs in full to be able to create malfeasance proofs
@@ -845,16 +853,20 @@
// see https://github.com/spacemeshos/go-spacemesh/issues/6435
proof, err := wire.NewDoubleMergeProof(tx, atx.ActivationTxV2, otherAtx)
if err != nil {
return true, fmt.Errorf("creating double merge proof: %w", err)
return nil, types.EmptyNodeID, fmt.Errorf("creating double merge proof: %w", err)
}
return true, h.malPublisher.Publish(ctx, atx.ActivationTxV2.SmesherID, proof)
return proof, atx.ActivationTxV2.SmesherID, nil
}

func (h *HandlerV2) checkPrevAtx(ctx context.Context, tx sql.Transaction, atx *activationTx) (bool, error) {
func (h *HandlerV2) checkPrevAtx(
ctx context.Context,
tx sql.Transaction,
atx *activationTx,
) (wire.Proof, types.NodeID, error) {
for id, data := range atx.ids {
expectedPrevID, err := atxs.PrevIDByNodeID(tx, id, atx.PublishEpoch)
if err != nil && !errors.Is(err, sql.ErrNotFound) {
return false, fmt.Errorf("get last atx by node id: %w", err)
return nil, types.EmptyNodeID, fmt.Errorf("get last atx by node id: %w", err)
}
if expectedPrevID == data.previous {
continue
@@ -871,7 +883,7 @@
case errors.Is(err, sql.ErrNotFound):
continue
case err != nil:
return true, fmt.Errorf("checking for previous ATX collision: %w", err)
return nil, types.EmptyNodeID, fmt.Errorf("checking for previous ATX collision: %w", err)
}

var wireAtxV1 *wire.ActivationTxV1
@@ -882,7 +894,7 @@
var blob sql.Blob
v, err := atxs.LoadBlob(ctx, tx, collision.Bytes(), &blob)
if err != nil {
return true, fmt.Errorf("get atx blob %s: %w", id.ShortString(), err)
return nil, types.EmptyNodeID, fmt.Errorf("get atx blob %s: %w", id.ShortString(), err)
}
switch v {
case types.AtxV1:
@@ -903,9 +915,9 @@
)
proof, err := wire.NewInvalidPrevAtxProofV2(tx, atx.ActivationTxV2, wireAtx, id)
if err != nil {
return true, fmt.Errorf("creating invalid previous ATX proof: %w", err)
return nil, types.EmptyNodeID, fmt.Errorf("creating invalid previous ATX proof: %w", err)
}
return true, h.malPublisher.Publish(ctx, id, proof)
return proof, id, nil
default:
h.logger.Fatal("Failed to create invalid previous ATX proof: unknown ATX version",
zap.Stringer("atx_id", collision),
@@ -921,16 +933,19 @@
)
proof, err := wire.NewInvalidPrevAtxProofV1(tx, atx.ActivationTxV2, wireAtxV1, id)
if err != nil {
return true, fmt.Errorf("creating invalid previous ATX proof: %w", err)
return nil, types.EmptyNodeID, fmt.Errorf("creating invalid previous ATX proof: %w", err)
}
return true, h.malPublisher.Publish(ctx, id, proof)
return proof, id, nil
}
return false, nil
return nil, types.EmptyNodeID, nil
}

// Store an ATX in the DB.
func (h *HandlerV2) storeAtx(ctx context.Context, atx *types.ActivationTx, watx *activationTx) error {
republishProof := false
malicious := false
var proof wire.Proof
var nodeID types.NodeID
if err := h.cdb.WithTxImmediate(ctx, func(tx sql.Transaction) error {
if len(watx.marriages) != 0 {
newMarriageID, err := marriage.NewID(tx)
@@ -942,7 +957,6 @@
ATX: atx.ID(),
Target: atx.SmesherID,
}
malicious := false
marriageIDs := make(map[marriage.ID]struct{}, 1)
marriageIDs[newMarriageID] = struct{}{}
for i, m := range watx.marriages {
@@ -1009,34 +1023,44 @@
return fmt.Errorf("setting atx units for ID %s: %w", id, err)
}
}
return nil
}); err != nil {
return fmt.Errorf("store atx: %w", err)
}

malicious := false
err := h.cdb.WithTxImmediate(ctx, func(tx sql.Transaction) error {
if malicious || republishProof {
return nil
}

// malfeasance check happens after storing the ATX because storing updates the marriage set
// that is needed for the malfeasance proof
//
// TODO(mafa): don't store own ATX if it would mark the node as malicious
// this probably needs to be done by validating and storing own ATXs eagerly and skipping validation in
// the gossip handler (not sync!)
if republishProof {
malicious = true
return h.malPublisher.Regossip(ctx, atx.SmesherID)
}

var err error
malicious, err = h.checkMalicious(ctx, tx, watx)
proof, nodeID, err = h.checkMalicious(ctx, tx, watx)
return err
})
if err != nil {
return fmt.Errorf("check malicious: %w", err)
}); err != nil {
return fmt.Errorf("store atx: %w", err)
}

switch {
case republishProof: // marriage set of known malicious smesher has changed, force re-gossip of proof
if err := h.malPublisher.Regossip(ctx, watx.SmesherID); err != nil {
h.logger.Error("failed to regossip malfeasance proof",
zap.Stringer("atx_id", watx.ID()),
zap.Stringer("smesher_id", watx.SmesherID),
zap.Error(err),
)
}
case proof != nil: // new malfeasance proof for identity created, publish proof (gossip is decided by publisher)
if err := h.malPublisher.Publish(ctx, nodeID, proof); err != nil {
h.logger.Error("failed to publish malfeasance proof",
zap.Stringer("atx_id", watx.ID()),
zap.Stringer("smesher_id", watx.SmesherID),
zap.Error(err),
)
}
}

h.beacon.OnAtx(atx)
if added := h.atxsdata.AddFromAtx(atx, malicious); added != nil {
if added := h.atxsdata.AddFromAtx(atx, malicious || proof != nil); added != nil {
h.tortoise.OnAtx(atx.TargetEpoch(), atx.ID(), added)
}
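
Note on the `storeAtx` restructuring: the `check*` helpers now return the proof and the offending node ID instead of publishing, and `storeAtx` publishes only after `WithTxImmediate` has returned. Together with the commit "Avoid nested tx when publishing malfeasance proofs", this reads as a "collect inside the transaction, publish after commit" pattern. The sketch below illustrates that idea under stated assumptions: `db`, `publisher`, `checkMalicious`, and `storeAtx` are simplified, hypothetical stand-ins (proofs are plain strings, and the database rejects nested transactions outright), not the go-spacemesh implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// errNestedTx is returned when a transaction is opened inside another one.
var errNestedTx = errors.New("nested transaction")

// db is a hypothetical database that allows only one open transaction.
type db struct{ inTx bool }

// withTx runs fn inside a transaction and rejects nested use.
func (d *db) withTx(fn func() error) error {
	if d.inTx {
		return errNestedTx
	}
	d.inTx = true
	defer func() { d.inTx = false }()
	return fn()
}

// publisher is a hypothetical proof publisher.
// Assumption: publishing persists the proof in its own transaction.
type publisher struct {
	db        *db
	published []string
}

func (p *publisher) publish(nodeID, proof string) error {
	return p.db.withTx(func() error {
		p.published = append(p.published, nodeID+":"+proof)
		return nil
	})
}

// checkMalicious only detects misbehaviour and returns a proof plus the
// offending node ID; it never publishes. Empty strings mean "no proof".
func checkMalicious(atx string) (proof, nodeID string) {
	if atx == "double-marry" {
		return "proof-of-" + atx, "node-1"
	}
	return "", ""
}

// storeAtx collects the proof inside the transaction and publishes it only
// after the transaction has finished, so publish never nests.
func storeAtx(d *db, p *publisher, atx string) (bool, error) {
	var proof, nodeID string
	if err := d.withTx(func() error {
		// ... storing the ATX would happen here ...
		proof, nodeID = checkMalicious(atx)
		return nil
	}); err != nil {
		return false, fmt.Errorf("store atx: %w", err)
	}
	if proof != "" {
		// A publish failure is logged, not returned: the ATX is already stored.
		if err := p.publish(nodeID, proof); err != nil {
			fmt.Println("failed to publish malfeasance proof:", err)
		}
	}
	return proof != "", nil
}

func main() {
	d := &db{}
	p := &publisher{db: d}
	malicious, err := storeAtx(d, p, "double-marry")
	fmt.Println(malicious, err, p.published)
}
```

In this sketch, calling `publish` from inside `withTx` would fail with `errNestedTx`, which is the situation the deferred publish avoids. As in the diff, a failed publish is only logged, and the ATX is still flagged as malicious because a proof exists.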
