From 9410b67e60b7e526e1fde0ba1141bc8e71097f47 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Sat, 2 Dec 2023 14:34:12 +0100 Subject: [PATCH 1/2] chore: better resources closing for orchestrator and relayer --- cmd/blobstream/orchestrator/cmd.go | 26 +++++++++++++------------- cmd/blobstream/relayer/cmd.go | 25 ++++++++++++------------- orchestrator/orchestrator.go | 12 ++++++++---- 3 files changed, 33 insertions(+), 30 deletions(-) diff --git a/cmd/blobstream/orchestrator/cmd.go b/cmd/blobstream/orchestrator/cmd.go index 47f57178..f016f32e 100644 --- a/cmd/blobstream/orchestrator/cmd.go +++ b/cmd/blobstream/orchestrator/cmd.go @@ -75,22 +75,30 @@ func Start() *cobra.Command { defer cancel() stopFuncs := make([]func() error, 0) + defer func() { + for _, f := range stopFuncs { + err := f() + if err != nil { + logger.Error(err.Error()) + } + } + }() - tmQuerier, appQuerier, stops, err := common.NewTmAndAppQuerier(logger, config.CoreRPC, config.CoreGRPC, config.GRPCInsecure) - stopFuncs = append(stopFuncs, stops...) + tmQuerier, appQuerier, storeStops, err := common.NewTmAndAppQuerier(logger, config.CoreRPC, config.CoreGRPC, config.GRPCInsecure) + stopFuncs = append(stopFuncs, storeStops...) if err != nil { return err } - s, stops, err := common.OpenStore(logger, config.Home, store.OpenOptions{ + s, storeStops, err := common.OpenStore(logger, config.Home, store.OpenOptions{ HasDataStore: true, BadgerOptions: store.DefaultBadgerOptions(config.Home), HasSignatureStore: false, HasEVMKeyStore: true, HasP2PKeyStore: true, }) - stopFuncs = append(stopFuncs, stops...) if err != nil { + stopFuncs = append(stopFuncs, storeStops...) return err } @@ -110,20 +118,12 @@ func Start() *cobra.Command { return err } stopFuncs = append(stopFuncs, func() error { return dht.Close() }) + stopFuncs = append(stopFuncs, storeStops...) // creating the p2p querier p2pQuerier := p2p.NewQuerier(dht, logger) retrier := helpers.NewRetrier(logger, 6, time.Minute) - defer func() { - for _, f := range stopFuncs { - err := f() - if err != nil { - logger.Error(err.Error()) - } - } - }() - // creating the broadcaster broadcaster := orchestrator.NewBroadcaster(p2pQuerier.BlobstreamDHT) if err != nil { diff --git a/cmd/blobstream/relayer/cmd.go b/cmd/blobstream/relayer/cmd.go index 06c66322..2229d9a8 100644 --- a/cmd/blobstream/relayer/cmd.go +++ b/cmd/blobstream/relayer/cmd.go @@ -128,21 +128,28 @@ func Start() *cobra.Command { defer cancel() stopFuncs := make([]func() error, 0) + defer func() { + for _, f := range stopFuncs { + err := f() + if err != nil { + logger.Error(err.Error()) + } + } + }() - tmQuerier, appQuerier, stops, err := common.NewTmAndAppQuerier(logger, config.CoreRPC, config.CoreGRPC, config.GrpcInsecure) - stopFuncs = append(stopFuncs, stops...) + tmQuerier, appQuerier, storeStops, err := common.NewTmAndAppQuerier(logger, config.CoreRPC, config.CoreGRPC, config.GrpcInsecure) + stopFuncs = append(stopFuncs, storeStops...) if err != nil { return err } - s, stops, err := common.OpenStore(logger, config.Home, store.OpenOptions{ + s, storeStops, err := common.OpenStore(logger, config.Home, store.OpenOptions{ HasDataStore: true, BadgerOptions: store.DefaultBadgerOptions(config.Home), HasSignatureStore: true, HasEVMKeyStore: true, HasP2PKeyStore: true, }) - stopFuncs = append(stopFuncs, stops...) if err != nil { return err } @@ -163,20 +170,12 @@ func Start() *cobra.Command { return err } stopFuncs = append(stopFuncs, func() error { return dht.Close() }) + stopFuncs = append(stopFuncs, storeStops...) // creating the p2p querier p2pQuerier := p2p.NewQuerier(dht, logger) retrier := helpers.NewRetrier(logger, 6, time.Minute) - defer func() { - for _, f := range stopFuncs { - err := f() - if err != nil { - logger.Error(err.Error()) - } - } - }() - // connecting to a Blobstream contract ethClient, err := ethclient.Dial(config.EvmRPC) if err != nil { diff --git a/orchestrator/orchestrator.go b/orchestrator/orchestrator.go index 02ac174d..51966b72 100644 --- a/orchestrator/orchestrator.go +++ b/orchestrator/orchestrator.go @@ -2,6 +2,7 @@ package orchestrator import ( "context" + goerrors "errors" "fmt" "math/big" "strconv" @@ -82,7 +83,7 @@ func (orch Orchestrator) Start(ctx context.Context) { orch.Logger.Error("error listening to new attestations", "err", err) cancel() } - orch.Logger.Error("stopping listening to new attestations") + orch.Logger.Info("stopping listening to new attestations") }() // go routine for processing nonces @@ -94,7 +95,7 @@ func (orch Orchestrator) Start(ctx context.Context) { orch.Logger.Error("error processing attestations", "err", err) cancel() } - orch.Logger.Error("stopping processing attestations") + orch.Logger.Info("stopping processing attestations") }() // go routine for handling the previous attestation nonces @@ -137,8 +138,11 @@ func (orch Orchestrator) StartNewEventsListener( for { select { case <-signalChan: - return ErrSignalChanNotif + return nil case <-ctx.Done(): + if goerrors.Is(ctx.Err(), context.Canceled) { + return nil + } return ctx.Err() case <-ticker.C: running := orch.TmQuerier.IsRunning(ctx) @@ -217,7 +221,7 @@ func (orch Orchestrator) EnqueueMissingEvents( for i := uint64(0); i < latestNonce-uint64(earliestAttestationNonce)+1; i++ { select { case <-signalChan: - return ErrSignalChanNotif + return nil case <-ctx.Done(): return ctx.Err() default: From 1dbf2a3049eb9435d11292db739b80134025e7af Mon Sep 17 00:00:00 2001 From: sweexordious Date: Sat, 2 Dec 2023 14:57:49 +0100 Subject: [PATCH 2/2] chore: remove unnecessary error check --- cmd/blobstream/orchestrator/cmd.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/cmd/blobstream/orchestrator/cmd.go b/cmd/blobstream/orchestrator/cmd.go index f016f32e..bf4ea780 100644 --- a/cmd/blobstream/orchestrator/cmd.go +++ b/cmd/blobstream/orchestrator/cmd.go @@ -126,9 +126,6 @@ func Start() *cobra.Command { // creating the broadcaster broadcaster := orchestrator.NewBroadcaster(p2pQuerier.BlobstreamDHT) - if err != nil { - return err - } // creating the orchestrator orch := orchestrator.New(