From 400b327c498cad6cbebcf2569e591aac7096a538 Mon Sep 17 00:00:00 2001 From: aldbr Date: Tue, 16 Jan 2024 17:03:14 +0100 Subject: [PATCH] feat: Workflow minor fix --- src/DIRAC/Core/Workflow/Workflow.py | 3 +-- .../WorkloadManagementSystem/Agent/PushJobAgent.py | 12 +++++++----- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/src/DIRAC/Core/Workflow/Workflow.py b/src/DIRAC/Core/Workflow/Workflow.py index a658b69d365..2cd018be54a 100755 --- a/src/DIRAC/Core/Workflow/Workflow.py +++ b/src/DIRAC/Core/Workflow/Workflow.py @@ -123,8 +123,7 @@ def createStepInstance(self, type_o, name): raise KeyError("Can not find StepDefinition " + type + " to create StepInstrance " + name) def removeStepInstance(self, name): - self.step_instances[name].setParent(None) - self.step_instances.delete(name) + self.step_instances = [step for step in self.step_instances if step.getName() != name] def updateParents(self): self.module_definitions.updateParents(self) diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py index 2da325993fa..48d474f3345 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py @@ -67,7 +67,7 @@ def __init__(self, agentName, loadName, baseAgentName=False, properties=None): # Choose the submission policy # - Workflow: the agent will submit a workflow to a PoolCE, the workflow is responsible for interacting with the remote site # - JobWrapper: the agent will submit a JobWrapper directly to the remote site, it is responsible of the remote execution - self.submissionPolicy = "Workflow" + self.submissionPolicy = "Application" # cleanTask is used to clean the task in the remote site self.cleanTask = True @@ -82,7 +82,7 @@ def initialize(self): # Get the submission policy # Initialized here because it cannot be dynamically modified during the execution self.submissionPolicy = self.am_getOption("SubmissionPolicy", self.submissionPolicy) - if self.submissionPolicy not in ["Workflow", "JobWrapper"]: + if self.submissionPolicy not in ["Application", "JobWrapper"]: return S_ERROR("SubmissionPolicy must be either Workflow or JobWrapper") result = self._initializeComputingElement("Pool") @@ -169,7 +169,7 @@ def execute(self): if not result["OK"] or result["Value"]: return result - if self.submissionPolicy == "Workflow": + if self.submissionPolicy == "Application": # Check errors that could have occurred during job submission and/or execution # Status are handled internally, and therefore, not checked outside of the method result = self._checkSubmittedJobs() @@ -284,7 +284,7 @@ def execute(self): # Submit the job to the CE self.log.debug(f"Before self._submitJob() ({self.ceName}CE)") - if self.submissionPolicy == "Workflow": + if self.submissionPolicy == "Application": resultSubmission = self._submitJob( jobID=jobID, jobParams=params, @@ -421,9 +421,11 @@ def _setCEDict(self, ceDict): ceDict["ReleaseProject"] = project # Add a RemoteExecution entry, which can be used in the next stages - if self.submissionPolicy == "Workflow": + if self.submissionPolicy == "Application": ceDict["RemoteExecution"] = True + ceDict["SubmissionPolicy"] = self.submissionPolicy + def _checkMatchingIssues(self, jobRequest): """Check the source of the matching issue