diff --git a/CHANGES.md b/CHANGES.md index 8cc55938..b65a839b 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -61,7 +61,7 @@ https://github.com/burke/zeus/compare/v0.15.6...v0.15.8 zeus Gem no longer requires native extensions and file monitoring is much faster and more reliable. * Track files from exceptions during Zeus actions in Ruby. -* Fix a thread safety in SlaveNode state access. +* Fix a thread safety in WorkerNode state access. # 0.15.7 @@ -158,7 +158,7 @@ https://github.com/burke/zeus/compare/v0.13.1...v0.13.2 * Improved a few cases where client processes disconnect unexpectedly. -* Changed up the slave/master IPC, solving a bunch of issues on Linux, by +* Changed up the worker/coordinator IPC, solving a bunch of issues on Linux, by switching from a socket to a pipe. * Client terminations are now handled a bit more gracefully. The terminal is diff --git a/README.md b/README.md index a3acd9bf..cc73be96 100644 --- a/README.md +++ b/README.md @@ -40,7 +40,7 @@ A: No. You can, but running `bundle exec zeus` instead of `zeus` adds precious s It is common to see tests running twice when starting out with Zeus. If you see your tests/specs running twice, you should try disabling `require 'rspec/autotest'` and `require 'rspec/autorun'` (for RSpec), or `require 'minitest/autorun'` (for Minitest). (see [#134](https://github.com/burke/zeus/issues/134) for more information). -## Rails Set up +## Rails Set up In your app's directory initialize zeus: diff --git a/contributing.md b/contributing.md index d9ac6546..6bdcd57f 100644 --- a/contributing.md +++ b/contributing.md @@ -10,9 +10,9 @@ One or two sentences giving an overview of the issue. ## System details -* **`uname -a`**: +* **`uname -a`**: -* **`ruby -v`**: +* **`ruby -v`**: * **`go version`**: (only if hacking on the go code) @@ -62,10 +62,10 @@ use to crosscompile multiple binaries. ### Context: How zeus is structured The core of zeus is a single go program that acts as the coordinating process -(master, e.g. `zeus start`), or the client (called per-command, e.g. `zeus +(coordinator, e.g. `zeus start`), or the client (called per-command, e.g. `zeus client`). This code is cross-compiled for a handful of different architectures and bundled with a ruby gem. The ruby gem contains all the shim code necessary -to boot a rails app under the control of the master process. +to boot a rails app under the control of the coordinator process. ### Building diff --git a/docs/client_coordinator_handshake.md b/docs/client_coordinator_handshake.md new file mode 100644 index 00000000..b64038ee --- /dev/null +++ b/docs/client_coordinator_handshake.md @@ -0,0 +1,56 @@ +# Client/Coordinator/Command handshake + + Client Coordinator Command + 1 ----------> | Command, Arguments, Pid + 2 ----------> | Terminal IO + 3 -----------> | Terminal IO + 4 -----------> | Arguments, Pid + 5 <----------- | pid + 6 <--------- | pid + (time passes) + 7 <----------- | exit status + 8 <--------- | exit status + + +#### 1. Command & Arguments (Client -> Coordinator) + +The Coordinator always has a UNIX domain server listening at a known socket path. + +The Client connects to this server and sends a string indicating the command to run and any arguments to run with (ie. the ARGV). See message_format.md for more info. + +#### 2. Terminal IO (Client -> Coordinator) + +The Client then sends an IO over the server socket to be used for raw terminal IO. + +#### 3. Arguments (Coordinator -> Command) + +The Coordinator sends the Client arguments from step 1 to the Command. + +#### 4. 
Terminal IO (Coordinator -> Command) + +The Coordinator forks a new Command process and sends it the Terminal IO from the Client. + +#### 5. Pid (Command -> Coordinator) + +The Command process sends the Coordinator its pid, using a Pid & Identifier message. + +#### 6. Pid (Coordinator -> Client) + +The Coordinator responds to the client with the pid of the newly-forked Command process. + +The Client is now connected to the Command process. + +#### 7. Exit status (Command -> Coordinator) + +When the command terminates, it must send its exit code to the coordinator. This is normally easiest to implement as a wrapper process that does the setsid, then forks the command and `waitpid`s on it. + +The form of this message is `{{code}}`, eg: `1`. + +#### 8. Exit status (Coordinator -> Client) + +Finally, the Coordinator forwards the exit status to the Client. The command cycle is now complete. + +The form of this message is `{{code}}`, eg: `1`. + +See [`message_format.md`](message_format.md) for more information on messages. + diff --git a/docs/client_master_handshake.md b/docs/client_master_handshake.md deleted file mode 100644 index 41297bbf..00000000 --- a/docs/client_master_handshake.md +++ /dev/null @@ -1,56 +0,0 @@ -# Client/Master/Command handshake - - Client Master Command - 1 ----------> | Command, Arguments, Pid - 2 ----------> | Terminal IO - 3 -----------> | Terminal IO - 4 -----------> | Arguments, Pid - 5 <----------- | pid - 6 <--------- | pid - (time passes) - 7 <----------- | exit status - 8 <--------- | exit status - - -#### 1. Command & Arguments (Client -> Master) - -The Master always has a UNIX domain server listening at a known socket path. - -The Client connects to this server and sends a string indicating the command to run and any arguments to run with (ie. the ARGV). See message_format.md for more info. - -#### 2. Terminal IO (Client -> Master) - -The Client then sends an IO over the server socket to be used for raw terminal IO. - -#### 3. Arguments (Master -> Command) - -The Master sends the Client arguments from step 1 to the Command. - -#### 4. Terminal IO (Master -> Command) - -The Master forks a new Command process and sends it the Terminal IO from the Client. - -#### 5. Pid (Command -> Master) - -The Command process sends the Master its pid, using a Pid & Identifier message. - -#### 6. Pid (Master -> Client) - -The Master responds to the client with the pid of the newly-forked Command process. - -The Client is now connected to the Command process. - -#### 7. Exit status (Command -> Master) - -When the command terminates, it must send its exit code to the master. This is normally easiest to implement as a wrapper process that does the setsid, then forks the command and `waitpid`s on it. - -The form of this message is `{{code}}`, eg: `1`. - -#### 8. Exit status (Master -> Client) - -Finally, the Master forwards the exit status to the Client. The command cycle is now complete. - -The form of this message is `{{code}}`, eg: `1`. - -See [`message_format.md`](message_format.md) for more information on messages. - diff --git a/docs/coordinator_worker_handshake.md b/docs/coordinator_worker_handshake.md new file mode 100644 index 00000000..cc40593a --- /dev/null +++ b/docs/coordinator_worker_handshake.md @@ -0,0 +1,44 @@ +# Coordinator/Worker Handshake + +#### 1. Socket + +The Worker is always started with an environment variable named `ZEUS_COORDINATOR_FD`. The file descriptor at the given integer value is a socket to the Coordinator process. 
+ +The Worker should open a UNIX Domain Socket using the `ZEUS_COORDINATOR_FD` File Descriptor (`globalCoordinatorSock`). + +The Worker opens a new UNIX datagram Socketpair (`local`, `remote`) + +The Worker sends `remote` across `globalCoordinatorSock`. + +#### 2. PID and Identifier + +The Worker determines whether it has been given an Identifier. If it is the first-booted worker, it was booted +by the Coordinator, and will not have one. When a Worker forks, it is passed an Identifier by the Coordinator that it +passes along to the newly-forked process. + +The Worker sends a "Pid & Identifier" message containing the pid and the identifier (blank if initial process) + +#### 4. Action Result + +The Worker now executes the code it's intended to run by looking up the action +in a collection of predefined actions indexed by identifier. In ruby this is implemented +as a module that responds to a method named according to each identifier. + +If there were no runtime errors in evaluating the action, the Worker writes "OK" to `local`. + +If there were runtime errors, the worker returns a string representing the errors in an arbitrary and +hopefully helpful format. It should normally be identical to the console output format should the errors +have been raised and printed to stderr. + +Before the server kills a crashed worker process, it attempts to read +any loaded files from `local`, until that socket is closed. + +#### 5. Loaded Files + +Any time after the action has been executed, the Worker may (and should) send, over `local`, a list of files +that have been newly-loaded in the course of evaluating the action. + +Languages are expected to implement this using clever tricks. + +Steps 1-4 happen sequentially and in-order, but submitting files in Step 5 should not prevent the Worker from +handling further commands from the coordinator. The Worker should be considered 'connected' after Step 4. diff --git a/docs/master_slave_handshake.md b/docs/master_slave_handshake.md deleted file mode 100644 index c221964b..00000000 --- a/docs/master_slave_handshake.md +++ /dev/null @@ -1,44 +0,0 @@ -# Master/Slave Handshake - -#### 1. Socket - -The Slave is always started with an environment variable named `ZEUS_MASTER_FD`. The file descriptor at the given integer value is a socket to the Master process. - -The Slave should open a UNIX Domain Socket using the `ZEUS_MASTER_FD` File Descriptor (`globalMasterSock`). - -The Slave opens a new UNIX datagram Socketpair (`local`, `remote`) - -The Slave sends `remote` across `globalMasterSock`. - -#### 2. PID and Identifier - -The Slave determines whether it has been given an Identifier. If it is the first-booted slave, it was booted -by the Master, and will not have one. When a Slave forks, it is passed an Identifier by the Master that it -passes along to the newly-forked process. - -The Slave sends a "Pid & Identifier" message containing the pid and the identifier (blank if initial process) - -#### 4. Action Result - -The Slave now executes the code it's intended to run by looking up the action -in a collection of predefined actions indexed by identifier. In ruby this is implemented -as a module that responds to a method named according to each identifier. - -If there were no runtime errors in evaluating the action, the Slave writes "OK" to `local`. - -If there were runtime errors, the slave returns a string representing the errors in an arbitrary and -hopefully helpful format. 
It should normally be identical to the console output format should the errors -have been raised and printed to stderr. - -Before the server kills a crashed slave process, it attempts to read -any loaded files from `local`, until that socket is closed. - -#### 5. Loaded Files - -Any time after the action has been executed, the Slave may (and should) send, over `local`, a list of files -that have been newly-loaded in the course of evaluating the action. - -Languages are expected to implement this using clever tricks. - -Steps 1-4 happend sequentially and in-order, but Submitting files in Step 5 should not prevent the Slave from -handling further commands from the master. The Slave should be considered 'connected' after Step 4. diff --git a/docs/message_format.md b/docs/message_format.md index ff250ec5..35221117 100644 --- a/docs/message_format.md +++ b/docs/message_format.md @@ -1,39 +1,39 @@ # Message Format -There are a number of different types of messages passed between Master and Slave processes. +There are a number of different types of messages passed between Coordinator and Worker processes. -In the interest of simplifying Slave libraries, messages are sent as single packets over a UNIX datagram socket, with a single-letter prefix, followed by a colon, indicating the message type. +In the interest of simplifying Worker libraries, messages are sent as single packets over a UNIX datagram socket, with a single-letter prefix, followed by a colon, indicating the message type. the parenthesesized values after each title are the message code, and the handling module. -#### Pid & Identifier message (`P`, `SlaveMonitor`) +#### Pid & Identifier message (`P`, `WorkerMonitor`) -This is sent from Slave to Master immediately after booting, to identify itself. +This is sent from Worker to Coordinator immediately after booting, to identify itself. It is formed by joining the process's pid and identifier with a colon. Example: `P:1235:default_bundle` -#### Action response message (`R`, `SlaveMonitor`) +#### Action response message (`R`, `WorkerMonitor`) -This is sent from the Slave to the Master once the action has executed. +This is sent from the Worker to the Coordinator once the action has executed. -It can either be "OK", if the action was successful, or any other string, which should be a stderr-like +It can either be "OK", if the action was successful, or any other string, which should be a stderr-like representation of the error, including stack trace if applicable. Example: `R:OK` Example: `R:-e:1:in '<main>': unhandled exception` -#### Spawn Slave message (`S`, `SlaveMonitor`) +#### Spawn Worker message (`S`, `WorkerMonitor`) -This is sent from the Master to the Slave and contains the Identifier of a new Slave to fork immediately. +This is sent from the Coordinator to the Worker and contains the Identifier of a new Worker to fork immediately. Example: `S:test_environment` #### Spawn Command message (`C`, `ClientHandler`) -This is sent from the Master to the Slave and contains the Identifier of a new Command to fork immediately. +This is sent from the Coordinator to the Worker and contains the Identifier of a new Command to fork immediately. Example: `C:console` @@ -46,7 +46,7 @@ Example: `Q:testrb:-Itest -I. test/unit/module_test.rb` #### Feature message (`F`, `FileMonitor`) -This is sent from the Slave to the Master to indicate it now depends on a file at a given path. +This is sent from the Worker to the Coordinator to indicate it now depends on a file at a given path. 
The path is expected to be the full, expanded path. diff --git a/docs/overview.md b/docs/overview.md index 2fe858ae..58d6f0e3 100644 --- a/docs/overview.md +++ b/docs/overview.md @@ -2,17 +2,17 @@ Zeus is composed of three components: -1. [The Master Process](../go/zeusmaster). This is written in Go, and coordinates all the other processes. It connects Clients to Slaves and handles reloading when files change. +1. [The Coordinator Process](../go/zeuscoordinator). This is written in Go, and coordinates all the other processes. It connects Clients to Workers and handles reloading when files change. -2. [Clients](../go/zeusclient). The Client is also written in Go. It sends a command to the Master, and has its streams wired up to a Command process, to make it appear to be running locally. +2. [Clients](../go/zeusclient). The Client is also written in Go. It sends a command to the Coordinator, and has its streams wired up to a Command process, to make it appear to be running locally. -3. [Slaves/Commands](../rubygem). These are the target application. A small shim, written in the target language, manages the communication between the application and the Master process, and boots the application in phases. Though the Master and Client are completely language-agnostic, currently ruby is the only language for which a Slave shim exists. +3. [Workers/Commands](../rubygem). These are the target application. A small shim, written in the target language, manages the communication between the application and the Coordinator process, and boots the application in phases. Though the Coordinator and Client are completely language-agnostic, currently ruby is the only language for which a Worker shim exists. If you've read Tony Hoare's (or C.A.R. Hoare's) "Communicating Sequential Processes", [`csp.pdf`](http://www.usingcsp.com/cspbook.pdf) might be a bit helpful in addition to this document. I haven't studied the math enough for it to be fully correct, but it gets some of the point across. See: [`terminology.md`](terminology.md) -## Master Process +## Coordinator Process ### Logical Modules @@ -22,11 +22,11 @@ See: [`terminology.md`](terminology.md) 3. FileMonitor -4. SlaveMonitor +4. WorkerMonitor  -The Master process revolves around the [`ProcessTree`](../go/processtree/processtree.go) -- the core data structure that maintains most of the state of the application. Each module performs most of its communication with other modules through interactions with the Tree. +The Coordinator process revolves around the [`ProcessTree`](../go/processtree/processtree.go) -- the core data structure that maintains most of the state of the application. Each module performs most of its communication with other modules through interactions with the Tree. ### 1. Config @@ -37,28 +37,28 @@ This component reads the configuration file on initialization, and constructs th ### 2. ClientHandler -The `ClientHandler` listens on a socket for incoming requests from Client processes, and negotiates connections to running Slave processes. It is responsible for interactions with the client for its entire life-cycle. +The `ClientHandler` listens on a socket for incoming requests from Client processes, and negotiates connections to running Worker processes. It is responsible for interactions with the client for its entire life-cycle. * [`clienthandler.go`](../go/clienthandler/clienthandler.go) ### 3. FileMonitor -The `FileMonitor`'s job is to restart slaves when one of their dependencies has changed. 
Slaves are expected to report back with a list of files they have loaded. The `FileMonitor` listens for these messages and registers them with an external process that watches the filesystem for changes. When the external process reports a change, the `FileMonitor` restarts any slaves that have loaded that file. +The `FileMonitor`'s job is to restart workers when one of their dependencies has changed. Workers are expected to report back with a list of files they have loaded. The `FileMonitor` listens for these messages and registers them with an external process that watches the filesystem for changes. When the external process reports a change, the `FileMonitor` restarts any workers that have loaded that file. * [`filemonitor.go`](../go/filemonitor/filemonitor.go) * [`fsevents/main.m`](../ext/fsevents/main.m) -### 4. SlaveMonitor +### 4. WorkerMonitor -This component is responsible for communication with the target-language shim to manage booting and forking of application phase slaves. It constantly attempts to keep all slaves booted, restarting them when they are killed or die. +This component is responsible for communication with the target-language shim to manage booting and forking of application phase workers. It constantly attempts to keep all workers booted, restarting them when they are killed or die. -* [`slavemonitor.go`](../go/processtree/slavemonitor.go) -* [`slavenode.go`](../go/processtree/slavenode.go) -* [`master_slave_handshake.md`](master_slave_handshake.md) +* [`workermonitor.go`](../go/processtree/workermonitor.go) +* [`workernode.go`](../go/processtree/workernode.go) +* [`coordinator_worker_handshake.md`](coordinator_worker_handshake.md) ## Client Process -The client process is mostly about terminal configuration. It opens a PTY, sets it to raw mode (so that 'fancy' commands behave as if they were running locally), and passes the slave side of the PTY to the Master process. +The client process is mostly about terminal configuration. It opens a PTY, sets it to raw mode (so that 'fancy' commands behave as if they were running locally), and passes the worker side of the PTY to the Coordinator process. The client then sets up handlers to write STDIN to the PTY, and write the PTY's output to STDOUT. STDIN is scanned for certain escape codes (`^C`, `^\`, and `^Z`), which are sent as signals to the remote process to mimic the behaviour of a local process. @@ -67,11 +67,11 @@ A handler is set up for SIGWINCH, again to forward it to the remote process, and When the remote process exits, it reports its exit status, which the client process then exits with. * [`zeusclient.go`](../go/zeusclient/zeusclient.go) -* [`client_master_handshake.md`](client_master_handshake.md) +* [`client_coordinator_handshake.md`](client_coordinator_handshake.md) -## Slave/Command Processes +## Worker/Command Processes -The Slave processes boot the actual application, and run commands. See [`master_slave_handshake.md`](master_slave_handshake.md), and the ruby implementation in the `rubygem` directory. +The Worker processes boot the actual application, and run commands. See [`coordinator_worker_handshake.md`](coordinator_worker_handshake.md), and the ruby implementation in the `rubygem` directory. 
* [`zeus.rb`](../rubygem/lib/zeus.rb) * [`zeus/rails.rb`](../rubygem/lib/zeus/rails.rb) diff --git a/docs/terminology.md b/docs/terminology.md index ab0ca91d..18df35fe 100644 --- a/docs/terminology.md +++ b/docs/terminology.md @@ -2,8 +2,8 @@ * a Client is a process initiated by the user requesting zeus to run a command. -* the Master is the Go program which mediates all the interaction between the other processes +* the Coordinator is the Go program which mediates all the interaction between the other processes -* a Slave is a process managed by Zeus which is used to load dependencies for commands +* a Worker is a process managed by Zeus which is used to load dependencies for commands -* a Command process is one forked from a Slave and connected to a Client +* a Command process is one forked from a Worker and connected to a Client diff --git a/go/clienthandler/clienthandler.go b/go/clienthandler/clienthandler.go index abcd7e8c..57db0bb8 100644 --- a/go/clienthandler/clienthandler.go +++ b/go/clienthandler/clienthandler.go @@ -55,13 +55,13 @@ func Start(tree *processtree.ProcessTree, done chan bool) chan bool { return quit } -// see docs/client_master_handshake.md +// see docs/client_coordinator_handshake.md func handleClientConnection(tree *processtree.ProcessTree, usock *unixsocket.Usock) { defer usock.Close() // we have established first contact to the client. command, clientPid, argCount, argFD, err := receiveCommandArgumentsAndPid(usock, nil) - commandNode, slaveNode, err := findCommandAndSlaveNodes(tree, command, err) + commandNode, workerNode, err := findCommandAndWorkerNodes(tree, command, err) if err != nil { // connection was established, no data was sent. Ignore. return @@ -74,17 +74,17 @@ func handleClientConnection(tree *processtree.ProcessTree, usock *unixsocket.Uso stderrFile, err := receiveTTY(usock, err) defer stderrFile.Close() - if err == nil && slaveNode.Error != "" { - writeStacktrace(usock, slaveNode, clientFile) + if err == nil && workerNode.Error != "" { + writeStacktrace(usock, workerNode, clientFile) return } - commandUsock, err := bootNewCommand(slaveNode, command, err) + commandUsock, err := bootNewCommand(workerNode, command, err) if err != nil { // If a client connects while the command is just // booting up, it actually makes it here - still // expects a backtrace, of course. - writeStacktrace(usock, slaveNode, clientFile) + writeStacktrace(usock, workerNode, clientFile) return } defer commandUsock.Close() @@ -110,12 +110,12 @@ func handleClientConnection(tree *processtree.ProcessTree, usock *unixsocket.Uso // Done! Hooray! 
} -func writeStacktrace(usock *unixsocket.Usock, slaveNode *processtree.SlaveNode, clientFile *os.File) { +func writeStacktrace(usock *unixsocket.Usock, workerNode *processtree.WorkerNode, clientFile *os.File) { // Fake process ID / output / error codes: // Write a fake pid (step 6) usock.WriteMessage("0") // Write the error message to the terminal - clientFile.Write([]byte(slaveNode.Error)) + clientFile.Write([]byte(workerNode.Error)) // Write a non-positive exit code to the client usock.WriteMessage("1") } @@ -148,7 +148,7 @@ func receiveCommandArgumentsAndPid(usock *unixsocket.Usock, err error) (string, return command, clientPid, argCount, argFD, err } -func findCommandAndSlaveNodes(tree *processtree.ProcessTree, command string, err error) (*processtree.CommandNode, *processtree.SlaveNode, error) { +func findCommandAndWorkerNodes(tree *processtree.ProcessTree, command string, err error) (*processtree.CommandNode, *processtree.WorkerNode, error) { if err != nil { return nil, nil, err } @@ -158,9 +158,9 @@ func findCommandAndSlaveNodes(tree *processtree.ProcessTree, command string, err return nil, nil, errors.New("ERROR: Node not found!: " + command) } command = commandNode.Name - slaveNode := commandNode.Parent + workerNode := commandNode.Parent - return commandNode, slaveNode, nil + return commandNode, workerNode, nil } func receiveTTY(usock *unixsocket.Usock, err error) (*os.File, error) { @@ -226,13 +226,13 @@ func sendCommandPidToClient(usock *unixsocket.Usock, pid int, err error) error { return err } -func bootNewCommand(slaveNode *processtree.SlaveNode, command string, err error) (*unixsocket.Usock, error) { +func bootNewCommand(workerNode *processtree.WorkerNode, command string, err error) (*unixsocket.Usock, error) { if err != nil { return nil, err } request := &processtree.CommandRequest{Name: command, Retchan: make(chan *processtree.CommandReply)} - slaveNode.RequestCommandBoot(request) + workerNode.RequestCommandBoot(request) reply := <-request.Retchan // TODO: don't really want to wait indefinitely. // defer commandFile.Close() // TODO: can't do this here anymore. diff --git a/go/cmd/zeus/zeus.go b/go/cmd/zeus/zeus.go index 7458586e..8db7db76 100644 --- a/go/cmd/zeus/zeus.go +++ b/go/cmd/zeus/zeus.go @@ -14,7 +14,7 @@ import ( "github.com/burke/zeus/go/filemonitor" slog "github.com/burke/zeus/go/shinylog" "github.com/burke/zeus/go/zeusclient" - "github.com/burke/zeus/go/zeusmaster" + "github.com/burke/zeus/go/zeuscoordinator" "github.com/burke/zeus/go/zeusversion" ) @@ -79,7 +79,7 @@ func main() { } else if args[0] == "version" { printVersion() } else if args[0] == "start" { - os.Exit(zeusmaster.Run(configFile, fileChangeDelay, simpleStatus)) + os.Exit(zeuscoordinator.Run(configFile, fileChangeDelay, simpleStatus)) } else if args[0] == "init" { zeusInit() } else if args[0] == "commands" { @@ -88,7 +88,7 @@ func main() { tree := config.BuildProcessTree(configFile, nil) for _, name := range tree.AllCommandsAndAliases() { if args[0] == name { - // Don't confuse the master by sending *full* args to + // Don't confuse the coordinator by sending *full* args to // it; just those that are not zeus-specific. 
os.Exit(zeusclient.Run(args, os.Stdin, os.Stdout, os.Stderr)) } diff --git a/go/config/config.go b/go/config/config.go index 73413063..b6286531 100644 --- a/go/config/config.go +++ b/go/config/config.go @@ -21,7 +21,7 @@ type config struct { func BuildProcessTree(configFile string, monitor filemonitor.FileMonitor) *processtree.ProcessTree { conf := parseConfig(configFile) tree := &processtree.ProcessTree{} - tree.SlavesByName = make(map[string]*processtree.SlaveNode) + tree.WorkersByName = make(map[string]*processtree.WorkerNode) tree.StateChanged = make(chan bool, 16) tree.ExecCommand = conf.Command @@ -39,15 +39,15 @@ func iteratePlan( tree *processtree.ProcessTree, plan map[string]interface{}, monitor filemonitor.FileMonitor, - parent *processtree.SlaveNode, + parent *processtree.WorkerNode, ) { for name, v := range plan { if subPlan, ok := v.(map[string]interface{}); ok { - newNode := tree.NewSlaveNode(name, parent, monitor) + newNode := tree.NewWorkerNode(name, parent, monitor) if parent == nil { tree.Root = newNode } else { - parent.Slaves = append(parent.Slaves, newNode) + parent.Workers = append(parent.Workers, newNode) } iteratePlan(tree, subPlan, monitor, newNode) } else { diff --git a/go/messages/messages.go b/go/messages/messages.go index eab43480..151d60ef 100644 --- a/go/messages/messages.go +++ b/go/messages/messages.go @@ -46,7 +46,7 @@ func ParseActionResponseMessage(msg string) (string, error) { return parts[1], nil } -func CreateSpawnSlaveMessage(identifier string) string { +func CreateSpawnWorkerMessage(identifier string) string { return "S:" + identifier } diff --git a/go/processtree/processtree.go b/go/processtree/processtree.go index e2ecd952..d314b79d 100644 --- a/go/processtree/processtree.go +++ b/go/processtree/processtree.go @@ -5,16 +5,16 @@ import ( ) type ProcessTree struct { - Root *SlaveNode - ExecCommand string - SlavesByName map[string]*SlaveNode - Commands []*CommandNode - StateChanged chan bool + Root *WorkerNode + ExecCommand string + WorkersByName map[string]*WorkerNode + Commands []*CommandNode + StateChanged chan bool } type ProcessTreeNode struct { mu sync.RWMutex - Parent *SlaveNode + Parent *WorkerNode Name string } @@ -24,7 +24,7 @@ type CommandNode struct { Aliases []string } -func (tree *ProcessTree) NewCommandNode(name string, aliases []string, parent *SlaveNode) *CommandNode { +func (tree *ProcessTree) NewCommandNode(name string, aliases []string, parent *WorkerNode) *CommandNode { x := &CommandNode{} x.Parent = parent x.Name = name @@ -33,11 +33,11 @@ func (tree *ProcessTree) NewCommandNode(name string, aliases []string, parent *S return x } -func (tree *ProcessTree) FindSlaveByName(name string) *SlaveNode { +func (tree *ProcessTree) FindWorkerByName(name string) *WorkerNode { if name == "" { return tree.Root } - return tree.SlavesByName[name] + return tree.WorkersByName[name] } func (tree *ProcessTree) FindCommand(requested string) *CommandNode { @@ -75,7 +75,7 @@ func (tree *ProcessTree) RestartNodesWithFeatures(files []string) { } // Serialized: restartMutex is always held when this is called. 
-func (node *SlaveNode) restartNodesWithFeatures(tree *ProcessTree, files []string) { +func (node *WorkerNode) restartNodesWithFeatures(tree *ProcessTree, files []string) { for _, file := range files { if node.HasFeature(file) { node.trace("restarting for %q", file) @@ -83,7 +83,7 @@ func (node *SlaveNode) restartNodesWithFeatures(tree *ProcessTree, files []strin return } } - for _, s := range node.Slaves { + for _, s := range node.Workers { s.restartNodesWithFeatures(tree, files) } } diff --git a/go/processtree/slavemonitor.go b/go/processtree/slavemonitor.go deleted file mode 100644 index e561ef96..00000000 --- a/go/processtree/slavemonitor.go +++ /dev/null @@ -1,108 +0,0 @@ -package processtree - -import ( - "math/rand" - "os" - "strconv" - "syscall" - - "github.com/burke/zeus/go/messages" - slog "github.com/burke/zeus/go/shinylog" - "github.com/burke/zeus/go/unixsocket" -) - -type SlaveMonitor struct { - tree *ProcessTree - remoteMasterFile *os.File -} - -func Error(err string) { - // TODO - println(err) -} - -func StartSlaveMonitor(tree *ProcessTree, fileChanges <-chan []string, done chan bool) chan bool { - quit := make(chan bool) - go func() { - localMasterFile, remoteMasterFile, err := unixsocket.Socketpair(syscall.SOCK_DGRAM) - if err != nil { - Error("Couldn't create socketpair") - } - - monitor := &SlaveMonitor{tree, remoteMasterFile} - defer monitor.cleanupChildren() - - localMasterSocket, err := unixsocket.NewFromFile(localMasterFile) - if err != nil { - Error("Couldn't Open UNIXSocket") - } - - // We just want this unix socket to be a channel so we can select on it... - registeringFds := make(chan int, 3) - go func() { - for { - if fd, err := localMasterSocket.ReadFD(); err != nil { - slog.Error(err) - } else { - registeringFds <- fd - } - } - }() - - for _, slave := range monitor.tree.SlavesByName { - go slave.Run(monitor) - } - - for { - select { - case <-quit: - done <- true - return - case fd := <-registeringFds: - go monitor.slaveDidBeginRegistration(fd) - case files := <-fileChanges: - if len(files) > 0 { - tree.RestartNodesWithFeatures(files) - } - } - } - }() - return quit -} - -func (mon *SlaveMonitor) cleanupChildren() { - for _, slave := range mon.tree.SlavesByName { - slave.ForceKill() - } -} - -func (mon *SlaveMonitor) slaveDidBeginRegistration(fd int) { - // Having just started the process, we expect an IO, which we convert to a UNIX domain socket - fileName := strconv.Itoa(rand.Int()) - slaveFile := os.NewFile(uintptr(fd), fileName) - slaveUsock, err := unixsocket.NewFromFile(slaveFile) - if err != nil { - slog.Error(err) - } - - // We now expect the slave to use this fd they send us to send a Pid&Identifier Message - msg, err := slaveUsock.ReadMessage() - if err != nil { - slog.Error(err) - } - pid, parentPid, identifier, err := messages.ParsePidMessage(msg) - - // And the last step before executing its action, the slave sends us a pipe it will later use to - // send us all the features it's loaded. 
- featurePipeFd, err := slaveUsock.ReadFD() - if err != nil { - slog.Error(err) - } - - slaveNode := mon.tree.FindSlaveByName(identifier) - if slaveNode == nil { - Error("slavemonitor.go:slaveDidBeginRegistration:Unknown identifier:" + identifier) - } - - slaveNode.SlaveWasInitialized(pid, parentPid, slaveUsock, featurePipeFd) -} diff --git a/go/processtree/workermonitor.go b/go/processtree/workermonitor.go new file mode 100644 index 00000000..83f80a1d --- /dev/null +++ b/go/processtree/workermonitor.go @@ -0,0 +1,108 @@ +package processtree + +import ( + "math/rand" + "os" + "strconv" + "syscall" + + "github.com/burke/zeus/go/messages" + slog "github.com/burke/zeus/go/shinylog" + "github.com/burke/zeus/go/unixsocket" +) + +type WorkerMonitor struct { + tree *ProcessTree + remoteCoordinatorFile *os.File +} + +func Error(err string) { + // TODO + println(err) +} + +func StartWorkerMonitor(tree *ProcessTree, fileChanges <-chan []string, done chan bool) chan bool { + quit := make(chan bool) + go func() { + localCoordinatorFile, remoteCoordinatorFile, err := unixsocket.Socketpair(syscall.SOCK_DGRAM) + if err != nil { + Error("Couldn't create socketpair") + } + + monitor := &WorkerMonitor{tree, remoteCoordinatorFile} + defer monitor.cleanupChildren() + + localCoordinatorSocket, err := unixsocket.NewFromFile(localCoordinatorFile) + if err != nil { + Error("Couldn't Open UNIXSocket") + } + + // We just want this unix socket to be a channel so we can select on it... + registeringFds := make(chan int, 3) + go func() { + for { + if fd, err := localCoordinatorSocket.ReadFD(); err != nil { + slog.Error(err) + } else { + registeringFds <- fd + } + } + }() + + for _, worker := range monitor.tree.WorkersByName { + go worker.Run(monitor) + } + + for { + select { + case <-quit: + done <- true + return + case fd := <-registeringFds: + go monitor.workerDidBeginRegistration(fd) + case files := <-fileChanges: + if len(files) > 0 { + tree.RestartNodesWithFeatures(files) + } + } + } + }() + return quit +} + +func (mon *WorkerMonitor) cleanupChildren() { + for _, worker := range mon.tree.WorkersByName { + worker.ForceKill() + } +} + +func (mon *WorkerMonitor) workerDidBeginRegistration(fd int) { + // Having just started the process, we expect an IO, which we convert to a UNIX domain socket + fileName := strconv.Itoa(rand.Int()) + workerFile := os.NewFile(uintptr(fd), fileName) + workerUsock, err := unixsocket.NewFromFile(workerFile) + if err != nil { + slog.Error(err) + } + + // We now expect the worker to use this fd they send us to send a Pid&Identifier Message + msg, err := workerUsock.ReadMessage() + if err != nil { + slog.Error(err) + } + pid, parentPid, identifier, err := messages.ParsePidMessage(msg) + + // And the last step before executing its action, the worker sends us a pipe it will later use to + // send us all the features it's loaded. 
+ featurePipeFd, err := workerUsock.ReadFD() + if err != nil { + slog.Error(err) + } + + workerNode := mon.tree.FindWorkerByName(identifier) + if workerNode == nil { + Error("workermonitor.go:workerDidBeginRegistration:Unknown identifier:" + identifier) + } + + workerNode.WorkerWasInitialized(pid, parentPid, workerUsock, featurePipeFd) +} diff --git a/go/processtree/slavenode.go b/go/processtree/workernode.go similarity index 75% rename from go/processtree/slavenode.go rename to go/processtree/workernode.go index 98a239c1..28b59534 100644 --- a/go/processtree/slavenode.go +++ b/go/processtree/workernode.go @@ -24,12 +24,12 @@ const ( forceKillTimeout = time.Second ) -type SlaveNode struct { +type WorkerNode struct { ProcessTreeNode socket *unixsocket.Usock pid int Error string - Slaves []*SlaveNode + Workers []*WorkerNode Commands []*CommandNode fileMonitor filemonitor.FileMonitor @@ -37,7 +37,7 @@ type SlaveNode struct { needsRestart chan bool commandBootRequests chan *CommandRequest - slaveBootRequests chan *SlaveNode + workerBootRequests chan *WorkerNode L sync.Mutex features map[string]bool @@ -71,25 +71,25 @@ var humanreadableStates = map[string]string{ SCrashed: "crashed", } -func (tree *ProcessTree) NewSlaveNode(identifier string, parent *SlaveNode, monitor filemonitor.FileMonitor) *SlaveNode { - s := SlaveNode{} +func (tree *ProcessTree) NewWorkerNode(identifier string, parent *WorkerNode, monitor filemonitor.FileMonitor) *WorkerNode { + s := WorkerNode{} s.needsRestart = make(chan bool, 1) - s.slaveBootRequests = make(chan *SlaveNode, 256) + s.workerBootRequests = make(chan *WorkerNode, 256) s.commandBootRequests = make(chan *CommandRequest, 256) s.features = make(map[string]bool) s.event = make(chan bool) s.Name = identifier s.Parent = parent s.fileMonitor = monitor - tree.SlavesByName[identifier] = &s + tree.WorkersByName[identifier] = &s return &s } -func (s *SlaveNode) RequestRestart() { +func (s *WorkerNode) RequestRestart() { s.L.Lock() defer s.L.Unlock() - // If this slave is currently waiting on a process to boot, + // If this worker is currently waiting on a process to boot, // unhang it and force it to transition to the crashed state // where it will wait for restart messages. 
if s.ReportBootEvent() { @@ -103,15 +103,15 @@ func (s *SlaveNode) RequestRestart() { } } -func (s *SlaveNode) RequestSlaveBoot(slave *SlaveNode) { - s.slaveBootRequests <- slave +func (s *WorkerNode) RequestWorkerBoot(worker *WorkerNode) { + s.workerBootRequests <- worker } -func (s *SlaveNode) RequestCommandBoot(request *CommandRequest) { +func (s *WorkerNode) RequestCommandBoot(request *CommandRequest) { s.commandBootRequests <- request } -func (s *SlaveNode) ReportBootEvent() bool { +func (s *WorkerNode) ReportBootEvent() bool { select { case s.event <- true: return true @@ -120,24 +120,24 @@ func (s *SlaveNode) ReportBootEvent() bool { } } -func (s *SlaveNode) SlaveWasInitialized(pid, parentPid int, usock *unixsocket.Usock, featurePipeFd int) { +func (s *WorkerNode) WorkerWasInitialized(pid, parentPid int, usock *unixsocket.Usock, featurePipeFd int) { file := os.NewFile(uintptr(featurePipeFd), "featurepipe") s.L.Lock() if !s.ReportBootEvent() { s.forceKillPid(pid) - s.trace("Unexpected process %d with parent %d for slave %q was killed", pid, parentPid, s.Name) + s.trace("Unexpected process %d with parent %d for worker %q was killed", pid, parentPid, s.Name) } else { s.wipe() s.pid = pid s.socket = usock go s.handleMessages(file) - s.trace("initialized slave %s with pid %d from parent %d", s.Name, pid, parentPid) + s.trace("initialized worker %s with pid %d from parent %d", s.Name, pid, parentPid) } s.L.Unlock() } -func (s *SlaveNode) Run(monitor *SlaveMonitor) { +func (s *WorkerNode) Run(monitor *WorkerMonitor) { nextState := SUnbooted for { s.L.Lock() @@ -163,45 +163,45 @@ func (s *SlaveNode) Run(monitor *SlaveMonitor) { } } -func (s *SlaveNode) State() string { +func (s *WorkerNode) State() string { s.L.Lock() defer s.L.Unlock() return s.state } -func (s *SlaveNode) HumanReadableState() string { +func (s *WorkerNode) HumanReadableState() string { return humanreadableStates[s.state] } -func (s *SlaveNode) HasFeature(file string) bool { +func (s *WorkerNode) HasFeature(file string) bool { s.featureL.Lock() defer s.featureL.Unlock() return s.features[file] } -// These "doXState" functions are called when a SlaveNode enters a state. They are expected +// These "doXState" functions are called when a WorkerNode enters a state. They are expected // to continue to execute until // "SUnbooted" represents the state where we do not yet have the PID // of a process to use for *this* node. In this state, we wait for the // parent process to spawn a process for us and hear back from the -// SlaveMonitor. -func (s *SlaveNode) doUnbootedState(monitor *SlaveMonitor) string { // -> {SBooting, SCrashed} +// WorkerMonitor. +func (s *WorkerNode) doUnbootedState(monitor *WorkerMonitor) string { // -> {SBooting, SCrashed} if s.Parent == nil { s.L.Lock() parts := strings.Split(monitor.tree.ExecCommand, " ") cmd := exec.Command(parts[0], parts[1:]...) 
- file := monitor.remoteMasterFile - cmd.Env = append(os.Environ(), fmt.Sprintf("ZEUS_MASTER_FD=%d", file.Fd())) + file := monitor.remoteCoordinatorFile + cmd.Env = append(os.Environ(), fmt.Sprintf("ZEUS_COORDINATOR_FD=%d", file.Fd())) cmd.ExtraFiles = []*os.File{file} go s.babysitRootProcess(cmd) s.L.Unlock() } else { - s.Parent.RequestSlaveBoot(s) + s.Parent.RequestWorkerBoot(s) } - <-s.event // sent by SlaveWasInitialized + <-s.event // sent by WorkerWasInitialized s.L.Lock() defer s.L.Unlock() @@ -213,10 +213,10 @@ func (s *SlaveNode) doUnbootedState(monitor *SlaveMonitor) string { // -> {SBoot // In "SBooting", we have a pid and socket to the process we will use, // but it has not yet finished initializing (generally, running the code -// specific to this slave). When we receive a message about the success or +// specific to this worker). When we receive a message about the success or // failure of this operation, we transition to either crashed or ready. -func (s *SlaveNode) doBootingState() string { // -> {SCrashed, SReady} - // The slave will execute its action and respond with a status... +func (s *WorkerNode) doBootingState() string { // -> {SCrashed, SReady} + // The worker will execute its action and respond with a status... // Note we don't hold the mutex while waiting for the action to execute. msg, err := s.socket.ReadMessage() if err != nil { @@ -250,14 +250,14 @@ func (s *SlaveNode) doBootingState() string { // -> {SCrashed, SReady} } // In the "SReady" state, we have a functioning process we can spawn -// new processes of of. We respond to requests to boot slaves and +// new processes of of. We respond to requests to boot workers and // run commands until we receive a request to restart. This kills // the process and transitions to SUnbooted. -func (s *SlaveNode) doReadyState() string { // -> SUnbooted +func (s *WorkerNode) doReadyState() string { // -> SUnbooted s.hasSuccessfullyBooted = true // If we have a queued restart, service that rather than booting - // slaves or commands on potentially stale code. + // workers or commands on potentially stale code. select { case <-s.needsRestart: s.doRestart() @@ -270,8 +270,8 @@ func (s *SlaveNode) doReadyState() string { // -> SUnbooted case <-s.needsRestart: s.doRestart() return SUnbooted - case slave := <-s.slaveBootRequests: - s.bootSlave(slave) + case worker := <-s.workerBootRequests: + s.bootWorker(worker) case request := <-s.commandBootRequests: s.bootCommand(request) } @@ -279,11 +279,11 @@ func (s *SlaveNode) doReadyState() string { // -> SUnbooted } // In the "SCrashed" state, we have an error message from starting -// a process to propogate to the user and all slave nodes. We will +// a process to propogate to the user and all worker nodes. We will // continue propogating the error until we receive a request to restart. -func (s *SlaveNode) doCrashedState() string { // -> SUnbooted +func (s *WorkerNode) doCrashedState() string { // -> SUnbooted // If we have a queued restart, service that rather than booting - // slaves or commands on potentially stale code. + // workers or commands on potentially stale code. 
select { case <-s.needsRestart: s.doRestart() @@ -296,11 +296,11 @@ func (s *SlaveNode) doCrashedState() string { // -> SUnbooted case <-s.needsRestart: s.doRestart() return SUnbooted - case slave := <-s.slaveBootRequests: - slave.L.Lock() - slave.Error = s.Error - slave.ReportBootEvent() - slave.L.Unlock() + case worker := <-s.workerBootRequests: + worker.L.Lock() + worker.Error = s.Error + worker.ReportBootEvent() + worker.L.Unlock() case request := <-s.commandBootRequests: s.L.Lock() s.trace("reporting crash to command %v", request) @@ -310,35 +310,35 @@ func (s *SlaveNode) doCrashedState() string { // -> SUnbooted } } -func (s *SlaveNode) doRestart() { +func (s *WorkerNode) doRestart() { s.L.Lock() s.ForceKill() s.wipe() s.L.Unlock() - // Drain and ignore any enqueued slave boot requests since + // Drain and ignore any enqueued worker boot requests since // we're going to make them all restart again anyway. drained := false for !drained { select { - case <-s.slaveBootRequests: + case <-s.workerBootRequests: default: drained = true } } - for _, slave := range s.Slaves { - slave.RequestRestart() + for _, worker := range s.Workers { + worker.RequestRestart() } } -func (s *SlaveNode) bootSlave(slave *SlaveNode) { +func (s *WorkerNode) bootWorker(worker *WorkerNode) { s.L.Lock() defer s.L.Unlock() - s.trace("now sending slave boot request for %s", slave.Name) + s.trace("now sending worker boot request for %s", worker.Name) - msg := messages.CreateSpawnSlaveMessage(slave.Name) + msg := messages.CreateSpawnWorkerMessage(worker.Name) _, err := s.socket.WriteMessage(msg) if err != nil { slog.Error(err) @@ -346,9 +346,9 @@ func (s *SlaveNode) bootSlave(slave *SlaveNode) { } // This unfortunately holds the mutex for a little while, and if the -// command dies super early, the entire slave pretty well deadlocks. +// command dies super early, the entire worker pretty well deadlocks. // TODO: review this. -func (s *SlaveNode) bootCommand(request *CommandRequest) { +func (s *WorkerNode) bootCommand(request *CommandRequest) { s.L.Lock() defer s.L.Unlock() @@ -372,18 +372,18 @@ func (s *SlaveNode) bootCommand(request *CommandRequest) { request.Retchan <- &CommandReply{s.state, commandFile} } -func (s *SlaveNode) ForceKill() { +func (s *WorkerNode) ForceKill() { // note that we don't try to lock the mutex. s.forceKillPid(s.pid) } -func (s *SlaveNode) wipe() { +func (s *WorkerNode) wipe() { s.pid = 0 s.socket = nil s.Error = "" } -func (s *SlaveNode) babysitRootProcess(cmd *exec.Cmd) { +func (s *WorkerNode) babysitRootProcess(cmd *exec.Cmd) { // We want to let this process run "forever", but it will eventually // die... either on program termination or when its dependencies change // and we kill it. when it's requested to restart, err is "signal 9", @@ -419,7 +419,7 @@ func (s *SlaveNode) babysitRootProcess(cmd *exec.Cmd) { // We want to make this the single interface point with the socket. // we want to republish unneeded messages to channels so other modules //can pick them up. (notably, clienthandler.) 
-func (s *SlaveNode) handleMessages(featurePipe *os.File) { +func (s *WorkerNode) handleMessages(featurePipe *os.File) { reader := bufio.NewReader(featurePipe) for { if msg, err := reader.ReadString('\n'); err != nil { @@ -434,7 +434,7 @@ func (s *SlaveNode) handleMessages(featurePipe *os.File) { } } -func (s *SlaveNode) forceKillPid(pid int) error { +func (s *WorkerNode) forceKillPid(pid int) error { if pid <= 0 { return nil } @@ -475,7 +475,7 @@ func (s *SlaveNode) forceKillPid(pid int) error { } } -func (s *SlaveNode) trace(format string, args ...interface{}) { +func (s *WorkerNode) trace(format string, args ...interface{}) { if !slog.TraceEnabled() { return } diff --git a/go/shinylog/shinylog.go b/go/shinylog/shinylog.go index 98c22a48..b9f05094 100644 --- a/go/shinylog/shinylog.go +++ b/go/shinylog/shinylog.go @@ -131,7 +131,7 @@ func (l *ShinyLogger) ColorizedSansNl(msg string) (printed bool) { } // If we send SIGTERM rather than explicitly exiting, -// the signal can be handled and the master can clean up. +// the signal can be handled and the coordinator can clean up. // This is a workaround for Go not having `atexit` :(. func terminate() { proc, _ := os.FindProcess(os.Getpid()) diff --git a/go/statuschart/statuschart.go b/go/statuschart/statuschart.go index 1ce990a9..1000ab93 100644 --- a/go/statuschart/statuschart.go +++ b/go/statuschart/statuschart.go @@ -14,13 +14,13 @@ import ( const updateDebounceInterval = 1 * time.Millisecond type StatusChart struct { - RootSlave *processtree.SlaveNode - update chan bool + RootWorker *processtree.WorkerNode + update chan bool - numberOfSlaves int - Commands []*processtree.CommandNode - L sync.Mutex - drawnInitial bool + numberOfWorkers int + Commands []*processtree.CommandNode + L sync.Mutex + drawnInitial bool directLogger *slog.ShinyLogger @@ -36,8 +36,8 @@ func Start(tree *processtree.ProcessTree, done chan bool, simple bool) chan bool quit := make(chan bool) theChart = &StatusChart{} - theChart.RootSlave = tree.Root - theChart.numberOfSlaves = len(tree.SlavesByName) + theChart.RootWorker = tree.Root + theChart.numberOfWorkers = len(tree.WorkersByName) theChart.Commands = tree.Commands theChart.update = make(chan bool) theChart.directLogger = slog.NewShinyLogger(os.Stdout, os.Stderr) @@ -68,11 +68,11 @@ func startLineOutput(tree *processtree.ProcessTree, done, quit chan bool) { done <- true return case <-theChart.update: - for name, slave := range tree.SlavesByName { + for name, worker := range tree.WorkersByName { state, found := states[name] - if !found || (state != slave.State()) { - fmt.Println("environment: " + name + " status: " + slave.HumanReadableState()) - states[name] = slave.State() + if !found || (state != worker.State()) { + fmt.Println("environment: " + name + " status: " + worker.HumanReadableState()) + states[name] = worker.State() } } } diff --git a/go/statuschart/stdout.go b/go/statuschart/stdout.go index be439200..f479474f 100644 --- a/go/statuschart/stdout.go +++ b/go/statuschart/stdout.go @@ -27,7 +27,7 @@ func (s *StatusChart) logChanges() { log := theChart.directLogger log.ColorizedSansNl("{reset}Status: ") - s.logSubtree(s.RootSlave) + s.logSubtree(s.RootWorker) log.Colorized("{reset}") s.logCommands() } @@ -75,20 +75,20 @@ func (s *StatusChart) logCommands() { } } -func (s *StatusChart) logSubtree(node *processtree.SlaveNode) { +func (s *StatusChart) logSubtree(node *processtree.WorkerNode) { log := theChart.directLogger printStateInfo("", node.Name, node.State(), true, false) - if len(node.Slaves) > 0 { + if 
len(node.Workers) > 0 { log.ColorizedSansNl("{reset}(") } - for i, slave := range node.Slaves { + for i, worker := range node.Workers { if i != 0 { log.ColorizedSansNl("{reset}, ") } - s.logSubtree(slave) + s.logSubtree(worker) } - if len(node.Slaves) > 0 { + if len(node.Workers) > 0 { log.ColorizedSansNl("{reset})") } } diff --git a/go/statuschart/tty.go b/go/statuschart/tty.go index bf1dbc78..0d4a18e9 100644 --- a/go/statuschart/tty.go +++ b/go/statuschart/tty.go @@ -57,7 +57,7 @@ func (s *StatusChart) draw() { if s.drawnInitial { lengthOfOutput := s.lengthOfOutput() - numberOfOutputLines := s.numberOfSlaves + len(s.Commands) + lengthOfOutput + 3 + numberOfOutputLines := s.numberOfWorkers + len(s.Commands) + lengthOfOutput + 3 fmt.Printf("\033[%dA", numberOfOutputLines) } else { s.drawnInitial = true @@ -66,7 +66,7 @@ func (s *StatusChart) draw() { log := theChart.directLogger log.Colorized("\x1b[4m{green}[ready] {red}[crashed] {blue}[running] {magenta}[connecting] {yellow}[waiting]\033[K") - s.drawSubtree(s.RootSlave, "", "") + s.drawSubtree(s.RootWorker, "", "") log.Colorized("\033[K\n\x1b[4mAvailable Commands: {yellow}[waiting] {red}[crashed] {green}[ready]\033[K") s.drawCommands() @@ -127,14 +127,14 @@ func (s *StatusChart) drawCommands() { } } -func (s *StatusChart) drawSubtree(node *processtree.SlaveNode, myIndentation, childIndentation string) { +func (s *StatusChart) drawSubtree(node *processtree.WorkerNode, myIndentation, childIndentation string) { printStateInfo(myIndentation, node.Name, node.State(), false, true) - for i, slave := range node.Slaves { - if i == len(node.Slaves)-1 { - s.drawSubtree(slave, childIndentation+lineL, childIndentation+lineX) + for i, worker := range node.Workers { + if i == len(node.Workers)-1 { + s.drawSubtree(worker, childIndentation+lineL, childIndentation+lineX) } else { - s.drawSubtree(slave, childIndentation+lineT, childIndentation+lineI) + s.drawSubtree(worker, childIndentation+lineT, childIndentation+lineI) } } } diff --git a/go/zerror/zerror.go b/go/zerror/zerror.go index a2f52a40..09dda5fe 100644 --- a/go/zerror/zerror.go +++ b/go/zerror/zerror.go @@ -33,8 +33,8 @@ func Error(msg string) { }) } -func ErrorCantConnectToMaster() { - slog.StdErrorString("Can't connect to master. Run {yellow}zeus start{red} first.\r") +func ErrorCantConnectToCoordinator() { + slog.StdErrorString("Can't connect to coordinator. 
Run {yellow}zeus start{red} first.\r") } func ErrorConfigCommandCouldntStart(msg, output string) { diff --git a/go/zeusclient/zeusclient.go b/go/zeusclient/zeusclient.go index 1b73d578..336b4305 100644 --- a/go/zeusclient/zeusclient.go +++ b/go/zeusclient/zeusclient.go @@ -61,7 +61,7 @@ func Run(args []string, input io.Reader, output *os.File, stderr *os.File) int { conn, err := net.DialUnix("unix", nil, addr) if err != nil { - zerror.ErrorCantConnectToMaster() + zerror.ErrorCantConnectToCoordinator() return 1 } usock := unixsocket.New(conn) @@ -192,24 +192,24 @@ func Run(args []string, input io.Reader, output *os.File, stderr *os.File) int { } func sendCommandLineArguments(usock *unixsocket.Usock, args []string) error { - master, slave, err := unixsocket.Socketpair(syscall.SOCK_STREAM) + coordinator, worker, err := unixsocket.Socketpair(syscall.SOCK_STREAM) if err != nil { return err } - usock.WriteFD(int(slave.Fd())) + usock.WriteFD(int(worker.Fd())) if err != nil { return err } - slave.Close() + worker.Close() go func() { - defer master.Close() + defer coordinator.Close() argAsBytes := []byte{} for _, arg := range args[1:] { argAsBytes = append(argAsBytes, []byte(arg)...) argAsBytes = append(argAsBytes, byte(0)) } - _, err = master.Write(argAsBytes) + _, err = coordinator.Write(argAsBytes) if err != nil { slog.ErrorString("Could not send arguments across: " + err.Error() + "\r") diff --git a/go/zeusmaster/zeusmaster.go b/go/zeuscoordinator/zeuscoordinator.go similarity index 91% rename from go/zeusmaster/zeusmaster.go rename to go/zeuscoordinator/zeuscoordinator.go index 849bc4f2..1c033452 100644 --- a/go/zeusmaster/zeusmaster.go +++ b/go/zeuscoordinator/zeuscoordinator.go @@ -1,4 +1,4 @@ -package zeusmaster +package zeuscoordinator import ( "fmt" @@ -22,7 +22,7 @@ import ( const listenerPortVar = "ZEUS_NETWORK_FILE_MONITOR_PORT" // man signal | grep 'terminate process' | awk '{print $2}' | xargs -I '{}' echo -n "syscall.{}, " -// Leaving out SIGPIPE as that is a signal the master receives if a client process is killed. +// Leaving out SIGPIPE as that is a signal the coordinator receives if a client process is killed. 
var terminatingSignals = []os.Signal{syscall.SIGHUP, syscall.SIGINT, syscall.SIGKILL, syscall.SIGALRM, syscall.SIGTERM, syscall.SIGXCPU, syscall.SIGXFSZ, syscall.SIGVTALRM, syscall.SIGPROF, syscall.SIGUSR1, syscall.SIGUSR2} func Run(configFile string, fileChangeDelay time.Duration, simpleStatus bool) int { @@ -39,7 +39,7 @@ func Run(configFile string, fileChangeDelay time.Duration, simpleStatus bool) in done := make(chan bool) - defer exit(processtree.StartSlaveMonitor(tree, monitor.Listen(), done), done) + defer exit(processtree.StartWorkerMonitor(tree, monitor.Listen(), done), done) defer exit(clienthandler.Start(tree, done), done) defer monitor.Close() defer slog.Suppress() diff --git a/go/zeusmaster/zeusmaster_test.go b/go/zeuscoordinator/zeuscoordinator_test.go similarity index 96% rename from go/zeusmaster/zeusmaster_test.go rename to go/zeuscoordinator/zeuscoordinator_test.go index cf11344d..bcb3254a 100644 --- a/go/zeusmaster/zeusmaster_test.go +++ b/go/zeuscoordinator/zeuscoordinator_test.go @@ -1,4 +1,4 @@ -package zeusmaster_test +package zeuscoordinator_test import ( "bytes" @@ -16,7 +16,7 @@ import ( slog "github.com/burke/zeus/go/shinylog" "github.com/burke/zeus/go/unixsocket" "github.com/burke/zeus/go/zeusclient" - "github.com/burke/zeus/go/zeusmaster" + "github.com/burke/zeus/go/zeuscoordinator" ) var testFiles = map[string]string{ @@ -197,7 +197,7 @@ func TestZeusBoots(t *testing.T) { enableTracing() zexit := make(chan int) go func() { - zexit <- zeusmaster.Run(filepath.Join(dir, "zeus.json"), filemonitor.DefaultFileChangeDelay, false) + zexit <- zeuscoordinator.Run(filepath.Join(dir, "zeus.json"), filemonitor.DefaultFileChangeDelay, false) }() expects := map[string]string{ @@ -277,7 +277,7 @@ func TestZeusBoots(t *testing.T) { t.Errorf("expected stderr %q, got %q", want, have) } - // The zeusmaster catches the interrupt and exits gracefully + // The zeuscoordinator catches the interrupt and exits gracefully me.Signal(os.Interrupt) if code := <-zexit; code != 0 { t.Fatalf("Zeus exited with %d", code) diff --git a/man/src/zeus.ronn b/man/src/zeus.ronn index 05725279..5f893953 100644 --- a/man/src/zeus.ronn +++ b/man/src/zeus.ronn @@ -13,7 +13,7 @@ To use Zeus with Ruby on Rails 3.0+, Just run `zeus start` in your project directory. It will output a list of available commands, including tasks to spawn consoles, servers, tests, and rake tasks. -See `https://github.com/burke/zeus/blob/master/docs/ruby/modifying.md` +See `https://github.com/burke/zeus/blob/coordinator/docs/ruby/modifying.md` for information on modifying the boot process that Zeus uses by default and adding custom tasks. @@ -41,7 +41,7 @@ parts of it when dependencies change, to keep everything up-to-date. ## WAY MORE TECHNICAL DESCRIPTION -See `https://github.com/burke/zeus/blob/master/docs/overview.md` +See `https://github.com/burke/zeus/blob/coordinator/docs/overview.md` ## OPTIONS diff --git a/rubygem/lib/zeus.rb b/rubygem/lib/zeus.rb index 0194f88e..a2b3d28c 100644 --- a/rubygem/lib/zeus.rb +++ b/rubygem/lib/zeus.rb @@ -17,7 +17,7 @@ module Zeus class << self - attr_accessor :plan, :dummy_tty, :master_socket + attr_accessor :plan, :dummy_tty, :coordinator_socket # this is totally asinine, but readline gets super confused when it's # required at a time when stdin or stdout is not connected to a TTY, @@ -27,19 +27,20 @@ class << self # Yup. def setup_dummy_tty! 
return if self.dummy_tty - master, self.dummy_tty = PTY.send(:open) + coordinator, self.dummy_tty = PTY.send(:open) Thread.new { - loop { master.read(1024) } + loop { coordinator.read(1024) } } STDIN.reopen(dummy_tty) STDOUT.reopen(dummy_tty) end - def setup_master_socket! - return master_socket if master_socket + def setup_coordinator_socket! + return coordinator_socket if coordinator_socket - fd = ENV['ZEUS_MASTER_FD'].to_i - self.master_socket = UNIXSocket.for_fd(fd) + # compatibility in case someone ends up with a new ruby but an old zeus + fd = (ENV['ZEUS_COORDINATOR_FD'] || ENV['ZEUS_MASTER_FD']).to_i + self.coordinator_socket = UNIXSocket.for_fd(fd) end def go(identifier=:boot) @@ -55,17 +56,17 @@ def go(identifier=:boot) def boot_steps(identifier) while true boot_step = catch(:boot_step) do - $0 = "zeus slave: #{identifier}" + $0 = "zeus worker: #{identifier}" setup_dummy_tty! - master = setup_master_socket! + coordinator = setup_coordinator_socket! feature_pipe_r, feature_pipe_w = IO.pipe - # I need to give the master a way to talk to me exclusively + # I need to give the coordinator a way to talk to me exclusively local, remote = UNIXSocket.pair(Socket::SOCK_STREAM) - master.send_io(remote) + coordinator.send_io(remote) - # Now I need to tell the master about my PID and ID + # Now I need to tell the coordinator about my PID and ID local.write "P:#{Process.pid}:#{@parent_pid || 0}:#{identifier}\0" local.send_io(feature_pipe_r) feature_pipe_r.close @@ -79,9 +80,9 @@ def boot_steps(identifier) while true messages = local.recv(2**16) - # Reap any child runners or slaves that might have exited in + # Reap any child runners or workers that might have exited in # the meantime. Note that reaping them like this can leave <=1 - # zombie process per slave around while the slave waits for a + # zombie process per worker around while the worker waits for a # new command. children.each do |pid| children.delete(pid) if Process.waitpid(pid, Process::WNOHANG) @@ -186,7 +187,7 @@ def kill_command_if_client_quits!(command_pid, client_pid) } end - def report_error_to_master(local, error) + def report_error_to_coordinator(local, error) str = "R:" str << "#{error.backtrace[0]}: #{error.message} (#{error.class})\n" error.backtrace[1..-1].each do |line| @@ -197,7 +198,7 @@ def report_error_to_master(local, error) end def run_action(socket, identifier) - # Now we run the action and report its success/fail status to the master. + # Now we run the action and report its success/fail status to the coordinator. 
begin Zeus::LoadTracking.track_features_loaded_by do plan.after_fork unless identifier == :boot @@ -206,7 +207,7 @@ def run_action(socket, identifier) socket.write "R:OK\0" rescue => err - report_error_to_master(socket, err) + report_error_to_coordinator(socket, err) raise end end diff --git a/rubygem/lib/zeus/m.rb b/rubygem/lib/zeus/m.rb index e5478a1a..07200a8f 100644 --- a/rubygem/lib/zeus/m.rb +++ b/rubygem/lib/zeus/m.rb @@ -25,7 +25,7 @@ module Zeus # #[](http://travis-ci.org/qrush/m) # - # + # # #<sub>[Rush at the Bristol Colston Hall May 1979](http://www.flickr.com/photos/8507625@N02/3468299995/)</sub> ### Install diff --git a/rubygem/spec/zeus_spec.rb b/rubygem/spec/zeus_spec.rb index 6bb66777..4d6b23c4 100644 --- a/rubygem/spec/zeus_spec.rb +++ b/rubygem/spec/zeus_spec.rb @@ -33,8 +33,8 @@ def self.boot end it 'boots and tracks features' do - master_r, master_w = UNIXSocket.pair(Socket::SOCK_STREAM) - ENV['ZEUS_MASTER_FD'] = master_w.to_i.to_s + coordinator_r, coordinator_w = UNIXSocket.pair(Socket::SOCK_STREAM) + ENV['ZEUS_COORDINATOR_FD'] = coordinator_w.to_i.to_s thr = Thread.new do begin @@ -49,7 +49,7 @@ def self.boot begin # Receive the control IO and start message - ctrl_io = master_r.recv_io(UNIXSocket) + ctrl_io = coordinator_r.recv_io(UNIXSocket) begin # We use recv instead of readline on the UNIXSocket to avoid # converting it to a buffered reader. That seems to interact