Skip to content

Commit

Permalink
xapi: move the 'periodic' scheduler to xapi-stdext-threads
Browse files Browse the repository at this point in the history
This allows the scheduler based on threads to be used by components that aren't
xapi, like forkexecd.

Signed-off-by: Pau Ruiz Safont <[email protected]>
  • Loading branch information
psafont authored and freddy77 committed Dec 3, 2024
1 parent b782202 commit 9e3ad1c
Show file tree
Hide file tree
Showing 14 changed files with 81 additions and 60 deletions.
10 changes: 9 additions & 1 deletion ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/dune
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
(library
(public_name xapi-stdext-threads)
(name xapi_stdext_threads)
(modules :standard \ threadext_test)
(modules :standard \ ipq scheduler threadext_test)
(libraries
threads.posix
unix
xapi-stdext-unix
xapi-stdext-pervasives)
)

(library
(public_name xapi-stdext-threads.scheduler)
(name xapi_stdext_threads_scheduler)
(modules ipq scheduler)
(libraries mtime mtime.clock threads.posix unix xapi-log xapi-stdext-threads)
)

(test
(name threadext_test)
(package xapi-stdext-threads)
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
* GNU Lesser General Public License for more details.
*)

module D = Debug.Make (struct let name = "backgroundscheduler" end)
module D = Debug.Make (struct let name = __MODULE__ end)

open D
module Delay = Xapi_stdext_threads.Threadext.Delay
Expand All @@ -30,16 +30,20 @@ let (queue : t Ipq.t) = Ipq.create 50
let lock = Mutex.create ()

module Clock = struct
(** time span of s seconds *)
let span s = Mtime.Span.of_uint64_ns (Int64.of_float (s *. 1e9))

let span_to_s span =
Mtime.Span.to_uint64_ns span |> Int64.to_float |> fun ns -> ns /. 1e9

let add_span clock secs =
(* return mix or max available value if the add overflows *)
match Mtime.add_span clock (span secs) with
| Some t ->
t
| None when secs > 0. ->
Mtime.max_stamp
| None ->
raise
Api_errors.(Server_error (internal_error, ["clock overflow"; __LOC__]))
Mtime.min_stamp
end

let add_to_queue ?(signal = true) name ty start newfunc =
Expand All @@ -59,7 +63,7 @@ let remove_from_queue name =
Ipq.remove queue index

let loop () =
debug "Periodic scheduler started" ;
debug "%s started" __MODULE__ ;
try
while true do
let empty = with_lock lock (fun () -> Ipq.is_empty queue) in
Expand All @@ -82,8 +86,8 @@ let loop () =
) else (* Sleep until next event. *)
let sleep =
Mtime.(span next.Ipq.time now)
|> Mtime.Span.add (Clock.span 0.001)
|> Scheduler.span_to_s
|> Mtime.Span.(add ms)
|> Clock.span_to_s
in
try ignore (Delay.wait delay sleep)
with e ->
Expand All @@ -102,5 +106,5 @@ let loop () =
done
with _ ->
error
"Periodic scheduler died! Xapi will no longer function well and should \
be restarted."
"Scheduler thread died! This daemon will no longer function well and \
should be restarted."
File renamed without changes.
1 change: 1 addition & 0 deletions ocaml/tests/common/dune
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
xapi-test-utils
xapi-types
xapi-stdext-date
xapi-stdext-threads.scheduler
xapi-stdext-unix
)
)
Expand Down
8 changes: 4 additions & 4 deletions ocaml/tests/common/test_event_common.ml
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@ let ps_start = ref false

let scheduler_mutex = Mutex.create ()

module Scheduler = Xapi_stdext_threads_scheduler.Scheduler

let start_periodic_scheduler () =
Mutex.lock scheduler_mutex ;
if !ps_start then
()
else (
Xapi_periodic_scheduler.add_to_queue "dummy"
(Xapi_periodic_scheduler.Periodic 60.0) 0.0 (fun () -> ()
) ;
Scheduler.add_to_queue "dummy" (Scheduler.Periodic 60.0) 0.0 (fun () -> ()) ;
Xapi_event.register_hooks () ;
ignore (Thread.create Xapi_periodic_scheduler.loop ()) ;
ignore (Thread.create Scheduler.loop ()) ;
ps_start := true
) ;
Mutex.unlock scheduler_mutex
Expand Down
2 changes: 2 additions & 0 deletions ocaml/xapi/dune
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@
xapi-stdext-pervasives
xapi-stdext-std
xapi-stdext-threads
xapi-stdext-threads.scheduler
xapi-stdext-unix
xapi-stdext-zerocheck
xapi-tracing
Expand Down Expand Up @@ -256,6 +257,7 @@
xapi-stdext-pervasives
xapi-stdext-std
xapi-stdext-threads
xapi-stdext-threads.scheduler
xapi-stdext-unix
xapi-types
xapi_aux
Expand Down
16 changes: 7 additions & 9 deletions ocaml/xapi/extauth_plugin_ADwinbind.ml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ end)
open D
open Xapi_stdext_std.Xstringext
open Auth_signature
module Scheduler = Xapi_stdext_threads_scheduler.Scheduler

let finally = Xapi_stdext_pervasives.Pervasiveext.finally

Expand Down Expand Up @@ -1172,16 +1173,14 @@ module ClosestKdc = struct
let trigger_update ~start =
if Pool_role.is_master () then (
debug "Trigger task: %s" periodic_update_task_name ;
Xapi_periodic_scheduler.add_to_queue periodic_update_task_name
(Xapi_periodic_scheduler.Periodic
!Xapi_globs.winbind_update_closest_kdc_interval
)
Scheduler.add_to_queue periodic_update_task_name
(Scheduler.Periodic !Xapi_globs.winbind_update_closest_kdc_interval)
start update
)

let stop_update () =
if Pool_role.is_master () then
Xapi_periodic_scheduler.remove_from_queue periodic_update_task_name
Scheduler.remove_from_queue periodic_update_task_name
end

module RotateMachinePassword = struct
Expand Down Expand Up @@ -1302,11 +1301,10 @@ module RotateMachinePassword = struct

let trigger_rotate ~start =
debug "Trigger task: %s" task_name ;
Xapi_periodic_scheduler.add_to_queue task_name
(Xapi_periodic_scheduler.Periodic !Xapi_globs.winbind_machine_pwd_timeout)
start rotate
Scheduler.add_to_queue task_name
(Scheduler.Periodic !Xapi_globs.winbind_machine_pwd_timeout) start rotate

let stop_rotate () = Xapi_periodic_scheduler.remove_from_queue task_name
let stop_rotate () = Scheduler.remove_from_queue task_name
end

let build_netbios_name ~config_params =
Expand Down
7 changes: 4 additions & 3 deletions ocaml/xapi/helpers.ml
Original file line number Diff line number Diff line change
Expand Up @@ -443,8 +443,9 @@ let make_timeboxed_rpc ~__context timeout rpc : Rpc.response =
in
List.iter Locking_helpers.kill_resource resources
in
Xapi_periodic_scheduler.add_to_queue (Ref.string_of task_id)
Xapi_periodic_scheduler.OneShot timeout cancel ;
let module Scheduler = Xapi_stdext_threads_scheduler.Scheduler in
Scheduler.add_to_queue (Ref.string_of task_id) Scheduler.OneShot timeout
cancel ;
let transport =
if Pool_role.is_master () then
Unix Xapi_globs.unix_domain_socket
Expand All @@ -459,7 +460,7 @@ let make_timeboxed_rpc ~__context timeout rpc : Rpc.response =
let result =
XMLRPC_protocol.rpc ~srcstr:"xapi" ~dststr:"xapi" ~transport ~http rpc
in
Xapi_periodic_scheduler.remove_from_queue (Ref.string_of task_id) ;
Scheduler.remove_from_queue (Ref.string_of task_id) ;
result
)

Expand Down
6 changes: 3 additions & 3 deletions ocaml/xapi/pool_periodic_update_sync.ml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ module D = Debug.Make (struct let name = __MODULE__ end)

open D
open Client
module Scheduler = Xapi_stdext_threads_scheduler.Scheduler

type frequency = Daily | Weekly of int

Expand Down Expand Up @@ -162,12 +163,11 @@ let rec update_sync () =
)

and add_to_queue ~__context () =
let open Xapi_periodic_scheduler in
add_to_queue periodic_update_sync_task_name OneShot
Scheduler.add_to_queue periodic_update_sync_task_name Scheduler.OneShot
(seconds_until_next_schedule ~__context)
update_sync

let set_enabled ~__context ~value =
Xapi_periodic_scheduler.remove_from_queue periodic_update_sync_task_name ;
Scheduler.remove_from_queue periodic_update_sync_task_name ;
if value then
add_to_queue ~__context ()
2 changes: 1 addition & 1 deletion ocaml/xapi/xapi.ml
Original file line number Diff line number Diff line change
Expand Up @@ -1114,7 +1114,7 @@ let server_init () =
)
; ( "Starting periodic scheduler"
, [Startup.OnThread]
, Xapi_periodic_scheduler.loop
, Xapi_stdext_threads_scheduler.Scheduler.loop
)
; ( "Synchronising host configuration files"
, []
Expand Down
6 changes: 3 additions & 3 deletions ocaml/xapi/xapi_event.ml
Original file line number Diff line number Diff line change
Expand Up @@ -425,12 +425,12 @@ module From = struct
&& (not (session_is_invalid call))
&& Unix.gettimeofday () < deadline
do
Xapi_periodic_scheduler.add_to_queue timeoutname
Xapi_periodic_scheduler.OneShot
Xapi_stdext_threads_scheduler.Scheduler.add_to_queue timeoutname
Xapi_stdext_threads_scheduler.Scheduler.OneShot
(deadline -. Unix.gettimeofday () +. 0.5)
(fun () -> Condition.broadcast c) ;
Condition.wait c m ;
Xapi_periodic_scheduler.remove_from_queue timeoutname
Xapi_stdext_threads_scheduler.Scheduler.remove_from_queue timeoutname
done
) ;
if session_is_invalid call then (
Expand Down
14 changes: 7 additions & 7 deletions ocaml/xapi/xapi_host.ml
Original file line number Diff line number Diff line change
Expand Up @@ -938,20 +938,20 @@ let ask_host_if_it_is_a_slave ~__context ~host =
"ask_host_if_it_is_a_slave: host taking a long time to respond - IP: \
%s; uuid: %s"
ip uuid ;
Xapi_periodic_scheduler.add_to_queue task_name
Xapi_periodic_scheduler.OneShot timeout
Xapi_stdext_threads_scheduler.Scheduler.add_to_queue task_name
Xapi_stdext_threads_scheduler.Scheduler.OneShot timeout
(log_host_slow_to_respond (min (2. *. timeout) 300.))
in
Xapi_periodic_scheduler.add_to_queue task_name
Xapi_periodic_scheduler.OneShot timeout
Xapi_stdext_threads_scheduler.Scheduler.add_to_queue task_name
Xapi_stdext_threads_scheduler.Scheduler.OneShot timeout
(log_host_slow_to_respond timeout) ;
let res =
Message_forwarding.do_op_on_localsession_nolivecheck ~local_fn ~__context
~host (fun session_id rpc ->
Client.Client.Pool.is_slave ~rpc ~session_id ~host
)
in
Xapi_periodic_scheduler.remove_from_queue task_name ;
Xapi_stdext_threads_scheduler.Scheduler.remove_from_queue task_name ;
res
in
Server_helpers.exec_with_subtask ~__context "host.ask_host_if_it_is_a_slave"
Expand Down Expand Up @@ -1497,8 +1497,8 @@ let sync_data ~__context ~host = Xapi_sync.sync_host ~__context host
(* Nb, no attempt to wrap exceptions yet *)

let backup_rrds ~__context ~host:_ ~delay =
Xapi_periodic_scheduler.add_to_queue "RRD backup"
Xapi_periodic_scheduler.OneShot delay (fun _ ->
Xapi_stdext_threads_scheduler.Scheduler.add_to_queue "RRD backup"
Xapi_stdext_threads_scheduler.Scheduler.OneShot delay (fun _ ->
let master_address = Pool_role.get_master_address_opt () in
log_and_ignore_exn (Rrdd.backup_rrds master_address) ;
log_and_ignore_exn (fun () ->
Expand Down
47 changes: 27 additions & 20 deletions ocaml/xapi/xapi_periodic_scheduler_init.ml
Original file line number Diff line number Diff line change
Expand Up @@ -76,46 +76,53 @@ let register ~__context =
let update_all_subjects_delay = 10.0 in
(* initial delay = 10 seconds *)
if master then
Xapi_periodic_scheduler.add_to_queue "Synchronising RRDs/messages"
(Xapi_periodic_scheduler.Periodic sync_timer) sync_delay sync_func ;
Xapi_stdext_threads_scheduler.Scheduler.add_to_queue
"Synchronising RRDs/messages"
(Xapi_stdext_threads_scheduler.Scheduler.Periodic sync_timer) sync_delay
sync_func ;
if master then
Xapi_periodic_scheduler.add_to_queue "Backing up RRDs"
(Xapi_periodic_scheduler.Periodic rrdbackup_timer) rrdbackup_delay
rrdbackup_func ;
Xapi_stdext_threads_scheduler.Scheduler.add_to_queue "Backing up RRDs"
(Xapi_stdext_threads_scheduler.Scheduler.Periodic rrdbackup_timer)
rrdbackup_delay rrdbackup_func ;
if master then
Xapi_periodic_scheduler.add_to_queue
Xapi_stdext_threads_scheduler.Scheduler.add_to_queue
"Revalidating externally-authenticated sessions"
(Xapi_periodic_scheduler.Periodic
(Xapi_stdext_threads_scheduler.Scheduler.Periodic
!Xapi_globs.session_revalidation_interval
) session_revalidation_delay session_revalidation_func ;
)
session_revalidation_delay session_revalidation_func ;
if master then
Xapi_periodic_scheduler.add_to_queue
Xapi_stdext_threads_scheduler.Scheduler.add_to_queue
"Trying to update subjects' info using external directory service (if \
any)"
(Xapi_periodic_scheduler.Periodic !Xapi_globs.update_all_subjects_interval)
(Xapi_stdext_threads_scheduler.Scheduler.Periodic
!Xapi_globs.update_all_subjects_interval
)
update_all_subjects_delay update_all_subjects_func ;
Xapi_periodic_scheduler.add_to_queue "Periodic scheduler heartbeat"
(Xapi_periodic_scheduler.Periodic hb_timer) 240.0 hb_func ;
Xapi_periodic_scheduler.add_to_queue "Update monitor configuration"
(Xapi_periodic_scheduler.Periodic 3600.0) 3600.0
Xapi_stdext_threads_scheduler.Scheduler.add_to_queue
"Periodic scheduler heartbeat"
(Xapi_stdext_threads_scheduler.Scheduler.Periodic hb_timer) 240.0 hb_func ;
Xapi_stdext_threads_scheduler.Scheduler.add_to_queue
"Update monitor configuration"
(Xapi_stdext_threads_scheduler.Scheduler.Periodic 3600.0) 3600.0
Monitor_master.update_configuration_from_master ;
( if master then
let freq = !Xapi_globs.failed_login_alert_freq |> float_of_int in
Xapi_periodic_scheduler.add_to_queue
Xapi_stdext_threads_scheduler.Scheduler.add_to_queue
"Periodic alert failed login attempts"
(Xapi_periodic_scheduler.Periodic freq) freq
(Xapi_stdext_threads_scheduler.Scheduler.Periodic freq) freq
Xapi_pool.alert_failed_login_attempts
) ;
Xapi_periodic_scheduler.add_to_queue "broken_kernel"
(Xapi_periodic_scheduler.Periodic 600.) 600. (fun () ->
Xapi_stdext_threads_scheduler.Scheduler.add_to_queue "broken_kernel"
(Xapi_stdext_threads_scheduler.Scheduler.Periodic 600.) 600. (fun () ->
Server_helpers.exec_with_new_task
"Periodic alert if the running kernel is broken in some serious way."
(fun __context -> Xapi_host.alert_if_kernel_broken ~__context
)
) ;
Xapi_periodic_scheduler.add_to_queue
Xapi_stdext_threads_scheduler.Scheduler.add_to_queue
"Period alert if TLS verification emergency disabled"
(Xapi_periodic_scheduler.Periodic 600.) 600. (fun () ->
(Xapi_stdext_threads_scheduler.Scheduler.Periodic 600.) 600. (fun () ->
Server_helpers.exec_with_new_task
"Period alert if TLS verification emergency disabled" (fun __context ->
Xapi_host.alert_if_tls_verification_was_emergency_disabled ~__context
Expand Down

0 comments on commit 9e3ad1c

Please sign in to comment.