Skip to content

Commit

Permalink
Start moving Xapi periodic scheduler to an independent library (xapi-…
Browse files Browse the repository at this point in the history
…project#6139)

Based on a change from @psafont improve the periodic scheduler in order
to be reused.
  • Loading branch information
robhoes authored Dec 3, 2024
2 parents b12a6b0 + 3202058 commit 685ba39
Show file tree
Hide file tree
Showing 18 changed files with 323 additions and 94 deletions.
18 changes: 13 additions & 5 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/dune
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
(library
(public_name xapi-stdext-threads)
(name xapi_stdext_threads)
(modules :standard \ threadext_test)
(modules :standard \ ipq scheduler threadext_test ipq_test)
(libraries
threads.posix
unix
xapi-stdext-unix
xapi-stdext-pervasives)
)
(test
(name threadext_test)

(library
(public_name xapi-stdext-threads.scheduler)
(name xapi_stdext_threads_scheduler)
(modules ipq scheduler)
(libraries mtime mtime.clock threads.posix unix xapi-log xapi-stdext-threads)
)

(tests
(names threadext_test ipq_test)
(package xapi-stdext-threads)
(modules threadext_test)
(libraries xapi_stdext_threads alcotest mtime.clock.os mtime fmt threads.posix)
(modules threadext_test ipq_test)
(libraries xapi_stdext_threads alcotest mtime.clock.os mtime fmt threads.posix xapi_stdext_threads_scheduler)
)
61 changes: 33 additions & 28 deletions ocaml/xapi/ipq.ml → ...api-stdext/lib/xapi-stdext-threads/ipq.ml
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,16 @@

type 'a event = {ev: 'a; time: Mtime.t}

type 'a t = {mutable size: int; mutable data: 'a event array}
type 'a t = {default: 'a event; mutable size: int; mutable data: 'a event array}

exception EmptyHeap

let create n =
let create n default =
if n <= 0 then
invalid_arg "create"
else
{size= -n; data= [||]}
let default = {ev= default; time= Mtime_clock.now ()} in
{default; size= 0; data= Array.make n default}

let is_empty h = h.size <= 0

Expand All @@ -32,16 +33,11 @@ let resize h =
assert (n > 0) ;
let n' = 2 * n in
let d = h.data in
let d' = Array.make n' d.(0) in
let d' = Array.make n' h.default in
Array.blit d 0 d' 0 n ;
h.data <- d'

let add h x =
(* first addition: we allocate the array *)
if h.size < 0 then (
h.data <- Array.make (-h.size) x ;
h.size <- 0
) ;
let n = h.size in
(* resizing if needed *)
if n = Array.length h.data then resize h ;
Expand All @@ -64,10 +60,21 @@ let maximum h =

let remove h s =
if h.size <= 0 then raise EmptyHeap ;
if s < 0 || s >= h.size then
invalid_arg (Printf.sprintf "%s: index %d out of bounds" __FUNCTION__ s) ;
let n = h.size - 1 in
h.size <- n ;
let d = h.data in
let x = d.(n) in
d.(n) <- h.default ;
(* moving [x] up in the heap *)
let rec moveup i =
let fi = (i - 1) / 2 in
if i > 0 && Mtime.is_later d.(fi).time ~than:x.time then (
d.(i) <- d.(fi) ;
moveup fi
) else
d.(i) <- x
in
(* moving [x] down in the heap *)
let rec movedown i =
let j = (2 * i) + 1 in
Expand All @@ -84,7 +91,13 @@ let remove h s =
else
d.(i) <- x
in
movedown s
if s = n then
()
else if Mtime.is_later d.(s).time ~than:x.time then
moveup s
else
movedown s ;
h.size <- n

let find h ev =
let rec iter n =
Expand Down Expand Up @@ -112,32 +125,24 @@ let pop_maximum h =
let m = maximum h in
remove h 0 ; m

let check h =
let d = h.data in
for i = 1 to h.size - 1 do
let fi = (i - 1) / 2 in
let ordered = Mtime.is_later d.(i).time ~than:d.(fi).time in
assert ordered
done

let iter f h =
let d = h.data in
for i = 0 to h.size - 1 do
f d.(i)
done

(*
let fold f h x0 =
let n = h.size in
let d = h.data in
let rec foldrec x i = if i >= n then x else foldrec (f d.(i) x) (succ i) in
foldrec x0 0

(*
let _ =
let test : int t = create 100 in
for i=0 to 99 do
let e = {time=Random.float 10.0; ev=i} in
add test e
done;
for i=0 to 49 do
let xx=find test i in
remove test xx
done;
(* remove test xx;*)
for i=0 to 49 do
let e=pop_maximum test in
Printf.printf "time: %f, site: %d\n" e.time e.ev
done
*)
58 changes: 58 additions & 0 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
(*
* Copyright (C) 2024 Cloud Software Group
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation; version 2.1 only. with the special
* exception on linking described in file LICENSE.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*)

type 'a event = {ev: 'a; time: Mtime.t}

type 'a t

exception EmptyHeap

val create : int -> 'a -> 'a t
(** [create n default] creates an empty Imperative priority queue.
The queue initially is initialized to store [n] elements.
The queue will expand beyond [n] automatically if needed.
[default] value will the used to fill unused data. *)

val is_empty : 'a t -> bool
(** Check if the queue is empty *)

val add : 'a t -> 'a event -> unit
(** Add an event to the queue *)

val remove : 'a t -> int -> unit
(** Remove an event from the queue passing the index.
@raise EmptyHeap if the queue is empty.
@raise Invalid_argument if the index is invalid. *)

val find_p : 'a t -> ('a -> bool) -> int
(** Find the index of an event which matches a given condition
or -1 if not found *)

val find : 'a t -> 'a -> int
(** Find the index of an event which matches a given event
or -1 if not found *)

val maximum : 'a t -> 'a event
(** Return a copy of the event with the next time.
@raise EmptyHeap if the queue is empty. *)

val pop_maximum : 'a t -> 'a event
(** Return and remove the event with the next time.
@raise EmptyHeap if the queue is empty. *)

val iter : ('a event -> unit) -> 'a t -> unit
(** Iterate given function on the list of events in the queue *)

val check : 'a t -> unit
(** Check internal queue state, used for debugging *)
143 changes: 143 additions & 0 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq_test.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
(*
* Copyright (C) 2024 Cloud Software Group
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation; version 2.1 only. with the special
* exception on linking described in file LICENSE.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*)

module Ipq = Xapi_stdext_threads_scheduler.Ipq

(* test we get "out of bound" exception calling Ipq.remove *)
let test_out_of_index () =
let q = Ipq.create 10 0 in
Ipq.add q {Ipq.ev= 123; Ipq.time= Mtime_clock.now ()} ;
let is_oob = function
| Invalid_argument s when String.ends_with ~suffix:" out of bounds" s ->
true
| _ ->
false
in
let oob_check n =
(Alcotest.match_raises "out of bound" is_oob @@ fun () -> Ipq.remove q n) ;
Alcotest.(check bool) "same value" false (Ipq.is_empty q)
in
oob_check 10 ;
oob_check (-1) ;
oob_check 9 ;
oob_check 1 ;
(* this should succeed *)
Ipq.remove q 0

(* check queue does not retain some data after being removed *)
let test_leak () =
let default () = () in
let q = Ipq.create 10 default in
let array = Array.make 1024 'x' in
let use_array () = array.(0) <- 'a' in
let allocated = Atomic.make true in
Gc.finalise (fun _ -> Atomic.set allocated false) array ;
Ipq.add q {Ipq.ev= use_array; Ipq.time= Mtime_clock.now ()} ;
Ipq.remove q 0 ;
Gc.full_major () ;
Gc.full_major () ;
Alcotest.(check bool) "allocated" false (Atomic.get allocated) ;
Ipq.add q {Ipq.ev= default; Ipq.time= Mtime_clock.now ()}

(* test Ipq.is_empty call *)
let test_empty () =
let q = Ipq.create 10 0 in
Alcotest.(check bool) "same value" true (Ipq.is_empty q) ;
Ipq.add q {Ipq.ev= 123; Ipq.time= Mtime_clock.now ()} ;
Alcotest.(check bool) "same value" false (Ipq.is_empty q) ;
Ipq.remove q 0 ;
Alcotest.(check bool) "same value" true (Ipq.is_empty q)

module Int64Set = Set.Make (Int64)

let check = Ipq.check

(* get size of the queue *)
let size queue =
let l = ref 0 in
Ipq.iter (fun _ -> l := !l + 1) queue ;
!l

(* get a set of times from the queue *)
let set queue =
let s = ref Int64Set.empty in
Ipq.iter
(fun d ->
let t = d.time in
let t = Mtime.to_uint64_ns t in
s := Int64Set.add t !s
)
queue ;
!s

let test_old () =
let test : int Ipq.t = Ipq.create 100 0 in
let s = ref Int64Set.empty in
let add i =
let ti = Random.int64 1000000L in
let t = Mtime.of_uint64_ns ti in
let e = {Ipq.time= t; Ipq.ev= i} in
Ipq.add test e ;
s := Int64Set.add ti !s
in
for i = 0 to 49 do
add i
done ;
let first_half = set test in
for i = 50 to 99 do
add i
done ;
check test ;
(* we should have all elements *)
Alcotest.(check int) "100 elements" 100 (size test) ;

let all = set test in
Alcotest.(check int) "same list" 0 (Int64Set.compare !s all) ;

(* remove half of the elements *)
for i = 0 to 49 do
let xx = Ipq.find test i in
Printf.printf "Removing element %d position %d\n%!" i xx ;
Ipq.remove test xx ;
check test
done ;
Alcotest.(check int) "50 elements" 50 (size test) ;

(* make sure we have the right elements in the list *)
let s = set test in
let second_half = Int64Set.diff all first_half in
Alcotest.(check int) "same list" 0 (Int64Set.compare s second_half) ;

(* remove test *)
let prev = ref 0L in
for _ = 0 to 49 do
let e = Ipq.pop_maximum test in
let t = Mtime.to_uint64_ns e.time in
Alcotest.(check bool)
(Printf.sprintf "%Ld bigger than %Ld" t !prev)
true (t >= !prev) ;
Printf.printf "time: %Ld, site: %d\n" t e.ev ;
prev := t ;
check test
done

let tests =
[
("test_out_of_index", `Quick, test_out_of_index)
; ("test_leak", `Quick, test_leak)
; ("test_empty", `Quick, test_empty)
; ("test_old", `Quick, test_old)
]

let () = Alcotest.run "Ipq" [("generic", tests)]
Empty file.
Loading

0 comments on commit 685ba39

Please sign in to comment.