Skip to content

Commit

Permalink
CP-52039: Drop Semaphore from Xapi_stdext_threads
Browse files Browse the repository at this point in the history
Prior to version 4.12, OCaml's standard threads library (systhreads) had
no builtin concept of a semaphore, so one was implemented in
Xapi_stdext_threads.

We replace all usages of this with Semaphore.Counting from the standard
library and remove the implementation from Xapi_stdext_threads.

Technically, the interface provided by the previous semaphore is more
general: it permits arbitrary adjustments to the semaphore's counter,
allowing for a "weighted" style of locking. However, this is only used
in one place (with a weight value of 1, which is the same
decrement/increment value as normal).

Signed-off-by: Colin James <[email protected]>
  • Loading branch information
contificate committed Oct 30, 2024
1 parent 94dc48c commit 176c9e3
Show file tree
Hide file tree
Showing 10 changed files with 28 additions and 111 deletions.
2 changes: 1 addition & 1 deletion ocaml/libs/http-lib/http_svr.ml
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,7 @@ let start ?header_read_timeout ?header_total_timeout ?max_header_length
; body=
handle_connection ~header_read_timeout ~header_total_timeout
~max_header_length x
; lock= Xapi_stdext_threads.Semaphore.create conn_limit
; lock= Semaphore.Counting.make conn_limit
}
in
let server = Server_io.server handler socket in
Expand Down
6 changes: 3 additions & 3 deletions ocaml/libs/http-lib/server_io.ml
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ type handler = {
name: string
; (* body should close the provided fd *)
body: Unix.sockaddr -> Unix.file_descr -> unit
; lock: Xapi_stdext_threads.Semaphore.t
; lock: Semaphore.Counting.t
}

let handler_by_thread (h : handler) (s : Unix.file_descr)
(caller : Unix.sockaddr) =
Thread.create
(fun () ->
Fun.protect
~finally:(fun () -> Xapi_stdext_threads.Semaphore.release h.lock 1)
~finally:(fun () -> Semaphore.Counting.release h.lock)
(Debug.with_thread_named h.name (fun () -> h.body caller s))
)
()
Expand All @@ -49,7 +49,7 @@ let establish_server ?(signal_fds = []) forker handler sock =
@@ Polly.wait epoll 2 (-1) (fun _ fd _ ->
(* If any of the signal_fd is active then bail out *)
if List.mem fd signal_fds then raise PleaseClose ;
Xapi_stdext_threads.Semaphore.acquire handler.lock 1 ;
Semaphore.Counting.acquire handler.lock ;
let s, caller = Unix.accept ~cloexec:true sock in
try ignore (forker handler s caller)
with exc ->
Expand Down
2 changes: 1 addition & 1 deletion ocaml/libs/http-lib/server_io.mli
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type handler = {
name: string (** used for naming the thread *)
; body: Unix.sockaddr -> Unix.file_descr -> unit
(** function called in a thread for each connection*)
; lock: Xapi_stdext_threads.Semaphore.t
; lock: Semaphore.Counting.t
}

type server = {
Expand Down
57 changes: 0 additions & 57 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/semaphore.ml

This file was deleted.

40 changes: 0 additions & 40 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/semaphore.mli

This file was deleted.

12 changes: 10 additions & 2 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.ml
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,20 @@

module M = Mutex

let finally = Xapi_stdext_pervasives.Pervasiveext.finally

module Mutex = struct
(** execute the function f with the mutex hold *)
let execute lock f =
Mutex.lock lock ;
Xapi_stdext_pervasives.Pervasiveext.finally f (fun () -> Mutex.unlock lock)
finally f (fun () -> Mutex.unlock lock)
end

module Semaphore = struct
let execute s f =
let module Semaphore = Semaphore.Counting in
Semaphore.acquire s ;
finally f (fun () -> Semaphore.release s)
end

(** Parallel List.iter. Remembers all exceptions and returns an association list mapping input x to an exception.
Expand Down Expand Up @@ -60,7 +69,6 @@ module Delay = struct
exception Pre_signalled

let wait (x : t) (seconds : float) =
let finally = Xapi_stdext_pervasives.Pervasiveext.finally in
let to_close = ref [] in
let close' fd =
if List.mem fd !to_close then Unix.close fd ;
Expand Down
4 changes: 4 additions & 0 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/threadext.mli
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ module Mutex : sig
val execute : Mutex.t -> (unit -> 'a) -> 'a
end

module Semaphore : sig
val execute : Semaphore.Counting.t -> (unit -> 'a) -> 'a
end

val thread_iter_all_exns : ('a -> unit) -> 'a list -> ('a * exn) list

val thread_iter : ('a -> unit) -> 'a list -> unit
Expand Down
7 changes: 4 additions & 3 deletions ocaml/networkd/lib/network_utils.ml
Original file line number Diff line number Diff line change
Expand Up @@ -1197,12 +1197,13 @@ module Ovs = struct

val appctl : ?log:bool -> string list -> string
end = struct
module Semaphore = Xapi_stdext_threads.Semaphore
module Semaphore = Semaphore.Counting

let s = Semaphore.create 5
let s = Semaphore.make 5

let vsctl ?log args =
Semaphore.execute s (fun () ->
let execute = Xapi_stdext_threads.Threadext.Semaphore.execute in
execute s (fun () ->
call_script ~on_error:error_handler ?log ovs_vsctl
("--timeout=20" :: args)
)
Expand Down
8 changes: 5 additions & 3 deletions ocaml/xapi-aux/throttle.ml
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ module type SIZE = sig
end

module Make (Size : SIZE) = struct
module Semaphore = Xapi_stdext_threads.Semaphore
module Semaphore = Semaphore.Counting

let with_lock = Xapi_stdext_threads.Threadext.Mutex.execute

let execute = Xapi_stdext_threads.Threadext.Semaphore.execute

let semaphore = ref None

let m = Mutex.create ()
Expand All @@ -29,11 +31,11 @@ module Make (Size : SIZE) = struct
with_lock m @@ fun () ->
match !semaphore with
| None ->
let result = Semaphore.create (Size.n ()) in
let result = Semaphore.make (Size.n ()) in
semaphore := Some result ;
result
| Some s ->
s

let execute f = Semaphore.execute (get_semaphore ()) f
let execute f = execute (get_semaphore ()) f
end
1 change: 0 additions & 1 deletion ocaml/xapi/xapi_sr.ml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ module Rrdd = Rrd_client.Client
let with_lock = Xapi_stdext_threads.Threadext.Mutex.execute

module Listext = Xapi_stdext_std.Listext
module Semaphore = Xapi_stdext_threads.Semaphore
module Unixext = Xapi_stdext_unix.Unixext

let finally = Xapi_stdext_pervasives.Pervasiveext.finally
Expand Down

0 comments on commit 176c9e3

Please sign in to comment.