Skip to content

Commit

Permalink
chore: refactor state query access (#1766)
Browse files Browse the repository at this point in the history
* refactor state query access

* remove direct usage of client.Context to allow the rewiring of lava over lava

* refactor rpcconsumer, allow creating a server with a function

* lint

* added custom lava transport
  • Loading branch information
omerlavanet authored Dec 1, 2024
1 parent ad24517 commit fcfbef4
Show file tree
Hide file tree
Showing 14 changed files with 320 additions and 237 deletions.
5 changes: 3 additions & 2 deletions ecosystem/lavavisor/pkg/state/lavavisor_state_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ type LavaVisorStateTracker struct {

func NewLavaVisorStateTracker(ctx context.Context, txFactory tx.Factory, clientCtx client.Context, chainFetcher chaintracker.ChainFetcher) (lvst *LavaVisorStateTracker, err error) {
// validate chainId
status, err := clientCtx.Client.Status(ctx)
stateQuery := updaters.NewStateQuery(ctx, updaters.NewStateQueryAccessInst(clientCtx))
status, err := stateQuery.Status(ctx)
if err != nil {
return nil, utils.LavaFormatError("[Lavavisor] failed getting status", err)
}
Expand All @@ -36,7 +37,7 @@ func NewLavaVisorStateTracker(ctx context.Context, txFactory tx.Factory, clientC
if err != nil {
utils.LavaFormatFatal("chain is missing Lava spec, cant initialize lavavisor", err)
}
lst := &LavaVisorStateTracker{stateQuery: updaters.NewStateQuery(ctx, clientCtx), averageBlockTime: time.Duration(specResponse.Spec.AverageBlockTime) * time.Millisecond}
lst := &LavaVisorStateTracker{stateQuery: stateQuery, averageBlockTime: time.Duration(specResponse.Spec.AverageBlockTime) * time.Millisecond}
return lst, nil
}

Expand Down
4 changes: 2 additions & 2 deletions protocol/badgegenerator/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ func NewBadgeStateTracker(ctx context.Context, clientCtx cosmosclient.Context, c
emergencyTracker, blockNotFoundCallback := statetracker.NewEmergencyTracker(nil)
txFactory := tx.Factory{}
txFactory = txFactory.WithChainID(chainId)
stateTrackerBase, err := statetracker.NewStateTracker(ctx, txFactory, clientCtx, chainFetcher, blockNotFoundCallback)
sq := updaters.NewStateQuery(ctx, updaters.NewStateQueryAccessInst(clientCtx))
stateTrackerBase, err := statetracker.NewStateTracker(ctx, txFactory, sq, chainFetcher, blockNotFoundCallback)
if err != nil {
return nil, err
}
sq := updaters.NewStateQuery(ctx, clientCtx)
esq := updaters.NewEpochStateQuery(sq)

pst := &BadgeStateTracker{StateTracker: stateTrackerBase, stateQuery: esq, ConsumerEmergencyTrackerInf: emergencyTracker}
Expand Down
6 changes: 3 additions & 3 deletions protocol/badgeserver/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ func NewBadgeStateTracker(ctx context.Context, clientCtx cosmosclient.Context, c
emergencyTracker, blockNotFoundCallback := statetracker.NewEmergencyTracker(nil)
txFactory := tx.Factory{}
txFactory = txFactory.WithChainID(chainId)
stateTrackerBase, err := statetracker.NewStateTracker(ctx, txFactory, clientCtx, chainFetcher, blockNotFoundCallback)
stateQuery := updaters.NewStateQuery(ctx, updaters.NewStateQueryAccessInst(clientCtx))
stateTrackerBase, err := statetracker.NewStateTracker(ctx, txFactory, stateQuery, chainFetcher, blockNotFoundCallback)
if err != nil {
return nil, err
}
stateTracker := updaters.NewStateQuery(ctx, clientCtx)
epochStateTracker := updaters.NewEpochStateQuery(stateTracker)
epochStateTracker := updaters.NewEpochStateQuery(stateQuery)

badgeStateTracker := &BadgeStateTracker{
StateTracker: stateTrackerBase,
Expand Down
23 changes: 23 additions & 0 deletions protocol/rpcconsumer/custom_transport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package rpcconsumer

import (
"net/http"
)

type CustomLavaTransport struct {
transport http.RoundTripper
}

func NewCustomLavaTransport(httpTransport http.RoundTripper) *CustomLavaTransport {
return &CustomLavaTransport{transport: httpTransport}
}

func (c *CustomLavaTransport) RoundTrip(req *http.Request) (*http.Response, error) {
// Custom logic before the request

// Delegate to the underlying RoundTripper (usually http.Transport)
resp, err := c.transport.RoundTrip(req)

// Custom logic after the request
return resp, err
}
267 changes: 154 additions & 113 deletions protocol/rpcconsumer/rpcconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,14 @@ import (
"sync"
"time"

rpchttp "github.com/cometbft/cometbft/rpc/client/http"
jsonrpcclient "github.com/cometbft/cometbft/rpc/jsonrpc/client"
"github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/client/config"
"github.com/cosmos/cosmos-sdk/client/flags"
"github.com/cosmos/cosmos-sdk/client/tx"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/decred/dcrd/dcrec/secp256k1/v4"
"github.com/lavanet/lava/v4/app"
"github.com/lavanet/lava/v4/protocol/chainlib"
"github.com/lavanet/lava/v4/protocol/common"
Expand Down Expand Up @@ -151,7 +154,16 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt
if err != nil {
utils.LavaFormatFatal("failed creating RPCConsumer logs", err)
}

consumerMetricsManager.SetVersion(upgrade.GetCurrentVersion().ConsumerVersion)
httpClient, err := jsonrpcclient.DefaultHTTPClient(options.clientCtx.NodeURI)
if err == nil {
httpClient.Transport = NewCustomLavaTransport(httpClient.Transport)
client, err := rpchttp.NewWithClient(options.clientCtx.NodeURI, "/websocket", httpClient)
if err == nil {
options.clientCtx = options.clientCtx.WithClient(client)
}
}

// spawn up ConsumerStateTracker
lavaChainFetcher := chainlib.NewLavaChainFetcher(ctx, options.clientCtx)
Expand All @@ -161,6 +173,8 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt
}
rpcc.consumerStateTracker = consumerStateTracker

lavaChainFetcher.FetchLatestBlockNum(ctx)

lavaChainID := options.clientCtx.ChainID
keyName, err := sigs.GetKeyName(options.clientCtx)
if err != nil {
Expand Down Expand Up @@ -213,119 +227,11 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt
for _, rpcEndpoint := range options.rpcEndpoints {
go func(rpcEndpoint *lavasession.RPCEndpoint) error {
defer wg.Done()
chainParser, err := chainlib.NewChainParser(rpcEndpoint.ApiInterface)
if err != nil {
err = utils.LavaFormatError("failed creating chain parser", err, utils.Attribute{Key: "endpoint", Value: rpcEndpoint})
errCh <- err
return err
}
chainID := rpcEndpoint.ChainID
// create policyUpdaters per chain
newPolicyUpdater := updaters.NewPolicyUpdater(chainID, consumerStateTracker, consumerAddr.String(), chainParser, *rpcEndpoint)
policyUpdater, ok, err := policyUpdaters.LoadOrStore(chainID, newPolicyUpdater)
if err != nil {
errCh <- err
return utils.LavaFormatError("failed loading or storing policy updater", err, utils.LogAttr("endpoint", rpcEndpoint))
}
if ok {
err := policyUpdater.AddPolicySetter(chainParser, *rpcEndpoint)
if err != nil {
errCh <- err
return utils.LavaFormatError("failed adding policy setter", err)
}
}

err = statetracker.RegisterForSpecUpdatesOrSetStaticSpec(ctx, chainParser, options.cmdFlags.StaticSpecPath, *rpcEndpoint, rpcc.consumerStateTracker)
if err != nil {
err = utils.LavaFormatError("failed registering for spec updates", err, utils.Attribute{Key: "endpoint", Value: rpcEndpoint})
errCh <- err
return err
}

_, averageBlockTime, _, _ := chainParser.ChainBlockStats()
var optimizer *provideroptimizer.ProviderOptimizer
var consumerConsistency *ConsumerConsistency
var finalizationConsensus *finalizationconsensus.FinalizationConsensus
getOrCreateChainAssets := func() error {
// this is locked so we don't race optimizers creation
chainMutexes[chainID].Lock()
defer chainMutexes[chainID].Unlock()
var loaded bool
var err error

baseLatency := common.AverageWorldLatency / 2 // we want performance to be half our timeout or better

// Create / Use existing optimizer
newOptimizer := provideroptimizer.NewProviderOptimizer(options.strategy, averageBlockTime, baseLatency, options.maxConcurrentProviders, consumerOptimizerQoSClient, chainID)
optimizer, loaded, err = optimizers.LoadOrStore(chainID, newOptimizer)
if err != nil {
return utils.LavaFormatError("failed loading optimizer", err, utils.LogAttr("endpoint", rpcEndpoint.Key()))
}

if !loaded {
// if this is a new optimizer, register it in the consumerOptimizerQoSClient
consumerOptimizerQoSClient.RegisterOptimizer(optimizer, chainID)
}

// Create / Use existing ConsumerConsistency
newConsumerConsistency := NewConsumerConsistency(chainID)
consumerConsistency, _, err = consumerConsistencies.LoadOrStore(chainID, newConsumerConsistency)
if err != nil {
return utils.LavaFormatError("failed loading consumer consistency", err, utils.LogAttr("endpoint", rpcEndpoint.Key()))
}

// Create / Use existing FinalizationConsensus
newFinalizationConsensus := finalizationconsensus.NewFinalizationConsensus(rpcEndpoint.ChainID)
finalizationConsensus, loaded, err = finalizationConsensuses.LoadOrStore(chainID, newFinalizationConsensus)
if err != nil {
return utils.LavaFormatError("failed loading finalization consensus", err, utils.LogAttr("endpoint", rpcEndpoint.Key()))
}
if !loaded { // when creating new finalization consensus instance we need to register it to updates
consumerStateTracker.RegisterFinalizationConsensusForUpdates(ctx, finalizationConsensus)
}
return nil
}
err = getOrCreateChainAssets()
if err != nil {
errCh <- err
return err
}

if finalizationConsensus == nil || optimizer == nil || consumerConsistency == nil {
err = utils.LavaFormatError("failed getting assets, found a nil", nil, utils.Attribute{Key: "endpoint", Value: rpcEndpoint.Key()})
errCh <- err
return err
}

// Create active subscription provider storage for each unique chain
activeSubscriptionProvidersStorage := lavasession.NewActiveSubscriptionProvidersStorage()
consumerSessionManager := lavasession.NewConsumerSessionManager(rpcEndpoint, optimizer, consumerMetricsManager, consumerReportsManager, consumerAddr.String(), activeSubscriptionProvidersStorage)
// Register For Updates
rpcc.consumerStateTracker.RegisterConsumerSessionManagerForPairingUpdates(ctx, consumerSessionManager, options.staticProvidersList)

var relaysMonitor *metrics.RelaysMonitor
if options.cmdFlags.RelaysHealthEnableFlag {
relaysMonitor = metrics.NewRelaysMonitor(options.cmdFlags.RelaysHealthIntervalFlag, rpcEndpoint.ChainID, rpcEndpoint.ApiInterface)
relaysMonitorAggregator.RegisterRelaysMonitor(rpcEndpoint.String(), relaysMonitor)
}

rpcConsumerServer := &RPCConsumerServer{}

var consumerWsSubscriptionManager *chainlib.ConsumerWSSubscriptionManager
var specMethodType string
if rpcEndpoint.ApiInterface == spectypes.APIInterfaceJsonRPC {
specMethodType = http.MethodPost
}
consumerWsSubscriptionManager = chainlib.NewConsumerWSSubscriptionManager(consumerSessionManager, rpcConsumerServer, options.refererData, specMethodType, chainParser, activeSubscriptionProvidersStorage, consumerMetricsManager)

utils.LavaFormatInfo("RPCConsumer Listening", utils.Attribute{Key: "endpoints", Value: rpcEndpoint.String()})
err = rpcConsumerServer.ServeRPCRequests(ctx, rpcEndpoint, rpcc.consumerStateTracker, chainParser, finalizationConsensus, consumerSessionManager, options.requiredResponses, privKey, lavaChainID, options.cache, rpcConsumerMetrics, consumerAddr, consumerConsistency, relaysMonitor, options.cmdFlags, options.stateShare, options.refererData, consumerReportsManager, consumerWsSubscriptionManager)
if err != nil {
err = utils.LavaFormatError("failed serving rpc requests", err, utils.Attribute{Key: "endpoint", Value: rpcEndpoint})
errCh <- err
return err
}
return nil
_, err := rpcc.CreateConsumerEndpoint(ctx, rpcEndpoint, errCh, consumerAddr, consumerStateTracker,
policyUpdaters, optimizers, consumerConsistencies, finalizationConsensuses, chainMutexes,
options, privKey, lavaChainID, rpcConsumerMetrics, consumerReportsManager, consumerOptimizerQoSClient,
consumerMetricsManager, relaysMonitorAggregator)
return err
}(rpcEndpoint)
}

Expand Down Expand Up @@ -361,6 +267,141 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt
return nil
}

func (rpcc *RPCConsumer) CreateConsumerEndpoint(
ctx context.Context,
rpcEndpoint *lavasession.RPCEndpoint,
errCh chan error,
consumerAddr sdk.AccAddress,
consumerStateTracker *statetracker.ConsumerStateTracker,
policyUpdaters *common.SafeSyncMap[string, *updaters.PolicyUpdater],
optimizers *common.SafeSyncMap[string, *provideroptimizer.ProviderOptimizer],
consumerConsistencies *common.SafeSyncMap[string, *ConsumerConsistency],
finalizationConsensuses *common.SafeSyncMap[string, *finalizationconsensus.FinalizationConsensus],
chainMutexes map[string]*sync.Mutex,
options *rpcConsumerStartOptions,
privKey *secp256k1.PrivateKey,
lavaChainID string,
rpcConsumerMetrics *metrics.RPCConsumerLogs,
consumerReportsManager *metrics.ConsumerReportsClient,
consumerOptimizerQoSClient *metrics.ConsumerOptimizerQoSClient,
consumerMetricsManager *metrics.ConsumerMetricsManager,
relaysMonitorAggregator *metrics.RelaysMonitorAggregator,
) (*RPCConsumerServer, error) {
chainParser, err := chainlib.NewChainParser(rpcEndpoint.ApiInterface)
if err != nil {
err = utils.LavaFormatError("failed creating chain parser", err, utils.Attribute{Key: "endpoint", Value: rpcEndpoint})
errCh <- err
return nil, err
}
chainID := rpcEndpoint.ChainID
// create policyUpdaters per chain
newPolicyUpdater := updaters.NewPolicyUpdater(chainID, consumerStateTracker, consumerAddr.String(), chainParser, *rpcEndpoint)
policyUpdater, ok, err := policyUpdaters.LoadOrStore(chainID, newPolicyUpdater)
if err != nil {
errCh <- err
return nil, utils.LavaFormatError("failed loading or storing policy updater", err, utils.LogAttr("endpoint", rpcEndpoint))
}
if ok {
err := policyUpdater.AddPolicySetter(chainParser, *rpcEndpoint)
if err != nil {
errCh <- err
return nil, utils.LavaFormatError("failed adding policy setter", err)
}
}

err = statetracker.RegisterForSpecUpdatesOrSetStaticSpec(ctx, chainParser, options.cmdFlags.StaticSpecPath, *rpcEndpoint, rpcc.consumerStateTracker)
if err != nil {
err = utils.LavaFormatError("failed registering for spec updates", err, utils.Attribute{Key: "endpoint", Value: rpcEndpoint})
errCh <- err
return nil, err
}

_, averageBlockTime, _, _ := chainParser.ChainBlockStats()
var optimizer *provideroptimizer.ProviderOptimizer
var consumerConsistency *ConsumerConsistency
var finalizationConsensus *finalizationconsensus.FinalizationConsensus
getOrCreateChainAssets := func() error {
// this is locked so we don't race optimizers creation
chainMutexes[chainID].Lock()
defer chainMutexes[chainID].Unlock()
var loaded bool
var err error

baseLatency := common.AverageWorldLatency / 2 // we want performance to be half our timeout or better

// Create / Use existing optimizer
newOptimizer := provideroptimizer.NewProviderOptimizer(options.strategy, averageBlockTime, baseLatency, options.maxConcurrentProviders, consumerOptimizerQoSClient, chainID)
optimizer, loaded, err = optimizers.LoadOrStore(chainID, newOptimizer)
if err != nil {
return utils.LavaFormatError("failed loading optimizer", err, utils.LogAttr("endpoint", rpcEndpoint.Key()))
}

if !loaded {
// if this is a new optimizer, register it in the consumerOptimizerQoSClient
consumerOptimizerQoSClient.RegisterOptimizer(optimizer, chainID)
}

// Create / Use existing ConsumerConsistency
newConsumerConsistency := NewConsumerConsistency(chainID)
consumerConsistency, _, err = consumerConsistencies.LoadOrStore(chainID, newConsumerConsistency)
if err != nil {
return utils.LavaFormatError("failed loading consumer consistency", err, utils.LogAttr("endpoint", rpcEndpoint.Key()))
}

// Create / Use existing FinalizationConsensus
newFinalizationConsensus := finalizationconsensus.NewFinalizationConsensus(rpcEndpoint.ChainID)
finalizationConsensus, loaded, err = finalizationConsensuses.LoadOrStore(chainID, newFinalizationConsensus)
if err != nil {
return utils.LavaFormatError("failed loading finalization consensus", err, utils.LogAttr("endpoint", rpcEndpoint.Key()))
}
if !loaded { // when creating new finalization consensus instance we need to register it to updates
consumerStateTracker.RegisterFinalizationConsensusForUpdates(ctx, finalizationConsensus)
}
return nil
}
err = getOrCreateChainAssets()
if err != nil {
errCh <- err
return nil, err
}

if finalizationConsensus == nil || optimizer == nil || consumerConsistency == nil {
err = utils.LavaFormatError("failed getting assets, found a nil", nil, utils.Attribute{Key: "endpoint", Value: rpcEndpoint.Key()})
errCh <- err
return nil, err
}

// Create active subscription provider storage for each unique chain
activeSubscriptionProvidersStorage := lavasession.NewActiveSubscriptionProvidersStorage()
consumerSessionManager := lavasession.NewConsumerSessionManager(rpcEndpoint, optimizer, consumerMetricsManager, consumerReportsManager, consumerAddr.String(), activeSubscriptionProvidersStorage)
// Register For Updates
rpcc.consumerStateTracker.RegisterConsumerSessionManagerForPairingUpdates(ctx, consumerSessionManager, options.staticProvidersList)

var relaysMonitor *metrics.RelaysMonitor
if options.cmdFlags.RelaysHealthEnableFlag {
relaysMonitor = metrics.NewRelaysMonitor(options.cmdFlags.RelaysHealthIntervalFlag, rpcEndpoint.ChainID, rpcEndpoint.ApiInterface)
relaysMonitorAggregator.RegisterRelaysMonitor(rpcEndpoint.String(), relaysMonitor)
}

rpcConsumerServer := &RPCConsumerServer{}

var consumerWsSubscriptionManager *chainlib.ConsumerWSSubscriptionManager
var specMethodType string
if rpcEndpoint.ApiInterface == spectypes.APIInterfaceJsonRPC {
specMethodType = http.MethodPost
}
consumerWsSubscriptionManager = chainlib.NewConsumerWSSubscriptionManager(consumerSessionManager, rpcConsumerServer, options.refererData, specMethodType, chainParser, activeSubscriptionProvidersStorage, consumerMetricsManager)

utils.LavaFormatInfo("RPCConsumer Listening", utils.Attribute{Key: "endpoints", Value: rpcEndpoint.String()})
err = rpcConsumerServer.ServeRPCRequests(ctx, rpcEndpoint, rpcc.consumerStateTracker, chainParser, finalizationConsensus, consumerSessionManager, options.requiredResponses, privKey, lavaChainID, options.cache, rpcConsumerMetrics, consumerAddr, consumerConsistency, relaysMonitor, options.cmdFlags, options.stateShare, options.refererData, consumerReportsManager, consumerWsSubscriptionManager)
if err != nil {
err = utils.LavaFormatError("failed serving rpc requests", err, utils.Attribute{Key: "endpoint", Value: rpcEndpoint})
errCh <- err
return nil, err
}
return rpcConsumerServer, nil
}

func ParseEndpoints(viper_endpoints *viper.Viper, geolocation uint64) (endpoints []*lavasession.RPCEndpoint, err error) {
err = viper_endpoints.UnmarshalKey(common.EndpointsConfigName, &endpoints)
if err != nil {
Expand Down
Loading

0 comments on commit fcfbef4

Please sign in to comment.