Skip to content

Commit 32de00b

Browse files
committed
Add more structured Run operations
1 parent 84d5428 commit 32de00b

14 files changed

+336
-60
lines changed

bench/bench_run.ocaml4.ml

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
let run_suite ~budgetf:_ = []

bench/bench_run.ocaml5.ml

+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
open Multicore_bench
2+
open Picos_std_structured
3+
module Multififo = Picos_mux_multififo
4+
5+
let run_one_multififo ~budgetf ~n_domains ~n () =
6+
let context = ref (Obj.magic ()) in
7+
8+
let before _ = context := Multififo.context () in
9+
let init _ = !context in
10+
let work i context =
11+
if i <> 0 then Multififo.runner_on_this_thread context
12+
else ignore @@ Multififo.run ~context @@ fun () -> Run.for_n n ignore
13+
in
14+
15+
let config =
16+
Printf.sprintf "%d mfifo%s, run_n %d" n_domains
17+
(if n_domains = 1 then "" else "s")
18+
n
19+
in
20+
Times.record ~budgetf ~n_domains ~before ~init ~work ()
21+
|> Times.to_thruput_metrics ~n ~singular:"ignore" ~config
22+
23+
let run_suite ~budgetf =
24+
Util.cross [ 1; 2; 4; 8 ]
25+
[ 100; 1_000; 10_000; 100_000; 1_000_000; 10_000_000 ]
26+
|> List.concat_map @@ fun (n_domains, n) ->
27+
if Picos_domain.recommended_domain_count () < n_domains then []
28+
else run_one_multififo ~budgetf ~n_domains ~n ()

bench/dune

+6
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
(run %{test} -brief "Picos binaries")
2424
(run %{test} -brief "Bounded_q with Picos_sync")
2525
(run %{test} -brief "Memory usage")
26+
(run %{test} -brief "Picos_std_structured.Run")
2627
;;
2728
))
2829
(foreign_stubs
@@ -49,6 +50,11 @@
4950
from
5051
(picos_mux.fifo -> scheduler.ocaml5.ml)
5152
(picos_mux.thread -> scheduler.ocaml4.ml))
53+
(select
54+
bench_run.ml
55+
from
56+
(picos_mux.multififo -> bench_run.ocaml5.ml)
57+
(-> bench_run.ocaml4.ml))
5258
(select
5359
bench_fib.ml
5460
from

bench/main.ml

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ let benchmarks =
2222
("Picos binaries", Bench_binaries.run_suite);
2323
("Bounded_q with Picos_sync", Bench_bounded_q.run_suite);
2424
("Memory usage", Bench_memory.run_suite);
25+
("Picos_std_structured.Run", Bench_run.run_suite);
2526
]
2627

2728
let () = Multicore_bench.Cmd.run ~benchmarks ()

lib/picos_std.structured/bundle.ml

+31-18
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,11 @@ type _ tdt =
1212
}
1313
-> [> `Bundle ] tdt
1414

15-
let config_terminated_bit = 0x01
16-
and config_callstack_mask = 0x3E
17-
and config_callstack_shift = 1
18-
and config_one = 0x40 (* memory runs out before overflow *)
15+
let config_on_return_terminate_bit = 0x01
16+
and config_on_terminate_raise_bit = 0x02
17+
and config_callstack_mask = 0x6C
18+
and config_callstack_shift = 2
19+
and config_one = 0x80 (* memory runs out before overflow *)
1920

2021
let flock_key : [ `Bundle | `Nothing ] tdt Fiber.FLS.t = Fiber.FLS.create ()
2122

@@ -35,6 +36,8 @@ let error ?callstack (Bundle r as t : t) exn bt =
3536
terminate ?callstack t;
3637
Control.Errors.push r.errors exn bt
3738
end
39+
else if Atomic.get r.config land config_on_terminate_raise_bit <> 0 then
40+
terminate ?callstack t
3841

3942
let decr (Bundle r : t) =
4043
let n = Atomic.fetch_and_add r.config (-config_one) in
@@ -48,6 +51,10 @@ type _ pass = FLS : unit pass | Arg : t pass
4851

4952
let[@inline never] no_flock () = invalid_arg "no flock"
5053

54+
let[@inline] on_terminate = function
55+
| None | Some `Ignore -> `Ignore
56+
| Some `Raise -> `Raise
57+
5158
let get_flock fiber =
5259
match Fiber.FLS.get fiber flock_key ~default:Nothing with
5360
| Bundle _ as t -> t
@@ -75,7 +82,7 @@ let[@inline never] raised exn t fiber packed canceler outer =
7582
let[@inline never] returned value (Bundle r as t : t) fiber packed canceler
7683
outer =
7784
let config = Atomic.get r.config in
78-
if config land config_terminated_bit <> 0 then begin
85+
if config land config_on_return_terminate_bit <> 0 then begin
7986
let callstack =
8087
let n = (config land config_callstack_mask) lsr config_callstack_shift in
8188
if n = 0 then None else Some n
@@ -90,25 +97,31 @@ let join_after_realloc x fn t fiber packed canceler outer =
9097
| value -> returned value t fiber packed canceler outer
9198
| exception exn -> raised exn t fiber packed canceler outer
9299

93-
let join_after_pass (type a) ?callstack ?on_return (fn : a -> _) (pass : a pass)
94-
=
100+
let join_after_pass (type a) ?callstack ?on_return ?on_terminate (fn : a -> _)
101+
(pass : a pass) =
95102
(* The sequence of operations below ensures that nothing is leaked. *)
96103
let (Bundle r as t : t) =
97-
let terminated =
104+
let config =
98105
match on_return with
99-
| None | Some `Wait -> 0
100-
| Some `Terminate -> config_terminated_bit
106+
| None | Some `Wait -> config_one
107+
| Some `Terminate -> config_one lor config_on_return_terminate_bit
101108
in
102-
let callstack =
109+
let config =
110+
match on_terminate with
111+
| None | Some `Ignore -> config
112+
| Some `Raise -> config lor config_on_terminate_raise_bit
113+
in
114+
let config =
103115
match callstack with
104-
| None -> 0
116+
| None -> config
105117
| Some n ->
106-
if n <= 0 then 0
118+
if n <= 0 then config
107119
else
108-
Int.min n (config_callstack_mask lsr config_callstack_shift)
109-
lsl config_callstack_shift
120+
config
121+
lor Int.min n (config_callstack_mask lsr config_callstack_shift)
122+
lsl config_callstack_shift
110123
in
111-
let config = Atomic.make (config_one lor callstack lor terminated) in
124+
let config = Atomic.make config in
112125
let bundle = Computation.Packed (Computation.create ~mode:`LIFO ()) in
113126
let errors = Control.Errors.create () in
114127
let finished = Trigger.create () in
@@ -208,8 +221,8 @@ let fork_pass (type a) (Bundle r as t : t) thunk (pass : a pass) =
208221
let is_running (Bundle { bundle = Packed bundle; _ } : t) =
209222
Computation.is_running bundle
210223

211-
let join_after ?callstack ?on_return fn =
212-
join_after_pass ?callstack ?on_return fn Arg
224+
let join_after ?callstack ?on_return ?on_terminate fn =
225+
join_after_pass ?callstack ?on_return ?on_terminate fn Arg
213226

214227
let fork t thunk = fork_pass t thunk Arg
215228
let fork_as_promise t thunk = fork_as_promise_pass t thunk Arg

lib/picos_std.structured/control.ml

+9-7
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,15 @@ module Errors = struct
4141
| [ (exn, bt) ] -> Printexc.raise_with_backtrace exn bt
4242
| exn_bts -> check exn_bts []
4343

44-
let rec push t exn bt backoff =
45-
let before = Atomic.get t in
46-
let after = (exn, bt) :: before in
47-
if not (Atomic.compare_and_set t before after) then
48-
push t exn bt (Backoff.once backoff)
49-
50-
let push t exn bt = push t exn bt Backoff.default
44+
let push t exn bt =
45+
let backoff = ref Backoff.default in
46+
while
47+
let before = Atomic.get t in
48+
let after = (exn, bt) :: before in
49+
not (Atomic.compare_and_set t before after)
50+
do
51+
backoff := Backoff.once !backoff
52+
done
5153
end
5254

5355
let raise_if_canceled () = Fiber.check (Fiber.current ())

lib/picos_std.structured/dune

+12
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,15 @@
1+
(rule
2+
(enabled_if
3+
(<= 5.0.0 %{ocaml_version}))
4+
(action
5+
(copy for.ocaml5.ml for.ml)))
6+
7+
(rule
8+
(enabled_if
9+
(< %{ocaml_version} 5.0.0))
10+
(action
11+
(copy for.ocaml4.ml for.ml)))
12+
113
(library
214
(name picos_std_structured)
315
(public_name picos_std.structured)

lib/picos_std.structured/flock.ml

+2-2
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,5 @@ let error ?callstack exn_bt = Bundle.error (get ()) ?callstack exn_bt
1010
let fork_as_promise thunk = Bundle.fork_as_promise_pass (get ()) thunk FLS
1111
let fork action = Bundle.fork_pass (get ()) action FLS
1212

13-
let join_after ?callstack ?on_return fn =
14-
Bundle.join_after_pass ?callstack ?on_return fn Bundle.FLS
13+
let join_after ?callstack ?on_return ?on_terminate fn =
14+
Bundle.join_after_pass ?callstack ?on_return ?on_terminate fn Bundle.FLS
+57
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
type _ tdt =
2+
| Empty : [> `Empty ] tdt
3+
| Range : {
4+
mutable lo : int;
5+
hi : int;
6+
parent : [ `Empty | `Range ] tdt;
7+
}
8+
-> [> `Range ] tdt
9+
10+
let[@poll error] cas_lo (Range r : [ `Range ] tdt) before after =
11+
r.lo == before
12+
&& begin
13+
r.lo <- after;
14+
true
15+
end
16+
17+
let rec for_out t (Range r as range : [ `Range ] tdt) action =
18+
let lo_before = r.lo in
19+
let n = r.hi - lo_before in
20+
if 0 < n then begin
21+
if Bundle.is_running t then begin
22+
let lo_after = lo_before + 1 in
23+
if cas_lo range lo_before lo_after then begin
24+
try action lo_before
25+
with exn -> Bundle.error t exn (Printexc.get_raw_backtrace ())
26+
end;
27+
for_out t range action
28+
end
29+
end
30+
else
31+
match r.parent with
32+
| Empty -> ()
33+
| Range _ as range -> for_out t range action
34+
35+
let rec for_in t (Range r as range : [ `Range ] tdt) action =
36+
let lo_before = r.lo in
37+
let n = r.hi - lo_before in
38+
if n <= 1 then for_out t range action
39+
else
40+
let lo_after = lo_before + (n asr 1) in
41+
if cas_lo range lo_before lo_after then begin
42+
Bundle.fork t (fun () -> for_in t range action);
43+
let child = Range { lo = lo_before; hi = lo_after; parent = range } in
44+
for_in t child action
45+
end
46+
else for_in t range action
47+
48+
let for_n ?on_terminate n action =
49+
if 0 < n then
50+
if n = 1 then
51+
try action 0
52+
with
53+
| Control.Terminate when Bundle.on_terminate on_terminate == `Ignore ->
54+
()
55+
else
56+
let range = Range { lo = 0; hi = n; parent = Empty } in
57+
Bundle.join_after ?on_terminate @@ fun t -> for_in t range action
+82
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
open Picos
2+
3+
type per_fiber = { mutable lo : int; mutable hi : int }
4+
5+
type _ tdt =
6+
| Empty : [> `Empty ] tdt
7+
| Range : {
8+
mutable _lo : int;
9+
hi : int;
10+
parent : [ `Empty | `Range ] tdt;
11+
}
12+
-> [> `Range ] tdt
13+
14+
external lo_as_atomic : [ `Range ] tdt -> int Atomic.t = "%identity"
15+
16+
let rec for_out t (Range r as range : [ `Range ] tdt) per_fiber action =
17+
let lo_before = Atomic.get (lo_as_atomic range) in
18+
let n = r.hi - lo_before in
19+
if 0 < n then begin
20+
let lo_after = lo_before + 1 + (n asr 1) in
21+
if Atomic.compare_and_set (lo_as_atomic range) lo_before lo_after then begin
22+
per_fiber.lo <- lo_before;
23+
per_fiber.hi <- lo_after;
24+
while Bundle.is_running t && per_fiber.lo < per_fiber.hi do
25+
try
26+
while per_fiber.lo < per_fiber.hi do
27+
let i = per_fiber.lo in
28+
per_fiber.lo <- i + 1;
29+
action i
30+
done
31+
with exn -> Bundle.error t exn (Printexc.get_raw_backtrace ())
32+
done
33+
end;
34+
for_out t range per_fiber action
35+
end
36+
else
37+
match r.parent with
38+
| Empty -> ()
39+
| Range _ as range -> for_out t range per_fiber action
40+
41+
let rec for_in t (Range r as range : [ `Range ] tdt) per_fiber action =
42+
let lo_before = Atomic.get (lo_as_atomic range) in
43+
let n = r.hi - lo_before in
44+
if n <= 1 then for_out t range per_fiber action
45+
else
46+
let lo_after = lo_before + (n asr 1) in
47+
if Atomic.compare_and_set (lo_as_atomic range) lo_before lo_after then begin
48+
Bundle.fork t (fun () -> for_in_enter t range action);
49+
let child = Range { _lo = lo_before; hi = lo_after; parent = range } in
50+
for_in t child per_fiber action
51+
end
52+
else for_in t range per_fiber action
53+
54+
and for_in_enter bundle range action =
55+
let per_fiber = { lo = 0; hi = 0 } in
56+
let effc (type a) :
57+
a Effect.t -> ((a, _) Effect.Deep.continuation -> _) option = function
58+
| Fiber.Spawn _ | Fiber.Current | Computation.Cancel_after _ -> None
59+
| _ ->
60+
(* Might be blocking, so fork any remaining work to another fiber. *)
61+
if per_fiber.lo < per_fiber.hi then begin
62+
let range =
63+
Range { _lo = per_fiber.lo; hi = per_fiber.hi; parent = Empty }
64+
in
65+
per_fiber.lo <- per_fiber.hi;
66+
Bundle.fork bundle (fun () -> for_in_enter bundle range action)
67+
end;
68+
None
69+
in
70+
let handler = Effect.Deep.{ effc } in
71+
Effect.Deep.try_with (for_in bundle range per_fiber) action handler
72+
73+
let for_n ?on_terminate n action =
74+
if 0 < n then
75+
if n = 1 then
76+
try action 0
77+
with
78+
| Control.Terminate when Bundle.on_terminate on_terminate == `Ignore ->
79+
()
80+
else
81+
let range = Range { _lo = 0; hi = n; parent = Empty } in
82+
Bundle.join_after ?on_terminate @@ fun t -> for_in_enter t range action

0 commit comments

Comments
 (0)