-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
TSPS-360 Add steps to calculate quota consumed #158
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
package bio.terra.pipelines.app.configuration.internal; | ||
|
||
import lombok.Getter; | ||
import lombok.Setter; | ||
import org.springframework.boot.context.properties.ConfigurationProperties; | ||
|
||
@ConfigurationProperties(prefix = "pipelines.wdl") | ||
@Getter | ||
@Setter | ||
public class WdlPipelineConfiguration { | ||
private Long quotaConsumedPollingIntervalSeconds; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
package bio.terra.pipelines.stairway; | ||
|
||
import bio.terra.common.exception.InternalServerErrorException; | ||
import bio.terra.pipelines.common.utils.FlightUtils; | ||
import bio.terra.pipelines.common.utils.PipelinesEnum; | ||
import bio.terra.pipelines.dependencies.rawls.RawlsService; | ||
import bio.terra.pipelines.dependencies.rawls.RawlsServiceApiException; | ||
import bio.terra.pipelines.dependencies.sam.SamService; | ||
import bio.terra.pipelines.dependencies.stairway.JobMapKeys; | ||
import bio.terra.pipelines.stairway.imputation.ImputationJobMapKeys; | ||
import bio.terra.rawls.model.Entity; | ||
import bio.terra.stairway.*; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
/** | ||
* This step calls Rawls to fetch outputs from a data table row for a given quota consumed job. It | ||
* specifically fetches the quota consumed value from the data table row using the quota_consumed | ||
* key | ||
* | ||
* <p>This step expects nothing from the working map | ||
*/ | ||
public class FetchQuotaConsumedFromDataTableStep implements Step { | ||
|
||
private final RawlsService rawlsService; | ||
private final SamService samService; | ||
private final Logger logger = LoggerFactory.getLogger(FetchQuotaConsumedFromDataTableStep.class); | ||
|
||
public FetchQuotaConsumedFromDataTableStep(RawlsService rawlsService, SamService samService) { | ||
this.rawlsService = rawlsService; | ||
this.samService = samService; | ||
} | ||
|
||
@Override | ||
@SuppressWarnings( | ||
"java:S2259") // suppress warning for possible NPE when calling pipelineName.getValue(), | ||
// since we do validate that pipelineName is not null in `validateRequiredEntries` | ||
public StepResult doStep(FlightContext flightContext) { | ||
String jobId = flightContext.getFlightId(); | ||
|
||
// validate and extract parameters from input map | ||
var inputParameters = flightContext.getInputParameters(); | ||
FlightUtils.validateRequiredEntries( | ||
inputParameters, | ||
JobMapKeys.PIPELINE_NAME, | ||
ImputationJobMapKeys.CONTROL_WORKSPACE_BILLING_PROJECT, | ||
ImputationJobMapKeys.CONTROL_WORKSPACE_NAME); | ||
|
||
String controlWorkspaceBillingProject = | ||
inputParameters.get(ImputationJobMapKeys.CONTROL_WORKSPACE_BILLING_PROJECT, String.class); | ||
String controlWorkspaceName = | ||
inputParameters.get(ImputationJobMapKeys.CONTROL_WORKSPACE_NAME, String.class); | ||
PipelinesEnum pipelineName = inputParameters.get(JobMapKeys.PIPELINE_NAME, PipelinesEnum.class); | ||
|
||
Entity entity; | ||
try { | ||
entity = | ||
rawlsService.getDataTableEntity( | ||
samService.getTeaspoonsServiceAccountToken(), | ||
controlWorkspaceBillingProject, | ||
controlWorkspaceName, | ||
pipelineName.getValue(), | ||
jobId); | ||
} catch (RawlsServiceApiException e) { | ||
return new StepResult(StepStatus.STEP_RESULT_FAILURE_RETRY, e); | ||
} | ||
|
||
// extract quota_consumed from entity | ||
int quotaConsumed; | ||
try { | ||
quotaConsumed = (int) entity.getAttributes().get("quota_consumed"); | ||
if (quotaConsumed <= 0) { | ||
return new StepResult( | ||
StepStatus.STEP_RESULT_FAILURE_FATAL, | ||
new InternalServerErrorException("Quota consumed is unexpectedly not greater than 0")); | ||
} | ||
} catch (NullPointerException e) { | ||
return new StepResult( | ||
StepStatus.STEP_RESULT_FAILURE_FATAL, | ||
new InternalServerErrorException("Quota consumed is unexpectedly null")); | ||
} | ||
|
||
logger.info("Quota consumed: {}", quotaConsumed); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is the idea that in future we'll store this in the working map to be used by the logic to do the quota check in a subsequent step? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i think we may do the quota check here or in a subsequent step - thats due to be done in https://broadworkbench.atlassian.net/browse/TSPS-361. I'd prob do it here and rename the step |
||
|
||
return StepResult.getStepResultSuccess(); | ||
} | ||
|
||
@Override | ||
public StepResult undoStep(FlightContext flightContext) { | ||
// nothing to undo | ||
return StepResult.getStepResultSuccess(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
package bio.terra.pipelines.stairway; | ||
|
||
import bio.terra.pipelines.app.configuration.internal.WdlPipelineConfiguration; | ||
import bio.terra.pipelines.common.utils.FlightUtils; | ||
import bio.terra.pipelines.dependencies.rawls.RawlsService; | ||
import bio.terra.pipelines.dependencies.sam.SamService; | ||
import bio.terra.pipelines.stairway.imputation.ImputationJobMapKeys; | ||
import bio.terra.pipelines.stairway.utils.RawlsSubmissionStepHelper; | ||
import bio.terra.stairway.*; | ||
import java.util.UUID; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
/** | ||
* This step polls rawls for a submission until all runs are in a finalized state. If submission is | ||
* not in a final state, this will step will poll again after an interval of time. Once the | ||
* submission is finalized then it will see if the workflows are all successful and if so will | ||
* succeed otherwise will fail. | ||
* | ||
* <p>this step expects quota submission id to be provided in the working map | ||
*/ | ||
public class PollQuotaConsumedSubmissionStatusStep implements Step { | ||
private final RawlsService rawlsService; | ||
private final SamService samService; | ||
private final WdlPipelineConfiguration wdlPipelineConfiguration; | ||
private final Logger logger = | ||
LoggerFactory.getLogger(PollQuotaConsumedSubmissionStatusStep.class); | ||
|
||
public PollQuotaConsumedSubmissionStatusStep( | ||
RawlsService rawlsService, | ||
SamService samService, | ||
WdlPipelineConfiguration wdlPipelineConfiguration) { | ||
this.samService = samService; | ||
this.rawlsService = rawlsService; | ||
this.wdlPipelineConfiguration = wdlPipelineConfiguration; | ||
} | ||
|
||
@Override | ||
public StepResult doStep(FlightContext flightContext) throws InterruptedException { | ||
// validate and extract parameters from input map | ||
FlightMap inputParameters = flightContext.getInputParameters(); | ||
FlightUtils.validateRequiredEntries( | ||
inputParameters, | ||
ImputationJobMapKeys.CONTROL_WORKSPACE_NAME, | ||
ImputationJobMapKeys.CONTROL_WORKSPACE_BILLING_PROJECT); | ||
String controlWorkspaceName = | ||
inputParameters.get(ImputationJobMapKeys.CONTROL_WORKSPACE_NAME, String.class); | ||
String controlWorkspaceProject = | ||
inputParameters.get(ImputationJobMapKeys.CONTROL_WORKSPACE_BILLING_PROJECT, String.class); | ||
// validate and extract parameters from working map | ||
FlightMap workingMap = flightContext.getWorkingMap(); | ||
FlightUtils.validateRequiredEntries(workingMap, ImputationJobMapKeys.QUOTA_SUBMISSION_ID); | ||
|
||
UUID quotaSubmissionId = workingMap.get(ImputationJobMapKeys.QUOTA_SUBMISSION_ID, UUID.class); | ||
|
||
RawlsSubmissionStepHelper rawlsSubmissionStepHelper = | ||
new RawlsSubmissionStepHelper( | ||
rawlsService, samService, controlWorkspaceProject, controlWorkspaceName, logger); | ||
return rawlsSubmissionStepHelper.pollRawlsSubmissionHelper( | ||
quotaSubmissionId, wdlPipelineConfiguration.getQuotaConsumedPollingIntervalSeconds()); | ||
} | ||
|
||
@Override | ||
public StepResult undoStep(FlightContext context) { | ||
// nothing to undo; there's nothing to undo about polling a cromwell submission | ||
return StepResult.getStepResultSuccess(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,131 @@ | ||
package bio.terra.pipelines.stairway; | ||
|
||
import bio.terra.pipelines.common.utils.FlightUtils; | ||
import bio.terra.pipelines.common.utils.PipelineVariableTypesEnum; | ||
import bio.terra.pipelines.common.utils.PipelinesEnum; | ||
import bio.terra.pipelines.db.entities.PipelineInputDefinition; | ||
import bio.terra.pipelines.db.entities.PipelineOutputDefinition; | ||
import bio.terra.pipelines.dependencies.rawls.RawlsService; | ||
import bio.terra.pipelines.dependencies.rawls.RawlsServiceApiException; | ||
import bio.terra.pipelines.dependencies.sam.SamService; | ||
import bio.terra.pipelines.dependencies.stairway.JobMapKeys; | ||
import bio.terra.pipelines.stairway.imputation.ImputationJobMapKeys; | ||
import bio.terra.pipelines.stairway.utils.RawlsSubmissionStepHelper; | ||
import bio.terra.rawls.model.SubmissionReport; | ||
import bio.terra.rawls.model.SubmissionRequest; | ||
import bio.terra.stairway.*; | ||
import com.fasterxml.jackson.core.type.TypeReference; | ||
import java.util.List; | ||
import java.util.Optional; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
/** | ||
* This step submits a quota consumed wdl to cromwell using the rawls submission endpoint. The quota | ||
* consumed wdl that is run depends on the workspace name and billing project provided to the step. | ||
* | ||
* <p>this step expects nothing from the working map | ||
* | ||
* <p>this step writes quota_submission_id to the working map | ||
*/ | ||
public class SubmitQuotaConsumedSubmissionStep implements Step { | ||
private final SamService samService; | ||
private final RawlsService rawlsService; | ||
|
||
public static final String QUOTA_CONSUMED_METHOD_NAME = "QuotaConsumed"; | ||
public static final List<PipelineOutputDefinition> QUOTA_CONSUMED_OUTPUT_DEFINITION_LIST = | ||
List.of( | ||
new PipelineOutputDefinition( | ||
null, "quotaConsumed", "quota_consumed", PipelineVariableTypesEnum.INTEGER)); | ||
|
||
private final Logger logger = LoggerFactory.getLogger(SubmitQuotaConsumedSubmissionStep.class); | ||
|
||
public SubmitQuotaConsumedSubmissionStep(RawlsService rawlsService, SamService samService) { | ||
this.samService = samService; | ||
this.rawlsService = rawlsService; | ||
} | ||
|
||
@Override | ||
@SuppressWarnings( | ||
"java:S2259") // suppress warning for possible NPE when calling pipelineName.getValue(), | ||
// since we do validate that pipelineName is not null in `validateRequiredEntries` | ||
public StepResult doStep(FlightContext flightContext) { | ||
// validate and extract parameters from input map | ||
FlightMap inputParameters = flightContext.getInputParameters(); | ||
FlightUtils.validateRequiredEntries( | ||
inputParameters, | ||
JobMapKeys.PIPELINE_NAME, | ||
ImputationJobMapKeys.CONTROL_WORKSPACE_BILLING_PROJECT, | ||
ImputationJobMapKeys.CONTROL_WORKSPACE_NAME, | ||
ImputationJobMapKeys.WDL_METHOD_VERSION, | ||
ImputationJobMapKeys.PIPELINE_INPUT_DEFINITIONS); | ||
|
||
PipelinesEnum pipelineName = inputParameters.get(JobMapKeys.PIPELINE_NAME, PipelinesEnum.class); | ||
String controlWorkspaceName = | ||
inputParameters.get(ImputationJobMapKeys.CONTROL_WORKSPACE_NAME, String.class); | ||
String controlWorkspaceProject = | ||
inputParameters.get(ImputationJobMapKeys.CONTROL_WORKSPACE_BILLING_PROJECT, String.class); | ||
String wdlMethodVersion = | ||
inputParameters.get(ImputationJobMapKeys.WDL_METHOD_VERSION, String.class); | ||
List<PipelineInputDefinition> inputDefinitions = | ||
inputParameters.get( | ||
ImputationJobMapKeys.PIPELINE_INPUT_DEFINITIONS, new TypeReference<>() {}); | ||
|
||
// validate and extract parameters from working map | ||
FlightMap workingMap = flightContext.getWorkingMap(); | ||
|
||
RawlsSubmissionStepHelper rawlsSubmissionStepHelper = | ||
new RawlsSubmissionStepHelper( | ||
rawlsService, samService, controlWorkspaceProject, controlWorkspaceName, logger); | ||
|
||
Optional<StepResult> validationResponse = | ||
rawlsSubmissionStepHelper.validateRawlsSubmissionMethodHelper( | ||
QUOTA_CONSUMED_METHOD_NAME, | ||
wdlMethodVersion, | ||
inputDefinitions, | ||
QUOTA_CONSUMED_OUTPUT_DEFINITION_LIST, | ||
pipelineName); | ||
|
||
// if there is a validation response that means the validation failed so return it | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is elegant! nice! |
||
if (validationResponse.isPresent()) { | ||
return validationResponse.get(); | ||
} | ||
|
||
// create submission request | ||
SubmissionRequest submissionRequest = | ||
new SubmissionRequest() | ||
.entityName(flightContext.getFlightId()) | ||
.entityType(pipelineName.getValue()) | ||
.useCallCache(true) | ||
.deleteIntermediateOutputFiles(true) | ||
.useReferenceDisks(false) | ||
.userComment( | ||
"%s - getting quota consumed for flight id: %s" | ||
.formatted(pipelineName, flightContext.getFlightId())) | ||
.methodConfigurationNamespace(controlWorkspaceProject) | ||
.methodConfigurationName(QUOTA_CONSUMED_METHOD_NAME); | ||
|
||
// submit workflow to rawls | ||
SubmissionReport submissionReport; | ||
try { | ||
submissionReport = | ||
rawlsService.submitWorkflow( | ||
samService.getTeaspoonsServiceAccountToken(), | ||
submissionRequest, | ||
controlWorkspaceProject, | ||
controlWorkspaceName); | ||
} catch (RawlsServiceApiException e) { | ||
return new StepResult(StepStatus.STEP_RESULT_FAILURE_RETRY, e); | ||
} | ||
|
||
// add submission id to working map to be used for polling in downstream step | ||
workingMap.put(ImputationJobMapKeys.QUOTA_SUBMISSION_ID, submissionReport.getSubmissionId()); | ||
return StepResult.getStepResultSuccess(); | ||
} | ||
|
||
@Override | ||
public StepResult undoStep(FlightContext context) { | ||
// nothing to undo; there's nothing to undo about submitting a run set | ||
return StepResult.getStepResultSuccess(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
noticing you named this pretty generically. what else do you see adding to this configuration besides
quotaConsumedPollingIntervalSeconds
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure but i didnt want it associated with imputation - happy to put it in an already existing configuration that wasnt imputations. I imagine there will be a couple of things we'll want to move out of imputation config to this one once we get our next wdl pipeline.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah sounds good. no need to combine with an existing configuration.