Skip to content

Commit

Permalink
Simplify refresh action
Browse files Browse the repository at this point in the history
Also rename "active" to "running" and be smarter about assigning job
ids.
  • Loading branch information
xrchz committed Nov 9, 2017
1 parent 407d218 commit b45fa33
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 70 deletions.
17 changes: 3 additions & 14 deletions TODO
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
TODO: Change (text) API endpoint to regression.cgi/api/

TODO: Write HTML interface for status output
Thus regression.cgi/{,active,waiting,stopped,etc.} will return a nicely
Thus regression.cgi/{,running,waiting,stopped,etc.} will return a nicely
presented webpage (with links) for viewing the current status of the queues.

TODO: Write help/usage interface for worker
Expand All @@ -15,29 +15,18 @@ TODO: Update GitHub commit status for jobs in the system
When a job is stopped: mark its commit as passed/failed

TODO: Add interface for disowning jobs (1)
Currently, you can manually stop a job that is supposedly active and claimed
Currently, you can manually stop a job that is supposedly running and claimed
by a worker that has gone rogue or died. It might be nice to support this
programmatically as an option to the worker, so it can make the appropriate
API calls.

TODO: Make server stop jobs that have been running too long
This could be done as part of the refresh action.

TODO: Sort the id lists returned by {active,waiting,stopped}?
TODO: Sort the id lists returned by {running,waiting,stopped}?
Prioritise pull requests?
Pull requests could be assigned lower ids by default

TODO: Be smarter about assigning job ids?
Currently, we only assign new ids that a greater than all existing ones.
Would it be nice to fill in gaps instead? Maybe... When the system is running
well, every job will be either properly stopped or superseded, so the only
way for there to be a gap is when a waiting job gets superseded.

Gaps can be filled manually by renaming the highest waiting job into the
first gap. Indeed, that strategy could be automated (rather than assigning
job ids correctly in the first place). But assigning correctly is probably
just as easy.

TODO: Replace calls to the OS shell with SML Basis functionality
In regressionLib.sml there is a call to OS.Process.system that passes quite a
lot of work off to the shell: setting environment variables, executing a
Expand Down
44 changes: 22 additions & 22 deletions api.sml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ It also provides a hook for refreshing the queues:
If there are new jobs on GitHub, they will be added as waiting.
If there are stale jobs, they will be removed.
each job is on exactly one list: waiting, active, stopped
each job is on exactly one list: waiting, running, stopped
if a job changes list, it can only move to the right
Refreshing the queues:
Expand All @@ -21,7 +21,7 @@ Refreshing the queues:
2. remove a job from the waiting list:
- if it does not have the current commits (on GitHub) for any target, or
- if there are other active or waiting jobs for the same commits
- if there are other running or waiting jobs for the same commits
(this should never happen -- not checking it currently)
(we don't count duplicate stopped jobs since
they might have been retried)
Expand All @@ -30,7 +30,7 @@ Refreshing the queues:
lock on queues. either the job is claimed before it can be removed, or
removed before it can be claimed.
3. move a job from active to stopped:
3. move a job from running to stopped:
- if the time since it started is too long
- adds a note, "timed out", to the output
- does the stop API actions
Expand Down Expand Up @@ -63,11 +63,11 @@ fun claim id name =
let
val f = Int.toString id
val old = OS.Path.concat("waiting",f)
val new = OS.Path.concat("active",f)
val new = OS.Path.concat("running",f)
val () =
if OS.FileSys.access(old,[OS.FileSys.A_READ]) then
if OS.FileSys.access(new,[OS.FileSys.A_READ]) then
cgi_die ["job ",f, " is both waiting and active"]
cgi_die ["job ",f, " is both waiting and running"]
else OS.FileSys.rename{old = old, new = new}
else cgi_die ["job ",f," is not waiting to be claimed"]
val out = TextIO.openAppend new
Expand All @@ -78,32 +78,32 @@ fun claim id name =
fun append id line =
let
val f = Int.toString id
val p = OS.Path.concat("active",f)
val out = TextIO.openAppend p handle e as IO.Io _ => (cgi_die ["job ",f," is not active: cannot append"]; raise e)
val p = OS.Path.concat("running",f)
val out = TextIO.openAppend p handle e as IO.Io _ => (cgi_die ["job ",f," is not running: cannot append"]; raise e)
in
print_log_entry out (Date.fromTimeUniv(Time.now()),line) before TextIO.closeOut out
end

fun log id data =
let
val f = Int.toString id
val p = OS.Path.concat("active",f)
val out = TextIO.openAppend p handle e as IO.Io _ => (cgi_die ["job ",f," is not active: cannot log"]; raise e)
val p = OS.Path.concat("running",f)
val out = TextIO.openAppend p handle e as IO.Io _ => (cgi_die ["job ",f," is not running: cannot log"]; raise e)
in
TextIO.output(out,data) before TextIO.closeOut out
end

fun stop id =
let
val f = Int.toString id
val old = OS.Path.concat("active",f)
val old = OS.Path.concat("running",f)
val new = OS.Path.concat("stopped",f)
val () =
if OS.FileSys.access(old,[OS.FileSys.A_READ]) then
if OS.FileSys.access(new,[OS.FileSys.A_READ]) then
cgi_die ["job ",f, " is both active and stopped"]
cgi_die ["job ",f, " is both running and stopped"]
else OS.FileSys.rename{old = old, new = new}
else cgi_die ["job ",f," is not active: cannot stop"]
else cgi_die ["job ",f," is not running: cannot stop"]
in
() (* TODO: send email *)
end
Expand All @@ -113,7 +113,7 @@ fun retry id =
val f = Int.toString id
val old = OS.Path.concat("stopped",f)
val () = cgi_assert (OS.FileSys.access(old,[OS.FileSys.A_READ])) ["job ",f," is not stopped: cannot retry"]
val id = next_job_id [waiting,active,stopped]
val id = first_unused_id (waiting()@running()@stopped()) 1
val new = OS.Path.concat("waiting",Int.toString id)
val inp = TextIO.openIn old
val out = TextIO.openOut new
Expand All @@ -130,15 +130,15 @@ fun retry id =
fun refresh () =
let
val snapshots = get_current_snapshots ()
val () = List.app (remove_if_superseded snapshots) (waiting())
val to_queue =
filter_existing snapshots
[("waiting",waiting),
("active" ,active ),
("stopped",stopped)]
val () = if List.null to_queue then ()
else ignore (List.foldl add_waiting (next_job_id [waiting,active,stopped]) to_queue)
val () = clear_list "waiting"
(* TODO: stop timed out jobs *)
val running_ids = running()
val stopped_ids = stopped()
val snapshots = filter_out "running" running_ids snapshots
val snapshots = filter_out "stopped" stopped_ids snapshots
val avoid_ids = running_ids @ stopped_ids
val () = if List.null snapshots then ()
else ignore (List.foldl (add_waiting avoid_ids) 1 snapshots)
in () end

datatype request_api = Get of api | Post of id * string
Expand Down Expand Up @@ -167,7 +167,7 @@ in
text_response (
case api of
Waiting => id_list (waiting())
| Active => id_list (active())
| Running => id_list (running())
| Stopped => id_list (stopped())
| Refresh => (refresh (); refresh_response)
| Job id => file_to_string (job id)
Expand Down
18 changes: 9 additions & 9 deletions apiLib.sml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ Reference:
waiting:
returns space-separated list of ids of waiting jobs
active:
returns space-separated list of ids of active jobs
running:
returns space-separated list of ids of running jobs
stopped:
returns space-separated list of ids of stopped jobs
Expand All @@ -33,25 +33,25 @@ Reference:
record <line> as additional output for job <id> with a timestamp.
<line> is in the query string.
returns "appended"
fails if <id> is not currently active
fails if <id> is not currently running
log id data:
append <data> as additional output for job <id>.
<data> is POST data.
returns "logged"
fails if <id> is not currently active
fails if <id> is not currently running
stop id:
mark job <id> as stopped
returns "stopped"
sends email with output
fails if <id> is not currently active
fails if <id> is not currently running
retry id:
create a new job with the same commits as job <id>
returns "retried as job <new id>"
fails if <id> is not currently stopped
fails if there is already an active or waiting job for the same commits
fails if there is already a running or waiting job for the same commits
all failures return text starting with "Error:"
*)
Expand All @@ -68,7 +68,7 @@ type line = string
fun check_id f id =
0 <= id andalso Int.toString id = f

datatype api = Waiting | Active | Stopped | Refresh
datatype api = Waiting | Running | Stopped | Refresh
| Job of id | Claim of id * worker_name
| Append of id * line (* not including newline *)
| Stop of id | Retry of id
Expand Down Expand Up @@ -102,7 +102,7 @@ fun percent_decode s =
end

fun api_to_string Waiting = "/waiting"
| api_to_string Active = "/active"
| api_to_string Running = "/running"
| api_to_string Stopped = "/stopped"
| api_to_string Refresh = "/refresh"
| api_to_string (Job id) = String.concat["/job/",Int.toString id]
Expand All @@ -128,7 +128,7 @@ fun read_query prefix s =

fun api_from_string s q =
if s = "/waiting" then SOME Waiting
else if s = "/active" then SOME Active
else if s = "/running" then SOME Running
else if s = "/stopped" then SOME Stopped
else if s = "/refresh" then SOME Refresh
else (case String.tokens (equal #"/") s of
Expand Down
46 changes: 28 additions & 18 deletions serverLib.sml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
to ensure this.
Job lists are implemented as three directories:
waiting, active, stopped
waiting, running, stopped
Jobs are implemented as files with their id as filename.
Expand Down Expand Up @@ -150,7 +150,7 @@ in
in () end
end

val queue_dirs = ["waiting","active","stopped"]
val queue_dirs = ["waiting","running","stopped"]

local
open OS.FileSys
Expand All @@ -176,17 +176,30 @@ in
val dir = openDir q handle OS.SysErr _ => cgi_die ["could not open ",q," directory"]
fun badFile f = cgi_die ["found bad filename ",f," in ",q]
fun loop acc =
case readDir dir of NONE => acc
case readDir dir of NONE => acc before closeDir dir
| SOME f => if isDir (OS.Path.concat(q,f)) handle OS.SysErr _ => cgi_die [f, " disappeared from ", q, " unexpectedly"]
then cgi_die ["found unexpected directory ",f," in ",q]
else case Int.fromString f of NONE => badFile f
| SOME id => if check_id f id then loop (id::acc) else badFile f
val ids = loop []
in ids end
fun clear_list q =
let
val dir = openDir q handle OS.SysErr _ => cgi_die ["could not open ",q," directory"]
fun loop () =
case readDir dir of NONE => closeDir dir
| SOME f =>
let
val f = OS.Path.concat(q,f)
val () = remove f
handle (e as OS.SysErr _) =>
cgi_die ["unexpected error removing ",f,"\n",exnMessage e]
in loop () end
in loop () end
end

val waiting = read_list "waiting"
val active = read_list "active"
val running = read_list "running"
val stopped = read_list "stopped"

fun queue_of_job f =
Expand All @@ -207,31 +220,28 @@ fun read_job_snapshot q id : bare_snapshot =
handle Option => cgi_die [f," has invalid file format"]
end

fun filter_existing snapshots qs =
fun filter_out q ids snapshots =
let
exception Return
fun check_null x = if List.null x then raise Return else x
fun foldthis q (id,snapshots) =
fun remove_all_matching_this_id (id,snapshots) =
check_null
(List.filter (not o (equal (read_job_snapshot q id)) o bare_of_snapshot) snapshots)
in
List.foldl
(fn ((q,get_ids),snapshots) => List.foldl (foldthis q) snapshots (get_ids()))
snapshots qs
List.foldl remove_all_matching_this_id snapshots ids
handle Return => []
end

fun remove_if_superseded snapshots id =
if List.exists (equal (read_job_snapshot "waiting" id) o bare_of_snapshot) snapshots
then ()
else OS.FileSys.remove(OS.Path.concat("waiting",Int.toString id))
handle OS.SysErr _ => cgi_die["waiting job ",Int.toString id," disappeared"]

fun next_job_id qs =
1 + List.foldl (fn (q,id) => max id (max_list(q()))) 0 qs
fun first_unused_id avoid_ids id =
let
fun loop id =
if List.exists (equal id) avoid_ids
then loop (id+1) else id
in loop id end

fun add_waiting (snapshot,id) =
fun add_waiting avoid_ids (snapshot,id) =
let
val id = first_unused_id avoid_ids id
val f = Int.toString id
val path = OS.Path.concat("waiting",f)
val () = cgi_assert (not(OS.FileSys.access(path, []))) ["job ",f," already exists waiting"]
Expand Down
7 changes: 0 additions & 7 deletions utilLib.sml
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,6 @@ structure utilLib = struct
fun assoc k [] = raise Match
| assoc k ((k',v)::ls) = if k = k' then v else assoc k ls

fun max n m = if n < m then m else n

local
fun maxl acc [] = acc
| maxl acc (n::ns) = maxl (max acc n) ns
in val max_list = maxl 0 end

val until_space =
Substring.string o Substring.takel (not o Char.isSpace) o Substring.full

Expand Down

0 comments on commit b45fa33

Please sign in to comment.