Skip to content

Commit

Permalink
fix: detect external autoscaler policy changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Laimonas Rastenis committed Jan 3, 2024
1 parent 67bfef6 commit f50c0b7
Show file tree
Hide file tree
Showing 4 changed files with 271 additions and 33 deletions.
62 changes: 40 additions & 22 deletions castai/resource_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ import (
"context"
"encoding/json"
"fmt"
jsonpatch "github.com/evanphx/json-patch"
"io"
"log"
"net/http"
"time"

jsonpatch "github.com/evanphx/json-patch"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"
Expand All @@ -29,6 +29,7 @@ func resourceAutoscaler() *schema.Resource {
CreateContext: resourceCastaiAutoscalerCreate,
UpdateContext: resourceCastaiAutoscalerUpdate,
DeleteContext: resourceCastaiAutoscalerDelete,
CustomizeDiff: resourceCastaiAutoscalerDiff,
Description: "CAST AI autoscaler resource to manage autoscaler settings",

Timeouts: &schema.ResourceTimeout{
Expand Down Expand Up @@ -74,6 +75,23 @@ func resourceCastaiAutoscalerDelete(ctx context.Context, data *schema.ResourceDa
return nil
}

func resourceCastaiAutoscalerDiff(ctx context.Context, d *schema.ResourceDiff, meta interface{}) error {
clusterId := getClusterId(d)
if clusterId == "" {
return nil
}

policies, err := getChangedPolicies(ctx, d, meta, clusterId)
if err != nil {
return err
}
if policies == nil {
return nil
}

return d.SetNew(FieldAutoscalerPolicies, string(policies))
}

func resourceCastaiAutoscalerRead(ctx context.Context, data *schema.ResourceData, meta interface{}) diag.Diagnostics {
err := readAutoscalerPolicies(ctx, data, meta)
if err != nil {
Expand Down Expand Up @@ -119,15 +137,15 @@ func getCurrentPolicies(ctx context.Context, client *sdk.ClientWithResponses, cl
return nil, fmt.Errorf("cluster %s policies do not exist at CAST AI", clusterId)
}

bytes, err := io.ReadAll(resp.Body)
responseBytes, err := io.ReadAll(resp.Body)
defer resp.Body.Close()
if err != nil {
return nil, fmt.Errorf("reading response body: %w", err)
}

log.Printf("[DEBUG] Read autoscaler policies for cluster %s:\n%v\n", clusterId, string(bytes))
log.Printf("[DEBUG] Read autoscaler policies for cluster %s:\n%v\n", clusterId, string(responseBytes))

return bytes, nil
return normalizeJSON(responseBytes)
}

func updateAutoscalerPolicies(ctx context.Context, data *schema.ResourceData, meta interface{}) error {
Expand All @@ -137,18 +155,17 @@ func updateAutoscalerPolicies(ctx context.Context, data *schema.ResourceData, me
return nil
}

err := readAutoscalerPolicies(ctx, data, meta)
policies, err := getChangedPolicies(ctx, data, meta, clusterId)
if err != nil {
return err
}

changedPolicies, found := data.GetOk(FieldAutoscalerPolicies)
if !found {
log.Printf("[DEBUG] changed policies json not found. Skipping autoscaler policies changes")
if policies == nil {
log.Printf("[DEBUG] changed policies json not calculated. Skipping autoscaler policies changes")
return nil
}

changedPoliciesJSON := changedPolicies.(string)
changedPoliciesJSON := string(policies)
if changedPoliciesJSON == "" {
log.Printf("[DEBUG] changed policies json not found. Skipping autoscaler policies changes")
return nil
Expand Down Expand Up @@ -178,12 +195,13 @@ func readAutoscalerPolicies(ctx context.Context, data *schema.ResourceData, meta
return nil
}

policies, err := getChangedPolicies(ctx, data, meta, clusterId)
client := meta.(*ProviderConfig).api
currentPolicies, err := getCurrentPolicies(ctx, client, clusterId)
if err != nil {
return err
}

err = data.Set(FieldAutoscalerPolicies, string(policies))
err = data.Set(FieldAutoscalerPolicies, string(currentPolicies))
if err != nil {
log.Printf("[ERROR] Failed to set field: %v", err)
return err
Expand All @@ -192,7 +210,16 @@ func readAutoscalerPolicies(ctx context.Context, data *schema.ResourceData, meta
return nil
}

func getChangedPolicies(ctx context.Context, data *schema.ResourceData, meta interface{}, clusterId string) ([]byte, error) {
func getClusterId(data resourceProvider) string {
value, found := data.GetOk(FieldClusterId)
if !found {
return ""
}

return value.(string)
}

func getChangedPolicies(ctx context.Context, data resourceProvider, meta interface{}, clusterId string) ([]byte, error) {
policyChangesJSON, found := data.GetOk(FieldAutoscalerPoliciesJSON)
if !found {
log.Printf("[DEBUG] policies json not provided. Skipping autoscaler policies changes")
Expand All @@ -219,16 +246,7 @@ func getChangedPolicies(ctx context.Context, data *schema.ResourceData, meta int
return nil, fmt.Errorf("failed to merge policies: %v", err)
}

return policies, nil
}

func getClusterId(data *schema.ResourceData) string {
value, found := data.GetOk(FieldClusterId)
if !found {
return ""
}

return value.(string)
return normalizeJSON(policies)
}

func validateAutoscalerPolicyJSON() schema.SchemaValidateDiagFunc {
Expand Down
224 changes: 213 additions & 11 deletions castai/resource_autoscaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,17 +256,6 @@ func TestAutoscalerResource_PoliciesUpdateAction_Fail(t *testing.T) {
r.Equal(`expected status code 200, received: status=400 body={"message":"policies config: Evictor policy management is not allowed: Evictor installed externally. Uninstall Evictor first and try again.","fieldViolations":[]`, result[0].Summary)
}

func JSONBytesEqual(a, b []byte) (bool, error) {
var j, j2 interface{}
if err := json.Unmarshal(a, &j); err != nil {
return false, err
}
if err := json.Unmarshal(b, &j2); err != nil {
return false, err
}
return reflect.DeepEqual(j2, j), nil
}

func Test_validateAutoscalerPolicyJSON(t *testing.T) {
type testData struct {
json string
Expand Down Expand Up @@ -392,3 +381,216 @@ func Test_validateAutoscalerPolicyJSON(t *testing.T) {
})
}
}

func TestAutoscalerResource_ReadPoliciesAction(t *testing.T) {
r := require.New(t)
mockctrl := gomock.NewController(t)
mockClient := mock_sdk.NewMockClientInterface(mockctrl)
ctx := context.Background()
provider := &ProviderConfig{
api: &sdk.ClientWithResponses{
ClientInterface: mockClient,
},
}

currentPoliciesBytes, err := normalizeJSON([]byte(`
{
"enabled": true,
"isScopedMode": false,
"unschedulablePods": {
"enabled": true,
"headroom": {
"cpuPercentage": 10,
"memoryPercentage": 10,
"enabled": true
},
"headroomSpot": {
"cpuPercentage": 10,
"memoryPercentage": 10,
"enabled": true
},
"nodeConstraints": {
"minCpuCores": 2,
"maxCpuCores": 32,
"minRamMib": 4096,
"maxRamMib": 262144,
"enabled": false
},
"diskGibToCpuRatio": 25
},
"clusterLimits": {
"enabled": false,
"cpu": {
"minCores": 1,
"maxCores": 20
}
},
"nodeDownscaler": {
"emptyNodes": {
"enabled": false,
"delaySeconds": 0
}
}
}`))
r.NoError(err)

currentPolicies := string(currentPoliciesBytes)
resource := resourceAutoscaler()

clusterId := "cluster_id"
val := cty.ObjectVal(map[string]cty.Value{
FieldClusterId: cty.StringVal(clusterId),
})
state := terraform.NewInstanceStateShimmedFromValue(val, 0)
data := resource.Data(state)

body := io.NopCloser(bytes.NewReader([]byte(currentPolicies)))
response := &http.Response{StatusCode: 200, Body: body}

mockClient.EXPECT().PoliciesAPIGetClusterPolicies(gomock.Any(), clusterId, gomock.Any()).Return(response, nil).Times(1)
mockClient.EXPECT().PoliciesAPIUpsertClusterPoliciesWithBody(gomock.Any(), clusterId, "application/json", gomock.Any()).
Times(0)

result := resource.ReadContext(ctx, data, provider)
r.Nil(result)
r.Equal(currentPolicies, data.Get(FieldAutoscalerPolicies))
}

func TestAutoscalerResource_CustomizeDiff(t *testing.T) {
r := require.New(t)
mockctrl := gomock.NewController(t)
mockClient := mock_sdk.NewMockClientInterface(mockctrl)
ctx := context.Background()
provider := &ProviderConfig{
api: &sdk.ClientWithResponses{
ClientInterface: mockClient,
},
}

currentPoliciesBytes, err := normalizeJSON([]byte(`
{
"enabled": true,
"isScopedMode": false,
"unschedulablePods": {
"enabled": true,
"headroom": {
"cpuPercentage": 10,
"memoryPercentage": 10,
"enabled": true
},
"headroomSpot": {
"cpuPercentage": 10,
"memoryPercentage": 10,
"enabled": true
},
"nodeConstraints": {
"minCpuCores": 2,
"maxCpuCores": 32,
"minRamMib": 4096,
"maxRamMib": 262144,
"enabled": false
},
"diskGibToCpuRatio": 25
},
"clusterLimits": {
"enabled": false,
"cpu": {
"minCores": 1,
"maxCores": 20
}
},
"nodeDownscaler": {
"emptyNodes": {
"enabled": false,
"delaySeconds": 0
}
}
}`))
r.NoError(err)

policyChangeBytes, err := normalizeJSON([]byte(`
{
"enabled": false,
"unschedulablePods": {
"enabled": false
}
}`))
r.NoError(err)

expectedPoliciesBytes, err := normalizeJSON([]byte(`
{
"enabled": false,
"isScopedMode": false,
"unschedulablePods": {
"enabled": false,
"headroom": {
"cpuPercentage": 10,
"memoryPercentage": 10,
"enabled": true
},
"headroomSpot": {
"cpuPercentage": 10,
"memoryPercentage": 10,
"enabled": true
},
"nodeConstraints": {
"minCpuCores": 2,
"maxCpuCores": 32,
"minRamMib": 4096,
"maxRamMib": 262144,
"enabled": false
},
"diskGibToCpuRatio": 25
},
"clusterLimits": {
"enabled": false,
"cpu": {
"minCores": 1,
"maxCores": 20
}
},
"nodeDownscaler": {
"emptyNodes": {
"enabled": false,
"delaySeconds": 0
}
}
}`))
r.NoError(err)

currentPolicies := string(currentPoliciesBytes)
policyChanges := string(policyChangeBytes)
expectedPolicies := string(expectedPoliciesBytes)
resource := resourceAutoscaler()

clusterId := "cluster_id"
val := cty.ObjectVal(map[string]cty.Value{
FieldAutoscalerPoliciesJSON: cty.StringVal(policyChanges),
FieldClusterId: cty.StringVal(clusterId),
})
state := terraform.NewInstanceStateShimmedFromValue(val, 0)
data := resource.Data(state)
r.NoError(err)

body := io.NopCloser(bytes.NewReader([]byte(currentPolicies)))
response := &http.Response{StatusCode: 200, Body: body}

mockClient.EXPECT().PoliciesAPIGetClusterPolicies(gomock.Any(), clusterId, gomock.Any()).Return(response, nil).Times(1)
mockClient.EXPECT().PoliciesAPIUpsertClusterPoliciesWithBody(gomock.Any(), clusterId, "application/json", gomock.Any()).
Times(0)

result, err := getChangedPolicies(ctx, data, provider, clusterId)
r.NoError(err)
r.Equal(expectedPolicies, string(result))
}

func JSONBytesEqual(a, b []byte) (bool, error) {
var j, j2 interface{}
if err := json.Unmarshal(a, &j); err != nil {
return false, err
}
if err := json.Unmarshal(b, &j2); err != nil {
return false, err
}
return reflect.DeepEqual(j2, j), nil
}
5 changes: 5 additions & 0 deletions castai/sdk/api.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit f50c0b7

Please sign in to comment.