Skip to content

Commit

Permalink
Merge pull request #9686 from vegaprotocol/deterministic-genesis-evts
Browse files Browse the repository at this point in the history
fix: all events are now sent out in a deterministic order
  • Loading branch information
jeremyletang authored Oct 6, 2023
2 parents fcbb227 + 4be12da commit 001c57c
Show file tree
Hide file tree
Showing 8 changed files with 35 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@
- [9343](https://github.com/vegaprotocol/vega/issues/9343) - Prevent malicious validator submitting Ethereum oracle chain event prior to initial start time
- [8792](https://github.com/vegaprotocol/vega/issues/8792) - Fix panic when starting null block chain node.
- [8739](https://github.com/vegaprotocol/vega/issues/8739) - Cancel orders for rejected markets.
- [9685](https://github.com/vegaprotocol/vega/issues/9685) - All events for core are sent out in a stable order.
- [9594](https://github.com/vegaprotocol/vega/issues/9594) - non deterministic order events on market termination
- [9350](https://github.com/vegaprotocol/vega/issues/9350) - Clear account ledger events causing segment divergence
- [9118](https://github.com/vegaprotocol/vega/issues/9118) - Improve list stop orders error message
Expand Down
8 changes: 6 additions & 2 deletions core/execution/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"code.vegaprotocol.io/vega/libs/num"
"code.vegaprotocol.io/vega/logging"
"code.vegaprotocol.io/vega/protos/vega"
"golang.org/x/exp/maps"
)

var (
Expand Down Expand Up @@ -1355,9 +1356,12 @@ func (e *Engine) OnTick(ctx context.Context, t time.Time) {
evts = append(evts, events.NewMarketDataEvent(ctx, mkt.GetMarketData()))
}
e.broker.SendBatch(evts)

// reject successor markets in the toSkip list
for _, sid := range toSkip {
e.RejectMarket(ctx, sid)
mids := maps.Values(toSkip)
sort.Strings(mids)
for _, mid := range mids {
e.RejectMarket(ctx, mid)
}

rmCPStates := make([]string, 0, len(toDelete))
Expand Down
2 changes: 1 addition & 1 deletion core/governance/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1117,7 +1117,7 @@ func newUpdatedProposalEvents(ctx context.Context, proposal *proposal) []events.
votes = append(votes, events.NewVoteEvent(ctx, *n))
}

sort.Slice(votes, func(i, j int) bool {
sort.SliceStable(votes, func(i, j int) bool {
return votes[i].Proto().Timestamp < votes[j].Proto().Timestamp
})

Expand Down
16 changes: 11 additions & 5 deletions core/netparams/netparams.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,15 +130,19 @@ func (s *Store) UponGenesis(ctx context.Context, rawState []byte) (err error) {
}

evts := make([]events.Event, 0, len(s.store))
keys := maps.Keys(s.store)
sort.Strings(keys)
// first we going to send the initial state through the broker
for k, v := range s.store {
evts = append(evts, events.NewNetworkParameterEvent(ctx, k, v.String()))
for _, k := range keys {
evts = append(evts, events.NewNetworkParameterEvent(ctx, k, s.store[k].String()))
}
s.broker.SendBatch(evts)

// now iterate over all parameters and update the existing ones
for k, v := range state {
if err := s.UpdateOptionalValidation(ctx, k, v, false, true); err != nil {
keys = maps.Keys(state)
sort.Strings(keys)
for _, k := range keys {
if err := s.UpdateOptionalValidation(ctx, k, state[k], false, true); err != nil {
return fmt.Errorf("%v: %v", k, err)
}
}
Expand All @@ -160,7 +164,9 @@ func (s *Store) UponGenesis(ctx context.Context, rawState []byte) (err error) {
// now we can iterate again over ALL the net params,
// and dispatch the value of them all so any watchers can get updated
// with genesis values
for k := range s.store {
keys = maps.Keys(s.store)
sort.Strings(keys)
for _, k := range keys {
if err := s.dispatchUpdate(ctx, k); err != nil {
return fmt.Errorf("could not propagate netparams update to listener, %v: %v", k, err)
}
Expand Down
4 changes: 4 additions & 0 deletions core/notary/notary.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ func (n *Notary) IsSigned(
// aggregate node sig
sig := map[string]struct{}{}
out := []commandspb.NodeSignature{}

for k := range n.sigs[idkind] {
// is node sig is part of the registered nodes, and is a tendermint validator
// add it to the map
Expand All @@ -231,6 +232,9 @@ func (n *Notary) IsSigned(

// now we check the number of required node sigs
if n.votePassed(len(sig), n.top.Len()) {
sort.Slice(out, func(i, j int) bool {
return string(out[i].Sig) < string(out[j].Sig)
})
return out, true
}

Expand Down
2 changes: 1 addition & 1 deletion core/processor/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,7 @@ func (app *App) LoadSnapshotChunk(req tmtypes.RequestLoadSnapshotChunk) tmtypes.

func (app *App) OnInitChain(req tmtypes.RequestInitChain) tmtypes.ResponseInitChain {
app.log.Debug("ABCI service InitChain start")
hash := hex.EncodeToString(vgcrypto.Hash(req.AppStateBytes))
hash := hex.EncodeToString(vgcrypto.Hash([]byte(req.ChainId)))
app.abci.SetChainID(req.ChainId)
app.chainCtx = vgcontext.WithChainID(context.Background(), req.ChainId)
ctx := vgcontext.WithBlockHeight(app.chainCtx, req.InitialHeight)
Expand Down
8 changes: 6 additions & 2 deletions core/protocol/upon_genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ import (
"context"
"errors"
"fmt"
"sort"

"code.vegaprotocol.io/vega/core/assets"
"code.vegaprotocol.io/vega/core/types"
"code.vegaprotocol.io/vega/logging"
proto "code.vegaprotocol.io/vega/protos/vega"
"github.com/cenkalti/backoff"
"golang.org/x/exp/maps"
)

var (
Expand All @@ -48,8 +50,10 @@ func (svcs *allServices) UponGenesis(ctx context.Context, rawstate []byte) (err
return nil
}

for k, v := range state {
err := svcs.loadAsset(ctx, k, v)
keys := maps.Keys(state)
sort.Strings(keys)
for _, k := range keys {
err := svcs.loadAsset(ctx, k, state[k])
if err != nil {
return err
}
Expand Down
6 changes: 5 additions & 1 deletion core/validators/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
proto "code.vegaprotocol.io/vega/protos/vega"
commandspb "code.vegaprotocol.io/vega/protos/vega/commands/v1"
v1 "code.vegaprotocol.io/vega/protos/vega/snapshot/v1"
"golang.org/x/exp/maps"

"github.com/ethereum/go-ethereum/common"
abcitypes "github.com/tendermint/tendermint/abci/types"
Expand Down Expand Up @@ -566,7 +567,10 @@ func (t *Topology) LoadValidatorsOnGenesis(ctx context.Context, rawstate []byte)
}

// tm is base64 encoded, vega is hex
for tm, data := range state {
keys := maps.Keys(state)
sort.Strings(keys)
for _, tm := range keys {
data := state[tm]
if !data.IsValid() {
return fmt.Errorf("missing required field from validator data: %#v", data)
}
Expand Down

0 comments on commit 001c57c

Please sign in to comment.