From b1ad4035f301effe27cb6bf50dcb9d4dc5a03e03 Mon Sep 17 00:00:00 2001 From: shailu-s Date: Wed, 13 Nov 2024 03:01:25 +0530 Subject: [PATCH] Initialised pipeline for coordinator --- sequencer/batchbuilder/batchbuilder.go | 18 + sequencer/common/account.go | 6 + sequencer/common/batch.go | 5 + sequencer/common/pooll2tx.go | 25 ++ sequencer/coordinator/batch.go | 55 ++- sequencer/coordinator/pipeline.go | 594 +++++++++++++++++++++++++ sequencer/coordinator/proverspool.go | 33 +- sequencer/coordinator/purger.go | 75 ++++ 8 files changed, 792 insertions(+), 19 deletions(-) diff --git a/sequencer/batchbuilder/batchbuilder.go b/sequencer/batchbuilder/batchbuilder.go index 1e6babf..a1d7cb3 100644 --- a/sequencer/batchbuilder/batchbuilder.go +++ b/sequencer/batchbuilder/batchbuilder.go @@ -58,3 +58,21 @@ func (bb *BatchBuilder) Reset(batchNum common.BatchNum, fromSynchronizer bool) e //TODO: Check and Update this reseting functionality // return tracerr.Wrap(bb.localStateDB.Reset(batchNum, fromSynchronizer)) } + +// BuildBatch takes the transactions and returns the common.ZKInputs of the next batch +func (bb *BatchBuilder) BuildBatch(configBatch *ConfigBatch, l1usertxs []common.L1Tx, pooll2txs []common.PoolL2Tx) (*common.ZKInputs, error) { + bbStateDB := bb.localStateDB.StateDB + tp := txprocessor.NewTxProcessor(bbStateDB, configBatch.TxProcessorConfig) + + //TODO: Need to update this once PR which has updates regarding tx processor is merged + ptOut, err := tp.ProcessTxs(nil, l1usertxs, nil, pooll2txs) + if err != nil { + return nil, common.Wrap(err) + } + return ptOut.ZKInputs, nil +} + +// LocalStateDB returns the underlying LocalStateDB +func (bb *BatchBuilder) LocalStateDB() *statedb.LocalStateDB { + return bb.localStateDB +} diff --git a/sequencer/common/account.go b/sequencer/common/account.go index fae6852..23c42df 100644 --- a/sequencer/common/account.go +++ b/sequencer/common/account.go @@ -205,6 +205,12 @@ func AccountFromBytes(b [32 * NAccountLeafElems]byte) (*Account, error) { return &a, nil } +// IdxNonce is a pair of Idx and Nonce representing an account +type IdxAccountNonce struct { + Idx AccountIdx `db:"idx"` + Nonce Nonce `db:"nonce"` +} + // AccountUpdate represents an account balance and/or nonce update after a // processed batch type AccountUpdate struct { diff --git a/sequencer/common/batch.go b/sequencer/common/batch.go index 8a07555..483f6f8 100644 --- a/sequencer/common/batch.go +++ b/sequencer/common/batch.go @@ -42,6 +42,11 @@ func (bn BatchNum) Bytes() []byte { return batchNumBytes[:] } +// BigInt returns a *big.Int representing the BatchNum +func (bn BatchNum) BigInt() *big.Int { + return big.NewInt(int64(bn)) +} + // BatchNumFromBytes returns BatchNum from a []byte func BatchNumFromBytes(b []byte) (BatchNum, error) { if len(b) != batchNumBytesLen { diff --git a/sequencer/common/pooll2tx.go b/sequencer/common/pooll2tx.go index e1f08e2..326e62e 100644 --- a/sequencer/common/pooll2tx.go +++ b/sequencer/common/pooll2tx.go @@ -73,6 +73,13 @@ type PoolL2Tx struct { // PoolL2TxState is a string that represents the status of a L2 transaction type PoolL2TxState string +// TxSelectorError struct that gives more details about the error +type TxSelectorError struct { + Message string `meddler:"info,zeroisnull"` + Code int `meddler:"error_code,zeroisnull"` + Type string `meddler:"error_type,zeroisnull"` +} + const ( // PoolL2TxStatePending represents a valid L2Tx that hasn't started the // forging process @@ -347,3 +354,21 @@ func (tx *PoolL2Tx) HashToSign(chainID uint16) (*big.Int, error) { return poseidon.Hash([]*big.Int{toCompressedData, e1, toBJJY, rqTxCompressedDataV2, rqToEthAddr, rqToBJJY}) } + +// TxIDsFromPoolL2Txs returns an array of TxID from the []PoolL2Tx +func TxIDsFromPoolL2Txs(txs []PoolL2Tx) []TxID { + txIDs := make([]TxID, len(txs)) + for i, tx := range txs { + txIDs[i] = tx.TxID + } + return txIDs +} + +// PoolL2TxsToL2Txs returns an array of []L2Tx from an array of []PoolL2Tx +func PoolL2TxsToL2Txs(txs []PoolL2Tx) ([]L2Tx, error) { + l2Txs := make([]L2Tx, len(txs)) + for i, poolTx := range txs { + l2Txs[i] = poolTx.L2Tx() + } + return l2Txs, nil +} diff --git a/sequencer/coordinator/batch.go b/sequencer/coordinator/batch.go index 90dfa5c..f3760cd 100644 --- a/sequencer/coordinator/batch.go +++ b/sequencer/coordinator/batch.go @@ -1,7 +1,11 @@ package coordinator import ( + "encoding/json" + "fmt" + "io/ioutil" "math/big" + "path" "time" "tokamak-sybil-resistance/common" "tokamak-sybil-resistance/coordinator/prover" @@ -72,24 +76,21 @@ type Debug struct { // BatchInfo contans the Batch information type BatchInfo struct { - PipelineNum int - BatchNum common.BatchNum - ServerProof prover.Client - ProofStart time.Time - ZKInputs *common.ZKInputs - Proof *prover.Proof - PublicInputs []*big.Int - L1Batch bool - VerifierIdx uint8 - L1UserTxs []common.L1Tx - L1CoordTxs []common.L1Tx - L1CoordinatorTxsAuths [][]byte - L2Txs []common.L2Tx - CoordIdxs []common.AccountIdx - ForgeBatchArgs *eth.RollupForgeBatchArgs - Auth *bind.TransactOpts `json:"-"` - EthTxs []*types.Transaction - EthTxsErrs []error + PipelineNum int + BatchNum common.BatchNum + ServerProof prover.Client + ProofStart time.Time + ZKInputs *common.ZKInputs + Proof *prover.Proof + PublicInputs []*big.Int + L1Batch bool + L1UserTxs []common.L1Tx + L2Txs []common.L2Tx + VerifierIdx uint8 + ForgeBatchArgs *eth.RollupForgeBatchArgs + Auth *bind.TransactOpts `json:"-"` + EthTxs []*types.Transaction + EthTxsErrs []error // SendTimestamp the time of batch sent to ethereum SendTimestamp time.Time Receipt *types.Receipt @@ -99,3 +100,21 @@ type BatchInfo struct { Fail bool Debug Debug } + +// DebugStore is a debug function to store the BatchInfo as a json text file in +// storePath. The filename contains the batchNumber followed by a timestamp of +// batch start. +func (b *BatchInfo) DebugStore(storePath string) error { + batchJSON, err := json.MarshalIndent(b, "", " ") + if err != nil { + return common.Wrap(err) + } + // nolint reason: hardcoded 1_000_000 is the number of nanoseconds in a + // millisecond + //nolint:gomnd + filename := fmt.Sprintf("%08d-%v.%03d.json", b.BatchNum, + b.Debug.StartTimestamp.Unix(), b.Debug.StartTimestamp.Nanosecond()/1_000_000) + // nolint reason: 0640 allows rw to owner and r to group + //nolint:gosec + return ioutil.WriteFile(path.Join(storePath, filename), batchJSON, 0640) +} diff --git a/sequencer/coordinator/pipeline.go b/sequencer/coordinator/pipeline.go index b469969..9628f2f 100644 --- a/sequencer/coordinator/pipeline.go +++ b/sequencer/coordinator/pipeline.go @@ -2,6 +2,10 @@ package coordinator import ( "context" + "database/sql" + "fmt" + "math/big" + "strconv" "sync" "time" "tokamak-sybil-resistance/batchbuilder" @@ -9,6 +13,9 @@ import ( "tokamak-sybil-resistance/coordinator/prover" "tokamak-sybil-resistance/database/historydb" "tokamak-sybil-resistance/database/l2db" + "tokamak-sybil-resistance/eth" + "tokamak-sybil-resistance/log" + "tokamak-sybil-resistance/metric" "tokamak-sybil-resistance/synchronizer" "tokamak-sybil-resistance/txselector" ) @@ -57,3 +64,590 @@ type Pipeline struct { wg sync.WaitGroup cancel context.CancelFunc } + +// SetSyncStatsVars is a thread safe method to sets the synchronizer Stats +func (p *Pipeline) SetSyncStatsVars(ctx context.Context, stats *synchronizer.Stats, + vars *common.SCVariablesPtr) { + select { + case p.statsVarsCh <- statsVars{Stats: *stats, Vars: *vars}: + case <-ctx.Done(): + } +} + +// NewPipeline creates a new Pipeline +func NewPipeline(ctx context.Context, + cfg Config, + num int, // Pipeline sequential number + historyDB *historydb.HistoryDB, + l2DB *l2db.L2DB, + txSelector *txselector.TxSelector, + batchBuilder *batchbuilder.BatchBuilder, + mutexL2DBUpdateDelete *sync.Mutex, + purger *Purger, + coord *Coordinator, + txManager *TxManager, + provers []prover.Client, + scConsts *common.SCConsts, +) (*Pipeline, error) { + proversPool := NewProversPool(len(provers)) + proversPoolSize := 0 + for _, prover := range provers { + ctxTimeout, ctxTimeoutCancel := context.WithTimeout(ctx, cfg.ProverReadTimeout) + defer ctxTimeoutCancel() + if err := prover.WaitReady(ctxTimeout); err != nil { + log.Errorw("prover.WaitReady", "err", err) + } else { + proversPool.Add(ctx, prover) + proversPoolSize++ + } + } + if proversPoolSize == 0 { + return nil, common.Wrap(fmt.Errorf("no provers in the pool")) + } + return &Pipeline{ + num: num, + cfg: cfg, + historyDB: historyDB, + l2DB: l2DB, + txSelector: txSelector, + batchBuilder: batchBuilder, + provers: provers, + proversPool: proversPool, + mutexL2DBUpdateDelete: mutexL2DBUpdateDelete, + purger: purger, + coord: coord, + txManager: txManager, + consts: *scConsts, + statsVarsCh: make(chan statsVars, queueLen), + }, nil +} + +// reset pipeline state +func (p *Pipeline) reset(batchNum common.BatchNum, + stats *synchronizer.Stats, vars *common.SCVariables) error { + p.state = state{ + batchNum: batchNum, + lastForgeL1TxsNum: stats.Sync.LastForgeL1TxsNum, + lastScheduledL1BatchBlockNum: 0, + lastSlotForged: -1, + } + p.stats = *stats + p.vars = *vars + + // Reset the StateDB in TxSelector and BatchBuilder from the + // synchronizer only if the checkpoint we reset from either: + // a. Doesn't exist in the TxSelector/BatchBuilder + // b. The batch has already been synced by the synchronizer and has a + // different MTRoot than the BatchBuilder + // Otherwise, reset from the local checkpoint. + + // First attempt to reset from local checkpoint if such checkpoint exists + existsTxSelector, err := p.txSelector.LocalAccountsDB().CheckpointExists(p.state.batchNum) + if err != nil { + return common.Wrap(err) + } + fromSynchronizerTxSelector := !existsTxSelector + if err := p.txSelector.Reset(p.state.batchNum, fromSynchronizerTxSelector); err != nil { + return common.Wrap(err) + } + existsBatchBuilder, err := p.batchBuilder.LocalStateDB().CheckpointExists(p.state.batchNum) + if err != nil { + return common.Wrap(err) + } + fromSynchronizerBatchBuilder := !existsBatchBuilder + if err := p.batchBuilder.Reset(p.state.batchNum, fromSynchronizerBatchBuilder); err != nil { + return common.Wrap(err) + } + + // After reset, check that if the batch exists in the historyDB, the + // stateRoot matches with the local one, if not, force a reset from + // synchronizer + batch, err := p.historyDB.GetBatch(p.state.batchNum) + if common.Unwrap(err) == sql.ErrNoRows { + // nothing to do + } else if err != nil { + return common.Wrap(err) + } else { + localStateRoot := p.batchBuilder.LocalStateDB().AccountTree.Root().BigInt() + if batch.StateRoot.Cmp(localStateRoot) != 0 { + log.Debugw("localStateRoot (%v) != historyDB stateRoot (%v). "+ + "Forcing reset from Synchronizer", localStateRoot, batch.StateRoot) + // StateRoot from synchronizer doesn't match StateRoot + // from batchBuilder, force a reset from synchronizer + if err := p.txSelector.Reset(p.state.batchNum, true); err != nil { + return common.Wrap(err) + } + if err := p.batchBuilder.Reset(p.state.batchNum, true); err != nil { + return common.Wrap(err) + } + } + } + return nil +} + +func (p *Pipeline) setErrAtBatchNum(batchNum common.BatchNum) { + p.rw.Lock() + defer p.rw.Unlock() + p.errAtBatchNum = batchNum +} + +func (p *Pipeline) getErrAtBatchNum() common.BatchNum { + p.rw.RLock() + defer p.rw.RUnlock() + return p.errAtBatchNum +} + +func (p *Pipeline) syncSCVars(vars common.SCVariablesPtr) { + updateSCVars(&p.vars, vars) +} + +// sendServerProof sends the circuit inputs to the proof server +func (p *Pipeline) sendServerProof(ctx context.Context, batchInfo *BatchInfo) error { + p.cfg.debugBatchStore(batchInfo) + + // Call the selected idle server proof with BatchBuilder output, + // save server proof info for batchNum + if err := batchInfo.ServerProof.CalculateProof(ctx, batchInfo.ZKInputs); err != nil { + return common.Wrap(err) + } + return nil +} + +// forgePolicySkipPreSelection is called before doing a tx selection in a batch to +// determine by policy if we should forge the batch or not. Returns true and +// the reason when the forging of the batch must be skipped. +func (p *Pipeline) forgePolicySkipPreSelection(now time.Time) (bool, string) { + //TODO: Need to check this + // if p.cfg.ForgeOncePerSlotIfTxs { + // if slotCommitted { + // return true, "cfg.ForgeOncePerSlotIfTxs = true and slot already committed" + // } + // return false, "" + // } + // // Determine if we must commit the slot + // if !p.cfg.IgnoreSlotCommitment && !slotCommitted { + // return false, "" + // } + + // If we haven't reached the ForgeDelay, skip forging the batch + if now.Sub(p.lastForgeTime) < p.cfg.ForgeDelay { + return true, "we haven't reached the forge delay" + } + return false, "" +} + +func (p *Pipeline) shouldL1L2Batch(batchInfo *BatchInfo) bool { + // Take the lastL1BatchBlockNum as the biggest between the last + // scheduled one, and the synchronized one. + lastL1BatchBlockNum := p.state.lastScheduledL1BatchBlockNum + if p.stats.Sync.LastL1BatchBlock > lastL1BatchBlockNum { + lastL1BatchBlockNum = p.stats.Sync.LastL1BatchBlock + } + // Set Debug information + batchInfo.Debug.LastScheduledL1BatchBlockNum = p.state.lastScheduledL1BatchBlockNum + batchInfo.Debug.LastL1BatchBlock = p.stats.Sync.LastL1BatchBlock + batchInfo.Debug.LastL1BatchBlockDelta = p.stats.Eth.LastBlock.Num + 1 - lastL1BatchBlockNum + batchInfo.Debug.L1BatchBlockScheduleDeadline = + int64(float64(p.vars.Rollup.ForgeL1L2BatchTimeout-1) * p.cfg.L1BatchTimeoutPerc) + // Return true if we have passed the l1BatchTimeoutPerc portion of the + // range before the l1batch timeout. + return p.stats.Eth.LastBlock.Num+1-lastL1BatchBlockNum >= + int64(float64(p.vars.Rollup.ForgeL1L2BatchTimeout-1)*p.cfg.L1BatchTimeoutPerc) +} + +// forgePolicySkipPostSelection is called after doing a tx selection in a batch to +// determine by policy if we should forge the batch or not. Returns true and +// the reason when the forging of the batch must be skipped. +func (p *Pipeline) forgePolicySkipPostSelection(now time.Time, l1UserTxsExtra []common.L1Tx, + poolL2Txs []common.PoolL2Tx, batchInfo *BatchInfo) (bool, string, error) { + // Check if the slot is not yet fulfilled + // slotCommitted := p.slotCommitted() + + pendingTxs := true + if len(l1UserTxsExtra) == 0 && len(poolL2Txs) == 0 { + if batchInfo.L1Batch { + // Query the number of unforged L1UserTxs + // (either in a open queue or in a frozen + // not-yet-forged queue). + count, err := p.historyDB.GetUnforgedL1UserTxsCount() + if err != nil { + return false, "", err + } + // If there are future L1UserTxs, we forge a + // batch to advance the queues to be able to + // forge the L1UserTxs in the future. + // Otherwise, skip. + if count == 0 { + pendingTxs = false + } + } else { + pendingTxs = false + } + } + + if p.cfg.ForgeOncePerSlotIfTxs { + // if slotCommitted { + // return true, "cfg.ForgeOncePerSlotIfTxs = true and slot already committed", + // nil + // } + if pendingTxs { + return false, "", nil + } + return true, "cfg.ForgeOncePerSlotIfTxs = true and no pending txs", + nil + } + + // Determine if we must commit the slot + if !p.cfg.IgnoreSlotCommitment { + return false, "", nil + } + + // check if there is no txs to forge, no l1UserTxs in the open queue to + // freeze and we haven't reached the ForgeNoTxsDelay + if now.Sub(p.lastForgeTime) < p.cfg.ForgeNoTxsDelay { + if !pendingTxs { + return true, "no txs to forge and we haven't reached the forge no txs delay", + nil + } + } + return false, "", nil +} + +// forgeBatch forges the batchNum batch. +func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, + skipReason *string, err error) { + // remove transactions from the pool that have been there for too long + _, err = p.purger.InvalidateMaybe(p.l2DB, p.txSelector.LocalAccountsDB(), + p.stats.Sync.LastBlock.Num, int64(batchNum)) + if err != nil { + return nil, nil, common.Wrap(err) + } + _, err = p.purger.PurgeMaybe(p.l2DB, p.stats.Sync.LastBlock.Num, int64(batchNum)) + if err != nil { + return nil, nil, common.Wrap(err) + } + // Structure to accumulate data and metadata of the batch + now := time.Now() + batchInfo = &BatchInfo{PipelineNum: p.num, BatchNum: batchNum} + batchInfo.Debug.StartTimestamp = now + batchInfo.Debug.StartBlockNum = p.stats.Eth.LastBlock.Num + 1 + + var poolL2Txs []common.PoolL2Tx + var discardedL2Txs []common.PoolL2Tx + var l1UserTxs []common.L1Tx + + if skip, reason := p.forgePolicySkipPreSelection(now); skip { + return nil, &reason, nil + } + + // 1. Decide if we forge L2Tx or L1+L2Tx + if p.shouldL1L2Batch(batchInfo) { + batchInfo.L1Batch = true + // 2a: L1+L2 txs + _l1UserTxs, err := p.historyDB.GetUnforgedL1UserTxs(p.state.lastForgeL1TxsNum + 1) + if err != nil { + return nil, nil, common.Wrap(err) + } + // l1UserFutureTxs are the l1UserTxs that are not being forged + // in the next batch, but that are also in the queue for the + // future batches + l1UserFutureTxs, err := p.historyDB.GetUnforgedL1UserFutureTxs(p.state.lastForgeL1TxsNum + 1) + if err != nil { + return nil, nil, common.Wrap(err) + } + + l1UserTxs, poolL2Txs, discardedL2Txs, err = + p.txSelector.GetL1L2TxSelection(p.cfg.TxProcessorConfig, _l1UserTxs, l1UserFutureTxs) + if err != nil { + return nil, nil, common.Wrap(err) + } + } else { + // get l1UserFutureTxs which are all the l1 pending in all the + // queues + l1UserFutureTxs, err := p.historyDB.GetUnforgedL1UserFutureTxs(p.state.lastForgeL1TxsNum) //nolint:gomnd + if err != nil { + return nil, nil, common.Wrap(err) + } + + // 2b: only L2 txs + poolL2Txs, discardedL2Txs, err = + p.txSelector.GetL2TxSelection(p.cfg.TxProcessorConfig, l1UserFutureTxs) + if err != nil { + return nil, nil, common.Wrap(err) + } + l1UserTxs = nil + } + + if skip, reason, err := p.forgePolicySkipPostSelection(now, + l1UserTxs, poolL2Txs, batchInfo); err != nil { + return nil, nil, common.Wrap(err) + } else if skip { + if err := p.txSelector.Reset(batchInfo.BatchNum-1, false); err != nil { + return nil, nil, common.Wrap(err) + } + return nil, &reason, common.Wrap(err) + } + + if batchInfo.L1Batch { + p.state.lastScheduledL1BatchBlockNum = p.stats.Eth.LastBlock.Num + 1 + p.state.lastForgeL1TxsNum++ + } + + // 3. Save metadata from TxSelector output for BatchNum + batchInfo.L1UserTxs = l1UserTxs + + if err := p.l2DB.StartForging(common.TxIDsFromPoolL2Txs(poolL2Txs), + batchInfo.BatchNum); err != nil { + return nil, nil, common.Wrap(err) + } + if err := p.l2DB.UpdateTxsInfo(discardedL2Txs, batchInfo.BatchNum); err != nil { + return nil, nil, common.Wrap(err) + } + + // Invalidate transactions that become invalid because of + // the poolL2Txs selected. Will mark as invalid the txs that have a + // (fromIdx, nonce) which already appears in the selected txs (includes + // all the nonces smaller than the current one) + err = p.l2DB.InvalidateOldNonces(idxsNonceFromPoolL2Txs(poolL2Txs), batchInfo.BatchNum) + if err != nil { + return nil, nil, common.Wrap(err) + } + + // 4. Call BatchBuilder with TxSelector output + configBatch := &batchbuilder.ConfigBatch{ + TxProcessorConfig: p.cfg.TxProcessorConfig, + } + zkInputs, err := p.batchBuilder.BuildBatch(configBatch, l1UserTxs, + poolL2Txs) + if err != nil { + return nil, nil, common.Wrap(err) + } + l2Txs, err := common.PoolL2TxsToL2Txs(poolL2Txs) // NOTE: This is a big uggly, find a better way + if err != nil { + return nil, nil, common.Wrap(err) + } + batchInfo.L2Txs = l2Txs + + // 5. Save metadata from BatchBuilder output for BatchNum + batchInfo.ZKInputs = zkInputs + batchInfo.Debug.Status = StatusForged + p.cfg.debugBatchStore(batchInfo) + log.Infow("Pipeline: batch forged internally", "batch", batchInfo.BatchNum) + + return batchInfo, nil, nil +} + +// handleForgeBatch waits for an available proof server, calls p.forgeBatch to +// forge the batch and get the zkInputs, and then sends the zkInputs to the +// selected proof server so that the proof computation begins. +func (p *Pipeline) handleForgeBatch(ctx context.Context, + batchNum common.BatchNum) (batchInfo *BatchInfo, err error) { + // 1. Wait for an available serverProof (blocking call) + serverProof, err := p.proversPool.Get(ctx) + if ctx.Err() != nil { + return nil, ctx.Err() + } else if err != nil { + log.Errorw("proversPool.Get", "err", err) + return nil, common.Wrap(err) + } + defer func() { + // If we encounter any error (notice that this function returns + // errors to notify that a batch is not forged not only because + // of unexpected errors but also due to benign causes), add the + // serverProof back to the pool + if err != nil { + p.proversPool.Add(ctx, serverProof) + } + }() + + // 2. Forge the batch internally (make a selection of txs and prepare + // all the smart contract arguments) + var skipReason *string + p.mutexL2DBUpdateDelete.Lock() + batchInfo, skipReason, err = p.forgeBatch(batchNum) + p.mutexL2DBUpdateDelete.Unlock() + if ctx.Err() != nil { + return nil, ctx.Err() + } else if err != nil { + log.Errorw("forgeBatch", "err", err) + return nil, common.Wrap(err) + } else if skipReason != nil { + log.Debugw("skipping batch", "batch", batchNum, "reason", *skipReason) + return nil, common.Wrap(errSkipBatchByPolicy) + } + + // 3. Send the ZKInputs to the proof server + batchInfo.ServerProof = serverProof + batchInfo.ProofStart = time.Now() + if err := p.sendServerProof(ctx, batchInfo); ctx.Err() != nil { + return nil, ctx.Err() + } else if err != nil { + log.Errorw("sendServerProof", "err", err) + return nil, common.Wrap(err) + } + return batchInfo, nil +} + +// Start the forging pipeline +func (p *Pipeline) Start(batchNum common.BatchNum, + stats *synchronizer.Stats, vars *common.SCVariables) error { + if p.started { + log.Fatal("Pipeline already started") + } + p.started = true + + if err := p.reset(batchNum, stats, vars); err != nil { + return common.Wrap(err) + } + p.ctx, p.cancel = context.WithCancel(context.Background()) + + queueSize := 1 + batchChSentServerProof := make(chan *BatchInfo, queueSize) + + p.wg.Add(1) + go func() { + timer := time.NewTimer(zeroDuration) + for { + select { + case <-p.ctx.Done(): + log.Info("Pipeline forgeBatch loop done") + p.wg.Done() + return + case statsVars := <-p.statsVarsCh: + p.stats = statsVars.Stats + p.syncSCVars(statsVars.Vars) + case <-timer.C: + timer.Reset(p.cfg.ForgeRetryInterval) + // Once errAtBatchNum != 0, we stop forging + // batches because there's been an error and we + // wait for the pipeline to be stopped. + if p.getErrAtBatchNum() != 0 { + continue + } + batchNum = p.state.batchNum + 1 + batchInfo, err := p.handleForgeBatch(p.ctx, batchNum) + if p.ctx.Err() != nil { + p.revertPoolChanges(batchNum) + continue + } else if common.Unwrap(err) == errSkipBatchByPolicy { + p.revertPoolChanges(batchNum) + continue + } else if err != nil { + p.setErrAtBatchNum(batchNum) + p.coord.SendMsg(p.ctx, MsgStopPipeline{ + Reason: fmt.Sprintf( + "Pipeline.handleForgBatch: %v", err), + FailedBatchNum: batchNum, + }) + p.revertPoolChanges(batchNum) + continue + } + p.lastForgeTime = time.Now() + + p.state.batchNum = batchNum + select { + case batchChSentServerProof <- batchInfo: + case <-p.ctx.Done(): + } + if !timer.Stop() { + <-timer.C + } + timer.Reset(zeroDuration) + } + } + }() + + p.wg.Add(1) + go func() { + for { + select { + case <-p.ctx.Done(): + log.Info("Pipeline waitServerProofSendEth loop done") + p.wg.Done() + return + case batchInfo := <-batchChSentServerProof: + go func(p *Pipeline, batchInfo *BatchInfo, batchNum common.BatchNum) { + // Once errAtBatchNum != 0, we stop forging + // batches because there's been an error and we + // wait for the pipeline to be stopped. + if p.getErrAtBatchNum() != 0 { + p.revertPoolChanges(batchNum) + return + } + err := p.waitServerProof(p.ctx, batchInfo) + if p.ctx.Err() != nil { + p.revertPoolChanges(batchNum) + return + } else if err != nil { + log.Errorw("waitServerProof", "err", err) + p.setErrAtBatchNum(batchInfo.BatchNum) + p.coord.SendMsg(p.ctx, MsgStopPipeline{ + Reason: fmt.Sprintf( + "Pipeline.waitServerProof: %v", err), + FailedBatchNum: batchInfo.BatchNum, + }) + p.revertPoolChanges(batchNum) + return + } + // We are done with this serverProof, add it back to the pool + p.proversPool.Add(p.ctx, batchInfo.ServerProof) + p.txManager.AddBatch(p.ctx, batchInfo) + }(p, batchInfo, batchNum) + } + } + }() + return nil +} + +// waitServerProof gets the generated zkProof & sends it to the SmartContract +func (p *Pipeline) waitServerProof(ctx context.Context, batchInfo *BatchInfo) error { + defer metric.MeasureDuration(metric.WaitServerProof, batchInfo.ProofStart, + batchInfo.BatchNum.BigInt().String(), strconv.Itoa(batchInfo.PipelineNum)) + + proof, pubInputs, err := batchInfo.ServerProof.GetProof(ctx) // blocking call, + // until not resolved don't continue. Returns when the proof server has calculated the proof + if err != nil { + return common.Wrap(err) + } + batchInfo.Proof = proof + batchInfo.PublicInputs = pubInputs + batchInfo.ForgeBatchArgs = prepareForgeBatchArgs(batchInfo) + batchInfo.Debug.Status = StatusProof + p.cfg.debugBatchStore(batchInfo) + log.Infow("Pipeline: batch proof calculated", "batch", batchInfo.BatchNum) + return nil +} + +// revertPoolChanges will undo changes made to the pool while trying to forge failedBatch. +// Call this function only if the porcess of forging a batch fails +func (p *Pipeline) revertPoolChanges(failedBatch common.BatchNum) { + if err := p.l2DB.Reorg(failedBatch - 1); err != nil { + // NOTE: the reason why this error si not returned is that this function is used in a error handling situation + // and at this point the flow shouldn't change (handling the error of handling an error), things could get really meesy + log.Error("Error trying to revert changes on the pool after the porcess of forging a batch failed: ", err) + } +} + +func prepareForgeBatchArgs(batchInfo *BatchInfo) *eth.RollupForgeBatchArgs { + proof := batchInfo.Proof + zki := batchInfo.ZKInputs + return ð.RollupForgeBatchArgs{ + NewLastIdx: int64(zki.Metadata.NewLastIdxRaw), + NewStRoot: zki.Metadata.NewStateRootRaw.BigInt(), + NewExitRoot: zki.Metadata.NewExitRootRaw.BigInt(), + L1UserTxs: batchInfo.L1UserTxs, + + L2TxsData: batchInfo.L2Txs, + // Circuit selector + // We'll have a single verifier + VerifierIdx: batchInfo.VerifierIdx, + L1Batch: batchInfo.L1Batch, + ProofA: [2]*big.Int{proof.PiA[0], proof.PiA[1]}, + // Implementation of the verifier need a swap on the proofB vector + ProofB: [2][2]*big.Int{ + {proof.PiB[0][1], proof.PiB[0][0]}, + {proof.PiB[1][1], proof.PiB[1][0]}, + }, + ProofC: [2]*big.Int{proof.PiC[0], proof.PiC[1]}, + } +} diff --git a/sequencer/coordinator/proverspool.go b/sequencer/coordinator/proverspool.go index 8a834df..eef8c7a 100644 --- a/sequencer/coordinator/proverspool.go +++ b/sequencer/coordinator/proverspool.go @@ -1,8 +1,39 @@ package coordinator -import "tokamak-sybil-resistance/coordinator/prover" +import ( + "context" + "tokamak-sybil-resistance/common" + "tokamak-sybil-resistance/coordinator/prover" + "tokamak-sybil-resistance/log" +) // ProversPool contains the multiple prover clients type ProversPool struct { pool chan prover.Client } + +// NewProversPool creates a new pool of provers. +func NewProversPool(maxServerProofs int) *ProversPool { + return &ProversPool{ + pool: make(chan prover.Client, maxServerProofs), + } +} + +// Add a prover to the pool +func (p *ProversPool) Add(ctx context.Context, serverProof prover.Client) { + select { + case p.pool <- serverProof: + case <-ctx.Done(): + } +} + +// Get returns the next available prover +func (p *ProversPool) Get(ctx context.Context) (prover.Client, error) { + select { + case <-ctx.Done(): + log.Info("ServerProofPool.Get done") + return nil, common.Wrap(common.ErrDone) + case serverProof := <-p.pool: + return serverProof, nil + } +} diff --git a/sequencer/coordinator/purger.go b/sequencer/coordinator/purger.go index b669f7f..afd379d 100644 --- a/sequencer/coordinator/purger.go +++ b/sequencer/coordinator/purger.go @@ -1,5 +1,12 @@ package coordinator +import ( + "tokamak-sybil-resistance/common" + "tokamak-sybil-resistance/database/l2db" + "tokamak-sybil-resistance/database/statedb" + "tokamak-sybil-resistance/log" +) + // PurgerCfg is the purger configuration type PurgerCfg struct { // PurgeBatchDelay is the delay between batches to purge outdated @@ -30,3 +37,71 @@ type Purger struct { lastInvalidateBlock int64 lastInvalidateBatch int64 } + +// CanInvalidate returns true if it's a good time to invalidate according to +// the configuration +func (p *Purger) CanInvalidate(blockNum, batchNum int64) bool { + if blockNum >= p.lastInvalidateBlock+p.cfg.InvalidateBlockDelay { + return true + } + if batchNum >= p.lastInvalidateBatch+p.cfg.InvalidateBatchDelay { + return true + } + return false +} + +// CanPurge returns true if it's a good time to purge according to the +// configuration +func (p *Purger) CanPurge(blockNum, batchNum int64) bool { + if blockNum >= p.lastPurgeBlock+p.cfg.PurgeBlockDelay { + return true + } + if batchNum >= p.lastPurgeBatch+p.cfg.PurgeBatchDelay { + return true + } + return false +} + +// PurgeMaybe purges txs if it's a good time to do so +func (p *Purger) PurgeMaybe(l2DB *l2db.L2DB, blockNum, batchNum int64) (bool, error) { + if !p.CanPurge(blockNum, batchNum) { + return false, nil + } + p.lastPurgeBlock = blockNum + p.lastPurgeBatch = batchNum + log.Debugw("Purger: purging l2txs in pool", "block", blockNum, "batch", batchNum) + err := l2DB.Purge(common.BatchNum(batchNum)) + return true, common.Wrap(err) +} + +// InvalidateMaybe invalidates txs if it's a good time to do so +func (p *Purger) InvalidateMaybe(l2DB *l2db.L2DB, stateDB *statedb.LocalStateDB, + blockNum, batchNum int64) (bool, error) { + if !p.CanInvalidate(blockNum, batchNum) { + return false, nil + } + p.lastInvalidateBlock = blockNum + p.lastInvalidateBatch = batchNum + log.Debugw("Purger: invalidating l2txs in pool", "block", blockNum, "batch", batchNum) + + //TODO: Update the functionality to handle old nouces for L2 txs check this needs to be for vouches or accounts + // err := poolMarkInvalidOldNonces(l2DB, stateDB, common.BatchNum(batchNum)) + + return true, nil +} + +func idxsNonceFromPoolL2Txs(txs []common.PoolL2Tx) []common.IdxAccountNonce { + idxNonceMap := map[common.AccountIdx]common.Nonce{} + for _, tx := range txs { + if nonce, ok := idxNonceMap[tx.FromIdx]; !ok { + idxNonceMap[tx.FromIdx] = tx.Nonce + } else if tx.Nonce > nonce { + idxNonceMap[tx.FromIdx] = tx.Nonce + } + } + idxsNonce := make([]common.IdxAccountNonce, 0, len(idxNonceMap)) + for idx, nonce := range idxNonceMap { + idxsNonce = append(idxsNonce, common.IdxAccountNonce{Idx: idx, Nonce: nonce}) + } + return idxsNonce +}