From 266073f6112bf09a5bf5b79e7bd5be7b101dbab2 Mon Sep 17 00:00:00 2001
From: CaiWanting
Date: Mon, 22 Jul 2024 16:54:28 +0800
Subject: [PATCH] feat(dms): support kafka partition reassignment task management (#5229)

---
 .../resources/dms_kafka_partition_reassign.md | 130 +++++++++
 huaweicloud/provider.go                       |   1 +
 ...cloud_dms_kafka_partition_reassign_test.go | 102 +++++++
 ...uaweicloud_dms_kafka_partition_reassign.go | 252 ++++++++++++++++++
 4 files changed, 485 insertions(+)
 create mode 100644 docs/resources/dms_kafka_partition_reassign.md
 create mode 100644 huaweicloud/services/acceptance/dms/resource_huaweicloud_dms_kafka_partition_reassign_test.go
 create mode 100644 huaweicloud/services/dms/resource_huaweicloud_dms_kafka_partition_reassign.go

diff --git a/docs/resources/dms_kafka_partition_reassign.md b/docs/resources/dms_kafka_partition_reassign.md
new file mode 100644
index 0000000000..e31967f627
--- /dev/null
+++ b/docs/resources/dms_kafka_partition_reassign.md
@@ -0,0 +1,130 @@
---
subcategory: "Distributed Message Service (DMS)"
layout: "huaweicloud"
page_title: "HuaweiCloud: huaweicloud_dms_kafka_partition_reassign"
description: |-
  Manages a DMS Kafka partition reassignment resource within HuaweiCloud.
---

# huaweicloud_dms_kafka_partition_reassign

Manages a DMS Kafka partition reassignment resource within HuaweiCloud.

## Example Usage

### Create a partition reassignment task with a manually specified assignment plan

```hcl
variable "instance_id" {}
variable "topic_name" {}

resource "huaweicloud_dms_kafka_partition_reassign" "test" {
  instance_id = var.instance_id

  reassignments {
    topic = var.topic_name

    assignment {
      partition         = 0
      partition_brokers = [0, 1, 2]
    }

    assignment {
      partition         = 1
      partition_brokers = [2, 0, 1]
    }

    assignment {
      partition         = 2
      partition_brokers = [1, 2, 0]
    }
  }
}
```

### Create a partition reassignment task with an automatic assignment plan

```hcl
variable "instance_id" {}
variable "topic_name" {}

resource "huaweicloud_dms_kafka_partition_reassign" "test" {
  instance_id = var.instance_id

  reassignments {
    topic              = var.topic_name
    brokers            = [0, 1, 2]
    replication_factor = 1
  }
}
```

## Argument Reference

The following arguments are supported:

* `region` - (Optional, String, ForceNew) The region in which to create the resource.
  If omitted, the provider-level region will be used. Changing this creates a new resource.

* `instance_id` - (Required, String, ForceNew) Specifies the DMS Kafka instance ID.
  Changing this creates a new resource.

* `reassignments` - (Required, List, ForceNew) Specifies the reassignment plan.
  Changing this creates a new resource.
  The [reassignments](#reassignments_struct) structure is documented below.

* `throttle` - (Optional, Int, ForceNew) Specifies the reassignment threshold, in **MB/s**. The valid value ranges
  from **1** to **300**. Set it to **-1** to indicate that no throttling is required.
  Changing this creates a new resource.

* `is_schedule` - (Optional, Bool, ForceNew) Specifies whether the task is scheduled. Defaults to **false**.
  Changing this creates a new resource.

* `execute_at` - (Optional, Int, ForceNew) Specifies the scheduled execution time. The value is a UNIX timestamp, in **ms**.
  It is required if `is_schedule` is **true**. Changing this creates a new resource.

* `time_estimate` - (Optional, Bool, ForceNew) Specifies whether to perform a time estimation task. Defaults to **false**.
  Changing this creates a new resource.

<a name="reassignments_struct"></a>
The `reassignments` block supports:

* `topic` - (Required, String, ForceNew) Specifies the topic name. Changing this creates a new resource.

* `brokers` - (Optional, List, ForceNew) Specifies the integer list of brokers to which partitions are reassigned.
  It is **required** for **automatic** assignment. Changing this creates a new resource.

* `replication_factor` - (Optional, Int, ForceNew) Specifies the replication factor, which can be set for
  **automatic** assignment. Changing this creates a new resource.

* `assignment` - (Optional, List, ForceNew) Specifies the manually specified assignment plan.
  It is **required** for a **manually** specified assignment. Changing this creates a new resource.
  The [assignment](#reassignments_assignment_struct) structure is documented below.

-> If both a manually specified assignment and an automatic assignment are provided, only the **manually** specified
  assignment takes effect.

<a name="reassignments_assignment_struct"></a>
The `assignment` block supports:

* `partition` - (Optional, Int, ForceNew) Specifies the partition number in manual assignment.
  Although optional in the schema, it is effectively **required** in a **manual** assignment plan.
  Changing this creates a new resource.

* `partition_brokers` - (Optional, List, ForceNew) Specifies the integer list of brokers to be assigned to a partition
  in manual assignment. Although optional in the schema, it is effectively **required** in a **manual** assignment plan.
  Changing this creates a new resource.

## Attribute Reference

In addition to all arguments above, the following attributes are exported:

* `id` - The resource ID in UUID format.

* `task_id` - Indicates the task ID. It is returned only for a partition reassignment task.

* `reassignment_time` - Indicates the estimated execution time, in seconds. It is returned only for a time estimation task.

## Timeouts

This resource provides the following timeout configuration options:

* `create` - Default is 20 minutes.
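To make the scheduling arguments above more concrete, here is a minimal illustrative sketch (not part of the generated documentation) that combines `throttle`, `is_schedule`, and `execute_at`. The timestamp value is a hypothetical placeholder; supply a real UNIX epoch time in milliseconds for your own schedule.

```hcl
variable "instance_id" {}
variable "topic_name" {}

resource "huaweicloud_dms_kafka_partition_reassign" "scheduled" {
  instance_id = var.instance_id
  throttle    = 100           # limit reassignment traffic to 100 MB/s
  is_schedule = true
  execute_at  = 1721980800000 # hypothetical example: 2024-07-26 08:00:00 UTC, in ms

  reassignments {
    topic              = var.topic_name
    brokers            = [0, 1, 2]
    replication_factor = 3
  }
}
```

Because `execute_at` is a plain integer timestamp, it is usually computed outside the configuration and passed in as a variable rather than hard-coded.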
diff --git a/huaweicloud/provider.go b/huaweicloud/provider.go index 1c8f0b67fa..eaf4972754 100644 --- a/huaweicloud/provider.go +++ b/huaweicloud/provider.go @@ -1286,6 +1286,7 @@ func Provider() *schema.Provider { "huaweicloud_dms_kafka_permissions": dms.ResourceDmsKafkaPermissions(), "huaweicloud_dms_kafka_instance": dms.ResourceDmsKafkaInstance(), "huaweicloud_dms_kafka_topic": dms.ResourceDmsKafkaTopic(), + "huaweicloud_dms_kafka_partition_reassign": dms.ResourceDmsKafkaPartitionReassign(), "huaweicloud_dms_kafka_consumer_group": dms.ResourceDmsKafkaConsumerGroup(), "huaweicloud_dms_kafka_smart_connect": dms.ResourceDmsKafkaSmartConnect(), "huaweicloud_dms_kafka_smart_connect_task": dms.ResourceDmsKafkaSmartConnectTask(), diff --git a/huaweicloud/services/acceptance/dms/resource_huaweicloud_dms_kafka_partition_reassign_test.go b/huaweicloud/services/acceptance/dms/resource_huaweicloud_dms_kafka_partition_reassign_test.go new file mode 100644 index 0000000000..55b92403d7 --- /dev/null +++ b/huaweicloud/services/acceptance/dms/resource_huaweicloud_dms_kafka_partition_reassign_test.go @@ -0,0 +1,102 @@ +package dms + +import ( + "fmt" + "testing" + + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource" + + "github.com/huaweicloud/terraform-provider-huaweicloud/huaweicloud/services/acceptance" +) + +func TestAccKafkaPartitionReassign_basic(t *testing.T) { + rName := acceptance.RandomAccResourceNameWithDash() + resourceName := "huaweicloud_dms_kafka_partition_reassign.test" + + // Avoid CheckDestroy + // lintignore:AT001 + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { + acceptance.TestAccPreCheck(t) + }, + ProviderFactories: acceptance.TestAccProviderFactories, + Steps: []resource.TestStep{ + { + Config: testAccKafkaPartitionReassign_basic(rName), + Check: resource.ComposeTestCheckFunc( + resource.TestCheckResourceAttrSet(resourceName, "task_id"), + ), + }, + { + Config: testAccKafkaPartitionReassign_automatical(rName, false), + Check: resource.ComposeTestCheckFunc( + resource.TestCheckResourceAttrSet(resourceName, "task_id"), + ), + }, + { + Config: testAccKafkaPartitionReassign_automatical(rName, true), + Check: resource.ComposeTestCheckFunc( + resource.TestCheckResourceAttrSet(resourceName, "reassignment_time"), + ), + }, + }, + }) +} + +func testAccKafkaPartitionReassign_basic(rName string) string { + return fmt.Sprintf(` +%[1]s + +resource "huaweicloud_dms_kafka_topic" "test" { + instance_id = huaweicloud_dms_kafka_instance.test.id + name = "%s" + partitions = 2 + replicas = 3 +} + +resource "huaweicloud_dms_kafka_partition_reassign" "test" { + depends_on = [huaweicloud_dms_kafka_topic.test] + + instance_id = huaweicloud_dms_kafka_instance.test.id + + reassignments { + topic = huaweicloud_dms_kafka_topic.test.name + + assignment { + partition = 0 + partition_brokers = [0,1,2] + } + + assignment { + partition = 1 + partition_brokers = [2,0,1] + } + } +}`, testAccKafkaInstance_basic(rName), rName) +} + +func testAccKafkaPartitionReassign_automatical(rName string, timeEstimate bool) string { + return fmt.Sprintf(` +%[1]s + +resource "huaweicloud_dms_kafka_topic" "test" { + instance_id = huaweicloud_dms_kafka_instance.test.id + name = "%s" + partitions = 2 + replicas = 3 +} + +resource "huaweicloud_dms_kafka_partition_reassign" "test" { + depends_on = [huaweicloud_dms_kafka_topic.test] + + instance_id = huaweicloud_dms_kafka_instance.test.id + throttle = -1 + time_estimate = %t + + reassignments { + topic = huaweicloud_dms_kafka_topic.test.name + brokers = 
[0,1,2] + replication_factor = 3 + } +}`, testAccKafkaInstance_basic(rName), rName, timeEstimate) +} diff --git a/huaweicloud/services/dms/resource_huaweicloud_dms_kafka_partition_reassign.go b/huaweicloud/services/dms/resource_huaweicloud_dms_kafka_partition_reassign.go new file mode 100644 index 0000000000..574d8b8f70 --- /dev/null +++ b/huaweicloud/services/dms/resource_huaweicloud_dms_kafka_partition_reassign.go @@ -0,0 +1,252 @@ +package dms + +import ( + "context" + "strings" + "time" + + "github.com/hashicorp/go-uuid" + "github.com/hashicorp/terraform-plugin-sdk/v2/diag" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" + + "github.com/chnsz/golangsdk" + + "github.com/huaweicloud/terraform-provider-huaweicloud/huaweicloud/config" + "github.com/huaweicloud/terraform-provider-huaweicloud/huaweicloud/utils" +) + +// @API Kafka POST /v2/kafka/{project_id}/instances/{instance_id}/reassign +// @API Kafka GET /v2/{project_id}/instances/{instance_id}/tasks/{task_id} +func ResourceDmsKafkaPartitionReassign() *schema.Resource { + return &schema.Resource{ + CreateContext: resourceDmsKafkaPartitionReassignCreate, + ReadContext: resourceDmsKafkaPartitionReassignRead, + DeleteContext: resourceDmsKafkaPartitionReassignDelete, + + Timeouts: &schema.ResourceTimeout{ + Create: schema.DefaultTimeout(20 * time.Minute), + }, + + Schema: map[string]*schema.Schema{ + "region": { + Type: schema.TypeString, + Optional: true, + Computed: true, + ForceNew: true, + }, + "instance_id": { + Type: schema.TypeString, + Required: true, + ForceNew: true, + }, + "reassignments": { + Type: schema.TypeList, + Required: true, + ForceNew: true, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "topic": { + Type: schema.TypeString, + Required: true, + ForceNew: true, + }, + "brokers": { + Type: schema.TypeList, + Optional: true, + ForceNew: true, + Elem: &schema.Schema{Type: schema.TypeInt}, + }, + "replication_factor": { + Type: schema.TypeInt, + Optional: true, + ForceNew: true, + }, + "assignment": { + Type: schema.TypeList, + Optional: true, + ForceNew: true, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "partition": { + Type: schema.TypeInt, + Optional: true, + ForceNew: true, + }, + "partition_brokers": { + Type: schema.TypeList, + Optional: true, + ForceNew: true, + Elem: &schema.Schema{Type: schema.TypeInt}, + }, + }, + }, + }, + }, + }, + }, + "throttle": { + Type: schema.TypeInt, + Optional: true, + ForceNew: true, + }, + "is_schedule": { + Type: schema.TypeBool, + Optional: true, + ForceNew: true, + }, + "execute_at": { + Type: schema.TypeInt, + Optional: true, + ForceNew: true, + }, + "time_estimate": { + Type: schema.TypeBool, + Optional: true, + ForceNew: true, + }, + "task_id": { + Type: schema.TypeString, + Computed: true, + }, + "reassignment_time": { + Type: schema.TypeInt, + Computed: true, + }, + }, + } +} + +func resourceDmsKafkaPartitionReassignCreate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics { + cfg := meta.(*config.Config) + region := cfg.GetRegion(d) + client, err := cfg.NewServiceClient("dmsv2", region) + if err != nil { + return diag.Errorf("error creating DMS client: %s", err) + } + + instanceID := d.Get("instance_id").(string) + + createHttpUrl := "v2/kafka/{project_id}/instances/{instance_id}/reassign" + createPath := client.Endpoint + createHttpUrl + createPath = strings.ReplaceAll(createPath, "{project_id}", client.ProjectID) + createPath = 
	strings.ReplaceAll(createPath, "{instance_id}", instanceID)
	createOpt := golangsdk.RequestOpts{
		KeepResponseBody: true,
		JSONBody:         utils.RemoveNil(buildCreateKafkaPartitionReassignBodyParams(d)),
	}

	createResp, err := client.Request("POST", createPath, &createOpt)
	if err != nil {
		return diag.Errorf("error creating kafka partition reassignment task: %s", err)
	}
	createRespBody, err := utils.FlattenResponse(createResp)
	if err != nil {
		return diag.FromErr(err)
	}

	// Set the resource ID in UUID format, because a time estimation task returns neither `job_id` nor `schedule_id`.
	id, err := uuid.GenerateUUID()
	if err != nil {
		return diag.Errorf("unable to generate ID: %s", err)
	}
	d.SetId(id)

	// The API returns only one of `reassignment_time`, `job_id` and `schedule_id`,
	// depending on the values of `time_estimate` and `is_schedule`.
	reassignmentTime := utils.PathSearch("reassignment_time", createRespBody, nil)
	jobID := utils.PathSearch("job_id", createRespBody, nil)
	scheduleID := utils.PathSearch("schedule_id", createRespBody, nil)

	switch {
	case jobID != nil:
		// Wait for the task to complete.
		// For a non-scheduled task, query `/v2/{project_id}/instances/{instance_id}/tasks/{task_id}` to track the task.
		stateConf := &resource.StateChangeConf{
			Pending:      []string{"CREATED", "EXECUTING"},
			Target:       []string{"SUCCESS"},
			Refresh:      kafkaInstanceTaskStatusRefreshFunc(client, instanceID, jobID.(string)),
			Timeout:      d.Timeout(schema.TimeoutCreate),
			Delay:        1 * time.Second,
			PollInterval: 5 * time.Second,
		}
		if _, err := stateConf.WaitForStateContext(ctx); err != nil {
			return diag.Errorf("error waiting for the Kafka instance (%s) partition reassignment task to be finished: %s",
				instanceID, err)
		}

		// Since the resource ID is in UUID format, expose `job_id` through `task_id`.
		d.Set("task_id", jobID)
	case scheduleID != nil:
		// A scheduled task is tracked via `/v2/{project_id}/instances/{instance_id}/scheduled-tasks`,
		// but that is not an open API, so only `schedule_id` is exposed through `task_id` for scheduled tasks.
		d.Set("task_id", scheduleID)
	case reassignmentTime != nil:
		// Set `reassignment_time` for a time estimation task.
		d.Set("reassignment_time", reassignmentTime)
	default:
		return diag.Errorf("error creating kafka partition reassignment task: `reassignment_time`, `job_id` and " +
			"`schedule_id` are all missing from the API response")
	}

	return resourceDmsKafkaPartitionReassignRead(ctx, d, meta)
}

func buildCreateKafkaPartitionReassignBodyParams(d *schema.ResourceData) map[string]interface{} {
	bodyParams := map[string]interface{}{
		"reassignments": buildCreateReassignBodyParamsReassignments(d.Get("reassignments").([]interface{})),
		"throttle":      utils.ValueIgnoreEmpty(d.Get("throttle")),
		"is_schedule":   utils.ValueIgnoreEmpty(d.Get("is_schedule")),
		"execute_at":    utils.ValueIgnoreEmpty(d.Get("execute_at")),
		"time_estimate": utils.ValueIgnoreEmpty(d.Get("time_estimate")),
	}
	return bodyParams
}

func buildCreateReassignBodyParamsReassignments(rawParams []interface{}) []map[string]interface{} {
	rst := make([]map[string]interface{}, 0, len(rawParams))
	for _, val := range rawParams {
		raw := val.(map[string]interface{})
		params := map[string]interface{}{
			"topic":              raw["topic"],
			"brokers":            utils.ValueIgnoreEmpty(raw["brokers"]),
			"replication_factor": utils.ValueIgnoreEmpty(raw["replication_factor"]),
			"assignment":         buildCreateReassignBodyParamsAssignment(raw["assignment"].([]interface{})),
		}
		rst = append(rst, params)
	}

	return rst
}

func buildCreateReassignBodyParamsAssignment(rawParams []interface{}) []map[string]interface{} {
	if len(rawParams) == 0 {
		return nil
	}
	rst := make([]map[string]interface{}, 0)
	for _, val := range rawParams {
		raw, ok := val.(map[string]interface{})
		if !ok {
			continue
		}
		params := map[string]interface{}{
			"partition":         raw["partition"],
			"partition_brokers": utils.ValueIgnoreEmpty(raw["partition_brokers"]),
		}
		rst = append(rst, params)
	}
	return rst
}

func resourceDmsKafkaPartitionReassignRead(_ context.Context, _ *schema.ResourceData, _ interface{}) diag.Diagnostics {
	return nil
}

func resourceDmsKafkaPartitionReassignDelete(_ context.Context, _ *schema.ResourceData, _ interface{}) diag.Diagnostics {
	errorMsg := "Deleting resource is not supported. The resource is only removed from the state; the task remains in the cloud."
	return diag.Diagnostics{
		diag.Diagnostic{
			Severity: diag.Warning,
			Summary:  errorMsg,
		},
	}
}
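A usage note beyond the patch itself: because the `time_estimate` branch of the create logic only returns `reassignment_time`, a common pattern is a dry-run resource followed by the real reassignment. The sketch below is illustrative only (resource names and values are hypothetical), and assumes the instance and topic already exist:

```hcl
variable "instance_id" {}
variable "topic_name" {}

# Dry run: only estimates how long the reassignment would take.
resource "huaweicloud_dms_kafka_partition_reassign" "estimate" {
  instance_id   = var.instance_id
  time_estimate = true

  reassignments {
    topic              = var.topic_name
    brokers            = [0, 1, 2]
    replication_factor = 3
  }
}

# Inspect the estimate (in seconds) before creating the actual task.
output "estimated_seconds" {
  value = huaweicloud_dms_kafka_partition_reassign.estimate.reassignment_time
}
```

The actual reassignment would then be declared as a second resource without `time_estimate`, as in the documentation examples above.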