Skip to content

Commit

Permalink
feat: add support for tradingtTransaction ordering
Browse files Browse the repository at this point in the history
  • Loading branch information
ze97286 committed May 15, 2024
1 parent 13b93fb commit 794f791
Show file tree
Hide file tree
Showing 12 changed files with 2,714 additions and 2,404 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
- [11196](https://github.com/vegaprotocol/vega/issues/11196) - Add an active field in the price monitoring bounds payload.
- [11211](https://github.com/vegaprotocol/vega/issues/11211) - Liquidation engine includes `vAMM` shapes as available volume.
- [11217](https://github.com/vegaprotocol/vega/issues/11217) - Allow market proposals to override risk factors.
- [11285](https://github.com/vegaprotocol/vega/issues/11285) - Add support for trading transaction ordering.

### 🐛 Fixes

Expand Down
82 changes: 82 additions & 0 deletions core/blockchain/abci/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
"strconv"

"code.vegaprotocol.io/vega/core/blockchain"
"code.vegaprotocol.io/vega/core/txn"
vgcontext "code.vegaprotocol.io/vega/libs/context"
commandspb "code.vegaprotocol.io/vega/protos/vega/commands/v1"

"github.com/cometbft/cometbft/abci/types"
)
Expand Down Expand Up @@ -143,12 +145,76 @@ func (app *App) CheckTx(_ context.Context, req *types.RequestCheckTx) (*types.Re
return AddCommonCheckTxEvents(resp, tx), nil
}

func PreprocessBlock(txs []Tx, rawTxs [][]byte, previousBlock []Tx) ([]Tx, [][]byte) {
cancellations := []Tx{}
postOnly := []Tx{}

executatbleBatchesFromThisBlock := []Tx{}
anythingElseFromThisBlock := []Tx{}
nextBlockRtx := [][]byte{}

for i, tx := range txs {
switch tx.Command() {
case txn.CancelOrderCommand, txn.CancelAMMCommand, txn.StopOrdersCancellationCommand:
cancellations = append(cancellations, tx)
case txn.SubmitOrderCommand:
s := &commandspb.OrderSubmission{}
if err := tx.Unmarshal(s); err != nil {
continue
}
if s.PostOnly {
postOnly = append(postOnly, tx)
} else {
nextBlockRtx = append(nextBlockRtx, rawTxs[i])
}
case txn.AmendOrderCommand, txn.AmendAMMCommand, txn.StopOrdersSubmissionCommand:
nextBlockRtx = append(nextBlockRtx, rawTxs[i])
case txn.BatchMarketInstructions:
batch := &commandspb.BatchMarketInstructions{}
if err := tx.Unmarshal(batch); err != nil {
continue
}
// if there are no amends/submissions
if len(batch.Amendments) == 0 && len(batch.Submissions) == 0 && len(batch.StopOrdersSubmission) == 0 {
executatbleBatchesFromThisBlock = append(executatbleBatchesFromThisBlock, tx)
} else if len(batch.Amendments) == 0 && len(batch.StopOrdersSubmission) == 0 {
allPostOnly := true
for _, sub := range batch.Submissions {
if !sub.PostOnly {
allPostOnly = false
break
}
}
if allPostOnly {
executatbleBatchesFromThisBlock = append(executatbleBatchesFromThisBlock, tx)
continue
}
} else {
nextBlockRtx = append(nextBlockRtx, rawTxs[i])
}
default:
anythingElseFromThisBlock = append(anythingElseFromThisBlock, tx)
}
}

// arranging transactions for this block
thisBlock := []Tx{}
thisBlock = append(thisBlock, cancellations...) // cancellations go first
thisBlock = append(thisBlock, executatbleBatchesFromThisBlock...) // executable batches from this block go next
thisBlock = append(thisBlock, postOnly...) // then post only orders
thisBlock = append(thisBlock, previousBlock...) // then anything from previous block
thisBlock = append(thisBlock, anythingElseFromThisBlock...) // finally anything else from this block

return thisBlock, nextBlockRtx
}

// FinalizeBlock lets the application process a whole block end to end.
func (app *App) FinalizeBlock(_ context.Context, req *types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error) {
blockHeight := uint64(req.Height)
blockTime := req.Time

txs := make([]Tx, 0, len(req.Txs))
rtxs := make([][]byte, 0, len(req.Txs))
for _, rtx := range req.Txs {
// getTx can't fail at this point as we've verified on processProposal, however as it can fail in nullblockchain, handle it here
tx, _, err := app.getTx(rtx)
Expand All @@ -157,8 +223,24 @@ func (app *App) FinalizeBlock(_ context.Context, req *types.RequestFinalizeBlock
}
app.removeTxFromCache(rtx)
txs = append(txs, tx)
rtxs = append(rtxs, rtx)
}

prevBlockRawTxs := app.PreviousBlockTxCache.GetRawTxs()
prevBlockTxs := make([]Tx, 0, len(prevBlockRawTxs))
for _, rtx := range prevBlockRawTxs {
tx, _, err := app.getTx(rtx)
if err != nil {
continue
}
app.removeTxFromCache(rtx)
prevBlockTxs = append(prevBlockTxs, tx)
}

thisBlock, nextBlockRtx := PreprocessBlock(txs, rtxs, prevBlockTxs)
txs = thisBlock
app.PreviousBlockTxCache.SetRawTxs(nextBlockRtx)

app.ctx = app.OnBeginBlock(blockHeight, hex.EncodeToString(req.Hash), blockTime, hex.EncodeToString(req.ProposerAddress), txs)
results := make([]*types.ExecTxResult, 0, len(req.Txs))
events := []types.Event{}
Expand Down
7 changes: 7 additions & 0 deletions core/blockchain/abci/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ type SnapshotEngine interface {
AddProviders(provs ...types.StateProvider)
}

type TxCache interface {
SetRawTxs(rtx [][]byte)
GetRawTxs() [][]byte
}

type App struct {
abci.BaseApplication
codec Codec
Expand All @@ -49,6 +54,8 @@ type App struct {
OnEndBlock OnEndBlockHandler
OnFinalize FinalizeHandler

PreviousBlockTxCache TxCache

// spam check
OnCheckTxSpam OnCheckTxSpamHandler

Expand Down
8 changes: 8 additions & 0 deletions core/processor/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,11 @@ type EthCallEngine interface {
Start()
}

type TxCache interface {
SetRawTxs(rtx [][]byte)
GetRawTxs() [][]byte
}

type App struct {
abci *abci.App
currentTimestamp time.Time
Expand Down Expand Up @@ -299,6 +304,7 @@ func NewApp(log *logging.Logger,
ethCallEngine EthCallEngine,
balanceChecker BalanceChecker,
partiesEngine PartiesEngine,
txCache TxCache,
) *App {
log = log.Named(namedLogger)
log.SetLevel(config.Level.Get())
Expand Down Expand Up @@ -370,6 +376,8 @@ func NewApp(log *logging.Logger,
app.abci.OnApplySnapshotChunk = app.ApplySnapshotChunk
app.abci.OnLoadSnapshotChunk = app.LoadSnapshotChunk

app.abci.PreviousBlockTxCache = txCache

app.abci.
HandleCheckTx(txn.NodeSignatureCommand, app.RequireValidatorPubKey).
HandleCheckTx(txn.NodeVoteCommand, app.RequireValidatorPubKey).
Expand Down
4 changes: 4 additions & 0 deletions core/protocol/all_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ import (
"code.vegaprotocol.io/vega/core/statevar"
"code.vegaprotocol.io/vega/core/stats"
"code.vegaprotocol.io/vega/core/teams"
"code.vegaprotocol.io/vega/core/txcache"
"code.vegaprotocol.io/vega/core/types"
"code.vegaprotocol.io/vega/core/validators"
"code.vegaprotocol.io/vega/core/validators/erc20multisig"
Expand Down Expand Up @@ -128,6 +129,7 @@ type allServices struct {
ethereumOraclesVerifier *ethverifier.Verifier

partiesEngine *parties.SnapshottedEngine
txCache *txcache.TxCache

assets *assets.Service
topology *validators.Topology
Expand Down Expand Up @@ -289,6 +291,7 @@ func newServices(
svcs.teamsEngine = teams.NewSnapshottedEngine(svcs.broker, svcs.timeService)

svcs.partiesEngine = parties.NewSnapshottedEngine(svcs.broker)
svcs.txCache = txcache.NewTxCache()

svcs.statevar = statevar.New(svcs.log, svcs.conf.StateVar, svcs.broker, svcs.topology, svcs.commander)
svcs.marketActivityTracker = common.NewMarketActivityTracker(svcs.log, svcs.teamsEngine, svcs.stakingAccounts, svcs.broker)
Expand Down Expand Up @@ -424,6 +427,7 @@ func newServices(
svcs.topology.NotifyOnKeyChange(svcs.governance.ValidatorKeyChanged)

svcs.snapshotEngine.AddProviders(
svcs.txCache,
svcs.checkpoint,
svcs.collateral,
svcs.governance,
Expand Down
1 change: 1 addition & 0 deletions core/protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ func New(
svcs.ethCallEngine,
svcs.collateral,
svcs.partiesEngine,
svcs.txCache,
),
log: log,
confWatcher: confWatcher,
Expand Down
1 change: 1 addition & 0 deletions core/snapshot/providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
// providersInCallOrder holds the providers namespace in the order in which
// they must be called.
var providersInCallOrder = []types.SnapshotNamespace{
types.TxCacheSnapshot,
types.EpochSnapshot,
types.AssetsSnapshot, // Needs to happen before banking.
types.WitnessSnapshot, // Needs to happen before banking and governance.
Expand Down
85 changes: 85 additions & 0 deletions core/txcache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright (C) 2023 Gobalsky Labs Limited
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package txcache

import (
"context"
"fmt"

"code.vegaprotocol.io/vega/core/types"
"code.vegaprotocol.io/vega/libs/proto"
snapshotpb "code.vegaprotocol.io/vega/protos/vega/snapshot/v1"
)

func NewTxCache() *TxCache {
return &TxCache{}
}

type TxCache struct {
rtxs [][]byte
}

func (t *TxCache) SetRawTxs(rtx [][]byte) {
t.rtxs = rtx
}

func (t *TxCache) GetRawTxs() [][]byte {
return t.rtxs
}

func (t *TxCache) Namespace() types.SnapshotNamespace {
return types.TxCacheSnapshot
}

func (t *TxCache) Keys() []string {
return []string{(&types.PayloadTxCache{}).Key()}
}

func (t *TxCache) GetState(k string) ([]byte, []types.StateProvider, error) {
payload := &snapshotpb.Payload{
Data: &snapshotpb.Payload_TxCache{
TxCache: &snapshotpb.TxCache{
Txs: t.rtxs,
},
},
}

serialised, err := proto.Marshal(payload)
if err != nil {
return nil, nil, fmt.Errorf("could not serialize tx cache payload: %w", err)
}
return serialised, nil, err
}

func (t *TxCache) LoadState(_ context.Context, p *types.Payload) ([]types.StateProvider, error) {
if t.Namespace() != p.Data.Namespace() {
return nil, types.ErrInvalidSnapshotNamespace
}

switch data := p.Data.(type) {
case *types.PayloadTxCache:
t.rtxs = data.TxCache.Txs
return nil, nil
default:
return nil, types.ErrUnknownSnapshotType
}
}

func (t *TxCache) Stopped() bool {
return false
}

func (e *TxCache) StopSnapshots() {}
1 change: 1 addition & 0 deletions core/types/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ const (
ActivityStreakSnapshot SnapshotNamespace = "activitystreak"
VolumeDiscountProgramSnapshot SnapshotNamespace = "volumeDiscountProgram"
LiquidationSnapshot SnapshotNamespace = "liquidation"
TxCacheSnapshot SnapshotNamespace = "txCache"

MaxChunkSize = 16 * 1000 * 1000 // technically 16 * 1024 * 1024, but you know
IdealChunkSize = 10 * 1000 * 1000 // aim for 10MB
Expand Down
34 changes: 34 additions & 0 deletions core/types/snapshot_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -891,6 +891,8 @@ func PayloadFromProto(p *snapshot.Payload) *Payload {
ret.Data = PayloadEthOracleVerifierMisc(dt)
case *snapshot.Payload_EvmMultisigTopologies:
ret.Data = PayloadEVMMultisigTopologiesFromProto(dt)
case *snapshot.Payload_TxCache:
ret.Data = PayloadTxCacheFromProto(dt)
default:
panic(fmt.Errorf("missing support for payload %T", dt))
}
Expand Down Expand Up @@ -1084,6 +1086,8 @@ func (p Payload) IntoProto() *snapshot.Payload {
ret.Data = dt
case *snapshot.Payload_EvmMultisigTopologies:
ret.Data = dt
case *snapshot.Payload_TxCache:
ret.Data = dt
default:
panic(fmt.Errorf("missing support for payload %T", dt))
}
Expand Down Expand Up @@ -4285,3 +4289,33 @@ func (p *PayloadEVMMultisigTopologies) Namespace() SnapshotNamespace {
func (p *PayloadEVMMultisigTopologies) Key() string {
return "all"
}

type PayloadTxCache struct {
TxCache *snapshot.TxCache
}

func (p *PayloadTxCache) Key() string {
return "txCache"
}

func (*PayloadTxCache) Namespace() SnapshotNamespace {
return TxCacheSnapshot
}

func (p *PayloadTxCache) IntoProto() *snapshot.Payload_TxCache {
return &snapshot.Payload_TxCache{
TxCache: p.TxCache,
}
}

func (*PayloadTxCache) isPayload() {}

func (p *PayloadTxCache) plToProto() interface{} {
return p.IntoProto()
}

func PayloadTxCacheFromProto(txCachePayload *snapshot.Payload_TxCache) *PayloadTxCache {
return &PayloadTxCache{
TxCache: txCachePayload.TxCache,
}
}
5 changes: 5 additions & 0 deletions protos/sources/vega/snapshot/v1/snapshot.proto
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ message Payload {
EthOracleVerifierMisc eth_oracle_verifier_misc = 85;
BankingEVMBridgeStates banking_evm_bridge_states = 86;
EVMMultisigTopologies evm_multisig_topologies = 87;
TxCache tx_cache = 88;
}
}

Expand Down Expand Up @@ -1421,3 +1422,7 @@ message MarketLiquidity {
int64 tick = 2;
repeated AMMValues amm = 3;
}

message TxCache {
repeated bytes txs = 1;
}
Loading

0 comments on commit 794f791

Please sign in to comment.