Skip to content

Commit

Permalink
Merge pull request #182 from madgik/fix_failedContainer
Browse files Browse the repository at this point in the history
Fix failed container
  • Loading branch information
ThanKarab authored Jan 28, 2020
2 parents f8e7c65 + b7e3018 commit d9ae04a
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import com.jcraft.jsch.IO;
import madgik.exareme.common.consts.HBPConstants;
import madgik.exareme.master.client.AdpDBClient;
import madgik.exareme.master.client.AdpDBClientFactory;
Expand Down Expand Up @@ -139,7 +140,30 @@ private void handleInternal(HttpRequest request, HttpResponse response, HttpCont
HashMap<String, String[]> nodeDatasets = null;
List<String> nodesToBeChecked;
if (pathology != null) {
nodeDatasets = getDatasetsFromConsul(pathology);
try {
nodeDatasets = getDatasetsFromConsul(pathology);
}
catch (PathologyException e) {
log.error(e.getMessage());
String data = e.getMessage();
String type = user_error; //type could be error, user_error, warning regarding the error occured along the process
String result = defaultOutputFormat(data, type);
BasicHttpEntity entity = new BasicHttpEntity();
entity.setContent(new ByteArrayInputStream(result.getBytes()));
response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
response.setEntity(entity);
return;
}
catch (Exception e){
String data = "An error has occurred.Please inform your system admin.";
String type = error; //type could be error, user_error, warning regarding the error occured along the process
String result = defaultOutputFormat(data, type);
BasicHttpEntity entity = new BasicHttpEntity();
entity.setContent(new ByteArrayInputStream(result.getBytes()));
response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
response.setEntity(entity);
return;
}
if (userDatasets == null)
nodesToBeChecked = allNodesIPs(); //LIST_VARIABLES Algorithm
else
Expand Down Expand Up @@ -245,7 +269,7 @@ private void handleInternal(HttpRequest request, HttpResponse response, HttpCont
entity.setContent(new ByteArrayInputStream(result.getBytes()));
response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
response.setEntity(entity);
} catch (PathologyException | DatasetsException e) {
} catch (PathologyException | DatasetsException | IOException e) {
log.error(e.getMessage());
String data = e.getMessage();
String type = user_error; //type could be error, user_error, warning regarding the error occured along the process
Expand All @@ -254,7 +278,8 @@ private void handleInternal(HttpRequest request, HttpResponse response, HttpCont
entity.setContent(new ByteArrayInputStream(result.getBytes()));
response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
response.setEntity(entity);
} catch (Exception e) {
}
catch (Exception e) {
log.error(e);
String data = e.getMessage();
String type = error; //type could be error, user_error, warning regarding the error occured along the process
Expand All @@ -266,7 +291,7 @@ private void handleInternal(HttpRequest request, HttpResponse response, HttpCont
}
}

private HashMap<String, String[]> getDatasetsFromConsul(String pathology) throws IOException, PathologyException {
private HashMap<String, String[]> getDatasetsFromConsul(String pathology) throws IOException,PathologyException {
Gson gson = new Gson();
HashMap<String, String[]> nodeDatasets = new HashMap<>();
List<String> pathologyNodes = new ArrayList<String>();
Expand Down Expand Up @@ -318,7 +343,7 @@ private HashMap<String, String[]> getDatasetsFromConsul(String pathology) throws
return nodeDatasets;
}

private HashMap<String, String> getNamesOfActiveNodes() throws IOException {
private HashMap<String, String> getNamesOfActiveNodes() throws Exception {
Gson gson = new Gson();
HashMap<String, String> nodeNames = new HashMap<>();
String masterKey = searchConsul(System.getenv("EXAREME_MASTER_PATH") + "/?keys");
Expand Down Expand Up @@ -434,15 +459,16 @@ private HashMap<String, String> getAlgoParameters(HttpRequest request) throws IO
}

private boolean nodesRunning(List<String> nodesToBeChecked, String pathology) throws Exception {

Gson gson = new Gson();
//Check if IP's gotten from Consul[Key-Value store] exist in Exareme's Registry
List<String> notContainerProxy = new ArrayList<>();
ContainerProxy[] containerProxy = ArtRegistryLocator.getArtRegistryProxy().getContainers(); //get IP's from Exareme's Registry
for (String IP : nodesToBeChecked) {
log.debug("Node to be checked: "+IP);
boolean flag = false;
for (ContainerProxy containers : containerProxy) {
log.debug("Container in registry: " + containers.getEntityName().getIP());
if (containers.getEntityName().getIP().contains(IP)) { //If IP exists in Exareme's Registry
log.debug("Container in registry: " + containers.getEntityName().getIP());
flag = true;
break;
}
Expand All @@ -462,12 +488,26 @@ private boolean nodesRunning(List<String> nodesToBeChecked, String pathology) th
String name = names.get(ip);
log.info("It seems that node[" + name + "," + ip + "] you are trying to check is not part of Exareme's registry. Deleting it from Consul....");

//Delete datasets and IP of the node
deleteFromConsul(System.getenv("DATA") + "/" + name);
//Delete pathologies and IP of the node
String pathologyKey = searchConsul(System.getenv("DATA") + "/" + name + "?keys");
String[] pathologyKeyArray = gson.fromJson(pathologyKey, String[].class);
for (String p : pathologyKeyArray) {
deleteFromConsul(p); //Delete every pathology for node with name $name
}
//Delete IP of active_worker with name $name
deleteFromConsul(System.getenv("EXAREME_ACTIVE_WORKERS_PATH") + "/" + name);

//Get datasets exist in other nodes for showing appropriate message to user
HashMap<String, String[]> nodeDatasets = getDatasetsFromConsul(pathology);
HashMap<String, String[]> nodeDatasets;
try {
nodeDatasets = getDatasetsFromConsul(pathology);
}
catch (PathologyException e) {
throw new PathologyException(e.getMessage());
}
catch (Exception e) {
throw new Exception("An error has occurred.Please inform your system admin.");
}
for (Map.Entry<String, String[]> entry : nodeDatasets.entrySet()) {
String[] getDatasets = entry.getValue();
for (String data : getDatasets) {
Expand Down Expand Up @@ -521,39 +561,35 @@ private String searchConsul(String query) throws IOException {
if (!consulURL.startsWith("http://")) {
consulURL = "http://" + consulURL;
}
try {
HttpGet httpGet;
httpGet = new HttpGet(consulURL + "/v1/kv/" + query);
log.debug("Running: " + httpGet.getURI());
CloseableHttpResponse response = null;
if (httpGet.toString().contains(System.getenv("EXAREME_MASTER_PATH") + "/") || httpGet.toString().contains(System.getenv("DATA") + "/")) { //if we can not contact : http://exareme-keystore:8500/v1/kv/master* or http://exareme-keystore:8500/v1/kv/datasets*
try { //then throw exception
response = httpclient.execute(httpGet);
if (response.getStatusLine().getStatusCode() != 200) {
throw new ServerException("Cannot contact consul", new Exception(EntityUtils.toString(response.getEntity())));
} else {
result = EntityUtils.toString(response.getEntity());
}
} finally {
response.close();
}

HttpGet httpGet;
httpGet = new HttpGet(consulURL + "/v1/kv/" + query);
log.debug("Running: " + httpGet.getURI());
CloseableHttpResponse response = null;
if (httpGet.toString().contains(System.getenv("EXAREME_MASTER_PATH") + "/") || httpGet.toString().contains(System.getenv("DATA") + "/")) { //if we can not contact : http://exareme-keystore:8500/v1/kv/master* or http://exareme-keystore:8500/v1/kv/datasets*
try { //then throw exception
response = httpclient.execute(httpGet);
} catch (Exception e) {
response.close();
}
if (httpGet.toString().contains(System.getenv("EXAREME_ACTIVE_WORKERS_PATH") + "/")) { //if we can not contact : http://exareme-keystore:8500/v1/kv/active_workers*
try { //then maybe there are no workers running
response = httpclient.execute(httpGet);
if (response.getStatusLine().getStatusCode() != 200) {
if (httpGet.toString().contains("?keys"))
log.debug("No workers running. Continue with master");
result = EntityUtils.toString(response.getEntity());
}
if (httpGet.toString().contains(System.getenv("EXAREME_ACTIVE_WORKERS_PATH") + "/")) { //if we can not contact : http://exareme-keystore:8500/v1/kv/active_workers*
//then maybe there are no workers running
try {
response = httpclient.execute(httpGet);
if (response.getStatusLine().getStatusCode() != 200) {
if (httpGet.toString().contains("?keys"))
log.debug("No workers running. Continue with master");
} else {
result = EntityUtils.toString(response.getEntity());
}
} finally {
response.close();
}
catch (Exception e){
response.close();
}
} finally {
return result;
}
return result;
}

//Sometimes information about Exareme nodes exists in Consul [Key-Value store] even though the nodes are not part of Exareme's registry. We delete that information from Consul [Key-Value store].
Expand All @@ -571,6 +607,7 @@ private void deleteFromConsul(String query) throws IOException {
//curl -X DELETE $CONSULURL/v1/kv/$1/$NODE_NAME

log.debug("Running: " + httpDelete.getURI());

CloseableHttpResponse response = null;
if (httpDelete.toString().contains(System.getenv("EXAREME_ACTIVE_WORKERS_PATH") + "/") || httpDelete.toString().contains(System.getenv("DATA") + "/")) { //if we can not contact : http://exareme-keystore:8500/v1/kv/master* or http://exareme-keystore:8500/v1/kv/datasets*
try { //then throw exception
Expand All @@ -587,4 +624,4 @@ private void deleteFromConsul(String query) throws IOException {
/**
 * Builds the JSON envelope that is sent back to the client, in the form
 * <pre>{"result" : [{"data":"DATA","type":"TYPE"}]}</pre>
 *
 * <p>Both values are escaped before being embedded, so a message containing
 * quotes, backslashes or line breaks (for example an exception message) still
 * yields well-formed JSON. A {@code null} argument is rendered as the text
 * {@code null}, exactly as plain string concatenation would.
 *
 * @param data the message to report to the caller
 * @param type the kind of message; callers in this file pass error, user_error or warning
 * @return the JSON document as a string
 */
private String defaultOutputFormat(String data, String type) {
    return "{\"result\" : [{\"data\":" + "\"" + escapeJsonText(data) + "\",\"type\":" + "\"" + escapeJsonText(type) + "\"}]}";
}

/** Escapes {@code text} so it can be placed between the double quotes of a JSON string. */
private String escapeJsonText(String text) {
    String raw = String.valueOf(text); // null becomes "null", matching string concatenation
    StringBuilder escaped = new StringBuilder(raw.length() + 16);
    for (int i = 0; i < raw.length(); i++) {
        char c = raw.charAt(i);
        switch (c) {
            case '"':
                escaped.append("\\\"");
                break;
            case '\\':
                escaped.append("\\\\");
                break;
            case '\n':
                escaped.append("\\n");
                break;
            case '\r':
                escaped.append("\\r");
                break;
            case '\t':
                escaped.append("\\t");
                break;
            case '\b':
                escaped.append("\\b");
                break;
            case '\f':
                escaped.append("\\f");
                break;
            default:
                if (c < 0x20) {
                    // Remaining control characters must be written as \\uXXXX in JSON strings.
                    escaped.append(String.format("\\u%04x", (int) c));
                } else {
                    escaped.append(c);
                }
        }
    }
    return escaped.toString();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

import com.google.gson.Gson;
import madgik.exareme.common.art.entity.EntityName;
import madgik.exareme.worker.art.container.ContainerProxy;
import madgik.exareme.worker.art.registry.ArtRegistryLocator;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
Expand All @@ -24,6 +22,7 @@
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Semaphore;

/**
* University of Athens /
Expand Down Expand Up @@ -78,6 +77,8 @@ public synchronized T connect() throws RemoteException {
public T getRemoteObject() throws RemoteException {
String name = null;
Iterator<Map.Entry<String, String>> entries;
Gson gson = new Gson();
Semaphore semaphore = new Semaphore(1);

if (isConnected == false) {
try {
Expand All @@ -87,8 +88,12 @@ public T getRemoteObject() throws RemoteException {
//Get the Exareme's node name that is not responding
HashMap<String,String> names = null;
try {
semaphore.acquire();
names = getNamesOfActiveNodes();
} catch (IOException e) {
for (Map.Entry<String, String> entry : names.entrySet()) {
log.debug("ActiveNodes from Consul key-value store: " + entry.getKey() + " = " + entry.getValue());
}
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}

Expand All @@ -98,33 +103,30 @@ public T getRemoteObject() throws RemoteException {
Map.Entry<String, String> entry = entries.next();
if (Objects.equals(entry.getKey(), regEntityName.getIP())) {
name = entry.getValue();
break;
}
}
//Search if the Exareme's node IP exist in Exareme's registry
for (ContainerProxy containerProxy : ArtRegistryLocator.getArtRegistryProxy().getContainers()) {
log.debug("Container: " + containerProxy.getEntityName().getIP() + " : " +
containerProxy.getEntityName().getName());
if (containerProxy.getEntityName().getIP().equals(regEntityName.getIP())) {
//If exists, remove it from Exareme's registry
ArtRegistryLocator.getArtRegistryProxy().removeContainer(containerProxy.getEntityName());
log.info("Worker node:[" + name + "," + regEntityName.getIP() + "]" + " removed successfully from Exareme's registry");

//If exist in Consul[Key-Value store], delete infos regarding that Exareme node from there
if (name != null) {
try {
deleteFromConsul(System.getenv("DATA") + "/" + name);
deleteFromConsul(System.getenv("EXAREME_ACTIVE_WORKERS_PATH") + "/" + name);
log.info("Worker node:[" + name + "," + regEntityName.getIP() + "]" + " removed from Consul key-value store");
} catch (IOException E) {
throw new RemoteException("Can not contact Consul Key value Store");
log.info("Found node with name: "+name+" that seems to be down..");

try {
String pathologyKey = searchConsul(System.getenv("DATA") + "/" + name + "?keys");
String[] pathologyKeyArray = gson.fromJson(pathologyKey, String[].class);
for( String p: pathologyKeyArray) {
deleteFromConsul(p); //Delete every pathology for node with name $name
}
deleteFromConsul(System.getenv("EXAREME_ACTIVE_WORKERS_PATH") + "/" + name);
log.info("Worker node:[" + name + "," + regEntityName.getIP() + "]" + " removed from Consul key-value store");
} catch (IOException E) {
throw new RemoteException("Can not contact Consul Key value Store");
}
break;
}
}
}
throw new RemoteException("There was an error with worker "+ "[" + name + "," + regEntityName.getIP() + "].");
throw new RemoteException("There was an error with worker "+ "["+ regEntityName.getIP() + "].");
}
finally {
boolean acquired = semaphore.tryAcquire();
if (!acquired) {
semaphore.release();
}
}
}
return remoteObject;
Expand Down Expand Up @@ -233,3 +235,4 @@ public RetryPolicy getRetryPolicy() throws RemoteException {
return RetryPolicyFactory.defaultRetryPolicy();
}
}

2 changes: 1 addition & 1 deletion Federated-Deployment/Docker-Ansible/scripts/add_worker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ while IFS= read -r line || [[ -n "$line" ]]; do
continue
else #workerN exists below [workers] tag
workerExist=1 #TODO check if [workerX_X_X_X] exists as tag?
echo -e "\nWorker with IP: \"workerIP\" already exists under [workers] tag."
echo -e "\nWorker with IP: \"${workerIP}\" already exists under [workers] tag."
break
fi
if [[ -z "$line1" ]]; then
Expand Down

0 comments on commit d9ae04a

Please sign in to comment.