diff --git a/ocaml/libs/http-lib/http_svr.ml b/ocaml/libs/http-lib/http_svr.ml index 3c8ec7facbb..54a8b96ba73 100644 --- a/ocaml/libs/http-lib/http_svr.ml +++ b/ocaml/libs/http-lib/http_svr.ml @@ -648,7 +648,7 @@ let start ?header_read_timeout ?header_total_timeout ?max_header_length ; body= handle_connection ~header_read_timeout ~header_total_timeout ~max_header_length x - ; lock= Xapi_stdext_threads.Semaphore.create conn_limit + ; lock= Semaphore.Counting.make conn_limit } in let server = Server_io.server handler socket in diff --git a/ocaml/libs/http-lib/server_io.ml b/ocaml/libs/http-lib/server_io.ml index 09abf253ee1..c821a27c024 100644 --- a/ocaml/libs/http-lib/server_io.ml +++ b/ocaml/libs/http-lib/server_io.ml @@ -23,7 +23,7 @@ type handler = { name: string ; (* body should close the provided fd *) body: Unix.sockaddr -> Unix.file_descr -> unit - ; lock: Xapi_stdext_threads.Semaphore.t + ; lock: Semaphore.Counting.t } let handler_by_thread (h : handler) (s : Unix.file_descr) @@ -31,7 +31,7 @@ let handler_by_thread (h : handler) (s : Unix.file_descr) Thread.create (fun () -> Fun.protect - ~finally:(fun () -> Xapi_stdext_threads.Semaphore.release h.lock 1) + ~finally:(fun () -> Semaphore.Counting.release h.lock) (Debug.with_thread_named h.name (fun () -> h.body caller s)) ) () @@ -49,7 +49,7 @@ let establish_server ?(signal_fds = []) forker handler sock = @@ Polly.wait epoll 2 (-1) (fun _ fd _ -> (* If any of the signal_fd is active then bail out *) if List.mem fd signal_fds then raise PleaseClose ; - Xapi_stdext_threads.Semaphore.acquire handler.lock 1 ; + Semaphore.Counting.acquire handler.lock ; let s, caller = Unix.accept ~cloexec:true sock in try ignore (forker handler s caller) with exc -> diff --git a/ocaml/libs/http-lib/server_io.mli b/ocaml/libs/http-lib/server_io.mli index 3aca0234743..3c52f53a804 100644 --- a/ocaml/libs/http-lib/server_io.mli +++ b/ocaml/libs/http-lib/server_io.mli @@ -16,7 +16,7 @@ type handler = { name: string (** used for naming the thread *) ; body: Unix.sockaddr -> Unix.file_descr -> unit (** function called in a thread for each connection*) - ; lock: Xapi_stdext_threads.Semaphore.t + ; lock: Semaphore.Counting.t } type server = { diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/semaphore.ml b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/semaphore.ml deleted file mode 100644 index 06621049c91..00000000000 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/semaphore.ml +++ /dev/null @@ -1,57 +0,0 @@ -(* - * Copyright (C) Citrix Systems Inc. - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU Lesser General Public License as published - * by the Free Software Foundation; version 2.1 only. with the special - * exception on linking described in file LICENSE. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Lesser General Public License for more details. - *) - -type t = {mutable n: int; m: Mutex.t; c: Condition.t} - -let create n = - if n <= 0 then - invalid_arg (Printf.sprintf "Semaphore value must be positive, got %d" n) ; - let m = Mutex.create () and c = Condition.create () in - {n; m; c} - -exception Inconsistent_state of string - -let inconsistent_state fmt = - Printf.ksprintf (fun msg -> raise (Inconsistent_state msg)) fmt - -let acquire s k = - if k <= 0 then - invalid_arg - (Printf.sprintf "Semaphore acquisition requires a positive value, got %d" - k - ) ; - Mutex.lock s.m ; - while s.n < k do - Condition.wait s.c s.m - done ; - if not (s.n >= k) then - inconsistent_state "Semaphore value cannot be smaller than %d, got %d" k s.n ; - s.n <- s.n - k ; - Condition.signal s.c ; - Mutex.unlock s.m - -let release s k = - if k <= 0 then - invalid_arg - (Printf.sprintf "Semaphore release requires a positive value, got %d" k) ; - Mutex.lock s.m ; - s.n <- s.n + k ; - Condition.signal s.c ; - Mutex.unlock s.m - -let execute_with_weight s k f = - acquire s k ; - Xapi_stdext_pervasives.Pervasiveext.finally f (fun () -> release s k) - -let execute s f = execute_with_weight s 1 f diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/semaphore.mli b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/semaphore.mli deleted file mode 100644 index 207e612032d..00000000000 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/semaphore.mli +++ /dev/null @@ -1,40 +0,0 @@ -(* - * Copyright (C) Citrix Systems Inc. - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU Lesser General Public License as published - * by the Free Software Foundation; version 2.1 only. with the special - * exception on linking described in file LICENSE. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Lesser General Public License for more details. - *) - -type t - -exception Inconsistent_state of string - -val create : int -> t -(** [create n] create a semaphore with initial value [n] (a positive integer). - Raise {!Invalid_argument} if [n] <= 0 *) - -val acquire : t -> int -> unit -(** [acquire k s] block until the semaphore value is >= [k] (a positive integer), - then atomically decrement the semaphore value by [k]. - Raise {!Invalid_argument} if [k] <= 0 *) - -val release : t -> int -> unit -(** [release k s] atomically increment the semaphore value by [k] (a positive - integer). - Raise {!Invalid_argument} if [k] <= 0 *) - -val execute_with_weight : t -> int -> (unit -> 'a) -> 'a -(** [execute_with_weight s k f] {!acquire} the semaphore with [k], - then run [f ()], and finally {!release} the semaphore with the same value [k] - (even in case of failure in the execution of [f]). - Return the value of [f ()] or re-raise the exception if any. *) - -val execute : t -> (unit -> 'a) -> 'a -(** [execute s f] same as [{execute_with_weight} s 1 f] *) diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.ml b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.ml index 1ca5e916ef4..311d985ca69 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.ml +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.ml @@ -14,11 +14,20 @@ module M = Mutex +let finally = Xapi_stdext_pervasives.Pervasiveext.finally + module Mutex = struct (** execute the function f with the mutex hold *) let execute lock f = Mutex.lock lock ; - Xapi_stdext_pervasives.Pervasiveext.finally f (fun () -> Mutex.unlock lock) + finally f (fun () -> Mutex.unlock lock) +end + +module Semaphore = struct + let execute s f = + let module Semaphore = Semaphore.Counting in + Semaphore.acquire s ; + finally f (fun () -> Semaphore.release s) end (** Parallel List.iter. Remembers all exceptions and returns an association list mapping input x to an exception. @@ -60,7 +69,6 @@ module Delay = struct exception Pre_signalled let wait (x : t) (seconds : float) = - let finally = Xapi_stdext_pervasives.Pervasiveext.finally in let to_close = ref [] in let close' fd = if List.mem fd !to_close then Unix.close fd ; diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.mli b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.mli index 057aedfa700..b5edcff21b8 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.mli +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.mli @@ -15,6 +15,10 @@ module Mutex : sig val execute : Mutex.t -> (unit -> 'a) -> 'a end +module Semaphore : sig + val execute : Semaphore.Counting.t -> (unit -> 'a) -> 'a +end + val thread_iter_all_exns : ('a -> unit) -> 'a list -> ('a * exn) list val thread_iter : ('a -> unit) -> 'a list -> unit diff --git a/ocaml/networkd/lib/network_utils.ml b/ocaml/networkd/lib/network_utils.ml index 39417cf1177..4a473b29579 100644 --- a/ocaml/networkd/lib/network_utils.ml +++ b/ocaml/networkd/lib/network_utils.ml @@ -1197,12 +1197,13 @@ module Ovs = struct val appctl : ?log:bool -> string list -> string end = struct - module Semaphore = Xapi_stdext_threads.Semaphore + module Semaphore = Semaphore.Counting - let s = Semaphore.create 5 + let s = Semaphore.make 5 let vsctl ?log args = - Semaphore.execute s (fun () -> + let execute = Xapi_stdext_threads.Threadext.Semaphore.execute in + execute s (fun () -> call_script ~on_error:error_handler ?log ovs_vsctl ("--timeout=20" :: args) ) diff --git a/ocaml/xapi-aux/throttle.ml b/ocaml/xapi-aux/throttle.ml index 7be2ac9bd48..a9dacf7f164 100644 --- a/ocaml/xapi-aux/throttle.ml +++ b/ocaml/xapi-aux/throttle.ml @@ -17,10 +17,12 @@ module type SIZE = sig end module Make (Size : SIZE) = struct - module Semaphore = Xapi_stdext_threads.Semaphore + module Semaphore = Semaphore.Counting let with_lock = Xapi_stdext_threads.Threadext.Mutex.execute + let execute = Xapi_stdext_threads.Threadext.Semaphore.execute + let semaphore = ref None let m = Mutex.create () @@ -29,11 +31,11 @@ module Make (Size : SIZE) = struct with_lock m @@ fun () -> match !semaphore with | None -> - let result = Semaphore.create (Size.n ()) in + let result = Semaphore.make (Size.n ()) in semaphore := Some result ; result | Some s -> s - let execute f = Semaphore.execute (get_semaphore ()) f + let execute f = execute (get_semaphore ()) f end diff --git a/ocaml/xapi/xapi_sr.ml b/ocaml/xapi/xapi_sr.ml index d572660e72d..7a83493b2de 100644 --- a/ocaml/xapi/xapi_sr.ml +++ b/ocaml/xapi/xapi_sr.ml @@ -20,7 +20,6 @@ module Rrdd = Rrd_client.Client let with_lock = Xapi_stdext_threads.Threadext.Mutex.execute module Listext = Xapi_stdext_std.Listext -module Semaphore = Xapi_stdext_threads.Semaphore module Unixext = Xapi_stdext_unix.Unixext let finally = Xapi_stdext_pervasives.Pervasiveext.finally