Skip to content

Commit

Permalink
feat: store skipped transactions in local db (#467)
Browse files Browse the repository at this point in the history
* feat: store skipped txs in local db

* bump version

* bump version

* fix test

* include L2 txs in skipped index

* goimports

* rename more

* bump version

* fix missed renames

* fix the bug when calculating l2TxCount. (#479)

* fix bug when calculate l2 tx count

* Update version

* fix: exclude L1 message from block payload size validation (#476)

* fix: exclude L1 message from block payload size validation

* fix the bug when calculating l2TxCount. (#479)

* fix bug when calculate l2 tx count

* Update version

* bump version

---------

Co-authored-by: maskpp <[email protected]>

* fix: update row estimation with scroll-prover `v0.7.2` (#475)

* Fix row estimation.

* Update libzkp.

* more

* prepare

* finish

* upgrade

* bump version

* fix tests

* Update to scroll-prover `v0.7.2`.

* fix tests

* Update miner/worker.go

Co-authored-by: Péter Garamvölgyi <[email protected]>

* Update miner/worker.go

Co-authored-by: Péter Garamvölgyi <[email protected]>

* Reset ccc when skips first tx.

* do not unnecessarily skip L1 message

* fix ccc reset and improve code readability

* seal block on circuitcapacitychecker.ErrUnknown

---------

Co-authored-by: HAOYUatHZ <[email protected]>
Co-authored-by: Péter Garamvölgyi <[email protected]>

* refactor: simplify ccc revert to snapshot (#480)

* simplify ccc revert to snapshot

* Update version.go

---------

Co-authored-by: HAOYUatHZ <[email protected]>

* feat: use --gcmode=archive and --cache.noprefetch=true by default (#482)

* feat: use --gcmode=archive and --cache.noprefetch=true by default

* refuse to start with invalid config

* lint

* lint

* feat(sdk): support compressed response (#469)

* enable use compression algorithm

* fix ci

* Just enable decode compressed content at ethclient

* fix comments

---------

Co-authored-by: Haichen Shen <[email protected]>

* fix: disable pruning and prefetch if not flags are provided (#483)

Co-authored-by: colin <[email protected]>

* fix: update libzkp to use scroll-prover `v0.7.5` (#484)

* Update libzkp to use scroll-prover `v0.7.5`.

* Update version.

* address comments

* bump version

* nit

---------

Co-authored-by: maskpp <[email protected]>
Co-authored-by: Steven <[email protected]>
Co-authored-by: HAOYUatHZ <[email protected]>
Co-authored-by: iczc <[email protected]>
Co-authored-by: HAOYUatHZ <[email protected]>
Co-authored-by: Haichen Shen <[email protected]>
Co-authored-by: colin <[email protected]>
  • Loading branch information
8 people authored Aug 25, 2023
1 parent 637cb37 commit 7de261b
Show file tree
Hide file tree
Showing 12 changed files with 497 additions and 15 deletions.
9 changes: 7 additions & 2 deletions core/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ func (v *BlockValidator) ValidateL1Messages(block *types.Block) error {
return nil
}

blockHash := block.Hash()

if v.config.Scroll.L1Config == nil {
// TODO: should we allow follower nodes to skip L1 message verification?
panic("Running on L1Message-enabled network but no l1Config was provided")
Expand Down Expand Up @@ -168,13 +170,16 @@ func (v *BlockValidator) ValidateL1Messages(block *types.Block) error {
// skipped messages
// TODO: consider verifying that skipped messages overflow
for index := queueIndex; index < txQueueIndex; index++ {
log.Debug("Skipped L1 message", "queueIndex", index, "tx", tx.Hash().String(), "block", block.Hash().String())

if exists := it.Next(); !exists {
// the message in this block is not available in our local db.
// we'll reprocess this block at a later time.
return consensus.ErrMissingL1MessageData
}

l1msg := it.L1Message()
skippedTx := types.NewTx(&l1msg)
log.Debug("Skipped L1 message", "queueIndex", index, "tx", skippedTx.Hash().String(), "block", blockHash.String())
rawdb.WriteSkippedTransaction(v.db, skippedTx, "unknown", block.NumberU64(), &blockHash)
}

queueIndex = txQueueIndex + 1
Expand Down
4 changes: 2 additions & 2 deletions core/rawdb/accessors_l1_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ type L1MessageIterator struct {
// IterateL1MessagesFrom creates an L1MessageIterator that iterates over
// all L1 message in the database starting at the provided enqueue index.
func IterateL1MessagesFrom(db ethdb.Iteratee, fromQueueIndex uint64) L1MessageIterator {
start := encodeQueueIndex(fromQueueIndex)
start := encodeBigEndian(fromQueueIndex)
it := db.NewIterator(l1MessagePrefix, start)
keyLength := len(l1MessagePrefix) + 8

Expand Down Expand Up @@ -180,7 +180,7 @@ func ReadL1MessagesFrom(db ethdb.Iteratee, startIndex, maxCount uint64) []types.
// The L2 block is identified by its block hash. If the L2 block contains zero
// L1 messages, this value MUST equal its parent's value.
func WriteFirstQueueIndexNotInL2Block(db ethdb.KeyValueWriter, l2BlockHash common.Hash, queueIndex uint64) {
if err := db.Put(FirstQueueIndexNotInL2BlockKey(l2BlockHash), encodeQueueIndex(queueIndex)); err != nil {
if err := db.Put(FirstQueueIndexNotInL2BlockKey(l2BlockHash), encodeBigEndian(queueIndex)); err != nil {
log.Crit("Failed to store first L1 message not in L2 block", "l2BlockHash", l2BlockHash, "err", err)
}
}
Expand Down
202 changes: 202 additions & 0 deletions core/rawdb/accessors_skipped_txs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
package rawdb

import (
"bytes"
"encoding/binary"
"math/big"
"sync"

"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/core/types"
"github.com/scroll-tech/go-ethereum/ethdb"
"github.com/scroll-tech/go-ethereum/log"
"github.com/scroll-tech/go-ethereum/rlp"
)

// mutex used to avoid concurrent updates of NumSkippedTransactions
var mu sync.Mutex

// writeNumSkippedTransactions writes the number of skipped transactions to the database.
func writeNumSkippedTransactions(db ethdb.KeyValueWriter, numSkipped uint64) {
value := big.NewInt(0).SetUint64(numSkipped).Bytes()

if err := db.Put(numSkippedTransactionsKey, value); err != nil {
log.Crit("Failed to update the number of skipped transactions", "err", err)
}
}

// ReadNumSkippedTransactions retrieves the number of skipped transactions.
func ReadNumSkippedTransactions(db ethdb.Reader) uint64 {
data, err := db.Get(numSkippedTransactionsKey)
if err != nil && isNotFoundErr(err) {
return 0
}
if err != nil {
log.Crit("Failed to read number of skipped transactions from database", "err", err)
}
if len(data) == 0 {
return 0
}

number := new(big.Int).SetBytes(data)
if !number.IsUint64() {
log.Crit("Unexpected number of skipped transactions in database", "number", number)
}
return number.Uint64()
}

// SkippedTransaction stores the transaction object, along with the skip reason and block context.
type SkippedTransaction struct {
// Tx is the skipped transaction.
// We store the tx itself because otherwise geth will discard it after skipping.
Tx *types.Transaction

// Reason is the skip reason.
Reason string

// BlockNumber is the number of the block in which this transaction was skipped.
BlockNumber uint64

// BlockNumber is the hash of the block in which this transaction was skipped or nil.
BlockHash *common.Hash
}

// writeSkippedTransaction writes a skipped transaction to the database.
func writeSkippedTransaction(db ethdb.KeyValueWriter, tx *types.Transaction, reason string, blockNumber uint64, blockHash *common.Hash) {
// workaround: RLP decoding fails if this is nil
if blockHash == nil {
blockHash = &common.Hash{}
}
stx := SkippedTransaction{Tx: tx, Reason: reason, BlockNumber: blockNumber, BlockHash: blockHash}
bytes, err := rlp.EncodeToBytes(stx)
if err != nil {
log.Crit("Failed to RLP encode skipped transaction", "err", err)
}
if err := db.Put(SkippedTransactionKey(tx.Hash()), bytes); err != nil {
log.Crit("Failed to store skipped transaction", "hash", tx.Hash().String(), "err", err)
}
}

// readSkippedTransactionRLP retrieves a skipped transaction in its raw RLP database encoding.
func readSkippedTransactionRLP(db ethdb.Reader, txHash common.Hash) rlp.RawValue {
data, err := db.Get(SkippedTransactionKey(txHash))
if err != nil && isNotFoundErr(err) {
return nil
}
if err != nil {
log.Crit("Failed to load skipped transaction", "hash", txHash.String(), "err", err)
}
return data
}

// ReadSkippedTransaction retrieves a skipped transaction by its hash, along with its skipped reason.
func ReadSkippedTransaction(db ethdb.Reader, txHash common.Hash) *SkippedTransaction {
data := readSkippedTransactionRLP(db, txHash)
if len(data) == 0 {
return nil
}
var stx SkippedTransaction
if err := rlp.Decode(bytes.NewReader(data), &stx); err != nil {
log.Crit("Invalid skipped transaction RLP", "hash", txHash.String(), "data", data, "err", err)
}
if stx.BlockHash != nil && *stx.BlockHash == (common.Hash{}) {
stx.BlockHash = nil
}
return &stx
}

// writeSkippedTransactionHash writes the hash of a skipped transaction to the database.
func writeSkippedTransactionHash(db ethdb.KeyValueWriter, index uint64, txHash common.Hash) {
if err := db.Put(SkippedTransactionHashKey(index), txHash[:]); err != nil {
log.Crit("Failed to store skipped transaction hash", "index", index, "hash", txHash.String(), "err", err)
}
}

// ReadSkippedTransactionHash retrieves the hash of a skipped transaction by its index.
func ReadSkippedTransactionHash(db ethdb.Reader, index uint64) *common.Hash {
data, err := db.Get(SkippedTransactionHashKey(index))
if err != nil && isNotFoundErr(err) {
return nil
}
if err != nil {
log.Crit("Failed to load skipped transaction hash", "index", index, "err", err)
}
hash := common.BytesToHash(data)
return &hash
}

// WriteSkippedTransaction writes a skipped transaction to the database and also updates the count and lookup index.
// Note: The lookup index and count will include duplicates if there are chain reorgs.
func WriteSkippedTransaction(db ethdb.Database, tx *types.Transaction, reason string, blockNumber uint64, blockHash *common.Hash) {
// this method is not accessed concurrently, but just to be sure...
mu.Lock()
defer mu.Unlock()

index := ReadNumSkippedTransactions(db)

// update in a batch
batch := db.NewBatch()
writeSkippedTransaction(db, tx, reason, blockNumber, blockHash)
writeSkippedTransactionHash(db, index, tx.Hash())
writeNumSkippedTransactions(db, index+1)

// write to DB
if err := batch.Write(); err != nil {
log.Crit("Failed to store skipped transaction", "hash", tx.Hash().String(), "err", err)
}
}

// SkippedTransactionIterator is a wrapper around ethdb.Iterator that
// allows us to iterate over skipped transaction hashes in the database.
// It implements an interface similar to ethdb.Iterator.
type SkippedTransactionIterator struct {
inner ethdb.Iterator
db ethdb.Reader
keyLength int
}

// IterateSkippedTransactionsFrom creates a SkippedTransactionIterator that iterates
// over all skipped transaction hashes in the database starting at the provided index.
func IterateSkippedTransactionsFrom(db ethdb.Database, index uint64) SkippedTransactionIterator {
start := encodeBigEndian(index)
it := db.NewIterator(skippedTransactionHashPrefix, start)
keyLength := len(skippedTransactionHashPrefix) + 8

return SkippedTransactionIterator{
inner: it,
db: db,
keyLength: keyLength,
}
}

// Next moves the iterator to the next key/value pair.
// It returns false when the iterator is exhausted.
// TODO: Consider reading items in batches.
func (it *SkippedTransactionIterator) Next() bool {
for it.inner.Next() {
key := it.inner.Key()
if len(key) == it.keyLength {
return true
}
}
return false
}

// Index returns the index of the current skipped transaction hash.
func (it *SkippedTransactionIterator) Index() uint64 {
key := it.inner.Key()
raw := key[len(skippedTransactionHashPrefix) : len(skippedTransactionHashPrefix)+8]
index := binary.BigEndian.Uint64(raw)
return index
}

// TransactionHash returns the current skipped transaction hash.
func (it *SkippedTransactionIterator) TransactionHash() common.Hash {
data := it.inner.Value()
return common.BytesToHash(data)
}

// Release releases the associated resources.
func (it *SkippedTransactionIterator) Release() {
it.inner.Release()
}
134 changes: 134 additions & 0 deletions core/rawdb/accessors_skipped_txs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package rawdb

import (
"math/big"
"sync"
"testing"

"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/core/types"
)

func TestReadWriteNumSkippedTransactions(t *testing.T) {
blockNumbers := []uint64{
1,
1 << 2,
1 << 8,
1 << 16,
1 << 32,
}

db := NewMemoryDatabase()
for _, num := range blockNumbers {
writeNumSkippedTransactions(db, num)
got := ReadNumSkippedTransactions(db)

if got != num {
t.Fatal("Num skipped transactions mismatch", "expected", num, "got", got)
}
}
}

func newTestTransaction(queueIndex uint64) *types.Transaction {
l1msg := types.L1MessageTx{
QueueIndex: queueIndex,
Gas: 0,
To: &common.Address{},
Value: big.NewInt(0),
Data: nil,
Sender: common.Address{},
}
return types.NewTx(&l1msg)
}

func TestReadWriteSkippedTransactionNoIndex(t *testing.T) {
tx := newTestTransaction(123)
db := NewMemoryDatabase()
writeSkippedTransaction(db, tx, "random reason", 1, &common.Hash{1})
got := ReadSkippedTransaction(db, tx.Hash())
if got == nil || got.Tx.Hash() != tx.Hash() || got.Reason != "random reason" || got.BlockNumber != 1 || got.BlockHash == nil || *got.BlockHash != (common.Hash{1}) {
t.Fatal("Skipped transaction mismatch", "got", got)
}
}

func TestReadWriteSkippedTransaction(t *testing.T) {
tx := newTestTransaction(123)
db := NewMemoryDatabase()
WriteSkippedTransaction(db, tx, "random reason", 1, &common.Hash{1})
got := ReadSkippedTransaction(db, tx.Hash())
if got == nil || got.Tx.Hash() != tx.Hash() || got.Reason != "random reason" || got.BlockNumber != 1 || got.BlockHash == nil || *got.BlockHash != (common.Hash{1}) {
t.Fatal("Skipped transaction mismatch", "got", got)
}
count := ReadNumSkippedTransactions(db)
if count != 1 {
t.Fatal("Skipped transaction count mismatch", "expected", 1, "got", count)
}
hash := ReadSkippedTransactionHash(db, 0)
if hash == nil || *hash != tx.Hash() {
t.Fatal("Skipped L1 message hash mismatch", "expected", tx.Hash(), "got", hash)
}
}

func TestSkippedTransactionConcurrentUpdate(t *testing.T) {
count := 20
tx := newTestTransaction(123)
db := NewMemoryDatabase()
var wg sync.WaitGroup
for ii := 0; ii < count; ii++ {
wg.Add(1)
go func() {
defer wg.Done()
WriteSkippedTransaction(db, tx, "random reason", 1, &common.Hash{1})
}()
}
wg.Wait()
got := ReadNumSkippedTransactions(db)
if got != uint64(count) {
t.Fatal("Skipped transaction count mismatch", "expected", count, "got", got)
}
}

func TestIterateSkippedTransactions(t *testing.T) {
db := NewMemoryDatabase()

txs := []*types.Transaction{
newTestTransaction(1),
newTestTransaction(2),
newTestTransaction(3),
newTestTransaction(4),
newTestTransaction(5),
}

for _, tx := range txs {
WriteSkippedTransaction(db, tx, "random reason", 1, &common.Hash{1})
}

// simulate skipped L2 tx that's not included in the index
l2tx := newTestTransaction(6)
writeSkippedTransaction(db, l2tx, "random reason", 1, &common.Hash{1})

it := IterateSkippedTransactionsFrom(db, 2)
defer it.Release()

for ii := 2; ii < len(txs); ii++ {
finished := !it.Next()
if finished {
t.Fatal("Iterator terminated early", "ii", ii)
}

index := it.Index()
if index != uint64(ii) {
t.Fatal("Invalid skipped transaction index", "expected", ii, "got", index)
}

hash := it.TransactionHash()
if hash != txs[ii].Hash() {
t.Fatal("Invalid skipped transaction hash", "expected", txs[ii].Hash(), "got", hash)
}
}

finished := !it.Next()
if !finished {
t.Fatal("Iterator did not terminate")
}
}
Loading

0 comments on commit 7de261b

Please sign in to comment.