diff --git a/.gitignore b/.gitignore index be3bf956..e35e1023 100755 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,6 @@ /*.iml /dependency-reduced-pom.xml /Dockerfile +.classpath +.project +.settings diff --git a/pom.xml b/pom.xml index 21b6b073..25f9074a 100755 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ <modelVersion>4.0.0</modelVersion> <groupId>org.hobbit</groupId> <artifactId>hobbit-java-sdk</artifactId> - <version>1.1.18</version> + <version>1.1.19-SNAPSHOT</version> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> @@ -69,7 +69,7 @@ <dependency> <groupId>org.hobbit</groupId> <artifactId>platform-controller</artifactId> - <version>2.0.5</version> + <version>2.0.15</version> <scope>test</scope> </dependency> @@ -101,7 +101,7 @@ <dependency> <groupId>com.spotify</groupId> <artifactId>docker-client</artifactId> - <version>8.11.7</version> + <version>8.16.0</version> </dependency> diff --git a/src/main/java/org/hobbit/sdk/utils/CommandQueueListener.java b/src/main/java/org/hobbit/sdk/utils/CommandQueueListener.java index f7ad2141..d429599f 100755 --- a/src/main/java/org/hobbit/sdk/utils/CommandQueueListener.java +++ b/src/main/java/org/hobbit/sdk/utils/CommandQueueListener.java @@ -1,24 +1,14 @@ package org.hobbit.sdk.utils; -import com.rabbitmq.client.AMQP; -import com.rabbitmq.client.QueueingConsumer; -import org.hobbit.core.Commands; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Semaphore; + +import org.hobbit.core.Constants; import org.hobbit.core.components.AbstractPlatformConnectorComponent; -import org.hobbit.core.data.StartCommandData; -import org.hobbit.core.rabbit.RabbitMQUtils; import org.hobbit.sdk.utils.commandreactions.CommandReaction; -import org.hobbit.core.Constants; -import org.junit.contrib.java.lang.system.EnvironmentVariables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.Objects; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Semaphore; - /** * @author Roman Katerinenko */ @@ -29,9 +19,11 @@ public class CommandQueueListener extends AbstractPlatformConnectorComponent { private final Semaphore terminationSemaphore = new Semaphore(0, true); private final CountDownLatch countDownLatch = new CountDownLatch(1); - private CommandReaction[] commandReactions=new CommandReaction[0]; + private CommandReaction[] commandReactions = new CommandReaction[0]; private String replyTo; - private final EnvironmentVariables environmentVariables = new EnvironmentVariables(); +// private final EnvironmentVariables environmentVariables = new EnvironmentVariables(); + + private boolean catchedExceptions = false; @Override public void init() throws Exception { @@ -50,27 +42,30 @@ public void run() throws Exception { logger.debug("Terminated"); } - public String submit(String imageName, String[] envVariables){ + public String submit(String imageName, String[] envVariables) { return super.createContainer(imageName, envVariables); } - public String createContainer(String imageName, String[] envVariables){ + public String createContainer(String imageName, String[] envVariables) { return super.createContainer(imageName, envVariables); } - public String createContainer(String imageName, String containerType, String[] envVariables){ + public String createContainer(String imageName, String containerType, String[] envVariables) { String ret = super.createContainer(imageName, containerType, envVariables); return ret; } - // @Override protected void handleCmd(byte[] bytes, String replyTo) { this.replyTo = replyTo; - super.handleCmd(bytes, replyTo); + try { + super.handleCmd(bytes, replyTo); + } catch (Throwable e) { + logger.error("Catched exception while handling command.", e); + catchedExceptions = true; + } } - public void waitForInitialisation() throws InterruptedException { countDownLatch.await(); } @@ -94,10 +89,15 @@ public void receiveCommand(byte command, byte[] data) { for (CommandReaction commandReaction : commandReactions) { try { commandReaction.handleCmd(command, data, replyTo); - } catch (Exception e) { - logger.error("Failed to handle command with {}",commandReaction.getClass().getSimpleName(), e); + } catch (Throwable e) { + logger.error("Failed to handle command with {}", commandReaction.getClass().getSimpleName(), e); + catchedExceptions = true; } } } + public boolean anyExceptions() { + return catchedExceptions; + } + } \ No newline at end of file diff --git a/src/main/java/org/hobbit/sdk/utils/commandreactions/BenchmarkSignalsReaction.java b/src/main/java/org/hobbit/sdk/utils/commandreactions/BenchmarkSignalsReaction.java index cbead10c..051e4b98 100755 --- a/src/main/java/org/hobbit/sdk/utils/commandreactions/BenchmarkSignalsReaction.java +++ b/src/main/java/org/hobbit/sdk/utils/commandreactions/BenchmarkSignalsReaction.java @@ -1,6 +1,8 @@ package org.hobbit.sdk.utils.commandreactions; import com.google.gson.Gson; + +import org.apache.jena.rdf.model.ModelFactory; import org.hobbit.core.Commands; import org.hobbit.core.components.Component; import org.hobbit.core.rabbit.RabbitMQUtils; @@ -15,7 +17,6 @@ import java.util.List; import java.util.Map; - public class BenchmarkSignalsReaction implements CommandReaction { private static final Logger logger = LoggerFactory.getLogger(BenchmarkSignalsReaction.class); @@ -51,23 +52,23 @@ public class BenchmarkSignalsReaction implements CommandReaction { private boolean startBenchmarkCommandSent = false; private Map<String, Component> customContainers = new HashMap<>(); private Map<String, Integer> customContainersRunning = new HashMap<>(); - //private String systemContainerId = null; + // private String systemContainerId = null; - public BenchmarkSignalsReaction(CommandReactionsBuilder builder){ + public BenchmarkSignalsReaction(CommandReactionsBuilder builder) { this.componentsExecutor = builder.componentsExecutor; this.commandQueueListener = builder.commandQueueListener; - this.benchmarkController=builder.benchmarkController; - this.dataGenerator=builder.dataGenerator; - this.taskGenerator=builder.taskGenerator; - this.evalStorage=builder.evalStorage; - this.evalModule=builder.evalModule; - this.systemAdapter=builder.systemAdapter; + this.benchmarkController = builder.benchmarkController; + this.dataGenerator = builder.dataGenerator; + this.taskGenerator = builder.taskGenerator; + this.evalStorage = builder.evalStorage; + this.evalModule = builder.evalModule; + this.systemAdapter = builder.systemAdapter; this.benchmarkControllerImageName = builder.benchmarkControllerImageName; this.dataGeneratorImageName = builder.dataGeneratorImageName; this.taskGeneratorImageName = builder.taskGeneratorImageName; - this.evalStorageImageName = builder.evalStorageImageName ; + this.evalStorageImageName = builder.evalStorageImageName; this.evalModuleImageName = builder.evalModuleImageName; this.systemAdapterImageName = builder.systemAdapterImageName; this.customContainers = builder.customContainers; @@ -76,18 +77,19 @@ public BenchmarkSignalsReaction(CommandReactionsBuilder builder){ @Override public void handleCmd(Byte command, byte[] bytes, String replyTo) throws Exception { - if (command == Commands.BENCHMARK_FINISHED_SIGNAL){ + if (command == Commands.BENCHMARK_FINISHED_SIGNAL) { logger.debug("BENCHMARK_FINISHED_SIGNAL received"); try { assert bytes != null; assert bytes.length != 0; componentsExecutor.resultModel = RabbitMQUtils.readModel(bytes); - + } catch (Throwable e) { + // The exception will be handled somewhere else. However, we should make sure + // that there is some model set + componentsExecutor.resultModel = ModelFactory.createDefaultModel(); + } finally { commandQueueListener.terminate(); componentsExecutor.shutdown(); - } catch (InterruptedException e) { - System.out.println(e.getMessage()); - //Assert.fail(e.getMessage()); } } @@ -114,37 +116,38 @@ public void handleCmd(Byte command, byte[] bytes, String replyTo) throws Excepti if (command == Commands.SYSTEM_READY_SIGNAL) { systemReady = true; logger.debug("SYSTEM_READY_SIGNAL signal received"); - if(!System.getenv().containsKey("SYSTEM_CONTAINER_ID")) - throw new Exception("SYSTEM_CONTAINER_ID is not specified as env variable. Specify it where you submit system/create system container in checkHealth"); + if (!System.getenv().containsKey("SYSTEM_CONTAINER_ID")) + throw new Exception( + "SYSTEM_CONTAINER_ID is not specified as env variable. Specify it where you submit system/create system container in checkHealth"); } - synchronized (this){ + synchronized (this) { List<String> waitForComponents = new ArrayList<>(); - if(!benchmarkReady) + if (!benchmarkReady) waitForComponents.add("benchmarkController"); - if(dataGenerator!=null && !dataGenReady) + if (dataGenerator != null && !dataGenReady) waitForComponents.add("dataGenerator"); - if(taskGenerator!=null && !taskGenReady) + if (taskGenerator != null && !taskGenReady) waitForComponents.add("taskGenerator"); - if(evalStorage!=null && !evalStorageReady) + if (evalStorage != null && !evalStorageReady) waitForComponents.add("evalStorage"); - if(!systemReady) + if (!systemReady) waitForComponents.add("systemAdapter"); - if(waitForComponents.size()>0) + if (waitForComponents.size() > 0) logger.debug("Waiting ready signals for {}", String.join(", ", waitForComponents)); - else if (!startBenchmarkCommandSent){ + else if (!startBenchmarkCommandSent) { logger.debug("sending START_BENCHMARK_SIGNAL"); try { - new CommandSender(Commands.START_BENCHMARK_SIGNAL, System.getenv().get("SYSTEM_CONTAINER_ID")).send(); + new CommandSender(Commands.START_BENCHMARK_SIGNAL, System.getenv().get("SYSTEM_CONTAINER_ID")) + .send(); startBenchmarkCommandSent = true; } catch (Exception e) { logger.error("Failed to send START_BENCHMARK_SIGNAL: {}", e.getMessage()); - //Assert.fail(e.getMessage()); + // Assert.fail(e.getMessage()); } } } } - } diff --git a/src/main/java/org/hobbit/sdk/utils/commandreactions/CommandReactionsBuilder.java b/src/main/java/org/hobbit/sdk/utils/commandreactions/CommandReactionsBuilder.java index 88b35168..2fa33489 100755 --- a/src/main/java/org/hobbit/sdk/utils/commandreactions/CommandReactionsBuilder.java +++ b/src/main/java/org/hobbit/sdk/utils/commandreactions/CommandReactionsBuilder.java @@ -90,10 +90,11 @@ public CommandReactionsBuilder systemAdapterImageName(String value){ } public CommandReactionsBuilder customContainerImage(Component component, String imageName){ - String[] splitted = imageName.split("/"); - String cleanedImageName=splitted[splitted.length-1].split(":")[0]; - - customContainers.put(cleanedImageName, component); +// String[] splitted = imageName.split("/"); +// String cleanedImageName=splitted[splitted.length-1].split(":")[0]; +// +// customContainers.put(cleanedImageName, component); + customContainers.put(imageName, component); return this; } diff --git a/src/main/java/org/hobbit/sdk/utils/commandreactions/ContainerCommandsReaction.java b/src/main/java/org/hobbit/sdk/utils/commandreactions/ContainerCommandsReaction.java index a4bf17f0..7927be66 100755 --- a/src/main/java/org/hobbit/sdk/utils/commandreactions/ContainerCommandsReaction.java +++ b/src/main/java/org/hobbit/sdk/utils/commandreactions/ContainerCommandsReaction.java @@ -129,8 +129,8 @@ public void handleCmd(Byte command, byte[] bytes, String replyTo) throws Excepti compToSubmit = systemAdapter; containerId = cleanedImageName+"_"+systemContainersCount; systemContainersCount++; - }else if(customContainers.containsKey(cleanedImageName)){ - compToSubmit = customContainers.get(cleanedImageName); + }else if(customContainers.containsKey(startCommandData.image)){ + compToSubmit = customContainers.get(startCommandData.image); int runningCustomContainersCount = (customContainersRunning.containsKey(cleanedImageName)? customContainersRunning.get(cleanedImageName)+1 :0); containerId = cleanedImageName+"_"+runningCustomContainersCount; customContainersRunning.put(cleanedImageName, runningCustomContainersCount); diff --git a/src/test/java/org/hobbit/sdk/DummyBenchmarkTestRunner.java b/src/test/java/org/hobbit/sdk/DummyBenchmarkTestRunner.java index 3343269e..03f91391 100644 --- a/src/test/java/org/hobbit/sdk/DummyBenchmarkTestRunner.java +++ b/src/test/java/org/hobbit/sdk/DummyBenchmarkTestRunner.java @@ -175,6 +175,7 @@ private void checkHealth(Boolean dockerize) throws Exception { rabbitMqDockerizer.stop(); Assert.assertFalse(componentsExecutor.anyExceptions()); + Assert.assertFalse(commandQueueListener.anyExceptions()); } public static Model createBenchmarkParameters() throws IOException {