diff --git a/.ocamlformat b/.ocamlformat index f0503e5..e6598e7 100644 --- a/.ocamlformat +++ b/.ocamlformat @@ -1,4 +1,4 @@ -profile = sparse +profile = default break-cases = nested break-fun-decl = smart cases-exp-indent = 2 diff --git a/mirage/dune b/mirage/dune index aec294e..616925b 100644 --- a/mirage/dune +++ b/mirage/dune @@ -1,6 +1,6 @@ (library (name gluten_mirage) (public_name gluten-mirage) - (libraries faraday-lwt gluten-lwt lwt mirage-flow conduit-mirage cstruct) + (libraries faraday-lwt gluten-lwt lwt mirage-flow cstruct) (flags (:standard -safe-string))) diff --git a/mirage/gluten_mirage.ml b/mirage/gluten_mirage.ml index 5ee39fa..b33e707 100644 --- a/mirage/gluten_mirage.ml +++ b/mirage/gluten_mirage.ml @@ -32,46 +32,80 @@ open Lwt.Infix -module Make_IO (Flow : Mirage_flow.S) : - Gluten_lwt.IO with type socket = Flow.flow and type addr = unit = struct - type socket = Flow.flow - - type addr = unit +module Buffered_flow = struct + type 'a t = { + flow : 'a; + mutable buf : Cstruct.t; + } - let shutdown flow = Flow.close flow + let create flow = { flow; buf = Cstruct.empty } +end - let shutdown_receive flow = Lwt.async (fun () -> shutdown flow) +module Make_IO (Flow : Mirage_flow.S) : + Gluten_lwt.IO + with type socket = Flow.flow Buffered_flow.t + and type addr = unit = struct + type socket = Flow.flow Buffered_flow.t + type addr = unit + let shutdown (sock : _ Buffered_flow.t) = Flow.close sock.flow + let shutdown_receive sock = Lwt.async (fun () -> shutdown sock) let close = shutdown - let read flow bigstring ~off ~len:_ = + let buffered_read (sock : _ Buffered_flow.t) len = + let trunc buf = + match Cstruct.length buf > len with + | false -> + buf + | true -> + let head, rest = Cstruct.split buf len in + sock.buf <- rest; + head + in + let buffered_data = + match Cstruct.is_empty sock.buf with + | true -> + None + | false -> + let buf = sock.buf in + sock.buf <- Cstruct.empty; + Some (Ok (`Data (trunc buf))) + in + match buffered_data with + | Some data -> + Lwt.return data + | None -> ( + Flow.read sock.flow >|= fun data -> + assert (Cstruct.is_empty sock.buf); + match data with Ok (`Data buf) -> Ok (`Data (trunc buf)) | x -> x) + + let read (sock : _ Buffered_flow.t) bigstring ~off ~len = Lwt.catch (fun () -> - Flow.read flow >|= function + buffered_read sock len >|= function | Ok (`Data buf) -> - Bigstringaf.blit - buf.buffer - ~src_off:buf.off - bigstring - ~dst_off:off + Bigstringaf.blit buf.buffer ~src_off:buf.off bigstring ~dst_off:off ~len:buf.len; `Ok buf.len | Ok `Eof -> `Eof | Error error -> failwith (Format.asprintf "%a" Flow.pp_error error)) - (fun exn -> shutdown flow >>= fun () -> Lwt.fail exn) + (fun exn -> shutdown sock >>= fun () -> Lwt.fail exn) - let writev flow iovecs = + let writev (sock : _ Buffered_flow.t) iovecs = let cstruct_iovecs = List.map (fun { Faraday.buffer; off; len } -> Cstruct.of_bigarray ~off ~len buffer) iovecs in + let len = Cstruct.lenv cstruct_iovecs in + let data = Cstruct.create_unsafe len in + let _, _ = Cstruct.fillv ~src:cstruct_iovecs ~dst:data in Lwt.catch (fun () -> - Flow.writev flow cstruct_iovecs >|= fun x -> + Flow.write sock.flow data >|= fun x -> match x with | Ok () -> `Ok (Cstruct.lenv cstruct_iovecs) @@ -79,7 +113,7 @@ module Make_IO (Flow : Mirage_flow.S) : `Closed | Error other_error -> raise (Failure (Format.asprintf "%a" Flow.pp_write_error other_error))) - (fun exn -> shutdown flow >>= fun () -> Lwt.fail exn) + (fun exn -> shutdown sock >>= fun () -> Lwt.fail exn) end module Server (Flow : Mirage_flow.S) = Gluten_lwt.Server (Make_IO (Flow)) diff --git a/mirage/gluten_mirage.mli b/mirage/gluten_mirage.mli index 59c7d33..cacc64c 100644 --- a/mirage/gluten_mirage.mli +++ b/mirage/gluten_mirage.mli @@ -30,8 +30,19 @@ * POSSIBILITY OF SUCH DAMAGE. *---------------------------------------------------------------------------*) +module Buffered_flow : sig + type 'a t = { + flow : 'a; + mutable buf : Cstruct.t; + } + + val create : 'a -> 'a t +end + module Server (Flow : Mirage_flow.S) : - Gluten_lwt.Server with type socket = Flow.flow and type addr = unit + Gluten_lwt.Server + with type socket = Flow.flow Buffered_flow.t + and type addr = unit module Client (Flow : Mirage_flow.S) : - Gluten_lwt.Client with type socket = Flow.flow + Gluten_lwt.Client with type socket = Flow.flow Buffered_flow.t