diff --git a/src/common/codeine/configuration/Links.java b/src/common/codeine/configuration/Links.java index 102535ca..b7c6f7ee 100755 --- a/src/common/codeine/configuration/Links.java +++ b/src/common/codeine/configuration/Links.java @@ -9,68 +9,80 @@ public class Links { - private @Inject GlobalConfigurationJsonStore globalConfiguration; - - public String directoryPeerStatus() { - return "http://" + globalConfiguration.get().directory_host() + ":" + globalConfiguration.get().directory_port() - + Constants.PEER_STATUS_CONTEXT; - } - - public String getLogLink(String hostport) { - return "http://" + hostport + Constants.RESOURCESS_CONTEXT; - } - - public String getPeerLink(String hostport) { - return "http://" + hostport; - } - - public String getPeerCommandLink(String hostport, String project, String command, String userArgs) { - String args = null == userArgs ? "" : "&version=" + HttpUtils.encodeURL(userArgs); - return getPeerLink(hostport) + Constants.COMMAND_NODE_CONTEXT + "?project=" + HttpUtils.encodeURL(project) + "&command=" + HttpUtils.encodeURL(command) + args; - } - - public String getProjectLink(String name) { - return Constants.PROJECT_STATUS_CONTEXT + "?project="+HttpUtils.encodeURL(name); - } - - public String getPeerMonitorResultLink(String hostport, String projectName, String collectorName, String nodeName) { - String nodeContextPath = getNodeMonitorOutputContextPath(projectName); - return getPeerLink(hostport) + nodeContextPath + "/" + HttpUtils.specialEncode(nodeName) + "/" + HttpUtils.specialEncode(collectorName) + ".txt"; - } - public String getPeerCollectorResultLink(String hostport, String projectName, String collectorName, String nodeName) { - String nodeContextPath = getNodeCollectorOutputContextPath(projectName); - return getPeerLink(hostport) + nodeContextPath + "/" + HttpUtils.specialEncode(nodeName) + "/" + HttpUtils.specialEncode(collectorName) + ".txt"; - } - - public String getWebServerLink() { - return "http://" + globalConfiguration.get().web_server_host() + ":" + globalConfiguration.get().web_server_port(); - } - - public String getNodeMonitorOutputContextPath(String projectName) { - return getNodeMonitorOutputContextPathAllProjects() + "/" + HttpUtils.encodeURL(projectName) + Constants.MONITOR_OUTPUT_CONTEXT + Constants.NODE_PATH; - } - public String getNodeCollectorOutputContextPath(String projectName) { - return getNodeMonitorOutputContextPathAllProjects() + "/" + HttpUtils.encodeURL(projectName) + Constants.COLLECTOR_OUTPUT_CONTEXT + Constants.NODE_PATH; - } - public String getNodeMonitorOutputContextPathAllProjects() { - return Constants.PROJECT_PATH; - } - - public String getWebServerLandingPage() { - return getWebServerLink() + Constants.PROJECTS_LIST_CONTEXT; - } - - public String getWebServerProjectAlerts(ProjectJson project) { - return getWebServerLink() + "/codeine/project/" + HttpUtils.encodeURL(project.name()) + "/status"; - } - - public String getWebServerMonitorStatus(String project_name, String node_name, String collector_name) { - return getWebServerLink() + "/codeine/project/" + HttpUtils.encodeURL(project_name) + - "/node/" + HttpUtils.encodeURL(node_name) + "/monitor/" + HttpUtils.encodeURL(collector_name) + "/status"; - } - public String getWebServerCollectorStatus(String project_name, String node_name, String collector_name) { - return getWebServerLink() + "/codeine/project/" + HttpUtils.encodeURL(project_name) + - "/node/" + HttpUtils.encodeURL(node_name) + "/collector/" + HttpUtils.encodeURL(collector_name) + "/status"; - } + private @Inject + GlobalConfigurationJsonStore globalConfiguration; + + public String directoryPeerStatus() { + return "http://" + globalConfiguration.get().directory_host() + ":" + globalConfiguration.get().directory_port() + + Constants.PEER_STATUS_CONTEXT; + } + + public String getLogLink(String hostport) { + return "http://" + hostport + Constants.RESOURCESS_CONTEXT; + } + + public String getPeerLink(String hostport) { + return "http://" + hostport; + } + + public String getPeerCommandLink(String hostport, String project, String command, String userArgs) { + String args = null == userArgs ? "" : "&version=" + HttpUtils.encodeURL(userArgs); + return getPeerLink(hostport) + Constants.COMMAND_NODE_CONTEXT + "?project=" + HttpUtils.encodeURL(project) + + "&command=" + HttpUtils.encodeURL(command) + args; + } + + public String getProjectLink(String name) { + return Constants.PROJECT_STATUS_CONTEXT + "?project=" + HttpUtils.encodeURL(name); + } + + public String getPeerMonitorResultLink(String hostport, String projectName, String collectorName, String nodeName) { + String nodeContextPath = getNodeMonitorOutputContextPath(projectName); + return getPeerLink(hostport) + nodeContextPath + "/" + HttpUtils.specialEncode(nodeName) + "/" + HttpUtils + .specialEncode(collectorName) + ".txt"; + } + + public String getPeerCollectorResultLink(String hostport, String projectName, String collectorName, + String nodeName) { + String nodeContextPath = getNodeCollectorOutputContextPath(projectName); + return getPeerLink(hostport) + nodeContextPath + "/" + HttpUtils.specialEncode(nodeName) + "/" + HttpUtils + .specialEncode(collectorName) + ".txt"; + } + + public String getWebServerLink() { + return "http://" + globalConfiguration.get().web_server_host() + ":" + globalConfiguration.get() + .web_server_port(); + } + + public String getNodeMonitorOutputContextPath(String projectName) { + return getNodeMonitorOutputContextPathAllProjects() + "/" + HttpUtils.encodeURL(projectName) + + Constants.MONITOR_OUTPUT_CONTEXT + Constants.NODE_PATH; + } + + public String getNodeCollectorOutputContextPath(String projectName) { + return getNodeMonitorOutputContextPathAllProjects() + "/" + HttpUtils.encodeURL(projectName) + + Constants.COLLECTOR_OUTPUT_CONTEXT + Constants.NODE_PATH; + } + + public String getNodeMonitorOutputContextPathAllProjects() { + return Constants.PROJECT_PATH; + } + + public String getWebServerLandingPage() { + return getWebServerLink() + Constants.PROJECTS_LIST_CONTEXT; + } + + public String getWebServerProjectAlerts(ProjectJson project) { + return getWebServerLink() + "/codeine/project/" + HttpUtils.encodeURL(project.name()) + "/status"; + } + + public String getWebServerCollectorStatus(String project_name, String node_name, String collector_name) { + return getWebServerLink() + "/codeine/project/" + HttpUtils.encodeURL(project_name) + "/node/" + HttpUtils + .encodeURL(node_name) + "/collector/" + HttpUtils.encodeURL(collector_name) + "/status"; + } + + public String getWebServerCommandStatus(String project_name, String command_name, long command_id) { + return getWebServerLink() + "/codeine/project/" + HttpUtils.encodeURL(project_name) + "/command/" + HttpUtils + .encodeURL(command_name) + "/" + command_id + "/status"; + } } diff --git a/src/common/codeine/jsons/command/CommandInfoForSpecificNode.java b/src/common/codeine/jsons/command/CommandInfoForSpecificNode.java index 5a67b2aa..dc0c270b 100755 --- a/src/common/codeine/jsons/command/CommandInfoForSpecificNode.java +++ b/src/common/codeine/jsons/command/CommandInfoForSpecificNode.java @@ -6,44 +6,52 @@ public class CommandInfoForSpecificNode { - private String node_name; - private String node_alias; - private String tmp_dir; - private String key; - private Map environment_variables = Maps.newLinkedHashMap(); - - public CommandInfoForSpecificNode(String node_name, String node_alias, String tmp_dir, String key, Map environment_variables) { - super(); - this.node_name = node_name; - this.node_alias = node_alias; - this.tmp_dir = tmp_dir; - this.key = key; - this.environment_variables = environment_variables; - } - public String node_alias() { - return node_alias; - } - public String node_name() { - return node_name; - } - public String tmp_dir() { - return tmp_dir; - } - public String key() { - return key; - } - public Map environment_variables() { - return environment_variables; - } - - @Override - public String toString() { - return "CommandInfoForSpecificNode{" + - "node_name='" + node_name + '\'' + - ", node_alias='" + node_alias + '\'' + - ", tmp_dir='" + tmp_dir + '\'' + - ", key='" + key + '\'' + - ", environment_variables=" + environment_variables + - '}'; - } + private String node_name; + private String node_alias; + private String tmp_dir; + private String key; + private Map environment_variables = Maps.newLinkedHashMap(); + private Long commandId; + + public CommandInfoForSpecificNode(String node_name, String node_alias, String tmp_dir, String key, + Map environment_variables, long commandId) { + super(); + this.node_name = node_name; + this.node_alias = node_alias; + this.tmp_dir = tmp_dir; + this.key = key; + this.environment_variables = environment_variables; + this.commandId = commandId; + } + + public String node_alias() { + return node_alias; + } + + public String node_name() { + return node_name; + } + + public String tmp_dir() { + return tmp_dir; + } + + public String key() { + return key; + } + + public Map environment_variables() { + return environment_variables; + } + + public Long command_id() { + return commandId; + } + + @Override + public String toString() { + return "CommandInfoForSpecificNode{" + "node_name='" + node_name + '\'' + ", node_alias='" + node_alias + '\'' + + ", tmp_dir='" + tmp_dir + '\'' + ", key='" + key + '\'' + ", environment_variables=" + + environment_variables + ", commandId=" + commandId + '}'; + } } diff --git a/src/common/codeine/model/Constants.java b/src/common/codeine/model/Constants.java index 38ae7fe8..657d7b25 100755 --- a/src/common/codeine/model/Constants.java +++ b/src/common/codeine/model/Constants.java @@ -104,11 +104,13 @@ public class Constants { public static final String EXECUTION_ENV_CONFIGURATION_STEP = "CODEINE_CONFIGURATION_STEP"; public static final String EXECUTION_ENV_OUTPUT_FILE = "CODEINE_OUTPUT_FILE"; public static final String EXECUTION_ENV_PROJECT_NAME = "CODEINE_PROJECT_NAME"; + public static final String EXECUTION_ENV_COMMAND_NAME = "CODEINE_COMMAND_NAME"; public static final String EXECUTION_ENV_USER_NAME = "CODEINE_USER_NAME"; public static final String EXECUTION_ENV_PROJECT_STATUS = "CODEINE_PROJECT_STATUS"; public static final String EXECUTION_ENV_NODE_NAME = "CODEINE_NODE_NAME"; public static final String EXECUTION_ENV_NODE_ALIAS = "CODEINE_NODE_ALIAS"; public static final String EXECUTION_ENV_NODE_TAGS = "CODEINE_NODE_TAGS"; + public static final String EXECUTION_ENV_COMMAND_STATUS_LINK = "CODEINE_COMMAND_STATUS_LINK"; public static final String EXECUTION_ENV_CODEINE_SERVER = "CODEINE_HOST"; public static final String EXECUTION_ENV_CODEINE_SERVER_PORT = "CODEINE_PORT"; public static final String ENV_CODEINE_WORKAREA = "CODEINE_WORKAREA"; diff --git a/src/peer/codeine/CodeinePeerModule.java b/src/peer/codeine/CodeinePeerModule.java index 0c154d1f..35fefadf 100755 --- a/src/peer/codeine/CodeinePeerModule.java +++ b/src/peer/codeine/CodeinePeerModule.java @@ -20,7 +20,6 @@ import codeine.jsons.peer_status.PeerStatus; import codeine.nodes.NodesManagerPeer; import codeine.nodes.NodesRunner; - import com.google.inject.AbstractModule; import com.google.inject.Scopes; import com.google.inject.assistedinject.FactoryModuleBuilder; @@ -48,4 +47,4 @@ protected void configure() { install(new FactoryModuleBuilder().build(TagsCollectorRunnerFactory.class)); } -} +} \ No newline at end of file diff --git a/src/peer/codeine/servlets/CommandNodeServlet.java b/src/peer/codeine/servlets/CommandNodeServlet.java index 0732784b..0e828d7a 100755 --- a/src/peer/codeine/servlets/CommandNodeServlet.java +++ b/src/peer/codeine/servlets/CommandNodeServlet.java @@ -2,6 +2,7 @@ import codeine.SnoozeKeeper; import codeine.configuration.IConfigurationManager; +import codeine.configuration.Links; import codeine.configuration.PathHelper; import codeine.credentials.CredHelper; import codeine.jsons.auth.EncryptionUtils; @@ -54,13 +55,15 @@ public class CommandNodeServlet extends AbstractServlet { private SnoozeKeeper snoozeKeeper; @Inject private PeerStatus projectStatusUpdater; + @Inject + private Links links; @Override public void myPost(HttpServletRequest request, HttpServletResponse res) { log.info("start handle command"); - if (Boolean.parseBoolean(getParameter(request, Constants.UrlParameters.FORCE)) - || experimentalConfJsonStore.get().allow_concurrent_commands_in_peer()) { + if (Boolean.parseBoolean(getParameter(request, Constants.UrlParameters.FORCE)) || experimentalConfJsonStore + .get().allow_concurrent_commands_in_peer()) { executeCommandNotSync(request, res); } else { executeCommandSync(request, res); @@ -69,15 +72,13 @@ public void myPost(HttpServletRequest request, HttpServletResponse res) { } /** - * this prevents multiple commands on the same peer, so preventing upgrade the peer during - * command for example + * this prevents multiple commands on the same peer, so preventing upgrade the peer during command for example */ private void executeCommandNotSync(HttpServletRequest request, HttpServletResponse res) { executeInternal(request, res); } - private synchronized void executeCommandSync(HttpServletRequest request, - HttpServletResponse res) { + private synchronized void executeCommandSync(HttpServletRequest request, HttpServletResponse res) { executeInternal(request, res); } @@ -89,13 +90,10 @@ private void executeInternal(HttpServletRequest request, HttpServletResponse res String parameter = Constants.UrlParameters.DATA_NAME; String data = getParameter(request, parameter); CommandInfo commandInfo = gson().fromJson(data, CommandInfo.class); - String data2 = getParameter(request, - Constants.UrlParameters.DATA_ADDITIONAL_COMMAND_INFO_NAME); - CommandInfoForSpecificNode commandInfo2 = gson() - .fromJson(data2, CommandInfoForSpecificNode.class); + String data2 = getParameter(request, Constants.UrlParameters.DATA_ADDITIONAL_COMMAND_INFO_NAME); + CommandInfoForSpecificNode commandInfo2 = gson().fromJson(data2, CommandInfoForSpecificNode.class); if (null != commandInfo2.key()) { - String decrypt = EncryptionUtils - .decrypt(Constants.CODEINE_API_TOKEN_DERIVER, commandInfo2.key()); + String decrypt = EncryptionUtils.decrypt(Constants.CODEINE_API_TOKEN_DERIVER, commandInfo2.key()); validateKey(decrypt); } else { log.warn("key is null", new RuntimeException()); @@ -106,8 +104,8 @@ private void executeInternal(HttpServletRequest request, HttpServletResponse res ProjectJson project = getProject(commandInfo.project_name()); boolean windows_peer = project.operating_system() == OperatingSystem.Windows; if (null != script_content) { - cmdScript = new ShellScript(file, script_content, project.operating_system(), - commandInfo2.tmp_dir(), null, null, null); + cmdScript = new ShellScript(file, script_content, project.operating_system(), commandInfo2.tmp_dir(), + null, null, null); file = cmdScript.create(); } else { log.info("command not found " + file); @@ -151,19 +149,24 @@ private void executeInternal(HttpServletRequest request, HttpServletResponse res }; Map env = Maps.newHashMap(); env.put(Constants.EXECUTION_ENV_PROJECT_NAME, commandInfo.project_name()); + if (commandInfo2.command_id() != null) { + env.put(Constants.EXECUTION_ENV_COMMAND_STATUS_LINK, links + .getWebServerCommandStatus(commandInfo.project_name(), commandInfo.name(), + commandInfo2.command_id())); + } + env.put(Constants.EXECUTION_ENV_COMMAND_NAME, commandInfo.name()); env.put(Constants.EXECUTION_ENV_NODE_NAME, commandInfo2.node_name()); env.put(Constants.EXECUTION_ENV_NODE_ALIAS, commandInfo2.node_alias()); - env.put(Constants.EXECUTION_ENV_NODE_TAGS, - StringUtils.collectionToString(getTags(commandInfo.project_name(), commandInfo2.node_name()),";")); - env.put(Constants.EXECUTION_ENV_CODEINE_SERVER, - globalConfigurationJsonStore.get().web_server_host()); + env.put(Constants.EXECUTION_ENV_NODE_TAGS, StringUtils + .collectionToString(projectStatusUpdater.getTags(commandInfo.project_name(), commandInfo2.node_name()), + ";")); + env.put(Constants.EXECUTION_ENV_CODEINE_SERVER, globalConfigurationJsonStore.get().web_server_host()); env.put(Constants.EXECUTION_ENV_CODEINE_SERVER_PORT, globalConfigurationJsonStore.get().web_server_port().toString()); env.putAll(commandInfo2.environment_variables()); env.putAll(getEnvParams(commandInfo)); - Result result = new ProcessExecuterBuilder(cmd, - pathHelper.getProjectDir(commandInfo.project_name())).cmdForOutput(cmdForOutput) - .timeoutInMinutes(commandInfo.timeoutInMinutes()).function(function).env(env) + Result result = new ProcessExecuterBuilder(cmd, pathHelper.getProjectDir(commandInfo.project_name())) + .cmdForOutput(cmdForOutput).timeoutInMinutes(commandInfo.timeoutInMinutes()).function(function).env(env) .user(cred).build().execute(); writer.println(Constants.COMMAND_RESULT + result.exit()); writer.flush(); diff --git a/src/web_server/codeine/command_peer/AllNodesCommandExecuter.java b/src/web_server/codeine/command_peer/AllNodesCommandExecuter.java index 204afc9b..f52fe2f9 100755 --- a/src/web_server/codeine/command_peer/AllNodesCommandExecuter.java +++ b/src/web_server/codeine/command_peer/AllNodesCommandExecuter.java @@ -34,288 +34,297 @@ public class AllNodesCommandExecuter { - private static final Logger log = Logger.getLogger(AllNodesCommandExecuter.class); - - @Inject private Links links; - @Inject private PathHelper pathHelper; - @Inject private NodeGetter nodeGetter; - @Inject private IMonitorStatistics monitorsStatistics; - @Inject private DiscardOldCommandsPlugin discardOldCommandsPlugin; - @Inject private CommandFileWriter commandFileWriter; - @Inject private Gson gson; - - private int total; - private int count; - private int fail; - private int skipped; - private BufferedWriter writer; - private boolean active = true; - private long commandId; - private String dirNameFull; - private ScehudleCommandExecutionInfo commandData; - private CommandExecutionStatusInfo commandExecutionInfo; - private ProjectJson project; - private IUserWithPermissions userObject; - private Object fileWriteSync = new Object(); - private CommandExecutionStrategy strategy; - - private String cancelingUser; - - public long executeOnAllNodes(IUserWithPermissions userObject, ScehudleCommandExecutionInfo commandData, ProjectJson project) { - this.project = project; - this.userObject = userObject; - discardOldCommandsPlugin.queueForDelete(project); - try { - this.commandData = commandData; - this.total = commandData.nodes().size(); - commandId = getNewDirName(); - dirNameFull = pathHelper.getCommandOutputDir(commandData.command_info().project_name(), String.valueOf(commandId)); - FilesUtils.mkdirs(dirNameFull); - String pathname = dirNameFull + "/log"; - File file = new File(pathname); - FilesUtils.createNewFile(file); - createCommandDataFile(userObject.user().username()); - writer = TextFileUtils.getWriter(file, false); - log.info("running command " + commandData.command_info().command_name() + " with concurrency " + commandData.command_info().concurrency() + "by " + userObject.user()); - String nodesWord = commandData.nodes().size() == 1 ? "node" : "nodes"; - writeLine("running command '"+commandData.command_info().command_name()+"' on " + commandData.nodes().size() + " " + nodesWord + " by " + userObject.user().username()); - writeNodesList(commandData); - updatePeersAddresses(); - Thread commandThread = ThreadUtils.createThread(new Runnable() { - @Override - public void run() { - execute(); - }; - }, "AllNodesCommandExecuter_"+commandData.command_info().command_name()); - commandThread.start(); - monitorsStatistics.updateCommand(commandExecutionInfo); - return commandId; - } catch (Exception ex) { - finish(); - throw ExceptionUtils.asUnchecked(ex); - } - } - - private void updatePeersAddresses() { - for (NodeWithPeerInfo n : commandData.nodes()) { - PeerStatusJsonV2 p = nodeGetter.peer(n.peer_key()); - if (null == p) { - writeLine("Warning: ignoring node '" + n.alias() + "' since peer not found"); - continue; - } - n.peer_address(p.address_port()); - } - } - - public void writeNodesList(ScehudleCommandExecutionInfo commandData) { - if (commandData.nodes().size() < 11) { - Function predicate = new Function(){ - @Override - public String apply(NodeWithPeerInfo input) { - return input.alias(); - } - }; - writeLine("nodes list: " + StringUtils.collectionToString(commandData.nodes(), predicate)); - } - } - - private void finish() { - log.info("Finishing command " + commandExecutionInfo.id()); - if (null != commandExecutionInfo) { - commandExecutionInfo.finish(); - } - try { - updateJson(); - FilesUtils.createNewFile(dirNameFull + Constants.COMMAND_FINISH_FILE); - } catch (Exception e) { - log.warn("Failed to mark command as finished " + commandExecutionInfo, e); - } - active = false; - } - - private void execute() { - try { - initStrategy(); - strategy.execute(); - if (strategy.isCancel()) { - writeLine("Execution was canceled by user " + cancelingUser); - } - if (strategy.isError()) { - writeLine(strategy.error()); - } - writeFooter(); - if (null != commandData.address_to_notify()) { - int status = fail > 0 ? 1 : 0; - String message = "command-finished,project=" + commandData.command_info().project_name() + ",id=" + commandId + ",status=" + status; - log.info("sending finished event: " + message); - SocketUtils.sendToPort(commandData.address_to_notify(), message); - } - } finally { - finish(); - } - } - - private void initStrategy() { - switch (commandData.command_info().command_strategy()) - { - case Single: { - strategy = new SingleNodeCommandStrategy(this, commandData, links,project, userObject); - break; - } - case Immediately: { - strategy = new ImmediatlyCommandStrategy(this, commandData, links,project, userObject); - break; - } - case Progressive: { - strategy = new ProgressiveExecutionStrategy(this, commandData, links, nodeGetter,project, userObject); - break; - } - default: - throw new IllegalStateException("couldnt handle strategy " + commandData.command_info().command_strategy()); - } - } - - private void writeFooter() { - writeLine("finished!"); - Function f = new Function() { - @Override - public String apply(NodeInfoNameAndAlias n){ - return n.alias(); - } - }; - if (!commandExecutionInfo.fail_list().isEmpty()) { - writeLine("failed nodes: " + StringUtils.collectionToString(commandExecutionInfo.fail_list(), f)); - } - writeLine("=========> aggregate-command-statistics (success/total): " + (total - fail) + "/" + total + "\n"); - } - - private void createCommandDataFile(String user) { - commandExecutionInfo = new CommandExecutionStatusInfo(user, commandData.command_info().command_name(), commandData.command_info().parameters(), commandData.command_info().project_name(), - commandData.nodes(), commandId); - FilesUtils.createNewFile(commandFile()); - updateJson(); - } - - private String commandFile() { - return dirNameFull + Constants.JSON_COMMAND_FILE_NAME; - } - - private long getNewDirName() { - long i = 0; - String dir = pathHelper.getAllCommandsInProjectOutputDir(commandData.command_info().project_name()); - List filesInDir = FilesUtils.getFilesInDir(dir); - for (String dir1 : filesInDir) { - try { - long j = Long.parseLong(dir1); - i = Math.max(i, j); - } catch (NumberFormatException e) { - log.debug("error parsing " + dir1); - } - } - return i + 1; - } - - void writeLine(String line) { - writeLineToFile(line); - } - - private synchronized void writeLineToFile(String line) { - try { - writer.append(line); - writer.newLine(); - writer.flush(); - } catch (IOException e) { - throw ExceptionUtils.asUnchecked(e); - } - } - - public void fail(NodeWithPeerInfo node) { - log.debug("node fail " + node.name()); - fail++; - commandExecutionInfo.addFailedNode(node); - } - - public void nodeSuccess(NodeWithPeerInfo node) { - log.debug("node success " + node.name()); - commandExecutionInfo.addSuccessNode(node); - } - - public void nodeSkipped(NodeWithPeerInfo node) { - log.debug("node skipped " + node.name()); - skipped++; - commandExecutionInfo.addSkippedNode(node); - } - - public void workerFinished() { - count++; - updateJsonAsync(); - } - - public boolean isActive() { - return active; - } - - private void updateJsonAsync() { - commandFileWriter.queue(new CommandFileWriterItem(fileWriteSync, commandFile(), commandExecutionInfo)); - } - - private void updateJson() { - String json; - json = gson.toJson(commandExecutionInfo); - synchronized (fileWriteSync) { - TextFileUtils.setContents(commandFile(), json); - } - } - - public String name() { - return commandData.command_info().command_name(); - } - - public int success() { - return (int) (count - fail - skipped) * 100 / total; - } - - public int error() { - return fail * 100 / total; - } - - public int skipped() { - return skipped * 100 / total; - } - - public String project() { - return commandData.command_info().project_name(); - } - - public int nodes() { - return commandData.nodes().size(); - } - - public CommandExecutionStatusInfo commandData() { - return commandExecutionInfo; - } - - public long id() { - return commandId; - } - - public void cancel(String username) { - strategy.setCancel(); - this.cancelingUser = username; - } - - public List nodesList() { - return commandData.nodes(); - } - public Object fileWriteSync() { - return fileWriteSync; - } - - public CommandExecutionStatusInfo commandExecutionInfo() { - return commandExecutionInfo; - } - - public String commandString() { - return commandExecutionInfo().project_name() + "/" + commandExecutionInfo().command_name() + "/" + commandExecutionInfo().id(); - } + private static final Logger log = Logger.getLogger(AllNodesCommandExecuter.class); + + @Inject + private Links links; + @Inject + private PathHelper pathHelper; + @Inject + private NodeGetter nodeGetter; + @Inject + private IMonitorStatistics monitorsStatistics; + @Inject + private DiscardOldCommandsPlugin discardOldCommandsPlugin; + @Inject + private CommandFileWriter commandFileWriter; + @Inject + private Gson gson; + + private int total; + private int count; + private int fail; + private int skipped; + private BufferedWriter writer; + private boolean active = true; + private long commandId; + private String dirNameFull; + private ScehudleCommandExecutionInfo commandData; + private CommandExecutionStatusInfo commandExecutionInfo; + private ProjectJson project; + private IUserWithPermissions userObject; + private Object fileWriteSync = new Object(); + private CommandExecutionStrategy strategy; + + private String cancelingUser; + + public long executeOnAllNodes(IUserWithPermissions userObject, ScehudleCommandExecutionInfo commandData, + ProjectJson project) { + this.project = project; + this.userObject = userObject; + discardOldCommandsPlugin.queueForDelete(project); + try { + this.commandData = commandData; + this.total = commandData.nodes().size(); + commandId = getNewDirName(); + dirNameFull = pathHelper + .getCommandOutputDir(commandData.command_info().project_name(), String.valueOf(commandId)); + FilesUtils.mkdirs(dirNameFull); + String pathname = dirNameFull + "/log"; + File file = new File(pathname); + FilesUtils.createNewFile(file); + createCommandDataFile(userObject.user().username()); + writer = TextFileUtils.getWriter(file, false); + log.info("running command " + commandData.command_info().command_name() + " with concurrency " + commandData + .command_info().concurrency() + "by " + userObject.user()); + String nodesWord = commandData.nodes().size() == 1 ? "node" : "nodes"; + writeLine( + "running command '" + commandData.command_info().command_name() + "' on " + commandData.nodes().size() + + " " + nodesWord + " by " + userObject.user().username()); + writeNodesList(commandData); + updatePeersAddresses(); + Thread commandThread = ThreadUtils.createThread(this::execute, + "AllNodesCommandExecuter_" + commandData.command_info().command_name()); + commandThread.start(); + monitorsStatistics.updateCommand(commandExecutionInfo); + return commandId; + } catch (Exception ex) { + finish(); + throw ExceptionUtils.asUnchecked(ex); + } + } + + private void updatePeersAddresses() { + for (NodeWithPeerInfo n : commandData.nodes()) { + PeerStatusJsonV2 p = nodeGetter.peer(n.peer_key()); + if (null == p) { + writeLine("Warning: ignoring node '" + n.alias() + "' since peer not found"); + continue; + } + n.peer_address(p.address_port()); + } + } + + public void writeNodesList(ScehudleCommandExecutionInfo commandData) { + if (commandData.nodes().size() < 11) { + Function predicate = input -> input.alias(); + writeLine("nodes list: " + StringUtils.collectionToString(commandData.nodes(), predicate)); + } + } + + private void finish() { + log.info("Finishing command " + commandExecutionInfo.id()); + if (null != commandExecutionInfo) { + commandExecutionInfo.finish(); + } + try { + updateJson(); + FilesUtils.createNewFile(dirNameFull + Constants.COMMAND_FINISH_FILE); + } catch (Exception e) { + log.warn("Failed to mark command as finished " + commandExecutionInfo, e); + } + active = false; + } + + private void execute() { + try { + initStrategy(); + strategy.execute(); + if (strategy.isCancel()) { + writeLine("Execution was canceled by user " + cancelingUser); + } + if (strategy.isError()) { + writeLine(strategy.error()); + } + writeFooter(); + if (null != commandData.address_to_notify()) { + int status = fail > 0 ? 1 : 0; + String message = + "command-finished,project=" + commandData.command_info().project_name() + ",id=" + commandId + + ",status=" + status; + log.info("sending finished event: " + message); + SocketUtils.sendToPort(commandData.address_to_notify(), message); + } + } finally { + finish(); + } + } + + private void initStrategy() { + switch (commandData.command_info().command_strategy()) { + case Single: { + strategy = new SingleNodeCommandStrategy(this, commandData, links, project, userObject, commandId); + break; + } + case Immediately: { + strategy = new ImmediatlyCommandStrategy(this, commandData, links, project, userObject, commandId); + break; + } + case Progressive: { + strategy = new ProgressiveExecutionStrategy(this, commandData, links, nodeGetter, project, userObject, + commandId); + break; + } + default: + throw new IllegalStateException( + "couldnt handle strategy " + commandData.command_info().command_strategy()); + } + } + + private void writeFooter() { + writeLine("finished!"); + Function f = new Function() { + @Override + public String apply(NodeInfoNameAndAlias n) { + return n.alias(); + } + }; + if (!commandExecutionInfo.fail_list().isEmpty()) { + writeLine("failed nodes: " + StringUtils.collectionToString(commandExecutionInfo.fail_list(), f)); + } + writeLine("=========> aggregate-command-statistics (success/total): " + (total - fail) + "/" + total + "\n"); + } + + private void createCommandDataFile(String user) { + commandExecutionInfo = new CommandExecutionStatusInfo(user, commandData.command_info().command_name(), + commandData.command_info().parameters(), commandData.command_info().project_name(), commandData.nodes(), + commandId); + FilesUtils.createNewFile(commandFile()); + updateJson(); + } + + private String commandFile() { + return dirNameFull + Constants.JSON_COMMAND_FILE_NAME; + } + + private long getNewDirName() { + long i = 0; + String dir = pathHelper.getAllCommandsInProjectOutputDir(commandData.command_info().project_name()); + List filesInDir = FilesUtils.getFilesInDir(dir); + for (String dir1 : filesInDir) { + try { + long j = Long.parseLong(dir1); + i = Math.max(i, j); + } catch (NumberFormatException e) { + log.debug("error parsing " + dir1); + } + } + return i + 1; + } + + void writeLine(String line) { + writeLineToFile(line); + } + + private synchronized void writeLineToFile(String line) { + try { + writer.append(line); + writer.newLine(); + writer.flush(); + } catch (IOException e) { + throw ExceptionUtils.asUnchecked(e); + } + } + + public void fail(NodeWithPeerInfo node) { + log.debug("node fail " + node.name()); + fail++; + commandExecutionInfo.addFailedNode(node); + } + + public void nodeSuccess(NodeWithPeerInfo node) { + log.debug("node success " + node.name()); + commandExecutionInfo.addSuccessNode(node); + } + + public void nodeSkipped(NodeWithPeerInfo node) { + log.debug("node skipped " + node.name()); + skipped++; + commandExecutionInfo.addSkippedNode(node); + } + + public void workerFinished() { + count++; + updateJsonAsync(); + } + + public boolean isActive() { + return active; + } + + private void updateJsonAsync() { + commandFileWriter.queue(new CommandFileWriterItem(fileWriteSync, commandFile(), commandExecutionInfo)); + } + + private void updateJson() { + String json; + json = gson.toJson(commandExecutionInfo); + synchronized (fileWriteSync) { + TextFileUtils.setContents(commandFile(), json); + } + } + + public String name() { + return commandData.command_info().command_name(); + } + + public int success() { + return (int) (count - fail - skipped) * 100 / total; + } + + public int error() { + return fail * 100 / total; + } + + public int skipped() { + return skipped * 100 / total; + } + + public String project() { + return commandData.command_info().project_name(); + } + + public int nodes() { + return commandData.nodes().size(); + } + + public CommandExecutionStatusInfo commandData() { + return commandExecutionInfo; + } + + public long id() { + return commandId; + } + + public void cancel(String username) { + strategy.setCancel(); + this.cancelingUser = username; + } + + public List nodesList() { + return commandData.nodes(); + } + + public Object fileWriteSync() { + return fileWriteSync; + } + + public CommandExecutionStatusInfo commandExecutionInfo() { + return commandExecutionInfo; + } + + public String commandString() { + return commandExecutionInfo().project_name() + "/" + commandExecutionInfo().command_name() + "/" + + commandExecutionInfo().id(); + } } diff --git a/src/web_server/codeine/command_peer/CommandExecutionStrategy.java b/src/web_server/codeine/command_peer/CommandExecutionStrategy.java index f90f1f59..4d2156ba 100755 --- a/src/web_server/codeine/command_peer/CommandExecutionStrategy.java +++ b/src/web_server/codeine/command_peer/CommandExecutionStrategy.java @@ -22,18 +22,21 @@ public abstract class CommandExecutionStrategy { private boolean cancel; private ProjectJson project; private IUserWithPermissions userObject; + private final long commandId; private String error; public static final int MAX_NODES_TO_EXECUTE = 100; public CommandExecutionStrategy(ScehudleCommandExecutionInfo commandData, - AllNodesCommandExecuter allNodesCommandExecuter, Links links, ProjectJson project, IUserWithPermissions userObject) { + AllNodesCommandExecuter allNodesCommandExecuter, Links links, ProjectJson project, + IUserWithPermissions userObject, long commandId) { super(); this.commandData = commandData; this.allNodesCommandExecuter = allNodesCommandExecuter; this.links = links; this.project = project; this.userObject = userObject; + this.commandId = commandId; } public abstract void execute(); @@ -43,7 +46,8 @@ protected void writeLine(String message) { } private void commandNode(ExecutorService executor, NodeWithPeerInfo node, boolean shouldOutputImmediatly) { - PeerCommandWorker worker = new PeerCommandWorker(node, allNodesCommandExecuter, commandData.command_info(), shouldOutputImmediatly, links, project, userObject); + PeerCommandWorker worker = new PeerCommandWorker(node, allNodesCommandExecuter, commandData.command_info(), + shouldOutputImmediatly, links, project, userObject, commandId); executor.execute(worker); } diff --git a/src/web_server/codeine/command_peer/ImmediatlyCommandStrategy.java b/src/web_server/codeine/command_peer/ImmediatlyCommandStrategy.java index b3e3c656..4d82936b 100755 --- a/src/web_server/codeine/command_peer/ImmediatlyCommandStrategy.java +++ b/src/web_server/codeine/command_peer/ImmediatlyCommandStrategy.java @@ -17,8 +17,9 @@ public class ImmediatlyCommandStrategy extends CommandExecutionStrategy { public ImmediatlyCommandStrategy(AllNodesCommandExecuter allNodesCommandExecuter, - ScehudleCommandExecutionInfo commandData, Links links, ProjectJson project, IUserWithPermissions userObject) { - super(commandData, allNodesCommandExecuter, links, project, userObject); + ScehudleCommandExecutionInfo commandData, Links links, ProjectJson project, + IUserWithPermissions userObject, long commandId) { + super(commandData, allNodesCommandExecuter, links, project, userObject, commandId); } @Override diff --git a/src/web_server/codeine/command_peer/PeerCommandWorker.java b/src/web_server/codeine/command_peer/PeerCommandWorker.java index 5143af0c..5eef2212 100755 --- a/src/web_server/codeine/command_peer/PeerCommandWorker.java +++ b/src/web_server/codeine/command_peer/PeerCommandWorker.java @@ -35,9 +35,12 @@ public class PeerCommandWorker implements Runnable { private ProjectJson project; private static Pattern pattern = Pattern.compile(".*" + Constants.COMMAND_RESULT + "(-?\\d+).*"); private IUserWithPermissions userObject; + private final long commandId; private boolean failedReported = false; - public PeerCommandWorker(NodeWithPeerInfo node, AllNodesCommandExecuter allNodesCommandExecuter, CommandInfo command_info, boolean shouldOutputImmediatly, Links links, ProjectJson project, IUserWithPermissions userObject) { + public PeerCommandWorker(NodeWithPeerInfo node, AllNodesCommandExecuter allNodesCommandExecuter, + CommandInfo command_info, boolean shouldOutputImmediatly, Links links, ProjectJson project, + IUserWithPermissions userObject, long commandId) { this.node = node; this.allNodesCommandExecuter = allNodesCommandExecuter; this.command_info = command_info; @@ -45,6 +48,7 @@ public PeerCommandWorker(NodeWithPeerInfo node, AllNodesCommandExecuter allNodes this.links = links; this.project = project; this.userObject = userObject; + this.commandId = commandId; } @@ -86,7 +90,8 @@ private void executeInternal() { } if (POST && !command_info.name().equals("upgrade_old_peers")) { String key = userObject.user().encodedApiTokenWithTime(); - CommandInfoForSpecificNode command_info2 = new CommandInfoForSpecificNode(node.name(), node.alias(), null, key, project.environmentVariables()); + CommandInfoForSpecificNode command_info2 = new CommandInfoForSpecificNode(node.name(), node.alias(), + null, key, project.environmentVariables(), commandId); log.info("Post data of command is " + command_info2.toString()); String postData = UrlParameters.DATA_NAME + "=" + HttpUtils.encodeURL(new Gson().toJson(command_info)) +"&" + UrlParameters.DATA_ADDITIONAL_COMMAND_INFO_NAME + "=" + HttpUtils.encodeURL(new Gson().toJson(command_info2)); diff --git a/src/web_server/codeine/command_peer/ProgressiveExecutionStrategy.java b/src/web_server/codeine/command_peer/ProgressiveExecutionStrategy.java index 277eb567..9c6c8ca5 100755 --- a/src/web_server/codeine/command_peer/ProgressiveExecutionStrategy.java +++ b/src/web_server/codeine/command_peer/ProgressiveExecutionStrategy.java @@ -23,8 +23,10 @@ public class ProgressiveExecutionStrategy extends CommandExecutionStrategy { private NodeGetter nodesGetter; private Object cancelObject = new Object(); - public ProgressiveExecutionStrategy(AllNodesCommandExecuter allNodesCommandExecuter,ScehudleCommandExecutionInfo commandData, Links links, NodeGetter nodesGetter, ProjectJson project, IUserWithPermissions userObject) { - super(commandData, allNodesCommandExecuter, links, project, userObject); + public ProgressiveExecutionStrategy(AllNodesCommandExecuter allNodesCommandExecuter, + ScehudleCommandExecutionInfo commandData, Links links, NodeGetter nodesGetter, ProjectJson project, + IUserWithPermissions userObject, long commandId) { + super(commandData, allNodesCommandExecuter, links, project, userObject, commandId); this.nodesGetter = nodesGetter; } diff --git a/src/web_server/codeine/command_peer/SingleNodeCommandStrategy.java b/src/web_server/codeine/command_peer/SingleNodeCommandStrategy.java index 5b9fefe9..d9e2c125 100644 --- a/src/web_server/codeine/command_peer/SingleNodeCommandStrategy.java +++ b/src/web_server/codeine/command_peer/SingleNodeCommandStrategy.java @@ -17,8 +17,9 @@ public class SingleNodeCommandStrategy extends CommandExecutionStrategy { public SingleNodeCommandStrategy(AllNodesCommandExecuter allNodesCommandExecuter, - ScehudleCommandExecutionInfo commandData, Links links, ProjectJson project, IUserWithPermissions userObject) { - super(commandData, allNodesCommandExecuter, links, project, userObject); + ScehudleCommandExecutionInfo commandData, Links links, ProjectJson project, + IUserWithPermissions userObject, long commandId) { + super(commandData, allNodesCommandExecuter, links, project, userObject, commandId); } @Override