Skip to content

Commit

Permalink
TSPS-181 Add stairway hooks for failed flights - update metrics and s…
Browse files Browse the repository at this point in the history
…tatus (#141)
  • Loading branch information
mmorgantaylor authored Oct 11, 2024
1 parent d78464e commit 69441a7
Show file tree
Hide file tree
Showing 50 changed files with 862 additions and 545 deletions.
15 changes: 15 additions & 0 deletions service/IMPLEMENTATION_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,18 @@ These methods return a list of error messages, if any, that are accumulated into
thrown inside validateInputs(). Because ValidationException ultimately extends ErrorReportException,
which is handled by the global exception handler, these error messages are returned directly to the caller
in the API response.

## PipelineRun Status updates, Metrics, and Stairway hooks
We use our pipelineRuns table as the source of truth for our pipeline runs, and we pull in the pipelineRun
status from Stairway when the pipelineRun completes. In the case of a successful pipelineRun, we update its
status in the final step of the stairway flight. In the case of a failed pipelineRun, we update its status
using a Stairway hook ([StairwaySetPipelineRunStatusHook](src/main/java/bio/terra/pipelines/common/utils/StairwaySetPipelineRunStatusHook.java)).

We use a Stairway hook so that the pipelineRun will be marked as failed regardless of when in the flight the
failure occurs, and whether it's a roll-back-able error or a dismal failure.

Similarly, for metrics reporting, we use a Stairway hook ([StairwayFailedMetricsCounterHook](src/main/java/bio/terra/pipelines/common/utils/StairwayFailedMetricsCounterHook.java))
to increment the failed metrics counter when a pipelineRun fails.

Because Stairway hooks are applied at the Stairway instance level and not per-flight, each of these hooks
runs its logic only if the corresponding key is present and set to true in the flight's input parameters.
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ public static ApiGetJobsResponse mapEnumeratedJobsToApi(EnumeratedJobs enumerate

public static ApiJobReport mapFlightStateToApiJobReport(FlightState flightState) {
FlightMap inputParameters = flightState.getInputParameters();
String description = inputParameters.get(JobMapKeys.DESCRIPTION.getKeyName(), String.class);
String description = inputParameters.get(JobMapKeys.DESCRIPTION, String.class);
FlightStatus flightStatus = flightState.getFlightStatus();
String submittedDate = flightState.getSubmitted().toString();
ApiJobReport.StatusEnum jobStatus = mapFlightStatusToApi(flightStatus);
String resultURL = inputParameters.get(JobMapKeys.RESULT_PATH.getKeyName(), String.class);
String resultURL = inputParameters.get(JobMapKeys.RESULT_PATH, String.class);

String completedDate = null;
HttpStatus statusCode = HttpStatus.ACCEPTED;
Expand Down Expand Up @@ -70,7 +70,7 @@ public static ApiJobReport mapFlightStateToApiJobReport(FlightState flightState)
case SUCCEEDED -> {
FlightMap resultMap =
flightState.getResultMap().orElseThrow(InvalidResultStateException::noResultMap);
statusCode = resultMap.get(JobMapKeys.STATUS_CODE.getKeyName(), HttpStatus.class);
statusCode = resultMap.get(JobMapKeys.STATUS_CODE, HttpStatus.class);
if (statusCode == null) {
statusCode = HttpStatus.OK;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ public static void setErrorResponse(
public static void setResponse(
FlightContext context, Object responseObject, HttpStatus responseStatus) {
FlightMap workingMap = context.getWorkingMap();
workingMap.put(JobMapKeys.RESPONSE.getKeyName(), responseObject);
workingMap.put(JobMapKeys.STATUS_CODE.getKeyName(), responseStatus);
workingMap.put(JobMapKeys.RESPONSE, responseObject);
workingMap.put(JobMapKeys.STATUS_CODE, responseStatus);
}

/**
Expand Down Expand Up @@ -158,4 +158,9 @@ public static boolean flightComplete(FlightState flightState) {
|| flightState.getFlightStatus() == FlightStatus.FATAL
|| flightState.getFlightStatus() == FlightStatus.SUCCESS);
}

/**
 * Check whether the provided FlightMap contains a particular key and whether its value is true.
 *
 * <p>The value is read as a boxed {@code Boolean} and compared against {@code Boolean.TRUE}, so a
 * key that is present but whose value reads back as null yields {@code false} instead of failing
 * with a NullPointerException on auto-unboxing.
 *
 * @param flightMap the flight map to inspect
 * @param key the flight map key to look up
 * @return true only if the key is present and its value reads as true; false otherwise
 */
public static boolean flightMapKeyIsTrue(FlightMap flightMap, String key) {
  return flightMap.containsKey(key) && Boolean.TRUE.equals(flightMap.get(key, Boolean.class));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package bio.terra.pipelines.common.utils;

import static bio.terra.pipelines.common.utils.FlightUtils.flightMapKeyIsTrue;

import bio.terra.pipelines.app.common.MetricsUtils;
import bio.terra.pipelines.dependencies.stairway.JobMapKeys;
import bio.terra.stairway.FlightContext;
import bio.terra.stairway.FlightStatus;
import bio.terra.stairway.HookAction;
import bio.terra.stairway.StairwayHook;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * A {@link StairwayHook} that records a failed pipeline run in the Prometheus metrics when a flight
 * ends without succeeding.
 *
 * <p>The endFlight action only does work when the flight's input parameters have the JobMapKeys
 * key DO_INCREMENT_METRICS_FAILED_COUNTER_HOOK present and set to true, and the flight's status is
 * anything other than SUCCESS.
 *
 * <p>The JobMapKeys key PIPELINE_NAME must be present in the input parameters; its value is
 * converted to a PipelinesEnum and passed to the failed-run counter.
 */
@Component
public class StairwayFailedMetricsCounterHook implements StairwayHook {
  private static final Logger logger =
      LoggerFactory.getLogger(StairwayFailedMetricsCounterHook.class);

  /**
   * Increments the failed pipeline run counter if this flight opted in to the hook and did not
   * succeed.
   *
   * @param context the context of the flight that is ending
   * @return always {@link HookAction#CONTINUE}
   */
  @Override
  public HookAction endFlight(FlightContext context) {
    // the hook is invoked for flights in general, so do nothing unless this flight opted in
    boolean hookEnabled =
        flightMapKeyIsTrue(
            context.getInputParameters(), JobMapKeys.DO_INCREMENT_METRICS_FAILED_COUNTER_HOOK);
    if (!hookEnabled || context.getFlightStatus() == FlightStatus.SUCCESS) {
      return HookAction.CONTINUE;
    }

    // NOTE(review): every status other than SUCCESS is counted as a failure, which would include
    // non-terminal statuses if endFlight is ever invoked with one - confirm that is intended
    logger.info(
        "Flight has status {}, incrementing failed flight counter", context.getFlightStatus());

    // look up which pipeline this flight ran and bump its failed runs counter metric
    String pipelineName = context.getInputParameters().get(JobMapKeys.PIPELINE_NAME, String.class);
    MetricsUtils.incrementPipelineRunFailed(PipelinesEnum.valueOf(pipelineName));

    return HookAction.CONTINUE;
  }
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package bio.terra.pipelines.common.utils;

import static bio.terra.pipelines.common.utils.FlightUtils.flightMapKeyIsTrue;

import bio.terra.pipelines.dependencies.stairway.JobMapKeys;
import bio.terra.pipelines.service.PipelineRunsService;
import bio.terra.stairway.FlightContext;
import bio.terra.stairway.FlightStatus;
import bio.terra.stairway.HookAction;
import bio.terra.stairway.StairwayHook;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * A {@link StairwayHook} that marks the PipelineRun as FAILED when its flight ends without
 * succeeding.
 *
 * <p>The endFlight action only does work when the flight's input parameters have the JobMapKeys
 * key DO_SET_PIPELINE_RUN_STATUS_FAILED_HOOK present and set to true, and the flight's status is
 * anything other than SUCCESS.
 *
 * <p>The JobMapKeys key USER_ID must be present in the input parameters; it is passed, together
 * with the flight id, to the PipelineRunsService when marking the run as failed.
 */
@Component
public class StairwaySetPipelineRunStatusHook implements StairwayHook {
  private static final Logger logger =
      LoggerFactory.getLogger(StairwaySetPipelineRunStatusHook.class);
  private final PipelineRunsService pipelineRunsService;

  public StairwaySetPipelineRunStatusHook(PipelineRunsService pipelineRunsService) {
    this.pipelineRunsService = pipelineRunsService;
  }

  /**
   * Marks the PipelineRun for this flight as failed if the flight opted in to the hook and did not
   * succeed.
   *
   * @param context the context of the flight that is ending
   * @return always {@link HookAction#CONTINUE}
   */
  @Override
  public HookAction endFlight(FlightContext context) {
    // the hook is invoked for flights in general, so do nothing unless this flight opted in
    boolean hookEnabled =
        flightMapKeyIsTrue(
            context.getInputParameters(), JobMapKeys.DO_SET_PIPELINE_RUN_STATUS_FAILED_HOOK);
    if (!hookEnabled || context.getFlightStatus() == FlightStatus.SUCCESS) {
      return HookAction.CONTINUE;
    }

    // NOTE(review): every status other than SUCCESS is treated as a failure, which would include
    // non-terminal statuses if endFlight is ever invoked with one - confirm that is intended
    logger.info(
        "Flight has status {}, setting PipelineRun status to FAILED", context.getFlightStatus());

    // the flight id is parsed as a UUID and used as the run identifier when marking it FAILED
    UUID jobId = UUID.fromString(context.getFlightId());
    String userId = context.getInputParameters().get(JobMapKeys.USER_ID, String.class);
    pipelineRunsService.markPipelineRunFailed(jobId, userId);

    return HookAction.CONTINUE;
  }
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package bio.terra.pipelines.dependencies.stairway;

import static bio.terra.pipelines.dependencies.stairway.JobMapKeys.getRequiredKeys;

import bio.terra.common.exception.BadRequestException;
import bio.terra.common.exception.MissingRequiredFieldException;
import bio.terra.common.stairway.MonitoringHook;
Expand Down Expand Up @@ -73,7 +75,7 @@ private void validateRequiredInputs() {
}

List<String> missingFields = new ArrayList<>();
for (String keyName : JobMapKeys.getRequiredKeys()) {
for (String keyName : getRequiredKeys()) {
if (!jobParameterMap.containsKey(keyName)
|| Objects.equals(
jobParameterMap.getRaw(keyName), "null") // getRaw stringifies the result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,28 @@
import java.util.Arrays;
import java.util.List;

public enum JobMapKeys {
public class JobMapKeys {
// parameters for all flight types
DESCRIPTION("description"),
USER_ID("user_id"),
PIPELINE_NAME("pipeline_name"),
STATUS_CODE("status_code"),
RESPONSE("response"), // result or output of the job
RESULT_PATH(
"result_path"); // path to the result API endpoint for this job; only used for asynchronous
// endpoints
public static final String DESCRIPTION = "description";
public static final String USER_ID = "user_id";
public static final String PIPELINE_NAME = "pipeline_name";
public static final String PIPELINE_ID = "pipeline_id";
public static final String STATUS_CODE = "status_code";
public static final String RESPONSE = "response"; // result or output of the job
public static final String RESULT_PATH =
"result_path"; // path to the result API endpoint for this job; only used for asynchronous

private final String keyName;
// keys to determine which Stairway hooks to run
public static final String DO_SET_PIPELINE_RUN_STATUS_FAILED_HOOK =
"do_set_pipeline_run_status_failed_hook";
public static final String DO_INCREMENT_METRICS_FAILED_COUNTER_HOOK =
"do_increment_metrics_failed_counter_hook";

JobMapKeys(String keyName) {
this.keyName = keyName;
}

public String getKeyName() {
return keyName;
JobMapKeys() {
throw new IllegalStateException("Attempted to instantiate utility class JobMapKeys");
}

public static List<String> getRequiredKeys() {
return Arrays.asList(JobMapKeys.USER_ID.getKeyName(), JobMapKeys.PIPELINE_NAME.getKeyName());
return Arrays.asList(USER_ID, PIPELINE_NAME, PIPELINE_ID);
}
}
Loading

0 comments on commit 69441a7

Please sign in to comment.