Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TSPS-360 Add steps to calculate quota consumed #158

Merged
merged 4 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package bio.terra.pipelines.app.configuration.internal;

import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix = "pipelines.wdl")
@Getter
@Setter
public class WdlPipelineConfiguration {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

noticing you named this pretty generically. what else do you see adding to this configuration besides quotaConsumedPollingIntervalSeconds?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure but i didnt want it associated with imputation - happy to put it in an already existing configuration that wasnt imputations. I imagine there will be a couple of things we'll want to move out of imputation config to this one once we get our next wdl pipeline.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah sounds good. no need to combine with an existing configuration.

private Long quotaConsumedPollingIntervalSeconds;
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import bio.terra.pipelines.app.configuration.external.CbasConfiguration;
import bio.terra.pipelines.app.configuration.internal.ImputationConfiguration;
import bio.terra.pipelines.app.configuration.internal.WdlPipelineConfiguration;
import bio.terra.pipelines.dependencies.cbas.CbasService;
import bio.terra.pipelines.dependencies.leonardo.LeonardoService;
import bio.terra.pipelines.dependencies.rawls.RawlsService;
Expand Down Expand Up @@ -37,6 +38,7 @@ public class FlightBeanBag {
private final RawlsService rawlsService;
private final ImputationConfiguration imputationConfiguration;
private final CbasConfiguration cbasConfiguration;
private final WdlPipelineConfiguration wdlPipelineConfiguration;

@Lazy
@Autowired
Expand All @@ -51,7 +53,8 @@ public FlightBeanBag(
RawlsService rawlsService,
WorkspaceManagerService workspaceManagerService,
ImputationConfiguration imputationConfiguration,
CbasConfiguration cbasConfiguration) {
CbasConfiguration cbasConfiguration,
WdlPipelineConfiguration wdlPipelineConfiguration) {
this.pipelinesService = pipelinesService;
this.pipelineRunsService = pipelineRunsService;
this.pipelineInputsOutputsService = pipelineInputsOutputsService;
Expand All @@ -63,6 +66,7 @@ public FlightBeanBag(
this.rawlsService = rawlsService;
this.imputationConfiguration = imputationConfiguration;
this.cbasConfiguration = cbasConfiguration;
this.wdlPipelineConfiguration = wdlPipelineConfiguration;
}

public static FlightBeanBag getFromObject(Object object) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package bio.terra.pipelines.stairway;

import bio.terra.common.exception.InternalServerErrorException;
import bio.terra.pipelines.common.utils.FlightUtils;
import bio.terra.pipelines.common.utils.PipelinesEnum;
import bio.terra.pipelines.dependencies.rawls.RawlsService;
import bio.terra.pipelines.dependencies.rawls.RawlsServiceApiException;
import bio.terra.pipelines.dependencies.sam.SamService;
import bio.terra.pipelines.dependencies.stairway.JobMapKeys;
import bio.terra.pipelines.stairway.imputation.ImputationJobMapKeys;
import bio.terra.rawls.model.Entity;
import bio.terra.stairway.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This step calls Rawls to fetch outputs from a data table row for a given quota consumed job. It
* specifically fetches the quota consumed value from the data table row using the quota_consumed
* key
*
* <p>This step expects nothing from the working map
*/
public class FetchQuotaConsumedFromDataTableStep implements Step {

private final RawlsService rawlsService;
private final SamService samService;
private final Logger logger = LoggerFactory.getLogger(FetchQuotaConsumedFromDataTableStep.class);

public FetchQuotaConsumedFromDataTableStep(RawlsService rawlsService, SamService samService) {
this.rawlsService = rawlsService;
this.samService = samService;
}

@Override
@SuppressWarnings(
"java:S2259") // suppress warning for possible NPE when calling pipelineName.getValue(),
// since we do validate that pipelineName is not null in `validateRequiredEntries`
public StepResult doStep(FlightContext flightContext) {
String jobId = flightContext.getFlightId();

// validate and extract parameters from input map
var inputParameters = flightContext.getInputParameters();
FlightUtils.validateRequiredEntries(
inputParameters,
JobMapKeys.PIPELINE_NAME,
ImputationJobMapKeys.CONTROL_WORKSPACE_BILLING_PROJECT,
ImputationJobMapKeys.CONTROL_WORKSPACE_NAME);

String controlWorkspaceBillingProject =
inputParameters.get(ImputationJobMapKeys.CONTROL_WORKSPACE_BILLING_PROJECT, String.class);
String controlWorkspaceName =
inputParameters.get(ImputationJobMapKeys.CONTROL_WORKSPACE_NAME, String.class);
PipelinesEnum pipelineName = inputParameters.get(JobMapKeys.PIPELINE_NAME, PipelinesEnum.class);

Entity entity;
try {
entity =
rawlsService.getDataTableEntity(
samService.getTeaspoonsServiceAccountToken(),
controlWorkspaceBillingProject,
controlWorkspaceName,
pipelineName.getValue(),
jobId);
} catch (RawlsServiceApiException e) {
return new StepResult(StepStatus.STEP_RESULT_FAILURE_RETRY, e);
}

// extract quota_consumed from entity
int quotaConsumed;
try {
quotaConsumed = (int) entity.getAttributes().get("quota_consumed");
if (quotaConsumed <= 0) {
return new StepResult(
StepStatus.STEP_RESULT_FAILURE_FATAL,
new InternalServerErrorException("Quota consumed is unexpectedly not greater than 0"));
}
} catch (NullPointerException e) {
return new StepResult(
StepStatus.STEP_RESULT_FAILURE_FATAL,
new InternalServerErrorException("Quota consumed is unexpectedly null"));
}

logger.info("Quota consumed: {}", quotaConsumed);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the idea that in future we'll store this in the working map to be used by the logic to do the quota check in a subsequent step?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think we may do the quota check here or in a subsequent step - thats due to be done in https://broadworkbench.atlassian.net/browse/TSPS-361. I'd prob do it here and rename the step


return StepResult.getStepResultSuccess();
}

@Override
public StepResult undoStep(FlightContext flightContext) {
// nothing to undo
return StepResult.getStepResultSuccess();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package bio.terra.pipelines.stairway;

import bio.terra.pipelines.app.configuration.internal.WdlPipelineConfiguration;
import bio.terra.pipelines.common.utils.FlightUtils;
import bio.terra.pipelines.dependencies.rawls.RawlsService;
import bio.terra.pipelines.dependencies.sam.SamService;
import bio.terra.pipelines.stairway.imputation.ImputationJobMapKeys;
import bio.terra.pipelines.stairway.utils.RawlsSubmissionStepHelper;
import bio.terra.stairway.*;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This step polls rawls for a submission until all runs are in a finalized state. If submission is
* not in a final state, this will step will poll again after an interval of time. Once the
* submission is finalized then it will see if the workflows are all successful and if so will
* succeed otherwise will fail.
*
* <p>this step expects quota submission id to be provided in the working map
*/
public class PollQuotaConsumedSubmissionStatusStep implements Step {
private final RawlsService rawlsService;
private final SamService samService;
private final WdlPipelineConfiguration wdlPipelineConfiguration;
private final Logger logger =
LoggerFactory.getLogger(PollQuotaConsumedSubmissionStatusStep.class);

public PollQuotaConsumedSubmissionStatusStep(
RawlsService rawlsService,
SamService samService,
WdlPipelineConfiguration wdlPipelineConfiguration) {
this.samService = samService;
this.rawlsService = rawlsService;
this.wdlPipelineConfiguration = wdlPipelineConfiguration;
}

@Override
public StepResult doStep(FlightContext flightContext) throws InterruptedException {
// validate and extract parameters from input map
FlightMap inputParameters = flightContext.getInputParameters();
FlightUtils.validateRequiredEntries(
inputParameters,
ImputationJobMapKeys.CONTROL_WORKSPACE_NAME,
ImputationJobMapKeys.CONTROL_WORKSPACE_BILLING_PROJECT);
String controlWorkspaceName =
inputParameters.get(ImputationJobMapKeys.CONTROL_WORKSPACE_NAME, String.class);
String controlWorkspaceProject =
inputParameters.get(ImputationJobMapKeys.CONTROL_WORKSPACE_BILLING_PROJECT, String.class);
// validate and extract parameters from working map
FlightMap workingMap = flightContext.getWorkingMap();
FlightUtils.validateRequiredEntries(workingMap, ImputationJobMapKeys.QUOTA_SUBMISSION_ID);

UUID quotaSubmissionId = workingMap.get(ImputationJobMapKeys.QUOTA_SUBMISSION_ID, UUID.class);

RawlsSubmissionStepHelper rawlsSubmissionStepHelper =
new RawlsSubmissionStepHelper(
rawlsService, samService, controlWorkspaceProject, controlWorkspaceName, logger);
return rawlsSubmissionStepHelper.pollRawlsSubmissionHelper(
quotaSubmissionId, wdlPipelineConfiguration.getQuotaConsumedPollingIntervalSeconds());
}

@Override
public StepResult undoStep(FlightContext context) {
// nothing to undo; there's nothing to undo about polling a cromwell submission
return StepResult.getStepResultSuccess();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package bio.terra.pipelines.stairway;

import bio.terra.pipelines.common.utils.FlightUtils;
import bio.terra.pipelines.common.utils.PipelineVariableTypesEnum;
import bio.terra.pipelines.common.utils.PipelinesEnum;
import bio.terra.pipelines.db.entities.PipelineInputDefinition;
import bio.terra.pipelines.db.entities.PipelineOutputDefinition;
import bio.terra.pipelines.dependencies.rawls.RawlsService;
import bio.terra.pipelines.dependencies.rawls.RawlsServiceApiException;
import bio.terra.pipelines.dependencies.sam.SamService;
import bio.terra.pipelines.dependencies.stairway.JobMapKeys;
import bio.terra.pipelines.stairway.imputation.ImputationJobMapKeys;
import bio.terra.pipelines.stairway.utils.RawlsSubmissionStepHelper;
import bio.terra.rawls.model.SubmissionReport;
import bio.terra.rawls.model.SubmissionRequest;
import bio.terra.stairway.*;
import com.fasterxml.jackson.core.type.TypeReference;
import java.util.List;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This step submits a quota consumed wdl to cromwell using the rawls submission endpoint. The quota
* consumed wdl that is run depends on the workspace name and billing project provided to the step.
*
* <p>this step expects nothing from the working map
*
* <p>this step writes quota_submission_id to the working map
*/
public class SubmitQuotaConsumedSubmissionStep implements Step {
private final SamService samService;
private final RawlsService rawlsService;

public static final String QUOTA_CONSUMED_METHOD_NAME = "QuotaConsumed";
public static final List<PipelineOutputDefinition> QUOTA_CONSUMED_OUTPUT_DEFINITION_LIST =
List.of(
new PipelineOutputDefinition(
null, "quotaConsumed", "quota_consumed", PipelineVariableTypesEnum.INTEGER));

private final Logger logger = LoggerFactory.getLogger(SubmitQuotaConsumedSubmissionStep.class);

public SubmitQuotaConsumedSubmissionStep(RawlsService rawlsService, SamService samService) {
this.samService = samService;
this.rawlsService = rawlsService;
}

@Override
@SuppressWarnings(
"java:S2259") // suppress warning for possible NPE when calling pipelineName.getValue(),
// since we do validate that pipelineName is not null in `validateRequiredEntries`
public StepResult doStep(FlightContext flightContext) {
// validate and extract parameters from input map
FlightMap inputParameters = flightContext.getInputParameters();
FlightUtils.validateRequiredEntries(
inputParameters,
JobMapKeys.PIPELINE_NAME,
ImputationJobMapKeys.CONTROL_WORKSPACE_BILLING_PROJECT,
ImputationJobMapKeys.CONTROL_WORKSPACE_NAME,
ImputationJobMapKeys.WDL_METHOD_VERSION,
ImputationJobMapKeys.PIPELINE_INPUT_DEFINITIONS);

PipelinesEnum pipelineName = inputParameters.get(JobMapKeys.PIPELINE_NAME, PipelinesEnum.class);
String controlWorkspaceName =
inputParameters.get(ImputationJobMapKeys.CONTROL_WORKSPACE_NAME, String.class);
String controlWorkspaceProject =
inputParameters.get(ImputationJobMapKeys.CONTROL_WORKSPACE_BILLING_PROJECT, String.class);
String wdlMethodVersion =
inputParameters.get(ImputationJobMapKeys.WDL_METHOD_VERSION, String.class);
List<PipelineInputDefinition> inputDefinitions =
inputParameters.get(
ImputationJobMapKeys.PIPELINE_INPUT_DEFINITIONS, new TypeReference<>() {});

// validate and extract parameters from working map
FlightMap workingMap = flightContext.getWorkingMap();

RawlsSubmissionStepHelper rawlsSubmissionStepHelper =
new RawlsSubmissionStepHelper(
rawlsService, samService, controlWorkspaceProject, controlWorkspaceName, logger);

Optional<StepResult> validationResponse =
rawlsSubmissionStepHelper.validateRawlsSubmissionMethodHelper(
QUOTA_CONSUMED_METHOD_NAME,
wdlMethodVersion,
inputDefinitions,
QUOTA_CONSUMED_OUTPUT_DEFINITION_LIST,
pipelineName);

// if there is a validation response that means the validation failed so return it
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is elegant! nice!

if (validationResponse.isPresent()) {
return validationResponse.get();
}

// create submission request
SubmissionRequest submissionRequest =
new SubmissionRequest()
.entityName(flightContext.getFlightId())
.entityType(pipelineName.getValue())
.useCallCache(true)
.deleteIntermediateOutputFiles(true)
.useReferenceDisks(false)
.userComment(
"%s - getting quota consumed for flight id: %s"
.formatted(pipelineName, flightContext.getFlightId()))
.methodConfigurationNamespace(controlWorkspaceProject)
.methodConfigurationName(QUOTA_CONSUMED_METHOD_NAME);

// submit workflow to rawls
SubmissionReport submissionReport;
try {
submissionReport =
rawlsService.submitWorkflow(
samService.getTeaspoonsServiceAccountToken(),
submissionRequest,
controlWorkspaceProject,
controlWorkspaceName);
} catch (RawlsServiceApiException e) {
return new StepResult(StepStatus.STEP_RESULT_FAILURE_RETRY, e);
}

// add submission id to working map to be used for polling in downstream step
workingMap.put(ImputationJobMapKeys.QUOTA_SUBMISSION_ID, submissionReport.getSubmissionId());
return StepResult.getStepResultSuccess();
}

@Override
public StepResult undoStep(FlightContext context) {
// nothing to undo; there's nothing to undo about submitting a run set
return StepResult.getStepResultSuccess();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public class ImputationJobMapKeys {
"control_workspace_billing_project";
public static final String CONTROL_WORKSPACE_NAME = "control_workspace_name";
public static final String SUBMISSION_ID = "submission_id";
public static final String QUOTA_SUBMISSION_ID = "quota_submission_id";

// Azure specific keys
public static final String CONTROL_WORKSPACE_ID = "control_workspace_id";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
import bio.terra.pipelines.common.utils.FlightUtils;
import bio.terra.pipelines.common.utils.PipelinesEnum;
import bio.terra.pipelines.dependencies.stairway.JobMapKeys;
import bio.terra.pipelines.stairway.FetchQuotaConsumedFromDataTableStep;
import bio.terra.pipelines.stairway.PollQuotaConsumedSubmissionStatusStep;
import bio.terra.pipelines.stairway.SubmitQuotaConsumedSubmissionStep;
import bio.terra.pipelines.stairway.imputation.steps.CompletePipelineRunStep;
import bio.terra.pipelines.stairway.imputation.steps.PrepareImputationInputsStep;
import bio.terra.pipelines.stairway.imputation.steps.gcp.AddDataTableRowStep;
Expand Down Expand Up @@ -75,6 +78,23 @@ public RunImputationGcpJobFlight(FlightMap inputParameters, Object beanBag) {
new AddDataTableRowStep(flightBeanBag.getRawlsService(), flightBeanBag.getSamService()),
externalServiceRetryRule);

addStep(
new SubmitQuotaConsumedSubmissionStep(
flightBeanBag.getRawlsService(), flightBeanBag.getSamService()),
externalServiceRetryRule);

addStep(
new PollQuotaConsumedSubmissionStatusStep(
flightBeanBag.getRawlsService(),
flightBeanBag.getSamService(),
flightBeanBag.getWdlPipelineConfiguration()),
externalServiceRetryRule);

addStep(
new FetchQuotaConsumedFromDataTableStep(
flightBeanBag.getRawlsService(), flightBeanBag.getSamService()),
externalServiceRetryRule);

addStep(
new SubmitCromwellSubmissionStep(
flightBeanBag.getRawlsService(),
Expand All @@ -84,8 +104,8 @@ public RunImputationGcpJobFlight(FlightMap inputParameters, Object beanBag) {

addStep(
new PollCromwellSubmissionStatusStep(
flightBeanBag.getSamService(),
flightBeanBag.getRawlsService(),
flightBeanBag.getSamService(),
flightBeanBag.getImputationConfiguration()),
externalServiceRetryRule);

Expand Down
Loading
Loading