Skip to content

Commit

Permalink
Implement concurrent safe WASM GRPC query handler. (#1034)
Browse files Browse the repository at this point in the history
  • Loading branch information
dzmitryhil authored Dec 4, 2024
1 parent e64bdaa commit 28bc015
Show file tree
Hide file tree
Showing 5 changed files with 225 additions and 62 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ concurrency:

jobs:
ci:
timeout-minutes: 60
timeout-minutes: 90
strategy:
fail-fast: false
matrix:
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ require (
golang.org/x/exp v0.0.0-20240707233637-46b078467d37 // indirect
golang.org/x/net v0.27.0 // indirect
golang.org/x/oauth2 v0.21.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sync v0.7.0
golang.org/x/sys v0.23.0 // indirect
golang.org/x/term v0.22.0 // indirect
golang.org/x/text v0.16.0 // indirect
Expand Down
62 changes: 2 additions & 60 deletions x/wasm/handler/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@ import (
"context"
"encoding/base64"
"encoding/json"
"fmt"

msgv1 "cosmossdk.io/api/cosmos/msg/v1"
queryv1 "cosmossdk.io/api/cosmos/query/v1"
sdkmath "cosmossdk.io/math"
nfttypes "cosmossdk.io/x/nft"
wasmkeeper "github.com/CosmWasm/wasmd/x/wasm/keeper"
Expand All @@ -17,10 +14,6 @@ import (
sdk "github.com/cosmos/cosmos-sdk/types"
gogoproto "github.com/cosmos/gogoproto/proto"
"github.com/pkg/errors"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/dynamicpb"

assetfttypes "github.com/CoreumFoundation/coreum/v5/x/asset/ft/types"
assetnfttypes "github.com/CoreumFoundation/coreum/v5/x/asset/nft/types"
Expand Down Expand Up @@ -160,64 +153,13 @@ type coreumQuery struct {
NFT *nftQuery `json:"nft"`
}

// newModuleQuerySafeAllowList returns a map of all query paths labeled with module_query_safe in the proto files to
// their response proto.
func newModuleQuerySafeAllowList() wasmkeeper.AcceptedQueries {
fds, err := gogoproto.MergedGlobalFileDescriptors()
if err != nil {
panic(err)
}
// create the files using 'AllowUnresolvable' to avoid
// unnecessary panic: https://github.com/cosmos/ibc-go/issues/6435
protoFiles, err := protodesc.FileOptions{
AllowUnresolvable: true,
}.NewFiles(fds)
if err != nil {
panic(err)
}

allowList := wasmkeeper.AcceptedQueries{}
protoFiles.RangeFiles(func(fd protoreflect.FileDescriptor) bool {
for i := range fd.Services().Len() {
// Get the service descriptor
sd := fd.Services().Get(i)

// Skip services that are annotated with the "cosmos.msg.v1.service" option.
if ext := proto.GetExtension(sd.Options(), msgv1.E_Service); ext != nil && ext.(bool) {
continue
}

for j := range sd.Methods().Len() {
// Get the method descriptor
md := sd.Methods().Get(j)

// Skip methods that are not annotated with the "cosmos.query.v1.module_query_safe" option.
if ext := proto.GetExtension(md.Options(), queryv1.E_ModuleQuerySafe); ext == nil || !ext.(bool) {
continue
}

// Add the method to the whitelist
path := fmt.Sprintf("/%s/%s", sd.FullName(), md.Name())
allowList[path] = dynamicpb.NewMessage(md.Output())
}
}
return true
})

return allowList
}

// NewCoreumQueryHandler returns the coreum handler which handles queries from smart contracts.
func NewCoreumQueryHandler(
assetFTQueryServer assetfttypes.QueryServer, assetNFTQueryServer assetnfttypes.QueryServer,
nftQueryServer nfttypes.QueryServer, gRPCQueryRouter *baseapp.GRPCQueryRouter, codec *codec.ProtoCodec,
nftQueryServer nfttypes.QueryServer, gRPCQueryRouter *baseapp.GRPCQueryRouter, codec codec.Codec,
) *wasmkeeper.QueryPlugins {
acceptList := newModuleQuerySafeAllowList()
// TODO: "/cosmos.nft.v1beta1.Query/Owner" is not marked as module_query_safe in cosmos, but we need it
acceptList["/cosmos.nft.v1beta1.Query/Owner"] = &nfttypes.QueryOwnerResponse{}

return &wasmkeeper.QueryPlugins{
Grpc: wasmkeeper.AcceptListGrpcQuerier(acceptList, gRPCQueryRouter, codec),
Grpc: NewGRPCQuerier(gRPCQueryRouter, codec).Query,
Custom: func(ctx sdk.Context, query json.RawMessage) ([]byte, error) {
var coreumQuery coreumQuery
if err := json.Unmarshal(query, &coreumQuery); err != nil {
Expand Down
129 changes: 129 additions & 0 deletions x/wasm/handler/query_grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package handler

import (
"fmt"
"sync"

msgv1 "cosmossdk.io/api/cosmos/msg/v1"
queryv1 "cosmossdk.io/api/cosmos/query/v1"
nfttypes "cosmossdk.io/x/nft"
wasmvmtypes "github.com/CosmWasm/wasmvm/v2/types"
abci "github.com/cometbft/cometbft/abci/types"
"github.com/cosmos/cosmos-sdk/baseapp"
"github.com/cosmos/cosmos-sdk/codec"
sdk "github.com/cosmos/cosmos-sdk/types"
gogoproto "github.com/cosmos/gogoproto/proto"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/dynamicpb"
)

// GRPCQuerier is a WASM grpc querier.
type GRPCQuerier struct {
gRPCQueryRouter *baseapp.GRPCQueryRouter
codec codec.Codec
// map[query proto URL]proto response type
acceptedQueries map[string]func() gogoproto.Message
mu sync.Mutex
}

// NewGRPCQuerier returns a new instance of GRPCQuerier.
func NewGRPCQuerier(gRPCQueryRouter *baseapp.GRPCQueryRouter, codec codec.Codec) *GRPCQuerier {
acceptedQueries := newModuleQuerySafeAllowList()
// "/cosmos.nft.v1beta1.Query/Owner" is not marked as module_query_safe in cosmos, but we need it
acceptedQueries["/cosmos.nft.v1beta1.Query/Owner"] = func() gogoproto.Message {
return &nfttypes.QueryOwnerResponse{}
}

return &GRPCQuerier{
gRPCQueryRouter: gRPCQueryRouter,
codec: codec,
acceptedQueries: acceptedQueries,
mu: sync.Mutex{},
}
}

// Query returns WASM GRPC query handler.
func (q *GRPCQuerier) Query(ctx sdk.Context, request *wasmvmtypes.GrpcQuery) (gogoproto.Message, error) {
protoResponseBuilder, accepted := q.acceptedQueries[request.Path]
if !accepted {
return nil, wasmvmtypes.UnsupportedRequest{
Kind: fmt.Sprintf("'%s' path is not allowed from the contract", request.Path),
}
}
protoResponse := protoResponseBuilder()

handler := q.gRPCQueryRouter.Route(request.Path)
if handler == nil {
return nil, wasmvmtypes.UnsupportedRequest{Kind: fmt.Sprintf("No route to query '%s'", request.Path)}
}

q.mu.Lock()
res, err := handler(ctx, &abci.RequestQuery{
Data: request.Data,
Path: request.Path,
})
q.mu.Unlock()
if err != nil {
return nil, err
}

// decode the query response into the expected protobuf message
err = q.codec.Unmarshal(res.Value, protoResponse)
if err != nil {
return nil, err
}

return protoResponse, nil
}

// newModuleQuerySafeAllowList returns a map of all query paths labeled with module_query_safe in the proto files to
// their response proto.
func newModuleQuerySafeAllowList() map[string]func() gogoproto.Message {
fds, err := gogoproto.MergedGlobalFileDescriptors()
if err != nil {
panic(err)
}
// create the files using 'AllowUnresolvable' to avoid
// unnecessary panic: https://github.com/cosmos/ibc-go/issues/6435
protoFiles, err := protodesc.FileOptions{
AllowUnresolvable: true,
}.NewFiles(fds)
if err != nil {
panic(err)
}

allowList := make(map[string]func() gogoproto.Message)
protoFiles.RangeFiles(func(fd protoreflect.FileDescriptor) bool {
for i := range fd.Services().Len() {
// Get the service descriptor
sd := fd.Services().Get(i)

// Skip services that are annotated with the "cosmos.msg.v1.service" option.
if ext := proto.GetExtension(sd.Options(), msgv1.E_Service); ext != nil && ext.(bool) {
continue
}

for j := range sd.Methods().Len() {
// Get the method descriptor
md := sd.Methods().Get(j)

// Skip methods that are not annotated with the "cosmos.query.v1.module_query_safe" option.
if ext := proto.GetExtension(md.Options(), queryv1.E_ModuleQuerySafe); ext == nil || !ext.(bool) {
continue
}

// Add the method to the whitelist
path := fmt.Sprintf("/%s/%s", sd.FullName(), md.Name())
out := md.Output()
allowList[path] = func() gogoproto.Message {
return dynamicpb.NewMessage(out)
}
}
}
return true
})

return allowList
}
92 changes: 92 additions & 0 deletions x/wasm/handler/query_grpc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package handler_test

import (
"context"
"reflect"
"testing"
"time"

sdkmath "cosmossdk.io/math"
wasmvmtypes "github.com/CosmWasm/wasmvm/v2/types"
tmproto "github.com/cometbft/cometbft/proto/tendermint/types"
gogoproto "github.com/cosmos/gogoproto/proto"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"

"github.com/CoreumFoundation/coreum/v5/testutil/simapp"
assetfttypes "github.com/CoreumFoundation/coreum/v5/x/asset/ft/types"
"github.com/CoreumFoundation/coreum/v5/x/wasm/handler"
)

func TestGRPCQuerier_Query(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

testApp := simapp.New()
sdkCtx := testApp.BaseApp.NewContextLegacy(false, tmproto.Header{
Time: time.Now(),
AppHash: []byte("some-hash"),
})

issuer, _ := testApp.GenAccount(sdkCtx)
settingsWithExtension := assetfttypes.IssueSettings{
Issuer: issuer,
Symbol: "DEFEXT",
Subunit: "defext",
Precision: 6,
InitialAmount: sdkmath.NewIntWithDecimal(1, 10),
}
denom, err := testApp.AssetFTKeeper.Issue(sdkCtx, settingsWithExtension)
require.NoError(t, err)

q := handler.NewGRPCQuerier(testApp.GRPCQueryRouter(), testApp.AppCodec())
queryTokenReq := &assetfttypes.QueryTokenRequest{
Denom: denom,
}
wasmGrpcData, err := testApp.AppCodec().Marshal(queryTokenReq)
require.NoError(t, err)

eg, _ := errgroup.WithContext(ctx)
for range 1000 {
eg.Go(func() error {
wasmGrpcReq := &wasmvmtypes.GrpcQuery{
Data: wasmGrpcData,
// url which corresponds query token
Path: "/coreum.asset.ft.v1.Query/Token",
}
wasmGrpcRes, err := q.Query(sdkCtx, wasmGrpcReq)
if err != nil {
return err
}

queryTokenResData, err := gogoproto.Marshal(wasmGrpcRes)
if err != nil {
return err
}

queryTokenRes := &assetfttypes.QueryTokenResponse{}
if err := testApp.AppCodec().Unmarshal(queryTokenResData, queryTokenRes); err != nil {
return err
}

want := assetfttypes.Token{
Denom: denom,
Issuer: issuer.String(),
Symbol: settingsWithExtension.Symbol,
Subunit: settingsWithExtension.Subunit,
Precision: settingsWithExtension.Precision,
BurnRate: sdkmath.LegacyNewDec(0),
SendCommissionRate: sdkmath.LegacyNewDec(0),
Version: assetfttypes.CurrentTokenVersion,
Admin: issuer.String(),
}
if !reflect.DeepEqual(want, queryTokenRes.Token) {
return errors.Errorf("unexpected token, want:%v, got:%v", want, queryTokenRes.Token)
}
return nil
})
}

require.NoError(t, eg.Wait())
}

0 comments on commit 28bc015

Please sign in to comment.