diff --git a/bin/bench.ml b/bin/bench.ml index 8cfa971..0fc354e 100644 --- a/bin/bench.ml +++ b/bin/bench.ml @@ -24,7 +24,10 @@ let pitcher ~sw nid mclock n rate cmgr (dispatch : (int, Mtime.t) Hashtbl.t) : let id = i * prime in let cmd = Command. - {op= Write ("asdf", "asdf"); id; trace_start= Unix.gettimeofday ()} + { op= Write ("asdf", "asdf") + ; id + ; submitted= Unix.gettimeofday () + ; trace_start= Unix.gettimeofday () } in let target = Mtime.add_span prev period |> Option.get in if Mtime.is_later target ~than:(MT.now mclock) then diff --git a/bin/test_alloc.ml b/bin/test_alloc.ml index cbd66af..473eebc 100644 --- a/bin/test_alloc.ml +++ b/bin/test_alloc.ml @@ -13,4 +13,30 @@ let alloc_test = in Bench.Test.create_with_initialization ~name:"alloc_with_reference" test -let () = Command_unix.run (Bench.make_command [alloc_test]) +let alloc_return_tuple = + let extern = ref 0 in + let f x = (x + 1, x + 2, x + 3) in + let test () = + let a, b, c = f 1 in + extern := a + b + c + in + Bench.Test.create ~name:"alloc_return_tuple" test + +type foo = {mutable a: int; mutable b: int; mutable c: int} + +let alloc_return_struct = + let extern = ref 0 in + let f x r = + r.a <- x + 1 ; + r.b <- x + 2 ; + r.c <- x + 3 + in + let test `init = + let foo = {a= 0; b= 0; c= 0} in + fun () -> + f 1 foo ; + extern := foo.a + foo.b + foo.c + in + Bench.Test.create_with_initialization ~name:"alloc_return_struct" test + +let () = Command_unix.run (Bench.make_command [alloc_test; alloc_return_tuple; alloc_return_struct]) diff --git a/fuzz/dune b/fuzz/dune index 91770e8..876315e 100644 --- a/fuzz/dune +++ b/fuzz/dune @@ -1,3 +1,4 @@ (test (name lib_line_prot) - (libraries ocons.core crowbar mtime)) + (libraries ocons.core crowbar mtime) + (preprocess (pps ppx_jane))) diff --git a/fuzz/lib_line_prot.ml b/fuzz/lib_line_prot.ml index 494a7d1..ccbf9bc 100644 --- a/fuzz/lib_line_prot.ml +++ b/fuzz/lib_line_prot.ml @@ -14,8 +14,8 @@ module Gen = struct ; const NoOp ] let command = - map [op; int; float] (fun op id trace_start -> - Ocons_core.Types.Command.{op; id; trace_start} ) + map [op; int; float; float] (fun op id submitted trace_start -> + Ocons_core.Types.Command.{op; id; submitted; trace_start} ) let log_entry = map [command; int] (fun command term -> {command; term}) @@ -39,7 +39,13 @@ let test_client_request r = @@ fun bw -> Line_prot.External_infra.serialise_request r bw ; let r' = Line_prot.External_infra.parse_request br in - check_eq ~cmp:Command.compare ~pp:Command.pp_mach r r' + (* compare *) + check_eq ~cmp:Command.compare ~pp:Command.pp_mach r r'; + (* hash *) + check_eq ~pp:Command.pp_mach r r' ~cmp:(fun a b -> + let ha, hb = Command.hash a, Command.hash b in + Int.compare ha hb + ) let test_client_response r = let open Crowbar in diff --git a/impl/fuzz/command_tree.ml b/impl/fuzz/command_tree.ml new file mode 100644 index 0000000..7307bb7 --- /dev/null +++ b/impl/fuzz/command_tree.ml @@ -0,0 +1,51 @@ +open! Core + +module CTree = Impl_core__Conspire_command_tree.CommandTree (struct + type t = int [@@deriving compare, show, bin_io, hash] +end) + +let ctree = + let open Crowbar in + fix (fun gen -> + choose + [ const CTree.empty + ; map [gen; int; list1 int] (fun ctree kid vs -> + let nodes = Map.keys ctree.CTree.ctree in + let kid = kid % List.length nodes in + let root = List.nth_exn nodes kid in + let ctree', _ = + CTree.addv ~node:0 ctree ~parent:root (List.iter vs |> Iter.from_labelled_iter) in + ctree')]) + +(* take ctree and random node and node in its history *) +(* make an update of that and add it to the tree of just the path to that *) +let test_ctree_update ctree u_start u_end = + let nodes = + ctree.CTree.ctree |> Map.keys + |> List.sort ~compare:(fun ka kb -> + Int.compare (CTree.get_idx ctree ka) (CTree.get_idx ctree kb) ) + in + let u_end = u_end % List.length nodes in + let n_end = List.nth_exn nodes u_end in + (* path goes from root to n_end *) + let path = CTree.path_between ctree CTree.root_key n_end in + let remote_path, _ = List.split_n path (u_start % max 1 (List.length path)) in + let remote, _ = + CTree.addv CTree.empty ~node:0 ~parent:CTree.root_key + ( List.iter remote_path |> Iter.from_labelled_iter + |> Iter.map (fun (_, _, v) -> v) ) + in + let update = CTree.make_update ctree n_end remote in + let remote = CTree.apply_update_exn remote update in + let local_path = CTree.path_between ctree CTree.root_key n_end in + let remote_path = CTree.path_between remote CTree.root_key n_end in + let pp ppf v = + Fmt.pf ppf "%a" + Fmt.(brackets @@ list ~sep:comma int) + (List.map v ~f:(fun (_idx, _key, v) -> v)) + in + Crowbar.check_eq ~pp ~cmp:[%compare: CTree.node list] local_path remote_path + +let () = + let open Crowbar in + add_test ~name:"ctree update" [ctree; int; int] test_ctree_update diff --git a/impl/fuzz/dune b/impl/fuzz/dune index 9a150a3..7e5f107 100644 --- a/impl/fuzz/dune +++ b/impl/fuzz/dune @@ -1,7 +1,7 @@ (tests - (names test_msgs) + (names test_msgs command_tree) (flags (:standard -w -27)) (preprocess - (pps ppx_jane)) + (pps ppx_jane ppx_log ppx_accessor ppx_deriving.show)) (libraries impl_core crowbar)) diff --git a/impl/fuzz/test_msgs.ml b/impl/fuzz/test_msgs.ml index e69a4fc..e289eb3 100644 --- a/impl/fuzz/test_msgs.ml +++ b/impl/fuzz/test_msgs.ml @@ -1,7 +1,9 @@ module C = Crowbar let entries_equal (ea, la) (eb, lb) = - la = lb && List.equal ( = ) (Iter.to_list ea) (Iter.to_list eb) + la = lb + && List.equal [%compare.equal: Ocons_core.Types.log_entry] (Iter.to_list ea) + (Iter.to_list eb) module Gen = struct open Ocons_core.Types @@ -17,13 +19,22 @@ module Gen = struct ; const NoOp ] let command = - map [op; int] (fun op id -> - Ocons_core.Types.Command.{op; id; trace_start= -1.} ) + with_printer Command.pp + @@ map [op; int; float; float] (fun op id submitted trace_start -> + Ocons_core.Types.Command.{op; id; trace_start; submitted} ) - let log_entry = map [command; int] (fun command term -> {command; term}) + let log_entry = + with_printer log_entry_pp + @@ map [command; int] (fun command term -> {command; term}) + + let pp_entries ppf (v, _) = + Fmt.pf ppf "%a" + Fmt.(brackets @@ list ~sep:comma log_entry_pp_mach) + (Iter.to_list v) let entries = - map [list log_entry] (fun les -> (Iter.of_list les, List.length les)) + with_printer pp_entries + @@ map [list log_entry] (fun les -> (Iter.of_list les, List.length les)) let conspire_value : Impl_core.ConspireSS.value gen = list command end @@ -43,7 +54,7 @@ let test_entry_equality les = @@ fun bw -> LP.SerPrim.entries w_entries bw ; let r_entries = LP.DeserPrim.entries br in - check_eq ~eq:entries_equal w_entries r_entries + check_eq ~pp:Gen.pp_entries ~eq:entries_equal w_entries r_entries module Paxos = struct open Impl_core.Paxos diff --git a/impl/lib/conspire_command_tree.ml b/impl/lib/conspire_command_tree.ml index 70e1cfd..b15a7e1 100644 --- a/impl/lib/conspire_command_tree.ml +++ b/impl/lib/conspire_command_tree.ml @@ -30,7 +30,7 @@ module CommandTree (Value : Value) = struct Extensions to heads *) - type node = int * key * Value.t [@@deriving show, bin_io] + type node = int * key * Value.t [@@deriving show, bin_io, compare] type parent_ref_node = {node: node; parent: parent_ref_node option [@opaque]; key: key} @@ -93,7 +93,7 @@ module CommandTree (Value : Value) = struct | [] -> Fmt.failwith "All updates should be non-empty" | (_, par, _) :: _ when not (Map.mem t.ctree par) -> - Fmt.error_msg "Missing %a" Key.pp par + Error (`Root_of_update_not_found par) | (_, par, _) :: _ as extension -> let rec aux (ctree : parent_ref_node option Map.M(Key).t) (extension : node list) parent = @@ -115,8 +115,8 @@ module CommandTree (Value : Value) = struct match apply_update t update with | Ok t -> t - | Error (`Msg s) -> - Fmt.failwith "%s" s + | Error (`Root_of_update_not_found par) -> + Fmt.failwith "Root of update not found: %a" Key.pp par let addv t ~node:_ ~parent vi = let parent_key = parent in @@ -256,10 +256,13 @@ module CommandTree (Value : Value) = struct match (curr, rt) with | None, Some _ -> Fmt.invalid_arg "%a not on path to %a" Key.pp rt_vc Key.pp hd_vc + (* Both root*) | None, None -> acc + (* Both non-root and equal *) | Some curr, Some rt when [%equal: Key.t] curr.key rt.key -> acc + (* both Non-root and non-equal *) | Some curr, _ -> aux curr.parent (curr.node :: acc) in diff --git a/impl/lib/conspire_dc.ml b/impl/lib/conspire_dc.ml index adc6c64..876ead1 100644 --- a/impl/lib/conspire_dc.ml +++ b/impl/lib/conspire_dc.ml @@ -28,53 +28,6 @@ end (all the while doing the recovery and commit messages as before) *) -module DelayReorderBuffer = struct - type 'a t = - { mutable store: 'a list Map.M(Time).t - [@polyprinter fun pa -> Utils.pp_map pp_time_float_unix pa] - ; mutable hwm: Time.t [@printer pp_time_float_unix] - ; interval: Time.Span.t - ; compare: 'a -> 'a -> int } - [@@deriving show {with_path= false}] - - let ms_clamp ?(direction = `Ceil) interval f = - match direction with - | `Ceil -> - Time.next_multiple ~base:Time.epoch ~interval ~after:f - ~can_equal_after:true () - | `Floor -> - Time.prev_multiple ~base:Time.epoch ~interval ~before:f - ~can_equal_before:true () - - let get_values t current = - (* lim is the millisecond before now *) - let lim = current |> ms_clamp ~direction:`Floor t.interval in - let seq = - Map.to_sequence ~order:`Increasing_key ~keys_less_or_equal_to:lim t.store - in - t.hwm <- lim ; - seq - |> Sequence.group ~break:(fun (a, _) (b, _) -> - Time.(ms_clamp t.interval a <> ms_clamp t.interval b) ) - |> Sequence.map ~f:(fun ls -> - let key = ms_clamp t.interval (List.hd_exn ls |> fst) in - (* remove relevant keys *) - let values = - ls - |> List.iter ~f:(fun (k, _) -> - t.store <- Map.remove_multi t.store k ) ; - (* export batch *) - ls |> List.map ~f:snd |> List.join |> List.sort ~compare:t.compare - in - (values, key) ) - - let add_value t v target = - if Time_float.(target > t.hwm) then - t.store <- Map.add_multi t.store ~key:target ~data:v - - let create ~compare interval base_hwm = - {hwm= base_hwm; interval; store= Map.empty (module Time); compare} -end module Counter = struct type t = {mutable count: int; limit: int} [@@deriving show] @@ -129,7 +82,7 @@ module Types = struct { config: config [@opaque] ; conspire: Conspire.t ; command_buffer: - Command.t DelayReorderBuffer.t (* TODO only reply to relevnat nodes *) + Command.t Delay_buffer.t (* TODO only reply to relevnat nodes *) ; tick_count: Counter.t ; clock: float Eio.Time.clock_ty Eio.Std.r [@opaque] } [@@deriving show {with_path= false}] @@ -186,7 +139,7 @@ struct |> Option.value_map ~default:Time.epoch ~f:snd in let batches = - DelayReorderBuffer.get_values t.command_buffer time + Delay_buffer.get_values t.command_buffer time |> Sequence.filter ~f:(fun (_, t) -> Time.(t > batch_floor)) in Conspire.add_commands t.conspire @@ -206,34 +159,41 @@ struct let now = Eio.Time.now t.clock |> Utils.float_to_time in let hedged_target = Time.add now t.config.delay_interval in List.iter commands ~f:(fun c -> - DelayReorderBuffer.add_value t.command_buffer c hedged_target ) ; + Delay_buffer.add_value t.command_buffer c hedged_target ) ; Act.broadcast (Commands (commands, hedged_target)) | Recv (Commands (cs, tar), _) -> List.iter cs ~f:(fun c -> - DelayReorderBuffer.add_value t.command_buffer c tar ) + Delay_buffer.add_value t.command_buffer c tar ) | Recv (Conspire m, src) -> ( - let recovery_started, prev_ci = - ( t.conspire.rep.state.term > t.conspire.rep.state.vterm - , get_commit_index t ) - in - let msg_result = Conspire.handle_message t.conspire src m in - let now_steady_state, committed = - ( t.conspire.rep.state.term = t.conspire.rep.state.vterm - , get_commit_index t > prev_ci ) - in - let recovery_completed = recovery_started && now_steady_state in - let recovery_initiated = - (not recovery_started) && not now_steady_state - in - match msg_result with + let update_result = Conspire.handle_update_message t.conspire src m in + match update_result with | Error `MustAck -> send t src - | Error `MustNack -> + | Error (`MustNack reason) -> + ( match reason with + | `Root_of_update_not_found _ -> + Utils.dtraceln "Update is not rooted" + | `Commit_index_not_in_tree -> + Utils.dtraceln "Commit index not in tree" + | `VVal_not_in_tree -> + Utils.dtraceln "VVal not int tree" ) ; nack t src - | Ok _ when recovery_completed || committed || recovery_initiated -> - broadcast t - | _ -> - send t src ) + | Ok () -> + process_acceptor_state t.conspire src ; + let conflict_recovery_attempt = + Conspire.conflict_recovery t.conspire + in + let recovery_started = + t.conspire.rep.state.term > t.conspire.rep.state.vterm + in + let committed = + Conspire.check_commit t.conspire |> Option.is_some + in + let should_broadcast = + Result.is_ok conflict_recovery_attempt + || committed || recovery_started + in + if should_broadcast then broadcast t else send t src ) let advance t e = Act.run_side_effects @@ -243,7 +203,7 @@ struct let create (config : config) = let conspire = Conspire.create config.conspire in let command_buffer = - DelayReorderBuffer.create ~compare:Command.compare + Delay_buffer.create ~compare:Command.compare config.batching_interval (Eio.Time.now config.clock |> Utils.float_to_time) in diff --git a/impl/lib/conspire_f.ml b/impl/lib/conspire_f.ml index 05b07c2..5f260f8 100644 --- a/impl/lib/conspire_f.ml +++ b/impl/lib/conspire_f.ml @@ -94,7 +94,7 @@ module Make (Value : Value) = struct remote.expected_tree <- CTree.copy_update rep.store remote.expected_tree update.new_head - let get_update_to_send rep dst = + let get_update_to_send ?(prune = false) rep dst = let update = get_update rep dst in let remote = Map.find_exn rep.remotes dst in Option.iter update.ctree ~f:(fun update -> @@ -103,6 +103,9 @@ module Make (Value : Value) = struct |> Result.ok |> Option.value_exn ) ; Option.iter update.cons ~f:(fun s -> set_state (Map.find_exn rep.remotes dst).expected_state s ) ; + let update = + {update with ctree= (if prune then None else update.ctree)} + in update let add_commands rep ~node vals = @@ -137,41 +140,44 @@ module Make (Value : Value) = struct let reporter_conflict, run_c = Ocons_core.Utils.InternalReporter.rate_reporter "conflict" - let acceptor_reply t src = + let process_acceptor_state t src = let local = t.rep.state in let remote = Map.find_exn t.other_nodes_state src in let msg_term : term = remote.vterm in - match msg_term >= local.term with - (* overwrite *) - | true when msg_term > local.vterm -> - local.term <- msg_term ; - local.vterm <- msg_term ; - local.vval <- remote.vval - (* extends local *) - | true when msg_term = local.vterm -> ( - let res = CTree.compare_keys t.rep.store local.vval remote.vval in - match res with - | None -> - reporter_conflict () ; - (* conflict *) - Utils.dtraceln "CONFLICT from %d" src ; - Utils.dtraceln "local %a does not prefix of remote %a" - (Fmt.option CTree.pp_parent_ref_node) - (CTree.get t.rep.store local.vval) - (Fmt.option CTree.pp_parent_ref_node) - (CTree.get t.rep.store remote.vval) ; - local.term <- msg_term + 1 - | Some EQ -> - (* equal *) - () - | Some GT -> - (* local > remote *) - () - | Some LT -> - (* local < remote *) - local.vval <- remote.vval ) - | _ -> - assert (msg_term < local.term || msg_term < local.vterm) + let () = + match msg_term >= local.term with + (* overwrite *) + | true when msg_term > local.vterm -> + local.term <- msg_term ; + local.vterm <- msg_term ; + local.vval <- remote.vval + (* extends local *) + | true when msg_term = local.vterm -> ( + let res = CTree.compare_keys t.rep.store local.vval remote.vval in + match res with + | None -> + reporter_conflict () ; + (* conflict *) + Utils.dtraceln "CONFLICT from %d" src ; + Utils.dtraceln "local %a does not prefix of remote %a" + (Fmt.option CTree.pp_parent_ref_node) + (CTree.get t.rep.store local.vval) + (Fmt.option CTree.pp_parent_ref_node) + (CTree.get t.rep.store remote.vval) ; + local.term <- msg_term + 1 + | Some EQ -> + (* equal *) + () + | Some GT -> + (* local > remote *) + () + | Some LT -> + (* local < remote *) + local.vval <- remote.vval ) + | _ -> + assert (msg_term < local.term || msg_term < local.vterm) + in + if t.rep.state.term < remote.term then t.rep.state.term <- remote.term let check_commit t = let old_ci = t.rep.state.commit_index in @@ -187,15 +193,16 @@ module Make (Value : Value) = struct t.config.quorum_size in match new_commit with - | None -> - () - | Some ci -> + | Some ci when not ([%equal: CTree.key] old_ci ci) -> (* can update since once committed, never overwritten *) t.rep.state.commit_index <- ci ; let cis = CTree.path_between t.rep.store old_ci ci in - List.iter cis ~f:(fun (_, _, v) -> Log.add t.commit_log v) + List.iter cis ~f:(fun (_, _, v) -> Log.add t.commit_log v) ; + Some ci + | _ -> + None - let check_conflict_recovery t = + let conflict_recovery t = let votes = t.config.replica_ids |> Iter.of_list |> Iter.map (function @@ -207,80 +214,79 @@ module Make (Value : Value) = struct let max_term = votes |> Iter.fold (fun acc (s : state) -> max acc s.term) (-1) in - if max_term > t.rep.state.vterm then - let num_max_term_votes = - votes |> Iter.filter_count (fun (s : state) -> s.term = max_term) - in - if num_max_term_votes >= t.config.quorum_size then ( - let votes = - votes |> Iter.filter (fun (s : state) -> s.term = max_term) - in - let missing_votes = t.config.replica_count - Iter.length votes in - let max_vterm = - votes |> Iter.fold (fun acc (s : state) -> max acc s.vterm) (-1) - in - let vterm_votes = - votes |> Iter.filter (fun (s : state) -> s.vterm = max_vterm) - in - let o4_prefix = - CTree.greatest_sufficiently_common_prefix t.rep.store - (vterm_votes |> Iter.map (fun s -> s.vval) |> Iter.to_list) - (t.config.quorum_size - missing_votes) - in - let max_length_o4_vote = - vterm_votes - |> Iter.filter (fun s -> - Option.value_map o4_prefix ~default:true ~f:(fun o4_prefix -> - CTree.prefix t.rep.store o4_prefix s.vval ) ) - |> Iter.max_exn ~lt:(fun a b -> - CTree.get_idx t.rep.store a.vval - < CTree.get_idx t.rep.store b.vval ) - in - t.rep.state.term <- max_term ; - t.rep.state.vterm <- max_term ; - t.rep.state.vval <- max_length_o4_vote.vval ) - - let add_commands t (ci : Value.t Iter.t) = - Rep.add_commands t.rep ~node:t.config.node_id ci ; - check_commit t - - let acceptor_term_tick t term' = - if t.rep.state.term < term' then t.rep.state.term <- term' - - let handle_steady_state t src (msg : Rep.success) = - let option_bind o ~f = Option.value_map o ~default:(Ok ()) ~f in - let%bind.Result () = option_bind msg.ctree ~f:(Rep.recv_update t.rep src) in let%bind.Result () = - option_bind msg.cons ~f:(fun s -> - CTree.mem t.rep.store s.vval - |> Result.ok_if_true ~error:(`Msg "Invalid vval") ) + Result.ok_if_true (max_term > t.rep.state.vterm) ~error:`InSteadyState + in + let num_max_term_votes = + votes |> Iter.filter_count (fun (s : state) -> s.term = max_term) in let%bind.Result () = - option_bind msg.cons ~f:(fun s -> - CTree.mem t.rep.store s.commit_index - |> Result.ok_if_true ~error:(`Msg "Invalid commit index") ) + Result.ok_if_true + (num_max_term_votes >= t.config.quorum_size) + ~error:`No_quorum in - Option.iter msg.cons ~f:(fun new_state -> - let remote = Map.find_exn t.other_nodes_state src in - set_state remote new_state ; - acceptor_reply t src ; - check_commit t ; - check_conflict_recovery t ; - acceptor_term_tick t new_state.term ) ; + let votes = votes |> Iter.filter (fun (s : state) -> s.term = max_term) in + let missing_votes = t.config.replica_count - Iter.length votes in + let max_vterm = + votes |> Iter.fold (fun acc (s : state) -> max acc s.vterm) (-1) + in + let vterm_votes = + votes |> Iter.filter (fun (s : state) -> s.vterm = max_vterm) + in + let o4_prefix = + CTree.greatest_sufficiently_common_prefix t.rep.store + (vterm_votes |> Iter.map (fun s -> s.vval) |> Iter.to_list) + (t.config.quorum_size - missing_votes) + in + let max_length_o4_vote = + vterm_votes + |> Iter.filter (fun s -> + Option.value_map o4_prefix ~default:true ~f:(fun o4_prefix -> + CTree.prefix t.rep.store o4_prefix s.vval ) ) + |> Iter.max_exn ~lt:(fun a b -> + CTree.get_idx t.rep.store a.vval < CTree.get_idx t.rep.store b.vval ) + in + t.rep.state.term <- max_term ; + t.rep.state.vterm <- max_term ; + t.rep.state.vval <- max_length_o4_vote.vval ; Result.return () - let handle_message t src (msg : Rep.message) : - (unit, [`MustAck | `MustNack]) result = + let add_commands t (ci : Value.t Iter.t) = + Rep.add_commands t.rep ~node:t.config.node_id ci ; + let (_ : CTree.key option) = check_commit t in + () + + let handle_update_message t src (msg : Rep.message) = match msg with - | Ok msg -> - handle_steady_state t src msg - |> Result.map_error ~f:(function `Msg _ -> `MustNack) | Error _ -> - (* reset expected state to commit index and initial state *) + (* Reset optimistic tree to known tree *) let remote = Map.find_exn t.rep.remotes src in - (* commit guaranteed to exist locally otherwise rejected update :) *) remote.expected_tree <- remote.known_tree ; Error `MustAck + | Ok msg -> + let ctree_rep_result = + let option_bind o ~f = Option.value_map o ~default:(Ok ()) ~f in + let%bind.Result () = + option_bind msg.ctree ~f:(Rep.recv_update t.rep src) + in + let%bind.Result () = + option_bind msg.cons ~f:(fun s -> + CTree.mem t.rep.store s.vval + |> Result.ok_if_true ~error:`VVal_not_in_tree ) + in + let%bind.Result () = + option_bind msg.cons ~f:(fun s -> + CTree.mem t.rep.store s.commit_index + |> Result.ok_if_true ~error:`Commit_index_not_in_tree ) + in + Result.return () + in + let%bind.Result () = + ctree_rep_result |> Result.map_error ~f:(fun e -> `MustNack e) + in + Option.iter msg.cons + ~f:(set_state (Map.find_exn t.other_nodes_state src)) ; + Result.return () let create (config : config) = run_c := true ; diff --git a/impl/lib/conspire_mp.ml b/impl/lib/conspire_leader.ml similarity index 65% rename from impl/lib/conspire_mp.ml rename to impl/lib/conspire_leader.ml index eb37952..62d8cd0 100644 --- a/impl/lib/conspire_mp.ml +++ b/impl/lib/conspire_leader.ml @@ -16,32 +16,6 @@ module Conspire = Conspire_f.Make (Value) - Command otherwise => ignore *) -module StallChecker = struct - open Utils - - type t = - { mutable prev_commit_idx: log_index - ; mutable ticks_since_commit: int - ; tick_limit: int } - - let reset t commit_index = - if commit_index > t.prev_commit_idx then t.prev_commit_idx <- commit_index ; - t.ticks_since_commit <- t.tick_limit - - let is_stalled t = t.ticks_since_commit <= 0 - - let check ?(pp : unit Fmt.t = fun _ () -> ()) ?(should_make_progress = false) - t = - t.ticks_since_commit <- t.ticks_since_commit - 1 ; - if is_stalled t && should_make_progress then ( - traceln "========== Stalled ==========" ; - traceln "%a" pp () ; - traceln "========== Stalled ==========" ) ; - if is_stalled t then reset t t.prev_commit_idx - - let create = {ticks_since_commit= 2; prev_commit_idx= -1; tick_limit= 2} -end - module FailureDetector = struct type t = { state: (node_id, int) Hashtbl.t @@ -98,8 +72,7 @@ module Types = struct type t = { config: config [@opaque] ; conspire: Conspire.t - ; failure_detector: FailureDetector.t - ; stall_checker: StallChecker.t [@opaque] } + ; failure_detector: FailureDetector.t } [@@deriving show {with_path= false}] let get_command idx t = @@ -157,13 +130,6 @@ struct let broadcast ?(force = false) t = List.iter t.config.other_replica_ids ~f:(fun nid -> send ~force t nid) - let stall_check (t : t) = - StallChecker.check t.stall_checker ~pp:(fun ppf () -> - let problem_idx = t.conspire.rep.state.commit_index in - Fmt.pf ppf "Stalled on %d at %a@." t.config.conspire.node_id - (Fmt.option Conspire.CTree.pp_parent_ref_node) - (Conspire.CTree.get t.conspire.rep.store problem_idx) ) - let nack_counter, run_nc = Ocons_core.Utils.InternalReporter.rate_reporter "nacks" @@ -183,49 +149,85 @@ struct run_vl := true ; match event with | Tick -> - stall_check t ; term_tracker t.conspire.rep.state.term ; FailureDetector.tick t.failure_detector ; + if is_leader t then Conspire.conflict_recovery t.conspire |> ignore ; broadcast t ~force:true | Commands ci when can_apply_requests t -> let commands = Iter.to_list ci in value_length (List.length commands) ; - Conspire.add_commands t.conspire (commands |> Iter.singleton) ; + let _ = Conspire.add_commands t.conspire (commands |> Iter.singleton) in broadcast t | Commands _ -> Fmt.invalid_arg "Commands cannot yet be applied" | Recv (m, src) -> ( FailureDetector.reset t.failure_detector src ; - let recovery_started, prev_ci = - ( t.conspire.rep.state.term > t.conspire.rep.state.vterm - , get_commit_index t ) - in - let msg_result = Conspire.handle_message t.conspire src m in - let now_steady_state, committed = - ( t.conspire.rep.state.term = t.conspire.rep.state.vterm - , get_commit_index t > prev_ci ) - in - match (msg_result, recovery_started && now_steady_state, committed) with - | Error `MustAck, _, _ -> - ack_counter () ; send t src - | Error `MustNack, _, _ -> - nack_counter () ; nack t src - | Ok _, true, _ | Ok _, _, true -> - broadcast t - | _ -> - send t src ) + let update_result = Conspire.handle_update_message t.conspire src m in + match update_result with + | Error `MustAck -> + Act.traceln "Acking %d" src ; + ack_counter () ; + send t src + | Error (`MustNack reason) -> + Act.traceln "Nack for %d: %s" src + ( match reason with + | `Root_of_update_not_found _ -> + "Update is not rooted" + | `Commit_index_not_in_tree -> + "Commit index not in tree" + | `VVal_not_in_tree -> + "VVal not int tree" ) ; + nack_counter () ; + nack t src + | Ok () -> + process_acceptor_state t.conspire src ; + let conflict_recovery_attempt = + if is_leader t then Conspire.conflict_recovery t.conspire + else Error `NotLeader + in + Result.iter conflict_recovery_attempt ~f:(fun () -> + Act.traceln "Recovery complete term: {t:%d,vt:%d}" + t.conspire.rep.state.term t.conspire.rep.state.vterm ) ; + let recovery_started = + t.conspire.rep.state.term > t.conspire.rep.state.vterm + in + let committed = + Conspire.check_commit t.conspire |> Option.is_some + in + let should_broadcast = + Result.is_ok conflict_recovery_attempt + || committed || recovery_started + in + if should_broadcast then broadcast t else send t src ) let advance t e = - Act.run_side_effects - (fun () -> Exn.handle_uncaught_and_exit (fun () -> handle_event t e)) - t + let is_leader_pre = is_leader t in + let prev_term = t.conspire.rep.state.term in + let prev_vterm = t.conspire.rep.state.vterm in + let res = + Act.run_side_effects + (fun () -> Exn.handle_uncaught_and_exit (fun () -> handle_event t e)) + t + in + ( match (is_leader_pre, is_leader t) with + | true, false -> + Act.traceln "No longer leader for %d" t.conspire.rep.state.term + | false, true -> + Act.traceln "Now leader for %d" t.conspire.rep.state.term + | _ -> + () ) ; + if prev_term < t.conspire.rep.state.term then + Act.traceln "Conflict, term=%d" t.conspire.rep.state.term ; + if prev_vterm < t.conspire.rep.state.vterm then + Act.traceln "Recovery to %d" t.conspire.rep.state.vterm ; + res let create (config : config) = let conspire = Conspire.create config.conspire in let failure_detector = FailureDetector.create config.fd_timeout config.conspire.other_replica_ids in - {config; conspire; failure_detector; stall_checker= StallChecker.create} + {config; conspire; failure_detector} end module Impl = Make (Actions_f.ImperativeActions (Types)) diff --git a/impl/lib/conspire_leader_dc.ml b/impl/lib/conspire_leader_dc.ml new file mode 100644 index 0000000..4d8d385 --- /dev/null +++ b/impl/lib/conspire_leader_dc.ml @@ -0,0 +1,238 @@ +open! Core +open Types +module Time = Time_float_unix + +let pp_time_float_unix : Time.t Fmt.t = + fun ppf v -> Fmt.pf ppf "%0.5f" (Utils.time_to_float v) + +module Value = struct + let pp_command = Command.pp + + type t = command list * (Time.t[@printer pp_time_float_unix]) + [@@deriving compare, equal, hash, bin_io, sexp, show] + + let empty = ([], Time.epoch) +end + +(* + Use delay-reorder-buffer to avoid conflict with leader when backup pre-emptively proposes + *) + +module Counter = struct + type t = {mutable count: int; limit: int} [@@deriving show] + + let incr_counter t = t.count <- t.count + 1 + + let reset t = t.count <- 0 + + let over_lim t = t.count >= t.limit +end + +module Conspire = Conspire_f.Make (Value) +module FailureDetector = Conspire_leader.FailureDetector + +module Types = struct + type config = + { conspire: Conspire_f.config + ; lower_replica_ids: node_id list + ; other_replica_ids: node_id list + ; batching_interval: Time.Span.t + ; delay_interval: Time.Span.t + ; fd_timeout: int + ; max_outstanding: int + ; broadcast_tick_interval: int + ; clock: float Eio.Time.clock_ty Eio.Time.clock [@opaque] } + [@@deriving show {with_path= false}] + + let make_config ~node_id ~replica_ids ~delay_interval ~fd_timeout + ~batching_interval ?(max_outstanding = 8192) ~broadcast_tick_interval + clock : config = + let floor f = f |> Int.of_float in + let replica_count = List.length replica_ids in + let quorum_size = floor (2. *. Float.of_int replica_count /. 3.) + 1 in + assert (3 * quorum_size > 2 * replica_count) ; + let other_replica_ids = + List.filter replica_ids ~f:(fun i -> not (i = node_id)) + in + let lower_replica_ids = + List.filter replica_ids ~f:(fun id -> id < node_id) + in + let conspire = + Conspire_f. + {node_id; replica_ids; other_replica_ids; replica_count; quorum_size} + in + { conspire + ; other_replica_ids + ; lower_replica_ids + ; delay_interval + ; fd_timeout + ; batching_interval + ; max_outstanding + ; broadcast_tick_interval + ; clock= (clock :> float Eio.Time.clock_ty Eio.Time.clock) } + + type message = + | Commands of (Command.t list * (Time.t[@printer pp_time_float_unix])) + | Conspire of Conspire.message + [@@deriving show, bin_io] + + type t = + { config: config [@opaque] + ; conspire: Conspire.t + ; command_buffer: Command.t Delay_buffer.t + ; tick_count: Counter.t + ; failure_detector: FailureDetector.t + ; clock: float Eio.Time.clock_ty Eio.Std.r [@opaque] } + [@@deriving show {with_path= false}] + + let get_command idx t = + if idx < 0 then Iter.empty + else Log.get t.conspire.commit_log idx |> fst |> Iter.of_list + + let get_commit_index t = Log.highest t.conspire.commit_log + + module PP = struct + let message_pp = pp_message + + let t_pp = pp + + let config_pp = pp_config + end +end + +module Make + (Act : Actions_f.ActionSig + with type t = Types.t + and type message = Types.message) = +struct + include Conspire + include Types + + let is_leader t = + List.for_all t.config.lower_replica_ids ~f:(fun nid -> + not @@ FailureDetector.is_live t.failure_detector nid ) + + let available_space_for_commands t = t.config.max_outstanding + + let send ?(force = false) t dst = + let open Rep in + let update = get_update_to_send t.conspire.rep dst in + if force || Option.is_some update.ctree || Option.is_some update.cons then + Act.send dst (Conspire (Ok update)) + + let nack t dst = + Act.send dst (Conspire (Error {commit= t.conspire.rep.state.commit_index})) + + let broadcast ?(force = false) t = + List.iter t.config.other_replica_ids ~f:(fun nid -> send ~force t nid) + + let gather_batch_from_buffer t = + let time = Eio.Time.now t.clock |> Utils.float_to_time in + let batch_floor = + CTree.get_value t.conspire.rep.store t.conspire.rep.state.vval + |> Option.value_map ~default:Time.epoch ~f:snd + in + let batches = + Delay_buffer.get_values t.command_buffer time + |> Sequence.filter ~f:(fun (_, t) -> Time.(t > batch_floor)) + in + Conspire.add_commands t.conspire + (batches |> Sequence.iter |> Iter.from_labelled_iter) + + let handle_event t (event : message Ocons_core.Consensus_intf.event) = + match event with + | Tick -> + (* Increment counters *) + Counter.incr_counter t.tick_count ; + FailureDetector.tick t.failure_detector ; + if is_leader t then ( + (* Do recovery *) + Conspire.conflict_recovery t.conspire |> ignore ; + (* If leader add batch *) + if t.conspire.rep.state.vterm = t.conspire.rep.state.term then + gather_batch_from_buffer t ; + broadcast t ) ; + (* Broadcast occasionally *) + if Counter.over_lim t.tick_count then ( + Counter.reset t.tick_count ; broadcast t ~force:true ) + | Commands ci -> + let commands = Iter.to_list ci in + let now = Eio.Time.now t.clock |> Utils.float_to_time in + let hedged_target = Time.add now t.config.delay_interval in + List.iter commands ~f:(fun c -> + Delay_buffer.add_value t.command_buffer c hedged_target ) ; + Act.broadcast (Commands (commands, hedged_target)) + | Recv (Commands (cs, tar), _) -> + List.iter cs ~f:(fun c -> + Delay_buffer.add_value t.command_buffer c tar ) + | Recv (Conspire m, src) -> ( + FailureDetector.reset t.failure_detector src ; + let update_result = Conspire.handle_update_message t.conspire src m in + match update_result with + | Error `MustAck -> + Act.traceln "Acking %d" src ; + send t src + | Error (`MustNack reason) -> + Act.traceln "Nack for %d: %s" src + ( match reason with + | `Root_of_update_not_found _ -> + "Update is not rooted" + | `Commit_index_not_in_tree -> + "Commit index not in tree" + | `VVal_not_in_tree -> + "VVal not int tree" ) ; + nack t src + | Ok () -> + process_acceptor_state t.conspire src ; + let conflict_recovery_attempt = + if is_leader t then Conspire.conflict_recovery t.conspire + else Error `NotLeader + in + Result.iter conflict_recovery_attempt ~f:(fun () -> + Utils.traceln "Recovery complete term: {t:%d,vt:%d}" + t.conspire.rep.state.term t.conspire.rep.state.vterm ) ; + broadcast t ; + let recovery_started = + t.conspire.rep.state.term > t.conspire.rep.state.vterm + in + let committed = + Conspire.check_commit t.conspire |> Option.is_some + in + let should_broadcast = + Result.is_ok conflict_recovery_attempt + || committed || recovery_started + in + if should_broadcast then broadcast t else send t src ) + + let advance t e = + let init_leader = is_leader t in + let res = + Act.run_side_effects + (fun () -> Exn.handle_uncaught_and_exit (fun () -> handle_event t e)) + t + in + if is_leader t && not init_leader then + Utils.traceln "Is now leader for %d" t.conspire.rep.state.term ; + res + + let create (config : config) = + let conspire = Conspire.create config.conspire in + let command_buffer = + Delay_buffer.create ~compare:Command.compare config.batching_interval + (Eio.Time.now config.clock |> Utils.float_to_time) + in + let failure_detector = + FailureDetector.create config.fd_timeout config.conspire.other_replica_ids + in + let tick_count = + Counter.{count= 0; limit= config.broadcast_tick_interval} + in + { config + ; conspire + ; command_buffer + ; tick_count + ; clock= config.clock + ; failure_detector } +end + +module Impl = Make (Actions_f.ImperativeActions (Types)) diff --git a/impl/lib/delay_buffer.ml b/impl/lib/delay_buffer.ml new file mode 100644 index 0000000..5f34869 --- /dev/null +++ b/impl/lib/delay_buffer.ml @@ -0,0 +1,51 @@ +open! Core +module Time = Time_float_unix + +let pp_time_float_unix : Time.t Fmt.t = + fun ppf v -> Fmt.pf ppf "%0.5f" (Utils.time_to_float v) + +type 'a t = + { mutable store: 'a list Map.M(Time).t + [@polyprinter fun pa -> Utils.pp_map pp_time_float_unix pa] + ; mutable hwm: Time.t [@printer pp_time_float_unix] + ; interval: Time.Span.t + ; compare: 'a -> 'a -> int } +[@@deriving show {with_path= false}] + +let ms_clamp ?(direction = `Ceil) interval f = + match direction with + | `Ceil -> + Time.next_multiple ~base:Time.epoch ~interval ~after:f + ~can_equal_after:true () + | `Floor -> + Time.prev_multiple ~base:Time.epoch ~interval ~before:f + ~can_equal_before:true () + +let get_values t current = + (* lim is the millisecond before now *) + let lim = current |> ms_clamp ~direction:`Floor t.interval in + let seq = + Map.to_sequence ~order:`Increasing_key ~keys_less_or_equal_to:lim t.store + in + t.hwm <- lim ; + seq + |> Sequence.group ~break:(fun (a, _) (b, _) -> + Time.(ms_clamp t.interval a <> ms_clamp t.interval b) ) + |> Sequence.map ~f:(fun ls -> + let key = ms_clamp t.interval (List.hd_exn ls |> fst) in + (* remove relevant keys *) + let values = + ls + |> List.iter ~f:(fun (k, _) -> + t.store <- Map.remove_multi t.store k ) ; + (* export batch *) + ls |> List.map ~f:snd |> List.join |> List.sort ~compare:t.compare + in + (values, key) ) + +let add_value t v target = + if Time_float.(target > t.hwm) then + t.store <- Map.add_multi t.store ~key:target ~data:v + +let create ~compare interval base_hwm = + {hwm= base_hwm; interval; store= Map.empty (module Time); compare} diff --git a/impl/lib/impl_core.ml b/impl/lib/impl_core.ml index a98e572..5097e5d 100644 --- a/impl/lib/impl_core.ml +++ b/impl/lib/impl_core.ml @@ -166,9 +166,9 @@ module ConspireSS = struct let serialise = Line_prot.ConspireSS.serialise end -module ConspireMP = struct - include Conspire_mp.Types - include Conspire_mp.Impl +module ConspireLeader = struct + include Conspire_leader.Types + include Conspire_leader.Impl let create_node _ = create @@ -193,3 +193,17 @@ module ConspireDC = struct let parse r = Line_prot.bin_io_read bin_read_message r end + +module ConspireLeaderDC = struct + include Conspire_leader_dc.Types + include Conspire_leader_dc.Impl + + let create_node _ = create + + let should_ack_clients _ = true + + let serialise m w = + Line_prot.bin_io_write w bin_write_message bin_size_message m + + let parse r = Line_prot.bin_io_read bin_read_message r +end diff --git a/impl/main.ml b/impl/main.ml index 6fab8d9..896fa4e 100644 --- a/impl/main.ml +++ b/impl/main.ml @@ -8,8 +8,9 @@ module RMain_sbn = Infra.Make (Impl_core.RaftSBN) module PRMain = Infra.Make (Impl_core.PrevoteRaft) module PRMain_sbn = Infra.Make (Impl_core.PrevoteRaftSBN) module ConspireSSMain = Infra.Make (Impl_core.ConspireSS) -module ConspireMPMain = Infra.Make (Impl_core.ConspireMP) +module ConspireLeaderMain = Infra.Make (Impl_core.ConspireLeader) module ConspireDCMain = Infra.Make (Impl_core.ConspireDC) +module ConspireLeaderDCMain = Infra.Make (Impl_core.ConspireLeaderDC) type kind = | Paxos @@ -18,8 +19,9 @@ type kind = | Raft_sbn | PRaft_sbn | ConspireSS - | ConspireMP + | ConspireLeader | ConspireDC + | ConspireLeaderDC let run kind node_id node_addresses internal_port external_port tick_period election_timeout max_outstanding stream_length stat_report @@ -104,19 +106,19 @@ let run kind node_id node_addresses internal_port external_port tick_period Eio.traceln "Starting Conspire with single-shot instances per log entry" ; Eio.traceln "config = %a" Impl_core.ConspireSS.PP.config_pp conspire_cfg ; ConspireSSMain.run env cfg - | ConspireMP -> + | ConspireLeader -> let replica_ids = List.map (fun (i, _) -> i) node_addresses |> Core.List.sort ~compare:Int.compare in let conspire_cfg = - Impl_core.ConspireMP.make_config ~node_id ~replica_ids + Impl_core.ConspireLeader.make_config ~node_id ~replica_ids ~fd_timeout:election_timeout ~max_outstanding () in let cfg = config conspire_cfg in - Eio.traceln "Starting Conspire" ; - Eio.traceln "config = %a" Impl_core.ConspireMP.PP.config_pp conspire_cfg ; - ConspireMPMain.run env cfg + Eio.traceln "Starting Conspire-Leader" ; + Eio.traceln "config = %a" Impl_core.ConspireLeader.PP.config_pp conspire_cfg ; + ConspireLeaderMain.run env cfg | ConspireDC -> let replica_ids = List.map (fun (i, _) -> i) node_addresses @@ -130,9 +132,27 @@ let run kind node_id node_addresses internal_port external_port tick_period ~tick_limit in let cfg = config conspire_cfg in - Eio.traceln "Starting Conspire" ; + Eio.traceln "Starting Conspire-DC" ; Eio.traceln "config = %a" Impl_core.ConspireDC.PP.config_pp conspire_cfg ; ConspireDCMain.run env cfg + | ConspireLeaderDC -> + let replica_ids = + List.map (fun (i, _) -> i) node_addresses + |> Core.List.sort ~compare:Int.compare + in + let broadcast_tick_interval = (float_of_int election_timeout) /. 10. |> Float.ceil |> Float.to_int in + let conspire_cfg = + Impl_core.ConspireLeaderDC.make_config ~node_id ~replica_ids ~max_outstanding + (Eio.Stdenv.clock env) + ~delay_interval:(Time_float_unix.Span.of_sec delay_interval) + ~batching_interval:(Time_float_unix.Span.of_sec batching_interval) + ~fd_timeout:election_timeout + ~broadcast_tick_interval + in + let cfg = config conspire_cfg in + Eio.traceln "Starting Conspire-leader-dc" ; + Eio.traceln "config = %a" Impl_core.ConspireLeaderDC.PP.config_pp conspire_cfg ; + ConspireLeaderDCMain.run env cfg open Cmdliner @@ -295,7 +315,8 @@ let cmd = ; ("prevote-raft", PRaft) ; ("prevote-raft+sbn", PRaft_sbn) ; ("conspire-ss", ConspireSS) - ; ("conspire-mp", ConspireMP) + ; ("conspire-leader", ConspireLeader) + ; ("conspire-leader-dc", ConspireLeaderDC) ; ("conspire-dc", ConspireDC) ] in Arg.( diff --git a/impl/test/test_conspire_dc.ml b/impl/test/test_conspire_dc.ml index 29fb0a0..45bd24f 100644 --- a/impl/test/test_conspire_dc.ml +++ b/impl/test/test_conspire_dc.ml @@ -89,15 +89,15 @@ let%expect_test "local_commit" = conspire = { rep = { state = - { vval = 34e452ffd92c22ea72ef125a33a0a593; vterm = 0; term = 0; - commit_index = 34e452ffd92c22ea72ef125a33a0a593 }; + { vval = 2a593842bdac635313d582f80fb0a066; vterm = 0; term = 0; + commit_index = 2a593842bdac635313d582f80fb0a066 }; store = { ctree = - [(34e452ffd92c22ea72ef125a33a0a593: + [(2a593842bdac635313d582f80fb0a066: { node = (1, d41d8cd98f00b204e9800998ecf8427e, ([Command(Read c0, 1)], 1.00000)); - parent = ; key = 34e452ffd92c22ea72ef125a33a0a593 }); + parent = ; key = 2a593842bdac635313d582f80fb0a066 }); (d41d8cd98f00b204e9800998ecf8427e: Root)]; root = d41d8cd98f00b204e9800998ecf8427e }; remotes = }; @@ -118,15 +118,15 @@ let%expect_test "local_commit" = conspire = { rep = { state = - { vval = 34e452ffd92c22ea72ef125a33a0a593; vterm = 0; term = 0; - commit_index = 34e452ffd92c22ea72ef125a33a0a593 }; + { vval = 2a593842bdac635313d582f80fb0a066; vterm = 0; term = 0; + commit_index = 2a593842bdac635313d582f80fb0a066 }; store = { ctree = - [(34e452ffd92c22ea72ef125a33a0a593: + [(2a593842bdac635313d582f80fb0a066: { node = (1, d41d8cd98f00b204e9800998ecf8427e, ([Command(Read c0, 1)], 1.00000)); - parent = ; key = 34e452ffd92c22ea72ef125a33a0a593 }); + parent = ; key = 2a593842bdac635313d582f80fb0a066 }); (d41d8cd98f00b204e9800998ecf8427e: Root)]; root = d41d8cd98f00b204e9800998ecf8427e }; remotes = }; @@ -149,20 +149,20 @@ let%expect_test "local_commit" = conspire = { rep = { state = - { vval = ccfc0ddc71cfcede53a100a51835b7ff; vterm = 0; term = 0; - commit_index = ccfc0ddc71cfcede53a100a51835b7ff }; + { vval = 5cd295d17b5aa74c6eccfe593600bc73; vterm = 0; term = 0; + commit_index = 5cd295d17b5aa74c6eccfe593600bc73 }; store = { ctree = - [(34e452ffd92c22ea72ef125a33a0a593: + [(2a593842bdac635313d582f80fb0a066: { node = (1, d41d8cd98f00b204e9800998ecf8427e, ([Command(Read c0, 1)], 1.00000)); - parent = ; key = 34e452ffd92c22ea72ef125a33a0a593 }); - (ccfc0ddc71cfcede53a100a51835b7ff: + parent = ; key = 2a593842bdac635313d582f80fb0a066 }); + (5cd295d17b5aa74c6eccfe593600bc73: { node = - (2, 34e452ffd92c22ea72ef125a33a0a593, + (2, 2a593842bdac635313d582f80fb0a066, ([Command(Read c1, 2); Command(Read c2, 3)], 2.00000)); - parent = ; key = ccfc0ddc71cfcede53a100a51835b7ff }); + parent = ; key = 5cd295d17b5aa74c6eccfe593600bc73 }); (d41d8cd98f00b204e9800998ecf8427e: Root)]; root = d41d8cd98f00b204e9800998ecf8427e }; remotes = }; @@ -264,15 +264,15 @@ let%expect_test "e2e commit" = conspire = { rep = { state = - { vval = c4677fbe3fa92d4246f849e282f15527; vterm = 0; term = 0; + { vval = 4f1688bf7767c0d49f5ce96ed21dfa21; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }; store = { ctree = - [(c4677fbe3fa92d4246f849e282f15527: + [(4f1688bf7767c0d49f5ce96ed21dfa21: { node = (1, d41d8cd98f00b204e9800998ecf8427e, ([Command(Read c1, 1)], 1.00000)); - parent = ; key = c4677fbe3fa92d4246f849e282f15527 }); + parent = ; key = 4f1688bf7767c0d49f5ce96ed21dfa21 }); (d41d8cd98f00b204e9800998ecf8427e: Root)]; root = d41d8cd98f00b204e9800998ecf8427e }; remotes = }; @@ -293,39 +293,39 @@ let%expect_test "e2e commit" = clock = } actions: [Send(1,(Conspire_dc.Types.Conspire { ctree = - (Some { new_head = c4677fbe3fa92d4246f849e282f15527; + (Some { new_head = 4f1688bf7767c0d49f5ce96ed21dfa21; extension = [(1, d41d8cd98f00b204e9800998ecf8427e, ([Command(Read c1, 1)], 1.00000))] }); cons = - (Some { vval = c4677fbe3fa92d4246f849e282f15527; + (Some { vval = 4f1688bf7767c0d49f5ce96ed21dfa21; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }) })) Send(2,(Conspire_dc.Types.Conspire { ctree = - (Some { new_head = c4677fbe3fa92d4246f849e282f15527; + (Some { new_head = 4f1688bf7767c0d49f5ce96ed21dfa21; extension = [(1, d41d8cd98f00b204e9800998ecf8427e, ([Command(Read c1, 1)], 1.00000))] }); cons = - (Some { vval = c4677fbe3fa92d4246f849e282f15527; + (Some { vval = 4f1688bf7767c0d49f5ce96ed21dfa21; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }) })) Send(3,(Conspire_dc.Types.Conspire { ctree = - (Some { new_head = c4677fbe3fa92d4246f849e282f15527; + (Some { new_head = 4f1688bf7767c0d49f5ce96ed21dfa21; extension = [(1, d41d8cd98f00b204e9800998ecf8427e, ([Command(Read c1, 1)], 1.00000))] }); cons = - (Some { vval = c4677fbe3fa92d4246f849e282f15527; + (Some { vval = 4f1688bf7767c0d49f5ce96ed21dfa21; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }) @@ -338,15 +338,15 @@ let%expect_test "e2e commit" = conspire = { rep = { state = - { vval = c4677fbe3fa92d4246f849e282f15527; vterm = 0; term = 0; + { vval = 4f1688bf7767c0d49f5ce96ed21dfa21; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }; store = { ctree = - [(c4677fbe3fa92d4246f849e282f15527: + [(4f1688bf7767c0d49f5ce96ed21dfa21: { node = (1, d41d8cd98f00b204e9800998ecf8427e, ([Command(Read c1, 1)], 1.00000)); - parent = ; key = c4677fbe3fa92d4246f849e282f15527 }); + parent = ; key = 4f1688bf7767c0d49f5ce96ed21dfa21 }); (d41d8cd98f00b204e9800998ecf8427e: Root)]; root = d41d8cd98f00b204e9800998ecf8427e }; remotes = }; @@ -367,45 +367,45 @@ let%expect_test "e2e commit" = clock = } actions: [Send(0,(Conspire_dc.Types.Conspire { ctree = - (Some { new_head = c4677fbe3fa92d4246f849e282f15527; + (Some { new_head = 4f1688bf7767c0d49f5ce96ed21dfa21; extension = [(1, d41d8cd98f00b204e9800998ecf8427e, ([Command(Read c1, 1)], 1.00000))] }); cons = - (Some { vval = c4677fbe3fa92d4246f849e282f15527; + (Some { vval = 4f1688bf7767c0d49f5ce96ed21dfa21; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }) })) Send(2,(Conspire_dc.Types.Conspire { ctree = - (Some { new_head = c4677fbe3fa92d4246f849e282f15527; + (Some { new_head = 4f1688bf7767c0d49f5ce96ed21dfa21; extension = [(1, d41d8cd98f00b204e9800998ecf8427e, ([Command(Read c1, 1)], 1.00000))] }); cons = - (Some { vval = c4677fbe3fa92d4246f849e282f15527; + (Some { vval = 4f1688bf7767c0d49f5ce96ed21dfa21; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }) })) Send(3,(Conspire_dc.Types.Conspire { ctree = - (Some { new_head = c4677fbe3fa92d4246f849e282f15527; + (Some { new_head = 4f1688bf7767c0d49f5ce96ed21dfa21; extension = [(1, d41d8cd98f00b204e9800998ecf8427e, ([Command(Read c1, 1)], 1.00000))] }); cons = - (Some { vval = c4677fbe3fa92d4246f849e282f15527; + (Some { vval = 4f1688bf7767c0d49f5ce96ed21dfa21; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }) }))] |}] ; let root_clk = t0.conspire.rep.store.root in - let c1_clk = Md5.of_hex_exn "c4677fbe3fa92d4246f849e282f15527" in + let c1_clk = DC.Conspire.CTree.make_key root_clk ([c1], 1. |> Utils.float_to_time) in let replication_message = Conspire (Ok @@ -424,21 +424,21 @@ let%expect_test "e2e commit" = conspire = { rep = { state = - { vval = c4677fbe3fa92d4246f849e282f15527; vterm = 0; term = 0; + { vval = 4f1688bf7767c0d49f5ce96ed21dfa21; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }; store = { ctree = - [(c4677fbe3fa92d4246f849e282f15527: + [(4f1688bf7767c0d49f5ce96ed21dfa21: { node = (1, d41d8cd98f00b204e9800998ecf8427e, ([Command(Read c1, 1)], 1.00000)); - parent = ; key = c4677fbe3fa92d4246f849e282f15527 }); + parent = ; key = 4f1688bf7767c0d49f5ce96ed21dfa21 }); (d41d8cd98f00b204e9800998ecf8427e: Root)]; root = d41d8cd98f00b204e9800998ecf8427e }; remotes = }; other_nodes_state = [(1: - { vval = c4677fbe3fa92d4246f849e282f15527; vterm = 0; term = 0; + { vval = 4f1688bf7767c0d49f5ce96ed21dfa21; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }); (2: { vval = d41d8cd98f00b204e9800998ecf8427e; vterm = 0; term = 0; @@ -460,24 +460,24 @@ let%expect_test "e2e commit" = conspire = { rep = { state = - { vval = c4677fbe3fa92d4246f849e282f15527; vterm = 0; term = 0; - commit_index = c4677fbe3fa92d4246f849e282f15527 }; + { vval = 4f1688bf7767c0d49f5ce96ed21dfa21; vterm = 0; term = 0; + commit_index = 4f1688bf7767c0d49f5ce96ed21dfa21 }; store = { ctree = - [(c4677fbe3fa92d4246f849e282f15527: + [(4f1688bf7767c0d49f5ce96ed21dfa21: { node = (1, d41d8cd98f00b204e9800998ecf8427e, ([Command(Read c1, 1)], 1.00000)); - parent = ; key = c4677fbe3fa92d4246f849e282f15527 }); + parent = ; key = 4f1688bf7767c0d49f5ce96ed21dfa21 }); (d41d8cd98f00b204e9800998ecf8427e: Root)]; root = d41d8cd98f00b204e9800998ecf8427e }; remotes = }; other_nodes_state = [(1: - { vval = c4677fbe3fa92d4246f849e282f15527; vterm = 0; term = 0; + { vval = 4f1688bf7767c0d49f5ce96ed21dfa21; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }); (2: - { vval = c4677fbe3fa92d4246f849e282f15527; vterm = 0; term = 0; + { vval = 4f1688bf7767c0d49f5ce96ed21dfa21; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }); (3: { vval = d41d8cd98f00b204e9800998ecf8427e; vterm = 0; term = 0; @@ -491,25 +491,25 @@ let%expect_test "e2e commit" = Send(1,(Conspire_dc.Types.Conspire { ctree = None; cons = - (Some { vval = c4677fbe3fa92d4246f849e282f15527; + (Some { vval = 4f1688bf7767c0d49f5ce96ed21dfa21; vterm = 0; term = 0; - commit_index = c4677fbe3fa92d4246f849e282f15527 + commit_index = 4f1688bf7767c0d49f5ce96ed21dfa21 }) })) Send(2,(Conspire_dc.Types.Conspire { ctree = None; cons = - (Some { vval = c4677fbe3fa92d4246f849e282f15527; + (Some { vval = 4f1688bf7767c0d49f5ce96ed21dfa21; vterm = 0; term = 0; - commit_index = c4677fbe3fa92d4246f849e282f15527 + commit_index = 4f1688bf7767c0d49f5ce96ed21dfa21 }) })) Send(3,(Conspire_dc.Types.Conspire { ctree = None; cons = - (Some { vval = c4677fbe3fa92d4246f849e282f15527; + (Some { vval = 4f1688bf7767c0d49f5ce96ed21dfa21; vterm = 0; term = 0; - commit_index = c4677fbe3fa92d4246f849e282f15527 + commit_index = 4f1688bf7767c0d49f5ce96ed21dfa21 }) }))] |}] @@ -632,15 +632,15 @@ let%expect_test "batching" = conspire = { rep = { state = - { vval = 34e452ffd92c22ea72ef125a33a0a593; vterm = 0; term = 0; + { vval = 2a593842bdac635313d582f80fb0a066; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }; store = { ctree = - [(34e452ffd92c22ea72ef125a33a0a593: + [(2a593842bdac635313d582f80fb0a066: { node = (1, d41d8cd98f00b204e9800998ecf8427e, ([Command(Read c0, 1)], 1.00000)); - parent = ; key = 34e452ffd92c22ea72ef125a33a0a593 }); + parent = ; key = 2a593842bdac635313d582f80fb0a066 }); (d41d8cd98f00b204e9800998ecf8427e: Root)]; root = d41d8cd98f00b204e9800998ecf8427e }; remotes = }; @@ -663,39 +663,39 @@ let%expect_test "batching" = clock = } actions: [Send(1,(Conspire_dc.Types.Conspire { ctree = - (Some { new_head = 34e452ffd92c22ea72ef125a33a0a593; + (Some { new_head = 2a593842bdac635313d582f80fb0a066; extension = [(1, d41d8cd98f00b204e9800998ecf8427e, ([Command(Read c0, 1)], 1.00000))] }); cons = - (Some { vval = 34e452ffd92c22ea72ef125a33a0a593; + (Some { vval = 2a593842bdac635313d582f80fb0a066; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }) })) Send(2,(Conspire_dc.Types.Conspire { ctree = - (Some { new_head = 34e452ffd92c22ea72ef125a33a0a593; + (Some { new_head = 2a593842bdac635313d582f80fb0a066; extension = [(1, d41d8cd98f00b204e9800998ecf8427e, ([Command(Read c0, 1)], 1.00000))] }); cons = - (Some { vval = 34e452ffd92c22ea72ef125a33a0a593; + (Some { vval = 2a593842bdac635313d582f80fb0a066; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }) })) Send(3,(Conspire_dc.Types.Conspire { ctree = - (Some { new_head = 34e452ffd92c22ea72ef125a33a0a593; + (Some { new_head = 2a593842bdac635313d582f80fb0a066; extension = [(1, d41d8cd98f00b204e9800998ecf8427e, ([Command(Read c0, 1)], 1.00000))] }); cons = - (Some { vval = 34e452ffd92c22ea72ef125a33a0a593; + (Some { vval = 2a593842bdac635313d582f80fb0a066; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }) @@ -712,20 +712,20 @@ let%expect_test "batching" = conspire = { rep = { state = - { vval = 4208465ea72aa1217306cdb4da303438; vterm = 0; term = 0; + { vval = b906b94dbc8b5fa592ad20c239ec09d6; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }; store = { ctree = - [(34e452ffd92c22ea72ef125a33a0a593: + [(2a593842bdac635313d582f80fb0a066: { node = (1, d41d8cd98f00b204e9800998ecf8427e, ([Command(Read c0, 1)], 1.00000)); - parent = ; key = 34e452ffd92c22ea72ef125a33a0a593 }); - (4208465ea72aa1217306cdb4da303438: + parent = ; key = 2a593842bdac635313d582f80fb0a066 }); + (b906b94dbc8b5fa592ad20c239ec09d6: { node = - (2, 34e452ffd92c22ea72ef125a33a0a593, + (2, 2a593842bdac635313d582f80fb0a066, ([Command(Read c1, 2); Command(Read c2, 3)], 3.00000)); - parent = ; key = 4208465ea72aa1217306cdb4da303438 }); + parent = ; key = b906b94dbc8b5fa592ad20c239ec09d6 }); (d41d8cd98f00b204e9800998ecf8427e: Root)]; root = d41d8cd98f00b204e9800998ecf8427e }; remotes = }; @@ -746,45 +746,45 @@ let%expect_test "batching" = clock = } actions: [Send(1,(Conspire_dc.Types.Conspire { ctree = - (Some { new_head = 4208465ea72aa1217306cdb4da303438; + (Some { new_head = b906b94dbc8b5fa592ad20c239ec09d6; extension = - [(2, 34e452ffd92c22ea72ef125a33a0a593, + [(2, 2a593842bdac635313d582f80fb0a066, ([Command(Read c1, 2); Command(Read c2, 3)], 3.00000)) ] }); cons = - (Some { vval = 4208465ea72aa1217306cdb4da303438; + (Some { vval = b906b94dbc8b5fa592ad20c239ec09d6; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }) })) Send(2,(Conspire_dc.Types.Conspire { ctree = - (Some { new_head = 4208465ea72aa1217306cdb4da303438; + (Some { new_head = b906b94dbc8b5fa592ad20c239ec09d6; extension = - [(2, 34e452ffd92c22ea72ef125a33a0a593, + [(2, 2a593842bdac635313d582f80fb0a066, ([Command(Read c1, 2); Command(Read c2, 3)], 3.00000)) ] }); cons = - (Some { vval = 4208465ea72aa1217306cdb4da303438; + (Some { vval = b906b94dbc8b5fa592ad20c239ec09d6; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }) })) Send(3,(Conspire_dc.Types.Conspire { ctree = - (Some { new_head = 4208465ea72aa1217306cdb4da303438; + (Some { new_head = b906b94dbc8b5fa592ad20c239ec09d6; extension = - [(2, 34e452ffd92c22ea72ef125a33a0a593, + [(2, 2a593842bdac635313d582f80fb0a066, ([Command(Read c1, 2); Command(Read c2, 3)], 3.00000)) ] }); cons = - (Some { vval = 4208465ea72aa1217306cdb4da303438; + (Some { vval = b906b94dbc8b5fa592ad20c239ec09d6; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }) @@ -810,15 +810,15 @@ let%expect_test "Conflict" = conspire = { rep = { state = - { vval = 34e452ffd92c22ea72ef125a33a0a593; vterm = 0; term = 0; + { vval = 2a593842bdac635313d582f80fb0a066; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }; store = { ctree = - [(34e452ffd92c22ea72ef125a33a0a593: + [(2a593842bdac635313d582f80fb0a066: { node = (1, d41d8cd98f00b204e9800998ecf8427e, ([Command(Read c0, 1)], 1.00000)); - parent = ; key = 34e452ffd92c22ea72ef125a33a0a593 }); + parent = ; key = 2a593842bdac635313d582f80fb0a066 }); (d41d8cd98f00b204e9800998ecf8427e: Root)]; root = d41d8cd98f00b204e9800998ecf8427e }; remotes = }; @@ -839,39 +839,39 @@ let%expect_test "Conflict" = clock = } actions: [Send(1,(Conspire_dc.Types.Conspire { ctree = - (Some { new_head = 34e452ffd92c22ea72ef125a33a0a593; + (Some { new_head = 2a593842bdac635313d582f80fb0a066; extension = [(1, d41d8cd98f00b204e9800998ecf8427e, ([Command(Read c0, 1)], 1.00000))] }); cons = - (Some { vval = 34e452ffd92c22ea72ef125a33a0a593; + (Some { vval = 2a593842bdac635313d582f80fb0a066; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }) })) Send(2,(Conspire_dc.Types.Conspire { ctree = - (Some { new_head = 34e452ffd92c22ea72ef125a33a0a593; + (Some { new_head = 2a593842bdac635313d582f80fb0a066; extension = [(1, d41d8cd98f00b204e9800998ecf8427e, ([Command(Read c0, 1)], 1.00000))] }); cons = - (Some { vval = 34e452ffd92c22ea72ef125a33a0a593; + (Some { vval = 2a593842bdac635313d582f80fb0a066; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }) })) Send(3,(Conspire_dc.Types.Conspire { ctree = - (Some { new_head = 34e452ffd92c22ea72ef125a33a0a593; + (Some { new_head = 2a593842bdac635313d582f80fb0a066; extension = [(1, d41d8cd98f00b204e9800998ecf8427e, ([Command(Read c0, 1)], 1.00000))] }); cons = - (Some { vval = 34e452ffd92c22ea72ef125a33a0a593; + (Some { vval = 2a593842bdac635313d582f80fb0a066; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }) @@ -889,15 +889,15 @@ let%expect_test "Conflict" = conspire = { rep = { state = - { vval = 9ddc31c480e04b8965ff34f30c37d919; vterm = 0; term = 0; + { vval = a27fc154024e8c9bb0ba98b71ddadd3a; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }; store = { ctree = - [(9ddc31c480e04b8965ff34f30c37d919: + [(a27fc154024e8c9bb0ba98b71ddadd3a: { node = (1, d41d8cd98f00b204e9800998ecf8427e, ([Command(Read c1, 2)], 1.00000)); - parent = ; key = 9ddc31c480e04b8965ff34f30c37d919 }); + parent = ; key = a27fc154024e8c9bb0ba98b71ddadd3a }); (d41d8cd98f00b204e9800998ecf8427e: Root)]; root = d41d8cd98f00b204e9800998ecf8427e }; remotes = }; @@ -918,45 +918,45 @@ let%expect_test "Conflict" = clock = } actions: [Send(0,(Conspire_dc.Types.Conspire { ctree = - (Some { new_head = 9ddc31c480e04b8965ff34f30c37d919; + (Some { new_head = a27fc154024e8c9bb0ba98b71ddadd3a; extension = [(1, d41d8cd98f00b204e9800998ecf8427e, ([Command(Read c1, 2)], 1.00000))] }); cons = - (Some { vval = 9ddc31c480e04b8965ff34f30c37d919; + (Some { vval = a27fc154024e8c9bb0ba98b71ddadd3a; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }) })) Send(2,(Conspire_dc.Types.Conspire { ctree = - (Some { new_head = 9ddc31c480e04b8965ff34f30c37d919; + (Some { new_head = a27fc154024e8c9bb0ba98b71ddadd3a; extension = [(1, d41d8cd98f00b204e9800998ecf8427e, ([Command(Read c1, 2)], 1.00000))] }); cons = - (Some { vval = 9ddc31c480e04b8965ff34f30c37d919; + (Some { vval = a27fc154024e8c9bb0ba98b71ddadd3a; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }) })) Send(3,(Conspire_dc.Types.Conspire { ctree = - (Some { new_head = 9ddc31c480e04b8965ff34f30c37d919; + (Some { new_head = a27fc154024e8c9bb0ba98b71ddadd3a; extension = [(1, d41d8cd98f00b204e9800998ecf8427e, ([Command(Read c1, 2)], 1.00000))] }); cons = - (Some { vval = 9ddc31c480e04b8965ff34f30c37d919; + (Some { vval = a27fc154024e8c9bb0ba98b71ddadd3a; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }) }))] |}] ; let root_clk = t0.conspire.rep.store.root in - let c0_clk = Md5.of_hex_exn "34e452ffd92c22ea72ef125a33a0a593" in + let c0_clk = DC.Conspire.CTree.make_key root_clk ([c0], 1. |> Utils.float_to_time) in let update_t0 term = Conspire (Ok @@ -975,15 +975,15 @@ let%expect_test "Conflict" = conspire = { rep = { state = - { vval = 34e452ffd92c22ea72ef125a33a0a593; vterm = 0; term = 0; + { vval = 2a593842bdac635313d582f80fb0a066; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }; store = { ctree = - [(34e452ffd92c22ea72ef125a33a0a593: + [(2a593842bdac635313d582f80fb0a066: { node = (1, d41d8cd98f00b204e9800998ecf8427e, ([Command(Read c0, 1)], 1.00000)); - parent = ; key = 34e452ffd92c22ea72ef125a33a0a593 }); + parent = ; key = 2a593842bdac635313d582f80fb0a066 }); (d41d8cd98f00b204e9800998ecf8427e: Root)]; root = d41d8cd98f00b204e9800998ecf8427e }; remotes = }; @@ -992,7 +992,7 @@ let%expect_test "Conflict" = { vval = d41d8cd98f00b204e9800998ecf8427e; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }); (2: - { vval = 34e452ffd92c22ea72ef125a33a0a593; vterm = 0; term = 0; + { vval = 2a593842bdac635313d582f80fb0a066; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }); (3: { vval = d41d8cd98f00b204e9800998ecf8427e; vterm = 0; term = 0; @@ -1003,7 +1003,7 @@ let%expect_test "Conflict" = tick_count = { Conspire_dc.Counter.count = 1; limit = 100 }; clock = } actions: [] |}] ; - let c1_clk = Md5.of_hex_exn "620122743bc84de6b418bd632ea0cdc2" in + let c1_clk = DC.Conspire.CTree.make_key root_clk ([c1], 1. |> Utils.float_to_time) in let update_t1 term = Conspire (Ok @@ -1022,29 +1022,29 @@ let%expect_test "Conflict" = conspire = { rep = { state = - { vval = 34e452ffd92c22ea72ef125a33a0a593; vterm = 0; term = 1; + { vval = 2a593842bdac635313d582f80fb0a066; vterm = 0; term = 1; commit_index = d41d8cd98f00b204e9800998ecf8427e }; store = { ctree = - [(34e452ffd92c22ea72ef125a33a0a593: + [(2a593842bdac635313d582f80fb0a066: { node = (1, d41d8cd98f00b204e9800998ecf8427e, ([Command(Read c0, 1)], 1.00000)); - parent = ; key = 34e452ffd92c22ea72ef125a33a0a593 }); - (620122743bc84de6b418bd632ea0cdc2: + parent = ; key = 2a593842bdac635313d582f80fb0a066 }); + (a27fc154024e8c9bb0ba98b71ddadd3a: { node = (1, d41d8cd98f00b204e9800998ecf8427e, ([Command(Read c1, 2)], 1.00000)); - parent = ; key = 620122743bc84de6b418bd632ea0cdc2 }); + parent = ; key = a27fc154024e8c9bb0ba98b71ddadd3a }); (d41d8cd98f00b204e9800998ecf8427e: Root)]; root = d41d8cd98f00b204e9800998ecf8427e }; remotes = }; other_nodes_state = [(1: - { vval = 620122743bc84de6b418bd632ea0cdc2; vterm = 0; term = 0; + { vval = a27fc154024e8c9bb0ba98b71ddadd3a; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }); (2: - { vval = 34e452ffd92c22ea72ef125a33a0a593; vterm = 0; term = 0; + { vval = 2a593842bdac635313d582f80fb0a066; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }); (3: { vval = d41d8cd98f00b204e9800998ecf8427e; vterm = 0; term = 0; @@ -1057,7 +1057,7 @@ let%expect_test "Conflict" = actions: [Send(1,(Conspire_dc.Types.Conspire { ctree = None; cons = - (Some { vval = 34e452ffd92c22ea72ef125a33a0a593; + (Some { vval = 2a593842bdac635313d582f80fb0a066; vterm = 0; term = 1; commit_index = d41d8cd98f00b204e9800998ecf8427e }) @@ -1065,7 +1065,7 @@ let%expect_test "Conflict" = Send(2,(Conspire_dc.Types.Conspire { ctree = None; cons = - (Some { vval = 34e452ffd92c22ea72ef125a33a0a593; + (Some { vval = 2a593842bdac635313d582f80fb0a066; vterm = 0; term = 1; commit_index = d41d8cd98f00b204e9800998ecf8427e }) @@ -1073,7 +1073,7 @@ let%expect_test "Conflict" = Send(3,(Conspire_dc.Types.Conspire { ctree = None; cons = - (Some { vval = 34e452ffd92c22ea72ef125a33a0a593; + (Some { vval = 2a593842bdac635313d582f80fb0a066; vterm = 0; term = 1; commit_index = d41d8cd98f00b204e9800998ecf8427e }) @@ -1086,29 +1086,29 @@ let%expect_test "Conflict" = conspire = { rep = { state = - { vval = 34e452ffd92c22ea72ef125a33a0a593; vterm = 0; term = 1; + { vval = 2a593842bdac635313d582f80fb0a066; vterm = 0; term = 1; commit_index = d41d8cd98f00b204e9800998ecf8427e }; store = { ctree = - [(34e452ffd92c22ea72ef125a33a0a593: + [(2a593842bdac635313d582f80fb0a066: { node = (1, d41d8cd98f00b204e9800998ecf8427e, ([Command(Read c0, 1)], 1.00000)); - parent = ; key = 34e452ffd92c22ea72ef125a33a0a593 }); - (620122743bc84de6b418bd632ea0cdc2: + parent = ; key = 2a593842bdac635313d582f80fb0a066 }); + (a27fc154024e8c9bb0ba98b71ddadd3a: { node = (1, d41d8cd98f00b204e9800998ecf8427e, ([Command(Read c1, 2)], 1.00000)); - parent = ; key = 620122743bc84de6b418bd632ea0cdc2 }); + parent = ; key = a27fc154024e8c9bb0ba98b71ddadd3a }); (d41d8cd98f00b204e9800998ecf8427e: Root)]; root = d41d8cd98f00b204e9800998ecf8427e }; remotes = }; other_nodes_state = [(1: - { vval = 620122743bc84de6b418bd632ea0cdc2; vterm = 0; term = 1; + { vval = a27fc154024e8c9bb0ba98b71ddadd3a; vterm = 0; term = 1; commit_index = d41d8cd98f00b204e9800998ecf8427e }); (2: - { vval = 34e452ffd92c22ea72ef125a33a0a593; vterm = 0; term = 0; + { vval = 2a593842bdac635313d582f80fb0a066; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }); (3: { vval = d41d8cd98f00b204e9800998ecf8427e; vterm = 0; term = 0; @@ -1127,29 +1127,29 @@ let%expect_test "Conflict" = conspire = { rep = { state = - { vval = 34e452ffd92c22ea72ef125a33a0a593; vterm = 1; term = 1; + { vval = 2a593842bdac635313d582f80fb0a066; vterm = 1; term = 1; commit_index = d41d8cd98f00b204e9800998ecf8427e }; store = { ctree = - [(34e452ffd92c22ea72ef125a33a0a593: + [(2a593842bdac635313d582f80fb0a066: { node = (1, d41d8cd98f00b204e9800998ecf8427e, ([Command(Read c0, 1)], 1.00000)); - parent = ; key = 34e452ffd92c22ea72ef125a33a0a593 }); - (620122743bc84de6b418bd632ea0cdc2: + parent = ; key = 2a593842bdac635313d582f80fb0a066 }); + (a27fc154024e8c9bb0ba98b71ddadd3a: { node = (1, d41d8cd98f00b204e9800998ecf8427e, ([Command(Read c1, 2)], 1.00000)); - parent = ; key = 620122743bc84de6b418bd632ea0cdc2 }); + parent = ; key = a27fc154024e8c9bb0ba98b71ddadd3a }); (d41d8cd98f00b204e9800998ecf8427e: Root)]; root = d41d8cd98f00b204e9800998ecf8427e }; remotes = }; other_nodes_state = [(1: - { vval = 620122743bc84de6b418bd632ea0cdc2; vterm = 0; term = 1; + { vval = a27fc154024e8c9bb0ba98b71ddadd3a; vterm = 0; term = 1; commit_index = d41d8cd98f00b204e9800998ecf8427e }); (2: - { vval = 34e452ffd92c22ea72ef125a33a0a593; vterm = 0; term = 1; + { vval = 2a593842bdac635313d582f80fb0a066; vterm = 0; term = 1; commit_index = d41d8cd98f00b204e9800998ecf8427e }); (3: { vval = d41d8cd98f00b204e9800998ecf8427e; vterm = 0; term = 0; @@ -1162,7 +1162,7 @@ let%expect_test "Conflict" = actions: [Send(1,(Conspire_dc.Types.Conspire { ctree = None; cons = - (Some { vval = 34e452ffd92c22ea72ef125a33a0a593; + (Some { vval = 2a593842bdac635313d582f80fb0a066; vterm = 1; term = 1; commit_index = d41d8cd98f00b204e9800998ecf8427e }) @@ -1170,7 +1170,7 @@ let%expect_test "Conflict" = Send(2,(Conspire_dc.Types.Conspire { ctree = None; cons = - (Some { vval = 34e452ffd92c22ea72ef125a33a0a593; + (Some { vval = 2a593842bdac635313d582f80fb0a066; vterm = 1; term = 1; commit_index = d41d8cd98f00b204e9800998ecf8427e }) @@ -1178,7 +1178,7 @@ let%expect_test "Conflict" = Send(3,(Conspire_dc.Types.Conspire { ctree = None; cons = - (Some { vval = 34e452ffd92c22ea72ef125a33a0a593; + (Some { vval = 2a593842bdac635313d582f80fb0a066; vterm = 1; term = 1; commit_index = d41d8cd98f00b204e9800998ecf8427e }) diff --git a/impl/test/test_conspire_mp.ml b/impl/test/test_conspire_leader.ml similarity index 62% rename from impl/test/test_conspire_mp.ml rename to impl/test/test_conspire_leader.ml index eac26cf..cf63df5 100644 --- a/impl/test/test_conspire_mp.ml +++ b/impl/test/test_conspire_leader.ml @@ -1,5 +1,5 @@ open! Core -module MP = Impl_core__Conspire_mp +module MP = Impl_core__Conspire_leader module Imp = Impl_core__Actions_f.ImperativeActions (MP.Types) module Impl = MP.Make (Imp) module Rep = MP.Conspire.Rep @@ -36,8 +36,7 @@ let%expect_test "local_commit" = remotes = }; other_nodes_state = []; config = ; commit_log = [] }; failure_detector = - { Conspire_mp.FailureDetector.state = []; timeout = 2 }; - stall_checker = } + { Conspire_leader.FailureDetector.state = []; timeout = 2 } } actions: [] |}] ; let c1 = make_command (Read "c1") in let t, actions = Impl.advance t (Commands (c1 |> Iter.singleton)) in @@ -48,22 +47,21 @@ let%expect_test "local_commit" = conspire = { rep = { state = - { vval = 1183a904cd1a3b8f3cf219be9367701f; vterm = 0; term = 0; - commit_index = 1183a904cd1a3b8f3cf219be9367701f }; + { vval = 977400b12d4c3868588f1db7a6f272da; vterm = 0; term = 0; + commit_index = 977400b12d4c3868588f1db7a6f272da }; store = { ctree = - [(1183a904cd1a3b8f3cf219be9367701f: + [(977400b12d4c3868588f1db7a6f272da: { node = (1, d41d8cd98f00b204e9800998ecf8427e, [Command(Read c1, 1)]); - parent = ; key = 1183a904cd1a3b8f3cf219be9367701f }); + parent = ; key = 977400b12d4c3868588f1db7a6f272da }); (d41d8cd98f00b204e9800998ecf8427e: Root)]; root = d41d8cd98f00b204e9800998ecf8427e }; remotes = }; other_nodes_state = []; config = ; commit_log = [[Command(Read c1, 1)]] }; failure_detector = - { Conspire_mp.FailureDetector.state = []; timeout = 2 }; - stall_checker = } + { Conspire_leader.FailureDetector.state = []; timeout = 2 } } actions: [CommitCommands(Command(Read c1, 1))] |}] ; let c2, c3 = (make_command (Read "c2"), make_command (Read "c3")) in let t, actions = Impl.advance t (Commands (Iter.of_list [c2; c3])) in @@ -74,19 +72,19 @@ let%expect_test "local_commit" = conspire = { rep = { state = - { vval = 7f2c0aae94bf199f9b303480537af547; vterm = 0; term = 0; - commit_index = 7f2c0aae94bf199f9b303480537af547 }; + { vval = d1c958ddddd59d3d3a315b1bf5ab1e39; vterm = 0; term = 0; + commit_index = d1c958ddddd59d3d3a315b1bf5ab1e39 }; store = { ctree = - [(1183a904cd1a3b8f3cf219be9367701f: + [(977400b12d4c3868588f1db7a6f272da: { node = (1, d41d8cd98f00b204e9800998ecf8427e, [Command(Read c1, 1)]); - parent = ; key = 1183a904cd1a3b8f3cf219be9367701f }); - (7f2c0aae94bf199f9b303480537af547: + parent = ; key = 977400b12d4c3868588f1db7a6f272da }); + (d1c958ddddd59d3d3a315b1bf5ab1e39: { node = - (2, 1183a904cd1a3b8f3cf219be9367701f, + (2, 977400b12d4c3868588f1db7a6f272da, [Command(Read c2, 3); Command(Read c3, 2)]); - parent = ; key = 7f2c0aae94bf199f9b303480537af547 }); + parent = ; key = d1c958ddddd59d3d3a315b1bf5ab1e39 }); (d41d8cd98f00b204e9800998ecf8427e: Root)]; root = d41d8cd98f00b204e9800998ecf8427e }; remotes = }; @@ -94,8 +92,7 @@ let%expect_test "local_commit" = commit_log = [[Command(Read c1, 1)][Command(Read c2, 3); Command(Read c3, 2)]] }; failure_detector = - { Conspire_mp.FailureDetector.state = []; timeout = 2 }; - stall_checker = } + { Conspire_leader.FailureDetector.state = []; timeout = 2 } } actions: [CommitCommands(Command(Read c2, 3), Command(Read c3, 2))] |}] let%expect_test "e2e commit" = @@ -113,14 +110,14 @@ let%expect_test "e2e commit" = conspire = { rep = { state = - { vval = 1183a904cd1a3b8f3cf219be9367701f; vterm = 0; term = 0; + { vval = 977400b12d4c3868588f1db7a6f272da; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }; store = { ctree = - [(1183a904cd1a3b8f3cf219be9367701f: + [(977400b12d4c3868588f1db7a6f272da: { node = (1, d41d8cd98f00b204e9800998ecf8427e, [Command(Read c1, 1)]); - parent = ; key = 1183a904cd1a3b8f3cf219be9367701f }); + parent = ; key = 977400b12d4c3868588f1db7a6f272da }); (d41d8cd98f00b204e9800998ecf8427e: Root)]; root = d41d8cd98f00b204e9800998ecf8427e }; remotes = }; @@ -136,43 +133,43 @@ let%expect_test "e2e commit" = commit_index = d41d8cd98f00b204e9800998ecf8427e })]; config = ; commit_log = [] }; failure_detector = - { Conspire_mp.FailureDetector.state = [(1: 2)(2: 2)(3: 2)]; timeout = 2 - }; - stall_checker = } + { Conspire_leader.FailureDetector.state = [(1: 2)(2: 2)(3: 2)]; + timeout = 2 } + } actions: [Send(1,{ ctree = - (Some { new_head = 1183a904cd1a3b8f3cf219be9367701f; + (Some { new_head = 977400b12d4c3868588f1db7a6f272da; extension = [(1, d41d8cd98f00b204e9800998ecf8427e, [Command(Read c1, 1)])] }); cons = - (Some { vval = 1183a904cd1a3b8f3cf219be9367701f; + (Some { vval = 977400b12d4c3868588f1db7a6f272da; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }) }) Send(2,{ ctree = - (Some { new_head = 1183a904cd1a3b8f3cf219be9367701f; + (Some { new_head = 977400b12d4c3868588f1db7a6f272da; extension = [(1, d41d8cd98f00b204e9800998ecf8427e, [Command(Read c1, 1)])] }); cons = - (Some { vval = 1183a904cd1a3b8f3cf219be9367701f; + (Some { vval = 977400b12d4c3868588f1db7a6f272da; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }) }) Send(3,{ ctree = - (Some { new_head = 1183a904cd1a3b8f3cf219be9367701f; + (Some { new_head = 977400b12d4c3868588f1db7a6f272da; extension = [(1, d41d8cd98f00b204e9800998ecf8427e, [Command(Read c1, 1)])] }); cons = - (Some { vval = 1183a904cd1a3b8f3cf219be9367701f; + (Some { vval = 977400b12d4c3868588f1db7a6f272da; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }) })] |}] ; - let hd1 = Md5.of_hex_exn "1183a904cd1a3b8f3cf219be9367701f" in + let hd1 = MP.Conspire.CTree.make_key root_hd [c1] in let update = MP.Conspire.Rep. {ctree= Some {new_head= hd1; extension= [(1, root_hd, [c1])]}; cons= None} @@ -189,10 +186,10 @@ let%expect_test "e2e commit" = commit_index = d41d8cd98f00b204e9800998ecf8427e }; store = { ctree = - [(1183a904cd1a3b8f3cf219be9367701f: + [(977400b12d4c3868588f1db7a6f272da: { node = (1, d41d8cd98f00b204e9800998ecf8427e, [Command(Read c1, 1)]); - parent = ; key = 1183a904cd1a3b8f3cf219be9367701f }); + parent = ; key = 977400b12d4c3868588f1db7a6f272da }); (d41d8cd98f00b204e9800998ecf8427e: Root)]; root = d41d8cd98f00b204e9800998ecf8427e }; remotes = }; @@ -208,9 +205,9 @@ let%expect_test "e2e commit" = commit_index = d41d8cd98f00b204e9800998ecf8427e })]; config = ; commit_log = [] }; failure_detector = - { Conspire_mp.FailureDetector.state = [(0: 2)(2: 2)(3: 2)]; timeout = 2 - }; - stall_checker = } + { Conspire_leader.FailureDetector.state = [(0: 2)(2: 2)(3: 2)]; + timeout = 2 } + } actions: [] |}] ; let t0_vote = Rep. @@ -225,20 +222,20 @@ let%expect_test "e2e commit" = conspire = { rep = { state = - { vval = 1183a904cd1a3b8f3cf219be9367701f; vterm = 0; term = 0; + { vval = 977400b12d4c3868588f1db7a6f272da; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }; store = { ctree = - [(1183a904cd1a3b8f3cf219be9367701f: + [(977400b12d4c3868588f1db7a6f272da: { node = (1, d41d8cd98f00b204e9800998ecf8427e, [Command(Read c1, 1)]); - parent = ; key = 1183a904cd1a3b8f3cf219be9367701f }); + parent = ; key = 977400b12d4c3868588f1db7a6f272da }); (d41d8cd98f00b204e9800998ecf8427e: Root)]; root = d41d8cd98f00b204e9800998ecf8427e }; remotes = }; other_nodes_state = [(0: - { vval = 1183a904cd1a3b8f3cf219be9367701f; vterm = 0; term = 0; + { vval = 977400b12d4c3868588f1db7a6f272da; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }); (2: { vval = d41d8cd98f00b204e9800998ecf8427e; vterm = 0; term = 0; @@ -248,12 +245,12 @@ let%expect_test "e2e commit" = commit_index = d41d8cd98f00b204e9800998ecf8427e })]; config = ; commit_log = [] }; failure_detector = - { Conspire_mp.FailureDetector.state = [(0: 2)(2: 2)(3: 2)]; timeout = 2 - }; - stall_checker = } + { Conspire_leader.FailureDetector.state = [(0: 2)(2: 2)(3: 2)]; + timeout = 2 } + } actions: [Send(0,{ ctree = None; cons = - (Some { vval = 1183a904cd1a3b8f3cf219be9367701f; + (Some { vval = 977400b12d4c3868588f1db7a6f272da; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }) })] |}] ; @@ -265,20 +262,20 @@ let%expect_test "e2e commit" = conspire = { rep = { state = - { vval = 1183a904cd1a3b8f3cf219be9367701f; vterm = 0; term = 0; + { vval = 977400b12d4c3868588f1db7a6f272da; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }; store = { ctree = - [(1183a904cd1a3b8f3cf219be9367701f: + [(977400b12d4c3868588f1db7a6f272da: { node = (1, d41d8cd98f00b204e9800998ecf8427e, [Command(Read c1, 1)]); - parent = ; key = 1183a904cd1a3b8f3cf219be9367701f }); + parent = ; key = 977400b12d4c3868588f1db7a6f272da }); (d41d8cd98f00b204e9800998ecf8427e: Root)]; root = d41d8cd98f00b204e9800998ecf8427e }; remotes = }; other_nodes_state = [(1: - { vval = 1183a904cd1a3b8f3cf219be9367701f; vterm = 0; term = 0; + { vval = 977400b12d4c3868588f1db7a6f272da; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }); (2: { vval = d41d8cd98f00b204e9800998ecf8427e; vterm = 0; term = 0; @@ -288,9 +285,9 @@ let%expect_test "e2e commit" = commit_index = d41d8cd98f00b204e9800998ecf8427e })]; config = ; commit_log = [] }; failure_detector = - { Conspire_mp.FailureDetector.state = [(1: 2)(2: 2)(3: 2)]; timeout = 2 - }; - stall_checker = } + { Conspire_leader.FailureDetector.state = [(1: 2)(2: 2)(3: 2)]; + timeout = 2 } + } actions: [] |}] ; let t0, actions = Impl.advance t0 (Recv (Ok t0_vote, 2)) in print t0 actions ; @@ -300,50 +297,50 @@ let%expect_test "e2e commit" = conspire = { rep = { state = - { vval = 1183a904cd1a3b8f3cf219be9367701f; vterm = 0; term = 0; - commit_index = 1183a904cd1a3b8f3cf219be9367701f }; + { vval = 977400b12d4c3868588f1db7a6f272da; vterm = 0; term = 0; + commit_index = 977400b12d4c3868588f1db7a6f272da }; store = { ctree = - [(1183a904cd1a3b8f3cf219be9367701f: + [(977400b12d4c3868588f1db7a6f272da: { node = (1, d41d8cd98f00b204e9800998ecf8427e, [Command(Read c1, 1)]); - parent = ; key = 1183a904cd1a3b8f3cf219be9367701f }); + parent = ; key = 977400b12d4c3868588f1db7a6f272da }); (d41d8cd98f00b204e9800998ecf8427e: Root)]; root = d41d8cd98f00b204e9800998ecf8427e }; remotes = }; other_nodes_state = [(1: - { vval = 1183a904cd1a3b8f3cf219be9367701f; vterm = 0; term = 0; + { vval = 977400b12d4c3868588f1db7a6f272da; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }); (2: - { vval = 1183a904cd1a3b8f3cf219be9367701f; vterm = 0; term = 0; + { vval = 977400b12d4c3868588f1db7a6f272da; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }); (3: { vval = d41d8cd98f00b204e9800998ecf8427e; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e })]; config = ; commit_log = [[Command(Read c1, 1)]] }; failure_detector = - { Conspire_mp.FailureDetector.state = [(1: 2)(2: 2)(3: 2)]; timeout = 2 - }; - stall_checker = } + { Conspire_leader.FailureDetector.state = [(1: 2)(2: 2)(3: 2)]; + timeout = 2 } + } actions: [CommitCommands(Command(Read c1, 1)) Send(1,{ ctree = None; cons = - (Some { vval = 1183a904cd1a3b8f3cf219be9367701f; + (Some { vval = 977400b12d4c3868588f1db7a6f272da; vterm = 0; term = 0; - commit_index = 1183a904cd1a3b8f3cf219be9367701f }) + commit_index = 977400b12d4c3868588f1db7a6f272da }) }) Send(2,{ ctree = None; cons = - (Some { vval = 1183a904cd1a3b8f3cf219be9367701f; + (Some { vval = 977400b12d4c3868588f1db7a6f272da; vterm = 0; term = 0; - commit_index = 1183a904cd1a3b8f3cf219be9367701f }) + commit_index = 977400b12d4c3868588f1db7a6f272da }) }) Send(3,{ ctree = None; cons = - (Some { vval = 1183a904cd1a3b8f3cf219be9367701f; + (Some { vval = 977400b12d4c3868588f1db7a6f272da; vterm = 0; term = 0; - commit_index = 1183a904cd1a3b8f3cf219be9367701f }) + commit_index = 977400b12d4c3868588f1db7a6f272da }) })] |}] ; ignore (t0, t1) @@ -351,9 +348,79 @@ let%expect_test "e2e conflict resolution" = Imp.set_is_test true ; reset_make_command_state () ; let t0 = create (c4 0) in - let _t1 = create (c4 1) in + let t1 = create (c4 1) in let c0 = make_command (Read "c0") in let c1 = make_command (Read "c1") in + let t1, _ = Impl.advance t1 Tick in + let t1, _ = Impl.advance t1 Tick in + let t1, actions = Impl.advance t1 (Commands (Iter.of_list [c1])) in + print t1 actions ; + [%expect + {| + +Now leader for 0 + t: { config = ; + conspire = + { rep = + { state = + { vval = e0622baa152ef15b905ad1cf0666001d; vterm = 0; term = 0; + commit_index = d41d8cd98f00b204e9800998ecf8427e }; + store = + { ctree = + [(d41d8cd98f00b204e9800998ecf8427e: Root); + (e0622baa152ef15b905ad1cf0666001d: + { node = + (1, d41d8cd98f00b204e9800998ecf8427e, [Command(Read c1, 2)]); + parent = ; key = e0622baa152ef15b905ad1cf0666001d })]; + root = d41d8cd98f00b204e9800998ecf8427e }; + remotes = }; + other_nodes_state = + [(0: + { vval = d41d8cd98f00b204e9800998ecf8427e; vterm = 0; term = 0; + commit_index = d41d8cd98f00b204e9800998ecf8427e }); + (2: + { vval = d41d8cd98f00b204e9800998ecf8427e; vterm = 0; term = 0; + commit_index = d41d8cd98f00b204e9800998ecf8427e }); + (3: + { vval = d41d8cd98f00b204e9800998ecf8427e; vterm = 0; term = 0; + commit_index = d41d8cd98f00b204e9800998ecf8427e })]; + config = ; commit_log = [] }; + failure_detector = + { Conspire_leader.FailureDetector.state = [(0: 0)(2: 0)(3: 0)]; + timeout = 2 } + } + actions: [Send(0,{ ctree = + (Some { new_head = e0622baa152ef15b905ad1cf0666001d; + extension = + [(1, d41d8cd98f00b204e9800998ecf8427e, + [Command(Read c1, 2)])] + }); + cons = + (Some { vval = e0622baa152ef15b905ad1cf0666001d; + vterm = 0; term = 0; + commit_index = d41d8cd98f00b204e9800998ecf8427e }) + }) + Send(2,{ ctree = + (Some { new_head = e0622baa152ef15b905ad1cf0666001d; + extension = + [(1, d41d8cd98f00b204e9800998ecf8427e, + [Command(Read c1, 2)])] + }); + cons = + (Some { vval = e0622baa152ef15b905ad1cf0666001d; + vterm = 0; term = 0; + commit_index = d41d8cd98f00b204e9800998ecf8427e }) + }) + Send(3,{ ctree = + (Some { new_head = e0622baa152ef15b905ad1cf0666001d; + extension = + [(1, d41d8cd98f00b204e9800998ecf8427e, + [Command(Read c1, 2)])] + }); + cons = + (Some { vval = e0622baa152ef15b905ad1cf0666001d; + vterm = 0; term = 0; + commit_index = d41d8cd98f00b204e9800998ecf8427e }) + })] |}] ; let t0, actions = Impl.advance t0 (Commands (Iter.of_list [c0])) in print t0 actions ; [%expect @@ -362,14 +429,14 @@ let%expect_test "e2e conflict resolution" = conspire = { rep = { state = - { vval = 1fddcd0db3e43a000153d0c4de56a7cc; vterm = 0; term = 0; + { vval = 875f3872edbe68ca2e42fd687213aa9c; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }; store = { ctree = - [(1fddcd0db3e43a000153d0c4de56a7cc: + [(875f3872edbe68ca2e42fd687213aa9c: { node = (1, d41d8cd98f00b204e9800998ecf8427e, [Command(Read c0, 1)]); - parent = ; key = 1fddcd0db3e43a000153d0c4de56a7cc }); + parent = ; key = 875f3872edbe68ca2e42fd687213aa9c }); (d41d8cd98f00b204e9800998ecf8427e: Root)]; root = d41d8cd98f00b204e9800998ecf8427e }; remotes = }; @@ -385,45 +452,45 @@ let%expect_test "e2e conflict resolution" = commit_index = d41d8cd98f00b204e9800998ecf8427e })]; config = ; commit_log = [] }; failure_detector = - { Conspire_mp.FailureDetector.state = [(1: 2)(2: 2)(3: 2)]; timeout = 2 - }; - stall_checker = } + { Conspire_leader.FailureDetector.state = [(1: 2)(2: 2)(3: 2)]; + timeout = 2 } + } actions: [Send(1,{ ctree = - (Some { new_head = 1fddcd0db3e43a000153d0c4de56a7cc; + (Some { new_head = 875f3872edbe68ca2e42fd687213aa9c; extension = [(1, d41d8cd98f00b204e9800998ecf8427e, [Command(Read c0, 1)])] }); cons = - (Some { vval = 1fddcd0db3e43a000153d0c4de56a7cc; + (Some { vval = 875f3872edbe68ca2e42fd687213aa9c; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }) }) Send(2,{ ctree = - (Some { new_head = 1fddcd0db3e43a000153d0c4de56a7cc; + (Some { new_head = 875f3872edbe68ca2e42fd687213aa9c; extension = [(1, d41d8cd98f00b204e9800998ecf8427e, [Command(Read c0, 1)])] }); cons = - (Some { vval = 1fddcd0db3e43a000153d0c4de56a7cc; + (Some { vval = 875f3872edbe68ca2e42fd687213aa9c; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }) }) Send(3,{ ctree = - (Some { new_head = 1fddcd0db3e43a000153d0c4de56a7cc; + (Some { new_head = 875f3872edbe68ca2e42fd687213aa9c; extension = [(1, d41d8cd98f00b204e9800998ecf8427e, [Command(Read c0, 1)])] }); cons = - (Some { vval = 1fddcd0db3e43a000153d0c4de56a7cc; + (Some { vval = 875f3872edbe68ca2e42fd687213aa9c; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }) })] |}] ; let root_clock = t0.conspire.rep.store.root in let c0_node = (1, root_clock, [c0]) in - let c0_clock = Md5.of_hex_exn "1fddcd0db3e43a000153d0c4de56a7cc" in + let c0_clock = MP.Conspire.CTree.make_key root_clock [c0] in let c1_node = (1, root_clock, [c1]) in let c1_clock = MP.Conspire.CTree.make_key root_clock [c1] in (* ---- Replicate tree ---- *) @@ -443,19 +510,19 @@ let%expect_test "e2e conflict resolution" = conspire = { rep = { state = - { vval = 1fddcd0db3e43a000153d0c4de56a7cc; vterm = 0; term = 0; + { vval = 875f3872edbe68ca2e42fd687213aa9c; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }; store = { ctree = - [(1fddcd0db3e43a000153d0c4de56a7cc: + [(875f3872edbe68ca2e42fd687213aa9c: { node = (1, d41d8cd98f00b204e9800998ecf8427e, [Command(Read c0, 1)]); - parent = ; key = 1fddcd0db3e43a000153d0c4de56a7cc }); - (620122743bc84de6b418bd632ea0cdc2: + parent = ; key = 875f3872edbe68ca2e42fd687213aa9c }); + (d41d8cd98f00b204e9800998ecf8427e: Root); + (e0622baa152ef15b905ad1cf0666001d: { node = (1, d41d8cd98f00b204e9800998ecf8427e, [Command(Read c1, 2)]); - parent = ; key = 620122743bc84de6b418bd632ea0cdc2 }); - (d41d8cd98f00b204e9800998ecf8427e: Root)]; + parent = ; key = e0622baa152ef15b905ad1cf0666001d })]; root = d41d8cd98f00b204e9800998ecf8427e }; remotes = }; other_nodes_state = @@ -470,9 +537,9 @@ let%expect_test "e2e conflict resolution" = commit_index = d41d8cd98f00b204e9800998ecf8427e })]; config = ; commit_log = [] }; failure_detector = - { Conspire_mp.FailureDetector.state = [(1: 2)(2: 2)(3: 2)]; timeout = 2 - }; - stall_checker = } + { Conspire_leader.FailureDetector.state = [(1: 2)(2: 2)(3: 2)]; + timeout = 2 } + } actions: [] |}] ; (* ---- Votes for values ---- *) let t0, _ = @@ -507,43 +574,56 @@ let%expect_test "e2e conflict resolution" = print t0 actions ; [%expect {| + +Conflict, term=1 t: { config = ; conspire = { rep = { state = - { vval = 1fddcd0db3e43a000153d0c4de56a7cc; vterm = 0; term = 1; + { vval = 875f3872edbe68ca2e42fd687213aa9c; vterm = 0; term = 1; commit_index = d41d8cd98f00b204e9800998ecf8427e }; store = { ctree = - [(1fddcd0db3e43a000153d0c4de56a7cc: + [(875f3872edbe68ca2e42fd687213aa9c: { node = (1, d41d8cd98f00b204e9800998ecf8427e, [Command(Read c0, 1)]); - parent = ; key = 1fddcd0db3e43a000153d0c4de56a7cc }); - (620122743bc84de6b418bd632ea0cdc2: + parent = ; key = 875f3872edbe68ca2e42fd687213aa9c }); + (d41d8cd98f00b204e9800998ecf8427e: Root); + (e0622baa152ef15b905ad1cf0666001d: { node = (1, d41d8cd98f00b204e9800998ecf8427e, [Command(Read c1, 2)]); - parent = ; key = 620122743bc84de6b418bd632ea0cdc2 }); - (d41d8cd98f00b204e9800998ecf8427e: Root)]; + parent = ; key = e0622baa152ef15b905ad1cf0666001d })]; root = d41d8cd98f00b204e9800998ecf8427e }; remotes = }; other_nodes_state = [(1: - { vval = 620122743bc84de6b418bd632ea0cdc2; vterm = 0; term = 0; + { vval = e0622baa152ef15b905ad1cf0666001d; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }); (2: - { vval = 1fddcd0db3e43a000153d0c4de56a7cc; vterm = 0; term = 0; + { vval = 875f3872edbe68ca2e42fd687213aa9c; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }); (3: { vval = d41d8cd98f00b204e9800998ecf8427e; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e })]; config = ; commit_log = [] }; failure_detector = - { Conspire_mp.FailureDetector.state = [(1: 2)(2: 2)(3: 2)]; timeout = 2 - }; - stall_checker = } + { Conspire_leader.FailureDetector.state = [(1: 2)(2: 2)(3: 2)]; + timeout = 2 } + } actions: [Send(1,{ ctree = None; cons = - (Some { vval = 1fddcd0db3e43a000153d0c4de56a7cc; + (Some { vval = 875f3872edbe68ca2e42fd687213aa9c; + vterm = 0; term = 1; + commit_index = d41d8cd98f00b204e9800998ecf8427e }) + }) + Send(2,{ ctree = None; + cons = + (Some { vval = 875f3872edbe68ca2e42fd687213aa9c; + vterm = 0; term = 1; + commit_index = d41d8cd98f00b204e9800998ecf8427e }) + }) + Send(3,{ ctree = None; + cons = + (Some { vval = 875f3872edbe68ca2e42fd687213aa9c; vterm = 0; term = 1; commit_index = d41d8cd98f00b204e9800998ecf8427e }) })] |}] ; @@ -580,59 +660,131 @@ let%expect_test "e2e conflict resolution" = print t0 actions ; [%expect {| + +Recovery complete term: {t:1,vt:1} + +Recovery to 1 t: { config = ; conspire = { rep = { state = - { vval = 1fddcd0db3e43a000153d0c4de56a7cc; vterm = 1; term = 1; + { vval = 875f3872edbe68ca2e42fd687213aa9c; vterm = 1; term = 1; commit_index = d41d8cd98f00b204e9800998ecf8427e }; store = { ctree = - [(1fddcd0db3e43a000153d0c4de56a7cc: + [(875f3872edbe68ca2e42fd687213aa9c: { node = (1, d41d8cd98f00b204e9800998ecf8427e, [Command(Read c0, 1)]); - parent = ; key = 1fddcd0db3e43a000153d0c4de56a7cc }); - (620122743bc84de6b418bd632ea0cdc2: + parent = ; key = 875f3872edbe68ca2e42fd687213aa9c }); + (d41d8cd98f00b204e9800998ecf8427e: Root); + (e0622baa152ef15b905ad1cf0666001d: { node = (1, d41d8cd98f00b204e9800998ecf8427e, [Command(Read c1, 2)]); - parent = ; key = 620122743bc84de6b418bd632ea0cdc2 }); - (d41d8cd98f00b204e9800998ecf8427e: Root)]; + parent = ; key = e0622baa152ef15b905ad1cf0666001d })]; root = d41d8cd98f00b204e9800998ecf8427e }; remotes = }; other_nodes_state = [(1: - { vval = 620122743bc84de6b418bd632ea0cdc2; vterm = 0; term = 0; + { vval = e0622baa152ef15b905ad1cf0666001d; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }); (2: - { vval = 1fddcd0db3e43a000153d0c4de56a7cc; vterm = 0; term = 1; + { vval = 875f3872edbe68ca2e42fd687213aa9c; vterm = 0; term = 1; commit_index = d41d8cd98f00b204e9800998ecf8427e }); (3: - { vval = 620122743bc84de6b418bd632ea0cdc2; vterm = 0; term = 1; + { vval = e0622baa152ef15b905ad1cf0666001d; vterm = 0; term = 1; commit_index = d41d8cd98f00b204e9800998ecf8427e })]; config = ; commit_log = [] }; failure_detector = - { Conspire_mp.FailureDetector.state = [(1: 2)(2: 2)(3: 2)]; timeout = 2 - }; - stall_checker = } + { Conspire_leader.FailureDetector.state = [(1: 2)(2: 2)(3: 2)]; + timeout = 2 } + } actions: [Send(1,{ ctree = None; cons = - (Some { vval = 1fddcd0db3e43a000153d0c4de56a7cc; + (Some { vval = 875f3872edbe68ca2e42fd687213aa9c; vterm = 1; term = 1; commit_index = d41d8cd98f00b204e9800998ecf8427e }) }) Send(2,{ ctree = None; cons = - (Some { vval = 1fddcd0db3e43a000153d0c4de56a7cc; + (Some { vval = 875f3872edbe68ca2e42fd687213aa9c; vterm = 1; term = 1; commit_index = d41d8cd98f00b204e9800998ecf8427e }) }) Send(3,{ ctree = None; cons = - (Some { vval = 1fddcd0db3e43a000153d0c4de56a7cc; + (Some { vval = 875f3872edbe68ca2e42fd687213aa9c; vterm = 1; term = 1; commit_index = d41d8cd98f00b204e9800998ecf8427e }) })] |}] ; - ignore (t0, _t1, c0_node, c1_node) + (* ---- T1 will not recover since it believes T0 is the leader *) + (* Conflict from 0 *) + let t1, _ = + Impl.advance t1 + (Recv + ( Ok + Rep. + { ctree= Some {new_head= c0_clock; extension= [c0_node]} + ; cons= + Some + { vval= c0_clock + ; vterm= 0 + ; term= 1 + ; commit_index= root_clock } } + , 0 ) ) + in + let t1, actions = + Impl.advance t1 + (Recv + ( Ok + Rep. + { ctree= None + ; cons= + Some + { vval= c1_clock + ; vterm= 0 + ; term= 1 + ; commit_index= root_clock } } + , 3 ) ) + in + print t1 actions ; + [%expect + {| + +No longer leader for 1 + +Conflict, term=1 + t: { config = ; + conspire = + { rep = + { state = + { vval = e0622baa152ef15b905ad1cf0666001d; vterm = 0; term = 1; + commit_index = d41d8cd98f00b204e9800998ecf8427e }; + store = + { ctree = + [(875f3872edbe68ca2e42fd687213aa9c: + { node = + (1, d41d8cd98f00b204e9800998ecf8427e, [Command(Read c0, 1)]); + parent = ; key = 875f3872edbe68ca2e42fd687213aa9c }); + (d41d8cd98f00b204e9800998ecf8427e: Root); + (e0622baa152ef15b905ad1cf0666001d: + { node = + (1, d41d8cd98f00b204e9800998ecf8427e, [Command(Read c1, 2)]); + parent = ; key = e0622baa152ef15b905ad1cf0666001d })]; + root = d41d8cd98f00b204e9800998ecf8427e }; + remotes = }; + other_nodes_state = + [(0: + { vval = 875f3872edbe68ca2e42fd687213aa9c; vterm = 0; term = 1; + commit_index = d41d8cd98f00b204e9800998ecf8427e }); + (2: + { vval = d41d8cd98f00b204e9800998ecf8427e; vterm = 0; term = 0; + commit_index = d41d8cd98f00b204e9800998ecf8427e }); + (3: + { vval = e0622baa152ef15b905ad1cf0666001d; vterm = 0; term = 1; + commit_index = d41d8cd98f00b204e9800998ecf8427e })]; + config = ; commit_log = [] }; + failure_detector = + { Conspire_leader.FailureDetector.state = [(0: 2)(2: 0)(3: 2)]; + timeout = 2 } + } + actions: [] |}] ; + ignore (t0, t1, c0_node, c1_node) let%expect_test "message loss" = Imp.set_is_test true ; @@ -650,14 +802,14 @@ let%expect_test "message loss" = conspire = { rep = { state = - { vval = 1fddcd0db3e43a000153d0c4de56a7cc; vterm = 0; term = 0; + { vval = 875f3872edbe68ca2e42fd687213aa9c; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }; store = { ctree = - [(1fddcd0db3e43a000153d0c4de56a7cc: + [(875f3872edbe68ca2e42fd687213aa9c: { node = (1, d41d8cd98f00b204e9800998ecf8427e, [Command(Read c0, 1)]); - parent = ; key = 1fddcd0db3e43a000153d0c4de56a7cc }); + parent = ; key = 875f3872edbe68ca2e42fd687213aa9c }); (d41d8cd98f00b204e9800998ecf8427e: Root)]; root = d41d8cd98f00b204e9800998ecf8427e }; remotes = }; @@ -673,44 +825,44 @@ let%expect_test "message loss" = commit_index = d41d8cd98f00b204e9800998ecf8427e })]; config = ; commit_log = [] }; failure_detector = - { Conspire_mp.FailureDetector.state = [(1: 2)(2: 2)(3: 2)]; timeout = 2 - }; - stall_checker = } + { Conspire_leader.FailureDetector.state = [(1: 2)(2: 2)(3: 2)]; + timeout = 2 } + } actions: [Send(1,{ ctree = - (Some { new_head = 1fddcd0db3e43a000153d0c4de56a7cc; + (Some { new_head = 875f3872edbe68ca2e42fd687213aa9c; extension = [(1, d41d8cd98f00b204e9800998ecf8427e, [Command(Read c0, 1)])] }); cons = - (Some { vval = 1fddcd0db3e43a000153d0c4de56a7cc; + (Some { vval = 875f3872edbe68ca2e42fd687213aa9c; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }) }) Send(2,{ ctree = - (Some { new_head = 1fddcd0db3e43a000153d0c4de56a7cc; + (Some { new_head = 875f3872edbe68ca2e42fd687213aa9c; extension = [(1, d41d8cd98f00b204e9800998ecf8427e, [Command(Read c0, 1)])] }); cons = - (Some { vval = 1fddcd0db3e43a000153d0c4de56a7cc; + (Some { vval = 875f3872edbe68ca2e42fd687213aa9c; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }) }) Send(3,{ ctree = - (Some { new_head = 1fddcd0db3e43a000153d0c4de56a7cc; + (Some { new_head = 875f3872edbe68ca2e42fd687213aa9c; extension = [(1, d41d8cd98f00b204e9800998ecf8427e, [Command(Read c0, 1)])] }); cons = - (Some { vval = 1fddcd0db3e43a000153d0c4de56a7cc; + (Some { vval = 875f3872edbe68ca2e42fd687213aa9c; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }) })] |}] ; let root_clock = t0.conspire.rep.store.root in - let c0_clock = Md5.of_hex_exn "1fddcd0db3e43a000153d0c4de56a7cc" in + let c0_clock = MP.Conspire.CTree.make_key root_clock [c0] in let t0, _ = Impl.advance t0 (Recv @@ -746,14 +898,14 @@ let%expect_test "message loss" = conspire = { rep = { state = - { vval = 1fddcd0db3e43a000153d0c4de56a7cc; vterm = 0; term = 0; - commit_index = 1fddcd0db3e43a000153d0c4de56a7cc }; + { vval = 875f3872edbe68ca2e42fd687213aa9c; vterm = 0; term = 0; + commit_index = 875f3872edbe68ca2e42fd687213aa9c }; store = { ctree = - [(1fddcd0db3e43a000153d0c4de56a7cc: + [(875f3872edbe68ca2e42fd687213aa9c: { node = (1, d41d8cd98f00b204e9800998ecf8427e, [Command(Read c0, 1)]); - parent = ; key = 1fddcd0db3e43a000153d0c4de56a7cc }); + parent = ; key = 875f3872edbe68ca2e42fd687213aa9c }); (d41d8cd98f00b204e9800998ecf8427e: Root)]; root = d41d8cd98f00b204e9800998ecf8427e }; remotes = }; @@ -762,34 +914,34 @@ let%expect_test "message loss" = { vval = d41d8cd98f00b204e9800998ecf8427e; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }); (2: - { vval = 1fddcd0db3e43a000153d0c4de56a7cc; vterm = 0; term = 0; + { vval = 875f3872edbe68ca2e42fd687213aa9c; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }); (3: - { vval = 1fddcd0db3e43a000153d0c4de56a7cc; vterm = 0; term = 0; + { vval = 875f3872edbe68ca2e42fd687213aa9c; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e })]; config = ; commit_log = [[Command(Read c0, 1)]] }; failure_detector = - { Conspire_mp.FailureDetector.state = [(1: 2)(2: 2)(3: 2)]; timeout = 2 - }; - stall_checker = } + { Conspire_leader.FailureDetector.state = [(1: 2)(2: 2)(3: 2)]; + timeout = 2 } + } actions: [CommitCommands(Command(Read c0, 1)) Send(1,{ ctree = None; cons = - (Some { vval = 1fddcd0db3e43a000153d0c4de56a7cc; + (Some { vval = 875f3872edbe68ca2e42fd687213aa9c; vterm = 0; term = 0; - commit_index = 1fddcd0db3e43a000153d0c4de56a7cc }) + commit_index = 875f3872edbe68ca2e42fd687213aa9c }) }) Send(2,{ ctree = None; cons = - (Some { vval = 1fddcd0db3e43a000153d0c4de56a7cc; + (Some { vval = 875f3872edbe68ca2e42fd687213aa9c; vterm = 0; term = 0; - commit_index = 1fddcd0db3e43a000153d0c4de56a7cc }) + commit_index = 875f3872edbe68ca2e42fd687213aa9c }) }) Send(3,{ ctree = None; cons = - (Some { vval = 1fddcd0db3e43a000153d0c4de56a7cc; + (Some { vval = 875f3872edbe68ca2e42fd687213aa9c; vterm = 0; term = 0; - commit_index = 1fddcd0db3e43a000153d0c4de56a7cc }) + commit_index = 875f3872edbe68ca2e42fd687213aa9c }) })] |}] ; let t0, actions = Impl.advance t0 (Commands (c1 |> Iter.singleton)) in print t0 actions ; @@ -799,18 +951,18 @@ let%expect_test "message loss" = conspire = { rep = { state = - { vval = b1d8bad167372c336ae91f91677feca1; vterm = 0; term = 0; - commit_index = 1fddcd0db3e43a000153d0c4de56a7cc }; + { vval = 11e8dcb4a482371b91ff2953cabcd7a6; vterm = 0; term = 0; + commit_index = 875f3872edbe68ca2e42fd687213aa9c }; store = { ctree = - [(1fddcd0db3e43a000153d0c4de56a7cc: + [(11e8dcb4a482371b91ff2953cabcd7a6: { node = - (1, d41d8cd98f00b204e9800998ecf8427e, [Command(Read c0, 1)]); - parent = ; key = 1fddcd0db3e43a000153d0c4de56a7cc }); - (b1d8bad167372c336ae91f91677feca1: + (2, 875f3872edbe68ca2e42fd687213aa9c, [Command(Read c1, 2)]); + parent = ; key = 11e8dcb4a482371b91ff2953cabcd7a6 }); + (875f3872edbe68ca2e42fd687213aa9c: { node = - (2, 1fddcd0db3e43a000153d0c4de56a7cc, [Command(Read c1, 2)]); - parent = ; key = b1d8bad167372c336ae91f91677feca1 }); + (1, d41d8cd98f00b204e9800998ecf8427e, [Command(Read c0, 1)]); + parent = ; key = 875f3872edbe68ca2e42fd687213aa9c }); (d41d8cd98f00b204e9800998ecf8427e: Root)]; root = d41d8cd98f00b204e9800998ecf8427e }; remotes = }; @@ -819,50 +971,50 @@ let%expect_test "message loss" = { vval = d41d8cd98f00b204e9800998ecf8427e; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }); (2: - { vval = 1fddcd0db3e43a000153d0c4de56a7cc; vterm = 0; term = 0; + { vval = 875f3872edbe68ca2e42fd687213aa9c; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }); (3: - { vval = 1fddcd0db3e43a000153d0c4de56a7cc; vterm = 0; term = 0; + { vval = 875f3872edbe68ca2e42fd687213aa9c; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e })]; config = ; commit_log = [[Command(Read c0, 1)]] }; failure_detector = - { Conspire_mp.FailureDetector.state = [(1: 2)(2: 2)(3: 2)]; timeout = 2 - }; - stall_checker = } + { Conspire_leader.FailureDetector.state = [(1: 2)(2: 2)(3: 2)]; + timeout = 2 } + } actions: [Send(1,{ ctree = - (Some { new_head = b1d8bad167372c336ae91f91677feca1; + (Some { new_head = 11e8dcb4a482371b91ff2953cabcd7a6; extension = - [(2, 1fddcd0db3e43a000153d0c4de56a7cc, + [(2, 875f3872edbe68ca2e42fd687213aa9c, [Command(Read c1, 2)])] }); cons = - (Some { vval = b1d8bad167372c336ae91f91677feca1; + (Some { vval = 11e8dcb4a482371b91ff2953cabcd7a6; vterm = 0; term = 0; - commit_index = 1fddcd0db3e43a000153d0c4de56a7cc }) + commit_index = 875f3872edbe68ca2e42fd687213aa9c }) }) Send(2,{ ctree = - (Some { new_head = b1d8bad167372c336ae91f91677feca1; + (Some { new_head = 11e8dcb4a482371b91ff2953cabcd7a6; extension = - [(2, 1fddcd0db3e43a000153d0c4de56a7cc, + [(2, 875f3872edbe68ca2e42fd687213aa9c, [Command(Read c1, 2)])] }); cons = - (Some { vval = b1d8bad167372c336ae91f91677feca1; + (Some { vval = 11e8dcb4a482371b91ff2953cabcd7a6; vterm = 0; term = 0; - commit_index = 1fddcd0db3e43a000153d0c4de56a7cc }) + commit_index = 875f3872edbe68ca2e42fd687213aa9c }) }) Send(3,{ ctree = - (Some { new_head = b1d8bad167372c336ae91f91677feca1; + (Some { new_head = 11e8dcb4a482371b91ff2953cabcd7a6; extension = - [(2, 1fddcd0db3e43a000153d0c4de56a7cc, + [(2, 875f3872edbe68ca2e42fd687213aa9c, [Command(Read c1, 2)])] }); cons = - (Some { vval = b1d8bad167372c336ae91f91677feca1; + (Some { vval = 11e8dcb4a482371b91ff2953cabcd7a6; vterm = 0; term = 0; - commit_index = 1fddcd0db3e43a000153d0c4de56a7cc }) + commit_index = 875f3872edbe68ca2e42fd687213aa9c }) })] |}] ; - let c1_clock = Md5.of_hex_exn "b1d8bad167372c336ae91f91677feca1" in + let c1_clock = MP.Conspire.CTree.make_key root_clock [c1] in let update = Ok MP.Conspire.Rep. @@ -874,6 +1026,7 @@ let%expect_test "message loss" = print t1 actions ; [%expect {| + +Nack for 0: Update is not rooted t: { config = ; conspire = { rep = @@ -896,9 +1049,9 @@ let%expect_test "message loss" = commit_index = d41d8cd98f00b204e9800998ecf8427e })]; config = ; commit_log = [] }; failure_detector = - { Conspire_mp.FailureDetector.state = [(0: 2)(2: 2)(3: 2)]; timeout = 2 - }; - stall_checker = } + { Conspire_leader.FailureDetector.state = [(0: 2)(2: 2)(3: 2)]; + timeout = 2 } + } actions: [Send(0,{ commit = d41d8cd98f00b204e9800998ecf8427e })] |}] ; let t0, actions = Impl.advance t0 (Recv (Error MP.Conspire.Rep.{commit= root_clock}, 1)) @@ -907,22 +1060,23 @@ let%expect_test "message loss" = (* note extension goes from the erroneous commit index *) [%expect {| + +Acking 1 t: { config = ; conspire = { rep = { state = - { vval = b1d8bad167372c336ae91f91677feca1; vterm = 0; term = 0; - commit_index = 1fddcd0db3e43a000153d0c4de56a7cc }; + { vval = 11e8dcb4a482371b91ff2953cabcd7a6; vterm = 0; term = 0; + commit_index = 875f3872edbe68ca2e42fd687213aa9c }; store = { ctree = - [(1fddcd0db3e43a000153d0c4de56a7cc: + [(11e8dcb4a482371b91ff2953cabcd7a6: { node = - (1, d41d8cd98f00b204e9800998ecf8427e, [Command(Read c0, 1)]); - parent = ; key = 1fddcd0db3e43a000153d0c4de56a7cc }); - (b1d8bad167372c336ae91f91677feca1: + (2, 875f3872edbe68ca2e42fd687213aa9c, [Command(Read c1, 2)]); + parent = ; key = 11e8dcb4a482371b91ff2953cabcd7a6 }); + (875f3872edbe68ca2e42fd687213aa9c: { node = - (2, 1fddcd0db3e43a000153d0c4de56a7cc, [Command(Read c1, 2)]); - parent = ; key = b1d8bad167372c336ae91f91677feca1 }); + (1, d41d8cd98f00b204e9800998ecf8427e, [Command(Read c0, 1)]); + parent = ; key = 875f3872edbe68ca2e42fd687213aa9c }); (d41d8cd98f00b204e9800998ecf8427e: Root)]; root = d41d8cd98f00b204e9800998ecf8427e }; remotes = }; @@ -931,22 +1085,22 @@ let%expect_test "message loss" = { vval = d41d8cd98f00b204e9800998ecf8427e; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }); (2: - { vval = 1fddcd0db3e43a000153d0c4de56a7cc; vterm = 0; term = 0; + { vval = 875f3872edbe68ca2e42fd687213aa9c; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e }); (3: - { vval = 1fddcd0db3e43a000153d0c4de56a7cc; vterm = 0; term = 0; + { vval = 875f3872edbe68ca2e42fd687213aa9c; vterm = 0; term = 0; commit_index = d41d8cd98f00b204e9800998ecf8427e })]; config = ; commit_log = [[Command(Read c0, 1)]] }; failure_detector = - { Conspire_mp.FailureDetector.state = [(1: 2)(2: 2)(3: 2)]; timeout = 2 - }; - stall_checker = } + { Conspire_leader.FailureDetector.state = [(1: 2)(2: 2)(3: 2)]; + timeout = 2 } + } actions: [Send(1,{ ctree = - (Some { new_head = b1d8bad167372c336ae91f91677feca1; + (Some { new_head = 11e8dcb4a482371b91ff2953cabcd7a6; extension = [(1, d41d8cd98f00b204e9800998ecf8427e, [Command(Read c0, 1)]); - (2, 1fddcd0db3e43a000153d0c4de56a7cc, + (2, 875f3872edbe68ca2e42fd687213aa9c, [Command(Read c1, 2)]) ] }); diff --git a/lib/client.ml b/lib/client.ml index 6fb0ce6..0199af5 100644 --- a/lib/client.ml +++ b/lib/client.ml @@ -102,7 +102,7 @@ let send_request ?(random_id = false) t op = if random_id then Random.int32 Int32.max_int |> Int32.to_int else t.next_id () in - let command = Command.{op; id; trace_start= Unix.gettimeofday ()} in + let command = Command.{op; id; submitted= Unix.gettimeofday (); trace_start= Unix.gettimeofday ()} in let send () = submit_request t.cmgr command in let res_t, res_u = Promise.create () in let request_state = diff --git a/lib/internal_infra.ml b/lib/internal_infra.ml index 1febe16..807ebab 100644 --- a/lib/internal_infra.ml +++ b/lib/internal_infra.ml @@ -254,11 +254,12 @@ module Test = struct Command. { op= Write (k, n) ; id= Core.Random.int Core.Int.max_value - ; trace_start= -1. } + ; trace_start= -1. + ; submitted= -1.} let r n = Command. - {op= Read n; id= Core.Random.int Core.Int.max_value; trace_start= -1.} + {op= Read n; id= Core.Random.int Core.Int.max_value; trace_start= -1.; submitted = -1.} module CT : Consensus_intf.S = struct type message = Core.String.t [@@deriving sexp] diff --git a/lib/line_prot.ml b/lib/line_prot.ml index bfad270..db2a3d9 100644 --- a/lib/line_prot.ml +++ b/lib/line_prot.ml @@ -27,8 +27,9 @@ module SerPrim = struct | NoOp -> () - let command Command.{op; id; trace_start} w = + let command Command.{op; id; submitted; trace_start} w = W.BE.uint64 w (Int64.of_int id) ; + W.BE.double w submitted ; W.BE.double w trace_start ; sm_op op w @@ -85,9 +86,10 @@ module DeserPrim = struct let command = let* id = R.map Int64.to_int R.BE.uint64 + and* submitted = R.BE.double and* trace_start = R.BE.double and* op = sm_op in - R.return Command.{op; id; trace_start} + R.return Command.{op; id; submitted; trace_start} let log_entry = let* term = R.map Int64.to_int R.BE.uint64 and* command = command in diff --git a/lib/types.ml b/lib/types.ml index 04ba7d9..c7767e1 100644 --- a/lib/types.ml +++ b/lib/types.ml @@ -40,7 +40,8 @@ let sm_op_pp ppf v = module Command = struct module T = struct - type t = {op: sm_op; id: command_id; mutable trace_start: float} + type t = + {op: sm_op; id: command_id; submitted: float; mutable trace_start: float} [@@deriving sexp, bin_io] let hash t = hash_command_id t.id @@ -48,7 +49,7 @@ module Command = struct let hash_fold_t s t = hash_fold_command_id s t.id let pp_mach ppf v = - Fmt.pf ppf "Command(%a, %d, %.4f)" sm_op_pp v.op v.id v.trace_start + Fmt.pf ppf "Command(%a, %d, %.4f, %.4f)" sm_op_pp v.op v.id v.submitted v.trace_start let pp ppf v = Fmt.pf ppf "Command(%a, %d)" sm_op_pp v.op v.id @@ -67,15 +68,16 @@ let update_command_time c = c.Command.trace_start <- Core_unix.gettimeofday () let get_command_trace_time c = c.Command.trace_start -let empty_command = Command.{op= NoOp; id= -1; trace_start= -1.} +let empty_command = Command.{op= NoOp; id= -1; submitted= -1.; trace_start= -1.} let make_command_state = ref 0 let reset_make_command_state () = make_command_state := 0 +(* Used for tests *) let make_command c = make_command_state := !make_command_state + 1 ; - Command.{op= c; id= !make_command_state; trace_start= -1.} + Command.{op= c; id= !make_command_state; trace_start= -1.; submitted= -1.} type op_result = Success | Failure of string | ReadSuccess of key [@@deriving bin_io]