From e50f344aaa787caf263b00d1951286261cc59d1e Mon Sep 17 00:00:00 2001 From: Yang Yang Date: Wed, 25 Sep 2024 02:01:47 +0800 Subject: [PATCH] feat: add source graph command --- pkg/apis/api.kusion.io/v1/types.go | 59 ++- pkg/backend/backend.go | 4 + pkg/backend/storages/local.go | 6 + pkg/backend/storages/local_test.go | 58 +++ pkg/backend/storages/oss.go | 6 + pkg/backend/storages/oss_test.go | 62 +++- pkg/backend/storages/s3.go | 6 + pkg/backend/storages/s3_test.go | 62 ++++ pkg/cmd/apply/apply.go | 154 ++++++-- pkg/cmd/apply/apply_test.go | 86 ++++- pkg/cmd/destroy/destroy.go | 6 + pkg/cmd/release/list_test.go | 5 + pkg/cmd/release/show_test.go | 5 + pkg/cmd/release/unlock_test.go | 5 + pkg/cmd/resource/graph.go | 350 ++++++++++++++++++ pkg/cmd/resource/resource.go | 1 + pkg/engine/operation/apply.go | 36 +- pkg/engine/operation/apply_test.go | 90 ++++- pkg/engine/resource/graph/storage.go | 22 ++ pkg/engine/resource/graph/storages/local.go | 100 +++++ pkg/engine/resource/graph/storages/oss.go | 116 ++++++ pkg/engine/resource/graph/storages/s3.go | 134 +++++++ pkg/engine/resource/graph/storages/util.go | 43 +++ pkg/engine/resource/graph/util.go | 257 +++++++++++++ pkg/engine/resource/graph/validation.go | 38 ++ .../app_configurations_generator.go | 2 + 26 files changed, 1672 insertions(+), 41 deletions(-) create mode 100644 pkg/cmd/resource/graph.go create mode 100644 pkg/engine/resource/graph/storage.go create mode 100644 pkg/engine/resource/graph/storages/local.go create mode 100644 pkg/engine/resource/graph/storages/oss.go create mode 100644 pkg/engine/resource/graph/storages/s3.go create mode 100644 pkg/engine/resource/graph/storages/util.go create mode 100644 pkg/engine/resource/graph/util.go create mode 100644 pkg/engine/resource/graph/validation.go diff --git a/pkg/apis/api.kusion.io/v1/types.go b/pkg/apis/api.kusion.io/v1/types.go index 264d5a437..1199ca989 100644 --- a/pkg/apis/api.kusion.io/v1/types.go +++ b/pkg/apis/api.kusion.io/v1/types.go @@ -268,7 +268,8 @@ const ( FieldHealthPolicy = "healthPolicy" FieldKCLHealthCheckKCL = "health.kcl" // kind field in kubernetes resource Attributes - FieldKind = "kind" + FieldKind = "kind" + FieldIsWorkload = "kusion.io/is-workload" ) // BackendConfigs contains the configuration of multiple backends and the current backend. @@ -744,3 +745,59 @@ const ( // The default maximum number of concurrent resource executions for Kusion is 10. DefaultMaxConcurrent = 10 ) + +type Status string + +// Status is to represent resource status displayed by resource graph after apply succeed +const ( + ApplySucceed Status = "Apply succeeded" + ApplyFail Status = "Apply failed" + Reconciled Status = "Apply succeeded | Reconciled" + ReconcileFail Status = "Apply succeeded | Reconcile failed" +) + +// Graph represents the structure of a project's resources within a workspace, used by `resource graph` command. +type Graph struct { + // Name of the project + Project string `yaml:"Project" json:"Project"` + // Name of the workspace where the app is deployed + Workspace string `yaml:"Workspace" json:"Workspace"` + // All the resources related to the app + Resources *GraphResources `yaml:"Resources" json:"Resources"` +} + +// GraphResources defines the categorized resources related to the application. +type GraphResources struct { + // WorkloadResources contains the resources that are directly related to the workload. + WorkloadResources map[string]*GraphResource `yaml:"WorkloadResources" json:"WorkloadResources"` + // DependencyResources stores resources that are required dependencies for the workload. + DependencyResources map[string]*GraphResource `yaml:"DependencyResources" json:"DependencyResources"` + // OtherResources holds independent resources that are not directly tied to workloads or dependencies. + OtherResources map[string]*GraphResource `yaml:"OtherResources" json:"OtherResources"` + // ResourceIndex is a global mapping of resource IDs to their corresponding resource entries. + ResourceIndex map[string]*ResourceEntry `yaml:"ResourceIndex,omitempty" json:"ResourceIndex,omitempty"` +} + +// GraphResource represents an individual resource in the cluster. +type GraphResource struct { + // ID refers to Resource ID. + ID string `yaml:"ID" json:"ID"` + // Type refers to Resource Type in the cluster. + Type string `yaml:"Type" json:"Type"` + // Name refers to Resource name in the cluster. + Name string `yaml:"Name" json:"Name"` + // CloudResourceID refers to Resource ID in the cloud provider. + CloudResourceID string `yaml:"CloudResourceID" json:"CloudResourceID"` + // Resource status after apply. + Status Status `yaml:"Status" json:"Status"` + // Dependents lists the resources that depend on this resource. + Dependents []string `yaml:"Dependents" json:"Dependents"` + // Dependencies lists the resources that this resource relies upon. + Dependencies []string `yaml:"Dependencies" json:"Dependencies"` +} + +// ResourceEntry stores a GraphResource and its associated Resource mapping. +type ResourceEntry struct { + Resource *GraphResource + Category map[string]*GraphResource +} diff --git a/pkg/backend/backend.go b/pkg/backend/backend.go index b86939ec3..41a422acb 100644 --- a/pkg/backend/backend.go +++ b/pkg/backend/backend.go @@ -7,6 +7,7 @@ import ( "kusionstack.io/kusion/pkg/backend/storages" "kusionstack.io/kusion/pkg/config" "kusionstack.io/kusion/pkg/engine/release" + "kusionstack.io/kusion/pkg/engine/resource/graph" "kusionstack.io/kusion/pkg/workspace" ) @@ -21,6 +22,9 @@ type Backend interface { // StateStorageWithPath returns the state storage with the specified path. StateStorageWithPath(path string) (release.Storage, error) + // GraphStorage returns the graph storage. + GraphStorage(project, workspace string) (graph.Storage, error) + // ProjectStorage returns the project directory under release folder. ProjectStorage() (map[string][]string, error) } diff --git a/pkg/backend/storages/local.go b/pkg/backend/storages/local.go index 6127c2399..1fd9f9e9a 100644 --- a/pkg/backend/storages/local.go +++ b/pkg/backend/storages/local.go @@ -4,6 +4,8 @@ import ( v1 "kusionstack.io/kusion/pkg/apis/api.kusion.io/v1" "kusionstack.io/kusion/pkg/engine/release" releasestorages "kusionstack.io/kusion/pkg/engine/release/storages" + "kusionstack.io/kusion/pkg/engine/resource/graph" + graphstorages "kusionstack.io/kusion/pkg/engine/resource/graph/storages" projectstorages "kusionstack.io/kusion/pkg/project/storages" "kusionstack.io/kusion/pkg/workspace" workspacestorages "kusionstack.io/kusion/pkg/workspace/storages" @@ -32,6 +34,10 @@ func (s *LocalStorage) StateStorageWithPath(path string) (release.Storage, error return releasestorages.NewLocalStorage(releasestorages.GenReleasePrefixKeyWithPath(s.path, path)) } +func (s *LocalStorage) GraphStorage(project, workspace string) (graph.Storage, error) { + return graphstorages.NewLocalStorage(graphstorages.GenGraphDirPath(s.path, project, workspace)) +} + func (s *LocalStorage) ProjectStorage() (map[string][]string, error) { return projectstorages.NewLocalStorage(projectstorages.GenProjectDirPath(s.path)).Get() } diff --git a/pkg/backend/storages/local_test.go b/pkg/backend/storages/local_test.go index f80525d79..43dc40ec0 100644 --- a/pkg/backend/storages/local_test.go +++ b/pkg/backend/storages/local_test.go @@ -8,6 +8,8 @@ import ( v1 "kusionstack.io/kusion/pkg/apis/api.kusion.io/v1" releasestorages "kusionstack.io/kusion/pkg/engine/release/storages" + graphstorages "kusionstack.io/kusion/pkg/engine/resource/graph/storages" + projectstorages "kusionstack.io/kusion/pkg/project/storages" workspacestorages "kusionstack.io/kusion/pkg/workspace/storages" ) @@ -86,3 +88,59 @@ func TestLocalStorage_ReleaseStorage(t *testing.T) { }) } } + +func TestLocalStorage_GraphStorage(t *testing.T) { + testcases := []struct { + name string + success bool + localStorage *LocalStorage + project, workspace string + }{ + { + name: "graph storage from local backend", + success: true, + localStorage: &LocalStorage{ + path: "kusion", + }, + project: "wordpress", + workspace: "dev", + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + mockey.PatchConvey("mock new local graph storage", t, func() { + mockey.Mock(graphstorages.NewLocalStorage).Return(&graphstorages.LocalStorage{}, nil).Build() + _, err := tc.localStorage.GraphStorage(tc.project, tc.workspace) + assert.Equal(t, tc.success, err == nil) + }) + }) + } +} + +func TestLocalStorage_ProjectStorage(t *testing.T) { + testcases := []struct { + name string + success bool + localStorage *LocalStorage + }{ + { + name: "project storage from local backend", + success: true, + localStorage: &LocalStorage{ + path: "kusion", + }, + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + mockey.PatchConvey("mock new local project storage", t, func() { + mockey.Mock((*projectstorages.LocalStorage).Get).Return(map[string][]string{}, nil).Build() + _, err := tc.localStorage.ProjectStorage() + + assert.Equal(t, tc.success, err == nil) + }) + }) + } +} diff --git a/pkg/backend/storages/oss.go b/pkg/backend/storages/oss.go index 264c23613..13079949a 100644 --- a/pkg/backend/storages/oss.go +++ b/pkg/backend/storages/oss.go @@ -6,6 +6,8 @@ import ( v1 "kusionstack.io/kusion/pkg/apis/api.kusion.io/v1" "kusionstack.io/kusion/pkg/engine/release" releasestorages "kusionstack.io/kusion/pkg/engine/release/storages" + "kusionstack.io/kusion/pkg/engine/resource/graph" + graphstorages "kusionstack.io/kusion/pkg/engine/resource/graph/storages" projectstorages "kusionstack.io/kusion/pkg/project/storages" "kusionstack.io/kusion/pkg/workspace" workspacestorages "kusionstack.io/kusion/pkg/workspace/storages" @@ -44,6 +46,10 @@ func (s *OssStorage) StateStorageWithPath(path string) (release.Storage, error) return releasestorages.NewOssStorage(s.bucket, releasestorages.GenReleasePrefixKeyWithPath(s.prefix, path)) } +func (s *OssStorage) GraphStorage(project, workspace string) (graph.Storage, error) { + return graphstorages.NewOssStorage(s.bucket, graphstorages.GenGenericOssResourcePrefixKey(s.prefix, project, workspace)) +} + func (s *OssStorage) ProjectStorage() (map[string][]string, error) { return projectstorages.NewOssStorage(s.bucket, projectstorages.GenGenericOssReleasePrefixKey(s.prefix)).Get() } diff --git a/pkg/backend/storages/oss_test.go b/pkg/backend/storages/oss_test.go index 5f2f62efb..1196d261b 100644 --- a/pkg/backend/storages/oss_test.go +++ b/pkg/backend/storages/oss_test.go @@ -9,6 +9,8 @@ import ( v1 "kusionstack.io/kusion/pkg/apis/api.kusion.io/v1" releasestorages "kusionstack.io/kusion/pkg/engine/release/storages" + graphstorages "kusionstack.io/kusion/pkg/engine/resource/graph/storages" + projectstorages "kusionstack.io/kusion/pkg/project/storages" workspacestorages "kusionstack.io/kusion/pkg/workspace/storages" ) @@ -78,7 +80,7 @@ func TestOssStorage_ReleaseStorage(t *testing.T) { project, workspace string }{ { - name: "release storage from s3 backend", + name: "release storage from oss backend", success: true, ossStorage: &OssStorage{ bucket: &oss.Bucket{}, @@ -99,3 +101,61 @@ func TestOssStorage_ReleaseStorage(t *testing.T) { }) } } + +func TestOssStorage_GraphStorage(t *testing.T) { + testcases := []struct { + name string + success bool + ossStorage *OssStorage + project, workspace string + }{ + { + name: "graph storage from oss backend", + success: true, + ossStorage: &OssStorage{ + bucket: &oss.Bucket{}, + prefix: "kusion", + }, + project: "wordpress", + workspace: "dev", + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + mockey.PatchConvey("mock new oss graph storage", t, func() { + mockey.Mock(graphstorages.NewOssStorage).Return(&graphstorages.OssStorage{}, nil).Build() + _, err := tc.ossStorage.GraphStorage(tc.project, tc.workspace) + assert.Equal(t, tc.success, err == nil) + }) + }) + } +} + +func TestOssStorage_ProjectStorage(t *testing.T) { + testcases := []struct { + name string + success bool + ossStorage *OssStorage + }{ + { + name: "project storage from oss backend", + success: true, + ossStorage: &OssStorage{ + bucket: &oss.Bucket{}, + prefix: "kusion", + }, + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + mockey.PatchConvey("mock new oss project storage", t, func() { + mockey.Mock((*projectstorages.OssStorage).Get).Return(map[string][]string{}, nil).Build() + _, err := tc.ossStorage.ProjectStorage() + + assert.Equal(t, tc.success, err == nil) + }) + }) + } +} diff --git a/pkg/backend/storages/s3.go b/pkg/backend/storages/s3.go index 354759153..393af826f 100644 --- a/pkg/backend/storages/s3.go +++ b/pkg/backend/storages/s3.go @@ -9,6 +9,8 @@ import ( v1 "kusionstack.io/kusion/pkg/apis/api.kusion.io/v1" "kusionstack.io/kusion/pkg/engine/release" releasestorages "kusionstack.io/kusion/pkg/engine/release/storages" + "kusionstack.io/kusion/pkg/engine/resource/graph" + graphstorages "kusionstack.io/kusion/pkg/engine/resource/graph/storages" projectstorages "kusionstack.io/kusion/pkg/project/storages" "kusionstack.io/kusion/pkg/workspace" workspacestorages "kusionstack.io/kusion/pkg/workspace/storages" @@ -57,6 +59,10 @@ func (s *S3Storage) StateStorageWithPath(path string) (release.Storage, error) { return releasestorages.NewS3Storage(s.s3, s.bucket, releasestorages.GenReleasePrefixKeyWithPath(s.prefix, path)) } +func (s *S3Storage) GraphStorage(project, workspace string) (graph.Storage, error) { + return graphstorages.NewS3Storage(s.s3, s.bucket, graphstorages.GenGenericOssResourcePrefixKey(s.prefix, project, workspace)) +} + func (s *S3Storage) ProjectStorage() (map[string][]string, error) { return projectstorages.NewS3Storage(s.s3, s.bucket, projectstorages.GenGenericOssReleasePrefixKey(s.prefix)).Get() } diff --git a/pkg/backend/storages/s3_test.go b/pkg/backend/storages/s3_test.go index 073d7c3f6..38c642ee1 100644 --- a/pkg/backend/storages/s3_test.go +++ b/pkg/backend/storages/s3_test.go @@ -10,6 +10,8 @@ import ( v1 "kusionstack.io/kusion/pkg/apis/api.kusion.io/v1" releasestorages "kusionstack.io/kusion/pkg/engine/release/storages" + graphstorages "kusionstack.io/kusion/pkg/engine/resource/graph/storages" + projectstorages "kusionstack.io/kusion/pkg/project/storages" workspacestorages "kusionstack.io/kusion/pkg/workspace/storages" ) @@ -103,3 +105,63 @@ func TestS3Storage_ReleaseStorage(t *testing.T) { }) } } + +func TestS3Storage_GraphStorage(t *testing.T) { + testcases := []struct { + name string + success bool + s3Storage *S3Storage + project, workspace string + }{ + { + name: "graph storage from s3 backend", + success: true, + s3Storage: &S3Storage{ + s3: &s3.S3{}, + bucket: "infra", + prefix: "kusion", + }, + project: "wordpress", + workspace: "dev", + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + mockey.PatchConvey("mock new s3 graph storage", t, func() { + mockey.Mock(graphstorages.NewS3Storage).Return(&graphstorages.S3Storage{}, nil).Build() + _, err := tc.s3Storage.GraphStorage(tc.project, tc.workspace) + assert.Equal(t, tc.success, err == nil) + }) + }) + } +} + +func TestS3Storage_ProjectStorage(t *testing.T) { + testcases := []struct { + name string + success bool + s3Storage *S3Storage + }{ + { + name: "project storage from s3 backend", + success: true, + s3Storage: &S3Storage{ + s3: &s3.S3{}, + bucket: "infra", + prefix: "kusion", + }, + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + mockey.PatchConvey("mock new s3 project storage", t, func() { + mockey.Mock((*projectstorages.S3Storage).Get).Return(map[string][]string{}, nil).Build() + _, err := tc.s3Storage.ProjectStorage() + + assert.Equal(t, tc.success, err == nil) + }) + }) + } +} diff --git a/pkg/cmd/apply/apply.go b/pkg/cmd/apply/apply.go index 7c95f640b..b1f9e689b 100644 --- a/pkg/cmd/apply/apply.go +++ b/pkg/cmd/apply/apply.go @@ -46,6 +46,7 @@ import ( "kusionstack.io/kusion/pkg/engine/operation/models" "kusionstack.io/kusion/pkg/engine/printers" "kusionstack.io/kusion/pkg/engine/release" + "kusionstack.io/kusion/pkg/engine/resource/graph" "kusionstack.io/kusion/pkg/engine/runtime" runtimeinit "kusionstack.io/kusion/pkg/engine/runtime/init" "kusionstack.io/kusion/pkg/log" @@ -95,9 +96,10 @@ var ( // scattering them across different go-routines. var ( rel *apiv1.Release + gph *apiv1.Graph relLock = &sync.Mutex{} releaseCreated = false - storage release.Storage + releaseStorage release.Storage portForwarded = false ) @@ -248,10 +250,10 @@ func (o *ApplyOptions) Run() (err error) { if err != nil { release.UpdateReleasePhase(rel, apiv1.ReleasePhaseFailed, relLock) // Join the errors if update apply release failed. - err = errors.Join([]error{err, release.UpdateApplyRelease(storage, rel, o.DryRun, relLock)}...) + err = errors.Join([]error{err, release.UpdateApplyRelease(releaseStorage, rel, o.DryRun, relLock)}...) } else { release.UpdateReleasePhase(rel, apiv1.ReleasePhaseSucceeded, relLock) - err = release.UpdateApplyRelease(storage, rel, o.DryRun, relLock) + err = release.UpdateApplyRelease(releaseStorage, rel, o.DryRun, relLock) } }() @@ -261,16 +263,16 @@ func (o *ApplyOptions) Run() (err error) { } // create release - storage, err = o.Backend.ReleaseStorage(o.RefProject.Name, o.RefWorkspace.Name) + releaseStorage, err = o.Backend.ReleaseStorage(o.RefProject.Name, o.RefWorkspace.Name) if err != nil { return } - rel, err = release.NewApplyRelease(storage, o.RefProject.Name, o.RefStack.Name, o.RefWorkspace.Name) + rel, err = release.NewApplyRelease(releaseStorage, o.RefProject.Name, o.RefStack.Name, o.RefWorkspace.Name) if err != nil { return } if !o.DryRun { - if err = storage.Create(rel); err != nil { + if err = releaseStorage.Create(rel); err != nil { return } releaseCreated = true @@ -291,7 +293,7 @@ func (o *ApplyOptions) Run() (err error) { }() go func() { - errCh <- o.run(rel, storage) + errCh <- o.run(rel, releaseStorage) }() // Check whether the kusion apply command has timed out. @@ -309,7 +311,7 @@ func (o *ApplyOptions) Run() (err error) { return } release.UpdateReleasePhase(rel, apiv1.ReleasePhaseFailed, relLock) - err = errors.Join([]error{err, release.UpdateApplyRelease(storage, rel, o.DryRun, relLock)}...) + err = errors.Join([]error{err, release.UpdateApplyRelease(releaseStorage, rel, o.DryRun, relLock)}...) return err } } else { @@ -323,14 +325,14 @@ func (o *ApplyOptions) Run() (err error) { } // run executes the apply cmd after the release is created. -func (o *ApplyOptions) run(rel *apiv1.Release, storage release.Storage) (err error) { +func (o *ApplyOptions) run(rel *apiv1.Release, releaseStorage release.Storage) (err error) { defer func() { if !releaseCreated { return } if err != nil { release.UpdateReleasePhase(rel, apiv1.ReleasePhaseFailed, relLock) - err = errors.Join([]error{err, release.UpdateApplyRelease(storage, rel, o.DryRun, relLock)}...) + err = errors.Join([]error{err, release.UpdateApplyRelease(releaseStorage, rel, o.DryRun, relLock)}...) } }() @@ -361,12 +363,12 @@ func (o *ApplyOptions) run(rel *apiv1.Release, storage release.Storage) (err err // update release phase to previewing rel.Spec = spec release.UpdateReleasePhase(rel, apiv1.ReleasePhasePreviewing, relLock) - if err = release.UpdateApplyRelease(storage, rel, o.DryRun, relLock); err != nil { + if err = release.UpdateApplyRelease(releaseStorage, rel, o.DryRun, relLock); err != nil { return } // compute changes for preview - changes, err := preview.Preview(o.PreviewOptions, storage, rel.Spec, rel.State, o.RefProject, o.RefStack) + changes, err := preview.Preview(o.PreviewOptions, releaseStorage, rel.Spec, rel.State, o.RefProject, o.RefStack) if err != nil { return } @@ -413,16 +415,46 @@ func (o *ApplyOptions) run(rel *apiv1.Release, storage release.Storage) (err err // update release phase to applying release.UpdateReleasePhase(rel, apiv1.ReleasePhaseApplying, relLock) - if err = release.UpdateApplyRelease(storage, rel, o.DryRun, relLock); err != nil { + if err = release.UpdateApplyRelease(releaseStorage, rel, o.DryRun, relLock); err != nil { return } + // Get graph storage directory, create if not exist + graphStorage, err := o.Backend.GraphStorage(o.RefProject.Name, o.RefWorkspace.Name) + if err != nil { + return err + } + + // Try to get existing graph, use the graph if exists + if graphStorage.CheckGraphStorageExistence() { + gph, err = graphStorage.Get() + if err != nil { + return err + } + err = graph.ValidateGraph(gph) + if err != nil { + return err + } + // Put new resources from the generated spec to graph + gph, err = graph.GenerateGraph(spec.Resources, gph) + } else { + // Create a new graph to be used globally if no graph is stored in the storage + gph = &apiv1.Graph{ + Project: o.RefProject.Name, + Workspace: o.RefWorkspace.Name, + } + gph, err = graph.GenerateGraph(spec.Resources, gph) + } + if err != nil { + return err + } + // start applying fmt.Printf("\nStart applying diffs ...\n") // NOTE: release should be updated in the process of apply, so as to avoid the problem // of being unable to update after being terminated by SIGINT or SIGTERM. - _, err = Apply(o, storage, rel, changes) + _, err = Apply(o, releaseStorage, rel, gph, changes) if err != nil { return } @@ -445,11 +477,12 @@ func (o *ApplyOptions) run(rel *apiv1.Release, storage release.Storage) (err err } // The Apply function will apply the resources changes through the execution kusion engine. -// You can customize the runtime of engine and the release storage through `runtime` and `storage` parameters. +// You can customize the runtime of engine and the release releaseStorage through `runtime` and `releaseStorage` parameters. func Apply( o *ApplyOptions, - storage release.Storage, + releaseStorage release.Storage, rel *apiv1.Release, + gph *apiv1.Graph, changes *models.Changes, ) (*apiv1.Release, error) { var err error @@ -464,7 +497,47 @@ func Apply( return } release.UpdateReleasePhase(rel, apiv1.ReleasePhaseFailed, relLock) - err = errors.Join([]error{err, release.UpdateApplyRelease(storage, rel, o.DryRun, relLock)}...) + err = errors.Join([]error{err, release.UpdateApplyRelease(releaseStorage, rel, o.DryRun, relLock)}...) + } + + // Update graph and write to storage if not dry run. + if !o.DryRun { + // Use resources in the state to get resource Cloud ID. + for _, resource := range rel.State.Resources { + // Get information of each of the resources + info, err := graph.GetResourceInfo(&resource) + if err != nil { + return + } + // Update information of each of the resources. + graphResource := graph.FindGroupResourceByID(gph.Resources, resource.ID) + if graphResource != nil { + graphResource.CloudResourceID = info.CloudResourceID + graphResource.Type = info.ResourceType + graphResource.Name = info.ResourceName + } + } + // Get the directory to store the graph. + graphStorage, err := o.Backend.GraphStorage(o.RefProject.Name, o.RefWorkspace.Name) + if err != nil { + return + } + + // Update graph if exists, otherwise create a new graph file. + if graphStorage.CheckGraphStorageExistence() { + // No need to store resource index + graph.RemoveResourceIndex(gph) + err := graphStorage.Update(gph) + if err != nil { + return + } + } else { + graph.RemoveResourceIndex(gph) + err := graphStorage.Create(gph) + if err != nil { + return + } + } } }() @@ -472,7 +545,7 @@ func Apply( ac := &operation.ApplyOperation{ Operation: models.Operation{ Stack: changes.Stack(), - ReleaseStorage: storage, + ReleaseStorage: releaseStorage, MsgCh: make(chan models.Message), IgnoreFields: o.IgnoreFields, }, @@ -541,6 +614,7 @@ func Apply( &ls, o.DryRun, o.Watch, + gph.Resources, ) watchErrCh := make(chan error) @@ -554,6 +628,7 @@ func Apply( watchErrCh, multi, changesWriterMap, + gph, ) } @@ -575,6 +650,7 @@ func Apply( Stack: changes.Stack(), }, Release: rel, + Graph: gph, }) if v1.IsErr(st) { errWriter.(*bytes.Buffer).Reset() @@ -584,6 +660,7 @@ func Apply( // Update the release with that in the apply response if not dryrun. updatedRel = rsp.Release *rel = *updatedRel + gph = rsp.Graph } // wait for msgCh closed @@ -622,6 +699,7 @@ func PrintApplyDetails( ls *lineSummary, dryRun bool, watch bool, + gphResources *apiv1.GraphResources, ) { defer func() { if p := recover(); p != nil { @@ -633,7 +711,7 @@ func PrintApplyDetails( return } release.UpdateReleasePhase(rel, apiv1.ReleasePhaseFailed, relLock) - *err = errors.Join([]error{*err, release.UpdateApplyRelease(storage, rel, dryRun, relLock)}...) + *err = errors.Join([]error{*err, release.UpdateApplyRelease(releaseStorage, rel, dryRun, relLock)}...) } (*errWriter).(*bytes.Buffer).Reset() }() @@ -667,6 +745,20 @@ func PrintApplyDetails( changesWriterMap[msg.ResourceID].Success(fmt.Sprintf("Succeeded %s", pterm.Bold.Sprint(msg.ResourceID))) } } + + // Update resource status + if !dryRun { + gphResource := graph.FindGroupResourceByID(gphResources, msg.ResourceID) + if gphResource != nil { + // Delete resource from the graph if it's deleted during apply + if changeStep.Action == models.Delete { + graph.RemoveResource(gph, gphResource) + } else { + gphResource.Status = apiv1.ApplySucceed + } + } + } + progressbar.Increment() ls.Count(changeStep.Action) case models.Failed: @@ -695,6 +787,7 @@ func Watch( watchErrCh chan error, multi *pterm.MultiPrinter, changesWriterMap map[string]*pterm.SpinnerPrinter, + gph *apiv1.Graph, ) { resourceMap := make(map[string]apiv1.Resource) ioWriterMap := make(map[string]io.Writer) @@ -719,7 +812,7 @@ func Watch( return } release.UpdateReleasePhase(rel, apiv1.ReleasePhaseFailed, relLock) - _ = release.UpdateApplyRelease(storage, rel, dryRun, relLock) + _ = release.UpdateApplyRelease(releaseStorage, rel, dryRun, relLock) } watchErrCh <- *err @@ -765,7 +858,7 @@ func Watch( // Setup a go-routine to concurrently watch K8s and TF resources. if res.Type == apiv1.Kubernetes { healthPolicy, kind := getResourceInfo(&res) - go watchK8sResources(id, kind, w.Watchers, table, tables, dryRun, healthPolicy) + go watchK8sResources(id, kind, w.Watchers, table, tables, gph, dryRun, healthPolicy) } else if res.Type == apiv1.Terraform { go watchTFResources(id, w.TFWatcher, table, dryRun) } else { @@ -794,6 +887,12 @@ func Watch( if table.AllCompleted() { finished[id] = true changesWriterMap[id].Success(fmt.Sprintf("Succeeded %s", pterm.Bold.Sprint(id))) + + // Update resource status to reconciled. + resource := graph.FindGroupResourceByID(gph.Resources, id) + if resource != nil { + resource.Status = apiv1.Reconciled + } } } <-ticker.C @@ -878,7 +977,7 @@ func prompt(ui *terminal.UI) (string, error) { // To gracefully exit if interrupted by SIGINT or SIGTERM. WithOnInterruptFunc(func() { release.UpdateReleasePhase(rel, apiv1.ReleasePhaseFailed, relLock) - release.UpdateApplyRelease(storage, rel, false, relLock) + release.UpdateApplyRelease(releaseStorage, rel, false, relLock) os.Exit(1) }). Show() @@ -895,6 +994,7 @@ func watchK8sResources( chs []<-chan watch.Event, table *printers.Table, tables map[string]*printers.Table, + gph *apiv1.Graph, dryRun bool, healthPolicy interface{}, ) { @@ -909,10 +1009,16 @@ func watchK8sResources( return } release.UpdateReleasePhase(rel, apiv1.ReleasePhaseFailed, relLock) - _ = release.UpdateApplyRelease(storage, rel, dryRun, relLock) + _ = release.UpdateApplyRelease(releaseStorage, rel, dryRun, relLock) } }() + // Set resource status to `reconcile failed` before reconcile successfully. + resource := graph.FindGroupResourceByID(gph.Resources, id) + if resource != nil { + resource.Status = apiv1.ReconcileFail + } + // Resources selects cases := createSelectCases(chs) // Default select @@ -994,7 +1100,7 @@ func watchTFResources( return } release.UpdateReleasePhase(rel, apiv1.ReleasePhaseFailed, relLock) - _ = release.UpdateApplyRelease(storage, rel, dryRun, relLock) + _ = release.UpdateApplyRelease(releaseStorage, rel, dryRun, relLock) } }() diff --git a/pkg/cmd/apply/apply_test.go b/pkg/cmd/apply/apply_test.go index 0aacb2c89..9f079530d 100644 --- a/pkg/cmd/apply/apply_test.go +++ b/pkg/cmd/apply/apply_test.go @@ -40,6 +40,8 @@ import ( "kusionstack.io/kusion/pkg/engine/operation/models" "kusionstack.io/kusion/pkg/engine/printers" releasestorages "kusionstack.io/kusion/pkg/engine/release/storages" + "kusionstack.io/kusion/pkg/engine/resource/graph" + graphstorages "kusionstack.io/kusion/pkg/engine/resource/graph/storages" "kusionstack.io/kusion/pkg/engine/runtime" "kusionstack.io/kusion/pkg/engine/runtime/kubernetes" "kusionstack.io/kusion/pkg/util/terminal" @@ -174,6 +176,23 @@ func mockReleaseStorage() { mockey.Mock((*releasestorages.LocalStorage).Get).Return(&apiv1.Release{State: &apiv1.State{}, Phase: apiv1.ReleasePhaseSucceeded}, nil).Build() } +func mockGraphStorage() { + mockey.Mock((*storages.LocalStorage).GraphStorage).Return(&graphstorages.LocalStorage{}, nil).Build() + mockey.Mock((*graphstorages.LocalStorage).Create).Return(nil).Build() + mockey.Mock((*graphstorages.LocalStorage).Delete).Return(nil).Build() + mockey.Mock((*graphstorages.LocalStorage).Update).Return(nil).Build() + mockey.Mock((*graphstorages.LocalStorage).Get).Return(&apiv1.Graph{ + Project: "", + Workspace: "", + Resources: &apiv1.GraphResources{ + WorkloadResources: map[string]*apiv1.GraphResource{}, + DependencyResources: map[string]*apiv1.GraphResource{}, + OtherResources: map[string]*apiv1.GraphResource{}, + ResourceIndex: map[string]*apiv1.ResourceEntry{}, + }, + }, nil).Build() +} + func TestApplyOptions_Run(t *testing.T) { mockey.PatchConvey("DryRun is true", t, func() { mockGenerateSpecWithSpinner() @@ -222,7 +241,6 @@ func TestApply(t *testing.T) { loc, _ := time.LoadLocation("Asia/Shanghai") mockey.PatchConvey("dry run", t, func() { mockey.Mock((*releasestorages.LocalStorage).Update).Return(nil).Build() - rel := &apiv1.Release{ Project: "fake-project", Workspace: "fake-workspace", @@ -246,15 +264,18 @@ func TestApply(t *testing.T) { } changes := models.NewChanges(proj, stack, order) + graph := &apiv1.Graph{} o := newApplyOptions() o.DryRun = true - _, err := Apply(o, &releasestorages.LocalStorage{}, rel, changes) + _, err := Apply(o, &releasestorages.LocalStorage{}, rel, graph, changes) assert.Nil(t, err) }) mockey.PatchConvey("apply success", t, func() { mockOperationApply(models.Success) mockey.Mock((*releasestorages.LocalStorage).Update).Return(nil).Build() - + mockey.Mock((*storages.LocalStorage).GraphStorage).Return(&graphstorages.LocalStorage{}, nil).Build() + mockey.Mock((*graphstorages.LocalStorage).Create).Return(nil).Build() + // mockGraphStorage() o := newApplyOptions() rel := &apiv1.Release{ Project: "fake-project", @@ -284,13 +305,18 @@ func TestApply(t *testing.T) { } changes := models.NewChanges(proj, stack, order) - _, err := Apply(o, &releasestorages.LocalStorage{}, rel, changes) + gph := &apiv1.Graph{ + Project: rel.Project, + Workspace: rel.Workspace, + } + graph.GenerateGraph(rel.Spec.Resources, gph) + _, err := Apply(o, &releasestorages.LocalStorage{}, rel, gph, changes) assert.Nil(t, err) }) mockey.PatchConvey("apply failed", t, func() { mockOperationApply(models.Failed) mockey.Mock((*releasestorages.LocalStorage).Update).Return(nil).Build() - + mockGraphStorage() o := newApplyOptions() rel := &apiv1.Release{ Project: "fake-project", @@ -314,8 +340,9 @@ func TestApply(t *testing.T) { }, } changes := models.NewChanges(proj, stack, order) - - _, err := Apply(o, &releasestorages.LocalStorage{}, rel, changes) + gph := &apiv1.Graph{} + graph.GenerateGraph(rel.Spec.Resources, gph) + _, err := Apply(o, &releasestorages.LocalStorage{}, rel, gph, changes) assert.NotNil(t, err) }) } @@ -414,8 +441,27 @@ func TestWatchK8sResources(t *testing.T) { tables := map[string]*printers.Table{ id: table, } - - watchK8sResources(id, "", chs, table, tables, true, nil) + resource := &apiv1.GraphResource{ + ID: id, + Type: "", + Name: "", + CloudResourceID: "", + Status: "", + Dependents: []string{}, + Dependencies: []string{}, + } + gph := &apiv1.Graph{ + Project: "example project", + Workspace: "example workspace", + Resources: &apiv1.GraphResources{ + WorkloadResources: map[string]*apiv1.GraphResource{"id": resource}, + DependencyResources: map[string]*apiv1.GraphResource{}, + OtherResources: map[string]*apiv1.GraphResource{}, + ResourceIndex: map[string]*apiv1.ResourceEntry{}, + }, + } + graph.UpdateResourceIndex(gph.Resources) + watchK8sResources(id, "", chs, table, tables, gph, true, nil) assert.Equal(t, true, table.AllCompleted()) }) @@ -456,7 +502,27 @@ func TestWatchK8sResources(t *testing.T) { "health.kcl": "assert res.metadata.generation == 1", } policyInterface = healthPolicy - watchK8sResources(id, "Deployment", chs, table, tables, false, policyInterface) + resource := &apiv1.GraphResource{ + ID: id, + Type: "", + Name: "", + CloudResourceID: "", + Status: "", + Dependents: []string{}, + Dependencies: []string{}, + } + gph := &apiv1.Graph{ + Project: "example project", + Workspace: "example workspace", + Resources: &apiv1.GraphResources{ + WorkloadResources: map[string]*apiv1.GraphResource{"id": resource}, + DependencyResources: map[string]*apiv1.GraphResource{}, + OtherResources: map[string]*apiv1.GraphResource{}, + ResourceIndex: map[string]*apiv1.ResourceEntry{}, + }, + } + graph.UpdateResourceIndex(gph.Resources) + watchK8sResources(id, "Deployment", chs, table, tables, gph, false, policyInterface) assert.Equal(t, true, table.AllCompleted()) }) diff --git a/pkg/cmd/destroy/destroy.go b/pkg/cmd/destroy/destroy.go index c0bc184fd..e318f8011 100644 --- a/pkg/cmd/destroy/destroy.go +++ b/pkg/cmd/destroy/destroy.go @@ -212,6 +212,12 @@ func (o *DestroyOptions) Run() (err error) { } else { rel.Phase = apiv1.ReleasePhaseSucceeded release.UpdateDestroyRelease(storage, rel) + graphStorage, _ := o.Backend.GraphStorage(o.RefProject.Name, o.RefWorkspace.Name) + // Remove resource graph if resources are destroyed + err := graphStorage.Delete() + if err != nil { + return err + } } return err diff --git a/pkg/cmd/release/list_test.go b/pkg/cmd/release/list_test.go index 8ef7b03c5..38f52692f 100644 --- a/pkg/cmd/release/list_test.go +++ b/pkg/cmd/release/list_test.go @@ -9,6 +9,7 @@ import ( v1 "kusionstack.io/kusion/pkg/apis/api.kusion.io/v1" "kusionstack.io/kusion/pkg/cmd/meta" "kusionstack.io/kusion/pkg/engine/release" + "kusionstack.io/kusion/pkg/engine/resource/graph" "kusionstack.io/kusion/pkg/workspace" ) @@ -59,6 +60,10 @@ func (f *fakeBackendForList) StateStorageWithPath(path string) (release.Storage, return &fakeStorageForList{}, nil } +func (f *fakeBackendForList) GraphStorage(project, workspace string) (graph.Storage, error) { + return nil, nil +} + func (f *fakeBackendForList) ProjectStorage() (map[string][]string, error) { return nil, nil } diff --git a/pkg/cmd/release/show_test.go b/pkg/cmd/release/show_test.go index 3d4986ec7..d50c29201 100644 --- a/pkg/cmd/release/show_test.go +++ b/pkg/cmd/release/show_test.go @@ -10,6 +10,7 @@ import ( v1 "kusionstack.io/kusion/pkg/apis/api.kusion.io/v1" "kusionstack.io/kusion/pkg/backend" "kusionstack.io/kusion/pkg/engine/release" + "kusionstack.io/kusion/pkg/engine/resource/graph" "kusionstack.io/kusion/pkg/project" "kusionstack.io/kusion/pkg/workspace" ) @@ -107,6 +108,10 @@ func (f *fakeBackendShow) StateStorageWithPath(_ string) (release.Storage, error return nil, nil } +func (f *fakeBackendShow) GraphStorage(project, workspace string) (graph.Storage, error) { + return nil, nil +} + func (f *fakeBackendShow) ProjectStorage() (map[string][]string, error) { return nil, nil } diff --git a/pkg/cmd/release/unlock_test.go b/pkg/cmd/release/unlock_test.go index a7e5db6ee..91d2f32e6 100644 --- a/pkg/cmd/release/unlock_test.go +++ b/pkg/cmd/release/unlock_test.go @@ -11,6 +11,7 @@ import ( "kusionstack.io/kusion/pkg/backend" "kusionstack.io/kusion/pkg/cmd/meta" "kusionstack.io/kusion/pkg/engine/release" + "kusionstack.io/kusion/pkg/engine/resource/graph" "kusionstack.io/kusion/pkg/project" "kusionstack.io/kusion/pkg/workspace" ) @@ -167,6 +168,10 @@ func (f *fakeBackend) StateStorageWithPath(path string) (release.Storage, error) return nil, nil } +func (f *fakeBackend) GraphStorage(project, workspace string) (graph.Storage, error) { + return nil, nil +} + func (f *fakeBackend) ProjectStorage() (map[string][]string, error) { return nil, nil } diff --git a/pkg/cmd/resource/graph.go b/pkg/cmd/resource/graph.go new file mode 100644 index 000000000..000e79415 --- /dev/null +++ b/pkg/cmd/resource/graph.go @@ -0,0 +1,350 @@ +package resource + +import ( + "encoding/json" + "fmt" + + "github.com/spf13/cobra" + "k8s.io/cli-runtime/pkg/genericiooptions" + "k8s.io/kubectl/pkg/util/templates" + v1 "kusionstack.io/kusion/pkg/apis/api.kusion.io/v1" + "kusionstack.io/kusion/pkg/backend" + cmdutil "kusionstack.io/kusion/pkg/cmd/util" + "kusionstack.io/kusion/pkg/engine/resource/graph" + "kusionstack.io/kusion/pkg/util/i18n" +) + +var ( + graphShort = i18n.T("Display a graph of all the resources' information of the target project and target workspaces") + + graphLong = i18n.T(` + Display information of all the resources of a project. + + This command displays information of all the resources of a project in the current or specified workspaces. + `) + + graphExample = i18n.T(` + # Display information of all the resources of a project in the current workspace. + kusion resource graph --project quickstart + + # Display information of all the resources of a project in specified workspaces. + kusion resource graph --project quickstart --workspace=dev,default + + # Display information of all the resource of a project in all the workspaces that has been deployed. + kusion resource graph --project quickstart --all + kusion resource graph --project quickstart -a + + # Display information of all the resource of a project with in specified workspaces with json format result. + kusion resource graph --project quickstart --workspace dev -o json + `) +) + +const jsonOutput = "json" + +var ( + // Define the width for each column to print + idWidth = 55 + kindWidth = 30 + nameWidth = 30 + cloudResourceIDWidth = 30 + statusWidth = 30 +) + +// GraphFlags reflects the information that CLI is gathering via flags, +// which will be converted into GraphOptions. +type GraphFlags struct { + Project *string + Workspace *[]string + Backend *string + All bool + Output string +} + +// GraphOptions defines the configuration parameters for the `kusion release graph` command. +type GraphOptions struct { + Project string + Workspace []string + GraphStorage map[string]graph.Storage + Output string +} + +// NewGraphFlags returns a default GraphFlags. +func NewGraphFlags() *GraphFlags { + projectName := "" + workspaceName := []string{} + backendName := "" + all := false + + return &GraphFlags{ + Project: &projectName, + Workspace: &workspaceName, + Backend: &backendName, + All: all, + } +} + +// NewCmdGraph creates the `kusion resource graph` command. +func NewCmdGraph(streams genericiooptions.IOStreams) *cobra.Command { + flags := NewGraphFlags() + + cmd := &cobra.Command{ + Use: "graph", + Short: graphShort, + Long: templates.LongDesc(graphLong), + Example: templates.Examples(graphExample), + RunE: func(cmd *cobra.Command, args []string) (err error) { + o, err := flags.ToOptions() + defer cmdutil.RecoverErr(&err) + cmdutil.CheckErr(err) + cmdutil.CheckErr(o.Validate(cmd, args)) + cmdutil.CheckErr(o.Run()) + + return + }, + } + + flags.AddFlags(cmd) + + return cmd +} + +// AddFlags registers flags for the CLI. +func (f *GraphFlags) AddFlags(cmd *cobra.Command) { + if f.Project != nil { + cmd.Flags().StringVarP(f.Project, "project", "", "", i18n.T("The name of the target project")) + } + if f.Workspace != nil { + cmd.Flags().StringSliceVarP(f.Workspace, "workspace", "", []string{}, i18n.T("The name of the target workspace")) + } + if f.Backend != nil { + cmd.Flags().StringVarP(f.Backend, "backend", "", "", i18n.T("The backend to use, supports 'local', 'oss' and 's3'")) + } + + cmd.Flags().StringVarP(&f.Output, "output", "o", f.Output, i18n.T("Specify the output format, json only")) + cmd.Flags().BoolVarP(&f.All, "all", "a", false, i18n.T("Display all the resources of all the workspaces")) +} + +// ToOptions converts from CLI inputs to runtime inputs. +func (f *GraphFlags) ToOptions() (*GraphOptions, error) { + var storageBackend backend.Backend + var err error + // Get the backend storage + if f.Backend != nil && *f.Backend != "" { + storageBackend, err = backend.NewBackend(*f.Backend) + if err != nil { + return nil, err + } + } else { + storageBackend, err = backend.NewBackend("") + if err != nil { + return nil, err + } + } + + workspaceName := "" + projectName := "" + workspaces := []string{} + graphStorages := map[string]graph.Storage{} + + workspaceStorage, err := storageBackend.WorkspaceStorage() + if err != nil { + return nil, err + } + + if f.Project != nil && *f.Project != "" { + projectName = *f.Project + } else { + return nil, fmt.Errorf("project is a must") + } + + // Get all the available workspaces + if f.All { + workspaceNames, err := workspaceStorage.GetNames() + if err != nil { + return nil, err + } + workspaces = append(workspaces, workspaceNames...) + } else { + // Use the workspaces that specified + if len(*f.Workspace) != 0 { + for _, workspace := range *f.Workspace { + if workspace != "" { + refWorkspace, err := workspaceStorage.Get(workspace) + if err != nil { + return nil, err + } + workspaceName = refWorkspace.Name + workspaces = append(workspaces, workspaceName) + } + } + // If no workspace is specified, use the current workspace + } else { + currentWorkspace, err := workspaceStorage.GetCurrent() + if err != nil { + return nil, err + } + + workspaceName = currentWorkspace + workspaces = append(workspaces, workspaceName) + } + } + + // Get graph for each of the workspace + for _, workspaceName := range workspaces { + graphStorage, err := storageBackend.GraphStorage(projectName, workspaceName) + if err != nil { + return nil, err + } + if graphStorage.CheckGraphStorageExistence() { + graphStorages[workspaceName] = graphStorage + } + } + + if len(graphStorages) == 0 { + return nil, fmt.Errorf("no graph found for project: %s", projectName) + } + + return &GraphOptions{ + Project: projectName, + Workspace: workspaces, + GraphStorage: graphStorages, + Output: f.Output, + }, nil +} + +// Validate verifies if GraphOptions are valid and without conflicts. +func (o *GraphOptions) Validate(cmd *cobra.Command, args []string) error { + if len(args) != 0 { + return cmdutil.UsageErrorf(cmd, "Unexpected args: %v", args) + } + + return nil +} + +// Run executes the `kusion release graph` command. +func (o *GraphOptions) Run() error { + // Get the storage backend of the graph. + for _, workspace := range o.Workspace { + storage, ok := o.GraphStorage[workspace] + if ok { + graph, err := storage.Get() + if err != nil { + return err + } + + if o.Output == jsonOutput { + output, err := json.Marshal(graph) + if err != nil { + return fmt.Errorf("json marshal resource graph failed as %w", err) + } + fmt.Println(string(output)) + } else { + displayGraph(graph) + } + } + } + + return nil +} + +// displayGraph displays resource graph +func displayGraph(graph *v1.Graph) { + fmt.Printf("Displaying resource graph in the project %s...\n\n", graph.Project) + fmt.Printf("Workspace: %s\n\n", graph.Workspace) + + // Print Workload Resources + fmt.Println("Workload Resources:") + printResourceHeader(idWidth, kindWidth, nameWidth, cloudResourceIDWidth, statusWidth) + for _, resource := range graph.Resources.WorkloadResources { + printResourceRow(resource, idWidth, kindWidth, nameWidth, cloudResourceIDWidth, statusWidth) + } + fmt.Println() + + // Print Dependency Resources + fmt.Println("Dependency Resources:") + printResourceHeader(idWidth, kindWidth, nameWidth, cloudResourceIDWidth, statusWidth) + for _, resource := range graph.Resources.DependencyResources { + printResourceRow(resource, idWidth, kindWidth, nameWidth, cloudResourceIDWidth, statusWidth) + } + fmt.Println() + + // Print Other Resources + fmt.Println("Other Resources:") + printResourceHeader(idWidth, kindWidth, nameWidth, cloudResourceIDWidth, statusWidth) + for _, resource := range graph.Resources.OtherResources { + printResourceRow(resource, idWidth, kindWidth, nameWidth, cloudResourceIDWidth, statusWidth) + } +} + +// Helper function to print the header row +func printResourceHeader(idWidth, kindWidth, nameWidth, cloudResourceIDWidth, statusWidth int) { + fmt.Printf("%-*s %-*s %-*s %-*s %-*s\n", idWidth, "ID", kindWidth, "Kind", nameWidth, "Name", cloudResourceIDWidth, "CloudResourceID", statusWidth, "Status") +} + +// Helper function to print each row of resources with wrapping if necessary +func printResourceRow(resource *v1.GraphResource, idWidth, kindWidth, nameWidth, cloudResourceIDWidth, statusWidth int) { + idLines := wrapText(resource.ID, idWidth) + typeLines := wrapText(resource.Type, kindWidth) + nameLines := wrapText(resource.Name, nameWidth) + cloudResourceIDLines := wrapText(resource.CloudResourceID, cloudResourceIDWidth) + statusLines := wrapText(string(resource.Status), statusWidth) + + // Find the maximum number of lines needed for this resource + maxLines := maxNumber(len(idLines), len(typeLines), len(nameLines), len(cloudResourceIDLines), len(statusLines)) + + // Print each line of the resource, line by line + for i := 0; i < maxLines; i++ { + id := "" + if i < len(idLines) { + id = idLines[i] + } + kind := "" + if i < len(typeLines) { + kind = typeLines[i] + } + name := "" + if i < len(nameLines) { + name = nameLines[i] + } + cloudResourceID := "" + if i < len(cloudResourceIDLines) { + cloudResourceID = cloudResourceIDLines[i] + } + status := "" + if i < len(statusLines) { + status = statusLines[i] + } + + fmt.Printf("%-*s %-*s %-*s %-*s %-*s\n", idWidth, id, kindWidth, kind, nameWidth, name, cloudResourceIDWidth, cloudResourceID, statusWidth, status) + } +} + +// Helper function to wrap text based on a given width +func wrapText(text string, width int) []string { + if len(text) <= width { + return []string{text} + } + + var lines []string + for len(text) > width { + lines = append(lines, text[:width]) + text = text[width:] + } + lines = append(lines, text) + return lines +} + +// Helper function to find the maximum of two integers +func maxNumber(nums ...int) int { + if len(nums) == 0 { + panic("max() arg is an empty sequence") + } + + maxNum := nums[0] + for _, num := range nums[1:] { + if num > maxNum { + maxNum = num + } + } + return maxNum +} diff --git a/pkg/cmd/resource/resource.go b/pkg/cmd/resource/resource.go index 060692f04..927478bbd 100644 --- a/pkg/cmd/resource/resource.go +++ b/pkg/cmd/resource/resource.go @@ -23,5 +23,6 @@ func NewCmdRes(streams genericiooptions.IOStreams) *cobra.Command { Run: cmdutil.DefaultSubCommandRun(streams.ErrOut), } + cmd.AddCommand(NewCmdGraph(streams)) return cmd } diff --git a/pkg/engine/operation/apply.go b/pkg/engine/operation/apply.go index 742ef8abb..89f6e6448 100644 --- a/pkg/engine/operation/apply.go +++ b/pkg/engine/operation/apply.go @@ -13,6 +13,7 @@ import ( "kusionstack.io/kusion/pkg/engine/operation/models" "kusionstack.io/kusion/pkg/engine/operation/parser" "kusionstack.io/kusion/pkg/engine/release" + resourcegraph "kusionstack.io/kusion/pkg/engine/resource/graph" runtimeinit "kusionstack.io/kusion/pkg/engine/runtime/init" "kusionstack.io/kusion/pkg/log" "kusionstack.io/kusion/third_party/terraform/dag" @@ -26,10 +27,12 @@ type ApplyOperation struct { type ApplyRequest struct { models.Request Release *apiv1.Release + Graph *apiv1.Graph } type ApplyResponse struct { Release *apiv1.Release + Graph *apiv1.Graph } // Apply means turn all actual infra resources into the desired state described in the request by invoking a specified Runtime. @@ -88,6 +91,8 @@ func (ao *ApplyOperation) Apply(req *ApplyRequest) (rsp *ApplyResponse, s v1.Sta return nil, s } log.Infof("Apply Graph:\n%s", applyGraph.String()) + // Get dependencies and dependents of each node to be populated into resource graph. + resourceGraph := populateResourceGraph(applyGraph, req.Graph) rel, s := copyRelease(req.Release) if v1.IsErr(s) { @@ -120,7 +125,7 @@ func (ao *ApplyOperation) Apply(req *ApplyRequest) (rsp *ApplyResponse, s v1.Sta return nil, s } - return &ApplyResponse{Release: applyOperation.Release}, nil + return &ApplyResponse{Release: applyOperation.Release, Graph: resourceGraph}, nil } func (ao *ApplyOperation) walkFun(v dag.Vertex) (diags tfdiags.Diagnostics) { @@ -134,6 +139,9 @@ func validateApplyRequest(req *ApplyRequest) v1.Status { if err := release.ValidateRelease(req.Release); err != nil { return v1.NewErrorStatusWithMsg(v1.InvalidArgument, err.Error()) } + if err := resourcegraph.ValidateGraph(req.Graph); err != nil { + return v1.NewErrorStatusWithMsg(v1.InvalidArgument, err.Error()) + } if req.Release.Phase != apiv1.ReleasePhaseApplying { return v1.NewErrorStatusWithMsg(v1.InvalidArgument, "release phase is not applying") } @@ -199,3 +207,29 @@ func applyWalkFun(o *models.Operation, v dag.Vertex) (diags tfdiags.Diagnostics) } return diags } + +// populateResourceGraph populate dependents and dependencies of each resource in resource graph with acyclicGraph +func populateResourceGraph(applyGraph *dag.AcyclicGraph, resourceGraph *apiv1.Graph) *apiv1.Graph { + for _, vertex := range applyGraph.Vertices() { + // Get resource ID from vertex. + resourceName := dag.VertexName(vertex) + resource := resourcegraph.FindGroupResourceByID(resourceGraph.Resources, resourceName) + // Populate it's dependents and dependencies + if resource != nil { + dependents := applyGraph.DownEdges(vertex) + dependencies := applyGraph.UpEdges(vertex) + + for _, dependent := range dependents { + dependentName := dag.VertexName(dependent) + resource.Dependents = append(resource.Dependents, dependentName) + } + + for _, dependency := range dependencies { + dependencyName := dag.VertexName(dependency) + resource.Dependencies = append(resource.Dependencies, dependencyName) + } + } + } + + return resourceGraph +} diff --git a/pkg/engine/operation/apply_test.go b/pkg/engine/operation/apply_test.go index a02fd3749..9a59b6a0f 100644 --- a/pkg/engine/operation/apply_test.go +++ b/pkg/engine/operation/apply_test.go @@ -1,22 +1,24 @@ package operation import ( + "reflect" "sync" "testing" "time" "github.com/bytedance/mockey" "github.com/stretchr/testify/assert" - apiv1 "kusionstack.io/kusion/pkg/apis/api.kusion.io/v1" v1 "kusionstack.io/kusion/pkg/apis/status/v1" "kusionstack.io/kusion/pkg/engine/operation/graph" "kusionstack.io/kusion/pkg/engine/operation/models" "kusionstack.io/kusion/pkg/engine/release" "kusionstack.io/kusion/pkg/engine/release/storages" + resourcegraph "kusionstack.io/kusion/pkg/engine/resource/graph" "kusionstack.io/kusion/pkg/engine/runtime" runtimeinit "kusionstack.io/kusion/pkg/engine/runtime/init" "kusionstack.io/kusion/pkg/engine/runtime/kubernetes" + "kusionstack.io/kusion/third_party/terraform/dag" ) func TestApplyOperation_Apply(t *testing.T) { @@ -96,7 +98,8 @@ func TestApplyOperation_Apply(t *testing.T) { Path: "fake-path", Stacks: []*apiv1.Stack{fakeStack}, } - + fakeGraph := &apiv1.Graph{Project: fakeRelease.Project, Workspace: fakeRelease.Workspace} + resourcegraph.GenerateGraph(fakeSpec.Resources, fakeGraph) testcases := []struct { name string fields fields @@ -118,8 +121,9 @@ func TestApplyOperation_Apply(t *testing.T) { Project: fakeProject, }, Release: fakeRelease, + Graph: fakeGraph, }}, - expectedResponse: &ApplyResponse{Release: fakeUpdateRelease}, + expectedResponse: &ApplyResponse{Release: fakeUpdateRelease, Graph: fakeGraph}, expectedStatus: nil, }, } @@ -152,7 +156,7 @@ func TestApplyOperation_Apply(t *testing.T) { ) (map[apiv1.Type]runtime.Runtime, v1.Status) { return map[apiv1.Type]runtime.Runtime{runtime.Kubernetes: &kubernetes.KubernetesRuntime{}}, nil }).Build() - + mockey.Mock(populateResourceGraph).Return(fakeGraph).Build() rsp, status := ao.Apply(tc.args.applyRequest) assert.Equal(t, tc.expectedResponse, rsp) assert.Equal(t, tc.expectedStatus, status) @@ -187,9 +191,87 @@ func Test_ValidateApplyRequest(t *testing.T) { mockey.PatchConvey("mock valid release and spec", t, func() { mockey.Mock(release.ValidateRelease).Return(nil).Build() mockey.Mock(release.ValidateSpec).Return(nil).Build() + mockey.Mock(resourcegraph.ValidateGraph).Return(nil).Build() err := validateApplyRequest(tc.req) assert.Equal(t, tc.success, err == nil) }) }) } } + +func Test_populateResourceGraph(t *testing.T) { + graph := &dag.AcyclicGraph{ + Graph: dag.Graph{}, + } + graph.Add("mock-ID1") + graph.Add("mock-ID2") + graph.Add("mock-ID") + graph.Connect(dag.BasicEdge("mock-ID", "mock-ID1")) + graph.Connect(dag.BasicEdge("mock-ID2", "mock-ID")) + testResource := &apiv1.GraphResource{ + ID: "mock-ID", + Type: "mock-type", + Name: "mock-name", + CloudResourceID: "", + Status: "", + Dependents: []string{}, + Dependencies: []string{}, + } + mockResource := &apiv1.GraphResource{ + ID: "mock-ID", + Type: "mock-type", + Name: "mock-name", + CloudResourceID: "", + Status: "", + Dependents: []string{"mock-ID1"}, + Dependencies: []string{"mock-ID2"}, + } + type args struct { + applyGraph *dag.AcyclicGraph + resourceGraph *apiv1.Graph + } + tests := []struct { + name string + args args + want *apiv1.Graph + }{ + { + name: "poplute resource dependents and dependencies", + args: args{ + applyGraph: graph, + resourceGraph: &apiv1.Graph{ + Project: "project name", + Workspace: "workspace name", + Resources: &apiv1.GraphResources{ + WorkloadResources: map[string]*apiv1.GraphResource{"mock-ID": testResource}, + DependencyResources: map[string]*apiv1.GraphResource{}, + OtherResources: map[string]*apiv1.GraphResource{}, + ResourceIndex: map[string]*apiv1.ResourceEntry{}, + }, + }, + }, + want: &apiv1.Graph{ + Project: "project name", + Workspace: "workspace name", + Resources: &apiv1.GraphResources{ + WorkloadResources: map[string]*apiv1.GraphResource{"mock-ID": mockResource}, + DependencyResources: map[string]*apiv1.GraphResource{}, + OtherResources: map[string]*apiv1.GraphResource{}, + ResourceIndex: map[string]*apiv1.ResourceEntry{}, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + resourcegraph.UpdateResourceIndex(tt.args.resourceGraph.Resources) + got := populateResourceGraph(tt.args.applyGraph, tt.args.resourceGraph) + if !reflect.DeepEqual(got.Resources.WorkloadResources["mock-ID"].Dependents, tt.want.Resources.WorkloadResources["mock-ID"].Dependents) { + t.Errorf("populateResourceGraph() = %v, want %v", got.Resources.WorkloadResources["mock-ID"], tt.want.Resources.WorkloadResources["mock-ID"]) + } + if !reflect.DeepEqual(got.Resources.WorkloadResources["mock-ID"].Dependencies, tt.want.Resources.WorkloadResources["mock-ID"].Dependencies) { + t.Errorf("populateResourceGraph() = %v, want %v", got.Resources.WorkloadResources["mock-ID"], tt.want.Resources.WorkloadResources["mock-ID"]) + } + }) + } +} diff --git a/pkg/engine/resource/graph/storage.go b/pkg/engine/resource/graph/storage.go new file mode 100644 index 000000000..e0b9811de --- /dev/null +++ b/pkg/engine/resource/graph/storage.go @@ -0,0 +1,22 @@ +package graph + +import v1 "kusionstack.io/kusion/pkg/apis/api.kusion.io/v1" + +// Storage is used to provide storage service for multiple Releases of a specified Project +// and Workspace. +type Storage interface { + // Get returns a specified Graph. + Get() (*v1.Graph, error) + + // Create creates a new Graph in the Storage. + Create(*v1.Graph) error + + // Update updates an existing Graph in the Storage. + Update(*v1.Graph) error + + // Delete deletes an existing Graph in the Storage. + Delete() error + + // CheckGraphStorageExistence checks if the Graph storage exists. + CheckGraphStorageExistence() bool +} diff --git a/pkg/engine/resource/graph/storages/local.go b/pkg/engine/resource/graph/storages/local.go new file mode 100644 index 000000000..4fd5e23ac --- /dev/null +++ b/pkg/engine/resource/graph/storages/local.go @@ -0,0 +1,100 @@ +package storages + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + + v1 "kusionstack.io/kusion/pkg/apis/api.kusion.io/v1" + "kusionstack.io/kusion/pkg/engine/resource/graph" +) + +// LocalStorage is an implementation of resource.Storage which uses local filesystem as storage. +type LocalStorage struct { + // The directory path to store the resource files. + path string +} + +// NewLocalStorage news local resource storage, and derives graph. +// For instance, local path is ~/.kusion/resources/project/workspace +func NewLocalStorage(path string) (*LocalStorage, error) { + s := &LocalStorage{path: path} + + // create the resources directory + if err := os.MkdirAll(s.path, os.ModePerm); err != nil { + return nil, fmt.Errorf("create resources directory failed, %w", err) + } + return s, nil +} + +// Get gets the graph from local. +func (s *LocalStorage) Get() (*v1.Graph, error) { + content, err := os.ReadFile(filepath.Join(s.path, graphFileName)) + if err != nil { + return nil, fmt.Errorf("read resource graph file failed: %w", err) + } + + r := &v1.Graph{} + if err = json.Unmarshal(content, r); err != nil { + return nil, fmt.Errorf("json unmarshal graph failed: %w", err) + } + + // Index is not stored in s3, so we need to rebuild it. + // Update resource index to use index in the memory. + graph.UpdateResourceIndex(r.Resources) + + return r, nil +} + +// Create creates the graph in s3. +func (s *LocalStorage) Create(r *v1.Graph) error { + content, _ := os.ReadFile(filepath.Join(s.path, graphFileName)) + if content != nil { + return ErrGraphAlreadyExist + } + + return s.writeGraph(r) +} + +// Update updates the graph in s3. +func (s *LocalStorage) Update(r *v1.Graph) error { + _, err := os.ReadFile(filepath.Join(s.path, graphFileName)) + if err != nil { + return ErrGraphNotExist + } + + return s.writeGraph(r) +} + +// Delete deletes the graph in s3 +func (s *LocalStorage) Delete() error { + _, err := os.ReadFile(filepath.Join(s.path, graphFileName)) + if !os.IsNotExist(err) { + if err := os.Remove(filepath.Join(s.path, graphFileName)); err != nil { + return fmt.Errorf("remove graph file failed: %w", err) + } + } + + return nil +} + +// writeGraph writes the graph to s3. +func (s *LocalStorage) writeGraph(r *v1.Graph) error { + content, err := json.Marshal(r) + if err != nil { + return fmt.Errorf("json marshal graph failed: %w", err) + } + + if err = os.WriteFile(filepath.Join(s.path, graphFileName), content, os.ModePerm); err != nil { + return fmt.Errorf("write graph file failed: %w", err) + } + + return nil +} + +// CheckGraphStorageExistence checks whether the graph storage exists. +func (s *LocalStorage) CheckGraphStorageExistence() bool { + _, err := os.ReadFile(filepath.Join(s.path, graphFileName)) + return !os.IsNotExist(err) +} diff --git a/pkg/engine/resource/graph/storages/oss.go b/pkg/engine/resource/graph/storages/oss.go new file mode 100644 index 000000000..ac5af5559 --- /dev/null +++ b/pkg/engine/resource/graph/storages/oss.go @@ -0,0 +1,116 @@ +package storages + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + + "github.com/aliyun/aliyun-oss-go-sdk/oss" + v1 "kusionstack.io/kusion/pkg/apis/api.kusion.io/v1" + "kusionstack.io/kusion/pkg/engine/resource/graph" +) + +// OssStorage is an implementation of graph.Storage which uses oss as storage. +type OssStorage struct { + bucket *oss.Bucket + + // The prefix to store the graph files. + prefix string +} + +// NewOssStorage news oss graph storage, and derives metadata. +func NewOssStorage(bucket *oss.Bucket, prefix string) (*OssStorage, error) { + s := &OssStorage{ + bucket: bucket, + prefix: prefix, + } + + return s, nil +} + +// Get gets the graph from oss. +func (s *OssStorage) Get() (*v1.Graph, error) { + body, err := s.bucket.GetObject(fmt.Sprintf("%s/%s", s.prefix, graphFileName)) + if err != nil { + return nil, fmt.Errorf("get resource graph from oss failed: %w", err) + } + defer func() { + _ = body.Close() + }() + content, err := io.ReadAll(body) + if err != nil { + return nil, fmt.Errorf("read resource graph failed: %w", err) + } + + r := &v1.Graph{} + if err = json.Unmarshal(content, r); err != nil { + return nil, fmt.Errorf("json unmarshal graph failed: %w", err) + } + // Index is not stored in oss, so we need to rebuild it. + // Update resource index to use index in the memory. + + graph.UpdateResourceIndex(r.Resources) + + return r, nil +} + +// Create creates the graph in oss. +func (s *OssStorage) Create(r *v1.Graph) error { + body, _ := s.bucket.GetObject(fmt.Sprintf("%s/%s", s.prefix, graphFileName)) + if body != nil { + return ErrGraphAlreadyExist + } + defer func() { + _ = body.Close() + }() + + return s.writeGraph(r) +} + +// Update updates the graph in oss. +func (s *OssStorage) Update(r *v1.Graph) error { + body, err := s.bucket.GetObject(fmt.Sprintf("%s/%s", s.prefix, graphFileName)) + if err != nil { + return ErrGraphNotExist + } + defer func() { + _ = body.Close() + }() + + return s.writeGraph(r) +} + +// Delete deletes the graph in oss. +func (s *OssStorage) Delete() error { + if err := s.bucket.DeleteObject(fmt.Sprintf("%s/%s", s.prefix, graphFileName)); err != nil { + return fmt.Errorf("remove workspace in oss failed: %w", err) + } + + return nil +} + +// writeGraph writes the graph to oss. +func (s *OssStorage) writeGraph(r *v1.Graph) error { + content, err := json.Marshal(r) + if err != nil { + return fmt.Errorf("json marshal graph failed: %w", err) + } + + key := fmt.Sprintf("%s/%s", s.prefix, graphFileName) + if err = s.bucket.PutObject(key, bytes.NewReader(content)); err != nil { + return fmt.Errorf("put graph to oss failed: %w", err) + } + + return nil +} + +// CheckGraphStorageExistence checks whether the graph storage exists. +func (s *OssStorage) CheckGraphStorageExistence() bool { + body, err := s.bucket.GetObject(fmt.Sprintf("%s/%s", s.prefix, graphFileName)) + defer func() { + _ = body.Close() + }() + + return err == nil +} diff --git a/pkg/engine/resource/graph/storages/s3.go b/pkg/engine/resource/graph/storages/s3.go new file mode 100644 index 000000000..58fab7691 --- /dev/null +++ b/pkg/engine/resource/graph/storages/s3.go @@ -0,0 +1,134 @@ +package storages + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/s3" + v1 "kusionstack.io/kusion/pkg/apis/api.kusion.io/v1" + "kusionstack.io/kusion/pkg/engine/resource/graph" +) + +// S3Storage is an implementation of graph.Storage which uses s3 as storage. +type S3Storage struct { + s3 *s3.S3 + bucket string + + // The prefix to store the graph files. + prefix string +} + +// NewS3Storage news s3 graph storage, and derives metadata. +func NewS3Storage(s3 *s3.S3, bucket, prefix string) (*S3Storage, error) { + s := &S3Storage{ + s3: s3, + bucket: bucket, + prefix: prefix, + } + return s, nil +} + +// Get gets the graph from s3. +func (s *S3Storage) Get() (*v1.Graph, error) { + output, err := getS3StorageObject(s.s3, s.bucket, s.prefix, graphFileName) + if err != nil { + return nil, fmt.Errorf("get graph from s3 failed: %w", err) + } + defer func() { + _ = output.Body.Close() + }() + content, err := io.ReadAll(output.Body) + if err != nil { + return nil, fmt.Errorf("read graph failed: %w", err) + } + + r := &v1.Graph{} + if err = json.Unmarshal(content, r); err != nil { + return nil, fmt.Errorf("json unmarshal graph failed: %w", err) + } + + // Index is not stored in s3, so we need to rebuild it. + // Update resource index to use index in the memory. + graph.UpdateResourceIndex(r.Resources) + + return r, nil +} + +// Create creates the graph in s3. +func (s *S3Storage) Create(r *v1.Graph) error { + output, _ := getS3StorageObject(s.s3, s.bucket, s.prefix, graphFileName) + if output != nil { + return ErrGraphAlreadyExist + } + + return s.writeGraph(r) +} + +// Update updates the graph in s3. +func (s *S3Storage) Update(r *v1.Graph) error { + _, err := getS3StorageObject(s.s3, s.bucket, s.prefix, graphFileName) + if err != nil { + return ErrGraphNotExist + } + + return s.writeGraph(r) +} + +// Delete deletes the graph in s3 +func (s *S3Storage) Delete() error { + input := &s3.DeleteObjectInput{ + Bucket: aws.String(s.bucket), + Key: aws.String(fmt.Sprintf("%s/%s", s.prefix, graphFileName)), + } + if _, err := s.s3.DeleteObject(input); err != nil { + return fmt.Errorf("remove workspace in s3 failed: %w", err) + } + + return nil +} + +// writeGraph writes the graph to s3. +func (s *S3Storage) writeGraph(r *v1.Graph) error { + content, err := json.Marshal(r) + if err != nil { + return fmt.Errorf("json marshal graph failed: %w", err) + } + + input := &s3.PutObjectInput{ + Bucket: aws.String(s.bucket), + Key: aws.String(fmt.Sprintf("%s/%s", s.prefix, graphFileName)), + Body: bytes.NewReader(content), + } + if _, err = s.s3.PutObject(input); err != nil { + return fmt.Errorf("put graph to s3 failed: %w", err) + } + + return nil +} + +// CheckGraphStorageExistence checks whether the graph storage exists. +func (s *S3Storage) CheckGraphStorageExistence() bool { + if _, err := getS3StorageObject(s.s3, s.bucket, s.prefix, graphFileName); err != nil { + return false + } + + return true +} + +// getS3StorageObject gets the graph object from s3. +func getS3StorageObject(s *s3.S3, bucket, prefix, graphFileName string) (*s3.GetObjectOutput, error) { + key := fmt.Sprintf("%s/%s", prefix, graphFileName) + input := &s3.GetObjectInput{ + Bucket: aws.String(bucket), + Key: &key, + } + output, err := s.GetObject(input) + if err != nil { + return nil, fmt.Errorf("get graph from s3 failed: %w", err) + } + + return output, nil +} diff --git a/pkg/engine/resource/graph/storages/util.go b/pkg/engine/resource/graph/storages/util.go new file mode 100644 index 000000000..231455c06 --- /dev/null +++ b/pkg/engine/resource/graph/storages/util.go @@ -0,0 +1,43 @@ +package storages + +import ( + "errors" + "fmt" + "path/filepath" + "strings" +) + +const ( + resourcesPrefix = "resources" + graphFileName = "graph.json" +) + +var ( + ErrGraphNotExist = errors.New("graph does not exist") + ErrGraphAlreadyExist = errors.New("graph has already existed") +) + +// GenResourceDirPath generates the resource dir path, which is used for LocalStorage. +func GenGraphDirPath(dir, project, workspace string) string { + return filepath.Join(dir, resourcesPrefix, project, workspace) +} + +// GenGenericOssResourcePrefixKey generates generic oss resource prefix, which is use for OssStorage and S3Storage. +func GenGenericOssResourcePrefixKey(prefix, project, workspace string) string { + prefix = strings.TrimPrefix(prefix, "/") + if prefix != "" { + prefix += "/" + } + + return fmt.Sprintf("%s%s/%s/%s", prefix, resourcesPrefix, project, workspace) +} + +// GenResourcePrefixKeyWithPath generates oss state file key with cloud and env instead of workspace, which is use for OssStorage and S3Storage. +func GenResourcePrefixKeyWithPath(prefix, path string) string { + prefix = strings.TrimPrefix(prefix, "/") + if prefix != "" { + prefix += "/" + } + + return fmt.Sprintf("%s%s/%s", prefix, resourcesPrefix, path) +} diff --git a/pkg/engine/resource/graph/util.go b/pkg/engine/resource/graph/util.go new file mode 100644 index 000000000..fca72b64f --- /dev/null +++ b/pkg/engine/resource/graph/util.go @@ -0,0 +1,257 @@ +package graph + +import ( + "fmt" + "strings" + + v1 "kusionstack.io/kusion/pkg/apis/api.kusion.io/v1" + "kusionstack.io/kusion/pkg/log" +) + +const ( + AWSProviderType = "aws" + AliCloudProviderType = "alicloud" + AzureProviderType = "azure" + GoogleProviderType = "google" + AntProviderType = "ant" + // AntProviderRegistrySuffix = "alipay.com" + CustomProviderType = "custom" + workloadCategory = "workload" + dependencyCategory = "dependency" + otherCategory = "other" +) + +type ResourceInfo struct { + ResourceType string + CloudResourceID string + ResourceName string +} + +// addGraphResource adds a GraphResource to the Graph +func addGraphResource(gr *v1.GraphResources, resource *v1.GraphResource, category string) { + var resourceCollection map[string]*v1.GraphResource + switch category { + case workloadCategory: + if gr.WorkloadResources == nil { + gr.WorkloadResources = map[string]*v1.GraphResource{} + } + resourceCollection = gr.WorkloadResources + case dependencyCategory: + if gr.DependencyResources == nil { + gr.DependencyResources = map[string]*v1.GraphResource{} + } + resourceCollection = gr.DependencyResources + case otherCategory: + if gr.OtherResources == nil { + gr.OtherResources = map[string]*v1.GraphResource{} + } + resourceCollection = gr.OtherResources + } + + // Add the resource to the selected collection. + resourceCollection[resource.ID] = resource + if gr.ResourceIndex == nil { + gr.ResourceIndex = map[string]*v1.ResourceEntry{} + } + // Update the global resource index with the new resource and its category. + gr.ResourceIndex[resource.ID] = &v1.ResourceEntry{ + Resource: resource, + Category: resourceCollection, + } +} + +// FindGroupResourceByID searches for a GraphResource by its ID in the resource index. +// If the resource is found, it returns the corresponding GraphResource. Otherwise, it returns nil. +func FindGroupResourceByID(gr *v1.GraphResources, id string) *v1.GraphResource { + if entry, found := gr.ResourceIndex[id]; found { + return entry.Resource + } + + return nil +} + +// FindGroupResourceCollectionByID retrieves the resource collection for a specific resource ID. +// It returns the collection (category) to which the resource belongs or nil if the resource is not found. +func FindGroupResourceCollectionByID(gr *v1.GraphResources, id string) map[string]*v1.GraphResource { + if entry, found := gr.ResourceIndex[id]; found { + return entry.Category + } + + return nil +} + +// GenerateGraph generate a new graph from resources in the spec before apply operation is applied. +func GenerateGraph(resources v1.Resources, gph *v1.Graph) (*v1.Graph, error) { + log.Infof("Adding spec resources to graph...") + if gph.Resources == nil { + gph.Resources = &v1.GraphResources{ + WorkloadResources: map[string]*v1.GraphResource{}, + DependencyResources: map[string]*v1.GraphResource{}, + OtherResources: map[string]*v1.GraphResource{}, + ResourceIndex: map[string]*v1.ResourceEntry{}, + } + } + // Get workload resources and its dependsOn resources + for _, res := range resources { + if isResourceWorkload(&res) { + workload := &v1.GraphResource{ + ID: res.ID, + Status: v1.ApplyFail, // status initialized to failure before actual apply operation + } + addGraphResource(gph.Resources, workload, workloadCategory) + + if len(res.DependsOn) != 0 { + for _, dependOn := range res.DependsOn { + dependOn := &v1.GraphResource{ + ID: dependOn, + Status: v1.ApplyFail, // status initialized to failure before actual apply operation + } + addGraphResource(gph.Resources, dependOn, dependencyCategory) + } + } + log.Infof("Added workload resource %s to graph", workload.ID) + break + } + } + + // Put other resources to graph + for _, res := range resources { + if found := FindGroupResourceByID(gph.Resources, res.ID); found == nil { + other := &v1.GraphResource{ + ID: res.ID, + Status: v1.ApplyFail, // status initialized to failure before actual apply operation + } + addGraphResource(gph.Resources, other, otherCategory) + log.Infof("Added workload irrelevant resource %s to graph", other.ID) + } + } + + return gph, nil +} + +// RemoveResource removes a GraphResource from its category and the global resource index. +func RemoveResource(gph *v1.Graph, resource *v1.GraphResource) { + // Remove the resource from the category it belongs to + delete(gph.Resources.ResourceIndex[resource.ID].Category, resource.ID) + delete(gph.Resources.ResourceIndex, resource.ID) + resource = nil +} + +// RemoveResourceIndex clears the entire resource index of the Graph. +func RemoveResourceIndex(gph *v1.Graph) { + gph.Resources.ResourceIndex = nil +} + +// isResourceWorkload checks if a resource is identified as a workload. +// It looks for the 'FieldIsWorkload' extension in the resource metadata. +func isResourceWorkload(res *v1.Resource) bool { + if res.Extensions != nil { + isWorkload := res.Extensions[v1.FieldIsWorkload] + if isWorkload != nil && isWorkload.(bool) { + return true + } + } + + return false +} + +// GetResourceInfo gets all the essential information for a resource to populate into the resource graph. +func GetResourceInfo(resource *v1.Resource) (*ResourceInfo, error) { + // ApiVersion:Kind:Namespace:Name is an idiomatic way for Kubernetes resources. + // providerNamespace:providerName:resourceType:resourceName for Terraform resources. + + // Meta determines whether this is a Kubernetes resource or Terraform resource. + resourceTypeMeta := resource.Type + var resourceType, resourcePlane, cloudResourceID, resourceName string + // Split the resource name to get the parts + idParts := strings.Split(resource.ID, ":") + if len(idParts) != 4 { + // This indicates a Kubernetes resource without the namespace. + if len(idParts) == 3 && resource.Type == v1.Kubernetes { + modifiedID := fmt.Sprintf("%s:%s:%s:%s", idParts[0], idParts[1], "", idParts[2]) + idParts = strings.Split(modifiedID, ":") + } else { + return nil, fmt.Errorf("invalid resource ID: %s", resource.ID) + } + } + + // Determine resource plane and resource type based on meta type. + switch resourceTypeMeta { + case v1.Kubernetes: + resourcePlane = string(v1.Kubernetes) + // if this is Kubernetes resource, resource type is apiVersion/kind, resource name is namespace/name. + resourceType = fmt.Sprintf("%s:%s", idParts[0], idParts[1]) + if idParts[2] == "" { + resourceName = idParts[3] + } else { + resourceName = fmt.Sprintf("%s/%s", idParts[2], idParts[3]) + } + case v1.Terraform: + // Get provider info for terraform resources. + // Look at second element of the id to determine the resource plane. + switch idParts[1] { + case AWSProviderType: + resourcePlane = AWSProviderType + resourceType = idParts[2] + resourceName = idParts[3] + if arn, ok := resource.Attributes["arn"].(string); ok { + cloudResourceID = arn + } + case AzureProviderType: + resourcePlane = AzureProviderType + resourceType = idParts[2] + resourceName = idParts[3] + if resID, ok := resource.Attributes["id"].(string); ok { + cloudResourceID = resID + } + case GoogleProviderType: + resourcePlane = GoogleProviderType + resourceType = idParts[2] + resourceName = idParts[3] + if resID, ok := resource.Attributes["id"].(string); ok { + cloudResourceID = resID + } + case AliCloudProviderType: + resourcePlane = AliCloudProviderType + resourceType = idParts[2] + resourceName = idParts[3] + if resID, ok := resource.Attributes["id"].(string); ok { + cloudResourceID = resID + } + default: + if _, ok := resource.Extensions["provider"]; ok { + resourcePlane = CustomProviderType + resourceType = idParts[2] + } + } + default: + return nil, fmt.Errorf("unsupported resource type: %s", resourceTypeMeta) + } + + return &ResourceInfo{ + ResourceType: fmt.Sprintf("%s:%s", resourcePlane, resourceType), + CloudResourceID: cloudResourceID, + ResourceName: resourceName, + }, nil +} + +// UpdateResourceIndex updates the global resource index for all resources in GraphResources. +func UpdateResourceIndex(graphResources *v1.GraphResources) { + if graphResources.WorkloadResources != nil { + for _, resources := range graphResources.WorkloadResources { + addGraphResource(graphResources, resources, workloadCategory) + } + } + + if graphResources.DependencyResources != nil { + for _, resources := range graphResources.DependencyResources { + addGraphResource(graphResources, resources, dependencyCategory) + } + } + + if graphResources.OtherResources != nil { + for _, resources := range graphResources.OtherResources { + addGraphResource(graphResources, resources, otherCategory) + } + } +} diff --git a/pkg/engine/resource/graph/validation.go b/pkg/engine/resource/graph/validation.go new file mode 100644 index 000000000..77bbd257e --- /dev/null +++ b/pkg/engine/resource/graph/validation.go @@ -0,0 +1,38 @@ +package graph + +import ( + "errors" + + v1 "kusionstack.io/kusion/pkg/apis/api.kusion.io/v1" +) + +var ( + ErrEmptyGraph = errors.New("empty graph") + ErrEmptyProject = errors.New("empty project") + ErrEmptyWorkspace = errors.New("empty workspace") + ErrEmptyResources = errors.New("empty resources") + ErrEmptyResourceIndex = errors.New("empty resource index") +) + +// ValidateGraph checks the validity of a Graph object. +// It ensures that the essential fields within the Graph structure are not empty or nil. +// If any of the required fields are missing, an appropriate error is returned. +func ValidateGraph(graph *v1.Graph) error { + if graph == nil { + return ErrEmptyGraph + } + if graph.Project == "" { + return ErrEmptyProject + } + if graph.Workspace == "" { + return ErrEmptyWorkspace + } + if graph.Resources == nil { + return ErrEmptyResources + } + + if graph.Resources.ResourceIndex == nil { + return ErrEmptyResourceIndex + } + return nil +} diff --git a/pkg/modules/generators/app_configurations_generator.go b/pkg/modules/generators/app_configurations_generator.go index dab7cebf2..8c5b9e170 100644 --- a/pkg/modules/generators/app_configurations_generator.go +++ b/pkg/modules/generators/app_configurations_generator.go @@ -478,6 +478,8 @@ func (g *appConfigurationGenerator) callModules(projectModuleConfigs map[string] if err != nil { return nil, nil, nil, err } + // add isWorkload extension to workload to mark workload + workload.Extensions[isWorkload] = true } else { for _, res := range response.Resources { temp := &v1.Resource{}