From 92a875f6e88345e2e4d6785b85f8c6e7590325f7 Mon Sep 17 00:00:00 2001
From: Ramana Kumar
Date: Thu, 9 Nov 2017 09:02:54 +1100
Subject: [PATCH] Initial commit, moved from CakeML

Taken from CakeML/cakeml@7a6c12f64ea382cda052dc102129590a1655a81d
---
 README.md         |  19 ++
 TODO              |  62 +++++++
 api.sml           | 162 ++++++++++++++++
 design.txt        | 153 ++++++++++++++++
 poll.sml          | 220 ++++++++++++++++++++++
 regressionLib.sml | 457 ++++++++++++++++++++++++++++++++++++++++++++++
 worker.sml        | 318 ++++++++++++++++++++++++++++++++
 7 files changed, 1391 insertions(+)
 create mode 100644 README.md
 create mode 100644 TODO
 create mode 100644 api.sml
 create mode 100644 design.txt
 create mode 100644 poll.sml
 create mode 100644 regressionLib.sml
 create mode 100644 worker.sml

diff --git a/README.md b/README.md
new file mode 100644
index 0000000..16ed271
--- /dev/null
+++ b/README.md
@@ -0,0 +1,19 @@
+Automated regression test infrastructure for CakeML.
+
+[api.sml](api.sml):
+Implements the server-side regression-test API as a CGI program.
+The API is for workers to view and manipulate the job queues.
+
+[design.txt](design.txt):
+Notes on the design of the automated regression test infrastructure.
+
+[poll.sml](poll.sml):
+Implements automatic refreshing of the job queues.
+If there are new jobs on GitHub, they will be added to the waiting queue.
+If there are stale jobs, they will be removed.
+
+[regressionLib.sml](regressionLib.sml):
+Code shared between the pieces of the regression test suite.
+
+[worker.sml](worker.sml):
+Worker that claims and runs regression test jobs.

diff --git a/TODO b/TODO
new file mode 100644
index 0000000..e1492b3
--- /dev/null
+++ b/TODO
@@ -0,0 +1,62 @@
+TODO: Change (text) API endpoint to regression.cgi/api/
+
+TODO: Write HTML interface for status output
+  Thus regression.cgi/{,active,waiting,stopped,etc.} will return a nicely
+  presented webpage (with links) for viewing the current status of the queues.
+
+TODO: Write help/usage interface for worker
+  Make the program more self-documenting.
+
+TODO: Make server send email to builds@cakeml.org when a job is stopped
+
+TODO: Update GitHub commit status for jobs in the system
+  Probably this should be done by the server or poller.
+  When a job is claimed: mark its commit as being tested
+  When a job is stopped: mark its commit as passed/failed
+
+TODO: Use GitHub webhooks or similar to avoid polling for changes
+  Thus the poller won't be run in polling mode, but instead will be the target
+  of a webhook that GitHub calls to update our queues.
+
+TODO: Add interface for disowning jobs (1)
+  Currently, you can manually stop a job that is supposedly active and claimed
+  by a worker that has gone rogue or died. It might be nice to support this
+  programmatically as an option to the worker, so it can make the appropriate
+  API calls.
+
+TODO: Make poller stop jobs that have been running too long
+
+TODO: Sort the id lists returned by {active,waiting,stopped}?
+  Prioritise pull requests?
+  Pull requests could be assigned lower ids by default
+
+TODO: Be smarter about assigning job ids?
+  Currently, we only assign new ids that are greater than all existing ones.
+  Would it be nice to fill in gaps instead? Maybe... When the system is running
+  well, every job will be either properly stopped or superseded, so the only
+  way for there to be a gap is when a waiting job gets superseded.
+
+  Gaps can be filled manually by renaming the highest waiting job into the
+  first gap. Indeed, that strategy could be automated (rather than assigning
+  job ids correctly in the first place). But assigning correctly is probably
+  just as easy. (An illustrative sketch follows.)
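+
+  For illustration, a minimal sketch of gap-filling assignment (hypothetical
+  helper, not part of the current code), given the sorted list of all
+  existing job ids:
+
+    (* first_gap_id: smallest id >= 1 not present in ids,
+       assuming ids is sorted in increasing order *)
+    fun first_gap_id ids =
+      let
+        fun loop expected [] = expected
+          | loop expected (id::rest) =
+              if id = expected then loop (expected + 1) rest
+              else expected
+      in loop 1 ids end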
+
+TODO: Replace calls to the OS shell with SML Basis functionality
+  In regressionLib.sml there is a call to OS.Process.system that passes quite a
+  lot of work off to the shell: setting environment variables, executing a
+  subprocess, and redirecting standard streams. This could all be implemented
+  in the SML Basis, in particular using the Posix structure, instead. Doing so
+  might be more reliable? (An illustrative sketch follows.)
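+
+  For illustration, a hypothetical replacement (names invented; environment
+  variables could be passed similarly via Posix.Process.exece, and appending
+  rather than truncating via Posix.FileSys.openf with O.append):
+
+    (* Run cmd on args with stdout and stderr redirected to path;
+       returns the child's exit status. *)
+    fun run_redirected (cmd, args, path) =
+      case Posix.Process.fork () of
+        NONE => (* child: redirect both streams, then exec *)
+          let
+            open Posix.FileSys
+            val fd = creat (path, S.flags [S.irusr, S.iwusr, S.irgrp, S.iroth])
+            val () = Posix.IO.dup2 {old = fd, new = stdout}
+            val () = Posix.IO.dup2 {old = fd, new = stderr}
+          in Posix.Process.exec (cmd, cmd :: args) end
+      | SOME pid => (* parent: wait for the child *)
+          let val (_, status) = Posix.Process.waitpid (Posix.Process.W_CHILD pid, [])
+          in status end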
+
+TODO: Add interface for disowning jobs (2)?
+  After "disowning" (and stopping) a job, another similar job will eventually
+  be added to the queue if necessary. But would it be nice to instead be able
+  to re-use the existing job number? I.e., do not stop the job after disowning
+  but rather return it to the waiting queue? This would require a new API
+  action.
+
+(*
+server = api.sml
+poller = poll.sml
+worker = worker.sml
+*)

diff --git a/api.sml b/api.sml
new file mode 100644
index 0000000..08bc764
--- /dev/null
+++ b/api.sml
@@ -0,0 +1,162 @@
+(*
+  Implements the server-side regression-test API as a CGI program.
+  The API is for workers to view and manipulate the job queues.
+*)
+use "regressionLib.sml";
+
+open regressionLib
+
+val text_response_header = "Content-Type:text/plain\n\n"
+
+fun text_response s =
+  let
+    val () = TextIO.output(TextIO.stdOut, text_response_header)
+    val () = TextIO.output(TextIO.stdOut, s)
+  in () end
+
+fun cgi_die ls =
+  (List.app (fn s => TextIO.output(TextIO.stdOut, s))
+     (text_response_header::"Error:\n"::ls);
+   TextIO.output(TextIO.stdOut,"\n");
+   OS.Process.exit OS.Process.success;
+   raise (Fail "impossible"))
+
+fun cgi_assert b ls = if b then () else cgi_die ls
+
+val waiting = read_list cgi_die "waiting"
+val active = read_list cgi_die "active"
+val stopped = read_list cgi_die "stopped"
+
+fun job id =
+  let
+    val f = Int.toString id
+    val q = queue_of_job cgi_die f
+  in
+    OS.Path.concat(q,f)
+  end
+
+fun claim id name =
+  let
+    val f = Int.toString id
+    val old = OS.Path.concat("waiting",f)
+    val new = OS.Path.concat("active",f)
+    val () =
+      if OS.FileSys.access(old,[OS.FileSys.A_READ]) then
+        if OS.FileSys.access(new,[OS.FileSys.A_READ]) then
+          cgi_die ["job ",f, " is both waiting and active"]
+        else OS.FileSys.rename{old = old, new = new}
+      else cgi_die ["job ",f," is not waiting to be claimed"]
+    val out = TextIO.openAppend new
+  in
+    print_claimed out (name,Date.fromTimeUniv(Time.now())) before TextIO.closeOut out
+  end
+
+fun append id line =
+  let
+    val f = Int.toString id
+    val p = OS.Path.concat("active",f)
+    val out = TextIO.openAppend p handle e as IO.Io _ => (cgi_die ["job ",f," is not active: cannot append"]; raise e)
+  in
+    print_log_entry out (Date.fromTimeUniv(Time.now()),line) before TextIO.closeOut out
+  end
+
+fun log id data =
+  let
+    val f = Int.toString id
+    val p = OS.Path.concat("active",f)
+    val out = TextIO.openAppend p handle e as IO.Io _ => (cgi_die ["job ",f," is not active: cannot log"]; raise e)
+  in
+    TextIO.output(out,data) before TextIO.closeOut out
+  end
+
+fun stop id =
+  let
+    val f = Int.toString id
+    val old = OS.Path.concat("active",f)
+    val new = OS.Path.concat("stopped",f)
+    val () =
+      if OS.FileSys.access(old,[OS.FileSys.A_READ]) then
+        if OS.FileSys.access(new,[OS.FileSys.A_READ]) then
+          cgi_die ["job ",f, " is both active and stopped"]
+        else OS.FileSys.rename{old = old, new = new}
+      else cgi_die ["job ",f," is not active: cannot stop"]
+  in
+    () (* TODO: send email *)
+  end
+
+fun retry id =
+  let
+    val f = Int.toString id
+    val old = OS.Path.concat("stopped",f)
+    val () = cgi_assert (OS.FileSys.access(old,[OS.FileSys.A_READ])) ["job ",f," is not stopped: cannot retry"]
+    val id = next_job_id [waiting,active,stopped]
+    val new = OS.Path.concat("waiting",Int.toString id)
+    val inp = TextIO.openIn old
+    val out = TextIO.openOut new
+    fun loop last =
+      case TextIO.inputLine inp of NONE => cgi_die ["stopped job ",f," has invalid file format"]
+      | SOME line => (TextIO.output(out,line);
+                      if last then () else
+                      loop (String.isPrefix "HOL: " line))
+    val () = loop false
+    val () = TextIO.closeOut out
+    val () = TextIO.closeIn inp
+  in id end
+
+datatype request_api = Get of api | Post of id * string
+
+fun get_api () =
+  case (OS.Process.getEnv "PATH_INFO",
+        OS.Process.getEnv "REQUEST_METHOD") of
+    (SOME path_info, SOME "GET")
+      => Option.map Get (api_from_string path_info (OS.Process.getEnv "QUERY_STRING"))
+  | (SOME path_info, SOME "POST")
+      => (case String.tokens (equal #"/") path_info of
+            ["log",n] =>
+              (Option.mapPartial
+                (fn len =>
+                  Option.compose
+                    ((fn id => Post(id,TextIO.inputN(TextIO.stdIn,len))),
+                     id_from_string) n)
+                (Option.composePartial(Int.fromString,OS.Process.getEnv) "CONTENT_LENGTH"))
+          | _ => NONE)
+  | _ => NONE
+
+local
+  fun id_list ids = String.concatWith " " (List.map Int.toString ids)
+in
+  fun dispatch api =
+    text_response (
+      case api of
+        Waiting => id_list (waiting())
+      | Active => id_list (active())
+      | Stopped => id_list (stopped())
+      | Job id => file_to_string (job id)
+      | Claim(id,name) => (claim id name; claim_response)
+      | Append(id,line) => (append id line; append_response)
+      | Stop id => (stop id; stop_response)
+      | Retry id => String.concat["retried as job ",Int.toString(retry id),"\n"]
+    ) handle e => cgi_die [exnMessage e]
+end
+
+fun dispatch_log id data =
+  text_response (log id data; log_response)
+  handle e => cgi_die [exnMessage e]
+
+fun dispatch_req (Get api) = dispatch api
+  | dispatch_req (Post (id,data)) = dispatch_log id data
+
+fun main () =
+  let
+    val () = ensure_queue_dirs cgi_die
+  in
+    case get_api () of
+      NONE => cgi_die ["bad usage"]
+    | SOME req =>
+        let
+          val fd = acquire_lock ()
+        in
+          dispatch_req req before
+          Posix.IO.close fd
+        end
+  end

diff --git a/design.txt b/design.txt
new file mode 100644
index 0000000..37ed2d9
--- /dev/null
+++ b/design.txt
@@ -0,0 +1,153 @@
+Notes on the design of the automated regression test infrastructure.
+
+Server API:
+
+  waiting:
+    returns space-separated list of ids of waiting jobs
+
+  active:
+    returns space-separated list of ids of active jobs
+
+  stopped:
+    returns space-separated list of ids of stopped jobs
+
+  job id:
+    returns information on job <id>
+    including:
+      - commits (including pull request integration, if any)
+      - (worker) name and time started (if any)
+      - output so far
+
+  claim id name:
+    worker claims job <id>. <name> is in the query string.
+    returns "claimed"
+    fails if <id> is not currently waiting
+
+  append id line:
+    record <line> as additional output for job <id> with a timestamp.
+    <line> is in the query string.
+    returns "appended"
+    fails if <id> is not currently active
+
+  log id data:
+    append <data> as additional output for job <id>.
+    <data> is POST data.
+    returns "logged"
+    fails if <id> is not currently active
+
+  stop id:
+    mark job <id> as stopped
+    returns "stopped"
+    sends email with output
+    fails if <id> is not currently active
+
+  retry id:
+    create a new job with the same commits as job <id>
+    returns "retried as job <new id>"
+    fails if <id> is not currently stopped
+    fails if there is already an active or waiting job for the same commits
+
+  all failures return text starting with "Error:"
+
+each job is on exactly one list: waiting, active, stopped
+if a job changes list, it can only move to the right
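+
+For illustration, a hypothetical session (ids, worker name, and output
+invented), with requests against the CGI endpoint (e.g. regression.cgi):
+
+  GET  /waiting                      -> "7 9"
+  GET  /claim/7?name=worker1         -> "claimed"
+  GET  /append/7?line=starting       -> "appended"
+  POST /log/7  (output as POST data) -> "logged"
+  GET  /stop/7                       -> "stopped"
+  GET  /retry/7                      -> "retried as job 10"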
+
+Poller automatic behaviours:
+
+  1. add a job to the waiting list:
+     - create a new job id
+     - the commits for the job satisfy the following:
+       - they are the current commits (on GitHub) for a particular target
+       - there are no other jobs with the same commits
+
+  2. remove a job from the waiting list:
+     - if it does not have the current commits (on GitHub) for any target, or
+     - if there are other active or waiting jobs for the same commits
+       (this should never happen -- not checking it currently)
+       (we don't count duplicate stopped jobs since
+        they might have been retried)
+     - the removed job's id number could be re-used (but is that worthwhile?)
+     - the race with workers trying to obtain this job is handled by the global
+       lock on queues. either the job is claimed before it can be removed, or
+       removed before it can be claimed.
+
+  3. move a job from active to stopped:
+     - if the time since it started is too long
+     - adds a note, "timed out", to the output
+     - does stop API actions
+
+Targets:
+
+  CakeML:
+    - branch "master"
+    - each open, mergeable pull request
+      in this case, there are two commits:
+        - the pull request commit
+        - the master commit to merge into
+      but they move together (as captured by the state of the PR on GitHub)
+  HOL:
+    - branch "master"
+
+---- OLD STUFF ----
+
+CakeML Regression Test
+
+Organised as two programs:
+
+  Queue Manager (Server)
+  - 1 instance, runs on cakeml.org
+  - Tracks commits on GitHub over the GitHub API
+  - Manages a queue of jobs
+  - Allocates jobs to workers as they are available
+  - Records which jobs have run or are running
+  - Stores all its state in the filesystem/database:
+    can be killed and restarted without problems.
+
+  Worker (Client)
+  - Runs regression test(s) as designated by the queue manager
+  - Can send the status (generated output) of any currently running job
+  - Interrupts the currently running job if requested
+  - May reuse a HOL build if the desired commit has already been built
+  - Assumes Poly/ML and git are installed
+  - Is almost stateless: either it's running a job, or it's ready for a job.
+    The state of HOL builds is kept in the filesystem.
+
+Scheduling is based on
+  - "targets" identified by branch name or pull request number
+    in the CakeML repository.
+  - "snapshots" representing a specific combination of commits (in all
+    relevant repositories, e.g., CakeML and HOL).
+  - "jobs" representing a snapshot and a job state.
+  - "workers" representing instances of the worker program, each of
+    which has a worker state.
+Each snapshot will correspond to some target.
+A target may have many corresponding snapshots.
+The possible job states are as follows:
+  - queued
+  - active (running, has a corresponding worker)
+  - finished
+  - dead (cancelled or superseded)
+The possible worker states are as follows:
+  - testing (running)
+  - ready
+
+Jobs can be queued, superseded, allocated (made active and assigned a
+worker), or cancelled manually.
+ +A job will be automatically queued if: + - The job's snapshot has the most recent commits (on GitHub) for its target, and + - No active or finished job has the same snapshot + +A job will be automatically superseded if: + - Either of the conditions for queueing above is false, and + - The job was not queued manually + +A job will be automatically allocated if: + - It is currently queued + - A worker is ready + +The current policy for targets is defined as follows: + - each open pull request is a target + - the branch "master" is a target + - the branch "master" in HOL is assumed diff --git a/poll.sml b/poll.sml new file mode 100644 index 0000000..7042210 --- /dev/null +++ b/poll.sml @@ -0,0 +1,220 @@ +(* + Implements automatic refreshing of the job queues. + If there are new jobs on GitHub, they will be added to the waiting queue. + If there are stale jobs, they will be removed. +*) +use "regressionLib.sml"; + +open regressionLib + +structure ReadJSON = struct + + type 'a basic_reader = substring -> ('a * substring) + type 'a reader = 'a -> 'a basic_reader + + fun transform f (reader:'a basic_reader) : 'b reader + = fn acc => fn ss => + let val (v, ss) = reader ss + in (f v acc, ss) end + + val replace_acc : 'a basic_reader -> 'a reader = + fn r => transform (fn x => fn _ => x) r + + fun read1 ss c = + case Substring.getc ss of SOME (c',ss) => + if c = c' then ss + else die ["expected ",String.str c," got ",String.str c'] + | _ => die ["expected ",String.str c," got nothing"] + + val read_string : string basic_reader = fn ss => + let + val ss = read1 ss #"\"" + fun loop ss acc = + let + val (chunk,ss) = Substring.splitl (not o equal #"\"") ss + val z = Substring.size chunk + val (c,ss) = Substring.splitAt(ss,1) + in + if 0 < z andalso Substring.sub(chunk,z-1) = #"\\" then + loop ss (c::chunk::acc) + else + (Option.valOf(String.fromCString(Substring.concat(List.rev(chunk::acc)))), + ss) + end + in + loop ss [] + end + + fun read_dict (dispatch : (string * 'a reader) list) : 'a reader + = fn acc => fn ss => + let + val ss = read1 ss #"{" + fun loop ss acc = + case Substring.getc ss of + SOME(#"}",ss) => (acc, ss) + | SOME(#",",ss) => loop ss acc + | _ => + let + val (key, ss) = read_string ss + val ss = read1 ss #":" + val (acc, ss) = assoc key dispatch acc ss + in loop ss acc end + in loop ss acc end + + fun read_opt_list read_item acc ss = + let + val ss = read1 ss #"[" + fun loop ss acc = + case Substring.getc ss of + SOME(#"]",ss) => (List.rev acc, ss) + | SOME(#",",ss) => loop ss acc + | _ => + (case read_item ss + of (NONE, ss) => loop ss acc + | (SOME v, ss) => loop ss (v::acc)) + in loop ss acc end + + fun mergeable_only "MERGEABLE" acc = acc + | mergeable_only _ _ = NONE + + val int_from_ss = Option.valOf o Int.fromString o Substring.string + + fun read_date ss = + let + val (s, ss) = read_string ss + val d = Substring.full s + val (year,d) = Substring.splitl (not o equal #"-") d + val (month,d) = Substring.splitl (not o equal #"-") (Substring.triml 1 d) + val (day,d) = Substring.splitl (not o equal #"T") (Substring.triml 1 d) + val (hour,d) = Substring.splitl (not o equal #":") (Substring.triml 1 d) + val (minute,d) = Substring.splitl (not o equal #":") (Substring.triml 1 d) + val (second,d) = Substring.splitl (not o equal #"Z") (Substring.triml 1 d) + val date = Date.date { + day = int_from_ss day, + hour = int_from_ss hour, + minute = int_from_ss minute, + month = month_from_int (int_from_ss month), + offset = SOME (Time.zeroTime), + second = int_from_ss second, + year = 
int_from_ss year } + in (date, ss) end + + fun read_number ss = + let val (n,ss) = Substring.splitl Char.isDigit ss + in (int_from_ss n, ss) end + + val read_obj : obj basic_reader = + read_dict + [("oid", transform with_hash read_string) + ,("messageHeadline", transform with_message read_string) + ,("committedDate", transform with_date read_date) + ] empty_obj + + val read_pr : pr option basic_reader = + read_dict + [("mergeable", transform mergeable_only read_string) + ,("number", transform (Option.map o with_num) read_number) + ,("headRefName", transform (Option.map o with_head_ref) read_string) + ,("headRef", + read_dict + [("target", transform (Option.map o with_head_obj) read_obj)])] + (SOME empty_pr) + +end + +val waiting = read_list die "waiting" +val active = read_list die "active" +val stopped = read_list die "stopped" + +fun add_waiting (snapshot,id) = + let + val f = Int.toString id + val path = OS.Path.concat("waiting",f) + val () = assert (not(OS.FileSys.access(path, []))) ["job ",f," already exists waiting"] + val out = TextIO.openOut path + val () = print_snapshot out snapshot + val () = TextIO.closeOut out + in id+1 end + +val cakeml_query = String.concat [ + "{repository(name: \\\"cakeml\\\", owner: \\\"CakeML\\\"){", + "defaultBranchRef { target { ... on Commit {", + " oid messageHeadline committedDate }}}", + "pullRequests(baseRefName: \\\"master\\\", first: 100, states: [OPEN]", + " orderBy: {field: CREATED_AT, direction: DESC}){", + " nodes { mergeable number headRefName", + " headRef { target { ... on Commit {", + " oid messageHeadline committedDate }}}", + "}}}}" ] + +val hol_query = String.concat [ + "{repository(name: \\\"HOL\\\", owner: \\\"HOL-Theorem-Prover\\\"){", + "defaultBranchRef { target { ... on Commit {", + " oid messageHeadline committedDate }}}}}" ] + +local + open ReadJSON +in + fun get_current_snapshots () : snapshot list = + let + val response = GitHub.graphql cakeml_query + fun add_master obj acc = (Branch("master",obj)::acc) + (* This assumes the PR base always matches master. + We could read it from GitHub instead. 
+      *)
+    fun add_prs prs [m as (Branch(_,base_obj))] =
+          m :: (List.map (PR o with_base_obj base_obj) prs)
+      | add_prs _ _ = die ["add_prs"]
+    val (cakeml_integrations,ss) =
+      read_dict
+        [("data",
+          read_dict
+            [("repository",
+              read_dict
+                [("defaultBranchRef",
+                  read_dict
+                    [("target", transform add_master read_obj)])
+                ,("pullRequests",
+                  read_dict
+                    [("nodes", transform add_prs (read_opt_list read_pr []))])
+                ])])] []
+        (Substring.full response)
+    val response = GitHub.graphql hol_query
+    val (hol_obj,ss) =
+      read_dict
+        [("data",
+          read_dict
+            [("repository",
+              read_dict
+                [("defaultBranchRef",
+                  read_dict
+                    [("target", replace_acc read_obj)])])])]
+        empty_obj (Substring.full response)
+  in
+    List.map (fn i => { cakeml = i, hol = hol_obj } )
+      (List.rev cakeml_integrations) (* after rev: oldest pull request first, master last *)
+  end
+end
+
+fun main () =
+  let
+    val no_poll = List.exists (equal"--no-poll") (CommandLine.arguments())
+    val () = ensure_queue_dirs die
+    fun loop () =
+      let
+        val snapshots = get_current_snapshots ()
+        val fd = acquire_lock ()
+        val () = List.app (remove_if_superseded snapshots) (waiting())
+        val to_queue =
+          filter_existing snapshots
+            [("waiting",waiting),
+             ("active" ,active ),
+             ("stopped",stopped)]
+        val () = if List.null to_queue then ()
+                 else ignore (List.foldl add_waiting (next_job_id [waiting,active,stopped]) to_queue)
+        (* TODO: stop timed out jobs *)
+        val () = Posix.IO.close fd
+      in
+        if no_poll then ()
+        else (OS.Process.sleep poll_delay; loop ())
+      end
+  in loop () end

diff --git a/regressionLib.sml b/regressionLib.sml
new file mode 100644
index 0000000..a844863
--- /dev/null
+++ b/regressionLib.sml
@@ -0,0 +1,457 @@
+(*
+  Code shared between the pieces of the regression test suite.
+
+  We use the filesystem as a database and put all state in it.
+  Everything is relative to the current directory.
+
+  We expect to be single-threaded, and use a lock file called
+    lock
+  to ensure this.
+
+  API requests are expected via CGI environment variables.
+
+  Job lists are implemented as three directories:
+    waiting, active, stopped
+
+  Jobs are implemented as files with their id as filename.
+
+  The directory a job is in indicates its status.
+
+  File contents for a job:
+    - HOL and CakeML commits
+    - optional worker name and start time
+    - output
+  More concretely:
+    CakeML: <sha>
+      <message> (<date>)
+    [#<PR number> (<PR branch name>)
+    Merging into: <sha>
+      <message> (<date>)]
+    HOL: <sha>
+      <message> (<date>)
+    [Machine: <worker name>
+    Claimed: <date>]
+    <date>: <line>
+    ...
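+
+  For illustration, a hypothetical job file for a pull request might read as
+  follows (all values invented, shas abbreviated; the layout follows
+  print_snapshot, print_claimed, and print_log_entry below):
+
+    CakeML: 1a2b3c4d5e
+      Fix parser bug (14/10/17)
+    #123 (fix-parser)
+    Merging into: 4d5e6f7a8b
+      Update build scripts (13/10/17)
+    HOL: 7a8b9c0d1e
+      Improve simplifier (12/10/17)
+    Machine: worker1
+    Claimed: 2017 Oct 14 12:00:00
+    2017 Oct 14 12:00:01: Starting compiler/parsing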
+*) + +structure regressionLib = struct + +fun equal x y = x = y + +fun assoc k [] = raise Match + | assoc k ((k',v)::ls) = if k = k' then v else assoc k ls + +fun find f (x::xs) = if f x then x else find f xs + | find _ _ = raise Match + +fun max n m = if n < m then m else n + +local + fun maxl acc [] = acc + | maxl acc (n::ns) = maxl (max acc n) ns +in val max_list = maxl 0 end + +fun month_from_int 1 = Date.Jan + | month_from_int 2 = Date.Feb + | month_from_int 3 = Date.Mar + | month_from_int 4 = Date.Apr + | month_from_int 5 = Date.May + | month_from_int 6 = Date.Jun + | month_from_int 7 = Date.Jul + | month_from_int 8 = Date.Aug + | month_from_int 9 = Date.Sep + | month_from_int 10 = Date.Oct + | month_from_int 11 = Date.Nov + | month_from_int 12 = Date.Dec + | month_from_int _ = raise Match + +fun warn ls = ( + TextIO.output(TextIO.stdErr,String.concat ls); + TextIO.output(TextIO.stdErr,"\n")) + +fun die ls = ( + warn ls; + OS.Process.exit OS.Process.failure; + raise (Fail "impossible")) + +fun diag ls = ( + TextIO.output(TextIO.stdOut,String.concat ls); + TextIO.output(TextIO.stdOut,"\n")) + +fun assert b ls = if b then () else die ls + +type obj = { hash : string, message : string, date : Date.date } +val empty_obj : obj = { hash = "", message = "", date = Date.fromTimeUniv Time.zeroTime } +fun with_hash x (obj : obj) : obj = { hash = x, message = #message obj, date = #date obj } +fun with_message x (obj : obj) : obj = { hash = #hash obj, message = x, date = #date obj } +fun with_date d (obj : obj) : obj = { hash = #hash obj, message = #message obj, date = d } + +type pr = { num : int, head_ref : string, head_obj : obj, base_obj : obj } +val empty_pr : pr = { num = 0, head_ref = "", head_obj = empty_obj, base_obj = empty_obj } +fun with_num x (pr : pr) : pr = { num = x, head_ref = #head_ref pr, head_obj = #head_obj pr, base_obj = #base_obj pr } +fun with_head_ref x (pr : pr) : pr = { num = #num pr, head_ref = x, head_obj = #head_obj pr, base_obj = #base_obj pr } +fun with_head_obj x (pr : pr) : pr = { num = #num pr, head_ref = #head_ref pr, head_obj = x, base_obj = #base_obj pr } +fun with_base_obj x (pr : pr) : pr = { num = #num pr, head_ref = #head_ref pr, head_obj = #head_obj pr, base_obj = x } + +datatype integration = Branch of string * obj | PR of pr +type snapshot = { cakeml : integration, hol : obj } + +type bare_pr = { head_sha : string, base_sha : string } +datatype bare_integration = Bbr of string | Bpr of bare_pr +type bare_snapshot = { bcml : bare_integration, bhol : string } + +fun bare_of_pr ({num,head_ref,head_obj,base_obj}:pr) : bare_pr = + {head_sha = #hash head_obj, base_sha = #hash base_obj} +fun bare_of_integration (Branch (_,obj)) = Bbr (#hash obj) + | bare_of_integration (PR pr) = Bpr (bare_of_pr pr) +fun bare_of_snapshot ({cakeml,hol}:snapshot) : bare_snapshot = + {bcml = bare_of_integration cakeml, bhol = #hash hol} + +type line = string +type log_entry = Date.date * line +type log = log_entry list + +type worker_name = string + +type job = { + id : int, + snapshot : snapshot, + claimed : (worker_name * Date.date) option, + output : log +} + +local + val full_date = Date.fmt "%Y %b %d %H:%M:%S" +in + fun print_claimed out (worker,date) = + let + fun pr s = TextIO.output(out,s) + val prl = List.app pr + in + prl ["Machine: ",worker,"\nClaimed: ",full_date date,"\n"] + end + + fun print_log_entry out (date,line) = + let + fun pr s = TextIO.output(out,s) + val prl = List.app pr + in + prl [full_date date, ": ", line, "\n"] + end + + fun print_snapshot out 
(s:snapshot) = + let + fun pr s = TextIO.output(out,s) + val prl = List.app pr + fun print_obj obj = + prl [#hash obj, "\n ", #message obj, " (", Date.fmt "%d/%m/%y" (#date obj), ")\n"] + + val () = pr "CakeML: " + val () = + case #cakeml s of + Branch (head_ref,base_obj) => print_obj base_obj + | PR {num,head_ref,head_obj,base_obj} => ( + print_obj head_obj; + prl ["#", Int.toString num, " (", head_ref, ")\nMerging into: "]; + print_obj base_obj + ) + val () = pr "HOL: " + val () = print_obj (#hol s) + in () end + + fun print_job out (j:job) = + let + val () = print_snapshot out (#snapshot j) + val () = case #claimed j of NONE => () | SOME claimed => print_claimed out claimed + val () = List.app (print_log_entry out) (#output j) + in () end +end + +val queue_dirs = ["waiting","active","stopped"] + +local + open OS.FileSys +in + fun ensure_queue_dirs die = + let + val dir = openDir(getDir()) + fun loop ls = + case readDir dir of NONE => ls + | SOME d => if isDir d then loop (List.filter(not o equal d) ls) + else if List.exists (equal d) ls then die [d," exists and is not a directory"] + else loop ls + in + List.app mkDir (loop queue_dirs) before closeDir dir + end +end + +local + open Posix.IO Posix.FileSys + val flock = FLock.flock {ltype=F_WRLCK, whence=SEEK_SET, start=0, len=0, pid=NONE} + val smode = S.flags[S.irusr,S.iwusr,S.irgrp,S.iroth] + val lock_name = "lock" +in + fun acquire_lock() = + let + val fd = Posix.FileSys.creat(lock_name,smode) + val _ = Posix.IO.setlkw(fd,flock) + in fd end +end + +fun check_id f id = + 0 <= id andalso Int.toString id = f + +local + open OS.FileSys +in + fun read_list die q () = + let + fun die x = die x (* value restriction *) + val dir = openDir q handle OS.SysErr _ => die ["could not open ",q," directory"] + fun badFile f = die ["found bad filename ",f," in ",q] + fun loop acc = + case readDir dir of NONE => acc + | SOME f => if isDir (OS.Path.concat(q,f)) handle OS.SysErr _ => die [f, " disappeared from ", q, " unexpectedly"] + then die ["found unexpected directory ",f," in ",q] + else case Int.fromString f of NONE => badFile f + | SOME id => if check_id f id then loop (id::acc) else badFile f + val ids = loop [] + in ids end +end + +fun queue_of_job die f = + let + fun mk_path dir = OS.Path.concat(dir,f) + fun access dir = OS.FileSys.access(mk_path dir, [OS.FileSys.A_READ]) + in + find access queue_dirs + handle Match => die ["job ",f," not found"] + end + +fun next_job_id qs = + 1 + List.foldl (fn (q,id) => max id (max_list(q()))) 0 qs + +fun read_bare_snapshot invalid inp = + let + fun extract_sha prefix line = + let + val line = Substring.full line + val () = assert (Substring.isPrefix prefix line) invalid + in + Substring.string( + Substring.dropr Char.isSpace + (Substring.triml (String.size prefix) line)) + end + fun read_line () = + case TextIO.inputLine inp of + NONE => die invalid + | SOME line => line + + val head_sha = extract_sha "CakeML: " (read_line()) + val _ = read_line () + val line = read_line () + val (line,base_sha) = + if String.isPrefix "#" line then + let + val line = read_line () + val _ = read_line () + in (read_line(), SOME (extract_sha "Merging into: " line)) end + else (line, NONE) + val hol_sha = extract_sha "HOL: " line + val () = TextIO.closeIn inp + in + { bcml = case base_sha + of NONE => Bbr head_sha + | SOME base_sha => Bpr { head_sha = head_sha, base_sha = base_sha } + , bhol = hol_sha } + end + +fun read_job_snapshot q id : bare_snapshot = + let + val f = OS.Path.concat(q,Int.toString id) + val inp = 
TextIO.openIn f handle IO.Io _ => die ["cannot open ",f] + val invalid = [f," has invalid file format"] + in read_bare_snapshot invalid inp end + +fun filter_existing snapshots qs = + let + exception Return + fun check_null x = if List.null x then raise Return else x + fun foldthis q (id,snapshots) = + check_null + (List.filter (not o (equal (read_job_snapshot q id)) o bare_of_snapshot) snapshots) + in + List.foldl + (fn ((q,get_ids),snapshots) => List.foldl (foldthis q) snapshots (get_ids())) + snapshots qs + handle Return => [] + end + +fun remove_if_superseded snapshots id = + if List.exists (equal (read_job_snapshot "waiting" id) o bare_of_snapshot) snapshots + then () + else OS.FileSys.remove(OS.Path.concat("waiting",Int.toString id)) + handle OS.SysErr _ => die["waiting job ",Int.toString id," disappeared"] + +fun file_to_string f = + let val inp = TextIO.openIn f in TextIO.inputAll inp before TextIO.closeIn inp end + +fun file_to_line f = + let + val inp = TextIO.openIn f + val lopt = TextIO.inputLine inp + val () = TextIO.closeIn inp + in + case lopt of NONE => "" + | SOME line => String.extract(line,0,SOME(String.size line - 1)) + end + +fun output_to_file (f,s) = + let + val out = TextIO.openOut f + val () = TextIO.output(out,s) + in TextIO.closeOut out end + +val until_space = + Substring.string o Substring.takel (not o Char.isSpace) o Substring.full + +local + open Unix +in + fun system_output (cmd,args) = + let + val proc = execute (cmd,args) + handle e as OS.SysErr _ => die[cmd," failed to execute on ",String.concatWith" "args,"\n",exnMessage e] + val output = TextIO.inputAll (textInstreamOf proc) + val status = reap proc + in + if OS.Process.isSuccess status then output + else die[cmd," failed on ",String.concatWith" "args] + end +end + +val capture_file = "regression.log" +val timing_file = "timing.log" + +fun system_capture_with redirector cmd_args = + let + (* This could be implemented using Posix without relying on the shell *) + val status = OS.Process.system(String.concat[cmd_args, redirector, capture_file]) + in OS.Process.isSuccess status end + +val system_capture = system_capture_with " &>" +val system_capture_append = system_capture_with " &>>" + +val curl_path = "/usr/bin/curl" + +structure GitHub = struct + val token = until_space (file_to_string "token") + val endpoint = "https://api.github.com/graphql" + fun curl_cmd query = (curl_path,["--silent","--show-error", + "--header",String.concat["Authorization: bearer ",token], + "--request","POST", + "--data",String.concat["{\"query\" : \"",query,"\"}"], + endpoint]) + val graphql = system_output o curl_cmd +end + +val poll_delay = Time.fromSeconds(60 * 30) + +type id = int + +datatype api = Waiting | Active | Stopped + | Job of id | Claim of id * worker_name + | Append of id * line (* not including newline *) + | Stop of id | Retry of id + +val claim_response = "claimed\n" +val append_response = "appended\n" +val stop_response = "stopped\n" +val log_response = "logged\n" + +fun percent_decode s = + let + fun loop ss acc = + let + val (chunk,ss) = Substring.splitl (not o equal #"%") ss + in + if Substring.isEmpty ss then + Substring.concat(List.rev(chunk::acc)) + else + let + val (ns,ss) = Substring.splitAt(Substring.triml 1 ss,2) + val n = #1 (Option.valOf (Int.scan StringCvt.HEX Substring.getc ns)) + val c = Substring.full (String.str (Char.chr n)) + in + loop ss (c::chunk::acc) + end + end + in + loop (Substring.full s) [] + handle e => die ["percent decode failed on ",s,"\n",exnMessage e] + end + +fun 
api_to_string Waiting = "/waiting"
+  | api_to_string Active = "/active"
+  | api_to_string Stopped = "/stopped"
+  | api_to_string (Job id) = String.concat["/job/",Int.toString id]
+  | api_to_string (Claim (id,name)) = String.concat["/claim/",Int.toString id]
+  | api_to_string (Append (id,line)) = String.concat["/append/",Int.toString id]
+  | api_to_string (Stop id) = String.concat["/stop/",Int.toString id]
+  | api_to_string (Retry id) = String.concat["/retry/",Int.toString id]
+
+fun api_curl_args (Append (_,line)) = ["--get","--data-urlencode",String.concat["line=",line]]
+  | api_curl_args (Claim (_,name)) = ["--get","--data-urlencode",String.concat["name=",name]]
+  | api_curl_args _ = []
+
+fun id_from_string n =
+  case Int.fromString n of NONE => NONE
+  | SOME id => if check_id n id then SOME id else NONE
+
+fun read_query prefix s =
+  case String.tokens (equal #"&") s of [s] =>
+    if String.isPrefix (String.concat[prefix,"="]) s then
+      SOME (percent_decode (String.extract(s,String.size prefix + 1,NONE)))
+    else NONE
+  | _ => NONE
+
+fun api_from_string s q =
+  if s = "/waiting" then SOME Waiting
+  else if s = "/active" then SOME Active
+  else if s = "/stopped" then SOME Stopped
+  else (case String.tokens (equal #"/") s of
+          ["job",n] => Option.map Job (id_from_string n)
+        | ["claim",n] => Option.mapPartial
+                           (fn id => Option.map (fn s => Claim(id,s))
+                             (Option.mapPartial (read_query "name") q))
+                           (id_from_string n)
+        | ["append",n] => Option.mapPartial
+                            (fn id => Option.map (fn s => Append(id,s))
+                              (Option.mapPartial (read_query "line") q))
+                            (id_from_string n)
+        | ["stop",n] => Option.map Stop (id_from_string n)
+        | ["retry",n] => Option.map Retry (id_from_string n)
+        | _ => NONE)
+
+structure API = struct
+  val endpoint = "https://cakeml.org/regression.cgi"
+  fun curl_cmd api = (curl_path,
+    ["--silent","--show-error"] @ api_curl_args api @
+    [String.concat[endpoint,api_to_string api]])
+  val send = system_output o curl_cmd
+  fun curl_log id file =
+    (curl_path,["--silent","--show-error","--request","POST",
+      "--data",String.concat["@",file],
+      String.concat[endpoint,"/log/",Int.toString id]])
+  fun append id line =
+    let val response = send (Append(id,line))
+    in assert (response=append_response) ["Unexpected append response: ",response] end
+  fun stop id =
+    let val response = send (Stop id)
+    in assert (response=stop_response) ["Unexpected stop response: ",response] end
+  fun log id file =
+    let val response = system_output (curl_log id file)
+    in assert (response=log_response) ["Unexpected log response: ",response] end
+end
+
+end

diff --git a/worker.sml b/worker.sml
new file mode 100644
index 0000000..f3b114f
--- /dev/null
+++ b/worker.sml
@@ -0,0 +1,318 @@
+(*
+  Worker that claims and runs regression test jobs.
+
+  Assumes the following are available:
+    /usr/bin/curl, /usr/bin/git, /usr/bin/time, poly
+
+  Also assumes the default shell (/bin/sh) understands
+    ENV=val ... cmd [args ...] &>file
+  to mean redirect both stdout and stderr to file when running
+  cmd on args in an environment augmented by the (ENV,val) pairs,
+  and &>> instead of &> appends to file instead of truncating it.
+
+  Can be run either as a daemon (default) that will keep looking for work by
+  polling or as a one-shot command (--no-poll) that will do nothing if no work
+  is currently available. This means polling or work notifications can be
+  handled externally if desired.
+
+  Summary of options:
+    --no-poll : Exit when no waiting jobs are found rather than polling.
+                Will still check for more waiting jobs after completing a job.
+    --no-loop : Exit after finishing a job, do not check for more waiting jobs.
+    --select id : Ignore the waiting jobs list and instead attempt to claim job <id>.
+    --resume id : Assume job <id> has previously been claimed by this worker and
+                  attempt to start running it again. If the job fails again,
+                  exit (even without --no-loop).
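+
+  For illustration (hypothetical invocation; the job id is invented), running
+    ./worker --select 7 --no-loop
+  attempts to claim and run job 7 once and then exits, whether or not
+  further waiting jobs exist.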
+
+  All work will be carried out in the current directory
+  (or subdirectories of it) with no special permissions.
+  Assumes this directory starts empty except for the
+  worker executable and one file
+    name
+  containing this worker node's identity. This could be
+  created as follows:
+    uname -norm > name
+  It must not start with whitespace nor contain any single quotes.
+
+  Manipulates working directories for HOL and CakeML, which are created if
+  necessary. If a job can reuse the HOL working directory without rebuilding
+  (the commit has not changed), then it will be reused. Otherwise, it is
+  cleaned ("git clean -xdf") and rebuilt. The CakeML working directory is
+  cleaned before every job.
+
+  Jobs are handled as follows:
+    1. Find a job in the waiting queue
+    2. Claim the job
+    3. Set up HOL and CakeML working directories
+       according to the job snapshot
+    4. Build HOL, capturing stdout and stderr
+       On failure:
+         1. Append "FAILED: building HOL"
+         2. Log the captured output
+         3. Stop the job
+    5. For each directory in the CakeML build sequence
+       1. Append "Starting <dir>"
+       2. Holmake in that directory,
+          capturing stdout and stderr,
+          and capturing time and memory usage
+          On failure:
+            1. Append "FAILED: <dir>"
+            2. Log the captured output
+            3. Stop the job
+       3. Append "Finished <dir>: