Skip to content

Commit

Permalink
codeine-237 add SERVER_HOST and SERVER_PORT env vars to commands - fi…
Browse files Browse the repository at this point in the history
…xed build
  • Loading branch information
rezra3 committed May 17, 2018
1 parent 399466b commit 34fe890
Showing 1 changed file with 157 additions and 136 deletions.
293 changes: 157 additions & 136 deletions src/peer/codeine/nodes/NodesRunner.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package codeine.nodes;

import codeine.jsons.global.GlobalConfigurationJsonStore;
import java.net.InetAddress;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -33,141 +34,161 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

public class NodesRunner implements Task{

private static final Logger log = Logger.getLogger(NodesRunner.class);

private static final long NODE_MONITOR_INTERVAL = TimeUnit.SECONDS.toMillis(29);
public static final long NODE_RUNNER_INTERVAL = TimeUnit.HOURS.toMillis(1);

@Inject private IConfigurationManager configurationManager;
@Inject private PathHelper pathHelper;
@Inject private PeerStatus peerStatus;
@Inject private MailSender mailSender;
@Inject private NotificationDeliverToDatabase notificationDeliverToMongo;
@Inject private NodesManager nodesManager;
@Inject private SnoozeKeeper snoozeKeeper;
private Map<String, Map<NodeInfo, PeriodicExecuter>> executers = Maps.newHashMap();
@Inject private PeerStatusChangedUpdater mongoPeerStatusUpdater;
@Inject private CollectorsRunnerFactory collectorsRunnerFactory;

@Override
public synchronized void run() {
InetAddress localHost = InetUtils.getLocalHost();
log.info("NodeRunner is starting on host " + localHost.getHostName() + " " + localHost.getCanonicalHostName());
log.info("NodeRunner is starting " + this + " with executers " + executers);
Set<String> removedProjects = Sets.newHashSet(executers.keySet());
for (ProjectJson project : getProjects()) {
removedProjects.remove(project.name());
try {
boolean hasNodes = startStopExecutorsForProject(project);
if (!hasNodes) {
cleanupProject(project.name());
}
} catch (Exception e) {
log.error("failed startStopExecutorsForProject for project " + project.name(), e);
}
}
for (String project : removedProjects) {
try {
stopNodes(project, executers.get(project));
cleanupProject(project);
log.info("removed project " + project);
} catch (Exception e) {
log.error("failed to stop nodes for project " + project, e);
}
}
}

/**
* assuming nodes already stopped
*/
private void cleanupProject(String project) {
log.info("cleanupProject " + project);
executers.remove(project);
peerStatus.removeProject(project);
}

private void stop(PeriodicExecuter e) {
log.info("stopping 1executor " + e.name());
e.stopWhenPossible();
}

private boolean startStopExecutorsForProject(ProjectJson project) {
Map<NodeInfo, PeriodicExecuter> currentNodes = getCurrentNodes(project);
log.info("project: " + project.name() + " currentProjectExecutors: " + currentNodes.keySet());
SelectedNodes selectedNodes;
try {
selectedNodes = new NodesSelector(currentNodes, getNodes(project)).selectStartStop();
} catch (Exception e) {
log.error("failed to select nodes for project " + project.name() + " will leave old nodes " + currentNodes, e);
return !currentNodes.isEmpty();
}
log.info("selectedNodes: " + selectedNodes);
stopNodes(project.name(), selectedNodes.nodesToStop());
Map<NodeInfo, PeriodicExecuter> newProjectExecutors = selectedNodes.existingProjectExecutors();
for (NodeInfo nodeJson : selectedNodes.nodesToStart()) {
log.info("start exec1 monitoring node " + nodeJson + " in project " + project.name());
try {
PeriodicExecuter e = startExecuter(project, nodeJson);
newProjectExecutors.put(nodeJson, e);
} catch (Exception e1) {
log.error("failed to start executor for node " + nodeJson + " in project " + project.name(), e1);
}
}
executers.put(project.name(), newProjectExecutors);
log.info("project: " + project.name() + " newProjectExecutors: " + newProjectExecutors.keySet());
return !executers.get(project.name()).isEmpty();
}

private void stopNodes(String project, Map<NodeInfo, PeriodicExecuter> map) {
for (Entry<NodeInfo, PeriodicExecuter> e : map.entrySet()) {
log.info("stop exec1 monitoring node " + e.getKey() + " in project " + project);
peerStatus.removeNode(project, e.getKey().name());
stop(e.getValue());
}
}

private Map<NodeInfo, PeriodicExecuter> getCurrentNodes(ProjectJson project) {
Map<NodeInfo, PeriodicExecuter> currentNodes = executers.get(project.name());
if (null == currentNodes) {
currentNodes = Maps.newHashMap();
executers.put(project.name(), currentNodes);
}
return currentNodes;
}

private PeriodicExecuter startExecuter(ProjectJson project, NodeInfo nodeJson) {
log.info("Starting monitor thread for project " + project.name() + " node " + nodeJson);
Task task;
RunMonitors monitorsTask = new RunMonitors(configurationManager, project.name(), peerStatus, mailSender, pathHelper,
nodeJson, notificationDeliverToMongo, mongoPeerStatusUpdater, snoozeKeeper);
if (Constants.RUNNING_COLLECTORS_IN_PEER) {
CollectorsRunner collectorsTask = collectorsRunnerFactory.create(project.name(), nodeJson);
collectorsTask.init();
task = collectorsTask;
}
else {
task = monitorsTask;
}
PeriodicExecuter periodicExecuter = new PeriodicExecuter(NODE_MONITOR_INTERVAL,
task, "RunMonitors_" + project.name() + "_" + nodeJson.name());
log.info("starting 1executor " + periodicExecuter.name());
periodicExecuter.runInThread();
return periodicExecuter;
}

private List<ProjectJson> getProjects() {
return configurationManager.getConfiguredProjects();
}

private List<NodeInfo> getNodes(ProjectJson project) {
try {
return nodesManager.nodesOf(project).nodes();
} catch (Exception e) {
log.warn("failed to get nodes for project " + project.name(), e);
}
return Lists.newArrayList();
}
public class NodesRunner implements Task {

private static final Logger log = Logger.getLogger(NodesRunner.class);

private static final long NODE_MONITOR_INTERVAL = TimeUnit.SECONDS.toMillis(29);
public static final long NODE_RUNNER_INTERVAL = TimeUnit.HOURS.toMillis(1);

@Inject
private IConfigurationManager configurationManager;
@Inject
private PathHelper pathHelper;
@Inject
private PeerStatus peerStatus;
@Inject
private MailSender mailSender;
@Inject
private NotificationDeliverToDatabase notificationDeliverToMongo;
@Inject
private NodesManager nodesManager;
@Inject
private SnoozeKeeper snoozeKeeper;
private Map<String, Map<NodeInfo, PeriodicExecuter>> executers = Maps.newHashMap();
@Inject
private PeerStatusChangedUpdater mongoPeerStatusUpdater;
@Inject
private CollectorsRunnerFactory collectorsRunnerFactory;
@Inject
private GlobalConfigurationJsonStore globalConfigurationJsonStore;

@Override
public synchronized void run() {
InetAddress localHost = InetUtils.getLocalHost();
log.info("NodeRunner is starting on host " + localHost.getHostName() + " " + localHost
.getCanonicalHostName());
log.info("NodeRunner is starting " + this + " with executers " + executers);
Set<String> removedProjects = Sets.newHashSet(executers.keySet());
for (ProjectJson project : getProjects()) {
removedProjects.remove(project.name());
try {
boolean hasNodes = startStopExecutorsForProject(project);
if (!hasNodes) {
cleanupProject(project.name());
}
} catch (Exception e) {
log.error("failed startStopExecutorsForProject for project " + project.name(), e);
}
}
for (String project : removedProjects) {
try {
stopNodes(project, executers.get(project));
cleanupProject(project);
log.info("removed project " + project);
} catch (Exception e) {
log.error("failed to stop nodes for project " + project, e);
}
}
}

/**
* assuming nodes already stopped
*/
private void cleanupProject(String project) {
log.info("cleanupProject " + project);
executers.remove(project);
peerStatus.removeProject(project);
}

private void stop(PeriodicExecuter e) {
log.info("stopping 1executor " + e.name());
e.stopWhenPossible();
}

private boolean startStopExecutorsForProject(ProjectJson project) {
Map<NodeInfo, PeriodicExecuter> currentNodes = getCurrentNodes(project);
log.info(
"project: " + project.name() + " currentProjectExecutors: " + currentNodes.keySet());
SelectedNodes selectedNodes;
try {
selectedNodes = new NodesSelector(currentNodes, getNodes(project)).selectStartStop();
} catch (Exception e) {
log.error(
"failed to select nodes for project " + project.name() + " will leave old nodes "
+ currentNodes, e);
return !currentNodes.isEmpty();
}
log.info("selectedNodes: " + selectedNodes);
stopNodes(project.name(), selectedNodes.nodesToStop());
Map<NodeInfo, PeriodicExecuter> newProjectExecutors = selectedNodes
.existingProjectExecutors();
for (NodeInfo nodeJson : selectedNodes.nodesToStart()) {
log.info("start exec1 monitoring node " + nodeJson + " in project " + project.name());
try {
PeriodicExecuter e = startExecuter(project, nodeJson);
newProjectExecutors.put(nodeJson, e);
} catch (Exception e1) {
log.error("failed to start executor for node " + nodeJson + " in project " + project
.name(), e1);
}
}
executers.put(project.name(), newProjectExecutors);
log.info(
"project: " + project.name() + " newProjectExecutors: " + newProjectExecutors.keySet());
return !executers.get(project.name()).isEmpty();
}

private void stopNodes(String project, Map<NodeInfo, PeriodicExecuter> map) {
for (Entry<NodeInfo, PeriodicExecuter> e : map.entrySet()) {
log.info("stop exec1 monitoring node " + e.getKey() + " in project " + project);
peerStatus.removeNode(project, e.getKey().name());
stop(e.getValue());
}
}

private Map<NodeInfo, PeriodicExecuter> getCurrentNodes(ProjectJson project) {
Map<NodeInfo, PeriodicExecuter> currentNodes = executers.get(project.name());
if (null == currentNodes) {
currentNodes = Maps.newHashMap();
executers.put(project.name(), currentNodes);
}
return currentNodes;
}

private PeriodicExecuter startExecuter(ProjectJson project, NodeInfo nodeJson) {
log.info("Starting monitor thread for project " + project.name() + " node " + nodeJson);
Task task;
RunMonitors monitorsTask = new RunMonitors(configurationManager, project.name(), peerStatus,
mailSender, pathHelper,
nodeJson, notificationDeliverToMongo, mongoPeerStatusUpdater, snoozeKeeper,
globalConfigurationJsonStore);
if (Constants.RUNNING_COLLECTORS_IN_PEER) {
CollectorsRunner collectorsTask = collectorsRunnerFactory
.create(project.name(), nodeJson);
collectorsTask.init();
task = collectorsTask;
} else {
task = monitorsTask;
}
PeriodicExecuter periodicExecuter = new PeriodicExecuter(NODE_MONITOR_INTERVAL,
task, "RunMonitors_" + project.name() + "_" + nodeJson.name());
log.info("starting 1executor " + periodicExecuter.name());
periodicExecuter.runInThread();
return periodicExecuter;
}

private List<ProjectJson> getProjects() {
return configurationManager.getConfiguredProjects();
}

private List<NodeInfo> getNodes(ProjectJson project) {
try {
return nodesManager.nodesOf(project).nodes();
} catch (Exception e) {
log.warn("failed to get nodes for project " + project.name(), e);
}
return Lists.newArrayList();
}

}

0 comments on commit 34fe890

Please sign in to comment.