Skip to content

Commit

Permalink
Group by source url - much quicker!
Browse files Browse the repository at this point in the history
  • Loading branch information
kathy-t committed Jan 8, 2024
1 parent 3d8f785 commit 6b84efe
Showing 1 changed file with 156 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static io.dockstore.utils.CLIConstants.FAILURE_EXIT_CODE;
import static io.dockstore.utils.DockstoreApiClientUtils.setupApiClient;
import static java.util.stream.Collectors.groupingBy;

import io.dockstore.common.Partner;
import io.dockstore.metricsaggregator.MetricsAggregatorConfig;
Expand Down Expand Up @@ -105,11 +106,28 @@ public void submitTerraMetrics() {
workflowMetricsToProcess.add(workflowMetricRecord);
} else {
workflowMetricsToProcess.add(workflowMetricRecord);
// Collect a map of CSV records with the same source URL
Map<String, List<CSVRecord>> sourceUrlToCsvRecords = workflowMetricsToProcess.stream()
.collect(groupingBy(csvRecord -> csvRecord.get(TerraMetricsCsvHeaders.source_url)));
/*
for (CSVRecord workflowMetricToProcess: workflowMetricsToProcess) {
final String csvRecordSourceUrl = workflowMetricToProcess.get(TerraMetricsCsvHeaders.source_url);
List<CSVRecord> csvRecordsWithSameSourceUrl = sourceUrlToCsvRecords.getOrDefault(csvRecordSourceUrl, new ArrayList<>());
csvRecordsWithSameSourceUrl.add(workflowMetricToProcess);
sourceUrlToCsvRecords.put(csvRecordSourceUrl, csvRecordsWithSameSourceUrl);
}
*/

LOG.info("Processing rows {} to {}", workflowMetricsToProcess.get(0).getRecordNumber(), workflowMetricsToProcess.get(workflowMetricsToProcess.size() - 1).getRecordNumber());

workflowMetricsToProcess.stream()
sourceUrlToCsvRecords.entrySet().stream()
.parallel()
.forEach(record -> processWorkflowExecution(record, workflowsApi, extendedGa4GhApi, skippedExecutionsCsvPrinter));
.forEach(entry -> {
final String sourceUrl = entry.getKey();
final List<CSVRecord> csvRecordsWithSameSourceUrl = entry.getValue();
submitWorkflowExecutions(sourceUrl, csvRecordsWithSameSourceUrl, workflowsApi, extendedGa4GhApi, skippedExecutionsCsvPrinter);
});

workflowMetricsToProcess.clear();
logStats();
}
Expand Down Expand Up @@ -157,12 +175,15 @@ private void processWorkflowExecution(CSVRecord workflowMetricRecord, WorkflowsA
}

if (!sourceUrlToSourceUrlTrsInfo.containsKey(sourceUrl)) {
LOG.info("Calculating TRS info for source_url {}", sourceUrl);
Optional<SourceUrlTrsInfo> sourceUrlTrsInfo = calculateTrsInfoFromSourceUrl(workflowMetricRecord, sourceUrl, workflowsApi, skippedExecutionsCsvPrinter);
if (sourceUrlTrsInfo.isEmpty()) {
return;
} else {
sourceUrlToSourceUrlTrsInfo.put(sourceUrl, sourceUrlTrsInfo.get());
}
} else {
LOG.info("TRS info for source_url {} was previously calculated", sourceUrl);
}

// Check if the information from the workflow execution is valid
Expand All @@ -182,6 +203,54 @@ private void processWorkflowExecution(CSVRecord workflowMetricRecord, WorkflowsA
}
}

private void submitWorkflowExecutions(String sourceUrl, List<CSVRecord> workflowMetricRecords, WorkflowsApi workflowsApi, ExtendedGa4GhApi extendedGa4GhApi, CSVPrinter skippedExecutionsCsvPrinter) {
LOG.info("Processing source_url {} for {} executions", sourceUrl, workflowMetricRecords.size());
numberOfExecutionsProcessed.addAndGet(workflowMetricRecords.size());

if (StringUtils.isBlank(sourceUrl)) {
workflowMetricRecords.forEach(workflowMetricRecord -> {
logSkippedExecution("", workflowMetricRecord, "Can't determine TRS ID because source_url is missing", skippedExecutionsCsvPrinter, false);
});
}

// Check to see if this source_url was skipped before
if (skippedSourceUrlsToReason.containsKey(sourceUrl)) {
workflowMetricRecords.forEach(workflowMetricRecord -> {
logSkippedExecution(sourceUrl, workflowMetricRecord, skippedSourceUrlsToReason.get(sourceUrl), skippedExecutionsCsvPrinter, true);
});
return;
}

if (!sourceUrlToSourceUrlTrsInfo.containsKey(sourceUrl)) {
Optional<SourceUrlTrsInfo> sourceUrlTrsInfo = calculateTrsInfoFromSourceUrl(sourceUrl, workflowsApi);
if (sourceUrlTrsInfo.isEmpty()) {
workflowMetricRecords.forEach(workflowMetricRecord -> {
logSkippedExecution(sourceUrl, workflowMetricRecord, "Could not calculate TRS info", skippedExecutionsCsvPrinter, true);
});
return;
} else {
sourceUrlToSourceUrlTrsInfo.put(sourceUrl, sourceUrlTrsInfo.get());
}
}

final SourceUrlTrsInfo sourceUrlTrsInfo = sourceUrlToSourceUrlTrsInfo.get(sourceUrl);
final List<RunExecution> workflowExecutionsToSubmit = workflowMetricRecords.stream()
.map(workflowExecution -> getTerraWorkflowExecutionFromCsvRecord(workflowExecution, sourceUrlTrsInfo.sourceUrl(), skippedExecutionsCsvPrinter))
.filter(Optional::isPresent)
.map(Optional::get)
.toList();
final ExecutionsRequestBody executionsRequestBody = new ExecutionsRequestBody().runExecutions(workflowExecutionsToSubmit);
try {
extendedGa4GhApi.executionMetricsPost(executionsRequestBody, Partner.TERRA.toString(), sourceUrlTrsInfo.trsId(), sourceUrlTrsInfo.version(),
"Terra metrics from Q4 2023");
numberOfExecutionsSubmitted.addAndGet(workflowMetricRecords.size());
} catch (ApiException e) {
workflowMetricRecords.forEach(workflowMetricRecord -> {
logSkippedExecution(sourceUrlTrsInfo.sourceUrl(), workflowMetricRecord, String.format("Could not submit execution metrics to Dockstore for workflow %s: %s", sourceUrlTrsInfo, e.getMessage()), skippedExecutionsCsvPrinter, false);
});
}
}

/**
* Performs logging and writing of the skipped execution to an output file.
* If skipFutureExecutionsWithSourceUrl is true, also adds the source_url of the skipped execution to the sourceUrlsToSkipToReason map
Expand All @@ -193,8 +262,6 @@ private void processWorkflowExecution(CSVRecord workflowMetricRecord, WorkflowsA
* @param skipFutureExecutionsWithSourceUrl boolean indicating if all executions with the same source_url should be skipped
*/
private void logSkippedExecution(String sourceUrl, CSVRecord csvRecordToSkip, String reason, CSVPrinter skippedExecutionsCsvPrinter, boolean skipFutureExecutionsWithSourceUrl) {
LOG.info("Skipping execution on row {}: {}", csvRecordToSkip.getRecordNumber(), reason);

// Record to map for future reference. Only want to do this if the skip reason applies for ALL executions with the source_url.
// Should not add to this map if the skip reason is specific to one execution
if (skipFutureExecutionsWithSourceUrl) {
Expand Down Expand Up @@ -262,7 +329,7 @@ public Optional<RunExecution> getTerraWorkflowExecutionFromCsvRecord(CSVRecord c

RunExecution workflowExecution = new RunExecution();
// TODO: uncomment below when the update executions endpoint PR is merged
// workflowExecution.setExecutionId(executionId);
workflowExecution.setExecutionId(executionId);
workflowExecution.setExecutionStatus(executionStatus.get());
workflowExecution.setDateExecuted(dateExecuted.get());
getExecutionTime(workflowRunTimeMinutes).ifPresent(workflowExecution::setExecutionTime);
Expand Down Expand Up @@ -379,6 +446,90 @@ private Optional<SourceUrlTrsInfo> calculateTrsInfoFromSourceUrl(CSVRecord workf
}
}

/**
* Calculates the TRS ID from the source_url by:
* <ol>
* <li>Looking at all published workflows that have the same workflow path prefix, i.e. they belong to the same GitHub repository.</li>
* <li>For each published workflow, getting the primary descriptor path for the version specified in source_url and checking if it matches the primary desciptor path in the source_url</li>
* <li>Ensuring that there is only one workflow in the repository with the same descriptor path. If there are multiple, it is an ambiguous case and we skip the execution</li>
* </ol>
*
* @param sourceUrl
* @param workflowsApi
* @return
*/
private Optional<SourceUrlTrsInfo> calculateTrsInfoFromSourceUrl(String sourceUrl, WorkflowsApi workflowsApi) {
// Need to figure out the TRS ID and version name using the source_url.
// Example source_url: https://raw.githubusercontent.com/theiagen/public_health_viral_genomics/v2.0.0/workflows/wf_theiacov_fasta.wdl
// Organization = "theiagen/public_health_viral_genomics", version = "v2.0.0", the rest is the primary descriptor path
// Note that the TRS ID may also have a workflow name, which we need to figure out
final List<String> sourceUrlComponents = getSourceUrlComponents(sourceUrl);


final int minNumberOfComponents = 3;
if (sourceUrlComponents.size() < minNumberOfComponents) {
//(sourceUrl, workflowMetricRecord, "Not enough components in the source_url to figure out the TRS ID and version", skippedExecutionsCsvPrinter, true);
return Optional.empty();
}

// There should be at least three elements in order for there to be an organization name, foo>/<organization>, and version <version>
// in <foo>/<organization>/<version>/<path-to-descriptor>
final String organization = sourceUrlComponents.get(0) + "/" + sourceUrlComponents.get(1);
final String version = sourceUrlComponents.get(2);
final String primaryDescriptorPathFromUrl = "/" + String.join("/", sourceUrlComponents.subList(3, sourceUrlComponents.size()));

final String workflowPathPrefix = "github.com/" + organization;
if (!workflowPathPrefixToWorkflows.containsKey(workflowPathPrefix)) {
try {
List<MinimalWorkflowInfo> publishedWorkflowsWithSamePathPrefix = workflowsApi.getAllPublishedWorkflowByPath(
workflowPathPrefix).stream()
.map(workflow -> new MinimalWorkflowInfo(workflow.getId(), workflow.getFullWorkflowPath(), workflow.getDescriptorType(), new ConcurrentHashMap<>())).toList();
workflowPathPrefixToWorkflows.put(workflowPathPrefix, publishedWorkflowsWithSamePathPrefix);
} catch (ApiException e) {
/*
logSkippedExecution(sourceUrl, workflowMetricRecord,
"Could not get all published workflows for workflow path " + workflowPathPrefix + " to determine TRS ID",
skippedExecutionsCsvPrinter, true);
*/
return Optional.empty();
}
}

List<MinimalWorkflowInfo> workflowsFromSameRepo = workflowPathPrefixToWorkflows.get(workflowPathPrefix);

List<String> foundFullWorkflowPaths = new ArrayList<>();
// Loop through each workflow to find one that matches the primary descriptor
workflowsFromSameRepo.forEach(workflow -> {
// Get the primary descriptor path for the version and update the map, either with the primary descriptor path or an empty string to indicate that it was not found
if (!workflow.versionToPrimaryDescriptorPath().containsKey(version)) {
final String primaryDescriptorAbsolutePath = makePathAbsolute(getPrimaryDescriptorAbsolutePath(workflowsApi, workflow, version).orElse(""));
workflow.versionToPrimaryDescriptorPath().put(version, primaryDescriptorAbsolutePath);
}

// Check to see if there's a version that has the same primary descriptor path
final String primaryDescriptorPathForVersion = workflow.versionToPrimaryDescriptorPath().get(version);
if (primaryDescriptorPathFromUrl.equals(primaryDescriptorPathForVersion)) {
foundFullWorkflowPaths.add(workflow.fullWorkflowPath());
}
});

if (foundFullWorkflowPaths.isEmpty()) {
//logSkippedExecution(sourceUrl, workflowMetricRecord, "Could not find workflow with primary descriptor " + primaryDescriptorPathFromUrl, skippedExecutionsCsvPrinter, true);
return Optional.empty();
} else if (foundFullWorkflowPaths.size() > 1) {
// There is already a workflow in the same repository with the same descriptor path that we're looking for.
// Skip this source_url because it is an ambiguous case and we can't identify which workflow the source url is referring to.
/*
logSkippedExecution(sourceUrl, workflowMetricRecord, String.format("There's %s workflows in the repository with the same primary descriptor path '%s': %s",
foundFullWorkflowPaths.size(), primaryDescriptorPathFromUrl, foundFullWorkflowPaths), skippedExecutionsCsvPrinter, true);
*/
return Optional.empty();
} else {
final SourceUrlTrsInfo sourceUrlTrsInfo = new SourceUrlTrsInfo(sourceUrl, "#workflow/" + foundFullWorkflowPaths.get(0), version);
return Optional.of(sourceUrlTrsInfo);
}
}

/**
* Returns a list of slash-delimited components from the source_url.
* Example: given source_url https://raw.githubusercontent.com/theiagen/public_health_viral_genomics/v2.0.0/workflows/wf_theiacov_fasta.wdl,
Expand Down

0 comments on commit 6b84efe

Please sign in to comment.