Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nomad job restart #11533

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions api/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,24 @@ func (j *Jobs) Info(jobID string, q *QueryOptions) (*Job, *QueryMeta, error) {
return &resp, qm, nil
}

// JobRestartRequest is the request body written to the job restart endpoint
// by Jobs.Restart.
type JobRestartRequest struct {
// BatchSize is the number of allocations to restart at once, carried as a
// string. NOTE(review): confirm which formats the server accepts.
BatchSize string
// BatchWait is the wait time between batch restarts.
// NOTE(review): the CLI help describes this as seconds; confirm the unit.
BatchWait int
}

// Restart requests a batched restart of the job identified by jobID by
// writing a JobRestartRequest to that job's restart endpoint. The job ID is
// path-escaped before being placed in the URL. Any error from the write is
// returned unchanged.
func (j *Jobs) Restart(jobID string, q *WriteOptions, batchSize string, batchWait int) error {
	endpoint := fmt.Sprintf("/v1/job/%s/restart", url.PathEscape(jobID))
	payload := &JobRestartRequest{BatchSize: batchSize, BatchWait: batchWait}

	_, err := j.client.write(endpoint, payload, nil, q)
	return err
}

// Scale is used to retrieve information about a particular
// job given its unique ID.
func (j *Jobs) Scale(jobID, group string, count *int, message string, error bool, meta map[string]interface{},
Expand Down
4 changes: 4 additions & 0 deletions command/agent/alloc_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ func (s *HTTPServer) ClientGCRequest(resp http.ResponseWriter, req *http.Request

func (s *HTTPServer) allocRestart(allocID string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Build the request and parse the ACL token
fmt.Println("HELLO: command/agent/alloc_endpoint.go: allocRestart")
args := structs.AllocRestartRequest{
AllocID: allocID,
TaskName: "",
Expand All @@ -295,10 +296,13 @@ func (s *HTTPServer) allocRestart(allocID string, resp http.ResponseWriter, req
var reply structs.GenericResponse
var rpcErr error
if useLocalClient {
fmt.Println("HELLO: useLocalClient")
rpcErr = s.agent.Client().ClientRPC("Allocations.Restart", &args, &reply)
} else if useClientRPC {
fmt.Println("HELLO: useClientRPC")
rpcErr = s.agent.Client().RPC("ClientAllocations.Restart", &args, &reply)
} else if useServerRPC {
fmt.Println("HELLO: useServerRPC")
rpcErr = s.agent.Server().RPC("ClientAllocations.Restart", &args, &reply)
} else {
rpcErr = CodedError(400, "No local Node and node_id not provided")
Expand Down
37 changes: 37 additions & 0 deletions command/agent/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import (
"net/http"
"strconv"
"strings"
"time"

"github.com/golang/snappy"
"github.com/hashicorp/nomad/acl"
api "github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/jobspec"
"github.com/hashicorp/nomad/jobspec2"
"github.com/hashicorp/nomad/nomad/structs"
Expand Down Expand Up @@ -93,6 +95,9 @@ func (s *HTTPServer) JobSpecificRequest(resp http.ResponseWriter, req *http.Requ
case strings.HasSuffix(path, "/services"):
jobName := strings.TrimSuffix(path, "/services")
return s.jobServiceRegistrations(resp, req, jobName)
case strings.HasSuffix(path, "/restart"):
jobName := strings.TrimSuffix(path, "/restart")
return s.jobRestart(resp, req, jobName)
default:
return s.jobCRUD(resp, req, path)
}
Expand Down Expand Up @@ -493,6 +498,38 @@ func (s *HTTPServer) jobDelete(resp http.ResponseWriter, req *http.Request,
return out, nil
}

// jobRestart handles a job restart request for jobName. Only PUT and POST are
// accepted; other methods yield a 405. The request body is decoded into an
// api.JobRestartRequest (a decode failure yields a 400), translated into a
// structs.JobRestartRequest with a freshly generated ID, and sent to the
// "Job.Restart" RPC. On success the response index is set on resp and the RPC
// response is returned.
func (s *HTTPServer) jobRestart(resp http.ResponseWriter, req *http.Request, jobName string) (interface{}, error) {
	if req.Method != "PUT" && req.Method != "POST" {
		return nil, CodedError(405, ErrInvalidMethod)
	}

	var restartRequest api.JobRestartRequest
	if err := decodeBody(req, &restartRequest); err != nil {
		return nil, CodedError(400, err.Error())
	}

	// Capture the time once so a newly created request has identical
	// StartedAt and UpdatedAt values instead of two slightly different ones.
	now := time.Now()

	args := structs.JobRestartRequest{
		ID:              uuid.Generate(),
		JobID:           jobName,
		BatchSize:       restartRequest.BatchSize,
		BatchWait:       restartRequest.BatchWait,
		Status:          "running",
		RestartedAllocs: []string{},
		StartedAt:       now,
		UpdatedAt:       now,
	}

	s.parseWriteRequest(req, &args.WriteRequest)

	var out structs.JobRestartResponse
	if err := s.agent.RPC("Job.Restart", &args, &out); err != nil {
		return nil, err
	}

	setIndex(resp, out.Index)
	return out, nil
}

func (s *HTTPServer) jobScale(resp http.ResponseWriter, req *http.Request,
jobName string) (interface{}, error) {

Expand Down
5 changes: 5 additions & 0 deletions command/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,11 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory {
Meta: meta,
}, nil
},
"job restart": func() (cli.Command, error) {
return &JobRestartCommand{
Meta: meta,
}, nil
},
"job deployments": func() (cli.Command, error) {
return &JobDeploymentsCommand{
Meta: meta,
Expand Down
124 changes: 124 additions & 0 deletions command/job_restart.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package command

import (
"fmt"
"strings"

"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/api/contexts"
"github.com/posener/complete"
)

// JobRestartCommand implements the "job restart" CLI command.
type JobRestartCommand struct {
Meta
// NOTE(review): Run parses -batch-size and -batch-wait into local variables
// and does not read the two fields below; confirm whether they are needed.
BatchSize string
BatchWait int
}

// Help returns the long-form usage text for the command, with the general
// options usage spliced in and surrounding whitespace trimmed.
func (c *JobRestartCommand) Help() string {
// NOTE(review): the "Allocs Options" heading below looks like a leftover from
// another command; this command's own flags are -batch-size and -batch-wait.
helpText := `
Usage: nomad job restart [options] <job>

Restart allocations for a particular job in batches.

When ACLs are enabled, this command requires a token with the
'alloc-lifecycle', 'read-job', and 'list-jobs' capabilities for the
allocation's namespace.

General Options:

` + generalOptionsUsage(usageOptsDefault) + `

Allocs Options:

-batch-size
Number of allocations to restart at once.

-batch-wait
Wait time in seconds between each batch restart.
`
return strings.TrimSpace(helpText)
}

// Synopsis returns the one-line summary of the command.
func (c *JobRestartCommand) Synopsis() string {
	const synopsis = "Restart all allocations for a job"
	return synopsis
}

// AutocompleteFlags returns the flag predictors used for shell completion:
// the client flag set merged with this command's own flags.
func (c *JobRestartCommand) AutocompleteFlags() complete.Flags {
	return mergeAutocompleteFlags(c.Meta.AutocompleteFlags(FlagSetClient),
		complete.Flags{
			// Both flags take a value (Run registers them with StringVar and
			// IntVar), so they use PredictAnything. PredictNothing is for
			// flags that are not followed by a value.
			"-batch-size": complete.PredictAnything,
			"-batch-wait": complete.PredictAnything,
		})
}

// AutocompleteArgs returns a predictor that completes the job argument by
// prefix-searching the jobs context with the last typed argument. It predicts
// nil when no API client can be built and an empty slice when the search
// itself fails.
func (c *JobRestartCommand) AutocompleteArgs() complete.Predictor {
	predictJobs := func(typed complete.Args) []string {
		apiClient, clientErr := c.Meta.Client()
		if clientErr != nil {
			return nil
		}

		results, _, searchErr := apiClient.Search().PrefixSearch(typed.Last, contexts.Jobs, nil)
		if searchErr != nil {
			return []string{}
		}

		return results.Matches[contexts.Jobs]
	}

	return complete.PredictFunc(predictJobs)
}

// Name returns the command name under which this command is registered. Run
// passes it to FlagSet and commandErrorText, so it must be the command name
// rather than a free-form description.
func (c *JobRestartCommand) Name() string { return "job restart" }

// Run parses the command flags, resolves the single <job> argument to a job ID
// by prefix, and asks the API to restart that job in batches. It returns 0 on
// success and 1 on any flag, argument, lookup, or restart error.
func (c *JobRestartCommand) Run(args []string) int {
	var batchSize string
	var batchWait int

	flags := c.Meta.FlagSet(c.Name(), FlagSetClient)
	flags.Usage = func() { c.Ui.Output(c.Help()) }
	flags.StringVar(&batchSize, "batch-size", "5", "")
	flags.IntVar(&batchWait, "batch-wait", 10, "")

	if err := flags.Parse(args); err != nil {
		return 1
	}

	// Check that we got exactly one job
	args = flags.Args()
	if len(args) != 1 {
		c.Ui.Error("This command takes one argument: <job>")
		c.Ui.Error(commandErrorText(c))
		return 1
	}

	// Get the HTTP client
	client, err := c.Meta.Client()
	if err != nil {
		c.Ui.Error(fmt.Sprintf("Error initializing client: %s", err))
		return 1
	}

	jobID := strings.TrimSpace(args[0])

	// Check if the job exists
	jobs, _, err := client.Jobs().PrefixList(jobID)
	if err != nil {
		c.Ui.Error(fmt.Sprintf("Error listing jobs: %s", err))
		return 1
	}
	if len(jobs) == 0 {
		c.Ui.Error(fmt.Sprintf("No job(s) with prefix or id %q found", jobID))
		return 1
	}
	if len(jobs) > 1 {
		// Several matches are only acceptable when the first one is an exact
		// ID match and it is not duplicated across namespaces.
		if (jobID != jobs[0].ID) || (c.allNamespaces() && jobs[0].ID == jobs[1].ID) {
			c.Ui.Error(fmt.Sprintf("Prefix matched multiple jobs\n\n%s", createStatusListOutput(jobs, c.allNamespaces())))
			return 1
		}
	}

	jobID = jobs[0].ID
	q := &api.WriteOptions{Namespace: jobs[0].JobSummary.Namespace}

	// Surface a failed restart to the user and the exit code instead of
	// silently reporting success.
	if err := client.Jobs().Restart(jobID, q, batchSize, batchWait); err != nil {
		c.Ui.Error(fmt.Sprintf("Error restarting job %q: %s", jobID, err))
		return 1
	}
	return 0
}
1 change: 1 addition & 0 deletions helper/raftutil/msgtypes.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions nomad/client_alloc_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ func (a *ClientAllocations) GarbageCollect(args *structs.AllocSpecificRequest, r

// Restart is used to trigger a restart of an allocation or a subtask on a client.
func (a *ClientAllocations) Restart(args *structs.AllocRestartRequest, reply *structs.GenericResponse) error {
fmt.Println("HELLO nomad/client_alloc_endpoint.go")
// We only allow stale reads since the only potentially stale information is
// the Node registration and the cost is fairly high for adding another hop
// in the forwarding chain.
Expand Down
20 changes: 20 additions & 0 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
return n.applyUpsertJob(msgType, buf[1:], log.Index)
case structs.JobDeregisterRequestType:
return n.applyDeregisterJob(msgType, buf[1:], log.Index)
case structs.JobRestartRequestType:
return n.applyRestartJob(msgType, buf[1:], log.Index)
case structs.EvalUpdateRequestType:
return n.applyUpdateEval(msgType, buf[1:], log.Index)
case structs.EvalDeleteRequestType:
Expand Down Expand Up @@ -429,6 +431,24 @@ func (n *nomadFSM) applyStatusUpdate(msgType structs.MessageType, buf []byte, in
return nil
}

// applyRestartJob decodes a JobRestartRequest from buf and applies it to the
// state store via UpdateJobRestart at the given index. A decode failure
// panics. A state store failure is logged and returned; nil is returned on
// success.
func (n *nomadFSM) applyRestartJob(reqType structs.MessageType, buf []byte, index uint64) interface{} {
	var req structs.JobRestartRequest
	if err := structs.Decode(buf, &req); err != nil {
		panic(fmt.Errorf("failed to decode request: %v", err))
	}

	if err := n.state.UpdateJobRestart(reqType, index, &req); err != nil {
		n.logger.Error("UpdateJobRestart failed", "error", err)
		return err
	}

	return nil
}

func (n *nomadFSM) applyDrainUpdate(reqType structs.MessageType, buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "node_drain_update"}, time.Now())
var req structs.NodeUpdateDrainRequest
Expand Down
81 changes: 81 additions & 0 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1462,6 +1462,87 @@ func (j *Job) Allocations(args *structs.JobSpecificRequest,
return j.srv.blockingRPC(&opts)
}

// Restart asks the client running each allocation of the job to restart that
// allocation, then records the restart request via Raft.
//
// The caller needs the read-job, list-jobs and alloc-lifecycle capabilities
// on the request namespace. Per-allocation restart failures are logged and do
// not abort the remaining allocations; only ACL, state snapshot, allocation
// lookup, or Raft errors are returned.
//
// NOTE(review): args.BatchSize and args.BatchWait are not consulted here; all
// allocations returned by AllocsByJob are restarted in a single pass.
// NOTE(review): the caller's auth token is not copied onto the per-allocation
// request; confirm the client-side restart RPC works with ACLs enabled.
func (j *Job) Restart(args *structs.JobRestartRequest,
	reply *structs.JobRestartResponse) error {

	if done, err := j.srv.forward("Job.Restart", args, args, reply); done {
		return err
	}

	// Check for alloc-lifecycle, read-job and list-jobs permissions
	if aclObj, err := j.srv.ResolveToken(args.AuthToken); err != nil {
		return err
	} else if aclObj != nil && (!aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob) || !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityListJobs) || !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityAllocLifecycle)) {
		return structs.ErrPermissionDenied
	}

	snap, err := j.srv.fsm.State().Snapshot()
	if err != nil {
		return err
	}

	ws := memdb.NewWatchSet()
	allocs, err := snap.AllocsByJob(ws, args.RequestNamespace(), args.JobID, false)
	if err != nil {
		return err
	}

	for _, alloc := range allocs {
		// TaskName is left empty rather than hard-coded to a single test task;
		// this mirrors the default used by the HTTP alloc restart handler.
		// NOTE(review): presumably an empty task name restarts every task in
		// the allocation; confirm against the client implementation.
		restartReq := &structs.AllocRestartRequest{
			AllocID: alloc.ID,
			QueryOptions: structs.QueryOptions{
				Region:    args.Region,
				Namespace: args.Namespace,
			},
		}

		// The reply must be a non-nil value the RPC layer can decode into.
		var restartResp structs.GenericResponse

		if _, err := getNodeForRpc(snap, alloc.NodeID); err != nil {
			j.logger.Error("skipping alloc restart, node lookup failed",
				"alloc_id", alloc.ID, "node_id", alloc.NodeID, "error", err)
			continue
		}

		// Use the local connection to the client if this server has one,
		// otherwise forward to a server that does.
		var rpcErr error
		if state, ok := j.srv.getNodeConn(alloc.NodeID); ok {
			rpcErr = NodeRpc(state.Session, "Allocations.Restart", restartReq, &restartResp)
		} else {
			rpcErr = findNodeConnAndForward(j.srv, alloc.NodeID, "ClientAllocations.Restart", restartReq, &restartResp)
		}
		if rpcErr != nil {
			// Best effort: keep restarting the remaining allocations.
			j.logger.Error("alloc restart failed",
				"alloc_id", alloc.ID, "node_id", alloc.NodeID, "error", rpcErr)
		}
	}

	// Commit this update via Raft
	_, index, err := j.srv.raftApply(structs.JobRestartRequestType, args)
	if err != nil {
		j.logger.Error("job restart failed", "error", err)
		return err
	}

	reply.RestartModifyIndex = index
	reply.Index = index

	return nil
}

// Evaluations is used to list the evaluations for a job
func (j *Job) Evaluations(args *structs.JobSpecificRequest,
reply *structs.JobEvaluationsResponse) error {
Expand Down
1 change: 1 addition & 0 deletions nomad/state/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ var MsgTypeEvents = map[structs.MessageType]string{
structs.ServiceRegistrationUpsertRequestType: structs.TypeServiceRegistration,
structs.ServiceRegistrationDeleteByIDRequestType: structs.TypeServiceDeregistration,
structs.ServiceRegistrationDeleteByNodeIDRequestType: structs.TypeServiceDeregistration,
structs.JobRestartRequestType: structs.TypeJobRestart,
}

func eventsFromChanges(tx ReadTxn, changes Changes) *structs.Events {
Expand Down
Loading