Skip to content

Commit

Permalink
TSPS-360 Add steps to calculate quota consumed (#158)
Browse files Browse the repository at this point in the history
Co-authored-by: Jose Soto <[email protected]>
  • Loading branch information
jsotobroad and Jose Soto authored Nov 15, 2024
1 parent 75534c8 commit ca723c8
Show file tree
Hide file tree
Showing 20 changed files with 1,052 additions and 111 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package bio.terra.pipelines.app.configuration.internal;

import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix = "pipelines.wdl")
@Getter
@Setter
public class WdlPipelineConfiguration {
private Long quotaConsumedPollingIntervalSeconds;
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import bio.terra.pipelines.app.configuration.external.CbasConfiguration;
import bio.terra.pipelines.app.configuration.internal.ImputationConfiguration;
import bio.terra.pipelines.app.configuration.internal.WdlPipelineConfiguration;
import bio.terra.pipelines.dependencies.cbas.CbasService;
import bio.terra.pipelines.dependencies.leonardo.LeonardoService;
import bio.terra.pipelines.dependencies.rawls.RawlsService;
Expand Down Expand Up @@ -37,6 +38,7 @@ public class FlightBeanBag {
private final RawlsService rawlsService;
private final ImputationConfiguration imputationConfiguration;
private final CbasConfiguration cbasConfiguration;
private final WdlPipelineConfiguration wdlPipelineConfiguration;

@Lazy
@Autowired
Expand All @@ -51,7 +53,8 @@ public FlightBeanBag(
RawlsService rawlsService,
WorkspaceManagerService workspaceManagerService,
ImputationConfiguration imputationConfiguration,
CbasConfiguration cbasConfiguration) {
CbasConfiguration cbasConfiguration,
WdlPipelineConfiguration wdlPipelineConfiguration) {
this.pipelinesService = pipelinesService;
this.pipelineRunsService = pipelineRunsService;
this.pipelineInputsOutputsService = pipelineInputsOutputsService;
Expand All @@ -63,6 +66,7 @@ public FlightBeanBag(
this.rawlsService = rawlsService;
this.imputationConfiguration = imputationConfiguration;
this.cbasConfiguration = cbasConfiguration;
this.wdlPipelineConfiguration = wdlPipelineConfiguration;
}

public static FlightBeanBag getFromObject(Object object) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package bio.terra.pipelines.stairway;

import bio.terra.common.exception.InternalServerErrorException;
import bio.terra.pipelines.common.utils.FlightUtils;
import bio.terra.pipelines.common.utils.PipelinesEnum;
import bio.terra.pipelines.dependencies.rawls.RawlsService;
import bio.terra.pipelines.dependencies.rawls.RawlsServiceApiException;
import bio.terra.pipelines.dependencies.sam.SamService;
import bio.terra.pipelines.dependencies.stairway.JobMapKeys;
import bio.terra.pipelines.stairway.imputation.ImputationJobMapKeys;
import bio.terra.rawls.model.Entity;
import bio.terra.stairway.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This step calls Rawls to fetch outputs from a data table row for a given quota consumed job. It
* specifically fetches the quota consumed value from the data table row using the quota_consumed
* key
*
* <p>This step expects nothing from the working map
*/
public class FetchQuotaConsumedFromDataTableStep implements Step {

private final RawlsService rawlsService;
private final SamService samService;
private final Logger logger = LoggerFactory.getLogger(FetchQuotaConsumedFromDataTableStep.class);

public FetchQuotaConsumedFromDataTableStep(RawlsService rawlsService, SamService samService) {
this.rawlsService = rawlsService;
this.samService = samService;
}

@Override
@SuppressWarnings(
"java:S2259") // suppress warning for possible NPE when calling pipelineName.getValue(),
// since we do validate that pipelineName is not null in `validateRequiredEntries`
public StepResult doStep(FlightContext flightContext) {
String jobId = flightContext.getFlightId();

// validate and extract parameters from input map
var inputParameters = flightContext.getInputParameters();
FlightUtils.validateRequiredEntries(
inputParameters,
JobMapKeys.PIPELINE_NAME,
ImputationJobMapKeys.CONTROL_WORKSPACE_BILLING_PROJECT,
ImputationJobMapKeys.CONTROL_WORKSPACE_NAME);

String controlWorkspaceBillingProject =
inputParameters.get(ImputationJobMapKeys.CONTROL_WORKSPACE_BILLING_PROJECT, String.class);
String controlWorkspaceName =
inputParameters.get(ImputationJobMapKeys.CONTROL_WORKSPACE_NAME, String.class);
PipelinesEnum pipelineName = inputParameters.get(JobMapKeys.PIPELINE_NAME, PipelinesEnum.class);

Entity entity;
try {
entity =
rawlsService.getDataTableEntity(
samService.getTeaspoonsServiceAccountToken(),
controlWorkspaceBillingProject,
controlWorkspaceName,
pipelineName.getValue(),
jobId);
} catch (RawlsServiceApiException e) {
return new StepResult(StepStatus.STEP_RESULT_FAILURE_RETRY, e);
}

// extract quota_consumed from entity
int quotaConsumed;
try {
quotaConsumed = (int) entity.getAttributes().get("quota_consumed");
if (quotaConsumed <= 0) {
return new StepResult(
StepStatus.STEP_RESULT_FAILURE_FATAL,
new InternalServerErrorException("Quota consumed is unexpectedly not greater than 0"));
}
} catch (NullPointerException e) {
return new StepResult(
StepStatus.STEP_RESULT_FAILURE_FATAL,
new InternalServerErrorException("Quota consumed is unexpectedly null"));
}

logger.info("Quota consumed: {}", quotaConsumed);

return StepResult.getStepResultSuccess();
}

@Override
public StepResult undoStep(FlightContext flightContext) {
// nothing to undo
return StepResult.getStepResultSuccess();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package bio.terra.pipelines.stairway;

import bio.terra.pipelines.app.configuration.internal.WdlPipelineConfiguration;
import bio.terra.pipelines.common.utils.FlightUtils;
import bio.terra.pipelines.dependencies.rawls.RawlsService;
import bio.terra.pipelines.dependencies.sam.SamService;
import bio.terra.pipelines.stairway.imputation.ImputationJobMapKeys;
import bio.terra.pipelines.stairway.utils.RawlsSubmissionStepHelper;
import bio.terra.stairway.*;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This step polls rawls for a submission until all runs are in a finalized state. If submission is
* not in a final state, this will step will poll again after an interval of time. Once the
* submission is finalized then it will see if the workflows are all successful and if so will
* succeed otherwise will fail.
*
* <p>this step expects quota submission id to be provided in the working map
*/
public class PollQuotaConsumedSubmissionStatusStep implements Step {
private final RawlsService rawlsService;
private final SamService samService;
private final WdlPipelineConfiguration wdlPipelineConfiguration;
private final Logger logger =
LoggerFactory.getLogger(PollQuotaConsumedSubmissionStatusStep.class);

public PollQuotaConsumedSubmissionStatusStep(
RawlsService rawlsService,
SamService samService,
WdlPipelineConfiguration wdlPipelineConfiguration) {
this.samService = samService;
this.rawlsService = rawlsService;
this.wdlPipelineConfiguration = wdlPipelineConfiguration;
}

@Override
public StepResult doStep(FlightContext flightContext) throws InterruptedException {
// validate and extract parameters from input map
FlightMap inputParameters = flightContext.getInputParameters();
FlightUtils.validateRequiredEntries(
inputParameters,
ImputationJobMapKeys.CONTROL_WORKSPACE_NAME,
ImputationJobMapKeys.CONTROL_WORKSPACE_BILLING_PROJECT);
String controlWorkspaceName =
inputParameters.get(ImputationJobMapKeys.CONTROL_WORKSPACE_NAME, String.class);
String controlWorkspaceProject =
inputParameters.get(ImputationJobMapKeys.CONTROL_WORKSPACE_BILLING_PROJECT, String.class);
// validate and extract parameters from working map
FlightMap workingMap = flightContext.getWorkingMap();
FlightUtils.validateRequiredEntries(workingMap, ImputationJobMapKeys.QUOTA_SUBMISSION_ID);

UUID quotaSubmissionId = workingMap.get(ImputationJobMapKeys.QUOTA_SUBMISSION_ID, UUID.class);

RawlsSubmissionStepHelper rawlsSubmissionStepHelper =
new RawlsSubmissionStepHelper(
rawlsService, samService, controlWorkspaceProject, controlWorkspaceName, logger);
return rawlsSubmissionStepHelper.pollRawlsSubmissionHelper(
quotaSubmissionId, wdlPipelineConfiguration.getQuotaConsumedPollingIntervalSeconds());
}

@Override
public StepResult undoStep(FlightContext context) {
// nothing to undo; there's nothing to undo about polling a cromwell submission
return StepResult.getStepResultSuccess();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package bio.terra.pipelines.stairway;

import bio.terra.pipelines.common.utils.FlightUtils;
import bio.terra.pipelines.common.utils.PipelineVariableTypesEnum;
import bio.terra.pipelines.common.utils.PipelinesEnum;
import bio.terra.pipelines.db.entities.PipelineInputDefinition;
import bio.terra.pipelines.db.entities.PipelineOutputDefinition;
import bio.terra.pipelines.dependencies.rawls.RawlsService;
import bio.terra.pipelines.dependencies.rawls.RawlsServiceApiException;
import bio.terra.pipelines.dependencies.sam.SamService;
import bio.terra.pipelines.dependencies.stairway.JobMapKeys;
import bio.terra.pipelines.stairway.imputation.ImputationJobMapKeys;
import bio.terra.pipelines.stairway.utils.RawlsSubmissionStepHelper;
import bio.terra.rawls.model.SubmissionReport;
import bio.terra.rawls.model.SubmissionRequest;
import bio.terra.stairway.*;
import com.fasterxml.jackson.core.type.TypeReference;
import java.util.List;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This step submits a quota consumed wdl to cromwell using the rawls submission endpoint. The quota
* consumed wdl that is run depends on the workspace name and billing project provided to the step.
*
* <p>this step expects nothing from the working map
*
* <p>this step writes quota_submission_id to the working map
*/
public class SubmitQuotaConsumedSubmissionStep implements Step {
private final SamService samService;
private final RawlsService rawlsService;

public static final String QUOTA_CONSUMED_METHOD_NAME = "QuotaConsumed";
public static final List<PipelineOutputDefinition> QUOTA_CONSUMED_OUTPUT_DEFINITION_LIST =
List.of(
new PipelineOutputDefinition(
null, "quotaConsumed", "quota_consumed", PipelineVariableTypesEnum.INTEGER));

private final Logger logger = LoggerFactory.getLogger(SubmitQuotaConsumedSubmissionStep.class);

public SubmitQuotaConsumedSubmissionStep(RawlsService rawlsService, SamService samService) {
this.samService = samService;
this.rawlsService = rawlsService;
}

@Override
@SuppressWarnings(
"java:S2259") // suppress warning for possible NPE when calling pipelineName.getValue(),
// since we do validate that pipelineName is not null in `validateRequiredEntries`
public StepResult doStep(FlightContext flightContext) {
// validate and extract parameters from input map
FlightMap inputParameters = flightContext.getInputParameters();
FlightUtils.validateRequiredEntries(
inputParameters,
JobMapKeys.PIPELINE_NAME,
ImputationJobMapKeys.CONTROL_WORKSPACE_BILLING_PROJECT,
ImputationJobMapKeys.CONTROL_WORKSPACE_NAME,
ImputationJobMapKeys.WDL_METHOD_VERSION,
ImputationJobMapKeys.PIPELINE_INPUT_DEFINITIONS);

PipelinesEnum pipelineName = inputParameters.get(JobMapKeys.PIPELINE_NAME, PipelinesEnum.class);
String controlWorkspaceName =
inputParameters.get(ImputationJobMapKeys.CONTROL_WORKSPACE_NAME, String.class);
String controlWorkspaceProject =
inputParameters.get(ImputationJobMapKeys.CONTROL_WORKSPACE_BILLING_PROJECT, String.class);
String wdlMethodVersion =
inputParameters.get(ImputationJobMapKeys.WDL_METHOD_VERSION, String.class);
List<PipelineInputDefinition> inputDefinitions =
inputParameters.get(
ImputationJobMapKeys.PIPELINE_INPUT_DEFINITIONS, new TypeReference<>() {});

// validate and extract parameters from working map
FlightMap workingMap = flightContext.getWorkingMap();

RawlsSubmissionStepHelper rawlsSubmissionStepHelper =
new RawlsSubmissionStepHelper(
rawlsService, samService, controlWorkspaceProject, controlWorkspaceName, logger);

Optional<StepResult> validationResponse =
rawlsSubmissionStepHelper.validateRawlsSubmissionMethodHelper(
QUOTA_CONSUMED_METHOD_NAME,
wdlMethodVersion,
inputDefinitions,
QUOTA_CONSUMED_OUTPUT_DEFINITION_LIST,
pipelineName);

// if there is a validation response that means the validation failed so return it
if (validationResponse.isPresent()) {
return validationResponse.get();
}

// create submission request
SubmissionRequest submissionRequest =
new SubmissionRequest()
.entityName(flightContext.getFlightId())
.entityType(pipelineName.getValue())
.useCallCache(true)
.deleteIntermediateOutputFiles(true)
.useReferenceDisks(false)
.userComment(
"%s - getting quota consumed for flight id: %s"
.formatted(pipelineName, flightContext.getFlightId()))
.methodConfigurationNamespace(controlWorkspaceProject)
.methodConfigurationName(QUOTA_CONSUMED_METHOD_NAME);

// submit workflow to rawls
SubmissionReport submissionReport;
try {
submissionReport =
rawlsService.submitWorkflow(
samService.getTeaspoonsServiceAccountToken(),
submissionRequest,
controlWorkspaceProject,
controlWorkspaceName);
} catch (RawlsServiceApiException e) {
return new StepResult(StepStatus.STEP_RESULT_FAILURE_RETRY, e);
}

// add submission id to working map to be used for polling in downstream step
workingMap.put(ImputationJobMapKeys.QUOTA_SUBMISSION_ID, submissionReport.getSubmissionId());
return StepResult.getStepResultSuccess();
}

@Override
public StepResult undoStep(FlightContext context) {
// nothing to undo; there's nothing to undo about submitting a run set
return StepResult.getStepResultSuccess();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public class ImputationJobMapKeys {
"control_workspace_billing_project";
public static final String CONTROL_WORKSPACE_NAME = "control_workspace_name";
public static final String SUBMISSION_ID = "submission_id";
public static final String QUOTA_SUBMISSION_ID = "quota_submission_id";

// Azure specific keys
public static final String CONTROL_WORKSPACE_ID = "control_workspace_id";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
import bio.terra.pipelines.common.utils.FlightUtils;
import bio.terra.pipelines.common.utils.PipelinesEnum;
import bio.terra.pipelines.dependencies.stairway.JobMapKeys;
import bio.terra.pipelines.stairway.FetchQuotaConsumedFromDataTableStep;
import bio.terra.pipelines.stairway.PollQuotaConsumedSubmissionStatusStep;
import bio.terra.pipelines.stairway.SubmitQuotaConsumedSubmissionStep;
import bio.terra.pipelines.stairway.imputation.steps.CompletePipelineRunStep;
import bio.terra.pipelines.stairway.imputation.steps.PrepareImputationInputsStep;
import bio.terra.pipelines.stairway.imputation.steps.gcp.AddDataTableRowStep;
Expand Down Expand Up @@ -75,6 +78,23 @@ public RunImputationGcpJobFlight(FlightMap inputParameters, Object beanBag) {
new AddDataTableRowStep(flightBeanBag.getRawlsService(), flightBeanBag.getSamService()),
externalServiceRetryRule);

addStep(
new SubmitQuotaConsumedSubmissionStep(
flightBeanBag.getRawlsService(), flightBeanBag.getSamService()),
externalServiceRetryRule);

addStep(
new PollQuotaConsumedSubmissionStatusStep(
flightBeanBag.getRawlsService(),
flightBeanBag.getSamService(),
flightBeanBag.getWdlPipelineConfiguration()),
externalServiceRetryRule);

addStep(
new FetchQuotaConsumedFromDataTableStep(
flightBeanBag.getRawlsService(), flightBeanBag.getSamService()),
externalServiceRetryRule);

addStep(
new SubmitCromwellSubmissionStep(
flightBeanBag.getRawlsService(),
Expand All @@ -84,8 +104,8 @@ public RunImputationGcpJobFlight(FlightMap inputParameters, Object beanBag) {

addStep(
new PollCromwellSubmissionStatusStep(
flightBeanBag.getSamService(),
flightBeanBag.getRawlsService(),
flightBeanBag.getSamService(),
flightBeanBag.getImputationConfiguration()),
externalServiceRetryRule);

Expand Down
Loading

0 comments on commit ca723c8

Please sign in to comment.