feat: PRT - provider load forwarded to provider optimizer #1759

Draft: wants to merge 67 commits into base: main

Changes from all commits (67 commits)
7b8ea69  load rate report in trailer (Sep 30, 2024)
4f7838a  fix trailer name (Sep 30, 2024)
4fead99  merge main (Sep 30, 2024)
ea1598b  fix lint (Sep 30, 2024)
04f1762  fix load manager logic (Sep 30, 2024)
eb2b345  fix lint (Sep 30, 2024)
2c33935  fix spelling (Sep 30, 2024)
46e6faf  fix logic (Sep 30, 2024)
1f977ae  fixed flag & header names (Oct 1, 2024)
6db7dd3  fix load provider manager and creation logic (Oct 1, 2024)
b9e199b  fix logs for relay load rate (Oct 1, 2024)
8b8a05a  fix rpcprovider server relay load handling (Oct 1, 2024)
5e3b7a4  fix tests (Oct 1, 2024)
199ff2c  fix typo (Oct 1, 2024)
9faf8ef  fix init lava script (Oct 1, 2024)
cc1af1e  Merge branch 'main' into prt-add-provider-relay-load-trailer (Oct 1, 2024)
56191f6  fix provider load manager (Oct 1, 2024)
9eabb5a  fix provider server and load manager (Oct 1, 2024)
ffdb986  fix lint - fix protocol test (Oct 1, 2024)
19cc454  fix provider load manager applyProviderLoadMetadataToContextTrailer (Oct 1, 2024)
b73d267  Merge branch 'main' into prt-add-provider-relay-load-trailer (Oct 1, 2024)
5a29efd  change cmdRPCProvider load rate flag to uint64 (Oct 1, 2024)
93c3220  try fix (Oct 1, 2024)
eeb46ea  fix cmd flag reading (Oct 1, 2024)
55221f6  adjusting uint64 (ranlavanet, Oct 2, 2024)
e81afb9  fix redundent nil check in provider load manager (Oct 3, 2024)
56c2b3f  Merge branch 'prt-add-provider-relay-load-trailer' of github.com:lava… (Oct 3, 2024)
725f40a  fix providerLoadManager per chain creation (Oct 3, 2024)
6da0e99  rename and fix instance passing unnecessarily (ranlavanet, Oct 3, 2024)
9458d6c  fixed chainlib common formatting (Oct 6, 2024)
44a5e5c  fix provider load manager comments (Oct 6, 2024)
03e1b17  fix e2e tests (Oct 6, 2024)
c4bc4ec  fix pr - unite add relay load and set trailer (Oct 6, 2024)
2bd9032  fix common.go provider load header (Oct 6, 2024)
a44f0ab  fix edge case of getProviderLoad (Oct 6, 2024)
50dab3f  Merge branch 'main' into prt-add-provider-relay-load-trailer (Oct 7, 2024)
c88aee2  fix command flag description (Oct 8, 2024)
30efbf1  fix command flag description (Oct 8, 2024)
810db13  add metric for load rate (Oct 8, 2024)
82f3020  fix division to be float and not uint (Oct 8, 2024)
14d8c68  roll back init lava only with node two consumers (Oct 8, 2024)
2d61289  fix load metric (Oct 8, 2024)
280074b  merge branch 'main' into prt-add-provider-relay-load-trailer (Oct 9, 2024)
97f72ec  merge main (Oct 9, 2024)
454db08  Update protocol/chainlib/common.go (Oct 9, 2024)
78ed595  Merge branch 'main' into prt-add-provider-relay-load-trailer (Oct 9, 2024)
63a0e89  Merge branch 'prt-add-provider-relay-load-trailer' of github.com:lava… (Oct 9, 2024)
d4c7258  fix load calculation (ranlavanet, Oct 10, 2024)
273a32a  tidy code (ranlavanet, Oct 10, 2024)
6fb7276  changing rate limit to 1k (ranlavanet, Oct 10, 2024)
272e676  fix bug (ranlavanet, Oct 10, 2024)
42486ac  fix pr (ranlavanet, Oct 10, 2024)
9484f9f  Merge branch 'main' into prt-add-provider-relay-load-trailer (omerlavanet, Oct 15, 2024)
1974720  Merge branch 'prt-add-provider-relay-load-trailer' into PRT-provider-… (ranlavanet, Oct 27, 2024)
a42cfdd  WIP (ranlavanet, Oct 27, 2024)
ecd416c  Merge branch 'main' into prt-add-provider-relay-load-trailer (ranlavanet, Oct 27, 2024)
974dbcb  v4 (ranlavanet, Oct 27, 2024)
258578f  Merge branch 'main' into prt-add-provider-relay-load-trailer (ranlavanet, Oct 27, 2024)
2f5241d  wip 2 (ranlavanet, Oct 27, 2024)
feeb373  fix pr (ranlavanet, Oct 27, 2024)
a2b4de2  Merge branch 'prt-add-provider-relay-load-trailer' into PRT-provider-… (ranlavanet, Oct 27, 2024)
39f439f  WIP3 (ranlavanet, Oct 27, 2024)
51894ce  fix (ranlavanet, Oct 27, 2024)
1c98ff7  Merge branch 'prt-add-provider-relay-load-trailer' into PRT-provider-… (ranlavanet, Oct 27, 2024)
93a7b78  forwarding provider load to optimizer (ranlavanet, Oct 27, 2024)
4f04611  wip (ranlavanet, Oct 27, 2024)
4280c67  append relay data - WIP (ranlavanet, Oct 30, 2024)
2 changes: 2 additions & 0 deletions protocol/chainlib/common.go
@@ -30,11 +30,13 @@ const (
relayMsgLogMaxChars = 200
RPCProviderNodeAddressHash = "Lava-Provider-Node-Address-Hash"
RPCProviderNodeExtension = "Lava-Provider-Node-Extension"
RpcProviderLoadRateHeader = "Lava-Provider-Load-Rate"
RpcProviderUniqueIdHeader = "Lava-Provider-Unique-Id"
WebSocketExtension = "websocket"
)

var (
TrailersToAddToHeaderResponse = []string{RPCProviderNodeExtension, RpcProviderLoadRateHeader}
InvalidResponses = []string{"null", "", "nil", "undefined"}
FailedSendingSubscriptionToClients = sdkerrors.New("failed Sending Subscription To Clients", 1015, "Failed Sending Subscription To Clients connection might have been closed by the user")
NoActiveSubscriptionFound = sdkerrors.New("failed finding an active subscription on provider side", 1016, "no active subscriptions for hashed params.")
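The new Lava-Provider-Load-Rate trailer is the channel through which the provider reports its momentary load back to consumers on each relay. A minimal provider-side sketch of how such a trailer can be filled, assuming a gRPC handler context; the package, function, and parameter names (loadreport, setLoadTrailer, activeRelays, rateLimit) are illustrative and not this PR's actual load-manager code, which is outside this excerpt:

package loadreport

import (
	"context"
	"strconv"

	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"
)

const RpcProviderLoadRateHeader = "Lava-Provider-Load-Rate"

// setLoadTrailer reports the current load rate on the gRPC response trailer.
func setLoadTrailer(ctx context.Context, activeRelays, rateLimit uint64) {
	if rateLimit == 0 {
		// no rate limit configured, nothing meaningful to report
		return
	}
	load := float64(activeRelays) / float64(rateLimit) // float division; may exceed 1.0
	md := metadata.Pairs(RpcProviderLoadRateHeader, strconv.FormatFloat(load, 'f', -1, 64))
	_ = grpc.SetTrailer(ctx, md) // only valid inside a gRPC handler's context
}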
2 changes: 1 addition & 1 deletion protocol/chainlib/grpc.go
@@ -128,7 +128,7 @@ func (apip *GrpcChainParser) ParseMsg(url string, data []byte, connectionType st
// Check API is supported and save it in nodeMsg.
apiCont, err := apip.getSupportedApi(url, connectionType)
if err != nil {
return nil, utils.LavaFormatError("failed to getSupportedApi gRPC", err)
return nil, utils.LavaFormatError("failed to getSupportedApi gRPC", err, utils.LogAttr("url", url))
}

apiCollection, err := apip.getApiCollection(connectionType, apiCont.collectionKey.InternalPath, apiCont.collectionKey.Addon)
1 change: 1 addition & 0 deletions protocol/common/cobra_common.go
@@ -46,6 +46,7 @@ const (
// websocket flags
RateLimitWebSocketFlag = "rate-limit-websocket-requests-per-connection"
BanDurationForWebsocketRateLimitExceededFlag = "ban-duration-for-websocket-rate-limit-exceeded"
RateLimitRequestPerSecondFlag = "rate-limit-requests-per-second"
)

const (
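Judging by the commit history ("changing rate limit to 1k"), this flag caps relay requests per second and serves as the denominator of the reported load rate. A hypothetical invocation, assuming the usual lavap rpcprovider command and omitting its other required flags:

	lavap rpcprovider provider.yml --rate-limit-requests-per-second 1000

With this setting, 250 concurrent relays would be reported as a load rate of 0.25.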
2 changes: 1 addition & 1 deletion protocol/integration/protocol_test.go
@@ -344,7 +344,7 @@ func createRpcProvider(t *testing.T, ctx context.Context, rpcProviderOptions rpc
chainTracker.StartAndServe(ctx)
reliabilityManager := reliabilitymanager.NewReliabilityManager(chainTracker, &mockProviderStateTracker, rpcProviderOptions.account.Addr.String(), chainRouter, chainParser)
mockReliabilityManager := NewMockReliabilityManager(reliabilityManager)
rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rws, providerSessionManager, mockReliabilityManager, rpcProviderOptions.account.SK, cache, chainRouter, &mockProviderStateTracker, rpcProviderOptions.account.Addr, rpcProviderOptions.lavaChainID, rpcprovider.DEFAULT_ALLOWED_MISSING_CU, nil, nil, nil, false)
rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rws, providerSessionManager, mockReliabilityManager, rpcProviderOptions.account.SK, cache, chainRouter, &mockProviderStateTracker, rpcProviderOptions.account.Addr, rpcProviderOptions.lavaChainID, rpcprovider.DEFAULT_ALLOWED_MISSING_CU, nil, nil, nil, false, nil)
listener := rpcprovider.NewProviderListener(ctx, rpcProviderEndpoint.NetworkAddress, "/health")
err = listener.RegisterReceiver(rpcProviderServer, rpcProviderEndpoint)
require.NoError(t, err)
6 changes: 4 additions & 2 deletions protocol/lavasession/consumer_session_manager.go
@@ -959,7 +959,8 @@ func (csm *ConsumerSessionManager) OnSessionFailure(consumerSession *SingleConsu
}
cuToDecrease := consumerSession.LatestRelayCu
// latency, isHangingApi, syncScore aren't updated when there is a failure
go csm.providerOptimizer.AppendRelayFailure(consumerSession.Parent.PublicLavaAddress)

go csm.providerOptimizer.AppendRelayFailure(consumerSession.Parent.PublicLavaAddress, consumerSession.GetProviderLoad())
consumerSession.LatestRelayCu = 0 // making sure no one uses it in a wrong way
consecutiveErrors := uint64(len(consumerSession.ConsecutiveErrors))
parentConsumerSessionsWithProvider := consumerSession.Parent // must read this pointer before unlocking
@@ -1042,7 +1043,8 @@ func (csm *ConsumerSessionManager) OnSessionDone(
consumerSession.LatestBlock = latestServicedBlock // update latest serviced block
// calculate QoS
consumerSession.CalculateQoS(currentLatency, expectedLatency, expectedBH-latestServicedBlock, numOfProviders, int64(providersCount))
go csm.providerOptimizer.AppendRelayData(consumerSession.Parent.PublicLavaAddress, currentLatency, isHangingApi, specComputeUnits, uint64(latestServicedBlock))

go csm.providerOptimizer.AppendRelayData(consumerSession.Parent.PublicLavaAddress, currentLatency, isHangingApi, specComputeUnits, uint64(latestServicedBlock), consumerSession.GetProviderLoad())
csm.updateMetricsManager(consumerSession, currentLatency, !isHangingApi) // apply latency only for non hanging apis
return nil
}
4 changes: 2 additions & 2 deletions protocol/lavasession/consumer_types.go
@@ -71,8 +71,8 @@ type ConsumerSessionsMap map[string]*SessionInfo

type ProviderOptimizer interface {
AppendProbeRelayData(providerAddress string, latency time.Duration, success bool)
AppendRelayFailure(providerAddress string)
AppendRelayData(providerAddress string, latency time.Duration, isHangingApi bool, cu, syncBlock uint64)
AppendRelayFailure(providerAddress string, providerLoad *provideroptimizer.ProviderLoadReport)
AppendRelayData(providerAddress string, latency time.Duration, isHangingApi bool, cu, syncBlock uint64, providerLoad *provideroptimizer.ProviderLoadReport)
ChooseProvider(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64) (addresses []string, tier int)
GetExcellenceQoSReportForProvider(string) (*pairingtypes.QualityOfServiceReport, *pairingtypes.QualityOfServiceReport)
Strategy() provideroptimizer.Strategy
51 changes: 44 additions & 7 deletions protocol/lavasession/single_consumer_session.go
@@ -7,6 +7,7 @@
"time"

sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/lavanet/lava/v4/protocol/provideroptimizer"
"github.com/lavanet/lava/v4/utils"
pairingtypes "github.com/lavanet/lava/v4/x/pairing/types"
)
@@ -21,13 +22,49 @@
RelayNum uint64
LatestBlock int64
// Each session holds a pointer to a connection; if the connection is lost, this session will be banned (won't be picked)
EndpointConnection *EndpointConnection
BlockListed bool // if session lost sync we blacklist it.
ConsecutiveErrors []error
errorsCount uint64
relayProcessor UsedProvidersInf
providerUniqueId string
StaticProvider bool
EndpointConnection *EndpointConnection
BlockListed bool // if session lost sync we blacklist it.
ConsecutiveErrors []error
errorsCount uint64
relayProcessor UsedProvidersInf
providerUniqueId string
StaticProvider bool
latestKnownLoadReport *provideroptimizer.ProviderLoadReport
}

// should only be called when locked, returning a copy of the object
func (cs *SingleConsumerSession) GetProviderLoad() *provideroptimizer.ProviderLoadReport {
// create new provider load pointer so we can read it later without locks
var providerLoadReport *provideroptimizer.ProviderLoadReport
if cs.latestKnownLoadReport != nil {
providerLoadReport = &provideroptimizer.ProviderLoadReport{
ProviderLoad: cs.latestKnownLoadReport.ProviderLoad,
TimeStamp: cs.latestKnownLoadReport.TimeStamp,
}
}
return providerLoadReport
}

// should only be called when locked.
func (cs *SingleConsumerSession) SetLoadReport(loadReport []string) {
if len(loadReport) <= 0 {
[GitHub Actions / lint check failure on this line: sloppyLen: len(loadReport) <= 0 can be len(loadReport) == 0 (gocritic)]
// no load report
return
}
load := loadReport[0]
floatLoad, err := strconv.ParseFloat(load, 64)
if err != nil {
utils.LavaFormatWarning("Failed parsing load report from provider", err, utils.LogAttr("load_reported", loadReport))
return
}
if floatLoad == 0 {
// Provider did not set its max load options or has 0 load.
return
}
cs.latestKnownLoadReport = &provideroptimizer.ProviderLoadReport{
TimeStamp: time.Now(),
ProviderLoad: floatLoad,
}
}

// returns the expected latency to a threshold.
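On the consumer side, SetLoadReport presumably receives the raw trailer values for the Lava-Provider-Load-Rate key after a relay returns. A sketch of that wiring, as if inside package lavasession; applyLoadTrailer is a hypothetical name, and the PR's real call site is not part of this excerpt:

import "google.golang.org/grpc/metadata"

func applyLoadTrailer(cs *SingleConsumerSession, trailer metadata.MD) {
	// MD.Get is case-insensitive, so it matches the "Lava-Provider-Load-Rate" trailer key
	loadValues := trailer.Get("Lava-Provider-Load-Rate") // i.e. chainlib.RpcProviderLoadRateHeader
	cs.SetLoadReport(loadValues) // caller must hold the session lock; parses loadValues[0] and timestamps it
}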
10 changes: 10 additions & 0 deletions protocol/metrics/provider_metrics.go
@@ -22,6 +22,7 @@ type ProviderMetrics struct {
totalRelaysServicedMetric *prometheus.CounterVec
totalErroredMetric *prometheus.CounterVec
consumerQoSMetric *prometheus.GaugeVec
loadRateMetric *prometheus.GaugeVec
}

func (pm *ProviderMetrics) AddRelay(consumerAddress string, cu uint64, qos *pairingtypes.QualityOfServiceReport) {
@@ -49,6 +50,13 @@ func (pm *ProviderMetrics) AddRelay(consumerAddress string, cu uint64, qos *pair
}
}

func (pm *ProviderMetrics) SetLoadRate(loadRate float64) {
if pm == nil {
return
}
pm.loadRateMetric.WithLabelValues(pm.specID).Set(loadRate)
}

func (pm *ProviderMetrics) AddPayment(cu uint64) {
if pm == nil {
return
@@ -72,6 +80,7 @@ func NewProviderMetrics(specID, apiInterface string, totalCUServicedMetric *prom
totalRelaysServicedMetric *prometheus.CounterVec,
totalErroredMetric *prometheus.CounterVec,
consumerQoSMetric *prometheus.GaugeVec,
loadRateMetric *prometheus.GaugeVec,
) *ProviderMetrics {
pm := &ProviderMetrics{
specID: specID,
Expand All @@ -82,6 +91,7 @@ func NewProviderMetrics(specID, apiInterface string, totalCUServicedMetric *prom
totalRelaysServicedMetric: totalRelaysServicedMetric,
totalErroredMetric: totalErroredMetric,
consumerQoSMetric: consumerQoSMetric,
loadRateMetric: loadRateMetric,
}
return pm
}
10 changes: 9 additions & 1 deletion protocol/metrics/provider_metrics_manager.go
@@ -41,6 +41,7 @@ type ProviderMetricsManager struct {
endpointsHealthChecksOk uint64
relaysMonitors map[string]*RelaysMonitor
relaysMonitorsLock sync.RWMutex
loadRateMetric *prometheus.GaugeVec
}

func NewProviderMetricsManager(networkAddress string) *ProviderMetricsManager {
@@ -107,6 +108,11 @@ func NewProviderMetricsManager(networkAddress string) *ProviderMetricsManager {
Help: "The total number of get latest block queries that succeeded by chainfetcher",
}, []string{"spec"})

loadRateMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "lava_provider_load_rate",
Help: "The load rate according to the load rate limit - Given Y simultaneous relay calls, a value of X and will measure Y/X load rate.",
}, []string{"spec"})

fetchBlockSuccessMetric := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "lava_provider_fetch_block_success",
Help: "The total number of get specific block queries that succeeded by chainfetcher",
@@ -141,6 +147,7 @@ func NewProviderMetricsManager(networkAddress string) *ProviderMetricsManager {
prometheus.MustRegister(virtualEpochMetric)
prometheus.MustRegister(endpointsHealthChecksOkMetric)
prometheus.MustRegister(protocolVersionMetric)
prometheus.MustRegister(loadRateMetric)

providerMetricsManager := &ProviderMetricsManager{
providerMetrics: map[string]*ProviderMetrics{},
@@ -161,6 +168,7 @@ func NewProviderMetricsManager(networkAddress string) *ProviderMetricsManager {
endpointsHealthChecksOk: 1,
protocolVersionMetric: protocolVersionMetric,
relaysMonitors: map[string]*RelaysMonitor{},
loadRateMetric: loadRateMetric,
}

http.Handle("/metrics", promhttp.Handler())
@@ -209,7 +217,7 @@ func (pme *ProviderMetricsManager) AddProviderMetrics(specID, apiInterface strin
}

if pme.getProviderMetric(specID, apiInterface) == nil {
providerMetric := NewProviderMetrics(specID, apiInterface, pme.totalCUServicedMetric, pme.totalCUPaidMetric, pme.totalRelaysServicedMetric, pme.totalErroredMetric, pme.consumerQoSMetric)
providerMetric := NewProviderMetrics(specID, apiInterface, pme.totalCUServicedMetric, pme.totalCUPaidMetric, pme.totalRelaysServicedMetric, pme.totalErroredMetric, pme.consumerQoSMetric, pme.loadRateMetric)
pme.setProviderMetric(providerMetric)

endpoint := fmt.Sprintf("/metrics/%s/%s/health", specID, apiInterface)
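As a worked example of the gauge defined above: a provider started with --rate-limit-requests-per-second 1000 that is currently serving 250 simultaneous relays would expose (the spec label value here is hypothetical):

	lava_provider_load_rate{spec="ETH1"} 0.25

and the value can exceed 1.0 once in-flight relays outnumber the configured limit.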
21 changes: 16 additions & 5 deletions protocol/provideroptimizer/provider_optimizer.go
@@ -71,13 +71,19 @@ type Exploration struct {
time time.Time
}

type ProviderLoadReport struct {
ProviderLoad float64 // load rate reported by the provider; can go above 1.0.
TimeStamp time.Time
}

type ProviderData struct {
Availability score.ScoreStore // will be used to calculate the probability of error
Latency score.ScoreStore // will be used to calculate the latency score
Sync score.ScoreStore // will be used to calculate the sync score for spectypes.LATEST_BLOCK/spectypes.NOT_APPLICABLE requests
SyncBlock uint64 // will be used to calculate the probability of block error
LatencyRaw score.ScoreStore // will be used when reporting reputation to the node (Latency = LatencyRaw / baseLatency)
SyncRaw score.ScoreStore // will be used when reporting reputation to the node (Sync = SyncRaw / baseSync)
ProviderLoad *ProviderLoadReport
}

type Strategy int
@@ -101,17 +107,22 @@ func (po *ProviderOptimizer) UpdateWeights(weights map[string]int64, epoch uint6
}
}

func (po *ProviderOptimizer) AppendRelayFailure(providerAddress string) {
po.appendRelayData(providerAddress, 0, false, false, 0, 0, time.Now())
// TODO forward load also on relay failure
func (po *ProviderOptimizer) AppendRelayFailure(providerAddress string, providerLoad *ProviderLoadReport) {
po.appendRelayData(providerAddress, 0, false, false, 0, 0, time.Now(), providerLoad)
}

func (po *ProviderOptimizer) AppendRelayData(providerAddress string, latency time.Duration, isHangingApi bool, cu, syncBlock uint64) {
po.appendRelayData(providerAddress, latency, isHangingApi, true, cu, syncBlock, time.Now())
func (po *ProviderOptimizer) AppendRelayData(providerAddress string, latency time.Duration, isHangingApi bool, cu, syncBlock uint64, providerLoad *ProviderLoadReport) {
po.appendRelayData(providerAddress, latency, isHangingApi, true, cu, syncBlock, time.Now(), providerLoad)
}

func (po *ProviderOptimizer) appendRelayData(providerAddress string, latency time.Duration, isHangingApi, success bool, cu, syncBlock uint64, sampleTime time.Time) {
func (po *ProviderOptimizer) appendRelayData(providerAddress string, latency time.Duration, isHangingApi, success bool, cu, syncBlock uint64, sampleTime time.Time, providerLoad *ProviderLoadReport) {
latestSync, timeSync := po.updateLatestSyncData(syncBlock, sampleTime)
providerData, _ := po.getProviderData(providerAddress)
// set the current provider load only if the incoming report is fresher than the stored one
if providerLoad != nil && (providerData.ProviderLoad == nil || providerLoad.TimeStamp.After(providerData.ProviderLoad.TimeStamp)) {
providerData.ProviderLoad = providerLoad
}
halfTime := po.calculateHalfTime(providerAddress, sampleTime)
providerData = po.updateProbeEntryAvailability(providerData, success, RELAY_UPDATE_WEIGHT, halfTime, sampleTime)
if success {
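Because AppendRelayData and AppendRelayFailure are invoked from goroutines in the consumer session manager, reports can arrive out of order; the timestamp guard above only lets a report overwrite the stored one when it is strictly newer. A minimal illustration of that assumed semantics, in isolation:

	now := time.Now()
	stored := &ProviderLoadReport{ProviderLoad: 0.8, TimeStamp: now}
	stale := &ProviderLoadReport{ProviderLoad: 0.2, TimeStamp: now.Add(-time.Minute)}

	// same condition as in appendRelayData
	if stale != nil && (stored == nil || stale.TimeStamp.After(stored.TimeStamp)) {
		stored = stale // not reached: the stale report is dropped
	}
	// stored still holds the 0.8 report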