From fca9e0f79c7c6da0d848b582677d5bd3ca23115f Mon Sep 17 00:00:00 2001 From: sweexordious Date: Sun, 3 Dec 2023 22:09:00 +0100 Subject: [PATCH 1/3] feat: support backup relayers --- cmd/blobstream/base/config.go | 37 +++++++++++++++++++++ cmd/blobstream/relayer/cmd.go | 2 ++ cmd/blobstream/relayer/config.go | 47 +++++++++++++++++++-------- relayer/relayer.go | 56 +++++++++++++++++++++++--------- testing/blobstream.go | 2 +- 5 files changed, 113 insertions(+), 31 deletions(-) diff --git a/cmd/blobstream/base/config.go b/cmd/blobstream/base/config.go index e18ebca0..35fb8772 100644 --- a/cmd/blobstream/base/config.go +++ b/cmd/blobstream/base/config.go @@ -85,6 +85,9 @@ const ( FlagLogLevel = "log.level" FlagLogFormat = "log.format" + + FlagBackupRelayer = "relayer.backup" + FlagBackupRelayerWaitTime = "relayer.wait-time" ) func AddLogLevelFlag(cmd *cobra.Command) { @@ -271,6 +274,40 @@ func GetEVMGasLimitFlag(cmd *cobra.Command) (uint64, bool, error) { return val, changed, nil } +func AddBackupRelayerFlag(cmd *cobra.Command) { + cmd.Flags().Bool( + FlagBackupRelayer, + false, + "Set the relayer to be a backup, i.e. not relay attestations until the `relayer.wait-time` is elapsed and no primary relayer has relayed any", + ) +} + +func GetBackupRelayerFlag(cmd *cobra.Command) (bool, bool, error) { + changed := cmd.Flags().Changed(FlagBackupRelayer) + val, err := cmd.Flags().GetBool(FlagBackupRelayer) + if err != nil { + return false, changed, err + } + return val, changed, nil +} + +func AddBackupRelayerWaitTimeFlag(cmd *cobra.Command) { + cmd.Flags().Uint64( + FlagBackupRelayerWaitTime, + 15, + "The wait time, in minutes, to wait for primary relayers to relay attestations before proceeding to relay them", + ) +} + +func GetBackupRelayerWaitTimeFlag(cmd *cobra.Command) (uint64, bool, error) { + changed := cmd.Flags().Changed(FlagBackupRelayerWaitTime) + val, err := cmd.Flags().GetUint64(FlagBackupRelayerWaitTime) + if err != nil { + return 0, changed, err + } + return val, changed, nil +} + func AddHomeFlag(cmd *cobra.Command, serviceName string, defaultHomeDir string) { cmd.Flags().String(FlagHome, defaultHomeDir, fmt.Sprintf("The Blobstream %s home directory", serviceName)) } diff --git a/cmd/blobstream/relayer/cmd.go b/cmd/blobstream/relayer/cmd.go index 06c66322..0856f745 100644 --- a/cmd/blobstream/relayer/cmd.go +++ b/cmd/blobstream/relayer/cmd.go @@ -206,6 +206,8 @@ func Start() *cobra.Command { retrier, s.SignatureStore, time.Duration(config.EVMRetryTimeout)*time.Minute, + config.isBackupRelayer, + time.Duration(config.backupRelayerWaitTime)*time.Minute, ) // Listen for and trap any OS signal to graceful shutdown and exit diff --git a/cmd/blobstream/relayer/config.go b/cmd/blobstream/relayer/config.go index 4ae53b1a..be89b20b 100644 --- a/cmd/blobstream/relayer/config.go +++ b/cmd/blobstream/relayer/config.go @@ -90,26 +90,30 @@ func addRelayerStartFlags(cmd *cobra.Command) *cobra.Command { base.AddLogLevelFlag(cmd) base.AddLogFormatFlag(cmd) base.AddEVMRetryTimeoutFlag(cmd) + base.AddBackupRelayerFlag(cmd) + base.AddBackupRelayerWaitTimeFlag(cmd) return cmd } type StartConfig struct { base.Config - EvmChainID uint64 `mapstructure:"evm-chain-id" json:"evm-chain-id"` - EvmRPC string `mapstructure:"evm-rpc" json:"evm-rpc"` - CoreGRPC string `mapstructure:"core-grpc" json:"core-grpc"` - CoreRPC string `mapstructure:"core-rpc" json:"core-rpc"` - evmAccAddress string - ContractAddr string `mapstructure:"contract-address" json:"contract-address"` - EvmGasLimit uint64 `mapstructure:"gas-limit" json:"gas-limit"` - Bootstrappers string `mapstructure:"bootstrappers" json:"bootstrappers"` - P2PListenAddr string `mapstructure:"listen-addr" json:"listen-addr"` - p2pNickname string - GrpcInsecure bool `mapstructure:"grpc-insecure" json:"grpc-insecure"` - LogLevel string - LogFormat string - EVMRetryTimeout uint64 `mapstructure:"retry-timeout" json:"retry-timeout"` + EvmChainID uint64 `mapstructure:"evm-chain-id" json:"evm-chain-id"` + EvmRPC string `mapstructure:"evm-rpc" json:"evm-rpc"` + CoreGRPC string `mapstructure:"core-grpc" json:"core-grpc"` + CoreRPC string `mapstructure:"core-rpc" json:"core-rpc"` + evmAccAddress string + ContractAddr string `mapstructure:"contract-address" json:"contract-address"` + EvmGasLimit uint64 `mapstructure:"gas-limit" json:"gas-limit"` + Bootstrappers string `mapstructure:"bootstrappers" json:"bootstrappers"` + P2PListenAddr string `mapstructure:"listen-addr" json:"listen-addr"` + p2pNickname string + GrpcInsecure bool `mapstructure:"grpc-insecure" json:"grpc-insecure"` + LogLevel string + LogFormat string + EVMRetryTimeout uint64 `mapstructure:"retry-timeout" json:"retry-timeout"` + isBackupRelayer bool + backupRelayerWaitTime uint64 } func DefaultStartConfig() *StartConfig { @@ -133,6 +137,9 @@ func (cfg StartConfig) ValidateBasics() error { if err := base.ValidateEVMAddress(cfg.ContractAddr); err != nil { return fmt.Errorf("%s: flag --%s", err.Error(), base.FlagEVMContractAddress) } + if cfg.isBackupRelayer && cfg.backupRelayerWaitTime == 0 { + return fmt.Errorf("backup relayer wait time cannot be 0 if backup relayer flag is set") + } return nil } @@ -256,6 +263,18 @@ func parseRelayerStartFlags(cmd *cobra.Command, fileConfig *StartConfig) (StartC fileConfig.EVMRetryTimeout = retryTimeout } + isBackupRelayer, changed, err := base.GetBackupRelayerFlag(cmd) + if err != nil { + return StartConfig{}, err + } + fileConfig.isBackupRelayer = isBackupRelayer + + backupRelayerWaitTime, changed, err := base.GetBackupRelayerWaitTimeFlag(cmd) + if err != nil { + return StartConfig{}, err + } + fileConfig.backupRelayerWaitTime = backupRelayerWaitTime + return *fileConfig, nil } diff --git a/relayer/relayer.go b/relayer/relayer.go index a79ea0bf..4ebda823 100644 --- a/relayer/relayer.go +++ b/relayer/relayer.go @@ -31,14 +31,16 @@ import ( ) type Relayer struct { - TmQuerier *rpc.TmQuerier - AppQuerier *rpc.AppQuerier - P2PQuerier *p2p.Querier - EVMClient *evm.Client - logger tmlog.Logger - Retrier *helpers.Retrier - SignatureStore *badger.Datastore - RetryTimeout time.Duration + TmQuerier *rpc.TmQuerier + AppQuerier *rpc.AppQuerier + P2PQuerier *p2p.Querier + EVMClient *evm.Client + logger tmlog.Logger + Retrier *helpers.Retrier + SignatureStore *badger.Datastore + RetryTimeout time.Duration + IsBackupRelayer bool + BackupRelayerWaitTime time.Duration } func NewRelayer( @@ -50,16 +52,20 @@ func NewRelayer( retrier *helpers.Retrier, sigStore *badger.Datastore, retryTimeout time.Duration, + isBackupRelayer bool, + backupRelayerWaitTime time.Duration, ) *Relayer { return &Relayer{ - TmQuerier: tmQuerier, - AppQuerier: appQuerier, - P2PQuerier: p2pQuerier, - EVMClient: evmClient, - logger: logger, - Retrier: retrier, - SignatureStore: sigStore, - RetryTimeout: retryTimeout, + TmQuerier: tmQuerier, + AppQuerier: appQuerier, + P2PQuerier: p2pQuerier, + EVMClient: evmClient, + logger: logger, + Retrier: retrier, + SignatureStore: sigStore, + RetryTimeout: retryTimeout, + IsBackupRelayer: isBackupRelayer, + BackupRelayerWaitTime: backupRelayerWaitTime, } } @@ -71,6 +77,7 @@ func (r *Relayer) Start(ctx context.Context) error { } defer ethClient.Close() + backupRelayerShouldRelay := false processFunc := func() error { // this function will relay attestations as long as there are confirms. And, after the contract is // up-to-date with the chain, it will stop. @@ -95,6 +102,17 @@ func (r *Relayer) Start(ctx context.Context) error { return nil } + if r.IsBackupRelayer { + if !backupRelayerShouldRelay { + // if the relayer is a backup relayer, sleep for the wait time before checking + // if the signatures haven't been relayed to relay them. + r.logger.Debug("waiting for the backup relayer wait time to elapse before trying to relay attestation", "nonce", lastContractNonce+1) + time.Sleep(r.BackupRelayerWaitTime) + backupRelayerShouldRelay = true + continue + } + } + att, err := r.AppQuerier.QueryAttestationByNonce(ctx, lastContractNonce+1) if err != nil { return err @@ -117,6 +135,12 @@ func (r *Relayer) Start(ctx context.Context) error { _, err = r.EVMClient.WaitForTransaction(ctx, ethClient, tx, r.RetryTimeout) if err != nil { return err + } else { + if r.IsBackupRelayer { + // if the transaction was mined correctly, the relayer gets back to the pending + // state waiting for the next nonce + the backup relayer wait time. + backupRelayerShouldRelay = false + } } } } diff --git a/testing/blobstream.go b/testing/blobstream.go index 8dfedd26..4e50ef5e 100644 --- a/testing/blobstream.go +++ b/testing/blobstream.go @@ -49,7 +49,7 @@ func NewRelayer( tempDir := t.TempDir() sigStore, err := badger.NewDatastore(tempDir, store.DefaultBadgerOptions(tempDir)) require.NoError(t, err) - r := relayer.NewRelayer(tmQuerier, appQuerier, p2pQuerier, evmClient, logger, retrier, sigStore, 30*time.Second) + r := relayer.NewRelayer(tmQuerier, appQuerier, p2pQuerier, evmClient, logger, retrier, sigStore, 30*time.Second, false, 0) return r } From 7f681bb0931e66b7b230b090ea07eb070c016155 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Sun, 3 Dec 2023 22:11:44 +0100 Subject: [PATCH 2/3] chore: golangci --- cmd/blobstream/relayer/config.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/blobstream/relayer/config.go b/cmd/blobstream/relayer/config.go index be89b20b..a70220db 100644 --- a/cmd/blobstream/relayer/config.go +++ b/cmd/blobstream/relayer/config.go @@ -263,13 +263,13 @@ func parseRelayerStartFlags(cmd *cobra.Command, fileConfig *StartConfig) (StartC fileConfig.EVMRetryTimeout = retryTimeout } - isBackupRelayer, changed, err := base.GetBackupRelayerFlag(cmd) + isBackupRelayer, _, err := base.GetBackupRelayerFlag(cmd) if err != nil { return StartConfig{}, err } fileConfig.isBackupRelayer = isBackupRelayer - backupRelayerWaitTime, changed, err := base.GetBackupRelayerWaitTimeFlag(cmd) + backupRelayerWaitTime, _, err := base.GetBackupRelayerWaitTimeFlag(cmd) if err != nil { return StartConfig{}, err } From 89cd73053936704756e1c9958c6ccd35699725ef Mon Sep 17 00:00:00 2001 From: sweexordious Date: Sun, 3 Dec 2023 22:15:23 +0100 Subject: [PATCH 3/3] chore: golangci --- relayer/relayer.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/relayer/relayer.go b/relayer/relayer.go index 4ebda823..b2d9493b 100644 --- a/relayer/relayer.go +++ b/relayer/relayer.go @@ -135,12 +135,12 @@ func (r *Relayer) Start(ctx context.Context) error { _, err = r.EVMClient.WaitForTransaction(ctx, ethClient, tx, r.RetryTimeout) if err != nil { return err - } else { - if r.IsBackupRelayer { - // if the transaction was mined correctly, the relayer gets back to the pending - // state waiting for the next nonce + the backup relayer wait time. - backupRelayerShouldRelay = false - } + } + + if r.IsBackupRelayer { + // if the transaction was mined correctly, the relayer gets back to the pending + // state waiting for the next nonce + the backup relayer wait time. + backupRelayerShouldRelay = false } } }