-
Notifications
You must be signed in to change notification settings - Fork 8
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
perf: ⚡️ run apply namespaces in apply pipeline concurrently #644
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,6 +4,7 @@ import ( | |
"fmt" | ||
"log" | ||
"os" | ||
"runtime" | ||
"strings" | ||
|
||
"github.com/ministryofjustice/cloud-platform-cli/pkg/github" | ||
|
@@ -60,11 +61,11 @@ func notifyUserApplyFailed(prNumberInt int, slackToken, webhookUrl, buildUrl str | |
// NewApply creates a new Apply object and populates its fields with values from options(which are flags), | ||
// instantiate Applier object which also checks and sets the Backend config variables to do terraform init, | ||
// RequiredEnvVars object which stores the values required for plan/apply of namespace | ||
func NewApply(opt Options) *Apply { | ||
func NewApply(opt Options, namespace string) *Apply { | ||
apply := Apply{ | ||
Options: &opt, | ||
Applier: NewApplier("/usr/local/bin/terraform", "/usr/local/bin/kubectl"), | ||
Dir: "namespaces/" + opt.ClusterDir + "/" + opt.Namespace, | ||
Dir: "namespaces/" + opt.ClusterDir + "/" + namespace, | ||
} | ||
|
||
apply.Initialize() | ||
|
@@ -84,7 +85,7 @@ func (a *Apply) Apply() error { | |
// If a namespace is given as a flag, then perform a apply for the given namespace. | ||
if a.Options.Namespace != "" { | ||
|
||
err := a.applyNamespace() | ||
err := a.applyNamespace(a.Options.Namespace) | ||
if err != nil { | ||
return err | ||
} | ||
|
@@ -111,10 +112,9 @@ func (a *Apply) Apply() error { | |
return err | ||
} | ||
for _, namespace := range changedNamespaces { | ||
a.Options.Namespace = namespace | ||
if _, err = os.Stat(a.Options.Namespace); err != nil { | ||
if _, err = os.Stat(namespace); err != nil { | ||
fmt.Println("Applying Namespace:", namespace) | ||
err = a.applyNamespace() | ||
err = a.applyNamespace(namespace) | ||
if err != nil { | ||
return err | ||
} | ||
|
@@ -182,25 +182,68 @@ func (a *Apply) ApplyBatch() error { | |
// get the latest changes (In case any PRs were merged since the pipeline started), and perform | ||
// the apply of that namespace | ||
func (a *Apply) applyNamespaceDirs(chunkFolder []string) error { | ||
for _, folder := range chunkFolder { | ||
erroredNs := []string{} | ||
|
||
// split the path to get the namespace name | ||
namespace := strings.Split(folder, "/") | ||
a.Options.Namespace = namespace[2] | ||
err := util.GetLatestGitPull() | ||
if err != nil { | ||
return err | ||
} | ||
|
||
err := util.GetLatestGitPull() | ||
if err != nil { | ||
return err | ||
} | ||
err = a.applyNamespace() | ||
if err != nil { | ||
return err | ||
} | ||
done := make(chan bool) | ||
defer close(done) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. close the channel which cascades down through the child go routines and in turn closes them to prevent memory leaks |
||
|
||
chunkStream := util.Generator(done, chunkFolder...) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. convert array into a channel |
||
|
||
routineResults := a.parallelApplyNamespace(done, chunkStream) | ||
|
||
results := util.FanIn(done, routineResults...) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. combine multiple channel results back into a single channel |
||
|
||
for res := range results { | ||
erroredNs = append(erroredNs, res) | ||
} | ||
|
||
fmt.Printf("\nerrored ns: %v\n", erroredNs) | ||
return nil | ||
} | ||
|
||
func (a *Apply) parallelApplyNamespace(done <-chan bool, dirStream <-chan string) []<-chan string { | ||
numRoutines := runtime.NumCPU() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. run as many go routines as we have cores, this proves to be a conservative number of routines because of how lightweight and cheap they are. We should monitor performance in concourse and adjust here if necessary. |
||
|
||
routineResults := make([]<-chan string, numRoutines) | ||
|
||
for i := 0; i < numRoutines; i++ { | ||
routineResults[i] = a.runApply(done, dirStream) | ||
} | ||
|
||
return routineResults | ||
} | ||
|
||
func (a *Apply) runApply(done <-chan bool, dirStream <-chan string) <-chan string { | ||
results := make(chan string) | ||
go func() { | ||
defer close(results) | ||
for dir := range dirStream { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In a new go routine run each apply for each namespace |
||
select { | ||
case <-done: | ||
return | ||
case results <- func(dir string) string { | ||
ns := strings.Split(dir, "/") | ||
namespace := ns[2] | ||
|
||
err := a.applyNamespace(namespace) | ||
if err != nil { | ||
return "Error in namespace: " + namespace + "\n" + err.Error() | ||
} | ||
|
||
return "" | ||
}(dir): | ||
} | ||
} | ||
}() | ||
|
||
return results | ||
} | ||
|
||
// applyKubectl calls the applier -> applyKubectl with dry-run disabled and return the output from applier | ||
func (a *Apply) applyKubectl() (string, error) { | ||
log.Printf("Running kubectl for namespace: %v in directory %v", a.Options.Namespace, a.Dir) | ||
|
@@ -263,7 +306,6 @@ func (a *Apply) applyTerraform() (string, error) { | |
// and checks if the file SECRET_ROTATE_BLOCK exists. | ||
func secretBlockerExists(filePath string) bool { | ||
// Check if the file contains a secret blocker | ||
// If it does, we don't want to apply it | ||
// If it doesn't, we do want to apply it | ||
secretBlocker := "SECRET_ROTATE_BLOCK" | ||
if _, err := os.Stat(filePath + "/" + secretBlocker); err == nil { | ||
|
@@ -287,30 +329,31 @@ func applySkipExists(filePath string) bool { | |
|
||
// applyNamespace intiates a new Apply object with options and env variables, and calls the | ||
// applyKubectl with dry-run disabled and calls applier TerraformInitAndApply and prints the output | ||
func (a *Apply) applyNamespace() error { | ||
func (a *Apply) applyNamespace(namespace string) error { | ||
// secretBlocker is a file used to control the behaviour of a namespace that will have all | ||
// secrets in a namespace rotated. This came out of the requirement to rotate IAM credentials | ||
// post circle breach. | ||
repoPath := "namespaces/" + a.Options.ClusterDir + "/" + a.Options.Namespace | ||
repoPath := "namespaces/" + a.Options.ClusterDir + "/" + namespace | ||
|
||
if _, err := os.Stat(repoPath); os.IsNotExist(err) { | ||
fmt.Printf("Namespace %s does not exist, skipping apply\n", a.Options.Namespace) | ||
fmt.Printf("Namespace %s does not exist, skipping apply\n", namespace) | ||
return nil | ||
} | ||
|
||
if secretBlockerExists(repoPath) { | ||
log.Printf("Namespace %s has a secret rotation blocker file, skipping apply", a.Options.Namespace) | ||
log.Printf("Namespace %s has a secret rotation blocker file, skipping apply", namespace) | ||
// We don't want to return an error here so we softly fail. | ||
return nil | ||
} | ||
|
||
if (a.Options.EnableApplySkip) && (applySkipExists(repoPath)) { | ||
log.Printf("Namespace %s has a apply skip file, skipping apply", a.Options.Namespace) | ||
log.Printf("Namespace %s has a apply skip file, skipping apply", namespace) | ||
// We don't want to return an error here so we softly fail. | ||
return nil | ||
} | ||
|
||
applier := NewApply(*a.Options) | ||
applier := NewApply(*a.Options, namespace) | ||
applier.Options.Namespace = namespace | ||
|
||
if util.IsYamlFileExists(repoPath) { | ||
outputKubectl, err := applier.applyKubectl() | ||
|
@@ -323,7 +366,7 @@ func (a *Apply) applyNamespace() error { | |
|
||
fmt.Println("\nOutput of kubectl:", outputKubectl) | ||
} else { | ||
fmt.Printf("Namespace %s does not have yaml resources folder, skipping kubectl apply", a.Options.Namespace) | ||
fmt.Printf("Namespace %s does not have yaml resources folder, skipping kubectl apply", namespace) | ||
} | ||
|
||
exists, err := util.IsFilePathExists(repoPath + "/resources") | ||
|
@@ -341,10 +384,10 @@ func (a *Apply) applyNamespace() error { | |
} | ||
return err | ||
} | ||
fmt.Println("\nOutput of terraform:") | ||
fmt.Printf("\nOutput of terraform for namespace: %s\n", namespace) | ||
util.RedactedEnv(os.Stdout, outputTerraform, a.Options.RedactedEnv) | ||
} else { | ||
fmt.Printf("Namespace %s does not have terraform resources folder, skipping terraform apply", a.Options.Namespace) | ||
fmt.Printf("Namespace %s does not have terraform resources folder, skipping terraform apply", namespace) | ||
} | ||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
duplicated plan see line 152 above