Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move to moonpool-over-picos #8

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions batrpc.opam
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ depends: [
"pbrt_yojson" {>= "3.0"}
"pbrt_services" {>= "3.0"}
"moonpool" {>= "0.5"}
"moonpool-io" {>= "0.7"}
"containers" {>= "3.0" & with-test}
"trace-tef" {with-test}
"mtime" {>= "2.0"}
Expand Down
1 change: 1 addition & 0 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
(pbrt_yojson (>= 3.0))
(pbrt_services (>= 3.0))
(moonpool (>= 0.5))
(moonpool-io (>= 0.7))
(containers (and (>= 3.0) :with-test))
(trace-tef :with-test)
(mtime (>= 2.0))
Expand Down
23 changes: 16 additions & 7 deletions src/client/state.ml
Original file line number Diff line number Diff line change
Expand Up @@ -146,14 +146,14 @@ let handle_error (self : t) ~(meta : Meta.meta) ~ic () : unit =
let err = Error.mk_error ~kind:Errors.remote err.msg in
Log.err (fun k ->
k "client: received error for id %ld:@ %a" meta.id Error.pp err);
Fut.fulfill_idempotent promise (Error (Error.E err, bt))
Fut.fulfill_idempotent promise (Error (Exn_bt.make (Error.E err) bt))
| Some (IF_stream { bt; promise; _ }) ->
remove_from_tbl_ self meta.id;
let err = Framing.read_error ~config ~encoding ic ~meta in
let err = Error.mk_error ~kind:Errors.remote err.msg in
Log.err (fun k ->
k "client: received error for id %ld:@ %a" meta.id Error.pp err);
Fut.fulfill_idempotent promise (Error (Error.E err, bt))
Fut.fulfill_idempotent promise (Error (Exn_bt.make (Error.E err) bt))

let handle_timeout (self : t) id : unit =
let@ self = Lock.with_lock self.st in
Expand All @@ -162,14 +162,22 @@ let handle_timeout (self : t) id : unit =
(function
| IF_unary { promise; bt; _ } ->
remove_from_tbl_ self id;
let err = Error.mk_error ~kind:Error_kind.timeout "Timeout" in
let err =
Error.mk_error
~bt:(Printexc.raw_backtrace_to_string bt)
~kind:Error_kind.timeout "Timeout"
in
Log.err (fun k -> k "client: timeout for id %ld:" id);
Fut.fulfill_idempotent promise (Error (Error.E err, bt))
Fut.fulfill_idempotent promise (Error (Exn_bt.make (Error.E err) bt))
| IF_stream { promise; bt; _ } ->
remove_from_tbl_ self id;
let err = Error.mk_error ~kind:Error_kind.timeout "Timeout" in
let err =
Error.mk_error
~bt:(Printexc.raw_backtrace_to_string bt)
~kind:Error_kind.timeout "Timeout"
in
Log.err (fun k -> k "client: timeout for id %ld" id);
Fut.fulfill_idempotent promise (Error (Error.E err, bt)))
Fut.fulfill_idempotent promise (Error (Exn_bt.make (Error.E err) bt)))
entry

let[@inline] apply_middleware rpc (h : _ Handler.t) (m : Middleware.t) :
Expand Down Expand Up @@ -222,6 +230,7 @@ let send_request_ (self : t) ~oc ~meta ~rpc req : unit =
let@ enc = with_pbrt_enc_ self in
Pbrt.Encoder.clear enc;

Printf.eprintf "SEND REQ\n%!";
let@ oc = Lock.with_lock oc in
Framing.write_req ~enc ~config:self.config ~encoding:self.encoding oc rpc meta
req;
Expand Down Expand Up @@ -281,7 +290,7 @@ let call_client_stream (self : t) ~timer ~(oc : #Io.Out.t Lock.t)
Service.Value_mode.unary )
Pbrt_services.Client.rpc) : 'req Push_stream.t * _ Fut.t =
(* TODO: can we just avoid that? *)
let bt = Printexc.get_callstack 5 in
let bt = Printexc.get_callstack 10 in

Option.iter check_timeout_ timeout_s;
let fut, promise = Fut.make () in
Expand Down
1 change: 1 addition & 0 deletions src/core/common_.ml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
module Service = Pbrt_services
module Runner = Moonpool.Runner
module Log = (val Logs.src_log (Logs.Src.create "batrpc"))
module Slice = Iostream.Slice

type header = Meta.header
type headers = header list
19 changes: 11 additions & 8 deletions src/core/dune
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,25 @@
:standard
-open
Imandrakit
-open
Imandrakit_thread
-open
Imandrakit_zip)
;-open
;Imandrakit_thread
;-open
;Imandrakit_zip
)
(preprocess
(pps ppx_deriving.std))
(libraries
atomic
moonpool
moonpool-io
trace.core
imandrakit
imandrakit.metrics
iostream
;imandrakit
;imandrakit.metrics
imandrakit.zip
imandrakit-thread
;imandrakit-thread
camlzip
iostream
;iostream
pbrt
pbrt_yojson
hmap
Expand Down
29 changes: 16 additions & 13 deletions src/core/io.ml
Original file line number Diff line number Diff line change
@@ -1,36 +1,39 @@
(** IO primitives *)

module Slice = Iostream.Slice
module MIO = Moonpool_io
module Fd = Moonpool_io.Fd

(** Input stream *)
module In = struct
include Iostream.In_buf

class of_str (str : string) : t = of_string str

class of_fd ?(shutdown = false) ?(close_noerr = false) ?bytes
(fd : Unix.file_descr) :
t =
class of_fd ?(shutdown = false) ?(close_noerr = false) ?bytes (fd : Fd.t) : t
=
let eof = ref false in
object
inherit t_from_refill ?bytes ()

method private refill (slice : Slice.t) =
if not !eof then (
slice.len <- Unix.read fd slice.bytes 0 (Bytes.length slice.bytes);
slice.len <- MIO.Unix.read fd slice.bytes 0 (Bytes.length slice.bytes);
slice.off <- 0;
Printf.eprintf "READ %S\n%!"
(Bytes.sub_string slice.bytes 0 slice.len);
if slice.len = 0 then eof := true
)

method close () =
eof := true;
if shutdown then (
try Unix.shutdown fd Unix.SHUTDOWN_RECEIVE with _ -> ()
try MIO.Unix.shutdown fd Unix.SHUTDOWN_RECEIVE with _ -> ()
);
if close_noerr then (
try Unix.close fd with _ -> ()
try MIO.Unix.close fd with _ -> ()
) else
Unix.close fd
MIO.Unix.close fd
end

(** [instrument ic ~on_read] makes a new buffered input stream.
Expand Down Expand Up @@ -82,29 +85,29 @@ end
module Out = struct
include Iostream.Out_buf

class of_fd ?(shutdown = false) ?(close_noerr = false) (fd : Unix.file_descr) :
t =
class of_fd ?(shutdown = false) ?(close_noerr = false) (fd : Fd.t) : t =
object
inherit t_from_output ()

method private output_underlying bs i len0 =
Printf.eprintf "IO OUTPUT %S\n%!" (Bytes.sub_string bs i len0);
let i = ref i in
let len = ref len0 in
while !len > 0 do
let n = Unix.write fd bs !i !len in
let n = MIO.Unix.write fd bs !i !len in
i := !i + n;
len := !len - n
done

method private close_underlying () =
if shutdown then (
try Unix.shutdown fd Unix.SHUTDOWN_SEND with _ -> ()
try MIO.Unix.shutdown fd Unix.SHUTDOWN_SEND with _ -> ()
);

if close_noerr then (
try Unix.close fd with _ -> ()
try MIO.Unix.close fd with _ -> ()
) else
Unix.close fd
MIO.Unix.close fd
end

(** [instrument oc ~on_write] returns a new output stream
Expand Down
14 changes: 0 additions & 14 deletions src/core/net_stats.ml

This file was deleted.

3 changes: 2 additions & 1 deletion src/otel/batrpc_otel.ml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ module Util_ = struct
let status =
match res with
| Ok _ -> Otel.Proto.Trace.default_status ~code:Status_code_ok ()
| Error (exn, _bt) ->
| Error ebt ->
let exn = Exn_bt.exn ebt in
Otel.Proto.Trace.default_status ~code:Status_code_error
~message:(Printexc.to_string exn) ()
in
Expand Down
1 change: 1 addition & 0 deletions src/server/for_client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ let run (self : t) : unit =
| Close -> handle_close self
| Heartbeat -> handle_heartbeat self ~meta
| Request ->
Printf.eprintf "GOT REQ\n%!";
State.handle_request self.st ~encoding:self.encoding ~runner:self.runner
~meta ~ic:self.ic ~oc:self.oc ()
| Client_stream_item ->
Expand Down
8 changes: 6 additions & 2 deletions src/server/state.ml
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,13 @@ let send_response_or_error (self : t) ~encoding ~oc ~(meta : Meta.meta) ~rpc
in

(* send response, atomically *)
Printf.eprintf "SEND RES\n%!";
let@ oc = Lock.with_lock oc in
Framing.write_res ~config:self.config ~encoding oc rpc meta res;
oc#flush ()
| Error err ->
Log.err (fun k -> k "reply with error for id=%ld:@ %a" meta.id Error.pp err);
Printf.eprintf "SEND RES ERR\n%!";
send_error self ~encoding ~oc ~meta err

let send_stream_item (self : t) ~encoding ~oc ~(meta : Meta.meta) ~rpc res :
Expand Down Expand Up @@ -156,16 +158,18 @@ let[@inline] apply_middleware ~service_name rpc (h : _ Handler.t)

let handle_request (self : t) ~encoding ~runner ~(meta : Meta.meta) ~ic ~oc () :
unit =
Printf.eprintf "HANDLE REQ\n%!";
let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "rpc.server.handle-req" in
assert (meta.kind = Meta.Request);

let compute_res_and_reply rpc (f : _ Handler.t) (ctx, req) : unit =
let fut = f (ctx, req) in
Printf.eprintf "COMPUTE RES\n%!";
(* when [fut] is done, send result *)
Fut.on_result fut (function
| Ok res -> send_response_or_error self ~encoding ~oc ~meta ~rpc (Ok res)
| Error (exn, bt) ->
let res = Error (Error.of_exn ~kind:Errors.handler ~bt exn) in
| Error ebt ->
let res = Error (Error.of_exn_bt ~kind:Errors.handler ebt) in
send_response_or_error self ~encoding ~oc ~meta ~rpc res)
in

Expand Down
3 changes: 3 additions & 0 deletions src/unix/common_.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module MIO = Moonpool_io
module Fd = Moonpool_io.Fd
module Event = Moonpool_sync.Event
4 changes: 4 additions & 0 deletions src/unix/dune
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,9 @@
batrpc.client
batrpc.server
containers
imandrakit
imandrakit.metrics
moonpool.sync
moonpool-io
threads
unix))
8 changes: 4 additions & 4 deletions src/unix/tcp_client.ml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
open Common_
module C = Batrpc_client

type t = C.t
Expand All @@ -11,12 +12,11 @@ let connect ?active ?buf_pool ?(middlewares = []) ?(encoding = Encoding.Binary)
let@ () = Error.try_catch ~kind:Errors.network () in

let kind = Util_sockaddr.kind addr in
let sock = Unix.socket kind Unix.SOCK_STREAM 0 in
let sock = MIO.Unix.socket kind Unix.SOCK_STREAM 0 in

Unix.setsockopt sock Unix.TCP_NODELAY true;
Sys.set_signal Sys.sigpipe Sys.Signal_ignore;
MIO.Unix.setsockopt sock Unix.TCP_NODELAY true;

Unix.connect sock addr;
MIO.Unix.connect sock addr;

let ic = new Io.In.of_fd ~shutdown:true ~close_noerr:true sock in
let oc = new Io.Out.of_fd ~close_noerr:true sock in
Expand Down
32 changes: 17 additions & 15 deletions src/unix/tcp_server.ml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
open Common_
module Server = Batrpc_server

type conn = Server.For_client.t
Expand All @@ -7,7 +8,7 @@ type t = {
alive: bool Atomic.t;
st: Server.State.t;
on_new_client: conn -> Unix.sockaddr -> unit;
sock: Unix.file_descr;
sock: Fd.t;
runner: Runner.t;
timer: Timer.t;
alive_conns: int Atomic.t;
Expand All @@ -19,7 +20,7 @@ let[@inline] active self = self.active
let terminate self : unit =
if Atomic.exchange self.alive false then (
Log_rpc.debug (fun k -> k "tcp server: terminating");
try Unix.close self.sock with _ -> ()
try MIO.Unix.close self.sock with _ -> ()
)

let add_middleware self m = Server.State.add_middleware self.st m
Expand All @@ -30,19 +31,19 @@ let create ?server_state ?(on_new_client = fun _ _ -> ())
(addr : Unix.sockaddr) : t Error.result =
let@ () = Error.try_catch ~kind:Errors.network () in
let kind = Util_sockaddr.kind addr in
let sock = Unix.socket kind Unix.SOCK_STREAM 0 in
let sock = MIO.Unix.socket kind Unix.SOCK_STREAM 0 in

Unix.setsockopt sock Unix.SO_REUSEADDR true;
Unix.setsockopt sock Unix.SO_REUSEPORT true;
if reuseaddr then Unix.setsockopt sock Unix.SO_REUSEPORT true;
MIO.Unix.setsockopt sock Unix.SO_REUSEADDR true;
MIO.Unix.setsockopt sock Unix.SO_REUSEPORT true;
if reuseaddr then MIO.Unix.setsockopt sock Unix.SO_REUSEPORT true;

config_socket sock;

(* we're going to use [select] to not entirely block on [accept] *)
Unix.set_nonblock sock;
MIO.Unix.set_nonblock sock;

Unix.bind sock addr;
Unix.listen sock 16;
MIO.Unix.bind sock addr;
MIO.Unix.listen sock 16;

let buf_pool = Buf_pool.create () in

Expand Down Expand Up @@ -78,8 +79,7 @@ let create ?server_state ?(on_new_client = fun _ _ -> ())
(** Handle this new client. This only sets up the new connection,
actual work will happen in the new RPC conn. *)
let handle_client_async_ (self : t) client_sock client_addr : unit =
Unix.setsockopt client_sock Unix.TCP_NODELAY true;
Sys.set_signal Sys.sigpipe Sys.Signal_ignore;
MIO.Unix.setsockopt client_sock Unix.TCP_NODELAY true;

let ic = new Io.In.of_fd ~shutdown:true ~close_noerr:true client_sock in
let oc = new Io.Out.of_fd ~shutdown:true ~close_noerr:true client_sock in
Expand All @@ -105,8 +105,10 @@ let handle_client_async_ (self : t) client_sock client_addr : unit =
Server.For_client.create ~active:self.active ~buf_pool:self.buf_pool
~state:self.st ~runner:self.runner ~encoding ~timer:self.timer ~ic ~oc ()
in
(* TODO: use a fiber instead? *)
ignore (Thread.create Server.For_client.run rpc_conn : Thread.t);
ignore
(Moonpool_fib.spawn_top ~on:self.runner (fun () ->
Server.For_client.run rpc_conn)
: _ Moonpool_fib.t);

Fut.on_result (Server.For_client.on_close rpc_conn) (fun _ ->
Atomic.decr self.alive_conns);
Expand All @@ -120,15 +122,15 @@ let run (self : t) : unit =
let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "bin-rpc.tcp-server.run" in

let wait_for_client_or_timeout () : bool =
match Unix.select [ self.sock ] [] [] 1.0 with
match Unix.select [ Fd.unsafe_get self.sock ] [] [] 1.0 with
| [], _, _ -> false
| _ -> true
in

while Atomic.get self.alive && Switch.is_on self.active do
let maybe_client = wait_for_client_or_timeout () in
if maybe_client then (
match Unix.accept self.sock with
match MIO.Unix.accept self.sock with
| client_sock, client_addr ->
Atomic.incr self.alive_conns;

Expand Down
Loading
Loading