Skip to content

Commit

Permalink
feat: add support for trading transaction ordering
Browse files Browse the repository at this point in the history
  • Loading branch information
ze97286 committed May 16, 2024
1 parent bc7cf9e commit 4a74854
Show file tree
Hide file tree
Showing 14 changed files with 407 additions and 110 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
- [11196](https://github.com/vegaprotocol/vega/issues/11196) - Add an active field in the price monitoring bounds payload.
- [11211](https://github.com/vegaprotocol/vega/issues/11211) - Liquidation engine includes `vAMM` shapes as available volume.
- [11217](https://github.com/vegaprotocol/vega/issues/11217) - Allow market proposals to override risk factors.
- [11285](https://github.com/vegaprotocol/vega/issues/11285) - Add support for trading transaction ordering.

### 🐛 Fixes

Expand Down
2 changes: 2 additions & 0 deletions commands/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,8 @@ func CheckInputData(rawInputData []byte) (*commandspb.InputData, Errors) {
errs.Merge(checkAmendAMM(cmd.AmendAmm))
case *commandspb.InputData_CancelAmm:
errs.Merge(checkCancelAMM(cmd.CancelAmm))
case *commandspb.InputData_DelayedTransactionsWrapper:
break
default:
errs.AddForProperty("tx.input_data.command", ErrIsNotSupported)
}
Expand Down
2 changes: 2 additions & 0 deletions core/events/transaction_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ func NewTransactionResultEventFailure(

func (t *TransactionResult) setTx(tx interface{}) *TransactionResult {
switch tv := tx.(type) {
case *commandspb.DelayedTransactionsWrapper:
break
case *commandspb.OrderSubmission:
t.evt.Transaction = &eventspb.TransactionResult_OrderSubmission{
OrderSubmission: tv,
Expand Down
36 changes: 36 additions & 0 deletions core/nodewallets/commander.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,36 @@ func NewCommander(cfg Config, log *logging.Logger, bc Chain, w *vega.Wallet, bst
}, nil
}

func (c *Commander) NewTransaction(ctx context.Context, cmd txn.Command, payload proto.Message) ([]byte, error) {
chainID, err := c.bc.GetChainID(ctx)
if err != nil {
c.log.Error("couldn't retrieve chain ID",
logging.Error(err),
)
return nil, err
}
inputData := commands.NewInputData(c.bstats.Height())
wrapPayloadIntoInputData(inputData, cmd, payload)
marshalInputData, err := commands.MarshalInputData(inputData)
if err != nil {
// this should never be possible
c.log.Panic("could not marshal core transaction", logging.Error(err))
}

signature, err := c.sign(commands.BundleInputDataForSigning(marshalInputData, chainID))
if err != nil {
// this should never be possible too
c.log.Panic("could not sign command", logging.Error(err))
}

tx := commands.NewTransaction(c.wallet.PubKey().Hex(), marshalInputData, signature)
marshalledTx, err := proto.Marshal(tx)
if err != nil {
return nil, err
}
return marshalledTx, nil
}

// Command - send command to chain.
// Note: beware when passing in an exponential back off since the done function may be called many times.
func (c *Commander) Command(ctx context.Context, cmd txn.Command, payload proto.Message, done func(string, error), bo *backoff.ExponentialBackOff) {
Expand Down Expand Up @@ -165,6 +195,12 @@ func wrapPayloadIntoInputData(data *commandspb.InputData, cmd txn.Command, paylo
switch cmd {
case txn.SubmitOrderCommand, txn.CancelOrderCommand, txn.AmendOrderCommand, txn.VoteCommand, txn.WithdrawCommand, txn.LiquidityProvisionCommand, txn.ProposeCommand, txn.BatchProposeCommand, txn.SubmitOracleDataCommand, txn.StopOrdersCancellationCommand, txn.StopOrdersSubmissionCommand:
panic("command is not supported to be sent by a node.")
case txn.DelayedTransactionsWrapper:
if underlyingCmd, ok := payload.(*commandspb.DelayedTransactionsWrapper); ok {
data.Command = &commandspb.InputData_DelayedTransactionsWrapper{
DelayedTransactionsWrapper: underlyingCmd,
}
}
case txn.ProtocolUpgradeCommand:
if underlyingCmd, ok := payload.(*commandspb.ProtocolUpgradeProposal); ok {
data.Command = &commandspb.InputData_ProtocolUpgradeProposal{
Expand Down
98 changes: 95 additions & 3 deletions core/processor/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,12 @@ type EthCallEngine interface {
Start()
}

type TxCache interface {
SetRawTxs(rtx [][]byte)
GetRawTxs() [][]byte
NewDelayedTransaction(ctx context.Context, delayed [][]byte) []byte
}

type App struct {
abci *abci.App
currentTimestamp time.Time
Expand Down Expand Up @@ -253,6 +259,7 @@ type App struct {
nilSpam bool

maxBatchSize atomic.Uint64
txCache TxCache
}

func NewApp(log *logging.Logger,
Expand Down Expand Up @@ -297,6 +304,7 @@ func NewApp(log *logging.Logger,
ethCallEngine EthCallEngine,
balanceChecker BalanceChecker,
partiesEngine PartiesEngine,
txCache TxCache,
) *App {
log = log.Named(namedLogger)
log.SetLevel(config.Level.Get())
Expand Down Expand Up @@ -345,6 +353,7 @@ func NewApp(log *logging.Logger,
ethCallEngine: ethCallEngine,
balanceChecker: balanceChecker,
partiesEngine: partiesEngine,
txCache: txCache,
}

// setup handlers
Expand Down Expand Up @@ -527,7 +536,9 @@ func NewApp(log *logging.Logger,
).
HandleDeliverTx(txn.UpdatePartyProfileCommand,
app.SendTransactionResult(app.UpdatePartyProfile),
)
).
HandleDeliverTx(txn.DelayedTransactionsWrapper,
app.SendTransactionResult(app.handleDelayedTransactionWrapper))

app.time.NotifyOnTick(app.onTick)

Expand Down Expand Up @@ -891,7 +902,11 @@ func (app *App) prepareProposal(txs []abci.Tx, rawTxs [][]byte) [][]byte {
validationResults := []pow.ValidationEntry{}
maxGas := app.getMaxGas()
totalGasWanted := uint64(0)
blockTxs := [][]byte{}
cancellations := [][]byte{}
postOnly := [][]byte{}
executatbleBatchesFromThisBlock := [][]byte{}
anythingElseFromThisBlock := [][]byte{}
nextBlockRtx := [][]byte{}

for _, tx := range wrappedTxs {
totalBytes += int64(len(tx.raw))
Expand All @@ -903,6 +918,11 @@ func (app *App) prepareProposal(txs []abci.Tx, rawTxs [][]byte) [][]byte {
break
}

if tx.tx.Command() == txn.DelayedTransactionsWrapper {
app.log.Debug("delayed transaction wrapper should never be submitted into the mempool")
continue
}

if !app.nilPow {
vr, d := app.pow.CheckBlockTx(tx.tx)
validationResults = append(validationResults, pow.ValidationEntry{Tx: tx.tx, Difficulty: d, ValResult: vr})
Expand All @@ -923,9 +943,60 @@ func (app *App) prepareProposal(txs []abci.Tx, rawTxs [][]byte) [][]byte {
if err := app.canSubmitTx(tx.tx); err != nil {
continue
}

switch tx.tx.Command() {
case txn.CancelOrderCommand, txn.CancelAMMCommand, txn.StopOrdersCancellationCommand:
cancellations = append(cancellations, tx.raw)
case txn.SubmitOrderCommand:
s := &commandspb.OrderSubmission{}
if err := tx.tx.Unmarshal(s); err != nil {
continue
}
if s.PostOnly {
postOnly = append(postOnly, tx.raw)
} else {
nextBlockRtx = append(nextBlockRtx, tx.raw)
}
case txn.AmendOrderCommand, txn.AmendAMMCommand, txn.StopOrdersSubmissionCommand:
nextBlockRtx = append(nextBlockRtx, tx.raw)
case txn.BatchMarketInstructions:
batch := &commandspb.BatchMarketInstructions{}
if err := tx.tx.Unmarshal(batch); err != nil {
continue
}
// if there are no amends/submissions
if len(batch.Amendments) == 0 && len(batch.Submissions) == 0 && len(batch.StopOrdersSubmission) == 0 {
executatbleBatchesFromThisBlock = append(executatbleBatchesFromThisBlock, tx.raw)
} else if len(batch.Amendments) == 0 && len(batch.StopOrdersSubmission) == 0 {
allPostOnly := true
for _, sub := range batch.Submissions {
if !sub.PostOnly {
allPostOnly = false
break
}
}
if allPostOnly {
executatbleBatchesFromThisBlock = append(executatbleBatchesFromThisBlock, tx.raw)
continue
}
} else {
nextBlockRtx = append(nextBlockRtx, tx.raw)
}
default:
anythingElseFromThisBlock = append(anythingElseFromThisBlock, tx.raw)
}

app.log.Debug("adding tx to blockProposal", logging.String("tx-hash", hex.EncodeToString(tx.tx.Hash())), logging.String("tid", tx.tx.GetPoWTID()))
blockTxs = append(blockTxs, tx.raw)
}

blockTxs := [][]byte{}
blockTxs = append(blockTxs, cancellations...) // cancellations go first
blockTxs = append(blockTxs, executatbleBatchesFromThisBlock...) // executable batches from this block go next
blockTxs = append(blockTxs, postOnly...) // then post only orders
blockTxs = append(blockTxs, app.txCache.GetRawTxs()...) // then anything from previous block
blockTxs = append(blockTxs, anythingElseFromThisBlock...) // finally anything else from this block
blockTxs = append(blockTxs, app.txCache.NewDelayedTransaction(app.blockCtx, nextBlockRtx))

app.log.Debug("prepareProposal returned with", logging.Int("blockTxs", len(blockTxs)))
if !app.nilPow {
app.pow.EndPrepareProposal(validationResults)
Expand All @@ -947,6 +1018,8 @@ func (app *App) processProposal(txs []abci.Tx) bool {
maxGas := app.gastimator.GetMaxGas()
maxBytes := tmtypesint.DefaultBlockParams().MaxBytes * 4
size := int64(0)
delayedTxCount := 0

for _, tx := range txs {
size += int64(tx.GetLength())
if size > maxBytes {
Expand All @@ -960,6 +1033,13 @@ func (app *App) processProposal(txs []abci.Tx) bool {
if totalGasWanted > int(maxGas) {
return false
}
// allow only one delayed transaction wrapper in one block and its transactions must match what we expect.
if tx.Command() == txn.DelayedTransactionsWrapper {
if delayedTxCount > 0 {
return false
}
delayedTxCount += 1
}
}

if !app.nilPow && !app.pow.ProcessProposal(txs) {
Expand Down Expand Up @@ -2715,6 +2795,18 @@ func (app *App) JoinTeam(ctx context.Context, tx abci.Tx) error {
return nil
}

func (app *App) handleDelayedTransactionWrapper(ctx context.Context, tx abci.Tx) error {
println("handleDelayedTransactionWrapper")
txs := &commandspb.DelayedTransactionsWrapper{}
if err := tx.Unmarshal(txs); err != nil {
println("failed to unmarshal delayed tx wrapper", err.Error())
return fmt.Errorf("could not deserialize DelayedTransactionsWrapper command: %w", err)
}
println("setting tx cache to ", len(txs.Transactions), "transactions")
app.txCache.SetRawTxs(txs.Transactions)
return nil
}

func (app *App) UpdatePartyProfile(ctx context.Context, tx abci.Tx) error {
params := &commandspb.UpdatePartyProfile{}
if err := tx.Unmarshal(params); err != nil {
Expand Down
10 changes: 10 additions & 0 deletions core/processor/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ func DecodeTx(payload []byte, chainID string) (*Tx, error) {

func (t Tx) Command() txn.Command {
switch cmd := t.inputData.Command.(type) {
case *commandspb.InputData_DelayedTransactionsWrapper:
return txn.DelayedTransactionsWrapper
case *commandspb.InputData_OrderSubmission:
return txn.SubmitOrderCommand
case *commandspb.InputData_OrderCancellation:
Expand Down Expand Up @@ -261,6 +263,8 @@ func (t Tx) GetCmd() interface{} {
return cmd.AmendAmm
case *commandspb.InputData_CancelAmm:
return cmd.CancelAmm
case *commandspb.InputData_DelayedTransactionsWrapper:
return cmd.DelayedTransactionsWrapper
default:
return fmt.Errorf("command %T is not supported", cmd)
}
Expand Down Expand Up @@ -490,6 +494,12 @@ func (t Tx) Unmarshal(i interface{}) error {
return errors.New("failed to unmarshall to CancelAMM")
}
*underlyingCmd = *cmd.CancelAmm
case *commandspb.InputData_DelayedTransactionsWrapper:
underlyingCmd, ok := i.(*commandspb.DelayedTransactionsWrapper)
if !ok {
return errors.New("failed to unmarshall to DelayedTransactionsWrapper")
}
*underlyingCmd = *cmd.DelayedTransactionsWrapper
default:
return fmt.Errorf("command %T is not supported", cmd)
}
Expand Down
3 changes: 3 additions & 0 deletions core/protocol/all_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ import (
"code.vegaprotocol.io/vega/core/statevar"
"code.vegaprotocol.io/vega/core/stats"
"code.vegaprotocol.io/vega/core/teams"
"code.vegaprotocol.io/vega/core/txcache"
"code.vegaprotocol.io/vega/core/types"
"code.vegaprotocol.io/vega/core/validators"
"code.vegaprotocol.io/vega/core/validators/erc20multisig"
Expand Down Expand Up @@ -128,6 +129,7 @@ type allServices struct {
ethereumOraclesVerifier *ethverifier.Verifier

partiesEngine *parties.SnapshottedEngine
txCache *txcache.TxCache

assets *assets.Service
topology *validators.Topology
Expand Down Expand Up @@ -289,6 +291,7 @@ func newServices(
svcs.teamsEngine = teams.NewSnapshottedEngine(svcs.broker, svcs.timeService)

svcs.partiesEngine = parties.NewSnapshottedEngine(svcs.broker)
svcs.txCache = txcache.NewTxCache(svcs.commander)

svcs.statevar = statevar.New(svcs.log, svcs.conf.StateVar, svcs.broker, svcs.topology, svcs.commander)
svcs.marketActivityTracker = common.NewMarketActivityTracker(svcs.log, svcs.teamsEngine, svcs.stakingAccounts, svcs.broker)
Expand Down
1 change: 1 addition & 0 deletions core/protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ func New(
svcs.ethCallEngine,
svcs.collateral,
svcs.partiesEngine,
svcs.txCache,
),
log: log,
confWatcher: confWatcher,
Expand Down
52 changes: 52 additions & 0 deletions core/txcache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright (C) 2023 Gobalsky Labs Limited
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package txcache

import (
"context"

"code.vegaprotocol.io/vega/core/nodewallets"
"code.vegaprotocol.io/vega/core/txn"
commandspb "code.vegaprotocol.io/vega/protos/vega/commands/v1"
)

func NewTxCache(commander *nodewallets.Commander) *TxCache {
return &TxCache{
commander: commander,
}
}

type TxCache struct {
commander *nodewallets.Commander
rtxs [][]byte
}

func (t *TxCache) NewDelayedTransaction(ctx context.Context, delayed [][]byte) []byte {
payload := &commandspb.DelayedTransactionsWrapper{Transactions: delayed}
tx, err := t.commander.NewTransaction(ctx, txn.DelayedTransactionsWrapper, payload)
if err != nil {
panic(err.Error())
}
return tx
}

func (t *TxCache) SetRawTxs(rtx [][]byte) {
t.rtxs = rtx
}

func (t *TxCache) GetRawTxs() [][]byte {
return t.rtxs
}
5 changes: 4 additions & 1 deletion core/txn/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ const (
AmendAMMCommand Command = 0x65
// CancelAMMCommand ...
CancelAMMCommand Command = 0x66
// DelayedTransactionsWrapper ...
DelayedTransactionsWrapper Command = 0x67
)

var commandName = map[Command]string{
Expand Down Expand Up @@ -134,11 +136,12 @@ var commandName = map[Command]string{
SubmitAMMCommand: "Submit AMM",
AmendAMMCommand: "Amend AMM",
CancelAMMCommand: "Cancel AMM",
DelayedTransactionsWrapper: "Delatyed Transactions Wrapper",
}

func (cmd Command) IsValidatorCommand() bool {
switch cmd {
case NodeSignatureCommand, ChainEventCommand, NodeVoteCommand, ValidatorHeartbeatCommand, RotateKeySubmissionCommand, StateVariableProposalCommand, RotateEthereumKeySubmissionCommand:
case DelayedTransactionsWrapper, NodeSignatureCommand, ChainEventCommand, NodeVoteCommand, ValidatorHeartbeatCommand, RotateKeySubmissionCommand, StateVariableProposalCommand, RotateEthereumKeySubmissionCommand:
return true
default:
return false
Expand Down
Loading

0 comments on commit 4a74854

Please sign in to comment.