Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor api creation #1076

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
275 changes: 27 additions & 248 deletions apim-apk-agent/internal/eventhub/dataloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,25 @@
package eventhub

import (
"archive/zip"
"bytes"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"reflect"
"strconv"
"strings"
"time"

loggers "github.com/sirupsen/logrus"
dpv1alpha2 "github.com/wso2/apk/common-go-libs/apis/dp/v1alpha2"
"github.com/wso2/product-apim-tooling/apim-apk-agent/config"
"sigs.k8s.io/controller-runtime/pkg/client"

internalk8sClient "github.com/wso2/product-apim-tooling/apim-apk-agent/internal/k8sClient"
mapperUtil "github.com/wso2/product-apim-tooling/apim-apk-agent/internal/mapper"
internalutils "github.com/wso2/product-apim-tooling/apim-apk-agent/internal/utils"
pkgAuth "github.com/wso2/product-apim-tooling/apim-apk-agent/pkg/auth"
"github.com/wso2/product-apim-tooling/apim-apk-agent/pkg/eventhub/types"
logger "github.com/wso2/product-apim-tooling/apim-apk-agent/pkg/loggers"
"github.com/wso2/product-apim-tooling/apim-apk-agent/pkg/logging"
sync "github.com/wso2/product-apim-tooling/apim-apk-agent/pkg/synchronizer"
"github.com/wso2/product-apim-tooling/apim-apk-agent/pkg/tlsutils"
"github.com/wso2/product-apim-tooling/apim-apk-agent/pkg/transformer"
"github.com/wso2/product-apim-tooling/apim-apk-agent/pkg/utils"
"sigs.k8s.io/controller-runtime/pkg/client"
)

const (
Expand All @@ -58,15 +51,6 @@ const (
GatewayLabelParam string = "gatewayLabel"
// APIUUIDParam is required to call /apis endpoint
APIUUIDParam string = "apiId"
// ApisEndpoint is the resource path of /apis endpoint
ApisEndpoint string = "apis"
)

const (
// OrganizationID query parameter key.
organizationID string = "organization"
// OrganizationID query parameter value used when the global adapter is enabled and it is a shared gateway.
commonOrganizationIDValue string = "ALL"
)

var (
Expand All @@ -75,7 +59,6 @@ var (
subList *types.SubscriptionList
appList *types.ApplicationList
appKeyMappingList *types.ApplicationKeyMappingList
apiList *types.APIList

resources = []resource{
{
Expand All @@ -91,11 +74,8 @@ var (
responseType: appKeyMappingList,
},
}
// APIListChannel is used to add apis
APIListChannel chan response
accessToken string
conf *config.Config
apiUUIDList []string
accessToken string
conf *config.Config
)

type response struct {
Expand All @@ -112,10 +92,6 @@ type resource struct {
responseType interface{}
}

func init() {
APIListChannel = make(chan response)
}

// LoadInitialData loads subscription/application and keymapping data from control-plane
func LoadInitialData(configFile *config.Config, client client.Client) {
conf = configFile
Expand Down Expand Up @@ -155,58 +131,8 @@ func LoadInitialData(configFile *config.Config, client client.Client) {
}
}
}
// Take the configured labels from the adapter
configuredEnvs := conf.ControlPlane.EnvironmentLabels

// If no environments are configured, default gateway label value is assigned.
if len(configuredEnvs) == 0 {
configuredEnvs = append(configuredEnvs, config.DefaultGatewayName)
}
for _, configuredEnv := range configuredEnvs {
queryParamMap := make(map[string]string, 1)
queryParamMap[GatewayLabelParam] = configuredEnv
queryParamMap[organizationID] = commonOrganizationIDValue
go InvokeService(ApisEndpoint, apiList, queryParamMap, APIListChannel, 0)
for {
data := <-APIListChannel
logger.LoggerSync.Debug("Receiving API information for an environment")
if data.Payload != nil {
loggers.Info("Payload data with API information recieved" + string(data.Payload))
retrieveAPIList(data)
break
} else if data.ErrorCode >= 400 && data.ErrorCode < 500 {
logger.LoggerSync.ErrorC(logging.ErrorDetails{
Message: fmt.Sprintf("Error occurred when retrieving Subscription information from the control plane: %v", data.Error),
Severity: logging.CRITICAL,
ErrorCode: 1600,
})
//health.SetControlPlaneRestAPIStatus(false)
} else {
// Keep the iteration going on until a response is recieved.
logger.LoggerSync.ErrorC(logging.ErrorDetails{
Message: fmt.Sprintf("Error occurred while fetching data from control plane: %v", data.Error),
Severity: logging.MAJOR,
ErrorCode: 1601,
})
go func(d response) {
// Retry fetching from control plane after a configured time interval
if conf.ControlPlane.RetryInterval == 0 {
// Assign default retry interval
conf.ControlPlane.RetryInterval = 5
}
logger.LoggerSync.Debugf("Time Duration for retrying: %v", conf.ControlPlane.RetryInterval*time.Second)
time.Sleep(conf.ControlPlane.RetryInterval * time.Second)
logger.LoggerSync.Infof("Retrying to fetch APIs from control plane. Time Duration for the next retry: %v", conf.ControlPlane.RetryInterval*time.Second)
go InvokeService(ApisEndpoint, apiList, queryParamMap, APIListChannel, 0)
}(data)
}
}
}
if apiUUIDList == nil || len(apiUUIDList) == 0 {
loggers.Info("Empty API List Recieved in fetching")
} else {
FetchAPIsOnStartUp(conf, apiUUIDList, client)
}
FetchAPIsOnStartUp(conf, client)
go utils.SendInitialEventToAllConnectedClients()
}

Expand Down Expand Up @@ -304,178 +230,31 @@ func retrieveDataFromResponseChannel(response response) {

// FetchAPIsOnStartUp APIs from control plane during the server start up and push them
// to the router and enforcer components.
func FetchAPIsOnStartUp(conf *config.Config, apiUUIDList []string, k8sClient client.Client) {
// Populate data from config.
envs := conf.ControlPlane.EnvironmentLabels

// Create a channel for the byte slice (response from the APIs from control plane)
c := make(chan sync.SyncAPIResponse)

var queryParamMap map[string]string
//Get API details.
if apiUUIDList != nil {
GetAPIs(c, nil, envs, sync.APIArtifactEndpoint, true, apiUUIDList, queryParamMap)
func FetchAPIsOnStartUp(conf *config.Config, k8sClient client.Client) {
k8sAPIS, _, err := internalk8sClient.RetrieveAllAPISFromK8s(k8sClient, "")
if err != nil {
logger.LoggerSubscription.Errorf("Error occurred while fetching APIs from K8s %v", err)
}
for i := 0; i < 1; i++ {
data := <-c
logger.LoggerMsg.Info("Receiving data for an environment")
if data.Resp != nil {
// Reading the root zip
zipReader, err := zip.NewReader(bytes.NewReader(data.Resp), int64(len(data.Resp)))

// apiFiles represents zipped API files fetched from API Manager
apiFiles := make(map[string]*zip.File)
// Read the .zip files within the root apis.zip and add apis to apiFiles array.
for _, file := range zipReader.File {
apiFiles[file.Name] = file
loggers.Infof("API file found: " + file.Name)
}
if err != nil {
logger.LoggerSync.Errorf("Error while reading zip: %v", err)
return
}

artifacts, decodingError := transformer.DecodeAPIArtifacts(data.Resp)
logger.LoggerSync.Infof("Artifacts Count: %v", len(artifacts))

if decodingError != nil {
logger.LoggerSync.Errorf("Error while decoding the API Project Artifacts: %v", decodingError)
return
}

for _, artifact := range artifacts {
if artifact.APIJson != "" && artifact.DeploymentDescriptor != "" {
apkConf, apiUUID, revisionID, apkErr := transformer.GenerateAPKConf(artifact.APIJson, artifact.ClientCerts)

if apkErr != nil {
logger.LoggerSync.Errorf("Error while generating APK-Conf: %v", apkErr)
return
}

k8ResourceEndpoint := conf.DataPlane.K8ResourceEndpoint

deploymentDescriptor, descriptorErr := transformer.ProcessDeploymentDescriptor([]byte(artifact.DeploymentDescriptor))
if descriptorErr != nil {
logger.LoggerSync.Errorf("Error while decoding the Deployment Descriptor: %v", descriptorErr)
return
}

crResponse, err := transformer.GenerateUpdatedCRs(apkConf, artifact.Swagger, k8ResourceEndpoint, deploymentDescriptor, artifact.APIFileName, apiUUID, fmt.Sprint(revisionID))
if err != nil {
logger.LoggerSync.Errorf("Error occured in receiving the updated CRDs: %v", err)
return
}

mainZip, err := zip.NewReader(bytes.NewReader(crResponse.Bytes()), int64(crResponse.Len()))
if err != nil {
logger.LoggerSync.Errorf("Error creating zip reader for main zip buffer: %v", err)
return
}

for _, file := range mainZip.File {
if strings.HasSuffix(file.Name, ".zip") {
subZipReader, err := file.Open()
if err != nil {
logger.LoggerSync.Errorf("Error opening sub zip file: %v", err)
return
}
defer subZipReader.Close()

var subZipBuffer bytes.Buffer
_, err = subZipBuffer.ReadFrom(subZipReader)
if err != nil {
logger.LoggerSync.Errorf("Error reading sub zip file: %v", err)
return
}

subZip, err := zip.NewReader(bytes.NewReader(subZipBuffer.Bytes()), int64(subZipBuffer.Len()))
if err != nil {
logger.LoggerSync.Errorf("Error creating zip reader for sub zip file: %v", err)
return
}

for _, subFile := range subZip.File {
mapperUtil.MapAndCreateCR(subFile, k8sClient, conf)
}

}
}

logger.LoggerMsg.Info("API applied successfully.\n")
}
apis, err := internalutils.FetchAPIsOnEvent(conf, nil, k8sClient)
if err != nil {
logger.LoggerSubscription.Errorf("Error occurred while fetching APIs from control plane %v", err)
}
removeApis := make([]dpv1alpha2.API, 0)
for _, k8sAPI := range k8sAPIS {
found := false
for _, api := range *apis {
if k8sAPI.Name == api {
found = true
break
}

} else if data.ErrorCode == 204 {
logger.LoggerMsg.Infof("No API Artifacts are available in the control plane for the envionments :%s",
strings.Join(envs, ", "))
//health.SetControlPlaneRestAPIStatus(true)
} else if data.ErrorCode >= 400 && data.ErrorCode < 500 {
logger.LoggerMsg.ErrorC(logging.ErrorDetails{
Message: fmt.Sprintf("Error occurred when retrieving APIs from control plane(unrecoverable error): %v", data.Err.Error()),
Severity: logging.CRITICAL,
ErrorCode: 1106,
})
//isNoAPIArtifacts := data.ErrorCode == 404 && strings.Contains(data.Err.Error(), "No Api artifacts found")
//health.SetControlPlaneRestAPIStatus(isNoAPIArtifacts)
} else {
// Keep the iteration still until all the envrionment response properly.
i--
logger.LoggerMsg.ErrorC(logging.ErrorDetails{
Message: fmt.Sprintf("Error occurred while fetching data from control plane: %v ..retrying..", data.Err),
Severity: logging.MINOR,
ErrorCode: 1107,
})
//health.SetControlPlaneRestAPIStatus(false)
sync.RetryFetchingAPIs(c, data, sync.RuntimeArtifactEndpoint, true, queryParamMap)
}
if !found {
removeApis = append(removeApis, k8sAPI)
}
}
// Remove the APIs which are not in the list
internalk8sClient.UndeployAPICRs(apiUUIDList, k8sClient)
logger.LoggerMsg.Info("Fetching APIs at startup is completed...")
}

// GetAPIs function calls the FetchAPIs() with relevant environment labels defined in the config.
func GetAPIs(c chan sync.SyncAPIResponse, id *string, envs []string, endpoint string, sendType bool, apiUUIDList []string,
queryParamMap map[string]string) {
if len(envs) > 0 {
// If the envrionment labels are present, call the controle plane with labels.
logger.LoggerAdapter.Debugf("Environment labels present: %v", envs)
go sync.FetchAPIs(id, envs, c, endpoint, sendType, apiUUIDList, queryParamMap)
} else {
// If the environments are not give, fetch the APIs from default envrionment
logger.LoggerAdapter.Debug("Environments label NOT present. Hence adding \"default\"")
envs = append(envs, "default")
go sync.FetchAPIs(id, nil, c, endpoint, sendType, apiUUIDList, queryParamMap)
}
}

func retrieveAPIList(response response) []string {

responseType := reflect.TypeOf(response.Type).Elem()
newResponse := reflect.New(responseType).Interface()
if response.Error == nil && response.Payload != nil {
err := json.Unmarshal(response.Payload, &newResponse)
if err != nil {
logger.LoggerSubscription.ErrorC(logging.ErrorDetails{
Message: fmt.Sprintf("Error occurred while unmarshalling the APIList response received for: %v %v", response.Endpoint, err.Error()),
Severity: logging.MAJOR,
ErrorCode: 1602,
})
} else {
switch t := newResponse.(type) {
case *types.APIList:
apiListResponse := newResponse.(*types.APIList)
if apiListResponse.List != nil {
for _, api := range apiListResponse.List {
apiUUIDList = append(apiUUIDList, api.UUID)
}
}
loggers.Info("Received API information.", apiUUIDList)
return apiUUIDList
default:
logger.LoggerSubscription.Warnf("APIList Type DTO is not recieved. Unknown type %T", t)
}
for _, removeAPI := range removeApis {
if !removeAPI.Spec.SystemAPI {
internalk8sClient.UndeployAPICR(removeAPI.Name, k8sClient)
}
}
return nil
}
2 changes: 1 addition & 1 deletion apim-apk-agent/internal/eventhub/marshaller.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func MarshalApplication(appInternal *types.Application) managementserver.Applica
UUID: appInternal.UUID,
Name: appInternal.Name,
Owner: appInternal.SubName,
Organization: appInternal.TenantDomain,
Organization: appInternal.Organization,
Attributes: appInternal.Attributes,
TimeStamp: appInternal.TimeStamp,
}
Expand Down
Loading
Loading