Skip to content

Commit

Permalink
Merge pull request #335 from aloubyansky/more-parallel
Browse files Browse the repository at this point in the history
Platform gen task scheduler API and a bit more of concurrent tasks
  • Loading branch information
aloubyansky authored May 17, 2024
2 parents 33ccb78 + 45fd19c commit 8c04614
Show file tree
Hide file tree
Showing 7 changed files with 249 additions and 154 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.quarkus.bom.resolver.ArtifactNotFoundException;
import io.quarkus.bom.resolver.ArtifactResolver;
import io.quarkus.bom.resolver.ArtifactResolverProvider;
import io.quarkus.bom.task.PlatformGenTaskScheduler;
import io.quarkus.bootstrap.resolver.maven.BootstrapMavenContext;
import io.quarkus.bootstrap.resolver.maven.BootstrapMavenException;
import io.quarkus.bootstrap.resolver.maven.MavenArtifactResolver;
Expand All @@ -15,9 +16,6 @@
import io.quarkus.maven.dependency.ArtifactCoords;
import java.nio.file.Path;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Phaser;
import org.apache.maven.model.DistributionManagement;
import org.apache.maven.model.Model;
import org.apache.maven.project.ProjectBuildingRequest;
Expand Down Expand Up @@ -169,38 +167,26 @@ private DecomposedBom decompose() throws BomDecomposerException {
bomBuilder.bomArtifact(bomArtifact);
//bomBuilder.bomSource(PomSource.of(resolve(bomArtifact).getFile().toPath()));
var artifacts = this.artifacts == null ? bomManagedDeps() : this.artifacts;
if (isParallelProcessing()) {
addConcurrently(bomBuilder, artifacts);
} else {

var scheduler = PlatformGenTaskScheduler.getInstance();
try {
for (Dependency dep : artifacts) {
addDependency(bomBuilder, dep);
scheduler.schedule(() -> addDependency(bomBuilder, dep));
}
scheduler.waitForCompletion();
} catch (BomDecomposerException e) {
throw e;
} catch (Exception e) {
throw new BomDecomposerException("Failed to process dependency constraints", e);
}
return transformer == null ? bomBuilder.build() : transformer.transform(bomBuilder.build());
}

private void addConcurrently(DecomposedBomBuilder bomBuilder, Collection<Dependency> deps) {
final Queue<Map.Entry<Dependency, Exception>> failed = new ConcurrentLinkedDeque<>();
var phaser = new Phaser(1);
for (var dep : deps) {
phaser.register();
CompletableFuture.runAsync(() -> {
try {
addDependency(bomBuilder, dep);
} catch (Exception e) {
failed.add(Map.entry(dep, e));
} finally {
phaser.arriveAndDeregister();
}
});
}
phaser.arriveAndAwaitAdvance();
if (!failed.isEmpty()) {
for (var d : failed) {
logger.error("Failed to process dependency " + d.getKey(), d.getValue());
if (scheduler.hasErrors()) {
for (var e : scheduler.getErrors()) {
logger.error("Failed to process dependency constraint", e);
}
throw new RuntimeException("Failed to process dependencies reported above");
throw new BomDecomposerException("Failed to process dependency constraints reported above");
}

return transformer == null ? bomBuilder.build() : transformer.transform(bomBuilder.build());
}

private void addDependency(DecomposedBomBuilder bomBuilder, Dependency dep) throws BomDecomposerException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package io.quarkus.bom.task;

import java.util.Collection;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Phaser;

public class ConcurrentTaskScheduler implements PlatformGenTaskScheduler {

private final Phaser phaser = new Phaser(1);
private final Deque<Exception> errors = new ConcurrentLinkedDeque<>();
private final Deque<PlatformGenTask> finalizingTasks = new ConcurrentLinkedDeque<>();

@Override
public void schedule(PlatformGenTask task) {
phaser.register();
CompletableFuture.runAsync(() -> {
try {
task.run();
} catch (Exception e) {
errors.add(e);
} finally {
phaser.arriveAndDeregister();
}
});
}

@Override
public void addFinializingTask(PlatformGenTask task) {
finalizingTasks.add(task);
}

@Override
public void waitForCompletion() throws Exception {
phaser.arriveAndAwaitAdvance();
for (var t : finalizingTasks) {
t.run();
}
}

@Override
public boolean hasErrors() {
return !errors.isEmpty();
}

@Override
public Collection<Exception> getErrors() {
return errors;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package io.quarkus.bom.task;

public interface PlatformGenTask {
void run() throws Exception;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.quarkus.bom.task;

import io.quarkus.bom.decomposer.BomDecomposer;
import java.util.Collection;

public interface PlatformGenTaskScheduler {

static PlatformGenTaskScheduler getInstance() {
return BomDecomposer.isParallelProcessing() ? new ConcurrentTaskScheduler() : new SequentialTaskScheduler();
}

void schedule(PlatformGenTask task) throws Exception;

void addFinializingTask(PlatformGenTask task) throws Exception;

void waitForCompletion() throws Exception;

boolean hasErrors();

Collection<Exception> getErrors();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.quarkus.bom.task;

import java.util.Collection;
import java.util.List;

public class SequentialTaskScheduler implements PlatformGenTaskScheduler {

@Override
public void schedule(PlatformGenTask task) throws Exception {
task.run();
}

@Override
public void addFinializingTask(PlatformGenTask task) throws Exception {
task.run();
}

@Override
public void waitForCompletion() {
}

@Override
public boolean hasErrors() {
return false;
}

@Override
public Collection<Exception> getErrors() {
return List.of();
}
}
Loading

0 comments on commit 8c04614

Please sign in to comment.