Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve conspire msg #28

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion bin/bench.ml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ let pitcher ~sw nid mclock n rate cmgr (dispatch : (int, Mtime.t) Hashtbl.t) :
let id = i * prime in
let cmd =
Command.
{op= Write ("asdf", "asdf"); id; trace_start= Unix.gettimeofday ()}
{ op= Write ("asdf", "asdf")
; id
; submitted= Unix.gettimeofday ()
; trace_start= Unix.gettimeofday () }
in
let target = Mtime.add_span prev period |> Option.get in
if Mtime.is_later target ~than:(MT.now mclock) then
Expand Down
28 changes: 27 additions & 1 deletion bin/test_alloc.ml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,30 @@ let alloc_test =
in
Bench.Test.create_with_initialization ~name:"alloc_with_reference" test

let () = Command_unix.run (Bench.make_command [alloc_test])
let alloc_return_tuple =
let extern = ref 0 in
let f x = (x + 1, x + 2, x + 3) in
let test () =
let a, b, c = f 1 in
extern := a + b + c
in
Bench.Test.create ~name:"alloc_return_tuple" test

type foo = {mutable a: int; mutable b: int; mutable c: int}

let alloc_return_struct =
let extern = ref 0 in
let f x r =
r.a <- x + 1 ;
r.b <- x + 2 ;
r.c <- x + 3
in
let test `init =
let foo = {a= 0; b= 0; c= 0} in
fun () ->
f 1 foo ;
extern := foo.a + foo.b + foo.c
in
Bench.Test.create_with_initialization ~name:"alloc_return_struct" test

let () = Command_unix.run (Bench.make_command [alloc_test; alloc_return_tuple; alloc_return_struct])
3 changes: 2 additions & 1 deletion fuzz/dune
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
(test
(name lib_line_prot)
(libraries ocons.core crowbar mtime))
(libraries ocons.core crowbar mtime)
(preprocess (pps ppx_jane)))
12 changes: 9 additions & 3 deletions fuzz/lib_line_prot.ml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ module Gen = struct
; const NoOp ]

let command =
map [op; int; float] (fun op id trace_start ->
Ocons_core.Types.Command.{op; id; trace_start} )
map [op; int; float; float] (fun op id submitted trace_start ->
Ocons_core.Types.Command.{op; id; submitted; trace_start} )

let log_entry = map [command; int] (fun command term -> {command; term})

Expand All @@ -39,7 +39,13 @@ let test_client_request r =
@@ fun bw ->
Line_prot.External_infra.serialise_request r bw ;
let r' = Line_prot.External_infra.parse_request br in
check_eq ~cmp:Command.compare ~pp:Command.pp_mach r r'
(* compare *)
check_eq ~cmp:Command.compare ~pp:Command.pp_mach r r';
(* hash *)
check_eq ~pp:Command.pp_mach r r' ~cmp:(fun a b ->
let ha, hb = Command.hash a, Command.hash b in
Int.compare ha hb
)

let test_client_response r =
let open Crowbar in
Expand Down
51 changes: 51 additions & 0 deletions impl/fuzz/command_tree.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
open! Core

module CTree = Impl_core__Conspire_command_tree.CommandTree (struct
type t = int [@@deriving compare, show, bin_io, hash]
end)

let ctree =
let open Crowbar in
fix (fun gen ->
choose
[ const CTree.empty
; map [gen; int; list1 int] (fun ctree kid vs ->
let nodes = Map.keys ctree.CTree.ctree in
let kid = kid % List.length nodes in
let root = List.nth_exn nodes kid in
let ctree', _ =
CTree.addv ~node:0 ctree ~parent:root (List.iter vs |> Iter.from_labelled_iter) in
ctree')])

(* take ctree and random node and node in its history *)
(* make an update of that and add it to the tree of just the path to that *)
let test_ctree_update ctree u_start u_end =
let nodes =
ctree.CTree.ctree |> Map.keys
|> List.sort ~compare:(fun ka kb ->
Int.compare (CTree.get_idx ctree ka) (CTree.get_idx ctree kb) )
in
let u_end = u_end % List.length nodes in
let n_end = List.nth_exn nodes u_end in
(* path goes from root to n_end *)
let path = CTree.path_between ctree CTree.root_key n_end in
let remote_path, _ = List.split_n path (u_start % max 1 (List.length path)) in
let remote, _ =
CTree.addv CTree.empty ~node:0 ~parent:CTree.root_key
( List.iter remote_path |> Iter.from_labelled_iter
|> Iter.map (fun (_, _, v) -> v) )
in
let update = CTree.make_update ctree n_end remote in
let remote = CTree.apply_update_exn remote update in
let local_path = CTree.path_between ctree CTree.root_key n_end in
let remote_path = CTree.path_between remote CTree.root_key n_end in
let pp ppf v =
Fmt.pf ppf "%a"
Fmt.(brackets @@ list ~sep:comma int)
(List.map v ~f:(fun (_idx, _key, v) -> v))
in
Crowbar.check_eq ~pp ~cmp:[%compare: CTree.node list] local_path remote_path

let () =
let open Crowbar in
add_test ~name:"ctree update" [ctree; int; int] test_ctree_update
4 changes: 2 additions & 2 deletions impl/fuzz/dune
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
(tests
(names test_msgs)
(names test_msgs command_tree)
(flags
(:standard -w -27))
(preprocess
(pps ppx_jane))
(pps ppx_jane ppx_log ppx_accessor ppx_deriving.show))
(libraries impl_core crowbar))
23 changes: 17 additions & 6 deletions impl/fuzz/test_msgs.ml
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
module C = Crowbar

let entries_equal (ea, la) (eb, lb) =
la = lb && List.equal ( = ) (Iter.to_list ea) (Iter.to_list eb)
la = lb
&& List.equal [%compare.equal: Ocons_core.Types.log_entry] (Iter.to_list ea)
(Iter.to_list eb)

module Gen = struct
open Ocons_core.Types
Expand All @@ -17,13 +19,22 @@ module Gen = struct
; const NoOp ]

let command =
map [op; int] (fun op id ->
Ocons_core.Types.Command.{op; id; trace_start= -1.} )
with_printer Command.pp
@@ map [op; int; float; float] (fun op id submitted trace_start ->
Ocons_core.Types.Command.{op; id; trace_start; submitted} )

let log_entry = map [command; int] (fun command term -> {command; term})
let log_entry =
with_printer log_entry_pp
@@ map [command; int] (fun command term -> {command; term})

let pp_entries ppf (v, _) =
Fmt.pf ppf "%a"
Fmt.(brackets @@ list ~sep:comma log_entry_pp_mach)
(Iter.to_list v)

let entries =
map [list log_entry] (fun les -> (Iter.of_list les, List.length les))
with_printer pp_entries
@@ map [list log_entry] (fun les -> (Iter.of_list les, List.length les))

let conspire_value : Impl_core.ConspireSS.value gen = list command
end
Expand All @@ -43,7 +54,7 @@ let test_entry_equality les =
@@ fun bw ->
LP.SerPrim.entries w_entries bw ;
let r_entries = LP.DeserPrim.entries br in
check_eq ~eq:entries_equal w_entries r_entries
check_eq ~pp:Gen.pp_entries ~eq:entries_equal w_entries r_entries

module Paxos = struct
open Impl_core.Paxos
Expand Down
11 changes: 7 additions & 4 deletions impl/lib/conspire_command_tree.ml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ module CommandTree (Value : Value) = struct
Extensions to heads
*)

type node = int * key * Value.t [@@deriving show, bin_io]
type node = int * key * Value.t [@@deriving show, bin_io, compare]

type parent_ref_node =
{node: node; parent: parent_ref_node option [@opaque]; key: key}
Expand Down Expand Up @@ -93,7 +93,7 @@ module CommandTree (Value : Value) = struct
| [] ->
Fmt.failwith "All updates should be non-empty"
| (_, par, _) :: _ when not (Map.mem t.ctree par) ->
Fmt.error_msg "Missing %a" Key.pp par
Error (`Root_of_update_not_found par)
| (_, par, _) :: _ as extension ->
let rec aux (ctree : parent_ref_node option Map.M(Key).t)
(extension : node list) parent =
Expand All @@ -115,8 +115,8 @@ module CommandTree (Value : Value) = struct
match apply_update t update with
| Ok t ->
t
| Error (`Msg s) ->
Fmt.failwith "%s" s
| Error (`Root_of_update_not_found par) ->
Fmt.failwith "Root of update not found: %a" Key.pp par

let addv t ~node:_ ~parent vi =
let parent_key = parent in
Expand Down Expand Up @@ -256,10 +256,13 @@ module CommandTree (Value : Value) = struct
match (curr, rt) with
| None, Some _ ->
Fmt.invalid_arg "%a not on path to %a" Key.pp rt_vc Key.pp hd_vc
(* Both root*)
| None, None ->
acc
(* Both non-root and equal *)
| Some curr, Some rt when [%equal: Key.t] curr.key rt.key ->
acc
(* both Non-root and non-equal *)
| Some curr, _ ->
aux curr.parent (curr.node :: acc)
in
Expand Down
102 changes: 31 additions & 71 deletions impl/lib/conspire_dc.ml
Original file line number Diff line number Diff line change
Expand Up @@ -28,53 +28,6 @@ end
(all the while doing the recovery and commit messages as before)
*)

module DelayReorderBuffer = struct
type 'a t =
{ mutable store: 'a list Map.M(Time).t
[@polyprinter fun pa -> Utils.pp_map pp_time_float_unix pa]
; mutable hwm: Time.t [@printer pp_time_float_unix]
; interval: Time.Span.t
; compare: 'a -> 'a -> int }
[@@deriving show {with_path= false}]

let ms_clamp ?(direction = `Ceil) interval f =
match direction with
| `Ceil ->
Time.next_multiple ~base:Time.epoch ~interval ~after:f
~can_equal_after:true ()
| `Floor ->
Time.prev_multiple ~base:Time.epoch ~interval ~before:f
~can_equal_before:true ()

let get_values t current =
(* lim is the millisecond before now *)
let lim = current |> ms_clamp ~direction:`Floor t.interval in
let seq =
Map.to_sequence ~order:`Increasing_key ~keys_less_or_equal_to:lim t.store
in
t.hwm <- lim ;
seq
|> Sequence.group ~break:(fun (a, _) (b, _) ->
Time.(ms_clamp t.interval a <> ms_clamp t.interval b) )
|> Sequence.map ~f:(fun ls ->
let key = ms_clamp t.interval (List.hd_exn ls |> fst) in
(* remove relevant keys *)
let values =
ls
|> List.iter ~f:(fun (k, _) ->
t.store <- Map.remove_multi t.store k ) ;
(* export batch *)
ls |> List.map ~f:snd |> List.join |> List.sort ~compare:t.compare
in
(values, key) )

let add_value t v target =
if Time_float.(target > t.hwm) then
t.store <- Map.add_multi t.store ~key:target ~data:v

let create ~compare interval base_hwm =
{hwm= base_hwm; interval; store= Map.empty (module Time); compare}
end

module Counter = struct
type t = {mutable count: int; limit: int} [@@deriving show]
Expand Down Expand Up @@ -129,7 +82,7 @@ module Types = struct
{ config: config [@opaque]
; conspire: Conspire.t
; command_buffer:
Command.t DelayReorderBuffer.t (* TODO only reply to relevnat nodes *)
Command.t Delay_buffer.t (* TODO only reply to relevnat nodes *)
; tick_count: Counter.t
; clock: float Eio.Time.clock_ty Eio.Std.r [@opaque] }
[@@deriving show {with_path= false}]
Expand Down Expand Up @@ -186,7 +139,7 @@ struct
|> Option.value_map ~default:Time.epoch ~f:snd
in
let batches =
DelayReorderBuffer.get_values t.command_buffer time
Delay_buffer.get_values t.command_buffer time
|> Sequence.filter ~f:(fun (_, t) -> Time.(t > batch_floor))
in
Conspire.add_commands t.conspire
Expand All @@ -206,34 +159,41 @@ struct
let now = Eio.Time.now t.clock |> Utils.float_to_time in
let hedged_target = Time.add now t.config.delay_interval in
List.iter commands ~f:(fun c ->
DelayReorderBuffer.add_value t.command_buffer c hedged_target ) ;
Delay_buffer.add_value t.command_buffer c hedged_target ) ;
Act.broadcast (Commands (commands, hedged_target))
| Recv (Commands (cs, tar), _) ->
List.iter cs ~f:(fun c ->
DelayReorderBuffer.add_value t.command_buffer c tar )
Delay_buffer.add_value t.command_buffer c tar )
| Recv (Conspire m, src) -> (
let recovery_started, prev_ci =
( t.conspire.rep.state.term > t.conspire.rep.state.vterm
, get_commit_index t )
in
let msg_result = Conspire.handle_message t.conspire src m in
let now_steady_state, committed =
( t.conspire.rep.state.term = t.conspire.rep.state.vterm
, get_commit_index t > prev_ci )
in
let recovery_completed = recovery_started && now_steady_state in
let recovery_initiated =
(not recovery_started) && not now_steady_state
in
match msg_result with
let update_result = Conspire.handle_update_message t.conspire src m in
match update_result with
| Error `MustAck ->
send t src
| Error `MustNack ->
| Error (`MustNack reason) ->
( match reason with
| `Root_of_update_not_found _ ->
Utils.dtraceln "Update is not rooted"
| `Commit_index_not_in_tree ->
Utils.dtraceln "Commit index not in tree"
| `VVal_not_in_tree ->
Utils.dtraceln "VVal not int tree" ) ;
nack t src
| Ok _ when recovery_completed || committed || recovery_initiated ->
broadcast t
| _ ->
send t src )
| Ok () ->
process_acceptor_state t.conspire src ;
let conflict_recovery_attempt =
Conspire.conflict_recovery t.conspire
in
let recovery_started =
t.conspire.rep.state.term > t.conspire.rep.state.vterm
in
let committed =
Conspire.check_commit t.conspire |> Option.is_some
in
let should_broadcast =
Result.is_ok conflict_recovery_attempt
|| committed || recovery_started
in
if should_broadcast then broadcast t else send t src )

let advance t e =
Act.run_side_effects
Expand All @@ -243,7 +203,7 @@ struct
let create (config : config) =
let conspire = Conspire.create config.conspire in
let command_buffer =
DelayReorderBuffer.create ~compare:Command.compare
Delay_buffer.create ~compare:Command.compare
config.batching_interval
(Eio.Time.now config.clock |> Utils.float_to_time)
in
Expand Down
Loading
Loading