diff --git a/pkg/environment/applier.go b/pkg/environment/applier.go index 370ea482..ee55b69b 100644 --- a/pkg/environment/applier.go +++ b/pkg/environment/applier.go @@ -157,12 +157,6 @@ func (m *ApplierImpl) TerraformInitAndPlan(namespace, directory string) (*tfjson return nil, "", errors.New("unable to do Terraform Plan: " + err.Error()) } - // ignore if any changes or no changes. - _, err = terraform.Plan(context.Background()) - if err != nil { - return nil, "", errors.New("unable to do Terraform Plan: " + err.Error()) - } - return tfPlan, out.String(), nil } diff --git a/pkg/environment/apply.go b/pkg/environment/apply.go index ccc659e9..59223914 100644 --- a/pkg/environment/apply.go +++ b/pkg/environment/apply.go @@ -4,6 +4,7 @@ import ( "fmt" "log" "os" + "runtime" "strings" "github.com/ministryofjustice/cloud-platform-cli/pkg/github" @@ -60,11 +61,11 @@ func notifyUserApplyFailed(prNumberInt int, slackToken, webhookUrl, buildUrl str // NewApply creates a new Apply object and populates its fields with values from options(which are flags), // instantiate Applier object which also checks and sets the Backend config variables to do terraform init, // RequiredEnvVars object which stores the values required for plan/apply of namespace -func NewApply(opt Options) *Apply { +func NewApply(opt Options, namespace string) *Apply { apply := Apply{ Options: &opt, Applier: NewApplier("/usr/local/bin/terraform", "/usr/local/bin/kubectl"), - Dir: "namespaces/" + opt.ClusterDir + "/" + opt.Namespace, + Dir: "namespaces/" + opt.ClusterDir + "/" + namespace, } apply.Initialize() @@ -84,7 +85,7 @@ func (a *Apply) Apply() error { // If a namespace is given as a flag, then perform a apply for the given namespace. if a.Options.Namespace != "" { - err := a.applyNamespace() + err := a.applyNamespace(a.Options.Namespace) if err != nil { return err } @@ -111,10 +112,9 @@ func (a *Apply) Apply() error { return err } for _, namespace := range changedNamespaces { - a.Options.Namespace = namespace - if _, err = os.Stat(a.Options.Namespace); err != nil { + if _, err = os.Stat(namespace); err != nil { fmt.Println("Applying Namespace:", namespace) - err = a.applyNamespace() + err = a.applyNamespace(namespace) if err != nil { return err } @@ -182,25 +182,69 @@ func (a *Apply) ApplyBatch() error { // get the latest changes (In case any PRs were merged since the pipeline started), and perform // the apply of that namespace func (a *Apply) applyNamespaceDirs(chunkFolder []string) error { - for _, folder := range chunkFolder { + erroredNs := []string{} - // split the path to get the namespace name - namespace := strings.Split(folder, "/") - a.Options.Namespace = namespace[2] + err := util.GetLatestGitPull() + if err != nil { + return err + } - err := util.GetLatestGitPull() - if err != nil { - return err - } - err = a.applyNamespace() - if err != nil { - return err - } + done := make(chan bool) + defer close(done) + + chunkStream := util.Generator(done, chunkFolder...) + routineResults := a.parallelApplyNamespace(done, chunkStream) + + results := util.FanIn(done, routineResults...) + + for res := range results { + erroredNs = append(erroredNs, res) } + + fmt.Printf("\nerrored ns: %v\n", erroredNs) return nil } +func (a *Apply) parallelApplyNamespace(done <-chan bool, dirStream <-chan string) []<-chan string { + numRoutines := runtime.NumCPU() + + routineResults := make([]<-chan string, numRoutines) + + for i := 0; i < numRoutines; i++ { + routineResults[i] = a.runApply(done, dirStream) + } + + return routineResults +} + +func (a *Apply) runApply(done <-chan bool, dirStream <-chan string) <-chan string { + results := make(chan string) + go func() { + defer close(results) + for dir := range dirStream { + fmt.Printf("dir %s\n", dir) + select { + case <-done: + return + case results <- func(dir string) string { + ns := strings.Split(dir, "/") + namespace := ns[2] + + err := a.applyNamespace(namespace) + if err != nil { + return namespace + } + + return "" + }(dir): + } + } + }() + + return results +} + // applyKubectl calls the applier -> applyKubectl with dry-run disabled and return the output from applier func (a *Apply) applyKubectl() (string, error) { log.Printf("Running kubectl for namespace: %v in directory %v", a.Options.Namespace, a.Dir) @@ -263,7 +307,6 @@ func (a *Apply) applyTerraform() (string, error) { // and checks if the file SECRET_ROTATE_BLOCK exists. func secretBlockerExists(filePath string) bool { // Check if the file contains a secret blocker - // If it does, we don't want to apply it // If it doesn't, we do want to apply it secretBlocker := "SECRET_ROTATE_BLOCK" if _, err := os.Stat(filePath + "/" + secretBlocker); err == nil { @@ -287,30 +330,31 @@ func applySkipExists(filePath string) bool { // applyNamespace intiates a new Apply object with options and env variables, and calls the // applyKubectl with dry-run disabled and calls applier TerraformInitAndApply and prints the output -func (a *Apply) applyNamespace() error { +func (a *Apply) applyNamespace(namespace string) error { // secretBlocker is a file used to control the behaviour of a namespace that will have all // secrets in a namespace rotated. This came out of the requirement to rotate IAM credentials // post circle breach. - repoPath := "namespaces/" + a.Options.ClusterDir + "/" + a.Options.Namespace + repoPath := "namespaces/" + a.Options.ClusterDir + "/" + namespace if _, err := os.Stat(repoPath); os.IsNotExist(err) { - fmt.Printf("Namespace %s does not exist, skipping apply\n", a.Options.Namespace) + fmt.Printf("Namespace %s does not exist, skipping apply\n", namespace) return nil } if secretBlockerExists(repoPath) { - log.Printf("Namespace %s has a secret rotation blocker file, skipping apply", a.Options.Namespace) + log.Printf("Namespace %s has a secret rotation blocker file, skipping apply", namespace) // We don't want to return an error here so we softly fail. return nil } if (a.Options.EnableApplySkip) && (applySkipExists(repoPath)) { - log.Printf("Namespace %s has a apply skip file, skipping apply", a.Options.Namespace) + log.Printf("Namespace %s has a apply skip file, skipping apply", namespace) // We don't want to return an error here so we softly fail. return nil } - applier := NewApply(*a.Options) + applier := NewApply(*a.Options, namespace) + applier.Options.Namespace = namespace if util.IsYamlFileExists(repoPath) { outputKubectl, err := applier.applyKubectl() @@ -323,7 +367,7 @@ func (a *Apply) applyNamespace() error { fmt.Println("\nOutput of kubectl:", outputKubectl) } else { - fmt.Printf("Namespace %s does not have yaml resources folder, skipping kubectl apply", a.Options.Namespace) + fmt.Printf("Namespace %s does not have yaml resources folder, skipping kubectl apply", namespace) } exists, err := util.IsFilePathExists(repoPath + "/resources") @@ -344,7 +388,7 @@ func (a *Apply) applyNamespace() error { fmt.Println("\nOutput of terraform:") util.RedactedEnv(os.Stdout, outputTerraform, a.Options.RedactedEnv) } else { - fmt.Printf("Namespace %s does not have terraform resources folder, skipping terraform apply", a.Options.Namespace) + fmt.Printf("Namespace %s does not have terraform resources folder, skipping terraform apply", namespace) } return nil } diff --git a/pkg/environment/destroy.go b/pkg/environment/destroy.go index 0848fc13..686e8658 100644 --- a/pkg/environment/destroy.go +++ b/pkg/environment/destroy.go @@ -49,15 +49,14 @@ func (a *Apply) Destroy() error { } for _, namespace := range changedNamespaces { - a.Options.Namespace = namespace if a.Options.SkipProdDestroy && isProductionNs(namespace, namespaces) { err := fmt.Errorf("cannot destroy production namespace with skip-prod-destroy flag set to true") return err } // Check if the namespace is present in the folder - if _, err = os.Stat(a.Options.Namespace); err != nil { + if _, err = os.Stat(namespace); err != nil { fmt.Println("Destroying Namespace:", namespace) - err = a.destroyNamespace() + err = a.destroyNamespace(namespace) if err != nil { return err } @@ -84,15 +83,16 @@ func (a *Apply) destroyTerraform() (string, error) { // destroyNamespace intiates a apply object with options and env variables, and calls the // calls applier TerraformInitAndDestroy, applyKubectl with dry-run disabled and prints the output -func (a *Apply) destroyNamespace() error { - repoPath := "namespaces/" + a.Options.ClusterDir + "/" + a.Options.Namespace +func (a *Apply) destroyNamespace(namespace string) error { + repoPath := "namespaces/" + a.Options.ClusterDir + "/" + namespace if _, err := os.Stat(repoPath); os.IsNotExist(err) { - fmt.Printf("Namespace %s does not exist, skipping destroy\n", a.Options.Namespace) + fmt.Printf("Namespace %s does not exist, skipping destroy\n", namespace) return nil } - applier := NewApply(*a.Options) + applier := NewApply(*a.Options, namespace) + applier.Options.Namespace = namespace exists, err := util.IsFilePathExists(repoPath + "/resources") if err == nil && exists { @@ -112,11 +112,11 @@ func (a *Apply) destroyNamespace() error { fmt.Println("\nOutput of kubectl:", outputKubectl) } else { - fmt.Printf("Namespace %s does not have yaml resources folder, skipping kubectl delete", a.Options.Namespace) + fmt.Printf("Namespace %s does not have yaml resources folder, skipping kubectl delete", namespace) } } else { - fmt.Printf("Namespace %s does not have terraform resources folder, skipping terraform destroy", a.Options.Namespace) + fmt.Printf("Namespace %s does not have terraform resources folder, skipping terraform destroy", namespace) } return nil } diff --git a/pkg/environment/plan.go b/pkg/environment/plan.go index 47cae60e..c1c554db 100644 --- a/pkg/environment/plan.go +++ b/pkg/environment/plan.go @@ -34,7 +34,7 @@ func (a *Apply) Plan() error { // If a namespace is given as a flag, then perform a plan for the given namespace. if a.Options.Namespace != "" { - err := a.planNamespace() + err := a.planNamespace(a.Options.Namespace) if err != nil { return err } @@ -51,8 +51,7 @@ func (a *Apply) Plan() error { return err } for _, namespace := range changedNamespaces { - a.Options.Namespace = namespace - err = a.planNamespace() + err = a.planNamespace(namespace) if err != nil { return err } @@ -77,9 +76,10 @@ func (a *Apply) planTerraform() (*tfjson.Plan, string, error) { // planNamespace intiates a new Apply object with options and env variables, and calls the // applyKubectl with dry-run enabled and calls applier TerraformInitAndPlan and prints the output -func (a *Apply) planNamespace() error { - applier := NewApply(*a.Options) - repoPath := "namespaces/" + a.Options.ClusterDir + "/" + a.Options.Namespace +func (a *Apply) planNamespace(namespace string) error { + applier := NewApply(*a.Options, namespace) + applier.Options.Namespace = namespace + repoPath := "namespaces/" + a.Options.ClusterDir + "/" + namespace if util.IsYamlFileExists(repoPath) { outputKubectl, err := applier.planKubectl() @@ -89,7 +89,7 @@ func (a *Apply) planNamespace() error { fmt.Println("\nOutput of kubectl:", outputKubectl) } else { - fmt.Printf("Namespace %s does not have yaml resources folder, skipping kubectl apply --dry-run\n", a.Options.Namespace) + fmt.Printf("Namespace %s does not have yaml resources folder, skipping kubectl apply --dry-run\n", namespace) } exists, err := util.IsFilePathExists(repoPath + "/resources") @@ -107,7 +107,7 @@ func (a *Apply) planNamespace() error { } util.RedactedEnv(os.Stdout, outputTerraform, a.Options.RedactedEnv) } else { - fmt.Printf("Namespace %s does not have terraform resources folder, skipping terraform plan\n", a.Options.Namespace) + fmt.Printf("Namespace %s does not have terraform resources folder, skipping terraform plan\n", namespace) } return nil } diff --git a/pkg/util/concurrent.go b/pkg/util/concurrent.go new file mode 100644 index 00000000..4740feea --- /dev/null +++ b/pkg/util/concurrent.go @@ -0,0 +1,49 @@ +package util + +import ( + "sync" +) + +func Generator(done <-chan bool, data ...string) <-chan string { + readStream := make(chan string) + go func() { + defer close(readStream) + for _, s := range data { + select { + case <-done: + return + case readStream <- s: + } + } + }() + + return readStream +} + +func FanIn(done <-chan bool, channels ...<-chan string) <-chan string { + var wg sync.WaitGroup + multiplexedStream := make(chan string) + + multiplex := func(c <-chan string) { + defer wg.Done() + for i := range c { + select { + case <-done: + return + case multiplexedStream <- i: + } + } + } + + wg.Add(len(channels)) + for _, c := range channels { + go multiplex(c) + } + + go func() { + wg.Wait() + close(multiplexedStream) + }() + + return multiplexedStream +}