diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 1c22213d29..adf3447122 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -2,8 +2,7 @@ /build/ @nikita-b @atymchuk /admin/ @michal-kralik /agent/agents/postgres/ @JiriCtvrtka -/agent/actions/ @artemgavrilov -/agent/jobs/ @artemgavrilov +/agent/runner/ @artemgavrilov /api/ @BupycHuk /docs/api/ @atymchuk /managed/services/checks/ @idoqo diff --git a/.golangci.yml b/.golangci.yml index c7ed149826..ae12e08721 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -66,7 +66,6 @@ linters: - gochecknoinits # we use init functions - gomnd # we are using numbers in many cases - gomoddirectives # we use replace directives - - gomodguard # we are not using modules - ifshort # a lot of false positives - interfacer # deprecated - maligned # deprecated diff --git a/agent/actions/concurrent_runner.go b/agent/actions/concurrent_runner.go deleted file mode 100644 index 713e07168c..0000000000 --- a/agent/actions/concurrent_runner.go +++ /dev/null @@ -1,139 +0,0 @@ -// Copyright 2019 Percona LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package actions - -import ( - "context" - "runtime/pprof" - "sync" - "time" - - "github.com/sirupsen/logrus" -) - -// ActionResult represents an Action result. -type ActionResult struct { - ID string - Output []byte - Error string -} - -// ConcurrentRunner represents concurrent Action runner. -// Action runner is component that can run an Actions. -type ConcurrentRunner struct { - ctx context.Context - l *logrus.Entry - results chan ActionResult - - runningActions sync.WaitGroup - - rw sync.RWMutex - actionsCancel map[string]context.CancelFunc -} - -// NewConcurrentRunner returns new runner. -// With this component you can run actions concurrently and read action results when they will be finished. -// -// ConcurrentRunner is stopped when context passed to NewConcurrentRunner is canceled. -// Results are reported via Results() channel which must be read until it is closed. -func NewConcurrentRunner(ctx context.Context) *ConcurrentRunner { - r := &ConcurrentRunner{ - ctx: ctx, - l: logrus.WithField("component", "actions-runner"), - results: make(chan ActionResult), - actionsCancel: make(map[string]context.CancelFunc), - } - - // let all actions finish and send their results before closing it - go func() { - <-ctx.Done() - r.runningActions.Wait() - r.l.Infof("Done.") - close(r.results) - }() - - return r -} - -// Start starts an Action in a separate goroutine. -func (r *ConcurrentRunner) Start(a Action, timeout time.Duration) { - if err := r.ctx.Err(); err != nil { - r.l.Errorf("Ignoring Start: %s.", err) - return - } - actionID, actionType := a.ID(), a.Type() - - // FIXME There is a data race. Add must not be called concurrently with Wait, but it can be: - // 0. no actions are running, WaitGroup has 0 - // 1. Start is called - // 2. ctx is canceled on this line - // 3. Wait is called in the goroutine above - // 4. Add is called below - // 5. Add panics with "sync: WaitGroup misuse: Add called concurrently with Wait" - // See skipped test (run it in a loop with race detector). - // https://jira.percona.com/browse/PMM-4112 - // https://jira.percona.com/browse/PMM-7206 - r.runningActions.Add(1) - - ctx, cancel := context.WithTimeout(r.ctx, timeout) - run := func(ctx context.Context) { - defer r.runningActions.Done() - defer cancel() - - r.rw.Lock() - r.actionsCancel[actionID] = cancel - r.rw.Unlock() - - l := r.l.WithFields(logrus.Fields{"id": actionID, "type": actionType}) - l.Infof("Starting...") - - b, err := a.Run(ctx) - - r.rw.Lock() - delete(r.actionsCancel, actionID) - r.rw.Unlock() - - if err == nil { - l.Infof("Done without error.") - } else { - l.Warnf("Done with error: %s.", err) - } - - var errorS string - if err != nil { - errorS = err.Error() - } - r.results <- ActionResult{ - ID: actionID, - Output: b, - Error: errorS, - } - } - go pprof.Do(ctx, pprof.Labels("actionID", actionID, "type", actionType), run) -} - -// Results returns channel with Actions results. -func (r *ConcurrentRunner) Results() <-chan ActionResult { - return r.results -} - -// Stop stops running Action. -func (r *ConcurrentRunner) Stop(id string) { - r.rw.RLock() - defer r.rw.RUnlock() - if cancel, ok := r.actionsCancel[id]; ok { - cancel() - } -} diff --git a/agent/actions/concurrent_runner_test.go b/agent/actions/concurrent_runner_test.go deleted file mode 100644 index 2750880eef..0000000000 --- a/agent/actions/concurrent_runner_test.go +++ /dev/null @@ -1,147 +0,0 @@ -// Copyright 2019 Percona LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package actions - -import ( - "context" - "sort" - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -// assertResults checks expected results in any order. -func assertResults(t *testing.T, cr *ConcurrentRunner, expected ...ActionResult) { - t.Helper() - - actual := make([]ActionResult, len(expected)) - for i := range expected { - r := <-cr.Results() - if len(r.Output) == 0 { - r.Output = nil - } - actual[i] = r - } - - sort.Slice(expected, func(i, j int) bool { return expected[i].ID < expected[j].ID }) - sort.Slice(actual, func(i, j int) bool { return actual[i].ID < actual[j].ID }) - assert.Equal(t, expected, actual) -} - -func TestConcurrentRunnerRun(t *testing.T) { - t.Parallel() - - cr := NewConcurrentRunner(context.Background()) - a1 := NewProcessAction("/action_id/6a479303-5081-46d0-baa0-87d6248c987b", "echo", []string{"test"}) - a2 := NewProcessAction("/action_id/84140ab2-612d-4d93-9360-162a4bd5de14", "echo", []string{"test2"}) - - cr.Start(a1, 5*time.Second) - cr.Start(a2, 5*time.Second) - - expected := []ActionResult{ - {ID: "/action_id/6a479303-5081-46d0-baa0-87d6248c987b", Output: []byte("test\n")}, - {ID: "/action_id/84140ab2-612d-4d93-9360-162a4bd5de14", Output: []byte("test2\n")}, - } - assertResults(t, cr, expected...) - assert.Empty(t, cr.actionsCancel) -} - -func TestConcurrentRunnerTimeout(t *testing.T) { - t.Parallel() - - cr := NewConcurrentRunner(context.Background()) - a1 := NewProcessAction("/action_id/6a479303-5081-46d0-baa0-87d6248c987b", "sleep", []string{"20"}) - a2 := NewProcessAction("/action_id/84140ab2-612d-4d93-9360-162a4bd5de14", "sleep", []string{"30"}) - - cr.Start(a1, time.Second) - cr.Start(a2, time.Second) - - // https://github.com/golang/go/issues/21880 - expected := []ActionResult{ - {ID: "/action_id/6a479303-5081-46d0-baa0-87d6248c987b", Error: "signal: killed"}, - {ID: "/action_id/84140ab2-612d-4d93-9360-162a4bd5de14", Error: "signal: killed"}, - } - assertResults(t, cr, expected...) - assert.Empty(t, cr.actionsCancel) -} - -func TestConcurrentRunnerStop(t *testing.T) { - t.Parallel() - - cr := NewConcurrentRunner(context.Background()) - a1 := NewProcessAction("/action_id/6a479303-5081-46d0-baa0-87d6248c987b", "sleep", []string{"20"}) - a2 := NewProcessAction("/action_id/84140ab2-612d-4d93-9360-162a4bd5de14", "sleep", []string{"30"}) - - cr.Start(a1, 5*time.Second) - cr.Start(a2, 5*time.Second) - - <-time.After(time.Second) - - cr.Stop(a1.ID()) - cr.Stop(a2.ID()) - - // https://github.com/golang/go/issues/21880 - expected := []ActionResult{ - {ID: "/action_id/6a479303-5081-46d0-baa0-87d6248c987b", Error: "signal: killed"}, - {ID: "/action_id/84140ab2-612d-4d93-9360-162a4bd5de14", Error: "signal: killed"}, - } - assertResults(t, cr, expected...) - assert.Empty(t, cr.actionsCancel) -} - -func TestConcurrentRunnerCancel(t *testing.T) { - t.Parallel() - - ctx, cancel := context.WithCancel(context.Background()) - cr := NewConcurrentRunner(ctx) - a1 := NewProcessAction("/action_id/6a479303-5081-46d0-baa0-87d6248c987b", "sleep", []string{"20"}) - a2 := NewProcessAction("/action_id/84140ab2-612d-4d93-9360-162a4bd5de14", "sleep", []string{"30"}) - - cr.Start(a1, 5*time.Second) - cr.Start(a2, 5*time.Second) - - cancel() - - // Unlike other tests, there we mostly see "context canceled", but "signal: killed" still happens. - // Check both. - expected := make([]ActionResult, 2) - expected[0] = <-cr.Results() - expected[1] = <-cr.Results() - sort.Slice(expected, func(i, j int) bool { return expected[i].ID < expected[j].ID }) - assert.Equal(t, expected[0].ID, "/action_id/6a479303-5081-46d0-baa0-87d6248c987b") - assert.Contains(t, []string{"signal: killed", context.Canceled.Error()}, expected[0].Error) - assert.Equal(t, expected[1].ID, "/action_id/84140ab2-612d-4d93-9360-162a4bd5de14") - assert.Contains(t, []string{"signal: killed", context.Canceled.Error()}, expected[0].Error) - assert.Empty(t, cr.actionsCancel) -} - -func TestConcurrentRunnerCancelEmpty(t *testing.T) { - t.Skip("https://jira.percona.com/browse/PMM-4112") - t.Parallel() - - ctx, cancel := context.WithCancel(context.Background()) - cr := NewConcurrentRunner(ctx) - a := NewProcessAction("/action_id/6a479303-5081-46d0-baa0-87d6248c987b", "sleep", []string{"20"}) - - go cancel() - cr.Start(a, 5*time.Second) - - expected := []ActionResult{ - {ID: "/action_id/6a479303-5081-46d0-baa0-87d6248c987b", Error: context.Canceled.Error()}, - } - assertResults(t, cr, expected...) - assert.Empty(t, cr.actionsCancel) -} diff --git a/agent/agents/mongodb/internal/profiler/profiler_test.go b/agent/agents/mongodb/internal/profiler/profiler_test.go index f6234e19bd..16bef16aba 100644 --- a/agent/agents/mongodb/internal/profiler/profiler_test.go +++ b/agent/agents/mongodb/internal/profiler/profiler_test.go @@ -30,9 +30,9 @@ import ( "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" - "github.com/percona/pmm/agent/actions" "github.com/percona/pmm/agent/agents/mongodb/internal/profiler/aggregator" "github.com/percona/pmm/agent/agents/mongodb/internal/report" + "github.com/percona/pmm/agent/runner/actions" "github.com/percona/pmm/agent/utils/templates" "github.com/percona/pmm/agent/utils/tests" "github.com/percona/pmm/api/agentpb" @@ -216,14 +216,15 @@ func testProfiler(t *testing.T, url string) { // This test is here to ensure the query example the profiler captures is valid to be used in Explain. t.Run("TestMongoDBExplain", func(t *testing.T) { id := "abcd1234" - ctx := context.TODO() params := &agentpb.StartActionRequest_MongoDBExplainParams{ Dsn: tests.GetTestMongoDBDSN(t), Query: findBucket.Common.Example, } - ex := actions.NewMongoDBExplainAction(id, params, os.TempDir()) + ex := actions.NewMongoDBExplainAction(id, 5*time.Second, params, os.TempDir()) + ctx, cancel := context.WithTimeout(context.Background(), ex.Timeout()) + defer cancel() res, err := ex.Run(ctx) assert.Nil(t, err) diff --git a/agent/client/client.go b/agent/client/client.go index 83291f1141..02ada80e96 100644 --- a/agent/client/client.go +++ b/agent/client/client.go @@ -34,10 +34,11 @@ import ( grpcstatus "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/timestamppb" - "github.com/percona/pmm/agent/actions" // TODO https://jira.percona.com/browse/PMM-7206 "github.com/percona/pmm/agent/client/channel" "github.com/percona/pmm/agent/config" - "github.com/percona/pmm/agent/jobs" + "github.com/percona/pmm/agent/runner" + "github.com/percona/pmm/agent/runner/actions" // TODO https://jira.percona.com/browse/PMM-7206 + "github.com/percona/pmm/agent/runner/jobs" "github.com/percona/pmm/agent/utils/backoff" "github.com/percona/pmm/api/agentpb" "github.com/percona/pmm/utils/tlsconfig" @@ -45,11 +46,10 @@ import ( ) const ( - dialTimeout = 5 * time.Second - backoffMinDelay = 1 * time.Second - backoffMaxDelay = 15 * time.Second - clockDriftWarning = 5 * time.Second - defaultActionTimeout = 10 * time.Second // default timeout for compatibility with an older server + dialTimeout = 5 * time.Second + backoffMinDelay = 1 * time.Second + backoffMaxDelay = 15 * time.Second + clockDriftWarning = 5 * time.Second ) // Client represents pmm-agent's connection to nginx/pmm-managed. @@ -67,8 +67,7 @@ type Client struct { // for unit tests only dialTimeout time.Duration - actionsRunner *actions.ConcurrentRunner - jobsRunner *jobs.Runner + runner *runner.Runner rw sync.RWMutex md *agentpb.ServerConnectMetadata @@ -88,6 +87,7 @@ func New(cfg *config.Config, supervisor supervisor, connectionChecker connection backoff: backoff.New(backoffMinDelay, backoffMaxDelay), done: make(chan struct{}), dialTimeout: dialTimeout, + runner: runner.New(cfg.RunnerCapacity), defaultsFileParser: dfp, } } @@ -102,9 +102,6 @@ func New(cfg *config.Config, supervisor supervisor, connectionChecker connection func (c *Client) Run(ctx context.Context) error { c.l.Info("Starting...") - c.actionsRunner = actions.NewConcurrentRunner(ctx) - c.jobsRunner = jobs.NewRunner() - // do nothing until ctx is canceled if config misses critical info var missing string if c.cfg.ID == "" { @@ -181,7 +178,7 @@ func (c *Client) Run(ctx context.Context) error { oneDone := make(chan struct{}, 5) go func() { - c.jobsRunner.Run(ctx) + c.runner.Run(ctx) oneDone <- struct{}{} }() go func() { @@ -219,13 +216,8 @@ func (c *Client) Done() <-chan struct{} { } func (c *Client) processActionResults() { - for result := range c.actionsRunner.Results() { - resp, err := c.channel.SendAndWaitResponse(&agentpb.ActionResultRequest{ - ActionId: result.ID, - Output: result.Output, - Done: true, - Error: result.Error, - }) + for result := range c.runner.ActionsResults() { + resp, err := c.channel.SendAndWaitResponse(result) if err != nil { c.l.Error(err) continue @@ -238,8 +230,11 @@ func (c *Client) processActionResults() { } func (c *Client) processJobsResults() { - for message := range c.jobsRunner.Messages() { - c.channel.Send(message) + for message := range c.runner.JobsMessages() { + c.channel.Send(&channel.AgentResponse{ + ID: 0, // Jobs send messages that don't require any responses, so we can leave message ID blank. + Payload: message, + }) } c.l.Debugf("Jobs runner Messages() channel drained.") } @@ -288,7 +283,6 @@ func (c *Client) processChannelRequests(ctx context.Context) { for req := range c.channel.Requests() { var responsePayload agentpb.AgentResponsePayload var status *grpcstatus.Status - outerSwitch: switch p := req.Payload.(type) { case *agentpb.Ping: responsePayload = &agentpb.Pong{ @@ -300,115 +294,15 @@ func (c *Client) processChannelRequests(ctx context.Context) { responsePayload = &agentpb.SetStateResponse{} case *agentpb.StartActionRequest: - var action actions.Action - switch params := p.Params.(type) { - case *agentpb.StartActionRequest_MysqlExplainParams: - action = actions.NewMySQLExplainAction(p.ActionId, params.MysqlExplainParams) - - case *agentpb.StartActionRequest_MysqlShowCreateTableParams: - action = actions.NewMySQLShowCreateTableAction(p.ActionId, params.MysqlShowCreateTableParams) - - case *agentpb.StartActionRequest_MysqlShowTableStatusParams: - action = actions.NewMySQLShowTableStatusAction(p.ActionId, params.MysqlShowTableStatusParams) - - case *agentpb.StartActionRequest_MysqlShowIndexParams: - action = actions.NewMySQLShowIndexAction(p.ActionId, params.MysqlShowIndexParams) - - case *agentpb.StartActionRequest_PostgresqlShowCreateTableParams: - action = actions.NewPostgreSQLShowCreateTableAction(p.ActionId, params.PostgresqlShowCreateTableParams, c.cfg.Paths.TempDir) - - case *agentpb.StartActionRequest_PostgresqlShowIndexParams: - action = actions.NewPostgreSQLShowIndexAction(p.ActionId, params.PostgresqlShowIndexParams, c.cfg.Paths.TempDir) - - case *agentpb.StartActionRequest_MongodbExplainParams: - action = actions.NewMongoDBExplainAction(p.ActionId, params.MongodbExplainParams, c.cfg.Paths.TempDir) - - case *agentpb.StartActionRequest_MysqlQueryShowParams: - action = actions.NewMySQLQueryShowAction(p.ActionId, params.MysqlQueryShowParams) - - case *agentpb.StartActionRequest_MysqlQuerySelectParams: - action = actions.NewMySQLQuerySelectAction(p.ActionId, params.MysqlQuerySelectParams) - - case *agentpb.StartActionRequest_PostgresqlQueryShowParams: - action = actions.NewPostgreSQLQueryShowAction(p.ActionId, params.PostgresqlQueryShowParams, c.cfg.Paths.TempDir) - - case *agentpb.StartActionRequest_PostgresqlQuerySelectParams: - action = actions.NewPostgreSQLQuerySelectAction(p.ActionId, params.PostgresqlQuerySelectParams, c.cfg.Paths.TempDir) - - case *agentpb.StartActionRequest_MongodbQueryGetparameterParams: - action = actions.NewMongoDBQueryAdmincommandAction(actions.MongoDBQueryAdmincommandActionParams{ - ID: p.ActionId, - DSN: params.MongodbQueryGetparameterParams.Dsn, - Files: params.MongodbQueryGetparameterParams.TextFiles, - Command: "getParameter", - Arg: "*", - TempDir: c.cfg.Paths.TempDir, - }) - - case *agentpb.StartActionRequest_MongodbQueryBuildinfoParams: - action = actions.NewMongoDBQueryAdmincommandAction(actions.MongoDBQueryAdmincommandActionParams{ - ID: p.ActionId, - DSN: params.MongodbQueryBuildinfoParams.Dsn, - Files: params.MongodbQueryBuildinfoParams.TextFiles, - Command: "buildInfo", - Arg: 1, - TempDir: c.cfg.Paths.TempDir, - }) - - case *agentpb.StartActionRequest_MongodbQueryGetcmdlineoptsParams: - action = actions.NewMongoDBQueryAdmincommandAction(actions.MongoDBQueryAdmincommandActionParams{ - ID: p.ActionId, - DSN: params.MongodbQueryGetcmdlineoptsParams.Dsn, - Files: params.MongodbQueryGetcmdlineoptsParams.TextFiles, - Command: "getCmdLineOpts", - Arg: 1, - TempDir: c.cfg.Paths.TempDir, - }) - - case *agentpb.StartActionRequest_MongodbQueryReplsetgetstatusParams: - action = actions.NewMongoDBQueryAdmincommandAction(actions.MongoDBQueryAdmincommandActionParams{ - ID: p.ActionId, - DSN: params.MongodbQueryReplsetgetstatusParams.Dsn, - Files: params.MongodbQueryReplsetgetstatusParams.TextFiles, - Command: "replSetGetStatus", - Arg: 1, - TempDir: c.cfg.Paths.TempDir, - }) - - case *agentpb.StartActionRequest_MongodbQueryGetdiagnosticdataParams: - action = actions.NewMongoDBQueryAdmincommandAction(actions.MongoDBQueryAdmincommandActionParams{ - ID: p.ActionId, - DSN: params.MongodbQueryGetdiagnosticdataParams.Dsn, - Files: params.MongodbQueryGetdiagnosticdataParams.TextFiles, - Command: "getDiagnosticData", - Arg: 1, - TempDir: c.cfg.Paths.TempDir, - }) - - case *agentpb.StartActionRequest_PtSummaryParams: - action = actions.NewProcessAction(p.ActionId, c.cfg.Paths.PTSummary, []string{}) - - case *agentpb.StartActionRequest_PtPgSummaryParams: - action = actions.NewProcessAction(p.ActionId, c.cfg.Paths.PTPGSummary, argListFromPgParams(params.PtPgSummaryParams)) - - case *agentpb.StartActionRequest_PtMysqlSummaryParams: - action = actions.NewPTMySQLSummaryAction(p.ActionId, c.cfg.Paths.PTMySQLSummary, params.PtMysqlSummaryParams) - - case *agentpb.StartActionRequest_PtMongodbSummaryParams: - action = actions.NewProcessAction(p.ActionId, c.cfg.Paths.PTMongoDBSummary, argListFromMongoDBParams(params.PtMongodbSummaryParams)) - - default: - c.l.Errorf("Unhandled StartAction request: %v.", req) + responsePayload = &agentpb.StartActionResponse{} + if err := c.handleStartActionRequest(p); err != nil { responsePayload = nil status = grpcstatus.New(codes.Unimplemented, "can't handle start action type send, it is not implemented") - break outerSwitch + break } - c.actionsRunner.Start(action, c.getActionTimeout(p)) - responsePayload = &agentpb.StartActionResponse{} - case *agentpb.StopActionRequest: - c.actionsRunner.Stop(p.ActionId) + c.runner.Stop(p.ActionId) responsePayload = &agentpb.StopActionResponse{} case *agentpb.CheckConnectionRequest: @@ -422,11 +316,11 @@ func (c *Client) processChannelRequests(ctx context.Context) { responsePayload = &resp case *agentpb.StopJobRequest: - c.jobsRunner.Stop(p.JobId) + c.runner.Stop(p.JobId) responsePayload = &agentpb.StopJobResponse{} case *agentpb.JobStatusRequest: - alive := c.jobsRunner.IsRunning(p.JobId) + alive := c.runner.IsRunning(p.JobId) responsePayload = &agentpb.JobStatusResponse{Alive: alive} case *agentpb.GetVersionsRequest: @@ -459,6 +353,116 @@ func (c *Client) processChannelRequests(ctx context.Context) { c.l.Debug("Channel closed.") } +func (c *Client) handleStartActionRequest(p *agentpb.StartActionRequest) error { + timeout := p.Timeout.AsDuration() + if err := p.Timeout.CheckValid(); err != nil { + timeout = 0 + } + + var action actions.Action + switch params := p.Params.(type) { + case *agentpb.StartActionRequest_MysqlExplainParams: + action = actions.NewMySQLExplainAction(p.ActionId, timeout, params.MysqlExplainParams) + + case *agentpb.StartActionRequest_MysqlShowCreateTableParams: + action = actions.NewMySQLShowCreateTableAction(p.ActionId, timeout, params.MysqlShowCreateTableParams) + + case *agentpb.StartActionRequest_MysqlShowTableStatusParams: + action = actions.NewMySQLShowTableStatusAction(p.ActionId, timeout, params.MysqlShowTableStatusParams) + + case *agentpb.StartActionRequest_MysqlShowIndexParams: + action = actions.NewMySQLShowIndexAction(p.ActionId, timeout, params.MysqlShowIndexParams) + + case *agentpb.StartActionRequest_PostgresqlShowCreateTableParams: + action = actions.NewPostgreSQLShowCreateTableAction(p.ActionId, timeout, params.PostgresqlShowCreateTableParams, c.cfg.Paths.TempDir) + + case *agentpb.StartActionRequest_PostgresqlShowIndexParams: + action = actions.NewPostgreSQLShowIndexAction(p.ActionId, timeout, params.PostgresqlShowIndexParams, c.cfg.Paths.TempDir) + + case *agentpb.StartActionRequest_MongodbExplainParams: + action = actions.NewMongoDBExplainAction(p.ActionId, timeout, params.MongodbExplainParams, c.cfg.Paths.TempDir) + + case *agentpb.StartActionRequest_MysqlQueryShowParams: + action = actions.NewMySQLQueryShowAction(p.ActionId, timeout, params.MysqlQueryShowParams) + + case *agentpb.StartActionRequest_MysqlQuerySelectParams: + action = actions.NewMySQLQuerySelectAction(p.ActionId, timeout, params.MysqlQuerySelectParams) + + case *agentpb.StartActionRequest_PostgresqlQueryShowParams: + action = actions.NewPostgreSQLQueryShowAction(p.ActionId, timeout, params.PostgresqlQueryShowParams, c.cfg.Paths.TempDir) + + case *agentpb.StartActionRequest_PostgresqlQuerySelectParams: + action = actions.NewPostgreSQLQuerySelectAction(p.ActionId, timeout, params.PostgresqlQuerySelectParams, c.cfg.Paths.TempDir) + + case *agentpb.StartActionRequest_MongodbQueryGetparameterParams: + action = actions.NewMongoDBQueryAdmincommandAction( + p.ActionId, + timeout, + params.MongodbQueryGetparameterParams.Dsn, + params.MongodbQueryGetparameterParams.TextFiles, + "getParameter", + "*", + c.cfg.Paths.TempDir) + + case *agentpb.StartActionRequest_MongodbQueryBuildinfoParams: + action = actions.NewMongoDBQueryAdmincommandAction( + p.ActionId, + timeout, + params.MongodbQueryBuildinfoParams.Dsn, + params.MongodbQueryBuildinfoParams.TextFiles, + "buildInfo", + 1, + c.cfg.Paths.TempDir) + + case *agentpb.StartActionRequest_MongodbQueryGetcmdlineoptsParams: + action = actions.NewMongoDBQueryAdmincommandAction( + p.ActionId, + timeout, + params.MongodbQueryGetcmdlineoptsParams.Dsn, + params.MongodbQueryGetcmdlineoptsParams.TextFiles, + "getCmdLineOpts", + 1, + c.cfg.Paths.TempDir) + + case *agentpb.StartActionRequest_MongodbQueryReplsetgetstatusParams: + action = actions.NewMongoDBQueryAdmincommandAction( + p.ActionId, + timeout, + params.MongodbQueryReplsetgetstatusParams.Dsn, + params.MongodbQueryReplsetgetstatusParams.TextFiles, + "replSetGetStatus", + 1, + c.cfg.Paths.TempDir) + + case *agentpb.StartActionRequest_MongodbQueryGetdiagnosticdataParams: + action = actions.NewMongoDBQueryAdmincommandAction( + p.ActionId, + timeout, + params.MongodbQueryGetdiagnosticdataParams.Dsn, + params.MongodbQueryGetdiagnosticdataParams.TextFiles, + "getDiagnosticData", + 1, + c.cfg.Paths.TempDir) + + case *agentpb.StartActionRequest_PtSummaryParams: + action = actions.NewProcessAction(p.ActionId, timeout, c.cfg.Paths.PTSummary, []string{}) + + case *agentpb.StartActionRequest_PtPgSummaryParams: + action = actions.NewProcessAction(p.ActionId, timeout, c.cfg.Paths.PTPGSummary, argListFromPgParams(params.PtPgSummaryParams)) + + case *agentpb.StartActionRequest_PtMysqlSummaryParams: + action = actions.NewPTMySQLSummaryAction(p.ActionId, timeout, c.cfg.Paths.PTMySQLSummary, params.PtMysqlSummaryParams) + + case *agentpb.StartActionRequest_PtMongodbSummaryParams: + action = actions.NewProcessAction(p.ActionId, timeout, c.cfg.Paths.PTMongoDBSummary, argListFromMongoDBParams(params.PtMongodbSummaryParams)) + + default: + return errors.Errorf("unknown action type request: %T", params) + } + + return c.runner.StartAction(action) +} + func (c *Client) handleStartJobRequest(p *agentpb.StartJobRequest) error { if err := p.Timeout.CheckValid(); err != nil { return err @@ -559,20 +563,7 @@ func (c *Client) handleStartJobRequest(p *agentpb.StartJobRequest) error { return errors.Errorf("unknown job type: %T", j) } - return c.jobsRunner.Start(job) -} - -func (c *Client) getActionTimeout(req *agentpb.StartActionRequest) time.Duration { - duration := req.Timeout.AsDuration() - err := req.Timeout.CheckValid() - if err == nil && duration == 0 { - err = errors.New("timeout can't be zero") - } - if err != nil { - c.l.Warnf("Invalid timeout, using default value instead: %s.", err) - duration = defaultActionTimeout - } - return duration + return c.runner.StartJob(job) } type dialResult struct { diff --git a/agent/client/client_test.go b/agent/client/client_test.go index 9b1ce9b4a5..16e380dc75 100644 --- a/agent/client/client_test.go +++ b/agent/client/client_test.go @@ -27,7 +27,6 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/protobuf/encoding/prototext" - "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/timestamppb" "github.com/percona/pmm/agent/config" @@ -193,33 +192,6 @@ func TestClient(t *testing.T) { }) } -func TestGetActionTimeout(t *testing.T) { - type testStartActionReq struct { - req *agentpb.StartActionRequest - expected time.Duration - } - - testCases := []*testStartActionReq{{ - req: &agentpb.StartActionRequest{Timeout: durationpb.New(0 * time.Second)}, - expected: 10 * time.Second, - }, { - req: &agentpb.StartActionRequest{Timeout: nil}, - expected: 10 * time.Second, - }, { - req: &agentpb.StartActionRequest{Timeout: durationpb.New(15 * time.Second)}, - expected: 15 * time.Second, - }} - - for _, tc := range testCases { - tc := tc - t.Run(prototext.Format(tc.req), func(t *testing.T) { - client := New(nil, nil, nil, nil, nil) - actual := client.getActionTimeout(tc.req) - assert.Equal(t, tc.expected, actual) - }) - } -} - func TestUnexpectedActionType(t *testing.T) { serverMD := &agentpb.ServerConnectMetadata{ ServerVersion: t.Name(), diff --git a/agent/config/config.go b/agent/config/config.go index 4090c60251..6e591b2cf3 100644 --- a/agent/config/config.go +++ b/agent/config/config.go @@ -139,9 +139,10 @@ type Setup struct { type Config struct { // no config file there - ID string `yaml:"id"` - ListenAddress string `yaml:"listen-address"` - ListenPort uint16 `yaml:"listen-port"` + ID string `yaml:"id"` + ListenAddress string `yaml:"listen-address"` + ListenPort uint16 `yaml:"listen-port"` + RunnerCapacity uint16 `yaml:"runner-capacity,omitempty"` Server Server `yaml:"server"` Paths Paths `yaml:"paths"` diff --git a/agent/jobs/runner.go b/agent/jobs/runner.go deleted file mode 100644 index 68412f3e71..0000000000 --- a/agent/jobs/runner.go +++ /dev/null @@ -1,161 +0,0 @@ -// Copyright 2019 Percona LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package jobs - -import ( - "context" - "runtime/pprof" - "sync" - "time" - - "github.com/pkg/errors" - "github.com/sirupsen/logrus" - "google.golang.org/protobuf/types/known/timestamppb" - - "github.com/percona/pmm/agent/client/channel" - "github.com/percona/pmm/api/agentpb" -) - -const jobsBufferSize = 32 - -// Runner executes jobs. -type Runner struct { - l *logrus.Entry - - messages chan *channel.AgentResponse - - jobs chan Job - runningJobs sync.WaitGroup - - rw sync.RWMutex - jobsCancel map[string]context.CancelFunc -} - -// NewRunner creates new jobs runner. -func NewRunner() *Runner { - return &Runner{ - l: logrus.WithField("component", "jobs-runner"), - jobs: make(chan Job, jobsBufferSize), - jobsCancel: make(map[string]context.CancelFunc), - messages: make(chan *channel.AgentResponse), - } -} - -// Run starts jobs execution loop. It reads jobs from the channel and starts them in separate goroutines. -func (r *Runner) Run(ctx context.Context) { - for { - select { - case job := <-r.jobs: - jobID, jobType := job.ID(), job.Type() - l := r.l.WithFields(logrus.Fields{"id": jobID, "type": jobType}) - - var nCtx context.Context - var cancel context.CancelFunc - if timeout := job.Timeout(); timeout != 0 { - nCtx, cancel = context.WithTimeout(ctx, timeout) - } else { - nCtx, cancel = context.WithCancel(ctx) - } - - r.addJobCancel(jobID, cancel) - r.runningJobs.Add(1) - run := func(ctx context.Context) { - l.Infof("Job started.") - - defer func(start time.Time) { - l.WithField("duration", time.Since(start).String()).Info("Job finished.") - }(time.Now()) - - defer r.runningJobs.Done() - defer cancel() - defer r.removeJobCancel(jobID) - - err := job.Run(ctx, r.send) - if err != nil { - r.send(&agentpb.JobResult{ - JobId: job.ID(), - Timestamp: timestamppb.Now(), - Result: &agentpb.JobResult_Error_{ - Error: &agentpb.JobResult_Error{ - Message: err.Error(), - }, - }, - }) - l.Warnf("Job terminated with error: %+v", err) - } - } - - go pprof.Do(nCtx, pprof.Labels("jobID", jobID, "type", string(jobType)), run) - case <-ctx.Done(): - r.runningJobs.Wait() // wait for all jobs termination - close(r.messages) - return - } - } -} - -// Messages returns channel with Jobs messages. -func (r *Runner) Messages() <-chan *channel.AgentResponse { - return r.messages -} - -func (r *Runner) send(payload agentpb.AgentResponsePayload) { - r.messages <- &channel.AgentResponse{ - ID: 0, // Jobs send messages that doesn't require any responses, so we can leave message ID blank. - Payload: payload, - } -} - -// Start starts given job. -func (r *Runner) Start(job Job) error { - select { - case r.jobs <- job: - return nil - default: - return errors.New("jobs queue overflowed") - } -} - -// Stop stops running Job. -func (r *Runner) Stop(id string) { - r.rw.RLock() - defer r.rw.RUnlock() - - // Job removes itself from jobsCancel map. So here we only invoke cancel. - if cancel, ok := r.jobsCancel[id]; ok { - cancel() - } -} - -// IsRunning returns true if job with given ID still running. -func (r *Runner) IsRunning(id string) bool { - r.rw.RLock() - defer r.rw.RUnlock() - _, ok := r.jobsCancel[id] - - return ok -} - -func (r *Runner) addJobCancel(jobID string, cancel context.CancelFunc) { - r.rw.Lock() - defer r.rw.Unlock() - r.jobsCancel[jobID] = cancel -} - -func (r *Runner) removeJobCancel(jobID string) { - r.rw.Lock() - defer r.rw.Unlock() - delete(r.jobsCancel, jobID) -} diff --git a/agent/runner/actions/action.go b/agent/runner/actions/action.go new file mode 100644 index 0000000000..6ade8f3cf4 --- /dev/null +++ b/agent/runner/actions/action.go @@ -0,0 +1,36 @@ +// Copyright 2019 Percona LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package actions + +import ( + "context" + "time" +) + +// go-sumtype:decl Action + +// Action describes an abstract thing that can be run by a client and return some output. +type Action interface { + // ID returns an Action ID. + ID() string + // Type returns an Action type. + Type() string + // Timeout returns Job timeout. + Timeout() time.Duration + // Run runs an Action and returns output and error. + Run(ctx context.Context) ([]byte, error) + + sealed() +} diff --git a/agent/actions/actions.go b/agent/runner/actions/common.go similarity index 87% rename from agent/actions/actions.go rename to agent/runner/actions/common.go index bb7c9357a3..d56cc096e2 100644 --- a/agent/actions/actions.go +++ b/agent/runner/actions/common.go @@ -12,11 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Package actions provides Actions implementations and runner. +// Package actions provides Actions implementations. package actions import ( - "context" "database/sql" "encoding/json" @@ -27,20 +26,6 @@ import ( "github.com/percona/pmm/api/agentpb" ) -// go-sumtype:decl Action - -// Action describes an abstract thing that can be run by a client and return some output. -type Action interface { - // ID returns an Action ID. - ID() string - // Type returns an Action type. - Type() string - // Run runs an Action and returns output and error. - Run(ctx context.Context) ([]byte, error) - - sealed() -} - // readRows reads and closes given *sql.Rows, returning columns, data rows, and first encountered error. func readRows(rows *sql.Rows) (columns []string, dataRows [][]interface{}, err error) { defer func() { diff --git a/agent/actions/mongodb_explain_action.go b/agent/runner/actions/mongodb_explain_action.go similarity index 87% rename from agent/actions/mongodb_explain_action.go rename to agent/runner/actions/mongodb_explain_action.go index 80207d9614..93eb10cedb 100644 --- a/agent/actions/mongodb_explain_action.go +++ b/agent/runner/actions/mongodb_explain_action.go @@ -19,6 +19,7 @@ import ( "fmt" "path/filepath" "strings" + "time" "github.com/percona/percona-toolkit/src/go/mongolib/proto" "github.com/pkg/errors" @@ -32,6 +33,7 @@ import ( type mongodbExplainAction struct { id string + timeout time.Duration params *agentpb.StartActionRequest_MongoDBExplainParams tempDir string } @@ -39,9 +41,10 @@ type mongodbExplainAction struct { var errCannotExplain = fmt.Errorf("cannot explain this type of query") // NewMongoDBExplainAction creates a MongoDB EXPLAIN query Action. -func NewMongoDBExplainAction(id string, params *agentpb.StartActionRequest_MongoDBExplainParams, tempDir string) Action { +func NewMongoDBExplainAction(id string, timeout time.Duration, params *agentpb.StartActionRequest_MongoDBExplainParams, tempDir string) Action { return &mongodbExplainAction{ id: id, + timeout: timeout, params: params, tempDir: tempDir, } @@ -52,12 +55,17 @@ func (a *mongodbExplainAction) ID() string { return a.id } +// Timeout returns Action timeout. +func (a *mongodbExplainAction) Timeout() time.Duration { + return a.timeout +} + // Type returns an Action type. func (a *mongodbExplainAction) Type() string { return "mongodb-explain" } -// Run runs an Action and returns output and error. +// Run runs an action and returns output and error. func (a *mongodbExplainAction) Run(ctx context.Context) ([]byte, error) { dsn, err := templates.RenderDSN(a.params.Dsn, a.params.TextFiles, filepath.Join(a.tempDir, strings.ToLower(a.Type()), a.id)) if err != nil { diff --git a/agent/actions/mongodb_explain_action_test.go b/agent/runner/actions/mongodb_explain_action_test.go similarity index 97% rename from agent/actions/mongodb_explain_action_test.go rename to agent/runner/actions/mongodb_explain_action_test.go index e205b2a25f..1b503b5ff8 100644 --- a/agent/actions/mongodb_explain_action_test.go +++ b/agent/runner/actions/mongodb_explain_action_test.go @@ -52,7 +52,7 @@ func TestMongoDBExplain(t *testing.T) { Query: `{"ns":"test.coll","op":"query","query":{"k":{"$lte":{"$numberInt":"1"}}}}`, } - ex := NewMongoDBExplainAction(id, params, os.TempDir()) + ex := NewMongoDBExplainAction(id, 0, params, os.TempDir()) res, err := ex.Run(ctx) assert.Nil(t, err) @@ -130,7 +130,7 @@ func TestNewMongoDBExplain(t *testing.T) { Query: string(query), } - ex := NewMongoDBExplainAction(id, params, os.TempDir()) + ex := NewMongoDBExplainAction(id, 0, params, os.TempDir()) res, err := ex.Run(ctx) assert.NoError(t, err) diff --git a/agent/actions/mongodb_query_admincommand_action.go b/agent/runner/actions/mongodb_query_admincommand_action.go similarity index 77% rename from agent/actions/mongodb_query_admincommand_action.go rename to agent/runner/actions/mongodb_query_admincommand_action.go index 4cfbc31218..c7b8e1a133 100644 --- a/agent/actions/mongodb_query_admincommand_action.go +++ b/agent/runner/actions/mongodb_query_admincommand_action.go @@ -18,6 +18,7 @@ import ( "context" "path/filepath" "strings" + "time" "github.com/pkg/errors" "go.mongodb.org/mongo-driver/bson" @@ -28,18 +29,9 @@ import ( "github.com/percona/pmm/api/agentpb" ) -// MongoDBQueryAdmincommandActionParams represent Mongo DB Query Admin Command Action params. -type MongoDBQueryAdmincommandActionParams struct { - ID string - DSN string - Files *agentpb.TextFiles - Command string - Arg interface{} - TempDir string -} - type mongodbQueryAdmincommandAction struct { id string + timeout time.Duration dsn string files *agentpb.TextFiles command string @@ -47,29 +39,35 @@ type mongodbQueryAdmincommandAction struct { tempDir string } -// NewMongoDBQueryAdmincommandAction creates a MongoDB adminCommand query Action. -func NewMongoDBQueryAdmincommandAction(params MongoDBQueryAdmincommandActionParams) Action { +// NewMongoDBQueryAdmincommandAction creates a MongoDB adminCommand query action. +func NewMongoDBQueryAdmincommandAction(id string, timeout time.Duration, dsn string, files *agentpb.TextFiles, command string, arg interface{}, tempDir string) Action { return &mongodbQueryAdmincommandAction{ - id: params.ID, - dsn: params.DSN, - files: params.Files, - command: params.Command, - arg: params.Arg, - tempDir: params.TempDir, + id: id, + timeout: timeout, + dsn: dsn, + files: files, + command: command, + arg: arg, + tempDir: tempDir, } } -// ID returns an Action ID. +// ID returns an action ID. func (a *mongodbQueryAdmincommandAction) ID() string { return a.id } -// Type returns an Action type. +// Timeout returns Action timeout. +func (a *mongodbQueryAdmincommandAction) Timeout() time.Duration { + return a.timeout +} + +// Type returns an action type. func (a *mongodbQueryAdmincommandAction) Type() string { return "mongodb-query-admincommand" } -// Run runs an Action and returns output and error. +// Run runs an action and returns output and error. func (a *mongodbQueryAdmincommandAction) Run(ctx context.Context) ([]byte, error) { dsn, err := templates.RenderDSN(a.dsn, a.files, filepath.Join(a.tempDir, strings.ToLower(a.Type()), a.id)) if err != nil { diff --git a/agent/actions/mongodb_query_admincommand_action_test.go b/agent/runner/actions/mongodb_query_admincommand_action_test.go similarity index 73% rename from agent/actions/mongodb_query_admincommand_action_test.go rename to agent/runner/actions/mongodb_query_admincommand_action_test.go index c3aed863c0..28504ae7f1 100644 --- a/agent/actions/mongodb_query_admincommand_action_test.go +++ b/agent/runner/actions/mongodb_query_admincommand_action_test.go @@ -39,31 +39,30 @@ func TestMongoDBActions(t *testing.T) { t.Run("getParameter", func(t *testing.T) { t.Parallel() - b := runAction(t, &MongoDBQueryAdmincommandActionParams{"", dsn, nil, "getParameter", "*", createTempDir(t)}) + b := runAction(t, "", 0, dsn, nil, "getParameter", "*", createTempDir(t)) getParameterAssertions(t, b) }) t.Run("buildInfo", func(t *testing.T) { t.Parallel() - b := runAction(t, &MongoDBQueryAdmincommandActionParams{"", dsn, nil, "buildInfo", 1, createTempDir(t)}) + b := runAction(t, "", 0, dsn, nil, "buildInfo", 1, createTempDir(t)) buildInfoAssertions(t, b) }) t.Run("getCmdLineOpts", func(t *testing.T) { t.Parallel() - b := runAction(t, &MongoDBQueryAdmincommandActionParams{"", dsn, nil, "getCmdLineOpts", 1, createTempDir(t)}) + b := runAction(t, "", 0, dsn, nil, "getCmdLineOpts", 1, createTempDir(t)) getCmdLineOptsAssertionsWithAuth(t, b) }) t.Run("replSetGetStatus", func(t *testing.T) { t.Parallel() - params := &MongoDBQueryAdmincommandActionParams{"", dsn, nil, "replSetGetStatus", 1, createTempDir(t)} - replSetGetStatusAssertionsStandalone(t, params) + replSetGetStatusAssertionsStandalone(t, "", 0, dsn, nil, "replSetGetStatus", 1, createTempDir(t)) }) t.Run("getDiagnosticData", func(t *testing.T) { t.Parallel() - b := runAction(t, &MongoDBQueryAdmincommandActionParams{"", dsn, nil, "getDiagnosticData", 1, createTempDir(t)}) + b := runAction(t, "", 0, dsn, nil, "getDiagnosticData", 1, createTempDir(t)) getDiagnosticDataAssertions(t, b) }) } @@ -71,35 +70,34 @@ func TestMongoDBActions(t *testing.T) { func TestMongoDBActionsWithSSL(t *testing.T) { t.Parallel() - dsn, files := tests.GetTestMongoDBWithSSLDSN(t, "../") + dsn, files := tests.GetTestMongoDBWithSSLDSN(t, "../../") t.Run("getParameter", func(t *testing.T) { t.Parallel() - b := runAction(t, &MongoDBQueryAdmincommandActionParams{"", dsn, files, "getParameter", "*", createTempDir(t)}) + b := runAction(t, "", 0, dsn, files, "getParameter", "*", createTempDir(t)) getParameterAssertions(t, b) }) t.Run("buildInfo", func(t *testing.T) { t.Parallel() - b := runAction(t, &MongoDBQueryAdmincommandActionParams{"", dsn, files, "buildInfo", 1, createTempDir(t)}) + b := runAction(t, "", 0, dsn, files, "buildInfo", 1, createTempDir(t)) buildInfoAssertions(t, b) }) t.Run("getCmdLineOpts", func(t *testing.T) { t.Parallel() - b := runAction(t, &MongoDBQueryAdmincommandActionParams{"", dsn, files, "getCmdLineOpts", 1, createTempDir(t)}) + b := runAction(t, "", 0, dsn, files, "getCmdLineOpts", 1, createTempDir(t)) getCmdLineOptsAssertionsWithSSL(t, b) }) t.Run("replSetGetStatus", func(t *testing.T) { t.Parallel() - params := &MongoDBQueryAdmincommandActionParams{"", dsn, files, "replSetGetStatus", 1, createTempDir(t)} - replSetGetStatusAssertionsStandalone(t, params) + replSetGetStatusAssertionsStandalone(t, "", 0, dsn, files, "replSetGetStatus", 1, createTempDir(t)) }) t.Run("getDiagnosticData", func(t *testing.T) { t.Parallel() - b := runAction(t, &MongoDBQueryAdmincommandActionParams{"", dsn, files, "getDiagnosticData", 1, createTempDir(t)}) + b := runAction(t, "", 0, dsn, files, "getDiagnosticData", 1, createTempDir(t)) getDiagnosticDataAssertions(t, b) }) } @@ -111,31 +109,31 @@ func TestMongoDBActionsReplNoAuth(t *testing.T) { t.Run("getParameter", func(t *testing.T) { t.Parallel() - b := runAction(t, &MongoDBQueryAdmincommandActionParams{"", dsn, nil, "getParameter", "*", createTempDir(t)}) + b := runAction(t, "", 0, dsn, nil, "getParameter", "*", createTempDir(t)) getParameterAssertions(t, b) }) t.Run("buildInfo", func(t *testing.T) { t.Parallel() - b := runAction(t, &MongoDBQueryAdmincommandActionParams{"", dsn, nil, "buildInfo", 1, createTempDir(t)}) + b := runAction(t, "", 0, dsn, nil, "buildInfo", 1, createTempDir(t)) buildInfoAssertions(t, b) }) t.Run("getCmdLineOpts", func(t *testing.T) { t.Parallel() - b := runAction(t, &MongoDBQueryAdmincommandActionParams{"", dsn, nil, "getCmdLineOpts", 1, createTempDir(t)}) + b := runAction(t, "", 0, dsn, nil, "getCmdLineOpts", 1, createTempDir(t)) getCmdLineOptsAssertionsWithoutAuth(t, b) }) t.Run("replSetGetStatus", func(t *testing.T) { t.Parallel() - b := runAction(t, &MongoDBQueryAdmincommandActionParams{"", dsn, nil, "replSetGetStatus", 1, createTempDir(t)}) + b := runAction(t, "", 0, dsn, nil, "replSetGetStatus", 1, createTempDir(t)) replSetGetStatusAssertionsReplicated(t, b) }) t.Run("getDiagnosticData", func(t *testing.T) { t.Parallel() - b := runAction(t, &MongoDBQueryAdmincommandActionParams{"", dsn, nil, "getDiagnosticData", 1, createTempDir(t)}) + b := runAction(t, "", 0, dsn, nil, "getDiagnosticData", 1, createTempDir(t)) getDiagnosticDataAssertions(t, b) }) } @@ -143,42 +141,42 @@ func TestMongoDBActionsReplNoAuth(t *testing.T) { func TestMongoDBActionsReplWithSSL(t *testing.T) { t.Parallel() - dsn, files := tests.GetTestMongoDBReplicatedWithSSLDSN(t, "../") + dsn, files := tests.GetTestMongoDBReplicatedWithSSLDSN(t, "../../") t.Run("getParameter", func(t *testing.T) { t.Parallel() - b := runAction(t, &MongoDBQueryAdmincommandActionParams{"", dsn, files, "getParameter", "*", createTempDir(t)}) + b := runAction(t, "", 0, dsn, files, "getParameter", "*", createTempDir(t)) getParameterAssertions(t, b) }) t.Run("buildInfo", func(t *testing.T) { t.Parallel() - b := runAction(t, &MongoDBQueryAdmincommandActionParams{"", dsn, files, "buildInfo", 1, createTempDir(t)}) + b := runAction(t, "", 0, dsn, files, "buildInfo", 1, createTempDir(t)) buildInfoAssertions(t, b) }) t.Run("getCmdLineOpts", func(t *testing.T) { t.Parallel() - b := runAction(t, &MongoDBQueryAdmincommandActionParams{"", dsn, files, "getCmdLineOpts", 1, createTempDir(t)}) + b := runAction(t, "", 0, dsn, files, "getCmdLineOpts", 1, createTempDir(t)) getCmdLineOptsAssertionsWithSSL(t, b) }) t.Run("replSetGetStatus", func(t *testing.T) { t.Parallel() - b := runAction(t, &MongoDBQueryAdmincommandActionParams{"", dsn, files, "replSetGetStatus", 1, createTempDir(t)}) + b := runAction(t, "", 0, dsn, files, "replSetGetStatus", 1, createTempDir(t)) replSetGetStatusAssertionsReplicated(t, b) }) t.Run("getDiagnosticData", func(t *testing.T) { t.Parallel() - b := runAction(t, &MongoDBQueryAdmincommandActionParams{"", dsn, files, "getDiagnosticData", 1, createTempDir(t)}) + b := runAction(t, "", 0, dsn, files, "getDiagnosticData", 1, createTempDir(t)) getDiagnosticDataAssertions(t, b) }) } -func runAction(t *testing.T, params *MongoDBQueryAdmincommandActionParams) []byte { +func runAction(t *testing.T, id string, timeout time.Duration, dsn string, files *agentpb.TextFiles, command string, arg interface{}, tempDir string) []byte { t.Helper() - a := NewMongoDBQueryAdmincommandAction(*params) + a := NewMongoDBQueryAdmincommandAction(id, timeout, dsn, files, command, arg, tempDir) ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() b, err := a.Run(ctx) @@ -229,8 +227,8 @@ func replSetGetStatusAssertionsReplicated(t *testing.T, b []byte) { //nolint:the assert.Len(t, objxM.Get("members").Data(), 2) } -func replSetGetStatusAssertionsStandalone(t *testing.T, params *MongoDBQueryAdmincommandActionParams) { //nolint:thelper - a := NewMongoDBQueryAdmincommandAction(*params) +func replSetGetStatusAssertionsStandalone(t *testing.T, id string, timeout time.Duration, dsn string, files *agentpb.TextFiles, command string, arg interface{}, tempDir string) { //nolint:thelper + a := NewMongoDBQueryAdmincommandAction(id, timeout, dsn, files, command, arg, tempDir) ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() b, err := a.Run(ctx) diff --git a/agent/actions/mysql_explain_action.go b/agent/runner/actions/mysql_explain_action.go similarity index 92% rename from agent/actions/mysql_explain_action.go rename to agent/runner/actions/mysql_explain_action.go index 9593c16cce..63d255a78b 100644 --- a/agent/actions/mysql_explain_action.go +++ b/agent/runner/actions/mysql_explain_action.go @@ -22,6 +22,7 @@ import ( "fmt" "strings" "text/tabwriter" + "time" "github.com/pkg/errors" @@ -30,9 +31,10 @@ import ( ) type mysqlExplainAction struct { - id string - params *agentpb.StartActionRequest_MySQLExplainParams - query string + id string + timeout time.Duration + params *agentpb.StartActionRequest_MySQLExplainParams + query string } type explainResponse struct { @@ -46,14 +48,13 @@ var errCannotEncodeExplainResponse = errors.New("cannot JSON encode the explain // NewMySQLExplainAction creates MySQL Explain Action. // This is an Action that can run `EXPLAIN` command on MySQL service with given DSN. -func NewMySQLExplainAction(id string, params *agentpb.StartActionRequest_MySQLExplainParams) Action { - ret := &mysqlExplainAction{ - id: id, - params: params, - query: params.Query, +func NewMySQLExplainAction(id string, timeout time.Duration, params *agentpb.StartActionRequest_MySQLExplainParams) Action { + return &mysqlExplainAction{ + id: id, + timeout: timeout, + params: params, + query: params.Query, } - - return ret } // ID returns an Action ID. @@ -61,6 +62,11 @@ func (a *mysqlExplainAction) ID() string { return a.id } +// Timeout returns Action timeout. +func (a *mysqlExplainAction) Timeout() time.Duration { + return a.timeout +} + // Type returns an Action type. func (a *mysqlExplainAction) Type() string { return "mysql-explain" diff --git a/agent/actions/mysql_explain_action_test.go b/agent/runner/actions/mysql_explain_action_test.go similarity index 88% rename from agent/actions/mysql_explain_action_test.go rename to agent/runner/actions/mysql_explain_action_test.go index f516f8ce72..39c0f7cd22 100644 --- a/agent/actions/mysql_explain_action_test.go +++ b/agent/runner/actions/mysql_explain_action_test.go @@ -45,8 +45,8 @@ func TestMySQLExplain(t *testing.T) { Query: query, OutputFormat: agentpb.MysqlExplainOutputFormat_MYSQL_EXPLAIN_OUTPUT_FORMAT_DEFAULT, } - a := NewMySQLExplainAction("", params) - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + a := NewMySQLExplainAction("", time.Second, params) + ctx, cancel := context.WithTimeout(context.Background(), a.Timeout()) defer cancel() b, err := a.Run(ctx) @@ -71,8 +71,8 @@ func TestMySQLExplain(t *testing.T) { Query: query, OutputFormat: agentpb.MysqlExplainOutputFormat_MYSQL_EXPLAIN_OUTPUT_FORMAT_JSON, } - a := NewMySQLExplainAction("", params) - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + a := NewMySQLExplainAction("", time.Second, params) + ctx, cancel := context.WithTimeout(context.Background(), a.Timeout()) defer cancel() b, err := a.Run(ctx) @@ -115,8 +115,8 @@ func TestMySQLExplain(t *testing.T) { Query: query, OutputFormat: agentpb.MysqlExplainOutputFormat_MYSQL_EXPLAIN_OUTPUT_FORMAT_TRADITIONAL_JSON, } - a := NewMySQLExplainAction("", params) - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + a := NewMySQLExplainAction("", time.Second, params) + ctx, cancel := context.WithTimeout(context.Background(), a.Timeout()) defer cancel() b, err := a.Run(ctx) @@ -155,8 +155,8 @@ func TestMySQLExplain(t *testing.T) { Dsn: "pmm-agent:pmm-agent-wrong-password@tcp(127.0.0.1:3306)/world", OutputFormat: agentpb.MysqlExplainOutputFormat_MYSQL_EXPLAIN_OUTPUT_FORMAT_DEFAULT, } - a := NewMySQLExplainAction("", params) - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + a := NewMySQLExplainAction("", time.Second, params) + ctx, cancel := context.WithTimeout(context.Background(), a.Timeout()) defer cancel() _, err := a.Run(ctx) @@ -170,8 +170,8 @@ func TestMySQLExplain(t *testing.T) { Query: `INSERT INTO city (Name) VALUES ('Rosario')`, OutputFormat: agentpb.MysqlExplainOutputFormat_MYSQL_EXPLAIN_OUTPUT_FORMAT_DEFAULT, } - a := NewMySQLExplainAction("", params) - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + a := NewMySQLExplainAction("", time.Second, params) + ctx, cancel := context.WithTimeout(context.Background(), a.Timeout()) defer cancel() resp, err := a.Run(ctx) @@ -199,8 +199,8 @@ func TestMySQLExplain(t *testing.T) { Query: `SELECT 1; DROP TABLE city; --`, OutputFormat: agentpb.MysqlExplainOutputFormat_MYSQL_EXPLAIN_OUTPUT_FORMAT_DEFAULT, } - a := NewMySQLExplainAction("", params) - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + a := NewMySQLExplainAction("", time.Second, params) + ctx, cancel := context.WithTimeout(context.Background(), a.Timeout()) defer cancel() _, err := a.Run(ctx) @@ -217,8 +217,8 @@ func TestMySQLExplain(t *testing.T) { Query: `DELETE FROM city`, OutputFormat: agentpb.MysqlExplainOutputFormat_MYSQL_EXPLAIN_OUTPUT_FORMAT_DEFAULT, } - a := NewMySQLExplainAction("", params) - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + a := NewMySQLExplainAction("", time.Second, params) + ctx, cancel := context.WithTimeout(context.Background(), a.Timeout()) defer cancel() _, err := a.Run(ctx) @@ -269,8 +269,8 @@ func TestMySQLExplain(t *testing.T) { Query: `select * from (select cleanup()) as testclean;`, OutputFormat: agentpb.MysqlExplainOutputFormat_MYSQL_EXPLAIN_OUTPUT_FORMAT_DEFAULT, } - a := NewMySQLExplainAction("", params) - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + a := NewMySQLExplainAction("", time.Second, params) + ctx, cancel := context.WithTimeout(context.Background(), a.Timeout()) defer cancel() _, err := a.Run(ctx) diff --git a/agent/actions/mysql_query_select_action.go b/agent/runner/actions/mysql_query_select_action.go similarity index 82% rename from agent/actions/mysql_query_select_action.go rename to agent/runner/actions/mysql_query_select_action.go index 2eb02f1fc3..019296c8e6 100644 --- a/agent/actions/mysql_query_select_action.go +++ b/agent/runner/actions/mysql_query_select_action.go @@ -16,6 +16,7 @@ package actions import ( "context" + "time" "github.com/pkg/errors" @@ -24,15 +25,17 @@ import ( ) type mysqlQuerySelectAction struct { - id string - params *agentpb.StartActionRequest_MySQLQuerySelectParams + id string + timeout time.Duration + params *agentpb.StartActionRequest_MySQLQuerySelectParams } // NewMySQLQuerySelectAction creates MySQL SELECT query Action. -func NewMySQLQuerySelectAction(id string, params *agentpb.StartActionRequest_MySQLQuerySelectParams) Action { +func NewMySQLQuerySelectAction(id string, timeout time.Duration, params *agentpb.StartActionRequest_MySQLQuerySelectParams) Action { return &mysqlQuerySelectAction{ - id: id, - params: params, + id: id, + timeout: timeout, + params: params, } } @@ -41,6 +44,11 @@ func (a *mysqlQuerySelectAction) ID() string { return a.id } +// Timeout returns Action timeout. +func (a *mysqlQuerySelectAction) Timeout() time.Duration { + return a.timeout +} + // Type returns an Action type. func (a *mysqlQuerySelectAction) Type() string { return "mysql-query-select" diff --git a/agent/actions/mysql_query_select_action_test.go b/agent/runner/actions/mysql_query_select_action_test.go similarity index 95% rename from agent/actions/mysql_query_select_action_test.go rename to agent/runner/actions/mysql_query_select_action_test.go index e159221344..d6e3926223 100644 --- a/agent/actions/mysql_query_select_action_test.go +++ b/agent/runner/actions/mysql_query_select_action_test.go @@ -39,7 +39,7 @@ func TestMySQLQuerySelect(t *testing.T) { Dsn: dsn, Query: "COUNT(*) AS count FROM mysql.user WHERE plugin NOT IN ('caching_sha2_password')", } - a := NewMySQLQuerySelectAction("", params) + a := NewMySQLQuerySelectAction("", 0, params) ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() @@ -59,7 +59,7 @@ func TestMySQLQuerySelect(t *testing.T) { Dsn: dsn, Query: `x'0001feff' AS bytes`, } - a := NewMySQLQuerySelectAction("", params) + a := NewMySQLQuerySelectAction("", 0, params) ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() @@ -82,7 +82,7 @@ func TestMySQLQuerySelect(t *testing.T) { Dsn: dsn, Query: "* FROM city; DROP TABLE city; --", } - a := NewMySQLQuerySelectAction("", params) + a := NewMySQLQuerySelectAction("", 0, params) ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() diff --git a/agent/actions/mysql_query_show_action.go b/agent/runner/actions/mysql_query_show_action.go similarity index 82% rename from agent/actions/mysql_query_show_action.go rename to agent/runner/actions/mysql_query_show_action.go index b00a82bc8d..e0c7f9d2d1 100644 --- a/agent/actions/mysql_query_show_action.go +++ b/agent/runner/actions/mysql_query_show_action.go @@ -16,6 +16,7 @@ package actions import ( "context" + "time" "github.com/pkg/errors" @@ -24,15 +25,17 @@ import ( ) type mysqlQueryShowAction struct { - id string - params *agentpb.StartActionRequest_MySQLQueryShowParams + id string + timeout time.Duration + params *agentpb.StartActionRequest_MySQLQueryShowParams } // NewMySQLQueryShowAction creates MySQL SHOW query Action. -func NewMySQLQueryShowAction(id string, params *agentpb.StartActionRequest_MySQLQueryShowParams) Action { +func NewMySQLQueryShowAction(id string, timeout time.Duration, params *agentpb.StartActionRequest_MySQLQueryShowParams) Action { return &mysqlQueryShowAction{ - id: id, - params: params, + id: id, + timeout: timeout, + params: params, } } @@ -41,6 +44,11 @@ func (a *mysqlQueryShowAction) ID() string { return a.id } +// Timeout returns Action timeout. +func (a *mysqlQueryShowAction) Timeout() time.Duration { + return a.timeout +} + // Type returns an Action type. func (a *mysqlQueryShowAction) Type() string { return "mysql-query-show" diff --git a/agent/actions/mysql_query_show_action_test.go b/agent/runner/actions/mysql_query_show_action_test.go similarity index 93% rename from agent/actions/mysql_query_show_action_test.go rename to agent/runner/actions/mysql_query_show_action_test.go index ea73947367..bb5fa68963 100644 --- a/agent/actions/mysql_query_show_action_test.go +++ b/agent/runner/actions/mysql_query_show_action_test.go @@ -39,8 +39,8 @@ func TestMySQLQueryShow(t *testing.T) { Dsn: dsn, Query: "VARIABLES", } - a := NewMySQLQueryShowAction("", params) - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + a := NewMySQLQueryShowAction("", time.Second, params) + ctx, cancel := context.WithTimeout(context.Background(), a.Timeout()) defer cancel() b, err := a.Run(ctx) diff --git a/agent/actions/mysql_show_create_table_action.go b/agent/runner/actions/mysql_show_create_table_action.go similarity index 81% rename from agent/actions/mysql_show_create_table_action.go rename to agent/runner/actions/mysql_show_create_table_action.go index 4bd089173e..eef575e883 100644 --- a/agent/actions/mysql_show_create_table_action.go +++ b/agent/runner/actions/mysql_show_create_table_action.go @@ -17,22 +17,25 @@ package actions import ( "context" "fmt" + "time" "github.com/percona/pmm/agent/tlshelpers" "github.com/percona/pmm/api/agentpb" ) type mysqlShowCreateTableAction struct { - id string - params *agentpb.StartActionRequest_MySQLShowCreateTableParams + id string + timeout time.Duration + params *agentpb.StartActionRequest_MySQLShowCreateTableParams } // NewMySQLShowCreateTableAction creates MySQL SHOW CREATE TABLE Action. // This is an Action that can run `SHOW CREATE TABLE` command on MySQL service with given DSN. -func NewMySQLShowCreateTableAction(id string, params *agentpb.StartActionRequest_MySQLShowCreateTableParams) Action { +func NewMySQLShowCreateTableAction(id string, timeout time.Duration, params *agentpb.StartActionRequest_MySQLShowCreateTableParams) Action { return &mysqlShowCreateTableAction{ - id: id, - params: params, + id: id, + timeout: timeout, + params: params, } } @@ -41,6 +44,11 @@ func (a *mysqlShowCreateTableAction) ID() string { return a.id } +// Timeout returns Action timeout. +func (a *mysqlShowCreateTableAction) Timeout() time.Duration { + return a.timeout +} + // Type returns an Action type. func (a *mysqlShowCreateTableAction) Type() string { return "mysql-show-create-table" diff --git a/agent/actions/mysql_show_create_table_action_test.go b/agent/runner/actions/mysql_show_create_table_action_test.go similarity index 96% rename from agent/actions/mysql_show_create_table_action_test.go rename to agent/runner/actions/mysql_show_create_table_action_test.go index cd26c5ca44..95b1fea331 100644 --- a/agent/actions/mysql_show_create_table_action_test.go +++ b/agent/runner/actions/mysql_show_create_table_action_test.go @@ -40,7 +40,7 @@ func TestMySQLShowCreateTable(t *testing.T) { Dsn: dsn, Table: "city", } - a := NewMySQLShowCreateTableAction("", params) + a := NewMySQLShowCreateTableAction("", 0, params) ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() @@ -104,7 +104,7 @@ CREATE TABLE "city" ( Dsn: dsn, Table: "no_such_table", } - a := NewMySQLShowCreateTableAction("", params) + a := NewMySQLShowCreateTableAction("", 0, params) ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() @@ -117,7 +117,7 @@ CREATE TABLE "city" ( Dsn: dsn, Table: `city"; DROP TABLE city; --`, } - a := NewMySQLShowCreateTableAction("", params) + a := NewMySQLShowCreateTableAction("", 0, params) ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() diff --git a/agent/actions/mysql_show_index_action.go b/agent/runner/actions/mysql_show_index_action.go similarity index 82% rename from agent/actions/mysql_show_index_action.go rename to agent/runner/actions/mysql_show_index_action.go index 8797470086..5a7758f428 100644 --- a/agent/actions/mysql_show_index_action.go +++ b/agent/runner/actions/mysql_show_index_action.go @@ -17,22 +17,25 @@ package actions import ( "context" "fmt" + "time" "github.com/percona/pmm/agent/tlshelpers" "github.com/percona/pmm/api/agentpb" ) type mysqlShowIndexAction struct { - id string - params *agentpb.StartActionRequest_MySQLShowIndexParams + id string + timeout time.Duration + params *agentpb.StartActionRequest_MySQLShowIndexParams } // NewMySQLShowIndexAction creates MySQL SHOW INDEX Action. // This is an Action that can run `SHOW INDEX` command on MySQL service with given DSN. -func NewMySQLShowIndexAction(id string, params *agentpb.StartActionRequest_MySQLShowIndexParams) Action { +func NewMySQLShowIndexAction(id string, timeout time.Duration, params *agentpb.StartActionRequest_MySQLShowIndexParams) Action { return &mysqlShowIndexAction{ - id: id, - params: params, + id: id, + timeout: timeout, + params: params, } } @@ -41,6 +44,11 @@ func (a *mysqlShowIndexAction) ID() string { return a.id } +// Timeout returns Action timeout. +func (a *mysqlShowIndexAction) Timeout() time.Duration { + return a.timeout +} + // Type returns an Action type. func (a *mysqlShowIndexAction) Type() string { return "mysql-show-index" diff --git a/agent/actions/mysql_show_index_action_test.go b/agent/runner/actions/mysql_show_index_action_test.go similarity index 97% rename from agent/actions/mysql_show_index_action_test.go rename to agent/runner/actions/mysql_show_index_action_test.go index 18e748931e..e2fab44b46 100644 --- a/agent/actions/mysql_show_index_action_test.go +++ b/agent/runner/actions/mysql_show_index_action_test.go @@ -41,7 +41,7 @@ func TestMySQLShowIndex(t *testing.T) { Dsn: dsn, Table: "city", } - a := NewMySQLShowIndexAction("", params) + a := NewMySQLShowIndexAction("", 0, params) ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() @@ -103,7 +103,7 @@ func TestMySQLShowIndex(t *testing.T) { Dsn: dsn, Table: "no_such_table", } - a := NewMySQLShowIndexAction("", params) + a := NewMySQLShowIndexAction("", 0, params) ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() @@ -116,7 +116,7 @@ func TestMySQLShowIndex(t *testing.T) { Dsn: dsn, Table: `city"; DROP TABLE city; --`, } - a := NewMySQLShowIndexAction("", params) + a := NewMySQLShowIndexAction("", 0, params) ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() diff --git a/agent/actions/mysql_show_table_status_action.go b/agent/runner/actions/mysql_show_table_status_action.go similarity index 81% rename from agent/actions/mysql_show_table_status_action.go rename to agent/runner/actions/mysql_show_table_status_action.go index 86ba277eb2..4506dfe0f5 100644 --- a/agent/actions/mysql_show_table_status_action.go +++ b/agent/runner/actions/mysql_show_table_status_action.go @@ -16,6 +16,7 @@ package actions import ( "context" + "time" "github.com/pkg/errors" @@ -24,16 +25,18 @@ import ( ) type mysqlShowTableStatusAction struct { - id string - params *agentpb.StartActionRequest_MySQLShowTableStatusParams + id string + timeout time.Duration + params *agentpb.StartActionRequest_MySQLShowTableStatusParams } // NewMySQLShowTableStatusAction creates MySQL SHOW TABLE STATUS Action. // This is an Action that can run `SHOW TABLE STATUS` command on MySQL service with given DSN. -func NewMySQLShowTableStatusAction(id string, params *agentpb.StartActionRequest_MySQLShowTableStatusParams) Action { +func NewMySQLShowTableStatusAction(id string, timeout time.Duration, params *agentpb.StartActionRequest_MySQLShowTableStatusParams) Action { return &mysqlShowTableStatusAction{ - id: id, - params: params, + id: id, + timeout: timeout, + params: params, } } @@ -42,6 +45,11 @@ func (a *mysqlShowTableStatusAction) ID() string { return a.id } +// Timeout returns Action timeout. +func (a *mysqlShowTableStatusAction) Timeout() time.Duration { + return a.timeout +} + // Type returns an Action type. func (a *mysqlShowTableStatusAction) Type() string { return "mysql-show-table-status" diff --git a/agent/actions/mysql_show_table_status_action_test.go b/agent/runner/actions/mysql_show_table_status_action_test.go similarity index 95% rename from agent/actions/mysql_show_table_status_action_test.go rename to agent/runner/actions/mysql_show_table_status_action_test.go index 3420330ef7..af8a49af8e 100644 --- a/agent/actions/mysql_show_table_status_action_test.go +++ b/agent/runner/actions/mysql_show_table_status_action_test.go @@ -39,7 +39,7 @@ func TestShowTableStatus(t *testing.T) { Dsn: dsn, Table: "city", } - a := NewMySQLShowTableStatusAction("", params) + a := NewMySQLShowTableStatusAction("", 0, params) ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() @@ -83,7 +83,7 @@ func TestShowTableStatus(t *testing.T) { Dsn: dsn, Table: "no_such_table", } - a := NewMySQLShowTableStatusAction("", params) + a := NewMySQLShowTableStatusAction("", 0, params) ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() @@ -96,7 +96,7 @@ func TestShowTableStatus(t *testing.T) { Dsn: dsn, Table: `city"; DROP TABLE city; --`, } - a := NewMySQLShowTableStatusAction("", params) + a := NewMySQLShowTableStatusAction("", 0, params) ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() diff --git a/agent/actions/postgresql_query_select_action.go b/agent/runner/actions/postgresql_query_select_action.go similarity index 89% rename from agent/actions/postgresql_query_select_action.go rename to agent/runner/actions/postgresql_query_select_action.go index 18367f7d69..368d8e0de8 100644 --- a/agent/actions/postgresql_query_select_action.go +++ b/agent/runner/actions/postgresql_query_select_action.go @@ -19,6 +19,7 @@ import ( "database/sql" "path/filepath" "strings" + "time" "github.com/lib/pq" "github.com/pkg/errors" @@ -29,14 +30,16 @@ import ( type postgresqlQuerySelectAction struct { id string + timeout time.Duration params *agentpb.StartActionRequest_PostgreSQLQuerySelectParams tempDir string } // NewPostgreSQLQuerySelectAction creates PostgreSQL SELECT query Action. -func NewPostgreSQLQuerySelectAction(id string, params *agentpb.StartActionRequest_PostgreSQLQuerySelectParams, tempDir string) Action { +func NewPostgreSQLQuerySelectAction(id string, timeout time.Duration, params *agentpb.StartActionRequest_PostgreSQLQuerySelectParams, tempDir string) Action { return &postgresqlQuerySelectAction{ id: id, + timeout: timeout, params: params, tempDir: tempDir, } @@ -47,6 +50,11 @@ func (a *postgresqlQuerySelectAction) ID() string { return a.id } +// Timeout returns Action timeout. +func (a *postgresqlQuerySelectAction) Timeout() time.Duration { + return a.timeout +} + // Type returns an Action type. func (a *postgresqlQuerySelectAction) Type() string { return "postgresql-query-select" diff --git a/agent/actions/postgresql_query_select_action_test.go b/agent/runner/actions/postgresql_query_select_action_test.go similarity index 93% rename from agent/actions/postgresql_query_select_action_test.go rename to agent/runner/actions/postgresql_query_select_action_test.go index 3dd628a861..f25ab986e9 100644 --- a/agent/actions/postgresql_query_select_action_test.go +++ b/agent/runner/actions/postgresql_query_select_action_test.go @@ -40,7 +40,7 @@ func TestPostgreSQLQuerySelect(t *testing.T) { Dsn: dsn, Query: "* FROM pg_extension", } - a := NewPostgreSQLQuerySelectAction("", params, os.TempDir()) + a := NewPostgreSQLQuerySelectAction("", 0, params, os.TempDir()) ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() @@ -72,7 +72,7 @@ func TestPostgreSQLQuerySelect(t *testing.T) { Dsn: dsn, Query: `'\x0001feff'::bytea AS bytes`, } - a := NewPostgreSQLQuerySelectAction("", params, os.TempDir()) + a := NewPostgreSQLQuerySelectAction("", 0, params, os.TempDir()) ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() @@ -95,7 +95,7 @@ func TestPostgreSQLQuerySelect(t *testing.T) { Dsn: dsn, Query: "* FROM city; DROP TABLE city CASCADE; --", } - a := NewPostgreSQLQuerySelectAction("", params, os.TempDir()) + a := NewPostgreSQLQuerySelectAction("", 0, params, os.TempDir()) ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() diff --git a/agent/actions/postgresql_query_show_action.go b/agent/runner/actions/postgresql_query_show_action.go similarity index 86% rename from agent/actions/postgresql_query_show_action.go rename to agent/runner/actions/postgresql_query_show_action.go index 42eac7976d..0572bbe752 100644 --- a/agent/actions/postgresql_query_show_action.go +++ b/agent/runner/actions/postgresql_query_show_action.go @@ -19,6 +19,7 @@ import ( "database/sql" "path/filepath" "strings" + "time" "github.com/lib/pq" "github.com/pkg/errors" @@ -29,14 +30,16 @@ import ( type postgresqlQueryShowAction struct { id string + timeout time.Duration params *agentpb.StartActionRequest_PostgreSQLQueryShowParams tempDir string } // NewPostgreSQLQueryShowAction creates PostgreSQL SHOW query Action. -func NewPostgreSQLQueryShowAction(id string, params *agentpb.StartActionRequest_PostgreSQLQueryShowParams, tempDir string) Action { +func NewPostgreSQLQueryShowAction(id string, timeout time.Duration, params *agentpb.StartActionRequest_PostgreSQLQueryShowParams, tempDir string) Action { return &postgresqlQueryShowAction{ id: id, + timeout: timeout, params: params, tempDir: tempDir, } @@ -47,6 +50,11 @@ func (a *postgresqlQueryShowAction) ID() string { return a.id } +// Timeout returns Action timeout. +func (a *postgresqlQueryShowAction) Timeout() time.Duration { + return a.timeout +} + // Type returns an Action type. func (a *postgresqlQueryShowAction) Type() string { return "postgresql-query-show" diff --git a/agent/actions/postgresql_query_show_action_test.go b/agent/runner/actions/postgresql_query_show_action_test.go similarity index 96% rename from agent/actions/postgresql_query_show_action_test.go rename to agent/runner/actions/postgresql_query_show_action_test.go index ac41dee79d..3ff31771ba 100644 --- a/agent/actions/postgresql_query_show_action_test.go +++ b/agent/runner/actions/postgresql_query_show_action_test.go @@ -39,7 +39,7 @@ func TestPostgreSQLQueryShow(t *testing.T) { params := &agentpb.StartActionRequest_PostgreSQLQueryShowParams{ Dsn: dsn, } - a := NewPostgreSQLQueryShowAction("", params, os.TempDir()) + a := NewPostgreSQLQueryShowAction("", 0, params, os.TempDir()) ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() diff --git a/agent/actions/postgresql_show_create_table_action.go b/agent/runner/actions/postgresql_show_create_table_action.go similarity index 97% rename from agent/actions/postgresql_show_create_table_action.go rename to agent/runner/actions/postgresql_show_create_table_action.go index 57421fe131..5729f7e095 100644 --- a/agent/actions/postgresql_show_create_table_action.go +++ b/agent/runner/actions/postgresql_show_create_table_action.go @@ -24,6 +24,7 @@ import ( "path/filepath" "strings" "text/tabwriter" + "time" "github.com/AlekSi/pointer" "github.com/lib/pq" @@ -63,15 +64,17 @@ type indexInfo struct { type postgresqlShowCreateTableAction struct { id string + timeout time.Duration params *agentpb.StartActionRequest_PostgreSQLShowCreateTableParams tempDir string } // NewPostgreSQLShowCreateTableAction creates PostgreSQL SHOW CREATE TABLE Action. // This is an Action that can run `\d+ table` command analog on PostgreSQL service with given DSN. -func NewPostgreSQLShowCreateTableAction(id string, params *agentpb.StartActionRequest_PostgreSQLShowCreateTableParams, tempDir string) Action { +func NewPostgreSQLShowCreateTableAction(id string, timeout time.Duration, params *agentpb.StartActionRequest_PostgreSQLShowCreateTableParams, tempDir string) Action { return &postgresqlShowCreateTableAction{ id: id, + timeout: timeout, params: params, tempDir: tempDir, } @@ -82,6 +85,11 @@ func (a *postgresqlShowCreateTableAction) ID() string { return a.id } +// Timeout returns Action timeout. +func (a *postgresqlShowCreateTableAction) Timeout() time.Duration { + return a.timeout +} + // Type returns an Action type. func (a *postgresqlShowCreateTableAction) Type() string { return "postgresql-show-create-table" diff --git a/agent/actions/postgresql_show_create_table_action_test.go b/agent/runner/actions/postgresql_show_create_table_action_test.go similarity index 95% rename from agent/actions/postgresql_show_create_table_action_test.go rename to agent/runner/actions/postgresql_show_create_table_action_test.go index 42335b2093..695042b989 100644 --- a/agent/actions/postgresql_show_create_table_action_test.go +++ b/agent/runner/actions/postgresql_show_create_table_action_test.go @@ -39,7 +39,7 @@ func TestPostgreSQLShowCreateTable(t *testing.T) { Dsn: dsn, Table: "public.country", } - a := NewPostgreSQLShowCreateTableAction("", params, os.TempDir()) + a := NewPostgreSQLShowCreateTableAction("", 0, params, os.TempDir()) ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() @@ -81,7 +81,7 @@ Referenced by: Dsn: dsn, Table: "city", } - a := NewPostgreSQLShowCreateTableAction("", params, os.TempDir()) + a := NewPostgreSQLShowCreateTableAction("", 0, params, os.TempDir()) ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() @@ -109,7 +109,7 @@ Referenced by: Dsn: dsn, Table: "countrylanguage", } - a := NewPostgreSQLShowCreateTableAction("", params, os.TempDir()) + a := NewPostgreSQLShowCreateTableAction("", 0, params, os.TempDir()) ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() @@ -136,7 +136,7 @@ Foreign-key constraints: Dsn: dsn, Table: `city; DROP TABLE city; --`, } - a := NewPostgreSQLShowCreateTableAction("", params, os.TempDir()) + a := NewPostgreSQLShowCreateTableAction("", 0, params, os.TempDir()) ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() diff --git a/agent/actions/postgresql_show_index_action.go b/agent/runner/actions/postgresql_show_index_action.go similarity index 88% rename from agent/actions/postgresql_show_index_action.go rename to agent/runner/actions/postgresql_show_index_action.go index b76dbefe67..872224e0ef 100644 --- a/agent/actions/postgresql_show_index_action.go +++ b/agent/runner/actions/postgresql_show_index_action.go @@ -20,6 +20,7 @@ import ( "fmt" "path/filepath" "strings" + "time" "github.com/lib/pq" "github.com/pkg/errors" @@ -30,15 +31,17 @@ import ( type postgresqlShowIndexAction struct { id string + timeout time.Duration params *agentpb.StartActionRequest_PostgreSQLShowIndexParams tempDir string } // NewPostgreSQLShowIndexAction creates PostgreSQL SHOW INDEX Action. // This is an Action that can run `SHOW INDEX` command on PostgreSQL service with given DSN. -func NewPostgreSQLShowIndexAction(id string, params *agentpb.StartActionRequest_PostgreSQLShowIndexParams, tempDir string) Action { +func NewPostgreSQLShowIndexAction(id string, timeout time.Duration, params *agentpb.StartActionRequest_PostgreSQLShowIndexParams, tempDir string) Action { return &postgresqlShowIndexAction{ id: id, + timeout: timeout, params: params, tempDir: tempDir, } @@ -49,6 +52,11 @@ func (a *postgresqlShowIndexAction) ID() string { return a.id } +// Timeout returns Action timeout. +func (a *postgresqlShowIndexAction) Timeout() time.Duration { + return a.timeout +} + // Type returns an Action type. func (a *postgresqlShowIndexAction) Type() string { return "postgresql-show-index" diff --git a/agent/actions/postgresql_show_index_action_test.go b/agent/runner/actions/postgresql_show_index_action_test.go similarity index 94% rename from agent/actions/postgresql_show_index_action_test.go rename to agent/runner/actions/postgresql_show_index_action_test.go index e3de765b3e..1ff794e914 100644 --- a/agent/actions/postgresql_show_index_action_test.go +++ b/agent/runner/actions/postgresql_show_index_action_test.go @@ -40,7 +40,7 @@ func TestPostgreSQLShowIndex(t *testing.T) { Dsn: dsn, Table: "city", } - a := NewPostgreSQLShowIndexAction("", params, os.TempDir()) + a := NewPostgreSQLShowIndexAction("", 0, params, os.TempDir()) ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() @@ -64,7 +64,7 @@ func TestPostgreSQLShowIndex(t *testing.T) { Dsn: dsn, Table: "public.city", } - a := NewPostgreSQLShowIndexAction("", params, os.TempDir()) + a := NewPostgreSQLShowIndexAction("", 0, params, os.TempDir()) ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() diff --git a/agent/actions/process_action.go b/agent/runner/actions/process_action.go similarity index 74% rename from agent/actions/process_action.go rename to agent/runner/actions/process_action.go index c40c7a20a4..b8c04d6de2 100644 --- a/agent/actions/process_action.go +++ b/agent/runner/actions/process_action.go @@ -17,6 +17,7 @@ package actions import ( "context" "os/exec" + "time" "golang.org/x/sys/unix" @@ -25,6 +26,7 @@ import ( type processAction struct { id string + timeout time.Duration command string arg []string } @@ -33,27 +35,33 @@ type processAction struct { // // Process Action, it's an abstract Action that can run an external commands. // This commands can be a shell script, script written on interpreted language, or binary file. -func NewProcessAction(id string, cmd string, arg []string) Action { +func NewProcessAction(id string, timeout time.Duration, cmd string, arg []string) Action { return &processAction{ id: id, + timeout: timeout, command: cmd, arg: arg, } } // ID returns an Action ID. -func (p *processAction) ID() string { - return p.id +func (a *processAction) ID() string { + return a.id +} + +// Timeout returns Action timeout. +func (a *processAction) Timeout() time.Duration { + return a.timeout } // Type returns an Action type. -func (p *processAction) Type() string { - return p.command +func (a *processAction) Type() string { + return a.command } // Run runs an Action and returns output and error. -func (p *processAction) Run(ctx context.Context) ([]byte, error) { - cmd := exec.CommandContext(ctx, p.command, p.arg...) //nolint:gosec +func (a *processAction) Run(ctx context.Context) ([]byte, error) { + cmd := exec.CommandContext(ctx, a.command, a.arg...) //nolint:gosec // restrict process cmd.Env = []string{} // do not inherit environment diff --git a/agent/actions/process_action_test.go b/agent/runner/actions/process_action_test.go similarity index 85% rename from agent/actions/process_action_test.go rename to agent/runner/actions/process_action_test.go index 769a8bca90..a80786e7cf 100644 --- a/agent/actions/process_action_test.go +++ b/agent/runner/actions/process_action_test.go @@ -29,7 +29,8 @@ func TestProcessActionRun(t *testing.T) { // setup id := "/action_id/6a479303-5081-46d0-baa0-87d6248c987b" cmd := "echo" - p := NewProcessAction(id, cmd, nil) + timeout := time.Second + p := NewProcessAction(id, timeout, cmd, nil) // run ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) @@ -40,6 +41,7 @@ func TestProcessActionRun(t *testing.T) { require.NoError(t, err) assert.NotEmpty(t, got) assert.Equal(t, id, p.ID()) + assert.Equal(t, timeout, p.Timeout()) assert.Equal(t, cmd, p.Type()) } @@ -47,9 +49,9 @@ func TestProcessActionRunAndCancel(t *testing.T) { t.Parallel() // setup - p := NewProcessAction("/action_id/14b2422d-32ec-44fb-9019-8b70e3cc8a3a", "sleep", []string{"10"}) + p := NewProcessAction("/action_id/14b2422d-32ec-44fb-9019-8b70e3cc8a3a", time.Second, "sleep", []string{"10"}) - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), p.Timeout()) // run time.AfterFunc(time.Millisecond, cancel) _, err := p.Run(ctx) diff --git a/agent/actions/pt_mysql_summary_action.go b/agent/runner/actions/pt_mysql_summary_action.go similarity index 61% rename from agent/actions/pt_mysql_summary_action.go rename to agent/runner/actions/pt_mysql_summary_action.go index 8cc7767298..a049d45b7f 100644 --- a/agent/actions/pt_mysql_summary_action.go +++ b/agent/runner/actions/pt_mysql_summary_action.go @@ -20,6 +20,7 @@ import ( "os" "os/exec" "strconv" + "time" "golang.org/x/sys/unix" @@ -29,6 +30,7 @@ import ( type ptMySQLSummaryAction struct { id string + timeout time.Duration command string params *agentpb.StartActionRequest_PTMySQLSummaryParams } @@ -37,27 +39,33 @@ type ptMySQLSummaryAction struct { // // PTMySQL Summary Action, it's an abstract Action that can run an external commands. // This commands can be a shell script, script written on interpreted language, or binary file. -func NewPTMySQLSummaryAction(id string, cmd string, params *agentpb.StartActionRequest_PTMySQLSummaryParams) Action { +func NewPTMySQLSummaryAction(id string, timeout time.Duration, cmd string, params *agentpb.StartActionRequest_PTMySQLSummaryParams) Action { return &ptMySQLSummaryAction{ id: id, + timeout: timeout, command: cmd, params: params, } } // ID returns an Action ID. -func (p *ptMySQLSummaryAction) ID() string { - return p.id +func (a *ptMySQLSummaryAction) ID() string { + return a.id +} + +// Timeout returns Action timeout. +func (a *ptMySQLSummaryAction) Timeout() time.Duration { + return a.timeout } // Type returns an Action type. -func (p *ptMySQLSummaryAction) Type() string { - return p.command +func (a *ptMySQLSummaryAction) Type() string { + return a.command } // Run runs an Action and returns output and error. -func (p *ptMySQLSummaryAction) Run(ctx context.Context) ([]byte, error) { - cmd := exec.CommandContext(ctx, p.command, p.ListFromMySQLParams()...) //nolint:gosec +func (a *ptMySQLSummaryAction) Run(ctx context.Context) ([]byte, error) { + cmd := exec.CommandContext(ctx, a.command, a.ListFromMySQLParams()...) //nolint:gosec cmd.Env = []string{fmt.Sprintf("PATH=%s", os.Getenv("PATH"))} cmd.Dir = "/" pdeathsig.Set(cmd, unix.SIGKILL) @@ -66,29 +74,29 @@ func (p *ptMySQLSummaryAction) Run(ctx context.Context) ([]byte, error) { } // Creates an array of strings from parameters. -func (p *ptMySQLSummaryAction) ListFromMySQLParams() []string { - if p.params == nil { +func (a *ptMySQLSummaryAction) ListFromMySQLParams() []string { + if a.params == nil { return []string{} } var args []string - if p.params.Socket != "" { - args = append(args, "--socket", p.params.Socket) + if a.params.Socket != "" { + args = append(args, "--socket", a.params.Socket) } else { - if p.params.Host != "" { - args = append(args, "--host", p.params.Host) + if a.params.Host != "" { + args = append(args, "--host", a.params.Host) } - if p.params.Port > 0 && p.params.Port <= 65535 { - args = append(args, "--port", strconv.FormatUint(uint64(p.params.Port), 10)) + if a.params.Port > 0 && a.params.Port <= 65535 { + args = append(args, "--port", strconv.FormatUint(uint64(a.params.Port), 10)) } } - if p.params.Username != "" { - args = append(args, "--user", p.params.Username) + if a.params.Username != "" { + args = append(args, "--user", a.params.Username) } - if p.params.Password != "" { - args = append(args, "--password", p.params.Password) + if a.params.Password != "" { + args = append(args, "--password", a.params.Password) } return args diff --git a/agent/actions/pt_mysql_summary_action_test.go b/agent/runner/actions/pt_mysql_summary_action_test.go similarity index 92% rename from agent/actions/pt_mysql_summary_action_test.go rename to agent/runner/actions/pt_mysql_summary_action_test.go index b45de158c6..3a9deabc18 100644 --- a/agent/actions/pt_mysql_summary_action_test.go +++ b/agent/runner/actions/pt_mysql_summary_action_test.go @@ -31,9 +31,9 @@ func TestPTMySQLSummaryActionRun(t *testing.T) { id := "/action_id/6a479303-5081-46d0-baa0-87d6248c987b" cmd := "echo" - p := NewPTMySQLSummaryAction(id, cmd, nil) + p := NewPTMySQLSummaryAction(id, 5*time.Second, cmd, nil) - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), p.Timeout()) defer cancel() got, err := p.Run(ctx) @@ -46,9 +46,9 @@ func TestPTMySQLSummaryActionRun(t *testing.T) { func TestPTMySQLSummaryActionRunAndCancel(t *testing.T) { t.Parallel() - p := NewPTMySQLSummaryAction("/action_id/14b2422d-32ec-44fb-9019-8b70e3cc8a3a", "sleep", &agentpb.StartActionRequest_PTMySQLSummaryParams{}) + p := NewPTMySQLSummaryAction("/action_id/14b2422d-32ec-44fb-9019-8b70e3cc8a3a", time.Second, "sleep", &agentpb.StartActionRequest_PTMySQLSummaryParams{}) - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), p.Timeout()) time.AfterFunc(time.Millisecond, cancel) _, err := p.Run(ctx) diff --git a/agent/actions/query_transform.go b/agent/runner/actions/query_transform.go similarity index 100% rename from agent/actions/query_transform.go rename to agent/runner/actions/query_transform.go diff --git a/agent/actions/query_transform_test.go b/agent/runner/actions/query_transform_test.go similarity index 100% rename from agent/actions/query_transform_test.go rename to agent/runner/actions/query_transform_test.go diff --git a/agent/actions/testdata/aggregate.json b/agent/runner/actions/testdata/aggregate.json similarity index 100% rename from agent/actions/testdata/aggregate.json rename to agent/runner/actions/testdata/aggregate.json diff --git a/agent/actions/testdata/count.json b/agent/runner/actions/testdata/count.json similarity index 100% rename from agent/actions/testdata/count.json rename to agent/runner/actions/testdata/count.json diff --git a/agent/actions/testdata/distinct.json b/agent/runner/actions/testdata/distinct.json similarity index 100% rename from agent/actions/testdata/distinct.json rename to agent/runner/actions/testdata/distinct.json diff --git a/agent/actions/testdata/find_and_modify.json b/agent/runner/actions/testdata/find_and_modify.json similarity index 100% rename from agent/actions/testdata/find_and_modify.json rename to agent/runner/actions/testdata/find_and_modify.json diff --git a/agent/jobs/backup_location.go b/agent/runner/jobs/backup_location.go similarity index 100% rename from agent/jobs/backup_location.go rename to agent/runner/jobs/backup_location.go diff --git a/agent/jobs/common.go b/agent/runner/jobs/common.go similarity index 98% rename from agent/jobs/common.go rename to agent/runner/jobs/common.go index 50de337948..1c42f662ad 100644 --- a/agent/jobs/common.go +++ b/agent/runner/jobs/common.go @@ -14,6 +14,8 @@ package jobs +const maxLogsChunkSize = 50 + // Storage represents target storage parameters. type Storage struct { Type string `yaml:"type"` diff --git a/agent/jobs/job.go b/agent/runner/jobs/job.go similarity index 98% rename from agent/jobs/job.go rename to agent/runner/jobs/job.go index 71969919d4..0543f4f786 100644 --- a/agent/jobs/job.go +++ b/agent/runner/jobs/job.go @@ -31,8 +31,6 @@ const ( MongoDBBackup = JobType("mongodb_backup") MongoDBRestore = JobType("mongodb_restore") MySQLRestore = JobType("mysql_restore") - - maxLogsChunkSize = 50 ) // Send is interface for function that used by jobs to send messages back to pmm-server. diff --git a/agent/jobs/mongodb_backup_job.go b/agent/runner/jobs/mongodb_backup_job.go similarity index 100% rename from agent/jobs/mongodb_backup_job.go rename to agent/runner/jobs/mongodb_backup_job.go diff --git a/agent/jobs/mongodb_backup_job_test.go b/agent/runner/jobs/mongodb_backup_job_test.go similarity index 100% rename from agent/jobs/mongodb_backup_job_test.go rename to agent/runner/jobs/mongodb_backup_job_test.go diff --git a/agent/jobs/mongodb_restore_job.go b/agent/runner/jobs/mongodb_restore_job.go similarity index 100% rename from agent/jobs/mongodb_restore_job.go rename to agent/runner/jobs/mongodb_restore_job.go diff --git a/agent/jobs/mysql_backup_job.go b/agent/runner/jobs/mysql_backup_job.go similarity index 100% rename from agent/jobs/mysql_backup_job.go rename to agent/runner/jobs/mysql_backup_job.go diff --git a/agent/jobs/mysql_restore_job.go b/agent/runner/jobs/mysql_restore_job.go similarity index 100% rename from agent/jobs/mysql_restore_job.go rename to agent/runner/jobs/mysql_restore_job.go diff --git a/agent/jobs/pbm_helpers.go b/agent/runner/jobs/pbm_helpers.go similarity index 100% rename from agent/jobs/pbm_helpers.go rename to agent/runner/jobs/pbm_helpers.go diff --git a/agent/runner/runner.go b/agent/runner/runner.go new file mode 100644 index 0000000000..76313caf16 --- /dev/null +++ b/agent/runner/runner.go @@ -0,0 +1,270 @@ +// Copyright 2019 Percona LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package runner + +import ( + "context" + "runtime/pprof" + "sync" + "time" + + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "golang.org/x/sync/semaphore" + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/percona/pmm/agent/runner/actions" + "github.com/percona/pmm/agent/runner/jobs" + "github.com/percona/pmm/api/agentpb" +) + +const ( + bufferSize = 256 + defaultActionTimeout = 10 * time.Second // default timeout for compatibility with an older server + defaultCapacity = 32 +) + +// Runner executes jobs and actions. +type Runner struct { + l *logrus.Entry + + actions chan actions.Action + jobs chan jobs.Job + + actionsMessages chan agentpb.AgentRequestPayload + jobsMessages chan agentpb.AgentResponsePayload + + sem *semaphore.Weighted + wg sync.WaitGroup + + rw sync.RWMutex + rCancel map[string]context.CancelFunc +} + +// New creates new runner. If capacity is 0 then default value is used. +func New(capacity uint16) *Runner { + l := logrus.WithField("component", "runner") + if capacity == 0 { + capacity = defaultCapacity + } + + l.Infof("Runner capacity set to %d.", capacity) + + return &Runner{ + l: l, + actions: make(chan actions.Action, bufferSize), + jobs: make(chan jobs.Job, bufferSize), + sem: semaphore.NewWeighted(int64(capacity)), + rCancel: make(map[string]context.CancelFunc), + jobsMessages: make(chan agentpb.AgentResponsePayload), + actionsMessages: make(chan agentpb.AgentRequestPayload), + } +} + +// Run starts jobs execution loop. It reads jobs from the channel and starts them in separate goroutines. +func (r *Runner) Run(ctx context.Context) { + for { + select { + case action := <-r.actions: + r.handleAction(ctx, action) + case job := <-r.jobs: + r.handleJob(ctx, job) + case <-ctx.Done(): + r.wg.Wait() // wait for all actions and jobs termination + close(r.actionsMessages) + close(r.jobsMessages) + return + } + } +} + +// StartAction starts given actions.Action. +func (r *Runner) StartAction(action actions.Action) error { + select { + case r.actions <- action: + return nil + default: + return errors.New("actions queue overflowed") + } +} + +// StartJob starts given jobs.Job. +func (r *Runner) StartJob(job jobs.Job) error { + select { + case r.jobs <- job: + return nil + default: + return errors.New("jobs queue overflowed") + } +} + +// JobsMessages returns channel with Jobs messages. +func (r *Runner) JobsMessages() <-chan agentpb.AgentResponsePayload { + return r.jobsMessages +} + +// ActionsResults return chanel with Actions results payload. +func (r *Runner) ActionsResults() <-chan agentpb.AgentRequestPayload { + return r.actionsMessages +} + +// Stop stops running Action or Job. +func (r *Runner) Stop(id string) { + r.rw.RLock() + defer r.rw.RUnlock() + + // Job removes itself from rCancel map. So here we only invoke cancel. + if cancel, ok := r.rCancel[id]; ok { + cancel() + } +} + +// IsRunning returns true if Action or Job with given ID still running. +func (r *Runner) IsRunning(id string) bool { + r.rw.RLock() + defer r.rw.RUnlock() + _, ok := r.rCancel[id] + + return ok +} + +func (r *Runner) handleJob(ctx context.Context, job jobs.Job) { + jobID, jobType := job.ID(), job.Type() + l := r.l.WithFields(logrus.Fields{"id": jobID, "type": jobType}) + + if err := r.sem.Acquire(ctx, 1); err != nil { + l.Errorf("Failed to acquire token for a job: %v", err) + r.sendJobsMessage(&agentpb.JobResult{ + JobId: job.ID(), + Timestamp: timestamppb.Now(), + Result: &agentpb.JobResult_Error_{ + Error: &agentpb.JobResult_Error{ + Message: err.Error(), + }, + }, + }) + return + } + + var nCtx context.Context + var cancel context.CancelFunc + if timeout := job.Timeout(); timeout != 0 { + nCtx, cancel = context.WithTimeout(ctx, timeout) + } else { + nCtx, cancel = context.WithCancel(ctx) + } + r.addCancel(jobID, cancel) + + r.wg.Add(1) + run := func(ctx context.Context) { + l.Infof("Job started.") + + defer func(start time.Time) { + l.WithField("duration", time.Since(start).String()).Info("Job finished.") + }(time.Now()) + + defer r.sem.Release(1) + defer r.wg.Done() + defer cancel() + defer r.removeCancel(jobID) + + err := job.Run(ctx, r.sendJobsMessage) + if err != nil { + r.sendJobsMessage(&agentpb.JobResult{ + JobId: job.ID(), + Timestamp: timestamppb.Now(), + Result: &agentpb.JobResult_Error_{ + Error: &agentpb.JobResult_Error{ + Message: err.Error(), + }, + }, + }) + l.Warnf("Job terminated with error: %+v", err) + } + } + + go pprof.Do(nCtx, pprof.Labels("jobID", jobID, "type", string(jobType)), run) +} + +func (r *Runner) handleAction(ctx context.Context, action actions.Action) { + actionID, actionType := action.ID(), action.Type() + l := r.l.WithFields(logrus.Fields{"id": actionID, "type": actionType}) + + if err := r.sem.Acquire(ctx, 1); err != nil { + l.Errorf("Failed to acquire token for an action: %v", err) + r.sendActionsMessage(&agentpb.ActionResultRequest{ + ActionId: actionID, + Done: true, + Error: err.Error(), + }) + return + } + + var timeout time.Duration + if timeout = action.Timeout(); timeout == 0 { + timeout = defaultActionTimeout + } + + nCtx, cancel := context.WithTimeout(ctx, timeout) + r.addCancel(actionID, cancel) + + r.wg.Add(1) + run := func(ctx context.Context) { + l.Infof("Action started.") + + defer func(start time.Time) { + l.WithField("duration", time.Since(start).String()).Info("Action finished.") + }(time.Now()) + + defer r.sem.Release(1) + defer r.wg.Done() + defer cancel() + defer r.removeCancel(actionID) + + output, err := action.Run(nCtx) + var errMsg string + if err != nil { + errMsg = err.Error() + l.Warnf("Action terminated with error: %+v", err) + } + r.sendActionsMessage(&agentpb.ActionResultRequest{ + ActionId: actionID, + Done: true, + Output: output, + Error: errMsg, + }) + } + go pprof.Do(nCtx, pprof.Labels("actionID", actionID, "type", actionType), run) +} + +func (r *Runner) sendJobsMessage(payload agentpb.AgentResponsePayload) { + r.jobsMessages <- payload +} + +func (r *Runner) sendActionsMessage(payload agentpb.AgentRequestPayload) { + r.actionsMessages <- payload +} + +func (r *Runner) addCancel(jobID string, cancel context.CancelFunc) { + r.rw.Lock() + defer r.rw.Unlock() + r.rCancel[jobID] = cancel +} + +func (r *Runner) removeCancel(jobID string) { + r.rw.Lock() + defer r.rw.Unlock() + delete(r.rCancel, jobID) +} diff --git a/agent/runner/runner_test.go b/agent/runner/runner_test.go new file mode 100644 index 0000000000..bd596a1cfd --- /dev/null +++ b/agent/runner/runner_test.go @@ -0,0 +1,257 @@ +// Copyright 2019 Percona LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package runner + +import ( + "context" + "fmt" + "sort" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/percona/pmm/agent/runner/actions" + "github.com/percona/pmm/agent/runner/jobs" + "github.com/percona/pmm/api/agentpb" +) + +// assertActionResults checks expected results in any order. +func assertActionResults(t *testing.T, cr *Runner, expected ...*agentpb.ActionResultRequest) { + t.Helper() + + actual := make([]agentpb.AgentRequestPayload, len(expected)) + for i := range expected { + actual[i] = <-cr.ActionsResults() + } + assert.ElementsMatch(t, expected, actual) +} + +func TestConcurrentRunnerRun(t *testing.T) { + t.Parallel() + cr := New(0) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go cr.Run(ctx) + a1 := actions.NewProcessAction("/action_id/6a479303-5081-46d0-baa0-87d6248c987b", 5*time.Second, "echo", []string{"test"}) + a2 := actions.NewProcessAction("/action_id/84140ab2-612d-4d93-9360-162a4bd5de14", 5*time.Second, "echo", []string{"test2"}) + + err := cr.StartAction(a1) + require.NoError(t, err) + + err = cr.StartAction(a2) + require.NoError(t, err) + + expected := []*agentpb.ActionResultRequest{ + {ActionId: "/action_id/6a479303-5081-46d0-baa0-87d6248c987b", Output: []byte("test\n"), Done: true}, + {ActionId: "/action_id/84140ab2-612d-4d93-9360-162a4bd5de14", Output: []byte("test2\n"), Done: true}, + } + assertActionResults(t, cr, expected...) + cr.wg.Wait() + assert.Empty(t, cr.rCancel) +} + +func TestCapacityLimit(t *testing.T) { + t.Parallel() + + cr := New(2) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go cr.Run(ctx) + + j1 := testJob{id: "test-1", timeout: time.Second} + j2 := testJob{id: "test-2", timeout: 2 * time.Second} + j3 := testJob{id: "test-3", timeout: 2 * time.Second} + j4 := testJob{id: "test-4", timeout: time.Second} + + require.NoError(t, cr.StartJob(j1)) + require.NoError(t, cr.StartJob(j2)) + require.NoError(t, cr.StartJob(j3)) + require.NoError(t, cr.StartJob(j4)) + + // Let first jobs start + time.Sleep(500 * time.Millisecond) + + // First two jobs are started + assert.True(t, cr.IsRunning(j1.ID())) + assert.True(t, cr.IsRunning(j2.ID())) + assert.False(t, cr.IsRunning(j3.ID())) + assert.False(t, cr.IsRunning(j4.ID())) + + time.Sleep(time.Second) + + // After second first job terminated and third job started + assert.False(t, cr.IsRunning(j1.ID())) + assert.True(t, cr.IsRunning(j2.ID())) + assert.True(t, cr.IsRunning(j3.ID())) + assert.False(t, cr.IsRunning(j4.ID())) + + time.Sleep(time.Second) + + // After one more second second job terminated and third started + assert.False(t, cr.IsRunning(j1.ID())) + assert.False(t, cr.IsRunning(j2.ID())) + assert.True(t, cr.IsRunning(j3.ID())) + assert.True(t, cr.IsRunning(j4.ID())) + + time.Sleep(time.Second) + + // After another second all jobs are terminated + assert.False(t, cr.IsRunning(j1.ID())) + assert.False(t, cr.IsRunning(j2.ID())) + assert.False(t, cr.IsRunning(j3.ID())) + assert.False(t, cr.IsRunning(j4.ID())) +} + +func TestDefaultCapacityLimit(t *testing.T) { + t.Parallel() + + // Use default capacity + cr := New(0) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go cr.Run(ctx) + + totalJobs := 2 * defaultCapacity + for i := 0; i < totalJobs; i++ { + require.NoError(t, cr.StartJob(testJob{id: fmt.Sprintf("test-%d", i), timeout: time.Second})) + } + + // Let first jobs start + time.Sleep(500 * time.Millisecond) + + for i := 0; i < totalJobs; i++ { + // Check that running jobs amount is not exceeded default capacity. + assert.Equal(t, i < defaultCapacity, cr.IsRunning(fmt.Sprintf("test-%d", i))) + } +} + +func TestConcurrentRunnerTimeout(t *testing.T) { + t.Parallel() + cr := New(0) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go cr.Run(ctx) + a1 := actions.NewProcessAction("/action_id/6a479303-5081-46d0-baa0-87d6248c987b", time.Second, "sleep", []string{"20"}) + a2 := actions.NewProcessAction("/action_id/84140ab2-612d-4d93-9360-162a4bd5de14", time.Second, "sleep", []string{"30"}) + + err := cr.StartAction(a1) + require.NoError(t, err) + + err = cr.StartAction(a2) + require.NoError(t, err) + + // https://github.com/golang/go/issues/21880 + expected := []*agentpb.ActionResultRequest{ + {ActionId: "/action_id/6a479303-5081-46d0-baa0-87d6248c987b", Output: []byte{}, Error: "signal: killed", Done: true}, + {ActionId: "/action_id/84140ab2-612d-4d93-9360-162a4bd5de14", Output: []byte{}, Error: "signal: killed", Done: true}, + } + assertActionResults(t, cr, expected...) + cr.wg.Wait() + assert.Empty(t, cr.rCancel) +} + +func TestConcurrentRunnerStop(t *testing.T) { + t.Parallel() + cr := New(0) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go cr.Run(ctx) + a1 := actions.NewProcessAction("/action_id/6a479303-5081-46d0-baa0-87d6248c987b", 5*time.Second, "sleep", []string{"20"}) + a2 := actions.NewProcessAction("/action_id/84140ab2-612d-4d93-9360-162a4bd5de14", 5*time.Second, "sleep", []string{"30"}) + + err := cr.StartAction(a1) + require.NoError(t, err) + + err = cr.StartAction(a2) + require.NoError(t, err) + + time.Sleep(time.Second) + + cr.Stop(a1.ID()) + cr.Stop(a2.ID()) + + // https://github.com/golang/go/issues/21880 + expected := []*agentpb.ActionResultRequest{ + {ActionId: "/action_id/6a479303-5081-46d0-baa0-87d6248c987b", Output: []byte{}, Error: "signal: killed", Done: true}, + {ActionId: "/action_id/84140ab2-612d-4d93-9360-162a4bd5de14", Output: []byte{}, Error: "signal: killed", Done: true}, + } + assertActionResults(t, cr, expected...) + cr.wg.Wait() + assert.Empty(t, cr.rCancel) +} + +func TestConcurrentRunnerCancel(t *testing.T) { + t.Parallel() + cr := New(0) + + ctx, cancel := context.WithCancel(context.Background()) + go cr.Run(ctx) + + a1 := actions.NewProcessAction("/action_id/6a479303-5081-46d0-baa0-87d6248c987b", 5*time.Second, "sleep", []string{"20"}) + a2 := actions.NewProcessAction("/action_id/84140ab2-612d-4d93-9360-162a4bd5de14", 5*time.Second, "sleep", []string{"30"}) + + err := cr.StartAction(a1) + require.NoError(t, err) + + err = cr.StartAction(a2) + require.NoError(t, err) + + time.Sleep(time.Second) // To let actions to actually start + cancel() + + // Unlike other tests, there we mostly see "context canceled", but "signal: killed" still happens. + // Check both. + expected := make([]agentpb.AgentRequestPayload, 2) + expected[0] = <-cr.ActionsResults() + expected[1] = <-cr.ActionsResults() + sort.Slice(expected, func(i, j int) bool { + return expected[i].(*agentpb.ActionResultRequest).ActionId < expected[j].(*agentpb.ActionResultRequest).ActionId + }) + assert.Equal(t, expected[0].(*agentpb.ActionResultRequest).ActionId, "/action_id/6a479303-5081-46d0-baa0-87d6248c987b") + assert.Contains(t, []string{"signal: killed", context.Canceled.Error()}, expected[0].(*agentpb.ActionResultRequest).Error) + assert.True(t, expected[0].(*agentpb.ActionResultRequest).Done) + assert.Equal(t, expected[1].(*agentpb.ActionResultRequest).ActionId, "/action_id/84140ab2-612d-4d93-9360-162a4bd5de14") + assert.Contains(t, []string{"signal: killed", context.Canceled.Error()}, expected[0].(*agentpb.ActionResultRequest).Error) + assert.True(t, expected[1].(*agentpb.ActionResultRequest).Done) + cr.wg.Wait() + assert.Empty(t, cr.rCancel) +} + +type testJob struct { + id string + timeout time.Duration +} + +func (t testJob) ID() string { + return t.id +} + +func (t testJob) Type() jobs.JobType { + return jobs.JobType("test") +} + +func (t testJob) Timeout() time.Duration { + return t.timeout +} + +func (t testJob) Run(ctx context.Context, send jobs.Send) error { + <-ctx.Done() + return nil +}