Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: refactor share splitting and merging #637

Merged
merged 13 commits into from
Aug 29, 2022
4 changes: 2 additions & 2 deletions app/child_tx_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (

func MalleatedTxDecoder(dec sdk.TxDecoder) sdk.TxDecoder {
return func(txBytes []byte) (sdk.Tx, error) {
if _, childTx, has := coretypes.UnwrapMalleatedTx(txBytes); has {
return dec(childTx)
if malleatedTx, has := coretypes.UnwrapMalleatedTx(txBytes); has {
return dec(malleatedTx.Tx)
}
return dec(txBytes)
}
Expand Down
4 changes: 2 additions & 2 deletions app/process_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,15 @@ func (app *App) ProcessProposal(req abci.RequestProcessProposal) abci.ResponsePr
}
}

nsshares, _, err := shares.ComputeShares(&data, req.BlockData.OriginalSquareSize)
rawShares, err := shares.Split(data)
if err != nil {
app.Logger().Error(rejectedPropBlockLog, "reason", "failure to compute shares from block data:", "error", err, "proposerAddress", req.Header.ProposerAddress)
return abci.ResponseProcessProposal{
Result: abci.ResponseProcessProposal_REJECT,
}
}

eds, err := da.ExtendShares(req.BlockData.OriginalSquareSize, nsshares.RawShares())
eds, err := da.ExtendShares(req.BlockData.OriginalSquareSize, rawShares)
if err != nil {
app.Logger().Error(
rejectedPropBlockLog,
Expand Down
7 changes: 5 additions & 2 deletions app/split_shares.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,10 @@ func newShareSplitter(txConf client.TxConfig, squareSize uint64, data *core.Data
if err != nil {
panic(err)
}
sqwr.evdShares = shares.SplitEvidenceIntoShares(evdData).RawShares()
sqwr.evdShares, err = shares.SplitEvidence(evdData.Evidence)
if err != nil {
panic(err)
}

sqwr.txWriter = coretypes.NewContiguousShareWriter(consts.TxNamespaceID)
sqwr.msgWriter = coretypes.NewMessageShareWriter()
Expand Down Expand Up @@ -188,7 +191,7 @@ func (sqwr *shareSplitter) writeMalleatedTx(
return false, nil, nil, err
}

wrappedTx, err := coretypes.WrapMalleatedTx(parentHash[:], rawProcessedTx)
wrappedTx, err := coretypes.WrapMalleatedTx(parentHash[:], 0, rawProcessedTx)
rootulp marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return false, nil, nil, err
}
Expand Down
34 changes: 16 additions & 18 deletions app/test/process_proposal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,10 @@ func TestMessageInclusionCheck(t *testing.T) {
data, err := coretypes.DataFromProto(tt.input.BlockData)
require.NoError(t, err)

shares, _, err := shares.ComputeShares(&data, tt.input.BlockData.OriginalSquareSize)
shares, err := shares.Split(data)
require.NoError(t, err)

rawShares := shares.RawShares()
rawShares := shares

require.NoError(t, err)
eds, err := da.ExtendShares(tt.input.BlockData.OriginalSquareSize, rawShares)
Expand Down Expand Up @@ -207,13 +207,11 @@ func TestProcessMessagesWithReservedNamespaces(t *testing.T) {
data, err := coretypes.DataFromProto(input.BlockData)
require.NoError(t, err)

shares, _, err := shares.ComputeShares(&data, input.BlockData.OriginalSquareSize)
shares, err := shares.Split(data)
require.NoError(t, err)

rawShares := shares.RawShares()

require.NoError(t, err)
eds, err := da.ExtendShares(input.BlockData.OriginalSquareSize, rawShares)
eds, err := da.ExtendShares(input.BlockData.OriginalSquareSize, shares)
require.NoError(t, err)
dah := da.NewDataAvailabilityHeader(eds)
input.Header.DataHash = dah.Hash()
Expand All @@ -234,6 +232,9 @@ func TestProcessMessageWithUnsortedMessages(t *testing.T) {
pfdOne, msgOne := genRandMsgPayForDataForNamespace(t, signer, 8, namespaceOne)
pfdTwo, msgTwo := genRandMsgPayForDataForNamespace(t, signer, 8, namespaceTwo)

cMsgOne := &core.Message{NamespaceId: pfdOne.GetMessageNamespaceId(), Data: msgOne}
cMsgTwo := &core.Message{NamespaceId: pfdTwo.GetMessageNamespaceId(), Data: msgTwo}

input := abci.RequestProcessProposal{
BlockData: &core.Data{
Txs: [][]byte{
Expand All @@ -242,14 +243,8 @@ func TestProcessMessageWithUnsortedMessages(t *testing.T) {
},
Messages: core.Messages{
MessagesList: []*core.Message{
{
NamespaceId: pfdTwo.GetMessageNamespaceId(),
Data: msgTwo,
},
{
NamespaceId: pfdOne.GetMessageNamespaceId(),
Data: msgOne,
},
cMsgOne,
cMsgTwo,
},
},
OriginalSquareSize: 8,
Expand All @@ -258,17 +253,20 @@ func TestProcessMessageWithUnsortedMessages(t *testing.T) {
data, err := coretypes.DataFromProto(input.BlockData)
require.NoError(t, err)

shares, _, err := shares.ComputeShares(&data, input.BlockData.OriginalSquareSize)
shares, err := shares.Split(data)
require.NoError(t, err)

rawShares := shares.RawShares()

require.NoError(t, err)
eds, err := da.ExtendShares(input.BlockData.OriginalSquareSize, rawShares)
eds, err := da.ExtendShares(input.BlockData.OriginalSquareSize, shares)

require.NoError(t, err)
dah := da.NewDataAvailabilityHeader(eds)
input.Header.DataHash = dah.Hash()

// swap the messages
input.BlockData.Messages.MessagesList[0] = cMsgTwo
input.BlockData.Messages.MessagesList[1] = cMsgOne

got := testApp.ProcessProposal(input)

assert.Equal(t, got.Result, abci.ResponseProcessProposal_REJECT)
Expand Down
7 changes: 3 additions & 4 deletions app/test/split_shares_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/tendermint/tendermint/pkg/consts"
"github.com/tendermint/tendermint/pkg/da"
core "github.com/tendermint/tendermint/proto/tendermint/types"
coretypes "github.com/tendermint/tendermint/types"
)

func TestSplitShares(t *testing.T) {
Expand Down Expand Up @@ -101,14 +100,14 @@ func TestSplitShares(t *testing.T) {
dah := da.NewDataAvailabilityHeader(eds)
data.Hash = dah.Hash()

parsedData, err := coretypes.DataFromSquare(eds)
parsedData, err := shares.Merge(eds)
require.NoError(t, err)

assert.Equal(t, data.Txs, parsedData.Txs.ToSliceOfBytes())

parsedShares, _, err := shares.ComputeShares(&parsedData, tt.squareSize)
parsedShares, err := shares.Split(parsedData)
require.NoError(t, err)

require.Equal(t, square, parsedShares.RawShares())
require.Equal(t, square, parsedShares)
}
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -161,5 +161,5 @@ require (
replace (
github.com/cosmos/cosmos-sdk => github.com/celestiaorg/cosmos-sdk v1.2.0-sdk-v0.46.0
github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1
github.com/tendermint/tendermint => github.com/celestiaorg/celestia-core v1.3.3-tm-v0.34.20
github.com/tendermint/tendermint => github.com/celestiaorg/celestia-core v1.3.5-tm-v0.34.20
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,8 @@ github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46f
github.com/bwesterb/go-ristretto v1.2.0/go.mod h1:fUIoIZaG73pV5biE2Blr2xEzDoMj7NFEuV9ekS419A0=
github.com/c-bata/go-prompt v0.2.2/go.mod h1:VzqtzE2ksDBcdln8G7mk2RX9QyGjH+OVqOCSiVIqS34=
github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
github.com/celestiaorg/celestia-core v1.3.3-tm-v0.34.20 h1:XspZtkoIfJ68TCVOOGWkI4W7taQFJLLqSu6GME5RbqU=
github.com/celestiaorg/celestia-core v1.3.3-tm-v0.34.20/go.mod h1:f4R8qNJrP1CDH0SNwj4jA3NymBLQM4lNdx6Ijmfllbw=
github.com/celestiaorg/celestia-core v1.3.5-tm-v0.34.20 h1:Z66gewBIJIQ5T72v2ktu4FMA/sgDE4ok4y9DvfnCUUE=
github.com/celestiaorg/celestia-core v1.3.5-tm-v0.34.20/go.mod h1:f4R8qNJrP1CDH0SNwj4jA3NymBLQM4lNdx6Ijmfllbw=
github.com/celestiaorg/cosmos-sdk v1.2.0-sdk-v0.46.0 h1:A1F7L/09uGClsU+kmugMy47Ezv4ll0uxNRNdaGa37T8=
github.com/celestiaorg/cosmos-sdk v1.2.0-sdk-v0.46.0/go.mod h1:OXRC0p460CFKl77uQZWY/8p5uZmDrNum7BmVZDupq0Q=
github.com/celestiaorg/go-leopard v0.1.0 h1:28z2EkvKJIez5J9CEaiiUEC+OxalRLtTGJJ1oScfE1g=
Expand Down
9 changes: 9 additions & 0 deletions pkg/appconsts/appconsts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package appconsts

import (
"bytes"

"github.com/tendermint/tendermint/pkg/consts"
)

// NameSpacedPaddedShareBytes is the data portion of a namespaced padding
// share: consts.MsgShareSize zero bytes (the namespace prefix is excluded).
var NameSpacedPaddedShareBytes = bytes.Repeat([]byte{0}, consts.MsgShareSize)
81 changes: 81 additions & 0 deletions pkg/shares/merge_contiguous_shares.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package shares

import (
"encoding/binary"
"errors"

"github.com/tendermint/tendermint/pkg/consts"
)

// processContiguousShares takes raw shares and extracts out transactions,
// intermediate state roots, or evidence. The returned [][]byte do have
evan-forbes marked this conversation as resolved.
Show resolved Hide resolved
// namespaces or length delimiters and are ready to be unmarshalled
func processContiguousShares(shares [][]byte) (txs [][]byte, err error) {
if len(shares) == 0 {
return nil, nil
}

ss := newShareStack(shares)
return ss.resolve()
}

// shareStack holds the state used by peel: the raw shares being merged and
// the chunks of data parsed out of them so far.
type shareStack struct {
	shares [][]byte
	// txLen is the declared length of the chunk currently being assembled.
	txLen uint64
	// txs may contain transactions, intermediate state roots, or evidence
	// depending on the namespace of the shares being processed.
	txs [][]byte
	// cursor indexes the share currently being consumed from shares.
	cursor int
}

// newShareStack wraps the given raw shares in a shareStack ready to resolve.
func newShareStack(shares [][]byte) *shareStack {
	ss := shareStack{shares: shares}
	return &ss
}

// resolve parses every share on the stack and returns the chunks of data
// recovered from them, alongside any error hit while peeling.
func (ss *shareStack) resolve() ([][]byte, error) {
	if len(ss.shares) == 0 {
		return nil, nil
	}
	// skip the namespace and reserved byte of the first share before peeling
	prefix := consts.NamespaceSize + consts.ShareReservedBytes
	err := ss.peel(ss.shares[0][prefix:], true)
	return ss.txs, err
}

// peel recursively parses each chunk of data (either a transaction,
rootulp marked this conversation as resolved.
Show resolved Hide resolved
// intermediate state root, or evidence) and adds it to the underlying slice of data.
func (ss *shareStack) peel(share []byte, delimited bool) (err error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm having a difficult time following the iteration logic in this function. I think it would be beneficial if unit tests verified its behavior (or preferably the behavior of processContiguousShares)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UPDATE: disregard, I see there are tests for processContiguousShares in shares_test.go. Thoughts on moving them to merge_contiguous_shares_test.go?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeeaaaaahhhh tbh I'm having a hard time groking it as well lololol. We could probably rewrite it in a better way. I remember there being a lot of weird edge cases for contiguous shares... it should probably at least be a single function and keep all of the state in there instead of in a struct.

we could also split up the tests, that only makes sense and would hopefully help future readers.

if delimited {
var txLen uint64
share, txLen, err = ParseDelimiter(share)
if err != nil {
return err
}
if txLen == 0 {
return nil
}
ss.txLen = txLen
}
// safeLen describes the point in the share where it can be safely split. If
// split beyond this point, it is possible to break apart a length
// delimiter, which will result in incorrect share merging
safeLen := len(share) - binary.MaxVarintLen64
if safeLen < 0 {
safeLen = 0
}
if ss.txLen <= uint64(safeLen) {
ss.txs = append(ss.txs, share[:ss.txLen])
share = share[ss.txLen:]
return ss.peel(share, true)
}
// add the next share to the current share to continue merging if possible
if len(ss.shares) > ss.cursor+1 {
ss.cursor++
share := append(share, ss.shares[ss.cursor][consts.NamespaceSize+consts.ShareReservedBytes:]...)
return ss.peel(share, false)
}
// collect any remaining data
if ss.txLen <= uint64(len(share)) {
ss.txs = append(ss.txs, share[:ss.txLen])
share = share[ss.txLen:]
return ss.peel(share, true)
}
return errors.New("failure to parse block data: transaction length exceeded data length")
}
113 changes: 113 additions & 0 deletions pkg/shares/merge_message_shares.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package shares

import (
"bytes"
"encoding/binary"

"github.com/celestiaorg/celestia-app/pkg/appconsts"
"github.com/tendermint/tendermint/pkg/consts"
coretypes "github.com/tendermint/tendermint/types"
)

// parseMsgShares iterates through raw shares and separates the contiguous chunks
// of data. It is only used for Messages, i.e. shares with a non-reserved namespace.
func parseMsgShares(shares [][]byte) ([]coretypes.Message, error) {
if len(shares) == 0 {
return nil, nil
}
// msgs returned
msgs := []coretypes.Message{}
currentMsgLen := 0
currentMsg := coretypes.Message{}
// whether the current share contains the start of a new message
isNewMessage := true
// the len in bytes of the current chunk of data that will eventually become
// a message. This is identical to len(currentMsg.Data) + consts.MsgShareSize
// but we cache it here for readability
dataLen := 0
saveMessage := func() {
msgs = append(msgs, currentMsg)
dataLen = 0
isNewMessage = true
}
// iterate through all the shares and parse out each msg
for i := 0; i < len(shares); i++ {
dataLen = len(currentMsg.Data) + consts.MsgShareSize
switch {
case isNewMessage:
nextMsgChunk, nextMsgLen, err := ParseDelimiter(shares[i][consts.NamespaceSize:])
if err != nil {
return nil, err
}
// the current share is namespaced padding so we ignore it
if bytes.Equal(shares[i][consts.NamespaceSize:], appconsts.NameSpacedPaddedShareBytes) {
continue
}
currentMsgLen = int(nextMsgLen)
nid := shares[i][:consts.NamespaceSize]
currentMsg = coretypes.Message{
NamespaceID: nid,
Data: nextMsgChunk,
}
// the current share contains the entire msg so we save it and
// progress
if currentMsgLen <= len(nextMsgChunk) {
currentMsg.Data = currentMsg.Data[:currentMsgLen]
saveMessage()
continue
}
isNewMessage = false
// this entire share contains a chunk of message that we need to save
case currentMsgLen > dataLen:
currentMsg.Data = append(currentMsg.Data, shares[i][consts.NamespaceSize:]...)
// this share contains the last chunk of data needed to complete the
// message
case currentMsgLen <= dataLen:
remaining := currentMsgLen - len(currentMsg.Data) + consts.NamespaceSize
currentMsg.Data = append(currentMsg.Data, shares[i][consts.NamespaceSize:remaining]...)
saveMessage()
}
}
return msgs, nil
}

// ParseDelimiter finds and returns the length delimiter of the message provided
// while also removing the delimiter bytes from the input
func ParseDelimiter(input []byte) ([]byte, uint64, error) {
	if len(input) == 0 {
		return input, 0, nil
	}

	// take at most MaxVarintLen64 bytes as the delimiter candidate
	prefixLen := binary.MaxVarintLen64
	if len(input) < prefixLen {
		prefixLen = len(input)
	}

	// zero-pad to full varint width so a truncated delimiter still terminates
	delimiter := zeroPadIfNecessary(input[:prefixLen], binary.MaxVarintLen64)

	// decode the message length from the delimiter
	msgLen, err := binary.ReadUvarint(bytes.NewBuffer(delimiter))
	if err != nil {
		return nil, 0, err
	}

	// re-encode the length to learn how many bytes the delimiter occupied
	lenBuf := make([]byte, binary.MaxVarintLen64)
	n := binary.PutUvarint(lenBuf, msgLen)

	// strip the delimiter off the front of the input
	return input[n:], msgLen, nil
}

// zeroPadIfNecessary appends zero bytes to share until it is at least width
// bytes long; shares already at or beyond width are returned unchanged.
func zeroPadIfNecessary(share []byte, width int) []byte {
	if missing := width - len(share); missing > 0 {
		return append(share, bytes.Repeat([]byte{0}, missing)...)
	}
	return share
}
evan-forbes marked this conversation as resolved.
Show resolved Hide resolved
Loading