Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Developed version 1.19 #13

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,6 @@
/*.iml
/dependency-reduced-pom.xml
/Dockerfile
.classpath
.project
.settings
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.hobbit</groupId>
<artifactId>hobbit-java-sdk</artifactId>
<version>1.1.18</version>
<version>1.1.19-SNAPSHOT</version>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
Expand Down Expand Up @@ -69,7 +69,7 @@
<dependency>
<groupId>org.hobbit</groupId>
<artifactId>platform-controller</artifactId>
<version>2.0.5</version>
<version>2.0.15</version>
<scope>test</scope>
</dependency>

Expand Down Expand Up @@ -101,7 +101,7 @@
<dependency>
<groupId>com.spotify</groupId>
<artifactId>docker-client</artifactId>
<version>8.11.7</version>
<version>8.16.0</version>
</dependency>


Expand Down
48 changes: 24 additions & 24 deletions src/main/java/org/hobbit/sdk/utils/CommandQueueListener.java
Original file line number Diff line number Diff line change
@@ -1,24 +1,14 @@
package org.hobbit.sdk.utils;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.QueueingConsumer;
import org.hobbit.core.Commands;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;

import org.hobbit.core.Constants;
import org.hobbit.core.components.AbstractPlatformConnectorComponent;
import org.hobbit.core.data.StartCommandData;
import org.hobbit.core.rabbit.RabbitMQUtils;
import org.hobbit.sdk.utils.commandreactions.CommandReaction;
import org.hobbit.core.Constants;
import org.junit.contrib.java.lang.system.EnvironmentVariables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;

/**
* @author Roman Katerinenko
*/
Expand All @@ -29,9 +19,11 @@ public class CommandQueueListener extends AbstractPlatformConnectorComponent {
private final Semaphore terminationSemaphore = new Semaphore(0, true);
private final CountDownLatch countDownLatch = new CountDownLatch(1);

private CommandReaction[] commandReactions=new CommandReaction[0];
private CommandReaction[] commandReactions = new CommandReaction[0];
private String replyTo;
private final EnvironmentVariables environmentVariables = new EnvironmentVariables();
// private final EnvironmentVariables environmentVariables = new EnvironmentVariables();

private boolean catchedExceptions = false;

@Override
public void init() throws Exception {
Expand All @@ -50,27 +42,30 @@ public void run() throws Exception {
logger.debug("Terminated");
}

public String submit(String imageName, String[] envVariables){
public String submit(String imageName, String[] envVariables) {
return super.createContainer(imageName, envVariables);
}

public String createContainer(String imageName, String[] envVariables){
public String createContainer(String imageName, String[] envVariables) {
return super.createContainer(imageName, envVariables);
}

public String createContainer(String imageName, String containerType, String[] envVariables){
public String createContainer(String imageName, String containerType, String[] envVariables) {
String ret = super.createContainer(imageName, containerType, envVariables);
return ret;
}


// @Override
protected void handleCmd(byte[] bytes, String replyTo) {
this.replyTo = replyTo;
super.handleCmd(bytes, replyTo);
try {
super.handleCmd(bytes, replyTo);
} catch (Throwable e) {
logger.error("Catched exception while handling command.", e);
catchedExceptions = true;
}
}


public void waitForInitialisation() throws InterruptedException {
countDownLatch.await();
}
Expand All @@ -94,10 +89,15 @@ public void receiveCommand(byte command, byte[] data) {
for (CommandReaction commandReaction : commandReactions) {
try {
commandReaction.handleCmd(command, data, replyTo);
} catch (Exception e) {
logger.error("Failed to handle command with {}",commandReaction.getClass().getSimpleName(), e);
} catch (Throwable e) {
logger.error("Failed to handle command with {}", commandReaction.getClass().getSimpleName(), e);
catchedExceptions = true;
}
}
}

public boolean anyExceptions() {
return catchedExceptions;
}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package org.hobbit.sdk.utils.commandreactions;

import com.google.gson.Gson;

import org.apache.jena.rdf.model.ModelFactory;
import org.hobbit.core.Commands;
import org.hobbit.core.components.Component;
import org.hobbit.core.rabbit.RabbitMQUtils;
Expand All @@ -15,7 +17,6 @@
import java.util.List;
import java.util.Map;


public class BenchmarkSignalsReaction implements CommandReaction {
private static final Logger logger = LoggerFactory.getLogger(BenchmarkSignalsReaction.class);

Expand Down Expand Up @@ -51,23 +52,23 @@ public class BenchmarkSignalsReaction implements CommandReaction {
private boolean startBenchmarkCommandSent = false;
private Map<String, Component> customContainers = new HashMap<>();
private Map<String, Integer> customContainersRunning = new HashMap<>();
//private String systemContainerId = null;
// private String systemContainerId = null;

public BenchmarkSignalsReaction(CommandReactionsBuilder builder){
public BenchmarkSignalsReaction(CommandReactionsBuilder builder) {
this.componentsExecutor = builder.componentsExecutor;
this.commandQueueListener = builder.commandQueueListener;

this.benchmarkController=builder.benchmarkController;
this.dataGenerator=builder.dataGenerator;
this.taskGenerator=builder.taskGenerator;
this.evalStorage=builder.evalStorage;
this.evalModule=builder.evalModule;
this.systemAdapter=builder.systemAdapter;
this.benchmarkController = builder.benchmarkController;
this.dataGenerator = builder.dataGenerator;
this.taskGenerator = builder.taskGenerator;
this.evalStorage = builder.evalStorage;
this.evalModule = builder.evalModule;
this.systemAdapter = builder.systemAdapter;

this.benchmarkControllerImageName = builder.benchmarkControllerImageName;
this.dataGeneratorImageName = builder.dataGeneratorImageName;
this.taskGeneratorImageName = builder.taskGeneratorImageName;
this.evalStorageImageName = builder.evalStorageImageName ;
this.evalStorageImageName = builder.evalStorageImageName;
this.evalModuleImageName = builder.evalModuleImageName;
this.systemAdapterImageName = builder.systemAdapterImageName;
this.customContainers = builder.customContainers;
Expand All @@ -76,18 +77,19 @@ public BenchmarkSignalsReaction(CommandReactionsBuilder builder){
@Override
public void handleCmd(Byte command, byte[] bytes, String replyTo) throws Exception {

if (command == Commands.BENCHMARK_FINISHED_SIGNAL){
if (command == Commands.BENCHMARK_FINISHED_SIGNAL) {
logger.debug("BENCHMARK_FINISHED_SIGNAL received");
try {
assert bytes != null;
assert bytes.length != 0;
componentsExecutor.resultModel = RabbitMQUtils.readModel(bytes);

} catch (Throwable e) {
// The exception will be handled somewhere else. However, we should make sure
// that there is some model set
componentsExecutor.resultModel = ModelFactory.createDefaultModel();
} finally {
commandQueueListener.terminate();
componentsExecutor.shutdown();
} catch (InterruptedException e) {
System.out.println(e.getMessage());
//Assert.fail(e.getMessage());
}
}

Expand All @@ -114,37 +116,38 @@ public void handleCmd(Byte command, byte[] bytes, String replyTo) throws Excepti
if (command == Commands.SYSTEM_READY_SIGNAL) {
systemReady = true;
logger.debug("SYSTEM_READY_SIGNAL signal received");
if(!System.getenv().containsKey("SYSTEM_CONTAINER_ID"))
throw new Exception("SYSTEM_CONTAINER_ID is not specified as env variable. Specify it where you submit system/create system container in checkHealth");
if (!System.getenv().containsKey("SYSTEM_CONTAINER_ID"))
throw new Exception(
"SYSTEM_CONTAINER_ID is not specified as env variable. Specify it where you submit system/create system container in checkHealth");
}

synchronized (this){
synchronized (this) {
List<String> waitForComponents = new ArrayList<>();
if(!benchmarkReady)
if (!benchmarkReady)
waitForComponents.add("benchmarkController");
if(dataGenerator!=null && !dataGenReady)
if (dataGenerator != null && !dataGenReady)
waitForComponents.add("dataGenerator");
if(taskGenerator!=null && !taskGenReady)
if (taskGenerator != null && !taskGenReady)
waitForComponents.add("taskGenerator");
if(evalStorage!=null && !evalStorageReady)
if (evalStorage != null && !evalStorageReady)
waitForComponents.add("evalStorage");
if(!systemReady)
if (!systemReady)
waitForComponents.add("systemAdapter");
if(waitForComponents.size()>0)
if (waitForComponents.size() > 0)
logger.debug("Waiting ready signals for {}", String.join(", ", waitForComponents));
else if (!startBenchmarkCommandSent){
else if (!startBenchmarkCommandSent) {
logger.debug("sending START_BENCHMARK_SIGNAL");
try {
new CommandSender(Commands.START_BENCHMARK_SIGNAL, System.getenv().get("SYSTEM_CONTAINER_ID")).send();
new CommandSender(Commands.START_BENCHMARK_SIGNAL, System.getenv().get("SYSTEM_CONTAINER_ID"))
.send();
startBenchmarkCommandSent = true;
} catch (Exception e) {
logger.error("Failed to send START_BENCHMARK_SIGNAL: {}", e.getMessage());
//Assert.fail(e.getMessage());
// Assert.fail(e.getMessage());
}
}
}

}


}
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,11 @@ public CommandReactionsBuilder systemAdapterImageName(String value){
}

public CommandReactionsBuilder customContainerImage(Component component, String imageName){
String[] splitted = imageName.split("/");
String cleanedImageName=splitted[splitted.length-1].split(":")[0];

customContainers.put(cleanedImageName, component);
// String[] splitted = imageName.split("/");
// String cleanedImageName=splitted[splitted.length-1].split(":")[0];
//
// customContainers.put(cleanedImageName, component);
customContainers.put(imageName, component);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ public void handleCmd(Byte command, byte[] bytes, String replyTo) throws Excepti
compToSubmit = systemAdapter;
containerId = cleanedImageName+"_"+systemContainersCount;
systemContainersCount++;
}else if(customContainers.containsKey(cleanedImageName)){
compToSubmit = customContainers.get(cleanedImageName);
}else if(customContainers.containsKey(startCommandData.image)){
compToSubmit = customContainers.get(startCommandData.image);
int runningCustomContainersCount = (customContainersRunning.containsKey(cleanedImageName)? customContainersRunning.get(cleanedImageName)+1 :0);
containerId = cleanedImageName+"_"+runningCustomContainersCount;
customContainersRunning.put(cleanedImageName, runningCustomContainersCount);
Expand Down
1 change: 1 addition & 0 deletions src/test/java/org/hobbit/sdk/DummyBenchmarkTestRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ private void checkHealth(Boolean dockerize) throws Exception {
rabbitMqDockerizer.stop();

Assert.assertFalse(componentsExecutor.anyExceptions());
Assert.assertFalse(commandQueueListener.anyExceptions());
}

public static Model createBenchmarkParameters() throws IOException {
Expand Down