Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HA: Support multiple enclaves on a single host #1852

Merged
merged 4 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions go/common/host/host_healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,22 @@ func (l *BasicErrHealthStatus) OK() bool {
func (l *BasicErrHealthStatus) Message() string {
return l.ErrMsg
}

type GroupErrsHealthStatus struct {
Errors []error
}

func (g *GroupErrsHealthStatus) OK() bool {
return len(g.Errors) == 0
}

func (g *GroupErrsHealthStatus) Message() string {
msg := ""
for i, err := range g.Errors {
if i > 0 {
msg += ", "
}
msg += err.Error()
}
return msg
}
10 changes: 5 additions & 5 deletions go/config/host_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type HostInputConfig struct {
// Host on which to handle client RPC requests
ClientRPCHost string
// Address on which to connect to the enclave
EnclaveRPCAddress string
EnclaveRPCAddresses []string
// P2PBindAddress is the address where the P2P server is bound to
P2PBindAddress string
// P2PPublicAddress is the advertised P2P server address
Expand Down Expand Up @@ -111,7 +111,7 @@ func (p HostInputConfig) ToHostConfig() *HostConfig {
HasClientRPCWebsockets: p.HasClientRPCWebsockets,
ClientRPCPortWS: p.ClientRPCPortWS,
ClientRPCHost: p.ClientRPCHost,
EnclaveRPCAddress: p.EnclaveRPCAddress,
EnclaveRPCAddresses: p.EnclaveRPCAddresses,
P2PBindAddress: p.P2PBindAddress,
P2PPublicAddress: p.P2PPublicAddress,
L1WebsocketURL: p.L1WebsocketURL,
Expand Down Expand Up @@ -208,8 +208,8 @@ type HostConfig struct {
ClientRPCPortWS uint64
// Host on which to handle client RPC requests
ClientRPCHost string
// Address on which to connect to the enclave
EnclaveRPCAddress string
// Addresses on which to connect to the node's enclaves (HA setups may have multiple)
EnclaveRPCAddresses []string
// P2PBindAddress is the address where the P2P server is bound to
P2PBindAddress string
// P2PPublicAddress is the advertised P2P server address
Expand Down Expand Up @@ -244,7 +244,7 @@ func DefaultHostParsedConfig() *HostInputConfig {
HasClientRPCWebsockets: true,
ClientRPCPortWS: 81,
ClientRPCHost: "127.0.0.1",
EnclaveRPCAddress: "127.0.0.1:11000",
EnclaveRPCAddresses: []string{"127.0.0.1:11000"},
P2PBindAddress: "0.0.0.0:10000",
P2PPublicAddress: "127.0.0.1:10000",
L1WebsocketURL: "ws://127.0.0.1:8546",
Expand Down
9 changes: 5 additions & 4 deletions go/host/container/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"flag"
"fmt"
"os"
"strings"
"time"

"github.com/ten-protocol/go-ten/go/common"
Expand All @@ -24,7 +25,7 @@ type HostConfigToml struct {
HasClientRPCWebsockets bool
ClientRPCPortWS uint
ClientRPCHost string
EnclaveRPCAddress string
EnclaveRPCAddresses string // comma-separated
P2PBindAddress string
P2PPublicAddress string
L1WebsocketURL string
Expand Down Expand Up @@ -66,7 +67,7 @@ func ParseConfig() (*config.HostInputConfig, error) {
clientRPCPortHTTP := flag.Uint64(clientRPCPortHTTPName, cfg.ClientRPCPortHTTP, flagUsageMap[clientRPCPortHTTPName])
clientRPCPortWS := flag.Uint64(clientRPCPortWSName, cfg.ClientRPCPortWS, flagUsageMap[clientRPCPortWSName])
clientRPCHost := flag.String(clientRPCHostName, cfg.ClientRPCHost, flagUsageMap[clientRPCHostName])
enclaveRPCAddress := flag.String(enclaveRPCAddressName, cfg.EnclaveRPCAddress, flagUsageMap[enclaveRPCAddressName])
enclaveRPCAddressesStr := flag.String(enclaveRPCAddressesName, strings.Join(cfg.EnclaveRPCAddresses, ","), flagUsageMap[enclaveRPCAddressesName])
p2pBindAddress := flag.String(p2pBindAddressName, cfg.P2PBindAddress, flagUsageMap[p2pBindAddressName])
p2pPublicAddress := flag.String(p2pPublicAddressName, cfg.P2PPublicAddress, flagUsageMap[p2pPublicAddressName])
l1WSURL := flag.String(l1WebsocketURLName, cfg.L1WebsocketURL, flagUsageMap[l1WebsocketURLName])
Expand Down Expand Up @@ -112,7 +113,7 @@ func ParseConfig() (*config.HostInputConfig, error) {
cfg.HasClientRPCWebsockets = true
cfg.ClientRPCPortWS = *clientRPCPortWS
cfg.ClientRPCHost = *clientRPCHost
cfg.EnclaveRPCAddress = *enclaveRPCAddress
cfg.EnclaveRPCAddresses = strings.Split(*enclaveRPCAddressesStr, ",")
cfg.P2PBindAddress = *p2pBindAddress
cfg.P2PPublicAddress = *p2pPublicAddress
cfg.L1WebsocketURL = *l1WSURL
Expand Down Expand Up @@ -189,7 +190,7 @@ func fileBasedConfig(configPath string) (*config.HostInputConfig, error) {
HasClientRPCWebsockets: tomlConfig.HasClientRPCWebsockets,
ClientRPCPortWS: uint64(tomlConfig.ClientRPCPortWS),
ClientRPCHost: tomlConfig.ClientRPCHost,
EnclaveRPCAddress: tomlConfig.EnclaveRPCAddress,
EnclaveRPCAddresses: strings.Split(tomlConfig.EnclaveRPCAddresses, ","),
P2PBindAddress: tomlConfig.P2PBindAddress,
P2PPublicAddress: tomlConfig.P2PPublicAddress,
L1WebsocketURL: tomlConfig.L1WebsocketURL,
Expand Down
4 changes: 2 additions & 2 deletions go/host/container/cli_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const (
clientRPCPortHTTPName = "clientRPCPortHttp"
clientRPCPortWSName = "clientRPCPortWs"
clientRPCHostName = "clientRPCHost"
enclaveRPCAddressName = "enclaveRPCAddress"
enclaveRPCAddressesName = "enclaveRPCAddresses"
p2pBindAddressName = "p2pBindAddress"
p2pPublicAddressName = "p2pPublicAddress"
l1WebsocketURLName = "l1WSURL"
Expand Down Expand Up @@ -49,7 +49,7 @@ func getFlagUsageMap() map[string]string {
clientRPCPortHTTPName: "The port on which to listen for client application RPC requests over HTTP",
clientRPCPortWSName: "The port on which to listen for client application RPC requests over websockets",
clientRPCHostName: "The host on which to handle client application RPC requests",
enclaveRPCAddressName: "The address to use to connect to the Obscuro enclave service",
enclaveRPCAddressesName: "The comma-separated addresses to use to connect to the Ten enclaves",
p2pBindAddressName: "The address where the p2p server is bound to. Defaults to 0.0.0.0:10000",
p2pPublicAddressName: "The P2P address where the other servers should connect to. Defaults to 127.0.0.1:10000",
l1WebsocketURLName: "The websocket RPC address the host can use for L1 requests",
Expand Down
11 changes: 7 additions & 4 deletions go/host/container/host_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,10 @@ func NewHostContainerFromConfig(parsedConfig *config.HostInputConfig, logger get

fmt.Println("Connecting to the enclave...")
services := host.NewServicesRegistry(logger)
enclaveClient := enclaverpc.NewClient(cfg, logger)
enclaveClients := make([]common.Enclave, len(cfg.EnclaveRPCAddresses))
for i, addr := range cfg.EnclaveRPCAddresses {
enclaveClients[i] = enclaverpc.NewClient(addr, cfg.EnclaveRPCTimeout, logger)
}
p2pLogger := logger.New(log.CmpKey, log.P2PCmp)
metricsService := metrics.New(cfg.MetricsEnabled, cfg.MetricsHTTPPort, logger)

Expand All @@ -150,13 +153,13 @@ func NewHostContainerFromConfig(parsedConfig *config.HostInputConfig, logger get
obscuroRelevantContracts := []gethcommon.Address{cfg.ManagementContractAddress, cfg.MessageBusAddress}
l1Repo := l1.NewL1Repository(l1Client, obscuroRelevantContracts, logger)

return NewHostContainer(cfg, services, aggP2P, l1Client, l1Repo, enclaveClient, mgmtContractLib, ethWallet, rpcServer, logger, metricsService)
return NewHostContainer(cfg, services, aggP2P, l1Client, l1Repo, enclaveClients, mgmtContractLib, ethWallet, rpcServer, logger, metricsService)
}

// NewHostContainer builds a host container with dependency injection rather than from config.
// Useful for testing etc. (want to be able to pass in logger, and also have option to mock out dependencies)
func NewHostContainer(cfg *config.HostConfig, services *host.ServicesRegistry, p2p hostcommon.P2PHostService, l1Client ethadapter.EthClient, l1Repo hostcommon.L1RepoService, enclaveClient common.Enclave, contractLib mgmtcontractlib.MgmtContractLib, hostWallet wallet.Wallet, rpcServer node.Server, logger gethlog.Logger, metricsService *metrics.Service) *HostContainer {
h := host.NewHost(cfg, services, p2p, l1Client, l1Repo, enclaveClient, hostWallet, contractLib, logger, metricsService.Registry())
func NewHostContainer(cfg *config.HostConfig, services *host.ServicesRegistry, p2p hostcommon.P2PHostService, l1Client ethadapter.EthClient, l1Repo hostcommon.L1RepoService, enclaveClients []common.Enclave, contractLib mgmtcontractlib.MgmtContractLib, hostWallet wallet.Wallet, rpcServer node.Server, logger gethlog.Logger, metricsService *metrics.Service) *HostContainer {
h := host.NewHost(cfg, services, p2p, l1Client, l1Repo, enclaveClients, hostWallet, contractLib, logger, metricsService.Registry())

hostContainer := &HostContainer{
host: h,
Expand Down
2 changes: 1 addition & 1 deletion go/host/container/test.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ clientRPCPortHTTP = 80
HasClientRPCWebsockets = true
ClientRPCPortWS = 81
ClientRPCHost = "127.0.0.1"
EnclaveRPCAddress = "127.0.0.1:11000"
EnclaveRPCAddresses = "127.0.0.1:11000"
EnclaveRPCTimeout = 10
P2PBindAddress = "0.0.0.0:10000"
P2PPublicAddress = "127.0.0.1:10000"
Expand Down
83 changes: 57 additions & 26 deletions go/host/enclave/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,73 +23,104 @@ type enclaveServiceLocator interface {
type Service struct {
hostData host.Identity
sl enclaveServiceLocator
// eventually this service will support multiple enclaves for HA, but currently there's only one
// The service goes via the Guardian to talk to the enclave (because guardian knows if the enclave is healthy etc.)
enclaveGuardian *Guardian

// The service goes via the Guardians to talk to the enclave (because guardian knows if the enclave is healthy etc.)
enclaveGuardians []*Guardian

running atomic.Bool
logger gethlog.Logger
}

func NewService(hostData host.Identity, serviceLocator enclaveServiceLocator, enclaveGuardian *Guardian, logger gethlog.Logger) *Service {
func NewService(hostData host.Identity, serviceLocator enclaveServiceLocator, enclaveGuardians []*Guardian, logger gethlog.Logger) *Service {
return &Service{
hostData: hostData,
sl: serviceLocator,
enclaveGuardian: enclaveGuardian,
logger: logger,
hostData: hostData,
sl: serviceLocator,
enclaveGuardians: enclaveGuardians,
logger: logger,
}
}

func (e *Service) Start() error {
e.running.Store(true)
return e.enclaveGuardian.Start()
for _, guardian := range e.enclaveGuardians {
if err := guardian.Start(); err != nil {
// abandon starting the rest of the guardians if one fails
return err
}
}
return nil
}

func (e *Service) Stop() error {
e.running.Store(false)
return e.enclaveGuardian.Stop()
var errors []error
for i, guardian := range e.enclaveGuardians {
if err := guardian.Stop(); err != nil {
errors = append(errors, fmt.Errorf("error stopping enclave guardian [%d]: %w", i, err))
}
}
if len(errors) > 0 {
return fmt.Errorf("errors stopping enclave guardians: %v", errors)
}
return nil
}

func (e *Service) HealthStatus() host.HealthStatus {
if !e.running.Load() {
return &host.BasicErrHealthStatus{ErrMsg: "not running"}
}

// check the enclave health, which in turn checks the DB health
enclaveHealthy, err := e.enclaveGuardian.enclaveClient.HealthCheck()
if err != nil {
return &host.BasicErrHealthStatus{ErrMsg: fmt.Sprintf("unable to HealthCheck enclave - %s", err.Error())}
} else if !enclaveHealthy {
return &host.BasicErrHealthStatus{ErrMsg: "enclave reported itself as not healthy"}
}
errors := make([]error, 0, len(e.enclaveGuardians))

if !e.enclaveGuardian.GetEnclaveState().InSyncWithL1() {
return &host.BasicErrHealthStatus{ErrMsg: "enclave not in sync with L1"}
for i, guardian := range e.enclaveGuardians {
// check the enclave health, which in turn checks the DB health
enclaveHealthy, err := guardian.enclaveClient.HealthCheck()
if err != nil {
errors = append(errors, fmt.Errorf("unable to HealthCheck enclave[%d] - %w", i, err))
} else if !enclaveHealthy {
errors = append(errors, fmt.Errorf("enclave[%d] reported itself not healthy", i))
}

if !guardian.GetEnclaveState().InSyncWithL1() {
errors = append(errors, fmt.Errorf("enclave[%d] not in sync with L1", i))
}
}

// empty error msg means healthy
return &host.BasicErrHealthStatus{ErrMsg: ""}
return &host.GroupErrsHealthStatus{Errors: errors}
}

func (e *Service) HealthyGuardian() *Guardian {
for _, guardian := range e.enclaveGuardians {
if guardian.HealthStatus().OK() {
return guardian
}
}
return nil
}

// LookupBatchBySeqNo is used to fetch batch data from the enclave - it is only used as a fallback for the sequencer
// host if it's missing a batch (other host services should use L2Repo to fetch batch data)
func (e *Service) LookupBatchBySeqNo(seqNo *big.Int) (*common.ExtBatch, error) {
state := e.enclaveGuardian.GetEnclaveState()
hg := e.HealthyGuardian()
state := hg.GetEnclaveState()
if state.GetEnclaveL2Head().Cmp(seqNo) < 0 {
return nil, errutil.ErrNotFound
}
client := e.enclaveGuardian.GetEnclaveClient()
client := hg.GetEnclaveClient()
return client.GetBatchBySeqNo(seqNo.Uint64())
}

func (e *Service) GetEnclaveClient() common.Enclave {
return e.enclaveGuardian.GetEnclaveClient()
// for now we always return first guardian's enclave client
// in future be good to load balance and failover but need to improve subscribe/unsubscribe (unsubscribe from same enclave)
return e.enclaveGuardians[0].GetEnclaveClient()
}

func (e *Service) SubmitAndBroadcastTx(encryptedParams common.EncryptedParamsSendRawTx) (*responses.RawTx, error) {
encryptedTx := common.EncryptedTx(encryptedParams)

enclaveResponse, sysError := e.enclaveGuardian.GetEnclaveClient().SubmitTx(encryptedTx)
enclaveResponse, sysError := e.GetEnclaveClient().SubmitTx(encryptedTx)
if sysError != nil {
e.logger.Warn("Could not submit transaction due to sysError.", log.ErrKey, sysError)
return nil, sysError
Expand All @@ -110,9 +141,9 @@ func (e *Service) SubmitAndBroadcastTx(encryptedParams common.EncryptedParamsSen
}

func (e *Service) Subscribe(id rpc.ID, encryptedParams common.EncryptedParamsLogSubscription) error {
return e.enclaveGuardian.GetEnclaveClient().Subscribe(id, encryptedParams)
return e.GetEnclaveClient().Subscribe(id, encryptedParams)
}

func (e *Service) Unsubscribe(id rpc.ID) error {
return e.enclaveGuardian.GetEnclaveClient().Unsubscribe(id)
return e.GetEnclaveClient().Unsubscribe(id)
}
18 changes: 15 additions & 3 deletions go/host/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type host struct {
l2MessageBusAddress *gethcommon.Address
}

func NewHost(config *config.HostConfig, hostServices *ServicesRegistry, p2p hostcommon.P2PHostService, ethClient ethadapter.EthClient, l1Repo hostcommon.L1RepoService, enclaveClient common.Enclave, ethWallet wallet.Wallet, mgmtContractLib mgmtcontractlib.MgmtContractLib, logger gethlog.Logger, regMetrics gethmetrics.Registry) hostcommon.Host {
func NewHost(config *config.HostConfig, hostServices *ServicesRegistry, p2p hostcommon.P2PHostService, ethClient ethadapter.EthClient, l1Repo hostcommon.L1RepoService, enclaveClients []common.Enclave, ethWallet wallet.Wallet, mgmtContractLib mgmtcontractlib.MgmtContractLib, logger gethlog.Logger, regMetrics gethmetrics.Registry) hostcommon.Host {
database, err := db.CreateDBFromConfig(config, regMetrics, logger)
if err != nil {
logger.Crit("unable to create database for host", log.ErrKey, err)
Expand All @@ -72,8 +72,20 @@ func NewHost(config *config.HostConfig, hostServices *ServicesRegistry, p2p host
stopControl: stopcontrol.New(),
}

enclGuardian := enclave.NewGuardian(config, hostIdentity, hostServices, enclaveClient, database, host.stopControl, logger)
enclService := enclave.NewService(hostIdentity, hostServices, enclGuardian, logger)
enclGuardians := make([]*enclave.Guardian, 0, len(enclaveClients))
for i, enclClient := range enclaveClients {
// clone the hostIdentity data for each enclave
enclHostID := hostIdentity
if i > 0 {
// only the first enclave can be the sequencer for now, others behave as read-only validators
enclHostID.IsSequencer = false
enclHostID.IsGenesis = false
}
enclGuardian := enclave.NewGuardian(config, enclHostID, hostServices, enclClient, database, host.stopControl, logger)
enclGuardians = append(enclGuardians, enclGuardian)
}

enclService := enclave.NewService(hostIdentity, hostServices, enclGuardians, logger)
l2Repo := l2.NewBatchRepository(config, hostServices, database, logger)
subsService := events.NewLogEventManager(hostServices, logger)

Expand Down
Loading
Loading