Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: ⚡️ run apply namespaces in apply pipeline concurrently #644

Merged
merged 1 commit into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 4 additions & 15 deletions pkg/cluster/delete_utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/eks"
"github.com/ministryofjustice/cloud-platform-cli/pkg/util"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -37,7 +38,7 @@ func AbortIfUserNamespacesExist(namespaces []v1.Namespace, systemNamespaces []st
continue
}

isUserNamespace := !contains(systemNamespaces, ns.Name)
isUserNamespace := !util.Contains(systemNamespaces, ns.Name)

if isUserNamespace {
userNamespaces = append(userNamespaces, ns.Name)
Expand Down Expand Up @@ -73,7 +74,7 @@ func CheckClusterIsDestroyed(clusterName string, eksAPI EKSClient) error {
clusterValues = append(clusterValues, *val)
}

isDeleted := !contains(clusterValues, clusterName)
isDeleted := !util.Contains(clusterValues, clusterName)

if !isDeleted {
return fmt.Errorf("cluster has not successfully deleted")
Expand All @@ -91,7 +92,6 @@ func CheckVpcIsDestroyed(vpcID string, ec2API EC2Client) error {
{Name: aws.String("tag:business-unit"), Values: []*string{aws.String("Platforms")}},
},
})

if err != nil {
return fmt.Errorf("failed to list vpcs: %w", err)
}
Expand All @@ -102,22 +102,11 @@ func CheckVpcIsDestroyed(vpcID string, ec2API EC2Client) error {
vpcValues = append(vpcValues, *val.VpcId)
}

isDeleted := !contains(vpcValues, vpcID)
isDeleted := !util.Contains(vpcValues, vpcID)

if !isDeleted {
return fmt.Errorf("vpc has not successfully deleted")
}

return nil
}

// Contains checks if a string is present in a slice
func contains(s []string, str string) bool {
for _, v := range s {
if v == str {
return true
}
}

return false
}
3 changes: 2 additions & 1 deletion pkg/cluster/delete_utils/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/aws/aws-sdk-go/service/eks"
"github.com/aws/aws-sdk-go/service/eks/eksiface"
"github.com/ministryofjustice/cloud-platform-cli/pkg/client"
"github.com/ministryofjustice/cloud-platform-cli/pkg/util"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -131,7 +132,7 @@ func Test_contains(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := contains(tt.args.s, tt.args.str); got != tt.want {
if got := util.Contains(tt.args.s, tt.args.str); got != tt.want {
t.Errorf("contains() = %v, want %v", got, tt.want)
}
})
Expand Down
6 changes: 0 additions & 6 deletions pkg/environment/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,6 @@ func (m *ApplierImpl) TerraformInitAndPlan(namespace, directory string) (*tfjson
return nil, "", errors.New("unable to do Terraform Plan: " + err.Error())
}

// ignore if any changes or no changes.
_, err = terraform.Plan(context.Background())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

duplicated plan see line 152 above

if err != nil {
return nil, "", errors.New("unable to do Terraform Plan: " + err.Error())
}

return tfPlan, out.String(), nil
}

Expand Down
99 changes: 71 additions & 28 deletions pkg/environment/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"log"
"os"
"runtime"
"strings"

"github.com/ministryofjustice/cloud-platform-cli/pkg/github"
Expand Down Expand Up @@ -60,11 +61,11 @@ func notifyUserApplyFailed(prNumberInt int, slackToken, webhookUrl, buildUrl str
// NewApply creates a new Apply object and populates its fields with values from options(which are flags),
// instantiate Applier object which also checks and sets the Backend config variables to do terraform init,
// RequiredEnvVars object which stores the values required for plan/apply of namespace
func NewApply(opt Options) *Apply {
func NewApply(opt Options, namespace string) *Apply {
apply := Apply{
Options: &opt,
Applier: NewApplier("/usr/local/bin/terraform", "/usr/local/bin/kubectl"),
Dir: "namespaces/" + opt.ClusterDir + "/" + opt.Namespace,
Dir: "namespaces/" + opt.ClusterDir + "/" + namespace,
}

apply.Initialize()
Expand All @@ -84,7 +85,7 @@ func (a *Apply) Apply() error {
// If a namespace is given as a flag, then perform a apply for the given namespace.
if a.Options.Namespace != "" {

err := a.applyNamespace()
err := a.applyNamespace(a.Options.Namespace)
if err != nil {
return err
}
Expand All @@ -111,10 +112,9 @@ func (a *Apply) Apply() error {
return err
}
for _, namespace := range changedNamespaces {
a.Options.Namespace = namespace
if _, err = os.Stat(a.Options.Namespace); err != nil {
if _, err = os.Stat(namespace); err != nil {
fmt.Println("Applying Namespace:", namespace)
err = a.applyNamespace()
err = a.applyNamespace(namespace)
if err != nil {
return err
}
Expand Down Expand Up @@ -182,25 +182,68 @@ func (a *Apply) ApplyBatch() error {
// get the latest changes (In case any PRs were merged since the pipeline started), and perform
// the apply of that namespace
func (a *Apply) applyNamespaceDirs(chunkFolder []string) error {
for _, folder := range chunkFolder {
erroredNs := []string{}

// split the path to get the namespace name
namespace := strings.Split(folder, "/")
a.Options.Namespace = namespace[2]
err := util.GetLatestGitPull()
if err != nil {
return err
}

err := util.GetLatestGitPull()
if err != nil {
return err
}
err = a.applyNamespace()
if err != nil {
return err
}
done := make(chan bool)
defer close(done)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

close the channel which cascades down through the child go routines and in turn closes them to prevent memory leaks


chunkStream := util.Generator(done, chunkFolder...)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

convert array into a channel


routineResults := a.parallelApplyNamespace(done, chunkStream)

results := util.FanIn(done, routineResults...)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

combine multiple channel results back into a single channel


for res := range results {
erroredNs = append(erroredNs, res)
}

fmt.Printf("\nerrored ns: %v\n", erroredNs)
return nil
}

func (a *Apply) parallelApplyNamespace(done <-chan bool, dirStream <-chan string) []<-chan string {
numRoutines := runtime.NumCPU()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

run as many go routines as we have cores, this proves to be a conservative number of routines because of how lightweight and cheap they are. We should monitor performance in concourse and adjust here if necessary.


routineResults := make([]<-chan string, numRoutines)

for i := 0; i < numRoutines; i++ {
routineResults[i] = a.runApply(done, dirStream)
}

return routineResults
}

func (a *Apply) runApply(done <-chan bool, dirStream <-chan string) <-chan string {
results := make(chan string)
go func() {
defer close(results)
for dir := range dirStream {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In a new go routine run each apply for each namespace

select {
case <-done:
return
case results <- func(dir string) string {
ns := strings.Split(dir, "/")
namespace := ns[2]

err := a.applyNamespace(namespace)
if err != nil {
return "Error in namespace: " + namespace + "\n" + err.Error()
}

return ""
}(dir):
}
}
}()

return results
}

// applyKubectl calls the applier -> applyKubectl with dry-run disabled and return the output from applier
func (a *Apply) applyKubectl() (string, error) {
log.Printf("Running kubectl for namespace: %v in directory %v", a.Options.Namespace, a.Dir)
Expand Down Expand Up @@ -263,7 +306,6 @@ func (a *Apply) applyTerraform() (string, error) {
// and checks if the file SECRET_ROTATE_BLOCK exists.
func secretBlockerExists(filePath string) bool {
// Check if the file contains a secret blocker
// If it does, we don't want to apply it
// If it doesn't, we do want to apply it
secretBlocker := "SECRET_ROTATE_BLOCK"
if _, err := os.Stat(filePath + "/" + secretBlocker); err == nil {
Expand All @@ -287,30 +329,31 @@ func applySkipExists(filePath string) bool {

// applyNamespace intiates a new Apply object with options and env variables, and calls the
// applyKubectl with dry-run disabled and calls applier TerraformInitAndApply and prints the output
func (a *Apply) applyNamespace() error {
func (a *Apply) applyNamespace(namespace string) error {
// secretBlocker is a file used to control the behaviour of a namespace that will have all
// secrets in a namespace rotated. This came out of the requirement to rotate IAM credentials
// post circle breach.
repoPath := "namespaces/" + a.Options.ClusterDir + "/" + a.Options.Namespace
repoPath := "namespaces/" + a.Options.ClusterDir + "/" + namespace

if _, err := os.Stat(repoPath); os.IsNotExist(err) {
fmt.Printf("Namespace %s does not exist, skipping apply\n", a.Options.Namespace)
fmt.Printf("Namespace %s does not exist, skipping apply\n", namespace)
return nil
}

if secretBlockerExists(repoPath) {
log.Printf("Namespace %s has a secret rotation blocker file, skipping apply", a.Options.Namespace)
log.Printf("Namespace %s has a secret rotation blocker file, skipping apply", namespace)
// We don't want to return an error here so we softly fail.
return nil
}

if (a.Options.EnableApplySkip) && (applySkipExists(repoPath)) {
log.Printf("Namespace %s has a apply skip file, skipping apply", a.Options.Namespace)
log.Printf("Namespace %s has a apply skip file, skipping apply", namespace)
// We don't want to return an error here so we softly fail.
return nil
}

applier := NewApply(*a.Options)
applier := NewApply(*a.Options, namespace)
applier.Options.Namespace = namespace

if util.IsYamlFileExists(repoPath) {
outputKubectl, err := applier.applyKubectl()
Expand All @@ -323,7 +366,7 @@ func (a *Apply) applyNamespace() error {

fmt.Println("\nOutput of kubectl:", outputKubectl)
} else {
fmt.Printf("Namespace %s does not have yaml resources folder, skipping kubectl apply", a.Options.Namespace)
fmt.Printf("Namespace %s does not have yaml resources folder, skipping kubectl apply", namespace)
}

exists, err := util.IsFilePathExists(repoPath + "/resources")
Expand All @@ -341,10 +384,10 @@ func (a *Apply) applyNamespace() error {
}
return err
}
fmt.Println("\nOutput of terraform:")
fmt.Printf("\nOutput of terraform for namespace: %s\n", namespace)
util.RedactedEnv(os.Stdout, outputTerraform, a.Options.RedactedEnv)
} else {
fmt.Printf("Namespace %s does not have terraform resources folder, skipping terraform apply", a.Options.Namespace)
fmt.Printf("Namespace %s does not have terraform resources folder, skipping terraform apply", namespace)
}
return nil
}
18 changes: 9 additions & 9 deletions pkg/environment/destroy.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,14 @@ func (a *Apply) Destroy() error {
}

for _, namespace := range changedNamespaces {
a.Options.Namespace = namespace
if a.Options.SkipProdDestroy && isProductionNs(namespace, namespaces) {
err := fmt.Errorf("cannot destroy production namespace with skip-prod-destroy flag set to true")
return err
}
// Check if the namespace is present in the folder
if _, err = os.Stat(a.Options.Namespace); err != nil {
if _, err = os.Stat(namespace); err != nil {
fmt.Println("Destroying Namespace:", namespace)
err = a.destroyNamespace()
err = a.destroyNamespace(namespace)
if err != nil {
return err
}
Expand All @@ -84,15 +83,16 @@ func (a *Apply) destroyTerraform() (string, error) {

// destroyNamespace intiates a apply object with options and env variables, and calls the
// calls applier TerraformInitAndDestroy, applyKubectl with dry-run disabled and prints the output
func (a *Apply) destroyNamespace() error {
repoPath := "namespaces/" + a.Options.ClusterDir + "/" + a.Options.Namespace
func (a *Apply) destroyNamespace(namespace string) error {
repoPath := "namespaces/" + a.Options.ClusterDir + "/" + namespace

if _, err := os.Stat(repoPath); os.IsNotExist(err) {
fmt.Printf("Namespace %s does not exist, skipping destroy\n", a.Options.Namespace)
fmt.Printf("Namespace %s does not exist, skipping destroy\n", namespace)
return nil
}

applier := NewApply(*a.Options)
applier := NewApply(*a.Options, namespace)
applier.Options.Namespace = namespace

exists, err := util.IsFilePathExists(repoPath + "/resources")
if err == nil && exists {
Expand All @@ -112,11 +112,11 @@ func (a *Apply) destroyNamespace() error {

fmt.Println("\nOutput of kubectl:", outputKubectl)
} else {
fmt.Printf("Namespace %s does not have yaml resources folder, skipping kubectl delete", a.Options.Namespace)
fmt.Printf("Namespace %s does not have yaml resources folder, skipping kubectl delete", namespace)
}

} else {
fmt.Printf("Namespace %s does not have terraform resources folder, skipping terraform destroy", a.Options.Namespace)
fmt.Printf("Namespace %s does not have terraform resources folder, skipping terraform destroy", namespace)
}
return nil
}
16 changes: 8 additions & 8 deletions pkg/environment/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (a *Apply) Plan() error {

// If a namespace is given as a flag, then perform a plan for the given namespace.
if a.Options.Namespace != "" {
err := a.planNamespace()
err := a.planNamespace(a.Options.Namespace)
if err != nil {
return err
}
Expand All @@ -51,8 +51,7 @@ func (a *Apply) Plan() error {
return err
}
for _, namespace := range changedNamespaces {
a.Options.Namespace = namespace
err = a.planNamespace()
err = a.planNamespace(namespace)
if err != nil {
return err
}
Expand All @@ -77,9 +76,10 @@ func (a *Apply) planTerraform() (*tfjson.Plan, string, error) {

// planNamespace intiates a new Apply object with options and env variables, and calls the
// applyKubectl with dry-run enabled and calls applier TerraformInitAndPlan and prints the output
func (a *Apply) planNamespace() error {
applier := NewApply(*a.Options)
repoPath := "namespaces/" + a.Options.ClusterDir + "/" + a.Options.Namespace
func (a *Apply) planNamespace(namespace string) error {
applier := NewApply(*a.Options, namespace)
applier.Options.Namespace = namespace
repoPath := "namespaces/" + a.Options.ClusterDir + "/" + namespace

if util.IsYamlFileExists(repoPath) {
outputKubectl, err := applier.planKubectl()
Expand All @@ -89,7 +89,7 @@ func (a *Apply) planNamespace() error {

fmt.Println("\nOutput of kubectl:", outputKubectl)
} else {
fmt.Printf("Namespace %s does not have yaml resources folder, skipping kubectl apply --dry-run\n", a.Options.Namespace)
fmt.Printf("Namespace %s does not have yaml resources folder, skipping kubectl apply --dry-run\n", namespace)
}

exists, err := util.IsFilePathExists(repoPath + "/resources")
Expand All @@ -107,7 +107,7 @@ func (a *Apply) planNamespace() error {
}
util.RedactedEnv(os.Stdout, outputTerraform, a.Options.RedactedEnv)
} else {
fmt.Printf("Namespace %s does not have terraform resources folder, skipping terraform plan\n", a.Options.Namespace)
fmt.Printf("Namespace %s does not have terraform resources folder, skipping terraform plan\n", namespace)
}
return nil
}
Loading
Loading