Skip to content

Commit

Permalink
Add fuzzing and better documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
Cjen1 committed Jan 8, 2024
1 parent ef3abaa commit 1f92ed4
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 29 deletions.
51 changes: 51 additions & 0 deletions impl/fuzz/command_tree.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
open! Core

module CTree = Impl_core__Conspire_command_tree.CommandTree (struct
type t = int [@@deriving compare, show, bin_io, hash]
end)

let ctree =
let open Crowbar in
fix (fun gen ->
choose
[ const CTree.empty
; map [gen; int; list1 int] (fun ctree kid vs ->
let nodes = Map.keys ctree.CTree.ctree in
let kid = kid % List.length nodes in
let root = List.nth_exn nodes kid in
let ctree', _ =
CTree.addv ~node:0 ctree ~parent:root (List.iter vs |> Iter.from_labelled_iter) in
ctree')])

(* take ctree and random node and node in its history *)
(* make an update of that and add it to the tree of just the path to that *)
let test_ctree_update ctree u_start u_end =
let nodes =
ctree.CTree.ctree |> Map.keys
|> List.sort ~compare:(fun ka kb ->
Int.compare (CTree.get_idx ctree ka) (CTree.get_idx ctree kb) )
in
let u_end = u_end % List.length nodes in
let n_end = List.nth_exn nodes u_end in
(* path goes from root to n_end *)
let path = CTree.path_between ctree CTree.root_key n_end in
let remote_path, _ = List.split_n path (u_start % max 1 (List.length path)) in
let remote, _ =
CTree.addv CTree.empty ~node:0 ~parent:CTree.root_key
( List.iter remote_path |> Iter.from_labelled_iter
|> Iter.map (fun (_, _, v) -> v) )
in
let update = CTree.make_update ctree n_end remote in
let remote = CTree.apply_update_exn remote update in
let local_path = CTree.path_between ctree CTree.root_key n_end in
let remote_path = CTree.path_between remote CTree.root_key n_end in
let pp ppf v =
Fmt.pf ppf "%a"
Fmt.(brackets @@ list ~sep:comma int)
(List.map v ~f:(fun (_idx, _key, v) -> v))
in
Crowbar.check_eq ~pp ~cmp:[%compare: CTree.node list] local_path remote_path

let () =
let open Crowbar in
add_test ~name:"ctree update" [ctree; int; int] test_ctree_update
4 changes: 2 additions & 2 deletions impl/fuzz/dune
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
(tests
(names test_msgs)
(names test_msgs command_tree)
(flags
(:standard -w -27))
(preprocess
(pps ppx_jane))
(pps ppx_jane ppx_log ppx_accessor ppx_deriving.show))
(libraries impl_core crowbar))
5 changes: 4 additions & 1 deletion impl/lib/conspire_command_tree.ml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ module CommandTree (Value : Value) = struct
Extensions to heads
*)

type node = int * key * Value.t [@@deriving show, bin_io]
type node = int * key * Value.t [@@deriving show, bin_io, compare]

type parent_ref_node =
{node: node; parent: parent_ref_node option [@opaque]; key: key}
Expand Down Expand Up @@ -256,10 +256,13 @@ module CommandTree (Value : Value) = struct
match (curr, rt) with
| None, Some _ ->
Fmt.invalid_arg "%a not on path to %a" Key.pp rt_vc Key.pp hd_vc
(* Both root*)
| None, None ->
acc
(* Both non-root and equal *)
| Some curr, Some rt when [%equal: Key.t] curr.key rt.key ->
acc
(* both Non-root and non-equal *)
| Some curr, _ ->
aux curr.parent (curr.node :: acc)
in
Expand Down
38 changes: 27 additions & 11 deletions impl/lib/conspire_leader.ml
Original file line number Diff line number Diff line change
Expand Up @@ -118,18 +118,17 @@ struct
let available_space_for_commands t =
if can_apply_requests t then t.config.max_outstanding else 0

let send ?(force = false) ?(prune = false) t dst =
let send ?(force = false) t dst =
let open Rep in
let update = get_update_to_send ~prune t.conspire.rep dst in
let update = get_update_to_send t.conspire.rep dst in
if force || Option.is_some update.ctree || Option.is_some update.cons then
Act.send dst (Ok update)

let nack t dst =
Act.send dst (Error {commit= t.conspire.rep.state.commit_index})

let broadcast ?(force = false) t =
List.iter t.config.other_replica_ids ~f:(fun nid ->
send ~force t nid ~prune:(not @@ is_leader t) )
List.iter t.config.other_replica_ids ~f:(fun nid -> send ~force t nid)

let nack_counter, run_nc =
Ocons_core.Utils.InternalReporter.rate_reporter "nacks"
Expand Down Expand Up @@ -166,8 +165,9 @@ struct
let update_result = Conspire.handle_update_message t.conspire src m in
match update_result with
| Error `MustAck ->
Act.traceln "Acking %d" src;
ack_counter () ; send t src
Act.traceln "Acking %d" src ;
ack_counter () ;
send t src
| Error (`MustNack reason) ->
Act.traceln "Nack for %d: %s" src
( match reason with
Expand Down Expand Up @@ -198,13 +198,29 @@ struct
Result.is_ok conflict_recovery_attempt
|| committed || recovery_started
in
if should_broadcast then broadcast t
else send ~prune:(not @@ is_leader t) t src )
if should_broadcast then broadcast t else send t src )

let advance t e =
Act.run_side_effects
(fun () -> Exn.handle_uncaught_and_exit (fun () -> handle_event t e))
t
let is_leader_pre = is_leader t in
let prev_term = t.conspire.rep.state.term in
let prev_vterm = t.conspire.rep.state.vterm in
let res =
Act.run_side_effects
(fun () -> Exn.handle_uncaught_and_exit (fun () -> handle_event t e))
t
in
( match (is_leader_pre, is_leader t) with
| true, false ->
Act.traceln "No longer leader for %d" t.conspire.rep.state.term
| false, true ->
Act.traceln "Now leader for %d" t.conspire.rep.state.term
| _ ->
() ) ;
if prev_term < t.conspire.rep.state.term then
Act.traceln "Conflict, term=%d" t.conspire.rep.state.term ;
if prev_vterm < t.conspire.rep.state.vterm then
Act.traceln "Recovery to %d" t.conspire.rep.state.vterm ;
res

let create (config : config) =
let conspire = Conspire.create config.conspire in
Expand Down
28 changes: 13 additions & 15 deletions impl/lib/conspire_leader_dc.ml
Original file line number Diff line number Diff line change
Expand Up @@ -112,22 +112,19 @@ struct
List.for_all t.config.lower_replica_ids ~f:(fun nid ->
not @@ FailureDetector.is_live t.failure_detector nid )

let can_apply_requests _t = true

let available_space_for_commands t = t.config.max_outstanding

let send ?(force = false) ?(prune = false) t dst =
let send ?(force = false) t dst =
let open Rep in
let update = get_update_to_send ~prune t.conspire.rep dst in
let update = get_update_to_send t.conspire.rep dst in
if force || Option.is_some update.ctree || Option.is_some update.cons then
Act.send dst (Conspire (Ok update))

let nack t dst =
Act.send dst (Conspire (Error {commit= t.conspire.rep.state.commit_index}))

let broadcast ?(force = false) t =
List.iter t.config.other_replica_ids ~f:(fun nid ->
send ~force t nid ~prune:(not @@ is_leader t) )
List.iter t.config.other_replica_ids ~f:(fun nid -> send ~force t nid)

let gather_batch_from_buffer t =
let time = Eio.Time.now t.clock |> Utils.float_to_time in
Expand Down Expand Up @@ -173,15 +170,17 @@ struct
let update_result = Conspire.handle_update_message t.conspire src m in
match update_result with
| Error `MustAck ->
Act.traceln "Acking %d" src ;
send t src
| Error (`MustNack reason) ->
( match reason with
| `Root_of_update_not_found _ ->
Utils.traceln "Nack: Update root not in tree"
| `Commit_index_not_in_tree ->
Utils.traceln "Nack: Commit index not in tree"
| `VVal_not_in_tree ->
Utils.traceln "Nack: VVal not in tree" ) ;
Act.traceln "Nack for %d: %s" src
( match reason with
| `Root_of_update_not_found _ ->
"Update is not rooted"
| `Commit_index_not_in_tree ->
"Commit index not in tree"
| `VVal_not_in_tree ->
"VVal not int tree" ) ;
nack t src
| Ok () ->
process_acceptor_state t.conspire src ;
Expand All @@ -203,8 +202,7 @@ struct
Result.is_ok conflict_recovery_attempt
|| committed || recovery_started
in
if should_broadcast then broadcast t
else send ~prune:(not @@ is_leader t) t src )
if should_broadcast then broadcast t else send t src )

let advance t e =
let init_leader = is_leader t in
Expand Down

0 comments on commit 1f92ed4

Please sign in to comment.