Skip to content

Commit

Permalink
feat(dws/cluster): the cluster resource supports scale-in nodes (#5673)
Browse files Browse the repository at this point in the history
  • Loading branch information
wuzhuanhong authored Dec 31, 2024
1 parent 952caff commit 3200f16
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 57 deletions.
8 changes: 6 additions & 2 deletions docs/resources/dws_cluster.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ The following arguments are supported:

* `description` - (Optional, String) Specifies the description of the cluster.

* `force_backup` - (Optional, Bool) Specified whether to automatically execute snapshot when shrinking the number of nodes.
The default value is **true**.
This parameter is required and available only when scaling-in the `number_of_node` parameter value.

<a name="DwsCluster_PublicIp"></a>
The `PublicIp` block supports:

Expand Down Expand Up @@ -276,7 +280,7 @@ $ terraform import huaweicloud_dws_cluster.test <id>

Note that the imported state may not be identical to your resource definition, due to some attributes missing from the
API response, security or some other reason. The missing attributes include: `user_pwd`, `number_of_cn`, `kms_key_id`,
`volume`, `dss_pool_id`, `logical_cluster_enable`, `lts_enable`.
`volume`, `dss_pool_id`, `logical_cluster_enable`, `lts_enable`, `force_backup`.
It is generally recommended running `terraform plan` after importing a cluster.
You can then decide if changes should be applied to the cluster, or the resource definition
should be updated to align with the cluster. Also you can ignore changes as below.
Expand All @@ -287,7 +291,7 @@ resource "huaweicloud_dws_cluster" "test" {
lifecycle {
ignore_changes = [
user_pwd, number_of_cn, kms_key_id, volume, dss_pool_id, logical_cluster_enable, lts_enable
user_pwd, number_of_cn, kms_key_id, volume, dss_pool_id, logical_cluster_enable, lts_enable, `force_backup`,
]
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ func getClusterResourceFunc(cfg *config.Config, state *terraform.ResourceState)
// getDwsCluster: Query the DWS cluster.
getDwsClusterClient, err := cfg.NewServiceClient("dws", region)
if err != nil {
return nil, fmt.Errorf("error creating DWS Client: %s", err)
return nil, fmt.Errorf("error creating DWS client: %s", err)
}

getDwsClusterRespBody, err := dws.GetClusterInfoByClusterId(getDwsClusterClient, state.Primary.ID)
if err != nil {
return nil, fmt.Errorf("error retrieving DwsCluster: %s", err)
return nil, fmt.Errorf("error retrieving DWS cluster: %s", err)
}

return getDwsClusterRespBody, nil
Expand Down Expand Up @@ -89,6 +89,7 @@ func TestAccResourceCluster_basicV1(t *testing.T) {
rc.CheckResourceExists(),
resource.TestCheckResourceAttr(resourceName, "name", name),
resource.TestCheckResourceAttr(resourceName, "description", ""),
resource.TestCheckResourceAttr(resourceName, "number_of_node", "3"),
resource.TestCheckResourceAttr(resourceName, "public_ip.0.public_bind_type", dws.PublicBindTypeNotUse),
resource.TestCheckResourceAttr(resourceName, "public_ip.0.eip_id", ""),
resource.TestCheckResourceAttr(resourceName, "tags.%", "0"),
Expand All @@ -102,7 +103,7 @@ func TestAccResourceCluster_basicV1(t *testing.T) {
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{"user_pwd", "number_of_cn", "volume", "endpoints", "logical_cluster_enable"},
ImportStateVerifyIgnore: []string{"user_pwd", "number_of_cn", "volume", "endpoints", "logical_cluster_enable", "force_backup"},
},
},
})
Expand Down Expand Up @@ -218,7 +219,7 @@ data "huaweicloud_availability_zones" "test" {}
resource "huaweicloud_dws_cluster" "test" {
name = "%[3]s"
node_type = "dwsk2.xlarge"
number_of_node = 6
number_of_node = 3
vpc_id = huaweicloud_vpc.test.id
network_id = huaweicloud_vpc_subnet.test.id
security_group_id = huaweicloud_networking_secgroup.test2.id
Expand Down Expand Up @@ -314,9 +315,10 @@ func TestAccResourceCluster_basicV2(t *testing.T) {
Check: resource.ComposeTestCheckFunc(
rc.CheckResourceExists(),
resource.TestCheckResourceAttr(resourceName, "name", name),
resource.TestCheckResourceAttr(resourceName, "number_of_node", "3"),
resource.TestCheckResourceAttr(resourceName, "public_ip.0.public_bind_type", dws.PublicBindTypeNotUse),
resource.TestCheckResourceAttr(resourceName, "public_ip.0.eip_id", ""),
resource.TestCheckResourceAttr(resourceName, "elb.0.name", ""),
resource.TestCheckResourceAttr(resourceName, "elb.#", "0"),
resource.TestCheckResourceAttr(resourceName, "tags.%", "0"),
resource.TestCheckResourceAttr(resourceName, "description", ""),
),
Expand All @@ -330,7 +332,7 @@ func TestAccResourceCluster_basicV2(t *testing.T) {
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{"user_pwd", "number_of_cn", "volume", "endpoints", "lts_enable",
"logical_cluster_enable", "elb_id"},
"logical_cluster_enable", "elb_id", "force_backup"},
},
},
})
Expand Down Expand Up @@ -462,7 +464,8 @@ func testAccDwsCluster_basicV2_step3(rName, password string) string {
resource "huaweicloud_dws_cluster" "testv2" {
name = "%[2]s"
node_type = "dwsk2.xlarge"
number_of_node = 6
number_of_node = 3
force_backup = false
vpc_id = huaweicloud_vpc.test.id
network_id = huaweicloud_vpc_subnet.test.id
security_group_id = huaweicloud_networking_secgroup.test2.id
Expand Down
158 changes: 110 additions & 48 deletions huaweicloud/services/dws/resource_huaweicloud_dws_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ const ClusterIdIllegalErrCode = "DWS.0001"
// @API DWS DELETE /v2/{project_id}/clusters/{cluster_id}/eips/{eip_id}
// @API DWS POST /v1/{project_id}/clusters/{cluster_id}/description
// @API DWS PUT /v1/{project_id}/clusters/{cluster_id}/security-group
// @API DWS POST /v1.0/{project_id}/clusters/{cluster_id}/cluster-shrink
func ResourceDwsCluster() *schema.Resource {
return &schema.Resource{
CreateContext: resourceDwsClusterCreate,
Expand Down Expand Up @@ -215,6 +216,12 @@ func ResourceDwsCluster() *schema.Resource {
Optional: true,
Description: `The description of the cluster.`,
},
"force_backup": {
Type: schema.TypeBool,
Optional: true,
Default: true,
Description: `Whether to automatically execute snapshot when shrinking the number of nodes.`,
},
"status": {
Type: schema.TypeString,
Computed: true,
Expand Down Expand Up @@ -464,7 +471,7 @@ func resourceDwsClusterCreateV2(ctx context.Context, d *schema.ResourceData, met
)
createDwsClusterClient, err := cfg.NewServiceClient(createDwsClusterProduct, region)
if err != nil {
return diag.Errorf("error creating DWS Client: %s", err)
return diag.Errorf("error creating DWS client: %s", err)
}

createDwsClusterPath := createDwsClusterClient.Endpoint + createDwsClusterHttpUrl
Expand All @@ -478,7 +485,7 @@ func resourceDwsClusterCreateV2(ctx context.Context, d *schema.ResourceData, met
createDwsClusterOpt.JSONBody = utils.RemoveNil(buildCreateDwsClusterBodyParams(d, cfg))
createDwsClusterResp, err := createDwsClusterClient.Request("POST", createDwsClusterPath, &createDwsClusterOpt)
if err != nil {
return diag.Errorf("error creating DWS Cluster: %s", err)
return diag.Errorf("error creating DWS cluster: %s", err)
}

createDwsClusterRespBody, err := utils.FlattenResponse(createDwsClusterResp)
Expand Down Expand Up @@ -539,7 +546,7 @@ func resourceDwsClusterCreateV1(ctx context.Context, d *schema.ResourceData, met
)
createDwsClusterClient, err := cfg.NewServiceClient(createDwsClusterProduct, region)
if err != nil {
return diag.Errorf("error creating DWS Client: %s", err)
return diag.Errorf("error creating DWS client: %s", err)
}

createDwsClusterPath := createDwsClusterClient.Endpoint + createDwsClusterHttpUrl
Expand All @@ -553,7 +560,7 @@ func resourceDwsClusterCreateV1(ctx context.Context, d *schema.ResourceData, met
createDwsClusterOpt.JSONBody = utils.RemoveNil(buildCreateDwsClusterBodyParamsV1(d, cfg))
createDwsClusterResp, err := createDwsClusterClient.Request("POST", createDwsClusterPath, &createDwsClusterOpt)
if err != nil {
return diag.Errorf("error creating DWS Cluster: %s", err)
return diag.Errorf("error creating DWS cluster: %s", err)
}

createDwsClusterRespBody, err := utils.FlattenResponse(createDwsClusterResp)
Expand Down Expand Up @@ -746,16 +753,12 @@ func resourceDwsClusterRead(_ context.Context, d *schema.ResourceData, meta inte
// getDwsCluster: Query the DWS cluster.
getDwsClusterClient, err := cfg.NewServiceClient("dws", region)
if err != nil {
return diag.Errorf("error creating DWS Client: %s", err)
return diag.Errorf("error creating DWS client: %s", err)
}

getDwsClusterRespBody, err := GetClusterInfoByClusterId(getDwsClusterClient, d.Id())
if err != nil {
return common.CheckDeletedDiag(d, err, "error retrieving DWS Cluster")
}

if err != nil {
return diag.FromErr(err)
return common.CheckDeletedDiag(d, err, "error retrieving DWS cluster")
}

mErr = multierror.Append(
Expand Down Expand Up @@ -914,7 +917,7 @@ func resourceDwsClusterUpdate(ctx context.Context, d *schema.ResourceData, meta
expandInstanceStorageOpt.JSONBody = utils.RemoveNil(buildExpandInstanceStorageBodyParams(d))
_, err = clusterClient.Request("POST", expandInstanceStoragePath, &expandInstanceStorageOpt)
if err != nil {
return diag.Errorf("error updating DWS Cluster: %s", err)
return diag.Errorf("error updating DWS cluster: %s", err)
}
err = clusterWaitingForAvailable(ctx, d, clusterClient, d.Timeout(schema.TimeoutUpdate))
if err != nil {
Expand Down Expand Up @@ -943,42 +946,18 @@ func resourceDwsClusterUpdate(ctx context.Context, d *schema.ResourceData, meta
resetPasswordOfClusterOpt.JSONBody = utils.RemoveNil(buildResetPasswordOfClusterBodyParams(d))
_, err = clusterClient.Request("POST", resetPasswordOfClusterPath, &resetPasswordOfClusterOpt)
if err != nil {
return diag.Errorf("error updating DWS Cluster: %s", err)
return diag.Errorf("error updating DWS cluster: %s", err)
}

err = clusterWaitingForAvailable(ctx, d, clusterClient, d.Timeout(schema.TimeoutUpdate))
if err != nil {
return diag.Errorf("error waiting for the DWS cluster (%s) update to complete: %s", clusterId, err)
}
}
scaleOutClusterChanges := []string{
"number_of_node",
}

if d.HasChanges(scaleOutClusterChanges...) {
// scaleOutCluster: Scale out DWS cluster
var (
scaleOutClusterHttpUrl = "v1.0/{project_id}/clusters/{cluster_id}/resize"
)

scaleOutClusterPath := clusterClient.Endpoint + scaleOutClusterHttpUrl
scaleOutClusterPath = strings.ReplaceAll(scaleOutClusterPath, "{project_id}", clusterClient.ProjectID)
scaleOutClusterPath = strings.ReplaceAll(scaleOutClusterPath, "{cluster_id}", d.Id())

scaleOutClusterOpt := golangsdk.RequestOpts{
KeepResponseBody: true,
MoreHeaders: requestOpts.MoreHeaders,
}

scaleOutClusterOpt.JSONBody = utils.RemoveNil(buildScaleOutClusterBodyParams(d))
_, err = clusterClient.Request("POST", scaleOutClusterPath, &scaleOutClusterOpt)
if err != nil {
return diag.Errorf("error updating DWS Cluster: %s", err)
}

err = clusterWaitingForAvailable(ctx, d, clusterClient, d.Timeout(schema.TimeoutUpdate))
if err != nil {
return diag.Errorf("error waiting for the DWS cluster (%s) update to complete: %s", clusterId, err)
if d.HasChange("number_of_node") {
if err := updateClusterNodes(ctx, clusterClient, d, clusterId); err != nil {
return diag.FromErr(err)
}
}

Expand Down Expand Up @@ -1074,16 +1053,99 @@ func buildResetPasswordOfClusterBodyParams(d *schema.ResourceData) map[string]in
return bodyParams
}

func buildScaleOutClusterBodyParams(d *schema.ResourceData) map[string]interface{} {
oldValue, newValue := d.GetChange("number_of_node")
num := newValue.(int) - oldValue.(int)
func updateClusterNodes(ctx context.Context, client *golangsdk.ServiceClient, d *schema.ResourceData, clusterId string) error {
var (
oldValue, newValue = d.GetChange("number_of_node")
nodeNum = newValue.(int) - oldValue.(int)
err error
)

bodyParams := map[string]interface{}{
"scale_out": map[string]interface{}{
"count": num,
if nodeNum > 0 {
err = scaleOutCluster(client, clusterId, nodeNum)
} else {
err = scaleInCluster(client, d, clusterId, -nodeNum)
}

if err != nil {
return err
}

return waitClusterTaskStateCompleted(ctx, client, d.Timeout(schema.TimeoutUpdate), clusterId)
}

func scaleOutCluster(client *golangsdk.ServiceClient, clusterId string, scaleOutNodes int) error {
scaleOutClusterHttpUrl := "v1.0/{project_id}/clusters/{cluster_id}/resize"
scaleOutClusterPath := client.Endpoint + scaleOutClusterHttpUrl
scaleOutClusterPath = strings.ReplaceAll(scaleOutClusterPath, "{project_id}", client.ProjectID)
scaleOutClusterPath = strings.ReplaceAll(scaleOutClusterPath, "{cluster_id}", clusterId)

scaleOutClusterOpt := golangsdk.RequestOpts{
KeepResponseBody: true,
MoreHeaders: requestOpts.MoreHeaders,
JSONBody: map[string]interface{}{
"scale_out": map[string]interface{}{
"count": scaleOutNodes,
},
},
}
return bodyParams
_, err := client.Request("POST", scaleOutClusterPath, &scaleOutClusterOpt)
if err != nil {
return fmt.Errorf("error extending nodes of the DWS cluster (%s) : %s", clusterId, err)
}
return nil
}

func buildScaleInBodyParams(d *schema.ResourceData, shrinkNum int, datastoreType string) map[string]interface{} {
return map[string]interface{}{
"shrink_number": shrinkNum,
"force_backup": d.Get("force_backup"),
"type": datastoreType,
}
}

func scaleInCluster(client *golangsdk.ServiceClient, d *schema.ResourceData, clusterId string, shrinkNum int) error {
clusterResp, err := GetClusterInfoByClusterId(client, clusterId)
if err != nil {
return err
}

datastoreType := utils.PathSearch("cluster.datastore_type", clusterResp, "").(string)
if datastoreType == "" {
return fmt.Errorf("unable to get datastore type of the cluster (%s)", clusterId)
}

httpUrl := "v1.0/{project_id}/clusters/{cluster_id}/cluster-shrink"
scaleInPath := client.Endpoint + httpUrl
scaleInPath = strings.ReplaceAll(scaleInPath, "{project_id}", client.ProjectID)
scaleInPath = strings.ReplaceAll(scaleInPath, "{cluster_id}", d.Id())

opt := golangsdk.RequestOpts{
KeepResponseBody: true,
MoreHeaders: requestOpts.MoreHeaders,
JSONBody: utils.RemoveNil(buildScaleInBodyParams(d, shrinkNum, datastoreType)),
}
resp, err := client.Request("POST", scaleInPath, &opt)
if err != nil {
return err
}

respBody, err := utils.FlattenResponse(resp)
if err != nil {
return err
}

// a. In some cases, the status code is 200 when the scaling-in fails, such as: The scale-in number is incorrect.
// b. The two situations of successful scale-in are as follows:
// 1. When the status id `200`, the error_code` is `DWS.0000`, it means the scale-in is successful.
// 2. When the status id `202`, the job_id` is not empty, it means the scale-in is successful.
errCode := utils.PathSearch("error_code", respBody, "").(string)
jobId := utils.PathSearch("job_id", respBody, "").(string)
if errCode == "DWS.0000" || jobId != "" {
return nil
}

errMsg := utils.PathSearch("error_msg", respBody, "").(string)
return fmt.Errorf("error shrinking nodes of the DWS cluster (%s) : %s", clusterId, errMsg)
}

func resourceDwsClusterDelete(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
Expand All @@ -1097,7 +1159,7 @@ func resourceDwsClusterDelete(ctx context.Context, d *schema.ResourceData, meta
)
deleteDwsClusterClient, err := cfg.NewServiceClient(deleteDwsClusterProduct, region)
if err != nil {
return diag.Errorf("error creating DWS Client: %s", err)
return diag.Errorf("error creating DWS client: %s", err)
}

deleteDwsClusterPath := deleteDwsClusterClient.Endpoint + deleteDwsClusterHttpUrl
Expand All @@ -1112,7 +1174,7 @@ func resourceDwsClusterDelete(ctx context.Context, d *schema.ResourceData, meta
deleteDwsClusterOpt.JSONBody = utils.RemoveNil(buildDeleteDwsClusterBodyParams(d))
_, err = deleteDwsClusterClient.Request("DELETE", deleteDwsClusterPath, &deleteDwsClusterOpt)
if err != nil {
return diag.Errorf("error deleting DWS Cluster: %s", err)
return diag.Errorf("error deleting DWS cluster: %s", err)
}

err = deleteClusterWaitingForCompleted(ctx, d, deleteDwsClusterClient, d.Timeout(schema.TimeoutDelete))
Expand Down

0 comments on commit 3200f16

Please sign in to comment.