Skip to content

Commit

Permalink
Use AtomicInteger for metricsaggregator
Browse files Browse the repository at this point in the history
  • Loading branch information
kathy-t committed Jan 11, 2024
1 parent 3580491 commit e10f668
Showing 1 changed file with 14 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.s3.S3Client;
Expand All @@ -48,9 +49,9 @@ public class MetricsAggregatorS3Client {

private static final Logger LOG = LoggerFactory.getLogger(MetricsAggregatorS3Client.class);
private static final Gson GSON = new Gson();
private int numberOfDirectoriesProcessed = 0;
private int numberOfMetricsSubmitted = 0;
private int numberOfMetricsSkipped = 0;
private final AtomicInteger numberOfDirectoriesProcessed = new AtomicInteger(0);
private final AtomicInteger numberOfMetricsSubmitted = new AtomicInteger(0);
private final AtomicInteger numberOfMetricsSkipped = new AtomicInteger(0);

private final String bucketName;

Expand Down Expand Up @@ -82,7 +83,7 @@ public void aggregateMetrics(ExtendedGa4GhApi extendedGa4GhApi) {
metricsDirectories.stream()
.parallel()
.forEach(directoryInfo -> aggregateMetricsForDirectory(directoryInfo, extendedGa4GhApi));
LOG.info("Completed aggregating metrics. Processed {} directories, submitted {} metrics, and skipped {} metrics", numberOfDirectoriesProcessed, numberOfMetricsSubmitted, numberOfMetricsSkipped);
LOG.info("Completed aggregating metrics. Processed {} directories, submitted {} platform metrics, and skipped {} platform metrics", numberOfDirectoriesProcessed, numberOfMetricsSubmitted, numberOfMetricsSkipped);
}

private void aggregateMetricsForDirectory(S3DirectoryInfo directoryInfo, ExtendedGa4GhApi extendedGa4GhApi) {
Expand All @@ -101,18 +102,20 @@ private void aggregateMetricsForDirectory(S3DirectoryInfo directoryInfo, Extende
allSubmissions = getExecutions(toolId, versionName, platform);
} catch (Exception e) {
LOG.error("Error aggregating metrics: Could not get all executions from directory {}", versionS3KeyPrefix, e);
++numberOfMetricsSkipped;
numberOfMetricsSkipped.incrementAndGet();
continue; // Continue aggregating metrics for other directories
}

try {
getAggregatedMetrics(allSubmissions).ifPresent(metrics -> {
extendedGa4GhApi.aggregatedMetricsPut(metrics, platform, toolId, versionName);
System.out.printf("Aggregated metrics for tool ID %s, version %s, platform %s from directory %s%n", toolId, versionName, platform, versionS3KeyPrefix);
LOG.info("Aggregated metrics for tool ID {}, version {}, platform {} from directory {}", toolId, versionName, platform, versionS3KeyPrefix);
allMetrics.add(metrics);
numberOfMetricsSubmitted.incrementAndGet();
});
} catch (Exception e) {
LOG.error("Error aggregating metrics: Could not put all executions from directory {}", versionS3KeyPrefix, e);
numberOfMetricsSkipped.incrementAndGet();
// Continue aggregating metrics for other platforms
}
}
Expand All @@ -122,19 +125,20 @@ private void aggregateMetricsForDirectory(S3DirectoryInfo directoryInfo, Extende
try {
getAggregatedMetrics(new ExecutionsRequestBody().aggregatedExecutions(allMetrics)).ifPresent(metrics -> {
extendedGa4GhApi.aggregatedMetricsPut(metrics, Partner.ALL.name(), toolId, versionName);
System.out.printf("Aggregated metrics across all platforms (%s) for tool ID %s, version %s from directory %s%n",
LOG.info("Aggregated metrics across all platforms ({}) for tool ID {}, version {} from directory {}",
platformsString, toolId, versionName, versionS3KeyPrefix);
allMetrics.add(metrics);
numberOfMetricsSubmitted.incrementAndGet();
});
} catch (Exception e) {
LOG.error("Error aggregating metrics across all platforms ({}) for tool ID {}, version {} from directory {}", platformsString, toolId, versionName, versionS3KeyPrefix, e);
numberOfMetricsSkipped.incrementAndGet();
// Continue aggregating metrics for other directories
}
++numberOfMetricsSubmitted;
} else {
++numberOfMetricsSkipped;
numberOfMetricsSkipped.incrementAndGet();
}
++numberOfDirectoriesProcessed;
numberOfDirectoriesProcessed.incrementAndGet();
LOG.info("Processed {} directories", numberOfDirectoriesProcessed);
}

Expand Down

0 comments on commit e10f668

Please sign in to comment.