
Commit

feat: update provider util.go to include created and last updated timestamps
Gezi-lzq committed Aug 8, 2024
1 parent a111985 commit 91922c9
Showing 2 changed files with 68 additions and 65 deletions.
113 changes: 63 additions & 50 deletions internal/provider/automq_kafka_instance_resource.go
@@ -63,7 +63,7 @@ type KafkaInstanceResourceModel struct {
ACL types.Bool `tfsdk:"acl"`
Integrations []IntegrationModel `tfsdk:"integrations"`
CreatedAt timetypes.RFC3339 `tfsdk:"created_at"`
LastUpdated types.String `tfsdk:"last_updated"`
LastUpdated timetypes.RFC3339 `tfsdk:"last_updated"`
InstanceStatus types.String `tfsdk:"instance_status"`
Timeouts timeouts.Value `tfsdk:"timeouts"`
}
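The model now stores last_updated as timetypes.RFC3339 instead of a plain string. The matching schema declaration is not part of this diff; a minimal sketch of how such an attribute is typically wired up with the framework's custom time type (assumed, for illustration only):

    "last_updated": schema.StringAttribute{
        // CustomType makes the framework parse and normalize RFC 3339 strings.
        CustomType:          timetypes.RFC3339Type{},
        Computed:            true,
        MarkdownDescription: "Timestamp of the last instance update, in RFC 3339 format.",
    },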
@@ -280,7 +280,8 @@ func (r *KafkaInstanceResource) Create(ctx context.Context, req resource.CreateR
return
}

instance.LastUpdated = types.StringValue(time.Now().Format(time.RFC850))
out, err = r.client.GetKafkaInstance(instanceId)

Check failure on line 283 in internal/provider/automq_kafka_instance_resource.go (GitHub Actions / Build): ineffectual assignment to err (ineffassign)
FlattenKafkaInstanceModel(out, &instance)

// Save data into Terraform state
resp.Diagnostics.Append(resp.State.Set(ctx, &instance)...)
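The CI annotation above flags that err returned by GetKafkaInstance is assigned but never checked (ineffassign). A minimal sketch of how that error could be surfaced before flattening, assuming the surrounding Create flow (not part of this commit):

    out, err = r.client.GetKafkaInstance(instanceId)
    if err != nil {
        // Surface the read-back failure instead of silently flattening a stale response.
        resp.Diagnostics.AddError("Client Error",
            fmt.Sprintf("Unable to read Kafka instance %q after creation, got error: %s", instanceId, err))
        return
    }
    FlattenKafkaInstanceModel(out, &instance)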
@@ -301,83 +302,95 @@ func (r *KafkaInstanceResource) Read(ctx context.Context, req resource.ReadReque
}

func (r *KafkaInstanceResource) Update(ctx context.Context, req resource.UpdateRequest, resp *resource.UpdateResponse) {
var data KafkaInstanceResourceModel
var plan, state KafkaInstanceResourceModel

// Read Terraform plan data into the model
resp.Diagnostics.Append(req.Plan.Get(ctx, &data)...)

resp.Diagnostics.Append(req.Plan.Get(ctx, &plan)...)
if resp.Diagnostics.HasError() {
return
}
if data.InstanceID.IsNull() && data.InstanceID.ValueString() == "" {
resp.Diagnostics.AddError("Client Error", "Instance ID is required for updating Kafka instance")
resp.Diagnostics.Append(req.State.Get(ctx, &state)...)
if resp.Diagnostics.HasError() {
return
}

instance, err := GetKafkaInstance(&data, r.client)
// check if the instance exists
instance, err := GetKafkaInstance(&state, r.client)
instanceId := instance.InstanceID
if err != nil {
resp.Diagnostics.AddError("Client Error", fmt.Sprintf("Unable to get Kafka instance %q, got error: %s", data.InstanceID.ValueString(), err))
resp.Diagnostics.AddError("Client Error", fmt.Sprintf("Unable to get Kafka instance %q, got error: %s", instanceId, err))
return
}
if instance == nil {
resp.Diagnostics.AddError("Client Error", fmt.Sprintf("Kafka instance %q not found", data.InstanceID.ValueString()))
resp.Diagnostics.AddError("Client Error", fmt.Sprintf("Kafka instance %q not found", instanceId))
return
}

// check if the instance is in available state
if instance.Status != stateAvailable {
resp.Diagnostics.AddError("Client Error", fmt.Sprintf("Kafka instance %q is not in available state", data.InstanceID.ValueString()))
return
}

instanceId := data.InstanceID.ValueString()

// Generate API request body from plan
basicUpdate := client.InstanceBasicParam{
DisplayName: data.Name.ValueString(),
Description: data.Description.ValueString(),
}
specUpdate := client.SpecificationUpdateParam{
Values: make([]client.KafkaInstanceRequestValues, 1),
}
specUpdate.Values[0] = client.KafkaInstanceRequestValues{
Key: "aku",
Value: fmt.Sprintf("%d", data.ComputeSpecs.Aku.ValueInt64()),
}

_, err = r.client.UpdateKafkaInstanceBasicInfo(data.InstanceID.ValueString(), basicUpdate)
if err != nil {
resp.Diagnostics.AddError("Client Error", fmt.Sprintf("Unable to update Kafka instance %q, got error: %s", data.InstanceID.ValueString(), err))
resp.Diagnostics.AddError("Client Error", fmt.Sprintf("Kafka instance %q is not in available state", instanceId))
return
}

if data.ComputeSpecs.Version.ValueString() != "" {
_, err = r.client.UpdateKafkaInstanceVersion(data.InstanceID.ValueString(), data.ComputeSpecs.Version.ValueString())
// Check if the basic info has changed
if state.Name != plan.Name || state.Description != plan.Description {
// Generate API request body from plan
basicUpdate := client.InstanceBasicParam{
DisplayName: state.Name.ValueString(),
Description: state.Description.ValueString(),
}
_, err = r.client.UpdateKafkaInstanceBasicInfo(instanceId, basicUpdate)
if err != nil {
resp.Diagnostics.AddError("Client Error", fmt.Sprintf("Unable to update Kafka instance %q, got error: %s", data.InstanceID.ValueString(), err))
resp.Diagnostics.AddError("Client Error", fmt.Sprintf("Unable to update Kafka instance %q, got error: %s", instanceId, err))
return
}
}

updateTimeout := r.UpdateTimeout(ctx, data.Timeouts)
updateTimeout := r.UpdateTimeout(ctx, state.Timeouts)

if err := waitForKafkaClusterToProvision(ctx, r.client, instanceId, stateChanging, updateTimeout); err != nil {
resp.Diagnostics.AddError("Client Error", fmt.Sprintf("Error waiting for Kafka Cluster %q to provision: %s", instanceId, err))
return
// Check if the compute specs (version) has changed
planVersion := plan.ComputeSpecs.Version.ValueString()
stateVersion := state.ComputeSpecs.Version.ValueString()
if planVersion != "" && planVersion != stateVersion {
_, err = r.client.UpdateKafkaInstanceVersion(state.InstanceID.ValueString(), planVersion)
if err != nil {
resp.Diagnostics.AddError("Client Error", fmt.Sprintf("Unable to update Kafka instance %q, got error: %s", instanceId, err))
return
}
// wait for version update
if err := waitForKafkaClusterToProvision(ctx, r.client, instanceId, stateChanging, updateTimeout); err != nil {
resp.Diagnostics.AddError("Client Error", fmt.Sprintf("Error waiting for Kafka Cluster %q to provision: %s", instanceId, err))
return
}
}

_, err = r.client.UpdateKafkaInstanceComputeSpecs(data.InstanceID.ValueString(), specUpdate)
if err != nil {
resp.Diagnostics.AddError("Client Error", fmt.Sprintf("Unable to update Kafka instance %q, got error: %s", data.InstanceID.ValueString(), err))
return
}
if err := waitForKafkaClusterToProvision(ctx, r.client, instanceId, stateChanging, updateTimeout); err != nil {
resp.Diagnostics.AddError("Client Error", fmt.Sprintf("Error waiting for Kafka Cluster %q to provision: %s", instanceId, err))
return
stateAKU := state.ComputeSpecs.Aku.ValueInt64()
planAKU := plan.ComputeSpecs.Aku.ValueInt64()
if stateAKU != planAKU {
// Generate API request body from plan
specUpdate := client.SpecificationUpdateParam{
Values: make([]client.KafkaInstanceRequestValues, 1),
}
specUpdate.Values[0] = client.KafkaInstanceRequestValues{
Key: "aku",
Value: fmt.Sprintf("%d", state.ComputeSpecs.Aku.ValueInt64()),
}
_, err = r.client.UpdateKafkaInstanceComputeSpecs(state.InstanceID.ValueString(), specUpdate)
if err != nil {
resp.Diagnostics.AddError("Client Error", fmt.Sprintf("Unable to update Kafka instance %q, got error: %s", instanceId, err))
return
}
// wait for aku update
if err := waitForKafkaClusterToProvision(ctx, r.client, instanceId, stateChanging, updateTimeout); err != nil {
resp.Diagnostics.AddError("Client Error", fmt.Sprintf("Error waiting for Kafka Cluster %q to provision: %s", instanceId, err))
return
}
}

data.LastUpdated = types.StringValue(time.Now().Format(time.RFC850))
// get latest info
out, err := r.client.GetKafkaInstance(instanceId)

Check failure on line 390 in internal/provider/automq_kafka_instance_resource.go (GitHub Actions / Build): ineffectual assignment to err (ineffassign)
FlattenKafkaInstanceModel(out, &plan)
// Save updated data into Terraform state
resp.Diagnostics.Append(resp.State.Set(ctx, &data)...)
resp.Diagnostics.Append(resp.State.Set(ctx, &plan)...)
}

func (r *KafkaInstanceResource) Delete(ctx context.Context, req resource.DeleteRequest, resp *resource.DeleteResponse) {
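The rewritten Update only calls the API for fields that differ between plan and state. A compact sketch of the AKU branch built from the planned value (a hypothetical variant for illustration; the committed code reads the value to send from state):

    stateAKU := state.ComputeSpecs.Aku.ValueInt64()
    planAKU := plan.ComputeSpecs.Aku.ValueInt64()
    if stateAKU != planAKU {
        specUpdate := client.SpecificationUpdateParam{
            Values: []client.KafkaInstanceRequestValues{
                {Key: "aku", Value: fmt.Sprintf("%d", planAKU)}, // send the planned AKU
            },
        }
        if _, err := r.client.UpdateKafkaInstanceComputeSpecs(instanceId, specUpdate); err != nil {
            resp.Diagnostics.AddError("Client Error",
                fmt.Sprintf("Unable to update Kafka instance %q, got error: %s", instanceId, err))
            return
        }
        // Block until the resize finishes so the final read-back sees the settled state.
        if err := waitForKafkaClusterToProvision(ctx, r.client, instanceId, stateChanging, updateTimeout); err != nil {
            resp.Diagnostics.AddError("Client Error",
                fmt.Sprintf("Error waiting for Kafka Cluster %q to provision: %s", instanceId, err))
            return
        }
    }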
20 changes: 5 additions & 15 deletions internal/provider/util.go
@@ -3,24 +3,11 @@ package provider
import (
"fmt"
"terraform-provider-automq/client"
"time"

"github.com/hashicorp/terraform-plugin-framework-timetypes/timetypes"
"github.com/hashicorp/terraform-plugin-framework/types"
)

const (
Aliyun = "aliyun"
AWS = "aws"
)

func getTimeoutFor(cloudProvider string) time.Duration {
if cloudProvider == Aliyun {
return 20 * time.Minute
} else {
return 30 * time.Minute
}
}

func isNotFoundError(err error) bool {
condition, ok := err.(*client.ErrorResponse)
return ok && condition.Code == 404
@@ -52,10 +39,13 @@ func FlattenKafkaInstanceModel(instance *client.KafkaInstanceResponse, resource
resource.Description = types.StringValue(instance.Description)
resource.CloudProvider = types.StringValue(instance.Provider)
resource.Region = types.StringValue(instance.Region)
resource.NetworkType = types.StringValue("VPC")

resource.Networks = flattenNetworks(instance.Networks)
resource.ComputeSpecs = flattenComputeSpecs(instance.Spec)

resource.CreatedAt = timetypes.NewRFC3339TimePointerValue(&instance.GmtCreate)
resource.LastUpdated = timetypes.NewRFC3339TimePointerValue(&instance.GmtModified)

resource.InstanceStatus = types.StringValue(instance.Status)
}

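A small illustration of what the new mapping in FlattenKafkaInstanceModel produces: GmtCreate and GmtModified from the API response end up as RFC 3339 values in state, rather than the RFC 850 wall-clock string written previously (the concrete timestamps below are assumed, and the sketch presumes the time package is available where it runs):

    created := time.Date(2024, 8, 8, 10, 0, 0, 0, time.UTC) // stand-in for instance.GmtCreate
    modified := created.Add(30 * time.Minute)                // stand-in for instance.GmtModified

    var model KafkaInstanceResourceModel
    model.CreatedAt = timetypes.NewRFC3339TimePointerValue(&created)
    model.LastUpdated = timetypes.NewRFC3339TimePointerValue(&modified)

    // model.CreatedAt.ValueString()   == "2024-08-08T10:00:00Z"
    // model.LastUpdated.ValueString() == "2024-08-08T10:30:00Z"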
