Skip to content

Commit bf8f9d0

Browse files
committed
Malfeasance2 fetcher and sync (#6652)
## Motivation This adds the necessary fetcher code for malfeasance v2 so that the node can sync new malfeasance proofs.
1 parent 19bd73d commit bf8f9d0

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

68 files changed

+3045
-1090
lines changed

activation/handler_v1.go

+17-4
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/spacemeshos/go-spacemesh/sql"
2828
"github.com/spacemeshos/go-spacemesh/sql/atxs"
2929
"github.com/spacemeshos/go-spacemesh/sql/identities"
30+
"github.com/spacemeshos/go-spacemesh/sql/malfeasance"
3031
"github.com/spacemeshos/go-spacemesh/system"
3132
)
3233

@@ -222,12 +223,12 @@ func (h *HandlerV1) syntacticallyValidateDeps(
222223
watx.NumUnits,
223224
PostSubset([]byte(h.local)), // use the local peer ID as seed for random subset
224225
)
225-
var invalidIdx *verifying.ErrInvalidIndex
226-
if errors.As(err, &invalidIdx) {
226+
var errInvalidIdx *verifying.ErrInvalidIndex
227+
if errors.As(err, &errInvalidIdx) {
227228
h.logger.Debug("ATX with invalid post index",
228229
log.ZContext(ctx),
229230
zap.Stringer("atx_id", watx.ID()),
230-
zap.Int("index", invalidIdx.Index),
231+
zap.Int("index", errInvalidIdx.Index),
231232
)
232233
malicious, err := identities.IsMalicious(h.cdb, watx.SmesherID)
233234
if err != nil {
@@ -236,13 +237,20 @@ func (h *HandlerV1) syntacticallyValidateDeps(
236237
if malicious {
237238
return nil, fmt.Errorf("smesher %s is known malfeasant", watx.SmesherID.ShortString())
238239
}
240+
malicious, err = malfeasance.IsMalicious(h.cdb, watx.SmesherID)
241+
if err != nil {
242+
return nil, fmt.Errorf("check if smesher is malicious: %w", err)
243+
}
244+
if malicious {
245+
return nil, fmt.Errorf("smesher %s is known malfeasant", watx.SmesherID.ShortString())
246+
}
239247
proof := &mwire.MalfeasanceProof{
240248
Layer: watx.PublishEpoch.FirstLayer(),
241249
Proof: mwire.Proof{
242250
Type: mwire.InvalidPostIndex,
243251
Data: &mwire.InvalidPostIndexProof{
244252
Atx: *watx,
245-
InvalidIdx: uint32(invalidIdx.Index),
253+
InvalidIdx: uint32(errInvalidIdx.Index),
246254
},
247255
},
248256
}
@@ -489,6 +497,11 @@ func (h *HandlerV1) storeAtx(ctx context.Context, atx *types.ActivationTx, watx
489497
if err != nil {
490498
return fmt.Errorf("check if node is malicious: %w", err)
491499
}
500+
malicious2, err := malfeasance.IsMalicious(tx, atx.SmesherID)
501+
if err != nil {
502+
return fmt.Errorf("check if node is malicious: %w", err)
503+
}
504+
malicious = malicious || malicious2
492505
if !malicious {
493506
malicious, err = h.checkMalicious(ctx, tx, watx)
494507
if err != nil {

activation/handler_v2.go

+89-65
Original file line numberDiff line numberDiff line change
@@ -679,10 +679,15 @@ func (h *HandlerV2) validatePost(
679679
if err == nil {
680680
return nil
681681
}
682-
errInvalid := &verifying.ErrInvalidIndex{}
683-
if !errors.As(err, &errInvalid) {
682+
errInvalidIdx := &verifying.ErrInvalidIndex{}
683+
if !errors.As(err, &errInvalidIdx) {
684684
return fmt.Errorf("validating post for ID %s: %w", nodeID.ShortString(), err)
685685
}
686+
h.logger.Debug("ATX with invalid post index",
687+
log.ZContext(ctx),
688+
zap.Stringer("atx_id", atx.ID()),
689+
zap.Int("index", errInvalidIdx.Index),
690+
)
686691

687692
// check if post contains at least one valid label
688693
validIdx := 0
@@ -715,7 +720,7 @@ func (h *HandlerV2) validatePost(
715720
commitment,
716721
nodeID,
717722
nipostIndex,
718-
uint32(errInvalid.Index),
723+
uint32(errInvalidIdx.Index),
719724
uint32(validIdx),
720725
)
721726
if err != nil {
@@ -724,40 +729,35 @@ func (h *HandlerV2) validatePost(
724729
if err := h.malPublisher.Publish(ctx, nodeID, proof); err != nil {
725730
return fmt.Errorf("publishing malfeasance proof for invalid post: %w", err)
726731
}
727-
return fmt.Errorf("invalid post for ID %s: %w", nodeID.ShortString(), errInvalid)
732+
return fmt.Errorf("invalid post for ID %s: %w", nodeID.ShortString(), errInvalidIdx)
728733
}
729734

730-
func (h *HandlerV2) checkMalicious(ctx context.Context, tx sql.Transaction, atx *activationTx) (bool, error) {
731-
malicious, err := malfeasance.IsMalicious(tx, atx.SmesherID)
732-
if err != nil {
733-
return malicious, fmt.Errorf("checking if node is malicious: %w", err)
734-
}
735-
if malicious {
736-
return true, nil
737-
}
738-
739-
malicious, err = h.checkDoubleMarry(ctx, tx, atx)
735+
func (h *HandlerV2) checkMalicious(
736+
ctx context.Context,
737+
tx sql.Transaction,
738+
watx *activationTx,
739+
) (wire.Proof, types.NodeID, error) {
740+
proof, nodeID, err := h.checkDoubleMarry(ctx, tx, watx)
740741
if err != nil {
741-
return malicious, fmt.Errorf("checking double marry: %w", err)
742+
return nil, types.EmptyNodeID, fmt.Errorf("checking double marry: %w", err)
742743
}
743-
if malicious {
744-
return true, nil
744+
if proof != nil {
745+
return proof, nodeID, nil
745746
}
746747

747-
malicious, err = h.checkDoubleMerge(ctx, tx, atx)
748+
proof, nodeID, err = h.checkDoubleMerge(ctx, tx, watx)
748749
if err != nil {
749-
return malicious, fmt.Errorf("checking double merge: %w", err)
750+
return nil, types.EmptyNodeID, fmt.Errorf("checking double merge: %w", err)
750751
}
751-
if malicious {
752-
return true, nil
752+
if proof != nil {
753+
return proof, nodeID, nil
753754
}
754755

755-
malicious, err = h.checkPrevAtx(ctx, tx, atx)
756+
proof, nodeID, err = h.checkPrevAtx(ctx, tx, watx)
756757
if err != nil {
757-
return malicious, fmt.Errorf("checking previous ATX: %w", err)
758+
return nil, types.EmptyNodeID, fmt.Errorf("checking previous ATX: %w", err)
758759
}
759-
760-
return malicious, err
760+
return proof, nodeID, nil
761761
}
762762

763763
func (h *HandlerV2) fetchWireAtx(
@@ -778,11 +778,15 @@ func (h *HandlerV2) fetchWireAtx(
778778
return atx, nil
779779
}
780780

781-
func (h *HandlerV2) checkDoubleMarry(ctx context.Context, tx sql.Transaction, atx *activationTx) (bool, error) {
781+
func (h *HandlerV2) checkDoubleMarry(
782+
ctx context.Context,
783+
tx sql.Transaction,
784+
atx *activationTx,
785+
) (wire.Proof, types.NodeID, error) {
782786
for _, m := range atx.marriages {
783787
info, err := marriage.FindByNodeID(tx, m.id)
784788
if err != nil {
785-
return false, fmt.Errorf("checking if ID is married: %w", err)
789+
return nil, types.EmptyNodeID, fmt.Errorf("checking if ID is married: %w", err)
786790
}
787791
if info.ATX == atx.ID() {
788792
continue
@@ -795,28 +799,32 @@ func (h *HandlerV2) checkDoubleMarry(ctx context.Context, tx sql.Transaction, at
795799
zap.Stringer("atx_id", info.ATX),
796800
)
797801
case err != nil:
798-
return false, fmt.Errorf("fetching other ATX: %w", err)
802+
return nil, types.EmptyNodeID, fmt.Errorf("fetching other ATX: %w", err)
799803
}
800804

801805
proof, err := wire.NewDoubleMarryProof(tx, atx.ActivationTxV2, otherAtx, m.id)
802806
if err != nil {
803-
return true, fmt.Errorf("creating double marry proof: %w", err)
807+
return nil, types.EmptyNodeID, fmt.Errorf("creating double marry proof: %w", err)
804808
}
805-
return true, h.malPublisher.Publish(ctx, m.id, proof)
809+
return proof, m.id, nil
806810
}
807-
return false, nil
811+
return nil, types.EmptyNodeID, nil
808812
}
809813

810-
func (h *HandlerV2) checkDoubleMerge(ctx context.Context, tx sql.Transaction, atx *activationTx) (bool, error) {
814+
func (h *HandlerV2) checkDoubleMerge(
815+
ctx context.Context,
816+
tx sql.Transaction,
817+
atx *activationTx,
818+
) (wire.Proof, types.NodeID, error) {
811819
if atx.MarriageATX == nil {
812-
return false, nil
820+
return nil, types.EmptyNodeID, nil
813821
}
814822
ids, err := atxs.MergeConflict(tx, *atx.MarriageATX, atx.PublishEpoch)
815823
switch {
816824
case errors.Is(err, sql.ErrNotFound):
817-
return false, nil
825+
return nil, types.EmptyNodeID, nil
818826
case err != nil:
819-
return false, fmt.Errorf("searching for ATXs with the same marriage ATX: %w", err)
827+
return nil, types.EmptyNodeID, fmt.Errorf("searching for ATXs with the same marriage ATX: %w", err)
820828
}
821829
otherIndex := slices.IndexFunc(ids, func(id types.ATXID) bool { return id != atx.ID() })
822830
other := ids[otherIndex]
@@ -836,7 +844,7 @@ func (h *HandlerV2) checkDoubleMerge(ctx context.Context, tx sql.Transaction, at
836844
// see https://github.com/spacemeshos/go-spacemesh/issues/6434
837845
otherAtx, err := h.fetchWireAtx(ctx, tx, other)
838846
if err != nil {
839-
return false, fmt.Errorf("fetching other ATX: %w", err)
847+
return nil, types.EmptyNodeID, fmt.Errorf("fetching other ATX: %w", err)
840848
}
841849

842850
// TODO(mafa): checkpoints need to include all marriage ATXs in full to be able to create malfeasance proofs
@@ -845,16 +853,20 @@ func (h *HandlerV2) checkDoubleMerge(ctx context.Context, tx sql.Transaction, at
845853
// see https://github.com/spacemeshos/go-spacemesh/issues/6435
846854
proof, err := wire.NewDoubleMergeProof(tx, atx.ActivationTxV2, otherAtx)
847855
if err != nil {
848-
return true, fmt.Errorf("creating double merge proof: %w", err)
856+
return nil, types.EmptyNodeID, fmt.Errorf("creating double merge proof: %w", err)
849857
}
850-
return true, h.malPublisher.Publish(ctx, atx.ActivationTxV2.SmesherID, proof)
858+
return proof, atx.ActivationTxV2.SmesherID, nil
851859
}
852860

853-
func (h *HandlerV2) checkPrevAtx(ctx context.Context, tx sql.Transaction, atx *activationTx) (bool, error) {
861+
func (h *HandlerV2) checkPrevAtx(
862+
ctx context.Context,
863+
tx sql.Transaction,
864+
atx *activationTx,
865+
) (wire.Proof, types.NodeID, error) {
854866
for id, data := range atx.ids {
855867
expectedPrevID, err := atxs.PrevIDByNodeID(tx, id, atx.PublishEpoch)
856868
if err != nil && !errors.Is(err, sql.ErrNotFound) {
857-
return false, fmt.Errorf("get last atx by node id: %w", err)
869+
return nil, types.EmptyNodeID, fmt.Errorf("get last atx by node id: %w", err)
858870
}
859871
if expectedPrevID == data.previous {
860872
continue
@@ -871,7 +883,7 @@ func (h *HandlerV2) checkPrevAtx(ctx context.Context, tx sql.Transaction, atx *a
871883
case errors.Is(err, sql.ErrNotFound):
872884
continue
873885
case err != nil:
874-
return true, fmt.Errorf("checking for previous ATX collision: %w", err)
886+
return nil, types.EmptyNodeID, fmt.Errorf("checking for previous ATX collision: %w", err)
875887
}
876888

877889
var wireAtxV1 *wire.ActivationTxV1
@@ -882,7 +894,7 @@ func (h *HandlerV2) checkPrevAtx(ctx context.Context, tx sql.Transaction, atx *a
882894
var blob sql.Blob
883895
v, err := atxs.LoadBlob(ctx, tx, collision.Bytes(), &blob)
884896
if err != nil {
885-
return true, fmt.Errorf("get atx blob %s: %w", id.ShortString(), err)
897+
return nil, types.EmptyNodeID, fmt.Errorf("get atx blob %s: %w", id.ShortString(), err)
886898
}
887899
switch v {
888900
case types.AtxV1:
@@ -903,9 +915,9 @@ func (h *HandlerV2) checkPrevAtx(ctx context.Context, tx sql.Transaction, atx *a
903915
)
904916
proof, err := wire.NewInvalidPrevAtxProofV2(tx, atx.ActivationTxV2, wireAtx, id)
905917
if err != nil {
906-
return true, fmt.Errorf("creating invalid previous ATX proof: %w", err)
918+
return nil, types.EmptyNodeID, fmt.Errorf("creating invalid previous ATX proof: %w", err)
907919
}
908-
return true, h.malPublisher.Publish(ctx, id, proof)
920+
return proof, id, nil
909921
default:
910922
h.logger.Fatal("Failed to create invalid previous ATX proof: unknown ATX version",
911923
zap.Stringer("atx_id", collision),
@@ -921,16 +933,19 @@ func (h *HandlerV2) checkPrevAtx(ctx context.Context, tx sql.Transaction, atx *a
921933
)
922934
proof, err := wire.NewInvalidPrevAtxProofV1(tx, atx.ActivationTxV2, wireAtxV1, id)
923935
if err != nil {
924-
return true, fmt.Errorf("creating invalid previous ATX proof: %w", err)
936+
return nil, types.EmptyNodeID, fmt.Errorf("creating invalid previous ATX proof: %w", err)
925937
}
926-
return true, h.malPublisher.Publish(ctx, id, proof)
938+
return proof, id, nil
927939
}
928-
return false, nil
940+
return nil, types.EmptyNodeID, nil
929941
}
930942

931943
// Store an ATX in the DB.
932944
func (h *HandlerV2) storeAtx(ctx context.Context, atx *types.ActivationTx, watx *activationTx) error {
933945
republishProof := false
946+
malicious := false
947+
var proof wire.Proof
948+
var nodeID types.NodeID
934949
if err := h.cdb.WithTxImmediate(ctx, func(tx sql.Transaction) error {
935950
if len(watx.marriages) != 0 {
936951
newMarriageID, err := marriage.NewID(tx)
@@ -942,7 +957,6 @@ func (h *HandlerV2) storeAtx(ctx context.Context, atx *types.ActivationTx, watx
942957
ATX: atx.ID(),
943958
Target: atx.SmesherID,
944959
}
945-
malicious := false
946960
marriageIDs := make(map[marriage.ID]struct{}, 1)
947961
marriageIDs[newMarriageID] = struct{}{}
948962
for i, m := range watx.marriages {
@@ -1009,34 +1023,44 @@ func (h *HandlerV2) storeAtx(ctx context.Context, atx *types.ActivationTx, watx
10091023
return fmt.Errorf("setting atx units for ID %s: %w", id, err)
10101024
}
10111025
}
1012-
return nil
1013-
}); err != nil {
1014-
return fmt.Errorf("store atx: %w", err)
1015-
}
10161026

1017-
malicious := false
1018-
err := h.cdb.WithTxImmediate(ctx, func(tx sql.Transaction) error {
1027+
if malicious || republishProof {
1028+
return nil
1029+
}
1030+
10191031
// malfeasance check happens after storing the ATX because storing updates the marriage set
10201032
// that is needed for the malfeasance proof
10211033
//
10221034
// TODO(mafa): don't store own ATX if it would mark the node as malicious
10231035
// this probably needs to be done by validating and storing own ATXs eagerly and skipping validation in
10241036
// the gossip handler (not sync!)
1025-
if republishProof {
1026-
malicious = true
1027-
return h.malPublisher.Regossip(ctx, atx.SmesherID)
1028-
}
1029-
1030-
var err error
1031-
malicious, err = h.checkMalicious(ctx, tx, watx)
1037+
proof, nodeID, err = h.checkMalicious(ctx, tx, watx)
10321038
return err
1033-
})
1034-
if err != nil {
1035-
return fmt.Errorf("check malicious: %w", err)
1039+
}); err != nil {
1040+
return fmt.Errorf("store atx: %w", err)
1041+
}
1042+
1043+
switch {
1044+
case republishProof: // marriage set of known malicious smesher has changed, force re-gossip of proof
1045+
if err := h.malPublisher.Regossip(ctx, watx.SmesherID); err != nil {
1046+
h.logger.Error("failed to regossip malfeasance proof",
1047+
zap.Stringer("atx_id", watx.ID()),
1048+
zap.Stringer("smesher_id", watx.SmesherID),
1049+
zap.Error(err),
1050+
)
1051+
}
1052+
case proof != nil: // new malfeasance proof for identity created, publish proof (gossip is decided by publisher)
1053+
if err := h.malPublisher.Publish(ctx, nodeID, proof); err != nil {
1054+
h.logger.Error("failed to publish malfeasance proof",
1055+
zap.Stringer("atx_id", watx.ID()),
1056+
zap.Stringer("smesher_id", watx.SmesherID),
1057+
zap.Error(err),
1058+
)
1059+
}
10361060
}
10371061

10381062
h.beacon.OnAtx(atx)
1039-
if added := h.atxsdata.AddFromAtx(atx, malicious); added != nil {
1063+
if added := h.atxsdata.AddFromAtx(atx, malicious || proof != nil); added != nil {
10401064
h.tortoise.OnAtx(atx.TargetEpoch(), atx.ID(), added)
10411065
}
10421066

0 commit comments

Comments
 (0)