From 7a9c3d7469f70f84d937596c9916212db1a5b087 Mon Sep 17 00:00:00 2001 From: CHAMI Rachid Date: Mon, 4 Dec 2023 14:30:42 +0100 Subject: [PATCH] chore: better resources closing for orchestrator and relayer (#638) * chore: better resources closing for orchestrator and relayer * chore: remove unnecessary error check --- cmd/blobstream/orchestrator/cmd.go | 29 +++++++++++++---------------- cmd/blobstream/relayer/cmd.go | 25 ++++++++++++------------- orchestrator/orchestrator.go | 12 ++++++++---- 3 files changed, 33 insertions(+), 33 deletions(-) diff --git a/cmd/blobstream/orchestrator/cmd.go b/cmd/blobstream/orchestrator/cmd.go index 47f57178..bf4ea780 100644 --- a/cmd/blobstream/orchestrator/cmd.go +++ b/cmd/blobstream/orchestrator/cmd.go @@ -75,22 +75,30 @@ func Start() *cobra.Command { defer cancel() stopFuncs := make([]func() error, 0) + defer func() { + for _, f := range stopFuncs { + err := f() + if err != nil { + logger.Error(err.Error()) + } + } + }() - tmQuerier, appQuerier, stops, err := common.NewTmAndAppQuerier(logger, config.CoreRPC, config.CoreGRPC, config.GRPCInsecure) - stopFuncs = append(stopFuncs, stops...) + tmQuerier, appQuerier, storeStops, err := common.NewTmAndAppQuerier(logger, config.CoreRPC, config.CoreGRPC, config.GRPCInsecure) + stopFuncs = append(stopFuncs, storeStops...) if err != nil { return err } - s, stops, err := common.OpenStore(logger, config.Home, store.OpenOptions{ + s, storeStops, err := common.OpenStore(logger, config.Home, store.OpenOptions{ HasDataStore: true, BadgerOptions: store.DefaultBadgerOptions(config.Home), HasSignatureStore: false, HasEVMKeyStore: true, HasP2PKeyStore: true, }) - stopFuncs = append(stopFuncs, stops...) if err != nil { + stopFuncs = append(stopFuncs, storeStops...) return err } @@ -110,25 +118,14 @@ func Start() *cobra.Command { return err } stopFuncs = append(stopFuncs, func() error { return dht.Close() }) + stopFuncs = append(stopFuncs, storeStops...) // creating the p2p querier p2pQuerier := p2p.NewQuerier(dht, logger) retrier := helpers.NewRetrier(logger, 6, time.Minute) - defer func() { - for _, f := range stopFuncs { - err := f() - if err != nil { - logger.Error(err.Error()) - } - } - }() - // creating the broadcaster broadcaster := orchestrator.NewBroadcaster(p2pQuerier.BlobstreamDHT) - if err != nil { - return err - } // creating the orchestrator orch := orchestrator.New( diff --git a/cmd/blobstream/relayer/cmd.go b/cmd/blobstream/relayer/cmd.go index 06c66322..2229d9a8 100644 --- a/cmd/blobstream/relayer/cmd.go +++ b/cmd/blobstream/relayer/cmd.go @@ -128,21 +128,28 @@ func Start() *cobra.Command { defer cancel() stopFuncs := make([]func() error, 0) + defer func() { + for _, f := range stopFuncs { + err := f() + if err != nil { + logger.Error(err.Error()) + } + } + }() - tmQuerier, appQuerier, stops, err := common.NewTmAndAppQuerier(logger, config.CoreRPC, config.CoreGRPC, config.GrpcInsecure) - stopFuncs = append(stopFuncs, stops...) + tmQuerier, appQuerier, storeStops, err := common.NewTmAndAppQuerier(logger, config.CoreRPC, config.CoreGRPC, config.GrpcInsecure) + stopFuncs = append(stopFuncs, storeStops...) if err != nil { return err } - s, stops, err := common.OpenStore(logger, config.Home, store.OpenOptions{ + s, storeStops, err := common.OpenStore(logger, config.Home, store.OpenOptions{ HasDataStore: true, BadgerOptions: store.DefaultBadgerOptions(config.Home), HasSignatureStore: true, HasEVMKeyStore: true, HasP2PKeyStore: true, }) - stopFuncs = append(stopFuncs, stops...) if err != nil { return err } @@ -163,20 +170,12 @@ func Start() *cobra.Command { return err } stopFuncs = append(stopFuncs, func() error { return dht.Close() }) + stopFuncs = append(stopFuncs, storeStops...) // creating the p2p querier p2pQuerier := p2p.NewQuerier(dht, logger) retrier := helpers.NewRetrier(logger, 6, time.Minute) - defer func() { - for _, f := range stopFuncs { - err := f() - if err != nil { - logger.Error(err.Error()) - } - } - }() - // connecting to a Blobstream contract ethClient, err := ethclient.Dial(config.EvmRPC) if err != nil { diff --git a/orchestrator/orchestrator.go b/orchestrator/orchestrator.go index 02ac174d..51966b72 100644 --- a/orchestrator/orchestrator.go +++ b/orchestrator/orchestrator.go @@ -2,6 +2,7 @@ package orchestrator import ( "context" + goerrors "errors" "fmt" "math/big" "strconv" @@ -82,7 +83,7 @@ func (orch Orchestrator) Start(ctx context.Context) { orch.Logger.Error("error listening to new attestations", "err", err) cancel() } - orch.Logger.Error("stopping listening to new attestations") + orch.Logger.Info("stopping listening to new attestations") }() // go routine for processing nonces @@ -94,7 +95,7 @@ func (orch Orchestrator) Start(ctx context.Context) { orch.Logger.Error("error processing attestations", "err", err) cancel() } - orch.Logger.Error("stopping processing attestations") + orch.Logger.Info("stopping processing attestations") }() // go routine for handling the previous attestation nonces @@ -137,8 +138,11 @@ func (orch Orchestrator) StartNewEventsListener( for { select { case <-signalChan: - return ErrSignalChanNotif + return nil case <-ctx.Done(): + if goerrors.Is(ctx.Err(), context.Canceled) { + return nil + } return ctx.Err() case <-ticker.C: running := orch.TmQuerier.IsRunning(ctx) @@ -217,7 +221,7 @@ func (orch Orchestrator) EnqueueMissingEvents( for i := uint64(0); i < latestNonce-uint64(earliestAttestationNonce)+1; i++ { select { case <-signalChan: - return ErrSignalChanNotif + return nil case <-ctx.Done(): return ctx.Err() default: