From 50f6ea52339afa49f9ce594516debeca8728cee4 Mon Sep 17 00:00:00 2001 From: cjen1 Date: Mon, 30 Oct 2023 11:42:23 +0000 Subject: [PATCH 1/9] Update tests to make paxos work with new check --- impl/lib/utils.ml | 20 +- impl/test/test_conspire_dc.ml | 652 ++++++++++++++++++---------------- impl/test/test_conspire_mp.ml | 126 +++---- impl/test/test_paxos.ml | 4 +- impl/test/test_raft.ml | 6 +- 5 files changed, 420 insertions(+), 388 deletions(-) diff --git a/impl/lib/utils.ml b/impl/lib/utils.ml index 79071b3..9b43a1f 100644 --- a/impl/lib/utils.ml +++ b/impl/lib/utils.ml @@ -166,24 +166,32 @@ module SegmentLog = struct (i mod t.segmentsize) v ; if t.vhi < i then t.vhi <- i + let norm t lo hi = + let lo = min lo t.vhi in + let lo = max lo 0 in + let hi = min hi t.vhi in + let hi = max hi (-1) in + (lo, hi) + let to_seq t ?(lo = 0) ?(hi = t.vhi) : 'a Seq.t = - let lo = max 0 lo in + let lo, hi = norm t lo hi in Seq.unfold (fun i -> if i <= hi then Some (get t i, i + 1) else None) lo let to_seqi t ?(lo = 0) ?(hi = t.vhi) : 'a Seq.t = - let lo = max 0 lo in + let lo, hi = norm t lo hi in Seq.unfold (fun i -> if i <= hi then Some ((i, get t i), i + 1) else None) lo let iteri t ?(lo = 0) ?(hi = t.vhi) : (int * 'a) Iter.t = - let lo = max 0 lo in + let lo, hi = norm t lo hi in fun f -> for i = lo to hi do f (i, get t i) done let iteri_len t ?(lo = 0) ?(hi = t.vhi) () : (int * 'a) Iter.t * int = + let lo, hi = norm t lo hi in let iter f = iteri t ~lo ~hi f in let len = hi - lo + 1 in if not (Iter.length iter = len) then @@ -313,7 +321,9 @@ let pp_set pp : _ Set.t Fmt.t = fun ppf v -> Fmt.pf ppf "%a" Fmt.(brackets @@ list ~sep:comma @@ pp) (Set.to_list v) -let float_to_time f = f |> Time_float_unix.Span.of_sec |> Time_float_unix.of_span_since_epoch +let float_to_time f = + f |> Time_float_unix.Span.of_sec |> Time_float_unix.of_span_since_epoch let time_to_float t = - t |> Time_float_unix.to_span_since_epoch |> Time_float_unix.Span.to_proportional_float + t |> Time_float_unix.to_span_since_epoch + |> Time_float_unix.Span.to_proportional_float diff --git a/impl/test/test_conspire_dc.ml b/impl/test/test_conspire_dc.ml index 79b7ae2..035656b 100644 --- a/impl/test/test_conspire_dc.ml +++ b/impl/test/test_conspire_dc.ml @@ -198,8 +198,9 @@ let%expect_test "e2e commit" = let t1 = create (c4 1 clk1) in let c1 = make_command (Read "c1") in let t0, actions = Impl.advance t0 (Commands (Iter.of_list [c1])) in - print t0 actions; - [%expect {| + print t0 actions ; + [%expect + {| t: { config = ; conspire = { rep = @@ -231,11 +232,11 @@ let%expect_test "e2e commit" = ([Command(Read c1, 1)], 1970-01-01 01:00:01.000000+01:00)))] |}] ; let t1, actions = - Impl.advance t1 - (Recv ((Commands ([c1], Utils.float_to_time 1.)), 0)) + Impl.advance t1 (Recv (Commands ([c1], Utils.float_to_time 1.), 0)) in - print t1 actions; - [%expect {| + print t1 actions ; + [%expect + {| t: { config = ; conspire = { rep = @@ -263,13 +264,16 @@ let%expect_test "e2e commit" = }; tick_count = { Conspire_dc.Counter.count = 0; limit = 100 }; clock = } - actions: [] |}]; - ac0 1.; ac1 1.; - [%expect{| + actions: [] |}] ; + ac0 1. ; + ac1 1. ; + [%expect {| +mock time is now 1 - +mock time is now 1 |}]; - let t0, actions = Impl.advance t0 (Tick) in - print t0 actions; [%expect{| + +mock time is now 1 |}] ; + let t0, actions = Impl.advance t0 Tick in + print t0 actions ; + [%expect + {| t: { config = ; conspire = { rep = @@ -346,101 +350,104 @@ let%expect_test "e2e commit" = vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }) - }))] |}]; - let t1, actions = Impl.advance t1 (Tick) in - print t1 actions; [%expect{| - t: { config = ; - conspire = - { rep = - { state = - { vval = c4677fbe3fa92d4246f849e282f15527; vterm = 0; term = 0; - commit_index = d41d8cd98f00b204e9800998ecf8427e }; - store = - { ctree = - [(c4677fbe3fa92d4246f849e282f15527: - { node = - (1, d41d8cd98f00b204e9800998ecf8427e, - ([Command(Read c1, 1)], 1970-01-01 01:00:01.000000+01:00)); - parent = ; key = c4677fbe3fa92d4246f849e282f15527 }); - (d41d8cd98f00b204e9800998ecf8427e: Root)]; - root = d41d8cd98f00b204e9800998ecf8427e }; - remotes = }; - other_nodes_state = - [(0: - { vval = d41d8cd98f00b204e9800998ecf8427e; vterm = 0; term = 0; - commit_index = d41d8cd98f00b204e9800998ecf8427e }); - (2: - { vval = d41d8cd98f00b204e9800998ecf8427e; vterm = 0; term = 0; - commit_index = d41d8cd98f00b204e9800998ecf8427e }); - (3: - { vval = d41d8cd98f00b204e9800998ecf8427e; vterm = 0; term = 0; - commit_index = d41d8cd98f00b204e9800998ecf8427e })]; - config = ; commit_log = [] }; - command_buffer = - { store = []; hwm = 1970-01-01 01:00:01.000000+01:00; interval = 1s; - compare = }; - tick_count = { Conspire_dc.Counter.count = 1; limit = 100 }; - clock = } - actions: [Send(0,(Conspire_dc.Types.Conspire - { ctree = - (Some { new_head = c4677fbe3fa92d4246f849e282f15527; - extension = - [(1, d41d8cd98f00b204e9800998ecf8427e, - ([Command(Read c1, 1)], - 1970-01-01 01:00:01.000000+01:00)) - ] - }); - cons = - (Some { vval = c4677fbe3fa92d4246f849e282f15527; - vterm = 0; term = 0; - commit_index = d41d8cd98f00b204e9800998ecf8427e - }) - })) - Send(2,(Conspire_dc.Types.Conspire - { ctree = - (Some { new_head = c4677fbe3fa92d4246f849e282f15527; - extension = - [(1, d41d8cd98f00b204e9800998ecf8427e, - ([Command(Read c1, 1)], - 1970-01-01 01:00:01.000000+01:00)) - ] - }); - cons = - (Some { vval = c4677fbe3fa92d4246f849e282f15527; - vterm = 0; term = 0; - commit_index = d41d8cd98f00b204e9800998ecf8427e - }) - })) - Send(3,(Conspire_dc.Types.Conspire - { ctree = - (Some { new_head = c4677fbe3fa92d4246f849e282f15527; - extension = - [(1, d41d8cd98f00b204e9800998ecf8427e, - ([Command(Read c1, 1)], - 1970-01-01 01:00:01.000000+01:00)) - ] - }); - cons = - (Some { vval = c4677fbe3fa92d4246f849e282f15527; - vterm = 0; term = 0; - commit_index = d41d8cd98f00b204e9800998ecf8427e - }) - }))] |}]; + }))] |}] ; + let t1, actions = Impl.advance t1 Tick in + print t1 actions ; + [%expect + {| + t: { config = ; + conspire = + { rep = + { state = + { vval = c4677fbe3fa92d4246f849e282f15527; vterm = 0; term = 0; + commit_index = d41d8cd98f00b204e9800998ecf8427e }; + store = + { ctree = + [(c4677fbe3fa92d4246f849e282f15527: + { node = + (1, d41d8cd98f00b204e9800998ecf8427e, + ([Command(Read c1, 1)], 1970-01-01 01:00:01.000000+01:00)); + parent = ; key = c4677fbe3fa92d4246f849e282f15527 }); + (d41d8cd98f00b204e9800998ecf8427e: Root)]; + root = d41d8cd98f00b204e9800998ecf8427e }; + remotes = }; + other_nodes_state = + [(0: + { vval = d41d8cd98f00b204e9800998ecf8427e; vterm = 0; term = 0; + commit_index = d41d8cd98f00b204e9800998ecf8427e }); + (2: + { vval = d41d8cd98f00b204e9800998ecf8427e; vterm = 0; term = 0; + commit_index = d41d8cd98f00b204e9800998ecf8427e }); + (3: + { vval = d41d8cd98f00b204e9800998ecf8427e; vterm = 0; term = 0; + commit_index = d41d8cd98f00b204e9800998ecf8427e })]; + config = ; commit_log = [] }; + command_buffer = + { store = []; hwm = 1970-01-01 01:00:01.000000+01:00; interval = 1s; + compare = }; + tick_count = { Conspire_dc.Counter.count = 1; limit = 100 }; + clock = } + actions: [Send(0,(Conspire_dc.Types.Conspire + { ctree = + (Some { new_head = c4677fbe3fa92d4246f849e282f15527; + extension = + [(1, d41d8cd98f00b204e9800998ecf8427e, + ([Command(Read c1, 1)], + 1970-01-01 01:00:01.000000+01:00)) + ] + }); + cons = + (Some { vval = c4677fbe3fa92d4246f849e282f15527; + vterm = 0; term = 0; + commit_index = d41d8cd98f00b204e9800998ecf8427e + }) + })) + Send(2,(Conspire_dc.Types.Conspire + { ctree = + (Some { new_head = c4677fbe3fa92d4246f849e282f15527; + extension = + [(1, d41d8cd98f00b204e9800998ecf8427e, + ([Command(Read c1, 1)], + 1970-01-01 01:00:01.000000+01:00)) + ] + }); + cons = + (Some { vval = c4677fbe3fa92d4246f849e282f15527; + vterm = 0; term = 0; + commit_index = d41d8cd98f00b204e9800998ecf8427e + }) + })) + Send(3,(Conspire_dc.Types.Conspire + { ctree = + (Some { new_head = c4677fbe3fa92d4246f849e282f15527; + extension = + [(1, d41d8cd98f00b204e9800998ecf8427e, + ([Command(Read c1, 1)], + 1970-01-01 01:00:01.000000+01:00)) + ] + }); + cons = + (Some { vval = c4677fbe3fa92d4246f849e282f15527; + vterm = 0; term = 0; + commit_index = d41d8cd98f00b204e9800998ecf8427e + }) + }))] |}] ; let root_clk = t0.conspire.rep.store.root in let c1_clk = Md5.of_hex_exn "c4677fbe3fa92d4246f849e282f15527" in - let replication_message = Conspire (Ok { - ctree = Some { - new_head = c1_clk - ; extension = [(1, root_clk, ([c1], Utils.float_to_time 1.))] - }; - cons = Some { - vval = c1_clk - ; vterm = 0 ; term = 0 - ; commit_index = root_clk - } - }) in + let replication_message = + Conspire + (Ok + { ctree= + Some + { new_head= c1_clk + ; extension= [(1, root_clk, ([c1], Utils.float_to_time 1.))] } + ; cons= Some {vval= c1_clk; vterm= 0; term= 0; commit_index= root_clk} + } ) + in let t0, actions = Impl.advance t0 (Recv (replication_message, 1)) in - print t0 actions; [%expect{| + print t0 actions ; + [%expect + {| t: { config = ; conspire = { rep = @@ -473,9 +480,11 @@ let%expect_test "e2e commit" = compare = }; tick_count = { Conspire_dc.Counter.count = 1; limit = 100 }; clock = } - actions: [] |}]; + actions: [] |}] ; let t0, actions = Impl.advance t0 (Recv (replication_message, 2)) in - print t0 actions; [%expect{| + print t0 actions ; + [%expect + {| t: { config = ; conspire = { rep = @@ -543,7 +552,9 @@ let%expect_test "batching" = let t0 = create (c4 0 clk0) in let c0 = make_command (Read "c0") in let t0, actions = Impl.advance t0 (Commands (Iter.singleton c0)) in - print t0 actions; [%expect{| + print t0 actions ; + [%expect + {| t: { config = ; conspire = { rep = @@ -573,11 +584,14 @@ let%expect_test "batching" = clock = } actions: [Broadcast((Conspire_dc.Types.Commands ([Command(Read c0, 1)], - 1970-01-01 01:00:01.000000+01:00)))] |}]; - ac0 1.1; [%expect{| +mock time is now 1.1 |}]; + 1970-01-01 01:00:01.000000+01:00)))] |}] ; + ac0 1.1 ; + [%expect {| +mock time is now 1.1 |}] ; let c1 = make_command (Read "c1") in let t0, actions = Impl.advance t0 (Commands (Iter.of_list [c1])) in - print t0 actions; [%expect{| + print t0 actions ; + [%expect + {| t: { config = ; conspire = { rep = @@ -608,11 +622,13 @@ let%expect_test "batching" = clock = } actions: [Broadcast((Conspire_dc.Types.Commands ([Command(Read c1, 2)], - 1970-01-01 01:00:02.100000+01:00)))] |}]; - ac0 0.5; + 1970-01-01 01:00:02.100000+01:00)))] |}] ; + ac0 0.5 ; let c2 = make_command (Read "c2") in let t0, actions = Impl.advance t0 (Commands (Iter.of_list [c2])) in - print t0 actions; [%expect{| + print t0 actions ; + [%expect + {| +mock time is now 1.6 t: { config = ; conspire = @@ -645,92 +661,98 @@ let%expect_test "batching" = clock = } actions: [Broadcast((Conspire_dc.Types.Commands ([Command(Read c2, 3)], - 1970-01-01 01:00:02.600000+01:00)))] |}]; + 1970-01-01 01:00:02.600000+01:00)))] |}] ; let t0, actions = Impl.advance t0 Tick in - print t0 actions; [%expect{| - t: { config = ; - conspire = - { rep = - { state = - { vval = 34e452ffd92c22ea72ef125a33a0a593; vterm = 0; term = 0; - commit_index = d41d8cd98f00b204e9800998ecf8427e }; - store = - { ctree = - [(34e452ffd92c22ea72ef125a33a0a593: - { node = - (1, d41d8cd98f00b204e9800998ecf8427e, - ([Command(Read c0, 1)], 1970-01-01 01:00:01.000000+01:00)); - parent = ; key = 34e452ffd92c22ea72ef125a33a0a593 }); - (d41d8cd98f00b204e9800998ecf8427e: Root)]; - root = d41d8cd98f00b204e9800998ecf8427e }; - remotes = }; - other_nodes_state = - [(1: - { vval = d41d8cd98f00b204e9800998ecf8427e; vterm = 0; term = 0; - commit_index = d41d8cd98f00b204e9800998ecf8427e }); - (2: - { vval = d41d8cd98f00b204e9800998ecf8427e; vterm = 0; term = 0; - commit_index = d41d8cd98f00b204e9800998ecf8427e }); - (3: - { vval = d41d8cd98f00b204e9800998ecf8427e; vterm = 0; term = 0; - commit_index = d41d8cd98f00b204e9800998ecf8427e })]; - config = ; commit_log = [] }; - command_buffer = - { store = - [(2.10000: [Command(Read c1, 2)]); (2.60000: [Command(Read c2, 3)])]; - hwm = 1970-01-01 01:00:01.000000+01:00; interval = 1s; compare = - }; - tick_count = { Conspire_dc.Counter.count = 1; limit = 100 }; - clock = } - actions: [Send(1,(Conspire_dc.Types.Conspire - { ctree = - (Some { new_head = 34e452ffd92c22ea72ef125a33a0a593; - extension = - [(1, d41d8cd98f00b204e9800998ecf8427e, - ([Command(Read c0, 1)], - 1970-01-01 01:00:01.000000+01:00)) - ] - }); - cons = - (Some { vval = 34e452ffd92c22ea72ef125a33a0a593; - vterm = 0; term = 0; - commit_index = d41d8cd98f00b204e9800998ecf8427e - }) - })) - Send(2,(Conspire_dc.Types.Conspire - { ctree = - (Some { new_head = 34e452ffd92c22ea72ef125a33a0a593; - extension = - [(1, d41d8cd98f00b204e9800998ecf8427e, - ([Command(Read c0, 1)], - 1970-01-01 01:00:01.000000+01:00)) - ] - }); - cons = - (Some { vval = 34e452ffd92c22ea72ef125a33a0a593; - vterm = 0; term = 0; - commit_index = d41d8cd98f00b204e9800998ecf8427e - }) - })) - Send(3,(Conspire_dc.Types.Conspire - { ctree = - (Some { new_head = 34e452ffd92c22ea72ef125a33a0a593; - extension = - [(1, d41d8cd98f00b204e9800998ecf8427e, - ([Command(Read c0, 1)], - 1970-01-01 01:00:01.000000+01:00)) - ] - }); - cons = - (Some { vval = 34e452ffd92c22ea72ef125a33a0a593; - vterm = 0; term = 0; - commit_index = d41d8cd98f00b204e9800998ecf8427e - }) - }))] |}]; - ac0 1.; [%expect{| +mock time is now 2.6 |}]; - ac0 1.; [%expect{| +mock time is now 3.6 |}]; + print t0 actions ; + [%expect + {| + t: { config = ; + conspire = + { rep = + { state = + { vval = 34e452ffd92c22ea72ef125a33a0a593; vterm = 0; term = 0; + commit_index = d41d8cd98f00b204e9800998ecf8427e }; + store = + { ctree = + [(34e452ffd92c22ea72ef125a33a0a593: + { node = + (1, d41d8cd98f00b204e9800998ecf8427e, + ([Command(Read c0, 1)], 1970-01-01 01:00:01.000000+01:00)); + parent = ; key = 34e452ffd92c22ea72ef125a33a0a593 }); + (d41d8cd98f00b204e9800998ecf8427e: Root)]; + root = d41d8cd98f00b204e9800998ecf8427e }; + remotes = }; + other_nodes_state = + [(1: + { vval = d41d8cd98f00b204e9800998ecf8427e; vterm = 0; term = 0; + commit_index = d41d8cd98f00b204e9800998ecf8427e }); + (2: + { vval = d41d8cd98f00b204e9800998ecf8427e; vterm = 0; term = 0; + commit_index = d41d8cd98f00b204e9800998ecf8427e }); + (3: + { vval = d41d8cd98f00b204e9800998ecf8427e; vterm = 0; term = 0; + commit_index = d41d8cd98f00b204e9800998ecf8427e })]; + config = ; commit_log = [] }; + command_buffer = + { store = + [(2.10000: [Command(Read c1, 2)]); (2.60000: [Command(Read c2, 3)])]; + hwm = 1970-01-01 01:00:01.000000+01:00; interval = 1s; compare = + }; + tick_count = { Conspire_dc.Counter.count = 1; limit = 100 }; + clock = } + actions: [Send(1,(Conspire_dc.Types.Conspire + { ctree = + (Some { new_head = 34e452ffd92c22ea72ef125a33a0a593; + extension = + [(1, d41d8cd98f00b204e9800998ecf8427e, + ([Command(Read c0, 1)], + 1970-01-01 01:00:01.000000+01:00)) + ] + }); + cons = + (Some { vval = 34e452ffd92c22ea72ef125a33a0a593; + vterm = 0; term = 0; + commit_index = d41d8cd98f00b204e9800998ecf8427e + }) + })) + Send(2,(Conspire_dc.Types.Conspire + { ctree = + (Some { new_head = 34e452ffd92c22ea72ef125a33a0a593; + extension = + [(1, d41d8cd98f00b204e9800998ecf8427e, + ([Command(Read c0, 1)], + 1970-01-01 01:00:01.000000+01:00)) + ] + }); + cons = + (Some { vval = 34e452ffd92c22ea72ef125a33a0a593; + vterm = 0; term = 0; + commit_index = d41d8cd98f00b204e9800998ecf8427e + }) + })) + Send(3,(Conspire_dc.Types.Conspire + { ctree = + (Some { new_head = 34e452ffd92c22ea72ef125a33a0a593; + extension = + [(1, d41d8cd98f00b204e9800998ecf8427e, + ([Command(Read c0, 1)], + 1970-01-01 01:00:01.000000+01:00)) + ] + }); + cons = + (Some { vval = 34e452ffd92c22ea72ef125a33a0a593; + vterm = 0; term = 0; + commit_index = d41d8cd98f00b204e9800998ecf8427e + }) + }))] |}] ; + ac0 1. ; + [%expect {| +mock time is now 2.6 |}] ; + ac0 1. ; + [%expect {| +mock time is now 3.6 |}] ; let t0, actions = Impl.advance t0 Tick in - print t0 actions; [%expect{| + print t0 actions ; + [%expect + {| t: { config = ; conspire = { rep = @@ -825,9 +847,11 @@ let%expect_test "Conflict" = (* add c0 to t0 *) let c0 = make_command (Read "c0") in let t0, _ = Impl.advance t0 (Commands (Iter.singleton c0)) in - ac0 2.; + ac0 2. ; let t0, actions = Impl.advance t0 Tick in - print t0 actions; [%expect{| + print t0 actions ; + [%expect + {| +mock time is now 2 t: { config = ; conspire = @@ -905,109 +929,109 @@ let%expect_test "Conflict" = vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }) - }))] |}]; + }))] |}] ; (* add c1 to t1 *) let c1 = make_command (Read "c1") in let t1, _ = Impl.advance t1 (Commands (Iter.singleton c1)) in - ac1 2.; + ac1 2. ; let t1, actions = Impl.advance t1 Tick in - print t1 actions; [%expect{| - +mock time is now 2 - t: { config = ; - conspire = - { rep = - { state = - { vval = 9ddc31c480e04b8965ff34f30c37d919; vterm = 0; term = 0; - commit_index = d41d8cd98f00b204e9800998ecf8427e }; - store = - { ctree = - [(9ddc31c480e04b8965ff34f30c37d919: - { node = - (1, d41d8cd98f00b204e9800998ecf8427e, - ([Command(Read c1, 2)], 1970-01-01 01:00:01.000000+01:00)); - parent = ; key = 9ddc31c480e04b8965ff34f30c37d919 }); - (d41d8cd98f00b204e9800998ecf8427e: Root)]; - root = d41d8cd98f00b204e9800998ecf8427e }; - remotes = }; - other_nodes_state = - [(0: - { vval = d41d8cd98f00b204e9800998ecf8427e; vterm = 0; term = 0; - commit_index = d41d8cd98f00b204e9800998ecf8427e }); - (2: - { vval = d41d8cd98f00b204e9800998ecf8427e; vterm = 0; term = 0; - commit_index = d41d8cd98f00b204e9800998ecf8427e }); - (3: - { vval = d41d8cd98f00b204e9800998ecf8427e; vterm = 0; term = 0; - commit_index = d41d8cd98f00b204e9800998ecf8427e })]; - config = ; commit_log = [] }; - command_buffer = - { store = []; hwm = 1970-01-01 01:00:02.000000+01:00; interval = 1s; - compare = }; - tick_count = { Conspire_dc.Counter.count = 1; limit = 100 }; - clock = } - actions: [Send(0,(Conspire_dc.Types.Conspire - { ctree = - (Some { new_head = 9ddc31c480e04b8965ff34f30c37d919; - extension = - [(1, d41d8cd98f00b204e9800998ecf8427e, - ([Command(Read c1, 2)], - 1970-01-01 01:00:01.000000+01:00)) - ] - }); - cons = - (Some { vval = 9ddc31c480e04b8965ff34f30c37d919; - vterm = 0; term = 0; - commit_index = d41d8cd98f00b204e9800998ecf8427e - }) - })) - Send(2,(Conspire_dc.Types.Conspire - { ctree = - (Some { new_head = 9ddc31c480e04b8965ff34f30c37d919; - extension = - [(1, d41d8cd98f00b204e9800998ecf8427e, - ([Command(Read c1, 2)], - 1970-01-01 01:00:01.000000+01:00)) - ] - }); - cons = - (Some { vval = 9ddc31c480e04b8965ff34f30c37d919; - vterm = 0; term = 0; - commit_index = d41d8cd98f00b204e9800998ecf8427e - }) - })) - Send(3,(Conspire_dc.Types.Conspire - { ctree = - (Some { new_head = 9ddc31c480e04b8965ff34f30c37d919; - extension = - [(1, d41d8cd98f00b204e9800998ecf8427e, - ([Command(Read c1, 2)], - 1970-01-01 01:00:01.000000+01:00)) - ] - }); - cons = - (Some { vval = 9ddc31c480e04b8965ff34f30c37d919; - vterm = 0; term = 0; - commit_index = d41d8cd98f00b204e9800998ecf8427e - }) - }))] |}]; - let root_clk = t0.conspire.rep.store.root in - let c0_clk = Md5.of_hex_exn "34e452ffd92c22ea72ef125a33a0a593" in - let update_t0 term = (Conspire - (Ok { ctree = - (Some { new_head = c0_clk; + print t1 actions ; + [%expect + {| + +mock time is now 2 + t: { config = ; + conspire = + { rep = + { state = + { vval = 9ddc31c480e04b8965ff34f30c37d919; vterm = 0; term = 0; + commit_index = d41d8cd98f00b204e9800998ecf8427e }; + store = + { ctree = + [(9ddc31c480e04b8965ff34f30c37d919: + { node = + (1, d41d8cd98f00b204e9800998ecf8427e, + ([Command(Read c1, 2)], 1970-01-01 01:00:01.000000+01:00)); + parent = ; key = 9ddc31c480e04b8965ff34f30c37d919 }); + (d41d8cd98f00b204e9800998ecf8427e: Root)]; + root = d41d8cd98f00b204e9800998ecf8427e }; + remotes = }; + other_nodes_state = + [(0: + { vval = d41d8cd98f00b204e9800998ecf8427e; vterm = 0; term = 0; + commit_index = d41d8cd98f00b204e9800998ecf8427e }); + (2: + { vval = d41d8cd98f00b204e9800998ecf8427e; vterm = 0; term = 0; + commit_index = d41d8cd98f00b204e9800998ecf8427e }); + (3: + { vval = d41d8cd98f00b204e9800998ecf8427e; vterm = 0; term = 0; + commit_index = d41d8cd98f00b204e9800998ecf8427e })]; + config = ; commit_log = [] }; + command_buffer = + { store = []; hwm = 1970-01-01 01:00:02.000000+01:00; interval = 1s; + compare = }; + tick_count = { Conspire_dc.Counter.count = 1; limit = 100 }; + clock = } + actions: [Send(0,(Conspire_dc.Types.Conspire + { ctree = + (Some { new_head = 9ddc31c480e04b8965ff34f30c37d919; extension = - [(1, root_clk, - ([c0], Utils.float_to_time 1.))] + [(1, d41d8cd98f00b204e9800998ecf8427e, + ([Command(Read c1, 2)], + 1970-01-01 01:00:01.000000+01:00)) + ] }); cons = - (Some { vval = c0_clk; - vterm = 0; term ; - commit_index = root_clk + (Some { vval = 9ddc31c480e04b8965ff34f30c37d919; + vterm = 0; term = 0; + commit_index = d41d8cd98f00b204e9800998ecf8427e }) })) + Send(2,(Conspire_dc.Types.Conspire + { ctree = + (Some { new_head = 9ddc31c480e04b8965ff34f30c37d919; + extension = + [(1, d41d8cd98f00b204e9800998ecf8427e, + ([Command(Read c1, 2)], + 1970-01-01 01:00:01.000000+01:00)) + ] + }); + cons = + (Some { vval = 9ddc31c480e04b8965ff34f30c37d919; + vterm = 0; term = 0; + commit_index = d41d8cd98f00b204e9800998ecf8427e + }) + })) + Send(3,(Conspire_dc.Types.Conspire + { ctree = + (Some { new_head = 9ddc31c480e04b8965ff34f30c37d919; + extension = + [(1, d41d8cd98f00b204e9800998ecf8427e, + ([Command(Read c1, 2)], + 1970-01-01 01:00:01.000000+01:00)) + ] + }); + cons = + (Some { vval = 9ddc31c480e04b8965ff34f30c37d919; + vterm = 0; term = 0; + commit_index = d41d8cd98f00b204e9800998ecf8427e + }) + }))] |}] ; + let root_clk = t0.conspire.rep.store.root in + let c0_clk = Md5.of_hex_exn "34e452ffd92c22ea72ef125a33a0a593" in + let update_t0 term = + Conspire + (Ok + { ctree= + Some + { new_head= c0_clk + ; extension= [(1, root_clk, ([c0], Utils.float_to_time 1.))] } + ; cons= Some {vval= c0_clk; vterm= 0; term; commit_index= root_clk} } + ) in let t0, actions = Impl.advance t0 (Recv (update_t0 0, 2)) in - print t0 actions; [%expect{| + print t0 actions ; + [%expect + {| t: { config = ; conspire = { rep = @@ -1040,24 +1064,22 @@ let%expect_test "Conflict" = compare = }; tick_count = { Conspire_dc.Counter.count = 1; limit = 100 }; clock = } - actions: [] |}]; + actions: [] |}] ; let c1_clk = Md5.of_hex_exn "620122743bc84de6b418bd632ea0cdc2" in - let update_t1 term = (Conspire - (Ok { ctree = - (Some { new_head = c1_clk; - extension = - [(1, root_clk, - ([c1], Utils.float_to_time 1.))] - }); - cons = - (Some { vval = c1_clk; - vterm = 0; term ; - commit_index = root_clk - }) - })) + let update_t1 term = + Conspire + (Ok + { ctree= + Some + { new_head= c1_clk + ; extension= [(1, root_clk, ([c1], Utils.float_to_time 1.))] } + ; cons= Some {vval= c1_clk; vterm= 0; term; commit_index= root_clk} } + ) in - let t0,actions = Impl.advance t0 (Recv (update_t1 0, 1)) in - print t0 actions; [%expect{| + let t0, actions = Impl.advance t0 (Recv (update_t1 0, 1)) in + print t0 actions ; + [%expect + {| t: { config = ; conspire = { rep = @@ -1118,9 +1140,11 @@ let%expect_test "Conflict" = vterm = 0; term = 1; commit_index = d41d8cd98f00b204e9800998ecf8427e }) - }))] |}]; - let t0,actions = Impl.advance t0 (Recv (update_t1 1, 1)) in - print t0 actions; [%expect{| + }))] |}] ; + let t0, actions = Impl.advance t0 (Recv (update_t1 1, 1)) in + print t0 actions ; + [%expect + {| t: { config = ; conspire = { rep = @@ -1158,9 +1182,11 @@ let%expect_test "Conflict" = compare = }; tick_count = { Conspire_dc.Counter.count = 1; limit = 100 }; clock = } - actions: [] |}]; - let t0,actions = Impl.advance t0 (Recv (update_t0 1, 2)) in - print t0 actions; [%expect{| + actions: [] |}] ; + let t0, actions = Impl.advance t0 (Recv (update_t0 1, 2)) in + print t0 actions ; + [%expect + {| t: { config = ; conspire = { rep = diff --git a/impl/test/test_conspire_mp.ml b/impl/test/test_conspire_mp.ml index afba832..eac26cf 100644 --- a/impl/test/test_conspire_mp.ml +++ b/impl/test/test_conspire_mp.ml @@ -24,79 +24,79 @@ let%expect_test "local_commit" = print t [] ; [%expect {| - t: { config = ; - conspire = - { rep = - { state = - { vval = d41d8cd98f00b204e9800998ecf8427e; vterm = 0; term = 0; - commit_index = d41d8cd98f00b204e9800998ecf8427e }; - store = - { ctree = [(d41d8cd98f00b204e9800998ecf8427e: Root)]; - root = d41d8cd98f00b204e9800998ecf8427e }; - remotes = }; - other_nodes_state = []; config = ; commit_log = [] }; - failure_detector = - { Conspire_mp.FailureDetector.state = []; timeout = 2 }; - stall_checker = } - actions: [] |}] ; + t: { config = ; + conspire = + { rep = + { state = + { vval = d41d8cd98f00b204e9800998ecf8427e; vterm = 0; term = 0; + commit_index = d41d8cd98f00b204e9800998ecf8427e }; + store = + { ctree = [(d41d8cd98f00b204e9800998ecf8427e: Root)]; + root = d41d8cd98f00b204e9800998ecf8427e }; + remotes = }; + other_nodes_state = []; config = ; commit_log = [] }; + failure_detector = + { Conspire_mp.FailureDetector.state = []; timeout = 2 }; + stall_checker = } + actions: [] |}] ; let c1 = make_command (Read "c1") in let t, actions = Impl.advance t (Commands (c1 |> Iter.singleton)) in print t actions ; [%expect {| - t: { config = ; - conspire = - { rep = - { state = - { vval = 1183a904cd1a3b8f3cf219be9367701f; vterm = 0; term = 0; - commit_index = 1183a904cd1a3b8f3cf219be9367701f }; - store = - { ctree = - [(1183a904cd1a3b8f3cf219be9367701f: - { node = - (1, d41d8cd98f00b204e9800998ecf8427e, [Command(Read c1, 1)]); - parent = ; key = 1183a904cd1a3b8f3cf219be9367701f }); - (d41d8cd98f00b204e9800998ecf8427e: Root)]; - root = d41d8cd98f00b204e9800998ecf8427e }; - remotes = }; - other_nodes_state = []; config = ; - commit_log = [[Command(Read c1, 1)]] }; - failure_detector = - { Conspire_mp.FailureDetector.state = []; timeout = 2 }; - stall_checker = } - actions: [CommitCommands(Command(Read c1, 1))] |}] ; + t: { config = ; + conspire = + { rep = + { state = + { vval = 1183a904cd1a3b8f3cf219be9367701f; vterm = 0; term = 0; + commit_index = 1183a904cd1a3b8f3cf219be9367701f }; + store = + { ctree = + [(1183a904cd1a3b8f3cf219be9367701f: + { node = + (1, d41d8cd98f00b204e9800998ecf8427e, [Command(Read c1, 1)]); + parent = ; key = 1183a904cd1a3b8f3cf219be9367701f }); + (d41d8cd98f00b204e9800998ecf8427e: Root)]; + root = d41d8cd98f00b204e9800998ecf8427e }; + remotes = }; + other_nodes_state = []; config = ; + commit_log = [[Command(Read c1, 1)]] }; + failure_detector = + { Conspire_mp.FailureDetector.state = []; timeout = 2 }; + stall_checker = } + actions: [CommitCommands(Command(Read c1, 1))] |}] ; let c2, c3 = (make_command (Read "c2"), make_command (Read "c3")) in let t, actions = Impl.advance t (Commands (Iter.of_list [c2; c3])) in print t actions ; [%expect {| - t: { config = ; - conspire = - { rep = - { state = - { vval = 7f2c0aae94bf199f9b303480537af547; vterm = 0; term = 0; - commit_index = 7f2c0aae94bf199f9b303480537af547 }; - store = - { ctree = - [(1183a904cd1a3b8f3cf219be9367701f: - { node = - (1, d41d8cd98f00b204e9800998ecf8427e, [Command(Read c1, 1)]); - parent = ; key = 1183a904cd1a3b8f3cf219be9367701f }); - (7f2c0aae94bf199f9b303480537af547: - { node = - (2, 1183a904cd1a3b8f3cf219be9367701f, - [Command(Read c2, 3); Command(Read c3, 2)]); - parent = ; key = 7f2c0aae94bf199f9b303480537af547 }); - (d41d8cd98f00b204e9800998ecf8427e: Root)]; - root = d41d8cd98f00b204e9800998ecf8427e }; - remotes = }; - other_nodes_state = []; config = ; - commit_log = - [[Command(Read c1, 1)][Command(Read c2, 3); Command(Read c3, 2)]] }; - failure_detector = - { Conspire_mp.FailureDetector.state = []; timeout = 2 }; - stall_checker = } - actions: [CommitCommands(Command(Read c2, 3), Command(Read c3, 2))] |}] + t: { config = ; + conspire = + { rep = + { state = + { vval = 7f2c0aae94bf199f9b303480537af547; vterm = 0; term = 0; + commit_index = 7f2c0aae94bf199f9b303480537af547 }; + store = + { ctree = + [(1183a904cd1a3b8f3cf219be9367701f: + { node = + (1, d41d8cd98f00b204e9800998ecf8427e, [Command(Read c1, 1)]); + parent = ; key = 1183a904cd1a3b8f3cf219be9367701f }); + (7f2c0aae94bf199f9b303480537af547: + { node = + (2, 1183a904cd1a3b8f3cf219be9367701f, + [Command(Read c2, 3); Command(Read c3, 2)]); + parent = ; key = 7f2c0aae94bf199f9b303480537af547 }); + (d41d8cd98f00b204e9800998ecf8427e: Root)]; + root = d41d8cd98f00b204e9800998ecf8427e }; + remotes = }; + other_nodes_state = []; config = ; + commit_log = + [[Command(Read c1, 1)][Command(Read c2, 3); Command(Read c3, 2)]] }; + failure_detector = + { Conspire_mp.FailureDetector.state = []; timeout = 2 }; + stall_checker = } + actions: [CommitCommands(Command(Read c2, 3), Command(Read c3, 2))] |}] let%expect_test "e2e commit" = Imp.set_is_test true ; diff --git a/impl/test/test_paxos.ml b/impl/test/test_paxos.ml index e0a57ba..be74ce9 100644 --- a/impl/test/test_paxos.ml +++ b/impl/test/test_paxos.ml @@ -168,9 +168,7 @@ let%expect_test "Loop" = Fmt.pr "actions: %a\n" Fmt.(brackets @@ list ~sep:(const string "\n") action_pp) actions ; - [%expect - {| - actions: [Broadcast(RequestVote {term:12; leader_commit:-1})] |}] ; + [%expect {| actions: [Broadcast(RequestVote {term:12; leader_commit:-1})] |}] ; let rv = RequestVote {term= 12; leader_commit= -1} in let t1 = create (c3 1) in let t1, a1 = Impl.advance t1 (Recv (rv, 2)) in diff --git a/impl/test/test_raft.ml b/impl/test/test_raft.ml index f26f862..36f2169 100644 --- a/impl/test/test_raft.ml +++ b/impl/test/test_raft.ml @@ -176,8 +176,7 @@ let%expect_test "append_entries from other leader" = in Fmt.pr "t1: %a\n" PP.t_pp t ; [%expect - {| - t1: {log: []; commit_index:-1; current_term: 1; node_state:Follower{timeout:5; voted_for:1}} |}] + {| t1: {log: []; commit_index:-1; current_term: 1; node_state:Follower{timeout:5; voted_for:1}} |}] let pp_res t actions = Fmt.pr "t: %a\n" PP.t_pp t ; @@ -212,8 +211,7 @@ let%expect_test "Loop" = Fmt.(brackets @@ list ~sep:(const string "\n") action_pp) actions ; [%expect - {| - actions: [Broadcast(RequestVote {term:11; lastIndex:-1; lastTerm:0})] |}] ; + {| actions: [Broadcast(RequestVote {term:11; lastIndex:-1; lastTerm:0})] |}] ; let rv = RequestVote {term= 11; lastIndex= -1; lastTerm= 0} in let t1 = create (c3 1) in let t1, a1 = Impl.advance t1 (Recv (rv, 2)) in From f818fb70759e317af4724b6177692afc235b62bc Mon Sep 17 00:00:00 2001 From: cjen1 Date: Tue, 31 Oct 2023 11:32:19 +0000 Subject: [PATCH 2/9] Reset failure detector on vote for candidate --- impl/lib/paxos.ml | 3 +++ impl/lib/prevote.ml | 2 ++ impl/lib/prevote_sbn.ml | 2 ++ impl/lib/raft.ml | 2 ++ impl/lib/raft_sbn.ml | 2 ++ 5 files changed, 11 insertions(+) diff --git a/impl/lib/paxos.ml b/impl/lib/paxos.ml index 64eae09..e330fa5 100644 --- a/impl/lib/paxos.ml +++ b/impl/lib/paxos.ml @@ -297,6 +297,9 @@ struct dtraceln "Failed to match\n%a" PP.t_pp ex.@(t) (* Follower *) | Recv (RequestVote m, cid), Follower _ -> + ex.@(t @> node_state @> Follower.timeout) <- + ex.@(t @> config @> election_timeout) ; + (* Reply *) let t = ex.@(t) in let start = m.leader_commit + 1 in let entries = Log.iter_len t.log ~lo:start () in diff --git a/impl/lib/prevote.ml b/impl/lib/prevote.ml index 0b7c7c9..eb30dbf 100644 --- a/impl/lib/prevote.ml +++ b/impl/lib/prevote.ml @@ -325,6 +325,8 @@ struct | Recv (RequestVote ({prevote= false; _} as m), cid), Follower _ when request_vote_valid (RequestVote m) -> ex.@(t @> node_state @> Follower.voted_for) <- Some cid ; + ex.@(t @> node_state @> Follower.timeout) <- + ex.@(t @> config @> election_timeout) ; send cid @@ RequestVoteResponse {term= ex.@(t @> current_term); success= true; prevote= false} diff --git a/impl/lib/prevote_sbn.ml b/impl/lib/prevote_sbn.ml index 144a9b1..a3e53e3 100644 --- a/impl/lib/prevote_sbn.ml +++ b/impl/lib/prevote_sbn.ml @@ -273,6 +273,8 @@ struct (* Follower *) | Recv (RequestVote ({prevote= false; _} as m), cid), Follower _ when request_vote_valid (RequestVote m) -> + ex.@(t @> node_state @> Follower.timeout) <- + ex.@(t @> config @> election_timeout) ; send cid @@ RequestVoteResponse {term= ex.@(t @> current_term); success= true; prevote= false} diff --git a/impl/lib/raft.ml b/impl/lib/raft.ml index 0f7d33e..45221fd 100644 --- a/impl/lib/raft.ml +++ b/impl/lib/raft.ml @@ -293,6 +293,8 @@ struct (* Follower *) | Recv (RequestVote m, cid), Follower _ when request_vote_valid (RequestVote m) -> + ex.@(t @> node_state @> Follower.timeout) <- + ex.@(t @> config @> election_timeout) ; ex.@(t @> node_state @> Follower.voted_for) <- Some cid ; send cid @@ RequestVoteResponse {term= ex.@(t @> current_term); success= true} diff --git a/impl/lib/raft_sbn.ml b/impl/lib/raft_sbn.ml index 63f8b08..69c2db1 100644 --- a/impl/lib/raft_sbn.ml +++ b/impl/lib/raft_sbn.ml @@ -257,6 +257,8 @@ struct (* Follower *) | Recv (RequestVote m, cid), Follower _ when request_vote_valid (RequestVote m) -> + ex.@(t @> node_state @> Follower.timeout) <- + ex.@(t @> config @> election_timeout) ; send cid @@ RequestVoteResponse {term= ex.@(t @> current_term); success= true} | ( Recv From 2761a8407afa6b001ffd7c0e18ddc5281fc21738 Mon Sep 17 00:00:00 2001 From: cjen1 Date: Tue, 31 Oct 2023 11:38:50 +0000 Subject: [PATCH 3/9] Use an inflight buffer to avoid re-adding committed requests over recovery --- lib/internal_infra.ml | 80 ++++++++++++++++++------------------ lib/utils.ml | 94 ++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 134 insertions(+), 40 deletions(-) diff --git a/lib/internal_infra.ml b/lib/internal_infra.ml index c79dbae..1febe16 100644 --- a/lib/internal_infra.ml +++ b/lib/internal_infra.ml @@ -20,6 +20,16 @@ module Ticker = struct f () ) end +module CommandQueue = RemovableQueue (struct + module Index = Core.Int + + let pp_index = Fmt.int + + type t = Command.t [@@deriving sexp, show] + + let to_index t = t.Command.id +end) + module Make (C : Consensus_intf.S) = struct type debug = { command_length_reporter: int InternalReporter.reporter @@ -38,13 +48,11 @@ module Make (C : Consensus_intf.S) = struct ; mutable cons: C.t ; ticker: Ticker.t ; internal_streams: (node_id, C.message Eio.Stream.t) Hashtbl.t + ; command_queue: CommandQueue.t + ; applied_txns: Core.Hash_set.M(Int).t ; debug: debug } let apply (t : t) (cmd : command) : op_result = - (* TODO truncate - ie from each client record the high water mark of results and remove lower than that - Core.Hash_set.remove t.inflight_txns cmd.id ; - *) update_state_machine t.state_machine cmd let handle_actions t actions = @@ -57,6 +65,8 @@ module Make (C : Consensus_intf.S) = struct dtraceln "Broadcast %a" C.PP.message_pp msg | CommitCommands citer -> citer (fun cmd -> + CommandQueue.remove t.command_queue cmd.id ; + Core.Hash_set.add t.applied_txns cmd.id ; let res = apply t cmd in TRACE.commit cmd ; if C.should_ack_clients t.cons then ( @@ -83,40 +93,29 @@ module Make (C : Consensus_intf.S) = struct Iter.of_gen (fun () -> Eio.Stream.take_nonblocking s) |> Iter.iter (iter_msg src) - let take_at_least_one = - let open struct - exception ExitTake - end in - fun n seq k -> - if n < 1 then - Fmt.invalid_arg "take_at_least_one must take at least one, but took %d" - n ; - let count = ref 1 in - try - seq (fun x -> - k x ; - if !count = n then raise_notrace ExitTake ; - incr count ) - with ExitTake -> () - - (** Recv client msgs *) + (* TODO just let other end add to the list maybe? *) let admit_client_requests t = - match C.available_space_for_commands t.cons with - | num_to_take when num_to_take <= 0 -> - t.debug.no_space_reporter () - | num_to_take -> ( + let cmd_iter = match Eio.Stream.take_nonblocking t.c_rx with | None -> - t.debug.no_commands_reporter () + Iter.empty | Some c1 -> - let rec str_iter k = - Eio.Stream.take_nonblocking t.c_rx - |> Option.iter (fun i -> k i ; (str_iter [@tailcall]) k) - in + Iter.cons c1 + (Iter.from_fun (fun () -> Eio.Stream.take_nonblocking t.c_rx)) + in + cmd_iter (fun c -> + if not (Core.Hash_set.mem t.applied_txns c.Command.id) then + CommandQueue.add t.command_queue c ) ; + match C.available_space_for_commands t.cons with + | space when space <= 0 -> + t.debug.no_space_reporter () + | space -> + let elts = CommandQueue.take t.command_queue space in + if Core.Doubly_linked.is_empty elts then t.debug.no_commands_reporter () + else let iter = - Iter.cons c1 str_iter - |> take_at_least_one (min num_to_take 8192) - (* Limit total intake size *) + Core.Doubly_linked.iter elts + |> Iter.from_labelled_iter |> Iter.map (fun c -> TRACE.ex_in c ; t.debug.request_reporter () ; @@ -124,7 +123,7 @@ module Make (C : Consensus_intf.S) = struct in let tcons, actions = C.advance t.cons (Commands iter) in t.cons <- tcons ; - handle_actions t actions ) + handle_actions t actions let ensure_sent _t = (* We should flush here to ensure queueus aren't building up. @@ -191,6 +190,8 @@ module Make (C : Consensus_intf.S) = struct ; state_machine ; ticker ; internal_streams + ; command_queue= CommandQueue.create () + ; applied_txns= Core.Hash_set.create (module Core.Int) ; debug } in while true do @@ -226,8 +227,8 @@ module Make (C : Consensus_intf.S) = struct in List.map (fun (id, r) -> (id, handshake r)) resolvers - let run ~sw env node_id config period resolvers client_msgs - client_resps port = + let run ~sw env node_id config period resolvers client_msgs client_resps port + = TRACE.run_commit := true ; TRACE.run_ex_in := true ; let internal_streams = Hashtbl.create (List.length resolvers) in @@ -237,14 +238,15 @@ module Make (C : Consensus_intf.S) = struct @@ fun sw -> let addr = `Tcp (Eio.Net.Ipaddr.V4.any, port) in let sock = - Eio.Net.listen ~reuse_addr:true ~backlog:4 ~sw (Eio.Stdenv.net env) addr + Eio.Net.listen ~reuse_addr:true ~backlog:4 ~sw + (Eio.Stdenv.net env) addr in traceln "Listening on %a" Eio.Net.Sockaddr.pp addr ; Eio.Net.run_server ~on_error:(dtraceln "%a" Fmt.exn) sock (accept_handler internal_streams) ) ) ; let resolvers = resolver_handshake node_id resolvers in - run_inter ~sw (Eio.Stdenv.clock env) node_id config period resolvers client_msgs client_resps - internal_streams + run_inter ~sw (Eio.Stdenv.clock env) node_id config period resolvers + client_msgs client_resps internal_streams end module Test = struct diff --git a/lib/utils.ml b/lib/utils.ml index 0322bc5..9719e48 100644 --- a/lib/utils.ml +++ b/lib/utils.ml @@ -146,7 +146,13 @@ module InternalReporter = struct pf ppf "%a" pp s in let print_enabled = ref false in - let pp = [field name (fun _ -> print_enabled := true; state) pp_stats] in + let pp = + [ field name + (fun _ -> + print_enabled := true ; + state ) + pp_stats ] + in let run_enabled = ref false in let update x = if !run_enabled && !print_enabled then ( @@ -251,3 +257,89 @@ let prime nth = in Iter.unfoldr unfold [] |> Iter.drop (nth - 1) |> Iter.head_exn +let pp_hashtbl comp ppk ppv ppf v = + let open Core in + Fmt.pf ppf "%a" + Fmt.(brackets @@ list @@ parens @@ pair ppk ~sep:(any ":@ ") ppv) + Core.(v |> Hashtbl.to_alist |> List.sort ~compare:comp) + +module RemovableQueue (T : sig + module Index : Core.Hashtbl.Key + + val pp_index : Index.t Fmt.t + + type t [@@deriving sexp] + + val pp : t Fmt.t + + val to_index : t -> Index.t +end) : sig + type t [@@deriving sexp, show] + + val create : unit -> t + + val add : t -> T.t -> unit + + val remove : t -> T.Index.t -> unit + + val take : t -> int -> T.t Core.Doubly_linked.t + + val invariant : t -> unit +end = struct + open Core + + let pp_access = + let comp (a, _) (b, _) = [%compare: T.Index.t] a b in + pp_hashtbl comp T.pp_index (Fmt.any "ELT") + + let pp_queue ppf v = + Fmt.pf ppf "%a" + Fmt.(braces @@ list ~sep:comma T.pp) + (Doubly_linked.to_list v) + + type t = + { access: (T.t Doubly_linked.Elt.t[@sexp.opaque]) Hashtbl.M(T.Index).t + [@printer pp_access] + ; queue: T.t Doubly_linked.t [@printer pp_queue] } + [@@deriving sexp, show] + + let invariant t : unit = + Invariant.invariant [%here] t [%sexp_of: t] (fun () -> + Doubly_linked.iteri t.queue ~f:(fun idx v -> + let id = T.to_index v in + if not (Hashtbl.mem t.access id) then + Fmt.failwith "Unable to find (%d:%a) in accessor" idx T.pp_index + id ) ; + Hashtbl.iteri t.access ~f:(fun ~key ~data -> + if not (Doubly_linked.mem_elt t.queue data) then + Fmt.failwith "Unable to find elt for %a in queue" T.pp_index key ) ) + + let create () = + {access= Hashtbl.create (module T.Index); queue= Doubly_linked.create ()} + + let add t c = + let id = T.to_index c in + if not (Hashtbl.mem t.access id) then + let elt = Doubly_linked.insert_last t.queue c in + Hashtbl.add_exn t.access ~key:id ~data:elt + + let remove t id = + match Hashtbl.find t.access id with + | None -> + () + | Some elt -> + Doubly_linked.remove t.queue elt ; + Hashtbl.remove t.access id + + let take t n = + if Doubly_linked.is_empty t.queue then Doubly_linked.create () + else + let taken, rest = + Doubly_linked.partitioni_tf t.queue ~f:(fun i _ -> i < n) + in + Doubly_linked.clear t.queue ; + Doubly_linked.transfer ~src:rest ~dst:t.queue ; + Doubly_linked.iter taken ~f:(fun v -> + Hashtbl.remove t.access (T.to_index v) ) ; + taken +end From fb45494105d34471328095a641117d5fe7103fa8 Mon Sep 17 00:00:00 2001 From: cjen1 Date: Tue, 31 Oct 2023 12:57:48 +0000 Subject: [PATCH 4/9] fmt --- .github/workflows/workflow.yml | 46 ++++++++++++------------------- bin/cli.ml | 4 ++- bin/dune | 27 ++++++++++-------- bin/test_alloc.ml | 3 +- impl/bench/dune | 10 +++---- impl/lib/conspire_command_tree.ml | 16 ++++++----- impl/lib/conspire_f.ml | 16 +++++++---- impl/lib/conspire_mp.ml | 6 +--- impl/test/test_command_tree.ml | 4 +-- lib/external_infra.ml | 3 +- lib/infra.ml | 5 ++-- ocons_conn_mgr/persistent_conn.ml | 9 ++++-- ocons_conn_mgr/util.ml | 4 ++- 13 files changed, 76 insertions(+), 77 deletions(-) diff --git a/.github/workflows/workflow.yml b/.github/workflows/workflow.yml index d2d5c38..ced0c80 100644 --- a/.github/workflows/workflow.yml +++ b/.github/workflows/workflow.yml @@ -1,33 +1,23 @@ -name: Build and runtest +name: "CI" on: - push: - branches: - - main pull_request: + push: jobs: - build: - name: Check build - runs-on: ubuntu-22.04 - steps: - - uses: actions/checkout@v3 - - uses: DeterminateSystems/nix-installer-action@main - - uses: DeterminateSystems/magic-nix-cache-action@main - - run: nix develop -c dune build - format: - name: Check formatting - runs-on: ubuntu-22.04 - steps: - - uses: actions/checkout@v3 - - uses: DeterminateSystems/nix-installer-action@main - - uses: DeterminateSystems/magic-nix-cache-action@main - - run: nix develop -c dune build @fmt - runtest: - name: Run tests - runs-on: ubuntu-22.04 - steps: - - uses: actions/checkout@v3 - - uses: DeterminateSystems/nix-installer-action@main - - uses: DeterminateSystems/magic-nix-cache-action@main - - run: nix develop -c dune runtest + tests: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + + - name: Install Nix + uses: DeterminateSystems/nix-installer-action@v4 + - name: Run the Magic Nix Cache + uses: DeterminateSystems/magic-nix-cache-action@v1 + - name: Run tests + run: | + nix develop -c dune runtest + - name: Format + run: | + nix develop -c dune build @fmt + diff --git a/bin/cli.ml b/bin/cli.ml index 060fd16..229d55e 100644 --- a/bin/cli.ml +++ b/bin/cli.ml @@ -13,7 +13,9 @@ let run op sockaddrs id retry_timeout = sockaddrs |> List.mapi (fun idx addr -> ( idx - , fun sw -> (Eio.Net.connect ~sw env#net addr :> Eio.Flow.two_way_ty Eio.Flow.two_way) ) ) + , fun sw -> + ( Eio.Net.connect ~sw env#net addr + :> Eio.Flow.two_way_ty Eio.Flow.two_way ) ) ) in Eio.traceln "Creating conns to: %a" Fmt.(braces @@ list ~sep:comma Eio.Net.Sockaddr.pp) diff --git a/bin/dune b/bin/dune index 37358ce..f5e71b1 100644 --- a/bin/dune +++ b/bin/dune @@ -1,16 +1,19 @@ (executable - (name cli) - (modules cli) - (libraries ocons.core cmdliner)) + (name cli) + (modules cli) + (libraries ocons.core cmdliner)) + (executable - (name bench) - (modules bench) - (libraries ocons.core cmdliner)) + (name bench) + (modules bench) + (libraries ocons.core cmdliner)) + (executable - (name echo) - (modules echo) - (libraries ocons.core cmdliner)) + (name echo) + (modules echo) + (libraries ocons.core cmdliner)) + (executable - (name test_alloc) - (modules test_alloc) - (libraries core core_bench core_unix.command_unix)) + (name test_alloc) + (modules test_alloc) + (libraries core core_bench core_unix.command_unix)) diff --git a/bin/test_alloc.ml b/bin/test_alloc.ml index c8a0775..cbd66af 100644 --- a/bin/test_alloc.ml +++ b/bin/test_alloc.ml @@ -9,8 +9,7 @@ let alloc_test = let test `init = let large = make_data 1000 in let extern = ref None in - fun () -> - extern := Some large + fun () -> extern := Some large in Bench.Test.create_with_initialization ~name:"alloc_with_reference" test diff --git a/impl/bench/dune b/impl/bench/dune index 9c1ee80..36bc523 100644 --- a/impl/bench/dune +++ b/impl/bench/dune @@ -1,6 +1,6 @@ (executable - (name ctree_bench) - (modules ctree_bench) - (libraries impl_core core_bench.inline_benchmarks) - (preprocess (pps ppx_jane ppx_deriving.show)) - ) + (name ctree_bench) + (modules ctree_bench) + (libraries impl_core core_bench.inline_benchmarks) + (preprocess + (pps ppx_jane ppx_deriving.show))) diff --git a/impl/lib/conspire_command_tree.ml b/impl/lib/conspire_command_tree.ml index 35342f0..70e1cfd 100644 --- a/impl/lib/conspire_command_tree.ml +++ b/impl/lib/conspire_command_tree.ml @@ -7,19 +7,20 @@ module type Value = sig end module CommandTree (Value : Value) = struct - module Key = struct + module Key = struct include Md5 - let pp ppf v = - Fmt.pf ppf "%s" (Md5.to_hex v) + + let pp ppf v = Fmt.pf ppf "%s" (Md5.to_hex v) end + type key = Key.t [@@deriving show, bin_io, equal, compare] let make_key = - let open struct + let open struct type relevant_key_data = key * Value.t [@@deriving bin_io] end in fun parent_key value -> - Md5.digest_bin_prot bin_writer_relevant_key_data (parent_key, value) + Md5.digest_bin_prot bin_writer_relevant_key_data (parent_key, value) (* Map of vector clocks to values Aim to replicate this to other nodes @@ -44,7 +45,8 @@ module CommandTree (Value : Value) = struct let root_key = Md5.digest_string "" - let get_key_of_node node = match node with None -> root_key | Some {key; _} -> key + let get_key_of_node node = + match node with None -> root_key | Some {key; _} -> key let get_idx_of_node node = match node with None -> 0 | Some {node= idx, _, _; _} -> idx @@ -224,7 +226,7 @@ module CommandTree (Value : Value) = struct Some GT | Some ({node= ia, _, _; _} as na), Some ({node= ib, _, _; _} as nb) -> ( let rec on_path t curr ({key= kt; node= it, _, _; _} as target) = - assert (get_idx_of_node curr >= it); + assert (get_idx_of_node curr >= it) ; match curr with | None -> false diff --git a/impl/lib/conspire_f.ml b/impl/lib/conspire_f.ml index c55167b..05b07c2 100644 --- a/impl/lib/conspire_f.ml +++ b/impl/lib/conspire_f.ml @@ -128,11 +128,15 @@ module Make (Value : Value) = struct type t = { rep: Rep.rep - ; other_nodes_state: state Map.M(Int).t [@printer Utils.pp_map Fmt.int pp_state] + ; other_nodes_state: state Map.M(Int).t + [@printer Utils.pp_map Fmt.int pp_state] ; config: config [@opaque] ; commit_log: Value.t Log.t } [@@deriving show {with_path= false}] + let reporter_conflict, run_c = + Ocons_core.Utils.InternalReporter.rate_reporter "conflict" + let acceptor_reply t src = let local = t.rep.state in let remote = Map.find_exn t.other_nodes_state src in @@ -148,6 +152,7 @@ module Make (Value : Value) = struct let res = CTree.compare_keys t.rep.store local.vval remote.vval in match res with | None -> + reporter_conflict () ; (* conflict *) Utils.dtraceln "CONFLICT from %d" src ; Utils.dtraceln "local %a does not prefix of remote %a" @@ -240,8 +245,7 @@ module Make (Value : Value) = struct check_commit t let acceptor_term_tick t term' = - if t.rep.state.term < term' then - t.rep.state.term <- term' + if t.rep.state.term < term' then t.rep.state.term <- term' let handle_steady_state t src (msg : Rep.success) = let option_bind o ~f = Option.value_map o ~default:(Ok ()) ~f in @@ -261,9 +265,8 @@ module Make (Value : Value) = struct set_state remote new_state ; acceptor_reply t src ; check_commit t ; - check_conflict_recovery t; - acceptor_term_tick t new_state.term; - ) ; + check_conflict_recovery t ; + acceptor_term_tick t new_state.term ) ; Result.return () let handle_message t src (msg : Rep.message) : @@ -280,6 +283,7 @@ module Make (Value : Value) = struct Error `MustAck let create (config : config) = + run_c := true ; let rep = Rep.create config.other_replica_ids in let other_nodes_state = List.map config.other_replica_ids ~f:(fun i -> (i, init_state rep.store)) diff --git a/impl/lib/conspire_mp.ml b/impl/lib/conspire_mp.ml index 527f98c..eb37952 100644 --- a/impl/lib/conspire_mp.ml +++ b/impl/lib/conspire_mp.ml @@ -91,11 +91,7 @@ module Types = struct Conspire_f. {node_id; replica_ids; other_replica_ids; replica_count; quorum_size} in - { conspire - ; other_replica_ids - ; lower_replica_ids - ; fd_timeout - ; max_outstanding } + {conspire; other_replica_ids; lower_replica_ids; fd_timeout; max_outstanding} type message = Conspire.message [@@deriving show, bin_io] diff --git a/impl/test/test_command_tree.ml b/impl/test/test_command_tree.ml index 2fcb9d0..d7ca6ef 100644 --- a/impl/test/test_command_tree.ml +++ b/impl/test/test_command_tree.ml @@ -78,9 +78,7 @@ let%expect_test "make_update" = let ct, _ = CTree.addv ct ~node:0 ~parent:ct.root (Iter.of_list [1; 2]) in let hd1 = Md5.of_hex_exn "5278a244879fc58054713fb2f920f455" in let partial_tree = ct in - Fmt.pr "%a@." - (Impl_core__Utils.pp_set CTree.Key.pp) - (Map.key_set ct.ctree) ; + Fmt.pr "%a@." (Impl_core__Utils.pp_set CTree.Key.pp) (Map.key_set ct.ctree) ; [%expect {| [5278a244879fc58054713fb2f920f455, 9b72f276548d04f7e1b6c1f1419a523b, diff --git a/lib/external_infra.ml b/lib/external_infra.ml index 1f85c36..f90dd0f 100644 --- a/lib/external_infra.ml +++ b/lib/external_infra.ml @@ -88,8 +88,7 @@ let accept_handler t sock addr = traceln "Client handler failed with %a" Fmt.exn_backtrace (e, Printexc.get_raw_backtrace ()) -let run (env) port - cmd_str res_str = +let run env port cmd_str res_str = TRACE.run_cli_ex := true ; TRACE.run_in_ex := true ; Switch.run diff --git a/lib/infra.ml b/lib/infra.ml index c0ae46a..8d8e975 100644 --- a/lib/infra.ml +++ b/lib/infra.ml @@ -15,7 +15,7 @@ module Make (C : Consensus_intf.S) = struct module Internal = Internal_infra.Make (C) module ExInfra = External_infra - let run (env) config = + let run env config = Switch.run @@ fun sw -> let command_stream = Eio.Stream.create config.stream_length in @@ -45,8 +45,7 @@ module Make (C : Consensus_intf.S) = struct (fun () -> try Eio.Domain_manager.run (Eio.Stdenv.domain_mgr env) (fun () -> - ExInfra.run env config.external_port command_stream - result_stream ) + ExInfra.run env config.external_port command_stream result_stream ) with e when Utils.is_not_cancel e -> traceln "External infra failed" ; traceln "%a" Fmt.exn_backtrace (e, Printexc.get_raw_backtrace ()) ; diff --git a/ocons_conn_mgr/persistent_conn.ml b/ocons_conn_mgr/persistent_conn.ml index 2b7fa18..5931782 100644 --- a/ocons_conn_mgr/persistent_conn.ml +++ b/ocons_conn_mgr/persistent_conn.ml @@ -76,7 +76,8 @@ let is_open t = not t.should_close let switch_run ~on_error f = try Switch.run f with e when is_not_cancel e -> on_error e -let create ?connected ~sw (f : Switch.t -> Eio.Flow.two_way_ty Flow.two_way) delayer = +let create ?connected ~sw (f : Switch.t -> Eio.Flow.two_way_ty Flow.two_way) + delayer = let t = { conn_state= Closed ; should_close= false @@ -135,7 +136,11 @@ let%expect_test "PersistantConn" = ; `Return "3\n" ; `Return "4\n" ; `Raise End_of_file ] ; - let c = create ~sw (fun _ -> (!f :> Eio.Flow.two_way_ty Eio.Flow.two_way)) (fun () -> ()) in + let c = + create ~sw + (fun _ -> (!f :> Eio.Flow.two_way_ty Eio.Flow.two_way)) + (fun () -> ()) + in let p_line = Buf_read.line in print_endline (recv c p_line) ; [%expect {| diff --git a/ocons_conn_mgr/util.ml b/ocons_conn_mgr/util.ml index 25bbdaf..9593731 100644 --- a/ocons_conn_mgr/util.ml +++ b/ocons_conn_mgr/util.ml @@ -4,7 +4,9 @@ let set_debug_flag () = debug_flag := true let dtraceln fmt = let ignore_format = Format.ikfprintf ignore Fmt.stderr in - let traceln fmt = Eio.traceln ("%a" ^^ fmt) Time_float_unix.pp (Time_float_unix.now ()) in + let traceln fmt = + Eio.traceln ("%a" ^^ fmt) Time_float_unix.pp (Time_float_unix.now ()) + in if !debug_flag then traceln fmt else ignore_format fmt let is_not_cancel = function Eio.Cancel.Cancelled _ -> false | _ -> true From 584f910dcf4c8b7b3ffbdfa3628702f4ab6f25f5 Mon Sep 17 00:00:00 2001 From: cjen1 Date: Tue, 31 Oct 2023 13:00:44 +0000 Subject: [PATCH 5/9] Only run once per pr --- .github/workflows/workflow.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/workflow.yml b/.github/workflows/workflow.yml index ced0c80..1d49d34 100644 --- a/.github/workflows/workflow.yml +++ b/.github/workflows/workflow.yml @@ -3,6 +3,7 @@ name: "CI" on: pull_request: push: + branches: [main] jobs: tests: From b0ff1edfff7691bdf519765c5dc2e355da3baa8e Mon Sep 17 00:00:00 2001 From: cjen1 Date: Tue, 31 Oct 2023 13:26:22 +0000 Subject: [PATCH 6/9] Remove dependency on time zones --- impl/lib/conspire_dc.ml | 12 +- impl/test/test_conspire_dc.ml | 217 ++++++++++++---------------------- 2 files changed, 82 insertions(+), 147 deletions(-) diff --git a/impl/lib/conspire_dc.ml b/impl/lib/conspire_dc.ml index 123d7fa..adc6c64 100644 --- a/impl/lib/conspire_dc.ml +++ b/impl/lib/conspire_dc.ml @@ -2,10 +2,13 @@ open! Core open Types module Time = Time_float_unix +let pp_time_float_unix : Time.t Fmt.t = + fun ppf v -> Fmt.pf ppf "%0.5f" (Utils.time_to_float v) + module Value = struct let pp_command = Command.pp - type t = command list * Time.t + type t = command list * (Time.t[@printer pp_time_float_unix]) [@@deriving compare, equal, hash, bin_io, sexp, show] let empty = ([], Time.epoch) @@ -26,13 +29,10 @@ end *) module DelayReorderBuffer = struct - let pp_time_float_unix : Time.t Fmt.t = - fun ppf v -> Fmt.pf ppf "%0.5f" (Utils.time_to_float v) - type 'a t = { mutable store: 'a list Map.M(Time).t [@polyprinter fun pa -> Utils.pp_map pp_time_float_unix pa] - ; mutable hwm: Time.t + ; mutable hwm: Time.t [@printer pp_time_float_unix] ; interval: Time.Span.t ; compare: 'a -> 'a -> int } [@@deriving show {with_path= false}] @@ -121,7 +121,7 @@ module Types = struct ; clock= (clock :> float Eio.Time.clock_ty Eio.Time.clock) } type message = - | Commands of (Command.t list * Time.t) + | Commands of (Command.t list * (Time.t[@printer pp_time_float_unix])) | Conspire of Conspire.message [@@deriving show, bin_io] diff --git a/impl/test/test_conspire_dc.ml b/impl/test/test_conspire_dc.ml index 035656b..29fb0a0 100644 --- a/impl/test/test_conspire_dc.ml +++ b/impl/test/test_conspire_dc.ml @@ -52,8 +52,7 @@ let%expect_test "local_commit" = remotes = }; other_nodes_state = []; config = ; commit_log = [] }; command_buffer = - { store = []; hwm = 1970-01-01 01:00:00.000000+01:00; interval = 1s; - compare = }; + { store = []; hwm = 0.00000; interval = 1s; compare = }; tick_count = { Conspire_dc.Counter.count = 0; limit = 100 }; clock = } actions: [] |}] ; @@ -74,14 +73,12 @@ let%expect_test "local_commit" = remotes = }; other_nodes_state = []; config = ; commit_log = [] }; command_buffer = - { store = [(1.00000: [Command(Read c0, 1)])]; - hwm = 1970-01-01 01:00:00.000000+01:00; interval = 1s; compare = - }; + { store = [(1.00000: [Command(Read c0, 1)])]; hwm = 0.00000; + interval = 1s; compare = }; tick_count = { Conspire_dc.Counter.count = 0; limit = 100 }; clock = } actions: [Broadcast((Conspire_dc.Types.Commands - ([Command(Read c0, 1)], - 1970-01-01 01:00:01.000000+01:00)))] |}] ; + ([Command(Read c0, 1)], 1.00000)))] |}] ; ac0 1. ; [%expect {| +mock time is now 1 |}] ; let t, actions = Impl.advance t Tick in @@ -99,17 +96,15 @@ let%expect_test "local_commit" = [(34e452ffd92c22ea72ef125a33a0a593: { node = (1, d41d8cd98f00b204e9800998ecf8427e, - ([Command(Read c0, 1)], 1970-01-01 01:00:01.000000+01:00)); + ([Command(Read c0, 1)], 1.00000)); parent = ; key = 34e452ffd92c22ea72ef125a33a0a593 }); (d41d8cd98f00b204e9800998ecf8427e: Root)]; root = d41d8cd98f00b204e9800998ecf8427e }; remotes = }; other_nodes_state = []; config = ; - commit_log = - [([Command(Read c0, 1)], 1970-01-01 01:00:01.000000+01:00)] }; + commit_log = [([Command(Read c0, 1)], 1.00000)] }; command_buffer = - { store = []; hwm = 1970-01-01 01:00:01.000000+01:00; interval = 1s; - compare = }; + { store = []; hwm = 1.00000; interval = 1s; compare = }; tick_count = { Conspire_dc.Counter.count = 1; limit = 100 }; clock = } actions: [CommitCommands(Command(Read c0, 1))] |}] ; @@ -130,23 +125,20 @@ let%expect_test "local_commit" = [(34e452ffd92c22ea72ef125a33a0a593: { node = (1, d41d8cd98f00b204e9800998ecf8427e, - ([Command(Read c0, 1)], 1970-01-01 01:00:01.000000+01:00)); + ([Command(Read c0, 1)], 1.00000)); parent = ; key = 34e452ffd92c22ea72ef125a33a0a593 }); (d41d8cd98f00b204e9800998ecf8427e: Root)]; root = d41d8cd98f00b204e9800998ecf8427e }; remotes = }; other_nodes_state = []; config = ; - commit_log = - [([Command(Read c0, 1)], 1970-01-01 01:00:01.000000+01:00)] }; + commit_log = [([Command(Read c0, 1)], 1.00000)] }; command_buffer = { store = [(2.00000: [Command(Read c2, 3); Command(Read c1, 2)])]; - hwm = 1970-01-01 01:00:01.000000+01:00; interval = 1s; compare = - }; + hwm = 1.00000; interval = 1s; compare = }; tick_count = { Conspire_dc.Counter.count = 1; limit = 100 }; clock = } actions: [Broadcast((Conspire_dc.Types.Commands - ([Command(Read c1, 2); Command(Read c2, 3)], - 1970-01-01 01:00:02.000000+01:00)))] |}] ; + ([Command(Read c1, 2); Command(Read c2, 3)], 2.00000)))] |}] ; ac0 1. ; [%expect {| +mock time is now 2 |}] ; let t, actions = Impl.advance t Tick in @@ -164,27 +156,24 @@ let%expect_test "local_commit" = [(34e452ffd92c22ea72ef125a33a0a593: { node = (1, d41d8cd98f00b204e9800998ecf8427e, - ([Command(Read c0, 1)], 1970-01-01 01:00:01.000000+01:00)); + ([Command(Read c0, 1)], 1.00000)); parent = ; key = 34e452ffd92c22ea72ef125a33a0a593 }); (ccfc0ddc71cfcede53a100a51835b7ff: { node = (2, 34e452ffd92c22ea72ef125a33a0a593, - ([Command(Read c1, 2); Command(Read c2, 3)], - 1970-01-01 01:00:02.000000+01:00)); + ([Command(Read c1, 2); Command(Read c2, 3)], 2.00000)); parent = ; key = ccfc0ddc71cfcede53a100a51835b7ff }); (d41d8cd98f00b204e9800998ecf8427e: Root)]; root = d41d8cd98f00b204e9800998ecf8427e }; remotes = }; other_nodes_state = []; config = ; commit_log = - [([Command(Read c0, 1)], 1970-01-01 01:00:01.000000+01:00) - ([Command(Read c1, 2); Command(Read c2, 3)], - 1970-01-01 01:00:02.000000+01:00)] + [([Command(Read c0, 1)], 1.00000) + ([Command(Read c1, 2); Command(Read c2, 3)], 2.00000)] }; command_buffer = - { store = [(2.00000: [Command(Read c1, 2)])]; - hwm = 1970-01-01 01:00:02.000000+01:00; interval = 1s; compare = - }; + { store = [(2.00000: [Command(Read c1, 2)])]; hwm = 2.00000; + interval = 1s; compare = }; tick_count = { Conspire_dc.Counter.count = 2; limit = 100 }; clock = } actions: [CommitCommands(Command(Read c1, 2), Command(Read c2, 3))] |}] @@ -223,14 +212,12 @@ let%expect_test "e2e commit" = commit_index = d41d8cd98f00b204e9800998ecf8427e })]; config = ; commit_log = [] }; command_buffer = - { store = [(1.00000: [Command(Read c1, 1)])]; - hwm = 1970-01-01 01:00:00.000000+01:00; interval = 1s; compare = - }; + { store = [(1.00000: [Command(Read c1, 1)])]; hwm = 0.00000; + interval = 1s; compare = }; tick_count = { Conspire_dc.Counter.count = 0; limit = 100 }; clock = } actions: [Broadcast((Conspire_dc.Types.Commands - ([Command(Read c1, 1)], - 1970-01-01 01:00:01.000000+01:00)))] |}] ; + ([Command(Read c1, 1)], 1.00000)))] |}] ; let t1, actions = Impl.advance t1 (Recv (Commands ([c1], Utils.float_to_time 1.), 0)) in @@ -259,9 +246,8 @@ let%expect_test "e2e commit" = commit_index = d41d8cd98f00b204e9800998ecf8427e })]; config = ; commit_log = [] }; command_buffer = - { store = [(1.00000: [Command(Read c1, 1)])]; - hwm = 1970-01-01 01:00:00.000000+01:00; interval = 1s; compare = - }; + { store = [(1.00000: [Command(Read c1, 1)])]; hwm = 0.00000; + interval = 1s; compare = }; tick_count = { Conspire_dc.Counter.count = 0; limit = 100 }; clock = } actions: [] |}] ; @@ -285,7 +271,7 @@ let%expect_test "e2e commit" = [(c4677fbe3fa92d4246f849e282f15527: { node = (1, d41d8cd98f00b204e9800998ecf8427e, - ([Command(Read c1, 1)], 1970-01-01 01:00:01.000000+01:00)); + ([Command(Read c1, 1)], 1.00000)); parent = ; key = c4677fbe3fa92d4246f849e282f15527 }); (d41d8cd98f00b204e9800998ecf8427e: Root)]; root = d41d8cd98f00b204e9800998ecf8427e }; @@ -302,8 +288,7 @@ let%expect_test "e2e commit" = commit_index = d41d8cd98f00b204e9800998ecf8427e })]; config = ; commit_log = [] }; command_buffer = - { store = []; hwm = 1970-01-01 01:00:01.000000+01:00; interval = 1s; - compare = }; + { store = []; hwm = 1.00000; interval = 1s; compare = }; tick_count = { Conspire_dc.Counter.count = 1; limit = 100 }; clock = } actions: [Send(1,(Conspire_dc.Types.Conspire @@ -311,9 +296,7 @@ let%expect_test "e2e commit" = (Some { new_head = c4677fbe3fa92d4246f849e282f15527; extension = [(1, d41d8cd98f00b204e9800998ecf8427e, - ([Command(Read c1, 1)], - 1970-01-01 01:00:01.000000+01:00)) - ] + ([Command(Read c1, 1)], 1.00000))] }); cons = (Some { vval = c4677fbe3fa92d4246f849e282f15527; @@ -326,9 +309,7 @@ let%expect_test "e2e commit" = (Some { new_head = c4677fbe3fa92d4246f849e282f15527; extension = [(1, d41d8cd98f00b204e9800998ecf8427e, - ([Command(Read c1, 1)], - 1970-01-01 01:00:01.000000+01:00)) - ] + ([Command(Read c1, 1)], 1.00000))] }); cons = (Some { vval = c4677fbe3fa92d4246f849e282f15527; @@ -341,9 +322,7 @@ let%expect_test "e2e commit" = (Some { new_head = c4677fbe3fa92d4246f849e282f15527; extension = [(1, d41d8cd98f00b204e9800998ecf8427e, - ([Command(Read c1, 1)], - 1970-01-01 01:00:01.000000+01:00)) - ] + ([Command(Read c1, 1)], 1.00000))] }); cons = (Some { vval = c4677fbe3fa92d4246f849e282f15527; @@ -366,7 +345,7 @@ let%expect_test "e2e commit" = [(c4677fbe3fa92d4246f849e282f15527: { node = (1, d41d8cd98f00b204e9800998ecf8427e, - ([Command(Read c1, 1)], 1970-01-01 01:00:01.000000+01:00)); + ([Command(Read c1, 1)], 1.00000)); parent = ; key = c4677fbe3fa92d4246f849e282f15527 }); (d41d8cd98f00b204e9800998ecf8427e: Root)]; root = d41d8cd98f00b204e9800998ecf8427e }; @@ -383,8 +362,7 @@ let%expect_test "e2e commit" = commit_index = d41d8cd98f00b204e9800998ecf8427e })]; config = ; commit_log = [] }; command_buffer = - { store = []; hwm = 1970-01-01 01:00:01.000000+01:00; interval = 1s; - compare = }; + { store = []; hwm = 1.00000; interval = 1s; compare = }; tick_count = { Conspire_dc.Counter.count = 1; limit = 100 }; clock = } actions: [Send(0,(Conspire_dc.Types.Conspire @@ -392,9 +370,7 @@ let%expect_test "e2e commit" = (Some { new_head = c4677fbe3fa92d4246f849e282f15527; extension = [(1, d41d8cd98f00b204e9800998ecf8427e, - ([Command(Read c1, 1)], - 1970-01-01 01:00:01.000000+01:00)) - ] + ([Command(Read c1, 1)], 1.00000))] }); cons = (Some { vval = c4677fbe3fa92d4246f849e282f15527; @@ -407,9 +383,7 @@ let%expect_test "e2e commit" = (Some { new_head = c4677fbe3fa92d4246f849e282f15527; extension = [(1, d41d8cd98f00b204e9800998ecf8427e, - ([Command(Read c1, 1)], - 1970-01-01 01:00:01.000000+01:00)) - ] + ([Command(Read c1, 1)], 1.00000))] }); cons = (Some { vval = c4677fbe3fa92d4246f849e282f15527; @@ -422,9 +396,7 @@ let%expect_test "e2e commit" = (Some { new_head = c4677fbe3fa92d4246f849e282f15527; extension = [(1, d41d8cd98f00b204e9800998ecf8427e, - ([Command(Read c1, 1)], - 1970-01-01 01:00:01.000000+01:00)) - ] + ([Command(Read c1, 1)], 1.00000))] }); cons = (Some { vval = c4677fbe3fa92d4246f849e282f15527; @@ -459,7 +431,7 @@ let%expect_test "e2e commit" = [(c4677fbe3fa92d4246f849e282f15527: { node = (1, d41d8cd98f00b204e9800998ecf8427e, - ([Command(Read c1, 1)], 1970-01-01 01:00:01.000000+01:00)); + ([Command(Read c1, 1)], 1.00000)); parent = ; key = c4677fbe3fa92d4246f849e282f15527 }); (d41d8cd98f00b204e9800998ecf8427e: Root)]; root = d41d8cd98f00b204e9800998ecf8427e }; @@ -476,8 +448,7 @@ let%expect_test "e2e commit" = commit_index = d41d8cd98f00b204e9800998ecf8427e })]; config = ; commit_log = [] }; command_buffer = - { store = []; hwm = 1970-01-01 01:00:01.000000+01:00; interval = 1s; - compare = }; + { store = []; hwm = 1.00000; interval = 1s; compare = }; tick_count = { Conspire_dc.Counter.count = 1; limit = 100 }; clock = } actions: [] |}] ; @@ -496,7 +467,7 @@ let%expect_test "e2e commit" = [(c4677fbe3fa92d4246f849e282f15527: { node = (1, d41d8cd98f00b204e9800998ecf8427e, - ([Command(Read c1, 1)], 1970-01-01 01:00:01.000000+01:00)); + ([Command(Read c1, 1)], 1.00000)); parent = ; key = c4677fbe3fa92d4246f849e282f15527 }); (d41d8cd98f00b204e9800998ecf8427e: Root)]; root = d41d8cd98f00b204e9800998ecf8427e }; @@ -511,12 +482,9 @@ let%expect_test "e2e commit" = (3: { vval = d41d8cd98f00b204e9800998ecf8427e; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e })]; - config = ; - commit_log = - [([Command(Read c1, 1)], 1970-01-01 01:00:01.000000+01:00)] }; + config = ; commit_log = [([Command(Read c1, 1)], 1.00000)] }; command_buffer = - { store = []; hwm = 1970-01-01 01:00:01.000000+01:00; interval = 1s; - compare = }; + { store = []; hwm = 1.00000; interval = 1s; compare = }; tick_count = { Conspire_dc.Counter.count = 1; limit = 100 }; clock = } actions: [CommitCommands(Command(Read c1, 1)) @@ -577,14 +545,12 @@ let%expect_test "batching" = commit_index = d41d8cd98f00b204e9800998ecf8427e })]; config = ; commit_log = [] }; command_buffer = - { store = [(1.00000: [Command(Read c0, 1)])]; - hwm = 1970-01-01 01:00:00.000000+01:00; interval = 1s; compare = - }; + { store = [(1.00000: [Command(Read c0, 1)])]; hwm = 0.00000; + interval = 1s; compare = }; tick_count = { Conspire_dc.Counter.count = 0; limit = 100 }; clock = } actions: [Broadcast((Conspire_dc.Types.Commands - ([Command(Read c0, 1)], - 1970-01-01 01:00:01.000000+01:00)))] |}] ; + ([Command(Read c0, 1)], 1.00000)))] |}] ; ac0 1.1 ; [%expect {| +mock time is now 1.1 |}] ; let c1 = make_command (Read "c1") in @@ -616,13 +582,11 @@ let%expect_test "batching" = command_buffer = { store = [(1.00000: [Command(Read c0, 1)]); (2.10000: [Command(Read c1, 2)])]; - hwm = 1970-01-01 01:00:00.000000+01:00; interval = 1s; compare = - }; + hwm = 0.00000; interval = 1s; compare = }; tick_count = { Conspire_dc.Counter.count = 0; limit = 100 }; clock = } actions: [Broadcast((Conspire_dc.Types.Commands - ([Command(Read c1, 2)], - 1970-01-01 01:00:02.100000+01:00)))] |}] ; + ([Command(Read c1, 2)], 2.10000)))] |}] ; ac0 0.5 ; let c2 = make_command (Read "c2") in let t0, actions = Impl.advance t0 (Commands (Iter.of_list [c2])) in @@ -655,13 +619,11 @@ let%expect_test "batching" = { store = [(1.00000: [Command(Read c0, 1)]); (2.10000: [Command(Read c1, 2)]); (2.60000: [Command(Read c2, 3)])]; - hwm = 1970-01-01 01:00:00.000000+01:00; interval = 1s; compare = - }; + hwm = 0.00000; interval = 1s; compare = }; tick_count = { Conspire_dc.Counter.count = 0; limit = 100 }; clock = } actions: [Broadcast((Conspire_dc.Types.Commands - ([Command(Read c2, 3)], - 1970-01-01 01:00:02.600000+01:00)))] |}] ; + ([Command(Read c2, 3)], 2.60000)))] |}] ; let t0, actions = Impl.advance t0 Tick in print t0 actions ; [%expect @@ -677,7 +639,7 @@ let%expect_test "batching" = [(34e452ffd92c22ea72ef125a33a0a593: { node = (1, d41d8cd98f00b204e9800998ecf8427e, - ([Command(Read c0, 1)], 1970-01-01 01:00:01.000000+01:00)); + ([Command(Read c0, 1)], 1.00000)); parent = ; key = 34e452ffd92c22ea72ef125a33a0a593 }); (d41d8cd98f00b204e9800998ecf8427e: Root)]; root = d41d8cd98f00b204e9800998ecf8427e }; @@ -696,8 +658,7 @@ let%expect_test "batching" = command_buffer = { store = [(2.10000: [Command(Read c1, 2)]); (2.60000: [Command(Read c2, 3)])]; - hwm = 1970-01-01 01:00:01.000000+01:00; interval = 1s; compare = - }; + hwm = 1.00000; interval = 1s; compare = }; tick_count = { Conspire_dc.Counter.count = 1; limit = 100 }; clock = } actions: [Send(1,(Conspire_dc.Types.Conspire @@ -705,9 +666,7 @@ let%expect_test "batching" = (Some { new_head = 34e452ffd92c22ea72ef125a33a0a593; extension = [(1, d41d8cd98f00b204e9800998ecf8427e, - ([Command(Read c0, 1)], - 1970-01-01 01:00:01.000000+01:00)) - ] + ([Command(Read c0, 1)], 1.00000))] }); cons = (Some { vval = 34e452ffd92c22ea72ef125a33a0a593; @@ -720,9 +679,7 @@ let%expect_test "batching" = (Some { new_head = 34e452ffd92c22ea72ef125a33a0a593; extension = [(1, d41d8cd98f00b204e9800998ecf8427e, - ([Command(Read c0, 1)], - 1970-01-01 01:00:01.000000+01:00)) - ] + ([Command(Read c0, 1)], 1.00000))] }); cons = (Some { vval = 34e452ffd92c22ea72ef125a33a0a593; @@ -735,9 +692,7 @@ let%expect_test "batching" = (Some { new_head = 34e452ffd92c22ea72ef125a33a0a593; extension = [(1, d41d8cd98f00b204e9800998ecf8427e, - ([Command(Read c0, 1)], - 1970-01-01 01:00:01.000000+01:00)) - ] + ([Command(Read c0, 1)], 1.00000))] }); cons = (Some { vval = 34e452ffd92c22ea72ef125a33a0a593; @@ -764,13 +719,12 @@ let%expect_test "batching" = [(34e452ffd92c22ea72ef125a33a0a593: { node = (1, d41d8cd98f00b204e9800998ecf8427e, - ([Command(Read c0, 1)], 1970-01-01 01:00:01.000000+01:00)); + ([Command(Read c0, 1)], 1.00000)); parent = ; key = 34e452ffd92c22ea72ef125a33a0a593 }); (4208465ea72aa1217306cdb4da303438: { node = (2, 34e452ffd92c22ea72ef125a33a0a593, - ([Command(Read c1, 2); Command(Read c2, 3)], - 1970-01-01 01:00:03.000000+01:00)); + ([Command(Read c1, 2); Command(Read c2, 3)], 3.00000)); parent = ; key = 4208465ea72aa1217306cdb4da303438 }); (d41d8cd98f00b204e9800998ecf8427e: Root)]; root = d41d8cd98f00b204e9800998ecf8427e }; @@ -787,8 +741,7 @@ let%expect_test "batching" = commit_index = d41d8cd98f00b204e9800998ecf8427e })]; config = ; commit_log = [] }; command_buffer = - { store = []; hwm = 1970-01-01 01:00:03.000000+01:00; interval = 1s; - compare = }; + { store = []; hwm = 3.00000; interval = 1s; compare = }; tick_count = { Conspire_dc.Counter.count = 2; limit = 100 }; clock = } actions: [Send(1,(Conspire_dc.Types.Conspire @@ -797,7 +750,7 @@ let%expect_test "batching" = extension = [(2, 34e452ffd92c22ea72ef125a33a0a593, ([Command(Read c1, 2); Command(Read c2, 3)], - 1970-01-01 01:00:03.000000+01:00)) + 3.00000)) ] }); cons = @@ -812,7 +765,7 @@ let%expect_test "batching" = extension = [(2, 34e452ffd92c22ea72ef125a33a0a593, ([Command(Read c1, 2); Command(Read c2, 3)], - 1970-01-01 01:00:03.000000+01:00)) + 3.00000)) ] }); cons = @@ -827,7 +780,7 @@ let%expect_test "batching" = extension = [(2, 34e452ffd92c22ea72ef125a33a0a593, ([Command(Read c1, 2); Command(Read c2, 3)], - 1970-01-01 01:00:03.000000+01:00)) + 3.00000)) ] }); cons = @@ -864,7 +817,7 @@ let%expect_test "Conflict" = [(34e452ffd92c22ea72ef125a33a0a593: { node = (1, d41d8cd98f00b204e9800998ecf8427e, - ([Command(Read c0, 1)], 1970-01-01 01:00:01.000000+01:00)); + ([Command(Read c0, 1)], 1.00000)); parent = ; key = 34e452ffd92c22ea72ef125a33a0a593 }); (d41d8cd98f00b204e9800998ecf8427e: Root)]; root = d41d8cd98f00b204e9800998ecf8427e }; @@ -881,8 +834,7 @@ let%expect_test "Conflict" = commit_index = d41d8cd98f00b204e9800998ecf8427e })]; config = ; commit_log = [] }; command_buffer = - { store = []; hwm = 1970-01-01 01:00:02.000000+01:00; interval = 1s; - compare = }; + { store = []; hwm = 2.00000; interval = 1s; compare = }; tick_count = { Conspire_dc.Counter.count = 1; limit = 100 }; clock = } actions: [Send(1,(Conspire_dc.Types.Conspire @@ -890,9 +842,7 @@ let%expect_test "Conflict" = (Some { new_head = 34e452ffd92c22ea72ef125a33a0a593; extension = [(1, d41d8cd98f00b204e9800998ecf8427e, - ([Command(Read c0, 1)], - 1970-01-01 01:00:01.000000+01:00)) - ] + ([Command(Read c0, 1)], 1.00000))] }); cons = (Some { vval = 34e452ffd92c22ea72ef125a33a0a593; @@ -905,9 +855,7 @@ let%expect_test "Conflict" = (Some { new_head = 34e452ffd92c22ea72ef125a33a0a593; extension = [(1, d41d8cd98f00b204e9800998ecf8427e, - ([Command(Read c0, 1)], - 1970-01-01 01:00:01.000000+01:00)) - ] + ([Command(Read c0, 1)], 1.00000))] }); cons = (Some { vval = 34e452ffd92c22ea72ef125a33a0a593; @@ -920,9 +868,7 @@ let%expect_test "Conflict" = (Some { new_head = 34e452ffd92c22ea72ef125a33a0a593; extension = [(1, d41d8cd98f00b204e9800998ecf8427e, - ([Command(Read c0, 1)], - 1970-01-01 01:00:01.000000+01:00)) - ] + ([Command(Read c0, 1)], 1.00000))] }); cons = (Some { vval = 34e452ffd92c22ea72ef125a33a0a593; @@ -950,7 +896,7 @@ let%expect_test "Conflict" = [(9ddc31c480e04b8965ff34f30c37d919: { node = (1, d41d8cd98f00b204e9800998ecf8427e, - ([Command(Read c1, 2)], 1970-01-01 01:00:01.000000+01:00)); + ([Command(Read c1, 2)], 1.00000)); parent = ; key = 9ddc31c480e04b8965ff34f30c37d919 }); (d41d8cd98f00b204e9800998ecf8427e: Root)]; root = d41d8cd98f00b204e9800998ecf8427e }; @@ -967,8 +913,7 @@ let%expect_test "Conflict" = commit_index = d41d8cd98f00b204e9800998ecf8427e })]; config = ; commit_log = [] }; command_buffer = - { store = []; hwm = 1970-01-01 01:00:02.000000+01:00; interval = 1s; - compare = }; + { store = []; hwm = 2.00000; interval = 1s; compare = }; tick_count = { Conspire_dc.Counter.count = 1; limit = 100 }; clock = } actions: [Send(0,(Conspire_dc.Types.Conspire @@ -976,9 +921,7 @@ let%expect_test "Conflict" = (Some { new_head = 9ddc31c480e04b8965ff34f30c37d919; extension = [(1, d41d8cd98f00b204e9800998ecf8427e, - ([Command(Read c1, 2)], - 1970-01-01 01:00:01.000000+01:00)) - ] + ([Command(Read c1, 2)], 1.00000))] }); cons = (Some { vval = 9ddc31c480e04b8965ff34f30c37d919; @@ -991,9 +934,7 @@ let%expect_test "Conflict" = (Some { new_head = 9ddc31c480e04b8965ff34f30c37d919; extension = [(1, d41d8cd98f00b204e9800998ecf8427e, - ([Command(Read c1, 2)], - 1970-01-01 01:00:01.000000+01:00)) - ] + ([Command(Read c1, 2)], 1.00000))] }); cons = (Some { vval = 9ddc31c480e04b8965ff34f30c37d919; @@ -1006,9 +947,7 @@ let%expect_test "Conflict" = (Some { new_head = 9ddc31c480e04b8965ff34f30c37d919; extension = [(1, d41d8cd98f00b204e9800998ecf8427e, - ([Command(Read c1, 2)], - 1970-01-01 01:00:01.000000+01:00)) - ] + ([Command(Read c1, 2)], 1.00000))] }); cons = (Some { vval = 9ddc31c480e04b8965ff34f30c37d919; @@ -1043,7 +982,7 @@ let%expect_test "Conflict" = [(34e452ffd92c22ea72ef125a33a0a593: { node = (1, d41d8cd98f00b204e9800998ecf8427e, - ([Command(Read c0, 1)], 1970-01-01 01:00:01.000000+01:00)); + ([Command(Read c0, 1)], 1.00000)); parent = ; key = 34e452ffd92c22ea72ef125a33a0a593 }); (d41d8cd98f00b204e9800998ecf8427e: Root)]; root = d41d8cd98f00b204e9800998ecf8427e }; @@ -1060,8 +999,7 @@ let%expect_test "Conflict" = commit_index = d41d8cd98f00b204e9800998ecf8427e })]; config = ; commit_log = [] }; command_buffer = - { store = []; hwm = 1970-01-01 01:00:02.000000+01:00; interval = 1s; - compare = }; + { store = []; hwm = 2.00000; interval = 1s; compare = }; tick_count = { Conspire_dc.Counter.count = 1; limit = 100 }; clock = } actions: [] |}] ; @@ -1091,12 +1029,12 @@ let%expect_test "Conflict" = [(34e452ffd92c22ea72ef125a33a0a593: { node = (1, d41d8cd98f00b204e9800998ecf8427e, - ([Command(Read c0, 1)], 1970-01-01 01:00:01.000000+01:00)); + ([Command(Read c0, 1)], 1.00000)); parent = ; key = 34e452ffd92c22ea72ef125a33a0a593 }); (620122743bc84de6b418bd632ea0cdc2: { node = (1, d41d8cd98f00b204e9800998ecf8427e, - ([Command(Read c1, 2)], 1970-01-01 01:00:01.000000+01:00)); + ([Command(Read c1, 2)], 1.00000)); parent = ; key = 620122743bc84de6b418bd632ea0cdc2 }); (d41d8cd98f00b204e9800998ecf8427e: Root)]; root = d41d8cd98f00b204e9800998ecf8427e }; @@ -1113,8 +1051,7 @@ let%expect_test "Conflict" = commit_index = d41d8cd98f00b204e9800998ecf8427e })]; config = ; commit_log = [] }; command_buffer = - { store = []; hwm = 1970-01-01 01:00:02.000000+01:00; interval = 1s; - compare = }; + { store = []; hwm = 2.00000; interval = 1s; compare = }; tick_count = { Conspire_dc.Counter.count = 1; limit = 100 }; clock = } actions: [Send(1,(Conspire_dc.Types.Conspire @@ -1156,12 +1093,12 @@ let%expect_test "Conflict" = [(34e452ffd92c22ea72ef125a33a0a593: { node = (1, d41d8cd98f00b204e9800998ecf8427e, - ([Command(Read c0, 1)], 1970-01-01 01:00:01.000000+01:00)); + ([Command(Read c0, 1)], 1.00000)); parent = ; key = 34e452ffd92c22ea72ef125a33a0a593 }); (620122743bc84de6b418bd632ea0cdc2: { node = (1, d41d8cd98f00b204e9800998ecf8427e, - ([Command(Read c1, 2)], 1970-01-01 01:00:01.000000+01:00)); + ([Command(Read c1, 2)], 1.00000)); parent = ; key = 620122743bc84de6b418bd632ea0cdc2 }); (d41d8cd98f00b204e9800998ecf8427e: Root)]; root = d41d8cd98f00b204e9800998ecf8427e }; @@ -1178,8 +1115,7 @@ let%expect_test "Conflict" = commit_index = d41d8cd98f00b204e9800998ecf8427e })]; config = ; commit_log = [] }; command_buffer = - { store = []; hwm = 1970-01-01 01:00:02.000000+01:00; interval = 1s; - compare = }; + { store = []; hwm = 2.00000; interval = 1s; compare = }; tick_count = { Conspire_dc.Counter.count = 1; limit = 100 }; clock = } actions: [] |}] ; @@ -1198,12 +1134,12 @@ let%expect_test "Conflict" = [(34e452ffd92c22ea72ef125a33a0a593: { node = (1, d41d8cd98f00b204e9800998ecf8427e, - ([Command(Read c0, 1)], 1970-01-01 01:00:01.000000+01:00)); + ([Command(Read c0, 1)], 1.00000)); parent = ; key = 34e452ffd92c22ea72ef125a33a0a593 }); (620122743bc84de6b418bd632ea0cdc2: { node = (1, d41d8cd98f00b204e9800998ecf8427e, - ([Command(Read c1, 2)], 1970-01-01 01:00:01.000000+01:00)); + ([Command(Read c1, 2)], 1.00000)); parent = ; key = 620122743bc84de6b418bd632ea0cdc2 }); (d41d8cd98f00b204e9800998ecf8427e: Root)]; root = d41d8cd98f00b204e9800998ecf8427e }; @@ -1220,8 +1156,7 @@ let%expect_test "Conflict" = commit_index = d41d8cd98f00b204e9800998ecf8427e })]; config = ; commit_log = [] }; command_buffer = - { store = []; hwm = 1970-01-01 01:00:02.000000+01:00; interval = 1s; - compare = }; + { store = []; hwm = 2.00000; interval = 1s; compare = }; tick_count = { Conspire_dc.Counter.count = 1; limit = 100 }; clock = } actions: [Send(1,(Conspire_dc.Types.Conspire From 988c866a82d47bb3465b9540b0e06472b6fd425e Mon Sep 17 00:00:00 2001 From: cjen1 Date: Tue, 14 Nov 2023 18:54:12 +0000 Subject: [PATCH 7/9] Add tracing for paxos --- impl/lib/paxos.ml | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/impl/lib/paxos.ml b/impl/lib/paxos.ml index e330fa5..f9cb1fd 100644 --- a/impl/lib/paxos.ml +++ b/impl/lib/paxos.ml @@ -238,9 +238,10 @@ struct | AppendEntriesResponse {term; _} | RequestVote {term; _} | RequestVoteResponse {term; _} ) - , _ ) - , _ ) + , src ) + , _) when term > ex.@(t @> current_term) -> + traceln "Follower for node %d for term %d" src term; transit_follower term | _ -> () @@ -276,6 +277,7 @@ struct (* Recv msgs from this term*) (* Candidate *) | Recv (RequestVoteResponse m, src), Candidate _ -> + traceln "Received vote from %d" src; assert (m.term = ex.@(t @> current_term)) ; let entries, _ = m.entries in let q_entries = From fbf6bf876e020aaa57b66773c2fb07ce1df5e3f5 Mon Sep 17 00:00:00 2001 From: cjen1 Date: Mon, 4 Dec 2023 15:48:54 +0000 Subject: [PATCH 8/9] Fix send bug --- impl/lib/paxos.ml | 97 ++++++++++++++++++++++------------------- impl/lib/prevote.ml | 50 ++++++++++----------- impl/lib/prevote_sbn.ml | 43 +++++++++--------- impl/lib/raft.ml | 50 ++++++++++----------- impl/lib/raft_sbn.ml | 52 ++++++++++------------ impl/lib/types.ml | 2 +- impl/main.ml | 2 +- 7 files changed, 146 insertions(+), 150 deletions(-) diff --git a/impl/lib/paxos.ml b/impl/lib/paxos.ml index f9cb1fd..08c0a37 100644 --- a/impl/lib/paxos.ml +++ b/impl/lib/paxos.ml @@ -65,25 +65,38 @@ module Types = struct module PP = struct let message_pp ppf v = + let verb = true in let open Fmt in match v with | RequestVote {term; leader_commit} -> pf ppf "RequestVote {term:%d; leader_commit:%d}" term leader_commit | RequestVoteResponse {term; start_index; entries= entries, len} -> - pf ppf - "RequestVoteResponse {term:%d; start_index:%d; entries_length:%d; \ - entries: %a}" - term start_index len - (brackets @@ list ~sep:(const char ',') log_entry_pp) - (entries |> Iter.to_list) + if verb then + pf ppf + "RequestVoteResponse {term:%d; start_index:%d; \ + entries_length:%d; entries: %a}" + term start_index len + (brackets @@ list ~sep:(const char ',') log_entry_pp) + (entries |> Iter.to_list) + else + pf ppf + "RequestVoteResponse {term:%d; start_index:%d; \ + entries_length:%d; entries: _ }" + term start_index len | AppendEntries {term; leader_commit; prev_log_index; prev_log_term; entries} -> - pf ppf - "AppendEntries {term: %d; leader_commit: %d; prev_log_index: %d; \ - prev_log_term: %d; entries_length: %d; entries: %a}" - term leader_commit prev_log_index prev_log_term (snd entries) - (brackets @@ list ~sep:(const char ',') log_entry_pp) - (fst entries |> Iter.to_list) + if verb then + pf ppf + "AppendEntries {term: %d; leader_commit: %d; prev_log_index: %d; \ + prev_log_term: %d; entries_length: %d; entries: %a}" + term leader_commit prev_log_index prev_log_term (snd entries) + (brackets @@ list ~sep:(const char ',') log_entry_pp) + (fst entries |> Iter.to_list) + else + pf ppf + "AppendEntries {term: %d; leader_commit: %d; prev_log_index: %d; \ + prev_log_term: %d; entries_length: %d; entries: _}" + term leader_commit prev_log_index prev_log_term (snd entries) | AppendEntriesResponse {term; success; _} -> pf ppf "AppendEntriesResponse {term: %d; success: %a}" term (result @@ -142,28 +155,27 @@ struct match ct.node_state with | Leader s -> let highest = Log.highest ct.log in - (* Assume we are going to send up to highest to each *) - let send_f id highest_sent = - let lo = highest_sent + 1 in - let len = - min (highest - lo) ex.@(t @> config @> max_append_entries) - in - let hi = lo + len in - (* so we want to send the segment [lo -> hi] inclusive *) - if lo <= hi || force then - let prev_log_index = lo - 1 in - let entries = Log.iter_len ct.log ~lo ~hi () in - send id - @@ AppendEntries - { term= ct.current_term - ; leader_commit= ex.@(t @> commit_index) - ; prev_log_index - ; prev_log_term= get_log_term ct.log prev_log_index - ; entries } - in - IntMap.iter send_f s.rep_sent ; ex.@(t @> node_state @> Leader.rep_sent) <- - IntMap.map (fun _ -> highest) s.rep_sent + IntMap.mapi + (fun id highest_sent -> + let lo = highest_sent + 1 in + let len = + min (highest - lo) ex.@(t @> config @> max_append_entries) + in + let hi = lo + len in + (* so we want to send the segment [lo -> hi] inclusive *) + ( if lo <= hi || force then + let prev_log_index = lo - 1 in + let entries = Log.iter_len ct.log ~lo ~hi () in + send id + @@ AppendEntries + { term= ct.current_term + ; leader_commit= ex.@(t @> commit_index) + ; prev_log_index + ; prev_log_term= get_log_term ct.log prev_log_index + ; entries } ) ; + hi ) + s.rep_sent | _ -> assert false @@ -239,9 +251,9 @@ struct | RequestVote {term; _} | RequestVoteResponse {term; _} ) , src ) - , _) + , _ ) when term > ex.@(t @> current_term) -> - traceln "Follower for node %d for term %d" src term; + traceln "Follower for node %d for term %d" src term ; transit_follower term | _ -> () @@ -277,7 +289,7 @@ struct (* Recv msgs from this term*) (* Candidate *) | Recv (RequestVoteResponse m, src), Candidate _ -> - traceln "Received vote from %d" src; + traceln "Received vote from %d" src ; assert (m.term = ex.@(t @> current_term)) ; let entries, _ = m.entries in let q_entries = @@ -296,7 +308,6 @@ struct (* This case happens if a message is lost *) assert (m.term = ex.@(t @> current_term)) ; A.map (t @> node_state @> Leader.rep_sent) () ~f:(IntMap.add src idx) ; - dtraceln "Failed to match\n%a" PP.t_pp ex.@(t) (* Follower *) | Recv (RequestVote m, cid), Follower _ -> ex.@(t @> node_state @> Follower.timeout) <- @@ -330,11 +341,9 @@ struct (* This will be the prev_log_index of the next msg *) dtraceln "Failed to match\n\ - rooted_at_start(%b), matching_index_and_term(%b):\n\ - %a" + rooted_at_start(%b), matching_index_and_term(%b):\n" rooted_at_start - (matching_index_and_term ()) - PP.t_pp ct ; + (matching_index_and_term ()) ; send lid @@ AppendEntriesResponse { term= ct.current_term @@ -386,7 +395,7 @@ struct transit_candidate () | Candidate {timeout; _} when timeout <= 0 -> send_request_vote () ; - ex.@(t @> node_state @> Candidate.timeout) <- 1 + ex.@(t @> node_state @> Candidate.timeout) <- ex.@(t @> config @> election_timeout) | Leader {heartbeat; _} when heartbeat <= 0 -> send_append_entries ~force:true () ; ex.@(t @> node_state @> Leader.heartbeat) <- 1 @@ -414,8 +423,8 @@ struct let advance_raw e = resolve_event e ; resolve_timeouts () ; - check_conditions () ; - check_commit () + check_commit () ; + check_conditions () let advance t e = run_side_effects (fun () -> advance_raw e) t diff --git a/impl/lib/prevote.ml b/impl/lib/prevote.ml index eb30dbf..2e97a15 100644 --- a/impl/lib/prevote.ml +++ b/impl/lib/prevote.ml @@ -145,28 +145,27 @@ struct match ct.node_state with | Leader s -> let highest = Log.highest ct.log in - (* Assume we are going to send up to highest to each *) - let send_f id highest_sent = - let lo = highest_sent + 1 in - let len = - min (highest - lo) ex.@(t @> config @> max_append_entries) - in - let hi = lo + len in - (* so we want to send the segment [lo -> hi] inclusive *) - if lo <= hi || force then - let prev_log_index = lo - 1 in - let entries = Log.iter_len ct.log ~lo ~hi () in - send id - @@ AppendEntries - { term= ct.current_term - ; leader_commit= ex.@(t @> commit_index) - ; prev_log_index - ; prev_log_term= get_log_term ct.log prev_log_index - ; entries } - in - IntMap.iter send_f s.rep_sent ; ex.@(t @> node_state @> Leader.rep_sent) <- - IntMap.map (fun _ -> highest) s.rep_sent + IntMap.mapi + (fun id highest_sent -> + let lo = highest_sent + 1 in + let len = + min (highest - lo) ex.@(t @> config @> max_append_entries) + in + let hi = lo + len in + (* so we want to send the segment [lo -> hi] inclusive *) + ( if lo <= hi || force then + let prev_log_index = lo - 1 in + let entries = Log.iter_len ct.log ~lo ~hi () in + send id + @@ AppendEntries + { term= ct.current_term + ; leader_commit= ex.@(t @> commit_index) + ; prev_log_index + ; prev_log_term= get_log_term ct.log prev_log_index + ; entries } ) ; + hi ) + s.rep_sent | _ -> assert false @@ -315,8 +314,7 @@ struct -> (* This case happens if a message is lost *) assert (m.term = ex.@(t @> current_term)) ; - A.map (t @> node_state @> Leader.rep_sent) () ~f:(IntMap.add src idx) ; - dtraceln "Failed to match\n%a" PP.t_pp ex.@(t) + A.map (t @> node_state @> Leader.rep_sent) () ~f:(IntMap.add src idx) (* All nodes must be able to receive prevote *) | Recv ((RequestVote {prevote= true; term; _} as m), cid), _ when term > ex.@(t @> current_term) && request_vote_valid m -> @@ -354,11 +352,9 @@ struct (* This will be the prev_log_index of the next msg *) dtraceln "Failed to match\n\ - rooted_at_start(%b), matching_index_and_term(%b):\n\ - %a" + rooted_at_start(%b), matching_index_and_term(%b):\n" rooted_at_start - (matching_index_and_term ()) - PP.t_pp ct ; + (matching_index_and_term ()) ; send lid @@ AppendEntriesResponse { term= ct.current_term diff --git a/impl/lib/prevote_sbn.ml b/impl/lib/prevote_sbn.ml index a3e53e3..6428feb 100644 --- a/impl/lib/prevote_sbn.ml +++ b/impl/lib/prevote_sbn.ml @@ -106,28 +106,27 @@ struct match ct.node_state with | Leader s -> let highest = Log.highest ct.log in - (* Assume we are going to send up to highest to each *) - let send_f id highest_sent = - let lo = highest_sent + 1 in - let len = - min (highest - lo) ex.@(t @> config @> max_append_entries) - in - let hi = lo + len in - (* so we want to send the segment [lo -> hi] inclusive *) - if lo <= hi || force then - let prev_log_index = lo - 1 in - let entries = Log.iter_len ct.log ~lo ~hi () in - send id - @@ AppendEntries - { term= ct.current_term - ; leader_commit= ex.@(t @> commit_index) - ; prev_log_index - ; prev_log_term= get_log_term ct.log prev_log_index - ; entries } - in - IntMap.iter send_f s.rep_sent ; ex.@(t @> node_state @> Leader.rep_sent) <- - IntMap.map (fun _ -> highest) s.rep_sent + IntMap.mapi + (fun id highest_sent -> + let lo = highest_sent + 1 in + let len = + min (highest - lo) ex.@(t @> config @> max_append_entries) + in + let hi = lo + len in + (* so we want to send the segment [lo -> hi] inclusive *) + ( if lo <= hi || force then + let prev_log_index = lo - 1 in + let entries = Log.iter_len ct.log ~lo ~hi () in + send id + @@ AppendEntries + { term= ct.current_term + ; leader_commit= ex.@(t @> commit_index) + ; prev_log_index + ; prev_log_term= get_log_term ct.log prev_log_index + ; entries } ) ; + hi ) + s.rep_sent | _ -> assert false @@ -362,7 +361,7 @@ struct ex.@(t @> config @> election_timeout) | Candidate {timeout; _} when timeout <= 0 -> send_request_vote () ; - ex.@(t @> node_state @> Candidate.timeout) <- 1 + ex.@(t @> node_state @> Candidate.timeout) <- ex.@(t @> config @> election_timeout) | Leader {heartbeat; _} when heartbeat <= 0 -> send_append_entries ~force:true () ; ex.@(t @> node_state @> Leader.heartbeat) <- 1 diff --git a/impl/lib/raft.ml b/impl/lib/raft.ml index 45221fd..10f5063 100644 --- a/impl/lib/raft.ml +++ b/impl/lib/raft.ml @@ -141,28 +141,27 @@ struct match ct.node_state with | Leader s -> let highest = Log.highest ct.log in - (* Assume we are going to send up to highest to each *) - let send_f id highest_sent = - let lo = highest_sent + 1 in - let len = - min (highest - lo) ex.@(t @> config @> max_append_entries) - in - let hi = lo + len in - (* so we want to send the segment [lo -> hi] inclusive *) - if lo <= hi || force then - let prev_log_index = lo - 1 in - let entries = Log.iter_len ct.log ~lo ~hi () in - send id - @@ AppendEntries - { term= ct.current_term - ; leader_commit= ex.@(t @> commit_index) - ; prev_log_index - ; prev_log_term= get_log_term ct.log prev_log_index - ; entries } - in - IntMap.iter send_f s.rep_sent ; ex.@(t @> node_state @> Leader.rep_sent) <- - IntMap.map (fun _ -> highest) s.rep_sent + IntMap.mapi + (fun id highest_sent -> + let lo = highest_sent + 1 in + let len = + min (highest - lo) ex.@(t @> config @> max_append_entries) + in + let hi = lo + len in + (* so we want to send the segment [lo -> hi] inclusive *) + ( if lo <= hi || force then + let prev_log_index = lo - 1 in + let entries = Log.iter_len ct.log ~lo ~hi () in + send id + @@ AppendEntries + { term= ct.current_term + ; leader_commit= ex.@(t @> commit_index) + ; prev_log_index + ; prev_log_term= get_log_term ct.log prev_log_index + ; entries } ) ; + hi ) + s.rep_sent | _ -> assert false @@ -288,8 +287,7 @@ struct -> (* This case happens if a message is lost *) assert (m.term = ex.@(t @> current_term)) ; - A.map (t @> node_state @> Leader.rep_sent) () ~f:(IntMap.add src idx) ; - dtraceln "Failed to match\n%a" PP.t_pp ex.@(t) + A.map (t @> node_state @> Leader.rep_sent) () ~f:(IntMap.add src idx) (* Follower *) | Recv (RequestVote m, cid), Follower _ when request_vote_valid (RequestVote m) -> @@ -321,11 +319,9 @@ struct (* This will be the prev_log_index of the next msg *) dtraceln "Failed to match\n\ - rooted_at_start(%b), matching_index_and_term(%b):\n\ - %a" + rooted_at_start(%b), matching_index_and_term(%b):\n" rooted_at_start - (matching_index_and_term ()) - PP.t_pp ct ; + (matching_index_and_term ()) ; send lid @@ AppendEntriesResponse { term= ct.current_term diff --git a/impl/lib/raft_sbn.ml b/impl/lib/raft_sbn.ml index 69c2db1..f26d4d4 100644 --- a/impl/lib/raft_sbn.ml +++ b/impl/lib/raft_sbn.ml @@ -104,28 +104,27 @@ struct match ct.node_state with | Leader s -> let highest = Log.highest ct.log in - (* Assume we are going to send up to highest to each *) - let send_f id highest_sent = - let lo = highest_sent + 1 in - let len = - min (highest - lo) ex.@(t @> config @> max_append_entries) - in - let hi = lo + len in - (* so we want to send the segment [lo -> hi] inclusive *) - if lo <= hi || force then - let prev_log_index = lo - 1 in - let entries = Log.iter_len ct.log ~lo ~hi () in - send id - @@ AppendEntries - { term= ct.current_term - ; leader_commit= ex.@(t @> commit_index) - ; prev_log_index - ; prev_log_term= get_log_term ct.log prev_log_index - ; entries } - in - IntMap.iter send_f s.rep_sent ; ex.@(t @> node_state @> Leader.rep_sent) <- - IntMap.map (fun _ -> highest) s.rep_sent + IntMap.mapi + (fun id highest_sent -> + let lo = highest_sent + 1 in + let len = + min (highest - lo) ex.@(t @> config @> max_append_entries) + in + let hi = lo + len in + (* so we want to send the segment [lo -> hi] inclusive *) + ( if lo <= hi || force then + let prev_log_index = lo - 1 in + let entries = Log.iter_len ct.log ~lo ~hi () in + send id + @@ AppendEntries + { term= ct.current_term + ; leader_commit= ex.@(t @> commit_index) + ; prev_log_index + ; prev_log_term= get_log_term ct.log prev_log_index + ; entries } ) ; + hi ) + s.rep_sent | _ -> assert false @@ -252,8 +251,7 @@ struct -> (* This case happens if a message is lost *) assert (m.term = ex.@(t @> current_term)) ; - A.map (t @> node_state @> Leader.rep_sent) () ~f:(IntMap.add src idx) ; - dtraceln "Failed to match\n%a" PP.t_pp ex.@(t) + A.map (t @> node_state @> Leader.rep_sent) () ~f:(IntMap.add src idx) (* Follower *) | Recv (RequestVote m, cid), Follower _ when request_vote_valid (RequestVote m) -> @@ -283,11 +281,9 @@ struct (* This will be the prev_log_index of the next msg *) dtraceln "Failed to match\n\ - rooted_at_start(%b), matching_index_and_term(%b):\n\ - %a" + rooted_at_start(%b), matching_index_and_term(%b):\n" rooted_at_start - (matching_index_and_term ()) - PP.t_pp ct ; + (matching_index_and_term ()) ; send lid @@ AppendEntriesResponse { term= ct.current_term @@ -336,7 +332,7 @@ struct transit_candidate () | Candidate {timeout; _} when timeout <= 0 -> send_request_vote () ; - ex.@(t @> node_state @> Candidate.timeout) <- 1 + ex.@(t @> node_state @> Candidate.timeout) <- ex.@(t @> config @> election_timeout) | Leader {heartbeat; _} when heartbeat <= 0 -> send_append_entries ~force:true () ; ex.@(t @> node_state @> Leader.heartbeat) <- 1 diff --git a/impl/lib/types.ml b/impl/lib/types.ml index 2772548..10303d2 100644 --- a/impl/lib/types.ml +++ b/impl/lib/types.ml @@ -25,7 +25,7 @@ let config_pp : config Fmt.t = v.other_nodes let make_config ~node_id ~node_list ~election_timeout ?(max_outstanding = 8192) - ?(max_append_entries = 8192) () = + ?(max_append_entries = 512) () = let length = List.length node_list in let phase1quorum = (length + 1) / 2 in let phase2quorum = (length + 1) / 2 in diff --git a/impl/main.ml b/impl/main.ml index 1f1d631..6fab8d9 100644 --- a/impl/main.ml +++ b/impl/main.ml @@ -37,7 +37,7 @@ let run kind node_id node_addresses internal_port external_port tick_period ; node_id ; election_timeout ; max_outstanding - ; max_append_entries= 8192 } + ; max_append_entries= 512 } in let config cons_config = Infra. From d0b53932b887121a75b8377e54fe5c6052eac5eb Mon Sep 17 00:00:00 2001 From: cjen1 Date: Sat, 30 Dec 2023 15:47:14 +0000 Subject: [PATCH 9/9] Don't print log if not match --- impl/test/test_paxos.ml | 26 +++++++++++++++++++++----- impl/test/test_raft.ml | 6 +----- 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/impl/test/test_paxos.ml b/impl/test/test_paxos.ml index be74ce9..3a04daf 100644 --- a/impl/test/test_paxos.ml +++ b/impl/test/test_paxos.ml @@ -99,6 +99,7 @@ let%expect_test "request vote from higher" = actions ; [%expect {| + +Follower for node 2 for term 10 +Follower for term 10 t': {log: []; commit_index:-1; current_term: 10; node_state:Follower(5)} actions: @@ -114,6 +115,7 @@ let%expect_test "request vote from higher" = [%expect {| +Candidate for term 3 + +Follower for node 2 for term 10 +Follower for term 10 t': {log: []; commit_index:-1; current_term: 10; node_state:Follower(5)} actions: @@ -131,6 +133,7 @@ let%expect_test "request vote from higher" = {| +Candidate for term 3 +Leader for term 3 + +Follower for node 2 for term 10 +Follower for term 10 t': {log: []; commit_index:-1; current_term: 10; node_state:Follower(5)} actions: @@ -156,6 +159,7 @@ let%expect_test "Loop" = let t, _ = Impl.advance t Tick in Fmt.pr "node_state: %a\n" PP.node_state_pp (A.get node_state t) ; [%expect {| + +Follower for node 2 for term 10 +Follower for term 10 node_state: Follower(1) |}] ; let t, actions = Impl.advance t Tick in @@ -175,6 +179,7 @@ let%expect_test "Loop" = Fmt.pr "a1: %a\n" Fmt.(brackets @@ list ~sep:(const string "\n") action_pp) a1 ; [%expect {| + +Follower for node 2 for term 12 +Follower for term 12 a1: [Send(2,RequestVoteResponse {term:12; start_index:0; entries_length:0; entries: []})] |}] ; @@ -183,6 +188,7 @@ let%expect_test "Loop" = Fmt.pr "a2: %a\n" Fmt.(brackets @@ list ~sep:(const string "\n") action_pp) a2 ; [%expect {| + +Follower for node 2 for term 12 +Follower for term 12 a2: [Send(2,RequestVoteResponse {term:12; start_index:0; entries_length:0; entries: []})] |}] ; @@ -196,6 +202,7 @@ let%expect_test "Loop" = actions ; [%expect {| + +Received vote from 1 +Leader for term 12 actions: [Send(1,AppendEntries {term: 12; leader_commit: -1; prev_log_index: -1; prev_log_term: 0; entries_length: 0; entries: []}) @@ -309,6 +316,7 @@ let%expect_test "Loop" = pp_res t2 actions ; [%expect {| + +Follower for node 1 for term 13 +Follower for term 13 t: {log: [{command: Command(Read m1, 1); term : 12}]; commit_index:-1; current_term: 13; node_state:Follower(5)} actions: @@ -324,6 +332,7 @@ let%expect_test "Loop" = pp_res t1 actions ; [%expect {| + +Received vote from 2 +Leader for term 13 t: {log: [{command: Command(Read m1, 1); term : 13}]; commit_index:-1; current_term: 13; node_state:Leader{heartbeat:1; rep_ackd: [{0, -1}, {2, -1}]; rep_sent:[{0, 0}, {2, 0}]} @@ -397,6 +406,7 @@ let%expect_test "Loop" = pp_res t actions ; [%expect {| + +Follower for node 2 for term 14 +Follower for term 14 t: {log: [{command: Command(Read m1, 1); term : 12},{command: Command(Read m2, 2); term : 12}]; commit_index:0; current_term: 14; node_state:Follower(5)} actions: @@ -415,6 +425,7 @@ let%expect_test "Loop" = pp_res t2 actions ; [%expect {| + +Received vote from 0 +Leader for term 14 t: {log: [{command: Command(Read m1, 1); term : 14},{command: Command(Read m4, 3); term : 14}]; commit_index:-1; current_term: 14; node_state:Leader{heartbeat:1; rep_ackd: [{0, -1}, {1, -1}]; rep_sent:[{0, 1}, {1, 1}]} @@ -446,8 +457,11 @@ let%expect_test "Loop" = Fmt.pr "t2:\n%a\n" PP.t_pp t2 ; [%expect {| + +Follower for node 1 for term 100 +Follower for term 100 + +Follower for node 2 for term 100 +Follower for term 100 + +Follower for node 2 for term 100 +Follower for term 100 t0: {log: [{command: Command(Read m1, 1); term : 12},{command: Command(Read m2, 2); term : 12}]; commit_index:0; current_term: 100; node_state:Follower(5)} @@ -510,6 +524,7 @@ let%expect_test "Loop" = pp_res t actions ; [%expect {| + +Received vote from 1 +Leader for term 102 t: {log: [{command: Command(Read m1, 1); term : 12},{command: Command(Read m4, 3); term : 102},{command: Command(Read m5, 4); term : 102},{command: Command(Read m6, 5); term : 102}]; commit_index:0; current_term: 102; node_state:Leader{heartbeat:1; rep_ackd: [{1, -1}, {2, -1}]; rep_sent:[{1, 3}, {2, 3}]} @@ -535,6 +550,7 @@ let%expect_test "Loop" = pp_res t1 actions ; [%expect {| + +Received vote from 2 +Leader for term 103 t: {log: [{command: Command(Read m1, 1); term : 13},{command: Command(Read m4, 3); term : 13},{command: Command(Read m7, 6); term : 103},{command: Command(Read m6, 5); term : 103}]; commit_index:1; current_term: 103; node_state:Leader{heartbeat:1; rep_ackd: [{0, -1}, {2, -1}]; rep_sent:[{0, 3}, {2, 3}]} @@ -560,6 +576,7 @@ let%expect_test "Loop" = pp_res t2 actions ; [%expect {| + +Received vote from 0 +Leader for term 101 t: {log: [{command: Command(Read m1, 1); term : 101},{command: Command(Read m4, 3); term : 101},{command: Command(Read m7, 6); term : 101}]; commit_index:-1; current_term: 101; node_state:Leader{heartbeat:1; rep_ackd: [{0, -1}, {1, -1}]; rep_sent:[{0, 2}, {1, 2}]} @@ -593,9 +610,12 @@ let%expect_test "Missing elements" = pp_res t actions ; [%expect {| + +Follower for node 2 for term 10 +Follower for term 10 +Candidate for term 12 + +Follower for node 2 for term 12 +Follower for term 12 + +Received vote from 1 +Leader for term 12 t: {log: []; commit_index:-1; current_term: 12; node_state:Leader{heartbeat:1; rep_ackd: [{1, -1}, {2, -1}]; rep_sent:[{1, -1}, {2, -1}]} @@ -639,8 +659,7 @@ let%expect_test "Missing elements" = {| +Failed to match +rooted_at_start(false), matching_index_and_term(false): - +{log: - +[]; commit_index:-1; current_term: 12; node_state:Follower(5)} + + t: {log: []; commit_index:-1; current_term: 12; node_state:Follower(5)} actions: [Send(0,AppendEntriesResponse {term: 12; success: Error: -1})] |}] ; @@ -649,9 +668,6 @@ let%expect_test "Missing elements" = pp_res t actions ; [%expect {| - +Failed to match - +{log: [{command: Command(Read m1, 1); term : 12},{command: Command(Read m2, 2); term : 12}]; commit_index:-1; current_term: 12; node_state:Leader{heartbeat:1; rep_ackd: - +[{1, -1}, {2, -1}]; rep_sent:[{1, -1}, {2, 1}]} t: {log: [{command: Command(Read m1, 1); term : 12},{command: Command(Read m2, 2); term : 12}]; commit_index:-1; current_term: 12; node_state:Leader{heartbeat:1; rep_ackd: [{1, -1}, {2, -1}]; rep_sent:[{1, 1}, {2, 1}]} actions: [Send(1,AppendEntries {term: 12; leader_commit: -1; prev_log_index: -1; prev_log_term: 0; entries_length: 2; entries: diff --git a/impl/test/test_raft.ml b/impl/test/test_raft.ml index 36f2169..7453ed4 100644 --- a/impl/test/test_raft.ml +++ b/impl/test/test_raft.ml @@ -630,8 +630,7 @@ let%expect_test "Missing elements" = {| +Failed to match +rooted_at_start(false), matching_index_and_term(false): - +{log: - +[]; commit_index:-1; current_term: 11; node_state:Follower{timeout:5; voted_for:0}} + + t: {log: []; commit_index:-1; current_term: 11; node_state:Follower{timeout:5; voted_for:0}} actions: [Send(0,AppendEntriesResponse {term: 11; success: Error: -1})] |}] ; @@ -640,9 +639,6 @@ let%expect_test "Missing elements" = pp_res t actions ; [%expect {| - +Failed to match - +{log: [{command: Command(NoOp, -1); term : 11},{command: Command(Read m1, 1); term : 11},{command: Command(Read m2, 2); term : 11}]; commit_index:-1; current_term: 11; node_state:Leader{heartbeat:1; rep_ackd: - +[{1, -1}, {2, -1}]; rep_sent:[{1, -1}, {2, 2}]} t: {log: [{command: Command(NoOp, -1); term : 11},{command: Command(Read m1, 1); term : 11},{command: Command(Read m2, 2); term : 11}]; commit_index:-1; current_term: 11; node_state:Leader{heartbeat:1; rep_ackd: [{1, -1}, {2, -1}]; rep_sent:[{1, 2}, {2, 2}]} actions: [Send(1,AppendEntries {term: 11; leader_commit: -1; prev_log_index: -1; prev_log_term: 0; entries_length: 3; entries: