From 7a9c3d7469f70f84d937596c9916212db1a5b087 Mon Sep 17 00:00:00 2001 From: CHAMI Rachid Date: Mon, 4 Dec 2023 14:30:42 +0100 Subject: [PATCH 1/3] chore: better resources closing for orchestrator and relayer (#638) * chore: better resources closing for orchestrator and relayer * chore: remove unnecessary error check --- cmd/blobstream/orchestrator/cmd.go | 29 +++++++++++++---------------- cmd/blobstream/relayer/cmd.go | 25 ++++++++++++------------- orchestrator/orchestrator.go | 12 ++++++++---- 3 files changed, 33 insertions(+), 33 deletions(-) diff --git a/cmd/blobstream/orchestrator/cmd.go b/cmd/blobstream/orchestrator/cmd.go index 47f57178..bf4ea780 100644 --- a/cmd/blobstream/orchestrator/cmd.go +++ b/cmd/blobstream/orchestrator/cmd.go @@ -75,22 +75,30 @@ func Start() *cobra.Command { defer cancel() stopFuncs := make([]func() error, 0) + defer func() { + for _, f := range stopFuncs { + err := f() + if err != nil { + logger.Error(err.Error()) + } + } + }() - tmQuerier, appQuerier, stops, err := common.NewTmAndAppQuerier(logger, config.CoreRPC, config.CoreGRPC, config.GRPCInsecure) - stopFuncs = append(stopFuncs, stops...) + tmQuerier, appQuerier, storeStops, err := common.NewTmAndAppQuerier(logger, config.CoreRPC, config.CoreGRPC, config.GRPCInsecure) + stopFuncs = append(stopFuncs, storeStops...) if err != nil { return err } - s, stops, err := common.OpenStore(logger, config.Home, store.OpenOptions{ + s, storeStops, err := common.OpenStore(logger, config.Home, store.OpenOptions{ HasDataStore: true, BadgerOptions: store.DefaultBadgerOptions(config.Home), HasSignatureStore: false, HasEVMKeyStore: true, HasP2PKeyStore: true, }) - stopFuncs = append(stopFuncs, stops...) if err != nil { + stopFuncs = append(stopFuncs, storeStops...) return err } @@ -110,25 +118,14 @@ func Start() *cobra.Command { return err } stopFuncs = append(stopFuncs, func() error { return dht.Close() }) + stopFuncs = append(stopFuncs, storeStops...) // creating the p2p querier p2pQuerier := p2p.NewQuerier(dht, logger) retrier := helpers.NewRetrier(logger, 6, time.Minute) - defer func() { - for _, f := range stopFuncs { - err := f() - if err != nil { - logger.Error(err.Error()) - } - } - }() - // creating the broadcaster broadcaster := orchestrator.NewBroadcaster(p2pQuerier.BlobstreamDHT) - if err != nil { - return err - } // creating the orchestrator orch := orchestrator.New( diff --git a/cmd/blobstream/relayer/cmd.go b/cmd/blobstream/relayer/cmd.go index 06c66322..2229d9a8 100644 --- a/cmd/blobstream/relayer/cmd.go +++ b/cmd/blobstream/relayer/cmd.go @@ -128,21 +128,28 @@ func Start() *cobra.Command { defer cancel() stopFuncs := make([]func() error, 0) + defer func() { + for _, f := range stopFuncs { + err := f() + if err != nil { + logger.Error(err.Error()) + } + } + }() - tmQuerier, appQuerier, stops, err := common.NewTmAndAppQuerier(logger, config.CoreRPC, config.CoreGRPC, config.GrpcInsecure) - stopFuncs = append(stopFuncs, stops...) + tmQuerier, appQuerier, storeStops, err := common.NewTmAndAppQuerier(logger, config.CoreRPC, config.CoreGRPC, config.GrpcInsecure) + stopFuncs = append(stopFuncs, storeStops...) if err != nil { return err } - s, stops, err := common.OpenStore(logger, config.Home, store.OpenOptions{ + s, storeStops, err := common.OpenStore(logger, config.Home, store.OpenOptions{ HasDataStore: true, BadgerOptions: store.DefaultBadgerOptions(config.Home), HasSignatureStore: true, HasEVMKeyStore: true, HasP2PKeyStore: true, }) - stopFuncs = append(stopFuncs, stops...) if err != nil { return err } @@ -163,20 +170,12 @@ func Start() *cobra.Command { return err } stopFuncs = append(stopFuncs, func() error { return dht.Close() }) + stopFuncs = append(stopFuncs, storeStops...) // creating the p2p querier p2pQuerier := p2p.NewQuerier(dht, logger) retrier := helpers.NewRetrier(logger, 6, time.Minute) - defer func() { - for _, f := range stopFuncs { - err := f() - if err != nil { - logger.Error(err.Error()) - } - } - }() - // connecting to a Blobstream contract ethClient, err := ethclient.Dial(config.EvmRPC) if err != nil { diff --git a/orchestrator/orchestrator.go b/orchestrator/orchestrator.go index 02ac174d..51966b72 100644 --- a/orchestrator/orchestrator.go +++ b/orchestrator/orchestrator.go @@ -2,6 +2,7 @@ package orchestrator import ( "context" + goerrors "errors" "fmt" "math/big" "strconv" @@ -82,7 +83,7 @@ func (orch Orchestrator) Start(ctx context.Context) { orch.Logger.Error("error listening to new attestations", "err", err) cancel() } - orch.Logger.Error("stopping listening to new attestations") + orch.Logger.Info("stopping listening to new attestations") }() // go routine for processing nonces @@ -94,7 +95,7 @@ func (orch Orchestrator) Start(ctx context.Context) { orch.Logger.Error("error processing attestations", "err", err) cancel() } - orch.Logger.Error("stopping processing attestations") + orch.Logger.Info("stopping processing attestations") }() // go routine for handling the previous attestation nonces @@ -137,8 +138,11 @@ func (orch Orchestrator) StartNewEventsListener( for { select { case <-signalChan: - return ErrSignalChanNotif + return nil case <-ctx.Done(): + if goerrors.Is(ctx.Err(), context.Canceled) { + return nil + } return ctx.Err() case <-ticker.C: running := orch.TmQuerier.IsRunning(ctx) @@ -217,7 +221,7 @@ func (orch Orchestrator) EnqueueMissingEvents( for i := uint64(0); i < latestNonce-uint64(earliestAttestationNonce)+1; i++ { select { case <-signalChan: - return ErrSignalChanNotif + return nil case <-ctx.Done(): return ctx.Err() default: From a9f64c3843aef4031351a09c05ae14da17c63299 Mon Sep 17 00:00:00 2001 From: CHAMI Rachid Date: Mon, 4 Dec 2023 14:51:36 +0100 Subject: [PATCH 2/3] feat: support backup relayers (#640) * feat: support backup relayers * chore: golangci * chore: golangci --- cmd/blobstream/base/config.go | 37 +++++++++++++++++++++ cmd/blobstream/relayer/cmd.go | 2 ++ cmd/blobstream/relayer/config.go | 47 +++++++++++++++++++-------- relayer/relayer.go | 56 +++++++++++++++++++++++--------- testing/blobstream.go | 2 +- 5 files changed, 113 insertions(+), 31 deletions(-) diff --git a/cmd/blobstream/base/config.go b/cmd/blobstream/base/config.go index e18ebca0..35fb8772 100644 --- a/cmd/blobstream/base/config.go +++ b/cmd/blobstream/base/config.go @@ -85,6 +85,9 @@ const ( FlagLogLevel = "log.level" FlagLogFormat = "log.format" + + FlagBackupRelayer = "relayer.backup" + FlagBackupRelayerWaitTime = "relayer.wait-time" ) func AddLogLevelFlag(cmd *cobra.Command) { @@ -271,6 +274,40 @@ func GetEVMGasLimitFlag(cmd *cobra.Command) (uint64, bool, error) { return val, changed, nil } +func AddBackupRelayerFlag(cmd *cobra.Command) { + cmd.Flags().Bool( + FlagBackupRelayer, + false, + "Set the relayer to be a backup, i.e. not relay attestations until the `relayer.wait-time` is elapsed and no primary relayer has relayed any", + ) +} + +func GetBackupRelayerFlag(cmd *cobra.Command) (bool, bool, error) { + changed := cmd.Flags().Changed(FlagBackupRelayer) + val, err := cmd.Flags().GetBool(FlagBackupRelayer) + if err != nil { + return false, changed, err + } + return val, changed, nil +} + +func AddBackupRelayerWaitTimeFlag(cmd *cobra.Command) { + cmd.Flags().Uint64( + FlagBackupRelayerWaitTime, + 15, + "The wait time, in minutes, to wait for primary relayers to relay attestations before proceeding to relay them", + ) +} + +func GetBackupRelayerWaitTimeFlag(cmd *cobra.Command) (uint64, bool, error) { + changed := cmd.Flags().Changed(FlagBackupRelayerWaitTime) + val, err := cmd.Flags().GetUint64(FlagBackupRelayerWaitTime) + if err != nil { + return 0, changed, err + } + return val, changed, nil +} + func AddHomeFlag(cmd *cobra.Command, serviceName string, defaultHomeDir string) { cmd.Flags().String(FlagHome, defaultHomeDir, fmt.Sprintf("The Blobstream %s home directory", serviceName)) } diff --git a/cmd/blobstream/relayer/cmd.go b/cmd/blobstream/relayer/cmd.go index 2229d9a8..801638be 100644 --- a/cmd/blobstream/relayer/cmd.go +++ b/cmd/blobstream/relayer/cmd.go @@ -205,6 +205,8 @@ func Start() *cobra.Command { retrier, s.SignatureStore, time.Duration(config.EVMRetryTimeout)*time.Minute, + config.isBackupRelayer, + time.Duration(config.backupRelayerWaitTime)*time.Minute, ) // Listen for and trap any OS signal to graceful shutdown and exit diff --git a/cmd/blobstream/relayer/config.go b/cmd/blobstream/relayer/config.go index 4ae53b1a..a70220db 100644 --- a/cmd/blobstream/relayer/config.go +++ b/cmd/blobstream/relayer/config.go @@ -90,26 +90,30 @@ func addRelayerStartFlags(cmd *cobra.Command) *cobra.Command { base.AddLogLevelFlag(cmd) base.AddLogFormatFlag(cmd) base.AddEVMRetryTimeoutFlag(cmd) + base.AddBackupRelayerFlag(cmd) + base.AddBackupRelayerWaitTimeFlag(cmd) return cmd } type StartConfig struct { base.Config - EvmChainID uint64 `mapstructure:"evm-chain-id" json:"evm-chain-id"` - EvmRPC string `mapstructure:"evm-rpc" json:"evm-rpc"` - CoreGRPC string `mapstructure:"core-grpc" json:"core-grpc"` - CoreRPC string `mapstructure:"core-rpc" json:"core-rpc"` - evmAccAddress string - ContractAddr string `mapstructure:"contract-address" json:"contract-address"` - EvmGasLimit uint64 `mapstructure:"gas-limit" json:"gas-limit"` - Bootstrappers string `mapstructure:"bootstrappers" json:"bootstrappers"` - P2PListenAddr string `mapstructure:"listen-addr" json:"listen-addr"` - p2pNickname string - GrpcInsecure bool `mapstructure:"grpc-insecure" json:"grpc-insecure"` - LogLevel string - LogFormat string - EVMRetryTimeout uint64 `mapstructure:"retry-timeout" json:"retry-timeout"` + EvmChainID uint64 `mapstructure:"evm-chain-id" json:"evm-chain-id"` + EvmRPC string `mapstructure:"evm-rpc" json:"evm-rpc"` + CoreGRPC string `mapstructure:"core-grpc" json:"core-grpc"` + CoreRPC string `mapstructure:"core-rpc" json:"core-rpc"` + evmAccAddress string + ContractAddr string `mapstructure:"contract-address" json:"contract-address"` + EvmGasLimit uint64 `mapstructure:"gas-limit" json:"gas-limit"` + Bootstrappers string `mapstructure:"bootstrappers" json:"bootstrappers"` + P2PListenAddr string `mapstructure:"listen-addr" json:"listen-addr"` + p2pNickname string + GrpcInsecure bool `mapstructure:"grpc-insecure" json:"grpc-insecure"` + LogLevel string + LogFormat string + EVMRetryTimeout uint64 `mapstructure:"retry-timeout" json:"retry-timeout"` + isBackupRelayer bool + backupRelayerWaitTime uint64 } func DefaultStartConfig() *StartConfig { @@ -133,6 +137,9 @@ func (cfg StartConfig) ValidateBasics() error { if err := base.ValidateEVMAddress(cfg.ContractAddr); err != nil { return fmt.Errorf("%s: flag --%s", err.Error(), base.FlagEVMContractAddress) } + if cfg.isBackupRelayer && cfg.backupRelayerWaitTime == 0 { + return fmt.Errorf("backup relayer wait time cannot be 0 if backup relayer flag is set") + } return nil } @@ -256,6 +263,18 @@ func parseRelayerStartFlags(cmd *cobra.Command, fileConfig *StartConfig) (StartC fileConfig.EVMRetryTimeout = retryTimeout } + isBackupRelayer, _, err := base.GetBackupRelayerFlag(cmd) + if err != nil { + return StartConfig{}, err + } + fileConfig.isBackupRelayer = isBackupRelayer + + backupRelayerWaitTime, _, err := base.GetBackupRelayerWaitTimeFlag(cmd) + if err != nil { + return StartConfig{}, err + } + fileConfig.backupRelayerWaitTime = backupRelayerWaitTime + return *fileConfig, nil } diff --git a/relayer/relayer.go b/relayer/relayer.go index db8e2937..ce43f707 100644 --- a/relayer/relayer.go +++ b/relayer/relayer.go @@ -34,14 +34,16 @@ import ( ) type Relayer struct { - TmQuerier *rpc.TmQuerier - AppQuerier *rpc.AppQuerier - P2PQuerier *p2p.Querier - EVMClient *evm.Client - logger tmlog.Logger - Retrier *helpers.Retrier - SignatureStore *badger.Datastore - RetryTimeout time.Duration + TmQuerier *rpc.TmQuerier + AppQuerier *rpc.AppQuerier + P2PQuerier *p2p.Querier + EVMClient *evm.Client + logger tmlog.Logger + Retrier *helpers.Retrier + SignatureStore *badger.Datastore + RetryTimeout time.Duration + IsBackupRelayer bool + BackupRelayerWaitTime time.Duration } func NewRelayer( @@ -53,16 +55,20 @@ func NewRelayer( retrier *helpers.Retrier, sigStore *badger.Datastore, retryTimeout time.Duration, + isBackupRelayer bool, + backupRelayerWaitTime time.Duration, ) *Relayer { return &Relayer{ - TmQuerier: tmQuerier, - AppQuerier: appQuerier, - P2PQuerier: p2pQuerier, - EVMClient: evmClient, - logger: logger, - Retrier: retrier, - SignatureStore: sigStore, - RetryTimeout: retryTimeout, + TmQuerier: tmQuerier, + AppQuerier: appQuerier, + P2PQuerier: p2pQuerier, + EVMClient: evmClient, + logger: logger, + Retrier: retrier, + SignatureStore: sigStore, + RetryTimeout: retryTimeout, + IsBackupRelayer: isBackupRelayer, + BackupRelayerWaitTime: backupRelayerWaitTime, } } @@ -74,6 +80,7 @@ func (r *Relayer) Start(ctx context.Context) error { } defer ethClient.Close() + backupRelayerShouldRelay := false processFunc := func() error { // this function will relay attestations as long as there are confirms. And, after the contract is // up-to-date with the chain, it will stop. @@ -98,6 +105,17 @@ func (r *Relayer) Start(ctx context.Context) error { return nil } + if r.IsBackupRelayer { + if !backupRelayerShouldRelay { + // if the relayer is a backup relayer, sleep for the wait time before checking + // if the signatures haven't been relayed to relay them. + r.logger.Debug("waiting for the backup relayer wait time to elapse before trying to relay attestation", "nonce", lastContractNonce+1) + time.Sleep(r.BackupRelayerWaitTime) + backupRelayerShouldRelay = true + continue + } + } + att, err := r.AppQuerier.QueryAttestationByNonce(ctx, lastContractNonce+1) if err != nil { return err @@ -120,6 +138,12 @@ func (r *Relayer) Start(ctx context.Context) error { if err != nil { return err } + + if r.IsBackupRelayer { + // if the transaction was mined correctly, the relayer gets back to the pending + // state waiting for the next nonce + the backup relayer wait time. + backupRelayerShouldRelay = false + } } } } diff --git a/testing/blobstream.go b/testing/blobstream.go index 8dfedd26..4e50ef5e 100644 --- a/testing/blobstream.go +++ b/testing/blobstream.go @@ -49,7 +49,7 @@ func NewRelayer( tempDir := t.TempDir() sigStore, err := badger.NewDatastore(tempDir, store.DefaultBadgerOptions(tempDir)) require.NoError(t, err) - r := relayer.NewRelayer(tmQuerier, appQuerier, p2pQuerier, evmClient, logger, retrier, sigStore, 30*time.Second) + r := relayer.NewRelayer(tmQuerier, appQuerier, p2pQuerier, evmClient, logger, retrier, sigStore, 30*time.Second, false, 0) return r } From 2dc89ef66433511fbc793f09d219c6f7a665b6e5 Mon Sep 17 00:00:00 2001 From: CHAMI Rachid Date: Mon, 4 Dec 2023 15:13:46 +0100 Subject: [PATCH 3/3] feat: support querying the number of orchestrator signatures for a nonces range (#639) * feat: support querying the number of orchestrator signatures for a nonces range * chore: golangci --- cmd/blobstream/query/cmd.go | 316 ++++++++++++++++++++++++++++-------- 1 file changed, 249 insertions(+), 67 deletions(-) diff --git a/cmd/blobstream/query/cmd.go b/cmd/blobstream/query/cmd.go index cbce2ebc..dcf0a166 100644 --- a/cmd/blobstream/query/cmd.go +++ b/cmd/blobstream/query/cmd.go @@ -52,10 +52,28 @@ func Command() *cobra.Command { } func Signers() *cobra.Command { + signersCmd := &cobra.Command{ + Use: "signers", + Aliases: []string{"s"}, + Short: "Query orchestrators information that are running blobstream", + SilenceUsage: true, + } + + signersCmd.AddCommand( + SignersNonce(), + SignersRange(), + ) + + signersCmd.SetHelpCommand(&cobra.Command{}) + + return signersCmd +} + +func SignersNonce() *cobra.Command { command := &cobra.Command{ - Use: "signers ", + Use: "nonce ", Args: cobra.ExactArgs(1), - Short: "Queries the Blobstream for attestations signers", + Short: "Queries the Blobstream for attestations signers by nonce", Long: "Queries the Blobstream for attestations signers. The nonce is the attestation nonce that the command" + " will query signatures for. It should be either a specific nonce starting from 2 and on." + " Or, use 'latest' as argument to check the latest attestation nonce", @@ -76,7 +94,10 @@ func Signers() *cobra.Command { ctx, cancel := context.WithCancel(cmd.Context()) defer cancel() - stopFuncs := make([]func() error, 0, 1) + tmQuerier, appQuerier, p2pQuerier, stopFuncs, err := startResources(ctx, config, logger) + if err != nil { + return err + } defer func() { for _, f := range stopFuncs { err := f() @@ -86,70 +107,247 @@ func Signers() *cobra.Command { } }() - // create tm querier and app querier - tmQuerier, appQuerier, stops, err := common.NewTmAndAppQuerier( - logger, - config.coreRPC, - config.coreGRPC, - config.grpcInsecure, - ) - stopFuncs = append(stopFuncs, stops...) + nonce, err := parseNonce(ctx, appQuerier, args[0]) if err != nil { return err } - - // creating the host - h, err := libp2p.New() - if err != nil { - return err + if nonce == 1 { + return fmt.Errorf("nonce 1 doesn't need to be signed. signatures start from nonce 2") } - addrInfo, err := peer.AddrInfoFromString(config.targetNode) + + qOutput, err := getSignatures(ctx, logger, appQuerier, tmQuerier, p2pQuerier, nonce) if err != nil { return err } - for i := 0; i < 5; i++ { - logger.Debug("connecting to target node...") - err := h.Connect(ctx, *addrInfo) + if config.outputFile == "" { + printConfirms(logger, *qOutput) + } else { + err := writeConfirmsToJSONFile(logger, *qOutput, config.outputFile) if err != nil { - logger.Error("couldn't connect to target node", "err", err.Error()) + return err } - if err == nil { - logger.Debug("connected to target node") - break - } - time.Sleep(5 * time.Second) } + return nil + }, + } + return addFlags(command) +} - // creating the data store - dataStore := dssync.MutexWrap(ds.NewMapDatastore()) +func startResources(ctx context.Context, config Config, logger tmlog.Logger) (*rpc.TmQuerier, *rpc.AppQuerier, *p2p.Querier, []func() error, error) { + stopFuncs := make([]func() error, 0, 1) - // creating the dht - dht, err := p2p.NewBlobstreamDHT(cmd.Context(), h, dataStore, []peer.AddrInfo{}, logger) + // create tm querier and app querier + tmQuerier, appQuerier, stops, err := common.NewTmAndAppQuerier( + logger, + config.coreRPC, + config.coreGRPC, + config.grpcInsecure, + ) + stopFuncs = append(stopFuncs, stops...) + if err != nil { + return nil, nil, nil, stopFuncs, err + } + + // creating the host + h, err := libp2p.New() + if err != nil { + return nil, nil, nil, stopFuncs, err + } + addrInfo, err := peer.AddrInfoFromString(config.targetNode) + if err != nil { + return nil, nil, nil, stopFuncs, err + } + for i := 0; i < 5; i++ { + logger.Debug("connecting to target node...") + err := h.Connect(ctx, *addrInfo) + if err != nil { + logger.Error("couldn't connect to target node", "err", err.Error()) + } + if err == nil { + logger.Debug("connected to target node") + break + } + time.Sleep(5 * time.Second) + } + + // creating the data store + dataStore := dssync.MutexWrap(ds.NewMapDatastore()) + + // creating the dht + dht, err := p2p.NewBlobstreamDHT(ctx, h, dataStore, []peer.AddrInfo{}, logger) + if err != nil { + return nil, nil, nil, stopFuncs, err + } + + // creating the p2p querier + p2pQuerier := p2p.NewQuerier(dht, logger) + return tmQuerier, appQuerier, p2pQuerier, stopFuncs, nil +} + +func SignersRange() *cobra.Command { + command := &cobra.Command{ + Use: "range ", + Args: cobra.ExactArgs(2), + Short: "Queries the Blobstream for the orchestrators that signed a certain range of nonces", + Long: "Queries the Blobstream for attestations signers. The range of the nonces can start from two" + + " and can use the keyword `latest` to denominate the latest nonce. The range is end exclusive.", + RunE: func(cmd *cobra.Command, args []string) error { + // creating the logger + logger := tmlog.NewTMLogger(os.Stdout) + fileConfig, err := tryToGetExistingConfig(cmd, logger) + if err != nil { + return err + } + config, err := parseFlags(cmd, &fileConfig) if err != nil { return err } - // creating the p2p querier - p2pQuerier := p2p.NewQuerier(dht, logger) + logger.Debug("initializing queriers") - nonce, err := parseNonce(ctx, appQuerier, args[0]) + ctx, cancel := context.WithCancel(cmd.Context()) + defer cancel() + + tmQuerier, appQuerier, p2pQuerier, stopFuncs, err := startResources(ctx, config, logger) if err != nil { return err } - if nonce == 1 { - return fmt.Errorf("nonce 1 doesn't need to be signed. signatures start from nonce 2") - } + defer func() { + for _, f := range stopFuncs { + err := f() + if err != nil { + logger.Error(err.Error()) + } + } + }() - err = getSignaturesAndPrintThem(ctx, logger, appQuerier, tmQuerier, p2pQuerier, nonce, config.outputFile) + if args[0] == "latest" { + return fmt.Errorf("start nonce can't be the `latest` nonce") + } + startNonce, err := parseNonce(ctx, appQuerier, args[0]) + if err != nil { + return err + } + endNonce, err := parseNonce(ctx, appQuerier, args[1]) + if err != nil { + return err + } + err = validateRange(startNonce, endNonce) if err != nil { return err } + + qOutputs := make([]*queryOutput, endNonce-startNonce) + for nonce := startNonce; nonce < endNonce; nonce++ { + qOutputs[nonce-startNonce], err = getSignatures(ctx, logger, appQuerier, tmQuerier, p2pQuerier, nonce) + if err != nil { + return err + } + } + + signersMap := make(map[validatorInfo]uint64) + for _, qOutput := range qOutputs { + for _, sig := range qOutput.Signatures { + if sig.Signed { + // increment the number of the signatures + signersMap[validatorInfo{ + EvmAddress: sig.EvmAddress, + Moniker: sig.Moniker, + ValopAddress: sig.ValopAddress, + }]++ + } else { + // keep the same number of signatures the same but still have the value in the map + val := signersMap[validatorInfo{ + EvmAddress: sig.EvmAddress, + Moniker: sig.Moniker, + ValopAddress: sig.ValopAddress, + }] + signersMap[validatorInfo{ + EvmAddress: sig.EvmAddress, + Moniker: sig.Moniker, + ValopAddress: sig.ValopAddress, + }] = val + } + } + } + + if config.outputFile == "" { + printSignersForRange(logger, signersMap) + } else { + err := writeSignersForRangeToJSONFile(logger, signersMap, config.outputFile) + if err != nil { + return err + } + } + return nil }, } return addFlags(command) } +func writeSignersForRangeToJSONFile(logger tmlog.Logger, signersMap map[validatorInfo]uint64, outputFile string) error { + logger.Info("writing number of signatures to json file", "path", outputFile) + + file, err := os.OpenFile(outputFile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0o644) + if err != nil { + return err + } + defer func(file *os.File) { + err := file.Close() + if err != nil { + logger.Error("failed to close file", "err", err.Error()) + } + }(file) + + encoder := json.NewEncoder(file) + type encodedSignersStruct struct { + EvmAddress string `json:"evmAddress"` + Moniker string `json:"moniker"` + ValopAddress string `json:"valopAddress"` + NumberOfSignatures uint64 `json:"number_of_signatures"` + } + encodedSigners := make([]encodedSignersStruct, 0) + for key, val := range signersMap { + encodedSigners = append(encodedSigners, encodedSignersStruct{ + EvmAddress: key.EvmAddress, + Moniker: key.Moniker, + ValopAddress: key.ValopAddress, + NumberOfSignatures: val, + }) + } + err = encoder.Encode(encodedSigners) + if err != nil { + return err + } + + logger.Info("output written to file successfully", "path", outputFile) + return nil +} + +func printSignersForRange(logger tmlog.Logger, signersMap map[validatorInfo]uint64) { + for signer, numberOfSignatures := range signersMap { + logger.Info(signer.Moniker, "evm_address", signer.EvmAddress, "valop_address", signer.ValopAddress, "number_of_signatures", numberOfSignatures) + } + logger.Info("done") +} + +func validateRange(start uint64, end uint64) error { + if start == 0 { + return fmt.Errorf("start cannot be 0. attestations start at 1") + } + if end == 0 { + return fmt.Errorf("end cannot be 0. attestations start at 1") + } + if start == end { + return fmt.Errorf("start cannot be equal to end") + } + if start > end { + return fmt.Errorf("start cannot be higher than end") + } + return nil +} + type signature struct { EvmAddress string `json:"evmAddress"` Moniker string `json:"moniker"` @@ -172,59 +370,51 @@ type queryOutput struct { CanRelay bool `json:"can_relay"` } -func getSignaturesAndPrintThem( +func getSignatures( ctx context.Context, logger tmlog.Logger, appQuerier *rpc.AppQuerier, tmQuerier *rpc.TmQuerier, p2pQuerier *p2p.Querier, nonce uint64, - outputFile string, -) error { +) (*queryOutput, error) { logger.Info("getting signatures for nonce", "nonce", nonce) lastValset, err := appQuerier.QueryLastValsetBeforeNonce(ctx, nonce) if err != nil { - return err + return nil, err } validatorSet, err := appQuerier.QueryStakingValidatorSet(ctx) if err != nil { - return err + return nil, err } validatorsInfo, err := toValidatorsInfo(ctx, appQuerier, validatorSet) if err != nil { - return err + return nil, err } att, err := appQuerier.QueryAttestationByNonce(ctx, nonce) if err != nil { - return err + return nil, err } if att == nil { - return celestiatypes.ErrAttestationNotFound + return nil, celestiatypes.ErrAttestationNotFound } switch castedAtt := att.(type) { case *celestiatypes.Valset: signBytes, err := castedAtt.SignBytes() if err != nil { - return err + return nil, err } confirms, err := p2pQuerier.QueryValsetConfirms(ctx, nonce, *lastValset, signBytes.Hex()) if err != nil { - return err + return nil, err } qOutput := toQueryOutput(toValsetConfirmsMap(confirms), validatorsInfo, nonce, *lastValset) - if outputFile == "" { - printConfirms(logger, qOutput) - } else { - err := writeConfirmsToJSONFile(logger, qOutput, outputFile) - if err != nil { - return err - } - } + return &qOutput, nil case *celestiatypes.DataCommitment: commitment, err := tmQuerier.QueryCommitment( ctx, @@ -232,26 +422,18 @@ func getSignaturesAndPrintThem( castedAtt.EndBlock, ) if err != nil { - return err + return nil, err } dataRootHash := types.DataCommitmentTupleRootSignBytes(big.NewInt(int64(castedAtt.Nonce)), commitment) confirms, err := p2pQuerier.QueryDataCommitmentConfirms(ctx, *lastValset, nonce, dataRootHash.Hex()) if err != nil { - return err + return nil, err } qOutput := toQueryOutput(toDataCommitmentConfirmsMap(confirms), validatorsInfo, nonce, *lastValset) - if outputFile == "" { - printConfirms(logger, qOutput) - } else { - err := writeConfirmsToJSONFile(logger, qOutput, outputFile) - if err != nil { - return err - } - } + return &qOutput, nil default: - return errors.Wrap(types.ErrUnknownAttestationType, strconv.FormatUint(nonce, 10)) + return nil, errors.Wrap(types.ErrUnknownAttestationType, strconv.FormatUint(nonce, 10)) } - return nil } func toValidatorsInfo(ctx context.Context, appQuerier *rpc.AppQuerier, validatorSet []stakingtypes.Validator) (map[string]validatorInfo, error) {