From d41464591abe1741fd39c2c9ddd08412fd6e7209 Mon Sep 17 00:00:00 2001 From: tharindu1st Date: Sun, 28 Jan 2024 22:27:50 +0530 Subject: [PATCH] fix comments --- .../internal/config/default_config.go | 3 +- common-controller/internal/config/types.go | 13 +- .../controlplane/artifact_deployer.go | 4 - .../controlplane/controlplane_client.go | 213 +++++++++++++- .../controlplane/k8s_artifact_deployer.go | 261 ++++++++++++++++-- .../internal/controlplane/types.go | 82 ++++++ .../internal/operator/operator.go | 27 +- .../internal/server/application_types.go | 1 + 8 files changed, 556 insertions(+), 48 deletions(-) create mode 100644 common-controller/internal/controlplane/types.go diff --git a/common-controller/internal/config/default_config.go b/common-controller/internal/config/default_config.go index c7c0747ac1..3097ba8d2c 100644 --- a/common-controller/internal/config/default_config.go +++ b/common-controller/internal/config/default_config.go @@ -36,9 +36,10 @@ var defaultConfig = &Config{ Environment: "Default", InternalAPIServer: internalAPIServer{Port: 18003}, ControlPlane: controlplane{ - Enabled: false, + Enabled: true, Host: "localhost", EventPort: 18000, + RestPort: 18001, RetryInterval: 5, Persistence: persistence{Type: "K8s"}}, }, diff --git a/common-controller/internal/config/types.go b/common-controller/internal/config/types.go index c0561b8e4b..372dc195e7 100644 --- a/common-controller/internal/config/types.go +++ b/common-controller/internal/config/types.go @@ -50,12 +50,13 @@ type commoncontroller struct { ControlPlane controlplane } type controlplane struct { - Enabled bool - Host string - EventPort int - RestPort int - RetryInterval time.Duration - Persistence persistence + Enabled bool + Host string + EventPort int + RestPort int + RetryInterval time.Duration + Persistence persistence + SkipSSLVerification bool } type persistence struct { Type string diff --git a/common-controller/internal/controlplane/artifact_deployer.go b/common-controller/internal/controlplane/artifact_deployer.go index cb515ade8f..838805fa55 100644 --- a/common-controller/internal/controlplane/artifact_deployer.go +++ b/common-controller/internal/controlplane/artifact_deployer.go @@ -39,10 +39,6 @@ type ArtifactDeployer interface { DeleteSubscription(subscriptionID string) error DeleteApplicationMappings(applicationID string) error DeleteKeyMappings(keyMapping server.ApplicationKeyMapping) error - DeleteAllApplications() error - DeleteAllSubscriptions() error - DeleteAllApplicationMappings() error - DeleteAllKeyMappings() error DeployAllApplications(applications server.ApplicationList) error DeployAllSubscriptions(subscriptions server.SubscriptionList) error DeployAllApplicationMappings(applicationMappings server.ApplicationMappingList) error diff --git a/common-controller/internal/controlplane/controlplane_client.go b/common-controller/internal/controlplane/controlplane_client.go index 835962559e..9834139b5c 100644 --- a/common-controller/internal/controlplane/controlplane_client.go +++ b/common-controller/internal/controlplane/controlplane_client.go @@ -3,8 +3,14 @@ package controlplane import ( "context" "crypto/tls" + "encoding/json" + "errors" "fmt" "io" + "io/ioutil" + "net/http" + "reflect" + "strconv" "time" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" @@ -31,8 +37,29 @@ type Agent struct { } var ( + subsriptionList *SubscriptionList + applicationList *ApplicationList + appKeyMappingList *ApplicationKeyMappingList + appMappingList *ApplicationMappingList connectionFaultChannel chan bool eventStreamingClient apkmgt.EventStreamService_StreamEventsClient + resources = []resource{ + { + endpoint: "/subscriptions", + responseType: subsriptionList, + }, + { + endpoint: "/applications", + responseType: applicationList, + }, + { + endpoint: "/applicationkeymappings", + responseType: appKeyMappingList, + }, + {endpoint: "/applicationmappings", + responseType: appMappingList, + }, + } ) func init() { @@ -130,7 +157,7 @@ func (controlPlaneGrpcClient *Agent) initializeGrpcStreaming() *grpc.ClientConn func (controlPlaneGrpcClient *Agent) handleEvents(event *subscription.Event) { loggers.LoggerAPKOperator.Infof("Received event %s", event.Type) if event.Type == constants.AllEvnts { - retrieveAllData() + go controlPlaneGrpcClient.retrieveAllData() } else if event.Type == constants.ApplicationCreated { loggers.LoggerAPKOperator.Infof("Received APPLICATION_CREATED event.") if event.Application != nil { @@ -139,6 +166,7 @@ func (controlPlaneGrpcClient *Agent) handleEvents(event *subscription.Event) { Owner: event.Application.Owner, OrganizationID: event.Application.Organization, Attributes: event.Application.Attributes, + TimeStamp: event.TimeStamp, } loggers.LoggerAPKOperator.Infof("Received Application %s", application.UUID) controlPlaneGrpcClient.artifactDeployer.DeployApplication(application) @@ -260,6 +288,187 @@ func (controlPlaneGrpcClient *Agent) handleEvents(event *subscription.Event) { } } -func retrieveAllData() { +func (controlPlaneGrpcClient *Agent) retrieveAllData() { + var responseChannel = make(chan response) + config := config.ReadConfigs() + for _, url := range resources { + // Create a local copy of the loop variable + localURL := url + + go InvokeService(localURL.endpoint, localURL.responseType, nil, responseChannel, 0) + + for { + data := <-responseChannel + loggers.LoggerAPKOperator.Info("Receiving subscription data for an environment") + if data.Payload != nil { + loggers.LoggerAPKOperator.Info("Payload data information received" + string(data.Payload)) + controlPlaneGrpcClient.retrieveDataFromResponseChannel(data) + break + } else if data.ErrorCode >= 400 && data.ErrorCode < 500 { + //Error handle + loggers.LoggerAPKOperator.Info("Error data information received") + //health.SetControlPlaneRestAPIStatus(false) + } else { + // Keep the iteration going on until a response is received. + // Error handle + go func(d response, endpoint string, responseType interface{}) { + // Retry fetching from control plane after a configured time interval + if config.CommonController.ControlPlane.RetryInterval == 0 { + // Assign default retry interval + config.CommonController.ControlPlane.RetryInterval = 5 + } + loggers.LoggerAPKOperator.Debugf("Time Duration for retrying: %v", config.CommonController.ControlPlane.RetryInterval*time.Second) + time.Sleep(config.CommonController.ControlPlane.RetryInterval * time.Second) + loggers.LoggerAPKOperator.Infof("Retrying to fetch APIs from control plane. Time Duration for the next retry: %v", config.CommonController.ControlPlane.RetryInterval*time.Second) + go InvokeService(endpoint, responseType, nil, responseChannel, 0) + }(data, localURL.endpoint, localURL.responseType) + } + } + } +} +type resource struct { + endpoint string + responseType interface{} +} + +type response struct { + Error error + Payload []byte + ErrorCode int + Endpoint string + Type interface{} +} + +// InvokeService invokes the internal data resource +func InvokeService(endpoint string, responseType interface{}, queryParamMap map[string]string, c chan response, + retryAttempt int) { + config := config.ReadConfigs() + serviceURL := "https://" + config.CommonController.ControlPlane.Host + ":" + strconv.Itoa(config.CommonController.ControlPlane.RestPort) + endpoint + // Create the request + req, err := http.NewRequest("GET", serviceURL, nil) + if err != nil { + c <- response{err, nil, 0, endpoint, responseType} + loggers.LoggerAPKOperator.Errorf("Error occurred while creating an HTTP request for serviceURL: "+serviceURL, err) + return + } + q := req.URL.Query() + req.URL.RawQuery = q.Encode() + + // Check if TLS is enabled + skipSSL := config.CommonController.ControlPlane.SkipSSLVerification + resp, err := InvokeControlPlane(req, skipSSL) + + if err != nil { + if resp != nil { + c <- response{err, nil, resp.StatusCode, endpoint, responseType} + } else { + c <- response{err, nil, 0, endpoint, responseType} + } + loggers.LoggerAPKOperator.Infof("Error occurred while calling the REST API: "+serviceURL, err) + return + } + + responseBytes, err := ioutil.ReadAll(resp.Body) + if resp.StatusCode == http.StatusOK { + if err != nil { + c <- response{err, nil, resp.StatusCode, endpoint, responseType} + loggers.LoggerAPKOperator.Infof("Error occurred while reading the response received for: "+serviceURL, err) + return + } + c <- response{nil, responseBytes, resp.StatusCode, endpoint, responseType} + } else { + c <- response{errors.New(string(responseBytes)), nil, resp.StatusCode, endpoint, responseType} + loggers.LoggerAPKOperator.Infof("Failed to fetch data! "+serviceURL+" responded with "+strconv.Itoa(resp.StatusCode), + err) + } +} + +// InvokeControlPlane sends request to the control plane and returns the response +func InvokeControlPlane(req *http.Request, skipSSL bool) (*http.Response, error) { + tr := &http.Transport{} + if !skipSSL { + _, _, truststoreLocation := utils.GetKeyLocations() + caCertPool := utils.GetTrustedCertPool(truststoreLocation) + tr = &http.Transport{ + TLSClientConfig: &tls.Config{RootCAs: caCertPool}, + } + } else { + tr = &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + } + } + + // Configuring the http client + client := &http.Client{ + Transport: tr, + } + return client.Do(req) +} +func (controlPlaneGrpcClient *Agent) retrieveDataFromResponseChannel(response response) { + responseType := reflect.TypeOf(response.Type).Elem() + newResponse := reflect.New(responseType).Interface() + err := json.Unmarshal(response.Payload, &newResponse) + + if err != nil { + loggers.LoggerAPI.Infof("Error occurred while unmarshalling the response received for: "+response.Endpoint, err) + } else { + switch t := newResponse.(type) { + case *SubscriptionList: + loggers.LoggerAPI.Infof("Received Subscription information.") + subList := newResponse.(*SubscriptionList) + resolvedSubscriptionList := marshalMultipleSubscriptions(subList) + controlPlaneGrpcClient.artifactDeployer.DeployAllSubscriptions(resolvedSubscriptionList) + + case *ApplicationList: + loggers.LoggerAPI.Infof("Received Application information.") + appList := newResponse.(*ApplicationList) + resolvedApplicationList := marshalMultipleApplications(appList) + controlPlaneGrpcClient.artifactDeployer.DeployAllApplications(resolvedApplicationList) + case *ApplicationKeyMappingList: + loggers.LoggerAPI.Infof("Received Application Key Mapping information.") + appKeyMappingList := newResponse.(*ApplicationKeyMappingList) + resolvedApplicationKeyMappingList := marshalMultipleApplicationKeyMappings(appKeyMappingList) + controlPlaneGrpcClient.artifactDeployer.DeployAllKeyMappings(resolvedApplicationKeyMappingList) + case *ApplicationMappingList: + loggers.LoggerAPI.Infof("Received Application Mapping information.") + appMappingList := newResponse.(*ApplicationMappingList) + resolvedApplicationMappingList := marshalMultipleApplicationMappings(appMappingList) + controlPlaneGrpcClient.artifactDeployer.DeployAllApplicationMappings(resolvedApplicationMappingList) + default: + loggers.LoggerAPI.Debugf("Unknown type %T", t) + } + } +} +func marshalMultipleSubscriptions(subList *SubscriptionList) server.SubscriptionList { + subscriptionList := server.SubscriptionList{List: []server.Subscription{}} + for _, subscription := range subList.List { + loggers.LoggerAPI.Debugf("Subscription: %v", subscription) + subscriptionList.List = append(subscriptionList.List, server.Subscription{UUID: subscription.UUID, Organization: subscription.Organization, SubStatus: subscription.SubStatus, SubscribedAPI: &server.SubscribedAPI{Name: subscription.SubscribedAPI.Name, Version: subscription.SubscribedAPI.Version}}) + } + return subscriptionList +} +func marshalMultipleApplications(appList *ApplicationList) server.ApplicationList { + applicationList := server.ApplicationList{List: []server.Application{}} + for _, application := range appList.List { + loggers.LoggerAPI.Debugf("Application: %v", application) + applicationList.List = append(applicationList.List, server.Application{UUID: application.UUID, Name: application.Name, Owner: application.Owner, OrganizationID: application.Organization, Attributes: application.Attributes}) + } + return applicationList +} +func marshalMultipleApplicationKeyMappings(appKeyMappingList *ApplicationKeyMappingList) server.ApplicationKeyMappingList { + applicationKeyMappingList := server.ApplicationKeyMappingList{List: []server.ApplicationKeyMapping{}} + for _, applicationKeyMapping := range appKeyMappingList.List { + loggers.LoggerAPI.Debugf("ApplicationKeyMapping: %v", applicationKeyMapping) + applicationKeyMappingList.List = append(applicationKeyMappingList.List, server.ApplicationKeyMapping{ApplicationUUID: applicationKeyMapping.ApplicationUUID, SecurityScheme: applicationKeyMapping.SecurityScheme, ApplicationIdentifier: applicationKeyMapping.ApplicationIdentifier, KeyType: applicationKeyMapping.KeyType, EnvID: applicationKeyMapping.EnvID, OrganizationID: applicationKeyMapping.Organization}) + } + return applicationKeyMappingList +} +func marshalMultipleApplicationMappings(appMappingList *ApplicationMappingList) server.ApplicationMappingList { + applicationMappingList := server.ApplicationMappingList{List: []server.ApplicationMapping{}} + for _, applicationMapping := range appMappingList.List { + loggers.LoggerAPI.Debugf("ApplicationMapping: %v", applicationMapping) + applicationMappingList.List = append(applicationMappingList.List, server.ApplicationMapping{UUID: applicationMapping.UUID, ApplicationRef: applicationMapping.ApplicationRef, SubscriptionRef: applicationMapping.SubscriptionRef, OrganizationID: applicationMapping.Organization}) + } + return applicationMappingList } diff --git a/common-controller/internal/controlplane/k8s_artifact_deployer.go b/common-controller/internal/controlplane/k8s_artifact_deployer.go index b249798aa9..0f0b8a8a59 100644 --- a/common-controller/internal/controlplane/k8s_artifact_deployer.go +++ b/common-controller/internal/controlplane/k8s_artifact_deployer.go @@ -2,6 +2,7 @@ package controlplane import ( "context" + "strconv" "github.com/wso2/apk/adapter/pkg/logging" "github.com/wso2/apk/common-controller/internal/loggers" @@ -14,6 +15,11 @@ import ( "sigs.k8s.io/controller-runtime/pkg/manager" ) +const ( + // CreationTimeStamp constant for annotation creationTimeStamp + CreationTimeStamp = "creationTimeStamp" +) + // K8sArtifactDeployer is a struct that implements ArtifactDeployer interface type K8sArtifactDeployer struct { client client.Client @@ -26,13 +32,24 @@ func NewK8sArtifactDeployer(mgr manager.Manager) K8sArtifactDeployer { // DeployApplication deploys an application func (k8sArtifactDeployer K8sArtifactDeployer) DeployApplication(application server.Application) error { - crApplication := cpv1alpha2.Application{ObjectMeta: v1.ObjectMeta{Name: application.UUID, Namespace: utils.GetOperatorPodNamespace()}, - Spec: cpv1alpha2.ApplicationSpec{Name: application.Name, Owner: application.Owner, Organization: application.OrganizationID, Attributes: application.Attributes}} + crApplication := cpv1alpha2.Application{ + ObjectMeta: v1.ObjectMeta{ + Name: application.UUID, + Namespace: utils.GetOperatorPodNamespace(), + Labels: map[string]string{CreationTimeStamp: strconv.FormatInt(application.TimeStamp, 10)}, + }, + Spec: cpv1alpha2.ApplicationSpec{ + Name: application.Name, + Owner: application.Owner, + Organization: application.OrganizationID, + Attributes: application.Attributes, + }, + } loggers.LoggerAPKOperator.Debugf("Creating Application %s", application.UUID) loggers.LoggerAPKOperator.Debugf("Application CR %v ", crApplication) err := k8sArtifactDeployer.client.Create(context.Background(), &crApplication) if err != nil { - loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error1100, logging.BLOCKER, "Failed to create application in k8s %v", err.Error())) + loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error1100, logging.CRITICAL, "Failed to create application in k8s %v", err.Error())) return err } return nil @@ -44,7 +61,7 @@ func (k8sArtifactDeployer K8sArtifactDeployer) UpdateApplication(application ser err := k8sArtifactDeployer.client.Get(context.Background(), client.ObjectKey{Name: application.UUID, Namespace: utils.GetOperatorPodNamespace()}, &crApplication) if err != nil { if !k8error.IsNotFound(err) { - loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error1102, logging.BLOCKER, "Failed to get application from k8s %v", err.Error())) + loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error1102, logging.CRITICAL, "Failed to get application from k8s %v", err.Error())) return err } k8sArtifactDeployer.DeployApplication(application) @@ -55,7 +72,7 @@ func (k8sArtifactDeployer K8sArtifactDeployer) UpdateApplication(application ser crApplication.Spec.Attributes = application.Attributes err := k8sArtifactDeployer.client.Update(context.Background(), &crApplication) if err != nil { - loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error1100, logging.BLOCKER, "Failed to update application in k8s %v", err.Error())) + loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error1100, logging.CRITICAL, "Failed to update application in k8s %v", err.Error())) return err } } @@ -69,7 +86,7 @@ func (k8sArtifactDeployer K8sArtifactDeployer) DeploySubscription(subscription s Spec: cpv1alpha2.SubscriptionSpec{Organization: subscription.Organization, API: cpv1alpha2.API{Name: subscription.SubscribedAPI.Name, Version: subscription.SubscribedAPI.Version}, SubscriptionStatus: subscription.SubStatus}} err := k8sArtifactDeployer.client.Create(context.Background(), &crSubscription) if err != nil { - loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error1101, logging.BLOCKER, "Failed to create subscription in k8s %v", err.Error())) + loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error1101, logging.CRITICAL, "Failed to create subscription in k8s %v", err.Error())) return err } return nil @@ -81,7 +98,7 @@ func (k8sArtifactDeployer K8sArtifactDeployer) UpdateSubscription(subscription s err := k8sArtifactDeployer.client.Get(context.Background(), client.ObjectKey{Name: subscription.UUID, Namespace: utils.GetOperatorPodNamespace()}, &crSubscription) if err != nil { if !k8error.IsNotFound(err) { - loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error1102, logging.BLOCKER, "Failed to get subscription from k8s %v", err.Error())) + loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error1102, logging.CRITICAL, "Failed to get subscription from k8s %v", err.Error())) return err } k8sArtifactDeployer.DeploySubscription(subscription) @@ -92,7 +109,7 @@ func (k8sArtifactDeployer K8sArtifactDeployer) UpdateSubscription(subscription s crSubscription.Spec.SubscriptionStatus = subscription.SubStatus err := k8sArtifactDeployer.client.Update(context.Background(), &crSubscription) if err != nil { - loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error1100, logging.BLOCKER, "Failed to update subscription in k8s %v", err.Error())) + loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error1100, logging.CRITICAL, "Failed to update subscription in k8s %v", err.Error())) return err } } @@ -111,7 +128,7 @@ func (k8sArtifactDeployer K8sArtifactDeployer) DeployKeyMappings(keyMapping serv var crApplication cpv1alpha2.Application err := k8sArtifactDeployer.client.Get(context.Background(), client.ObjectKey{Name: keyMapping.ApplicationUUID, Namespace: utils.GetOperatorPodNamespace()}, &crApplication) if err != nil { - loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error1102, logging.BLOCKER, "Failed to get application from k8s %v", err.Error())) + loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error1102, logging.CRITICAL, "Failed to get application from k8s %v", err.Error())) return err } securitySchemes := cpv1alpha2.SecuritySchemes{} @@ -141,11 +158,6 @@ func (k8sArtifactDeployer K8sArtifactDeployer) DeleteAllApplicationMappings() er return nil } -// DeleteAllApplications deletes all applications -func (k8sArtifactDeployer K8sArtifactDeployer) DeleteAllApplications() error { - return nil -} - // DeleteAllKeyMappings deletes all key mappings func (k8sArtifactDeployer K8sArtifactDeployer) DeleteAllKeyMappings() error { return nil @@ -162,13 +174,13 @@ func (k8sArtifactDeployer K8sArtifactDeployer) DeleteApplication(applicationID s err := k8sArtifactDeployer.client.Get(context.Background(), client.ObjectKey{Name: applicationID, Namespace: utils.GetOperatorPodNamespace()}, &crApplication) if err != nil { if !k8error.IsNotFound(err) { - loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error1102, logging.BLOCKER, "Failed to get application from k8s %v", err.Error())) + loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error1102, logging.CRITICAL, "Failed to get application from k8s %v", err.Error())) return err } } else { err := k8sArtifactDeployer.client.Delete(context.Background(), &crApplication) if err != nil { - loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error1100, logging.BLOCKER, "Failed to delete application in k8s %v", err.Error())) + loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error1100, logging.CRITICAL, "Failed to delete application in k8s %v", err.Error())) return err } } @@ -182,13 +194,13 @@ func (k8sArtifactDeployer K8sArtifactDeployer) DeleteApplicationMappings(applica err := k8sArtifactDeployer.client.Get(context.Background(), client.ObjectKey{Name: applicationMapping, Namespace: utils.GetOperatorPodNamespace()}, &crApplicationMapping) if err != nil { if !k8error.IsNotFound(err) { - loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error1102, logging.BLOCKER, "Failed to get application mapping from k8s %v", err.Error())) + loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error1102, logging.CRITICAL, "Failed to get application mapping from k8s %v", err.Error())) return err } } else { err := k8sArtifactDeployer.client.Delete(context.Background(), &crApplicationMapping) if err != nil { - loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error1100, logging.BLOCKER, "Failed to delete application mapping in k8s %v", err.Error())) + loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error1100, logging.CRITICAL, "Failed to delete application mapping in k8s %v", err.Error())) return err } } @@ -202,7 +214,7 @@ func (k8sArtifactDeployer K8sArtifactDeployer) UpdateApplicationMappings(applica err := k8sArtifactDeployer.client.Get(context.Background(), client.ObjectKey{Name: applicationMapping.UUID, Namespace: utils.GetOperatorPodNamespace()}, &crApplicationMapping) if err != nil { if !k8error.IsNotFound(err) { - loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error1102, logging.BLOCKER, "Failed to get application mapping from k8s %v", err.Error())) + loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error1102, logging.CRITICAL, "Failed to get application mapping from k8s %v", err.Error())) return err } k8sArtifactDeployer.DeployApplicationMappings(applicationMapping) @@ -211,7 +223,7 @@ func (k8sArtifactDeployer K8sArtifactDeployer) UpdateApplicationMappings(applica crApplicationMapping.Spec.SubscriptionRef = applicationMapping.SubscriptionRef err := k8sArtifactDeployer.client.Update(context.Background(), &crApplicationMapping) if err != nil { - loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error1100, logging.BLOCKER, "Failed to update application mapping in k8s %v", err.Error())) + loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error1100, logging.CRITICAL, "Failed to update application mapping in k8s %v", err.Error())) return err } } @@ -224,7 +236,7 @@ func (k8sArtifactDeployer K8sArtifactDeployer) DeleteKeyMappings(keyMapping serv var crApplication cpv1alpha2.Application err := k8sArtifactDeployer.client.Get(context.Background(), client.ObjectKey{Name: keyMapping.ApplicationUUID, Namespace: utils.GetOperatorPodNamespace()}, &crApplication) if err != nil { - loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error1102, logging.BLOCKER, "Failed to get application from k8s %v", err.Error())) + loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error1102, logging.CRITICAL, "Failed to get application from k8s %v", err.Error())) return err } if crApplication.Spec.SecuritySchemes != nil { @@ -253,13 +265,13 @@ func (k8sArtifactDeployer K8sArtifactDeployer) DeleteSubscription(subscriptionID err := k8sArtifactDeployer.client.Get(context.Background(), client.ObjectKey{Name: subscriptionID, Namespace: utils.GetOperatorPodNamespace()}, &crSubscription) if err != nil { if !k8error.IsNotFound(err) { - loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error1102, logging.BLOCKER, "Failed to get subscription from k8s %v", err.Error())) + loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error1102, logging.CRITICAL, "Failed to get subscription from k8s %v", err.Error())) return err } } else { err := k8sArtifactDeployer.client.Delete(context.Background(), &crSubscription) if err != nil { - loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error1100, logging.BLOCKER, "Failed to delete subscription in k8s %v", err.Error())) + loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error1100, logging.CRITICAL, "Failed to delete subscription in k8s %v", err.Error())) return err } } @@ -269,11 +281,99 @@ func (k8sArtifactDeployer K8sArtifactDeployer) DeleteSubscription(subscriptionID // DeployAllApplicationMappings deploys all application mappings func (k8sArtifactDeployer K8sArtifactDeployer) DeployAllApplicationMappings(applicationMappings server.ApplicationMappingList) error { + applicationMappingsFromK8s, _, err := k8sArtifactDeployer.retrieveAllApplicationMappings("") + if err != nil { + return err + } + clonedApplicationMappingsFromK8s := make([]cpv1alpha2.ApplicationMapping, len(applicationMappingsFromK8s)) + copy(clonedApplicationMappingsFromK8s, applicationMappingsFromK8s) + clonedApplicationMappings := make([]server.ApplicationMapping, len(applicationMappings.List)) + copy(clonedApplicationMappings, applicationMappings.List) + newApplicationMappings := make([]server.ApplicationMapping, 0) + sameApplicationMappings := make([]server.ApplicationMapping, 0) + for _, applicationMapping := range clonedApplicationMappings { + found := false + unFilteredApplicationMappingsInK8s := make([]cpv1alpha2.ApplicationMapping, 0) + for _, applicationMappingFromK8s := range clonedApplicationMappingsFromK8s { + if applicationMapping.ApplicationRef == applicationMappingFromK8s.Spec.ApplicationRef && applicationMapping.SubscriptionRef == applicationMappingFromK8s.Spec.SubscriptionRef { + sameApplicationMappings = append(sameApplicationMappings, applicationMapping) + found = true + break + } + unFilteredApplicationMappingsInK8s = append(unFilteredApplicationMappingsInK8s, applicationMappingFromK8s) + } + clonedApplicationMappingsFromK8s = unFilteredApplicationMappingsInK8s + if !found { + newApplicationMappings = append(newApplicationMappings, applicationMapping) + } + } + for _, applicationMapping := range newApplicationMappings { + err := k8sArtifactDeployer.DeployApplicationMappings(applicationMapping) + if err != nil { + return err + } + } + for _, applicationMapping := range sameApplicationMappings { + err := k8sArtifactDeployer.UpdateApplicationMappings(applicationMapping) + if err != nil { + return err + } + } + for _, applicationMappingFromK8s := range clonedApplicationMappingsFromK8s { + err := k8sArtifactDeployer.DeleteApplicationMappings(applicationMappingFromK8s.Name) + if err != nil { + return err + } + } return nil } // DeployAllApplications deploys all key mappings func (k8sArtifactDeployer K8sArtifactDeployer) DeployAllApplications(applications server.ApplicationList) error { + applicationsFromK8s, _, err := k8sArtifactDeployer.retrieveAllApplicationsFromK8s("") + if err != nil { + return err + } + clonedApplicationsFromK8s := make([]cpv1alpha2.Application, len(applicationsFromK8s)) + copy(clonedApplicationsFromK8s, applicationsFromK8s) + clonedApplications := make([]server.Application, len(applications.List)) + copy(clonedApplications, applications.List) + newApplications := make([]server.Application, 0) + sameApplications := make([]server.Application, 0) + for _, application := range clonedApplications { + found := false + unFilteredApplicationsInK8s := make([]cpv1alpha2.Application, 0) + for _, applicationFromK8s := range clonedApplicationsFromK8s { + if application.UUID == applicationFromK8s.Name { + sameApplications = append(sameApplications, application) + found = true + break + } + unFilteredApplicationsInK8s = append(unFilteredApplicationsInK8s, applicationFromK8s) + } + clonedApplicationsFromK8s = unFilteredApplicationsInK8s + if !found { + newApplications = append(newApplications, application) + } + } + for _, application := range newApplications { + err := k8sArtifactDeployer.DeployApplication(application) + if err != nil { + return err + } + } + for _, application := range sameApplications { + err := k8sArtifactDeployer.UpdateApplication(application) + if err != nil { + return err + } + } + for _, applicationFromK8s := range clonedApplicationsFromK8s { + err := k8sArtifactDeployer.DeleteApplication(applicationFromK8s.Name) + if err != nil { + return err + } + } return nil } @@ -284,6 +384,50 @@ func (k8sArtifactDeployer K8sArtifactDeployer) DeployAllKeyMappings(keyMappings // DeployAllSubscriptions deploys all subscriptions func (k8sArtifactDeployer K8sArtifactDeployer) DeployAllSubscriptions(subscriptions server.SubscriptionList) error { + subscriptionsFromK8s, _, err := k8sArtifactDeployer.retrieveAllSubscriptionsFromK8s("") + if err != nil { + return err + } + clonedSubscriptionsFromK8s := make([]cpv1alpha2.Subscription, len(subscriptionsFromK8s)) + copy(clonedSubscriptionsFromK8s, subscriptionsFromK8s) + clonedSubscriptions := make([]server.Subscription, len(subscriptions.List)) + copy(clonedSubscriptions, subscriptions.List) + newSubscriptions := make([]server.Subscription, 0) + sameSubscriptions := make([]server.Subscription, 0) + for _, subscription := range clonedSubscriptions { + found := false + unFilteredSubscriptionsInK8s := make([]cpv1alpha2.Subscription, 0) + for _, subscriptionFromK8s := range clonedSubscriptionsFromK8s { + if subscription.UUID == subscriptionFromK8s.Name { + sameSubscriptions = append(sameSubscriptions, subscription) + found = true + break + } + unFilteredSubscriptionsInK8s = append(unFilteredSubscriptionsInK8s, subscriptionFromK8s) + } + clonedSubscriptionsFromK8s = unFilteredSubscriptionsInK8s + if !found { + newSubscriptions = append(newSubscriptions, subscription) + } + } + for _, subscription := range newSubscriptions { + err := k8sArtifactDeployer.DeploySubscription(subscription) + if err != nil { + return err + } + } + for _, subscription := range sameSubscriptions { + err := k8sArtifactDeployer.UpdateSubscription(subscription) + if err != nil { + return err + } + } + for _, subscriptionFromK8s := range clonedSubscriptionsFromK8s { + err := k8sArtifactDeployer.DeleteSubscription(subscriptionFromK8s.Name) + if err != nil { + return err + } + } return nil } @@ -331,3 +475,74 @@ func (k8sArtifactDeployer K8sArtifactDeployer) GetSubscription(subscriptionID st func generateSecurityScheme(keyMapping server.ApplicationKeyMapping) cpv1alpha2.Environment { return cpv1alpha2.Environment{EnvID: keyMapping.EnvID, AppID: keyMapping.ApplicationIdentifier, KeyType: keyMapping.KeyType} } + +func (k8sArtifactDeployer K8sArtifactDeployer) retrieveAllApplicationsFromK8s(nextToken string) ([]cpv1alpha2.Application, string, error) { + applicationList := cpv1alpha2.ApplicationList{} + resolvedApplicationList := make([]cpv1alpha2.Application, 0) + var err error + if nextToken == "" { + err = k8sArtifactDeployer.client.List(context.Background(), &applicationList, &client.ListOptions{Namespace: utils.GetOperatorPodNamespace()}) + } else { + err = k8sArtifactDeployer.client.List(context.Background(), &applicationList, &client.ListOptions{Namespace: utils.GetOperatorPodNamespace(), Continue: nextToken}) + } + if err != nil { + loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error1102, logging.CRITICAL, "Failed to get application from k8s %v", err.Error())) + return nil, "", err + } + resolvedApplicationList = append(resolvedApplicationList, applicationList.Items...) + if applicationList.Continue != "" { + tempApplicationList, _, err := k8sArtifactDeployer.retrieveAllApplicationsFromK8s(applicationList.Continue) + if err != nil { + return nil, "", err + } + resolvedApplicationList = append(resolvedApplicationList, tempApplicationList...) + } + return resolvedApplicationList, applicationList.Continue, nil +} + +func (k8sArtifactDeployer K8sArtifactDeployer) retrieveAllSubscriptionsFromK8s(nextToken string) ([]cpv1alpha2.Subscription, string, error) { + subscriptionList := cpv1alpha2.SubscriptionList{} + resolvedSubscripitonList := make([]cpv1alpha2.Subscription, 0) + var err error + if nextToken == "" { + err = k8sArtifactDeployer.client.List(context.Background(), &subscriptionList, &client.ListOptions{Namespace: utils.GetOperatorPodNamespace()}) + } else { + err = k8sArtifactDeployer.client.List(context.Background(), &subscriptionList, &client.ListOptions{Namespace: utils.GetOperatorPodNamespace(), Continue: nextToken}) + } + if err != nil { + loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error1102, logging.CRITICAL, "Failed to get application from k8s %v", err.Error())) + return nil, "", err + } + resolvedSubscripitonList = append(resolvedSubscripitonList, subscriptionList.Items...) + if subscriptionList.Continue != "" { + tempSubscriptipnList, _, err := k8sArtifactDeployer.retrieveAllSubscriptionsFromK8s(subscriptionList.Continue) + if err != nil { + return nil, "", err + } + resolvedSubscripitonList = append(resolvedSubscripitonList, tempSubscriptipnList...) + } + return resolvedSubscripitonList, subscriptionList.Continue, nil +} +func (k8sArtifactDeployer K8sArtifactDeployer) retrieveAllApplicationMappings(nextToken string) ([]cpv1alpha2.ApplicationMapping, string, error) { + applicationMappingList := cpv1alpha2.ApplicationMappingList{} + resolvedApplicationMappingList := make([]cpv1alpha2.ApplicationMapping, 0) + var err error + if nextToken == "" { + err = k8sArtifactDeployer.client.List(context.Background(), &applicationMappingList, &client.ListOptions{Namespace: utils.GetOperatorPodNamespace()}) + } else { + err = k8sArtifactDeployer.client.List(context.Background(), &applicationMappingList, &client.ListOptions{Namespace: utils.GetOperatorPodNamespace(), Continue: nextToken}) + } + if err != nil { + loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error1102, logging.CRITICAL, "Failed to get application from k8s %v", err.Error())) + return nil, "", err + } + resolvedApplicationMappingList = append(resolvedApplicationMappingList, applicationMappingList.Items...) + if applicationMappingList.Continue != "" { + tempApplicationMappingList, _, err := k8sArtifactDeployer.retrieveAllApplicationMappings(applicationMappingList.Continue) + if err != nil { + return nil, "", err + } + resolvedApplicationMappingList = append(resolvedApplicationMappingList, tempApplicationMappingList...) + } + return resolvedApplicationMappingList, applicationMappingList.Continue, nil +} diff --git a/common-controller/internal/controlplane/types.go b/common-controller/internal/controlplane/types.go new file mode 100644 index 0000000000..da9a987fff --- /dev/null +++ b/common-controller/internal/controlplane/types.go @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2024, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package controlplane + +// Subscription for struct subscription +type Subscription struct { + SubStatus string `json:"subStatus,omitempty"` + UUID string `json:"uuid,omitempty"` + Organization string `json:"organization,omitempty"` + SubscribedAPI *SubscribedAPI `json:"subscribedApi,omitempty"` + TimeStamp int64 `json:"timeStamp,omitempty"` +} + +// SubscriptionList for struct list of applications +type SubscriptionList struct { + List []Subscription `json:"list"` +} + +// SubscribedAPI for struct subscribedAPI +type SubscribedAPI struct { + Name string `json:"name,omitempty"` + Version string `json:"version,omitempty"` +} + +// Application for struct application +type Application struct { + UUID string `json:"uuid,omitempty"` + Name string `json:"name,omitempty"` + Owner string `json:"owner,omitempty"` + Organization string `json:"organization,omitempty"` + Attributes map[string]string `json:"attributes,omitempty"` + TimeStamp int64 `json:"timeStamp,omitempty"` +} + +// ApplicationList for struct list of application +type ApplicationList struct { + List []Application `json:"list"` +} + +// ApplicationKeyMapping for struct applicationKeyMapping +type ApplicationKeyMapping struct { + ApplicationUUID string `json:"applicationUUID,omitempty"` + SecurityScheme string `json:"securityScheme,omitempty"` + ApplicationIdentifier string `json:"applicationIdentifier,omitempty"` + KeyType string `json:"keyType,omitempty"` + EnvID string `json:"envID,omitempty"` + Timestamp int64 `json:"timestamp,omitempty"` + Organization string `json:"organization,omitempty"` +} + +// ApplicationKeyMappingList for struct list of applicationKeyMapping +type ApplicationKeyMappingList struct { + List []ApplicationKeyMapping `json:"list"` +} + +// ApplicationMapping for struct applicationMapping +type ApplicationMapping struct { + UUID string `json:"uuid,omitempty"` + ApplicationRef string `json:"applicationRef,omitempty"` + SubscriptionRef string `json:"subscriptionRef,omitempty"` + Organization string `json:"organization,omitempty"` +} + +// ApplicationMappingList for struct list of applicationMapping +type ApplicationMappingList struct { + List []ApplicationMapping `json:"list"` +} diff --git a/common-controller/internal/operator/operator.go b/common-controller/internal/operator/operator.go index a46194367f..e0f09b3842 100644 --- a/common-controller/internal/operator/operator.go +++ b/common-controller/internal/operator/operator.go @@ -164,18 +164,21 @@ func InitOperator() { loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2603, logging.BLOCKER, "Unable to set up ready check: %v", err)) os.Exit(1) } - go func() { - config := config.ReadConfigs() - var controlPlane controlplane.ArtifactDeployer - if config.CommonController.ControlPlane.Persistence.Type == "K8s" { - controlPlane = controlplane.NewK8sArtifactDeployer(mgr) - - } - grpcClient := controlplane.NewControlPlaneAgent(config.CommonController.ControlPlane.Host, config.CommonController.ControlPlane.EventPort, controlPlaneID, controlPlane) - if grpcClient != nil { - grpcClient.StartEventStreaming() - } - }() + config := config.ReadConfigs() + if config.CommonController.ControlPlane.Enabled { + go func() { + var controlPlane controlplane.ArtifactDeployer + if config.CommonController.ControlPlane.Persistence.Type == "K8s" { + controlPlane = controlplane.NewK8sArtifactDeployer(mgr) + + } + grpcClient := controlplane.NewControlPlaneAgent(config.CommonController.ControlPlane.Host, config.CommonController.ControlPlane.EventPort, controlPlaneID, controlPlane) + if grpcClient != nil { + grpcClient.StartEventStreaming() + } + }() + } + setupLog.Info("starting manager") if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { loggers.LoggerAPKOperator.ErrorC(logging.PrintError(logging.Error2604, logging.BLOCKER, "Problem running manager: %v", err)) diff --git a/common-controller/internal/server/application_types.go b/common-controller/internal/server/application_types.go index 01ad4613b0..e9e7f0bb9c 100644 --- a/common-controller/internal/server/application_types.go +++ b/common-controller/internal/server/application_types.go @@ -24,6 +24,7 @@ type Application struct { Owner string `json:"owner"` Attributes map[string]string `json:"attributes,omitempty"` OrganizationID string `json:"organizationId"` + TimeStamp int64 `json:"timeStamp"` } // ApplicationList contains a list of Application