Skip to content

Commit

Permalink
PMM-10938 restart after physical restore (#1424)
Browse files Browse the repository at this point in the history
* skeleton for post restore jobs

* send post restore ops to agents

So far, we can send the request to agents, but the restart isn't
successful in some cases. Another issue is that pbm-agent requires all
members of the replica set to be up, so I wonder whether we can create
separate jobs for that, or take some other approach.

* checkpoint restarting services as jobs

* use actions instead of jobs to restart

mongod and pbm-agents are now restarted with actions, but there's still
the issue where pbm-agent fails because not all mongod nodes were up.

Next, we should start the mongod nodes first and only start pbm-agents
once all mongod nodes are up.

* completed core restart functionality

* remove restart rpc

* record actions in db

* undo method export

* revert string change

* only restart components for physical restore

* handle error

* fix logs

* add restart results to restore job log

* assert on content

* rename service to system_service

* use cluster instead of replica set

* log proper error in agent

* improve rpc message doc

* unify param names

* pass querier to method

Co-authored-by: Artem Gavrilov <[email protected]>
  • Loading branch information
idoqo and artemgavrilov authored Nov 29, 2022
1 parent 3157c1a commit f6a6ad3
Show file tree
Hide file tree
Showing 13 changed files with 1,377 additions and 944 deletions.
29 changes: 27 additions & 2 deletions agent/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/percona/pmm/agent/runner/jobs"
"github.com/percona/pmm/agent/tailog"
"github.com/percona/pmm/agent/utils/backoff"
agenterrors "github.com/percona/pmm/agent/utils/errors"
"github.com/percona/pmm/api/agentpb"
"github.com/percona/pmm/utils/tlsconfig"
"github.com/percona/pmm/version"
Expand Down Expand Up @@ -333,7 +334,7 @@ loop:
responsePayload = &agentpb.StartActionResponse{}
if err := c.handleStartActionRequest(p); err != nil {
responsePayload = nil
status = grpcstatus.New(codes.Unimplemented, "can't handle start action type send, it is not implemented")
status = convertAgentErrorToGrpcStatus(err)
break
}

Expand Down Expand Up @@ -501,9 +502,20 @@ func (c *Client) handleStartActionRequest(p *agentpb.StartActionRequest) error {

case *agentpb.StartActionRequest_PtMongodbSummaryParams:
action = actions.NewProcessAction(p.ActionId, timeout, c.cfg.Paths.PTMongoDBSummary, argListFromMongoDBParams(params.PtMongodbSummaryParams))
case *agentpb.StartActionRequest_RestartSysServiceParams:
var service string
switch params.RestartSysServiceParams.SystemService {
case agentpb.StartActionRequest_RestartSystemServiceParams_MONGOD:
service = "mongod"
case agentpb.StartActionRequest_RestartSystemServiceParams_PBM_AGENT:
service = "pbm-agent"
default:
return errors.Wrapf(agenterrors.ErrInvalidArgument, "invalid service '%s' specified in mongod restart request", params.RestartSysServiceParams.SystemService)
}
action = actions.NewProcessAction(p.ActionId, timeout, "systemctl", []string{"restart", service})

default:
return errors.Errorf("unknown action type request: %T", params)
return errors.Wrapf(agenterrors.ErrInvalidArgument, "invalid action type request: %T", params)
}

return c.runner.StartAction(action)
Expand Down Expand Up @@ -907,6 +919,19 @@ func argListFromMongoDBParams(pParams *agentpb.StartActionRequest_PTMongoDBSumma
return args
}

// convertAgentErrorToGrpcStatus builds the gRPC status that is reported for a
// failed agent operation. The status message is always agentErr's own text;
// only the code depends on which sentinel error agentErr matches:
//
//   - agenterrors.ErrInvalidArgument     -> codes.InvalidArgument
//   - agenterrors.ErrActionQueueOverflow -> codes.ResourceExhausted
//   - anything else                      -> codes.Unimplemented
//
// agentErr must be non-nil, since its Error method is called unconditionally.
func convertAgentErrorToGrpcStatus(agentErr error) *grpcstatus.Status {
	// Start from the fallback code and refine it when a known sentinel matches.
	code := codes.Unimplemented
	if errors.Is(agentErr, agenterrors.ErrInvalidArgument) {
		code = codes.InvalidArgument
	} else if errors.Is(agentErr, agenterrors.ErrActionQueueOverflow) {
		code = codes.ResourceExhausted
	}
	return grpcstatus.New(code, agentErr.Error())
}

// check interface
var (
_ prometheus.Collector = (*Client)(nil)
Expand Down
51 changes: 38 additions & 13 deletions agent/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,19 +217,46 @@ func TestUnexpectedActionType(t *testing.T) {
require.NoError(t, err)

// actual test
err = stream.Send(&agentpb.ServerMessage{
Id: 4242,
Payload: &agentpb.ServerMessage_StartAction{
// try to send unknown payload for action type
StartAction: &agentpb.StartActionRequest{},
cases := []struct {
name string
id uint32
payload *agentpb.ServerMessage_StartAction
expectedCode codes.Code
}{
{
name: "invlalid action type",
id: 4242,
payload: &agentpb.ServerMessage_StartAction{
StartAction: &agentpb.StartActionRequest{},
},
expectedCode: codes.InvalidArgument,
},
})
assert.NoError(t, err)
{
name: "mongodb restart invalid system service",
id: 4243,
payload: &agentpb.ServerMessage_StartAction{
StartAction: &agentpb.StartActionRequest{
Params: &agentpb.StartActionRequest_RestartSysServiceParams{
RestartSysServiceParams: &agentpb.StartActionRequest_RestartSystemServiceParams{
SystemService: agentpb.StartActionRequest_RestartSystemServiceParams_SYSTEM_SERVICE_INVALID,
},
},
},
},
expectedCode: codes.InvalidArgument,
},
}

msg, err = stream.Recv()
assert.NoError(t, err)
assert.Equal(t, int32(codes.Unimplemented), msg.GetStatus().GetCode())
assert.NoError(t, err)
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
err = stream.Send(&agentpb.ServerMessage{Id: tc.id, Payload: tc.payload})
require.NoError(t, err)

msg, err = stream.Recv()
require.NoError(t, err)
assert.Equal(t, int32(tc.expectedCode), msg.GetStatus().GetCode())
})
}
return nil
}
port, teardown := setup(t, connect)
Expand Down Expand Up @@ -259,7 +286,6 @@ func TestArgListFromPgParams(t *testing.T) {
req *agentpb.StartActionRequest_PTPgSummaryParams
expected []string
}

testCases := []*testParams{
{
&agentpb.StartActionRequest_PTPgSummaryParams{Host: "10.20.30.40", Port: 555, Username: "person", Password: "secret"},
Expand Down Expand Up @@ -303,7 +329,6 @@ func TestArgListFromMongoDBParams(t *testing.T) {
req *agentpb.StartActionRequest_PTMongoDBSummaryParams
expected []string
}

testCases := []*testParams{
{
&agentpb.StartActionRequest_PTMongoDBSummaryParams{Host: "10.20.30.40", Port: 555, Username: "person", Password: "secret"},
Expand Down
4 changes: 3 additions & 1 deletion agent/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/percona/pmm/agent/runner/actions"
"github.com/percona/pmm/agent/runner/jobs"
agenterrors "github.com/percona/pmm/agent/utils/errors"
"github.com/percona/pmm/api/agentpb"
)

Expand Down Expand Up @@ -96,7 +97,7 @@ func (r *Runner) StartAction(action actions.Action) error {
case r.actions <- action:
return nil
default:
return errors.New("actions queue overflowed")
return agenterrors.ErrActionQueueOverflow
}
}

Expand Down Expand Up @@ -238,6 +239,7 @@ func (r *Runner) handleAction(ctx context.Context, action actions.Action) {
if err != nil {
errMsg = err.Error()
l.Warnf("Action terminated with error: %+v", err)
l.Debugf("Action produced output: %s", string(output))
}
r.sendActionsMessage(&agentpb.ActionResultRequest{
ActionId: actionID,
Expand Down
25 changes: 25 additions & 0 deletions agent/utils/errors/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright 2022 Percona LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package errors

import "github.com/pkg/errors"

// Sentinel errors shared by agent components. Callers are expected to match
// them with errors.Is, since they may be wrapped with additional context.
var (
// ErrInvalidArgument is returned when a request carries an invalid or
// unknown argument, such as an unrecognized action type or system service.
ErrInvalidArgument = errors.New("invalid argument")

// ErrActionQueueOverflow is returned when a new action cannot be accepted
// because the actions queue is already full.
ErrActionQueueOverflow = errors.New("action queue overflow")
)
Loading

0 comments on commit f6a6ad3

Please sign in to comment.