Skip to content

Commit

Permalink
[server] Directed leadership transfer CLI and API (#17383)
Browse files Browse the repository at this point in the history
* Add directed leadership transfer func
* Add leadership transfer RPC endpoint
* Add ACL tests for leadership-transfer endpoint
* Add HTTP API route and implementation
* Add to Go API client
* Implement CLI command
* Add documentation
* Add changelog

Co-authored-by: Tim Gross <[email protected]>
  • Loading branch information
angrycub and tgross authored Oct 4, 2023
1 parent c885c08 commit 8a93ff3
Show file tree
Hide file tree
Showing 13 changed files with 1,008 additions and 24 deletions.
3 changes: 3 additions & 0 deletions .changelog/17383.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
server: Added transfer-leadership API and CLI
```
49 changes: 49 additions & 0 deletions api/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,46 @@ func (op *Operator) RaftRemovePeerByID(id string, q *WriteOptions) error {
return nil
}

// RaftTransferLeadershipByAddress asks the cluster to hand Raft leadership to
// the peer whose address, given as "IP:port", matches the address argument.
//
// It sends a PUT to /v1/operator/raft/transfer-leadership with the address as
// a query parameter. An error is returned if the request cannot be built or
// if requireOK reports a failure; the response body is not decoded.
func (op *Operator) RaftTransferLeadershipByAddress(address string, q *WriteOptions) error {
	req, err := op.c.newRequest("PUT", "/v1/operator/raft/transfer-leadership")
	if err != nil {
		return err
	}

	// Apply the write options first, then add the target peer's address.
	req.setWriteOptions(q)
	req.params.Set("address", address)

	_, httpResp, reqErr := requireOK(op.c.doRequest(req))
	if reqErr != nil {
		return reqErr
	}
	defer httpResp.Body.Close()

	return nil
}

// RaftTransferLeadershipByID asks the cluster to hand Raft leadership to the
// peer identified by the given Raft ID.
//
// It sends a PUT to /v1/operator/raft/transfer-leadership with the ID as a
// query parameter. An error is returned if the request cannot be built or if
// requireOK reports a failure; the response body is not decoded.
func (op *Operator) RaftTransferLeadershipByID(id string, q *WriteOptions) error {
	req, err := op.c.newRequest("PUT", "/v1/operator/raft/transfer-leadership")
	if err != nil {
		return err
	}

	// Apply the write options first, then add the target peer's Raft ID.
	req.setWriteOptions(q)
	req.params.Set("id", id)

	_, httpResp, reqErr := requireOK(op.c.doRequest(req))
	if reqErr != nil {
		return reqErr
	}
	defer httpResp.Body.Close()

	return nil
}

// SchedulerConfiguration is the config for controlling scheduler behavior
type SchedulerConfiguration struct {
// SchedulerAlgorithm lets you select between available scheduling algorithms.
Expand Down Expand Up @@ -363,3 +403,12 @@ func (op *Operator) LicenseGet(q *QueryOptions) (*LicenseReply, *QueryMeta, erro

return &reply, qm, nil
}

// LeadershipTransferResponse describes the outcome of a Raft leadership
// transfer request.
//
// NOTE(review): the RaftTransferLeadershipBy* client methods visible in this
// change discard the response body rather than decoding into this type;
// confirm where it is consumed.
type LeadershipTransferResponse struct {
// From is presumably the server that held leadership before the transfer
// (TODO confirm against the server-side handler).
From RaftServer
// To is presumably the server that leadership was transferred to
// (TODO confirm against the server-side handler).
To RaftServer
// Noop presumably reports that no transfer was necessary (TODO confirm).
Noop bool
// Err carries an error associated with the transfer.
// NOTE(review): encoding/json cannot unmarshal a non-null value into an
// interface-typed field such as error; confirm how this field is populated.
Err error

// WriteMeta is embedded, so its fields are promoted onto this type.
WriteMeta
}
56 changes: 54 additions & 2 deletions command/agent/operator_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ func (s *HTTPServer) OperatorRequest(resp http.ResponseWriter, req *http.Request
return s.OperatorRaftConfiguration(resp, req)
case strings.HasPrefix(path, "peer"):
return s.OperatorRaftPeer(resp, req)
case strings.HasPrefix(path, "transfer-leadership"):
return s.OperatorRaftTransferLeadership(resp, req)
default:
return nil, CodedError(404, ErrInvalidMethod)
}
Expand All @@ -56,8 +58,7 @@ func (s *HTTPServer) OperatorRaftConfiguration(resp http.ResponseWriter, req *ht
return reply, nil
}

// OperatorRaftPeer supports actions on Raft peers. Currently we only support
// removing peers by address.
// OperatorRaftPeer supports actions on Raft peers.
func (s *HTTPServer) OperatorRaftPeer(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != http.MethodDelete {
return nil, CodedError(404, ErrInvalidMethod)
Expand Down Expand Up @@ -97,6 +98,57 @@ func (s *HTTPServer) OperatorRaftPeer(resp http.ResponseWriter, req *http.Reques
return nil, nil
}

// OperatorRaftTransferLeadership handles requests to transfer Raft leadership
// to a specific peer. The target is selected with exactly one "id" or
// "address" query parameter, given exactly once and with a non-empty value.
//
// Only PUT and POST are accepted; any other method yields a 405 coded error.
// Query parameter problems and failures from args.Validate() are returned as
// 400 coded errors. Otherwise the request is forwarded to the
// Operator.TransferLeadershipToPeer RPC: an RPC error is returned as-is, and
// on success the reply filled in by the RPC is returned.
func (s *HTTPServer) OperatorRaftTransferLeadership(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
	if req.Method != http.MethodPost && req.Method != http.MethodPut {
		return nil, CodedError(http.StatusMethodNotAllowed, ErrInvalidMethod)
	}

	params := req.URL.Query()

	// Read the params map directly (rather than via params.Get) so that a
	// parameter supplied without a value, or supplied more than once, can be
	// told apart from one that is absent.
	id, hasID := params["id"]
	addr, hasAddress := params["address"]

	// There are some items that we can parse for here that are more unwieldy in
	// the Validate() func on the RPC request object, like repeated query params.
	switch {
	case !hasID && !hasAddress:
		return nil, CodedError(http.StatusBadRequest, "must specify id or address")
	case hasID && hasAddress:
		return nil, CodedError(http.StatusBadRequest, "must specify either id or address")
	case hasID && id[0] == "":
		return nil, CodedError(http.StatusBadRequest, "id must be non-empty")
	case hasID && len(id) > 1:
		return nil, CodedError(http.StatusBadRequest, "must specify only one id")
	case hasAddress && addr[0] == "":
		return nil, CodedError(http.StatusBadRequest, "address must be non-empty")
	case hasAddress && len(addr) > 1:
		return nil, CodedError(http.StatusBadRequest, "must specify only one address")
	}

	var out structs.LeadershipTransferResponse
	args := &structs.RaftPeerRequest{}
	s.parseWriteRequest(req, &args.WriteRequest)

	// Exactly one of id/address is present at this point.
	if hasID {
		args.ID = raft.ServerID(id[0])
	} else {
		args.Address = raft.ServerAddress(addr[0])
	}

	if err := args.Validate(); err != nil {
		return nil, CodedError(http.StatusBadRequest, err.Error())
	}

	// args is already a pointer; pass it as-is instead of a pointer-to-pointer.
	if err := s.agent.RPC("Operator.TransferLeadershipToPeer", args, &out); err != nil {
		return nil, err
	}

	return out, nil
}

// OperatorAutopilotConfiguration is used to inspect the current Autopilot configuration.
// This supports the stale query mode in case the cluster doesn't have a leader.
func (s *HTTPServer) OperatorAutopilotConfiguration(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
Expand Down
140 changes: 140 additions & 0 deletions command/agent/operator_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (

"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/shoenig/test/must"
Expand Down Expand Up @@ -91,6 +93,144 @@ func TestHTTP_OperatorRaftPeer(t *testing.T) {
})
}

// TestHTTP_OperatorRaftTransferLeadership exercises the transfer-leadership
// HTTP handler in three groups: HTTP methods that must be rejected, malformed
// or invalid query strings that must be rejected before the RPC, and
// well-formed requests that reach the RPC layer and fail there because the
// target peer is not in the Raft configuration.
func TestHTTP_OperatorRaftTransferLeadership(t *testing.T) {
	ci.Parallel(t)
	configCB := func(c *Config) {
		c.Client.Enabled = false
		c.Server.NumSchedulers = pointer.Of(0)
	}

	httpTest(t, configCB, func(s *TestAgent) {
		body := bytes.NewBuffer(nil)
		badMethods := []string{
			http.MethodConnect,
			http.MethodDelete,
			http.MethodGet,
			http.MethodHead,
			http.MethodOptions,
			http.MethodPatch,
			http.MethodTrace,
		}
		for _, tc := range badMethods {
			tc := tc
			t.Run(tc+" method errors", func(t *testing.T) {
				req, err := http.NewRequest(tc, "/v1/operator/raft/transfer-leadership?address=nope", body)
				must.NoError(t, err)

				resp := httptest.NewRecorder()
				_, err = s.Server.OperatorRaftTransferLeadership(resp, req)

				must.Error(t, err)
				must.ErrorContains(t, err, "Invalid method")
				body.Reset()
			})
		}

		apiErrTCs := []struct {
			name     string
			qs       string
			expected string
		}{
			{
				name:     "URL with id and address errors",
				qs:       `?id=foo&address=bar`,
				expected: "must specify either id or address",
			},
			{
				name:     "URL without id and address errors",
				qs:       ``,
				expected: "must specify id or address",
			},
			{
				name:     "URL with multiple id errors",
				qs:       `?id=foo&id=bar`,
				expected: "must specify only one id",
			},
			{
				name:     "URL with multiple address errors",
				qs:       `?address=foo&address=bar`,
				expected: "must specify only one address",
			},
			{
				name:     "URL with an empty id errors",
				qs:       `?id`,
				expected: "id must be non-empty",
			},
			{
				name:     "URL with an empty address errors",
				qs:       `?address`,
				expected: "address must be non-empty",
			},
			{
				name:     "an invalid id errors",
				qs:       `?id=foo`,
				expected: "id must be a uuid",
			},
			{
				// Distinct from the empty-address case above: the address is
				// present but not in IP:port form.
				name:     "an invalid address errors",
				qs:       `?address=bar`,
				expected: "address must be in IP:port format",
			},
		}
		for _, tc := range apiErrTCs {
			tc := tc
			t.Run(tc.name, func(t *testing.T) {
				req, err := http.NewRequest(
					http.MethodPut,
					"/v1/operator/raft/transfer-leadership"+tc.qs,
					body,
				)
				must.NoError(t, err)

				resp := httptest.NewRecorder()
				_, err = s.Server.OperatorRaftTransferLeadership(resp, req)

				must.Error(t, err)
				must.ErrorContains(t, err, tc.expected)
				body.Reset()
			})
		}
	})

	// These requests are well-formed, so they pass HTTP-level validation and
	// are expected to fail only once the RPC looks up the (nonexistent) peer.
	testID := uuid.Generate()
	roundtripTCs := []struct {
		name     string
		qs       string
		expected string
	}{
		{
			"id",
			"?id=" + testID,
			`id "` + testID + `" was not found in the Raft configuration`,
		},
		{
			"address",
			"?address=9.9.9.9:8000",
			`address "9.9.9.9:8000" was not found in the Raft configuration`,
		},
	}
	for _, tc := range roundtripTCs {
		tc := tc
		t.Run(tc.name+" can roundtrip", func(t *testing.T) {
			httpTest(t, configCB, func(s *TestAgent) {
				body := bytes.NewBuffer(nil)
				req, err := http.NewRequest(
					http.MethodPut,
					"/v1/operator/raft/transfer-leadership"+tc.qs,
					body,
				)
				must.NoError(t, err)

				// If we get this error, it proves we sent the parameter all the
				// way through.
				resp := httptest.NewRecorder()
				_, err = s.Server.OperatorRaftTransferLeadership(resp, req)
				must.ErrorContains(t, err, tc.expected)
			})
		})
	}
}

func TestOperator_AutopilotGetConfiguration(t *testing.T) {
ci.Parallel(t)
httpTest(t, nil, func(s *TestAgent) {
Expand Down
5 changes: 5 additions & 0 deletions command/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,11 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory {
Meta: meta,
}, nil
},
"operator raft transfer-leadership": func() (cli.Command, error) {
return &OperatorRaftTransferLeadershipCommand{
Meta: meta,
}, nil
},
"operator raft info": func() (cli.Command, error) {
return &OperatorRaftInfoCommand{
Meta: meta,
Expand Down
Loading

1 comment on commit 8a93ff3

@vercel
Copy link

@vercel vercel bot commented on 8a93ff3 Oct 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.