Skip to content

Commit

Permalink
feat: store skipped txs in local db
Browse files Browse the repository at this point in the history
  • Loading branch information
Thegaram committed Aug 17, 2023
1 parent 7b8c20f commit d698c6d
Show file tree
Hide file tree
Showing 11 changed files with 492 additions and 15 deletions.
9 changes: 7 additions & 2 deletions core/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ func (v *BlockValidator) ValidateL1Messages(block *types.Block) error {
return nil
}

blockHash := block.Hash()

if v.config.Scroll.L1Config == nil {
// TODO: should we allow follower nodes to skip L1 message verification?
panic("Running on L1Message-enabled network but no l1Config was provided")
Expand Down Expand Up @@ -168,13 +170,16 @@ func (v *BlockValidator) ValidateL1Messages(block *types.Block) error {
// skipped messages
// TODO: consider verifying that skipped messages overflow
for index := queueIndex; index < txQueueIndex; index++ {
log.Debug("Skipped L1 message", "queueIndex", index, "tx", tx.Hash().String(), "block", block.Hash().String())

if exists := it.Next(); !exists {
// the message in this block is not available in our local db.
// we'll reprocess this block at a later time.
return consensus.ErrMissingL1MessageData
}

l1msg := it.L1Message()
skippedTx := types.NewTx(&l1msg)
log.Debug("Skipped L1 message", "queueIndex", index, "tx", skippedTx.Hash().String(), "block", blockHash.String())
rawdb.WriteSkippedL1Message(v.db, skippedTx, "unknown", block.NumberU64(), &blockHash)
}

queueIndex = txQueueIndex + 1
Expand Down
4 changes: 2 additions & 2 deletions core/rawdb/accessors_l1_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ type L1MessageIterator struct {
// IterateL1MessagesFrom creates an L1MessageIterator that iterates over
// all L1 message in the database starting at the provided enqueue index.
func IterateL1MessagesFrom(db ethdb.Iteratee, fromQueueIndex uint64) L1MessageIterator {
start := encodeQueueIndex(fromQueueIndex)
start := encodeBigEndian(fromQueueIndex)
it := db.NewIterator(l1MessagePrefix, start)
keyLength := len(l1MessagePrefix) + 8

Expand Down Expand Up @@ -180,7 +180,7 @@ func ReadL1MessagesFrom(db ethdb.Iteratee, startIndex, maxCount uint64) []types.
// The L2 block is identified by its block hash. If the L2 block contains zero
// L1 messages, this value MUST equal its parent's value.
func WriteFirstQueueIndexNotInL2Block(db ethdb.KeyValueWriter, l2BlockHash common.Hash, queueIndex uint64) {
if err := db.Put(FirstQueueIndexNotInL2BlockKey(l2BlockHash), encodeQueueIndex(queueIndex)); err != nil {
if err := db.Put(FirstQueueIndexNotInL2BlockKey(l2BlockHash), encodeBigEndian(queueIndex)); err != nil {
log.Crit("Failed to store first L1 message not in L2 block", "l2BlockHash", l2BlockHash, "err", err)
}
}
Expand Down
202 changes: 202 additions & 0 deletions core/rawdb/accessors_skipped_txs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
package rawdb

import (
"bytes"
"encoding/binary"
"math/big"
"sync"

"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/core/types"
"github.com/scroll-tech/go-ethereum/ethdb"
"github.com/scroll-tech/go-ethereum/log"
"github.com/scroll-tech/go-ethereum/rlp"
)

// mutex used to avoid concurrent updates of NumSkippedL1Messages
var mu sync.Mutex

// WriteNumSkippedL1Messages writes the number of skipped L1 messages to the database.
func WriteNumSkippedL1Messages(db ethdb.KeyValueWriter, numSkipped uint64) {
value := big.NewInt(0).SetUint64(numSkipped).Bytes()

if err := db.Put(numSkippedL1MessagesKey, value); err != nil {
log.Crit("Failed to update the number of skipped L1 messages", "err", err)
}
}

// ReadNumSkippedL1Messages retrieves the number of skipped messages.
func ReadNumSkippedL1Messages(db ethdb.Reader) uint64 {
data, err := db.Get(numSkippedL1MessagesKey)
if err != nil && isNotFoundErr(err) {
return 0
}
if err != nil {
log.Crit("Failed to read number of skipped L1 messages from database", "err", err)
}
if len(data) == 0 {
return 0
}

number := new(big.Int).SetBytes(data)
if !number.IsUint64() {
log.Crit("Unexpected number of skipped L1 messages in database", "number", number)
}
return number.Uint64()
}

// SkippedTransaction stores the transaction object, along with the skip reason and block context.
type SkippedTransaction struct {
// Tx is the skipped transaction.
// We store the tx itself because otherwise geth will discard it after skipping.
Tx *types.Transaction

// Reason is the skip reason.
Reason string

// BlockNumber is the number of the block in which this transaction was skipped.
BlockNumber uint64

// BlockNumber is the hash of the block in which this transaction was skipped or nil.
BlockHash *common.Hash
}

// WriteSkippedTransaction writes a skipped transaction to the database.
func WriteSkippedTransaction(db ethdb.KeyValueWriter, tx *types.Transaction, reason string, blockNumber uint64, blockHash *common.Hash) {
// workaround: RLP decoding fails if this is nil
if blockHash == nil {
blockHash = &common.Hash{}
}
stx := SkippedTransaction{Tx: tx, Reason: reason, BlockNumber: blockNumber, BlockHash: blockHash}
bytes, err := rlp.EncodeToBytes(stx)
if err != nil {
log.Crit("Failed to RLP encode skipped transaction", "err", err)
}
if err := db.Put(SkippedTransactionKey(tx.Hash()), bytes); err != nil {
log.Crit("Failed to store skipped transaction", "hash", tx.Hash().String(), "err", err)
}
}

// ReadSkippedTransactionRLP retrieves a skipped transaction in its raw RLP database encoding.
func ReadSkippedTransactionRLP(db ethdb.Reader, txHash common.Hash) rlp.RawValue {
data, err := db.Get(SkippedTransactionKey(txHash))
if err != nil && isNotFoundErr(err) {
return nil
}
if err != nil {
log.Crit("Failed to load skipped transaction", "hash", txHash.String(), "err", err)
}
return data
}

// ReadSkippedTransaction retrieves a skipped transaction by its hash, along with its skipped reason.
func ReadSkippedTransaction(db ethdb.Reader, txHash common.Hash) *SkippedTransaction {
data := ReadSkippedTransactionRLP(db, txHash)
if len(data) == 0 {
return nil
}
var stx SkippedTransaction
if err := rlp.Decode(bytes.NewReader(data), &stx); err != nil {
log.Crit("Invalid skipped transaction RLP", "hash", txHash.String(), "data", data, "err", err)
}
if stx.BlockHash != nil && *stx.BlockHash == (common.Hash{}) {
stx.BlockHash = nil
}
return &stx
}

// WriteSkippedL1MessageHash writes the hash of a skipped L1 message to the database.
func WriteSkippedL1MessageHash(db ethdb.KeyValueWriter, index uint64, txHash common.Hash) {
if err := db.Put(SkippedL1MessageHashKey(index), txHash[:]); err != nil {
log.Crit("Failed to store skipped transaction index", "index", index, "hash", txHash.String(), "err", err)
}
}

// ReadSkippedL1MessageHash retrieves the hash of a skipped L1 message by its index.
func ReadSkippedL1MessageHash(db ethdb.Reader, index uint64) *common.Hash {
data, err := db.Get(SkippedL1MessageHashKey(index))
if err != nil && isNotFoundErr(err) {
return nil
}
if err != nil {
log.Crit("Failed to load skipped L1 message index index", "index", index, "err", err)
}
hash := common.BytesToHash(data)
return &hash
}

// WriteSkippedL1Message writes a skipped L1 message to the database and also updates the count and lookup index.
// Note: The lookup index and count will include duplicates if there are chain reorgs.
func WriteSkippedL1Message(db ethdb.Database, tx *types.Transaction, reason string, blockNumber uint64, blockHash *common.Hash) {
// this method is not accessed concurrently, but just to be sure...
mu.Lock()
defer mu.Unlock()

index := ReadNumSkippedL1Messages(db)

// update in a batch
batch := db.NewBatch()
WriteSkippedTransaction(db, tx, reason, blockNumber, blockHash)
WriteSkippedL1MessageHash(db, index, tx.Hash())
WriteNumSkippedL1Messages(db, index+1)

// write to DB
if err := batch.Write(); err != nil {
log.Crit("Failed to store skipped L1 message", "hash", tx.Hash().String(), "err", err)
}
}

// SkippedTransactionIterator is a wrapper around ethdb.Iterator that
// allows us to iterate over skipped L1 message hashes in the database.
// It implements an interface similar to ethdb.Iterator.
type SkippedTransactionIterator struct {
inner ethdb.Iterator
db ethdb.Reader
keyLength int
}

// IterateSkippedTransactionsFrom creates a SkippedTransactionIterator that iterates
// over all skipped L1 message hashes in the database starting at the provided index.
func IterateSkippedTransactionsFrom(db ethdb.Database, index uint64) SkippedTransactionIterator {
start := encodeBigEndian(index)
it := db.NewIterator(skippedL1MessageHashPrefix, start)
keyLength := len(skippedL1MessageHashPrefix) + 8

return SkippedTransactionIterator{
inner: it,
db: db,
keyLength: keyLength,
}
}

// Next moves the iterator to the next key/value pair.
// It returns false when the iterator is exhausted.
// TODO: Consider reading items in batches.
func (it *SkippedTransactionIterator) Next() bool {
for it.inner.Next() {
key := it.inner.Key()
if len(key) == it.keyLength {
return true
}
}
return false
}

// Index returns the index of the current skipped L1 message hash.
func (it *SkippedTransactionIterator) Index() uint64 {
key := it.inner.Key()
raw := key[len(skippedL1MessageHashPrefix) : len(skippedL1MessageHashPrefix)+8]
index := binary.BigEndian.Uint64(raw)
return index
}

// TransactionHash returns the current skipped L1 message hash.
func (it *SkippedTransactionIterator) TransactionHash() common.Hash {
data := it.inner.Value()
return common.BytesToHash(data)
}

// Release releases the associated resources.
func (it *SkippedTransactionIterator) Release() {
it.inner.Release()
}
134 changes: 134 additions & 0 deletions core/rawdb/accessors_skipped_txs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package rawdb

import (
"math/big"
"sync"
"testing"

"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/core/types"
)

func TestReadWriteNumSkippedL1Messages(t *testing.T) {
blockNumbers := []uint64{
1,
1 << 2,
1 << 8,
1 << 16,
1 << 32,
}

db := NewMemoryDatabase()
for _, num := range blockNumbers {
WriteNumSkippedL1Messages(db, num)
got := ReadNumSkippedL1Messages(db)

if got != num {
t.Fatal("Num L1 messages mismatch", "expected", num, "got", got)
}
}
}

func newTestTransaction(queueIndex uint64) *types.Transaction {
l1msg := types.L1MessageTx{
QueueIndex: queueIndex,
Gas: 0,
To: &common.Address{},
Value: big.NewInt(0),
Data: nil,
Sender: common.Address{},
}
return types.NewTx(&l1msg)
}

func TestReadWriteSkippedTransaction(t *testing.T) {
tx := newTestTransaction(123)
db := NewMemoryDatabase()
WriteSkippedTransaction(db, tx, "random reason", 1, &common.Hash{1})
got := ReadSkippedTransaction(db, tx.Hash())
if got == nil || got.Tx.Hash() != tx.Hash() || got.Reason != "random reason" || got.BlockNumber != 1 || got.BlockHash == nil || *got.BlockHash != (common.Hash{1}) {
t.Fatal("Skipped transaction mismatch", "got", got)
}
}

func TestReadWriteSkippedL1Message(t *testing.T) {
tx := newTestTransaction(123)
db := NewMemoryDatabase()
WriteSkippedL1Message(db, tx, "random reason", 1, &common.Hash{1})
got := ReadSkippedTransaction(db, tx.Hash())
if got == nil || got.Tx.Hash() != tx.Hash() || got.Reason != "random reason" || got.BlockNumber != 1 || got.BlockHash == nil || *got.BlockHash != (common.Hash{1}) {
t.Fatal("Skipped transaction mismatch", "got", got)
}
count := ReadNumSkippedL1Messages(db)
if count != 1 {
t.Fatal("Skipped transaction count mismatch", "expected", 1, "got", count)
}
hash := ReadSkippedL1MessageHash(db, 0)
if hash == nil || *hash != tx.Hash() {
t.Fatal("Skipped L1 message hash mismatch", "expected", tx.Hash(), "got", hash)
}
}

func TestSkippedL1MessageConcurrentUpdate(t *testing.T) {
count := 20
tx := newTestTransaction(123)
db := NewMemoryDatabase()
var wg sync.WaitGroup
for ii := 0; ii < count; ii++ {
wg.Add(1)
go func() {
defer wg.Done()
WriteSkippedL1Message(db, tx, "random reason", 1, &common.Hash{1})
}()
}
wg.Wait()
got := ReadNumSkippedL1Messages(db)
if got != uint64(count) {
t.Fatal("Skipped transaction count mismatch", "expected", count, "got", got)
}
}

func TestIterateSkippedL1Messages(t *testing.T) {
db := NewMemoryDatabase()

txs := []*types.Transaction{
newTestTransaction(1),
newTestTransaction(2),
newTestTransaction(3),
newTestTransaction(4),
newTestTransaction(5),
}

for _, tx := range txs {
WriteSkippedL1Message(db, tx, "random reason", 1, &common.Hash{1})
}

// simulate skipped L2 tx that's not included in the index
l2tx := newTestTransaction(6)
WriteSkippedTransaction(db, l2tx, "random reason", 1, &common.Hash{1})

it := IterateSkippedTransactionsFrom(db, 2)
defer it.Release()

for ii := 2; ii < len(txs); ii++ {
finished := !it.Next()
if finished {
t.Fatal("Iterator terminated early", "ii", ii)
}

index := it.Index()
if index != uint64(ii) {
t.Fatal("Invalid skipped L1 message index", "expected", ii, "got", index)
}

hash := it.TransactionHash()
if hash != txs[ii].Hash() {
t.Fatal("Invalid skipped L1 message hash", "expected", txs[ii].Hash(), "got", hash)
}
}

finished := !it.Next()
if !finished {
t.Fatal("Iterator did not terminate")
}
}
Loading

0 comments on commit d698c6d

Please sign in to comment.