Skip to content

Commit

Permalink
fix: PRT - Avalance and Starknet fixes (#1796)
Browse files Browse the repository at this point in the history
* Fix the chain router for avalanche

* Fix ethereum GET_BLOCKNUM

* Fix avalanche spec

* More fixes to the chain fetcher

* Updated the starknet example

* Small fix

* Fix jsonrpc id in e2e proxy

* Fixed and added some tests for internal paths

* Ethereum fixes

* Revert "Ethereum fixes"

This reverts commit 17f9f6d.

* Revert "Fix jsonrpc id in e2e proxy"

This reverts commit 796a4d5.

* Revert "Fix ethereum GET_BLOCKNUM"

This reverts commit 549cd83.

* Extracted to function

* Small fix

* Fix the proxy websocket handler

* Fix lint
  • Loading branch information
shleikes authored Nov 26, 2024
1 parent bf8c706 commit 13835ed
Show file tree
Hide file tree
Showing 13 changed files with 392 additions and 85 deletions.
3 changes: 2 additions & 1 deletion config/provider_examples/avalanch_internal_paths_example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
endpoints:
- api-interface: jsonrpc
chain-id: AVAX
network-address: 127.0.0.1:2221
network-address:
address: 127.0.0.1:2221
node-urls:
- url: ws://127.0.0.1:3333/C/rpc/ws
internal-path: "/C/rpc" # c chain like specified in the spec
Expand Down
8 changes: 6 additions & 2 deletions config/provider_examples/strk_example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,17 @@ endpoints:
address: "127.0.0.1:2220"
node-urls:
- url: <wss url here>/ws
internal-path: ""
internal-path: "/ws"
- url: <wss url here>/ws/rpc/v0_6
internal-path: "/rpc/v0_6"
internal-path: "/ws/rpc/v0_6"
- url: <wss url here>/ws/rpc/v0_7
internal-path: "/ws/rpc/v0_7"

- url: <https url here>
internal-path: ""
- url: <https url here>/rpc/v0_5
internal-path: "/rpc/v0_5"
- url: <https url here>/rpc/v0_6
internal-path: "/rpc/v0_6"
- url: <https url here>/rpc/v0_7
internal-path: "/rpc/v0_7"
12 changes: 6 additions & 6 deletions cookbook/specs/avalanche.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@
"data_reliability_enabled": true,
"block_distance_for_finalized_data": 4,
"blocks_in_finalization_proof": 3,
"average_block_time": 2500,
"allowed_block_lag_for_qos_sync": 4,
"average_block_time": 2000,
"allowed_block_lag_for_qos_sync": 5,
"imports": [
"ETH1"
],
"shares": 1,
"min_stake_provider": {
"denom": "ulava",
"amount": "47500000000"
"amount": "50000000"
},
"api_collections": [
{
Expand Down Expand Up @@ -735,12 +735,12 @@
"data_reliability_enabled": true,
"block_distance_for_finalized_data": 4,
"blocks_in_finalization_proof": 3,
"average_block_time": 2500,
"allowed_block_lag_for_qos_sync": 4,
"average_block_time": 2000,
"allowed_block_lag_for_qos_sync": 5,
"shares": 1,
"min_stake_provider": {
"denom": "ulava",
"amount": "47500000000"
"amount": "50000000"
},
"api_collections": [
{
Expand Down
46 changes: 40 additions & 6 deletions protocol/chainlib/base_chain_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/lavanet/lava/v4/protocol/common"
"github.com/lavanet/lava/v4/utils"
"github.com/lavanet/lava/v4/utils/lavaslices"
"github.com/lavanet/lava/v4/utils/maps"
epochstorage "github.com/lavanet/lava/v4/x/epochstorage/types"
pairingtypes "github.com/lavanet/lava/v4/x/pairing/types"
spectypes "github.com/lavanet/lava/v4/x/spec/types"
Expand All @@ -22,8 +23,16 @@ type PolicyInf interface {
GetSupportedExtensions(specID string) (extensions []epochstorage.EndpointService, err error)
}

type InternalPath struct {
Path string
Enabled bool
ApiInterface string
ConnectionType string
Addon string
}

type BaseChainParser struct {
internalPaths map[string]struct{}
internalPaths map[string]InternalPath
taggedApis map[spectypes.FUNCTION_TAG]TaggedContainer
spec spectypes.Spec
rwLock sync.RWMutex
Expand Down Expand Up @@ -231,7 +240,7 @@ func (bcp *BaseChainParser) GetVerifications(supported []string, internalPath st
return retVerifications, nil
}

func (bcp *BaseChainParser) Construct(spec spectypes.Spec, internalPaths map[string]struct{}, taggedApis map[spectypes.FUNCTION_TAG]TaggedContainer,
func (bcp *BaseChainParser) Construct(spec spectypes.Spec, internalPaths map[string]InternalPath, taggedApis map[spectypes.FUNCTION_TAG]TaggedContainer,
serverApis map[ApiKey]ApiContainer, apiCollections map[CollectionKey]*spectypes.ApiCollection, headers map[ApiKey]*spectypes.Header,
verifications map[VerificationKey]map[string][]VerificationContainer,
) {
Expand Down Expand Up @@ -280,7 +289,16 @@ func (bcp *BaseChainParser) IsTagInCollection(tag spectypes.FUNCTION_TAG, collec
func (bcp *BaseChainParser) GetAllInternalPaths() []string {
bcp.rwLock.RLock()
defer bcp.rwLock.RUnlock()
return lavaslices.KeysSlice(bcp.internalPaths)
return lavaslices.Map(maps.ValuesSlice(bcp.internalPaths), func(internalPath InternalPath) string {
return internalPath.Path
})
}

func (bcp *BaseChainParser) IsInternalPathEnabled(internalPath string, apiInterface string, addon string) bool {
bcp.rwLock.RLock()
defer bcp.rwLock.RUnlock()
internalPathObj, ok := bcp.internalPaths[internalPath]
return ok && internalPathObj.Enabled && internalPathObj.ApiInterface == apiInterface && internalPathObj.Addon == addon
}

func (bcp *BaseChainParser) ExtensionParsing(addon string, parsedMessageArg *baseChainMessageContainer, extensionInfo extensionslib.ExtensionInfo) {
Expand Down Expand Up @@ -370,8 +388,18 @@ func (apip *BaseChainParser) getApiCollection(connectionType, internalPath, addo
return api, nil
}

func getServiceApis(spec spectypes.Spec, rpcInterface string) (retInternalPaths map[string]struct{}, retServerApis map[ApiKey]ApiContainer, retTaggedApis map[spectypes.FUNCTION_TAG]TaggedContainer, retApiCollections map[CollectionKey]*spectypes.ApiCollection, retHeaders map[ApiKey]*spectypes.Header, retVerifications map[VerificationKey]map[string][]VerificationContainer) {
retInternalPaths = map[string]struct{}{}
func getServiceApis(
spec spectypes.Spec,
rpcInterface string,
) (
retInternalPaths map[string]InternalPath,
retServerApis map[ApiKey]ApiContainer,
retTaggedApis map[spectypes.FUNCTION_TAG]TaggedContainer,
retApiCollections map[CollectionKey]*spectypes.ApiCollection,
retHeaders map[ApiKey]*spectypes.Header,
retVerifications map[VerificationKey]map[string][]VerificationContainer,
) {
retInternalPaths = map[string]InternalPath{}
serverApis := map[ApiKey]ApiContainer{}
taggedApis := map[spectypes.FUNCTION_TAG]TaggedContainer{}
headers := map[ApiKey]*spectypes.Header{}
Expand All @@ -392,7 +420,13 @@ func getServiceApis(spec spectypes.Spec, rpcInterface string) (retInternalPaths
}

// add as a valid internal path
retInternalPaths[apiCollection.CollectionData.InternalPath] = struct{}{}
retInternalPaths[apiCollection.CollectionData.InternalPath] = InternalPath{
Path: apiCollection.CollectionData.InternalPath,
Enabled: apiCollection.Enabled,
ApiInterface: apiCollection.CollectionData.ApiInterface,
ConnectionType: apiCollection.CollectionData.Type,
Addon: apiCollection.CollectionData.AddOn,
}

for _, parsing := range apiCollection.ParseDirectives {
taggedApis[parsing.FunctionTag] = TaggedContainer{
Expand Down
35 changes: 32 additions & 3 deletions protocol/chainlib/chain_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,26 @@ func (cf *ChainFetcher) populateCache(relayData *pairingtypes.RelayPrivateData,
}
}

func getExtensionsForVerification(verification VerificationContainer, chainParser ChainParser) []string {
extensions := []string{verification.Extension}

collectionKey := CollectionKey{
InternalPath: verification.InternalPath,
Addon: verification.Addon,
ConnectionType: verification.ConnectionType,
}

if chainParser.IsTagInCollection(spectypes.FUNCTION_TAG_SUBSCRIBE, collectionKey) {
if verification.Extension == "" {
extensions = []string{WebSocketExtension}
} else {
extensions = append(extensions, WebSocketExtension)
}
}

return extensions
}

func (cf *ChainFetcher) Verify(ctx context.Context, verification VerificationContainer, latestBlock uint64) error {
parsing := &verification.ParseDirective

Expand Down Expand Up @@ -173,12 +193,21 @@ func (cf *ChainFetcher) Verify(ctx context.Context, verification VerificationCon
return utils.LavaFormatError("[-] verify failed creating chainMessage", err, []utils.Attribute{{Key: "chainID", Value: cf.endpoint.ChainID}, {Key: "APIInterface", Value: cf.endpoint.ApiInterface}}...)
}

reply, _, _, proxyUrl, chainId, err := cf.chainRouter.SendNodeMsg(ctx, nil, chainMessage, []string{verification.Extension})
extensions := getExtensionsForVerification(verification, cf.chainParser)

reply, _, _, proxyUrl, chainId, err := cf.chainRouter.SendNodeMsg(ctx, nil, chainMessage, extensions)
if err != nil {
return utils.LavaFormatWarning("[-] verify failed sending chainMessage", err, []utils.Attribute{{Key: "chainID", Value: cf.endpoint.ChainID}, {Key: "APIInterface", Value: cf.endpoint.ApiInterface}}...)
return utils.LavaFormatWarning("[-] verify failed sending chainMessage", err,
utils.LogAttr("chainID", cf.endpoint.ChainID),
utils.LogAttr("APIInterface", cf.endpoint.ApiInterface),
utils.LogAttr("extensions", extensions),
)
}
if reply == nil || reply.RelayReply == nil {
return utils.LavaFormatWarning("[-] verify failed sending chainMessage, reply or reply.RelayReply are nil", nil, []utils.Attribute{{Key: "chainID", Value: cf.endpoint.ChainID}, {Key: "APIInterface", Value: cf.endpoint.ApiInterface}}...)
return utils.LavaFormatWarning("[-] verify failed sending chainMessage, reply or reply.RelayReply are nil", nil,
utils.LogAttr("chainID", cf.endpoint.ChainID),
utils.LogAttr("APIInterface", cf.endpoint.ApiInterface),
)
}

parserInput, err := FormatResponseForParsing(reply.RelayReply, chainMessage)
Expand Down
69 changes: 39 additions & 30 deletions protocol/chainlib/chain_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,10 @@ func (cri *chainRouterImpl) GetChainProxySupporting(ctx context.Context, addon s
)
}

func (cri chainRouterImpl) ExtensionsSupported(extensions []string) bool {
routerKey := lavasession.NewRouterKey(extensions).String()
_, ok := cri.chainProxyRouter[routerKey]
func (cri chainRouterImpl) ExtensionsSupported(internalPath string, extensions []string) bool {
routerKey := lavasession.NewRouterKey(extensions)
routerKey.ApplyInternalPath(internalPath)
_, ok := cri.chainProxyRouter[routerKey.String()]
return ok
}

Expand Down Expand Up @@ -111,39 +112,47 @@ func (cri *chainRouterImpl) autoGenerateMissingInternalPaths(isWs bool, nodeUrl
nodeUrl.InternalPath = internalPath // add internal path to the nodeUrl
nodeUrl.Url = baseUrl + internalPath
routerKey.ApplyInternalPath(internalPath)
if isWs {
addons, _, err := chainParser.SeparateAddonsExtensions(nodeUrl.Addons)
if err != nil {
return err
}

lookForSubscriptionTag := func() bool {
for _, connectionType := range []string{"POST", ""} {
if len(addons) == 0 {
addons = append(addons, "")
addons, _, err := chainParser.SeparateAddonsExtensions(nodeUrl.Addons)
if err != nil {
return err
}

subscriptionTagFound := func() bool {
for _, connectionType := range []string{"POST", ""} {
if len(addons) == 0 {
addons = append(addons, "")
}

for _, addon := range addons {
// check subscription exists, we only care for subscription API's because otherwise we use http anyway.
collectionKey := CollectionKey{
InternalPath: internalPath,
Addon: addon,
ConnectionType: connectionType,
}

for _, addon := range addons {
// check subscription exists, we only care for subscription API's because otherwise we use http anyway.
collectionKey := CollectionKey{
InternalPath: internalPath,
Addon: addon,
ConnectionType: connectionType,
}

if chainParser.IsTagInCollection(spectypes.FUNCTION_TAG_SUBSCRIBE, collectionKey) {
return true
}
if chainParser.IsTagInCollection(spectypes.FUNCTION_TAG_SUBSCRIBE, collectionKey) {
return true
}
}
return false
}
return false
}()

if !lookForSubscriptionTag() {
continue
}
if isWs && !subscriptionTagFound {
// this is ws, don't auto generate http paths
continue
} else if !isWs && subscriptionTagFound {
// this is http, don't auto generate ws paths
continue
}

utils.LavaFormatDebug("auto generated internal path",
utils.LogAttr("nodeUrl", nodeUrl.Url),
utils.LogAttr("internalPath", internalPath),
utils.LogAttr("routerKey", routerKey.String()),
)
cri.setRouterKeyInBatch(nodeUrl, returnedBatch, routerKey, rpcProviderEndpoint, false) // will not override existing entries
}

Expand Down Expand Up @@ -195,9 +204,7 @@ func (cri *chainRouterImpl) BatchNodeUrlsByServices(rpcProviderEndpoint lavasess
}
}

// check if batch has http configured, if not, add a websocket one
// prefer one without internal path
if !httpRootRouteSet {
if !httpRootRouteSet && chainParser.IsInternalPathEnabled("", rpcProviderEndpoint.ApiInterface, "") {
return nil, utils.LavaFormatError("HTTP/HTTPS is mandatory. It is recommended to configure both HTTP/HTTP and WS/WSS.", nil, utils.LogAttr("nodeUrls", rpcProviderEndpoint.NodeUrls))
}

Expand Down Expand Up @@ -329,6 +336,8 @@ func newChainRouter(ctx context.Context, nConns uint, rpcProviderEndpoint lavase
}
}

utils.LavaFormatDebug("router keys", utils.LogAttr("chainProxyRouter", chainProxyRouter))

// make sure all chainProxyRouter entries have one without a method routing
for routerKey, chainRouterEntries := range chainProxyRouter {
// get the last entry, if it has methods routed, we need to error out
Expand Down
Loading

0 comments on commit 13835ed

Please sign in to comment.