Skip to content
This repository has been archived by the owner on Apr 15, 2024. It is now read-only.

Commit

Permalink
feat: support backup relayers
Browse files Browse the repository at this point in the history
  • Loading branch information
rach-id committed Dec 3, 2023
1 parent 95e2ae1 commit fca9e0f
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 31 deletions.
37 changes: 37 additions & 0 deletions cmd/blobstream/base/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ const (

FlagLogLevel = "log.level"
FlagLogFormat = "log.format"

FlagBackupRelayer = "relayer.backup"
FlagBackupRelayerWaitTime = "relayer.wait-time"
)

func AddLogLevelFlag(cmd *cobra.Command) {
Expand Down Expand Up @@ -271,6 +274,40 @@ func GetEVMGasLimitFlag(cmd *cobra.Command) (uint64, bool, error) {
return val, changed, nil
}

func AddBackupRelayerFlag(cmd *cobra.Command) {
cmd.Flags().Bool(
FlagBackupRelayer,
false,
"Set the relayer to be a backup, i.e. not relay attestations until the `relayer.wait-time` is elapsed and no primary relayer has relayed any",
)
}

func GetBackupRelayerFlag(cmd *cobra.Command) (bool, bool, error) {
changed := cmd.Flags().Changed(FlagBackupRelayer)
val, err := cmd.Flags().GetBool(FlagBackupRelayer)
if err != nil {
return false, changed, err
}
return val, changed, nil
}

func AddBackupRelayerWaitTimeFlag(cmd *cobra.Command) {
cmd.Flags().Uint64(
FlagBackupRelayerWaitTime,
15,
"The wait time, in minutes, to wait for primary relayers to relay attestations before proceeding to relay them",
)
}

func GetBackupRelayerWaitTimeFlag(cmd *cobra.Command) (uint64, bool, error) {
changed := cmd.Flags().Changed(FlagBackupRelayerWaitTime)
val, err := cmd.Flags().GetUint64(FlagBackupRelayerWaitTime)
if err != nil {
return 0, changed, err
}
return val, changed, nil
}

func AddHomeFlag(cmd *cobra.Command, serviceName string, defaultHomeDir string) {
cmd.Flags().String(FlagHome, defaultHomeDir, fmt.Sprintf("The Blobstream %s home directory", serviceName))
}
Expand Down
2 changes: 2 additions & 0 deletions cmd/blobstream/relayer/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ func Start() *cobra.Command {
retrier,
s.SignatureStore,
time.Duration(config.EVMRetryTimeout)*time.Minute,
config.isBackupRelayer,
time.Duration(config.backupRelayerWaitTime)*time.Minute,
)

// Listen for and trap any OS signal to graceful shutdown and exit
Expand Down
47 changes: 33 additions & 14 deletions cmd/blobstream/relayer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,26 +90,30 @@ func addRelayerStartFlags(cmd *cobra.Command) *cobra.Command {
base.AddLogLevelFlag(cmd)
base.AddLogFormatFlag(cmd)
base.AddEVMRetryTimeoutFlag(cmd)
base.AddBackupRelayerFlag(cmd)
base.AddBackupRelayerWaitTimeFlag(cmd)

return cmd
}

type StartConfig struct {
base.Config
EvmChainID uint64 `mapstructure:"evm-chain-id" json:"evm-chain-id"`
EvmRPC string `mapstructure:"evm-rpc" json:"evm-rpc"`
CoreGRPC string `mapstructure:"core-grpc" json:"core-grpc"`
CoreRPC string `mapstructure:"core-rpc" json:"core-rpc"`
evmAccAddress string
ContractAddr string `mapstructure:"contract-address" json:"contract-address"`
EvmGasLimit uint64 `mapstructure:"gas-limit" json:"gas-limit"`
Bootstrappers string `mapstructure:"bootstrappers" json:"bootstrappers"`
P2PListenAddr string `mapstructure:"listen-addr" json:"listen-addr"`
p2pNickname string
GrpcInsecure bool `mapstructure:"grpc-insecure" json:"grpc-insecure"`
LogLevel string
LogFormat string
EVMRetryTimeout uint64 `mapstructure:"retry-timeout" json:"retry-timeout"`
EvmChainID uint64 `mapstructure:"evm-chain-id" json:"evm-chain-id"`
EvmRPC string `mapstructure:"evm-rpc" json:"evm-rpc"`
CoreGRPC string `mapstructure:"core-grpc" json:"core-grpc"`
CoreRPC string `mapstructure:"core-rpc" json:"core-rpc"`
evmAccAddress string
ContractAddr string `mapstructure:"contract-address" json:"contract-address"`
EvmGasLimit uint64 `mapstructure:"gas-limit" json:"gas-limit"`
Bootstrappers string `mapstructure:"bootstrappers" json:"bootstrappers"`
P2PListenAddr string `mapstructure:"listen-addr" json:"listen-addr"`
p2pNickname string
GrpcInsecure bool `mapstructure:"grpc-insecure" json:"grpc-insecure"`
LogLevel string
LogFormat string
EVMRetryTimeout uint64 `mapstructure:"retry-timeout" json:"retry-timeout"`
isBackupRelayer bool
backupRelayerWaitTime uint64
}

func DefaultStartConfig() *StartConfig {
Expand All @@ -133,6 +137,9 @@ func (cfg StartConfig) ValidateBasics() error {
if err := base.ValidateEVMAddress(cfg.ContractAddr); err != nil {
return fmt.Errorf("%s: flag --%s", err.Error(), base.FlagEVMContractAddress)
}
if cfg.isBackupRelayer && cfg.backupRelayerWaitTime == 0 {
return fmt.Errorf("backup relayer wait time cannot be 0 if backup relayer flag is set")
}
return nil
}

Expand Down Expand Up @@ -256,6 +263,18 @@ func parseRelayerStartFlags(cmd *cobra.Command, fileConfig *StartConfig) (StartC
fileConfig.EVMRetryTimeout = retryTimeout
}

isBackupRelayer, changed, err := base.GetBackupRelayerFlag(cmd)

Check failure on line 266 in cmd/blobstream/relayer/config.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint

ineffectual assignment to changed (ineffassign)
if err != nil {
return StartConfig{}, err
}
fileConfig.isBackupRelayer = isBackupRelayer

backupRelayerWaitTime, changed, err := base.GetBackupRelayerWaitTimeFlag(cmd)

Check failure on line 272 in cmd/blobstream/relayer/config.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint

ineffectual assignment to changed (ineffassign)
if err != nil {
return StartConfig{}, err
}
fileConfig.backupRelayerWaitTime = backupRelayerWaitTime

return *fileConfig, nil
}

Expand Down
56 changes: 40 additions & 16 deletions relayer/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,16 @@ import (
)

type Relayer struct {
TmQuerier *rpc.TmQuerier
AppQuerier *rpc.AppQuerier
P2PQuerier *p2p.Querier
EVMClient *evm.Client
logger tmlog.Logger
Retrier *helpers.Retrier
SignatureStore *badger.Datastore
RetryTimeout time.Duration
TmQuerier *rpc.TmQuerier
AppQuerier *rpc.AppQuerier
P2PQuerier *p2p.Querier
EVMClient *evm.Client
logger tmlog.Logger
Retrier *helpers.Retrier
SignatureStore *badger.Datastore
RetryTimeout time.Duration
IsBackupRelayer bool
BackupRelayerWaitTime time.Duration
}

func NewRelayer(
Expand All @@ -50,16 +52,20 @@ func NewRelayer(
retrier *helpers.Retrier,
sigStore *badger.Datastore,
retryTimeout time.Duration,
isBackupRelayer bool,
backupRelayerWaitTime time.Duration,
) *Relayer {
return &Relayer{
TmQuerier: tmQuerier,
AppQuerier: appQuerier,
P2PQuerier: p2pQuerier,
EVMClient: evmClient,
logger: logger,
Retrier: retrier,
SignatureStore: sigStore,
RetryTimeout: retryTimeout,
TmQuerier: tmQuerier,
AppQuerier: appQuerier,
P2PQuerier: p2pQuerier,
EVMClient: evmClient,
logger: logger,
Retrier: retrier,
SignatureStore: sigStore,
RetryTimeout: retryTimeout,
IsBackupRelayer: isBackupRelayer,
BackupRelayerWaitTime: backupRelayerWaitTime,
}
}

Expand All @@ -71,6 +77,7 @@ func (r *Relayer) Start(ctx context.Context) error {
}
defer ethClient.Close()

backupRelayerShouldRelay := false
processFunc := func() error {
// this function will relay attestations as long as there are confirms. And, after the contract is
// up-to-date with the chain, it will stop.
Expand All @@ -95,6 +102,17 @@ func (r *Relayer) Start(ctx context.Context) error {
return nil
}

if r.IsBackupRelayer {
if !backupRelayerShouldRelay {
// if the relayer is a backup relayer, sleep for the wait time before checking
// if the signatures haven't been relayed to relay them.
r.logger.Debug("waiting for the backup relayer wait time to elapse before trying to relay attestation", "nonce", lastContractNonce+1)
time.Sleep(r.BackupRelayerWaitTime)
backupRelayerShouldRelay = true
continue
}
}

att, err := r.AppQuerier.QueryAttestationByNonce(ctx, lastContractNonce+1)
if err != nil {
return err
Expand All @@ -117,6 +135,12 @@ func (r *Relayer) Start(ctx context.Context) error {
_, err = r.EVMClient.WaitForTransaction(ctx, ethClient, tx, r.RetryTimeout)
if err != nil {
return err
} else {

Check warning on line 138 in relayer/relayer.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint

indent-error-flow: if block ends with a return statement, so drop this else and outdent its block (revive)
if r.IsBackupRelayer {
// if the transaction was mined correctly, the relayer gets back to the pending
// state waiting for the next nonce + the backup relayer wait time.
backupRelayerShouldRelay = false
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion testing/blobstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func NewRelayer(
tempDir := t.TempDir()
sigStore, err := badger.NewDatastore(tempDir, store.DefaultBadgerOptions(tempDir))
require.NoError(t, err)
r := relayer.NewRelayer(tmQuerier, appQuerier, p2pQuerier, evmClient, logger, retrier, sigStore, 30*time.Second)
r := relayer.NewRelayer(tmQuerier, appQuerier, p2pQuerier, evmClient, logger, retrier, sigStore, 30*time.Second, false, 0)
return r
}

Expand Down

0 comments on commit fca9e0f

Please sign in to comment.