Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split workflow executions in half if request body size is too big #481

Merged
merged 9 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions metricsaggregator/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-csv</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
Expand Down Expand Up @@ -292,6 +296,7 @@
<usedDependency>org.javamoney.moneta:moneta-core</usedDependency>
<usedDependency>ch.qos.logback:logback-classic</usedDependency>
<usedDependency>ch.qos.logback:logback-core</usedDependency>
<usedDependency>com.google.guava:guava</usedDependency>
</usedDependencies>
</configuration>
</execution>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import static io.dockstore.utils.ExceptionHandler.exceptionMessage;
import static java.util.stream.Collectors.groupingBy;

import com.google.common.collect.Lists;
import com.google.common.math.IntMath;
import io.dockstore.common.Partner;
import io.dockstore.metricsaggregator.MetricsAggregatorConfig;
import io.dockstore.metricsaggregator.client.cli.CommandLineArgs.SubmitTerraMetrics;
Expand All @@ -22,6 +24,7 @@
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.math.RoundingMode;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDateTime;
Expand All @@ -44,6 +47,7 @@
import org.apache.commons.csv.CSVPrinter;
import org.apache.commons.csv.CSVRecord;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -167,21 +171,60 @@
}

final SourceUrlTrsInfo sourceUrlTrsInfo = sourceUrlToSourceUrlTrsInfo.get(sourceUrl);
final List<RunExecution> workflowExecutionsToSubmit = workflowMetricRecords.stream()
List<RunExecution> workflowExecutionsToSubmit = workflowMetricRecords.stream()
.map(workflowExecution -> getTerraWorkflowExecutionFromCsvRecord(workflowExecution, sourceUrlTrsInfo.sourceUrl(), skippedExecutionsCsvPrinter))
.filter(Optional::isPresent)
.map(Optional::get)
.toList();
final ExecutionsRequestBody executionsRequestBody = new ExecutionsRequestBody().runExecutions(workflowExecutionsToSubmit);
String description = "Submitted using the metricsaggregator's submit-terra-metrics command";
if (StringUtils.isNotBlank(submitTerraMetricsCommand.getDescription())) {
description += ". " + submitTerraMetricsCommand.getDescription();

Check warning on line 181 in metricsaggregator/src/main/java/io/dockstore/metricsaggregator/client/cli/TerraMetricsSubmitter.java

View check run for this annotation

Codecov / codecov/patch

metricsaggregator/src/main/java/io/dockstore/metricsaggregator/client/cli/TerraMetricsSubmitter.java#L181

Added line #L181 was not covered by tests
}

executionMetricsPost(workflowExecutionsToSubmit, sourceUrlTrsInfo, description, extendedGa4GhApi, workflowMetricRecords, skippedExecutionsCsvPrinter);
}

/**
* Submit Terra workflow executions to Dockstore.
* If the request fails with a 413 Request Entity Too Large and there are more than one execution to submit, the function halves the number of workflow executions to submit then re-attempts submission
* until it's successful or a non-413 error occurs.
* @param workflowExecutionsToSubmit
* @param sourceUrlTrsInfo
* @param description
* @param extendedGa4GhApi
* @param workflowMetricRecords
* @param skippedExecutionsCsvPrinter
*/
private void executionMetricsPost(List<RunExecution> workflowExecutionsToSubmit, SourceUrlTrsInfo sourceUrlTrsInfo, String description, ExtendedGa4GhApi extendedGa4GhApi, List<CSVRecord> workflowMetricRecords, CSVPrinter skippedExecutionsCsvPrinter) {
try {
String description = "Submitted using the metricsaggregator's submit-terra-metrics command";
if (StringUtils.isNotBlank(submitTerraMetricsCommand.getDescription())) {
description += ". " + submitTerraMetricsCommand.getDescription();
}
extendedGa4GhApi.executionMetricsPost(executionsRequestBody, Partner.TERRA.toString(), sourceUrlTrsInfo.trsId(), sourceUrlTrsInfo.version(), description);
extendedGa4GhApi.executionMetricsPost(new ExecutionsRequestBody().runExecutions(workflowExecutionsToSubmit), Partner.TERRA.toString(), sourceUrlTrsInfo.trsId(),
sourceUrlTrsInfo.version(), description);
numberOfExecutionsSubmitted.addAndGet(workflowMetricRecords.size());
} catch (ApiException e) {
logSkippedExecutions(sourceUrlTrsInfo.sourceUrl(), workflowMetricRecords, String.format("Could not submit execution metrics to Dockstore for workflow %s: %s", sourceUrlTrsInfo, e.getMessage()), skippedExecutionsCsvPrinter, false);
if (e.getCode() == HttpStatus.SC_REQUEST_TOO_LONG) {
// One execution is too large, not much that can be done, so log and skip it
if (workflowExecutionsToSubmit.size() == 1) {
logSkippedExecutions(sourceUrlTrsInfo.sourceUrl(), workflowMetricRecords,
String.format("Could not submit execution metric to Dockstore for workflow %s. Single execution is too large: %s", sourceUrlTrsInfo,
e.getMessage()), skippedExecutionsCsvPrinter, false);

Check warning on line 209 in metricsaggregator/src/main/java/io/dockstore/metricsaggregator/client/cli/TerraMetricsSubmitter.java

View check run for this annotation

Codecov / codecov/patch

metricsaggregator/src/main/java/io/dockstore/metricsaggregator/client/cli/TerraMetricsSubmitter.java#L207-L209

Added lines #L207 - L209 were not covered by tests
} else {
int partitionSize = IntMath.divide(workflowExecutionsToSubmit.size(), 2, RoundingMode.UP);
List<List<RunExecution>> workflowExecutionsToSubmitPartitions = Lists.partition(workflowExecutionsToSubmit,

Check warning on line 212 in metricsaggregator/src/main/java/io/dockstore/metricsaggregator/client/cli/TerraMetricsSubmitter.java

View check run for this annotation

Codecov / codecov/patch

metricsaggregator/src/main/java/io/dockstore/metricsaggregator/client/cli/TerraMetricsSubmitter.java#L211-L212

Added lines #L211 - L212 were not covered by tests
partitionSize);
LOG.info(

Check warning on line 214 in metricsaggregator/src/main/java/io/dockstore/metricsaggregator/client/cli/TerraMetricsSubmitter.java

View check run for this annotation

Codecov / codecov/patch

metricsaggregator/src/main/java/io/dockstore/metricsaggregator/client/cli/TerraMetricsSubmitter.java#L214

Added line #L214 was not covered by tests
"Request body too large, dividing list of {} workflow executions in half with partition size {} and re-attempting",
workflowExecutionsToSubmit.size(), partitionSize);

Check warning on line 216 in metricsaggregator/src/main/java/io/dockstore/metricsaggregator/client/cli/TerraMetricsSubmitter.java

View check run for this annotation

Codecov / codecov/patch

metricsaggregator/src/main/java/io/dockstore/metricsaggregator/client/cli/TerraMetricsSubmitter.java#L216

Added line #L216 was not covered by tests
for (List<RunExecution> partition : workflowExecutionsToSubmitPartitions) {
LOG.info("Re-attempting with {} workflow executions", partition.size());
executionMetricsPost(partition, sourceUrlTrsInfo, description, extendedGa4GhApi, workflowMetricRecords,

Check warning on line 219 in metricsaggregator/src/main/java/io/dockstore/metricsaggregator/client/cli/TerraMetricsSubmitter.java

View check run for this annotation

Codecov / codecov/patch

metricsaggregator/src/main/java/io/dockstore/metricsaggregator/client/cli/TerraMetricsSubmitter.java#L218-L219

Added lines #L218 - L219 were not covered by tests
skippedExecutionsCsvPrinter);
}
}

Check warning on line 222 in metricsaggregator/src/main/java/io/dockstore/metricsaggregator/client/cli/TerraMetricsSubmitter.java

View check run for this annotation

Codecov / codecov/patch

metricsaggregator/src/main/java/io/dockstore/metricsaggregator/client/cli/TerraMetricsSubmitter.java#L221-L222

Added lines #L221 - L222 were not covered by tests
} else {
logSkippedExecutions(sourceUrlTrsInfo.sourceUrl(), workflowMetricRecords,
String.format("Could not submit execution metrics to Dockstore for workflow %s: %s", sourceUrlTrsInfo,
e.getMessage()), skippedExecutionsCsvPrinter, false);

Check warning on line 226 in metricsaggregator/src/main/java/io/dockstore/metricsaggregator/client/cli/TerraMetricsSubmitter.java

View check run for this annotation

Codecov / codecov/patch

metricsaggregator/src/main/java/io/dockstore/metricsaggregator/client/cli/TerraMetricsSubmitter.java#L224-L226

Added lines #L224 - L226 were not covered by tests
}
}
}

Expand Down