From 28bc0158b8660158b7f27b4d2923854800f4f2dc Mon Sep 17 00:00:00 2001 From: Dzmitry Hil Date: Wed, 4 Dec 2024 12:32:20 +0300 Subject: [PATCH] Implement concurrent safe WASM GRPC query handler. (#1034) --- .github/workflows/ci.yml | 2 +- go.mod | 2 +- x/wasm/handler/query.go | 62 +------------- x/wasm/handler/query_grpc.go | 129 ++++++++++++++++++++++++++++++ x/wasm/handler/query_grpc_test.go | 92 +++++++++++++++++++++ 5 files changed, 225 insertions(+), 62 deletions(-) create mode 100644 x/wasm/handler/query_grpc.go create mode 100644 x/wasm/handler/query_grpc_test.go diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ff4297c35..131a7e0e1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -12,7 +12,7 @@ concurrency: jobs: ci: - timeout-minutes: 60 + timeout-minutes: 90 strategy: fail-fast: false matrix: diff --git a/go.mod b/go.mod index 5f60da7cb..8350aad43 100644 --- a/go.mod +++ b/go.mod @@ -232,7 +232,7 @@ require ( golang.org/x/exp v0.0.0-20240707233637-46b078467d37 // indirect golang.org/x/net v0.27.0 // indirect golang.org/x/oauth2 v0.21.0 // indirect - golang.org/x/sync v0.7.0 // indirect + golang.org/x/sync v0.7.0 golang.org/x/sys v0.23.0 // indirect golang.org/x/term v0.22.0 // indirect golang.org/x/text v0.16.0 // indirect diff --git a/x/wasm/handler/query.go b/x/wasm/handler/query.go index 0a100ad32..3319d9854 100644 --- a/x/wasm/handler/query.go +++ b/x/wasm/handler/query.go @@ -4,10 +4,7 @@ import ( "context" "encoding/base64" "encoding/json" - "fmt" - msgv1 "cosmossdk.io/api/cosmos/msg/v1" - queryv1 "cosmossdk.io/api/cosmos/query/v1" sdkmath "cosmossdk.io/math" nfttypes "cosmossdk.io/x/nft" wasmkeeper "github.com/CosmWasm/wasmd/x/wasm/keeper" @@ -17,10 +14,6 @@ import ( sdk "github.com/cosmos/cosmos-sdk/types" gogoproto "github.com/cosmos/gogoproto/proto" "github.com/pkg/errors" - "google.golang.org/protobuf/proto" - "google.golang.org/protobuf/reflect/protodesc" - "google.golang.org/protobuf/reflect/protoreflect" - "google.golang.org/protobuf/types/dynamicpb" assetfttypes "github.com/CoreumFoundation/coreum/v5/x/asset/ft/types" assetnfttypes "github.com/CoreumFoundation/coreum/v5/x/asset/nft/types" @@ -160,64 +153,13 @@ type coreumQuery struct { NFT *nftQuery `json:"nft"` } -// newModuleQuerySafeAllowList returns a map of all query paths labeled with module_query_safe in the proto files to -// their response proto. -func newModuleQuerySafeAllowList() wasmkeeper.AcceptedQueries { - fds, err := gogoproto.MergedGlobalFileDescriptors() - if err != nil { - panic(err) - } - // create the files using 'AllowUnresolvable' to avoid - // unnecessary panic: https://github.com/cosmos/ibc-go/issues/6435 - protoFiles, err := protodesc.FileOptions{ - AllowUnresolvable: true, - }.NewFiles(fds) - if err != nil { - panic(err) - } - - allowList := wasmkeeper.AcceptedQueries{} - protoFiles.RangeFiles(func(fd protoreflect.FileDescriptor) bool { - for i := range fd.Services().Len() { - // Get the service descriptor - sd := fd.Services().Get(i) - - // Skip services that are annotated with the "cosmos.msg.v1.service" option. - if ext := proto.GetExtension(sd.Options(), msgv1.E_Service); ext != nil && ext.(bool) { - continue - } - - for j := range sd.Methods().Len() { - // Get the method descriptor - md := sd.Methods().Get(j) - - // Skip methods that are not annotated with the "cosmos.query.v1.module_query_safe" option. - if ext := proto.GetExtension(md.Options(), queryv1.E_ModuleQuerySafe); ext == nil || !ext.(bool) { - continue - } - - // Add the method to the whitelist - path := fmt.Sprintf("/%s/%s", sd.FullName(), md.Name()) - allowList[path] = dynamicpb.NewMessage(md.Output()) - } - } - return true - }) - - return allowList -} - // NewCoreumQueryHandler returns the coreum handler which handles queries from smart contracts. func NewCoreumQueryHandler( assetFTQueryServer assetfttypes.QueryServer, assetNFTQueryServer assetnfttypes.QueryServer, - nftQueryServer nfttypes.QueryServer, gRPCQueryRouter *baseapp.GRPCQueryRouter, codec *codec.ProtoCodec, + nftQueryServer nfttypes.QueryServer, gRPCQueryRouter *baseapp.GRPCQueryRouter, codec codec.Codec, ) *wasmkeeper.QueryPlugins { - acceptList := newModuleQuerySafeAllowList() - // TODO: "/cosmos.nft.v1beta1.Query/Owner" is not marked as module_query_safe in cosmos, but we need it - acceptList["/cosmos.nft.v1beta1.Query/Owner"] = &nfttypes.QueryOwnerResponse{} - return &wasmkeeper.QueryPlugins{ - Grpc: wasmkeeper.AcceptListGrpcQuerier(acceptList, gRPCQueryRouter, codec), + Grpc: NewGRPCQuerier(gRPCQueryRouter, codec).Query, Custom: func(ctx sdk.Context, query json.RawMessage) ([]byte, error) { var coreumQuery coreumQuery if err := json.Unmarshal(query, &coreumQuery); err != nil { diff --git a/x/wasm/handler/query_grpc.go b/x/wasm/handler/query_grpc.go new file mode 100644 index 000000000..7b91515fe --- /dev/null +++ b/x/wasm/handler/query_grpc.go @@ -0,0 +1,129 @@ +package handler + +import ( + "fmt" + "sync" + + msgv1 "cosmossdk.io/api/cosmos/msg/v1" + queryv1 "cosmossdk.io/api/cosmos/query/v1" + nfttypes "cosmossdk.io/x/nft" + wasmvmtypes "github.com/CosmWasm/wasmvm/v2/types" + abci "github.com/cometbft/cometbft/abci/types" + "github.com/cosmos/cosmos-sdk/baseapp" + "github.com/cosmos/cosmos-sdk/codec" + sdk "github.com/cosmos/cosmos-sdk/types" + gogoproto "github.com/cosmos/gogoproto/proto" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/reflect/protodesc" + "google.golang.org/protobuf/reflect/protoreflect" + "google.golang.org/protobuf/types/dynamicpb" +) + +// GRPCQuerier is a WASM grpc querier. +type GRPCQuerier struct { + gRPCQueryRouter *baseapp.GRPCQueryRouter + codec codec.Codec + // map[query proto URL]proto response type + acceptedQueries map[string]func() gogoproto.Message + mu sync.Mutex +} + +// NewGRPCQuerier returns a new instance of GRPCQuerier. +func NewGRPCQuerier(gRPCQueryRouter *baseapp.GRPCQueryRouter, codec codec.Codec) *GRPCQuerier { + acceptedQueries := newModuleQuerySafeAllowList() + // "/cosmos.nft.v1beta1.Query/Owner" is not marked as module_query_safe in cosmos, but we need it + acceptedQueries["/cosmos.nft.v1beta1.Query/Owner"] = func() gogoproto.Message { + return &nfttypes.QueryOwnerResponse{} + } + + return &GRPCQuerier{ + gRPCQueryRouter: gRPCQueryRouter, + codec: codec, + acceptedQueries: acceptedQueries, + mu: sync.Mutex{}, + } +} + +// Query returns WASM GRPC query handler. +func (q *GRPCQuerier) Query(ctx sdk.Context, request *wasmvmtypes.GrpcQuery) (gogoproto.Message, error) { + protoResponseBuilder, accepted := q.acceptedQueries[request.Path] + if !accepted { + return nil, wasmvmtypes.UnsupportedRequest{ + Kind: fmt.Sprintf("'%s' path is not allowed from the contract", request.Path), + } + } + protoResponse := protoResponseBuilder() + + handler := q.gRPCQueryRouter.Route(request.Path) + if handler == nil { + return nil, wasmvmtypes.UnsupportedRequest{Kind: fmt.Sprintf("No route to query '%s'", request.Path)} + } + + q.mu.Lock() + res, err := handler(ctx, &abci.RequestQuery{ + Data: request.Data, + Path: request.Path, + }) + q.mu.Unlock() + if err != nil { + return nil, err + } + + // decode the query response into the expected protobuf message + err = q.codec.Unmarshal(res.Value, protoResponse) + if err != nil { + return nil, err + } + + return protoResponse, nil +} + +// newModuleQuerySafeAllowList returns a map of all query paths labeled with module_query_safe in the proto files to +// their response proto. +func newModuleQuerySafeAllowList() map[string]func() gogoproto.Message { + fds, err := gogoproto.MergedGlobalFileDescriptors() + if err != nil { + panic(err) + } + // create the files using 'AllowUnresolvable' to avoid + // unnecessary panic: https://github.com/cosmos/ibc-go/issues/6435 + protoFiles, err := protodesc.FileOptions{ + AllowUnresolvable: true, + }.NewFiles(fds) + if err != nil { + panic(err) + } + + allowList := make(map[string]func() gogoproto.Message) + protoFiles.RangeFiles(func(fd protoreflect.FileDescriptor) bool { + for i := range fd.Services().Len() { + // Get the service descriptor + sd := fd.Services().Get(i) + + // Skip services that are annotated with the "cosmos.msg.v1.service" option. + if ext := proto.GetExtension(sd.Options(), msgv1.E_Service); ext != nil && ext.(bool) { + continue + } + + for j := range sd.Methods().Len() { + // Get the method descriptor + md := sd.Methods().Get(j) + + // Skip methods that are not annotated with the "cosmos.query.v1.module_query_safe" option. + if ext := proto.GetExtension(md.Options(), queryv1.E_ModuleQuerySafe); ext == nil || !ext.(bool) { + continue + } + + // Add the method to the whitelist + path := fmt.Sprintf("/%s/%s", sd.FullName(), md.Name()) + out := md.Output() + allowList[path] = func() gogoproto.Message { + return dynamicpb.NewMessage(out) + } + } + } + return true + }) + + return allowList +} diff --git a/x/wasm/handler/query_grpc_test.go b/x/wasm/handler/query_grpc_test.go new file mode 100644 index 000000000..514a34379 --- /dev/null +++ b/x/wasm/handler/query_grpc_test.go @@ -0,0 +1,92 @@ +package handler_test + +import ( + "context" + "reflect" + "testing" + "time" + + sdkmath "cosmossdk.io/math" + wasmvmtypes "github.com/CosmWasm/wasmvm/v2/types" + tmproto "github.com/cometbft/cometbft/proto/tendermint/types" + gogoproto "github.com/cosmos/gogoproto/proto" + "github.com/pkg/errors" + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" + + "github.com/CoreumFoundation/coreum/v5/testutil/simapp" + assetfttypes "github.com/CoreumFoundation/coreum/v5/x/asset/ft/types" + "github.com/CoreumFoundation/coreum/v5/x/wasm/handler" +) + +func TestGRPCQuerier_Query(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + testApp := simapp.New() + sdkCtx := testApp.BaseApp.NewContextLegacy(false, tmproto.Header{ + Time: time.Now(), + AppHash: []byte("some-hash"), + }) + + issuer, _ := testApp.GenAccount(sdkCtx) + settingsWithExtension := assetfttypes.IssueSettings{ + Issuer: issuer, + Symbol: "DEFEXT", + Subunit: "defext", + Precision: 6, + InitialAmount: sdkmath.NewIntWithDecimal(1, 10), + } + denom, err := testApp.AssetFTKeeper.Issue(sdkCtx, settingsWithExtension) + require.NoError(t, err) + + q := handler.NewGRPCQuerier(testApp.GRPCQueryRouter(), testApp.AppCodec()) + queryTokenReq := &assetfttypes.QueryTokenRequest{ + Denom: denom, + } + wasmGrpcData, err := testApp.AppCodec().Marshal(queryTokenReq) + require.NoError(t, err) + + eg, _ := errgroup.WithContext(ctx) + for range 1000 { + eg.Go(func() error { + wasmGrpcReq := &wasmvmtypes.GrpcQuery{ + Data: wasmGrpcData, + // url which corresponds query token + Path: "/coreum.asset.ft.v1.Query/Token", + } + wasmGrpcRes, err := q.Query(sdkCtx, wasmGrpcReq) + if err != nil { + return err + } + + queryTokenResData, err := gogoproto.Marshal(wasmGrpcRes) + if err != nil { + return err + } + + queryTokenRes := &assetfttypes.QueryTokenResponse{} + if err := testApp.AppCodec().Unmarshal(queryTokenResData, queryTokenRes); err != nil { + return err + } + + want := assetfttypes.Token{ + Denom: denom, + Issuer: issuer.String(), + Symbol: settingsWithExtension.Symbol, + Subunit: settingsWithExtension.Subunit, + Precision: settingsWithExtension.Precision, + BurnRate: sdkmath.LegacyNewDec(0), + SendCommissionRate: sdkmath.LegacyNewDec(0), + Version: assetfttypes.CurrentTokenVersion, + Admin: issuer.String(), + } + if !reflect.DeepEqual(want, queryTokenRes.Token) { + return errors.Errorf("unexpected token, want:%v, got:%v", want, queryTokenRes.Token) + } + return nil + }) + } + + require.NoError(t, eg.Wait()) +}