Skip to content

Commit

Permalink
Merge pull request #334 from aloubyansky/use-phaser-api
Browse files Browse the repository at this point in the history
Use phaser API when generating platform with parallel processing
  • Loading branch information
aloubyansky authored May 16, 2024
2 parents f5321d0 + 405418a commit 33ccb78
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Phaser;
import org.apache.maven.model.DistributionManagement;
import org.apache.maven.model.Model;
import org.apache.maven.project.ProjectBuildingRequest;
Expand Down Expand Up @@ -179,18 +180,21 @@ private DecomposedBom decompose() throws BomDecomposerException {
}

private void addConcurrently(DecomposedBomBuilder bomBuilder, Collection<Dependency> deps) {
final List<CompletableFuture<?>> tasks = new ArrayList<>(deps.size());
final Queue<Map.Entry<Dependency, Exception>> failed = new ConcurrentLinkedDeque<>();
var phaser = new Phaser(1);
for (var dep : deps) {
tasks.add(CompletableFuture.runAsync(() -> {
phaser.register();
CompletableFuture.runAsync(() -> {
try {
addDependency(bomBuilder, dep);
} catch (Exception e) {
failed.add(Map.entry(dep, e));
} finally {
phaser.arriveAndDeregister();
}
}));
});
}
CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])).join();
phaser.arriveAndAwaitAdvance();
if (!failed.isEmpty()) {
for (var d : failed) {
logger.error("Failed to process dependency " + d.getKey(), d.getValue());
Expand All @@ -205,7 +209,7 @@ private void addDependency(DecomposedBomBuilder bomBuilder, Dependency dep) thro
// if an artifact has a classifier we resolve the artifact itself
// if an artifact does not have a classifier we will try resolving its pom
validateArtifact(dep.getArtifact());
ScmRevision revision = resolveRevision(dep.getArtifact());
final ScmRevision revision = resolveRevision(dep.getArtifact());
bomBuilder.bomDependency(revision, dep);
} catch (ArtifactNotFoundException e) {
// there are plenty of BOMs that include artifacts that don't exist
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Phaser;
import java.util.stream.Collectors;
import javax.xml.transform.Result;
import javax.xml.transform.Source;
Expand Down Expand Up @@ -1798,13 +1799,13 @@ private void generateMemberIntegrationTestsModule(PlatformMemberImpl member)
}

final boolean parallelProcessing = testConfigs.size() > 1 && BomDecomposer.isParallelProcessing();
final List<CompletableFuture<?>> futures;
final Phaser phaser;
final Deque<Throwable> errors;
if (parallelProcessing) {
futures = new ArrayList<>(testConfigs.size());
phaser = new Phaser(1);
errors = new ConcurrentLinkedDeque<>();
} else {
futures = List.of();
phaser = null;
errors = null;
}

Expand All @@ -1828,20 +1829,23 @@ private void generateMemberIntegrationTestsModule(PlatformMemberImpl member)
}
pom.addModule(testModuleName);
if (parallelProcessing) {
futures.add(CompletableFuture.runAsync(() -> {
phaser.register();
CompletableFuture.runAsync(() -> {
try {
generateIntegrationTestModule(testModuleName, testArtifact, testConfig, pom);
} catch (MojoExecutionException e) {
errors.add(e);
} finally {
phaser.arriveAndDeregister();
}
}));
});
} else {
generateIntegrationTestModule(testModuleName, testArtifact, testConfig, pom);
}
}
}
if (!futures.isEmpty()) {
CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();
if (phaser != null) {
phaser.arriveAndAwaitAdvance();
if (!errors.isEmpty()) {
for (var e : errors) {
getLog().error(e);
Expand Down

0 comments on commit 33ccb78

Please sign in to comment.