Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[server] Directed leadership transfer CLI and API #17383

Merged
merged 26 commits into from
Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
b938ff5
Add directed leadership transfer func
angrycub May 10, 2023
cc3c075
Add leadership transfer RPC endpoint
angrycub May 10, 2023
3e0ce48
Add ACL tests for leadership-transfer endpoint
angrycub May 10, 2023
ebc49ea
Add HTTP API route and implementation
angrycub May 10, 2023
796d8fe
Add to Go API client
angrycub May 10, 2023
dec242c
Implement CLI command
angrycub May 10, 2023
a11663c
Add documentation
angrycub Jun 1, 2023
c07f94a
Add changelog
angrycub Jun 1, 2023
a93ce06
fix spacing in command output
angrycub Aug 23, 2023
e3051de
Add raft peer request
angrycub Aug 23, 2023
ab1dd97
fixup operator endpoint: don't change message
angrycub Aug 23, 2023
9b3aeb6
Make text updates to command help
angrycub Aug 28, 2023
6139655
Begin adding Validation on request object
angrycub Aug 28, 2023
f76e663
Add RPC endpoint validations; populate response
angrycub Aug 28, 2023
3f5acc5
Update to build a cluster to test against
angrycub Aug 28, 2023
bb8283c
Add validation to HTTP API
angrycub Aug 28, 2023
3487ac5
Add test for OperatorTransfer HTTP endpoint
angrycub Aug 28, 2023
3bbbe81
typo fixes
angrycub Sep 1, 2023
56b2ced
fix error in Validate condition
angrycub Sep 1, 2023
e1a354e
move address validation to the struct
angrycub Sep 1, 2023
7f34aae
refactor `if` to `switch`
angrycub Sep 1, 2023
b8cf935
Code review feedback
angrycub Sep 19, 2023
96cc82a
fix returned object
angrycub Sep 19, 2023
a557c7d
Make RaftIDAddress type
angrycub Oct 3, 2023
9e27072
Remove call to setIndex
angrycub Oct 3, 2023
e46d1e4
Apply suggestions from code review
angrycub Oct 4, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/17383.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
server: Added transfer-leadership API and CLI
```
49 changes: 49 additions & 0 deletions api/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,46 @@ func (op *Operator) RaftRemovePeerByID(id string, q *WriteOptions) error {
return nil
}

// RaftTransferLeadershipByAddress is used to transfer leadership to a
// different peer using its address in the form of "IP:port".
func (op *Operator) RaftTransferLeadershipByAddress(address string, q *WriteOptions) error {
r, err := op.c.newRequest("PUT", "/v1/operator/raft/transfer-leadership")
if err != nil {
return err
}
r.setWriteOptions(q)

r.params.Set("address", address)

_, resp, err := requireOK(op.c.doRequest(r))
if err != nil {
return err
}

resp.Body.Close()
return nil
}

// RaftTransferLeadershipByID is used to transfer leadership to a
// different peer using its Raft ID.
func (op *Operator) RaftTransferLeadershipByID(id string, q *WriteOptions) error {
r, err := op.c.newRequest("PUT", "/v1/operator/raft/transfer-leadership")
if err != nil {
return err
}
r.setWriteOptions(q)

r.params.Set("id", id)

_, resp, err := requireOK(op.c.doRequest(r))
if err != nil {
return err
}

resp.Body.Close()
return nil
}

// SchedulerConfiguration is the config for controlling scheduler behavior
type SchedulerConfiguration struct {
// SchedulerAlgorithm lets you select between available scheduling algorithms.
Expand Down Expand Up @@ -363,3 +403,12 @@ func (op *Operator) LicenseGet(q *QueryOptions) (*LicenseReply, *QueryMeta, erro

return &reply, qm, nil
}

type LeadershipTransferResponse struct {
From RaftServer
To RaftServer
Noop bool
Err error

WriteMeta
}
56 changes: 54 additions & 2 deletions command/agent/operator_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ func (s *HTTPServer) OperatorRequest(resp http.ResponseWriter, req *http.Request
return s.OperatorRaftConfiguration(resp, req)
case strings.HasPrefix(path, "peer"):
return s.OperatorRaftPeer(resp, req)
case strings.HasPrefix(path, "transfer-leadership"):
return s.OperatorRaftTransferLeadership(resp, req)
default:
return nil, CodedError(404, ErrInvalidMethod)
}
Expand All @@ -56,8 +58,7 @@ func (s *HTTPServer) OperatorRaftConfiguration(resp http.ResponseWriter, req *ht
return reply, nil
}

// OperatorRaftPeer supports actions on Raft peers. Currently we only support
// removing peers by address.
// OperatorRaftPeer supports actions on Raft peers.
func (s *HTTPServer) OperatorRaftPeer(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != http.MethodDelete {
return nil, CodedError(404, ErrInvalidMethod)
Expand Down Expand Up @@ -97,6 +98,57 @@ func (s *HTTPServer) OperatorRaftPeer(resp http.ResponseWriter, req *http.Reques
return nil, nil
}

// OperatorRaftTransferLeadership supports actions on Raft peers.
func (s *HTTPServer) OperatorRaftTransferLeadership(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != http.MethodPost && req.Method != http.MethodPut {
return nil, CodedError(http.StatusMethodNotAllowed, ErrInvalidMethod)
}

params := req.URL.Query()

// Using the params map directly
id, hasID := params["id"]
addr, hasAddress := params["address"]

// There are some items that we can parse for here that are more unwieldy in
// the Validate() func on the RPC request object, like repeated query params.
switch {
case !hasID && !hasAddress:
return nil, CodedError(http.StatusBadRequest, "must specify id or address")
case hasID && hasAddress:
return nil, CodedError(http.StatusBadRequest, "must specify either id or address")
case hasID && id[0] == "":
return nil, CodedError(http.StatusBadRequest, "id must be non-empty")
case hasID && len(id) > 1:
return nil, CodedError(http.StatusBadRequest, "must specify only one id")
case hasAddress && addr[0] == "":
return nil, CodedError(http.StatusBadRequest, "address must be non-empty")
case hasAddress && len(addr) > 1:
return nil, CodedError(http.StatusBadRequest, "must specify only one address")
}

var out structs.LeadershipTransferResponse
args := &structs.RaftPeerRequest{}
s.parseWriteRequest(req, &args.WriteRequest)

if hasID {
args.ID = raft.ServerID(id[0])
} else {
args.Address = raft.ServerAddress(addr[0])
}

if err := args.Validate(); err != nil {
return nil, CodedError(http.StatusBadRequest, err.Error())
}

err := s.agent.RPC("Operator.TransferLeadershipToPeer", &args, &out)
if err != nil {
return nil, err
}

return out, nil
}

// OperatorAutopilotConfiguration is used to inspect the current Autopilot configuration.
// This supports the stale query mode in case the cluster doesn't have a leader.
func (s *HTTPServer) OperatorAutopilotConfiguration(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
Expand Down
140 changes: 140 additions & 0 deletions command/agent/operator_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (

"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/shoenig/test/must"
Expand Down Expand Up @@ -91,6 +93,144 @@ func TestHTTP_OperatorRaftPeer(t *testing.T) {
})
}

func TestHTTP_OperatorRaftTransferLeadership(t *testing.T) {
ci.Parallel(t)
configCB := func(c *Config) {
c.Client.Enabled = false
c.Server.NumSchedulers = pointer.Of(0)
}

httpTest(t, configCB, func(s *TestAgent) {
body := bytes.NewBuffer(nil)
badMethods := []string{
http.MethodConnect,
http.MethodDelete,
http.MethodGet,
http.MethodHead,
http.MethodOptions,
http.MethodPatch,
http.MethodTrace,
}
for _, tc := range badMethods {
tc := tc
t.Run(tc+" method errors", func(t *testing.T) {
req, err := http.NewRequest(tc, "/v1/operator/raft/transfer-leadership?address=nope", body)
must.NoError(t, err)

resp := httptest.NewRecorder()
_, err = s.Server.OperatorRaftTransferLeadership(resp, req)

must.Error(t, err)
must.ErrorContains(t, err, "Invalid method")
body.Reset()
})
}

apiErrTCs := []struct {
name string
qs string
expected string
}{
{
name: "URL with id and address errors",
qs: `?id=foo&address=bar`,
expected: "must specify either id or address",
},
{
name: "URL without id and address errors",
qs: ``,
expected: "must specify id or address",
},
{
name: "URL with multiple id errors",
qs: `?id=foo&id=bar`,
expected: "must specify only one id",
},
{
name: "URL with multiple address errors",
qs: `?address=foo&address=bar`,
expected: "must specify only one address",
},
{
name: "URL with an empty id errors",
qs: `?id`,
expected: "id must be non-empty",
},
{
name: "URL with an empty address errors",
qs: `?address`,
expected: "address must be non-empty",
},
{
name: "an invalid id errors",
qs: `?id=foo`,
expected: "id must be a uuid",
},
{
name: "URL with an empty address errors",
qs: `?address=bar`,
expected: "address must be in IP:port format",
},
}
for _, tc := range apiErrTCs {
tc := tc
t.Run(tc.name, func(t *testing.T) {
req, err := http.NewRequest(
http.MethodPut,
"/v1/operator/raft/transfer-leadership"+tc.qs,
body,
)
must.NoError(t, err)

resp := httptest.NewRecorder()
_, err = s.Server.OperatorRaftTransferLeadership(resp, req)

must.Error(t, err)
must.ErrorContains(t, err, tc.expected)
body.Reset()
})
}
})

testID := uuid.Generate()
apiOkTCs := []struct {
name string
qs string
expected string
}{
{
"id",
"?id=" + testID,
`id "` + testID + `" was not found in the Raft configuration`,
},
{
"address",
"?address=9.9.9.9:8000",
`address "9.9.9.9:8000" was not found in the Raft configuration`,
},
}
for _, tc := range apiOkTCs {
tc := tc
t.Run(tc.name+" can roundtrip", func(t *testing.T) {
httpTest(t, configCB, func(s *TestAgent) {
body := bytes.NewBuffer(nil)
req, err := http.NewRequest(
http.MethodPut,
"/v1/operator/raft/transfer-leadership"+tc.qs,
body,
)
must.NoError(t, err)

// If we get this error, it proves we sent the parameter all the
// way through.
resp := httptest.NewRecorder()
_, err = s.Server.OperatorRaftTransferLeadership(resp, req)
must.ErrorContains(t, err, tc.expected)
})
})
}
}

func TestOperator_AutopilotGetConfiguration(t *testing.T) {
ci.Parallel(t)
httpTest(t, nil, func(s *TestAgent) {
Expand Down
5 changes: 5 additions & 0 deletions command/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,11 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory {
Meta: meta,
}, nil
},
"operator raft transfer-leadership": func() (cli.Command, error) {
return &OperatorRaftTransferLeadershipCommand{
Meta: meta,
}, nil
},
"operator raft info": func() (cli.Command, error) {
return &OperatorRaftInfoCommand{
Meta: meta,
Expand Down
Loading