Skip to content
This repository has been archived by the owner on Apr 15, 2024. It is now read-only.

feat: support backup relayers #640

Merged
merged 5 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions cmd/blobstream/base/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ const (

FlagLogLevel = "log.level"
FlagLogFormat = "log.format"

FlagBackupRelayer = "relayer.backup"
FlagBackupRelayerWaitTime = "relayer.wait-time"
)

func AddLogLevelFlag(cmd *cobra.Command) {
Expand Down Expand Up @@ -271,6 +274,40 @@ func GetEVMGasLimitFlag(cmd *cobra.Command) (uint64, bool, error) {
return val, changed, nil
}

func AddBackupRelayerFlag(cmd *cobra.Command) {
cmd.Flags().Bool(
FlagBackupRelayer,
false,
"Set the relayer to be a backup, i.e. not relay attestations until the `relayer.wait-time` is elapsed and no primary relayer has relayed any",
)
}

func GetBackupRelayerFlag(cmd *cobra.Command) (bool, bool, error) {
changed := cmd.Flags().Changed(FlagBackupRelayer)
val, err := cmd.Flags().GetBool(FlagBackupRelayer)
if err != nil {
return false, changed, err
}
return val, changed, nil
}

func AddBackupRelayerWaitTimeFlag(cmd *cobra.Command) {
cmd.Flags().Uint64(
FlagBackupRelayerWaitTime,
15,
"The wait time, in minutes, to wait for primary relayers to relay attestations before proceeding to relay them",
)
}

func GetBackupRelayerWaitTimeFlag(cmd *cobra.Command) (uint64, bool, error) {
changed := cmd.Flags().Changed(FlagBackupRelayerWaitTime)
val, err := cmd.Flags().GetUint64(FlagBackupRelayerWaitTime)
if err != nil {
return 0, changed, err
}
return val, changed, nil
}

func AddHomeFlag(cmd *cobra.Command, serviceName string, defaultHomeDir string) {
cmd.Flags().String(FlagHome, defaultHomeDir, fmt.Sprintf("The Blobstream %s home directory", serviceName))
}
Expand Down
2 changes: 2 additions & 0 deletions cmd/blobstream/relayer/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,8 @@ func Start() *cobra.Command {
retrier,
s.SignatureStore,
time.Duration(config.EVMRetryTimeout)*time.Minute,
config.isBackupRelayer,
time.Duration(config.backupRelayerWaitTime)*time.Minute,
)

// Listen for and trap any OS signal to graceful shutdown and exit
Expand Down
47 changes: 33 additions & 14 deletions cmd/blobstream/relayer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,26 +90,30 @@ func addRelayerStartFlags(cmd *cobra.Command) *cobra.Command {
base.AddLogLevelFlag(cmd)
base.AddLogFormatFlag(cmd)
base.AddEVMRetryTimeoutFlag(cmd)
base.AddBackupRelayerFlag(cmd)
base.AddBackupRelayerWaitTimeFlag(cmd)

return cmd
}

type StartConfig struct {
base.Config
EvmChainID uint64 `mapstructure:"evm-chain-id" json:"evm-chain-id"`
EvmRPC string `mapstructure:"evm-rpc" json:"evm-rpc"`
CoreGRPC string `mapstructure:"core-grpc" json:"core-grpc"`
CoreRPC string `mapstructure:"core-rpc" json:"core-rpc"`
evmAccAddress string
ContractAddr string `mapstructure:"contract-address" json:"contract-address"`
EvmGasLimit uint64 `mapstructure:"gas-limit" json:"gas-limit"`
Bootstrappers string `mapstructure:"bootstrappers" json:"bootstrappers"`
P2PListenAddr string `mapstructure:"listen-addr" json:"listen-addr"`
p2pNickname string
GrpcInsecure bool `mapstructure:"grpc-insecure" json:"grpc-insecure"`
LogLevel string
LogFormat string
EVMRetryTimeout uint64 `mapstructure:"retry-timeout" json:"retry-timeout"`
EvmChainID uint64 `mapstructure:"evm-chain-id" json:"evm-chain-id"`
EvmRPC string `mapstructure:"evm-rpc" json:"evm-rpc"`
CoreGRPC string `mapstructure:"core-grpc" json:"core-grpc"`
CoreRPC string `mapstructure:"core-rpc" json:"core-rpc"`
evmAccAddress string
ContractAddr string `mapstructure:"contract-address" json:"contract-address"`
EvmGasLimit uint64 `mapstructure:"gas-limit" json:"gas-limit"`
Bootstrappers string `mapstructure:"bootstrappers" json:"bootstrappers"`
P2PListenAddr string `mapstructure:"listen-addr" json:"listen-addr"`
p2pNickname string
GrpcInsecure bool `mapstructure:"grpc-insecure" json:"grpc-insecure"`
LogLevel string
LogFormat string
EVMRetryTimeout uint64 `mapstructure:"retry-timeout" json:"retry-timeout"`
isBackupRelayer bool
backupRelayerWaitTime uint64
rach-id marked this conversation as resolved.
Show resolved Hide resolved
}

func DefaultStartConfig() *StartConfig {
Expand All @@ -133,6 +137,9 @@ func (cfg StartConfig) ValidateBasics() error {
if err := base.ValidateEVMAddress(cfg.ContractAddr); err != nil {
return fmt.Errorf("%s: flag --%s", err.Error(), base.FlagEVMContractAddress)
}
if cfg.isBackupRelayer && cfg.backupRelayerWaitTime == 0 {
return fmt.Errorf("backup relayer wait time cannot be 0 if backup relayer flag is set")
}
return nil
}

Expand Down Expand Up @@ -256,6 +263,18 @@ func parseRelayerStartFlags(cmd *cobra.Command, fileConfig *StartConfig) (StartC
fileConfig.EVMRetryTimeout = retryTimeout
}

isBackupRelayer, _, err := base.GetBackupRelayerFlag(cmd)
if err != nil {
return StartConfig{}, err
}
fileConfig.isBackupRelayer = isBackupRelayer

backupRelayerWaitTime, _, err := base.GetBackupRelayerWaitTimeFlag(cmd)
if err != nil {
return StartConfig{}, err
}
fileConfig.backupRelayerWaitTime = backupRelayerWaitTime

return *fileConfig, nil
}

Expand Down
56 changes: 40 additions & 16 deletions relayer/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,16 @@ import (
)

type Relayer struct {
TmQuerier *rpc.TmQuerier
AppQuerier *rpc.AppQuerier
P2PQuerier *p2p.Querier
EVMClient *evm.Client
logger tmlog.Logger
Retrier *helpers.Retrier
SignatureStore *badger.Datastore
RetryTimeout time.Duration
TmQuerier *rpc.TmQuerier
AppQuerier *rpc.AppQuerier
P2PQuerier *p2p.Querier
EVMClient *evm.Client
logger tmlog.Logger
Retrier *helpers.Retrier
SignatureStore *badger.Datastore
RetryTimeout time.Duration
IsBackupRelayer bool
BackupRelayerWaitTime time.Duration
}

func NewRelayer(
Expand All @@ -53,16 +55,20 @@ func NewRelayer(
retrier *helpers.Retrier,
sigStore *badger.Datastore,
retryTimeout time.Duration,
isBackupRelayer bool,
backupRelayerWaitTime time.Duration,
) *Relayer {
return &Relayer{
TmQuerier: tmQuerier,
AppQuerier: appQuerier,
P2PQuerier: p2pQuerier,
EVMClient: evmClient,
logger: logger,
Retrier: retrier,
SignatureStore: sigStore,
RetryTimeout: retryTimeout,
TmQuerier: tmQuerier,
AppQuerier: appQuerier,
P2PQuerier: p2pQuerier,
EVMClient: evmClient,
logger: logger,
Retrier: retrier,
SignatureStore: sigStore,
RetryTimeout: retryTimeout,
IsBackupRelayer: isBackupRelayer,
BackupRelayerWaitTime: backupRelayerWaitTime,
}
}

Expand All @@ -74,6 +80,7 @@ func (r *Relayer) Start(ctx context.Context) error {
}
defer ethClient.Close()

backupRelayerShouldRelay := false
processFunc := func() error {
// this function will relay attestations as long as there are confirms. And, after the contract is
// up-to-date with the chain, it will stop.
Expand All @@ -98,6 +105,17 @@ func (r *Relayer) Start(ctx context.Context) error {
return nil
}

if r.IsBackupRelayer {
if !backupRelayerShouldRelay {
// if the relayer is a backup relayer, sleep for the wait time before checking
// if the signatures haven't been relayed to relay them.
r.logger.Debug("waiting for the backup relayer wait time to elapse before trying to relay attestation", "nonce", lastContractNonce+1)
time.Sleep(r.BackupRelayerWaitTime)
backupRelayerShouldRelay = true
continue
}
rach-id marked this conversation as resolved.
Show resolved Hide resolved
}

att, err := r.AppQuerier.QueryAttestationByNonce(ctx, lastContractNonce+1)
if err != nil {
return err
rach-id marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -120,6 +138,12 @@ func (r *Relayer) Start(ctx context.Context) error {
if err != nil {
return err
}

if r.IsBackupRelayer {
// if the transaction was mined correctly, the relayer gets back to the pending
// state waiting for the next nonce + the backup relayer wait time.
backupRelayerShouldRelay = false
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion testing/blobstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func NewRelayer(
tempDir := t.TempDir()
sigStore, err := badger.NewDatastore(tempDir, store.DefaultBadgerOptions(tempDir))
require.NoError(t, err)
r := relayer.NewRelayer(tmQuerier, appQuerier, p2pQuerier, evmClient, logger, retrier, sigStore, 30*time.Second)
r := relayer.NewRelayer(tmQuerier, appQuerier, p2pQuerier, evmClient, logger, retrier, sigStore, 30*time.Second, false, 0)
return r
}

Expand Down
Loading