Skip to content

Commit

Permalink
perf: ⚡️ run apply namespaces in apply pipeline concurrently
Browse files Browse the repository at this point in the history
  • Loading branch information
jaskaransarkaria committed Oct 16, 2024
1 parent be7c9d9 commit 9815c3d
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 50 deletions.
6 changes: 0 additions & 6 deletions pkg/environment/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,6 @@ func (m *ApplierImpl) TerraformInitAndPlan(namespace, directory string) (*tfjson
return nil, "", errors.New("unable to do Terraform Plan: " + err.Error())
}

// ignore if any changes or no changes.
_, err = terraform.Plan(context.Background())
if err != nil {
return nil, "", errors.New("unable to do Terraform Plan: " + err.Error())
}

return tfPlan, out.String(), nil
}

Expand Down
98 changes: 71 additions & 27 deletions pkg/environment/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"log"
"os"
"runtime"
"strings"

"github.com/ministryofjustice/cloud-platform-cli/pkg/github"
Expand Down Expand Up @@ -60,11 +61,11 @@ func notifyUserApplyFailed(prNumberInt int, slackToken, webhookUrl, buildUrl str
// NewApply creates a new Apply object and populates its fields with values from options(which are flags),
// instantiate Applier object which also checks and sets the Backend config variables to do terraform init,
// RequiredEnvVars object which stores the values required for plan/apply of namespace
func NewApply(opt Options) *Apply {
func NewApply(opt Options, namespace string) *Apply {
apply := Apply{
Options: &opt,
Applier: NewApplier("/usr/local/bin/terraform", "/usr/local/bin/kubectl"),
Dir: "namespaces/" + opt.ClusterDir + "/" + opt.Namespace,
Dir: "namespaces/" + opt.ClusterDir + "/" + namespace,
}

apply.Initialize()
Expand All @@ -84,7 +85,7 @@ func (a *Apply) Apply() error {
// If a namespace is given as a flag, then perform a apply for the given namespace.
if a.Options.Namespace != "" {

err := a.applyNamespace()
err := a.applyNamespace(a.Options.Namespace)
if err != nil {
return err
}
Expand All @@ -111,10 +112,9 @@ func (a *Apply) Apply() error {
return err
}
for _, namespace := range changedNamespaces {
a.Options.Namespace = namespace
if _, err = os.Stat(a.Options.Namespace); err != nil {
if _, err = os.Stat(namespace); err != nil {
fmt.Println("Applying Namespace:", namespace)
err = a.applyNamespace()
err = a.applyNamespace(namespace)
if err != nil {
return err
}
Expand Down Expand Up @@ -182,25 +182,69 @@ func (a *Apply) ApplyBatch() error {
// get the latest changes (In case any PRs were merged since the pipeline started), and perform
// the apply of that namespace
func (a *Apply) applyNamespaceDirs(chunkFolder []string) error {
for _, folder := range chunkFolder {
erroredNs := []string{}

// split the path to get the namespace name
namespace := strings.Split(folder, "/")
a.Options.Namespace = namespace[2]
err := util.GetLatestGitPull()
if err != nil {
return err
}

err := util.GetLatestGitPull()
if err != nil {
return err
}
err = a.applyNamespace()
if err != nil {
return err
}
done := make(chan bool)
defer close(done)

chunkStream := util.Generator(done, chunkFolder...)

routineResults := a.parallelApplyNamespace(done, chunkStream)

results := util.FanIn(done, routineResults...)

for res := range results {
erroredNs = append(erroredNs, res)
}

fmt.Printf("\nerrored ns: %v\n", erroredNs)
return nil
}

func (a *Apply) parallelApplyNamespace(done <-chan bool, dirStream <-chan string) []<-chan string {
numRoutines := runtime.NumCPU()

routineResults := make([]<-chan string, numRoutines)

for i := 0; i < numRoutines; i++ {
routineResults[i] = a.runApply(done, dirStream)
}

return routineResults
}

func (a *Apply) runApply(done <-chan bool, dirStream <-chan string) <-chan string {
results := make(chan string)
go func() {
defer close(results)
for dir := range dirStream {
fmt.Printf("dir %s\n", dir)
select {
case <-done:
return
case results <- func(dir string) string {
ns := strings.Split(dir, "/")
namespace := ns[2]

err := a.applyNamespace(namespace)
if err != nil {
return namespace
}

return ""
}(dir):
}
}
}()

return results
}

// applyKubectl calls the applier -> applyKubectl with dry-run disabled and return the output from applier
func (a *Apply) applyKubectl() (string, error) {
log.Printf("Running kubectl for namespace: %v in directory %v", a.Options.Namespace, a.Dir)
Expand Down Expand Up @@ -263,7 +307,6 @@ func (a *Apply) applyTerraform() (string, error) {
// and checks if the file SECRET_ROTATE_BLOCK exists.
func secretBlockerExists(filePath string) bool {
// Check if the file contains a secret blocker
// If it does, we don't want to apply it
// If it doesn't, we do want to apply it
secretBlocker := "SECRET_ROTATE_BLOCK"
if _, err := os.Stat(filePath + "/" + secretBlocker); err == nil {
Expand All @@ -287,30 +330,31 @@ func applySkipExists(filePath string) bool {

// applyNamespace intiates a new Apply object with options and env variables, and calls the
// applyKubectl with dry-run disabled and calls applier TerraformInitAndApply and prints the output
func (a *Apply) applyNamespace() error {
func (a *Apply) applyNamespace(namespace string) error {
// secretBlocker is a file used to control the behaviour of a namespace that will have all
// secrets in a namespace rotated. This came out of the requirement to rotate IAM credentials
// post circle breach.
repoPath := "namespaces/" + a.Options.ClusterDir + "/" + a.Options.Namespace
repoPath := "namespaces/" + a.Options.ClusterDir + "/" + namespace

if _, err := os.Stat(repoPath); os.IsNotExist(err) {
fmt.Printf("Namespace %s does not exist, skipping apply\n", a.Options.Namespace)
fmt.Printf("Namespace %s does not exist, skipping apply\n", namespace)
return nil
}

if secretBlockerExists(repoPath) {
log.Printf("Namespace %s has a secret rotation blocker file, skipping apply", a.Options.Namespace)
log.Printf("Namespace %s has a secret rotation blocker file, skipping apply", namespace)
// We don't want to return an error here so we softly fail.
return nil
}

if (a.Options.EnableApplySkip) && (applySkipExists(repoPath)) {
log.Printf("Namespace %s has a apply skip file, skipping apply", a.Options.Namespace)
log.Printf("Namespace %s has a apply skip file, skipping apply", namespace)
// We don't want to return an error here so we softly fail.
return nil
}

applier := NewApply(*a.Options)
applier := NewApply(*a.Options, namespace)
applier.Options.Namespace = namespace

if util.IsYamlFileExists(repoPath) {
outputKubectl, err := applier.applyKubectl()
Expand All @@ -323,7 +367,7 @@ func (a *Apply) applyNamespace() error {

fmt.Println("\nOutput of kubectl:", outputKubectl)
} else {
fmt.Printf("Namespace %s does not have yaml resources folder, skipping kubectl apply", a.Options.Namespace)
fmt.Printf("Namespace %s does not have yaml resources folder, skipping kubectl apply", namespace)
}

exists, err := util.IsFilePathExists(repoPath + "/resources")
Expand All @@ -344,7 +388,7 @@ func (a *Apply) applyNamespace() error {
fmt.Println("\nOutput of terraform:")
util.RedactedEnv(os.Stdout, outputTerraform, a.Options.RedactedEnv)
} else {
fmt.Printf("Namespace %s does not have terraform resources folder, skipping terraform apply", a.Options.Namespace)
fmt.Printf("Namespace %s does not have terraform resources folder, skipping terraform apply", namespace)
}
return nil
}
18 changes: 9 additions & 9 deletions pkg/environment/destroy.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,14 @@ func (a *Apply) Destroy() error {
}

for _, namespace := range changedNamespaces {
a.Options.Namespace = namespace
if a.Options.SkipProdDestroy && isProductionNs(namespace, namespaces) {
err := fmt.Errorf("cannot destroy production namespace with skip-prod-destroy flag set to true")
return err
}
// Check if the namespace is present in the folder
if _, err = os.Stat(a.Options.Namespace); err != nil {
if _, err = os.Stat(namespace); err != nil {
fmt.Println("Destroying Namespace:", namespace)
err = a.destroyNamespace()
err = a.destroyNamespace(namespace)
if err != nil {
return err
}
Expand All @@ -84,15 +83,16 @@ func (a *Apply) destroyTerraform() (string, error) {

// destroyNamespace intiates a apply object with options and env variables, and calls the
// calls applier TerraformInitAndDestroy, applyKubectl with dry-run disabled and prints the output
func (a *Apply) destroyNamespace() error {
repoPath := "namespaces/" + a.Options.ClusterDir + "/" + a.Options.Namespace
func (a *Apply) destroyNamespace(namespace string) error {
repoPath := "namespaces/" + a.Options.ClusterDir + "/" + namespace

if _, err := os.Stat(repoPath); os.IsNotExist(err) {
fmt.Printf("Namespace %s does not exist, skipping destroy\n", a.Options.Namespace)
fmt.Printf("Namespace %s does not exist, skipping destroy\n", namespace)
return nil
}

applier := NewApply(*a.Options)
applier := NewApply(*a.Options, namespace)
applier.Options.Namespace = namespace

exists, err := util.IsFilePathExists(repoPath + "/resources")
if err == nil && exists {
Expand All @@ -112,11 +112,11 @@ func (a *Apply) destroyNamespace() error {

fmt.Println("\nOutput of kubectl:", outputKubectl)
} else {
fmt.Printf("Namespace %s does not have yaml resources folder, skipping kubectl delete", a.Options.Namespace)
fmt.Printf("Namespace %s does not have yaml resources folder, skipping kubectl delete", namespace)
}

} else {
fmt.Printf("Namespace %s does not have terraform resources folder, skipping terraform destroy", a.Options.Namespace)
fmt.Printf("Namespace %s does not have terraform resources folder, skipping terraform destroy", namespace)
}
return nil
}
16 changes: 8 additions & 8 deletions pkg/environment/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (a *Apply) Plan() error {

// If a namespace is given as a flag, then perform a plan for the given namespace.
if a.Options.Namespace != "" {
err := a.planNamespace()
err := a.planNamespace(a.Options.Namespace)
if err != nil {
return err
}
Expand All @@ -51,8 +51,7 @@ func (a *Apply) Plan() error {
return err
}
for _, namespace := range changedNamespaces {
a.Options.Namespace = namespace
err = a.planNamespace()
err = a.planNamespace(namespace)
if err != nil {
return err
}
Expand All @@ -77,9 +76,10 @@ func (a *Apply) planTerraform() (*tfjson.Plan, string, error) {

// planNamespace intiates a new Apply object with options and env variables, and calls the
// applyKubectl with dry-run enabled and calls applier TerraformInitAndPlan and prints the output
func (a *Apply) planNamespace() error {
applier := NewApply(*a.Options)
repoPath := "namespaces/" + a.Options.ClusterDir + "/" + a.Options.Namespace
func (a *Apply) planNamespace(namespace string) error {
applier := NewApply(*a.Options, namespace)
applier.Options.Namespace = namespace
repoPath := "namespaces/" + a.Options.ClusterDir + "/" + namespace

if util.IsYamlFileExists(repoPath) {
outputKubectl, err := applier.planKubectl()
Expand All @@ -89,7 +89,7 @@ func (a *Apply) planNamespace() error {

fmt.Println("\nOutput of kubectl:", outputKubectl)
} else {
fmt.Printf("Namespace %s does not have yaml resources folder, skipping kubectl apply --dry-run\n", a.Options.Namespace)
fmt.Printf("Namespace %s does not have yaml resources folder, skipping kubectl apply --dry-run\n", namespace)
}

exists, err := util.IsFilePathExists(repoPath + "/resources")
Expand All @@ -107,7 +107,7 @@ func (a *Apply) planNamespace() error {
}
util.RedactedEnv(os.Stdout, outputTerraform, a.Options.RedactedEnv)
} else {
fmt.Printf("Namespace %s does not have terraform resources folder, skipping terraform plan\n", a.Options.Namespace)
fmt.Printf("Namespace %s does not have terraform resources folder, skipping terraform plan\n", namespace)
}
return nil
}
49 changes: 49 additions & 0 deletions pkg/util/concurrent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package util

import (
"sync"
)

func Generator(done <-chan bool, data ...string) <-chan string {
readStream := make(chan string)
go func() {
defer close(readStream)
for _, s := range data {
select {
case <-done:
return
case readStream <- s:
}
}
}()

return readStream
}

func FanIn(done <-chan bool, channels ...<-chan string) <-chan string {
var wg sync.WaitGroup
multiplexedStream := make(chan string)

multiplex := func(c <-chan string) {
defer wg.Done()
for i := range c {
select {
case <-done:
return
case multiplexedStream <- i:
}
}
}

wg.Add(len(channels))
for _, c := range channels {
go multiplex(c)
}

go func() {
wg.Wait()
close(multiplexedStream)
}()

return multiplexedStream
}

0 comments on commit 9815c3d

Please sign in to comment.