Skip to content

Commit

Permalink
fix(worker): short-circuit l1 message iteration (#525)
Browse files Browse the repository at this point in the history
* fix(worker): short-circuit l1 message iteration

* Update core/rawdb/accessors_l1_message_test.go

Co-authored-by: colin <[email protected]>

---------

Co-authored-by: colin <[email protected]>
  • Loading branch information
Thegaram and colinlyguo authored Sep 29, 2023
1 parent abcf48f commit fe8232a
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 7 deletions.
52 changes: 46 additions & 6 deletions core/rawdb/accessors_l1_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,38 @@ func ReadSyncedL1BlockNumber(db ethdb.Reader) *uint64 {
return &value
}

// WriteHighestSyncedQueueIndex writes the highest synced L1 message queue index to the database.
func WriteHighestSyncedQueueIndex(db ethdb.KeyValueWriter, queueIndex uint64) {
value := big.NewInt(0).SetUint64(queueIndex).Bytes()

if err := db.Put(highestSyncedQueueIndexKey, value); err != nil {
log.Crit("Failed to update highest synced L1 message queue index", "err", err)
}
}

// ReadHighestSyncedQueueIndex retrieves the highest synced L1 message queue index.
func ReadHighestSyncedQueueIndex(db ethdb.Reader) uint64 {
data, err := db.Get(highestSyncedQueueIndexKey)
if err != nil && isNotFoundErr(err) {
return 0
}
if err != nil {
log.Crit("Failed to read highest synced L1 message queue index from database", "err", err)
}
if len(data) == 0 {
return 0
}

number := new(big.Int).SetBytes(data)
if !number.IsUint64() {
log.Crit("Unexpected highest synced L1 block number in database", "number", number)
}

return number.Uint64()
}

// WriteL1Message writes an L1 message to the database.
// We assume that L1 messages are written to DB following their queue index order.
func WriteL1Message(db ethdb.KeyValueWriter, l1Msg types.L1MessageTx) {
bytes, err := rlp.EncodeToBytes(l1Msg)
if err != nil {
Expand All @@ -52,6 +83,8 @@ func WriteL1Message(db ethdb.KeyValueWriter, l1Msg types.L1MessageTx) {
if err := db.Put(L1MessageKey(l1Msg.QueueIndex), bytes); err != nil {
log.Crit("Failed to store L1 message", "err", err)
}

WriteHighestSyncedQueueIndex(db, l1Msg.QueueIndex)
}

// WriteL1Messages writes an array of L1 messages to the database.
Expand Down Expand Up @@ -91,20 +124,23 @@ func ReadL1Message(db ethdb.Reader, queueIndex uint64) *types.L1MessageTx {
// allows us to iterate over L1 messages in the database. It
// implements an interface similar to ethdb.Iterator.
type L1MessageIterator struct {
inner ethdb.Iterator
keyLength int
inner ethdb.Iterator
keyLength int
maxQueueIndex uint64
}

// IterateL1MessagesFrom creates an L1MessageIterator that iterates over
// all L1 message in the database starting at the provided enqueue index.
func IterateL1MessagesFrom(db ethdb.Iteratee, fromQueueIndex uint64) L1MessageIterator {
func IterateL1MessagesFrom(db ethdb.Database, fromQueueIndex uint64) L1MessageIterator {
start := encodeBigEndian(fromQueueIndex)
it := db.NewIterator(l1MessagePrefix, start)
keyLength := len(l1MessagePrefix) + 8
maxQueueIndex := ReadHighestSyncedQueueIndex(db)

return L1MessageIterator{
inner: it,
keyLength: keyLength,
inner: it,
keyLength: keyLength,
maxQueueIndex: maxQueueIndex,
}
}

Expand Down Expand Up @@ -145,7 +181,7 @@ func (it *L1MessageIterator) Release() {
}

// ReadL1MessagesFrom retrieves up to `maxCount` L1 messages starting at `startIndex`.
func ReadL1MessagesFrom(db ethdb.Iteratee, startIndex, maxCount uint64) []types.L1MessageTx {
func ReadL1MessagesFrom(db ethdb.Database, startIndex, maxCount uint64) []types.L1MessageTx {
msgs := make([]types.L1MessageTx, 0, maxCount)
it := IterateL1MessagesFrom(db, startIndex)
defer it.Release()
Expand All @@ -170,6 +206,10 @@ func ReadL1MessagesFrom(db ethdb.Iteratee, startIndex, maxCount uint64) []types.
msgs = append(msgs, msg)
index += 1
count -= 1

if msg.QueueIndex == it.maxQueueIndex {
break
}
}

return msgs
Expand Down
32 changes: 32 additions & 0 deletions core/rawdb/accessors_l1_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ func TestReadWriteL1Message(t *testing.T) {
if got == nil || got.QueueIndex != queueIndex {
t.Fatal("L1 message mismatch", "expected", queueIndex, "got", got)
}

max := ReadHighestSyncedQueueIndex(db)
if max != 123 {
t.Fatal("max index mismatch", "expected", 123, "got", max)
}
}

func TestIterateL1Message(t *testing.T) {
Expand All @@ -62,6 +67,11 @@ func TestIterateL1Message(t *testing.T) {
db := NewMemoryDatabase()
WriteL1Messages(db, msgs)

max := ReadHighestSyncedQueueIndex(db)
if max != 1000 {
t.Fatal("max index mismatch", "expected", 1000, "got", max)
}

it := IterateL1MessagesFrom(db, 103)
defer it.Release()

Expand Down Expand Up @@ -125,3 +135,25 @@ func TestReadWriteLastL1MessageInL2Block(t *testing.T) {
}
}
}

func TestIterationStopsAtMaxQueueIndex(t *testing.T) {
msgs := []types.L1MessageTx{
newL1MessageTx(100),
newL1MessageTx(101),
newL1MessageTx(102),
newL1MessageTx(103),
}

db := NewMemoryDatabase()
WriteL1Messages(db, msgs)

// artificially change max index from 103 to 102
WriteHighestSyncedQueueIndex(db, 102)

// iteration should terminate at 102 and not read 103
got := ReadL1MessagesFrom(db, 100, 10)

if len(got) != 3 {
t.Fatal("Invalid length", "expected", 3, "got", len(got))
}
}
1 change: 1 addition & 0 deletions core/rawdb/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ var (
syncedL1BlockNumberKey = []byte("LastSyncedL1BlockNumber")
l1MessagePrefix = []byte("l1") // l1MessagePrefix + queueIndex (uint64 big endian) -> L1MessageTx
firstQueueIndexNotInL2BlockPrefix = []byte("q") // firstQueueIndexNotInL2BlockPrefix + L2 block hash -> enqueue index
highestSyncedQueueIndexKey = []byte("HighestSyncedQueueIndex")

// Row consumption
rowConsumptionPrefix = []byte("rc") // rowConsumptionPrefix + hash -> row consumption by block
Expand Down
2 changes: 1 addition & 1 deletion params/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
const (
VersionMajor = 4 // Major version component of the current release
VersionMinor = 4 // Minor version component of the current release
VersionPatch = 13 // Patch version component of the current release
VersionPatch = 14 // Patch version component of the current release
VersionMeta = "sepolia" // Version metadata to append to the version string
)

Expand Down

0 comments on commit fe8232a

Please sign in to comment.