From 4be12da733a94223a947003c34bae4be40709cc8 Mon Sep 17 00:00:00 2001 From: wwestgarth Date: Fri, 6 Oct 2023 12:53:57 +0100 Subject: [PATCH] fix: all events are now sent out in a deterministic order --- CHANGELOG.md | 1 + core/execution/engine.go | 8 ++++++-- core/governance/engine.go | 2 +- core/netparams/netparams.go | 16 +++++++++++----- core/notary/notary.go | 4 ++++ core/processor/abci.go | 2 +- core/protocol/upon_genesis.go | 8 ++++++-- core/validators/topology.go | 6 +++++- 8 files changed, 35 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c36161c509b..5af1134f536 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -149,6 +149,7 @@ - [9343](https://github.com/vegaprotocol/vega/issues/9343) - Prevent malicious validator submitting Ethereum oracle chain event prior to initial start time - [8792](https://github.com/vegaprotocol/vega/issues/8792) - Fix panic when starting null block chain node. - [8739](https://github.com/vegaprotocol/vega/issues/8739) - Cancel orders for rejected markets. +- [9685](https://github.com/vegaprotocol/vega/issues/9685) - All events for core are sent out in a stable order. - [9594](https://github.com/vegaprotocol/vega/issues/9594) - non deterministic order events on market termination - [9350](https://github.com/vegaprotocol/vega/issues/9350) - Clear account ledger events causing segment divergence - [9118](https://github.com/vegaprotocol/vega/issues/9118) - Improve list stop orders error message diff --git a/core/execution/engine.go b/core/execution/engine.go index a611498b7ea..cde3573ecfa 100644 --- a/core/execution/engine.go +++ b/core/execution/engine.go @@ -31,6 +31,7 @@ import ( "code.vegaprotocol.io/vega/libs/num" "code.vegaprotocol.io/vega/logging" "code.vegaprotocol.io/vega/protos/vega" + "golang.org/x/exp/maps" ) var ( @@ -1355,9 +1356,12 @@ func (e *Engine) OnTick(ctx context.Context, t time.Time) { evts = append(evts, events.NewMarketDataEvent(ctx, mkt.GetMarketData())) } e.broker.SendBatch(evts) + // reject successor markets in the toSkip list - for _, sid := range toSkip { - e.RejectMarket(ctx, sid) + mids := maps.Values(toSkip) + sort.Strings(mids) + for _, mid := range mids { + e.RejectMarket(ctx, mid) } rmCPStates := make([]string, 0, len(toDelete)) diff --git a/core/governance/engine.go b/core/governance/engine.go index da7007b391b..67dbd38a39d 100644 --- a/core/governance/engine.go +++ b/core/governance/engine.go @@ -1117,7 +1117,7 @@ func newUpdatedProposalEvents(ctx context.Context, proposal *proposal) []events. votes = append(votes, events.NewVoteEvent(ctx, *n)) } - sort.Slice(votes, func(i, j int) bool { + sort.SliceStable(votes, func(i, j int) bool { return votes[i].Proto().Timestamp < votes[j].Proto().Timestamp }) diff --git a/core/netparams/netparams.go b/core/netparams/netparams.go index e456b2e8a33..965554e4955 100644 --- a/core/netparams/netparams.go +++ b/core/netparams/netparams.go @@ -130,15 +130,19 @@ func (s *Store) UponGenesis(ctx context.Context, rawState []byte) (err error) { } evts := make([]events.Event, 0, len(s.store)) + keys := maps.Keys(s.store) + sort.Strings(keys) // first we going to send the initial state through the broker - for k, v := range s.store { - evts = append(evts, events.NewNetworkParameterEvent(ctx, k, v.String())) + for _, k := range keys { + evts = append(evts, events.NewNetworkParameterEvent(ctx, k, s.store[k].String())) } s.broker.SendBatch(evts) // now iterate over all parameters and update the existing ones - for k, v := range state { - if err := s.UpdateOptionalValidation(ctx, k, v, false, true); err != nil { + keys = maps.Keys(state) + sort.Strings(keys) + for _, k := range keys { + if err := s.UpdateOptionalValidation(ctx, k, state[k], false, true); err != nil { return fmt.Errorf("%v: %v", k, err) } } @@ -160,7 +164,9 @@ func (s *Store) UponGenesis(ctx context.Context, rawState []byte) (err error) { // now we can iterate again over ALL the net params, // and dispatch the value of them all so any watchers can get updated // with genesis values - for k := range s.store { + keys = maps.Keys(s.store) + sort.Strings(keys) + for _, k := range keys { if err := s.dispatchUpdate(ctx, k); err != nil { return fmt.Errorf("could not propagate netparams update to listener, %v: %v", k, err) } diff --git a/core/notary/notary.go b/core/notary/notary.go index a62d036a681..2669b88e791 100644 --- a/core/notary/notary.go +++ b/core/notary/notary.go @@ -215,6 +215,7 @@ func (n *Notary) IsSigned( // aggregate node sig sig := map[string]struct{}{} out := []commandspb.NodeSignature{} + for k := range n.sigs[idkind] { // is node sig is part of the registered nodes, and is a tendermint validator // add it to the map @@ -231,6 +232,9 @@ func (n *Notary) IsSigned( // now we check the number of required node sigs if n.votePassed(len(sig), n.top.Len()) { + sort.Slice(out, func(i, j int) bool { + return string(out[i].Sig) < string(out[j].Sig) + }) return out, true } diff --git a/core/processor/abci.go b/core/processor/abci.go index f3f194e1f67..26b2bde8bed 100644 --- a/core/processor/abci.go +++ b/core/processor/abci.go @@ -722,7 +722,7 @@ func (app *App) LoadSnapshotChunk(req tmtypes.RequestLoadSnapshotChunk) tmtypes. func (app *App) OnInitChain(req tmtypes.RequestInitChain) tmtypes.ResponseInitChain { app.log.Debug("ABCI service InitChain start") - hash := hex.EncodeToString(vgcrypto.Hash(req.AppStateBytes)) + hash := hex.EncodeToString(vgcrypto.Hash([]byte(req.ChainId))) app.abci.SetChainID(req.ChainId) app.chainCtx = vgcontext.WithChainID(context.Background(), req.ChainId) ctx := vgcontext.WithBlockHeight(app.chainCtx, req.InitialHeight) diff --git a/core/protocol/upon_genesis.go b/core/protocol/upon_genesis.go index 5e5f38ed663..27c9b1b519a 100644 --- a/core/protocol/upon_genesis.go +++ b/core/protocol/upon_genesis.go @@ -16,12 +16,14 @@ import ( "context" "errors" "fmt" + "sort" "code.vegaprotocol.io/vega/core/assets" "code.vegaprotocol.io/vega/core/types" "code.vegaprotocol.io/vega/logging" proto "code.vegaprotocol.io/vega/protos/vega" "github.com/cenkalti/backoff" + "golang.org/x/exp/maps" ) var ( @@ -48,8 +50,10 @@ func (svcs *allServices) UponGenesis(ctx context.Context, rawstate []byte) (err return nil } - for k, v := range state { - err := svcs.loadAsset(ctx, k, v) + keys := maps.Keys(state) + sort.Strings(keys) + for _, k := range keys { + err := svcs.loadAsset(ctx, k, state[k]) if err != nil { return err } diff --git a/core/validators/topology.go b/core/validators/topology.go index afb3b284435..c3764465244 100644 --- a/core/validators/topology.go +++ b/core/validators/topology.go @@ -32,6 +32,7 @@ import ( proto "code.vegaprotocol.io/vega/protos/vega" commandspb "code.vegaprotocol.io/vega/protos/vega/commands/v1" v1 "code.vegaprotocol.io/vega/protos/vega/snapshot/v1" + "golang.org/x/exp/maps" "github.com/ethereum/go-ethereum/common" abcitypes "github.com/tendermint/tendermint/abci/types" @@ -566,7 +567,10 @@ func (t *Topology) LoadValidatorsOnGenesis(ctx context.Context, rawstate []byte) } // tm is base64 encoded, vega is hex - for tm, data := range state { + keys := maps.Keys(state) + sort.Strings(keys) + for _, tm := range keys { + data := state[tm] if !data.IsValid() { return fmt.Errorf("missing required field from validator data: %#v", data) }