diff --git a/api/pkg/apis/v1alpha1/managers/solution/solution-manager.go b/api/pkg/apis/v1alpha1/managers/solution/solution-manager.go index 699b4c9f6..6e605b91f 100644 --- a/api/pkg/apis/v1alpha1/managers/solution/solution-manager.go +++ b/api/pkg/apis/v1alpha1/managers/solution/solution-manager.go @@ -83,9 +83,6 @@ func (s *SolutionManager) Init(context *contexts.VendorContext, config managers. s.TargetProviders[k] = p } } - for key, _ := range providers { - log.Info(" key is %s", key) - } keylockprovider, err := managers.GetKeyLockProvider(config, providers) if err == nil { @@ -156,7 +153,6 @@ func (s *SolutionManager) Init(context *contexts.VendorContext, config managers. // The deployment spec may have changed, so the previous target is not in the new deployment anymore func (s *SolutionManager) GetTargetProviderForStep(target string, role string, deployment model.DeploymentSpec, previousDesiredState *SolutionManagerDeploymentState) (providers.IProvider, error) { var override tgt.ITargetProvider - log.Info("get step role %s", role) if role == "container" { role = "instance" } @@ -296,7 +292,7 @@ func (s *SolutionManager) sendHeartbeat(ctx context.Context, id string, namespac } } -func (s *SolutionManager) CleanupHeartbeat(ctx context.Context, id string, namespace string, remove bool) { +func (s *SolutionManager) cleanupHeartbeat(ctx context.Context, id string, namespace string, remove bool) { if !remove { return } @@ -367,7 +363,7 @@ func (s *SolutionManager) Reconcile(ctx context.Context, deployment model.Deploy }() defer func() { - s.CleanupHeartbeat(ctx, deployment.Instance.ObjectMeta.Name, namespace, remove) + s.cleanupHeartbeat(ctx, deployment.Instance.ObjectMeta.Name, namespace, remove) }() stopCh := make(chan struct{}) diff --git a/api/pkg/apis/v1alpha1/managers/staging/staging-manager.go b/api/pkg/apis/v1alpha1/managers/staging/staging-manager.go index c2ff56b05..8c85e4615 100644 --- a/api/pkg/apis/v1alpha1/managers/staging/staging-manager.go +++ b/api/pkg/apis/v1alpha1/managers/staging/staging-manager.go @@ -40,7 +40,6 @@ func (s *StagingManager) Init(context *contexts.VendorContext, config managers.M if err != nil { return err } - log.Info(" config is %+v providers is %+v ", config, providers) queueProvider, err := managers.GetQueueProvider(config, providers) if err == nil { s.QueueProvider = queueProvider diff --git a/api/pkg/apis/v1alpha1/providers/providerfactory_test.go b/api/pkg/apis/v1alpha1/providers/providerfactory_test.go index 030971070..bf68a7268 100644 --- a/api/pkg/apis/v1alpha1/providers/providerfactory_test.go +++ b/api/pkg/apis/v1alpha1/providers/providerfactory_test.go @@ -77,7 +77,7 @@ func TestCreateProvider(t *testing.T) { if testRedis == "" { t.Log("Skipping providers.state.redis test as TEST_REDIS is not set") } else { - provider, err = providerfactory.CreateProvider("providers.state.redis", redisstate.RedisStateProviderConfig{Host: "localhost:6380"}) + provider, err = providerfactory.CreateProvider("providers.state.redis", redisstate.RedisStateProviderConfig{Host: "localhost:6379"}) assert.Nil(t, err) assert.NotNil(t, *provider.(*redisstate.RedisStateProvider)) } diff --git a/api/pkg/apis/v1alpha1/vendors/federation-vendor.go b/api/pkg/apis/v1alpha1/vendors/federation-vendor.go index 373a099e1..3d53146ab 100644 --- a/api/pkg/apis/v1alpha1/vendors/federation-vendor.go +++ b/api/pkg/apis/v1alpha1/vendors/federation-vendor.go @@ -11,16 +11,13 @@ import ( "encoding/json" "fmt" "strconv" - "time" "github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/managers/catalogs" "github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/managers/sites" - "github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/managers/solution" "github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/managers/staging" "github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/managers/sync" "github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/managers/trails" "github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/model" - tgt "github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/providers/target" "github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/utils" "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2" "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/managers" @@ -28,10 +25,8 @@ import ( observ_utils "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/observability/utils" "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/providers" "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/providers/pubsub" - "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/providers/states" "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/vendors" "github.com/eclipse-symphony/symphony/coa/pkg/logger" - "github.com/google/uuid" "github.com/valyala/fasthttp" ) @@ -44,7 +39,6 @@ type FederationVendor struct { StagingManager *staging.StagingManager SyncManager *sync.SyncManager TrailsManager *trails.TrailsManager - SolutionManager *solution.SolutionManager apiClient utils.ApiClient } @@ -77,9 +71,6 @@ func (f *FederationVendor) Init(config vendors.VendorConfig, factories []manager if c, ok := m.(*trails.TrailsManager); ok { f.TrailsManager = c } - if c, ok := m.(*solution.SolutionManager); ok { - f.SolutionManager = c - } } if f.StagingManager == nil { return v1alpha2.NewCOAError(nil, "staging manager is not supplied", v1alpha2.MissingConfig) @@ -90,9 +81,6 @@ func (f *FederationVendor) Init(config vendors.VendorConfig, factories []manager if f.CatalogsManager == nil { return v1alpha2.NewCOAError(nil, "catalogs manager is not supplied", v1alpha2.MissingConfig) } - if f.SolutionManager == nil { - return v1alpha2.NewCOAError(nil, "solution manager is not supplied", v1alpha2.MissingConfig) - } f.apiClient, err = utils.GetParentApiClient(f.Vendor.Context.SiteInfo.ParentSite.BaseUrl) if err != nil { return err @@ -172,27 +160,6 @@ func (f *FederationVendor) Init(config vendors.VendorConfig, factories []manager } return nil }, - }, - ) - // todo: add retry - f.Vendor.Context.Subscribe(DeploymentStepTopic, v1alpha2.EventHandler{ - Handler: func(topic string, event v1alpha2.Event) error { - ctx := context.TODO() - if event.Context != nil { - ctx = event.Context - } - log.InfoCtx(ctx, "V(Federation): subscribe deployment-step and begin to apply step ") - // get data - for i := 0; i < MaxRetries; i++ { - err := f.handleDeploymentStep(ctx, event) - if err == nil { - return nil - } - time.Sleep(RetryDelay) - } - return err - }, - Group: "federation-vendor", }) // now register the current site site := model.SiteState{ @@ -215,155 +182,6 @@ func (f *FederationVendor) Init(config vendors.VendorConfig, factories []manager return f.SitesManager.UpsertState(context.Background(), f.Context.SiteInfo.SiteId, site) } -func (f *FederationVendor) handleDeploymentStep(ctx context.Context, event v1alpha2.Event) error { - var stepEnvelope StepEnvelope - jData, err := json.Marshal(event.Body) - if err != nil { - log.ErrorfCtx(ctx, "V (Federation): failed to unmarshal event body: %v", err) - return err - } - if err := json.Unmarshal(jData, &stepEnvelope); err != nil { - log.ErrorfCtx(ctx, "V (Federation): failed to unmarshal step envelope: %v", err) - return err - } - if stepEnvelope.Step.Role == "container" { - stepEnvelope.Step.Role = "instance" - } - switch stepEnvelope.PlanState.Phase { - case PhaseGet: - return f.handlePhaseGet(ctx, stepEnvelope) - case PhaseApply: - return f.handlePhaseApply(ctx, stepEnvelope) - } - return nil -} -func findAgentFromDeploymentState(deployment model.DeploymentSpec, targetName string) bool { - // find targt component - targetSpec := deployment.Targets[targetName] - log.Info("compare between state and target name %s, %+v", targetName, targetSpec) - for _, component := range targetSpec.Spec.Components { - log.Info("compare between state and target name %+v, %s", component, component.Name) - if component.Type == "remote-agent" { - log.Info("It is remote call ") - return true - } else { - log.Info(" it is not remote call target Name %s", targetName) - } - } - return false -} -func (f *FederationVendor) publishStepResult(ctx context.Context, target string, planId string, stepId int, Error error, getResult []model.ComponentSpec, applyResult map[string]model.ComponentResultSpec) error { - errorString := "" - if Error != nil { - errorString = Error.Error() - } - return f.Vendor.Context.Publish("step-result", v1alpha2.Event{ - Body: StepResult{ - Target: target, - PlanId: planId, - StepId: stepId, - GetResult: getResult, - ApplyResult: applyResult, - Timestamp: time.Now(), - Error: errorString, - }, - }) -} - -func (f *FederationVendor) handlePhaseGet(ctx context.Context, stepEnvelope StepEnvelope) error { - if findAgentFromDeploymentState(stepEnvelope.PlanState.Deployment, stepEnvelope.Step.Target) { - return f.enqueueProviderGetRequest(ctx, stepEnvelope) - } - return f.getProviderAndExecute(ctx, stepEnvelope) -} -func (f *FederationVendor) enqueueProviderGetRequest(ctx context.Context, stepEnvelope StepEnvelope) error { - operationId := uuid.New().String() - providerGetRequest := &ProviderGetRequest{ - AgentRequest: AgentRequest{ - OperationID: operationId, - Provider: stepEnvelope.Step.Role, - Action: string(PhaseGet), - }, - References: stepEnvelope.Step.Components, - Deployment: stepEnvelope.PlanState.Deployment, - } - - log.InfoCtx(ctx, "V(Federation): Enqueue get message %s-%s %+v ", stepEnvelope.Step.Target, stepEnvelope.PlanState.Namespace, providerGetRequest) - messageID, err := f.StagingManager.QueueProvider.Enqueue(fmt.Sprintf("%s-%s", stepEnvelope.Step.Target, stepEnvelope.PlanState.Namespace), providerGetRequest) - err = f.upsertOperationState(ctx, operationId, stepEnvelope.StepId, stepEnvelope.PlanState.PlanId, stepEnvelope.Step.Target, stepEnvelope.PlanState.Phase, stepEnvelope.PlanState.Namespace, stepEnvelope.Remove, messageID) - if err != nil { - log.ErrorCtx(ctx, "V(Federation) Error in insert operation Id %s", operationId) - return f.publishStepResult(ctx, stepEnvelope.Step.Target, stepEnvelope.PlanState.PlanId, stepEnvelope.StepId, err, []model.ComponentSpec{}, map[string]model.ComponentResultSpec{}) - } - return err -} - -func (f *FederationVendor) getProviderAndExecute(ctx context.Context, stepEnvelope StepEnvelope) error { - provider, err := f.SolutionManager.GetTargetProviderForStep(stepEnvelope.Step.Target, stepEnvelope.Step.Role, stepEnvelope.PlanState.Deployment, stepEnvelope.PlanState.PreviousDesiredState) - if err != nil { - log.ErrorfCtx(ctx, " M (Solution): failed to create provider & Failed to save summary progress: %v", err) - return f.publishStepResult(ctx, stepEnvelope.Step.Target, stepEnvelope.PlanState.PlanId, stepEnvelope.StepId, err, []model.ComponentSpec{}, map[string]model.ComponentResultSpec{}) - } - dep := stepEnvelope.PlanState.Deployment - dep.ActiveTarget = stepEnvelope.Step.Target - getResult, stepError := (provider.(tgt.ITargetProvider)).Get(ctx, dep, stepEnvelope.Step.Components) - if stepError != nil { - log.ErrorCtx(ctx, "V(Federation) Error in get target current states %+v", stepError) - return f.publishStepResult(ctx, stepEnvelope.Step.Target, stepEnvelope.PlanState.PlanId, stepEnvelope.StepId, err, []model.ComponentSpec{}, map[string]model.ComponentResultSpec{}) - } - return f.publishStepResult(ctx, stepEnvelope.Step.Target, stepEnvelope.PlanState.PlanId, stepEnvelope.StepId, err, getResult, map[string]model.ComponentResultSpec{}) -} -func (f *FederationVendor) handlePhaseApply(ctx context.Context, stepEnvelope StepEnvelope) error { - if findAgentFromDeploymentState(stepEnvelope.PlanState.Deployment, stepEnvelope.Step.Target) { - return f.enqueueProviderApplyRequest(ctx, stepEnvelope) - } - return f.applyProviderAndExecute(ctx, stepEnvelope) -} -func (f *FederationVendor) enqueueProviderApplyRequest(ctx context.Context, stepEnvelope StepEnvelope) error { - operationId := uuid.New().String() - providApplyRequest := &ProviderApplyRequest{ - AgentRequest: AgentRequest{ - OperationID: operationId, - Provider: stepEnvelope.Step.Role, - Action: string(PhaseApply), - }, - Deployment: stepEnvelope.PlanState.Deployment, - Step: stepEnvelope.Step, - IsDryRun: stepEnvelope.PlanState.Deployment.IsDryRun, - } - messageId, err := f.StagingManager.QueueProvider.Enqueue(fmt.Sprintf("%s-%s", stepEnvelope.Step.Target, stepEnvelope.PlanState.Namespace), providApplyRequest) - if err != nil { - return err - } - log.InfoCtx(ctx, "V(Federation): Enqueue apply message %s-%s %+v ", stepEnvelope.Step.Target, stepEnvelope.PlanState.Namespace, providApplyRequest) - err = f.upsertOperationState(ctx, operationId, stepEnvelope.StepId, stepEnvelope.PlanState.PlanId, stepEnvelope.Step.Target, stepEnvelope.PlanState.Phase, stepEnvelope.PlanState.Namespace, stepEnvelope.Remove, messageId) - if err != nil { - log.ErrorCtx(ctx, "error in insert operation Id %s", operationId) - return f.publishStepResult(ctx, stepEnvelope.Step.Target, stepEnvelope.PlanState.PlanId, stepEnvelope.StepId, err, []model.ComponentSpec{}, map[string]model.ComponentResultSpec{}) - } - return err -} - -func (f *FederationVendor) applyProviderAndExecute(ctx context.Context, stepEnvelope StepEnvelope) error { - // get provider todo : is dry run - provider, err := f.SolutionManager.GetTargetProviderForStep(stepEnvelope.Step.Target, stepEnvelope.Step.Role, stepEnvelope.PlanState.Deployment, stepEnvelope.PlanState.PreviousDesiredState) - if err != nil { - log.ErrorfCtx(ctx, " M (Solution): failed to create provider & Failed to save summary progress: %v", err) - return f.publishStepResult(ctx, stepEnvelope.Step.Target, stepEnvelope.PlanState.PlanId, stepEnvelope.StepId, err, []model.ComponentSpec{}, map[string]model.ComponentResultSpec{}) - } - previousDesiredState := stepEnvelope.PlanState.PreviousDesiredState - currentState := stepEnvelope.PlanState.CurrentState - step := stepEnvelope.Step - if previousDesiredState != nil { - testState := solution.MergeDeploymentStates(&previousDesiredState.State, currentState) - if f.SolutionManager.CanSkipStep(ctx, step, step.Target, provider.(tgt.ITargetProvider), previousDesiredState.State.Components, testState) { - log.InfofCtx(ctx, " M (Solution): skipping step with role %s on target %s", step.Role, step.Target) - return f.publishStepResult(ctx, stepEnvelope.Step.Target, stepEnvelope.PlanState.PlanId, stepEnvelope.StepId, nil, []model.ComponentSpec{}, map[string]model.ComponentResultSpec{}) - } - } - componentResults, stepError := (provider.(tgt.ITargetProvider)).Apply(ctx, stepEnvelope.PlanState.Deployment, stepEnvelope.Step, stepEnvelope.PlanState.Deployment.IsDryRun) - return f.publishStepResult(ctx, stepEnvelope.Step.Target, stepEnvelope.PlanState.PlanId, stepEnvelope.StepId, stepError, []model.ComponentSpec{}, componentResults) -} func (f *FederationVendor) GetEndpoints() []v1alpha2.Endpoint { route := "federation" @@ -403,221 +221,8 @@ func (f *FederationVendor) GetEndpoints() []v1alpha2.Endpoint { Route: route + "/k8shook", Version: f.Version, Handler: f.onK8sHook, - }, { - Methods: []string{fasthttp.MethodGet}, - Route: route + "/tasks", - Version: f.Version, - Handler: f.onGetRequest, }, - { - Methods: []string{fasthttp.MethodPost}, - Route: route + "/task/getResult", - Version: f.Version, - Handler: f.onGetResponse, - }, - } -} - -// onGetRequest handles the get request from the remote agent. -func (f *FederationVendor) onGetRequest(request v1alpha2.COARequest) v1alpha2.COAResponse { - ctx, span := observability.StartSpan("Solution Vendor", request.Context, &map[string]string{ - "method": "onGetRequest", - }) - defer span.End() - var agentRequest AgentRequest - sLog.InfoCtx(ctx, "V(Federation): get request from remote agent") - target := request.Parameters["target"] - namespace := request.Parameters["namespace"] - getAll, exists := request.Parameters["getAll"] - - if exists && getAll == "true" { - // Logic to handle getALL parameter - sLog.InfoCtx(ctx, "V(Federation): getALL request from remote agent %+v", agentRequest) - return f.getTaskFromQueue(ctx, target, namespace, true) - } - return f.getTaskFromQueue(ctx, target, namespace, false) -} - -// onGetResponse handles the get response from the remote agent. -func (f *FederationVendor) onGetResponse(request v1alpha2.COARequest) v1alpha2.COAResponse { - ctx, span := observability.StartSpan("Solution Vendor", request.Context, &map[string]string{ - "method": "onGetResponse", - }) - defer span.End() - - var asyncResult AsyncResult - err := json.Unmarshal(request.Body, &asyncResult) - if err != nil { - sLog.ErrorfCtx(ctx, "V(Federation): onGetResponse failed - %s", err.Error()) - return v1alpha2.COAResponse{ - State: v1alpha2.InternalError, - Body: []byte(err.Error()), - } - } - sLog.InfoCtx(ctx, "V(Federation): get async result from remote agent %+v", asyncResult) - return f.handleRemoteAgentExecuteResult(ctx, asyncResult) -} - -// handleRemoteAgentExecuteResult handles the execution result from the remote agent. -func (f *FederationVendor) handleRemoteAgentExecuteResult(ctx context.Context, asyncResult AsyncResult) v1alpha2.COAResponse { - // Get operation ID - operationId := asyncResult.OperationID - // Get related info from redis - todo: timeout - log.InfoCtx(ctx, "V(FederationVendor): handle remote agent request %+v", asyncResult) - operationBody, err := f.getOperationState(ctx, operationId) - if err != nil { - sLog.ErrorfCtx(ctx, "V(FederationVendor): onGetResponse failed - %s", err.Error()) - return v1alpha2.COAResponse{ - State: v1alpha2.InternalError, - Body: []byte(err.Error()), - } - } - queueName := fmt.Sprintf("%s-%s", operationBody.Target, operationBody.NameSpace) - switch operationBody.Action { - case PhaseGet: - // Send to step result - var response []model.ComponentSpec - err := json.Unmarshal(asyncResult.Body, &response) - if err != nil { - return v1alpha2.COAResponse{ - State: v1alpha2.InternalError, - Body: []byte(err.Error()), - } - } - f.publishStepResult(ctx, operationBody.Target, operationBody.PlanId, operationBody.StepId, asyncResult.Error, response, map[string]model.ComponentResultSpec{}) - deleteRequest := states.DeleteRequest{ - ID: operationId, - } - - err = f.StagingManager.StateProvider.Delete(ctx, deleteRequest) - if err != nil { - return v1alpha2.COAResponse{ - State: v1alpha2.BadRequest, - Body: []byte("{\"result\":\"405 - delete operation Id failed\"}"), - ContentType: "application/json", - } - } - // delete from queue - - f.StagingManager.QueueProvider.RemoveFromQueue(queueName, operationBody.MessageId) - return v1alpha2.COAResponse{ - State: v1alpha2.OK, - Body: []byte("{\"result\":\"200 - handle async result successfully\"}"), - ContentType: "application/json", - } - case PhaseApply: - var response map[string]model.ComponentResultSpec - err := json.Unmarshal(asyncResult.Body, &response) - if err != nil { - return v1alpha2.COAResponse{ - State: v1alpha2.InternalError, - Body: []byte(err.Error()), - } - } - f.publishStepResult(ctx, operationBody.Target, operationBody.PlanId, operationBody.StepId, asyncResult.Error, []model.ComponentSpec{}, response) - deleteRequest := states.DeleteRequest{ - ID: operationId, - } - err = f.StagingManager.StateProvider.Delete(ctx, deleteRequest) - // delete from queue - f.StagingManager.QueueProvider.RemoveFromQueue(queueName, operationBody.MessageId) - if err != nil { - return v1alpha2.COAResponse{ - State: v1alpha2.BadRequest, - Body: []byte("{\"result\":\"delete operation Id failed\"}"), - ContentType: "application/json", - } - } - return v1alpha2.COAResponse{ - State: v1alpha2.OK, - Body: []byte("{\"result\":\"200 - get response successfully\"}"), - ContentType: "application/json", - } - } - return v1alpha2.COAResponse{ - State: v1alpha2.MethodNotAllowed, - Body: []byte("{\"result\":\"405 - method not allowed\"}"), - ContentType: "application/json", - } -} - -// getTaskFromQueue retrieves a task from the queue for the specified target and namespace. -func (f *FederationVendor) getTaskFromQueue(ctx context.Context, target string, namespace string, fromBegining bool) v1alpha2.COAResponse { - ctx, span := observability.StartSpan("Solution Vendor", ctx, &map[string]string{ - "method": "doGetFromQueue", - }) - queueName := fmt.Sprintf("%s-%s", target, namespace) - sLog.InfoCtx(ctx, "V(FederationVendor): getFromQueue %s queue length %s", queueName) - defer span.End() - var queueElement interface{} - var err error - if fromBegining { - queueElement, err = f.StagingManager.QueueProvider.PeekFromBegining(queueName) - } else { - queueElement, err = f.StagingManager.QueueProvider.Peek(queueName) - } - if err != nil { - sLog.ErrorfCtx(ctx, "V(FederationVendor): getQueue failed - %s", err.Error()) - return v1alpha2.COAResponse{ - State: v1alpha2.InternalError, - Body: []byte(err.Error()), - } - } - data, _ := json.Marshal(queueElement) - return v1alpha2.COAResponse{ - State: v1alpha2.OK, - Body: data, - ContentType: "application/json", - } -} - -// upsertOperationState upserts the operation state for the specified parameters. -func (f *FederationVendor) upsertOperationState(ctx context.Context, operationId string, stepId int, planId string, target string, action JobPhase, namespace string, remove bool, messageId string) error { - upsertRequest := states.UpsertRequest{ - Value: states.StateEntry{ - ID: operationId, - Body: map[string]interface{}{ - "StepId": stepId, - "PlanId": planId, - "Target": target, - "Action": action, - "namespace": namespace, - "Remove": remove, - "MessageId": messageId, - }}, - } - _, err := f.StagingManager.StateProvider.Upsert(ctx, upsertRequest) - return err -} - -// getOperationState retrieves the operation state for the specified operation ID. -func (f *FederationVendor) getOperationState(ctx context.Context, operationId string) (OperationBody, error) { - getRequest := states.GetRequest{ - ID: operationId, - } - var entry states.StateEntry - entry, err := f.StagingManager.StateProvider.Get(ctx, getRequest) - if err != nil { - return OperationBody{}, err - } - var ret OperationBody - ret, err = f.getOperationBody(entry.Body) - if err != nil { - log.ErrorfCtx(ctx, "V(FederationVendor): Failed to convert to operation state for %s", operationId) - return OperationBody{}, err - } - return ret, err -} - -// getOperationBody converts the body to an OperationBody. -func (f *FederationVendor) getOperationBody(body interface{}) (OperationBody, error) { - var operationBody OperationBody - bytes, _ := json.Marshal(body) - err := json.Unmarshal(bytes, &operationBody) - if err != nil { - return OperationBody{}, err } - return operationBody, nil } func (c *FederationVendor) onStatus(request v1alpha2.COARequest) v1alpha2.COAResponse { diff --git a/api/pkg/apis/v1alpha1/vendors/federation-vendor_test.go b/api/pkg/apis/v1alpha1/vendors/federation-vendor_test.go index d0b95af37..bf56e7571 100644 --- a/api/pkg/apis/v1alpha1/vendors/federation-vendor_test.go +++ b/api/pkg/apis/v1alpha1/vendors/federation-vendor_test.go @@ -13,12 +13,10 @@ import ( "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2" "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/managers" "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/providers" - mockconfig "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/providers/config/mock" memorykeylock "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/providers/keylock/memory" mockledger "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/providers/ledger/mock" "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/providers/pubsub/memory" memoryqueue "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/providers/queue/memory" - mocksecret "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/providers/secret/mock" "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/providers/states/memorystate" "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/vendors" "github.com/stretchr/testify/assert" @@ -40,10 +38,6 @@ func federationVendorInit() FederationVendor { stateProvider.Init(memorystate.MemoryStateProviderConfig{}) graphProvider := &memorygraph.MemoryGraphProvider{} graphProvider.Init(memorygraph.MemoryGraphProviderConfig{}) - configProvider := mockconfig.MockConfigProvider{} - configProvider.Init(mockconfig.MockConfigProviderConfig{}) - secretProvider := mocksecret.MockSecretProvider{} - secretProvider.Init(mocksecret.MockSecretProviderConfig{}) catalogProviders := make(map[string]providers.IProvider) catalogProviders["StateProvider"] = stateProvider catalogProviders["GraphProvider"] = graphProvider @@ -108,16 +102,6 @@ func federationVendorInit() FederationVendor { Name: "trails-manager", Type: "managers.symphony.trails", }, - { - Name: "solution-manager", - Type: "managers.symphony.solution", - Properties: map[string]string{ - "providers.persistentstate": "mem-state", - "providers.config": "mock-config", - "providers.secret": "mock-secret", - "providers.keylock": "mem-keylock", - }, - }, }, }, []managers.IManagerFactroy{ &sym_mgr.SymphonyManagerFactory{}, @@ -126,12 +110,6 @@ func federationVendorInit() FederationVendor { "trails-manager": trailsProviders, "sites-manager": siteProviders, "staging-manager": stagingProviders, - "solution-manager": { - "mem-state": stateProvider, - "mem-keylock": &keyLockProvider, - "mock-config": &configProvider, - "mock-secret": &secretProvider, - }, }, &pubSubProvider) return vendor } @@ -140,7 +118,7 @@ func TestFederationGetEndpoint(t *testing.T) { vendor := federationVendorInit() endpoints := vendor.GetEndpoints() assert.NotNil(t, endpoints) - assert.Equal(t, "federation/k8shook", endpoints[len(endpoints)-3].Route) + assert.Equal(t, "federation/k8shook", endpoints[len(endpoints)-1].Route) } func TestFederationGetInfo(t *testing.T) { diff --git a/api/pkg/apis/v1alpha1/vendors/message_types.go b/api/pkg/apis/v1alpha1/vendors/message_types.go index c8b663061..f145c4dc6 100644 --- a/api/pkg/apis/v1alpha1/vendors/message_types.go +++ b/api/pkg/apis/v1alpha1/vendors/message_types.go @@ -139,6 +139,10 @@ type ProviderGetRequest struct { References []model.ComponentStep `json:"references"` } +type ProviderPagingRequest struct { + RequestList []AgentRequest `json:"requestList"` + LastMessageID string `json:"lastMessageID"` +} type ProviderApplyRequest struct { AgentRequest Deployment model.DeploymentSpec `json:"deployment"` diff --git a/api/pkg/apis/v1alpha1/vendors/solution-vendor.go b/api/pkg/apis/v1alpha1/vendors/solution-vendor.go index 52e67018c..02ea66e44 100644 --- a/api/pkg/apis/v1alpha1/vendors/solution-vendor.go +++ b/api/pkg/apis/v1alpha1/vendors/solution-vendor.go @@ -10,11 +10,16 @@ import ( "context" "encoding/json" "fmt" + "strconv" + "time" "github.com/eclipse-symphony/symphony/api/constants" "github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/managers/solution" "github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/managers/solution/metrics" + "github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/managers/stage" + "github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/managers/staging" "github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/model" + tgt "github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/providers/target" "github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/utils" api_utils "github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/utils" "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2" @@ -23,13 +28,22 @@ import ( observ_utils "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/observability/utils" "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/providers" "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/providers/pubsub" + states "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/providers/states" "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/vendors" + "github.com/google/uuid" "github.com/valyala/fasthttp" ) type SolutionVendor struct { vendors.Vendor SolutionManager *solution.SolutionManager + PlanManager *PlanManager + StageManager *stage.StageManager + StagingManager *staging.StagingManager +} + +func NewPlanManager() *PlanManager { + return &PlanManager{} } var apiOperationMetrics *metrics.Metrics @@ -48,13 +62,335 @@ func (e *SolutionVendor) Init(config vendors.VendorConfig, factories []managers. return err } for _, m := range e.Managers { + if c, ok := m.(*stage.StageManager); ok { + e.StageManager = c + } + if c, ok := m.(*staging.StagingManager); ok { + e.StagingManager = c + } if c, ok := m.(*solution.SolutionManager); ok { e.SolutionManager = c } } + e.PlanManager = NewPlanManager() + if e.StageManager == nil { + return v1alpha2.NewCOAError(nil, "stage manager is not supplied", v1alpha2.MissingConfig) + } + if e.StagingManager == nil { + return v1alpha2.NewCOAError(nil, "staging manager is not supplied", v1alpha2.MissingConfig) + } if e.SolutionManager == nil { return v1alpha2.NewCOAError(nil, "solution manager is not supplied", v1alpha2.MissingConfig) } + e.Vendor.Context.Subscribe(DeploymentStepTopic, v1alpha2.EventHandler{ + Handler: func(topic string, event v1alpha2.Event) error { + ctx := context.TODO() + if event.Context != nil { + ctx = event.Context + } + log.InfoCtx(ctx, "V(Solution): subscribe deployment-step and begin to apply step ") + // get data + err := e.handleDeploymentStep(ctx, event) + if err != nil { + log.ErrorfCtx(ctx, "V(Solution): Failed to handle deployment-step: %v", err) + } + return err + }, + Group: "Solution-vendor", + }) + e.Vendor.Context.Subscribe(DeploymentPlanTopic, v1alpha2.EventHandler{ + Handler: func(topic string, event v1alpha2.Event) error { + ctx := context.TODO() + if event.Context != nil { + ctx = event.Context + } + + log.InfoCtx(ctx, "V(Solution): Begin to execute deployment-plan") + err := e.handleDeploymentPlan(ctx, event) + if err != nil { + log.ErrorfCtx(ctx, "V(Solution): Failed to handle deployment plan: %v", err) + } + return err + }, + Group: "stage-vendor", + }) + e.Vendor.Context.Subscribe(CollectStepResultTopic, v1alpha2.EventHandler{ + Handler: func(topic string, event v1alpha2.Event) error { + ctx := event.Context + if ctx == nil { + ctx = context.TODO() + } + err := e.handleStepResult(ctx, event) + if err != nil { + log.ErrorfCtx(ctx, "V(Solution): Failed to handle step result: %v", err) + return err + } + return err + }, + Group: "stage-vendor", + }) + return nil +} +func (e *SolutionVendor) handleDeploymentPlan(ctx context.Context, event v1alpha2.Event) error { + var planEnvelope PlanEnvelope + jData, _ := json.Marshal(event.Body) + err := json.Unmarshal(jData, &planEnvelope) + if err != nil { + log.ErrorCtx(ctx, "failed to unmarshal plan envelope :%v", err) + return err + } + + planState := e.createPlanState(ctx, planEnvelope) + lockName := api_utils.GenerateKeyLockName(planState.Namespace, planState.Deployment.Instance.ObjectMeta.Name) + e.SolutionManager.KeyLockProvider.TryLock(lockName) + log.InfoCtx(ctx, "begin to save summary for %s", planEnvelope.PlanId) + if err := e.SaveSummaryInfo(ctx, planState, model.SummaryStateRunning); err != nil { + return err + } + if planState.isCompleted() { + return e.handlePlanComplete(ctx, planState) + + } + for stepId, step := range planEnvelope.Plan.Steps { + switch planEnvelope.Phase { + case PhaseGet: + log.InfoCtx(ctx, "phase get begin deployment %+v", planEnvelope.Deployment) + if err := e.publishDeploymentStep(ctx, stepId, planState, planEnvelope.Remove, planState.Steps[stepId]); err != nil { + return err + } + case PhaseApply: + planState.Summary.PlannedDeployment += len(step.Components) + } + } + + switch planEnvelope.Phase { + case PhaseApply: + log.InfoCtx(ctx, "V(Solution): publish deployment step id %s step %+v", 0, planEnvelope.Plan.Steps[0].Role) + if err := e.publishDeploymentStep(ctx, 0, planState, planEnvelope.Remove, planState.Steps[0]); err != nil { + return err + } + } + log.InfoCtx(ctx, "V(Solution): store plan id %s in map %+v", planEnvelope.PlanId) + e.PlanManager.Plans.Store(planEnvelope.PlanId, planState) + return nil +} +func (e *SolutionVendor) publishDeploymentStep(ctx context.Context, stepId int, planState *PlanState, remove bool, step model.DeploymentStep) error { + log.InfoCtx(ctx, "V(Solution): publish deployment step for PlanId %s StepId %s", planState.PlanId, stepId) + if err := e.Vendor.Context.Publish(DeploymentStepTopic, v1alpha2.Event{ + Body: StepEnvelope{ + Step: step, + StepId: stepId, + Remove: remove, + PlanState: planState, + }, + Context: ctx, + }); err != nil { + log.InfoCtx(ctx, "V(Solution): publish deployment step failed PlanId %s, stepId %s", planState.PlanId, stepId) + return err + } + return nil +} +func (e *SolutionVendor) publishStepResult(ctx context.Context, target string, planId string, stepId int, Error error, getResult []model.ComponentSpec, applyResult map[string]model.ComponentResultSpec) error { + errorString := "" + if Error != nil { + errorString = Error.Error() + } + return e.Vendor.Context.Publish(CollectStepResultTopic, v1alpha2.Event{ + Body: StepResult{ + Target: target, + PlanId: planId, + StepId: stepId, + GetResult: getResult, + ApplyResult: applyResult, + Timestamp: time.Now(), + Error: errorString, + }, + }) +} + +// create inital plan state +func (e *SolutionVendor) createPlanState(ctx context.Context, planEnvelope PlanEnvelope) *PlanState { + return &PlanState{ + PlanId: planEnvelope.PlanId, + StartTime: time.Now(), + TotalSteps: len(planEnvelope.Plan.Steps), + Phase: planEnvelope.Phase, + Summary: model.SummarySpec{ + TargetResults: make(map[string]model.TargetResultSpec), + TargetCount: len(planEnvelope.Deployment.Targets), + SuccessCount: 0, + AllAssignedDeployed: true, + JobID: planEnvelope.Deployment.JobID, + IsRemoval: planEnvelope.Remove, + }, + PreviousDesiredState: planEnvelope.PreviousDesiredState, + CompletedSteps: 0, + MergedState: planEnvelope.MergedState, + Deployment: planEnvelope.Deployment, + Namespace: planEnvelope.Namespace, + Remove: planEnvelope.Remove, + TargetResult: make(map[string]int), + CurrentState: planEnvelope.CurrentState, + StepStates: make([]StepState, len(planEnvelope.Plan.Steps)), + Steps: planEnvelope.Plan.Steps, + } +} + +// saveStepResult updates the plan state with the step result and saves the summary. +func (e *SolutionVendor) saveStepResult(ctx context.Context, planState *PlanState, stepResult StepResult) error { + // Log the update of plan state with the step result + log.InfoCtx(ctx, "V(Solution): Update plan state %v with step result %v phase %s", planState, stepResult, planState.Phase) + planState.CompletedSteps++ + lockName := api_utils.GenerateKeyLockName(planState.Namespace, planState.Deployment.Instance.ObjectMeta.Name) + e.SolutionManager.KeyLockProvider.TryLock(lockName) + switch planState.Phase { + case PhaseGet: + // Update the GetResult for the specific step + planState.StepStates[stepResult.StepId].GetResult = stepResult.GetResult + case PhaseApply: + if stepResult.Error != "" { + // Handle error case and update the target result status and message + targetResultStatus := fmt.Sprintf("%s Failed", deploymentTypeMap[planState.Remove]) + targetResultMessage := fmt.Sprintf("Failed to create provider %s, err: %s", deploymentTypeMap[planState.Remove], stepResult.Error) + targetResultSpec := model.TargetResultSpec{Status: targetResultStatus, Message: targetResultMessage, ComponentResults: stepResult.ApplyResult} + planState.Summary.UpdateTargetResult(stepResult.Target, targetResultSpec) + planState.Summary.AllAssignedDeployed = false + for _, ret := range stepResult.ApplyResult { + if (!planState.Remove && ret.Status == v1alpha2.Updated) || (planState.Remove && ret.Status == v1alpha2.Deleted) { + planState.Summary.CurrentDeployed++ + } + } + if planState.TargetResult[stepResult.Target] == 1 || planState.TargetResult[stepResult.Target] == 0 { + planState.TargetResult[stepResult.Target] = -1 + planState.Summary.SuccessCount -= planState.TargetResult[stepResult.Target] + } + return e.handleAllPlanCompletetion(ctx, planState) + } else { + // Handle success case and update the target result status and message + targetResultSpec := model.TargetResultSpec{Status: "OK", Message: "", ComponentResults: stepResult.ApplyResult} + planState.Summary.UpdateTargetResult(stepResult.Target, targetResultSpec) + log.InfoCtx(ctx, "Update plan state target spec %v", targetResultSpec) + planState.Summary.CurrentDeployed += len(stepResult.ApplyResult) + if planState.TargetResult[stepResult.Target] == 0 { + planState.TargetResult[stepResult.Target] = 1 + planState.Summary.SuccessCount++ + } + // publish next step execute event + if stepResult.StepId != planState.TotalSteps-1 { + log.InfoCtx(ctx, "V(Solution): publish deployment step id %s step %+v", stepResult.StepId+1, planState.Steps[stepResult.StepId+1].Role) + if err := e.publishDeploymentStep(ctx, stepResult.StepId+1, planState, planState.Remove, planState.Steps[stepResult.StepId+1]); err != nil { + log.InfoCtx(ctx, "V(Solution): publish deployment step failed PlanId %s, stepId %s", planState.PlanId, 0) + } + } + } + + // If no components are deployed, set success count to target count + if planState.Summary.CurrentDeployed == 0 && planState.Summary.AllAssignedDeployed { + planState.Summary.SuccessCount = planState.Summary.TargetCount + } + + // Save the summary information + log.InfoCtx(ctx, "Begin to save summary for %s", planState.Deployment.Instance.ObjectMeta.Name) + if err := e.SaveSummaryInfo(ctx, planState, model.SummaryStateRunning); err != nil { + log.ErrorfCtx(ctx, "Failed to save summary progress: %v", err) + } + } + + // Store the updated plan state + e.PlanManager.Plans.Store(planState.PlanId, planState) + + // Check if all steps are completed and handle plan completion + if planState.isCompleted() { + err := e.handlePlanComplete(ctx, planState) + if err != nil { + log.InfoCtx(ctx, "V(Solution): handle plan Complete failed %+v", err) + lockName := api_utils.GenerateKeyLockName(planState.Namespace, planState.Deployment.Instance.ObjectMeta.Name) + e.UnlockObject(ctx, lockName) + } + } + return nil +} + +// handleGetPlanCompletetion handles the completion of the get plan phase. +func (e *SolutionVendor) handleGetPlanCompletetion(ctx context.Context, planState *PlanState) error { + // Collect result + log.InfoCtx(ctx, "V(Solution): Begin to get current state %v", planState) + Plan, planState, err := e.threeStateMerge(ctx, planState) + if err != nil { + log.ErrorfCtx(ctx, "V(Solution): Failed to merge states: %v", err) + return err + } + e.Vendor.Context.Publish(DeploymentPlanTopic, v1alpha2.Event{ + Metadata: map[string]string{ + "Id": planState.Deployment.JobID, + }, + Body: PlanEnvelope{ + Plan: Plan, + Deployment: planState.Deployment, + MergedState: planState.MergedState, + CurrentState: planState.CurrentState, + PreviousDesiredState: planState.PreviousDesiredState, + PlanId: planState.PlanId, + Remove: planState.Remove, + Namespace: planState.Namespace, + Phase: PhaseApply, + }, + Context: ctx, + }) + return nil +} + +// handlePlanComplete handles the completion of a plan and updates its status. +func (e *SolutionVendor) handlePlanComplete(ctx context.Context, planState *PlanState) error { + log.InfoCtx(ctx, "V(Solution): Plan state %s is completed %v", planState.Phase, planState) + if !planState.Summary.AllAssignedDeployed { + planState.Status = "failed" + } + log.InfoCtx(ctx, "V(Solution): Plan state is completed %v", planState.Summary.AllAssignedDeployed) + switch planState.Phase { + case PhaseGet: + if err := e.handleGetPlanCompletetion(ctx, planState); err != nil { + e.PlanManager.DeletePlan(planState.PlanId) + return err + } + case PhaseApply: + if err := e.handleAllPlanCompletetion(ctx, planState); err != nil { + e.PlanManager.DeletePlan(planState.PlanId) + return err + } + e.PlanManager.DeletePlan(planState.PlanId) + } + + return nil +} + +// handleStepResult processes the event and updates the plan state accordingly. +func (e *SolutionVendor) handleStepResult(ctx context.Context, event v1alpha2.Event) error { + var stepResult StepResult + // Marshal the event body to JSON + jData, _ := json.Marshal(event.Body) + log.InfofCtx(ctx, "Received event body: %s", string(jData)) + + // Unmarshal the JSON data into stepResult + if err := json.Unmarshal(jData, &stepResult); err != nil { + log.ErrorfCtx(ctx, "Failed to unmarshal step result: %v", err) + return err + } + + planId := stepResult.PlanId + // Load the plan state object from the PlanManager + planStateObj, exists := e.PlanManager.Plans.Load(planId) + if !exists { + return fmt.Errorf("Plan not found: %s", planId) + } + planState := planStateObj.(*PlanState) + lockName := api_utils.GenerateKeyLockName(planState.Namespace, planState.Deployment.Instance.ObjectMeta.Name) + e.SolutionManager.KeyLockProvider.TryLock(lockName) + // Update the plan state in the map and save the summary + if err := e.saveStepResult(ctx, planState, stepResult); err != nil { + log.ErrorCtx(ctx, "Failed to handle step result: %v", err) + return err + } return nil } @@ -83,9 +419,162 @@ func (o *SolutionVendor) GetEndpoints() []v1alpha2.Endpoint { Version: o.Version, Handler: o.onQueue, }, + { + Methods: []string{fasthttp.MethodGet}, + Route: route + "/tasks", + Version: o.Version, + Handler: o.onGetRequest, + }, + { + Methods: []string{fasthttp.MethodPost}, + Route: route + "/task/getResult", + Version: o.Version, + Handler: o.onGetResponse, + }, + } +} +func (e *SolutionVendor) handleDeploymentStep(ctx context.Context, event v1alpha2.Event) error { + var stepEnvelope StepEnvelope + jData, err := json.Marshal(event.Body) + if err != nil { + log.ErrorfCtx(ctx, "V (Solution): failed to unmarshal event body: %v", err) + return err + } + if err := json.Unmarshal(jData, &stepEnvelope); err != nil { + log.ErrorfCtx(ctx, "V (Solution): failed to unmarshal step envelope: %v", err) + return err + } + lockName := api_utils.GenerateKeyLockName(stepEnvelope.PlanState.Namespace, stepEnvelope.PlanState.Deployment.Instance.ObjectMeta.Name) + e.SolutionManager.KeyLockProvider.TryLock(lockName) + if stepEnvelope.Step.Role == "container" { + stepEnvelope.Step.Role = "instance" + } + switch stepEnvelope.PlanState.Phase { + case PhaseGet: + return e.handlePhaseGet(ctx, stepEnvelope) + case PhaseApply: + return e.handlePhaseApply(ctx, stepEnvelope) + } + return nil +} +func findAgentFromDeploymentState(deployment model.DeploymentSpec, targetName string) bool { + // find targt component + targetSpec := deployment.Targets[targetName] + log.Info("compare between state and target name %s, %+v", targetName, targetSpec) + for _, component := range targetSpec.Spec.Components { + log.Info("compare between state and target name %+v, %s", component, component.Name) + if component.Type == "remote-agent" { + log.Info("It is remote call ") + return true + } else { + log.Info(" it is not remote call target Name %s", targetName) + } + } + return false +} + +func (e *SolutionVendor) handlePhaseGet(ctx context.Context, stepEnvelope StepEnvelope) error { + if findAgentFromDeploymentState(stepEnvelope.PlanState.Deployment, stepEnvelope.Step.Target) { + return e.enqueueProviderGetRequest(ctx, stepEnvelope) + } + return e.getProviderAndExecute(ctx, stepEnvelope) +} +func (e *SolutionVendor) enqueueProviderGetRequest(ctx context.Context, stepEnvelope StepEnvelope) error { + operationId := uuid.New().String() + providerGetRequest := &ProviderGetRequest{ + AgentRequest: AgentRequest{ + OperationID: operationId, + Provider: stepEnvelope.Step.Role, + Action: string(PhaseGet), + }, + References: stepEnvelope.Step.Components, + Deployment: stepEnvelope.PlanState.Deployment, + } + + log.InfoCtx(ctx, "V(Solution): Enqueue get message %s-%s %+v ", stepEnvelope.Step.Target, stepEnvelope.PlanState.Namespace, providerGetRequest) + messageID, err := e.StagingManager.QueueProvider.Enqueue(fmt.Sprintf("%s-%s", stepEnvelope.Step.Target, stepEnvelope.PlanState.Namespace), providerGetRequest) + if err != nil { + log.ErrorCtx(ctx, "V(Solution): Error in enqueue message %s", fmt.Sprintf("%s-%s", stepEnvelope.Step.Target, stepEnvelope.PlanState.Namespace)) + return err + } + err = e.upsertOperationState(ctx, operationId, stepEnvelope.StepId, stepEnvelope.PlanState.PlanId, stepEnvelope.Step.Target, stepEnvelope.PlanState.Phase, stepEnvelope.PlanState.Namespace, stepEnvelope.Remove, messageID) + if err != nil { + log.ErrorCtx(ctx, "V(Solution) Error in insert operation Id %s", operationId) + return e.publishStepResult(ctx, stepEnvelope.Step.Target, stepEnvelope.PlanState.PlanId, stepEnvelope.StepId, err, []model.ComponentSpec{}, map[string]model.ComponentResultSpec{}) + } + return err +} + +func (e *SolutionVendor) getProviderAndExecute(ctx context.Context, stepEnvelope StepEnvelope) error { + provider, err := e.SolutionManager.GetTargetProviderForStep(stepEnvelope.Step.Target, stepEnvelope.Step.Role, stepEnvelope.PlanState.Deployment, stepEnvelope.PlanState.PreviousDesiredState) + if err != nil { + log.ErrorfCtx(ctx, " M (Solution): failed to create provider & Failed to save summary progress: %v", err) + return e.publishStepResult(ctx, stepEnvelope.Step.Target, stepEnvelope.PlanState.PlanId, stepEnvelope.StepId, err, []model.ComponentSpec{}, map[string]model.ComponentResultSpec{}) + } + dep := stepEnvelope.PlanState.Deployment + dep.ActiveTarget = stepEnvelope.Step.Target + getResult, stepError := (provider.(tgt.ITargetProvider)).Get(ctx, dep, stepEnvelope.Step.Components) + if stepError != nil { + log.ErrorCtx(ctx, "V(Solution) Error in get target current states %+v", stepError) + return e.publishStepResult(ctx, stepEnvelope.Step.Target, stepEnvelope.PlanState.PlanId, stepEnvelope.StepId, err, []model.ComponentSpec{}, map[string]model.ComponentResultSpec{}) + } + return e.publishStepResult(ctx, stepEnvelope.Step.Target, stepEnvelope.PlanState.PlanId, stepEnvelope.StepId, err, getResult, map[string]model.ComponentResultSpec{}) +} + +func (e *SolutionVendor) handlePhaseApply(ctx context.Context, stepEnvelope StepEnvelope) error { + if findAgentFromDeploymentState(stepEnvelope.PlanState.Deployment, stepEnvelope.Step.Target) { + return e.enqueueProviderApplyRequest(ctx, stepEnvelope) + } + return e.applyProviderAndExecute(ctx, stepEnvelope) +} + +func (e *SolutionVendor) enqueueProviderApplyRequest(ctx context.Context, stepEnvelope StepEnvelope) error { + operationId := uuid.New().String() + providApplyRequest := &ProviderApplyRequest{ + AgentRequest: AgentRequest{ + OperationID: operationId, + Provider: stepEnvelope.Step.Role, + Action: string(PhaseApply), + }, + Deployment: stepEnvelope.PlanState.Deployment, + Step: stepEnvelope.Step, + IsDryRun: stepEnvelope.PlanState.Deployment.IsDryRun, + } + messageId, err := e.StagingManager.QueueProvider.Enqueue(fmt.Sprintf("%s-%s", stepEnvelope.Step.Target, stepEnvelope.PlanState.Namespace), providApplyRequest) + if err != nil { + return err + } + log.InfoCtx(ctx, "V(Solution): Enqueue apply message %s-%s %+v ", stepEnvelope.Step.Target, stepEnvelope.PlanState.Namespace, providApplyRequest) + err = e.upsertOperationState(ctx, operationId, stepEnvelope.StepId, stepEnvelope.PlanState.PlanId, stepEnvelope.Step.Target, stepEnvelope.PlanState.Phase, stepEnvelope.PlanState.Namespace, stepEnvelope.Remove, messageId) + if err != nil { + log.ErrorCtx(ctx, "error in insert operation Id %s", operationId) + return e.publishStepResult(ctx, stepEnvelope.Step.Target, stepEnvelope.PlanState.PlanId, stepEnvelope.StepId, err, []model.ComponentSpec{}, map[string]model.ComponentResultSpec{}) + } + return err +} + +func (e *SolutionVendor) applyProviderAndExecute(ctx context.Context, stepEnvelope StepEnvelope) error { + // get provider todo : is dry run + provider, err := e.SolutionManager.GetTargetProviderForStep(stepEnvelope.Step.Target, stepEnvelope.Step.Role, stepEnvelope.PlanState.Deployment, stepEnvelope.PlanState.PreviousDesiredState) + if err != nil { + log.ErrorfCtx(ctx, " M (Solution): failed to create provider & Failed to save summary progress: %v", err) + return e.publishStepResult(ctx, stepEnvelope.Step.Target, stepEnvelope.PlanState.PlanId, stepEnvelope.StepId, err, []model.ComponentSpec{}, map[string]model.ComponentResultSpec{}) + } + previousDesiredState := stepEnvelope.PlanState.PreviousDesiredState + currentState := stepEnvelope.PlanState.CurrentState + step := stepEnvelope.Step + if previousDesiredState != nil { + testState := solution.MergeDeploymentStates(&previousDesiredState.State, currentState) + if e.SolutionManager.CanSkipStep(ctx, step, step.Target, provider.(tgt.ITargetProvider), previousDesiredState.State.Components, testState) { + log.InfofCtx(ctx, " M (Solution): skipping step with role %s on target %s", step.Role, step.Target) + return e.publishStepResult(ctx, stepEnvelope.Step.Target, stepEnvelope.PlanState.PlanId, stepEnvelope.StepId, nil, []model.ComponentSpec{}, map[string]model.ComponentResultSpec{}) + } } + componentResults, stepError := (provider.(tgt.ITargetProvider)).Apply(ctx, stepEnvelope.PlanState.Deployment, stepEnvelope.Step, stepEnvelope.PlanState.Deployment.IsDryRun) + return e.publishStepResult(ctx, stepEnvelope.Step.Target, stepEnvelope.PlanState.PlanId, stepEnvelope.StepId, stepError, []model.ComponentSpec{}, componentResults) } -func (c *SolutionVendor) onQueue(request v1alpha2.COARequest) v1alpha2.COAResponse { + +func (e *SolutionVendor) onQueue(request v1alpha2.COARequest) v1alpha2.COAResponse { rContext, span := observability.StartSpan("Solution Vendor", request.Context, &map[string]string{ "method": "onQueue", }) @@ -111,7 +600,7 @@ func (c *SolutionVendor) onQueue(request v1alpha2.COARequest) v1alpha2.COARespon ContentType: "application/json", }) } - summary, err := c.SolutionManager.GetSummary(ctx, instance, namespace) + summary, err := e.SolutionManager.GetSummary(ctx, instance, namespace) data, _ := json.Marshal(summary) if err != nil { sLog.ErrorfCtx(ctx, "V (Solution): onQueue failed - %s", err.Error()) @@ -177,7 +666,7 @@ func (c *SolutionVendor) onQueue(request v1alpha2.COARequest) v1alpha2.COARespon if delete == "true" { action = v1alpha2.JobDelete } - c.Vendor.Context.Publish("job", v1alpha2.Event{ + e.Vendor.Context.Publish("job", v1alpha2.Event{ Metadata: map[string]string{ "objectType": objectType, "namespace": namespace, @@ -209,7 +698,7 @@ func (c *SolutionVendor) onQueue(request v1alpha2.COARequest) v1alpha2.COARespon }) } - err := c.SolutionManager.DeleteSummary(ctx, instance, namespace) + err := e.SolutionManager.DeleteSummary(ctx, instance, namespace) if err != nil { sLog.ErrorfCtx(ctx, "V (Solution): onQueue DeleteSummary failed - %s", err.Error()) return observ_utils.CloseSpanWithCOAResponse(span, v1alpha2.COAResponse{ @@ -229,7 +718,8 @@ func (c *SolutionVendor) onQueue(request v1alpha2.COARequest) v1alpha2.COARespon ContentType: "application/json", }) } -func (c *SolutionVendor) onReconcile(request v1alpha2.COARequest) v1alpha2.COAResponse { + +func (e *SolutionVendor) onReconcile(request v1alpha2.COARequest) v1alpha2.COAResponse { rContext, span := observability.StartSpan("Solution Vendor", request.Context, &map[string]string{ "method": "onReconcile", }) @@ -253,6 +743,12 @@ func (c *SolutionVendor) onReconcile(request v1alpha2.COARequest) v1alpha2.COARe Body: []byte(err.Error()), }) } + lockName := api_utils.GenerateKeyLockName(namespace, deployment.Instance.ObjectMeta.Name) + // if !e.SolutionManager.KeyLockProvider.TryLock(api_utils.GenerateKeyLockName(namespace, deployment.Instance.ObjectMeta.Name)) { + // log.Info("can not get lock %s", lockName) + // } + e.SolutionManager.KeyLockProvider.Lock(lockName) + log.InfoCtx(ctx, "lock succeed %s", lockName) delete := request.Parameters["delete"] remove := delete == "true" targetName := "" @@ -261,6 +757,7 @@ func (c *SolutionVendor) onReconcile(request v1alpha2.COARequest) v1alpha2.COARe targetName = v } } + log.InfoCtx(ctx, "get deployment %+v", deployment) log.InfofCtx(ctx, " M (Solution): reconciling deployment.InstanceName: %s, deployment.SolutionName: %s, remove: %t, namespace: %s, targetName: %s, generation: %s, jobID: %s", deployment.Instance.ObjectMeta.Name, deployment.SolutionName, @@ -269,12 +766,13 @@ func (c *SolutionVendor) onReconcile(request v1alpha2.COARequest) v1alpha2.COARe targetName, deployment.Generation, deployment.JobID) - previousDesiredState := c.SolutionManager.GetPreviousState(ctx, deployment.Instance.ObjectMeta.Name, namespace) + previousDesiredState := e.SolutionManager.GetPreviousState(ctx, deployment.Instance.ObjectMeta.Name, namespace) // create new deployment state var state model.DeploymentState state, err = solution.NewDeploymentState(deployment) if err != nil { log.ErrorfCtx(ctx, " M (Solution): failed to create manager state for deployment: %+v", err) + e.UnlockObject(ctx, lockName) return observ_utils.CloseSpanWithCOAResponse(span, v1alpha2.COAResponse{ State: v1alpha2.MethodNotAllowed, Body: []byte("{\"result\":\"405 - method not allowed\"}"), @@ -290,6 +788,21 @@ func (c *SolutionVendor) onReconcile(request v1alpha2.COARequest) v1alpha2.COARe JobID: deployment.JobID, } data, _ := json.Marshal(summary) + err = e.SolutionManager.SaveSummary(ctx, deployment.Instance.ObjectMeta.Name, deployment.Generation, deployment.Hash, summary, model.SummaryStateRunning, namespace) + if err != nil { + log.ErrorfCtx(ctx, " M (Solution): failed to create manager state for deployment: %+v", err) + e.UnlockObject(ctx, lockName) + e.SolutionManager.ConcludeSummary(ctx, deployment.Instance.ObjectMeta.Name, deployment.Generation, deployment.Hash, summary, namespace) + return observ_utils.CloseSpanWithCOAResponse(span, v1alpha2.COAResponse{ + State: v1alpha2.InternalError, + Body: []byte(fmt.Sprintf("{\"result\":\"500 - M (Solution): failed to save summary: %+v\"}", err)), + ContentType: "application/json", + }) + } + + // stopCh := make(chan struct{}) + // defer close(stopCh) + // go e.SolutionManager.SendHeartbeat(ctx, deployment.Instance.ObjectMeta.Name, namespace, remove, stopCh) // get the components count for the deployment componentCount := len(deployment.Solution.Spec.Components) @@ -299,8 +812,8 @@ func (c *SolutionVendor) onReconcile(request v1alpha2.COARequest) v1alpha2.COARe metrics.UpdateOperationType, ) - if c.SolutionManager.VendorContext != nil && c.SolutionManager.VendorContext.EvaluationContext != nil { - context := c.SolutionManager.VendorContext.EvaluationContext.Clone() + if e.SolutionManager.VendorContext != nil && e.SolutionManager.VendorContext.EvaluationContext != nil { + context := e.SolutionManager.VendorContext.EvaluationContext.Clone() context.DeploymentSpec = deployment context.Value = deployment context.Component = "" @@ -312,32 +825,36 @@ func (c *SolutionVendor) onReconcile(request v1alpha2.COARequest) v1alpha2.COARe log.InfofCtx(ctx, " M (Solution): skipped failure to evaluate deployment spec: %+v", err) } else { summary.SummaryMessage = "failed to evaluate deployment spec: " + err.Error() + data, _ = json.Marshal(summary) log.ErrorfCtx(ctx, " M (Solution): failed to evaluate deployment spec: %+v", err) + e.SolutionManager.ConcludeSummary(ctx, deployment.Instance.ObjectMeta.Name, deployment.Generation, deployment.Hash, summary, namespace) + log.InfoCtx(ctx, "unlock7") + e.UnlockObject(ctx, lockName) return observ_utils.CloseSpanWithCOAResponse(span, v1alpha2.COAResponse{ - State: v1alpha2.InternalError, - Body: []byte(fmt.Sprintf("{\"result\":\"500 - M (Solution): failed to evaluate deployment spec: %+v\"}", err)), - ContentType: "application/json", + State: v1alpha2.GetErrorState(err), + Body: data, }) } } } - log.InfoCtx(ctx, "lock %s", deployment.Instance.ObjectMeta.Name) - c.SolutionManager.KeyLockProvider.Lock(api_utils.GenerateKeyLockName(namespace, deployment.Instance.ObjectMeta.Name)) + // e.SolutionManager.KeyLockProvider.Lock(api_utils.GenerateKeyLockName(namespace, deployment.Instance.ObjectMeta.Name)) // Generate new deployment plan for deployment initalPlan, err := solution.PlanForDeployment(deployment, state) if err != nil { + e.SolutionManager.ConcludeSummary(ctx, deployment.Instance.ObjectMeta.Name, deployment.Generation, deployment.Hash, summary, namespace) + log.ErrorfCtx(ctx, " M (Solution): failed initalPlan for deployment: %+v", err) + e.UnlockObject(ctx, lockName) return observ_utils.CloseSpanWithCOAResponse(span, v1alpha2.COAResponse{ - State: v1alpha2.InternalError, - Body: []byte(fmt.Sprintf("{\"result\":\"500 - M (Solution): failed to generate initial plan: %+v\"}", err)), - ContentType: "application/json", + State: v1alpha2.GetErrorState(err), + Body: data, }) } // remove no use steps var stepList []model.DeploymentStep for _, step := range initalPlan.Steps { - if c.SolutionManager.IsTarget && !api_utils.ContainsString(c.SolutionManager.TargetNames, step.Target) { + if e.SolutionManager.IsTarget && !api_utils.ContainsString(e.SolutionManager.TargetNames, step.Target) { continue } if targetName != "" && targetName != step.Target { @@ -347,7 +864,7 @@ func (c *SolutionVendor) onReconcile(request v1alpha2.COARequest) v1alpha2.COARe } initalPlan.Steps = stepList log.InfoCtx(ctx, "publish topic for object %s", deployment.Instance.ObjectMeta.Name) - c.Vendor.Context.Publish(DeploymentPlanTopic, v1alpha2.Event{ + e.Vendor.Context.Publish(DeploymentPlanTopic, v1alpha2.Event{ Metadata: map[string]string{ "Id": deployment.JobID, }, @@ -356,14 +873,14 @@ func (c *SolutionVendor) onReconcile(request v1alpha2.COARequest) v1alpha2.COARe Deployment: deployment, MergedState: model.DeploymentState{}, PreviousDesiredState: previousDesiredState, - PlanId: deployment.Instance.ObjectMeta.Name, + PlanId: uuid.New().String(), Remove: delete == "true", Namespace: namespace, Phase: PhaseGet, }, Context: ctx, }) - c.SolutionManager.SaveSummary(ctx, deployment.Instance.ObjectMeta.Name, deployment.Generation, deployment.Hash, summary, model.SummaryStateRunning, namespace) + return observ_utils.CloseSpanWithCOAResponse(span, v1alpha2.COAResponse{ State: v1alpha2.OK, Body: data, @@ -378,7 +895,266 @@ func (c *SolutionVendor) onReconcile(request v1alpha2.COARequest) v1alpha2.COARe }) } -func (c *SolutionVendor) onApplyDeployment(request v1alpha2.COARequest) v1alpha2.COAResponse { +// onGetRequest handles the get request from the remote agent. +func (e *SolutionVendor) onGetRequest(request v1alpha2.COARequest) v1alpha2.COAResponse { + ctx, span := observability.StartSpan("Solution Vendor", request.Context, &map[string]string{ + "method": "onGetRequest", + }) + defer span.End() + var agentRequest AgentRequest + sLog.InfoCtx(ctx, "V(Solution): get request from remote agent") + target := request.Parameters["target"] + namespace := request.Parameters["namespace"] + getAll, exists := request.Parameters["getAll"] + + if exists && getAll == "true" { + // Logic to handle getALL parameter + sLog.InfoCtx(ctx, "V(Solution): getALL request from remote agent %+v", agentRequest) + + start, startExist := request.Parameters["preindex"] + if !startExist { + start = "0" + } + sizeStr, sizeExist := request.Parameters["size"] + var size int + var err error + if !sizeExist { + size = 10 + } else { + size, err = strconv.Atoi(sizeStr) + if err != nil { + // Handle the error, for example, set a default value or return an error + size = 10 + } + } + + return e.getTaskFromQueueByPaging(ctx, target, namespace, start, size) + } + return e.getTaskFromQueue(ctx, target, namespace) +} + +// onGetResponse handles the get response from the remote agent. +func (e *SolutionVendor) onGetResponse(request v1alpha2.COARequest) v1alpha2.COAResponse { + ctx, span := observability.StartSpan("Solution Vendor", request.Context, &map[string]string{ + "method": "onGetResponse", + }) + defer span.End() + + var asyncResult AsyncResult + err := json.Unmarshal(request.Body, &asyncResult) + if err != nil { + sLog.ErrorfCtx(ctx, "V(Solution): onGetResponse failed - %s", err.Error()) + return v1alpha2.COAResponse{ + State: v1alpha2.InternalError, + Body: []byte(err.Error()), + } + } + sLog.InfoCtx(ctx, "V(Solution): get async result from remote agent %+v", asyncResult) + return e.handleRemoteAgentExecuteResult(ctx, asyncResult) +} + +// handleRemoteAgentExecuteResult handles the execution result from the remote agent. +func (e *SolutionVendor) handleRemoteAgentExecuteResult(ctx context.Context, asyncResult AsyncResult) v1alpha2.COAResponse { + // Get operation ID + operationId := asyncResult.OperationID + // Get related info from redis - todo: timeout + log.InfoCtx(ctx, "V(SolutionVendor): handle remote agent request %+v", asyncResult) + operationBody, err := e.getOperationState(ctx, operationId) + if err != nil { + sLog.ErrorfCtx(ctx, "V(SolutionVendor): onGetResponse failed - %s", err.Error()) + return v1alpha2.COAResponse{ + State: v1alpha2.InternalError, + Body: []byte(err.Error()), + } + } + queueName := fmt.Sprintf("%s-%s", operationBody.Target, operationBody.NameSpace) + switch operationBody.Action { + case PhaseGet: + // Send to step result + var response []model.ComponentSpec + err := json.Unmarshal(asyncResult.Body, &response) + if err != nil { + return v1alpha2.COAResponse{ + State: v1alpha2.InternalError, + Body: []byte(err.Error()), + } + } + e.publishStepResult(ctx, operationBody.Target, operationBody.PlanId, operationBody.StepId, asyncResult.Error, response, map[string]model.ComponentResultSpec{}) + deleteRequest := states.DeleteRequest{ + ID: operationId, + } + + err = e.StagingManager.StateProvider.Delete(ctx, deleteRequest) + if err != nil { + return v1alpha2.COAResponse{ + State: v1alpha2.BadRequest, + Body: []byte("{\"result\":\"405 - delete operation Id failed\"}"), + ContentType: "application/json", + } + } + // delete from queue + + e.StagingManager.QueueProvider.RemoveFromQueue(queueName, operationBody.MessageId) + return v1alpha2.COAResponse{ + State: v1alpha2.OK, + Body: []byte("{\"result\":\"200 - handle async result successfully\"}"), + ContentType: "application/json", + } + case PhaseApply: + var response map[string]model.ComponentResultSpec + err := json.Unmarshal(asyncResult.Body, &response) + if err != nil { + return v1alpha2.COAResponse{ + State: v1alpha2.InternalError, + Body: []byte(err.Error()), + } + } + e.publishStepResult(ctx, operationBody.Target, operationBody.PlanId, operationBody.StepId, asyncResult.Error, []model.ComponentSpec{}, response) + deleteRequest := states.DeleteRequest{ + ID: operationId, + } + err = e.StagingManager.StateProvider.Delete(ctx, deleteRequest) + // delete from queue + e.StagingManager.QueueProvider.RemoveFromQueue(queueName, operationBody.MessageId) + if err != nil { + return v1alpha2.COAResponse{ + State: v1alpha2.BadRequest, + Body: []byte("{\"result\":\"delete operation Id failed\"}"), + ContentType: "application/json", + } + } + return v1alpha2.COAResponse{ + State: v1alpha2.OK, + Body: []byte("{\"result\":\"200 - get response successfully\"}"), + ContentType: "application/json", + } + } + return v1alpha2.COAResponse{ + State: v1alpha2.MethodNotAllowed, + Body: []byte("{\"result\":\"405 - method not allowed\"}"), + ContentType: "application/json", + } +} + +// getTaskFromQueue retrieves a task from the queue for the specified target and namespace. +func (e *SolutionVendor) getTaskFromQueue(ctx context.Context, target string, namespace string) v1alpha2.COAResponse { + ctx, span := observability.StartSpan("Solution Vendor", ctx, &map[string]string{ + "method": "doGetFromQueue", + }) + queueName := fmt.Sprintf("%s-%s", target, namespace) + sLog.InfoCtx(ctx, "V(SolutionVendor): getFromQueue %s queue length %s", queueName) + defer span.End() + var queueElement interface{} + var err error + queueElement, err = e.StagingManager.QueueProvider.Peek(queueName) + if err != nil { + sLog.ErrorfCtx(ctx, "V(SolutionVendor): getQueue failed - %s", err.Error()) + return v1alpha2.COAResponse{ + State: v1alpha2.InternalError, + Body: []byte(err.Error()), + } + } + data, _ := json.Marshal(queueElement) + return v1alpha2.COAResponse{ + State: v1alpha2.OK, + Body: data, + ContentType: "application/json", + } +} + +// getTaskFromQueue retrieves a task from the queue for the specified target and namespace. +func (e *SolutionVendor) getTaskFromQueueByPaging(ctx context.Context, target string, namespace string, start string, size int) v1alpha2.COAResponse { + ctx, span := observability.StartSpan("Solution Vendor", ctx, &map[string]string{ + "method": "doGetFromQueue", + }) + queueName := fmt.Sprintf("%s-%s", target, namespace) + sLog.InfoCtx(ctx, "V(SolutionVendor): getFromQueue %s queue length %s", queueName) + defer span.End() + var err error + queueElement, lastMessageID, err := e.StagingManager.QueueProvider.QueryByPaging(queueName, start, size) + var requestList []AgentRequest + for _, element := range queueElement { + var agentRequest AgentRequest + err = json.Unmarshal(element, &agentRequest) + log.InfoCtx(ctx, "unmarshal one element", agentRequest) + if err != nil { + sLog.ErrorfCtx(ctx, "V(SolutionVendor): failed to unmarshal element - %s", err.Error()) + return v1alpha2.COAResponse{ + State: v1alpha2.InternalError, + Body: []byte(err.Error()), + } + } + requestList = append(requestList, agentRequest) + } + response := &ProviderPagingRequest{ + RequestList: requestList, + LastMessageID: lastMessageID, + } + if err != nil { + sLog.ErrorfCtx(ctx, "V(SolutionVendor): getQueue failed - %s", err.Error()) + return v1alpha2.COAResponse{ + State: v1alpha2.InternalError, + Body: []byte(err.Error()), + } + } + data, _ := json.Marshal(response) + return v1alpha2.COAResponse{ + State: v1alpha2.OK, + Body: data, + ContentType: "application/json", + } +} + +// upsertOperationState upserts the operation state for the specified parameters. +func (e *SolutionVendor) upsertOperationState(ctx context.Context, operationId string, stepId int, planId string, target string, action JobPhase, namespace string, remove bool, messageId string) error { + upsertRequest := states.UpsertRequest{ + Value: states.StateEntry{ + ID: operationId, + Body: map[string]interface{}{ + "StepId": stepId, + "PlanId": planId, + "Target": target, + "Action": action, + "namespace": namespace, + "Remove": remove, + "MessageId": messageId, + }}, + } + _, err := e.StagingManager.StateProvider.Upsert(ctx, upsertRequest) + return err +} + +// getOperationState retrieves the operation state for the specified operation ID. +func (e *SolutionVendor) getOperationState(ctx context.Context, operationId string) (OperationBody, error) { + getRequest := states.GetRequest{ + ID: operationId, + } + var entry states.StateEntry + entry, err := e.StagingManager.StateProvider.Get(ctx, getRequest) + if err != nil { + return OperationBody{}, err + } + var ret OperationBody + ret, err = e.getOperationBody(entry.Body) + if err != nil { + log.ErrorfCtx(ctx, "V(SolutionVendor): Failed to convert to operation state for %s", operationId) + return OperationBody{}, err + } + return ret, err +} + +// getOperationBody converts the body to an OperationBody. +func (e *SolutionVendor) getOperationBody(body interface{}) (OperationBody, error) { + var operationBody OperationBody + bytes, _ := json.Marshal(body) + err := json.Unmarshal(bytes, &operationBody) + if err != nil { + return OperationBody{}, err + } + return operationBody, nil +} + +func (e *SolutionVendor) onApplyDeployment(request v1alpha2.COARequest) v1alpha2.COAResponse { rContext, span := observability.StartSpan("Solution Vendor", request.Context, &map[string]string{ "method": "onApplyDeployment", }) @@ -408,7 +1184,7 @@ func (c *SolutionVendor) onApplyDeployment(request v1alpha2.COARequest) v1alpha2 Body: []byte(err.Error()), } } - response := c.doDeploy(ctx, *deployment, namespace, targetName) + response := e.doDeploy(ctx, *deployment, namespace, targetName) return observ_utils.CloseSpanWithCOAResponse(span, response) case fasthttp.MethodGet: ctx, span := observability.StartSpan("Get Components", rContext, nil) @@ -422,7 +1198,7 @@ func (c *SolutionVendor) onApplyDeployment(request v1alpha2.COARequest) v1alpha2 Body: []byte(err.Error()), } } - response := c.doGet(ctx, *deployment, targetName) + response := e.doGet(ctx, *deployment, targetName) return observ_utils.CloseSpanWithCOAResponse(span, response) case fasthttp.MethodDelete: ctx, span := observability.StartSpan("Delete Components", rContext, nil) @@ -436,7 +1212,7 @@ func (c *SolutionVendor) onApplyDeployment(request v1alpha2.COARequest) v1alpha2 Body: []byte(err.Error()), } } - response := c.doRemove(ctx, deployment, namespace, targetName) + response := e.doRemove(ctx, deployment, namespace, targetName) return observ_utils.CloseSpanWithCOAResponse(span, response) } sLog.ErrorCtx(rContext, "V (Solution): onApplyDeployment failed - 405 method not allowed") @@ -449,14 +1225,14 @@ func (c *SolutionVendor) onApplyDeployment(request v1alpha2.COARequest) v1alpha2 return resp } -func (c *SolutionVendor) doGet(ctx context.Context, deployment model.DeploymentSpec, targetName string) v1alpha2.COAResponse { +func (e *SolutionVendor) doGet(ctx context.Context, deployment model.DeploymentSpec, targetName string) v1alpha2.COAResponse { ctx, span := observability.StartSpan("Solution Vendor", ctx, &map[string]string{ "method": "doGet", }) defer span.End() sLog.InfoCtx(ctx, "V (Solution): doGet") - _, components, err := c.SolutionManager.Get(ctx, deployment, targetName) + _, components, err := e.SolutionManager.Get(ctx, deployment, targetName) if err != nil { sLog.ErrorfCtx(ctx, "V (Solution): doGet failed - %s", err.Error()) response := v1alpha2.COAResponse{ @@ -475,13 +1251,13 @@ func (c *SolutionVendor) doGet(ctx context.Context, deployment model.DeploymentS observ_utils.UpdateSpanStatusFromCOAResponse(span, response) return response } -func (c *SolutionVendor) doDeploy(ctx context.Context, deployment model.DeploymentSpec, namespace string, targetName string) v1alpha2.COAResponse { +func (e *SolutionVendor) doDeploy(ctx context.Context, deployment model.DeploymentSpec, namespace string, targetName string) v1alpha2.COAResponse { ctx, span := observability.StartSpan("Solution Vendor", ctx, &map[string]string{ "method": "doDeploy", }) defer span.End() sLog.InfoCtx(ctx, "V (Solution): doDeploy") - summary, err := c.SolutionManager.Reconcile(ctx, deployment, false, namespace, targetName) + summary, err := e.SolutionManager.Reconcile(ctx, deployment, false, namespace, targetName) data, _ := json.Marshal(summary) if err != nil { sLog.ErrorfCtx(ctx, "V (Solution): doDeploy failed - %s", err.Error()) @@ -500,14 +1276,14 @@ func (c *SolutionVendor) doDeploy(ctx context.Context, deployment model.Deployme observ_utils.UpdateSpanStatusFromCOAResponse(span, response) return response } -func (c *SolutionVendor) doRemove(ctx context.Context, deployment model.DeploymentSpec, namespace string, targetName string) v1alpha2.COAResponse { +func (e *SolutionVendor) doRemove(ctx context.Context, deployment model.DeploymentSpec, namespace string, targetName string) v1alpha2.COAResponse { ctx, span := observability.StartSpan("Solution Vendor", ctx, &map[string]string{ "method": "doRemove", }) defer span.End() sLog.InfoCtx(ctx, "V (Solution): doRemove") - summary, err := c.SolutionManager.Reconcile(ctx, deployment, true, namespace, targetName) + summary, err := e.SolutionManager.Reconcile(ctx, deployment, true, namespace, targetName) data, _ := json.Marshal(summary) if err != nil { sLog.ErrorfCtx(ctx, "V (Solution): doRemove failed - %s", err.Error()) @@ -526,3 +1302,134 @@ func (c *SolutionVendor) doRemove(ctx context.Context, deployment model.Deployme observ_utils.UpdateSpanStatusFromCOAResponse(span, response) return response } + +// threeStateMerge merges the current, previous, and desired states to create a deployment plan. +func (e *SolutionVendor) threeStateMerge(ctx context.Context, planState *PlanState) (model.DeploymentPlan, *PlanState, error) { + currentState := model.DeploymentState{} + currentState.TargetComponent = make(map[string]string) + + for _, StepState := range planState.StepStates { + for _, c := range StepState.GetResult { + key := fmt.Sprintf("%s::%s", c.Name, StepState.Target) + role := c.Type + if role == "" { + role = "instance" + } + log.InfoCtx(ctx, "V(Solution): Store key value in current key: %s value: %s", key, role) + currentState.TargetComponent[key] = role + } + } + log.InfoCtx(ctx, "V(Solution): Compute current state %v for Plan ID: %s", currentState, planState.PlanId) + planState.CurrentState = currentState + previousDesiredState := e.SolutionManager.GetPreviousState(ctx, planState.Deployment.Instance.ObjectMeta.Name, planState.Namespace) + log.InfoCtx(ctx, "V(Solution): Get previous desired state %+v", previousDesiredState) + planState.PreviousDesiredState = previousDesiredState + var currentDesiredState model.DeploymentState + currentDesiredState, err := solution.NewDeploymentState(planState.Deployment) + if err != nil { + log.ErrorfCtx(ctx, "V(Solution): Failed to get current desired state: %+v", err) + return model.DeploymentPlan{}, &PlanState{}, err + } + log.InfoCtx(ctx, "V(Solution): Get current desired state %+v", currentDesiredState) + desiredState := currentDesiredState + if previousDesiredState != nil { + desiredState = solution.MergeDeploymentStates(&previousDesiredState.State, currentDesiredState) + } + log.InfoCtx(ctx, "V(Solution): Get desired state %+v", desiredState) + if planState.Remove { + desiredState.MarkRemoveAll() + log.InfoCtx(ctx, "V(Solution): After remove desired state %+v", desiredState) + } + + mergedState := solution.MergeDeploymentStates(¤tState, desiredState) + planState.MergedState = mergedState + log.InfoCtx(ctx, "get merged state %+v", mergedState) + Plan, err := solution.PlanForDeployment(planState.Deployment, mergedState) + if err != nil { + return model.DeploymentPlan{}, &PlanState{}, err + } + e.PlanManager.Plans.Store(planState.PlanId, planState) + log.InfoCtx(ctx, "V(Solution): Begin to publish topic to deployment plan %v merged state %v get plan %v", planState, mergedState, Plan) + return Plan, planState, nil +} + +func (e *SolutionVendor) UnlockObject(ctx context.Context, lockName string) { + e.SolutionManager.KeyLockProvider.UnLock(lockName) +} +func (e *SolutionVendor) SaveSummaryInfo(ctx context.Context, planState *PlanState, state model.SummaryState) error { + return e.SolutionManager.SaveSummary(ctx, planState.Deployment.Instance.ObjectMeta.Name, planState.Deployment.Generation, planState.Deployment.Hash, planState.Summary, state, planState.Namespace) +} + +func (e *SolutionVendor) handleAllPlanCompletetion(ctx context.Context, planState *PlanState) error { + log.InfofCtx(ctx, "handle plan completetion:begin to handle plan completetion %v", planState) + if err := e.SaveSummaryInfo(ctx, planState, model.SummaryStateDone); err != nil { + return err + } + // update summary + log.InfoCtx(ctx, "begin to save summary for %s", planState.Deployment.Instance.ObjectMeta.Name) + planState.MergedState.ClearAllRemoved() + log.InfoCtx(ctx, "if it is dry run %+v", planState.Deployment.IsDryRun) + log.InfoCtx(ctx, "get dep %+v", planState.Deployment) + if !planState.Deployment.IsDryRun { + if len(planState.MergedState.TargetComponent) == 0 && planState.Remove { + log.DebugfCtx(ctx, " M (Solution): no assigned components to manage, deleting state") + e.SolutionManager.StateProvider.Delete(ctx, states.DeleteRequest{ + ID: planState.Deployment.Instance.ObjectMeta.Name, + Metadata: map[string]interface{}{ + "namespace": planState.Namespace, + "group": model.SolutionGroup, + "version": "v1", + "resource": DeploymentState, + }, + }) + } else { + log.InfoCtx(ctx, "begin to save state %s", planState.Deployment.Instance.ObjectMeta.Name) + log.InfoCtx(ctx, "begin to save state deployment%s", planState.Deployment) + log.InfoCtx(ctx, "begin to save state deployment%s", planState.MergedState) + e.SolutionManager.StateProvider.Upsert(ctx, states.UpsertRequest{ + Value: states.StateEntry{ + ID: planState.Deployment.Instance.ObjectMeta.Name, + Body: solution.SolutionManagerDeploymentState{ + Spec: planState.Deployment, + State: planState.MergedState, + }, + }, + Metadata: map[string]interface{}{ + "namespace": planState.Namespace, + "group": model.SolutionGroup, + "version": "v1", + "resource": DeploymentState, + }, + }) + } + } + if planState.Deployment.IsDryRun { + planState.Summary.SuccessCount = 0 + } + if err := e.SolutionManager.ConcludeSummary(ctx, planState.Deployment.Instance.ObjectMeta.Name, planState.Deployment.Generation, planState.Deployment.Hash, planState.Summary, planState.Namespace); err != nil { + return err + } + log.InfoCtx(ctx, "final unlock %s", planState.Deployment.Instance.ObjectMeta.Name) + // e.SolutionManager.CleanupHeartbeat(ctx, planState.Deployment.Instance.ObjectMeta.Name, planState.Namespace, planState.Remove) + e.UnlockObject(ctx, api_utils.GenerateKeyLockName(planState.Namespace, planState.Deployment.Instance.ObjectMeta.Name)) + return nil +} + +func (p *PlanState) IsExpired() bool { + log.Info("time now") + log.Info("time expired") + return time.Now().After(p.ExpireTime) +} + +func (p *PlanState) isCompleted() bool { + return p.CompletedSteps == p.TotalSteps +} +func (pm *PlanManager) GetPlan(planId string) (*PlanState, bool) { + if value, ok := pm.Plans.Load(planId); ok { + return value.(*PlanState), true + } + return nil, false +} +func (pm *PlanManager) DeletePlan(planId string) { + pm.Plans.Delete(planId) +} diff --git a/api/pkg/apis/v1alpha1/vendors/solution-vendor_test.go b/api/pkg/apis/v1alpha1/vendors/solution-vendor_test.go index c20aeab3b..148232820 100644 --- a/api/pkg/apis/v1alpha1/vendors/solution-vendor_test.go +++ b/api/pkg/apis/v1alpha1/vendors/solution-vendor_test.go @@ -180,7 +180,7 @@ func TestSolutionEndpoints(t *testing.T) { vendor := createSolutionVendor() vendor.Route = "solution" endpoints := vendor.GetEndpoints() - assert.Equal(t, 3, len(endpoints)) + assert.Equal(t, 5, len(endpoints)) } func TestSolutionInfo(t *testing.T) { diff --git a/api/pkg/apis/v1alpha1/vendors/stage-vendor.go b/api/pkg/apis/v1alpha1/vendors/stage-vendor.go index 6500c9704..9ac45717b 100644 --- a/api/pkg/apis/v1alpha1/vendors/stage-vendor.go +++ b/api/pkg/apis/v1alpha1/vendors/stage-vendor.go @@ -10,7 +10,6 @@ import ( "context" "encoding/json" "fmt" - "time" "github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/managers/activations" "github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/managers/campaigns" @@ -26,7 +25,6 @@ import ( "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/managers" "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/providers" "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/providers/pubsub" - states "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/providers/states" "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/vendors" "github.com/eclipse-symphony/symphony/coa/pkg/logger" ) @@ -39,7 +37,6 @@ type StageVendor struct { CampaignsManager *campaigns.CampaignsManager ActivationsManager *activations.ActivationsManager SolutionManager *solution.SolutionManager - PlanManager *PlanManager } func (s *StageVendor) GetInfo() vendors.VendorInfo { @@ -53,10 +50,6 @@ func (s *StageVendor) GetInfo() vendors.VendorInfo { func (o *StageVendor) GetEndpoints() []v1alpha2.Endpoint { return []v1alpha2.Endpoint{} } - -func NewPlanManager() *PlanManager { - return &PlanManager{} -} func (s *StageVendor) Init(config vendors.VendorConfig, factories []managers.IManagerFactroy, providers map[string]map[string]providers.IProvider, pubsubProvider pubsub.IPubSubProvider) error { err := s.Vendor.Init(config, factories, providers, pubsubProvider) if err != nil { @@ -76,7 +69,6 @@ func (s *StageVendor) Init(config vendors.VendorConfig, factories []managers.IMa s.SolutionManager = c } } - s.PlanManager = NewPlanManager() if s.StageManager == nil { return v1alpha2.NewCOAError(nil, "stage manager is not supplied", v1alpha2.MissingConfig) } @@ -86,9 +78,6 @@ func (s *StageVendor) Init(config vendors.VendorConfig, factories []managers.IMa if s.ActivationsManager == nil { return v1alpha2.NewCOAError(nil, "activations manager is not supplied", v1alpha2.MissingConfig) } - if s.SolutionManager == nil { - return v1alpha2.NewCOAError(nil, "solution manager is not supplied", v1alpha2.MissingConfig) - } s.Vendor.Context.Subscribe("activation", v1alpha2.EventHandler{ Handler: func(topic string, event v1alpha2.Event) error { ctx := context.TODO() @@ -360,246 +349,10 @@ func (s *StageVendor) Init(config vendors.VendorConfig, factories []managers.IMa return nil }, }) - s.Vendor.Context.Subscribe(DeploymentPlanTopic, v1alpha2.EventHandler{ - Handler: func(topic string, event v1alpha2.Event) error { - ctx := context.TODO() - if event.Context != nil { - ctx = event.Context - } - - log.InfoCtx(ctx, "V(StageVendor): Begin to execute deployment-plan") - return s.handleDeploymentPlan(ctx, event) - }, - Group: "stage-vendor", - }) - - s.Vendor.Context.Subscribe(CollectStepResultTopic, v1alpha2.EventHandler{ - Handler: func(topic string, event v1alpha2.Event) error { - ctx := event.Context - if ctx == nil { - ctx = context.TODO() - } - return s.handleStepResult(ctx, event) - }, - Group: "stage-vendor", - }) - return nil -} - -// handleStepResult processes the event and updates the plan state accordingly. -func (s *StageVendor) handleStepResult(ctx context.Context, event v1alpha2.Event) error { - var stepResult StepResult - - // Marshal the event body to JSON - jData, _ := json.Marshal(event.Body) - log.InfofCtx(ctx, "Received event body: %s", string(jData)) - - // Unmarshal the JSON data into stepResult - if err := json.Unmarshal(jData, &stepResult); err != nil { - log.ErrorfCtx(ctx, "Failed to unmarshal step result: %v", err) - return err - } - - planId := stepResult.PlanId - - // Load the plan state object from the PlanManager - planStateObj, exists := s.PlanManager.Plans.Load(planId) - if !exists { - log.ErrorCtx(ctx, "Plan not found: %s", planId) - return fmt.Errorf("Plan not found: %s", planId) - } - planState := planStateObj.(*PlanState) - - // Update the plan state in the map and save the summary - if err := s.saveStepResult(ctx, planState, stepResult); err != nil { - log.ErrorCtx(ctx, "Failed to update plan state: %v", err) - return err - } - - return nil -} -func (s *StageVendor) handleDeploymentPlan(ctx context.Context, event v1alpha2.Event) error { - var planEnvelope PlanEnvelope - jData, _ := json.Marshal(event.Body) - err := json.Unmarshal(jData, &planEnvelope) - if err != nil { - log.ErrorCtx(ctx, "failed to unmarshal plan envelope :%v", err) - return err - } - planState := s.createPlanState(ctx, planEnvelope) - log.InfoCtx(ctx, "begin to save summary for %s", planState.Deployment.Instance.ObjectMeta.Name) - s.SaveSummaryInfo(ctx, planState, model.SummaryStateRunning) - if planState.isCompleted() { - return s.handlePlanComplete(ctx, planState) - - } - for _, step := range planEnvelope.Plan.Steps { - switch planEnvelope.Phase { - case PhaseGet: - log.InfoCtx(ctx, "phase get begin deployment %+v", planEnvelope.Deployment) - if err := s.publishStepResult(ctx, 0, planState, planEnvelope.Remove, planState.Steps[0]); err != nil { - log.InfoCtx(ctx, "V(Federation): publish deployment step failed PlanId %s, stepId %s", planEnvelope.PlanId, 0) - return err - } - case PhaseApply: - planState.Summary.PlannedDeployment += len(step.Components) - } - } - // for i, step := range planEnvelope.Plan.Steps { - switch planEnvelope.Phase { - case PhaseApply: - // planState.Summary.PlannedDeployment += len(planEnvelope.Plan.Steps[0].Components) - log.InfoCtx(ctx, "V(Federation): publish deployment step id %s step %+v", 0, planEnvelope.Plan.Steps[0].Role) - if err := s.publishStepResult(ctx, 0, planState, planEnvelope.Remove, planState.Steps[0]); err != nil { - log.InfoCtx(ctx, "V(Federation): publish deployment step failed PlanId %s, stepId %s", planEnvelope.PlanId, 0) - return err - } - } - // } - log.InfoCtx(ctx, "V(Federation): store plan id %s in map %+v", planEnvelope.PlanId) - s.PlanManager.Plans.Store(planEnvelope.PlanId, planState) - return nil -} -func (s *StageVendor) publishStepResult(ctx context.Context, stepId int, planState *PlanState, remove bool, step model.DeploymentStep) error { - log.InfoCtx(ctx, "V(StageVendor): publish deployment step for PlanId %s StepId %s", planState.PlanId, stepId) - if err := s.Vendor.Context.Publish("deployment-step", v1alpha2.Event{ - Body: StepEnvelope{ - Step: step, - StepId: stepId, - Remove: remove, - PlanState: planState, - }, - Context: ctx, - }); err != nil { - log.InfoCtx(ctx, "V(StageVendor): publish deployment step failed PlanId %s, stepId %s", planState.PlanId, stepId) - return err - } - return nil -} - -// create inital plan state -func (s *StageVendor) createPlanState(ctx context.Context, planEnvelope PlanEnvelope) *PlanState { - return &PlanState{ - PlanId: planEnvelope.PlanId, - StartTime: time.Now(), - TotalSteps: len(planEnvelope.Plan.Steps), - Phase: planEnvelope.Phase, - Summary: model.SummarySpec{ - TargetResults: make(map[string]model.TargetResultSpec), - TargetCount: len(planEnvelope.Deployment.Targets), - SuccessCount: 0, - AllAssignedDeployed: true, - JobID: planEnvelope.Deployment.JobID, - IsRemoval: planEnvelope.Remove, - }, - PreviousDesiredState: planEnvelope.PreviousDesiredState, - CompletedSteps: 0, - MergedState: planEnvelope.MergedState, - Deployment: planEnvelope.Deployment, - Namespace: planEnvelope.Namespace, - Remove: planEnvelope.Remove, - TargetResult: make(map[string]int), - CurrentState: planEnvelope.CurrentState, - StepStates: make([]StepState, len(planEnvelope.Plan.Steps)), - Steps: planEnvelope.Plan.Steps, - } -} - -// saveStepResult updates the plan state with the step result and saves the summary. -func (s *StageVendor) saveStepResult(ctx context.Context, planState *PlanState, stepResult StepResult) error { - // Log the update of plan state with the step result - log.InfoCtx(ctx, "V(Stage): Update plan state %v with step result %v phase %s", planState, stepResult, planState.Phase) - planState.CompletedSteps++ - - switch planState.Phase { - case PhaseGet: - // Update the GetResult for the specific step - planState.StepStates[stepResult.StepId].GetResult = stepResult.GetResult - case PhaseApply: - if stepResult.Error != "" { - // Handle error case and update the target result status and message - targetResultStatus := fmt.Sprintf("%s Failed", deploymentTypeMap[planState.Remove]) - targetResultMessage := fmt.Sprintf("Failed to create provider %s, err: %s", deploymentTypeMap[planState.Remove], stepResult.Error) - targetResultSpec := model.TargetResultSpec{Status: targetResultStatus, Message: targetResultMessage, ComponentResults: stepResult.ApplyResult} - planState.Summary.UpdateTargetResult(stepResult.Target, targetResultSpec) - planState.Summary.AllAssignedDeployed = false - for _, ret := range stepResult.ApplyResult { - if (!planState.Remove && ret.Status == v1alpha2.Updated) || (planState.Remove && ret.Status == v1alpha2.Deleted) { - planState.Summary.CurrentDeployed++ - } - } - if planState.TargetResult[stepResult.Target] == 1 || planState.TargetResult[stepResult.Target] == 0 { - planState.TargetResult[stepResult.Target] = -1 - planState.Summary.SuccessCount -= planState.TargetResult[stepResult.Target] - } - return s.handlePlanComplete(ctx, planState) - } else { - // Handle success case and update the target result status and message - targetResultSpec := model.TargetResultSpec{Status: "OK", Message: "", ComponentResults: stepResult.ApplyResult} - planState.Summary.UpdateTargetResult(stepResult.Target, targetResultSpec) - log.InfoCtx(ctx, "Update plan state target spec %v", targetResultSpec) - planState.Summary.CurrentDeployed += len(stepResult.ApplyResult) - if planState.TargetResult[stepResult.Target] == 0 { - planState.TargetResult[stepResult.Target] = 1 - planState.Summary.SuccessCount++ - } - // publish next step execute event - if stepResult.StepId != planState.TotalSteps-1 { - log.InfoCtx(ctx, "V(Stage): publish deployment step id %s step %+v", stepResult.StepId+1, planState.Steps[stepResult.StepId+1].Role) - if err := s.publishStepResult(ctx, stepResult.StepId+1, planState, planState.Remove, planState.Steps[stepResult.StepId+1]); err != nil { - log.InfoCtx(ctx, "V(Stage): publish deployment step failed PlanId %s, stepId %s", planState.PlanId, 0) - return err - } - } - - } - - // If no components are deployed, set success count to target count - if planState.Summary.CurrentDeployed == 0 && planState.Summary.AllAssignedDeployed { - planState.Summary.SuccessCount = planState.Summary.TargetCount - } - - // Save the summary information - log.InfoCtx(ctx, "begin to save summary for %s", planState.Deployment.Instance.ObjectMeta.Name) - if err := s.SaveSummaryInfo(ctx, planState, model.SummaryStateRunning); err != nil { - log.ErrorfCtx(ctx, "Failed to save summary progress: %v", err) - } - } - - // Store the updated plan state - s.PlanManager.Plans.Store(planState.PlanId, planState) - - // Check if all steps are completed and handle plan completion - if planState.isCompleted() { - return s.handlePlanComplete(ctx, planState) - } return nil } -// handlePlanComplete handles the completion of a plan and updates its status. -func (s *StageVendor) handlePlanComplete(ctx context.Context, planState *PlanState) error { - log.InfoCtx(ctx, "V(Stage): Plan state %s is completed %v", planState.Phase, planState) - if !planState.Summary.AllAssignedDeployed { - planState.Status = "failed" - } - log.InfoCtx(ctx, "V(Stage): Plan state is completed %v", planState.Summary.AllAssignedDeployed) - switch planState.Phase { - case PhaseGet: - if err := s.handleGetPlanCompletetion(ctx, planState); err != nil { - log.ErrorfCtx(ctx, "V(Stage): Failed to handle get plan completion: %v", err) - return err - } - case PhaseApply: - if err := s.handleApplyPlanCompletetion(ctx, planState); err != nil { - log.ErrorfCtx(ctx, "V(Stage): Failed to handle apply plan completion: %v", err) - return err - } - } - s.PlanManager.DeletePlan(planState.PlanId) - return nil -} - func (s *StageVendor) reportActivationStatusWithBadRequest(activation string, namespace string, err error) error { status := model.StageStatus{ Stage: "", @@ -616,153 +369,3 @@ func (s *StageVendor) reportActivationStatusWithBadRequest(activation string, na } return err } - -// handleGetPlanCompletetion handles the completion of the get plan phase. -func (s *StageVendor) handleGetPlanCompletetion(ctx context.Context, planState *PlanState) error { - // Collect result - log.InfoCtx(ctx, "V(Stage): Begin to get current state %v", planState) - Plan, err := s.threeStateMerge(ctx, planState) - if err != nil { - log.ErrorfCtx(ctx, "V(Stage): Failed to merge states: %v", err) - return err - } - s.Vendor.Context.Publish(DeploymentPlanTopic, v1alpha2.Event{ - Metadata: map[string]string{ - "Id": planState.Deployment.JobID, - }, - Body: PlanEnvelope{ - Plan: Plan, - Deployment: planState.Deployment, - MergedState: planState.MergedState, - CurrentState: planState.CurrentState, - PreviousDesiredState: planState.PreviousDesiredState, - PlanId: planState.Deployment.Instance.ObjectMeta.Name, - Remove: planState.Remove, - Namespace: planState.Namespace, - Phase: PhaseApply, - }, - Context: ctx, - }) - return nil -} - -// threeStateMerge merges the current, previous, and desired states to create a deployment plan. -func (s *StageVendor) threeStateMerge(ctx context.Context, planState *PlanState) (model.DeploymentPlan, error) { - currentState := model.DeploymentState{} - currentState.TargetComponent = make(map[string]string) - - for _, StepState := range planState.StepStates { - for _, c := range StepState.GetResult { - key := fmt.Sprintf("%s::%s", c.Name, StepState.Target) - role := c.Type - if role == "" { - role = "instance" - } - log.InfoCtx(ctx, "V(Stage): Store key value in current key: %s value: %s", key, role) - currentState.TargetComponent[key] = role - } - } - log.InfoCtx(ctx, "V(Stage): Compute current state %v for Plan ID: %s", currentState, planState.PlanId) - planState.CurrentState = currentState - previousDesiredState := s.SolutionManager.GetPreviousState(ctx, planState.Deployment.Instance.ObjectMeta.Name, planState.Namespace) - planState.PreviousDesiredState = previousDesiredState - var currentDesiredState model.DeploymentState - currentDesiredState, err := solution.NewDeploymentState(planState.Deployment) - if err != nil { - log.ErrorfCtx(ctx, "V(Stage): Failed to get current desired state: %+v", err) - return model.DeploymentPlan{}, err - } - log.InfoCtx(ctx, "V(Stage): Get current desired state %+v", currentDesiredState) - desiredState := currentDesiredState - if previousDesiredState != nil { - desiredState = solution.MergeDeploymentStates(&previousDesiredState.State, currentDesiredState) - } - log.InfoCtx(ctx, "V(Stage): Get desired state %+v", desiredState) - if planState.Remove { - desiredState.MarkRemoveAll() - log.InfoCtx(ctx, "V(Stage): After remove desired state %+v", desiredState) - } - - mergedState := solution.MergeDeploymentStates(¤tState, desiredState) - planState.MergedState = mergedState - Plan, err := solution.PlanForDeployment(planState.Deployment, mergedState) - if err != nil { - log.ErrorfCtx(ctx, "V(Stage): Plan generate error") - return model.DeploymentPlan{}, err - } - log.InfoCtx(ctx, "V(Stage): Begin to publish topic to deployment plan %v merged state %v get plan %v", planState, mergedState, Plan) - return Plan, nil -} - -func (s *StageVendor) SaveSummaryInfo(ctx context.Context, planState *PlanState, state model.SummaryState) error { - return s.SolutionManager.SaveSummary(ctx, planState.Deployment.Instance.ObjectMeta.Name, planState.Deployment.Generation, planState.Deployment.Hash, planState.Summary, model.SummaryStateRunning, planState.Namespace) -} -func (s *StageVendor) handleApplyPlanCompletetion(ctx context.Context, planState *PlanState) error { - log.InfofCtx(ctx, "handle plan completetion:begin to handle plan completetion %v", planState) - if err := s.SaveSummaryInfo(ctx, planState, model.SummaryStateDone); err != nil { - log.ErrorfCtx(ctx, "Failed to save summary progress done: %v", err) - return err - } - // update summary - log.InfoCtx(ctx, "begin to save summary for %s", planState.Deployment.Instance.ObjectMeta.Name) - if err := s.SolutionManager.ConcludeSummary(ctx, planState.Deployment.Instance.ObjectMeta.Name, planState.Deployment.Generation, planState.Deployment.Hash, planState.Summary, planState.Namespace); err != nil { - log.ErrorfCtx(ctx, "handle plan completetion: failed to conclude summary: %v", err) - return err - } - planState.MergedState.ClearAllRemoved() - - if !planState.Deployment.IsDryRun { - if len(planState.MergedState.TargetComponent) == 0 && planState.Remove { - log.DebugfCtx(ctx, " M (Solution): no assigned components to manage, deleting state") - s.SolutionManager.StateProvider.Delete(ctx, states.DeleteRequest{ - ID: planState.Deployment.Instance.ObjectMeta.Name, - Metadata: map[string]interface{}{ - "namespace": planState.Namespace, - "group": model.SolutionGroup, - "version": "v1", - "resource": DeploymentState, - }, - }) - } else { - s.SolutionManager.StateProvider.Upsert(ctx, states.UpsertRequest{ - Value: states.StateEntry{ - ID: planState.Deployment.Instance.ObjectMeta.Name, - Body: solution.SolutionManagerDeploymentState{ - Spec: planState.Deployment, - State: planState.MergedState, - }, - }, - Metadata: map[string]interface{}{ - "namespace": planState.Namespace, - "group": model.SolutionGroup, - "version": "v1", - "resource": DeploymentState, - }, - }) - } - } - log.InfoCtx(ctx, "unlock %s", planState.Deployment.Instance.ObjectMeta.Name) - if !s.SolutionManager.KeyLockProvider.TryLock(api_utils.GenerateKeyLockName(planState.Namespace, planState.Deployment.Instance.ObjectMeta.Name)) { - log.InfoCtx(ctx, "try lock no lock %s", api_utils.GenerateKeyLockName(planState.Namespace, planState.Deployment.Instance.ObjectMeta.Name)) - s.SolutionManager.KeyLockProvider.UnLock(api_utils.GenerateKeyLockName(planState.Namespace, planState.Deployment.Instance.ObjectMeta.Name)) - } - return nil -} -func (p *PlanState) IsExpired() bool { - log.Info("time now") - log.Info("time expired") - return time.Now().After(p.ExpireTime) -} - -func (p *PlanState) isCompleted() bool { - return p.CompletedSteps == p.TotalSteps -} -func (pm *PlanManager) GetPlan(planId string) (*PlanState, bool) { - if value, ok := pm.Plans.Load(planId); ok { - return value.(*PlanState), true - } - return nil, false -} -func (pm *PlanManager) DeletePlan(planId string) { - pm.Plans.Delete(planId) -} diff --git a/api/symphony-api-no-k8s.json b/api/symphony-api-no-k8s.json index ff84441bb..95cc024e1 100644 --- a/api/symphony-api-no-k8s.json +++ b/api/symphony-api-no-k8s.json @@ -508,12 +508,12 @@ "properties": { "poll.enabled": "true", "interval": "#15", - "providers.queue": "redis-queue", + "providers.queue": "memory-queue", "providers.volatilestate": "memory-state" }, "providers": { - "redis-queue": { - "type": "providers.queue.redis", + "memory-queue": { + "type": "providers.queue.memory", "config": {} }, "memory-state": { diff --git a/api/symphony-api-production.json b/api/symphony-api-production.json index 668e0fa16..5fc3a20c1 100644 --- a/api/symphony-api-production.json +++ b/api/symphony-api-production.json @@ -116,34 +116,6 @@ } } } - }, - { - "name": "solution-manager", - "type": "managers.symphony.solution", - "properties": { - "providers.persistentstate": "redis-state", - "providers.config": "mock-config", - "providers.secret": "mock-secret" - }, - "providers": { - "redis-state": { - "type": "providers.state.redis", - "config": { - "name": "redis", - "host": "localhost:6379", - "requireTLS": false, - "password": "" - } - }, - "mock-config": { - "type": "providers.config.mock", - "config": {} - }, - "mock-secret": { - "type": "providers.secret.mock", - "config": {} - } - } } ], "properties": { @@ -610,34 +582,6 @@ "user": "admin", "password": "" } - }, - { - "name": "solution-manager", - "type": "managers.symphony.solution", - "properties": { - "providers.persistentstate": "redis-state", - "providers.config": "mock-config", - "providers.secret": "mock-secret" - }, - "providers": { - "redis-state": { - "type": "providers.state.redis", - "config": { - "name": "redis", - "host": "localhost:6379", - "requireTLS": false, - "password": "" - } - }, - "mock-config": { - "type": "providers.config.mock", - "config": {} - }, - "mock-secret": { - "type": "providers.secret.mock", - "config": {} - } - } } ] }, diff --git a/api/symphony-api.json b/api/symphony-api.json index 4d963a685..767dfd0c1 100644 --- a/api/symphony-api.json +++ b/api/symphony-api.json @@ -68,8 +68,7 @@ "name": "stage-manager", "type": "managers.symphony.stage", "properties": { - "providers.volatilestate": "memory", - "providers.keylock": "mem-keylock" + "providers.volatilestate": "memory" }, "providers": { "memory": { @@ -91,12 +90,6 @@ "config": { "inCluster": true } - }, - "mem-keylock": { - "type": "providers.keylock.memory", - "config": { - "mode" : "Shared" - } } } }, @@ -115,29 +108,6 @@ } } } - }, - { - "name": "solution-manager", - "type": "managers.symphony.solution", - "properties": { - "providers.volatilestate": "mem-state", - "providers.config": "mock-config", - "providers.secret": "mock-secret" - }, - "providers": { - "mem-state": { - "type": "providers.state.memory", - "config": {} - }, - "mock-config": { - "type": "providers.config.mock", - "config": {} - }, - "mock-secret": { - "type": "providers.secret.mock", - "config": {} - } - } } ] }, @@ -559,18 +529,13 @@ "properties": { "poll.enabled": "true", "interval": "#15", - "providers.queue": "redis-queue", + "providers.queue": "memory-queue", "providers.volatilestate": "memory-state" }, "providers": { - "redis-queue": { - "type": "providers.queue.redis", - "config": { - "name": "redis", - "host": "localhost:6380", - "requireTLS": false, - "password": "" - } + "memory-queue": { + "type": "providers.queue.memory", + "config": {} }, "mem-state": { "type": "providers.state.memory", @@ -588,41 +553,6 @@ "user": "admin", "password": "" } - }, - { - "name": "solution-manager", - "type": "managers.symphony.solution", - "properties": { - "providers.persistentstate": "redis-state", - "providers.config": "mock-config", - "providers.secret": "mock-secret", - "providers.keylock": "mem-keylock" - }, - "providers": { - "redis-state": { - "type": "providers.state.redis", - "config": { - "name": "redis", - "host": "localhost:6379", - "requireTLS": false, - "password": "" - } - }, - "mem-keylock": { - "type": "providers.keylock.memory", - "config": { - "mode" : "Shared" - } - }, - "mock-config": { - "type": "providers.config.mock", - "config": {} - }, - "mock-secret": { - "type": "providers.secret.mock", - "config": {} - } - } } ] }, diff --git a/coa/pkg/apis/v1alpha2/providers/queue/memory/memoryprovider.go b/coa/pkg/apis/v1alpha2/providers/queue/memory/memoryprovider.go index bc59c22f7..51dc58cdc 100644 --- a/coa/pkg/apis/v1alpha2/providers/queue/memory/memoryprovider.go +++ b/coa/pkg/apis/v1alpha2/providers/queue/memory/memoryprovider.go @@ -66,10 +66,10 @@ func toMemoryQueueProviderConfig(config providers.IProviderConfig) (MemoryQueueP } // fake -func (s *MemoryQueueProvider) PeekFromBegining(queue string) (interface{}, error) { - var result interface{} - return result, nil +func (s *MemoryQueueProvider) QueryByPaging(queueName string, start string, size int) ([][]byte, string, error) { + return [][]byte{}, "", nil } + func (s *MemoryQueueProvider) Init(config providers.IProviderConfig) error { // parameter checks stateConfig, err := toMemoryQueueProviderConfig(config) diff --git a/coa/pkg/apis/v1alpha2/providers/queue/queue.go b/coa/pkg/apis/v1alpha2/providers/queue/queue.go index 2bdd4d756..69e0f4289 100644 --- a/coa/pkg/apis/v1alpha2/providers/queue/queue.go +++ b/coa/pkg/apis/v1alpha2/providers/queue/queue.go @@ -12,5 +12,5 @@ type IQueueProvider interface { Peek(queue string) (interface{}, error) Size(queue string) int RemoveFromQueue(queue string, messageID string) error - PeekFromBegining(queue string) (interface{}, error) + QueryByPaging(queueName string, start string, size int) ([][]byte, string, error) } diff --git a/coa/pkg/apis/v1alpha2/providers/queue/redis/redisQueueProvider.go b/coa/pkg/apis/v1alpha2/providers/queue/redis/redisQueueProvider.go index 2ac172e80..fa635431f 100644 --- a/coa/pkg/apis/v1alpha2/providers/queue/redis/redisQueueProvider.go +++ b/coa/pkg/apis/v1alpha2/providers/queue/redis/redisQueueProvider.go @@ -12,7 +12,6 @@ import ( "encoding/json" "fmt" "strconv" - "sync" "time" "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2" @@ -24,7 +23,6 @@ import ( ) var mLog = logger.NewLogger("coa.runtime") -var mLock sync.Mutex type RedisQueueProviderConfig struct { Name string `json:"name"` @@ -166,35 +164,9 @@ func (rq *RedisQueueProvider) Enqueue(queue string, element interface{}) (string } return rq.client.XAdd(rq.Ctx, &redis.XAddArgs{ Stream: queue, - Values: map[string]interface{}{"data": data}, + Values: map[string]interface{}{"data": data, "timestamp": time.Now().Unix()}, }).Result() } -func (rq *RedisQueueProvider) PeekFromBegining(queue string) (interface{}, error) { - // Get the last ID processed by this consumer - - // Read message - xMessages, err := rq.client.XRangeN(rq.Ctx, queue, "0", "+", 1).Result() - if err != nil { - return nil, err - } - if len(xMessages) == 0 { - return nil, nil - } - xMsg := xMessages[0] - jsonData := xMsg.Values["data"].(string) - var result interface{} - err = json.Unmarshal([]byte(jsonData), &result) - if err != nil { - return nil, fmt.Errorf("failed to unmarshal message: %w", err) - } - // update last read ID - lastReadKey := fmt.Sprintf("%s:lastID", queue) - err = rq.client.Set(rq.Ctx, lastReadKey, xMsg.ID, 0).Err() - if err != nil { - return nil, fmt.Errorf("failed to update last read ID: %w", err) - } - return result, nil -} func (rq *RedisQueueProvider) Peek(queue string) (interface{}, error) { var start string @@ -248,7 +220,7 @@ func (rq *RedisQueueProvider) Dequeue(queue string) (interface{}, error) { } // Read message - xMessages, err := rq.client.XRangeN(context.TODO(), queue, start, "+", 1).Result() + xMessages, err := rq.client.XRangeN(rq.Ctx, queue, start, "+", 1).Result() if err != nil { return nil, err } @@ -276,3 +248,31 @@ func (rq *RedisQueueProvider) Dequeue(queue string) (interface{}, error) { return result, nil } + +// GetRecords retrieves records from the queue starting from the specified index and retrieves the specified size of records. +func (rq *RedisQueueProvider) QueryByPaging(queueName string, start string, size int) ([][]byte, string, error) { + if start != "0" { + start = "(" + start + } + + xMessages, err := rq.client.XRangeN(rq.Ctx, queueName, start, "+", int64(size+1)).Result() + if err != nil { + return nil, "", fmt.Errorf("failed to get message : %s ", start) + } + if len(xMessages) == 0 { + return nil, "", err + } + + lastMessageID := xMessages[len(xMessages)-1].ID + if len(xMessages) <= size { + lastMessageID = "" + } else { + xMessages = xMessages[:size] + } + var results [][]byte + for _, xMsg := range xMessages { + jsonData := xMsg.Values["data"].(string) + results = append(results, []byte(jsonData)) + } + return results, lastMessageID, nil +} diff --git a/coa/pkg/apis/v1alpha2/providers/queue/redis/redisQueueProvider_test.go b/coa/pkg/apis/v1alpha2/providers/queue/redis/redisQueueProvider_test.go new file mode 100644 index 000000000..c4bd36909 --- /dev/null +++ b/coa/pkg/apis/v1alpha2/providers/queue/redis/redisQueueProvider_test.go @@ -0,0 +1,39 @@ +package redisqueue + +import ( + "context" + "redis" + "testing" + + "github.com/go-redis/redismock/v9" + "github.com/redis/go-redis/v9" + "github.com/stretchr/testify/assert" +) + +func TestQueryByPaging(t *testing.T) { + db, mock := redismock.NewClientMock() + rq := &RedisQueueProvider{ + client: db, + Ctx: context.TODO(), + } + + queueName := "testQueue" + start := "0" + size := 2 + + mock.ExpectXRangeN(queueName, "0", "+", int64(size+1)).SetVal([]redis.XMessage{ + {ID: "1", Values: map[string]interface{}{"data": "message1"}}, + {ID: "2", Values: map[string]interface{}{"data": "message2"}}, + {ID: "3", Values: map[string]interface{}{"data": "message3"}}, + }) + + results, lastID, err := rq.QueryByPaging(queueName, start, size) + assert.Nil(t, err) + assert.Equal(t, 2, len(results)) + assert.Equal(t, "2", lastID) + assert.Equal(t, []byte("message1"), results[0]) + assert.Equal(t, []byte("message2"), results[1]) + + err = mock.ExpectationsWereMet() + assert.Nil(t, err) +} diff --git a/k8s/reconcilers/deployment.go b/k8s/reconcilers/deployment.go index 8903cba9c..54e54a337 100644 --- a/k8s/reconcilers/deployment.go +++ b/k8s/reconcilers/deployment.go @@ -548,9 +548,6 @@ func (r *DeploymentReconciler) determineProvisioningStatus(ctx context.Context, status = utilsmodel.ProvisioningStatusFailed } return status - case model.SummaryStateTimeout: - status := utilsmodel.ProvisioningStatusTimeout - return status default: return utilsmodel.GetNonTerminalStatus(object) } @@ -683,7 +680,7 @@ func (r *DeploymentReconciler) updateProvisioningStatus(ctx context.Context, obj objectStatus.ProvisioningStatus.PercentComplete = int64(percentComplete) } - diagnostic.InfoWithCtx(log, ctx, "Update provisioning status", "ProvisioningStatus", objectStatus.ProvisioningStatus, "Summary result", summaryResult) + diagnostic.InfoWithCtx(log, ctx, "Update provisioning status", "ProvisioningStatus", objectStatus.ProvisioningStatus) outputMap := objectStatus.ProvisioningStatus.Output // Fill component details into output field diff --git a/packages/helm/symphony/files/symphony-api.json b/packages/helm/symphony/files/symphony-api.json index 531f17817..182d5a89b 100644 --- a/packages/helm/symphony/files/symphony-api.json +++ b/packages/helm/symphony/files/symphony-api.json @@ -145,46 +145,6 @@ } } } - }, - { - "name": "solution-manager", - "type": "managers.symphony.solution", - "properties": { - "providers.persistentstate": "redis-state", - "providers.config": "mock-config", - "providers.secret": "mock-secret", - "providers.keylock": "mem-keylock" - }, - - "providers": { - "redis-state": { - {{- if .Values.redis.enabled }} - "type": "providers.state.redis", - "config": { - "host": "{{ include "symphony.redisHost" . }}", - "requireTLS": false, - "password": "" - } - {{- else }} - "type": "providers.state.memory", - "config": {} - {{- end }} - }, - "mem-keylock": { - "type": "providers.keylock.memory", - "config": { - "mode" : "Shared" - } - }, - "mock-config": { - "type": "providers.config.mock", - "config": {} - }, - "mock-secret": { - "type": "providers.secret.mock", - "config": {} - } - } } ], "properties": { @@ -533,6 +493,46 @@ "loopInterval": 15, "route": "solution", "managers": [ + { + "name": "staging-manager", + "type": "managers.symphony.staging", + "properties": { + "poll.enabled": "true", + "interval": "#15", + "providers.queue": "redis-queue", + "providers.volatilestate": "memory-state" + }, + "providers": { + "redis-queue": { + "type": "providers.queue.redis", + "config": { + "name": "redis", + "host": "{{ include "symphony.redisHost" . }}", + "requireTLS": false, + "password": "" + } + }, + "memory-state": { + "type": "providers.state.memory", + "config": {} + } + } + }, + { + "name": "stage-manager", + "type": "managers.symphony.stage", + "properties": { + "user": "admin", + "password": "", + "providers.volatilestate": "memory" + }, + "providers": { + "memory": { + "type": "providers.state.memory", + "config": {} + } + } + }, { "name": "solution-manager", "type": "managers.symphony.solution", @@ -628,46 +628,6 @@ } } }, - { - "name": "solution-manager", - "type": "managers.symphony.solution", - "properties": { - "providers.persistentstate": "redis-state", - "providers.config": "mock-config", - "providers.secret": "mock-secret", - "providers.keylock": "mem-keylock" - }, - - "providers": { - "redis-state": { - {{- if .Values.redis.enabled }} - "type": "providers.state.redis", - "config": { - "host": "{{ include "symphony.redisHost" . }}", - "requireTLS": false, - "password": "" - } - {{- else }} - "type": "providers.state.memory", - "config": {} - {{- end }} - }, - "mem-keylock": { - "type": "providers.keylock.memory", - "config": { - "mode" : "Shared" - } - }, - "mock-config": { - "type": "providers.config.mock", - "config": {} - }, - "mock-secret": { - "type": "providers.secret.mock", - "config": {} - } - } - }, { "name": "sites-manager", "type": "managers.symphony.sites", diff --git a/test/integration/scenarios/01.update/magefile.go b/test/integration/scenarios/01.update/magefile.go index cd88b91b7..91f20b3a6 100644 --- a/test/integration/scenarios/01.update/magefile.go +++ b/test/integration/scenarios/01.update/magefile.go @@ -20,7 +20,7 @@ import ( // Test config const ( TEST_NAME = "scenario/update" - TEST_TIMEOUT = "30m" + TEST_TIMEOUT = "15m" ) var ( diff --git a/test/integration/scenarios/06.ado/create_update_fallback_test.go b/test/integration/scenarios/06.ado/create_update_fallback_test.go index 3eb33c641..fb18bc619 100644 --- a/test/integration/scenarios/06.ado/create_update_fallback_test.go +++ b/test/integration/scenarios/06.ado/create_update_fallback_test.go @@ -140,9 +140,6 @@ var _ = Describe("Create/update resources for rollback testing", Ordered, func() TargetComponents: []string{"simple-chart-1"}, SolutionComponents: []string{"simple-chart-2"}, SolutionComponentsV2: []string{"simple-chart-2-nonexistent"}, - TargetProperties: map[string]string{ - "Arch": "arm", - }, PostUpdateExpectation: expectations.All( kube.Must(kube.Instance("instance", "default", kube.WithCondition(conditions.All( // make sure the instance named 'instance' is present in the 'default' namespace kube.ProvisioningFailedCondition, // and it is failed diff --git a/test/integration/scenarios/06.ado/create_update_test.go b/test/integration/scenarios/06.ado/create_update_test.go index a480a9086..8fbe8b119 100644 --- a/test/integration/scenarios/06.ado/create_update_test.go +++ b/test/integration/scenarios/06.ado/create_update_test.go @@ -31,7 +31,7 @@ var _ = Describe("Create resources with sequential changes", Ordered, func() { var targetBytes []byte var solutionBytes []byte var solutionContainerBytes []byte - var specTimeout = 220 * time.Second + var specTimeout = 120 * time.Second var targetProps map[string]string var instanceParams map[string]interface{} diff --git a/test/integration/scenarios/06.ado/delete_test.go b/test/integration/scenarios/06.ado/delete_test.go index b57c55673..1244a92ee 100644 --- a/test/integration/scenarios/06.ado/delete_test.go +++ b/test/integration/scenarios/06.ado/delete_test.go @@ -20,7 +20,7 @@ var _ = Describe("Delete", Ordered, func() { var targetBytes []byte var solutionBytes []byte var solutionContainerBytes []byte - var specTimeout = 4 * time.Minute + var specTimeout = 2 * time.Minute type DeleteTestCase struct { TargetComponents []string diff --git a/test/integration/scenarios/06.ado/rbac_test.go b/test/integration/scenarios/06.ado/rbac_test.go index ab1cd0a70..39f1f511a 100644 --- a/test/integration/scenarios/06.ado/rbac_test.go +++ b/test/integration/scenarios/06.ado/rbac_test.go @@ -31,7 +31,7 @@ var _ = Describe("RBAC", Ordered, func() { var targetBytes []byte var solutionBytes []byte var solutionContainerBytes []byte - var specTimeout = 6 * time.Minute + var specTimeout = 3 * time.Minute var installValues HelmValues var runRbacTest = func(ctx context.Context, testcase Rbac) { var err error