Skip to content
This repository has been archived by the owner on Apr 15, 2024. It is now read-only.

Commit

Permalink
chore: better resources closing for orchestrator and relayer (#638)
Browse files Browse the repository at this point in the history
* chore: better resources closing for orchestrator and relayer

* chore: remove unnecessary error check
  • Loading branch information
rach-id authored Dec 4, 2023
1 parent cc39d83 commit 7a9c3d7
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 33 deletions.
29 changes: 13 additions & 16 deletions cmd/blobstream/orchestrator/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,22 +75,30 @@ func Start() *cobra.Command {
defer cancel()

stopFuncs := make([]func() error, 0)
defer func() {
for _, f := range stopFuncs {
err := f()
if err != nil {
logger.Error(err.Error())
}
}
}()

tmQuerier, appQuerier, stops, err := common.NewTmAndAppQuerier(logger, config.CoreRPC, config.CoreGRPC, config.GRPCInsecure)
stopFuncs = append(stopFuncs, stops...)
tmQuerier, appQuerier, storeStops, err := common.NewTmAndAppQuerier(logger, config.CoreRPC, config.CoreGRPC, config.GRPCInsecure)
stopFuncs = append(stopFuncs, storeStops...)
if err != nil {
return err
}

s, stops, err := common.OpenStore(logger, config.Home, store.OpenOptions{
s, storeStops, err := common.OpenStore(logger, config.Home, store.OpenOptions{
HasDataStore: true,
BadgerOptions: store.DefaultBadgerOptions(config.Home),
HasSignatureStore: false,
HasEVMKeyStore: true,
HasP2PKeyStore: true,
})
stopFuncs = append(stopFuncs, stops...)
if err != nil {
stopFuncs = append(stopFuncs, storeStops...)
return err
}

Expand All @@ -110,25 +118,14 @@ func Start() *cobra.Command {
return err
}
stopFuncs = append(stopFuncs, func() error { return dht.Close() })
stopFuncs = append(stopFuncs, storeStops...)

// creating the p2p querier
p2pQuerier := p2p.NewQuerier(dht, logger)
retrier := helpers.NewRetrier(logger, 6, time.Minute)

defer func() {
for _, f := range stopFuncs {
err := f()
if err != nil {
logger.Error(err.Error())
}
}
}()

// creating the broadcaster
broadcaster := orchestrator.NewBroadcaster(p2pQuerier.BlobstreamDHT)
if err != nil {
return err
}

// creating the orchestrator
orch := orchestrator.New(
Expand Down
25 changes: 12 additions & 13 deletions cmd/blobstream/relayer/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,21 +128,28 @@ func Start() *cobra.Command {
defer cancel()

stopFuncs := make([]func() error, 0)
defer func() {
for _, f := range stopFuncs {
err := f()
if err != nil {
logger.Error(err.Error())
}
}
}()

tmQuerier, appQuerier, stops, err := common.NewTmAndAppQuerier(logger, config.CoreRPC, config.CoreGRPC, config.GrpcInsecure)
stopFuncs = append(stopFuncs, stops...)
tmQuerier, appQuerier, storeStops, err := common.NewTmAndAppQuerier(logger, config.CoreRPC, config.CoreGRPC, config.GrpcInsecure)
stopFuncs = append(stopFuncs, storeStops...)
if err != nil {
return err
}

s, stops, err := common.OpenStore(logger, config.Home, store.OpenOptions{
s, storeStops, err := common.OpenStore(logger, config.Home, store.OpenOptions{
HasDataStore: true,
BadgerOptions: store.DefaultBadgerOptions(config.Home),
HasSignatureStore: true,
HasEVMKeyStore: true,
HasP2PKeyStore: true,
})
stopFuncs = append(stopFuncs, stops...)
if err != nil {
return err
}
Expand All @@ -163,20 +170,12 @@ func Start() *cobra.Command {
return err
}
stopFuncs = append(stopFuncs, func() error { return dht.Close() })
stopFuncs = append(stopFuncs, storeStops...)

// creating the p2p querier
p2pQuerier := p2p.NewQuerier(dht, logger)
retrier := helpers.NewRetrier(logger, 6, time.Minute)

defer func() {
for _, f := range stopFuncs {
err := f()
if err != nil {
logger.Error(err.Error())
}
}
}()

// connecting to a Blobstream contract
ethClient, err := ethclient.Dial(config.EvmRPC)
if err != nil {
Expand Down
12 changes: 8 additions & 4 deletions orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package orchestrator

import (
"context"
goerrors "errors"
"fmt"
"math/big"
"strconv"
Expand Down Expand Up @@ -82,7 +83,7 @@ func (orch Orchestrator) Start(ctx context.Context) {
orch.Logger.Error("error listening to new attestations", "err", err)
cancel()
}
orch.Logger.Error("stopping listening to new attestations")
orch.Logger.Info("stopping listening to new attestations")
}()

// go routine for processing nonces
Expand All @@ -94,7 +95,7 @@ func (orch Orchestrator) Start(ctx context.Context) {
orch.Logger.Error("error processing attestations", "err", err)
cancel()
}
orch.Logger.Error("stopping processing attestations")
orch.Logger.Info("stopping processing attestations")
}()

// go routine for handling the previous attestation nonces
Expand Down Expand Up @@ -137,8 +138,11 @@ func (orch Orchestrator) StartNewEventsListener(
for {
select {
case <-signalChan:
return ErrSignalChanNotif
return nil
case <-ctx.Done():
if goerrors.Is(ctx.Err(), context.Canceled) {
return nil
}
return ctx.Err()
case <-ticker.C:
running := orch.TmQuerier.IsRunning(ctx)
Expand Down Expand Up @@ -217,7 +221,7 @@ func (orch Orchestrator) EnqueueMissingEvents(
for i := uint64(0); i < latestNonce-uint64(earliestAttestationNonce)+1; i++ {
select {
case <-signalChan:
return ErrSignalChanNotif
return nil
case <-ctx.Done():
return ctx.Err()
default:
Expand Down

0 comments on commit 7a9c3d7

Please sign in to comment.