From 4a8abab9109b1724d04409b05ba01fffbe3150e4 Mon Sep 17 00:00:00 2001 From: aldbr Date: Tue, 16 Jan 2024 17:03:14 +0100 Subject: [PATCH] feat: Workflow minor fix --- src/DIRAC/Core/Workflow/Workflow.py | 3 +-- .../WorkloadManagementSystem/Agent/PushJobAgent.py | 10 ++++++---- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/src/DIRAC/Core/Workflow/Workflow.py b/src/DIRAC/Core/Workflow/Workflow.py index a658b69d365..2cd018be54a 100755 --- a/src/DIRAC/Core/Workflow/Workflow.py +++ b/src/DIRAC/Core/Workflow/Workflow.py @@ -123,8 +123,7 @@ def createStepInstance(self, type_o, name): raise KeyError("Can not find StepDefinition " + type + " to create StepInstrance " + name) def removeStepInstance(self, name): - self.step_instances[name].setParent(None) - self.step_instances.delete(name) + self.step_instances = [step for step in self.step_instances if step.getName() != name] def updateParents(self): self.module_definitions.updateParents(self) diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py index 905effda644..ffb690862f5 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py @@ -65,7 +65,7 @@ def __init__(self, agentName, loadName, baseAgentName=False, properties=None): # Choose the submission policy # - Workflow: the agent will submit a workflow to a PoolCE, the workflow is responsible for interacting with the remote site # - JobWrapper: the agent will submit a JobWrapper directly to the remote site, it is responsible of the remote execution - self.submissionPolicy = "Workflow" + self.submissionPolicy = "Application" # cleanTask is used to clean the task in the remote site self.cleanTask = True @@ -80,7 +80,7 @@ def initialize(self): # Get the submission policy # Initialized here because it cannot be dynamically modified during the execution self.submissionPolicy = self.am_getOption("SubmissionPolicy", self.submissionPolicy) - if self.submissionPolicy not in ["Workflow", "JobWrapper"]: + if self.submissionPolicy not in ["Application", "JobWrapper"]: return S_ERROR("SubmissionPolicy must be either Workflow or JobWrapper") result = self._initializeComputingElement("Pool") @@ -283,7 +283,7 @@ def execute(self): # Submit the job to the CE self.log.debug(f"Before self._submitJob() ({self.ceName}CE)") - if self.submissionPolicy == "Workflow": + if self.submissionPolicy == "Application": resultSubmission = self._submitJob( jobID=jobID, jobParams=params, @@ -412,9 +412,11 @@ def _setCEDict(self, ceDict): ceDict["ReleaseProject"] = project # Add a RemoteExecution entry, which can be used in the next stages - if self.submissionPolicy == "Workflow": + if self.submissionPolicy == "Application": ceDict["RemoteExecution"] = True + ceDict["SubmissionPolicy"] = self.submissionPolicy + def _checkMatchingIssues(self, jobRequest): """Check the source of the matching issue