Skip to content
This repository has been archived by the owner on Jan 30, 2020. It is now read-only.

Ordered task execution #1134

Merged
merged 1 commit into from
Feb 27, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 74 additions & 85 deletions agent/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package agent

import (
"fmt"
"sort"
"time"

"github.com/coreos/fleet/job"
Expand Down Expand Up @@ -79,9 +80,8 @@ func (ar *AgentReconciler) Reconcile(a *Agent) {
return
}

for tc := range ar.calculateTaskChainsForUnits(dAgentState, cAgentState) {
ar.launchTaskChain(tc, a)
}
tasks := ar.calculateTasksForUnits(dAgentState, cAgentState)
ar.launchTasks(tasks, a)
}

// Purge attempts to unload all Units that have been loaded locally
Expand All @@ -96,18 +96,18 @@ func (ar *AgentReconciler) Purge(a *Agent) {
return
}

var tasks []task
for name, _ := range cAgentState {
t := task{
tasks = append(tasks, task{
typ: taskTypeUnloadUnit,
reason: taskReasonPurgingAgent,
}
u := &job.Unit{
Name: name,
}
tc := newTaskChain(u, t)
ar.launchTaskChain(tc, a)
unit: &job.Unit{
Name: name,
},
})
}

ar.launchTasks(tasks, a)
time.Sleep(time.Second)
}
}
Expand Down Expand Up @@ -158,36 +158,29 @@ func desiredAgentState(a *Agent, reg registry.Registry) (*AgentState, error) {
return &as, nil
}

// calculateTaskChainsForUnits compares the desired and current state of an Agent.
// The generated taskChains represent what should be done to make the current
// state match the desired state.
func (ar *AgentReconciler) calculateTaskChainsForUnits(dState *AgentState, cState unitStates) <-chan taskChain {
tcChan := make(chan taskChain)
go func() {
jobs := pkg.NewUnsafeSet()
for cName := range cState {
jobs.Add(cName)
}

for dName := range dState.Units {
jobs.Add(dName)
}
// calculateTasksForUnits compares the desired and current state of an Agent.
// The generated tasks represent what, in order, should be done to make the
// current state match the desired state.
func (ar *AgentReconciler) calculateTasksForUnits(dState *AgentState, cState unitStates) []task {
jobs := pkg.NewUnsafeSet()
for cName := range cState {
jobs.Add(cName)
}

for _, name := range jobs.Values() {
tc := ar.calculateTaskChainForUnit(dState, cState, name)
if tc == nil {
continue
}
tcChan <- *tc
}
for dName := range dState.Units {
jobs.Add(dName)
}

close(tcChan)
}()
var tasks []task
for _, name := range jobs.Values() {
tasks = append(tasks, ar.calculateTasksForUnit(dState, cState, name)...)
}

return tcChan
sort.Sort(sortableTasks(tasks))
return tasks
}

func (ar *AgentReconciler) calculateTaskChainForUnit(dState *AgentState, cState unitStates, jName string) *taskChain {
func (ar *AgentReconciler) calculateTasksForUnit(dState *AgentState, cState unitStates, jName string) (tasks []task) {
var dJob *job.Unit
var dJHash string
if dState != nil {
Expand All @@ -207,119 +200,115 @@ func (ar *AgentReconciler) calculateTaskChainForUnit(dState *AgentState, cState
return nil
}

u := &job.Unit{
Name: jName,
}

if dJob == nil || dJob.TargetState == job.JobStateInactive {
if cJState == nil {
return nil
}
t := task{
tasks = append(tasks, task{
typ: taskTypeUnloadUnit,
reason: taskReasonLoadedButNotScheduled,
}
u := &job.Unit{
Name: jName,
}
tc := newTaskChain(u, t)
return &tc
unit: u,
})
return
}

u := &job.Unit{
Name: jName,
Unit: dJob.Unit,
}
u.Unit = dJob.Unit

if cJState == nil {
tc := newTaskChain(u)
tc.Add(task{
tasks = append(tasks, task{
typ: taskTypeLoadUnit,
reason: taskReasonScheduledButUnloaded,
unit: u,
})

// as an optimization, queue the unit for launching immediately after loading
if dJob.TargetState == job.JobStateLaunched {
tc.Add(task{
tasks = append(tasks, task{
typ: taskTypeStartUnit,
reason: taskReasonLoadedDesiredStateLaunched,
unit: u,
})
}

return &tc
return
}

if cJHash != dJHash {
log.Debugf("Desired hash %q differs to current hash %s of Job(%s) - unloading", dJHash, cJHash, jName)
tc := newTaskChain(u)
tc.Add(task{
typ: taskTypeUnloadUnit,
reason: taskReasonLoadedButHashDiffers,
})

// queue the correct unit for loading immediately after unloading the old one
tc.Add(task{
typ: taskTypeLoadUnit,
reason: taskReasonScheduledButUnloaded,
})
tasks = append(tasks,
task{
typ: taskTypeUnloadUnit,
reason: taskReasonLoadedButHashDiffers,
unit: u,
},
task{
typ: taskTypeLoadUnit,
reason: taskReasonScheduledButUnloaded,
unit: u,
},
)

// as an optimization, queue the unit for launching immediately after loading
if dJob.TargetState == job.JobStateLaunched {
tc.Add(task{
tasks = append(tasks, task{
typ: taskTypeStartUnit,
reason: taskReasonLoadedDesiredStateLaunched,
unit: u,
})
}

return &tc
return
}

if *cJState == dJob.TargetState {
log.Debugf("Desired state %q matches current state of Job(%s), nothing to do", *cJState, jName)
return nil
}

tc := newTaskChain(u)
if *cJState == job.JobStateInactive {
tc.Add(task{
tasks = append(tasks, task{
typ: taskTypeLoadUnit,
reason: taskReasonScheduledButUnloaded,
unit: u,
})
}

if (*cJState == job.JobStateInactive || *cJState == job.JobStateLoaded) && dJob.TargetState == job.JobStateLaunched {
tc.Add(task{
tasks = append(tasks, task{
typ: taskTypeStartUnit,
reason: taskReasonLoadedDesiredStateLaunched,
unit: u,
})
}

if *cJState == job.JobStateLaunched && dJob.TargetState == job.JobStateLoaded {
tc.Add(task{
tasks = append(tasks, task{
typ: taskTypeStopUnit,
reason: taskReasonLaunchedDesiredStateLoaded,
unit: u,
})
}

if len(tc.tasks) == 0 {
if len(tasks) == 0 {
log.Errorf("Unable to determine how to reconcile Job(%s): desiredState=%#v currentState=%#v", jName, dJob, cJState)
return nil
}

return &tc
return
}

func (ar *AgentReconciler) launchTaskChain(tc taskChain, a *Agent) {
log.Debugf("AgentReconciler attempting task chain %s", tc)
reschan, err := ar.tManager.Do(tc, a)
if err != nil {
log.Infof("AgentReconciler task chain failed: chain=%s err=%v", tc, err)
return
}

go func() {
for res := range reschan {
if res.err == nil {
log.Infof("AgentReconciler completed task: type=%s job=%s reason=%q", res.task.typ, tc.unit.Name, res.task.reason)
} else {
log.Infof("AgentReconciler task failed: type=%s job=%s reason=%q err=%v", res.task.typ, tc.unit.Name, res.task.reason, res.err)
}
func (ar *AgentReconciler) launchTasks(tasks []task, a *Agent) {
log.Debugf("AgentReconciler attempting tasks %s", tasks)
results := ar.tManager.Do(tasks, a)
for _, res := range results {
if res.err == nil {
log.Infof("AgentReconciler completed task: type=%s job=%s reason=%q", res.task.typ, res.task.unit.Name, res.task.reason)
} else {
log.Infof("AgentReconciler task failed: type=%s job=%s reason=%q err=%v", res.task.typ, res.task.unit.Name, res.task.reason, res.err)
}
}()
}
}
Loading