Skip to content

Commit

Permalink
Validator nodes return error on tx submission (#1792)
Browse files Browse the repository at this point in the history
* validate tx

* more checks

* cleanup

* cleanup

* cleanup

* address pr comment

* increased timeouts

* geth network stopping more robust

* Step to close ports to avoid lingering process bound to ports

* Close all ports in step

* Revert "Close all ports in step"

This reverts commit 6c870d5.

* add port 52300

* Expanded port range

* additional ports and expand range.

* de-flake

* retry the kill

* add check to protect enclave

---------

Co-authored-by: Anthony Nixon <[email protected]>
  • Loading branch information
tudor-malene and anixon604 authored Feb 15, 2024
1 parent aee40a8 commit 83469ca
Show file tree
Hide file tree
Showing 11 changed files with 101 additions and 33 deletions.
21 changes: 21 additions & 0 deletions .github/workflows/build-pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,27 @@ jobs:
- name: Download eth2network binaries
run: go test ./... -v -count=1 -run TestEnsureBinariesAreAvail

# Close specified ports using lsof before testing / local port list compiled from ./integration/constants.go
- name: Close Integration Test Ports
run: |
lowest_port=8000 # Lowest starting port
highest_port=58000 # Highest port considering the offset
additional_ports=(80 81 99) # Additional specific ports
# Find processes listening on ports within the range and kill them
for pid in $(lsof -iTCP:$lowest_port-$highest_port -sTCP:LISTEN -t); do
echo "Killing process $pid on one of the ports from $lowest_port to $highest_port"
kill $pid || true
done
# Close additional specific ports
for port in "${additional_ports[@]}"; do
for pid in $(lsof -ti TCP:$port); do
echo "Killing process $pid on port $port"
kill $pid || true
done
done
- name: Test
run: go test --failfast -v ./... -count=1 -timeout 5m

Expand Down
12 changes: 10 additions & 2 deletions go/enclave/enclave.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func NewEnclave(
blockchain,
)
} else {
service = nodetype.NewValidator(blockProcessor, batchExecutor, registry, rConsumer, chainConfig, config.SequencerID, storage, sigVerifier, logger)
service = nodetype.NewValidator(blockProcessor, batchExecutor, registry, rConsumer, chainConfig, config.SequencerID, storage, sigVerifier, mempool, logger)
}

chain := l2chain.NewChain(
Expand Down Expand Up @@ -473,9 +473,17 @@ func (e *enclaveImpl) SubmitBatch(extBatch *common.ExtBatch) common.SystemError
return responses.ToInternalError(fmt.Errorf("requested SubmitBatch with the enclave stopping"))
}

core.LogMethodDuration(e.logger, measure.NewStopwatch(), "SubmitBatch call completed.", log.BatchHashKey, extBatch.Hash())
defer core.LogMethodDuration(e.logger, measure.NewStopwatch(), "SubmitBatch call completed.", log.BatchHashKey, extBatch.Hash())

e.logger.Info("Received new p2p batch", log.BatchHeightKey, extBatch.Header.Number, log.BatchHashKey, extBatch.Hash(), "l1", extBatch.Header.L1Proof)
seqNo := extBatch.Header.SequencerOrderNo.Uint64()
if seqNo > common.L2GenesisSeqNo+1 {
_, err := e.storage.FetchBatchBySeqNo(seqNo - 1)
if err != nil {
return responses.ToInternalError(fmt.Errorf("could not find previous batch with seq: %d", seqNo-1))
}
}

batch, err := core.ToBatch(extBatch, e.dataEncryptionService, e.dataCompressionService)
if err != nil {
return responses.ToInternalError(fmt.Errorf("could not convert batch. Cause: %w", err))
Expand Down
45 changes: 28 additions & 17 deletions go/enclave/nodetype/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"math/big"

"github.com/ten-protocol/go-ten/go/enclave/txpool"

"github.com/ethereum/go-ethereum/core/types"

"github.com/ten-protocol/go-ten/go/common/errutil"
Expand All @@ -30,22 +32,14 @@ type obsValidator struct {
sequencerID gethcommon.Address
storage storage.Storage
sigValidator *components.SignatureValidator
logger gethlog.Logger
}
mempool *txpool.TxPool

func NewValidator(
consumer components.L1BlockProcessor,
batchExecutor components.BatchExecutor,
registry components.BatchRegistry,
rollupConsumer components.RollupConsumer,
logger gethlog.Logger
}

chainConfig *params.ChainConfig,
func NewValidator(consumer components.L1BlockProcessor, batchExecutor components.BatchExecutor, registry components.BatchRegistry, rollupConsumer components.RollupConsumer, chainConfig *params.ChainConfig, sequencerID gethcommon.Address, storage storage.Storage, sigValidator *components.SignatureValidator, mempool *txpool.TxPool, logger gethlog.Logger) ObsValidator {
startMempool(registry, mempool)

sequencerID gethcommon.Address,
storage storage.Storage,
sigValidator *components.SignatureValidator,
logger gethlog.Logger,
) ObsValidator {
return &obsValidator{
blockProcessor: consumer,
batchExecutor: batchExecutor,
Expand All @@ -55,13 +49,17 @@ func NewValidator(
sequencerID: sequencerID,
storage: storage,
sigValidator: sigValidator,
mempool: mempool,
logger: logger,
}
}

func (val *obsValidator) SubmitTransaction(transaction *common.L2Tx) error {
val.logger.Trace(fmt.Sprintf("Transaction %s submitted to validator but there is nothing to do with it.", transaction.Hash().Hex()))
return nil
func (val *obsValidator) SubmitTransaction(tx *common.L2Tx) error {
headBatch := val.batchRegistry.HeadBatchSeq()
if headBatch == nil || headBatch.Uint64() <= common.L2GenesisSeqNo+1 {
return fmt.Errorf("not initialised")
}
return val.mempool.Validate(tx)
}

func (val *obsValidator) OnL1Fork(_ *common.ChainFork) error {
Expand All @@ -86,6 +84,8 @@ func (val *obsValidator) ExecuteStoredBatches() error {
return err
}

startMempool(val.batchRegistry, val.mempool)

for _, batch := range batches {
if batch.IsGenesis() {
if err = val.handleGenesis(batch); err != nil {
Expand Down Expand Up @@ -155,5 +155,16 @@ func (val *obsValidator) OnL1Block(_ types.Block, _ *components.BlockIngestionTy
}

func (val *obsValidator) Close() error {
return nil
return val.mempool.Close()
}

func startMempool(registry components.BatchRegistry, mempool *txpool.TxPool) {
// the mempool can only be started when there are a couple of blocks already processed
headBatchSeq := registry.HeadBatchSeq()
if !mempool.Running() && headBatchSeq != nil && headBatchSeq.Uint64() > common.L2GenesisSeqNo+1 {
err := mempool.Start()
if err != nil {
panic(fmt.Errorf("could not start mempool: %w", err))
}
}
}
12 changes: 12 additions & 0 deletions go/enclave/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ import (
"math/big"
"strings"

// unsafe package imported in order to link to a private function in go-ethereum.
// This allows us to validate transactions against the tx pool rules.
_ "unsafe"

gethlog "github.com/ethereum/go-ethereum/log"
"github.com/ten-protocol/go-ten/go/common/log"

Expand Down Expand Up @@ -78,6 +82,14 @@ func (t *TxPool) Add(transaction *common.L2Tx) error {
return nil
}

//go:linkname validateTxBasics github.com/ethereum/go-ethereum/core/txpool/legacypool.(*LegacyPool).validateTxBasics
func validateTxBasics(_ *legacypool.LegacyPool, _ *types.Transaction, _ bool) error

// Validate - run the underlying tx pool validation logic
func (t *TxPool) Validate(tx *common.L2Tx) error {
return validateTxBasics(t.legacyPool, tx, false)
}

func (t *TxPool) Running() bool {
return t.running
}
Expand Down
3 changes: 2 additions & 1 deletion integration/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ const (
StartPortSimulationInMem = 22000
StartPortSimulationFullNetwork = 26000
StartPortSmartContractTests = 30000
StartPortContractDeployerTest = 34000
StartPortContractDeployerTest1 = 34000
StartPortContractDeployerTest2 = 35000
StartPortWalletExtensionUnitTest = 38000
StartPortFaucetUnitTest = 42000
StartPortFaucetHTTPUnitTest = 48000
Expand Down
4 changes: 2 additions & 2 deletions integration/contractdeployer/contract_deployer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func init() { //nolint:gochecknoinits
}

func TestCanDeployLayer2ERC20Contract(t *testing.T) {
startPort := integration.StartPortContractDeployerTest
startPort := integration.StartPortContractDeployerTest1
hostWSPort := startPort + integration.DefaultHostRPCWSOffset
createObscuroNetwork(t, startPort)
// This sleep is required to ensure the initial rollup exists, and thus contract deployer can check its balance.
Expand Down Expand Up @@ -81,7 +81,7 @@ func TestCanDeployLayer2ERC20Contract(t *testing.T) {
}

func TestFaucetSendsFundsOnlyIfNeeded(t *testing.T) {
startPort := integration.StartPortContractDeployerTest
startPort := integration.StartPortContractDeployerTest2
hostWSPort := startPort + integration.DefaultHostRPCWSOffset
createObscuroNetwork(t, startPort)

Expand Down
29 changes: 22 additions & 7 deletions integration/eth2network/eth2_network.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package eth2network
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
Expand Down Expand Up @@ -245,7 +246,7 @@ func (n *Impl) Start() error {
for i := range n.dataDirs {
nodeID := i
eg.Go(func() error {
return n.waitForNodeUp(nodeID, 30*time.Second)
return n.waitForNodeUp(nodeID, time.Minute)
})
}
err = eg.Wait()
Expand Down Expand Up @@ -321,22 +322,36 @@ func (n *Impl) Start() error {
// Stop stops the network
func (n *Impl) Stop() error {
for i := 0; i < len(n.dataDirs); i++ {
err := n.gethProcesses[i].Process.Kill()
err := kill(n.gethProcesses[i].Process, 0)
if err != nil {
fmt.Printf("unable to kill geth node - %s\n", err.Error())
return err
}
err = n.prysmBeaconProcesses[i].Process.Kill()
err = kill(n.prysmBeaconProcesses[i].Process, 0)
if err != nil {
fmt.Printf("unable to kill prysm beacon node - %s\n", err.Error())
return err
}
err = n.prysmValidatorProcesses[i].Process.Kill()
err = kill(n.prysmValidatorProcesses[i].Process, 0)
if err != nil {
fmt.Printf("unable to kill prysm validator node - %s\n", err.Error())
return err
}
}
// wait a second for the kill signal
time.Sleep(time.Second)
return nil
}

const maxTryKill = 5

func kill(p *os.Process, cnt int) error {
if killErr := p.Kill(); killErr == nil {
return nil
} else if !errors.Is(killErr, os.ErrProcessDone) {
if cnt >= maxTryKill {
return killErr
}
time.Sleep(time.Second)
return kill(p, cnt+1)
}
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion integration/eth2network/eth2_network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestStartEth2Network(t *testing.T) {
2,
2,
randomWalletAddrs,
time.Minute,
2*time.Minute,
)
// wait until the merge has happened
assert.Nil(t, network.Start())
Expand Down
2 changes: 1 addition & 1 deletion integration/noderunner/noderunner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestCanStartStandaloneObscuroHostAndEnclave(t *testing.T) {
2,
2,
[]string{hostAddr.String()},
time.Minute,
2*time.Minute,
)
defer network.Stop() //nolint: errcheck
err = network.Start()
Expand Down
2 changes: 1 addition & 1 deletion integration/simulation/network/geth_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func StartGethNetwork(wallets *params.SimWallets, startPort int, blockDurationSe
2,
2,
walletAddresses,
time.Minute,
2*time.Minute,
)

err = eth2Network.Start()
Expand Down
2 changes: 1 addition & 1 deletion integration/smartcontract/smartcontracts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func runGethNetwork(t *testing.T) *netInfo {
2,
2,
[]string{workerWallet.Address().String()},
time.Minute,
2*time.Minute,
)

if err = eth2Network.Start(); err != nil {
Expand Down

0 comments on commit 83469ca

Please sign in to comment.