From 8269774a7083e5eb4222c6736c64e321ec7f84c8 Mon Sep 17 00:00:00 2001 From: Artem Gavrilov Date: Fri, 17 Mar 2023 20:20:03 +0200 Subject: [PATCH] PMM-11808 Fix query results deserialization bug (#1877) * PMM-11808 Fix * PMM-11808 Format * PMM-11808 Fix clickhouse query * PMM-11808 Fix * PMM-11808 Fix * Update managed/cmd/pmm-managed-starlark/main.go Co-authored-by: Alex Tymchuk * Update managed/services/checks/checks.go Co-authored-by: Alex Tymchuk --------- Co-authored-by: Alex Tymchuk --- managed/cmd/pmm-managed-starlark/main.go | 44 ++++++- managed/services/checks/checks.go | 159 +++++++++++++---------- 2 files changed, 133 insertions(+), 70 deletions(-) diff --git a/managed/cmd/pmm-managed-starlark/main.go b/managed/cmd/pmm-managed-starlark/main.go index 325bf2d5c1..07c72fe9ed 100644 --- a/managed/cmd/pmm-managed-starlark/main.go +++ b/managed/cmd/pmm-managed-starlark/main.go @@ -16,6 +16,7 @@ package main import ( + "encoding/base64" "encoding/json" "log" "os" @@ -30,6 +31,7 @@ import ( "golang.org/x/sys/unix" "gopkg.in/alecthomas/kingpin.v2" + "github.com/percona/pmm/api/agentpb" "github.com/percona/pmm/managed/services/checks" "github.com/percona/pmm/managed/utils/logger" "github.com/percona/pmm/version" @@ -117,13 +119,37 @@ func runChecks(l *logrus.Entry, data *checks.StarlarkScriptData) ([]check.Result return nil, errors.Wrap(err, "error initializing starlark env") } + res := make([]any, len(data.QueriesResults)) + for i, queryResult := range data.QueriesResults { + switch qr := queryResult.(type) { + case map[string]any: // used for PG multidb results where key is database name and value is rows + dbRes := make(map[string]any, len(qr)) + for dbName, dbQr := range qr { + s, ok := dbQr.(string) + if !ok { + return nil, errors.Errorf("unexpected query result type: %T", dbQr) + } + if dbRes[dbName], err = unmarshallQueryResult(s); err != nil { + return nil, err + } + } + res[i] = dbRes + case string: // used for all other databases + if res[i], err = unmarshallQueryResult(qr); err != nil { + return nil, err + } + default: + return nil, errors.Errorf("unknown query result type %T", qr) + } + } + var results []check.Result contextFuncs := checks.GetAdditionalContext() switch data.Version { case 1: - results, err = env.Run(data.Name, data.QueriesResults[0], contextFuncs, l.Debugln) + results, err = env.Run(data.Name, res[0], contextFuncs, l.Debugln) case 2: - results, err = env.Run(data.Name, data.QueriesResults, contextFuncs, l.Debugln) + results, err = env.Run(data.Name, res, contextFuncs, l.Debugln) } if err != nil { return nil, errors.Wrap(err, "error running starlark env") @@ -131,3 +157,17 @@ func runChecks(l *logrus.Entry, data *checks.StarlarkScriptData) ([]check.Result return results, nil } + +func unmarshallQueryResult(qr string) ([]map[string]any, error) { + b, err := base64.StdEncoding.DecodeString(qr) + if err != nil { + return nil, errors.Wrap(err, "failed to decode base64 encoded query result") + } + + res, err := agentpb.UnmarshalActionQueryResult(b) + if err != nil { + return nil, errors.Wrap(err, "failed to unmarshal query result") + } + + return res, nil +} diff --git a/managed/services/checks/checks.go b/managed/services/checks/checks.go index c931ee97af..eb95dca473 100644 --- a/managed/services/checks/checks.go +++ b/managed/services/checks/checks.go @@ -20,6 +20,7 @@ import ( "bytes" "context" "database/sql" + "encoding/base64" "encoding/json" "net/url" "os" @@ -84,6 +85,8 @@ var ( pmmAgent2_7_0 = version.MustParse("2.7.0") pmmAgent2_27_0 = version.MustParse("2.27.0-0") pmmAgentInvalid = version.MustParse("3.0.0-invalid") + + b64 = base64.StdEncoding ) // Service is responsible for interactions with Percona Check service. @@ -878,7 +881,7 @@ func (s *Service) executeCheck(ctx context.Context, target services.Target, c ch return res, nil } -func (s *Service) executeMySQLShowQuery(ctx context.Context, query check.Query, target services.Target) ([]map[string]any, error) { +func (s *Service) executeMySQLShowQuery(ctx context.Context, query check.Query, target services.Target) ([]byte, error) { r, err := models.CreateActionResult(s.db.Querier, target.AgentID) if err != nil { return nil, errors.Wrap(err, "failed to prepare result") @@ -897,13 +900,13 @@ func (s *Service) executeMySQLShowQuery(ctx context.Context, query check.Query, return nil, errors.WithStack(err) } - return agentpb.UnmarshalActionQueryResult(res) + return res, nil } -func (s *Service) executeMySQLSelectQuery(ctx context.Context, query check.Query, target services.Target) ([]map[string]any, error) { +func (s *Service) executeMySQLSelectQuery(ctx context.Context, query check.Query, target services.Target) (string, error) { r, err := models.CreateActionResult(s.db.Querier, target.AgentID) if err != nil { - return nil, errors.Wrap(err, "failed to prepare result") + return "", errors.Wrap(err, "failed to prepare result") } defer func() { if err = s.db.Delete(r); err != nil { @@ -912,20 +915,20 @@ func (s *Service) executeMySQLSelectQuery(ctx context.Context, query check.Query }() if err = s.agentsRegistry.StartMySQLQuerySelectAction(ctx, r.ID, target.AgentID, target.DSN, query.Query, target.Files, target.TDP, target.TLSSkipVerify); err != nil { //nolint:lll - return nil, errors.Wrap(err, "failed to start mySQL select action") + return "", errors.Wrap(err, "failed to start mySQL select action") } res, err := s.waitForResult(ctx, r.ID) if err != nil { - return nil, errors.WithStack(err) + return "", errors.WithStack(err) } - return agentpb.UnmarshalActionQueryResult(res) + return b64.EncodeToString(res), nil } -func (s *Service) executePostgreSQLShowQuery(ctx context.Context, target services.Target) ([]map[string]any, error) { +func (s *Service) executePostgreSQLShowQuery(ctx context.Context, target services.Target) (string, error) { r, err := models.CreateActionResult(s.db.Querier, target.AgentID) if err != nil { - return nil, errors.Wrap(err, "failed to prepare result") + return "", errors.Wrap(err, "failed to prepare result") } defer func() { if err = s.db.Delete(r); err != nil { @@ -933,14 +936,14 @@ func (s *Service) executePostgreSQLShowQuery(ctx context.Context, target service } }() if err = s.agentsRegistry.StartPostgreSQLQueryShowAction(ctx, r.ID, target.AgentID, target.DSN); err != nil { - return nil, errors.Wrap(err, "failed to start postgreSQL show action") + return "", errors.Wrap(err, "failed to start postgreSQL show action") } res, err := s.waitForResult(ctx, r.ID) if err != nil { - return nil, errors.WithStack(err) + return "", errors.WithStack(err) } - return agentpb.UnmarshalActionQueryResult(res) + return b64.EncodeToString(res), nil } func (s *Service) executePostgreSQLSelectQuery(ctx context.Context, query check.Query, target services.Target) (any, error) { @@ -960,7 +963,7 @@ func (s *Service) executePostgreSQLSelectQuery(ctx context.Context, query check. if err != nil { return nil, errors.Wrap(err, "failed to split target by db") } - res := make(map[string][]map[string]any, len(targets)) + res := make(map[string]string, len(targets)) for dbName, t := range targets { if res[dbName], err = s.executePostgreSQLSelectQueryForSingleDB(ctx, query, t); err != nil { return nil, errors.WithStack(err) @@ -970,10 +973,10 @@ func (s *Service) executePostgreSQLSelectQuery(ctx context.Context, query check. return res, nil } -func (s *Service) executePostgreSQLSelectQueryForSingleDB(ctx context.Context, query check.Query, target services.Target) ([]map[string]any, error) { +func (s *Service) executePostgreSQLSelectQueryForSingleDB(ctx context.Context, query check.Query, target services.Target) (string, error) { r, err := models.CreateActionResult(s.db.Querier, target.AgentID) if err != nil { - return nil, errors.Wrap(err, "failed to prepare result") + return "", errors.Wrap(err, "failed to prepare result") } defer func() { if err = s.db.Delete(r); err != nil { @@ -982,21 +985,21 @@ func (s *Service) executePostgreSQLSelectQueryForSingleDB(ctx context.Context, q }() if err = s.agentsRegistry.StartPostgreSQLQuerySelectAction(ctx, r.ID, target.AgentID, target.DSN, query.Query); err != nil { - return nil, errors.Wrap(err, "failed to start postgreSQL select action") + return "", errors.Wrap(err, "failed to start postgreSQL select action") } res, err := s.waitForResult(ctx, r.ID) if err != nil { - return nil, errors.WithStack(err) + return "", errors.WithStack(err) } - return agentpb.UnmarshalActionQueryResult(res) + return b64.EncodeToString(res), nil } -func (s *Service) executeMongoDBGetParameterQuery(ctx context.Context, target services.Target) ([]map[string]any, error) { +func (s *Service) executeMongoDBGetParameterQuery(ctx context.Context, target services.Target) (string, error) { r, err := models.CreateActionResult(s.db.Querier, target.AgentID) if err != nil { - return nil, errors.Wrap(err, "failed to prepare result") + return "", errors.Wrap(err, "failed to prepare result") } defer func() { if err = s.db.Delete(r); err != nil { @@ -1005,21 +1008,21 @@ func (s *Service) executeMongoDBGetParameterQuery(ctx context.Context, target se }() if err = s.agentsRegistry.StartMongoDBQueryGetParameterAction(ctx, r.ID, target.AgentID, target.DSN, target.Files, target.TDP); err != nil { - return nil, errors.Wrap(err, "failed to start mongoDB getParameter action") + return "", errors.Wrap(err, "failed to start mongoDB getParameter action") } res, err := s.waitForResult(ctx, r.ID) if err != nil { - return nil, errors.WithStack(err) + return "", errors.WithStack(err) } - return agentpb.UnmarshalActionQueryResult(res) + return b64.EncodeToString(res), nil } -func (s *Service) executeMongoDBBuildInfoQuery(ctx context.Context, target services.Target) ([]map[string]any, error) { +func (s *Service) executeMongoDBBuildInfoQuery(ctx context.Context, target services.Target) (string, error) { r, err := models.CreateActionResult(s.db.Querier, target.AgentID) if err != nil { - return nil, errors.Wrap(err, "failed to prepare result") + return "", errors.Wrap(err, "failed to prepare result") } defer func() { if err = s.db.Delete(r); err != nil { @@ -1027,21 +1030,21 @@ func (s *Service) executeMongoDBBuildInfoQuery(ctx context.Context, target servi } }() if err = s.agentsRegistry.StartMongoDBQueryBuildInfoAction(ctx, r.ID, target.AgentID, target.DSN, target.Files, target.TDP); err != nil { - return nil, errors.Wrap(err, "failed to start mongoDB buildInfo action") + return "", errors.Wrap(err, "failed to start mongoDB buildInfo action") } res, err := s.waitForResult(ctx, r.ID) if err != nil { - return nil, errors.WithStack(err) + return "", errors.WithStack(err) } - return agentpb.UnmarshalActionQueryResult(res) + return b64.EncodeToString(res), nil } -func (s *Service) executeMongoDBGetCmdLineOptsQuery(ctx context.Context, target services.Target) ([]map[string]any, error) { +func (s *Service) executeMongoDBGetCmdLineOptsQuery(ctx context.Context, target services.Target) (string, error) { r, err := models.CreateActionResult(s.db.Querier, target.AgentID) if err != nil { - return nil, errors.Wrap(err, "failed to prepare result") + return "", errors.Wrap(err, "failed to prepare result") } defer func() { if err = s.db.Delete(r); err != nil { @@ -1050,21 +1053,21 @@ func (s *Service) executeMongoDBGetCmdLineOptsQuery(ctx context.Context, target }() if err = s.agentsRegistry.StartMongoDBQueryGetCmdLineOptsAction(ctx, r.ID, target.AgentID, target.DSN, target.Files, target.TDP); err != nil { - return nil, errors.Wrap(err, "failed to start mongoDB getCmdLineOpts action") + return "", errors.Wrap(err, "failed to start mongoDB getCmdLineOpts action") } res, err := s.waitForResult(ctx, r.ID) if err != nil { - return nil, errors.WithStack(err) + return "", errors.WithStack(err) } - return agentpb.UnmarshalActionQueryResult(res) + return b64.EncodeToString(res), nil } -func (s *Service) executeMongoDBReplSetGetStatusQuery(ctx context.Context, target services.Target) ([]map[string]any, error) { +func (s *Service) executeMongoDBReplSetGetStatusQuery(ctx context.Context, target services.Target) (string, error) { r, err := models.CreateActionResult(s.db.Querier, target.AgentID) if err != nil { - return nil, errors.Wrap(err, "failed to prepare result") + return "", errors.Wrap(err, "failed to prepare result") } defer func() { if err = s.db.Delete(r); err != nil { @@ -1073,21 +1076,21 @@ func (s *Service) executeMongoDBReplSetGetStatusQuery(ctx context.Context, targe }() if err = s.agentsRegistry.StartMongoDBQueryReplSetGetStatusAction(ctx, r.ID, target.AgentID, target.DSN, target.Files, target.TDP); err != nil { - return nil, errors.Wrap(err, "failed to start mongoDB replSetGetStatus action") + return "", errors.Wrap(err, "failed to start mongoDB replSetGetStatus action") } res, err := s.waitForResult(ctx, r.ID) if err != nil { - return nil, errors.WithStack(err) + return "", errors.WithStack(err) } - return agentpb.UnmarshalActionQueryResult(res) + return b64.EncodeToString(res), nil } -func (s *Service) executeMongoDBGetDiagnosticQuery(ctx context.Context, target services.Target) ([]map[string]any, error) { +func (s *Service) executeMongoDBGetDiagnosticQuery(ctx context.Context, target services.Target) (string, error) { r, err := models.CreateActionResult(s.db.Querier, target.AgentID) if err != nil { - return nil, errors.Wrap(err, "failed to prepare result") + return "", errors.Wrap(err, "failed to prepare result") } defer func() { if err = s.db.Delete(r); err != nil { @@ -1096,18 +1099,18 @@ func (s *Service) executeMongoDBGetDiagnosticQuery(ctx context.Context, target s }() if err = s.agentsRegistry.StartMongoDBQueryGetDiagnosticDataAction(ctx, r.ID, target.AgentID, target.DSN, target.Files, target.TDP); err != nil { - return nil, errors.Wrap(err, "failed to start mongoDB getDiagnosticData action") + return "", errors.Wrap(err, "failed to start mongoDB getDiagnosticData action") } res, err := s.waitForResult(ctx, r.ID) if err != nil { - return nil, errors.WithStack(err) + return "", errors.WithStack(err) } - return agentpb.UnmarshalActionQueryResult(res) + return b64.EncodeToString(res), nil } -func (s *Service) executeMetricsInstantQuery(ctx context.Context, query check.Query, target services.Target) ([]map[string]any, error) { +func (s *Service) executeMetricsInstantQuery(ctx context.Context, query check.Query, target services.Target) (string, error) { queryData := queryPlaceholders{ ServiceName: target.ServiceName, NodeName: target.NodeName, @@ -1115,14 +1118,14 @@ func (s *Service) executeMetricsInstantQuery(ctx context.Context, query check.Qu q, err := fillQueryPlaceholders(query.Query, queryData) if err != nil { - return nil, errors.WithStack(err) + return "", errors.WithStack(err) } var lookback time.Time // if not specified use empty time which means "current time" if v, ok := query.Parameters[check.Lookback]; ok { d, err := time.ParseDuration(v) if err != nil { - return nil, errors.Wrap(err, "failed to parse 'lookback' query parameter") + return "", errors.Wrap(err, "failed to parse 'lookback' query parameter") } lookback = time.Now().Add(-d) @@ -1130,7 +1133,7 @@ func (s *Service) executeMetricsInstantQuery(ctx context.Context, query check.Qu r, warns, err := s.vmClient.Query(ctx, q, lookback) if err != nil { - return nil, errors.Wrap(err, "failed to execute instant VM query") + return "", errors.Wrap(err, "failed to execute instant VM query") } for _, warn := range warns { @@ -1139,13 +1142,13 @@ func (s *Service) executeMetricsInstantQuery(ctx context.Context, query check.Qu res, err := convertVMValue(r) if err != nil { - return nil, errors.WithStack(err) + return "", errors.WithStack(err) } - return res, nil + return b64.EncodeToString(res), nil } -func (s *Service) executeMetricsRangeQuery(ctx context.Context, query check.Query, target services.Target) ([]map[string]any, error) { +func (s *Service) executeMetricsRangeQuery(ctx context.Context, query check.Query, target services.Target) (string, error) { queryData := queryPlaceholders{ ServiceName: target.ServiceName, NodeName: target.NodeName, @@ -1153,7 +1156,7 @@ func (s *Service) executeMetricsRangeQuery(ctx context.Context, query check.Quer q, err := fillQueryPlaceholders(query.Query, queryData) if err != nil { - return nil, errors.WithStack(err) + return "", errors.WithStack(err) } rng := v1.Range{ @@ -1163,7 +1166,7 @@ func (s *Service) executeMetricsRangeQuery(ctx context.Context, query check.Quer if v, ok := query.Parameters[check.Lookback]; ok { d, err := time.ParseDuration(v) if err != nil { - return nil, errors.Wrap(err, "failed to parse 'lookback' query parameter") + return "", errors.Wrap(err, "failed to parse 'lookback' query parameter") } rng.End = time.Now().Add(-d) @@ -1171,29 +1174,29 @@ func (s *Service) executeMetricsRangeQuery(ctx context.Context, query check.Quer rg, ok := query.Parameters[check.Range] if !ok { - return nil, errors.New("'range' query parameter is required for range queries") + return "", errors.New("'range' query parameter is required for range queries") } d, err := time.ParseDuration(rg) if err != nil { - return nil, errors.Wrap(err, "failed to parse 'range' query parameter") + return "", errors.Wrap(err, "failed to parse 'range' query parameter") } rng.Start = rng.End.Add(-d) st, ok := query.Parameters[check.Step] if !ok { - return nil, errors.New("'step' query parameter is required for range queries") + return "", errors.New("'step' query parameter is required for range queries") } rng.Step, err = time.ParseDuration(st) if err != nil { - return nil, errors.Wrap(err, "failed to parse 'step' query parameter") + return "", errors.Wrap(err, "failed to parse 'step' query parameter") } r, warns, err := s.vmClient.QueryRange(ctx, q, rng) if err != nil { - return nil, errors.Wrap(err, "failed to execute range VM query") + return "", errors.Wrap(err, "failed to execute range VM query") } for _, warn := range warns { @@ -1202,13 +1205,13 @@ func (s *Service) executeMetricsRangeQuery(ctx context.Context, query check.Quer res, err := convertVMValue(r) if err != nil { - return nil, errors.WithStack(err) + return "", errors.WithStack(err) } - return res, nil + return b64.EncodeToString(res), nil } -func (s *Service) executeClickhouseSelectQuery(ctx context.Context, checkQuery check.Query, target services.Target) ([]byte, error) { +func (s *Service) executeClickhouseSelectQuery(ctx context.Context, checkQuery check.Query, target services.Target) (string, error) { queryData := queryPlaceholders{ ServiceName: target.ServiceName, ServiceID: target.ServiceID, @@ -1216,25 +1219,30 @@ func (s *Service) executeClickhouseSelectQuery(ctx context.Context, checkQuery c query, err := fillQueryPlaceholders(checkQuery.Query, queryData) if err != nil { - return nil, errors.WithStack(err) + return "", errors.WithStack(err) } query = "SELECT " + query rows, err := s.clickhouseDB.QueryContext(ctx, query, nil) if err != nil { - return nil, errors.Wrap(err, "failed to execute query") + return "", errors.Wrap(err, "failed to execute query") } columns, dataRows, err := sqlrows.ReadRows(rows) if err != nil { - return nil, err + return "", errors.WithStack(err) } - return agentpb.MarshalActionQuerySQLResult(columns, dataRows) + b, err := agentpb.MarshalActionQuerySQLResult(columns, dataRows) + if err != nil { + return "", errors.WithStack(err) + } + + return b64.EncodeToString(b), nil } // convertVMValue converts VM results to format applicable to check input. -func convertVMValue(value model.Value) ([]map[string]any, error) { +func convertVMValue(value model.Value) ([]byte, error) { if value.Type() == model.ValScalar { // MetricsQL treats scalar type the same as instant vector without labels, since subtle differences between // these types usually confuse users. See the corresponding Prometheus docs for details. @@ -1254,7 +1262,12 @@ func convertVMValue(value model.Value) ([]map[string]any, error) { return nil, errors.WithStack(err) } - return data, nil + res, err := agentpb.MarshalActionQueryDocsResult(data) + if err != nil { + return nil, errors.WithStack(err) + } + + return res, nil } func (s *Service) discoverAvailablePGDatabases(ctx context.Context, target services.Target) ([]string, error) { @@ -1266,8 +1279,18 @@ WHERE datallowconn = true AND datistemplate = false AND has_database_privilege(c return nil, errors.Wrapf(err, "failed to select available databases") } - r := make([]string, len(res)) - for i, row := range res { + dec, err := b64.DecodeString(res) + if err != nil { + return nil, errors.Wrap(err, "failed to decode database discovery results") + } + + data, err := agentpb.UnmarshalActionQueryResult(dec) + if err != nil { + return nil, errors.Wrap(err, "failed to unmarshal database discovery results") + } + + r := make([]string, len(data)) + for i, row := range data { datname, ok := row["datname"] if !ok { return nil, errors.New("missing expected 'datname' filed in query response")