From 6b84efeb1b512baf79d0b8880d07696118c679c0 Mon Sep 17 00:00:00 2001 From: Kathy Tran Date: Sun, 7 Jan 2024 20:19:02 -0500 Subject: [PATCH] Group by source url - much quicker! --- .../client/cli/TerraMetricsSubmitter.java | 161 +++++++++++++++++- 1 file changed, 156 insertions(+), 5 deletions(-) diff --git a/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/client/cli/TerraMetricsSubmitter.java b/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/client/cli/TerraMetricsSubmitter.java index dbc43bb4..d2062371 100644 --- a/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/client/cli/TerraMetricsSubmitter.java +++ b/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/client/cli/TerraMetricsSubmitter.java @@ -2,6 +2,7 @@ import static io.dockstore.utils.CLIConstants.FAILURE_EXIT_CODE; import static io.dockstore.utils.DockstoreApiClientUtils.setupApiClient; +import static java.util.stream.Collectors.groupingBy; import io.dockstore.common.Partner; import io.dockstore.metricsaggregator.MetricsAggregatorConfig; @@ -105,11 +106,28 @@ public void submitTerraMetrics() { workflowMetricsToProcess.add(workflowMetricRecord); } else { workflowMetricsToProcess.add(workflowMetricRecord); + // Collect a map of CSV records with the same source URL + Map> sourceUrlToCsvRecords = workflowMetricsToProcess.stream() + .collect(groupingBy(csvRecord -> csvRecord.get(TerraMetricsCsvHeaders.source_url))); + /* + for (CSVRecord workflowMetricToProcess: workflowMetricsToProcess) { + final String csvRecordSourceUrl = workflowMetricToProcess.get(TerraMetricsCsvHeaders.source_url); + List csvRecordsWithSameSourceUrl = sourceUrlToCsvRecords.getOrDefault(csvRecordSourceUrl, new ArrayList<>()); + csvRecordsWithSameSourceUrl.add(workflowMetricToProcess); + sourceUrlToCsvRecords.put(csvRecordSourceUrl, csvRecordsWithSameSourceUrl); + } + */ + LOG.info("Processing rows {} to {}", workflowMetricsToProcess.get(0).getRecordNumber(), workflowMetricsToProcess.get(workflowMetricsToProcess.size() - 1).getRecordNumber()); - workflowMetricsToProcess.stream() + sourceUrlToCsvRecords.entrySet().stream() .parallel() - .forEach(record -> processWorkflowExecution(record, workflowsApi, extendedGa4GhApi, skippedExecutionsCsvPrinter)); + .forEach(entry -> { + final String sourceUrl = entry.getKey(); + final List csvRecordsWithSameSourceUrl = entry.getValue(); + submitWorkflowExecutions(sourceUrl, csvRecordsWithSameSourceUrl, workflowsApi, extendedGa4GhApi, skippedExecutionsCsvPrinter); + }); + workflowMetricsToProcess.clear(); logStats(); } @@ -157,12 +175,15 @@ private void processWorkflowExecution(CSVRecord workflowMetricRecord, WorkflowsA } if (!sourceUrlToSourceUrlTrsInfo.containsKey(sourceUrl)) { + LOG.info("Calculating TRS info for source_url {}", sourceUrl); Optional sourceUrlTrsInfo = calculateTrsInfoFromSourceUrl(workflowMetricRecord, sourceUrl, workflowsApi, skippedExecutionsCsvPrinter); if (sourceUrlTrsInfo.isEmpty()) { return; } else { sourceUrlToSourceUrlTrsInfo.put(sourceUrl, sourceUrlTrsInfo.get()); } + } else { + LOG.info("TRS info for source_url {} was previously calculated", sourceUrl); } // Check if the information from the workflow execution is valid @@ -182,6 +203,54 @@ private void processWorkflowExecution(CSVRecord workflowMetricRecord, WorkflowsA } } + private void submitWorkflowExecutions(String sourceUrl, List workflowMetricRecords, WorkflowsApi workflowsApi, ExtendedGa4GhApi extendedGa4GhApi, CSVPrinter skippedExecutionsCsvPrinter) { + LOG.info("Processing source_url {} for {} executions", sourceUrl, workflowMetricRecords.size()); + numberOfExecutionsProcessed.addAndGet(workflowMetricRecords.size()); + + if (StringUtils.isBlank(sourceUrl)) { + workflowMetricRecords.forEach(workflowMetricRecord -> { + logSkippedExecution("", workflowMetricRecord, "Can't determine TRS ID because source_url is missing", skippedExecutionsCsvPrinter, false); + }); + } + + // Check to see if this source_url was skipped before + if (skippedSourceUrlsToReason.containsKey(sourceUrl)) { + workflowMetricRecords.forEach(workflowMetricRecord -> { + logSkippedExecution(sourceUrl, workflowMetricRecord, skippedSourceUrlsToReason.get(sourceUrl), skippedExecutionsCsvPrinter, true); + }); + return; + } + + if (!sourceUrlToSourceUrlTrsInfo.containsKey(sourceUrl)) { + Optional sourceUrlTrsInfo = calculateTrsInfoFromSourceUrl(sourceUrl, workflowsApi); + if (sourceUrlTrsInfo.isEmpty()) { + workflowMetricRecords.forEach(workflowMetricRecord -> { + logSkippedExecution(sourceUrl, workflowMetricRecord, "Could not calculate TRS info", skippedExecutionsCsvPrinter, true); + }); + return; + } else { + sourceUrlToSourceUrlTrsInfo.put(sourceUrl, sourceUrlTrsInfo.get()); + } + } + + final SourceUrlTrsInfo sourceUrlTrsInfo = sourceUrlToSourceUrlTrsInfo.get(sourceUrl); + final List workflowExecutionsToSubmit = workflowMetricRecords.stream() + .map(workflowExecution -> getTerraWorkflowExecutionFromCsvRecord(workflowExecution, sourceUrlTrsInfo.sourceUrl(), skippedExecutionsCsvPrinter)) + .filter(Optional::isPresent) + .map(Optional::get) + .toList(); + final ExecutionsRequestBody executionsRequestBody = new ExecutionsRequestBody().runExecutions(workflowExecutionsToSubmit); + try { + extendedGa4GhApi.executionMetricsPost(executionsRequestBody, Partner.TERRA.toString(), sourceUrlTrsInfo.trsId(), sourceUrlTrsInfo.version(), + "Terra metrics from Q4 2023"); + numberOfExecutionsSubmitted.addAndGet(workflowMetricRecords.size()); + } catch (ApiException e) { + workflowMetricRecords.forEach(workflowMetricRecord -> { + logSkippedExecution(sourceUrlTrsInfo.sourceUrl(), workflowMetricRecord, String.format("Could not submit execution metrics to Dockstore for workflow %s: %s", sourceUrlTrsInfo, e.getMessage()), skippedExecutionsCsvPrinter, false); + }); + } + } + /** * Performs logging and writing of the skipped execution to an output file. * If skipFutureExecutionsWithSourceUrl is true, also adds the source_url of the skipped execution to the sourceUrlsToSkipToReason map @@ -193,8 +262,6 @@ private void processWorkflowExecution(CSVRecord workflowMetricRecord, WorkflowsA * @param skipFutureExecutionsWithSourceUrl boolean indicating if all executions with the same source_url should be skipped */ private void logSkippedExecution(String sourceUrl, CSVRecord csvRecordToSkip, String reason, CSVPrinter skippedExecutionsCsvPrinter, boolean skipFutureExecutionsWithSourceUrl) { - LOG.info("Skipping execution on row {}: {}", csvRecordToSkip.getRecordNumber(), reason); - // Record to map for future reference. Only want to do this if the skip reason applies for ALL executions with the source_url. // Should not add to this map if the skip reason is specific to one execution if (skipFutureExecutionsWithSourceUrl) { @@ -262,7 +329,7 @@ public Optional getTerraWorkflowExecutionFromCsvRecord(CSVRecord c RunExecution workflowExecution = new RunExecution(); // TODO: uncomment below when the update executions endpoint PR is merged - // workflowExecution.setExecutionId(executionId); + workflowExecution.setExecutionId(executionId); workflowExecution.setExecutionStatus(executionStatus.get()); workflowExecution.setDateExecuted(dateExecuted.get()); getExecutionTime(workflowRunTimeMinutes).ifPresent(workflowExecution::setExecutionTime); @@ -379,6 +446,90 @@ private Optional calculateTrsInfoFromSourceUrl(CSVRecord workf } } + /** + * Calculates the TRS ID from the source_url by: + *
    + *
  1. Looking at all published workflows that have the same workflow path prefix, i.e. they belong to the same GitHub repository.
  2. + *
  3. For each published workflow, getting the primary descriptor path for the version specified in source_url and checking if it matches the primary desciptor path in the source_url
  4. + *
  5. Ensuring that there is only one workflow in the repository with the same descriptor path. If there are multiple, it is an ambiguous case and we skip the execution
  6. + *
+ * + * @param sourceUrl + * @param workflowsApi + * @return + */ + private Optional calculateTrsInfoFromSourceUrl(String sourceUrl, WorkflowsApi workflowsApi) { + // Need to figure out the TRS ID and version name using the source_url. + // Example source_url: https://raw.githubusercontent.com/theiagen/public_health_viral_genomics/v2.0.0/workflows/wf_theiacov_fasta.wdl + // Organization = "theiagen/public_health_viral_genomics", version = "v2.0.0", the rest is the primary descriptor path + // Note that the TRS ID may also have a workflow name, which we need to figure out + final List sourceUrlComponents = getSourceUrlComponents(sourceUrl); + + + final int minNumberOfComponents = 3; + if (sourceUrlComponents.size() < minNumberOfComponents) { + //(sourceUrl, workflowMetricRecord, "Not enough components in the source_url to figure out the TRS ID and version", skippedExecutionsCsvPrinter, true); + return Optional.empty(); + } + + // There should be at least three elements in order for there to be an organization name, foo>/, and version + // in /// + final String organization = sourceUrlComponents.get(0) + "/" + sourceUrlComponents.get(1); + final String version = sourceUrlComponents.get(2); + final String primaryDescriptorPathFromUrl = "/" + String.join("/", sourceUrlComponents.subList(3, sourceUrlComponents.size())); + + final String workflowPathPrefix = "github.com/" + organization; + if (!workflowPathPrefixToWorkflows.containsKey(workflowPathPrefix)) { + try { + List publishedWorkflowsWithSamePathPrefix = workflowsApi.getAllPublishedWorkflowByPath( + workflowPathPrefix).stream() + .map(workflow -> new MinimalWorkflowInfo(workflow.getId(), workflow.getFullWorkflowPath(), workflow.getDescriptorType(), new ConcurrentHashMap<>())).toList(); + workflowPathPrefixToWorkflows.put(workflowPathPrefix, publishedWorkflowsWithSamePathPrefix); + } catch (ApiException e) { + /* + logSkippedExecution(sourceUrl, workflowMetricRecord, + "Could not get all published workflows for workflow path " + workflowPathPrefix + " to determine TRS ID", + skippedExecutionsCsvPrinter, true); + */ + return Optional.empty(); + } + } + + List workflowsFromSameRepo = workflowPathPrefixToWorkflows.get(workflowPathPrefix); + + List foundFullWorkflowPaths = new ArrayList<>(); + // Loop through each workflow to find one that matches the primary descriptor + workflowsFromSameRepo.forEach(workflow -> { + // Get the primary descriptor path for the version and update the map, either with the primary descriptor path or an empty string to indicate that it was not found + if (!workflow.versionToPrimaryDescriptorPath().containsKey(version)) { + final String primaryDescriptorAbsolutePath = makePathAbsolute(getPrimaryDescriptorAbsolutePath(workflowsApi, workflow, version).orElse("")); + workflow.versionToPrimaryDescriptorPath().put(version, primaryDescriptorAbsolutePath); + } + + // Check to see if there's a version that has the same primary descriptor path + final String primaryDescriptorPathForVersion = workflow.versionToPrimaryDescriptorPath().get(version); + if (primaryDescriptorPathFromUrl.equals(primaryDescriptorPathForVersion)) { + foundFullWorkflowPaths.add(workflow.fullWorkflowPath()); + } + }); + + if (foundFullWorkflowPaths.isEmpty()) { + //logSkippedExecution(sourceUrl, workflowMetricRecord, "Could not find workflow with primary descriptor " + primaryDescriptorPathFromUrl, skippedExecutionsCsvPrinter, true); + return Optional.empty(); + } else if (foundFullWorkflowPaths.size() > 1) { + // There is already a workflow in the same repository with the same descriptor path that we're looking for. + // Skip this source_url because it is an ambiguous case and we can't identify which workflow the source url is referring to. + /* + logSkippedExecution(sourceUrl, workflowMetricRecord, String.format("There's %s workflows in the repository with the same primary descriptor path '%s': %s", + foundFullWorkflowPaths.size(), primaryDescriptorPathFromUrl, foundFullWorkflowPaths), skippedExecutionsCsvPrinter, true); + */ + return Optional.empty(); + } else { + final SourceUrlTrsInfo sourceUrlTrsInfo = new SourceUrlTrsInfo(sourceUrl, "#workflow/" + foundFullWorkflowPaths.get(0), version); + return Optional.of(sourceUrlTrsInfo); + } + } + /** * Returns a list of slash-delimited components from the source_url. * Example: given source_url https://raw.githubusercontent.com/theiagen/public_health_viral_genomics/v2.0.0/workflows/wf_theiacov_fasta.wdl,