From 7766a5e1832c6571f5600effd3438e577998cb5c Mon Sep 17 00:00:00 2001 From: Evan Forbes <42654277+evan-forbes@users.noreply.github.com> Date: Thu, 18 Aug 2022 07:38:57 -0500 Subject: [PATCH] chore: move share encoding logic from core -> app (#627) * chore: move share encoding logic from core -> app * chore: linter --- app/process_proposal.go | 5 +- app/split_shares.go | 3 +- app/test/process_proposal_test.go | 7 +- app/test/split_shares_test.go | 3 +- pkg/shares/share_merging.go | 308 ++++++++++++++++ pkg/shares/share_splitting.go | 412 +++++++++++++++++++++ pkg/shares/shares.go | 55 +++ pkg/shares/shares_test.go | 573 ++++++++++++++++++++++++++++++ x/payment/types/payfordata.go | 3 +- 9 files changed, 1361 insertions(+), 8 deletions(-) create mode 100644 pkg/shares/share_merging.go create mode 100644 pkg/shares/share_splitting.go create mode 100644 pkg/shares/shares.go create mode 100644 pkg/shares/shares_test.go diff --git a/app/process_proposal.go b/app/process_proposal.go index 549776eaf5..4ecc1283ff 100644 --- a/app/process_proposal.go +++ b/app/process_proposal.go @@ -3,6 +3,7 @@ package app import ( "bytes" + shares "github.com/celestiaorg/celestia-app/pkg/shares" "github.com/celestiaorg/celestia-app/x/payment/types" sdk "github.com/cosmos/cosmos-sdk/types" abci "github.com/tendermint/tendermint/abci/types" @@ -120,7 +121,7 @@ func (app *App) ProcessProposal(req abci.RequestProcessProposal) abci.ResponsePr } } - shares, _, err := data.ComputeShares(req.BlockData.OriginalSquareSize) + nsshares, _, err := shares.ComputeShares(&data, req.BlockData.OriginalSquareSize) if err != nil { app.Logger().Error(rejectedPropBlockLog, "reason", "failure to compute shares from block data:", "error", err, "proposerAddress", req.Header.ProposerAddress) return abci.ResponseProcessProposal{ @@ -128,7 +129,7 @@ func (app *App) ProcessProposal(req abci.RequestProcessProposal) abci.ResponsePr } } - eds, err := da.ExtendShares(req.BlockData.OriginalSquareSize, shares.RawShares()) 
+ eds, err := da.ExtendShares(req.BlockData.OriginalSquareSize, nsshares.RawShares()) if err != nil { app.Logger().Error( rejectedPropBlockLog, diff --git a/app/split_shares.go b/app/split_shares.go index 2eb106a0fe..1750c5b2c7 100644 --- a/app/split_shares.go +++ b/app/split_shares.go @@ -5,6 +5,7 @@ import ( "crypto/sha256" "sort" + shares "github.com/celestiaorg/celestia-app/pkg/shares" "github.com/celestiaorg/celestia-app/x/payment/types" "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/x/auth/signing" @@ -136,7 +137,7 @@ func newShareSplitter(txConf client.TxConfig, squareSize uint64, data *core.Data if err != nil { panic(err) } - sqwr.evdShares = evdData.SplitIntoShares().RawShares() + sqwr.evdShares = shares.SplitEvidenceIntoShares(evdData).RawShares() sqwr.txWriter = coretypes.NewContiguousShareWriter(consts.TxNamespaceID) sqwr.msgWriter = coretypes.NewMessageShareWriter() diff --git a/app/test/process_proposal_test.go b/app/test/process_proposal_test.go index 019393437f..ef27875930 100644 --- a/app/test/process_proposal_test.go +++ b/app/test/process_proposal_test.go @@ -7,6 +7,7 @@ import ( "github.com/celestiaorg/celestia-app/app" "github.com/celestiaorg/celestia-app/app/encoding" + shares "github.com/celestiaorg/celestia-app/pkg/shares" "github.com/celestiaorg/celestia-app/testutil" "github.com/celestiaorg/celestia-app/x/payment/types" "github.com/celestiaorg/nmt/namespace" @@ -150,7 +151,7 @@ func TestMessageInclusionCheck(t *testing.T) { data, err := coretypes.DataFromProto(tt.input.BlockData) require.NoError(t, err) - shares, _, err := data.ComputeShares(tt.input.BlockData.OriginalSquareSize) + shares, _, err := shares.ComputeShares(&data, tt.input.BlockData.OriginalSquareSize) require.NoError(t, err) rawShares := shares.RawShares() @@ -206,7 +207,7 @@ func TestProcessMessagesWithReservedNamespaces(t *testing.T) { data, err := coretypes.DataFromProto(input.BlockData) require.NoError(t, err) - shares, _, err := 
data.ComputeShares(input.BlockData.OriginalSquareSize) + shares, _, err := shares.ComputeShares(&data, input.BlockData.OriginalSquareSize) require.NoError(t, err) rawShares := shares.RawShares() @@ -257,7 +258,7 @@ func TestProcessMessageWithUnsortedMessages(t *testing.T) { data, err := coretypes.DataFromProto(input.BlockData) require.NoError(t, err) - shares, _, err := data.ComputeShares(input.BlockData.OriginalSquareSize) + shares, _, err := shares.ComputeShares(&data, input.BlockData.OriginalSquareSize) require.NoError(t, err) rawShares := shares.RawShares() diff --git a/app/test/split_shares_test.go b/app/test/split_shares_test.go index 1dec675b6d..46e8a4e786 100644 --- a/app/test/split_shares_test.go +++ b/app/test/split_shares_test.go @@ -6,6 +6,7 @@ import ( "github.com/celestiaorg/celestia-app/app" "github.com/celestiaorg/celestia-app/app/encoding" + shares "github.com/celestiaorg/celestia-app/pkg/shares" "github.com/celestiaorg/celestia-app/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -105,7 +106,7 @@ func TestSplitShares(t *testing.T) { assert.Equal(t, data.Txs, parsedData.Txs.ToSliceOfBytes()) - parsedShares, _, err := parsedData.ComputeShares(tt.squareSize) + parsedShares, _, err := shares.ComputeShares(&parsedData, tt.squareSize) require.NoError(t, err) require.Equal(t, square, parsedShares.RawShares()) diff --git a/pkg/shares/share_merging.go b/pkg/shares/share_merging.go new file mode 100644 index 0000000000..da6fdf3a03 --- /dev/null +++ b/pkg/shares/share_merging.go @@ -0,0 +1,308 @@ +package types + +import ( + "bytes" + "encoding/binary" + "errors" + + "github.com/celestiaorg/rsmt2d" + "github.com/gogo/protobuf/proto" + "github.com/tendermint/tendermint/pkg/consts" + tmproto "github.com/tendermint/tendermint/proto/tendermint/types" + coretypes "github.com/tendermint/tendermint/types" +) + +// DataFromSquare extracts block data from an extended data square. 
+func DataFromSquare(eds *rsmt2d.ExtendedDataSquare) (coretypes.Data, error) { + originalWidth := eds.Width() / 2 + + // sort block data shares by namespace + var ( + sortedTxShares [][]byte + sortedEvdShares [][]byte + sortedMsgShares [][]byte + ) + + // iterate over each row index + for x := uint(0); x < originalWidth; x++ { + // iterate over each share in the original data square + row := eds.Row(x) + + for _, share := range row[:originalWidth] { + // sort the data of that share types via namespace + nid := share[:consts.NamespaceSize] + switch { + case bytes.Equal(consts.TxNamespaceID, nid): + sortedTxShares = append(sortedTxShares, share) + + case bytes.Equal(consts.EvidenceNamespaceID, nid): + sortedEvdShares = append(sortedEvdShares, share) + + case bytes.Equal(consts.TailPaddingNamespaceID, nid): + continue + + // ignore unused but reserved namespaces + case bytes.Compare(nid, consts.MaxReservedNamespace) < 1: + continue + + // every other namespaceID should be a message + default: + sortedMsgShares = append(sortedMsgShares, share) + } + } + } + + // pass the raw share data to their respective parsers + txs, err := ParseTxs(sortedTxShares) + if err != nil { + return coretypes.Data{}, err + } + + evd, err := ParseEvd(sortedEvdShares) + if err != nil { + return coretypes.Data{}, err + } + + msgs, err := ParseMsgs(sortedMsgShares) + if err != nil { + return coretypes.Data{}, err + } + + return coretypes.Data{ + Txs: txs, + Evidence: evd, + Messages: msgs, + }, nil +} + +// ParseTxs collects all of the transactions from the shares provided +func ParseTxs(shares [][]byte) (coretypes.Txs, error) { + // parse the sharse + rawTxs, err := processContiguousShares(shares) + if err != nil { + return nil, err + } + + // convert to the Tx type + txs := make(coretypes.Txs, len(rawTxs)) + for i := 0; i < len(txs); i++ { + txs[i] = coretypes.Tx(rawTxs[i]) + } + + return txs, nil +} + +// ParseEvd collects all evidence from the shares provided. 
+func ParseEvd(shares [][]byte) (coretypes.EvidenceData, error) { + // the raw data returned does not have length delimiters or namespaces and + // is ready to be unmarshaled + rawEvd, err := processContiguousShares(shares) + if err != nil { + return coretypes.EvidenceData{}, err + } + + evdList := make(coretypes.EvidenceList, len(rawEvd)) + + // parse into protobuf bytes + for i := 0; i < len(rawEvd); i++ { + // unmarshal the evidence + var protoEvd tmproto.Evidence + err := proto.Unmarshal(rawEvd[i], &protoEvd) + if err != nil { + return coretypes.EvidenceData{}, err + } + evd, err := coretypes.EvidenceFromProto(&protoEvd) + if err != nil { + return coretypes.EvidenceData{}, err + } + + evdList[i] = evd + } + + return coretypes.EvidenceData{Evidence: evdList}, nil +} + +// ParseMsgs collects all messages from the shares provided +func ParseMsgs(shares [][]byte) (coretypes.Messages, error) { + msgList, err := parseMsgShares(shares) + if err != nil { + return coretypes.Messages{}, err + } + + return coretypes.Messages{ + MessagesList: msgList, + }, nil +} + +// processContiguousShares takes raw shares and extracts out transactions, +// intermediate state roots, or evidence. 
The returned [][]byte do have +// namespaces or length delimiters and are ready to be unmarshalled +func processContiguousShares(shares [][]byte) (txs [][]byte, err error) { + if len(shares) == 0 { + return nil, nil + } + + ss := newShareStack(shares) + return ss.resolve() +} + +// shareStack hold variables for peel +type shareStack struct { + shares [][]byte + txLen uint64 + txs [][]byte + cursor int +} + +func newShareStack(shares [][]byte) *shareStack { + return &shareStack{shares: shares} +} + +func (ss *shareStack) resolve() ([][]byte, error) { + if len(ss.shares) == 0 { + return nil, nil + } + err := ss.peel(ss.shares[0][consts.NamespaceSize+consts.ShareReservedBytes:], true) + return ss.txs, err +} + +// peel recursively parses each chunk of data (either a transaction, +// intermediate state root, or evidence) and adds it to the underlying slice of data. +func (ss *shareStack) peel(share []byte, delimited bool) (err error) { + if delimited { + var txLen uint64 + share, txLen, err = ParseDelimiter(share) + if err != nil { + return err + } + if txLen == 0 { + return nil + } + ss.txLen = txLen + } + // safeLen describes the point in the share where it can be safely split. If + // split beyond this point, it is possible to break apart a length + // delimiter, which will result in incorrect share merging + safeLen := len(share) - binary.MaxVarintLen64 + if safeLen < 0 { + safeLen = 0 + } + if ss.txLen <= uint64(safeLen) { + ss.txs = append(ss.txs, share[:ss.txLen]) + share = share[ss.txLen:] + return ss.peel(share, true) + } + // add the next share to the current share to continue merging if possible + if len(ss.shares) > ss.cursor+1 { + ss.cursor++ + share := append(share, ss.shares[ss.cursor][consts.NamespaceSize+consts.ShareReservedBytes:]...) 
+ return ss.peel(share, false) + } + // collect any remaining data + if ss.txLen <= uint64(len(share)) { + ss.txs = append(ss.txs, share[:ss.txLen]) + share = share[ss.txLen:] + return ss.peel(share, true) + } + return errors.New("failure to parse block data: transaction length exceeded data length") +} + +// parseMsgShares iterates through raw shares and separates the contiguous chunks +// of data. It is only used for Messages, i.e. shares with a non-reserved namespace. +func parseMsgShares(shares [][]byte) ([]coretypes.Message, error) { + if len(shares) == 0 { + return nil, nil + } + + // set the first nid and current share + nid := shares[0][:consts.NamespaceSize] + currentShare := shares[0][consts.NamespaceSize:] + // find and remove the msg len delimiter + currentShare, msgLen, err := ParseDelimiter(currentShare) + if err != nil { + return nil, err + } + + var msgs []coretypes.Message + for cursor := uint64(0); cursor < uint64(len(shares)); { + var msg coretypes.Message + currentShare, nid, cursor, msgLen, msg, err = nextMsg( + shares, + currentShare, + nid, + cursor, + msgLen, + ) + if err != nil { + return nil, err + } + if msg.Data != nil { + msgs = append(msgs, msg) + } + } + + return msgs, nil +} + +func nextMsg( + shares [][]byte, + current, + nid []byte, + cursor, + msgLen uint64, +) ([]byte, []byte, uint64, uint64, coretypes.Message, error) { + switch { + // the message uses all of the current share data and at least some of the + // next share + case msgLen > uint64(len(current)): + // add the next share to the current one and try again + cursor++ + current = append(current, shares[cursor][consts.NamespaceSize:]...) 
+ return nextMsg(shares, current, nid, cursor, msgLen) + + // the msg we're looking for is contained in the current share + case msgLen <= uint64(len(current)): + msg := coretypes.Message{NamespaceID: nid, Data: current[:msgLen]} + cursor++ + + // call it a day if the work is done + if cursor >= uint64(len(shares)) { + return nil, nil, cursor, 0, msg, nil + } + + nextNid := shares[cursor][:consts.NamespaceSize] + next, msgLen, err := ParseDelimiter(shares[cursor][consts.NamespaceSize:]) + return next, nextNid, cursor, msgLen, msg, err + } + // this code is unreachable but the compiler doesn't know that + return nil, nil, 0, 0, coretypes.Message{}, nil +} + +// ParseDelimiter finds and returns the length delimiter of the message provided +// while also removing the delimiter bytes from the input +func ParseDelimiter(input []byte) ([]byte, uint64, error) { + if len(input) == 0 { + return input, 0, nil + } + + l := binary.MaxVarintLen64 + if len(input) < binary.MaxVarintLen64 { + l = len(input) + } + + delimiter := zeroPadIfNecessary(input[:l], binary.MaxVarintLen64) + + // read the length of the message + r := bytes.NewBuffer(delimiter) + msgLen, err := binary.ReadUvarint(r) + if err != nil { + return nil, 0, err + } + + // calculate the number of bytes used by the delimiter + lenBuf := make([]byte, binary.MaxVarintLen64) + n := binary.PutUvarint(lenBuf, msgLen) + + // return the input without the length delimiter + return input[n:], msgLen, nil +} diff --git a/pkg/shares/share_splitting.go b/pkg/shares/share_splitting.go new file mode 100644 index 0000000000..4bff378db2 --- /dev/null +++ b/pkg/shares/share_splitting.go @@ -0,0 +1,412 @@ +package types + +import ( + "bytes" + "errors" + "fmt" + "math" + "sort" + + "github.com/celestiaorg/nmt/namespace" + "github.com/tendermint/tendermint/libs/protoio" + "github.com/tendermint/tendermint/pkg/consts" + coretypes "github.com/tendermint/tendermint/types" +) + +// MessageShareWriter lazily merges messages into shares that 
will eventually be +// included in a data square. It also has methods to help progressively count +// how many shares the messages written take up. +type MessageShareWriter struct { + shares [][]NamespacedShare + count int +} + +func NewMessageShareWriter() *MessageShareWriter { + return &MessageShareWriter{} +} + +// Write adds the delimited data to the underlying contiguous shares. +func (msw *MessageShareWriter) Write(msg coretypes.Message) { + rawMsg, err := msg.MarshalDelimited() + if err != nil { + panic(fmt.Sprintf("app accepted a Message that can not be encoded %#v", msg)) + } + newShares := make([]NamespacedShare, 0) + newShares = AppendToShares(newShares, msg.NamespaceID, rawMsg) + msw.shares = append(msw.shares, newShares) + msw.count += len(newShares) +} + +// Export finalizes and returns the underlying contiguous shares. +func (msw *MessageShareWriter) Export() NamespacedShares { + msw.sortMsgs() + shares := make([]NamespacedShare, msw.count) + cursor := 0 + for _, messageShares := range msw.shares { + for _, share := range messageShares { + shares[cursor] = share + cursor++ + } + } + return shares +} + +func (msw *MessageShareWriter) sortMsgs() { + sort.Slice(msw.shares, func(i, j int) bool { + return bytes.Compare(msw.shares[i][0].ID, msw.shares[j][0].ID) < 0 + }) +} + +// Count returns the current number of shares that will be made if exporting. +func (msw *MessageShareWriter) Count() int { + return msw.count +} + +// appendToShares appends raw data as shares. +// Used for messages. 
+func AppendToShares(shares []NamespacedShare, nid namespace.ID, rawData []byte) []NamespacedShare { + if len(rawData) <= consts.MsgShareSize { + rawShare := append(append( + make([]byte, 0, len(nid)+len(rawData)), + nid...), + rawData..., + ) + paddedShare := zeroPadIfNecessary(rawShare, consts.ShareSize) + share := NamespacedShare{paddedShare, nid} + shares = append(shares, share) + } else { // len(rawData) > MsgShareSize + shares = append(shares, splitMessage(rawData, nid)...) + } + return shares +} + +// splitMessage breaks the data in a message into the minimum number of +// namespaced shares +func splitMessage(rawData []byte, nid namespace.ID) NamespacedShares { + shares := make([]NamespacedShare, 0) + firstRawShare := append(append( + make([]byte, 0, consts.ShareSize), + nid...), + rawData[:consts.MsgShareSize]..., + ) + shares = append(shares, NamespacedShare{firstRawShare, nid}) + rawData = rawData[consts.MsgShareSize:] + for len(rawData) > 0 { + shareSizeOrLen := min(consts.MsgShareSize, len(rawData)) + rawShare := append(append( + make([]byte, 0, consts.ShareSize), + nid...), + rawData[:shareSizeOrLen]..., + ) + paddedShare := zeroPadIfNecessary(rawShare, consts.ShareSize) + share := NamespacedShare{paddedShare, nid} + shares = append(shares, share) + rawData = rawData[shareSizeOrLen:] + } + return shares +} + +// ContiguousShareWriter will write raw data contiguously across a progressively +// increasing set of shares. It is used to lazily split block data such as transactions +// into shares. +type ContiguousShareWriter struct { + shares []NamespacedShare + pendingShare NamespacedShare + namespace namespace.ID +} + +// NewContiguousShareWriter returns a ContigousShareWriter using the provided +// namespace. +func NewContiguousShareWriter(ns namespace.ID) *ContiguousShareWriter { + pendingShare := NamespacedShare{ID: ns, Share: make([]byte, 0, consts.ShareSize)} + pendingShare.Share = append(pendingShare.Share, ns...) 
+ return &ContiguousShareWriter{pendingShare: pendingShare, namespace: ns} +} + +// Write adds the delimited data to the underlying contiguous shares. +func (csw *ContiguousShareWriter) Write(rawData []byte) { + // if this is the first time writing to a pending share, we must add the + // reserved bytes + if len(csw.pendingShare.Share) == consts.NamespaceSize { + csw.pendingShare.Share = append(csw.pendingShare.Share, 0) + } + + txCursor := len(rawData) + for txCursor != 0 { + // find the len left in the pending share + pendingLeft := consts.ShareSize - len(csw.pendingShare.Share) + + // if we can simply add the tx to the share without creating a new + // pending share, do so and return + if len(rawData) <= pendingLeft { + csw.pendingShare.Share = append(csw.pendingShare.Share, rawData...) + break + } + + // if we can only add a portion of the transaction to the pending share, + // then we add it and add the pending share to the finalized shares. + chunk := rawData[:pendingLeft] + csw.pendingShare.Share = append(csw.pendingShare.Share, chunk...) 
+ csw.stackPending() + + // update the cursor + rawData = rawData[pendingLeft:] + txCursor = len(rawData) + + // add the share reserved bytes to the new pending share + pendingCursor := len(rawData) + consts.NamespaceSize + consts.ShareReservedBytes + var reservedByte byte + if pendingCursor >= consts.ShareSize { + // the share reserve byte is zero when some contiguously written + // data takes up the entire share + reservedByte = byte(0) + } else { + reservedByte = byte(pendingCursor) + } + + csw.pendingShare.Share = append(csw.pendingShare.Share, reservedByte) + } + + // if the share is exactly the correct size, then append to shares + if len(csw.pendingShare.Share) == consts.ShareSize { + csw.stackPending() + } +} + +// stackPending will add the pending share to accumlated shares provided that it is long enough +func (csw *ContiguousShareWriter) stackPending() { + if len(csw.pendingShare.Share) < consts.ShareSize { + return + } + csw.shares = append(csw.shares, csw.pendingShare) + newPendingShare := make([]byte, 0, consts.ShareSize) + newPendingShare = append(newPendingShare, csw.namespace...) + csw.pendingShare = NamespacedShare{ + Share: newPendingShare, + ID: csw.namespace, + } +} + +// Export finalizes and returns the underlying contiguous shares. 
+func (csw *ContiguousShareWriter) Export() NamespacedShares { + // add the pending share to the current shares before returning + if len(csw.pendingShare.Share) > consts.NamespaceSize { + csw.pendingShare.Share = zeroPadIfNecessary(csw.pendingShare.Share, consts.ShareSize) + csw.shares = append(csw.shares, csw.pendingShare) + } + // force the last share to have a reserve byte of zero + if len(csw.shares) == 0 { + return csw.shares + } + lastShare := csw.shares[len(csw.shares)-1] + rawLastShare := lastShare.Data() + + for i := 0; i < consts.ShareReservedBytes; i++ { + // here we force the last share reserved byte to be zero to avoid any + // confusion for light clients parsing these shares, as the rest of the + // data after transaction is padding. See + // https://github.com/celestiaorg/celestia-specs/blob/master/src/specs/data_structures.md#share + rawLastShare[consts.NamespaceSize+i] = byte(0) + } + + newLastShare := NamespacedShare{ + Share: rawLastShare, + ID: lastShare.NamespaceID(), + } + csw.shares[len(csw.shares)-1] = newLastShare + return csw.shares +} + +// Count returns the current number of shares that will be made if exporting. 
+func (csw *ContiguousShareWriter) Count() (count, availableBytes int) { + availableBytes = consts.TxShareSize - (len(csw.pendingShare.Share) - consts.NamespaceSize) + return len(csw.shares), availableBytes +} + +// tail is filler for all tail padded shares +// it is allocated once and used everywhere +var tailPaddingShare = append( + append(make([]byte, 0, consts.ShareSize), consts.TailPaddingNamespaceID...), + bytes.Repeat([]byte{0}, consts.ShareSize-consts.NamespaceSize)..., +) + +func TailPaddingShares(n int) NamespacedShares { + shares := make([]NamespacedShare, n) + for i := 0; i < n; i++ { + shares[i] = NamespacedShare{ + Share: tailPaddingShare, + ID: consts.TailPaddingNamespaceID, + } + } + return shares +} + +func min(a, b int) int { + if a <= b { + return a + } + return b +} + +func zeroPadIfNecessary(share []byte, width int) []byte { + oldLen := len(share) + if oldLen < width { + missingBytes := width - oldLen + padByte := []byte{0} + padding := bytes.Repeat(padByte, missingBytes) + share = append(share, padding...) 
+ return share + } + return share +} + +func SplitTxsIntoShares(txs coretypes.Txs) NamespacedShares { + rawDatas := make([][]byte, len(txs)) + for i, tx := range txs { + rawData, err := tx.MarshalDelimited() + if err != nil { + panic(fmt.Sprintf("included Tx in mem-pool that can not be encoded %v", tx)) + } + rawDatas[i] = rawData + } + + w := NewContiguousShareWriter(consts.TxNamespaceID) + for _, tx := range rawDatas { + w.Write(tx) + } + + return w.Export() +} + +func SplitEvidenceIntoShares(data *coretypes.EvidenceData) NamespacedShares { + rawDatas := make([][]byte, 0, len(data.Evidence)) + for _, ev := range data.Evidence { + pev, err := coretypes.EvidenceToProto(ev) + if err != nil { + panic("failure to convert evidence to equivalent proto type") + } + rawData, err := protoio.MarshalDelimited(pev) + if err != nil { + panic(err) + } + rawDatas = append(rawDatas, rawData) + } + w := NewContiguousShareWriter(consts.EvidenceNamespaceID) + for _, evd := range rawDatas { + w.Write(evd) + } + return w.Export() +} + +func SplitMessagesIntoShares(msgs coretypes.Messages) NamespacedShares { + shares := make([]NamespacedShare, 0) + msgs.SortMessages() + for _, m := range msgs.MessagesList { + rawData, err := m.MarshalDelimited() + if err != nil { + panic(fmt.Sprintf("app accepted a Message that can not be encoded %#v", m)) + } + shares = AppendToShares(shares, m.NamespaceID, rawData) + } + return shares +} + +// SortMessages sorts messages by ascending namespace id +func SortMessages(msgs *coretypes.Messages) { + sort.SliceStable(msgs.MessagesList, func(i, j int) bool { + return bytes.Compare(msgs.MessagesList[i].NamespaceID, msgs.MessagesList[j].NamespaceID) < 0 + }) +} + +// ComputeShares splits block data into shares of an original data square and +// returns them along with an amount of non-redundant shares. If a square size +// of 0 is passed, then it is determined based on how many shares are needed to +// fill the square for the underlying block data. 
The square size is stored in +// the local instance of the struct. +func ComputeShares(data *coretypes.Data, squareSize uint64) (NamespacedShares, int, error) { + if squareSize != 0 { + if !powerOf2(squareSize) { + return nil, 0, errors.New("square size is not a power of two") + } + } + + // reserved shares: + txShares := SplitTxsIntoShares(data.Txs) + evidenceShares := SplitEvidenceIntoShares(&data.Evidence) + + // application data shares from messages: + msgShares := SplitMessagesIntoShares(data.Messages) + curLen := len(txShares) + len(evidenceShares) + len(msgShares) + + if curLen > consts.MaxShareCount { + panic(fmt.Sprintf("Block data exceeds the max square size. Number of shares required: %d\n", curLen)) + } + + // find the number of shares needed to create a square that has a power of + // two width + wantLen := int(squareSize * squareSize) + if squareSize == 0 { + wantLen = paddedLen(curLen) + } + + if wantLen < curLen { + return nil, 0, errors.New("square size too small to fit block data") + } + + // ensure that the min square size is used + if wantLen < consts.MinSharecount { + wantLen = consts.MinSharecount + } + + tailShares := TailPaddingShares(wantLen - curLen) + + shares := append(append(append( + txShares, + evidenceShares...), + msgShares...), + tailShares...) 
+ + if squareSize == 0 { + squareSize = uint64(math.Sqrt(float64(wantLen))) + } + + data.OriginalSquareSize = squareSize + + return shares, curLen, nil +} + +// paddedLen calculates the number of shares needed to make a power of 2 square +// given the current number of shares +func paddedLen(length int) int { + width := uint32(math.Ceil(math.Sqrt(float64(length)))) + width = nextHighestPowerOf2(width) + return int(width * width) +} + +// nextPowerOf2 returns the next highest power of 2 unless the input is a power +// of two, in which case it returns the input +func nextHighestPowerOf2(v uint32) uint32 { + if v == 0 { + return 0 + } + + // find the next highest power using bit mashing + v-- + v |= v >> 1 + v |= v >> 2 + v |= v >> 4 + v |= v >> 8 + v |= v >> 16 + v++ + + // return the next highest power + return v +} + +// powerOf2 checks if number is power of 2 +func powerOf2(v uint64) bool { + if v&(v-1) == 0 && v != 0 { + return true + } + return false +} diff --git a/pkg/shares/shares.go b/pkg/shares/shares.go new file mode 100644 index 0000000000..077d35db6d --- /dev/null +++ b/pkg/shares/shares.go @@ -0,0 +1,55 @@ +package types + +import ( + "encoding/binary" + + "github.com/celestiaorg/nmt/namespace" + coretypes "github.com/tendermint/tendermint/types" +) + +// Share contains the raw share data without the corresponding namespace. +type Share []byte + +// NamespacedShare extends a Share with the corresponding namespace. +type NamespacedShare struct { + Share + ID namespace.ID +} + +func (n NamespacedShare) NamespaceID() namespace.ID { + return n.ID +} + +func (n NamespacedShare) Data() []byte { + return n.Share +} + +// NamespacedShares is just a list of NamespacedShare elements. +// It can be used to extract the raw raw shares. +type NamespacedShares []NamespacedShare + +// RawShares returns the raw shares that can be fed into the erasure coding +// library (e.g. rsmt2d). 
+func (ns NamespacedShares) RawShares() [][]byte { + res := make([][]byte, len(ns)) + for i, nsh := range ns { + res[i] = nsh.Share + } + return res +} + +func MarshalDelimitedTx(tx coretypes.Tx) ([]byte, error) { + lenBuf := make([]byte, binary.MaxVarintLen64) + length := uint64(len(tx)) + n := binary.PutUvarint(lenBuf, length) + return append(lenBuf[:n], tx...), nil +} + +// MarshalDelimited marshals the raw data (excluding the namespace) of this +// message and prefixes it with the length of that encoding. +func MarshalDelimitedMessage(msg coretypes.Message) ([]byte, error) { + lenBuf := make([]byte, binary.MaxVarintLen64) + length := uint64(len(msg.Data)) + n := binary.PutUvarint(lenBuf, length) + return append(lenBuf[:n], msg.Data...), nil +} diff --git a/pkg/shares/shares_test.go b/pkg/shares/shares_test.go new file mode 100644 index 0000000000..62cd65c7f2 --- /dev/null +++ b/pkg/shares/shares_test.go @@ -0,0 +1,573 @@ +package types + +import ( + "bytes" + "context" + "fmt" + "math" + "math/rand" + "reflect" + "sort" + "testing" + "time" + + "github.com/celestiaorg/rsmt2d" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/tendermint/tendermint/pkg/consts" + coretypes "github.com/tendermint/tendermint/types" +) + +type Splitter interface { + SplitIntoShares() NamespacedShares +} + +// func TestMakeShares(t *testing.T) { +// reservedTxNamespaceID := append(bytes.Repeat([]byte{0}, 7), 1) +// reservedEvidenceNamespaceID := append(bytes.Repeat([]byte{0}, 7), 3) +// val := NewMockPV() +// blockID := makeBlockID([]byte("blockhash"), 1000, []byte("partshash")) +// blockID2 := makeBlockID([]byte("blockhash2"), 1000, []byte("partshash")) +// vote1 := makeVote(t, val, "chainID", 0, 10, 2, 1, blockID, defaultVoteTime) +// vote2 := makeVote(t, val, "chainID", 0, 10, 2, 1, blockID2, defaultVoteTime) +// testEvidence := &DuplicateVoteEvidence{ +// VoteA: vote1, +// VoteB: vote2, +// } +// protoTestEvidence, err := 
EvidenceToProto(testEvidence) +// if err != nil { +// t.Error(err) +// } +// testEvidenceBytes, err := protoio.MarshalDelimited(protoTestEvidence) +// largeTx := Tx(bytes.Repeat([]byte("large Tx"), 50)) +// largeTxLenDelimited, _ := largeTx.MarshalDelimited() +// smolTx := Tx("small Tx") +// smolTxLenDelimited, _ := smolTx.MarshalDelimited() +// msg1 := coretypes.Message{ +// NamespaceID: namespace.ID("8bytesss"), +// Data: []byte("some data"), +// } +// msg1Marshaled, _ := msg1.MarshalDelimited() +// if err != nil { +// t.Fatalf("Could not encode evidence: %v, error: %v\n", testEvidence, err) +// } + +// type args struct { +// data Splitter +// } +// tests := []struct { +// name string +// args args +// want NamespacedShares +// }{ +// { +// name: "evidence", +// args: args{ +// data: &coretypes.EvidenceData{ +// Evidence: []Evidence{testEvidence}, +// }, +// }, +// want: NamespacedShares{ +// NamespacedShare{ +// Share: append( +// append(reservedEvidenceNamespaceID, byte(0)), +// testEvidenceBytes[:consts.TxShareSize]..., +// ), +// ID: reservedEvidenceNamespaceID, +// }, +// NamespacedShare{ +// Share: append( +// append(reservedEvidenceNamespaceID, byte(0)), +// zeroPadIfNecessary(testEvidenceBytes[consts.TxShareSize:], consts.TxShareSize)..., +// ), +// ID: reservedEvidenceNamespaceID, +// }, +// }, +// }, +// {"small LL Tx", +// args{ +// data: Txs{smolTx}, +// }, +// NamespacedShares{ +// NamespacedShare{ +// Share: append( +// append(reservedTxNamespaceID, byte(0)), +// zeroPadIfNecessary(smolTxLenDelimited, consts.TxShareSize)..., +// ), +// ID: reservedTxNamespaceID, +// }, +// }, +// }, +// {"one large LL Tx", +// args{ +// data: Txs{largeTx}, +// }, +// NamespacedShares{ +// NamespacedShare{ +// Share: append( +// append(reservedTxNamespaceID, byte(0)), +// largeTxLenDelimited[:consts.TxShareSize]..., +// ), +// ID: reservedTxNamespaceID, +// }, +// NamespacedShare{ +// Share: append( +// append(reservedTxNamespaceID, byte(0)), +// 
zeroPadIfNecessary(largeTxLenDelimited[consts.TxShareSize:], consts.TxShareSize)...,
+// ),
+// ID: reservedTxNamespaceID,
+// },
+// },
+// },
+// {"large then small LL Tx",
+// args{
+// data: Txs{largeTx, smolTx},
+// },
+// NamespacedShares{
+// NamespacedShare{
+// Share: append(
+// append(reservedTxNamespaceID, byte(0)),
+// largeTxLenDelimited[:consts.TxShareSize]...,
+// ),
+// ID: reservedTxNamespaceID,
+// },
+// NamespacedShare{
+// Share: append(
+// append(
+// reservedTxNamespaceID,
+// byte(0),
+// ),
+// zeroPadIfNecessary(
+// append(largeTxLenDelimited[consts.TxShareSize:], smolTxLenDelimited...),
+// consts.TxShareSize,
+// )...,
+// ),
+// ID: reservedTxNamespaceID,
+// },
+// },
+// },
+// {"ll-app message",
+// args{
+// data: Messages{[]coretypes.coretypes.Message{msg1}},
+// },
+// NamespacedShares{
+// NamespacedShare{
+// Share: append(
+// []byte(msg1.NamespaceID),
+// zeroPadIfNecessary(msg1Marshaled, consts.MsgShareSize)...,
+// ),
+// ID: msg1.NamespaceID,
+// },
+// },
+// },
+// }
+// for i, tt := range tests {
+// tt := tt // stupid scopelint :-/
+// i := i
+// t.Run(tt.name, func(t *testing.T) {
+// got := tt.args.data.SplitIntoShares()
+// if !reflect.DeepEqual(got, tt.want) {
+// t.Errorf("%v: makeShares() = \n%+v\nwant\n%+v\n", i, got, tt.want)
+// }
+// })
+// }
+// }
+
+func Test_zeroPadIfNecessary(t *testing.T) {
+	type args struct {
+		share []byte
+		width int
+	}
+	tests := []struct {
+		name string
+		args args
+		want []byte
+	}{
+		{"pad", args{[]byte{1, 2, 3}, 6}, []byte{1, 2, 3, 0, 0, 0}},
+		{"not necessary (equal to shareSize)", args{[]byte{1, 2, 3}, 3}, []byte{1, 2, 3}},
+		{"not necessary (greater shareSize)", args{[]byte{1, 2, 3}, 2}, []byte{1, 2, 3}},
+	}
+	for _, tt := range tests {
+		tt := tt // capture the range variable for the subtest closure (scopelint)
+		t.Run(tt.name, func(t *testing.T) {
+			if got := zeroPadIfNecessary(tt.args.share, tt.args.width); !reflect.DeepEqual(got, tt.want) {
+				t.Errorf("zeroPadIfNecessary() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func Test_appendToSharesOverwrite(t *testing.T) {
+	var shares NamespacedShares
+
+	// generate an arbitrary namespaced share from a message of MsgShareSize+1 bytes
+	newShare := generateRandomNamespacedShares(1, consts.MsgShareSize+1)[0]
+
+	// make a copy of the portion of the share to check if it's overwritten later
+	extraCopy := make([]byte, consts.MsgShareSize)
+	copy(extraCopy, newShare.Share[:consts.MsgShareSize])
+
+	// use AppendToShares to add our new share
+	AppendToShares(shares, newShare.ID, newShare.Share)
+
+	// check that the original share data has not been overwritten.
+	assert.Equal(t, extraCopy, []byte(newShare.Share[:consts.MsgShareSize]))
+}
+
+func TestDataFromSquare(t *testing.T) {
+	type test struct {
+		name     string
+		txCount  int
+		evdCount int
+		msgCount int
+		maxSize  int // max size of each tx or msg
+	}
+
+	tests := []test{
+		{"one of each random small size", 1, 1, 1, 40},
+		{"one of each random large size", 1, 1, 1, 400},
+		{"many of each random small size", 10, 10, 10, 40},
+		{"many of each random large size", 10, 10, 10, 400},
+		{"only transactions", 10, 0, 0, 400},
+		{"only evidence", 0, 10, 0, 400},
+		{"only messages", 0, 0, 10, 400},
+	}
+
+	for _, tc := range tests {
+		tc := tc
+
+		t.Run(tc.name, func(t *testing.T) {
+			// generate random data
+			data := generateRandomBlockData(
+				tc.txCount,
+				tc.evdCount,
+				tc.msgCount,
+				tc.maxSize,
+			)
+
+			shares, _, err := ComputeShares(&data, 0)
+			require.NoError(t, err)
+			rawShares := shares.RawShares()
+
+			eds, err := rsmt2d.ComputeExtendedDataSquare(rawShares, consts.DefaultCodec(), rsmt2d.NewDefaultTree)
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			res, err := DataFromSquare(eds)
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			// we have to compare the evidence by string because the
+			// timestamps differ not by actual time represented, but by
+			// internals, see https://github.com/stretchr/testify/issues/666
+			for i := 0; i < len(data.Evidence.Evidence); i++ {
+				inputEvidence := data.Evidence.Evidence[i].(*coretypes.DuplicateVoteEvidence)
+				resultEvidence := res.Evidence.Evidence[i].(*coretypes.DuplicateVoteEvidence)
+				assert.Equal(t, inputEvidence.String(), resultEvidence.String())
+			}
+
+			// compare the original to the result w/o the evidence
+			data.Evidence = coretypes.EvidenceData{}
+			res.Evidence = coretypes.EvidenceData{}
+
+			res.OriginalSquareSize = data.OriginalSquareSize
+
+			assert.Equal(t, data, res)
+		})
+	}
+}
+
+func TestFuzz_DataFromSquare(t *testing.T) {
+	t.Skip()
+	// repeatedly run TestDataFromSquare on fresh random block data for a minute
+	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
+	defer cancel()
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+			TestDataFromSquare(t)
+		}
+	}
+}
+
+func Test_processContiguousShares(t *testing.T) {
+	// exactTxShareSize is the length of tx that will fit exactly into a single
+	// share, accounting for namespace id and the length delimiter prepended to
+	// each tx
+	const exactTxShareSize = consts.TxShareSize - 1
+
+	type test struct {
+		name    string
+		txSize  int
+		txCount int
+	}
+
+	// each test is run twice, once using txSize as an exact size, and again
+	// using it as a cap for randomly sized txs
+	tests := []test{
+		{"single small tx", 10, 1},
+		{"many small txs", 10, 10},
+		{"single big tx", 1000, 1},
+		{"many big txs", 1000, 10},
+		{"single exact size tx", exactTxShareSize, 1},
+		{"many exact size txs", exactTxShareSize, 10},
+	}
+
+	for _, tc := range tests {
+		tc := tc
+
+		// run the tests with identically sized txs
+		t.Run(fmt.Sprintf("%s identically sized ", tc.name), func(t *testing.T) {
+			txs := generateRandomContiguousShares(tc.txCount, tc.txSize)
+
+			shares := txs.SplitIntoShares()
+
+			parsedTxs, err := processContiguousShares(shares.RawShares())
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			// check that the data parsed is identical
+			for i := 0; i < len(txs); i++ {
+				assert.Equal(t, []byte(txs[i]), parsedTxs[i])
+			}
+		})
+
+		// run the same tests using randomly sized txs with caps of tc.txSize
+		t.Run(fmt.Sprintf("%s randomly sized", tc.name), func(t *testing.T) {
+			txs := generateRandomlySizedContiguousShares(tc.txCount, tc.txSize)
+
+			shares := txs.SplitIntoShares()
+
+			parsedTxs, err := processContiguousShares(shares.RawShares())
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			// check that the data parsed is identical to the original
+			for i := 0; i < len(txs); i++ {
+				assert.Equal(t, []byte(txs[i]), parsedTxs[i])
+			}
+		})
+	}
+}
+
+func TestFuzz_processContiguousShares(t *testing.T) {
+	t.Skip()
+	// repeatedly run Test_processContiguousShares on random txs for a minute
+	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
+	defer cancel()
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+			Test_processContiguousShares(t)
+		}
+	}
+}
+
+func Test_parseMsgShares(t *testing.T) {
+	// exactMsgShareSize is the length of message that will fit exactly into a single
+	// share, accounting for namespace id and the length delimiter prepended to
+	// each message
+	const exactMsgShareSize = consts.MsgShareSize - 2
+
+	type test struct {
+		name     string
+		msgSize  int
+		msgCount int
+	}
+
+	// each test is run twice, once using msgSize as an exact size, and again
+	// using it as a cap for randomly sized leaves
+	tests := []test{
+		{"single small msg", 1, 1},
+		{"many small msgs", 4, 10},
+		{"single big msg", 1000, 1},
+		{"many big msgs", 1000, 10},
+		{"single exact size msg", exactMsgShareSize, 1},
+		{"many exact size msgs", exactMsgShareSize, 10},
+	}
+
+	for _, tc := range tests {
+		tc := tc
+
+		// run the tests with identically sized messages
+		t.Run(fmt.Sprintf("%s identically sized ", tc.name), func(t *testing.T) {
+			rawmsgs := make([]coretypes.Message, tc.msgCount)
+			for i := 0; i < tc.msgCount; i++ {
+				rawmsgs[i] = generateRandomMessage(tc.msgSize)
+			}
+			msgs := coretypes.Messages{MessagesList: rawmsgs}
+
+			shares := msgs.SplitIntoShares()
+
+			parsedMsgs, err := parseMsgShares(shares.RawShares())
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			// check that the namespaces and data are the same
+			for i := 0; i < len(msgs.MessagesList); i++ {
+				assert.Equal(t, msgs.MessagesList[i].NamespaceID, parsedMsgs[i].NamespaceID)
+				assert.Equal(t, msgs.MessagesList[i].Data, parsedMsgs[i].Data)
+			}
+		})
+
+		// run the same tests using randomly sized messages with caps of tc.msgSize
+		t.Run(fmt.Sprintf("%s randomly sized", tc.name), func(t *testing.T) {
+			msgs := generateRandomlySizedMessages(tc.msgCount, tc.msgSize)
+			shares := msgs.SplitIntoShares()
+
+			parsedMsgs, err := parseMsgShares(shares.RawShares())
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			// check that the namespaces and data are the same
+			for i := 0; i < len(msgs.MessagesList); i++ {
+				assert.Equal(t, msgs.MessagesList[i].NamespaceID, parsedMsgs[i].NamespaceID)
+				assert.Equal(t, msgs.MessagesList[i].Data, parsedMsgs[i].Data)
+			}
+		})
+	}
+}
+
+func TestContigShareWriter(t *testing.T) {
+	// note that this test is mainly for debugging purposes, the main round trip
+	// tests occur in TestDataFromSquare and Test_processContiguousShares
+	w := NewContiguousShareWriter(consts.TxNamespaceID)
+	txs := generateRandomContiguousShares(33, 200)
+	for _, tx := range txs {
+		rawTx, _ := tx.MarshalDelimited()
+		w.Write(rawTx)
+	}
+	resShares := w.Export()
+	rawResTxs, err := processContiguousShares(resShares.RawShares())
+	require.NoError(t, err)
+	resTxs := coretypes.ToTxs(rawResTxs)
+
+	assert.Equal(t, txs, resTxs)
+}
+
+func Test_parseDelimiter(t *testing.T) {
+	for i := uint64(0); i < 100; i++ {
+		tx := generateRandomContiguousShares(1, int(i))[0]
+		input, err := tx.MarshalDelimited()
+		if err != nil {
+			t.Fatal(err)
+		}
+		res, txLen, err := ParseDelimiter(input)
+		if err != nil {
+			t.Fatal(err)
+		}
+		assert.Equal(t, i, txLen)
+		assert.Equal(t, []byte(tx), res)
+	}
+}
+
+// generateRandomBlockData returns randomly generated block data for testing purposes
+func generateRandomBlockData(txCount, evdCount, msgCount, maxSize int) coretypes.Data {
+	var out coretypes.Data
+	out.Txs = generateRandomlySizedContiguousShares(txCount, maxSize)
+	out.Evidence = generateIdenticalEvidence(evdCount)
+	out.Messages = generateRandomlySizedMessages(msgCount, maxSize)
+	return out
+}
+
+func generateRandomlySizedContiguousShares(count, max int) coretypes.Txs {
+	txs := make(coretypes.Txs, count)
+	for i := 0; i < count; i++ {
+		size := rand.Intn(max)
+		if size == 0 {
+			size = 1
+		}
+		txs[i] = generateRandomContiguousShares(1, size)[0]
+	}
+	return txs
+}
+
+func generateRandomContiguousShares(count, size int) coretypes.Txs {
+	txs := make(coretypes.Txs, count)
+	for i := 0; i < count; i++ {
+		tx := make([]byte, size)
+		_, err := rand.Read(tx)
+		if err != nil {
+			panic(err)
+		}
+		txs[i] = tx
+	}
+	return txs
+}
+
+func generateIdenticalEvidence(count int) coretypes.EvidenceData {
+	evidence := make([]coretypes.Evidence, count)
+	for i := 0; i < count; i++ {
+		ev := coretypes.NewMockDuplicateVoteEvidence(math.MaxInt64, time.Now(), "chainID")
+		evidence[i] = ev
+	}
+	return coretypes.EvidenceData{Evidence: evidence}
+}
+
+func generateRandomlySizedMessages(count, maxMsgSize int) coretypes.Messages {
+	msgs := make([]coretypes.Message, count)
+	for i := 0; i < count; i++ {
+		msgs[i] = generateRandomMessage(rand.Intn(maxMsgSize))
+	}
+
+	// this is just to let us use assert.Equal
+	if count == 0 {
+		msgs = nil
+	}
+
+	messages := coretypes.Messages{MessagesList: msgs}
+	messages.SortMessages()
+	return messages
+}
+
+func generateRandomMessage(size int) coretypes.Message {
+	share := generateRandomNamespacedShares(1, size)[0]
+	msg := coretypes.Message{
+		NamespaceID: share.NamespaceID(),
+		Data:        share.Data(),
+	}
+	return msg
+}
+
+func generateRandomNamespacedShares(count, msgSize int) NamespacedShares {
+	shares := generateRandNamespacedRawData(uint32(count), consts.NamespaceSize, uint32(msgSize))
+	msgs := make([]coretypes.Message, count)
+	for i, s := range shares {
+		msgs[i] = coretypes.Message{
+			Data:        s[consts.NamespaceSize:],
+			NamespaceID: s[:consts.NamespaceSize],
+		}
+	}
+	return SplitMessagesIntoShares(coretypes.Messages{MessagesList: msgs})
+}
+
+func generateRandNamespacedRawData(total, nidSize, leafSize uint32) [][]byte {
+	data := make([][]byte, total)
+	for i := uint32(0); i < total; i++ {
+		nid := make([]byte, nidSize)
+		rand.Read(nid)
+		data[i] = nid
+	}
+	sortByteArrays(data)
+	for i := uint32(0); i < total; i++ {
+		d := make([]byte, leafSize)
+		rand.Read(d)
+		data[i] = append(data[i], d...)
+	}
+
+	return data
+}
+
+func sortByteArrays(src [][]byte) {
+	sort.Slice(src, func(i, j int) bool { return bytes.Compare(src[i], src[j]) < 0 })
+}
diff --git a/x/payment/types/payfordata.go b/x/payment/types/payfordata.go
index 86cc4d239c..e251eb89a5 100644
--- a/x/payment/types/payfordata.go
+++ b/x/payment/types/payfordata.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"math/bits"
 
+	shares "github.com/celestiaorg/celestia-app/pkg/shares"
 	"github.com/celestiaorg/rsmt2d"
 	sdkclient "github.com/cosmos/cosmos-sdk/client"
 	sdk "github.com/cosmos/cosmos-sdk/types"
@@ -134,7 +135,7 @@ func CreateCommitment(k uint64, namespace, message []byte) ([]byte, error) {
 
 	// split into shares that are length delimited and include the namespace in
 	// each share
-	shares := msg.SplitIntoShares().RawShares()
+	shares := shares.SplitMessagesIntoShares(msg).RawShares()
 	// if the number of shares is larger than that in the square, throw an error
 	// note, we use k*k-1 here because at least a single share will be reserved
 	// for the transaction paying for the message, therefore the max number of