Skip to content

Commit

Permalink
add keep_alive
Browse files Browse the repository at this point in the history
  • Loading branch information
craff committed Mar 6, 2024
1 parent e0ce59b commit 3945c8b
Show file tree
Hide file tree
Showing 5 changed files with 249 additions and 131 deletions.
2 changes: 1 addition & 1 deletion src/dune
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
(name http_lwt_client)
(public_name http-lwt-client)
(libraries faraday-lwt-unix h2 httpaf lwt.unix tls tls-lwt ca-certs base64
logs happy-eyeballs-lwt))
logs happy-eyeballs-lwt stringext))
131 changes: 92 additions & 39 deletions src/http_lwt_client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,24 @@ let pp_response ppf { version ; status ; reason ; headers } =
Format.fprintf ppf "((version \"%a\") (status %a) (reason %S) (headers %a))"
Version.pp_hum version Status.pp_hum status reason Headers.pp_hum headers

let single_http_1_1_request ?config fd user_pass host meth path headers body f f_init =
let update_keep_alive keep_alive response =
let open Http_lwt_unix in
match keep_alive with
| None -> ()
| Some r ->
match Headers.get response.headers "Keep-Alive" with
| None -> ()
| Some ka ->
match Stringext.find_from ka ~pattern:"max=" with
| None -> ()
| Some n ->
try
Scanf.sscanf (String.sub ka (n+4) (String.length ka - n - 4))
"%d" (fun n -> r.max <- n)
with _ ->
Log.err (fun m -> m "Bad Keep-Alive header")

let single_http_1_1_request ?config ?keep_alive fd user_pass host meth path headers body f f_init =
let blen = Option.map String.length body in
let headers = prep_http_1_1_headers headers host user_pass blen in
let req = Httpaf.Request.create ~headers meth path in
Expand All @@ -160,6 +177,7 @@ let single_http_1_1_request ?config fd user_pass host meth path headers body f f
(Httpaf.Headers.to_list response.Httpaf.Response.headers)
}
in
update_keep_alive keep_alive response;
let open Lwt.Infix in
let rec on_read on_eof acc bs ~off ~len =
(* XXX(dinosaure): we must do the copy before the [>>=].
Expand Down Expand Up @@ -193,14 +211,14 @@ let single_http_1_1_request ?config fd user_pass host meth path headers body f f
| Some config -> Some config.Httpaf.Config.read_buffer_size
| None -> None
in
Http_lwt_unix.Client_HTTP_1_1.request ?read_buffer_size fd connection ;
Http_lwt_unix.Client_HTTP_1_1.request ?read_buffer_size ?keep_alive fd connection ;
(match body with
| Some body -> Httpaf.Body.write_string request_body body
| None -> ());
Httpaf.Body.close_writer request_body;
finished

let single_h2_request ?config fd scheme user_pass host meth path headers body f f_init =
let single_h2_request ?config ?keep_alive fd scheme user_pass host meth path headers body (f : response -> 'a -> string -> 'a Lwt.t) f_init =
let blen = Option.map String.length body in
let headers = prep_h2_headers headers host user_pass blen in
let req = H2.Request.create ~scheme ~headers meth path in
Expand All @@ -222,6 +240,7 @@ let single_h2_request ?config fd scheme user_pass host meth path headers body f
reason = "" ;
headers = response.H2.Response.headers ;
} in
update_keep_alive keep_alive response;
let open Lwt.Infix in
let rec on_read on_eof acc bs ~off ~len =
(* XXX(dinosaure): we must do the copy before the [>>=].
Expand Down Expand Up @@ -262,7 +281,7 @@ let single_h2_request ?config fd scheme user_pass host meth path headers body f
| Some config -> Some config.H2.Config.read_buffer_size
| None -> None
in
Http_lwt_unix.Client_H2.request ?read_buffer_size fd connection ;
Http_lwt_unix.Client_H2.request ?read_buffer_size ?keep_alive fd connection;
(match body with
| Some body -> H2.Body.Writer.write_string request_body body
| None -> ());
Expand All @@ -285,37 +304,61 @@ let alpn_protocol = function
None
| Error () -> None

let single_request resolver ?config tls_config ~meth ~headers ?body uri f f_init =
type keep_alive = Http_lwt_unix.keep_alive

let new_keep_alive = Http_lwt_unix.new_keep_alive

let add_keep_alive headers = ("Connection", "Keep-Alive")::headers

let single_request resolver ?keep_alive ?config tls_config ~meth ~headers ?body uri f f_init =
Lwt_result.lift (decode_uri uri) >>= fun (tls, scheme, user_pass, host, port, path) ->
(if tls then
Lwt_result.lift (Lazy.force tls_config) >|= function
| `Custom c -> Some c
| `Default config ->
match Result.bind (Domain_name.of_string host) Domain_name.host with
| Ok peer -> Some (Tls.Config.peer config peer)
| Error _ -> Some config
else
Lwt_result.return None) >>= fun tls_config ->
connect resolver ?port ?tls_config host >>= fun fd ->
begin match alpn_protocol fd, config with
| (Some `HTTP_1_1 | None), Some (`HTTP_1_1 config) ->
Log.debug (fun m -> m "Start an http/1.1 connection as expected.");
single_http_1_1_request ~config fd user_pass host meth path headers body f f_init
| (Some `HTTP_1_1 | None), None ->
Log.debug (fun m -> m "Start an http/1.1 connection by default.");
single_http_1_1_request fd user_pass host meth path headers body f f_init
| (Some `H2 | None), Some (`H2 config) ->
Log.debug (fun m -> m "Start an h2 connection as expected.");
single_h2_request ~config fd scheme user_pass host meth path headers body f f_init
| Some `H2, None ->
Log.debug (fun m -> m "Start an h2 connection as requested by the server.");
single_h2_request fd scheme user_pass host meth path headers body f f_init
| Some `H2, Some (`HTTP_1_1 _config) ->
Log.warn (fun m -> m "Initiate an h2 connection despite a requested http/1.1 connection.");
single_h2_request fd scheme user_pass host meth path headers body f f_init
| Some `HTTP_1_1, Some (`H2 _config) ->
Log.warn (fun m -> m "Initiate an http/1.1 connection despite a requested h2 connection.");
single_http_1_1_request fd user_pass host meth path headers body f f_init
let connection =
(if tls then
Lwt_result.lift (Lazy.force tls_config) >|= function
| `Custom c -> Some c
| `Default config ->
match Result.bind (Domain_name.of_string host) Domain_name.host with
| Ok peer -> Some (Tls.Config.peer config peer)
| Error _ -> Some config
else
Lwt_result.return None) >>= fun tls_config ->
let open Http_lwt_unix in
match keep_alive with
| None -> connect resolver ?port ?tls_config host
>>= (fun fd -> Lwt.return (Ok (fd, headers)))
| Some ({ fd = None; _ } as r) ->
let open Lwt.Infix in
Http_lwt_unix.keep_alive_lock keep_alive >>=
(fun () ->
let open Lwt_result.Infix in
connect resolver ?port ?tls_config host >>=
(fun fd ->
r.fd <- Some fd;
Lwt.return (Ok (fd, add_keep_alive headers))))
| Some { fd = Some fd; _ } ->
Lwt.return (Ok (fd, add_keep_alive headers))
in
connection >>= fun (fd, headers) ->
begin
match alpn_protocol fd, config with
| (Some `HTTP_1_1 | None), Some (`HTTP_1_1 config) ->
Log.debug (fun m -> m "Start an http/1.1 connection as expected.");
single_http_1_1_request ~config ?keep_alive fd user_pass host meth path headers body f f_init
| (Some `HTTP_1_1 | None), None ->
Log.debug (fun m -> m "Start an http/1.1 connection by default.");
single_http_1_1_request ?keep_alive fd user_pass host meth path headers body f f_init
| (Some `H2 | None), Some (`H2 config) ->
Log.debug (fun m -> m "Start an h2 connection as expected.");
single_h2_request ~config ?keep_alive fd scheme user_pass host meth path headers body f f_init
| Some `H2, None ->
Log.debug (fun m -> m "Start an h2 connection as requested by the server.");
single_h2_request ?keep_alive fd scheme user_pass host meth path headers body f f_init
| Some `H2, Some (`HTTP_1_1 _config) ->
Log.warn (fun m -> m "Initiate an h2 connection despite a requested http/1.1 connection.");
single_h2_request ?keep_alive fd scheme user_pass host meth path headers body f f_init
| Some `HTTP_1_1, Some (`H2 _config) ->
Log.warn (fun m -> m "Initiate an http/1.1 connection despite a requested h2 connection.");
single_http_1_1_request ?keep_alive fd user_pass host meth path headers body f f_init
end >>= fun (resp, body) ->
Lwt.map (fun body -> Ok (resp, body)) body

Expand Down Expand Up @@ -345,6 +388,7 @@ let request
?(max_redirect = 5)
?(follow_redirect = true)
?(happy_eyeballs = Happy_eyeballs_lwt.create ())
?keep_alive
uri
f f_init
=
Expand All @@ -367,22 +411,31 @@ let request
auth)
in
if not follow_redirect then
single_request happy_eyeballs ?config tls_config ~meth ~headers ?body uri f f_init
single_request happy_eyeballs ?keep_alive ?config tls_config ~meth ~headers ?body
uri f f_init
else
let rec follow_redirect count uri =
if count = 0 then
Lwt.return (Error (`Msg "redirect limit exceeded"))
else
single_request happy_eyeballs ?config tls_config ~meth ~headers ?body uri f f_init
single_request happy_eyeballs ?keep_alive ?config tls_config ~meth ~headers ?body uri f f_init
>>= fun (resp, body) ->
if Status.is_redirection resp.status then
(match Headers.get resp.headers "location" with
| Some location ->
Lwt_result.lift (resolve_location ~uri ~location) >>= fun uri ->
Log.debug (fun m -> m "following redirect to %s" uri);
follow_redirect (pred count) uri
let open Lwt.Infix in
Http_lwt_unix.reset_keep_alive ~close:true keep_alive >>=
(fun () ->
let open Lwt_result.Infix in
Lwt_result.lift (resolve_location ~uri ~location) >>= fun uri ->
Log.debug (fun m -> m "following redirect to %s" uri);
follow_redirect (pred count) uri)
| None -> Lwt_result.return (resp, body))
else
Lwt_result.return (resp, body)
in
follow_redirect max_redirect uri

let reset_keep_alive = Http_lwt_unix.reset_keep_alive
let active_keep_alive = function Some { Http_lwt_unix.fd = Some _; _ } -> true
| _ -> false
9 changes: 8 additions & 1 deletion src/http_lwt_client.mli
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ type response =
(** [pp_response ppf response] pretty-prints the [response] on [ppf]. *)
val pp_response : Format.formatter -> response -> unit

type keep_alive

val new_keep_alive : unit -> keep_alive
val active_keep_alive : keep_alive option -> bool
val reset_keep_alive : ?close:bool -> keep_alive option -> unit Lwt.t

(** [request ~config ~authenticator ~meth ~headers ~body ~max_redirect
~follow_redirect ~happy_eyeballs uri f init] does a single request of [uri]
and returns the response. Each time part of the body is received,
Expand Down Expand Up @@ -67,7 +73,8 @@ val request
-> ?max_redirect:int
-> ?follow_redirect:bool
-> ?happy_eyeballs:Happy_eyeballs_lwt.t
-> ?keep_alive:keep_alive
-> string
-> (response -> 'a -> string -> 'a Lwt.t)
-> 'a
-> (response * 'a, [> `Msg of string ]) Lwt_result.t
-> (response * 'a, [ `Msg of string ]) Lwt_result.t
Loading

0 comments on commit 3945c8b

Please sign in to comment.