diff --git a/impl/fuzz/command_tree.ml b/impl/fuzz/command_tree.ml new file mode 100644 index 0000000..7307bb7 --- /dev/null +++ b/impl/fuzz/command_tree.ml @@ -0,0 +1,51 @@ +open! Core + +module CTree = Impl_core__Conspire_command_tree.CommandTree (struct + type t = int [@@deriving compare, show, bin_io, hash] +end) + +let ctree = + let open Crowbar in + fix (fun gen -> + choose + [ const CTree.empty + ; map [gen; int; list1 int] (fun ctree kid vs -> + let nodes = Map.keys ctree.CTree.ctree in + let kid = kid % List.length nodes in + let root = List.nth_exn nodes kid in + let ctree', _ = + CTree.addv ~node:0 ctree ~parent:root (List.iter vs |> Iter.from_labelled_iter) in + ctree')]) + +(* take ctree and random node and node in its history *) +(* make an update of that and add it to the tree of just the path to that *) +let test_ctree_update ctree u_start u_end = + let nodes = + ctree.CTree.ctree |> Map.keys + |> List.sort ~compare:(fun ka kb -> + Int.compare (CTree.get_idx ctree ka) (CTree.get_idx ctree kb) ) + in + let u_end = u_end % List.length nodes in + let n_end = List.nth_exn nodes u_end in + (* path goes from root to n_end *) + let path = CTree.path_between ctree CTree.root_key n_end in + let remote_path, _ = List.split_n path (u_start % max 1 (List.length path)) in + let remote, _ = + CTree.addv CTree.empty ~node:0 ~parent:CTree.root_key + ( List.iter remote_path |> Iter.from_labelled_iter + |> Iter.map (fun (_, _, v) -> v) ) + in + let update = CTree.make_update ctree n_end remote in + let remote = CTree.apply_update_exn remote update in + let local_path = CTree.path_between ctree CTree.root_key n_end in + let remote_path = CTree.path_between remote CTree.root_key n_end in + let pp ppf v = + Fmt.pf ppf "%a" + Fmt.(brackets @@ list ~sep:comma int) + (List.map v ~f:(fun (_idx, _key, v) -> v)) + in + Crowbar.check_eq ~pp ~cmp:[%compare: CTree.node list] local_path remote_path + +let () = + let open Crowbar in + add_test ~name:"ctree update" [ctree; int; int] test_ctree_update diff --git a/impl/fuzz/dune b/impl/fuzz/dune index 9a150a3..7e5f107 100644 --- a/impl/fuzz/dune +++ b/impl/fuzz/dune @@ -1,7 +1,7 @@ (tests - (names test_msgs) + (names test_msgs command_tree) (flags (:standard -w -27)) (preprocess - (pps ppx_jane)) + (pps ppx_jane ppx_log ppx_accessor ppx_deriving.show)) (libraries impl_core crowbar)) diff --git a/impl/lib/conspire_command_tree.ml b/impl/lib/conspire_command_tree.ml index 0ef61af..b15a7e1 100644 --- a/impl/lib/conspire_command_tree.ml +++ b/impl/lib/conspire_command_tree.ml @@ -30,7 +30,7 @@ module CommandTree (Value : Value) = struct Extensions to heads *) - type node = int * key * Value.t [@@deriving show, bin_io] + type node = int * key * Value.t [@@deriving show, bin_io, compare] type parent_ref_node = {node: node; parent: parent_ref_node option [@opaque]; key: key} @@ -256,10 +256,13 @@ module CommandTree (Value : Value) = struct match (curr, rt) with | None, Some _ -> Fmt.invalid_arg "%a not on path to %a" Key.pp rt_vc Key.pp hd_vc + (* Both root*) | None, None -> acc + (* Both non-root and equal *) | Some curr, Some rt when [%equal: Key.t] curr.key rt.key -> acc + (* both Non-root and non-equal *) | Some curr, _ -> aux curr.parent (curr.node :: acc) in diff --git a/impl/lib/conspire_leader.ml b/impl/lib/conspire_leader.ml index 77c032a..62d8cd0 100644 --- a/impl/lib/conspire_leader.ml +++ b/impl/lib/conspire_leader.ml @@ -118,9 +118,9 @@ struct let available_space_for_commands t = if can_apply_requests t then t.config.max_outstanding else 0 - let send ?(force = false) ?(prune = false) t dst = + let send ?(force = false) t dst = let open Rep in - let update = get_update_to_send ~prune t.conspire.rep dst in + let update = get_update_to_send t.conspire.rep dst in if force || Option.is_some update.ctree || Option.is_some update.cons then Act.send dst (Ok update) @@ -128,8 +128,7 @@ struct Act.send dst (Error {commit= t.conspire.rep.state.commit_index}) let broadcast ?(force = false) t = - List.iter t.config.other_replica_ids ~f:(fun nid -> - send ~force t nid ~prune:(not @@ is_leader t) ) + List.iter t.config.other_replica_ids ~f:(fun nid -> send ~force t nid) let nack_counter, run_nc = Ocons_core.Utils.InternalReporter.rate_reporter "nacks" @@ -166,8 +165,9 @@ struct let update_result = Conspire.handle_update_message t.conspire src m in match update_result with | Error `MustAck -> - Act.traceln "Acking %d" src; - ack_counter () ; send t src + Act.traceln "Acking %d" src ; + ack_counter () ; + send t src | Error (`MustNack reason) -> Act.traceln "Nack for %d: %s" src ( match reason with @@ -198,13 +198,29 @@ struct Result.is_ok conflict_recovery_attempt || committed || recovery_started in - if should_broadcast then broadcast t - else send ~prune:(not @@ is_leader t) t src ) + if should_broadcast then broadcast t else send t src ) let advance t e = - Act.run_side_effects - (fun () -> Exn.handle_uncaught_and_exit (fun () -> handle_event t e)) - t + let is_leader_pre = is_leader t in + let prev_term = t.conspire.rep.state.term in + let prev_vterm = t.conspire.rep.state.vterm in + let res = + Act.run_side_effects + (fun () -> Exn.handle_uncaught_and_exit (fun () -> handle_event t e)) + t + in + ( match (is_leader_pre, is_leader t) with + | true, false -> + Act.traceln "No longer leader for %d" t.conspire.rep.state.term + | false, true -> + Act.traceln "Now leader for %d" t.conspire.rep.state.term + | _ -> + () ) ; + if prev_term < t.conspire.rep.state.term then + Act.traceln "Conflict, term=%d" t.conspire.rep.state.term ; + if prev_vterm < t.conspire.rep.state.vterm then + Act.traceln "Recovery to %d" t.conspire.rep.state.vterm ; + res let create (config : config) = let conspire = Conspire.create config.conspire in diff --git a/impl/lib/conspire_leader_dc.ml b/impl/lib/conspire_leader_dc.ml index 419fa41..4d8d385 100644 --- a/impl/lib/conspire_leader_dc.ml +++ b/impl/lib/conspire_leader_dc.ml @@ -112,13 +112,11 @@ struct List.for_all t.config.lower_replica_ids ~f:(fun nid -> not @@ FailureDetector.is_live t.failure_detector nid ) - let can_apply_requests _t = true - let available_space_for_commands t = t.config.max_outstanding - let send ?(force = false) ?(prune = false) t dst = + let send ?(force = false) t dst = let open Rep in - let update = get_update_to_send ~prune t.conspire.rep dst in + let update = get_update_to_send t.conspire.rep dst in if force || Option.is_some update.ctree || Option.is_some update.cons then Act.send dst (Conspire (Ok update)) @@ -126,8 +124,7 @@ struct Act.send dst (Conspire (Error {commit= t.conspire.rep.state.commit_index})) let broadcast ?(force = false) t = - List.iter t.config.other_replica_ids ~f:(fun nid -> - send ~force t nid ~prune:(not @@ is_leader t) ) + List.iter t.config.other_replica_ids ~f:(fun nid -> send ~force t nid) let gather_batch_from_buffer t = let time = Eio.Time.now t.clock |> Utils.float_to_time in @@ -173,15 +170,17 @@ struct let update_result = Conspire.handle_update_message t.conspire src m in match update_result with | Error `MustAck -> + Act.traceln "Acking %d" src ; send t src | Error (`MustNack reason) -> - ( match reason with - | `Root_of_update_not_found _ -> - Utils.traceln "Nack: Update root not in tree" - | `Commit_index_not_in_tree -> - Utils.traceln "Nack: Commit index not in tree" - | `VVal_not_in_tree -> - Utils.traceln "Nack: VVal not in tree" ) ; + Act.traceln "Nack for %d: %s" src + ( match reason with + | `Root_of_update_not_found _ -> + "Update is not rooted" + | `Commit_index_not_in_tree -> + "Commit index not in tree" + | `VVal_not_in_tree -> + "VVal not int tree" ) ; nack t src | Ok () -> process_acceptor_state t.conspire src ; @@ -203,8 +202,7 @@ struct Result.is_ok conflict_recovery_attempt || committed || recovery_started in - if should_broadcast then broadcast t - else send ~prune:(not @@ is_leader t) t src ) + if should_broadcast then broadcast t else send t src ) let advance t e = let init_leader = is_leader t in