Skip to content

Commit

Permalink
Merge pull request #30 from Cjen1/transactions
Browse files Browse the repository at this point in the history
Transactions
  • Loading branch information
Cjen1 authored Jul 15, 2024
2 parents 7397634 + 46e4a7b commit 883c067
Show file tree
Hide file tree
Showing 13 changed files with 891 additions and 801 deletions.
2 changes: 1 addition & 1 deletion bin/bench.ml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ let pitcher ~sw nid mclock n rate cmgr (dispatch : (int, Mtime.t) Hashtbl.t) :
let id = i * prime in
let cmd =
Command.
{ op= Write ("asdf", "asdf")
{ op= [|Write ("asdf", "asdf"); Read "fdsa"|]
; id
; submitted= Unix.gettimeofday ()
; trace_start= Unix.gettimeofday () }
Expand Down
20 changes: 2 additions & 18 deletions bin/cli.ml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ let run op sockaddrs id retry_timeout =
let cli = Cli.create_rpc ~sw env con_ress id retry_timeout in
Eio.traceln "Submitting request %a" sm_op_pp op ;
Random.self_init () ;
let res = Cli.send_request ~random_id:true cli op in
let res = Cli.send_request ~random_id:true cli [|op|] in
Eio.traceln "Received: %a" op_result_pp res ;
Cli.close cli

Expand Down Expand Up @@ -107,20 +107,4 @@ let read_cmd =
let op_t = const (fun k -> Types.Read k) $ key_t in
const_cmd info 1 op_t

let cas_cmd =
let open Term in
let info = Cmd.info "cas" in
let key_t = Arg.(required & pos 0 (some string) None (info ~docv:"KEY" [])) in
let value_t =
Arg.(required & pos 1 (some string) None (info ~docv:"VALUE" []))
in
let new_value_t =
Arg.(required & pos 2 (some string) None (info ~docv:"NEW_VALUE" []))
in
let op_t =
const (fun key value value' -> Types.CAS {key; value; value'})
$ key_t $ value_t $ new_value_t
in
const_cmd info 3 op_t

let () = exit Cmd.(eval @@ group (info "cli") [write_cmd; read_cmd; cas_cmd])
let () = exit Cmd.(eval @@ group (info "cli") [write_cmd; read_cmd])
17 changes: 8 additions & 9 deletions fuzz/lib_line_prot.ml
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ module Gen = struct
let op =
choose
[ map [bytes] (fun k -> Read k)
; map [bytes; bytes] (fun k v -> Write (k, v))
; map [bytes; bytes; bytes] (fun key value value' ->
CAS {key; value; value'} )
; const NoOp ]
; map [bytes; bytes] (fun k v -> Write (k, v)) ]

let command =
map [op; int; float; float] (fun op id submitted trace_start ->
Ocons_core.Types.Command.{op; id; submitted; trace_start} )
map
[list op; int; float; float]
(fun op id submitted trace_start ->
Ocons_core.Types.Command.
{op= Array.of_list op; id; submitted; trace_start} )

let log_entry = map [command; int] (fun command term -> {command; term})

Expand All @@ -24,9 +24,8 @@ module Gen = struct

let op_response =
choose
[ const Success
; map [bytes] (fun k -> Failure k)
; map [bytes] (fun k -> ReadSuccess k) ]
[ map [list @@ pair bytes (option bytes)] (fun resps -> Success resps)
; map [bytes] (fun k -> Failure k) ]
end

let test_client_request r =
Expand Down
12 changes: 6 additions & 6 deletions impl/fuzz/test_msgs.ml
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ module Gen = struct
let open Ocons_core.Types in
choose
[ map [bytes] (fun k -> Read k)
; map [bytes; bytes] (fun k v -> Write (k, v))
; map [bytes; bytes; bytes] (fun key value value' ->
CAS {key; value; value'} )
; const NoOp ]
; map [bytes; bytes] (fun k v -> Write (k, v)) ]

let command =
with_printer Command.pp
@@ map [op; int; float; float] (fun op id submitted trace_start ->
Ocons_core.Types.Command.{op; id; trace_start; submitted} )
@@ map
[list op; int; float; float]
(fun op id submitted trace_start ->
Ocons_core.Types.Command.
{op= Array.of_list op; id; trace_start; submitted} )

let log_entry =
with_printer log_entry_pp
Expand Down
29 changes: 2 additions & 27 deletions impl/lib/line_prot.ml
Original file line number Diff line number Diff line change
Expand Up @@ -6,34 +6,9 @@ module DeserPrim = Ocons_core.Line_prot.DeserPrim
module W = Eio.Buf_write
module R = Eio.Buf_read

let bin_io_read reader t =
let open Bin_prot.Utils in
let open R in
ensure t size_header_length ;
let cst = R.peek t in
let pos_ref = ref cst.Cstruct.off in
let len = bin_read_size_header cst.Cstruct.buffer ~pos_ref in
ensure t (len + size_header_length) ;
let cst = R.peek t in
let init_pos = cst.Cstruct.off + size_header_length in
pos_ref := init_pos ;
let x = reader cst.Cstruct.buffer ~pos_ref in
let len' = !pos_ref - init_pos in
assert (len' = len) ;
consume t (len + size_header_length) ;
x
let bin_io_write = Ocons_core.Line_prot.bin_io_write

let bin_io_write write_buf writer sizer t =
let open Bin_prot.Utils in
let open W in
let hlen = size_header_length in
let vlen = sizer t in
let tlen = vlen + hlen in
let blit x ~src_off:_ (cstbuf : Cstruct.buffer) ~dst_off:pos ~len =
let pos = bin_write_size_header cstbuf ~pos (len - hlen) in
writer cstbuf ~pos x |> ignore
in
write_gen write_buf ~blit ~off:0 ~len:tlen t
let bin_io_read = Ocons_core.Line_prot.bin_io_read

module VarLineProt
(ST : StrategyTypes)
Expand Down
Loading

0 comments on commit 883c067

Please sign in to comment.