TSPS-181 Add stairway hooks for failed flights - update metrics and status #141

Merged: 17 commits, Oct 11, 2024
15 changes: 15 additions & 0 deletions service/IMPLEMENTATION_NOTES.md
Original file line number Diff line number Diff line change
@@ -22,3 +22,18 @@ These methods return a list of error messages, if any, that are accumulated into
thrown inside validateInputs(). Because ValidationException ultimately extends ErrorReportException,
which is handled by the global exception handler, these error messages are returned directly to the caller
in the API response.

## PipelineRun Status updates, Metrics, and Stairway hooks
We use our pipelineRuns table as the source of truth for our pipeline runs, and we pull in the pipelineRun
status from Stairway when the pipelineRun completes. For a successful pipelineRun, we update its
status in the final step of the Stairway flight. For a failed pipelineRun, we update its status
using a Stairway hook ([StairwaySetPipelineRunStatusHook](src/main/java/bio/terra/pipelines/common/utils/StairwaySetPipelineRunStatusHook.java)).

We use a Stairway hook so that the pipelineRun will be marked as failed regardless of when in the flight the
failure occurs, and whether it's a roll-back-able error or a dismal failure.
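
To illustrate why an end-of-flight hook catches every failure path, here is a minimal sketch of a flight runner. All types in it (`Step`, `Status`, `runFlight`) are hypothetical stand-ins written for this note, not Stairway's real classes, and the rollback order is a simplification. The hook is called from a `finally` block, so it sees the final status whether the flight succeeded, failed and rolled back, or failed during rollback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-ins for illustration only; these are not Stairway's real classes.
final class EndOfFlightHookSketch {
  enum Status { SUCCESS, ERROR, FATAL }

  /** A step that can be run and rolled back. */
  interface Step {
    void doStep() throws Exception;

    void undoStep() throws Exception;
  }

  /**
   * Runs the steps in order. If a step fails, undoes every attempted step in reverse order.
   * The end-of-flight hook always receives the final status.
   */
  static Status runFlight(List<Step> steps, Consumer<Status> endFlightHook) {
    Status status = Status.SUCCESS;
    List<Step> attempted = new ArrayList<>();
    try {
      for (Step step : steps) {
        attempted.add(step);
        step.doStep();
      }
    } catch (Exception doFailure) {
      status = Status.ERROR; // a step failed; try to roll back
      try {
        for (int i = attempted.size() - 1; i >= 0; i--) {
          attempted.get(i).undoStep();
        }
      } catch (Exception undoFailure) {
        status = Status.FATAL; // the rollback itself failed
      }
    } finally {
      endFlightHook.accept(status);
    }
    return status;
  }

  /** Builds a step whose do or undo phase can be made to fail. */
  static Step step(boolean doFails, boolean undoFails) {
    return new Step() {
      @Override
      public void doStep() throws Exception {
        if (doFails) throw new Exception("do failed");
      }

      @Override
      public void undoStep() throws Exception {
        if (undoFails) throw new Exception("undo failed");
      }
    };
  }
}
```

In this sketch a hook that reacts to any status other than `SUCCESS` covers both the rolled-back case and the failed-rollback case without the individual steps needing to know about it.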

Similarly, for metrics reporting, we use a Stairway hook ([StairwayFailedMetricsCounterHook](src/main/java/bio/terra/pipelines/common/utils/StairwayFailedMetricsCounterHook.java))
to increment the failed metrics counter when a pipelineRun fails.

Because Stairway hooks are applied at the Stairway instance level and not per-flight, we conditionally run the
logic in each of these hooks only if the corresponding flight map key is present and set to true in the flight map.
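
The per-flight gating described above can be sketched roughly as follows. This is an illustration under stated assumptions: the `EndFlightHook` interface and the plain `Map` of input parameters are hypothetical simplifications of Stairway's hook and flight map types. Only the key name is taken from `JobMapKeys` in this PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical simplification: one hook object shared by every flight on the instance.
final class GatedHookSketch {
  // Key name as defined in JobMapKeys in this PR
  static final String DO_SET_PIPELINE_RUN_STATUS_FAILED_HOOK =
      "do_set_pipeline_run_status_failed_hook";

  /** Stand-in for a hook that is registered once and called for all flights. */
  interface EndFlightHook {
    void endFlight(String flightId, Map<String, Object> inputParameters, boolean succeeded);
  }

  /** Records failed flights, but only those whose inputs opted in to the hook. */
  static final class MarkFailedHook implements EndFlightHook {
    final List<String> markedFailed = new ArrayList<>();

    @Override
    public void endFlight(
        String flightId, Map<String, Object> inputParameters, boolean succeeded) {
      boolean optedIn =
          Boolean.TRUE.equals(inputParameters.get(DO_SET_PIPELINE_RUN_STATUS_FAILED_HOOK));
      if (optedIn && !succeeded) {
        markedFailed.add(flightId);
      }
    }
  }
}
```

A flight that does not set the key passes through the hook untouched, which lets unrelated flight types share the same Stairway instance.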
Original file line number Diff line number Diff line change
@@ -34,11 +34,11 @@ public static ApiGetJobsResponse mapEnumeratedJobsToApi(EnumeratedJobs enumerate

public static ApiJobReport mapFlightStateToApiJobReport(FlightState flightState) {
FlightMap inputParameters = flightState.getInputParameters();
String description = inputParameters.get(JobMapKeys.DESCRIPTION.getKeyName(), String.class);
String description = inputParameters.get(JobMapKeys.DESCRIPTION, String.class);
FlightStatus flightStatus = flightState.getFlightStatus();
String submittedDate = flightState.getSubmitted().toString();
ApiJobReport.StatusEnum jobStatus = mapFlightStatusToApi(flightStatus);
String resultURL = inputParameters.get(JobMapKeys.RESULT_PATH.getKeyName(), String.class);
String resultURL = inputParameters.get(JobMapKeys.RESULT_PATH, String.class);

String completedDate = null;
HttpStatus statusCode = HttpStatus.ACCEPTED;
@@ -70,7 +70,7 @@ public static ApiJobReport mapFlightStateToApiJobReport(FlightState flightState)
case SUCCEEDED -> {
FlightMap resultMap =
flightState.getResultMap().orElseThrow(InvalidResultStateException::noResultMap);
statusCode = resultMap.get(JobMapKeys.STATUS_CODE.getKeyName(), HttpStatus.class);
statusCode = resultMap.get(JobMapKeys.STATUS_CODE, HttpStatus.class);
if (statusCode == null) {
statusCode = HttpStatus.OK;
}
Original file line number Diff line number Diff line change
@@ -44,8 +44,8 @@ public static void setErrorResponse(
public static void setResponse(
FlightContext context, Object responseObject, HttpStatus responseStatus) {
FlightMap workingMap = context.getWorkingMap();
workingMap.put(JobMapKeys.RESPONSE.getKeyName(), responseObject);
workingMap.put(JobMapKeys.STATUS_CODE.getKeyName(), responseStatus);
workingMap.put(JobMapKeys.RESPONSE, responseObject);
workingMap.put(JobMapKeys.STATUS_CODE, responseStatus);
}

/**
@@ -158,4 +158,9 @@ public static boolean flightComplete(FlightState flightState) {
|| flightState.getFlightStatus() == FlightStatus.FATAL
|| flightState.getFlightStatus() == FlightStatus.SUCCESS);
}

/** Check whether the provided FlightMap contains a particular key and whether its value is true */
public static boolean flightMapKeyIsTrue(FlightMap flightMap, String key) {
return flightMap.containsKey(key) && flightMap.get(key, boolean.class);
}
}
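
One thing to keep in mind with a helper like `flightMapKeyIsTrue`: it unboxes the result of the lookup, so a key that is present but resolves to `null` would throw a `NullPointerException` at that point. Whether Stairway's `FlightMap` can return `null` for a present key is not confirmed here. The sketch below uses a plain `Map` as a stand-in and shows a null-tolerant variant of the same check.

```java
import java.util.Map;

// Stand-in sketch over a plain Map; this is not the FlightMap API.
final class FlagLookupSketch {
  /** Returns true only if the key is present and its value is Boolean.TRUE. */
  static boolean keyIsTrue(Map<String, Object> map, String key) {
    // Boolean.TRUE.equals(null) is false, so missing and null values are both handled.
    return Boolean.TRUE.equals(map.get(key));
  }
}
```

With this form, a missing key, a `null` value, and an explicit `false` all yield `false`.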
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package bio.terra.pipelines.common.utils;

import static bio.terra.pipelines.common.utils.FlightUtils.flightMapKeyIsTrue;

import bio.terra.pipelines.app.common.MetricsUtils;
import bio.terra.pipelines.dependencies.stairway.JobMapKeys;
import bio.terra.stairway.FlightContext;
import bio.terra.stairway.FlightStatus;
import bio.terra.stairway.HookAction;
import bio.terra.stairway.StairwayHook;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
* A {@link StairwayHook} that logs a pipeline failure to Prometheus metrics upon flight failure.
Review comment (Collaborator): Can we add some more documentation around what key to use to enable this functionality on a flight?

*
* <p>This hook action will only run if the flight's input parameters contain the JobMapKeys key for
* DO_INCREMENT_METRICS_FAILED_COUNTER_HOOK and the flight's status is not SUCCESS.
*
* <p>The JobMapKeys key for PIPELINE_NAME is required to increment the failed flight counter.
*/
@Component
public class StairwayFailedMetricsCounterHook implements StairwayHook {
private static final Logger logger =
LoggerFactory.getLogger(StairwayFailedMetricsCounterHook.class);

@Override
public HookAction endFlight(FlightContext context) {

if (flightMapKeyIsTrue(
context.getInputParameters(), JobMapKeys.DO_INCREMENT_METRICS_FAILED_COUNTER_HOOK)
&& context.getFlightStatus() != FlightStatus.SUCCESS) {
logger.info(
"Flight has status {}, incrementing failed flight counter", context.getFlightStatus());

// increment failed runs counter metric
PipelinesEnum pipelinesEnum =
PipelinesEnum.valueOf(
context.getInputParameters().get(JobMapKeys.PIPELINE_NAME, String.class));
MetricsUtils.incrementPipelineRunFailed(pipelinesEnum);
}
return HookAction.CONTINUE;
}
}
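
As a rough answer to the reviewer's question about enabling the hook, the sketch below shows the input parameters a flight could set to opt in, and how the hook's condition then drives a per-pipeline failure count. The two key names come from `JobMapKeys` in this PR. Everything else is an assumption made for illustration: the `Pipeline` enum values, the plain `Map` in place of a flight map, and the in-memory counter in place of `MetricsUtils`.

```java
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;

// Illustration only; the real hook reads a FlightContext and calls MetricsUtils.
final class FailedCounterSketch {
  // Key names as defined in JobMapKeys in this PR
  static final String PIPELINE_NAME = "pipeline_name";
  static final String DO_INCREMENT_METRICS_FAILED_COUNTER_HOOK =
      "do_increment_metrics_failed_counter_hook";

  // Hypothetical pipeline names
  enum Pipeline { ARRAY_IMPUTATION, EXAMPLE_PIPELINE }

  // Not thread-safe; a real metrics registry would handle concurrency.
  private final Map<Pipeline, Integer> failedCounts = new EnumMap<>(Pipeline.class);

  /** Input parameters a flight would submit to opt in to the failure counter. */
  static Map<String, Object> optedInInputs(Pipeline pipeline) {
    Map<String, Object> inputs = new HashMap<>();
    inputs.put(PIPELINE_NAME, pipeline.name());
    inputs.put(DO_INCREMENT_METRICS_FAILED_COUNTER_HOOK, true);
    return inputs;
  }

  /** Mirrors the hook's condition: count only opted-in flights that did not succeed. */
  void endFlight(Map<String, Object> inputs, boolean succeeded) {
    if (Boolean.TRUE.equals(inputs.get(DO_INCREMENT_METRICS_FAILED_COUNTER_HOOK)) && !succeeded) {
      // valueOf throws if the stored name is null or matches no enum constant,
      // which is why the pipeline name is a required input for this hook.
      Pipeline pipeline = Pipeline.valueOf((String) inputs.get(PIPELINE_NAME));
      failedCounts.merge(pipeline, 1, Integer::sum);
    }
  }

  int failedCount(Pipeline pipeline) {
    return failedCounts.getOrDefault(pipeline, 0);
  }
}
```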
snf2ye marked this conversation as resolved.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package bio.terra.pipelines.common.utils;

import static bio.terra.pipelines.common.utils.FlightUtils.flightMapKeyIsTrue;

import bio.terra.pipelines.dependencies.stairway.JobMapKeys;
import bio.terra.pipelines.service.PipelineRunsService;
import bio.terra.stairway.FlightContext;
import bio.terra.stairway.FlightStatus;
import bio.terra.stairway.HookAction;
import bio.terra.stairway.StairwayHook;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
* A {@link StairwayHook} that updates the PipelineRun status to FAILED on flight failure.
*
* <p>The endFlight hook action will only run if the flight's input parameters contain the
* JobMapKeys key for DO_SET_PIPELINE_RUN_STATUS_FAILED_HOOK and the flight's status is not SUCCESS.
*
* <p>The JobMapKeys key for USER_ID is required to set the PipelineRun status to FAILED.
*/
@Component
public class StairwaySetPipelineRunStatusHook implements StairwayHook {
private static final Logger logger =
LoggerFactory.getLogger(StairwaySetPipelineRunStatusHook.class);
private final PipelineRunsService pipelineRunsService;

public StairwaySetPipelineRunStatusHook(PipelineRunsService pipelineRunsService) {
this.pipelineRunsService = pipelineRunsService;
}

@Override
public HookAction endFlight(FlightContext context) {

if (flightMapKeyIsTrue(
context.getInputParameters(), JobMapKeys.DO_SET_PIPELINE_RUN_STATUS_FAILED_HOOK)
&& context.getFlightStatus() != FlightStatus.SUCCESS) {
logger.info(
"Flight has status {}, setting PipelineRun status to FAILED", context.getFlightStatus());

// set PipelineRun status to FAILED
pipelineRunsService.markPipelineRunFailed(
UUID.fromString(context.getFlightId()),
context.getInputParameters().get(JobMapKeys.USER_ID, String.class));
}

return HookAction.CONTINUE;
}
}
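
The hook above parses the flight ID as a UUID and passes it along with the user ID, which suggests that the flight ID doubles as the run's job ID. The sketch below illustrates that lookup against an in-memory map. The store, its composite key, and the `markFailed` signature are hypothetical; the real `PipelineRunsService.markPipelineRunFailed` is not shown in this diff and may behave differently.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for the pipelineRuns table.
final class RunStatusStoreSketch {
  enum RunStatus { RUNNING, FAILED }

  private final Map<String, RunStatus> runs = new ConcurrentHashMap<>();

  // A run is looked up by job ID and owning user together.
  private static String rowKey(UUID jobId, String userId) {
    return jobId + "/" + userId;
  }

  void createRun(UUID jobId, String userId) {
    runs.put(rowKey(jobId, userId), RunStatus.RUNNING);
  }

  /** Marks the run FAILED if it exists for this user; returns whether a row was updated. */
  boolean markFailed(String flightId, String userId) {
    // UUID.fromString throws IllegalArgumentException for a non-UUID string.
    UUID jobId = UUID.fromString(flightId);
    return runs.computeIfPresent(rowKey(jobId, userId), (key, old) -> RunStatus.FAILED) != null;
  }

  RunStatus status(UUID jobId, String userId) {
    return runs.get(rowKey(jobId, userId));
  }
}
```

This only works if flights are always submitted with a UUID string as the flight ID, so a flight that opts in to the hook with some other ID format would fail inside the hook.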
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package bio.terra.pipelines.dependencies.stairway;

import static bio.terra.pipelines.dependencies.stairway.JobMapKeys.getRequiredKeys;

import bio.terra.common.exception.BadRequestException;
import bio.terra.common.exception.MissingRequiredFieldException;
import bio.terra.common.stairway.MonitoringHook;
@@ -73,7 +75,7 @@ private void validateRequiredInputs() {
}

List<String> missingFields = new ArrayList<>();
for (String keyName : JobMapKeys.getRequiredKeys()) {
for (String keyName : getRequiredKeys()) {
if (!jobParameterMap.containsKey(keyName)
|| Objects.equals(
jobParameterMap.getRaw(keyName), "null") // getRaw stringifies the result
Original file line number Diff line number Diff line change
@@ -3,28 +3,28 @@
import java.util.Arrays;
import java.util.List;

public enum JobMapKeys {
public class JobMapKeys {
Review comment: This is interesting - we follow the same enum/getKeyName() pattern in TDR. We don't ever reference the enum without the keyName. I like this new approach - it feels much less bulky. Is there something driving this change besides that it is cleaner?

Has this been accepted as the new standard? I'll have to keep this in mind for incremental changes to TDR.

Reply (Collaborator Author): I don't think this is any new standard - when we copied things from WSM, they had the enum JobMapKeys and a class for custom keys. We realized that there wasn't really a reason to have both (and it was confusing to have both), and the class version is less bulky, like you said. So we're going for it!

// parameters for all flight types
DESCRIPTION("description"),
USER_ID("user_id"),
PIPELINE_NAME("pipeline_name"),
STATUS_CODE("status_code"),
RESPONSE("response"), // result or output of the job
RESULT_PATH(
"result_path"); // path to the result API endpoint for this job; only used for asynchronous
// endpoints
public static final String DESCRIPTION = "description";
public static final String USER_ID = "user_id";
public static final String PIPELINE_NAME = "pipeline_name";
public static final String PIPELINE_ID = "pipeline_id";
public static final String STATUS_CODE = "status_code";
public static final String RESPONSE = "response"; // result or output of the job
public static final String RESULT_PATH =
"result_path"; // path to the result API endpoint for this job; only used for asynchronous

private final String keyName;
// keys to determine which Stairway hooks to run
public static final String DO_SET_PIPELINE_RUN_STATUS_FAILED_HOOK =
"do_set_pipeline_run_status_failed_hook";
public static final String DO_INCREMENT_METRICS_FAILED_COUNTER_HOOK =
"do_increment_metrics_failed_counter_hook";

JobMapKeys(String keyName) {
this.keyName = keyName;
}

public String getKeyName() {
return keyName;
JobMapKeys() {
throw new IllegalStateException("Attempted to instantiate utility class JobMapKeys");
}

public static List<String> getRequiredKeys() {
return Arrays.asList(JobMapKeys.USER_ID.getKeyName(), JobMapKeys.PIPELINE_NAME.getKeyName());
return Arrays.asList(USER_ID, PIPELINE_NAME, PIPELINE_ID);
}
}
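
To make the trade-off discussed in the review thread concrete, here is a small sketch of the constants-class pattern together with a required-key check in the spirit of `validateRequiredInputs`. The key names come from this PR. The class name, the `missingRequiredKeys` helper, and the `Map<String, String>` of already-serialized values (standing in for what `getRaw` is described as returning) are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Constants-class sketch: call sites pass the String key directly, with no getKeyName() call.
final class JobKeysSketch {
  static final String USER_ID = "user_id";
  static final String PIPELINE_NAME = "pipeline_name";
  static final String PIPELINE_ID = "pipeline_id";

  // A private constructor is one common way to prevent instantiation of a utility class.
  private JobKeysSketch() {}

  static List<String> getRequiredKeys() {
    return List.of(USER_ID, PIPELINE_NAME, PIPELINE_ID);
  }

  /** Returns the required keys that are absent or whose serialized value is the literal "null". */
  static List<String> missingRequiredKeys(Map<String, String> serializedParams) {
    List<String> missing = new ArrayList<>();
    for (String key : getRequiredKeys()) {
      if (!serializedParams.containsKey(key)
          || Objects.equals(serializedParams.get(key), "null")) {
        missing.add(key);
      }
    }
    return missing;
  }
}
```

One cost of plain String constants compared with the enum is that any String can be passed where a key is expected, so the compiler no longer checks that a key belongs to the known set.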