Skip to content

Commit

Permalink
Track malleated tx events (#607)
Browse files Browse the repository at this point in the history
* patch events to look for "parent" (before being malleated) tx hash

* clean up

* test

* change name DecodeChildTx -> UnwrapChildTx

* improve test by getting rid of superfluous goroutine

* rename ChildTx -> MalleatedTx and ParentTx -> OriginalTx

* add the buf.gen.yaml to appease the ever sadistic protobuf gods
  • Loading branch information
evan-forbes authored Jan 10, 2022
1 parent 199a1e4 commit 266919c
Show file tree
Hide file tree
Showing 9 changed files with 284 additions and 213 deletions.
14 changes: 14 additions & 0 deletions buf.gen.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# The version of the generation template (required).
# The only currently-valid value is v1beta1.
version: v1beta1

# The plugins to run.
plugins:
# The name of the plugin.
- name: gogofaster
# The directory where the generated proto output will be written.
# The directory is relative to where the generation tool was run.
out: proto
# Set options to assign import paths to the well-known types
# and to enable service generation.
opt: Mgoogle/protobuf/timestamp.proto=github.com/gogo/protobuf/types,Mgoogle/protobuf/duration.proto=github.com/golang/protobuf/ptypes/duration,plugins=grpc,paths=source_relative
4 changes: 2 additions & 2 deletions mempool/clist_mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,9 +605,9 @@ func (mem *CListMempool) Update(
// https://github.com/tendermint/tendermint/issues/3322.
if e, ok := mem.txsMap.Load(TxKey(tx)); ok {
mem.removeTx(tx, e.(*clist.CElement), false)
// see if the transaction is a child transaction of a some parent
// see if the transaction is a malleated transaction of a some parent
// transaction that exists in the mempool
} else if parentHash, _, isChild := types.DecodeChildTx(tx); isChild {
} else if parentHash, _, isMalleated := types.UnwrapMalleatedTx(tx); isMalleated {
var parentKey [TxKeySize]byte
copy(parentKey[:], parentHash)
mem.RemoveTxByKey(parentKey, false)
Expand Down
18 changes: 9 additions & 9 deletions mempool/clist_mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,22 +215,22 @@ func TestMempoolUpdate(t *testing.T) {
require.NoError(t, err)
}

// 4. Removes a parent transaction after receiving a child transaction in the update
// 4. Removes a original transaction after receiving a malleated transaction in the update
{
mempool.Flush()
parentTx := []byte{1, 2, 3, 4}
childTx := []byte{1, 2}
parentHash := sha256.Sum256(parentTx)
originalTx := []byte{1, 2, 3, 4}
malleated := []byte{1, 2}
originalHash := sha256.Sum256(originalTx)

// create the wrapped child transaction
wTx, err := types.WrapChildTx(parentHash[:], childTx)
// create the wrapped malleated transaction
wTx, err := types.WrapMalleatedTx(originalHash[:], malleated)
require.NoError(t, err)

// add the parent transaction to the mempool
err = mempool.CheckTx(parentTx, nil, TxInfo{})
// add the original transaction to the mempool
err = mempool.CheckTx(originalTx, nil, TxInfo{})
require.NoError(t, err)

// remove the parent from the mempool using the wrapped child tx
// remove the original from the mempool using the wrapped malleated tx
err = mempool.Update(1, []types.Tx{wTx}, abciResponses(1, abci.CodeTypeOK), nil, nil)
require.NoError(t, err)

Expand Down
311 changes: 156 additions & 155 deletions proto/tendermint/types/types.pb.go

Large diffs are not rendered by default.

9 changes: 5 additions & 4 deletions proto/tendermint/types/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -220,9 +220,10 @@ message TxProof {
tendermint.crypto.Proof proof = 3;
}

// ChildTx wraps a transaction that was derived from a parent transaction. This
// allows for removal of the parent transaction from the mempool.
message ChildTx {
bytes parent_tx_hash = 1;
// MalleatedTx wraps a transaction that was derived from a different original
// transaction. This allows for tendermint to track malleated and original
// transactions
message MalleatedTx {
bytes original_tx_hash = 1;
bytes tx = 2;
}
10 changes: 9 additions & 1 deletion types/event_bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,11 +177,19 @@ func (b *EventBus) PublishEventTx(data EventDataTx) error {
// no explicit deadline for publishing events
ctx := context.Background()

var txHash []byte
if originalHash, malleated, ismalleated := UnwrapMalleatedTx(data.Tx); ismalleated {
txHash = originalHash
data.Tx = malleated
} else {
txHash = Tx(data.Tx).Hash()
}

events := b.validateAndStringifyEvents(data.Result.Events, b.Logger.With("tx", data.Tx))

// add predefined compositeKeys
events[EventTypeKey] = append(events[EventTypeKey], EventTx)
events[TxHashKey] = append(events[TxHashKey], fmt.Sprintf("%X", Tx(data.Tx).Hash()))
events[TxHashKey] = append(events[TxHashKey], fmt.Sprintf("%X", txHash))
events[TxHeightKey] = append(events[TxHeightKey], fmt.Sprintf("%d", data.Height))

return b.pubsub.PublishWithEvents(ctx, data, events)
Expand Down
78 changes: 62 additions & 16 deletions types/event_bus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,59 @@ func TestEventBusPublishEventTx(t *testing.T) {
}
}

func TestEventBusPublishEventMalleatedTx(t *testing.T) {
eventBus := NewEventBus()
err := eventBus.Start()
require.NoError(t, err)
t.Cleanup(func() {
if err := eventBus.Stop(); err != nil {
t.Error(err)
}
})

tx := Tx("foo")
malleatedTx := Tx("foo-malleated")
wrappedMalleatedTx, err := WrapMalleatedTx(tx.Hash(), malleatedTx)
require.NoError(t, err)

result := abci.ResponseDeliverTx{
Data: []byte("bar"),
Events: []abci.Event{
{Type: "testType", Attributes: []abci.EventAttribute{{Key: []byte("baz"), Value: []byte("1")}}},
},
}

// PublishEventTx adds 3 composite keys, so the query below should work
query := fmt.Sprintf("tm.event='Tx' AND tx.height=1 AND tx.hash='%X' AND testType.baz=1", tx.Hash())
txsSub, err := eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query))
require.NoError(t, err)

done := make(chan struct{})
go func() {
msg := <-txsSub.Out()
edt := msg.Data().(EventDataTx)
assert.Equal(t, int64(1), edt.Height)
assert.Equal(t, uint32(0), edt.Index)
assert.EqualValues(t, malleatedTx, edt.Tx)
assert.Equal(t, result, edt.Result)
close(done)
}()

err = eventBus.PublishEventTx(EventDataTx{abci.TxResult{
Height: 1,
Index: 0,
Tx: wrappedMalleatedTx,
Result: result,
}})
assert.NoError(t, err)

select {
case <-done:
case <-time.After(1 * time.Second):
t.Fatal("did not receive a transaction after 1 sec.")
}
}

func TestEventBusPublishEventNewBlock(t *testing.T) {
eventBus := NewEventBus()
err := eventBus.Start()
Expand Down Expand Up @@ -93,28 +146,21 @@ func TestEventBusPublishEventNewBlock(t *testing.T) {
blocksSub, err := eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query))
require.NoError(t, err)

done := make(chan struct{})
go func() {
msg := <-blocksSub.Out()
edt := msg.Data().(EventDataNewBlock)
assert.Equal(t, block, edt.Block)
assert.Equal(t, resultBeginBlock, edt.ResultBeginBlock)
assert.Equal(t, resultEndBlock, edt.ResultEndBlock)
close(done)
}()

err = eventBus.PublishEventNewBlock(EventDataNewBlock{
Block: block,
ResultBeginBlock: resultBeginBlock,
ResultEndBlock: resultEndBlock,
})
assert.NoError(t, err)

select {
case <-done:
case <-time.After(1 * time.Second):
t.Fatal("did not receive a block after 1 sec.")
}
done := make(chan struct{})
// go func() {
msg := <-blocksSub.Out()
edt := msg.Data().(EventDataNewBlock)
assert.Equal(t, block, edt.Block)
assert.Equal(t, resultBeginBlock, edt.ResultBeginBlock)
assert.Equal(t, resultEndBlock, edt.ResultEndBlock)
close(done)
assert.NoError(t, err)
}

func TestEventBusPublishEventTxDuplicateKeys(t *testing.T) {
Expand Down
41 changes: 21 additions & 20 deletions types/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,38 +161,39 @@ func ComputeProtoSizeForTxs(txs []Tx) int64 {
return int64(pdData.Size())
}

// DecodeChildTx attempts to unmarshal the provided transaction into a child
// transaction wrapper, if this an be done, then it returns true. A child
// transaction is a normal transaction that has been derived from a different
// parent transaction. The returned hash is that of the parent transaction,
// which allows us to remove the parent transaction from the mempool. NOTE:
// protobuf sometimes does not throw an error if the transaction passed is not
// a tmproto.ChildTx, since the schema for PayForMessage is kept in the app, we
// cannot perform further checks without creating an import cycle.
func DecodeChildTx(tx Tx) (hash []byte, unwrapped Tx, has bool) {
// attempt to unmarshal into a a child transaction
var childTx tmproto.ChildTx
err := proto.Unmarshal(tx, &childTx)
// UnwrapMalleatedTx attempts to unmarshal the provided transaction into a malleated
// transaction wrapper, if this an be done, then it returns true. A malleated
// transaction is a normal transaction that has been derived (malleated) from a
// different original transaction. The returned hash is that of the original
// transaction, which allows us to remove the original transaction from the
// mempool. NOTE: protobuf sometimes does not throw an error if the transaction
// passed is not a tmproto.MalleatedTx, since the schema for PayForMessage is kept
// in the app, we cannot perform further checks without creating an import
// cycle.
func UnwrapMalleatedTx(tx Tx) (originalHash []byte, unwrapped Tx, isMalleated bool) {
// attempt to unmarshal into a a malleated transaction
var malleatedTx tmproto.MalleatedTx
err := proto.Unmarshal(tx, &malleatedTx)
if err != nil {
return nil, nil, false
}
// this check will fail to catch unwanted types should those unmarshalled
// types happen to have a hash sized slice of bytes in the same field number
// as ParentTxHash. TODO(evan): either fix this, or better yet use a different
// as originalTxHash. TODO(evan): either fix this, or better yet use a different
// mechanism
if len(childTx.ParentTxHash) != tmhash.Size {
if len(malleatedTx.OriginalTxHash) != tmhash.Size {
return nil, nil, false
}
return childTx.ParentTxHash, childTx.Tx, true
return malleatedTx.OriginalTxHash, malleatedTx.Tx, true
}

// WrapChildTx creates a wrapped Tx that includes the parent transaction's hash
// WrapMalleatedTx creates a wrapped Tx that includes the original transaction's hash
// so that it can be easily removed from the mempool. note: must be unwrapped to
// be a viable sdk.Tx
func WrapChildTx(parentHash []byte, child Tx) (Tx, error) {
wTx := tmproto.ChildTx{
ParentTxHash: parentHash,
Tx: child,
func WrapMalleatedTx(originalHash []byte, malleated Tx) (Tx, error) {
wTx := tmproto.MalleatedTx{
OriginalTxHash: originalHash,
Tx: malleated,
}
return proto.Marshal(&wTx)
}
12 changes: 6 additions & 6 deletions types/tx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,11 @@ func assertBadProof(t *testing.T, root []byte, bad []byte, good TxProof) {
}
}

func TestDecodeChildTx(t *testing.T) {
func TestUnwrapMalleatedTx(t *testing.T) {
// perform a simple test for being unable to decode a non
// child transaction
// malleated transaction
tx := Tx{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0}
_, _, ok := DecodeChildTx(tx)
_, _, ok := UnwrapMalleatedTx(tx)
require.False(t, ok)

// create a proto message that used to be decoded when it shouldn't have
Expand All @@ -181,15 +181,15 @@ func TestDecodeChildTx(t *testing.T) {

// due to protobuf not actually requiring type compatibility
// we need to make sure that there is some check
_, _, ok = DecodeChildTx(rawBlock)
_, _, ok = UnwrapMalleatedTx(rawBlock)
require.False(t, ok)

pHash := sha256.Sum256(rawBlock)
childTx, err := WrapChildTx(pHash[:], rawBlock)
MalleatedTx, err := WrapMalleatedTx(pHash[:], rawBlock)
require.NoError(t, err)

// finally, ensure that the unwrapped bytes are identical to the input
unwrappedHash, unwrapped, ok := DecodeChildTx(childTx)
unwrappedHash, unwrapped, ok := UnwrapMalleatedTx(MalleatedTx)
require.True(t, ok)
assert.Equal(t, 32, len(unwrappedHash))
require.Equal(t, rawBlock, []byte(unwrapped))
Expand Down

0 comments on commit 266919c

Please sign in to comment.