Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement PreprocessTxs #21

Merged
merged 10 commits into from
Feb 23, 2021
144 changes: 144 additions & 0 deletions app/abci.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package app

import (
"bytes"
"errors"
"fmt"
"sort"

sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/lazyledger/lazyledger-app/x/lazyledgerapp/types"
abci "github.com/lazyledger/lazyledger-core/abci/types"
core "github.com/lazyledger/lazyledger-core/proto/tendermint/types"
)

// This file should contain all of the altered ABCI methods

// PreprocessTxs fullfills the lazyledger-core version of the ACBI interface, by
// performing basic validation for the incoming txs, and by cleanly separating
// share messages from transactions
func (app *App) PreprocessTxs(txs abci.RequestPreprocessTxs) abci.ResponsePreprocessTxs {
squareSize := app.SquareSize()
shareCounter := uint64(0)
var shareMsgs []*core.Message
var processedTxs [][]byte
for _, rawTx := range txs.Txs {
// decode the Tx
tx, err := app.txConfig.TxDecoder()(rawTx)
if err != nil {
continue
}

// don't process the tx if the transaction doesn't contain a
// PayForMessage sdk.Msg
if !hasWirePayForMessage(tx) {
processedTxs = append(processedTxs, rawTx)
continue
}

// only support transactions that contain a single sdk.Msg
if len(tx.GetMsgs()) != 1 {
continue
}

msg := tx.GetMsgs()[0]

// run basic validation on the transaction
err = tx.ValidateBasic()
if err != nil {
continue
}

// process the message
coreMsg, signedTx, err := app.processMsg(msg)
if err != nil {
continue
}

// increment the share counter by the number of shares taken by the message
sharesTaken := uint64(len(coreMsg.Data) / types.ShareSize)
shareCounter += sharesTaken

// if there are too many shares stop processing and return the transactions
if shareCounter > squareSize*squareSize {
break
}

// encode the processed tx
rawProcessedTx, err := app.appCodec.MarshalBinaryBare(signedTx)
if err != nil {
continue
}

// add the message and tx to the output
shareMsgs = append(shareMsgs, &coreMsg)
processedTxs = append(processedTxs, rawProcessedTx)
}

// sort messages lexigraphically
sort.Slice(shareMsgs, func(i, j int) bool {
return bytes.Compare(shareMsgs[i].NamespaceId, shareMsgs[j].NamespaceId) < 0
})

return abci.ResponsePreprocessTxs{
Txs: processedTxs,
Messages: &core.Messages{MessagesList: shareMsgs},
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that in PreProcess the app also needs to track the (original / mempool) Tx that were processed, s.t. on re-check it can tell ll-core to remove them from the mempool.

This has the caveat that currently only the proposer will call preprocess: the app needs to know if a proposed block didn't make it through consensus (e.g. because of timeouts) and other nodes to be able to evict their mempools too...
@adlerjohn you and I need to think about, too.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not necessarily? Conflicting transactions (which includes duplicates in the mempool) between the block and the mempool are removed with CheckTx, so PreProcess doesn't technically need to do that?

Copy link
Member

@liamsi liamsi Feb 19, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like I'm confused about this again, then 🤔
How does the app know which (mempool) Tx made it into the block if the app doesn't track this?

Sure, it "sees" the Tx during each deliver Tx but only the already processed which are different from the mempool Tx potenially.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leaving this here for reference (quoting @adlerjohn):

You don't need to know which txs from the mempool were included in a block, you only need to know which mempool transactions conflict with the ones in a block.
So running CheckTx of all txs in the mempool after you commit a block is sufficient.
If the tx in the mempool conflicts. It'll have the same nonce and sender.

So my concern was the mapping between mempool tx <-> block tx but as mentioned by John that mapping is essentially there via nonce and sender.


func hasWirePayForMessage(tx sdk.Tx) bool {
for _, msg := range tx.GetMsgs() {
if msg.Type() == types.TypeMsgPayforMessage {
return true
}
}
return false
}

// processMsgs will perform the processing required by PreProcessTxs for a set
// of sdk.Msg's from a single sdk.Tx
func (app *App) processMsg(msg sdk.Msg) (core.Message, *types.TxSignedTransactionDataPayForMessage, error) {
squareSize := app.SquareSize()
// reject all msgs in tx if a single included msg is not correct type
wireMsg, ok := msg.(*types.MsgWirePayForMessage)
if !ok {
return core.Message{},
nil,
errors.New("transaction contained a message type other than types.MsgWirePayForMessage")
}

// make sure that a ShareCommitAndSignature of the correct size is
// included in the message
var shareCommit types.ShareCommitAndSignature
for _, commit := range wireMsg.MessageShareCommitment {
if commit.K == squareSize {
shareCommit = commit
}
}
// K == 0 means there was no share commit with the desired current square size
if shareCommit.K == 0 {
return core.Message{},
nil,
fmt.Errorf("No share commit for correct square size. Current square size: %d", squareSize)
}

// add the message to the list of core message to be returned to ll-core
coreMsg := core.Message{
NamespaceId: wireMsg.GetMessageNameSpaceId(),
Data: wireMsg.GetMessage(),
}

// wrap the signed transaction data
sTxData, err := wireMsg.SignedTransactionDataPayForMessage(squareSize)
if err != nil {
return core.Message{}, nil, err
}

signedData := &types.TxSignedTransactionDataPayForMessage{
Message: sTxData,
Signature: shareCommit.Signature,
PublicKey: wireMsg.PublicKey,
}

return coreMsg, signedData, nil
}
Loading