diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go index 601302e536..f870e3a6fa 100644 --- a/arbnode/batch_poster.go +++ b/arbnode/batch_poster.go @@ -29,6 +29,7 @@ import ( "github.com/ethereum/go-ethereum/rpc" "github.com/offchainlabs/nitro/arbnode/dataposter" "github.com/offchainlabs/nitro/arbnode/dataposter/storage" + "github.com/offchainlabs/nitro/arbnode/redislock" "github.com/offchainlabs/nitro/arbos/arbostypes" "github.com/offchainlabs/nitro/arbstate" "github.com/offchainlabs/nitro/arbutil" @@ -67,7 +68,7 @@ type BatchPoster struct { building *buildingBatch daWriter das.DataAvailabilityServiceWriter dataPoster *dataposter.DataPoster - redisLock *SimpleRedisLock + redisLock *redislock.Simple firstAccErr time.Time // first time a continuous missing accumulator occurred backlog uint64 // An estimate of the number of unposted batches @@ -99,7 +100,7 @@ type BatchPosterConfig struct { GasRefunderAddress string `koanf:"gas-refunder-address" reload:"hot"` DataPoster dataposter.DataPosterConfig `koanf:"data-poster" reload:"hot"` RedisUrl string `koanf:"redis-url"` - RedisLock SimpleRedisLockConfig `koanf:"redis-lock" reload:"hot"` + RedisLock redislock.SimpleCfg `koanf:"redis-lock" reload:"hot"` ExtraBatchGas uint64 `koanf:"extra-batch-gas" reload:"hot"` L1Wallet genericconf.WalletConfig `koanf:"parent-chain-wallet"` L1BlockBound string `koanf:"l1-block-bound" reload:"hot"` @@ -150,7 +151,7 @@ func BatchPosterConfigAddOptions(prefix string, f *pflag.FlagSet) { f.String(prefix+".redis-url", DefaultBatchPosterConfig.RedisUrl, "if non-empty, the Redis URL to store queued transactions in") f.String(prefix+".l1-block-bound", DefaultBatchPosterConfig.L1BlockBound, "only post messages to batches when they're within the max future block/timestamp as of this L1 block tag (\"safe\", \"finalized\", \"latest\", or \"ignore\" to ignore this check)") f.Duration(prefix+".l1-block-bound-bypass", DefaultBatchPosterConfig.L1BlockBoundBypass, "post batches even if not 
within the layer 1 future bounds if we're within this margin of the max delay") - RedisLockConfigAddOptions(prefix+".redis-lock", f) + redislock.AddConfigOptions(prefix+".redis-lock", f) dataposter.DataPosterConfigAddOptions(prefix+".data-poster", f) genericconf.WalletConfigAddOptions(prefix+".parent-chain-wallet", f, DefaultBatchPosterConfig.L1Wallet.Pathname) } @@ -218,10 +219,10 @@ func NewBatchPoster(dataPosterDB ethdb.Database, l1Reader *headerreader.HeaderRe if err != nil { return nil, err } - redisLockConfigFetcher := func() *SimpleRedisLockConfig { + redisLockConfigFetcher := func() *redislock.SimpleCfg { return &config().RedisLock } - redisLock, err := NewSimpleRedisLock(redisClient, redisLockConfigFetcher, func() bool { return syncMonitor.Synced() }) + redisLock, err := redislock.NewSimple(redisClient, redisLockConfigFetcher, func() bool { return syncMonitor.Synced() }) if err != nil { return nil, err } @@ -871,8 +872,7 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error) if err != nil { return false, err } - err = b.dataPoster.PostTransaction(ctx, firstMsgTime, nonce, newMeta, b.seqInboxAddr, data, gasLimit) - if err != nil { + if _, err := b.dataPoster.PostTransaction(ctx, firstMsgTime, nonce, newMeta, b.seqInboxAddr, data, gasLimit, new(big.Int)); err != nil { return false, err } log.Info( diff --git a/arbnode/dataposter/data_poster.go b/arbnode/dataposter/data_poster.go index 2efa234de5..656d3cf4bd 100644 --- a/arbnode/dataposter/data_poster.go +++ b/arbnode/dataposter/data_poster.go @@ -75,7 +75,7 @@ func parseReplacementTimes(val string) ([]time.Duration, error) { for _, s := range strings.Split(val, ",") { t, err := time.ParseDuration(s) if err != nil { - return nil, err + return nil, fmt.Errorf("parsing durations: %w", err) } if t <= lastReplacementTime { return nil, errors.New("replacement times must be increasing") @@ -273,16 +273,16 @@ func (p *DataPoster) feeAndTipCaps(ctx context.Context, gasLimit uint64, 
lastFee return newFeeCap, newTipCap, nil } -func (p *DataPoster) PostTransaction(ctx context.Context, dataCreatedAt time.Time, nonce uint64, meta []byte, to common.Address, calldata []byte, gasLimit uint64) error { +func (p *DataPoster) PostTransaction(ctx context.Context, dataCreatedAt time.Time, nonce uint64, meta []byte, to common.Address, calldata []byte, gasLimit uint64, value *big.Int) (*types.Transaction, error) { p.mutex.Lock() defer p.mutex.Unlock() err := p.updateBalance(ctx) if err != nil { - return fmt.Errorf("failed to update data poster balance: %w", err) + return nil, fmt.Errorf("failed to update data poster balance: %w", err) } feeCap, tipCap, err := p.feeAndTipCaps(ctx, gasLimit, nil, nil, dataCreatedAt, 0) if err != nil { - return err + return nil, err } inner := types.DynamicFeeTx{ Nonce: nonce, @@ -290,12 +290,12 @@ func (p *DataPoster) PostTransaction(ctx context.Context, dataCreatedAt time.Tim GasFeeCap: feeCap, Gas: gasLimit, To: &to, - Value: new(big.Int), + Value: value, Data: calldata, } fullTx, err := p.signer(p.sender, types.NewTx(&inner)) if err != nil { - return err + return nil, fmt.Errorf("signing transaction: %w", err) } queuedTx := storage.QueuedTransaction{ Data: inner, @@ -305,7 +305,7 @@ func (p *DataPoster) PostTransaction(ctx context.Context, dataCreatedAt time.Tim Created: dataCreatedAt, NextReplacement: time.Now().Add(p.replacementTimes[0]), } - return p.sendTx(ctx, nil, &queuedTx) + return fullTx, p.sendTx(ctx, nil, &queuedTx) } // the mutex must be held by the caller diff --git a/arbnode/dataposter/storage/storage.go b/arbnode/dataposter/storage/storage.go index e6af3818c7..174ab131ac 100644 --- a/arbnode/dataposter/storage/storage.go +++ b/arbnode/dataposter/storage/storage.go @@ -10,7 +10,8 @@ import ( var ( ErrStorageRace = errors.New("storage race error") - DataPosterPrefix string = "d" // the prefix for all data poster keys + BlockValidatorPrefix string = "v" // the prefix for all block validator keys + 
BatchPosterPrefix string = "b" // the prefix for all batch poster keys // TODO(anodar): move everything else from schema.go file to here once // execution split is complete. ) diff --git a/arbnode/maintenance.go b/arbnode/maintenance.go index 8b0c7811ea..2b1837a25b 100644 --- a/arbnode/maintenance.go +++ b/arbnode/maintenance.go @@ -12,6 +12,7 @@ import ( "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" + "github.com/offchainlabs/nitro/arbnode/redislock" "github.com/offchainlabs/nitro/util/stopwaiter" flag "github.com/spf13/pflag" ) @@ -27,12 +28,12 @@ type MaintenanceRunner struct { // lock is used to ensures that at any given time, only single node is on // maintenance mode. - lock *SimpleRedisLock + lock *redislock.Simple } type MaintenanceConfig struct { - TimeOfDay string `koanf:"time-of-day" reload:"hot"` - Lock SimpleRedisLockConfig `koanf:"lock" reload:"hot"` + TimeOfDay string `koanf:"time-of-day" reload:"hot"` + Lock redislock.SimpleCfg `koanf:"lock" reload:"hot"` // Generated: the minutes since start of UTC day to compact at minutesAfterMidnight int @@ -70,7 +71,7 @@ func (c *MaintenanceConfig) Validate() error { func MaintenanceConfigAddOptions(prefix string, f *flag.FlagSet) { f.String(prefix+".time-of-day", DefaultMaintenanceConfig.TimeOfDay, "UTC 24-hour time of day to run maintenance (currently only db compaction) at (e.g. 
15:00)") - RedisLockConfigAddOptions(prefix+".lock", f) + redislock.AddConfigOptions(prefix+".lock", f) } var DefaultMaintenanceConfig = MaintenanceConfig{ @@ -94,9 +95,9 @@ func NewMaintenanceRunner(config MaintenanceConfigFetcher, seqCoordinator *SeqCo } if seqCoordinator != nil { - c := func() *SimpleRedisLockConfig { return &cfg.Lock } + c := func() *redislock.SimpleCfg { return &cfg.Lock } r := func() bool { return true } // always ready to lock - rl, err := NewSimpleRedisLock(seqCoordinator.Client, c, r) + rl, err := redislock.NewSimple(seqCoordinator.Client, c, r) if err != nil { return nil, fmt.Errorf("creating new simple redis lock: %w", err) } diff --git a/arbnode/node.go b/arbnode/node.go index 35a104d48f..8bb17045c1 100644 --- a/arbnode/node.go +++ b/arbnode/node.go @@ -26,7 +26,10 @@ import ( "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rpc" + "github.com/offchainlabs/nitro/arbnode/dataposter" + "github.com/offchainlabs/nitro/arbnode/dataposter/storage" "github.com/offchainlabs/nitro/arbnode/execution" + "github.com/offchainlabs/nitro/arbnode/redislock" "github.com/offchainlabs/nitro/arbnode/resourcemanager" "github.com/offchainlabs/nitro/arbutil" "github.com/offchainlabs/nitro/broadcastclient" @@ -41,6 +44,7 @@ import ( "github.com/offchainlabs/nitro/staker" "github.com/offchainlabs/nitro/util/contracts" "github.com/offchainlabs/nitro/util/headerreader" + "github.com/offchainlabs/nitro/util/redisutil" "github.com/offchainlabs/nitro/util/signature" "github.com/offchainlabs/nitro/wsbroadcastserver" ) @@ -539,6 +543,29 @@ func checkArbDbSchemaVersion(arbDb ethdb.Database) error { return nil } +func ValidatorDataposter(db ethdb.Database, l1Reader *headerreader.HeaderReader, + transactOpts *bind.TransactOpts, cfgFetcher ConfigFetcher, syncMonitor *SyncMonitor) (*dataposter.DataPoster, error) { + cfg := cfgFetcher.Get() + mdRetriever := func(ctx context.Context, blockNum *big.Int) ([]byte, 
error) { + return nil, nil + } + redisC, err := redisutil.RedisClientFromURL(cfg.BlockValidator.RedisUrl) + if err != nil { + return nil, fmt.Errorf("creating redis client from url: %w", err) + } + lockCfgFetcher := func() *redislock.SimpleCfg { + return &cfg.BlockValidator.RedisLock + } + redisLock, err := redislock.NewSimple(redisC, lockCfgFetcher, func() bool { return syncMonitor.Synced() }) + if err != nil { + return nil, err + } + dpCfg := func() *dataposter.DataPosterConfig { + return &cfg.BlockValidator.DataPoster + } + return dataposter.NewDataPoster(db, l1Reader, transactOpts, redisC, redisLock, dpCfg, mdRetriever) +} + func createNodeImpl( ctx context.Context, stack *node.Node, @@ -741,7 +768,7 @@ func createNodeImpl( inboxTracker, txStreamer, exec.Recorder, - rawdb.NewTable(arbDb, BlockValidatorPrefix), + rawdb.NewTable(arbDb, storage.BlockValidatorPrefix), daReader, func() *staker.BlockValidatorConfig { return &configFetcher.Get().BlockValidator }, stack, @@ -775,6 +802,16 @@ func createNodeImpl( var messagePruner *MessagePruner if config.Staker.Enable { + dp, err := ValidatorDataposter( + rawdb.NewTable(arbDb, storage.BlockValidatorPrefix), + l1Reader, + txOptsValidator, + configFetcher, + syncMonitor, + ) + if err != nil { + return nil, err + } var wallet staker.ValidatorWalletInterface if config.Staker.UseSmartContractWallet || txOptsValidator == nil { var existingWalletAddress *common.Address @@ -794,7 +831,7 @@ func createNodeImpl( if len(config.Staker.ContractWalletAddress) > 0 { return nil, errors.New("validator contract wallet specified but flag to use a smart contract wallet was not specified") } - wallet, err = staker.NewEoaValidatorWallet(deployInfo.Rollup, l1client, txOptsValidator) + wallet, err = staker.NewEoaValidatorWallet(dp, deployInfo.Rollup, l1client, txOptsValidator) if err != nil { return nil, err } @@ -833,7 +870,7 @@ func createNodeImpl( if txOptsBatchPoster == nil { return nil, errors.New("batchposter, but no TxOpts") } - 
batchPoster, err = NewBatchPoster(rawdb.NewTable(arbDb, BlockValidatorPrefix), l1Reader, inboxTracker, txStreamer, syncMonitor, func() *BatchPosterConfig { return &configFetcher.Get().BatchPoster }, deployInfo, txOptsBatchPoster, daWriter) + batchPoster, err = NewBatchPoster(rawdb.NewTable(arbDb, storage.BatchPosterPrefix), l1Reader, inboxTracker, txStreamer, syncMonitor, func() *BatchPosterConfig { return &configFetcher.Get().BatchPoster }, deployInfo, txOptsBatchPoster, daWriter) if err != nil { return nil, err } diff --git a/arbnode/simple_redis_lock.go b/arbnode/redislock/redis.go similarity index 71% rename from arbnode/simple_redis_lock.go rename to arbnode/redislock/redis.go index f6f37cc42d..c02476f04a 100644 --- a/arbnode/simple_redis_lock.go +++ b/arbnode/redislock/redis.go @@ -1,4 +1,4 @@ -package arbnode +package redislock import ( "context" @@ -8,6 +8,7 @@ import ( "math/big" "strconv" "sync" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/log" @@ -16,10 +17,10 @@ import ( flag "github.com/spf13/pflag" ) -type SimpleRedisLock struct { +type Simple struct { stopwaiter.StopWaiter client redis.UniversalClient - config SimpleRedisLockConfigFetcher + config SimpleCfgFetcher lockedUntil int64 mutex sync.Mutex stopping bool @@ -27,7 +28,7 @@ type SimpleRedisLock struct { myId string } -type SimpleRedisLockConfig struct { +type SimpleCfg struct { MyId string `koanf:"my-id"` LockoutDuration time.Duration `koanf:"lockout-duration" reload:"hot"` RefreshDuration time.Duration `koanf:"refresh-duration" reload:"hot"` @@ -35,22 +36,22 @@ type SimpleRedisLockConfig struct { BackgroundLock bool `koanf:"background-lock"` } -type SimpleRedisLockConfigFetcher func() *SimpleRedisLockConfig +type SimpleCfgFetcher func() *SimpleCfg -func RedisLockConfigAddOptions(prefix string, f *flag.FlagSet) { +func AddConfigOptions(prefix string, f *flag.FlagSet) { f.String(prefix+".my-id", "", "this node's id prefix when acquiring the lock (optional)") - 
f.Duration(prefix+".lockout-duration", DefaultRedisLockConfig.LockoutDuration, "how long lock is held") - f.Duration(prefix+".refresh-duration", DefaultRedisLockConfig.RefreshDuration, "how long between consecutive calls to redis") + f.Duration(prefix+".lockout-duration", DefaultCfg.LockoutDuration, "how long lock is held") + f.Duration(prefix+".refresh-duration", DefaultCfg.RefreshDuration, "how long between consecutive calls to redis") f.String(prefix+".key", prefix+".simple-lock-key", "key for lock") - f.Bool(prefix+".background-lock", DefaultRedisLockConfig.BackgroundLock, "should node always try grabing lock in background") + f.Bool(prefix+".background-lock", DefaultCfg.BackgroundLock, "should node always try grabbing lock in background") } -func NewSimpleRedisLock(client redis.UniversalClient, config SimpleRedisLockConfigFetcher, readyToLock func() bool) (*SimpleRedisLock, error) { +func NewSimple(client redis.UniversalClient, config SimpleCfgFetcher, readyToLock func() bool) (*Simple, error) { randBig, err := rand.Int(rand.Reader, big.NewInt(math.MaxInt64)) if err != nil { return nil, err } - return &SimpleRedisLock{ + return &Simple{ myId: config().MyId + "-" + strconv.FormatInt(randBig.Int64(), 16), // unique even if config is not client: client, config: config, @@ -58,14 +59,14 @@ func NewSimpleRedisLock(client redis.UniversalClient, config SimpleRedisLockConf }, nil } -var DefaultRedisLockConfig = SimpleRedisLockConfig{ +var DefaultCfg = SimpleCfg{ LockoutDuration: time.Minute, RefreshDuration: time.Second * 10, Key: "", BackgroundLock: false, } -func (l *SimpleRedisLock) attemptLock(ctx context.Context) (bool, error) { +func (l *Simple) attemptLock(ctx context.Context) (bool, error) { l.mutex.Lock() defer l.mutex.Unlock() if l.stopping || l.client == nil { @@ -120,7 +121,7 @@ func (l *SimpleRedisLock) attemptLock(ctx context.Context) (bool, error) { return gotLock, nil } -func (l *SimpleRedisLock) AttemptLock(ctx context.Context) bool { +func (l *Simple) 
AttemptLock(ctx context.Context) bool { if l.Locked() { return true } @@ -135,14 +136,14 @@ func (l *SimpleRedisLock) AttemptLock(ctx context.Context) bool { return res } -func (l *SimpleRedisLock) Locked() bool { +func (l *Simple) Locked() bool { if l.client == nil { return true } return time.Now().Before(atomicTimeRead(&l.lockedUntil)) } -func (l *SimpleRedisLock) Release(ctx context.Context) { +func (l *Simple) Release(ctx context.Context) { l.mutex.Lock() defer l.mutex.Unlock() @@ -179,7 +180,7 @@ func (l *SimpleRedisLock) Release(ctx context.Context) { } } -func (l *SimpleRedisLock) Start(ctxin context.Context) { +func (l *Simple) Start(ctxin context.Context) { l.StopWaiter.Start(ctxin, l) if l.config().BackgroundLock && l.client != nil { l.CallIteratively(func(ctx context.Context) time.Duration { @@ -192,10 +193,34 @@ func (l *SimpleRedisLock) Start(ctxin context.Context) { } } -func (l *SimpleRedisLock) StopAndWait() { +func (l *Simple) StopAndWait() { l.mutex.Lock() l.stopping = true l.mutex.Unlock() l.Release(l.GetContext()) l.StopWaiter.StopAndWait() } + +func execTestPipe(pipe redis.Pipeliner, ctx context.Context) error { + cmders, err := pipe.Exec(ctx) + if err != nil { + return err + } + for _, cmder := range cmders { + if err := cmder.Err(); err != nil { + return err + } + } + return nil +} + +// notice: It is possible for two consecutive reads to get decreasing values. That shouldn't matter. 
+func atomicTimeRead(addr *int64) time.Time { + asint64 := atomic.LoadInt64(addr) + return time.UnixMilli(asint64) +} + +func atomicTimeWrite(addr *int64, t time.Time) { + asint64 := t.UnixMilli() + atomic.StoreInt64(addr, asint64) +} diff --git a/arbnode/schema.go b/arbnode/schema.go index dddff11753..ddc7cf54fd 100644 --- a/arbnode/schema.go +++ b/arbnode/schema.go @@ -4,7 +4,6 @@ package arbnode var ( - BlockValidatorPrefix string = "v" // the prefix for all block validator keys messagePrefix []byte = []byte("m") // maps a message sequence number to a message legacyDelayedMessagePrefix []byte = []byte("d") // maps a delayed sequence number to an accumulator and a message as serialized on L1 rlpDelayedMessagePrefix []byte = []byte("e") // maps a delayed sequence number to an accumulator and an RLP encoded message diff --git a/arbnode/simple_redis_lock_test.go b/arbnode/simple_redis_lock_test.go index 35947279b8..b7506145c3 100644 --- a/arbnode/simple_redis_lock_test.go +++ b/arbnode/simple_redis_lock_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/offchainlabs/nitro/arbnode/redislock" "github.com/offchainlabs/nitro/util/redisutil" ) @@ -20,7 +21,7 @@ const test_release_frac = 5 const test_delay = time.Millisecond const test_redisKey_prefix = "__TEMP_SimpleRedisLockTest__" -func attemptLock(ctx context.Context, s *SimpleRedisLock, flag *int32, wg *sync.WaitGroup) { +func attemptLock(ctx context.Context, s *redislock.Simple, flag *int32, wg *sync.WaitGroup) { defer wg.Done() for i := 0; i < test_attempts; i++ { if s.AttemptLock(ctx) { @@ -46,22 +47,22 @@ func simpleRedisLockTest(t *testing.T, redisKeySuffix string, chosen int, backgo Require(t, err) Require(t, redisClient.Del(ctx, redisKey).Err()) - conf := &SimpleRedisLockConfig{ + conf := &redislock.SimpleCfg{ LockoutDuration: test_delay * test_attempts * 10, RefreshDuration: test_delay * 2, Key: redisKey, BackgroundLock: backgound, } - confFetcher := func() *SimpleRedisLockConfig { return conf } 
+ confFetcher := func() *redislock.SimpleCfg { return conf } - locks := make([]*SimpleRedisLock, 0) + locks := make([]*redislock.Simple, 0) for i := 0; i < test_threads; i++ { var err error - var lock *SimpleRedisLock + var lock *redislock.Simple if chosen < 0 || chosen == i { - lock, err = NewSimpleRedisLock(redisClient, confFetcher, prepareTrue) + lock, err = redislock.NewSimple(redisClient, confFetcher, prepareTrue) } else { - lock, err = NewSimpleRedisLock(redisClient, confFetcher, prepareFalse) + lock, err = redislock.NewSimple(redisClient, confFetcher, prepareFalse) } if err != nil { t.Fatal(err) diff --git a/cmd/nitro/init.go b/cmd/nitro/init.go index 399555098c..a1df2cbb2f 100644 --- a/cmd/nitro/init.go +++ b/cmd/nitro/init.go @@ -33,6 +33,7 @@ import ( "github.com/ethereum/go-ethereum/rpc" "github.com/offchainlabs/nitro/arbnode" + "github.com/offchainlabs/nitro/arbnode/dataposter/storage" "github.com/offchainlabs/nitro/arbnode/execution" "github.com/offchainlabs/nitro/arbos/arbosState" "github.com/offchainlabs/nitro/arbos/arbostypes" @@ -329,7 +330,7 @@ func findImportantRoots(ctx context.Context, chainDb ethdb.Database, stack *node log.Warn("missing latest confirmed block", "hash", confirmedHash) } - validatorDb := rawdb.NewTable(arbDb, arbnode.BlockValidatorPrefix) + validatorDb := rawdb.NewTable(arbDb, storage.BlockValidatorPrefix) lastValidated, err := staker.ReadLastValidatedInfo(validatorDb) if err != nil { return nil, err diff --git a/staker/block_validator.go b/staker/block_validator.go index 0ff74a8014..58c20c4cf6 100644 --- a/staker/block_validator.go +++ b/staker/block_validator.go @@ -19,6 +19,8 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/rlp" + "github.com/offchainlabs/nitro/arbnode/dataposter" + "github.com/offchainlabs/nitro/arbnode/redislock" "github.com/offchainlabs/nitro/arbutil" "github.com/offchainlabs/nitro/util/containers" 
"github.com/offchainlabs/nitro/util/rpcclient" @@ -86,6 +88,9 @@ type BlockValidatorConfig struct { PendingUpgradeModuleRoot string `koanf:"pending-upgrade-module-root"` // TODO(magic) requires StatelessBlockValidator recreation on hot reload FailureIsFatal bool `koanf:"failure-is-fatal" reload:"hot"` Dangerous BlockValidatorDangerousConfig `koanf:"dangerous"` + DataPoster dataposter.DataPosterConfig `koanf:"data-poster" reload:"hot"` + RedisUrl string `koanf:"redis-url"` + RedisLock redislock.SimpleCfg `koanf:"redis-lock" reload:"hot"` } func (c *BlockValidatorConfig) Validate() error { @@ -108,6 +113,9 @@ func BlockValidatorConfigAddOptions(prefix string, f *flag.FlagSet) { f.String(prefix+".pending-upgrade-module-root", DefaultBlockValidatorConfig.PendingUpgradeModuleRoot, "pending upgrade wasm module root to additionally validate (hash, 'latest' or empty)") f.Bool(prefix+".failure-is-fatal", DefaultBlockValidatorConfig.FailureIsFatal, "failing a validation is treated as a fatal error") BlockValidatorDangerousConfigAddOptions(prefix+".dangerous", f) + dataposter.DataPosterConfigAddOptions(prefix+".data-poster", f) + f.String(prefix+".redis-url", DefaultBlockValidatorConfig.RedisUrl, "redis url for block validator") + redislock.AddConfigOptions(prefix+".redis-lock", f) } func BlockValidatorDangerousConfigAddOptions(prefix string, f *flag.FlagSet) { @@ -124,6 +132,9 @@ var DefaultBlockValidatorConfig = BlockValidatorConfig{ PendingUpgradeModuleRoot: "latest", FailureIsFatal: true, Dangerous: DefaultBlockValidatorDangerousConfig, + DataPoster: dataposter.DefaultDataPosterConfig, + RedisUrl: "", + RedisLock: redislock.DefaultCfg, } var TestBlockValidatorConfig = BlockValidatorConfig{ @@ -136,6 +147,9 @@ var TestBlockValidatorConfig = BlockValidatorConfig{ PendingUpgradeModuleRoot: "latest", FailureIsFatal: true, Dangerous: DefaultBlockValidatorDangerousConfig, + DataPoster: dataposter.TestDataPosterConfig, + RedisUrl: "", + RedisLock: redislock.DefaultCfg, } var 
DefaultBlockValidatorDangerousConfig = BlockValidatorDangerousConfig{ diff --git a/staker/eoa_validator_wallet.go b/staker/eoa_validator_wallet.go index f7b83aac9f..f514969434 100644 --- a/staker/eoa_validator_wallet.go +++ b/staker/eoa_validator_wallet.go @@ -5,30 +5,41 @@ package staker import ( "context" + "fmt" + "sync/atomic" + "time" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" + "github.com/offchainlabs/nitro/arbnode/dataposter" "github.com/offchainlabs/nitro/arbutil" "github.com/offchainlabs/nitro/solgen/go/challengegen" "github.com/offchainlabs/nitro/solgen/go/rollupgen" + "github.com/offchainlabs/nitro/util/stopwaiter" ) type EoaValidatorWallet struct { + stopwaiter.StopWaiter auth *bind.TransactOpts client arbutil.L1Interface rollupAddress common.Address challengeManager *challengegen.ChallengeManager challengeManagerAddress common.Address + dataPoster *dataposter.DataPoster + txCount atomic.Uint64 } var _ ValidatorWalletInterface = (*EoaValidatorWallet)(nil) -func NewEoaValidatorWallet(rollupAddress common.Address, l1Client arbutil.L1Interface, auth *bind.TransactOpts) (*EoaValidatorWallet, error) { +func NewEoaValidatorWallet(dataPoster *dataposter.DataPoster, rollupAddress common.Address, l1Client arbutil.L1Interface, auth *bind.TransactOpts) (*EoaValidatorWallet, error) { return &EoaValidatorWallet{ auth: auth, client: l1Client, rollupAddress: rollupAddress, + dataPoster: dataPoster, + txCount: atomic.Uint64{}, }, nil } @@ -75,13 +86,52 @@ func (w *EoaValidatorWallet) TestTransactions(context.Context, []*types.Transact return nil } +// Polls until the nonce from dataposter catches up with transactions posted +// by validator wallet. 
+func (w *EoaValidatorWallet) pollForNonce(ctx context.Context) (uint64, error) { + var nonce uint64 + flag := true + for flag { + var err error + select { + // TODO: consider adding config for eoa validator wallet and pull this + // polling time from there. + case <-time.After(100 * time.Millisecond): + nonce, _, err = w.dataPoster.GetNextNonceAndMeta(ctx) + if err != nil { + return 0, fmt.Errorf("get next nonce and meta: %w", err) + } + if nonce >= w.txCount.Load() { + flag = false + break + } + log.Warn("Dataposter nonce too low", "nonce", nonce, "validator tx count", w.txCount.Load()) + case <-ctx.Done(): + return 0, ctx.Err() + } + } + return nonce, nil +} + func (w *EoaValidatorWallet) ExecuteTransactions(ctx context.Context, builder *ValidatorTxBuilder, _ common.Address) (*types.Transaction, error) { if len(builder.transactions) == 0 { return nil, nil } + nonce, err := w.pollForNonce(ctx) + if err != nil { + return nil, fmt.Errorf("polling for dataposter nonce to catch up: %w", err) + } + if nonce > w.txCount.Load() { + // If this happens, it probably means the dataposter is used by another client, besides validator. 
+ log.Warn("Precondition failure, dataposter nonce is higher than validator transaction count", "dataposter nonce", nonce, "validator tx count", w.txCount.Load()) + } tx := builder.transactions[0] // we ignore future txs and only execute the first - err := w.client.SendTransaction(ctx, tx) - return tx, err + trans, err := w.dataPoster.PostTransaction(ctx, time.Now(), nonce, nil, *tx.To(), tx.Data(), tx.Gas(), tx.Value()) + if err != nil { + return nil, fmt.Errorf("post transaction: %w", err) + } + w.txCount.Store(nonce) + return trans, nil } func (w *EoaValidatorWallet) TimeoutChallenges(ctx context.Context, timeouts []uint64) (*types.Transaction, error) { @@ -100,3 +150,13 @@ func (w *EoaValidatorWallet) CanBatchTxs() bool { return w.auth } + +func (w *EoaValidatorWallet) Start(ctx context.Context) { + w.dataPoster.Start(ctx) + w.StopWaiter.Start(ctx, w) +} + +func (b *EoaValidatorWallet) StopAndWait() { + b.StopWaiter.StopAndWait() + b.dataPoster.StopAndWait() +} diff --git a/staker/staker.go b/staker/staker.go index 230c381d05..1fe1b83fcf 100644 --- a/staker/staker.go +++ b/staker/staker.go @@ -335,7 +335,13 @@ func (s *Staker) getLatestStakedState(ctx context.Context, staker common.Address return latestStaked, count, &globalState, nil } +func (s *Staker) StopAndWait() { + s.StopWaiter.StopAndWait() + s.wallet.StopAndWait() +} + func (s *Staker) Start(ctxIn context.Context) { + s.wallet.Start(ctxIn) s.StopWaiter.Start(ctxIn, s) backoff := time.Second s.CallIteratively(func(ctx context.Context) (returningWait time.Duration) { diff --git a/staker/validator_wallet.go b/staker/validator_wallet.go index d878749f35..cdc3dd28c2 100644 --- a/staker/validator_wallet.go +++ b/staker/validator_wallet.go @@ -18,6 +18,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/offchainlabs/nitro/arbutil" "github.com/offchainlabs/nitro/solgen/go/rollupgen" + "github.com/offchainlabs/nitro/util/stopwaiter" ) var 
validatorABI abi.ABI @@ -52,9 +53,12 @@ type ValidatorWalletInterface interface { TimeoutChallenges(context.Context, []uint64) (*types.Transaction, error) CanBatchTxs() bool AuthIfEoa() *bind.TransactOpts + Start(context.Context) + StopAndWait() } type ContractValidatorWallet struct { + stopwaiter.StopWaiter con *rollupgen.ValidatorWallet address atomic.Pointer[common.Address] onWalletCreated func(common.Address) @@ -334,6 +338,14 @@ func (v *ContractValidatorWallet) AuthIfEoa() *bind.TransactOpts { return nil } +func (w *ContractValidatorWallet) Start(ctx context.Context) { + w.StopWaiter.Start(ctx, w) +} + +func (b *ContractValidatorWallet) StopAndWait() { + b.StopWaiter.StopAndWait() +} + func GetValidatorWalletContract( ctx context.Context, validatorWalletFactoryAddr common.Address, diff --git a/system_tests/staker_test.go b/system_tests/staker_test.go index 2aae1fc794..9a09d72458 100644 --- a/system_tests/staker_test.go +++ b/system_tests/staker_test.go @@ -18,12 +18,14 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" "github.com/offchainlabs/nitro/arbnode" + "github.com/offchainlabs/nitro/arbnode/dataposter/storage" "github.com/offchainlabs/nitro/arbos/l2pricing" "github.com/offchainlabs/nitro/arbutil" "github.com/offchainlabs/nitro/solgen/go/mocksgen" @@ -172,7 +174,11 @@ func stakerTestImpl(t *testing.T, faultyStaker bool, honestStakerInactive bool) } Require(t, err) - valWalletB, err := staker.NewEoaValidatorWallet(l2nodeB.DeployInfo.Rollup, l2nodeB.L1Reader.Client(), &l1authB) + dp, err := arbnode.ValidatorDataposter(rawdb.NewTable(l2nodeB.ArbDB, storage.BlockValidatorPrefix), l2nodeB.L1Reader, &l1authB, NewFetcherFromConfig(arbnode.ConfigDefaultL1NonSequencerTest()), nil) + if err != nil { + 
t.Fatalf("Error creating validator dataposter: %v", err) + } + valWalletB, err := staker.NewEoaValidatorWallet(dp, l2nodeB.DeployInfo.Rollup, l2nodeB.L1Reader.Client(), &l1authB) Require(t, err) valConfig.Strategy = "MakeNodes" statelessB, err := staker.NewStatelessBlockValidator(