Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for trading transaction ordering #11286

Merged
merged 7 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
- [11196](https://github.com/vegaprotocol/vega/issues/11196) - Add an active field in the price monitoring bounds payload.
- [11211](https://github.com/vegaprotocol/vega/issues/11211) - Liquidation engine includes `vAMM` shapes as available volume.
- [11217](https://github.com/vegaprotocol/vega/issues/11217) - Allow market proposals to override risk factors.
- [11285](https://github.com/vegaprotocol/vega/issues/11285) - Add support for trading transaction ordering.
- [11282](https://github.com/vegaprotocol/vega/issues/11282) - Allow a party to withdraw rewards from an AMM vested account.

### 🐛 Fixes
Expand Down
2 changes: 2 additions & 0 deletions commands/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,8 @@ func CheckInputData(rawInputData []byte) (*commandspb.InputData, Errors) {
errs.Merge(checkAmendAMM(cmd.AmendAmm))
case *commandspb.InputData_CancelAmm:
errs.Merge(checkCancelAMM(cmd.CancelAmm))
case *commandspb.InputData_DelayedTransactionsWrapper:
break
default:
errs.AddForProperty("tx.input_data.command", ErrIsNotSupported)
}
Expand Down
5 changes: 5 additions & 0 deletions core/blockchain/abci/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ func (app *App) InitChain(_ context.Context, req *types.RequestInitChain) (*type
return &types.ResponseInitChain{}, nil
}

func (app *App) GetTx(tx []byte) (Tx, error) {
txx, _, err := app.getTx(tx)
return txx, err
}

// PrepareProposal will take the given transactions from the mempool and attempts to prepare a
// proposal from them when it's our turn to do so while keeping the size, gas, pow, and spam constraints.
func (app *App) PrepareProposal(_ context.Context, req *types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error) {
Expand Down
2 changes: 2 additions & 0 deletions core/events/transaction_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ func NewTransactionResultEventFailure(

func (t *TransactionResult) setTx(tx interface{}) *TransactionResult {
switch tv := tx.(type) {
case *commandspb.DelayedTransactionsWrapper:
break
case *commandspb.OrderSubmission:
t.evt.Transaction = &eventspb.TransactionResult_OrderSubmission{
OrderSubmission: tv,
Expand Down
36 changes: 36 additions & 0 deletions core/nodewallets/commander.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,36 @@ func NewCommander(cfg Config, log *logging.Logger, bc Chain, w *vega.Wallet, bst
}, nil
}

func (c *Commander) NewTransaction(ctx context.Context, cmd txn.Command, payload proto.Message) ([]byte, error) {
chainID, err := c.bc.GetChainID(ctx)
if err != nil {
c.log.Error("couldn't retrieve chain ID",
logging.Error(err),
)
return nil, err
}
inputData := commands.NewInputData(c.bstats.Height())
wrapPayloadIntoInputData(inputData, cmd, payload)
marshalInputData, err := commands.MarshalInputData(inputData)
if err != nil {
// this should never be possible
c.log.Panic("could not marshal core transaction", logging.Error(err))
}

signature, err := c.sign(commands.BundleInputDataForSigning(marshalInputData, chainID))
if err != nil {
// this should never be possible too
c.log.Panic("could not sign command", logging.Error(err))
}

tx := commands.NewTransaction(c.wallet.PubKey().Hex(), marshalInputData, signature)
marshalledTx, err := proto.Marshal(tx)
if err != nil {
return nil, err
}
return marshalledTx, nil
}

// Command - send command to chain.
// Note: beware when passing in an exponential back off since the done function may be called many times.
func (c *Commander) Command(ctx context.Context, cmd txn.Command, payload proto.Message, done func(string, error), bo *backoff.ExponentialBackOff) {
Expand Down Expand Up @@ -165,6 +195,12 @@ func wrapPayloadIntoInputData(data *commandspb.InputData, cmd txn.Command, paylo
switch cmd {
case txn.SubmitOrderCommand, txn.CancelOrderCommand, txn.AmendOrderCommand, txn.VoteCommand, txn.WithdrawCommand, txn.LiquidityProvisionCommand, txn.ProposeCommand, txn.BatchProposeCommand, txn.SubmitOracleDataCommand, txn.StopOrdersCancellationCommand, txn.StopOrdersSubmissionCommand:
panic("command is not supported to be sent by a node.")
case txn.DelayedTransactionsWrapper:
if underlyingCmd, ok := payload.(*commandspb.DelayedTransactionsWrapper); ok {
data.Command = &commandspb.InputData_DelayedTransactionsWrapper{
DelayedTransactionsWrapper: underlyingCmd,
}
}
case txn.ProtocolUpgradeCommand:
if underlyingCmd, ok := payload.(*commandspb.ProtocolUpgradeProposal); ok {
data.Command = &commandspb.InputData_ProtocolUpgradeProposal{
Expand Down
148 changes: 142 additions & 6 deletions core/processor/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,12 @@ type EthCallEngine interface {
Start()
}

type TxCache interface {
SetRawTxs(rtx [][]byte)
GetRawTxs() [][]byte
NewDelayedTransaction(ctx context.Context, delayed [][]byte) []byte
}

type App struct {
abci *abci.App
currentTimestamp time.Time
Expand Down Expand Up @@ -253,6 +259,9 @@ type App struct {
nilSpam bool

maxBatchSize atomic.Uint64
txCache TxCache

seenDelayedTxTransactions bool
}

func NewApp(log *logging.Logger,
Expand Down Expand Up @@ -297,6 +306,7 @@ func NewApp(log *logging.Logger,
ethCallEngine EthCallEngine,
balanceChecker BalanceChecker,
partiesEngine PartiesEngine,
txCache TxCache,
) *App {
log = log.Named(namedLogger)
log.SetLevel(config.Level.Get())
Expand Down Expand Up @@ -345,6 +355,7 @@ func NewApp(log *logging.Logger,
ethCallEngine: ethCallEngine,
balanceChecker: balanceChecker,
partiesEngine: partiesEngine,
txCache: txCache,
}

// setup handlers
Expand Down Expand Up @@ -527,7 +538,9 @@ func NewApp(log *logging.Logger,
).
HandleDeliverTx(txn.UpdatePartyProfileCommand,
app.SendTransactionResult(app.UpdatePartyProfile),
)
).
HandleDeliverTx(txn.DelayedTransactionsWrapper,
app.SendTransactionResult(app.handleDelayedTransactionWrapper))

app.time.NotifyOnTick(app.onTick)

Expand Down Expand Up @@ -863,19 +876,59 @@ func (app *App) OnInitChain(req *tmtypes.RequestInitChain) (*tmtypes.ResponseIni
// caused the party to get blocked.
func (app *App) prepareProposal(txs []abci.Tx, rawTxs [][]byte) [][]byte {
var totalBytes int64
validationResults := []pow.ValidationEntry{}

// internally we use this as max bytes, externally to consensus params we return max ints. This is done so that cometbft always returns to us the full mempool
// and we can first sort it by priority and then reap by size.
maxBytes := tmtypesint.DefaultBlockParams().MaxBytes * 4
app.log.Debug("prepareProposal called with", logging.Int("txs", len(rawTxs)), logging.Int64("max-bytes", maxBytes))

// as transactions that are wrapped for sending in the next block are not removed from the mempool
// to avoid adding them both from the mempool and from the cache we need to check
// they were not in the cache.
// we still need to check that the transactions from previous block are passing pow and spam requirements.
addedFromPreviousHash := map[string]struct{}{}
delayedTxs := [][]byte{}
for _, txx := range app.txCache.GetRawTxs() {
tx, err := app.abci.GetTx(txx)
if err != nil {
continue
}
if !app.nilPow {
vr, d := app.pow.CheckBlockTx(tx)
validationResults = append(validationResults, pow.ValidationEntry{Tx: tx, Difficulty: d, ValResult: vr})
if vr != pow.ValidationResultSuccess && vr != pow.ValidationResultValidatorCommand {
app.log.Debug("pow failure", logging.Int64("validation-result", int64(vr)))
continue
}
}
if !app.nilSpam {
err := app.spam.CheckBlockTx(tx)
if err != nil {
app.log.Debug("spam error", logging.Error(err))
continue
}
}
if err := app.canSubmitTx(tx); err != nil {
continue
}

addedFromPreviousHash[hex.EncodeToString(tx.Hash())] = struct{}{}
delayedTxs = append(delayedTxs, txx)
totalBytes += int64(len(txx))
}

// wrap the transaction with information about gas wanted and priority
wrappedTxs := make([]*TxWrapper, 0, len(txs))
for i, v := range txs {
wtx, error := app.wrapTx(v, rawTxs[i], i)
if error != nil {
continue
}
if _, ok := addedFromPreviousHash[hex.EncodeToString(wtx.tx.Hash())]; ok {
app.log.Debug("ignoring mempool transaction corresponding to a delayed transaction from previous block")
continue
}
wrappedTxs = append(wrappedTxs, wtx)
}

Expand All @@ -888,10 +941,12 @@ func (app *App) prepareProposal(txs []abci.Tx, rawTxs [][]byte) [][]byte {
})

// add transactions to the block as long as we can without breaking size and gas limits in order of priority
validationResults := []pow.ValidationEntry{}
maxGas := app.getMaxGas()
totalGasWanted := uint64(0)
blockTxs := [][]byte{}
cancellations := [][]byte{}
postOnly := [][]byte{}
anythingElseFromThisBlock := [][]byte{}
nextBlockRtx := [][]byte{}

for _, tx := range wrappedTxs {
totalBytes += int64(len(tx.raw))
Expand All @@ -903,6 +958,11 @@ func (app *App) prepareProposal(txs []abci.Tx, rawTxs [][]byte) [][]byte {
break
}

if tx.tx.Command() == txn.DelayedTransactionsWrapper {
app.log.Debug("delayed transaction wrapper should never be submitted into the mempool")
continue
}

if !app.nilPow {
vr, d := app.pow.CheckBlockTx(tx.tx)
validationResults = append(validationResults, pow.ValidationEntry{Tx: tx.tx, Difficulty: d, ValResult: vr})
Expand All @@ -923,10 +983,61 @@ func (app *App) prepareProposal(txs []abci.Tx, rawTxs [][]byte) [][]byte {
if err := app.canSubmitTx(tx.tx); err != nil {
continue
}
app.log.Debug("adding tx to blockProposal", logging.String("tx-hash", hex.EncodeToString(tx.tx.Hash())), logging.String("tid", tx.tx.GetPoWTID()))
blockTxs = append(blockTxs, tx.raw)

switch tx.tx.Command() {
case txn.CancelOrderCommand, txn.CancelAMMCommand, txn.StopOrdersCancellationCommand:
cancellations = append(cancellations, tx.raw)
case txn.SubmitOrderCommand:
s := &commandspb.OrderSubmission{}
if err := tx.tx.Unmarshal(s); err != nil {
continue
}
if s.PostOnly {
postOnly = append(postOnly, tx.raw)
} else {
nextBlockRtx = append(nextBlockRtx, tx.raw)
}
case txn.AmendOrderCommand, txn.AmendAMMCommand, txn.StopOrdersSubmissionCommand:
nextBlockRtx = append(nextBlockRtx, tx.raw)
case txn.BatchMarketInstructions:
batch := &commandspb.BatchMarketInstructions{}
if err := tx.tx.Unmarshal(batch); err != nil {
continue
}
// if there are no amends/submissions
if len(batch.Amendments) == 0 && len(batch.Submissions) == 0 && len(batch.StopOrdersSubmission) == 0 {
cancellations = append(cancellations, tx.raw)
} else if len(batch.Amendments) == 0 && len(batch.StopOrdersSubmission) == 0 {
allPostOnly := true
for _, sub := range batch.Submissions {
if !sub.PostOnly {
allPostOnly = false
break
}
}
if allPostOnly {
postOnly = append(postOnly, tx.raw)
} else {
nextBlockRtx = append(nextBlockRtx, tx.raw)
}
} else {
nextBlockRtx = append(nextBlockRtx, tx.raw)
}
default:
anythingElseFromThisBlock = append(anythingElseFromThisBlock, tx.raw)
}
}
blockTxs := [][]byte{}
blockTxs = append(blockTxs, cancellations...) // cancellations go first
blockTxs = append(blockTxs, postOnly...) // then post only orders
if delayedTxs != nil {
blockTxs = append(blockTxs, delayedTxs...) // then anything from previous block
}
blockTxs = append(blockTxs, anythingElseFromThisBlock...) // finally anything else from this block
if len(nextBlockRtx) > 0 {
wrapperTx := app.txCache.NewDelayedTransaction(app.blockCtx, nextBlockRtx)
blockTxs = append(blockTxs, wrapperTx)
}
app.log.Debug("prepareProposal returned with", logging.Int("blockTxs", len(blockTxs)))
if !app.nilPow {
app.pow.EndPrepareProposal(validationResults)
}
Expand All @@ -947,6 +1058,8 @@ func (app *App) processProposal(txs []abci.Tx) bool {
maxGas := app.gastimator.GetMaxGas()
maxBytes := tmtypesint.DefaultBlockParams().MaxBytes * 4
size := int64(0)
delayedTxCount := 0

for _, tx := range txs {
size += int64(tx.GetLength())
if size > maxBytes {
Expand All @@ -960,6 +1073,14 @@ func (app *App) processProposal(txs []abci.Tx) bool {
if totalGasWanted > int(maxGas) {
return false
}
// allow only one delayed transaction wrapper in one block and its transactions must match what we expect.
if tx.Command() == txn.DelayedTransactionsWrapper {
if delayedTxCount > 0 {
app.log.Debug("more than one DelayedTransactionsWrapper")
return false
}
delayedTxCount += 1
}
}

if !app.nilPow && !app.pow.ProcessProposal(txs) {
Expand All @@ -983,6 +1104,11 @@ func (app *App) OnEndBlock(blockHeight uint64) (tmtypes.ValidatorUpdates, types1
logging.String("previous-datetime", vegatime.Format(app.previousTimestamp)),
)

if !app.seenDelayedTxTransactions {
app.txCache.SetRawTxs(nil)
}
app.seenDelayedTxTransactions = false

app.epoch.OnBlockEnd(app.blockCtx)
app.stateVar.OnBlockEnd(app.blockCtx)
app.banking.OnBlockEnd(app.blockCtx, app.currentTimestamp)
Expand Down Expand Up @@ -2715,6 +2841,16 @@ func (app *App) JoinTeam(ctx context.Context, tx abci.Tx) error {
return nil
}

func (app *App) handleDelayedTransactionWrapper(ctx context.Context, tx abci.Tx) error {
txs := &commandspb.DelayedTransactionsWrapper{}
if err := tx.Unmarshal(txs); err != nil {
return fmt.Errorf("could not deserialize DelayedTransactionsWrapper command: %w", err)
}
app.txCache.SetRawTxs(txs.Transactions)
app.seenDelayedTxTransactions = true
return nil
}

func (app *App) UpdatePartyProfile(ctx context.Context, tx abci.Tx) error {
params := &commandspb.UpdatePartyProfile{}
if err := tx.Unmarshal(params); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions core/processor/gastimator.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ func (g *Gastimator) GetPriority(tx abci.Tx) uint64 {

func (g *Gastimator) CalcGasWantedForTx(tx abci.Tx) (uint64, error) {
switch tx.Command() {
case txn.DelayedTransactionsWrapper:
return 0, nil
case txn.SubmitOrderCommand:
s := &commandspb.OrderSubmission{}
if err := tx.Unmarshal(s); err != nil {
Expand Down
10 changes: 10 additions & 0 deletions core/processor/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ func DecodeTx(payload []byte, chainID string) (*Tx, error) {

func (t Tx) Command() txn.Command {
switch cmd := t.inputData.Command.(type) {
case *commandspb.InputData_DelayedTransactionsWrapper:
return txn.DelayedTransactionsWrapper
case *commandspb.InputData_OrderSubmission:
return txn.SubmitOrderCommand
case *commandspb.InputData_OrderCancellation:
Expand Down Expand Up @@ -261,6 +263,8 @@ func (t Tx) GetCmd() interface{} {
return cmd.AmendAmm
case *commandspb.InputData_CancelAmm:
return cmd.CancelAmm
case *commandspb.InputData_DelayedTransactionsWrapper:
return cmd.DelayedTransactionsWrapper
default:
return fmt.Errorf("command %T is not supported", cmd)
}
Expand Down Expand Up @@ -490,6 +494,12 @@ func (t Tx) Unmarshal(i interface{}) error {
return errors.New("failed to unmarshall to CancelAMM")
}
*underlyingCmd = *cmd.CancelAmm
case *commandspb.InputData_DelayedTransactionsWrapper:
underlyingCmd, ok := i.(*commandspb.DelayedTransactionsWrapper)
if !ok {
return errors.New("failed to unmarshall to DelayedTransactionsWrapper")
}
*underlyingCmd = *cmd.DelayedTransactionsWrapper
default:
return fmt.Errorf("command %T is not supported", cmd)
}
Expand Down
Loading
Loading