From 046a07b6ce7940e60fdcdd56e6ed6b1075bdd5bc Mon Sep 17 00:00:00 2001 From: Utkarsh Saxena Date: Thu, 9 May 2024 09:45:29 +0530 Subject: [PATCH 1/3] feat: Add support for BasicAuth on gateway Signed-off-by: Utkarsh Saxena --- internal/router/auth.go | 41 ++++++++++++++++++++++++++++++++--------- 1 file changed, 32 insertions(+), 9 deletions(-) diff --git a/internal/router/auth.go b/internal/router/auth.go index 0c72bb6..93aca82 100644 --- a/internal/router/auth.go +++ b/internal/router/auth.go @@ -34,10 +34,12 @@ func NewDefaultAuthService() *DefaultAuthService { /* Calls an Auth Token Validator Service with following api contract: With Params- -{ - "email": "abc@xyz.com", - "token": "token123" -} + + { + "email": "abc@xyz.com", + "token": "token123" + } + api returns- If Authenticated - {"ok": true} If not authenticated- {"ok": false} @@ -144,12 +146,33 @@ func WithAuth(ctx *context.Context, h http.Handler, authService ...AuthService) } return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - username := trinoheaders.Get(trinoheaders.User, r) - password := trinoheaders.Get(trinoheaders.Password, r) + // TODO: Refactor auth type handling to a dedicated type + + // BasicAuth + username, password, isBasicAuth := r.BasicAuth() + + // CustomAuth + if !isBasicAuth { + provider.Logger(*ctx).Debug("Custom Auth type") + username = trinoheaders.Get(trinoheaders.User, r) + password = trinoheaders.Get(trinoheaders.Password, r) + } else { + if u := trinoheaders.Get(trinoheaders.User, r); u != username { + errorMsg := fmt.Sprintf("Username from basicauth - %s does not match with User principal - %s", username, u) + provider.Logger(*ctx).Debug(errorMsg) + http.Error(w, errorMsg, http.StatusUnauthorized) + } + + // Remove auth details from request + r.Header.Del("Authorization") + } - //Remove this later after full rollout - if password == "" { - h.ServeHTTP(w, r) + // NoAuth + isNoAuth := password == "" + if isNoAuth { + provider.Logger(*ctx).Debug("No Auth type detected") + errorMsg := fmt.Sprintf("Password required") + http.Error(w, errorMsg, http.StatusUnauthorized) return } From d15beeab71c1bfc018f0ee36a68950af009df356 Mon Sep 17 00:00:00 2001 From: Utkarsh Saxena Date: Sat, 25 May 2024 08:00:02 +0530 Subject: [PATCH 2/3] feat: Add proper support for Auth delegation Signed-off-by: Utkarsh Saxena --- cmd/gateway/main.go | 2 +- config/default.toml | 7 +- go.mod | 1 - go.sum | 1 - internal/config/config.go | 9 +- .../20240524205304_add_auth_delegation.go | 31 +++ internal/gatewayserver/models/policy.go | 1 + internal/gatewayserver/policyApi/core.go | 44 ++- internal/gatewayserver/policyApi/server.go | 45 ++- internal/router/auth.go | 198 +++++++------- internal/router/auth_test.go | 257 +++--------------- internal/router/router.go | 22 +- internal/utils/utils.go | 47 ++++ internal/utils/utils_test.go | 90 ++++++ rpc/gateway/service.proto | 11 + 15 files changed, 393 insertions(+), 373 deletions(-) create mode 100644 internal/gatewayserver/database/migrations/20240524205304_add_auth_delegation.go diff --git a/cmd/gateway/main.go b/cmd/gateway/main.go index d79eeb7..3994f74 100644 --- a/cmd/gateway/main.go +++ b/cmd/gateway/main.go @@ -109,7 +109,7 @@ func startGatewayServers(_ctx *context.Context) []*http.Server { servers := make([]*http.Server, len(boot.Config.Gateway.Ports)) for i, port := range boot.Config.Gateway.Ports { - server := router.Server(&ctx, port, &gatewayClient, boot.Config.App.ServiceExternalHostname, boot.Config.Auth.Router.Authenticate) + server := router.Server(&ctx, port, &gatewayClient, boot.Config.App.ServiceExternalHostname) servers[i] = server go listenHttp(&ctx, server, port) diff --git a/config/default.toml b/config/default.toml index 7d9cae8..62b8493 100644 --- a/config/default.toml +++ b/config/default.toml @@ -30,11 +30,10 @@ [auth] token = "test123" tokenHeaderKey = "X-Auth-Key" - [auth.router] - validationURL = "localhost:28001" - validationToken = "test123" + [auth.router.delegatedAuth] + validationProviderURL = "localhost:28001" + validationProviderToken = "test123" cacheTTLMinutes = "10m" - authenticate = "false" diff --git a/go.mod b/go.mod index c401c13..daa2e02 100644 --- a/go.mod +++ b/go.mod @@ -76,7 +76,6 @@ require ( github.com/spf13/cast v1.6.0 // indirect github.com/spf13/cobra v1.8.0 // indirect github.com/spf13/pflag v1.0.5 // indirect - github.com/stretchr/objx v0.5.2 // indirect github.com/subosito/gotenv v1.6.0 // indirect github.com/visualfc/goembed v0.3.3 // indirect go.uber.org/atomic v1.11.0 // indirect diff --git a/go.sum b/go.sum index 117c674..478a5b1 100644 --- a/go.sum +++ b/go.sum @@ -332,7 +332,6 @@ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= -github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= diff --git a/internal/config/config.go b/internal/config/config.go index f95a7c9..a15fbfe 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -30,10 +30,11 @@ type Auth struct { Token string TokenHeaderKey string Router struct { - ValidationURL string - ValidationToken string - CacheTTLMinutes string - Authenticate string + DelegatedAuth struct { + ValidationProviderURL string + ValidationProviderToken string + CacheTTLMinutes string + } } } diff --git a/internal/gatewayserver/database/migrations/20240524205304_add_auth_delegation.go b/internal/gatewayserver/database/migrations/20240524205304_add_auth_delegation.go new file mode 100644 index 0000000..15c9880 --- /dev/null +++ b/internal/gatewayserver/database/migrations/20240524205304_add_auth_delegation.go @@ -0,0 +1,31 @@ +package migration + +import ( + "database/sql" + + "github.com/pressly/goose/v3" +) + +func init() { + goose.AddMigration(Up20240524205304, Down20240524205304) +} + +func Up20240524205304(tx *sql.Tx) error { + var err error + + _, err = tx.Exec("ALTER TABLE `policies` ADD COLUMN `is_auth_delegated` BOOL DEFAULT false;") + if err != nil { + return err + } + return err +} + +func Down20240524205304(tx *sql.Tx) error { + var err error + + _, err = tx.Exec("ALTER TABLE `policies` DROP COLUMN `is_auth_delegated`;") + if err != nil { + return err + } + return err +} diff --git a/internal/gatewayserver/models/policy.go b/internal/gatewayserver/models/policy.go index 3441d92..9c773e8 100644 --- a/internal/gatewayserver/models/policy.go +++ b/internal/gatewayserver/models/policy.go @@ -10,6 +10,7 @@ type Policy struct { GroupId string `json:"group_id"` FallbackGroupId *string `json:"fallback_group_id"` IsEnabled *bool `json:"is_enabled" sql:"DEFAULT:true"` + IsAuthDelegated *bool `json:"is_auth_delegated" sql:"DEFAULT:false"` } func (u *Policy) TableName() string { diff --git a/internal/gatewayserver/policyApi/core.go b/internal/gatewayserver/policyApi/core.go index 901f2f3..465ee64 100644 --- a/internal/gatewayserver/policyApi/core.go +++ b/internal/gatewayserver/policyApi/core.go @@ -25,6 +25,7 @@ type ICore interface { DisablePolicy(ctx context.Context, id string) error EvaluateGroupsForClient(ctx context.Context, c *EvaluateClientParams) ([]string, error) + EvaluateAuthDelegation(ctx context.Context, p int32) (bool, error) // EvaluatePolicy(ctx context.Context, group string) (string, error) // FindPolicyForQuery(ctx context.Context, q string) (string, error) } @@ -35,12 +36,13 @@ func NewCore(policy repo.IPolicyRepo) *Core { // CreateParams has attributes that are required for policy.Create() type PolicyCreateParams struct { - ID string - RuleType string - RuleValue string - Group string - FallbackGroup string - IsEnabled bool + ID string + RuleType string + RuleValue string + Group string + FallbackGroup string + IsEnabled bool + IsAuthDelegated bool } func (c *Core) CreateOrUpdatePolicy(ctx context.Context, params *PolicyCreateParams) error { @@ -50,6 +52,7 @@ func (c *Core) CreateOrUpdatePolicy(ctx context.Context, params *PolicyCreatePar GroupId: params.Group, FallbackGroupId: ¶ms.FallbackGroup, IsEnabled: ¶ms.IsEnabled, + IsAuthDelegated: ¶ms.IsAuthDelegated, } policy.ID = params.ID @@ -95,9 +98,10 @@ type FindManyParams struct { // To int32 // custom - IsEnabled bool `json:"is_enabled"` - RuleType string `json:"rule_type"` - RuleValue string `json:"rule_value"` + IsEnabled bool `json:"is_enabled"` + RuleType string `json:"rule_type"` + RuleValue string `json:"rule_value"` + IsAuthDelegated bool `json:"is_auth_delegated,omitempty"` } func (p *FindManyParams) GetIsEnabled() bool { @@ -214,6 +218,28 @@ func (c *Core) EvaluateGroupsForClient(ctx context.Context, params *EvaluateClie return res, nil } +func (c *Core) EvaluateAuthDelegation(ctx context.Context, port int32) (bool, error) { + res, err := c.FindMany( + ctx, + &FindManyParams{ + IsEnabled: true, + RuleType: "listening_port", + RuleValue: strconv.Itoa(int(port)), + IsAuthDelegated: true, + }) + if err != nil { + return false, err + } + provider.Logger(ctx).Debugw("Is Auth Delegated For Port", map[string]interface{}{ + "listeningPort": port, + "matchingRules": res, + }) + if len(res) > 0 { + return true, nil + } + return false, nil +} + // Implementing "set" collection methods here, :) func setIntersection(s1 map[string]struct{}, s2 map[string]struct{}) map[string]struct{} { s_intersection := map[string]struct{}{} diff --git a/internal/gatewayserver/policyApi/server.go b/internal/gatewayserver/policyApi/server.go index 290c1c5..7b883b8 100644 --- a/internal/gatewayserver/policyApi/server.go +++ b/internal/gatewayserver/policyApi/server.go @@ -32,12 +32,13 @@ func (s *Server) CreateOrUpdatePolicy(ctx context.Context, req *gatewayv1.Policy }) createParams := PolicyCreateParams{ - ID: req.GetId(), - RuleType: req.GetRule().Type.Enum().String(), - RuleValue: req.GetRule().Value, - Group: req.GetGroup(), - FallbackGroup: req.GetFallbackGroup(), - IsEnabled: req.GetIsEnabled(), + ID: req.GetId(), + RuleType: req.GetRule().GetType().Enum().String(), + RuleValue: req.GetRule().GetValue(), + Group: req.GetGroup(), + FallbackGroup: req.GetFallbackGroup(), + IsEnabled: req.GetIsEnabled(), + IsAuthDelegated: req.GetIsAuthDelegated(), } err := s.core.CreateOrUpdatePolicy(ctx, &createParams) @@ -142,11 +143,12 @@ func toPolicyResponseProto(policy *models.Policy) (*gatewayv1.Policy, error) { Value: policy.RuleValue, } response := gatewayv1.Policy{ - Id: policy.ID, - Rule: &rule, - Group: policy.GroupId, - FallbackGroup: *policy.FallbackGroupId, - IsEnabled: *policy.IsEnabled, + Id: policy.ID, + Rule: &rule, + Group: policy.GroupId, + FallbackGroup: *policy.FallbackGroupId, + IsEnabled: *policy.IsEnabled, + IsAuthDelegated: *policy.IsAuthDelegated, } return &response, nil @@ -175,3 +177,24 @@ func (s *Server) EvaluateGroupsForClient(ctx context.Context, req *gatewayv1.Eva } return &gatewayv1.EvaluateGroupsResponse{}, nil } + +func (s *Server) EvaluateAuthDelegationForClient(ctx context.Context, req *gatewayv1.EvaluateAuthDelegationRequest) (*gatewayv1.EvaluateAuthDelegationResponse, error) { + provider.Logger(ctx).Debugw("EvaluateAuthDelegation", map[string]interface{}{ + "request": req.String(), + }) + + if req.GetIncomingPort() == 0 { + err := errors.New("Invalid port defined in `incoming_port`.") + provider.Logger(ctx).WithError(err).Error(err.Error()) + return &gatewayv1.EvaluateAuthDelegationResponse{IsAuthDelegated: false}, nil + } + + result, err := s.core.EvaluateAuthDelegation( + ctx, + req.GetIncomingPort(), + ) + if err != nil { + return nil, err + } + return &gatewayv1.EvaluateAuthDelegationResponse{IsAuthDelegated: result}, nil +} diff --git a/internal/router/auth.go b/internal/router/auth.go index 93aca82..51166a0 100644 --- a/internal/router/auth.go +++ b/internal/router/auth.go @@ -7,28 +7,22 @@ import ( "fmt" "io/ioutil" "net/http" - "sync" "time" "github.com/razorpay/trino-gateway/internal/boot" "github.com/razorpay/trino-gateway/internal/provider" "github.com/razorpay/trino-gateway/internal/router/trinoheaders" + "github.com/razorpay/trino-gateway/internal/utils" + gatewayv1 "github.com/razorpay/trino-gateway/rpc/gateway" ) -type AuthCache struct { - Cache map[string]struct { - Timestamp time.Time - Password string - } - mu sync.Mutex -} -type AuthService interface { - Authenticate(ctx *context.Context, username, password string) (bool, error) +type IAuthService interface { + Authenticate(ctx *context.Context, username string, password string) (bool, error) } -type DefaultAuthService struct{} -func NewDefaultAuthService() *DefaultAuthService { - return &DefaultAuthService{} +type AuthService struct { + ValidationProviderURL string + ValidationProviderToken string } /* @@ -47,23 +41,7 @@ If not authenticated- {"ok": false} @returns- boolean{True or False},error_message */ -func (s *DefaultAuthService) Authenticate(ctx *context.Context, username string, password string) (bool, error) { - authCache, ok := (*ctx).Value("routerAuthCache").(*AuthCache) - - if !ok { - authCache = &AuthCache{ - Cache: make(map[string]struct { - Timestamp time.Time - Password string - }), - } - *ctx = context.WithValue(*ctx, "routerAuthCache", authCache) - } - - if authCache.Check(username, password) { - authCache.Update(username, password) - return true, nil - } +func (s *AuthService) ValidateFromValidationProvider(ctx *context.Context, username string, password string) (bool, error) { payload := struct { Username string `json:"email"` Token string `json:"token"` @@ -73,8 +51,8 @@ func (s *DefaultAuthService) Authenticate(ctx *context.Context, username string, } payloadBytes, _ := json.Marshal(payload) - req, _ := http.NewRequest("POST", boot.Config.Auth.Router.ValidationURL, bytes.NewReader(payloadBytes)) - req.Header.Set("X-Auth-Token", boot.Config.Auth.Router.ValidationToken) + req, _ := http.NewRequest("POST", s.ValidationProviderURL, bytes.NewReader(payloadBytes)) + req.Header.Set("X-Auth-Token", s.ValidationProviderToken) req.Header.Set("Content-Type", "application/json") client := &http.Client{} @@ -94,102 +72,112 @@ func (s *DefaultAuthService) Authenticate(ctx *context.Context, username string, return false, jsonParseError } - if data.OK { - authCache.Update(username, password) - } - return data.OK, nil } -func (authCache *AuthCache) Check(username, password string) bool { - authCache.mu.Lock() - defer authCache.mu.Unlock() - - entry, found := authCache.Cache[username] +func (s *AuthService) Authenticate(ctx *context.Context, username string, password string) (bool, error) { + authCache := s.GetInMemoryAuthCache(ctx) - if !found { - return false + if entry, exists := authCache.Get(username); exists && entry == password { + authCache.Update(username, password) + return true, nil } - cachedInterval := boot.Config.Auth.Router.CacheTTLMinutes - cachedDuration, _ := time.ParseDuration(cachedInterval) + isValid, err := s.ValidateFromValidationProvider(ctx, username, password) + if err != nil { + return false, err + } - if time.Since(entry.Timestamp) > cachedDuration { - // If entry is older than cachedDuration, then delete the record and return false - delete(authCache.Cache, username) - return false + if isValid { + authCache.Update(username, password) } - return entry.Password == password + return isValid, nil } -func (authCache *AuthCache) Update(username, password string) { - authCache.mu.Lock() - defer authCache.mu.Unlock() +func (s *AuthService) GetInMemoryAuthCache(ctx *context.Context) utils.ISimpleCache { + ctxKeyName := "routerAuthCache" + authCache, ok := (*ctx).Value(ctxKeyName).(*utils.InMemorySimpleCache) - authCache.Cache[username] = struct { - Timestamp time.Time - Password string - }{ - Timestamp: time.Now(), - Password: password, + if !ok { + + expiryInterval, _ := time.ParseDuration(boot.Config.Auth.Router.DelegatedAuth.CacheTTLMinutes) + + authCache = &utils.InMemorySimpleCache{ + Cache: make(map[string]struct { + Timestamp time.Time + Value string + }), + ExpiryInterval: expiryInterval, + } + *ctx = context.WithValue(*ctx, ctxKeyName, authCache) } + return authCache } -func WithAuth(ctx *context.Context, h http.Handler, authService ...AuthService) http.Handler { +func (r *RouterServer) isAuthDelegated(ctx *context.Context) (bool, error) { + res, err := r.gatewayApiClient.Policy.EvaluateAuthDelegationForClient(*ctx, &gatewayv1.EvaluateAuthDelegationRequest{IncomingPort: int32(r.port)}) - var auth AuthService - if len(authService) > 0 { - auth = authService[0] - } else { - auth = NewDefaultAuthService() + if err != nil { + provider.Logger(*ctx).WithError(err).Errorw( + fmt.Sprint(LOG_TAG, "Failed to evaluate auth delegation policy. Assuming delegation is disabled."), + map[string]interface{}{ + "port": r.port, + }) + return false, err } - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - - // TODO: Refactor auth type handling to a dedicated type + return res.GetIsAuthDelegated(), nil +} - // BasicAuth - username, password, isBasicAuth := r.BasicAuth() +func (r *RouterServer) AuthHandler(ctx *context.Context, h http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + if isAuth, _ := r.isAuthDelegated(ctx); isAuth { + // TODO: Refactor auth type handling to a dedicated type + + // BasicAuth + username, password, isBasicAuth := req.BasicAuth() + + // CustomAuth + if !isBasicAuth { + provider.Logger(*ctx).Debug("Custom Auth type") + username = trinoheaders.Get(trinoheaders.User, req) + password = trinoheaders.Get(trinoheaders.Password, req) + } else { + if u := trinoheaders.Get(trinoheaders.User, req); u != username { + errorMsg := fmt.Sprintf("Username from basicauth - %s does not match with User principal - %s", username, u) + provider.Logger(*ctx).Debug(errorMsg) + http.Error(w, errorMsg, http.StatusUnauthorized) + } + + // Remove auth details from request + req.Header.Del("Authorization") + } - // CustomAuth - if !isBasicAuth { - provider.Logger(*ctx).Debug("Custom Auth type") - username = trinoheaders.Get(trinoheaders.User, r) - password = trinoheaders.Get(trinoheaders.Password, r) - } else { - if u := trinoheaders.Get(trinoheaders.User, r); u != username { - errorMsg := fmt.Sprintf("Username from basicauth - %s does not match with User principal - %s", username, u) - provider.Logger(*ctx).Debug(errorMsg) + // NoAuth + isNoAuth := password == "" + if isNoAuth { + provider.Logger(*ctx).Debug("No Auth type detected") + errorMsg := fmt.Sprintf("Password required") http.Error(w, errorMsg, http.StatusUnauthorized) + return } - // Remove auth details from request - r.Header.Del("Authorization") - } - - // NoAuth - isNoAuth := password == "" - if isNoAuth { - provider.Logger(*ctx).Debug("No Auth type detected") - errorMsg := fmt.Sprintf("Password required") - http.Error(w, errorMsg, http.StatusUnauthorized) - return - } - - isAuthenticated, err := auth.Authenticate(ctx, username, password) + isAuthenticated, err := r.authService.Authenticate(ctx, username, password) - if err != nil { - errorMsg := fmt.Sprintf("Unable to Authenticate users. Getting error - %s", err) - provider.Logger(*ctx).Error(errorMsg) - http.Error(w, "Unable to Authenticate the user", http.StatusNotFound) - return - } - if !isAuthenticated { - provider.Logger(*ctx).Debug(fmt.Sprintf("User - %s not authenticated", username)) - http.Error(w, "User not authenticated", http.StatusUnauthorized) - return + if err != nil { + errorMsg := fmt.Sprintf("Unable to Authenticate users. Getting error - %s", err) + provider.Logger(*ctx).Error(errorMsg) + http.Error(w, "Unable to Authenticate the user", http.StatusNotFound) + return + } + if !isAuthenticated { + provider.Logger(*ctx).Debug(fmt.Sprintf("User - %s not authenticated", username)) + http.Error(w, "User not authenticated", http.StatusUnauthorized) + return + } + h.ServeHTTP(w, req) + } else { + h.ServeHTTP(w, req) } - - h.ServeHTTP(w, r) }) } diff --git a/internal/router/auth_test.go b/internal/router/auth_test.go index 172c5a2..ef5dc37 100644 --- a/internal/router/auth_test.go +++ b/internal/router/auth_test.go @@ -1,247 +1,54 @@ -package router_test +package router import ( "context" - "errors" - r "github.com/razorpay/trino-gateway/internal/router" - "github.com/stretchr/testify/mock" - "net/http" - "net/http/httptest" - "sync" "testing" - "time" -) - -type MockAuthenticator struct { - mock.Mock -} - -// A mock struct that implements the AuthService interface -type AuthServiceMock struct { - authMock *MockAuthenticator -} -type AuthCache struct { - cache map[string]struct { - Timestamp time.Time - Password string - } - mu sync.Mutex -} - -func createContextWithHeaders() context.Context { - // Create a context with a value for routerAuthCache - ctx := context.WithValue(context.Background(), "routerAuthCache", &AuthCache{ - cache: make(map[string]struct { - Timestamp time.Time - Password string - }), - }) - return ctx -} - -func createTestRequestWithHeaders(username, password string) *http.Request { - req := httptest.NewRequest("GET", "/test", nil) - req.Header.Set("X-Trino-User", username) - req.Header.Set("X-Trino-Password", password) - return req -} - -func (m *AuthServiceMock) Authenticate(ctx *context.Context, username string, password string) (bool, error) { - args := m.authMock.Called(ctx, username, password) - return args.Bool(0), args.Error(1) -} - -func Test_WithAuth_Authorised(t *testing.T) { - authMock := &MockAuthenticator{} - ctx := createContextWithHeaders() - - handler := func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - } - - authMock.On("Authenticate", &ctx, "testuser", "testpassword").Return(true, nil) - - authService := &AuthServiceMock{authMock} - - wrappedHandler := r.WithAuth(&ctx, http.HandlerFunc(handler), authService) - - req := createTestRequestWithHeaders("testuser", "testpassword") - - rr := httptest.NewRecorder() - - wrappedHandler.ServeHTTP(rr, req) - - if rr.Code != http.StatusOK { - t.Errorf("Expected status code %d, got %d", http.StatusOK, rr.Code) - } - - authMock.AssertExpectations(t) -} - -func Test_WithAuth_UnAuthorised(t *testing.T) { - authMock := &MockAuthenticator{} - ctx := createContextWithHeaders() - - handler := func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - } - - authMock.On("Authenticate", &ctx, "testuser", "testpassword").Return(false, nil) - - authService := &AuthServiceMock{authMock} - - wrappedHandler := r.WithAuth(&ctx, http.HandlerFunc(handler), authService) - - req := createTestRequestWithHeaders("testuser", "testpassword") - rr := httptest.NewRecorder() - - wrappedHandler.ServeHTTP(rr, req) - - if rr.Code != http.StatusUnauthorized { - t.Errorf("Expected status code %d, got %d", http.StatusUnauthorized, rr.Code) - } + "github.com/razorpay/trino-gateway/pkg/logger" + "github.com/stretchr/testify/suite" +) - authMock.AssertExpectations(t) +// Define the suite, and absorb the built-in basic suite +// functionality from testify - including a T() method which +// returns the current testing context +type AuthSuite struct { + suite.Suite + authService *AuthService + ctx *context.Context } -func Test_WithAuth_Error(t *testing.T) { - authMock := &MockAuthenticator{} - ctx := createContextWithHeaders() - - handler := func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - } - - authMock.On("Authenticate", &ctx, "testuser", "testpassword").Return(false, errors.New("authentication failed")) - - authService := &AuthServiceMock{authMock} - - wrappedHandler := r.WithAuth(&ctx, http.HandlerFunc(handler), authService) - - req := createTestRequestWithHeaders("testuser", "testpassword") - - rr := httptest.NewRecorder() - - wrappedHandler.ServeHTTP(rr, req) - - if rr.Code != http.StatusNotFound { - t.Errorf("Expected status code %d, got %d", http.StatusNotFound, rr.Code) +func (suite *AuthSuite) SetupTest() { + lgrConfig := logger.Config{ + LogLevel: logger.Warn, } - authMock.AssertExpectations(t) -} - -func Test_Check_ValidCredentials_WithinCache(t *testing.T) { - // Create an instance of AuthCache - - authCache := &r.AuthCache{ - Cache: make(map[string]struct { - Timestamp time.Time - Password string - }), + l, err := logger.NewLogger(lgrConfig) + if err != nil { + panic("failed to initialize logger") } - //r.AuthCache{authCache} - // Define test credentials - username := "testuser" - password := "testpassword" - // Add a test entry to the cache - entry := struct { - Timestamp time.Time - Password string - }{ - Timestamp: time.Now(), - Password: password, - } - authCache.Cache[username] = entry - valid := authCache.Check(username, password) + c := context.WithValue(context.Background(), logger.LoggerCtxKey, l) - // Assert that the check method returns true for valid credentials - if !valid { - t.Error("Expected valid credentials, but got invalid credentials") - } + suite.ctx = &c + suite.authService = &AuthService{} } -func Test_Check_ValidCredentials_NotCached(t *testing.T) { - // Create an instance of AuthCache - - authCache := &r.AuthCache{ - Cache: make(map[string]struct { - Timestamp time.Time - Password string - }), - } - //r.AuthCache{authCache} - // Define test credentials - username := "testuser" - password := "testpassword" - - // Add a test entry to the cache - entry := struct { - Timestamp time.Time - Password string - }{ - Timestamp: time.Now().Add(-15 * time.Minute), - Password: password, - } - authCache.Cache[username] = entry - valid := authCache.Check(username, password) - - // Assert that the check method returns true for valid credentials - if valid { - t.Error("Expected expired credentials, but got valid credentials") - } -} +func (suite *AuthSuite) Test_GetInMemoryAuthCache_Persistance() { + key := "testKey" + value := "testValue" -func Test_Check_InValidCredentials_WithinCache(t *testing.T) { - // Create an instance of AuthCache + authCache := suite.authService.GetInMemoryAuthCache(suite.ctx) + authCache.Update(key, value) - authCache := &r.AuthCache{ - Cache: make(map[string]struct { - Timestamp time.Time - Password string - }), - } - //r.AuthCache{authCache} - // Define test credentials - username := "testuser" - password := "testpassword" + authCacheInstance2 := suite.authService.GetInMemoryAuthCache(suite.ctx) + entry, exists := authCacheInstance2.Get(key) - // Add a test entry to the cache - entry := struct { - Timestamp time.Time - Password string - }{ - Timestamp: time.Now(), - Password: password, + suite.Truef(exists, "Second cache instance doesn't have same key") + if exists { + suite.Equalf(value, entry, "Second Cache instance value doesn't match.") } - authCache.Cache[username] = entry - wrongpassword := "falsepassword" - valid := authCache.Check(username, wrongpassword) - - // Assert that the check method returns true for valid credentials - if valid { - t.Error("Expected Invalid credentials, but got valid credentials") - } } - -func Test_Update(t *testing.T) { - authCache := &r.AuthCache{ - Cache: make(map[string]struct { - Timestamp time.Time - Password string - }), - } - authCache.Update("testUser", "testPassword") - - expectedPassword := "testPassword" - if entry, exists := authCache.Cache["testUser"]; exists { - if entry.Password != expectedPassword { - t.Errorf("Password for 'testUser' is not as expected. Got: %s, Expected: %s", entry.Password, expectedPassword) - } - } else { - t.Errorf("Entry for 'testUser' not found in the cache") - } +func TestAuthSuite(t *testing.T) { + suite.Run(t, new(AuthSuite)) } diff --git a/internal/router/router.go b/internal/router/router.go index 23aa51a..509ff5a 100644 --- a/internal/router/router.go +++ b/internal/router/router.go @@ -8,6 +8,7 @@ import ( "net/http/httputil" "time" + "github.com/razorpay/trino-gateway/internal/boot" "github.com/razorpay/trino-gateway/internal/provider" "github.com/razorpay/trino-gateway/internal/utils" gatewayv1 "github.com/razorpay/trino-gateway/rpc/gateway" @@ -27,6 +28,7 @@ type RouterServer struct { gatewayApiClient *GatewayApiClient port int routerHostname string + authService IAuthService } type key int @@ -54,11 +56,15 @@ func init() { initMetrics() } -func Server(ctx *context.Context, port int, apiClient *GatewayApiClient, routerHostname string, authenticate string) *http.Server { +func Server(ctx *context.Context, port int, apiClient *GatewayApiClient, routerHostname string) *http.Server { routerServer := RouterServer{ port: port, gatewayApiClient: apiClient, routerHostname: routerHostname, + authService: &AuthService{ + ValidationProviderURL: boot.Config.Auth.Router.DelegatedAuth.ValidationProviderURL, + ValidationProviderToken: boot.Config.Auth.Router.DelegatedAuth.ValidationProviderToken, + }, } reverseProxy := httputil.ReverseProxy{ Director: func(req *http.Request) { routerServer.handleClientRequest(ctx, req) }, @@ -108,24 +114,16 @@ func Server(ctx *context.Context, port int, apiClient *GatewayApiClient, routerH }, } - if authenticate == "true" { - authenticatedReverseProxy := WithAuth(ctx, &reverseProxy) - return &http.Server{ - Handler: authenticatedReverseProxy, - } - } else { - return &http.Server{ - Handler: &reverseProxy, - } + return &http.Server{ + Handler: routerServer.AuthHandler(ctx, &reverseProxy), } - } func (r *RouterServer) extractSharedRequestCtxObject(ctx *context.Context, req *http.Request) (*ContextSharedObject, error) { reqCtx := req.Context() res, ok := (reqCtx).Value(keyCtxSharedObj).(*ContextSharedObject) if !ok { - err := errors.New("unable to cast shared object object from context") + err := errors.New("unable to cast shared object from context") provider.Logger(*ctx).WithError(err).Error("unable to cast shared object object from context") return nil, err } diff --git a/internal/utils/utils.go b/internal/utils/utils.go index f55841e..b466eb2 100644 --- a/internal/utils/utils.go +++ b/internal/utils/utils.go @@ -8,6 +8,7 @@ import ( "io/ioutil" "net/http" "net/http/httputil" + "sync" "time" "github.com/razorpay/trino-gateway/internal/provider" @@ -128,3 +129,49 @@ func ParseHttpPayloadBody(ctx *context.Context, body *io.ReadCloser, encoding st return string(bodyBytes), nil } } + +type ISimpleCache interface { + Get(key string) (string, bool) + Update(key, value string) +} + +type InMemorySimpleCache struct { + Cache map[string]struct { + Timestamp time.Time + Value string + } + ExpiryInterval time.Duration + mu sync.Mutex +} + +func (authCache *InMemorySimpleCache) Get(key string) (string, bool) { + authCache.mu.Lock() + defer authCache.mu.Unlock() + + entry, found := authCache.Cache[key] + + if !found { + return "", false + } + + if authCache.ExpiryInterval > 0 && + time.Since(entry.Timestamp) > authCache.ExpiryInterval { + // If entry is older than cachedDuration, then delete the record and return false + delete(authCache.Cache, key) + return "", false + } + return entry.Value, true +} + +func (authCache *InMemorySimpleCache) Update(key, value string) { + authCache.mu.Lock() + defer authCache.mu.Unlock() + + authCache.Cache[key] = struct { + Timestamp time.Time + Value string + }{ + Timestamp: time.Now(), + Value: value, + } +} diff --git a/internal/utils/utils_test.go b/internal/utils/utils_test.go index 66a8e8f..864f23c 100644 --- a/internal/utils/utils_test.go +++ b/internal/utils/utils_test.go @@ -92,6 +92,96 @@ func (suite *UtilsSuite) Test_parseBody() { suite.Equalf(str, tst_gzipped(), "String extraction is not idempotent") } +func (suite *UtilsSuite) Test_InMemorySimpleCache_Get() { + authCache := &InMemorySimpleCache{ + Cache: make(map[string]struct { + Timestamp time.Time + Value string + }), + } + key := "testKey" + value := "testValue" + authCache.Cache[key] = struct { + Timestamp time.Time + Value string + }{ + Timestamp: time.Now(), + Value: value, + } + + entry, exists := authCache.Get(key) + suite.Truef(exists, "Entry not found in cache.") + if exists { + suite.Equalf(value, entry, "Cached value doesn't match.") + } +} + +func (suite *UtilsSuite) Test_InMemorySimpleCache_Get_InfiniteExpiry() { + authCache := &InMemorySimpleCache{ + Cache: make(map[string]struct { + Timestamp time.Time + Value string + }), + ExpiryInterval: 0 * time.Second, + } + key := "testKey" + value := "testValue" + authCache.Cache[key] = struct { + Timestamp time.Time + Value string + }{ + Timestamp: time.Now().Add(-1000 * time.Hour), + Value: value, + } + + entry, exists := authCache.Get(key) + suite.Truef(exists, "Entry not found in cache.") + if exists { + suite.Equalf(value, entry, "Cached value doesn't match.") + } +} + +func (suite *UtilsSuite) Test_InMemorySimpleCache_Get_Expired() { + expiryInterval := 2 * time.Second + authCache := &InMemorySimpleCache{ + Cache: make(map[string]struct { + Timestamp time.Time + Value string + }), + ExpiryInterval: expiryInterval, + } + key := "testKey" + value := "testValue" + authCache.Cache[key] = struct { + Timestamp time.Time + Value string + }{ + Timestamp: time.Now().Add(-1 * expiryInterval).Add(-1 * time.Second), + Value: value, + } + + _, exists := authCache.Get(key) + suite.False(exists, "Entry not expired.") +} + +func (suite *UtilsSuite) Test_InMemorySimpleCache_Update() { + authCache := &InMemorySimpleCache{ + Cache: make(map[string]struct { + Timestamp time.Time + Value string + }), + } + key := "testKey" + value := "testValue" + authCache.Update(key, value) + + entry, exists := authCache.Cache[key] + suite.Truef(exists, "Entry not found in cache.") + if exists { + suite.Equalf(value, entry.Value, "Cached value doesn't match.") + } +} + func TestSuite(t *testing.T) { suite.Run(t, new(UtilsSuite)) } diff --git a/rpc/gateway/service.proto b/rpc/gateway/service.proto index 1896c5e..a1dd7de 100644 --- a/rpc/gateway/service.proto +++ b/rpc/gateway/service.proto @@ -243,6 +243,8 @@ service PolicyApi { description: "Evaluates routing policies and returns a list of groups eligible for this client request."; }; }; + + rpc EvaluateAuthDelegationForClient(EvaluateAuthDelegationRequest) returns (EvaluateAuthDelegationResponse); } message Policy { @@ -261,6 +263,7 @@ message Policy { string group = 3; // required string fallback_group = 4; bool is_enabled = 5; + bool is_auth_delegated = 6; } message PolicyGetRequest { @@ -298,6 +301,14 @@ message EvaluateGroupsResponse { repeated string group_ids = 1; // required } +message EvaluateAuthDelegationRequest { + int32 incoming_port = 1; // required +} + +message EvaluateAuthDelegationResponse { + bool is_auth_delegated = 1; // required +} + service QueryApi { rpc CreateOrUpdateQuery (Query) returns (Empty); rpc GetQuery (QueryGetRequest) returns (QueryGetResponse){ From 30702187e4427baf7ebde7db38eb344d6f78be46 Mon Sep 17 00:00:00 2001 From: Utkarsh Saxena Date: Sat, 25 May 2024 10:56:53 +0530 Subject: [PATCH 3/3] feat: set request headers Signed-off-by: Utkarsh Saxena --- .../20240525205304_add_set_source.go | 31 ++++++++++++ internal/gatewayserver/models/policy.go | 13 ++--- internal/gatewayserver/policyApi/core.go | 50 ++++++++++++++----- internal/gatewayserver/policyApi/server.go | 49 +++++++++++++----- internal/router/request.go | 9 ++++ internal/router/trinoheaders/trino.go | 1 + rpc/gateway/service.proto | 11 ++++ 7 files changed, 132 insertions(+), 32 deletions(-) create mode 100644 internal/gatewayserver/database/migrations/20240525205304_add_set_source.go diff --git a/internal/gatewayserver/database/migrations/20240525205304_add_set_source.go b/internal/gatewayserver/database/migrations/20240525205304_add_set_source.go new file mode 100644 index 0000000..b7175d8 --- /dev/null +++ b/internal/gatewayserver/database/migrations/20240525205304_add_set_source.go @@ -0,0 +1,31 @@ +package migration + +import ( + "database/sql" + + "github.com/pressly/goose/v3" +) + +func init() { + goose.AddMigration(Up20240525205304, Down20240525205304) +} + +func Up20240525205304(tx *sql.Tx) error { + var err error + + _, err = tx.Exec("ALTER TABLE `policies` ADD COLUMN `set_request_source` VARCHAR(255) DEFAULT '';") + if err != nil { + return err + } + return err +} + +func Down20240525205304(tx *sql.Tx) error { + var err error + + _, err = tx.Exec("ALTER TABLE `policies` DROP COLUMN `set_request_source`;") + if err != nil { + return err + } + return err +} diff --git a/internal/gatewayserver/models/policy.go b/internal/gatewayserver/models/policy.go index 9c773e8..3db8441 100644 --- a/internal/gatewayserver/models/policy.go +++ b/internal/gatewayserver/models/policy.go @@ -5,12 +5,13 @@ import "github.com/razorpay/trino-gateway/pkg/spine" // policy model struct definition type Policy struct { spine.Model - RuleType string `json:"rule_type"` - RuleValue string `json:"rule_value"` - GroupId string `json:"group_id"` - FallbackGroupId *string `json:"fallback_group_id"` - IsEnabled *bool `json:"is_enabled" sql:"DEFAULT:true"` - IsAuthDelegated *bool `json:"is_auth_delegated" sql:"DEFAULT:false"` + RuleType string `json:"rule_type"` + RuleValue string `json:"rule_value"` + GroupId string `json:"group_id"` + FallbackGroupId *string `json:"fallback_group_id"` + IsEnabled *bool `json:"is_enabled" sql:"DEFAULT:true"` + IsAuthDelegated *bool `json:"is_auth_delegated" sql:"DEFAULT:false"` + SetRequestSource *string `json:"set_request_source"` } func (u *Policy) TableName() string { diff --git a/internal/gatewayserver/policyApi/core.go b/internal/gatewayserver/policyApi/core.go index 465ee64..3d49283 100644 --- a/internal/gatewayserver/policyApi/core.go +++ b/internal/gatewayserver/policyApi/core.go @@ -26,6 +26,7 @@ type ICore interface { EvaluateGroupsForClient(ctx context.Context, c *EvaluateClientParams) ([]string, error) EvaluateAuthDelegation(ctx context.Context, p int32) (bool, error) + EvaluateRequestSource(ctx context.Context, p int32) (string, error) // EvaluatePolicy(ctx context.Context, group string) (string, error) // FindPolicyForQuery(ctx context.Context, q string) (string, error) } @@ -36,23 +37,25 @@ func NewCore(policy repo.IPolicyRepo) *Core { // CreateParams has attributes that are required for policy.Create() type PolicyCreateParams struct { - ID string - RuleType string - RuleValue string - Group string - FallbackGroup string - IsEnabled bool - IsAuthDelegated bool + ID string + RuleType string + RuleValue string + Group string + FallbackGroup string + IsEnabled bool + IsAuthDelegated bool + SetRequestSource string } func (c *Core) CreateOrUpdatePolicy(ctx context.Context, params *PolicyCreateParams) error { policy := models.Policy{ - RuleType: params.RuleType, - RuleValue: params.RuleValue, - GroupId: params.Group, - FallbackGroupId: ¶ms.FallbackGroup, - IsEnabled: ¶ms.IsEnabled, - IsAuthDelegated: ¶ms.IsAuthDelegated, + RuleType: params.RuleType, + RuleValue: params.RuleValue, + GroupId: params.Group, + FallbackGroupId: ¶ms.FallbackGroup, + IsEnabled: ¶ms.IsEnabled, + IsAuthDelegated: ¶ms.IsAuthDelegated, + SetRequestSource: ¶ms.SetRequestSource, } policy.ID = params.ID @@ -240,6 +243,27 @@ func (c *Core) EvaluateAuthDelegation(ctx context.Context, port int32) (bool, er return false, nil } +func (c *Core) EvaluateRequestSource(ctx context.Context, port int32) (string, error) { + res, err := c.FindMany( + ctx, + &FindManyParams{ + IsEnabled: true, + RuleType: "listening_port", + RuleValue: strconv.Itoa(int(port)), + }) + if err != nil { + return "", err + } + provider.Logger(ctx).Debugw("Evaluate Request Source For Port", map[string]interface{}{ + "listeningPort": port, + "matchingRules": res, + }) + if len(res) > 0 { + return *res[0].SetRequestSource, nil + } + return "", nil +} + // Implementing "set" collection methods here, :) func setIntersection(s1 map[string]struct{}, s2 map[string]struct{}) map[string]struct{} { s_intersection := map[string]struct{}{} diff --git a/internal/gatewayserver/policyApi/server.go b/internal/gatewayserver/policyApi/server.go index 7b883b8..aa547ad 100644 --- a/internal/gatewayserver/policyApi/server.go +++ b/internal/gatewayserver/policyApi/server.go @@ -32,13 +32,14 @@ func (s *Server) CreateOrUpdatePolicy(ctx context.Context, req *gatewayv1.Policy }) createParams := PolicyCreateParams{ - ID: req.GetId(), - RuleType: req.GetRule().GetType().Enum().String(), - RuleValue: req.GetRule().GetValue(), - Group: req.GetGroup(), - FallbackGroup: req.GetFallbackGroup(), - IsEnabled: req.GetIsEnabled(), - IsAuthDelegated: req.GetIsAuthDelegated(), + ID: req.GetId(), + RuleType: req.GetRule().GetType().Enum().String(), + RuleValue: req.GetRule().GetValue(), + Group: req.GetGroup(), + FallbackGroup: req.GetFallbackGroup(), + IsEnabled: req.GetIsEnabled(), + IsAuthDelegated: req.GetIsAuthDelegated(), + SetRequestSource: req.GetSetRequestSource(), } err := s.core.CreateOrUpdatePolicy(ctx, &createParams) @@ -143,12 +144,13 @@ func toPolicyResponseProto(policy *models.Policy) (*gatewayv1.Policy, error) { Value: policy.RuleValue, } response := gatewayv1.Policy{ - Id: policy.ID, - Rule: &rule, - Group: policy.GroupId, - FallbackGroup: *policy.FallbackGroupId, - IsEnabled: *policy.IsEnabled, - IsAuthDelegated: *policy.IsAuthDelegated, + Id: policy.ID, + Rule: &rule, + Group: policy.GroupId, + FallbackGroup: *policy.FallbackGroupId, + IsEnabled: *policy.IsEnabled, + IsAuthDelegated: *policy.IsAuthDelegated, + SetRequestSource: *policy.SetRequestSource, } return &response, nil @@ -198,3 +200,24 @@ func (s *Server) EvaluateAuthDelegationForClient(ctx context.Context, req *gatew } return &gatewayv1.EvaluateAuthDelegationResponse{IsAuthDelegated: result}, nil } + +func (s *Server) EvaluateRequestSourceForClient(ctx context.Context, req *gatewayv1.EvaluateRequestSourceRequest) (*gatewayv1.EvaluateRequestSourceResponse, error) { + provider.Logger(ctx).Debugw("EvaluateRequestSource", map[string]interface{}{ + "request": req.String(), + }) + + if req.GetIncomingPort() == 0 { + err := errors.New("Invalid port defined in `incoming_port`.") + provider.Logger(ctx).WithError(err).Error(err.Error()) + return &gatewayv1.EvaluateRequestSourceResponse{SetRequestSource: ""}, nil + } + + result, err := s.core.EvaluateRequestSource( + ctx, + req.GetIncomingPort(), + ) + if err != nil { + return nil, err + } + return &gatewayv1.EvaluateRequestSourceResponse{SetRequestSource: result}, nil +} diff --git a/internal/router/request.go b/internal/router/request.go index d880695..4129f5b 100644 --- a/internal/router/request.go +++ b/internal/router/request.go @@ -321,6 +321,15 @@ func (r *RouterServer) prepareReqForRouting(ctx *context.Context, req *http.Requ req.URL.Host = host req.URL.Scheme = scheme req.Host = host + sourceHeader, err := r.gatewayApiClient.Policy.EvaluateRequestSourceForClient(*ctx, &gatewayv1.EvaluateRequestSourceRequest{ + IncomingPort: int32(r.port), + }) + if err != nil { + return err + } + if s := sourceHeader.GetSetRequestSource(); s != "" { + req.Header.Set("X-Trino-Source", s) + } // TODO - validate and refine parsing of X-Forwarded headers req.Header.Set("X-Forwarded-Host", host) provider.Logger(*ctx).Infow( diff --git a/internal/router/trinoheaders/trino.go b/internal/router/trinoheaders/trino.go index e36a1e2..a062e20 100644 --- a/internal/router/trinoheaders/trino.go +++ b/internal/router/trinoheaders/trino.go @@ -14,6 +14,7 @@ const ( ConnectionProperties = "Connection-Properties" TransactionId = "Transaction-Id" Password = "Password" + Source = "Source" ) var allowedPrefixes = [...]string{"Presto", "Trino"} diff --git a/rpc/gateway/service.proto b/rpc/gateway/service.proto index a1dd7de..137f0d4 100644 --- a/rpc/gateway/service.proto +++ b/rpc/gateway/service.proto @@ -245,6 +245,8 @@ service PolicyApi { }; rpc EvaluateAuthDelegationForClient(EvaluateAuthDelegationRequest) returns (EvaluateAuthDelegationResponse); + + rpc EvaluateRequestSourceForClient(EvaluateRequestSourceRequest) returns (EvaluateRequestSourceResponse); } message Policy { @@ -264,6 +266,7 @@ message Policy { string fallback_group = 4; bool is_enabled = 5; bool is_auth_delegated = 6; + string set_request_source = 7; } message PolicyGetRequest { @@ -309,6 +312,14 @@ message EvaluateAuthDelegationResponse { bool is_auth_delegated = 1; // required } +message EvaluateRequestSourceRequest { + int32 incoming_port = 1; // required +} + +message EvaluateRequestSourceResponse { + string set_request_source = 1; +} + service QueryApi { rpc CreateOrUpdateQuery (Query) returns (Empty); rpc GetQuery (QueryGetRequest) returns (QueryGetResponse){