Skip to content

Commit

Permalink
CP-52526: rate limit event updates (#6126)
Browse files Browse the repository at this point in the history
We generate O(N^2) events when we update O(N) fields: each field update
generates an event including the entire object, even if later we are
going to change other fields of the same object.

Instead of returning the individual field update events immediately (and
generating a storm of events whenever an API client watches for VM power
events), we batch these event updates by introducing a minimum amount of
time that must elapse between successive Event.from calls.
(The client is working as expected here: when it gets an event and
processes it, it immediately calls Event.from to get more events)

Although this doesn't guarantee to eliminate the O(N^2) problem, in
practice it reduces the overhead significantly.

There is one case where we do want almost immediate notification of
updates: task completions (because then the client likely wants to send
us more tasks).

This PR makes the already existing rate limiting in Xapi_event
consistent and configurable, but doesn't yet introduce a batching delay
for Event.from (it does for Event.next, which is deprecated). A separate
PR (or config change) can then enable this for testing purposes, but
also allows us to roll the change back by changing the tunable in the
config file.

There is also a new microbenchmark introduced here, I'll need to update
that with the latest results.
  • Loading branch information
edwintorok authored Dec 12, 2024
2 parents 83f4517 + 257af94 commit 8a427b9
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 14 deletions.
1 change: 1 addition & 0 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@
(synopsis "The toolstack daemon which implements the XenAPI")
(description "This daemon exposes the XenAPI and is used by clients such as 'xe' and 'XenCenter' to manage clusters of Xen-enabled hosts.")
(depends
(ocaml (>= 4.09))
(alcotest :with-test)
angstrom
astring
Expand Down
1 change: 1 addition & 0 deletions ocaml/xapi-aux/dune
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
(modes best)
(libraries
astring
clock
cstruct
forkexec
ipaddr
Expand Down
45 changes: 45 additions & 0 deletions ocaml/xapi-aux/throttle.ml
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,48 @@ module Make (Size : SIZE) = struct

let execute f = execute (get_semaphore ()) f
end

(* Batching delays for polling-style loops: one delay before the first
iteration, then exponentially growing delays between recursive iterations,
capped at [delay_between]. *)
module Batching = struct
(* Batching configuration; build it with [make]. *)
type t = {
(* seed of the exponential backoff; it is doubled before the first sleep *)
delay_initial: Mtime.span
(* delay performed once, before the first call of the wrapped function *)
; delay_before: Mtime.span
(* upper bound for the delay between recursive calls *)
; delay_between: Mtime.span
}

(* [make ~delay_before ~delay_between] builds a configuration whose
[delay_initial] is [delay_between / 16]. Because [with_recursive_loop]
doubles the delay before sleeping, the first delay actually slept between
recursive calls is [delay_between / 8]. *)
let make ~delay_before ~delay_between =
(* we are dividing, cannot overflow *)
let delay_initial =
Mtime.Span.to_float_ns delay_between /. 16.
|> Mtime.Span.of_float_ns
|> Option.get
in
{delay_initial; delay_before; delay_between}

(* [span_min a b] is the shorter of the two spans. *)
let span_min a b = if Mtime.Span.is_shorter a ~than:b then a else b

(** [perform_delay ~yield delay] calls {!val:Thread.delay} when [delay] is non-zero.
When [delay] is zero and [yield] is [true] it calls {!val:Thread.yield} instead;
when [delay] is zero and [yield] is [false] it does nothing.
Thread.delay 0 provides no fairness guarantees, the current thread may actually be the one that gets the global lock again.
{!val:Thread.yield} does provide fairness guarantees, but it may also introduce large latencies
when there are lots of threads waiting for the OCaml runtime lock. Only yield once, in the [delay_before] section.
*)
let perform_delay ~yield delay =
if Mtime.Span.is_longer delay ~than:Mtime.Span.zero then
Thread.delay (Clock.Timer.span_to_s delay)
else if yield then
(* this is a low-priority thread, if there are any other threads waiting, then run them now.
If there are no threads waiting then this is a noop.
Requires OCaml >= 4.09 (older versions had fairness issues in Thread.yield)
*)
Thread.yield ()

(* [with_recursive_loop config f] performs the [delay_before] delay (or a
yield when that delay is zero) and then returns [f self0]. Only [config] and
[f] are named parameters here, so the initial delay happens as soon as both
are supplied, before the resulting function is applied to its input.
The first recursive call made by [f] goes through [self0] and is not delayed;
every later one goes through [self], which doubles the previous delay, caps
it at [config.delay_between], sleeps, and then calls [f] again. *)
let with_recursive_loop config f =
let rec self arg input =
(* exponential backoff: double the previous delay, capped at delay_between *)
let arg = span_min config.delay_between Mtime.Span.(2 * arg) in
(* never yield here: a zero delay means no pause between iterations *)
perform_delay ~yield:false arg ;
(f [@tailcall]) (self arg) input
in
(* first recursive call: no delay, but seeds the backoff with delay_initial *)
let self0 input = (f [@tailcall]) (self config.delay_initial) input in
perform_delay ~yield:true config.delay_before ;
f self0
end
36 changes: 36 additions & 0 deletions ocaml/xapi-aux/throttle.mli
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,39 @@ module Make (_ : SIZE) : sig

val execute : (unit -> 'a) -> 'a
end

module Batching : sig
(** batching delay configuration *)
type t

val make : delay_before:Mtime.Span.t -> delay_between:Mtime.Span.t -> t
(** [make ~delay_before ~delay_between] creates a configuration,
where we delay the API call by [delay_before] once,
and then insert delays between recursive calls that grow up to [delay_between].
*)

val with_recursive_loop : t -> (('a -> 'b) -> 'a -> 'b) -> 'a -> 'b
(** [with_recursive_loop config f arg] calls [f self arg], where [self] can be used
for recursive calls.
[arg] is an argument that the implementation of [f] can change between recursive calls for its own purposes,
otherwise [()] can be used.
A delay of [delay_before] is inserted once, as soon as [config] and [f] have been supplied
(that is, before [arg] is applied when the function is partially applied).
No delay is inserted before the first recursive call; [delay_between/8] is inserted before the second one,
and the delay then doubles on every recursive call until [delay_between] is reached:
{v
delay_before
f ...
(self[@tailcall]) ...
f ...
(self[@tailcall]) ...
delay_between/8
f ...
(self[@tailcall]) ...
delay_between/4
f ...
v}
The delays are determined by [config]: the delay between recursive calls uses an exponential backoff, capped at [config.delay_between].
*)
end
44 changes: 30 additions & 14 deletions ocaml/xapi/xapi_event.ml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ let is_lowercase str = String.for_all is_lowercase_char str
module Subscription = struct
type t = Class of string | Object of string * string | All

(** [is_task_only sub] is [true] when [sub] is restricted to the ["task"]
    class, either the whole class or a single task object, and [false] for
    any other class or object and for [All]. *)
let is_task_only sub =
  match sub with
  | Class cls | Object (cls, _) ->
      String.equal cls "task"
  | All ->
      false

let of_string x =
if x = "*" then
All
Expand Down Expand Up @@ -470,6 +476,7 @@ let unregister ~__context ~classes =

(** Blocking call which returns the next set of events relevant to this session. *)
let rec next ~__context =
let batching = !Xapi_globs.event_next_delay in
let session = Context.get_session_id __context in
let open Next in
assert_subscribed session ;
Expand All @@ -489,11 +496,12 @@ let rec next ~__context =
)
in
(* Like grab_range () only guarantees to return a non-empty range by blocking if necessary *)
let rec grab_nonempty_range () =
let grab_nonempty_range =
Throttle.Batching.with_recursive_loop batching @@ fun self arg ->
let last_id, end_id = grab_range () in
if last_id = end_id then
let (_ : int64) = wait subscription end_id in
grab_nonempty_range ()
(self [@tailcall]) arg
else
(last_id, end_id)
in
Expand All @@ -511,7 +519,7 @@ let rec next ~__context =
else
rpc_of_events relevant

let from_inner __context session subs from from_t timer =
let from_inner __context session subs from from_t timer batching =
let open Xapi_database in
let open From in
(* The database tables involved in our subscription *)
Expand Down Expand Up @@ -599,7 +607,8 @@ let from_inner __context session subs from from_t timer =
(* Each event.from should have an independent subscription record *)
let msg_gen, messages, tableset, (creates, mods, deletes, last) =
with_call session subs (fun sub ->
let rec grab_nonempty_range () =
let grab_nonempty_range =
Throttle.Batching.with_recursive_loop batching @@ fun self arg ->
let ( (msg_gen, messages, _tableset, (creates, mods, deletes, last))
as result
) =
Expand All @@ -618,8 +627,7 @@ let from_inner __context session subs from from_t timer =
(* last id the client got is equivalent to the current one *)
last_msg_gen := msg_gen ;
wait2 sub last timer ;
Thread.delay 0.05 ;
grab_nonempty_range ()
(self [@tailcall]) arg
) else
result
in
Expand Down Expand Up @@ -698,6 +706,19 @@ let from_inner __context session subs from from_t timer =
{events; valid_ref_counts; token= Token.to_string (last, msg_gen)}

let from ~__context ~classes ~token ~timeout =
let duration =
timeout
|> Clock.Timer.s_to_span
|> Option.value ~default:Mtime.Span.(24 * hour)
in
let timer = Clock.Timer.start ~duration in
let subs = List.map Subscription.of_string classes in
let batching =
if List.for_all Subscription.is_task_only subs then
!Xapi_globs.event_from_task_delay
else
!Xapi_globs.event_from_delay
in
let session = Context.get_session_id __context in
let from, from_t =
try Token.of_string token
Expand All @@ -709,19 +730,14 @@ let from ~__context ~classes ~token ~timeout =
(Api_errors.event_from_token_parse_failure, [token])
)
in
let subs = List.map Subscription.of_string classes in
let duration =
timeout
|> Clock.Timer.s_to_span
|> Option.value ~default:Mtime.Span.(24 * hour)
in
let timer = Clock.Timer.start ~duration in
(* We need to iterate because it's possible for an empty event set
to be generated if we peek in-between a Modify and a Delete; we'll
miss the Delete event and fail to generate the Modify because the
snapshot can't be taken. *)
let rec loop () =
let event_from = from_inner __context session subs from from_t timer in
let event_from =
from_inner __context session subs from from_t timer batching
in
if event_from.events = [] && not (Clock.Timer.has_expired timer) then (
debug "suppressing empty event.from" ;
loop ()
Expand Down
44 changes: 44 additions & 0 deletions ocaml/xapi/xapi_globs.ml
Original file line number Diff line number Diff line change
Expand Up @@ -1068,6 +1068,47 @@ let tgroups_enabled = ref false
let xapi_requests_cgroup =
"/sys/fs/cgroup/cpu/control.slice/xapi.service/request"

(* Event.{from,next} batching delays *)

(** [make_batching name ~delay_before ~delay_between] creates a tunable
    batching configuration together with its command-line/config-file entry.

    Returns [(config, entry)]:
    - [config] is a reference holding the current {!Throttle.Batching.t},
      initialised from the given default delays;
    - [entry] is [(key, Arg.String set, get, description)], where [key] is
      [name ^ "_delay"].

    The setter expects ["<delay_before>,<delay_between>"], both in seconds.
    A value that cannot be parsed, or whose delays cannot be represented as a
    span, is ignored with a warning and the previous configuration is kept. *)
let make_batching name ~delay_before ~delay_between =
  let name = Printf.sprintf "%s_delay" name in
  let config = ref (Throttle.Batching.make ~delay_before ~delay_between)
  and config_vals = ref (delay_before, delay_between) in
  let set str =
    match
      Scanf.sscanf str "%f,%f" (fun before between -> (before, between))
    with
    | exception (Scanf.Scan_failure _ | Failure _ | End_of_file) ->
        (* malformed input is treated like an out-of-range value: warn and
           keep the current configuration instead of raising *)
        D.warn
          "Ignoring argument '%s'. (expected two delays in seconds separated \
           by a comma)"
          str
    | before, between -> (
      match
        (Clock.Timer.s_to_span before, Clock.Timer.s_to_span between)
      with
      | Some delay_before, Some delay_between ->
          (* keep the raw spans so that [get] can print them back *)
          config_vals := (delay_before, delay_between) ;
          config := Throttle.Batching.make ~delay_before ~delay_between
      | None, _ | _, None ->
          D.warn
            "Ignoring argument '%s'. (it only allows durations of less than \
             104 days)"
            str
    )
  and get () =
    let d1, d2 = !config_vals in
    Printf.sprintf "%f,%f" (Clock.Timer.span_to_s d1) (Clock.Timer.span_to_s d2)
  and desc =
    "delays in seconds before the API call, and between internal recursive \
     calls, separated with a comma"
  in
  (config, (name, Arg.String set, get, desc))

(* Default batching for Event.from when the subscriptions are not limited to
   tasks: no delay before the call, backoff of up to 50ms between recursive
   calls. *)
let event_from_delay, event_from_entry =
  let delay_before = Mtime.Span.zero in
  let delay_between = Mtime.Span.(50 * ms) in
  make_batching "event_from" ~delay_before ~delay_between

(* Default batching for Event.from when every subscription is task-only.
   Kept as a separate tunable so it can be configured independently. *)
let event_from_task_delay, event_from_task_entry =
  let delay_before = Mtime.Span.zero in
  let delay_between = Mtime.Span.(50 * ms) in
  make_batching "event_from_task" ~delay_before ~delay_between

(* Default batching for Event.next: 200ms before the call, backoff of up to
   50ms between recursive calls. *)
let event_next_delay, event_next_entry =
  let delay_before = Mtime.Span.(200 * ms) in
  let delay_between = Mtime.Span.(50 * ms) in
  make_batching "event_next" ~delay_before ~delay_between

let xapi_globs_spec =
[
( "master_connection_reset_timeout"
Expand Down Expand Up @@ -1644,6 +1685,9 @@ let other_options =
, (fun () -> string_of_bool !tgroups_enabled)
, "Turn on tgroups classification"
)
; event_from_entry
; event_from_task_entry
; event_next_entry
]

(* The options can be set with the variable xapiflags in /etc/sysconfig/xapi.
Expand Down
1 change: 1 addition & 0 deletions xapi.opam
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ homepage: "https://xapi-project.github.io/"
bug-reports: "https://github.com/xapi-project/xen-api/issues"
depends: [
"dune" {>= "3.15"}
"ocaml" {>= "4.09"}
"alcotest" {with-test}
"angstrom"
"astring"
Expand Down

0 comments on commit 8a427b9

Please sign in to comment.