Skip to content

Commit

Permalink
CP-50475: parallelize device ops during VM lifecycle ops
Browse files Browse the repository at this point in the history
Operations on different devices should be independent and therefore can be
parallelized. This both means parallelizing operations on different device
types and on devices for the same type.

An atom to serialize action has been introduced because the operation regarding
a single device must be kept serialized.

Signed-off-by: Pau Ruiz Safont <[email protected]>
  • Loading branch information
psafont committed Oct 16, 2024
1 parent 8eb26b6 commit 0425b0b
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 65 deletions.
167 changes: 106 additions & 61 deletions ocaml/xenopsd/lib/xenops_server.ml
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ type atomic =
| VM_rename of (Vm.id * Vm.id * rename_when)
| VM_import_metadata of (Vm.id * Metadata.t)
| Parallel of Vm.id * string * atomic list
| Serial of Vm.id * string * atomic list
| Best_effort of atomic
[@@deriving rpcty]

Expand Down Expand Up @@ -271,6 +272,9 @@ let rec name_of_atomic = function
| Parallel (_, _, atomics) ->
Printf.sprintf "Parallel (%s)"
(String.concat " | " (List.map name_of_atomic atomics))
| Serial (_, _, atomics) ->
Printf.sprintf "Serial (%s)"
(String.concat " & " (List.map name_of_atomic atomics))
| Best_effort atomic ->
Printf.sprintf "Best_effort (%s)" (name_of_atomic atomic)

Expand Down Expand Up @@ -1556,8 +1560,13 @@ let collect_into apply = function [] -> [] | [op] -> [op] | lst -> apply lst
let parallel name ~id =
collect_into (fun ls -> [Parallel (id, Printf.sprintf "%s VM=%s" name id, ls)])

let serial name ~id =
collect_into (fun ls -> [Serial (id, Printf.sprintf "%s VM=%s" name id, ls)])

let parallel_concat name ~id lst = parallel name ~id (List.concat lst)

let serial_concat name ~id lst = serial name ~id (List.concat lst)

let parallel_map name ~id lst f = parallel name ~id (List.concat_map f lst)

let map_or_empty f x = Option.value ~default:[] (Option.map f x)
Expand All @@ -1573,6 +1582,23 @@ let rec atomics_of_operation = function
List.partition (is_nvidia_sriov vgpus) pcis
in
let no_sharept = List.exists is_no_sharept vgpus in
let plug_vbds typ vbds =
let pf = Printf.sprintf in
let name_multi = pf "VBDs.activate_epoch_and_plug %s" typ in
let name_one = pf "VBD.activate_epoch_and_plug %s" typ in
parallel_map name_multi ~id vbds (fun vbd ->
serial_concat name_one ~id
[
[VBD_set_active (vbd.Vbd.id, true)]
; map_or_empty
(fun x ->
[VBD_epoch_begin (vbd.Vbd.id, x, vbd.Vbd.persistent)]
)
vbd.Vbd.backend
; [VBD_plug vbd.Vbd.id]
]
)
in
[
dequarantine_ops vgpus
; [
Expand All @@ -1581,35 +1607,35 @@ let rec atomics_of_operation = function
; VM_create (id, None, None, no_sharept)
; VM_build (id, force)
]
; List.map
(fun vbd -> VBD_set_active (vbd.Vbd.id, true))
(vbds_rw @ vbds_ro)
(* keeping behaviour of vbd_plug_order: rw vbds must be plugged before
ro vbds, see vbd_plug_sets *)
; List.concat_map
(fun (ty, vbds) ->
parallel_map (Printf.sprintf "VBD.epoch_begin %s" ty) ~id vbds
(fun vbd ->
map_or_empty
(fun x ->
[VBD_epoch_begin (vbd.Vbd.id, x, vbd.Vbd.persistent)]
)
vbd.Vbd.backend
)
)
[("RW", vbds_rw); ("RO", vbds_ro)]
(* rw vbds must be plugged before ro vbds, see vbd_plug_sets *)
; parallel_map "VBD.plug RW" ~id vbds_rw (fun vbd -> [VBD_plug vbd.Vbd.id])
; parallel_map "VBD.plug RO" ~id vbds_ro (fun vbd -> [VBD_plug vbd.Vbd.id])
; List.map (fun vif -> VIF_set_active (vif.Vif.id, true)) vifs
; List.map (fun vif -> VIF_plug vif.Vif.id) vifs
; List.map (fun vgpu -> VGPU_set_active (vgpu.Vgpu.id, true)) vgpus
; List.map (fun pci -> PCI_plug (pci.Pci.id, false)) pcis_sriov
; parallel_concat "Devices.plug (no qemu)" ~id
[
(* rw vbds must be plugged before ro vbds, see vbd_plug_sets *)
serial_concat "VBDs.acticate_epoch_and_plug RW+RO" ~id
[plug_vbds "RW" vbds_rw; plug_vbds "RO" vbds_ro]
; List.concat_map
(fun vif ->
serial "VIF.activate_and_plug" ~id
[VIF_set_active (vif.Vif.id, true); VIF_plug vif.Vif.id]
)
vifs
; serial_concat "VGPUs.activate & PCI.plug (SRIOV)" ~id
[
parallel_map "VGPUs.activate" ~id vgpus (fun vgpu ->
[VGPU_set_active (vgpu.Vgpu.id, true)]
)
; parallel_map "PCIs.plug (SRIOV)" ~id pcis_sriov (fun pci ->
[PCI_plug (pci.Pci.id, false)]
)
]
]
; [VM_create_device_model (id, false)]
(* PCI and USB devices are hot-plugged into HVM guests via QEMU, so the
following operations occur after creating the device models *)
; List.map (fun pci -> PCI_plug (pci.Pci.id, true)) pcis_other
; List.map (fun vusb -> VUSB_plug vusb.Vusb.id) vusbs
; parallel_concat "Devices.plug (qemu)" ~id
[
List.map (fun pci -> PCI_plug (pci.Pci.id, true)) pcis_other
; List.map (fun vusb -> VUSB_plug vusb.Vusb.id) vusbs
]
(* At this point the domain is considered survivable. *)
; [VM_set_domain_action_request (id, None)]
]
Expand All @@ -1622,50 +1648,60 @@ let rec atomics_of_operation = function
[
map_or_empty (fun x -> [VM_shutdown_domain (id, PowerOff, x)]) timeout
(* Before shutting down a VM, we need to unplug its VUSBs. *)
; List.map (fun vusb -> VUSB_unplug vusb.Vusb.id) vusbs
; parallel_map "VUSBs.unplug" ~id vusbs (fun vusb ->
[VUSB_unplug vusb.Vusb.id]
)
; [
(* CA-315450: in a hard shutdown or snapshot revert, timeout=None and
VM_shutdown_domain is not called. To avoid any interference, we
pause the domain before destroying the device model. *)
Best_effort (VM_pause id)
; VM_destroy_device_model id
]
; parallel_map "VBD.unplug" ~id vbds (fun vbd ->
[VBD_unplug (vbd.Vbd.id, true)]
)
; List.map (fun vif -> VIF_unplug (vif.Vif.id, true)) vifs
; List.map (fun pci -> PCI_unplug pci.Pci.id) pcis
; parallel_concat "Devices.unplug" ~id
[
List.map (fun vbd -> VBD_unplug (vbd.Vbd.id, true)) vbds
; List.map (fun vif -> VIF_unplug (vif.Vif.id, true)) vifs
; List.map (fun pci -> PCI_unplug pci.Pci.id) pcis
]
; [VM_destroy id]
]
|> List.concat
| VM_restore_vifs id ->
let vifs = VIF_DB.vifs id in
[
List.map (fun vif -> VIF_set_active (vif.Vif.id, true)) vifs
; List.map (fun vif -> VIF_plug vif.Vif.id) vifs
]
|> List.concat
parallel_map "VIFs.activate_and_plug" ~id vifs (fun vif ->
serial "VIF.activate_and_plug" ~id
[VIF_set_active (vif.Vif.id, true); VIF_plug vif.Vif.id]
)
| VM_restore_devices (id, restore_vifs) ->
let vbds_rw, vbds_ro = VBD_DB.vbds id |> vbd_plug_sets in
let vgpus = VGPU_DB.vgpus id in
let pcis = PCI_DB.pcis id |> pci_plug_order in
let pcis_other = List.filter (is_not_nvidia_sriov vgpus) pcis in
let plug_vbds typ vbds =
let pf = Printf.sprintf in
let name_multi = pf "VBDs.activate_and_plug %s" typ in
let name_one = pf "VBD.activate_and_plug %s" typ in
parallel_map name_multi ~id vbds (fun vbd ->
serial name_one ~id
[VBD_set_active (vbd.Vbd.id, true); VBD_plug vbd.Vbd.id]
)
in
[
List.map
(fun vbd -> VBD_set_active (vbd.Vbd.id, true))
(vbds_rw @ vbds_ro)
; (* rw vbds must be plugged before ro vbds, see vbd_plug_sets *)
parallel_map "VBD.plug RW" ~id vbds_rw (fun vbd -> [VBD_plug vbd.Vbd.id])
; parallel_map "VBD.plug RO" ~id vbds_ro (fun vbd -> [VBD_plug vbd.Vbd.id])
(* rw vbds must be plugged before ro vbds, see vbd_plug_sets *)
plug_vbds "RW" vbds_rw
; plug_vbds "RO" vbds_ro
; (if restore_vifs then atomics_of_operation (VM_restore_vifs id) else [])
; List.map (fun vgpu -> VGPU_set_active (vgpu.Vgpu.id, true)) vgpus
(* Nvidia SRIOV PCI devices have been already been plugged *)
; [
VM_create_device_model (id, true)
(* PCI and USB devices are hot-plugged into HVM guests via QEMU, so
the following operations occur after creating the device models *)
]
; List.map (fun pci -> PCI_plug (pci.Pci.id, true)) pcis_other
; (* Nvidia SRIOV PCI devices have been already been plugged *)
parallel_map "VGPUs.activate" ~id vgpus (fun vgpu ->
[VGPU_set_active (vgpu.Vgpu.id, true)]
)
; [VM_create_device_model (id, true)]
(* PCI and USB devices are hot-plugged into HVM guests via QEMU, so
the following operations occur after creating the device models *)
; parallel_map "PCIs.plug" ~id pcis_other (fun pci ->
[PCI_plug (pci.Pci.id, true)]
)
]
|> List.concat
| VM_poweroff (id, timeout) ->
Expand All @@ -1678,17 +1714,24 @@ let rec atomics_of_operation = function
else
Xenops_hooks.reason__clean_shutdown
in
[
[VM_hook_script (id, Xenops_hooks.VM_pre_destroy, reason)]
; atomics_of_operation (VM_shutdown (id, timeout))
; parallel_map "VBD.epoch_end" ~id vbds (fun vbd ->
let unplug_vbd vbd =
serial_concat "VBD.epoch_and_deactivate" ~id
[
map_or_empty
(fun x -> [VBD_epoch_end (vbd.Vbd.id, x)])
vbd.Vbd.backend
)
; List.map (fun vbd -> VBD_set_active (vbd.Vbd.id, false)) vbds
; List.map (fun vif -> VIF_set_active (vif.Vif.id, false)) vifs
; List.map (fun vgpu -> VGPU_set_active (vgpu.Vgpu.id, false)) vgpus
; [VBD_set_active (vbd.Vbd.id, false)]
]
in
[
[VM_hook_script (id, Xenops_hooks.VM_pre_destroy, reason)]
; atomics_of_operation (VM_shutdown (id, timeout))
; parallel_concat "Devices.deactivate" ~id
[
List.concat_map unplug_vbd vbds
; List.map (fun vif -> VIF_set_active (vif.Vif.id, false)) vifs
; List.map (fun vgpu -> VGPU_set_active (vgpu.Vgpu.id, false)) vgpus
]
; [VM_hook_script (id, Xenops_hooks.VM_post_destroy, reason)]
]
|> List.concat
Expand Down Expand Up @@ -1825,7 +1868,7 @@ let rec perform_atomic ~progress_callback ?subtask:_ ?result (op : atomic)
(Xenops_task.id_of_handle t)
(List.length atoms) description
in
let with_tracing = parallel_id_with_tracing parallel_id t in
let with_tracing = id_with_tracing parallel_id t in
debug "begin_%s" parallel_id ;
let task_list =
queue_atomics_and_wait ~progress_callback ~max_parallel_atoms:10
Expand Down Expand Up @@ -1869,6 +1912,8 @@ let rec perform_atomic ~progress_callback ?subtask:_ ?result (op : atomic)
List.iter
(fun err -> match err with None -> () | Some e -> raise e)
errors
| Serial (_, _, atoms) ->
List.iter (Fun.flip (perform_atomic ~progress_callback) t) atoms
| VIF_plug id ->
debug "VIF.plug %s" (VIF_DB.string_of_id id) ;
B.VIF.plug t (VIF_DB.vm_of id) (VIF_DB.read_exn id) ;
Expand Down Expand Up @@ -2468,7 +2513,7 @@ and trigger_cleanup_after_failure_atom op t =
immediate_operation dbg id (VM_check_state id)
| Best_effort op ->
trigger_cleanup_after_failure_atom op t
| Parallel (_id, _description, ops) ->
| Parallel (_id, _description, ops) | Serial (_id, _description, ops) ->
List.iter (fun op -> trigger_cleanup_after_failure_atom op t) ops
| VM_rename (id1, id2, _) ->
immediate_operation dbg id1 (VM_check_state id1) ;
Expand Down
4 changes: 2 additions & 2 deletions ocaml/xenopsd/lib/xenops_task.ml
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ let is_task task = function
| _ ->
None

let parallel_id_with_tracing parallel_id t =
Debug_info.make ~log:parallel_id ~tracing:(Xenops_task.tracing t)
let id_with_tracing id t =
Debug_info.make ~log:id ~tracing:(Xenops_task.tracing t)
|> Debug_info.to_string

let dbg_with_traceparent_of_task t =
Expand Down
3 changes: 1 addition & 2 deletions ocaml/xenopsd/lib/xenops_utils.ml
Original file line number Diff line number Diff line change
Expand Up @@ -620,8 +620,7 @@ let chunks size lst =
[op] :: xs :: xss
)
[] lst
|> List.map (fun xs -> List.rev xs)
|> List.rev
|> List.rev_map (fun xs -> List.rev xs)

let really_kill pid =
try Unixext.kill_and_wait pid
Expand Down

0 comments on commit 0425b0b

Please sign in to comment.