diff --git a/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/MetricsAggregatorS3Client.java b/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/MetricsAggregatorS3Client.java index f6abef3b..b2c2b2fd 100644 --- a/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/MetricsAggregatorS3Client.java +++ b/metricsaggregator/src/main/java/io/dockstore/metricsaggregator/MetricsAggregatorS3Client.java @@ -40,6 +40,7 @@ import java.util.List; import java.util.Map; import java.util.Queue; +import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.s3.S3Client; @@ -146,30 +147,47 @@ private ExecutionsRequestBody getExecutions(String toolId, String versionName, S for (MetricsData metricsData : metricsDataList) { String fileContent = metricsDataS3Client.getMetricsDataFileContent(metricsData.toolId(), metricsData.toolVersionName(), metricsData.platform(), metricsData.fileName()); - ExecutionsRequestBody executionsFromOneSubmission = GSON.fromJson(fileContent, ExecutionsRequestBody.class); + + ExecutionsRequestBody executionsFromOneSubmission; + try { + executionsFromOneSubmission = GSON.fromJson(fileContent, ExecutionsRequestBody.class); + } catch (JsonSyntaxException e) { + LOG.error("Could not read execution(s) from S3 key {}, ignoring file", metricsData.s3Key()); + continue; + } + + // For each execution, put it in a map so that there are no executions with duplicate execution IDs. + // The latest execution put in the map is the newest one based on the principal that S3 lists objects in alphabetical order, + // which is returned in an ordered list via getMetricsData. + // Note: executions that were submitted to S3 prior to the existence of execution IDs don't have an execution ID. + // For the purposes of aggregation, generate one so that the execution is considered unique. executionsFromOneSubmission.getRunExecutions().forEach(workflowExecution -> { - executionIdToWorkflowExecutionMap.put(workflowExecution.getExecutionId(), workflowExecution); - executionIdToValidationExecutionMap.remove(workflowExecution.getExecutionId()); - executionIdToTaskExecutionsMap.remove(workflowExecution.getExecutionId()); - executionIdToAggregatedExecutionMap.remove(workflowExecution.getExecutionId()); + final String executionId = generateExecutionIdIfNull(workflowExecution.getExecutionId()); + executionIdToWorkflowExecutionMap.put(executionId, workflowExecution); + executionIdToValidationExecutionMap.remove(executionId); + executionIdToTaskExecutionsMap.remove(executionId); + executionIdToAggregatedExecutionMap.remove(executionId); }); executionsFromOneSubmission.getTaskExecutions().forEach(taskExecutions -> { - executionIdToTaskExecutionsMap.put(taskExecutions.getExecutionId(), taskExecutions); - executionIdToWorkflowExecutionMap.remove(taskExecutions.getExecutionId()); - executionIdToValidationExecutionMap.remove(taskExecutions.getExecutionId()); - executionIdToAggregatedExecutionMap.remove(taskExecutions.getExecutionId()); + final String executionId = generateExecutionIdIfNull(taskExecutions.getExecutionId()); + executionIdToTaskExecutionsMap.put(executionId, taskExecutions); + executionIdToWorkflowExecutionMap.remove(executionId); + executionIdToValidationExecutionMap.remove(executionId); + executionIdToAggregatedExecutionMap.remove(executionId); }); executionsFromOneSubmission.getValidationExecutions().forEach(validationExecution -> { - executionIdToValidationExecutionMap.put(validationExecution.getExecutionId(), validationExecution); - executionIdToWorkflowExecutionMap.remove(validationExecution.getExecutionId()); - executionIdToTaskExecutionsMap.remove(validationExecution.getExecutionId()); - executionIdToAggregatedExecutionMap.remove(validationExecution.getExecutionId()); + final String executionId = generateExecutionIdIfNull(validationExecution.getExecutionId()); + executionIdToValidationExecutionMap.put(executionId, validationExecution); + executionIdToWorkflowExecutionMap.remove(executionId); + executionIdToTaskExecutionsMap.remove(executionId); + executionIdToAggregatedExecutionMap.remove(executionId); }); executionsFromOneSubmission.getAggregatedExecutions().forEach(aggregatedExecution -> { - executionIdToAggregatedExecutionMap.put(aggregatedExecution.getExecutionId(), aggregatedExecution); - executionIdToWorkflowExecutionMap.remove(aggregatedExecution.getExecutionId()); - executionIdToTaskExecutionsMap.remove(aggregatedExecution.getExecutionId()); - executionIdToValidationExecutionMap.remove(aggregatedExecution.getExecutionId()); + final String executionId = generateExecutionIdIfNull(aggregatedExecution.getExecutionId()); + executionIdToAggregatedExecutionMap.put(executionId, aggregatedExecution); + executionIdToWorkflowExecutionMap.remove(executionId); + executionIdToTaskExecutionsMap.remove(executionId); + executionIdToValidationExecutionMap.remove(executionId); }); } @@ -180,6 +198,20 @@ private ExecutionsRequestBody getExecutions(String toolId, String versionName, S .aggregatedExecutions(executionIdToAggregatedExecutionMap.values().stream().toList()); } + /** + * If the execution ID is null, generate a random one for the purposes of aggregation. + * Executions that were submitted to S3 prior to the existence of execution IDs don't have an execution ID, + * thus for the purposes of aggregation, generate one. + * @param executionId + * @return + */ + private String generateExecutionIdIfNull(String executionId) { + if (executionId == null) { + return UUID.randomUUID().toString(); + } + return executionId; + } + /** * Returns a unique list of directories containing metrics files. * For example, suppose the local-dockstore-metrics-data bucket looks like the following.