From ab24f6da65ea5b6ce72417b6fac4301943f67570 Mon Sep 17 00:00:00 2001 From: Pau Ruiz Safont Date: Tue, 20 Feb 2024 13:46:20 +0000 Subject: [PATCH 01/12] CA-383867: Share code for handling Tpm requests This will allow to handle serialization of key as well as states in server_interface and the write cache Signed-off-by: Pau Ruiz Safont --- ocaml/xapi-guard/lib/server_interface.ml | 69 +++------------------ ocaml/xapi-guard/lib/types.ml | 77 ++++++++++++++++++++++++ ocaml/xapi-guard/lib/types.mli | 29 +++++++++ 3 files changed, 116 insertions(+), 59 deletions(-) diff --git a/ocaml/xapi-guard/lib/server_interface.ml b/ocaml/xapi-guard/lib/server_interface.ml index 4bcaae8a387..37e639f1192 100644 --- a/ocaml/xapi-guard/lib/server_interface.ml +++ b/ocaml/xapi-guard/lib/server_interface.ml @@ -18,6 +18,7 @@ open Lwt.Syntax module D = Debug.Make (struct let name = __MODULE__ end) open D +module Tpm = Xapi_guard.Types.Tpm type rpc_t = Rpc.t @@ -99,59 +100,6 @@ let serve_forever_lwt_callback rpc_fn path _ req body = in Cohttp_lwt_unix.Server.respond_string ~status:`Method_not_allowed ~body () -(* The TPM has 3 kinds of states *) -type state = { - permall: string (** permanent storage *) - ; savestate: string (** for ACPI S3 *) - ; volatilestate: string (** for snapshot/migration/etc. 
*) -} - -let split_char = ' ' - -let join_string = String.make 1 split_char - -let deserialize t = - match String.split_on_char split_char t with - | [permall] -> - (* backwards compat with reading tpm2-00.permall *) - {permall; savestate= ""; volatilestate= ""} - | [permall; savestate; volatilestate] -> - {permall; savestate; volatilestate} - | splits -> - Fmt.failwith "Invalid state: too many splits %d" (List.length splits) - -let serialize t = - (* it is assumed that swtpm has already base64 encoded this *) - String.concat join_string [t.permall; t.savestate; t.volatilestate] - -let lookup_key key t = - match key with - | "/tpm2-00.permall" -> - t.permall - | "/tpm2-00.savestate" -> - t.savestate - | "/tpm2-00.volatilestate" -> - t.volatilestate - | s -> - Fmt.invalid_arg "Unknown TPM state key: %s" s - -let update_key key state t = - if String.contains state split_char then - Fmt.invalid_arg - "State to be stored (%d bytes) contained forbidden separator: %c" - (String.length state) split_char ; - match key with - | "/tpm2-00.permall" -> - {t with permall= state} - | "/tpm2-00.savestate" -> - {t with savestate= state} - | "/tpm2-00.volatilestate" -> - {t with volatilestate= state} - | s -> - Fmt.invalid_arg "Unknown TPM state key: %s" s - -let empty = "" - let serve_forever_lwt_callback_vtpm ~cache mutex vm_uuid _ req body = let get_vtpm_ref () = let* vm = @@ -179,29 +127,32 @@ let serve_forever_lwt_callback_vtpm ~cache mutex vm_uuid _ req body = Lwt_mutex.with_lock mutex @@ fun () -> (* TODO: some logging *) match (Cohttp.Request.meth req, Uri.path uri) with - | `GET, key when key <> "/" -> + | `GET, path when path <> "/" -> let* self = get_vtpm_ref () in let* contents = with_xapi ~cache @@ VTPM.get_contents ~self in - let body = contents |> deserialize |> lookup_key key in + let key = Tpm.key_of_swtpm path in + let body = Tpm.(contents |> deserialize |> lookup ~key) in let headers = Cohttp.Header.of_list [("Content-Type", "application/octet-stream")] in 
Cohttp_lwt_unix.Server.respond_string ~headers ~status:`OK ~body () - | `PUT, key when key <> "/" -> + | `PUT, path when path <> "/" -> let* body = Cohttp_lwt.Body.to_string body in let* self = get_vtpm_ref () in let* contents = with_xapi ~cache @@ VTPM.get_contents ~self in + let key = Tpm.key_of_swtpm path in let contents = - contents |> deserialize |> update_key key body |> serialize + Tpm.(contents |> deserialize |> update key body |> serialize) in let* () = with_xapi ~cache @@ VTPM.set_contents ~self ~contents in Cohttp_lwt_unix.Server.respond ~status:`No_content ~body:Cohttp_lwt.Body.empty () - | `DELETE, key when key <> "/" -> + | `DELETE, path when path <> "/" -> let* self = get_vtpm_ref () in let* contents = with_xapi ~cache @@ VTPM.get_contents ~self in + let key = Tpm.key_of_swtpm path in let contents = - contents |> deserialize |> update_key key empty |> serialize + Tpm.(contents |> deserialize |> update key empty_state |> serialize) in let* () = with_xapi ~cache @@ VTPM.set_contents ~self ~contents in Cohttp_lwt_unix.Server.respond ~status:`No_content diff --git a/ocaml/xapi-guard/lib/types.ml b/ocaml/xapi-guard/lib/types.ml index 7b705c89a01..3f2b41c7682 100644 --- a/ocaml/xapi-guard/lib/types.ml +++ b/ocaml/xapi-guard/lib/types.ml @@ -3,3 +3,80 @@ module Service = struct let to_string = function Varstored -> "Varstored" | Swtpm -> "Swtpm" end + +module Tpm = struct + (* The TPM has 3 kinds of states *) + type t = { + permall: string (** permanent storage *) + ; savestate: string (** for ACPI S3 *) + ; volatilestate: string (** for snapshot/migration/etc. 
*) + } + + type key = Perm | Save | Volatile + + let key_of_swtpm = function + | "/tpm2-00.permall" -> + Perm + | "/tpm2-00.savestate" -> + Save + | "/tpm2-00.volatilestate" -> + Volatile + | s -> + Fmt.invalid_arg "Unknown TPM state key: %s" s + + let serialize_key = function Perm -> 0 | Save -> 1 | Volatile -> 2 + + let deserialize_key = function + | 0 -> + Perm + | 1 -> + Save + | 2 -> + Volatile + | s -> + Fmt.invalid_arg "Unknown TPM state key: %i" s + + let empty_state = "" + + let empty = {permall= ""; savestate= ""; volatilestate= ""} + + let split_char = ' ' + + let join_string = String.make 1 split_char + + let deserialize t = + match String.split_on_char split_char t with + | [permall] -> + (* backwards compat with reading tpm2-00.permall *) + {permall; savestate= ""; volatilestate= ""} + | [permall; savestate; volatilestate] -> + {permall; savestate; volatilestate} + | splits -> + Fmt.failwith "Invalid state: too many splits %d" (List.length splits) + + let serialize t = + (* it is assumed that swtpm has already base64 encoded this *) + String.concat join_string [t.permall; t.savestate; t.volatilestate] + + let lookup ~key t = + match key with + | Perm -> + t.permall + | Save -> + t.savestate + | Volatile -> + t.volatilestate + + let update key state t = + if String.contains state split_char then + Fmt.invalid_arg + "State to be stored (%d bytes) contained forbidden separator: %c" + (String.length state) split_char ; + match key with + | Perm -> + {t with permall= state} + | Save -> + {t with savestate= state} + | Volatile -> + {t with volatilestate= state} +end diff --git a/ocaml/xapi-guard/lib/types.mli b/ocaml/xapi-guard/lib/types.mli index 6bb036826d7..f210ea8c96a 100644 --- a/ocaml/xapi-guard/lib/types.mli +++ b/ocaml/xapi-guard/lib/types.mli @@ -5,3 +5,32 @@ module Service : sig val to_string : t -> string end + +module Tpm : sig + (** TPMs have 3 kind of states *) + type t + + (** key to access a single state *) + type key + + val key_of_swtpm : 
string -> key + (** [key_of_swtpm path] returns a state key represented by [path]. These paths + are parts of the requests generated by SWTPM and may contain slashes *) + + val deserialize_key : int -> key + + val serialize_key : key -> int + (** [serialize key] returns the state key represented by [key]. *) + + val empty : t + + val empty_state : string + + val deserialize : string -> t + + val serialize : t -> string + + val update : key -> string -> t -> t + + val lookup : key:key -> t -> string +end From a714f76f7204d12a5438329e7771f0c79fb6a73e Mon Sep 17 00:00:00 2001 From: Pau Ruiz Safont Date: Wed, 6 Dec 2023 10:08:09 +0000 Subject: [PATCH 02/12] CA-383867: Add local disk cache library for xapi guard This enables xapi-guard to decouple persistence of TPM contents from the xapi service being online. That is, when xapi is down. The contents of the TPMs will be written to disk, and when xapi is back online the contents will be uploaded. This is needed to protect VMs while xapi is being restarted, usually as part of an update. Some properties of the cache: - The cache is tried to be bypassed whenever possible, and is only used as fallback after a write fails. - The cache is handled by a thread that writes to cache and one that reads from it. They communicate through a bounded queue. - Whenever a TPM content is written to disk, previous versions of it are deleted. This helps the reading thread to catch up. - When the queue has been filled the writer stops adding elements to the queue, and the reader will try to flush the queue, and after it will try to flush the cache. After this happens both threads will transition to cache bypass operation. 
Signed-off-by: Pau Ruiz Safont --- doc/content/xapi-guard/_index.md | 56 +++ ocaml/xapi-guard/lib/disk_cache.ml | 449 ++++++++++++++++++++ ocaml/xapi-guard/lib/disk_cache.mli | 25 ++ ocaml/xapi-guard/lib/dune | 5 +- ocaml/xapi-guard/lib/lwt_bounded_stream.ml | 48 +++ ocaml/xapi-guard/lib/lwt_bounded_stream.mli | 34 ++ ocaml/xapi-guard/src/dune | 1 + ocaml/xapi-guard/test/cache_test.ml | 156 +++++++ ocaml/xapi-guard/test/cache_test.mli | 0 ocaml/xapi-guard/test/dune | 16 + 10 files changed, 789 insertions(+), 1 deletion(-) create mode 100644 ocaml/xapi-guard/lib/disk_cache.ml create mode 100644 ocaml/xapi-guard/lib/disk_cache.mli create mode 100644 ocaml/xapi-guard/lib/lwt_bounded_stream.ml create mode 100644 ocaml/xapi-guard/lib/lwt_bounded_stream.mli create mode 100644 ocaml/xapi-guard/test/cache_test.ml create mode 100644 ocaml/xapi-guard/test/cache_test.mli diff --git a/doc/content/xapi-guard/_index.md b/doc/content/xapi-guard/_index.md index bcafb968b07..39d64a558e5 100644 --- a/doc/content/xapi-guard/_index.md +++ b/doc/content/xapi-guard/_index.md @@ -17,6 +17,8 @@ Principles 2. Xenopsd is able to control xapi-guard through message switch, this access is not limited. 3. Listening to domain socket is restored whenever the daemon restarts to minimize disruption of running domains. +4. Disruptions to requests when xapi is unavailable is minimized. + The startup procedure is not blocked by the availability of xapi, and write requests from domains must not fail because xapi is unavailable. Overview @@ -26,3 +28,57 @@ Xapi-guard forwards calls from domains to xapi to persist UEFI variables, and up To do this, it listens to 1 socket per service (varstored, or swtpm) per domain. To create these sockets before the domains are running, it listens to a message-switch socket. This socket listens to calls from xenopsd, which orchestrates the domain creation. 
+ +To protect the domains from xapi being unavailable transiently, xapi-guard provides an on-disk cache for vTPM writes. +This cache acts as a buffer and stores the requests temporarily until xapi can be contacted again. +This situation usually happens when xapi is being restarted as part of an update. +SWTPM, the vTPM daemon, reads the contents of the TPM from xapi-guard on startup, suspend, and resume. +During normal operation SWTPM does not send read requests from xapi-guard. + +The cache module consists of two Lwt threads, one that writes to disk, and another one that reads from disk. +The writer is triggered when a VM writes to the vTPM. +It never blocks if xapi is unreachable, but responds as soon as the data has been stored either by xapi or on the local disk, such that the VM receives a timely response to the write request. +Both try to send the requests to xapi, depending on the state, to attempt write all the cached data back to xapi, and stop using the cache. +The threads communicate through a bounded queue, this is done to limit the amount of memory used. +This queue is a performance optimisation, where the writer informs the reader precisely which are the names of the cache files, such that the reader does not need to list the cache directory. +And a full queue does not mean data loss, just a loss of performance; vTPM writes are still cached. 
+ +This means that the cache operates in three modes: +- Direct: during normal operation the disk is not used at all +- Engaged: both threads use the queue to order events +- Disengaged: A thread dumps request to disk while the other reads the cache + until it's empty + +```mermaid +--- +title: Cache State +--- +stateDiagram-v2 + Disengaged + note right of Disengaged + Writer doesn't add requests to queue + Reader reads from cache and tries to push to xapi + end note + Direct + note left of Direct + Writer bypasses cache, send to xapi + Reader waits + end note + Engaged + note right of Engaged + Writer writes to cache and adds requests to queue + Reader reads from queue and tries to push to xapi + end note + + [*] --> Disengaged + + Disengaged --> Disengaged : Reader pushed pending TPMs to xapi, in the meantime TPMs appeared in the cache + Disengaged --> Direct : Reader pushed pending TPMs to xapi, cache is empty + + Direct --> Direct : Writer receives TPM, sent to xapi + Direct --> Engaged : Writer receives TPM, error when sent to xapi + + Engaged --> Direct : Reader sent TPM to xapi, finds an empty queue + Engaged --> Engaged : Writer receives TPM, queue is not full + Engaged --> Disengaged : Writer receives TPM, queue is full +``` diff --git a/ocaml/xapi-guard/lib/disk_cache.ml b/ocaml/xapi-guard/lib/disk_cache.ml new file mode 100644 index 00000000000..b972654f085 --- /dev/null +++ b/ocaml/xapi-guard/lib/disk_cache.ml @@ -0,0 +1,449 @@ +(* Copyright (C) Cloud Software Group, Inc. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published + by the Free Software Foundation; version 2.1 only. with the special + exception on linking described in file LICENSE. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the + GNU Lesser General Public License for more details. +*) + +module D = Debug.Make (struct let name = __MODULE__ end) + +let ( // ) = Filename.concat + +let runtime_data = "/var/lib" // "xapi-guard" + +let ( let* ) = Lwt.bind + +let ( let@ ) f x = f x + +let with_lock = Lwt_mutex.with_lock + +type t = Uuidm.t * Mtime.t * Types.Tpm.key + +let cache_of service = runtime_data // Types.Service.to_string service + +let files_in dir ~otherwise = + Lwt.catch + (fun () -> + let* listing = Lwt_unix.files_of_directory dir |> Lwt_stream.to_list in + List.filter_map + (function "." | ".." -> None | name -> Some (dir // name)) + listing + |> Lwt.return + ) + otherwise + +let unlink_safe file = + let __FUN = __FUNCTION__ in + Lwt.catch + (fun () -> Lwt_unix.unlink file) + (function + | Unix.(Unix_error (ENOENT, _, _)) -> + Lwt.pause () + | e -> + D.info "%s: error %s when deleting %s, ignoring" __FUN + (Printexc.to_string e) file ; + Lwt.pause () + ) + +type valid_file = t * string + +type file = + | Latest of valid_file + | Outdated of valid_file + | Temporary of string + | Invalid of string + +let path_of_key root (uuid, timestamp, key) = + root + // Uuidm.to_string uuid + // Types.Tpm.(serialize_key key |> string_of_int) + // Mtime.(to_uint64_ns timestamp |> Int64.to_string) + +let key_of_path path = + let ( let* ) = Option.bind in + let key_dir = Filename.(dirname path) in + let* uuid = Filename.(basename (dirname key_dir)) |> Uuidm.of_string in + let* key = + Filename.basename key_dir + |> int_of_string_opt + |> Option.map Types.Tpm.deserialize_key + in + let* timestamp = + Filename.basename path + |> Int64.of_string_opt + |> Option.map Mtime.of_uint64_ns + in + Some ((uuid, timestamp, key), path) + +let path_is_temp path = + let pathlen = String.length path in + String.ends_with ~suffix:".pre" path + && key_of_path (String.sub path 0 (pathlen - 4)) |> Option.is_some + +let temp_of_path path = path ^ ".pre" + +let get_all_contents root = + let classify contents = + 
let rec loop (acc, found) = function + | [] -> + List.rev acc + | latest :: others -> ( + match key_of_path latest with + | None -> + let file = + if path_is_temp latest then + Temporary latest + else + Invalid latest + in + loop (file :: acc, found) others + | Some valid_file -> + let file = + if found then Outdated valid_file else Latest valid_file + in + loop (file :: acc, false) others + ) + in + let ordered = List.fast_sort (fun x y -> String.compare y x) contents in + loop ([], false) ordered + in + let empty = Fun.const (Lwt.return []) in + let contents_of_key key = + let* contents = files_in key ~otherwise:empty in + Lwt.return (classify contents) + in + let* tpms = files_in root ~otherwise:empty in + let* files = + Lwt_list.map_p + (fun tpm -> + let* keys = files_in tpm ~otherwise:empty in + Lwt_list.map_p contents_of_key keys + ) + tpms + in + Lwt.return List.(concat (concat files)) + +let persist_to ~filename:f_path ~contents = + let atomic_write_to_file ~perm f = + let tmp_path = temp_of_path f_path in + let dirname = Filename.dirname f_path in + let flags = Unix.[O_WRONLY; O_CREAT; O_SYNC] in + let* fd_tmp = Lwt_unix.openfile tmp_path flags perm in + let* () = + Lwt.finalize + (fun () -> + (* do not close fd when closing the channel, avoids double-closing the fd *) + let close () = Lwt.return_unit in + let chan = Lwt_io.of_fd ~mode:Output ~close fd_tmp in + let* () = + Lwt.finalize (fun () -> f chan) (fun () -> Lwt_io.close chan) + in + Lwt_unix.fsync fd_tmp + ) + (fun () -> Lwt_unix.close fd_tmp) + in + let* () = Lwt_unix.rename tmp_path f_path in + let* fd_dir = Lwt_unix.openfile dirname [O_RDONLY] 0 in + Lwt.finalize + (fun () -> Lwt_unix.fsync fd_dir) + (fun () -> Lwt_unix.close fd_dir) + in + let write out_chan = Lwt_io.write out_chan contents in + atomic_write_to_file ~perm:0o600 write + +(** - Direct: request doesn't pass through the cache + - Engaged: both side coordinate through the queue, writer ends the mode + when the queue has been filled. 
+ - Disengaged: writer ignores the queue, reader empties it and the cache; + then it changes the mode to engaged. +*) +type state = Direct | Engaged | Disengaged + +type channel = { + queue: t Lwt_bounded_stream.t + ; push: t option -> unit option + ; lock: Lwt_mutex.t (* lock for the states *) + ; mutable state: state +} + +(* + Notes: + - uses Mtime.t to force usage of monotonic time + - This means that between runs (and reboots) cached stated is lost if not + persisted first. + IDEA: carryover: read contents of cache and "convert it" to the current run + + TODO: + - Add startup step to convert existing content to new time + - Exponential backoff on xapi push error + - Limit error logging on xapi push error: once per downtime is enough + *) + +module Writer : sig + val with_cache : + direct:(t -> string -> (unit, exn) Lwt_result.t) + -> Types.Service.t + -> channel + -> ((t -> string -> unit Lwt.t) -> 'a Lwt.t) + -> 'a Lwt.t + (** [with_cache ~direct typ queue context] creates a cache for content of + type [typ]. The cache is writable through the function [context], which + is provided a writing function to persist to the cache. 
It uses [channel] + to push events to + + Example: + Xapi_guard.Disk_cache.(Writer.with_cache ~direct:(upload session_cache) Tpm channel) + @@ fun write_tpm -> write_tpm (uuid, time, key) contents + *) +end = struct + let mkdir_p ?(perm = 0o755) path = + let rec loop acc path = + let create_dir () = Lwt_unix.mkdir path perm in + let create_subdirs () = Lwt_list.iter_s (fun (_, f) -> f ()) acc in + Lwt.try_bind create_dir create_subdirs (function + | Unix.(Unix_error (EEXIST, _, _)) -> + (* create directories, parents first *) + create_subdirs () + | Unix.(Unix_error (ENOENT, _, _)) -> + let parent = Filename.dirname path in + loop ((path, create_dir) :: acc) parent + | exn -> + let msg = + Printf.sprintf {|Could not create directory "%s" because: %s|} + path (Printexc.to_string exn) + in + Lwt.fail (Failure msg) + ) + in + loop [] path + + let files_in_existing dir = + let create_dir = function + | Unix.(Unix_error (ENOENT, _, _)) -> + let* () = mkdir_p dir ~perm:0o700 in + Lwt.return [] + | e -> + raise e + in + files_in dir ~otherwise:create_dir + + let write_contents ~direct root queue (uuid, now, key) contents = + let __FUN = __FUNCTION__ in + + let key_str = Types.Tpm.(serialize_key key |> string_of_int) in + let key_dir = root // Uuidm.(to_string uuid) // key_str in + (* 1. Record existing requests in cache *) + let* outdated_contents = files_in_existing key_dir in + + let filename = key_dir // (Mtime.to_uint64_ns now |> Int64.to_string) in + (* 2. 
Write new timestamped content to cache, atomically, if needed; and + notify the other side, if needed *) + let persist () = persist_to ~filename ~contents in + let persist_and_push () = + let push () = + match queue.push (Some (uuid, now, key)) with + | Some () -> + Lwt.return_unit + | None -> + (* Queue is full, change mode to ignore queue *) + queue.state <- Disengaged ; + Lwt.return_unit + in + let* () = persist () in + push () + in + let engage_and_persist exn = + queue.state <- Engaged ; + D.info "%s: Error on push. Reason: %s" __FUN (Printexc.to_string exn) ; + persist_and_push () + in + let* () = + with_lock queue.lock (fun () -> + match queue.state with + | Direct -> + Lwt.try_bind + (fun () -> direct (uuid, now, key) contents) + (function + | Ok () -> + Lwt.return_unit + | Error exn -> + engage_and_persist exn + ) + (function exn -> engage_and_persist exn) + | Engaged -> + persist_and_push () + | Disengaged -> + persist () + ) + in + (* 4. Delete previous requests from filesystem *) + let* _ = Lwt_list.map_p unlink_safe outdated_contents in + Lwt.return_unit + + let with_cache ~direct typ queue f = + let root = cache_of typ in + let* () = mkdir_p root ~perm:0o700 in + f (write_contents ~direct root queue) +end + +module Watcher : sig + val watch : + direct:(t -> string -> (unit, exn) Lwt_result.t) + -> Types.Service.t + -> channel + -> unit + -> unit Lwt.t +end = struct + type push_cache = File of valid_file | Update_all | Wait + + (* Outdated and invalid files can be deleted, keep temporary files just in case + they need to be recovered *) + let discarder = function + | Latest _ as f -> + Either.Left f + | Temporary _ as f -> + Left f + | Outdated (_, p) -> + Right p + | Invalid p -> + Right p + + let get_latest_and_delete_rest root = + let* files = get_all_contents root in + let keep, to_delete = List.partition_map discarder files in + let* () = Lwt_list.iter_p unlink_safe to_delete in + (* Ignore temporaty files *) + let latest = + List.filter_map 
(function Latest f -> Some f | _ -> None) keep + in + Lwt.return latest + + (** Warning, may raise Unix.Unix_error *) + let read_from ~filename = + let flags = Unix.[O_RDONLY] in + let perm = 0o000 in + Lwt_io.with_file ~flags ~perm ~mode:Input filename Lwt_io.read + + let retry_push push (uuid, timestamp, key) contents = + let __FUN = __FUNCTION__ in + let push' () = push (uuid, timestamp, key) contents in + let rec retry k = + let on_error e = + D.info "%s: Error on push, attempt %i. Reason: %s" __FUN k + (Printexc.to_string e) ; + let* () = Lwt_unix.sleep 0.1 in + retry (k + 1) + in + Lwt.try_bind push' + (function Ok () -> Lwt.return_unit | Error e -> on_error e) + on_error + in + retry 1 + + let push_file push (key, path) = + let __FUN = __FUNCTION__ in + let on_error = function + | Unix.(Unix_error (ENOENT, _, _)) -> + Lwt.return_unit + | exn -> + D.info "%s: error when reading '%s': %s" __FUN path + Printexc.(to_string exn) ; + Lwt.return_unit + in + + Lwt.try_bind + (fun () -> read_from ~filename:path) + (fun contents -> + let* () = retry_push push key contents in + unlink_safe path + ) + on_error + + let push_files push files = Lwt_list.iter_s (push_file push) (List.rev files) + + let update_all queue push root = + let __FUN = __FUNCTION__ in + let* contents = get_latest_and_delete_rest root in + let* () = push_files push contents in + let@ () = with_lock queue.lock in + let* contents = get_latest_and_delete_rest root in + let* () = + match contents with + | [] -> + queue.state <- Direct ; + D.debug "%s: Cache clean; Going direct" __FUN ; + Lwt.return_unit + | _ -> + Lwt.return_unit + in + Lwt.return_unit + + let resolve queue push root = function + | File file -> ( + let* () = push_file push file in + let@ () = with_lock queue.lock in + match queue.state with + | Direct | Disengaged -> + Lwt.return_unit + | Engaged -> + let () = + if Lwt_bounded_stream.size queue.queue = 0 then + queue.state <- Direct + in + Lwt.return_unit + ) + | Update_all -> + 
update_all queue push root + | Wait -> + (* Do not busy loop when the system can cope with the requests *) + Lwt_unix.sleep 0.2 + + let watch ~direct typ queue = + let root = cache_of typ in + let __FUN = __FUNCTION__ in + let rec loop () = + (* When the pushing side is disengaged it doesn't push events to the + queue, this means that trying to drain it completely would leave the + pulling side locked waiting when the queue is empty. + - Read the number of elements in the queue while draining it and + then switch to read the contents from the cache; or + - Switch immediately to reading the contents from cache and ignore + the contents of the queue by calling an specialized method in the + queue module to drain it. + *) + let get_action () = + let@ () = with_lock queue.lock in + match queue.state with + | Disengaged when Lwt_bounded_stream.size queue.queue < 1 -> + let* () = Lwt.pause () in + Lwt.return Update_all + | Direct -> + let* () = Lwt.pause () in + Lwt.return Wait + | _ -> ( + let* elem = Lwt_bounded_stream.get queue.queue in + match elem with + | None -> + raise (Failure "Other side closed channel, cannot continue") + | Some elem -> + Lwt.return (File (elem, path_of_key root elem)) + ) + in + let* action = get_action () in + let* () = resolve queue direct root action in + loop () + in + loop +end + +let setup typ direct = + let queue, push = Lwt_bounded_stream.create 4098 in + let lock = Lwt_mutex.create () in + let q = {queue; push; lock; state= Disengaged} in + (Writer.with_cache ~direct typ q, Watcher.watch ~direct typ q) diff --git a/ocaml/xapi-guard/lib/disk_cache.mli b/ocaml/xapi-guard/lib/disk_cache.mli new file mode 100644 index 00000000000..0f387d4ebd1 --- /dev/null +++ b/ocaml/xapi-guard/lib/disk_cache.mli @@ -0,0 +1,25 @@ +(* Copyright (C) Cloud Software Group, Inc. 
+ + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published + by the Free Software Foundation; version 2.1 only. with the special + exception on linking described in file LICENSE. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. +*) + +(** [t] t is the minimal type to recognise elements in a cache. This does not + contain the contents of the elements being contained, only the metadata *) +type t = Uuidm.t * Mtime.t * Types.Tpm.key + +val setup : + Types.Service.t + -> (t -> string -> (unit, exn) Lwt_result.t) + -> (((t -> string -> unit Lwt.t) -> 'a Lwt.t) -> 'a Lwt.t) + * (unit -> unit Lwt.t) +(** [setup service push_callback] Returns a local disk buffer for [service] + which will use [push_callback] to push the elements to their final + destination *) diff --git a/ocaml/xapi-guard/lib/dune b/ocaml/xapi-guard/lib/dune index 87d10e7766e..052810ead5f 100644 --- a/ocaml/xapi-guard/lib/dune +++ b/ocaml/xapi-guard/lib/dune @@ -20,10 +20,13 @@ ) (library (name xapi_guard) - (modules dorpc types) + (modules dorpc types disk_cache lwt_bounded_stream) (libraries rpclib.core + inotify + inotify.lwt lwt + lwt.unix uri xapi-backtrace xapi-consts diff --git a/ocaml/xapi-guard/lib/lwt_bounded_stream.ml b/ocaml/xapi-guard/lib/lwt_bounded_stream.ml new file mode 100644 index 00000000000..90efe83758b --- /dev/null +++ b/ocaml/xapi-guard/lib/lwt_bounded_stream.ml @@ -0,0 +1,48 @@ +(* + * Copyright (c) 2012 Citrix Systems + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + *) + +let ( let* ) = Lwt.bind + +type 'a t = {stream: 'a Lwt_stream.t; capacity: int; size: int ref} + +let create capacity = + let stream, stream_push = Lwt_stream.create () in + let t = {stream; capacity; size= ref 0} in + let push = function + | Some _ when !(t.size) > t.capacity -> + None + | None -> + stream_push None ; Some () + | elem -> + stream_push elem ; incr t.size ; Some () + in + (t, push) + +let size {size; _} = !size + +let get_available t = + let all = Lwt_stream.get_available t.stream in + t.size := !(t.size) - List.length all ; + all + +let get t = + let* elem = Lwt_stream.get t.stream in + decr t.size ; Lwt.return elem + +let nget n t = + let* all = Lwt_stream.nget n t.stream in + t.size := !(t.size) - List.length all ; + Lwt.return all diff --git a/ocaml/xapi-guard/lib/lwt_bounded_stream.mli b/ocaml/xapi-guard/lib/lwt_bounded_stream.mli new file mode 100644 index 00000000000..b2b310f77e0 --- /dev/null +++ b/ocaml/xapi-guard/lib/lwt_bounded_stream.mli @@ -0,0 +1,34 @@ +(* + * Copyright (c) 2012 Citrix Systems + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. 
IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + *) + +(** Similar to Lwt_stream.bounded_push except threads never block in push() *) +type 'a t + +val create : int -> 'a t * ('a option -> unit option) +(** [create capacity] creates a stream which can contain at most + [capacity] elements *) + +val get_available : 'a t -> 'a list +(** [get_available t] returns all available elements from [t] without blocking *) + +val get : 'a t -> 'a option Lwt.t +(** [get t] returns an element from [t] *) + +val nget : int -> 'a t -> 'a list Lwt.t +(** [nget n t] returns [n] elements from [t] *) + +val size : 'a t -> int +(** [size t] return the number of enqueued elements *) diff --git a/ocaml/xapi-guard/src/dune b/ocaml/xapi-guard/src/dune index dfbf8e9a9e4..baac1d24101 100644 --- a/ocaml/xapi-guard/src/dune +++ b/ocaml/xapi-guard/src/dune @@ -1,5 +1,6 @@ (executable (name main) + (modules main) (libraries astring cmdliner diff --git a/ocaml/xapi-guard/test/cache_test.ml b/ocaml/xapi-guard/test/cache_test.ml new file mode 100644 index 00000000000..ce63c8cec8d --- /dev/null +++ b/ocaml/xapi-guard/test/cache_test.ml @@ -0,0 +1,156 @@ +let ( let@ ) f x = f x + +let ( let* ) = Lwt.bind + +module Tpm = Xapi_guard.Types.Tpm + +module TPMs = struct + let tpms_created = Atomic.make 1 + + let request_persist uuid write = + let __FUN = __FUNCTION__ in + + let key = Tpm.deserialize_key (Random.int 3) in + + let time = Mtime_clock.now () in + let serial_n = Atomic.fetch_and_add tpms_created 1 in + let contents = + Printf.sprintf "contents %s" (Mtime.to_uint64_ns time |> Int64.to_string) + in + let* () = + Logs_lwt.app (fun m -> + m "%s: Content № %i created: %a/%i/%a" __FUN serial_n Uuidm.pp uuid + 
Tpm.(serialize_key key) + Mtime.pp time + ) + in + write (uuid, time, key) contents +end + +let lwt_reporter () = + let buf_fmt ~like = + let b = Buffer.create 512 in + ( Fmt.with_buffer ~like b + , fun () -> + let m = Buffer.contents b in + Buffer.reset b ; m + ) + in + let app, app_flush = buf_fmt ~like:Fmt.stdout in + let dst, dst_flush = buf_fmt ~like:Fmt.stderr in + let reporter = Logs_fmt.reporter ~app ~dst () in + let report src level ~over k msgf = + let k () = + let write () = + match level with + | Logs.App -> + Lwt_io.write Lwt_io.stdout (app_flush ()) + | _ -> + Lwt_io.write Lwt_io.stderr (dst_flush ()) + in + let unblock () = over () ; Lwt.return_unit in + Lwt.finalize write unblock |> Lwt.ignore_result ; + k () + in + reporter.Logs.report src level ~over:(fun () -> ()) k msgf + in + {Logs.report} + +let setup_log level = + Logs.set_level level ; + Logs.set_reporter (lwt_reporter ()) ; + () + +let ok = Lwt_result.ok + +let retry_forever fname f = + let rec loop () = + let* () = + Lwt.catch f (function exn -> + let* () = + Logs_lwt.app (fun m -> + m "%s failed with %s, retrying..." 
fname (Printexc.to_string exn) + ) + in + Lwt_unix.sleep 0.5 + ) + in + loop () + [@@tailcall] + in + loop () + +let max_sent = 128 + +let received = ref 0 + +let throttled_reads = Mtime.Span.(200 * ms) + +let failing_writes_period = Mtime.Span.(500 * ms) + +let epoch = Mtime_clock.now () + +let should_fail () : bool = + let rec polarity elapsed = + if Mtime.Span.compare elapsed failing_writes_period < 0 then + true + else + not (polarity Mtime.Span.(abs_diff elapsed failing_writes_period)) + in + let elapsed = Mtime.span epoch (Mtime_clock.now ()) in + polarity elapsed + +let log (uuid, timestamp, key) content : (unit, exn) Result.t Lwt.t = + let __FUN = __FUNCTION__ in + let ( let* ) = Lwt_result.bind in + let maybe_fail () = + if should_fail () then + Lwt_result.fail + (failwith (Printf.sprintf {|oops, could not write '%s'|} content)) + else + Lwt_result.return () + in + let* () = maybe_fail () in + received := !received + 1 ; + Logs_lwt.app (fun m -> + m "%s Content № %i detected: %a/%i/%a" __FUN !received Uuidm.pp uuid + Tpm.(serialize_key key) + Mtime.pp timestamp + ) + |> ok + +let to_cache with_writer = + let __FUN = __FUNCTION__ in + let elapsed = Mtime_clock.counter () in + let rec loop_and_stop uuid sent () = + let sent = sent + 1 in + + let@ write_tpm = with_writer in + let* () = TPMs.request_persist uuid write_tpm in + if sent >= max_sent then + Logs_lwt.app (fun m -> + m "%s: Stopping requests after %i writes" __FUN sent + ) + else if Mtime.Span.compare (Mtime_clock.count elapsed) throttled_reads > 0 + then + let* () = Lwt_unix.sleep 0.1 in + loop_and_stop uuid sent () + else + let* () = Lwt.pause () in + loop_and_stop uuid sent () + in + List.init 4 (fun _ -> Uuidm.(v `V4)) + |> List.map (fun uuid -> loop_and_stop uuid 0 ()) + +let from_cache with_watcher = retry_forever "watcher" with_watcher + +let main () = + let with_writer, with_watcher = Xapi_guard.Disk_cache.(setup Swtpm log) in + let reader = from_cache with_watcher in + let writers = 
to_cache with_writer in + let* _ = Lwt.all (reader :: writers) in + Lwt.return_unit + +let () = + setup_log @@ Some Logs.Debug ; + Lwt_main.run (main ()) diff --git a/ocaml/xapi-guard/test/cache_test.mli b/ocaml/xapi-guard/test/cache_test.mli new file mode 100644 index 00000000000..e69de29bb2d diff --git a/ocaml/xapi-guard/test/dune b/ocaml/xapi-guard/test/dune index 934256a9f7a..e082a47a690 100644 --- a/ocaml/xapi-guard/test/dune +++ b/ocaml/xapi-guard/test/dune @@ -1,6 +1,7 @@ (test (name xapi_guard_test) (modes exe) + (modules (:standard \ cache_test)) (libraries alcotest alcotest-lwt @@ -17,3 +18,18 @@ xen-api-client-lwt) (package varstored-guard) ) + +(executable + (name cache_test) + (modules cache_test) + (libraries + logs + logs.fmt + logs.lwt + lwt + lwt.unix + mtime + mtime.clock.os + uuidm + xapi_guard) + (preprocess (pps ppx_deriving_rpc))) From cfb7748dc2a36833a77970e2fafe5666159b13f5 Mon Sep 17 00:00:00 2001 From: Pau Ruiz Safont Date: Fri, 16 Feb 2024 17:27:06 +0000 Subject: [PATCH 03/12] CA-383867: Delay conversion of VM's UUIDs to string This allows to pass the UUID directly to the on-disk cache that will be introduced Signed-off-by: Pau Ruiz Safont --- ocaml/xapi-guard/lib/server_interface.ml | 11 +++++++---- ocaml/xapi-guard/src/main.ml | 2 +- ocaml/xapi-guard/test/xapi_guard_test.ml | 8 +++++--- 3 files changed, 13 insertions(+), 8 deletions(-) diff --git a/ocaml/xapi-guard/lib/server_interface.ml b/ocaml/xapi-guard/lib/server_interface.ml index 37e639f1192..aee1d242776 100644 --- a/ocaml/xapi-guard/lib/server_interface.ml +++ b/ocaml/xapi-guard/lib/server_interface.ml @@ -102,8 +102,9 @@ let serve_forever_lwt_callback rpc_fn path _ req body = let serve_forever_lwt_callback_vtpm ~cache mutex vm_uuid _ req body = let get_vtpm_ref () = + let vm_uuid_str = Uuidm.to_string vm_uuid in let* vm = - with_xapi ~cache @@ Xen_api_lwt_unix.VM.get_by_uuid ~uuid:vm_uuid + with_xapi ~cache @@ Xen_api_lwt_unix.VM.get_by_uuid ~uuid:vm_uuid_str in let* vTPMs = 
with_xapi ~cache @@ Xen_api_lwt_unix.VM.get_VTPMs ~self:vm in match vTPMs with @@ -113,7 +114,8 @@ let serve_forever_lwt_callback_vtpm ~cache mutex vm_uuid _ req body = ignoring request" __FUNCTION__ ; let msg = - Printf.sprintf "No VTPM associated with VM %s, nothing to do" vm_uuid + Printf.sprintf "No VTPM associated with VM %s, nothing to do" + vm_uuid_str in raise (Failure msg) | self :: _ -> @@ -163,9 +165,10 @@ let serve_forever_lwt_callback_vtpm ~cache mutex vm_uuid _ req body = (* Create a restricted RPC function and socket for a specific VM *) let make_server_varstored ~cache path vm_uuid = + let vm_uuid_str = Uuidm.to_string vm_uuid in let module Server = Xapi_idl_guard_varstored.Interface.RPC_API (Rpc_lwt.GenServer ()) in - let get_vm_ref () = with_xapi ~cache @@ VM.get_by_uuid ~uuid:vm_uuid in + let get_vm_ref () = with_xapi ~cache @@ VM.get_by_uuid ~uuid:vm_uuid_str in let ret v = (* TODO: maybe map XAPI exceptions *) Lwt.bind v Lwt.return_ok |> Rpc_lwt.T.put @@ -187,7 +190,7 @@ let make_server_varstored ~cache path vm_uuid = (let* (_ : _ Ref.t) = with_xapi ~cache @@ Message.create ~name:"VM_SECURE_BOOT_FAILED" ~priority ~cls:`VM - ~obj_uuid:vm_uuid ~body + ~obj_uuid:vm_uuid_str ~body in Lwt.return_unit ) diff --git a/ocaml/xapi-guard/src/main.ml b/ocaml/xapi-guard/src/main.ml index b80bf354516..6979665a8a8 100644 --- a/ocaml/xapi-guard/src/main.ml +++ b/ocaml/xapi-guard/src/main.ml @@ -110,7 +110,7 @@ let listen_for_vm {Persistent.vm_uuid; path; gid; typ} = (Types.Service.to_string typ) path vm_uuid_str ; let* () = safe_unlink path in - let* stop_server = make_server ~cache path vm_uuid_str in + let* stop_server = make_server ~cache path vm_uuid in let* () = log_fds () in Hashtbl.add sockets path (stop_server, (vm_uuid, gid, typ)) ; let* () = Lwt_unix.chmod path 0o660 in diff --git a/ocaml/xapi-guard/test/xapi_guard_test.ml b/ocaml/xapi-guard/test/xapi_guard_test.ml index 41ce8f6e347..c4996bef0c7 100644 --- a/ocaml/xapi-guard/test/xapi_guard_test.ml 
+++ b/ocaml/xapi-guard/test/xapi_guard_test.ml @@ -60,7 +60,9 @@ let xapi_rpc call = | _ -> Fmt.failwith "XAPI RPC call %s not expected in test" call.Rpc.name -let vm_uuid = Uuidx.(to_string (make ())) +let vm_uuid = Uuidm.v `V4 + +let vm_uuid_str = Uuidm.to_string vm_uuid let () = let old_hook = !Lwt.async_exception_hook in @@ -101,7 +103,7 @@ let with_rpc f switch () = let dict = Alcotest.(list @@ pair string string) let test_change_nvram ~rpc ~session_id () = - let* self = VM.get_by_uuid ~rpc ~session_id ~uuid:vm_uuid in + let* self = VM.get_by_uuid ~rpc ~session_id ~uuid:vm_uuid_str in let* nvram0 = VM.get_NVRAM ~rpc ~session_id ~self in Alcotest.(check' dict) ~msg:"nvram initial" ~expected:[] ~actual:nvram0 ; let contents = "nvramnew" in @@ -131,7 +133,7 @@ let test_bad_set_nvram ~rpc ~session_id () = let* () = VM.set_NVRAM_EFI_variables ~rpc ~session_id ~self:vm_bad ~value:"bad" in - let* vm_ref = VM.get_by_uuid ~rpc ~session_id ~uuid:vm_uuid in + let* vm_ref = VM.get_by_uuid ~rpc ~session_id ~uuid:vm_uuid_str in let* nvram = VM.get_NVRAM ~rpc ~session_id ~self:vm_ref in Alcotest.(check' dict) ~msg:"only managed to change own nvram" ~actual:nvram From 8582a708f6e1579be52740bbbf28f1761d32aeaa Mon Sep 17 00:00:00 2001 From: Pau Ruiz Safont Date: Fri, 16 Feb 2024 17:37:35 +0000 Subject: [PATCH 04/12] CA-383867: Segregate vtpm persistance out of the callback This allows to use the persistence function from outside the callback, which will be useful to thread into the on-disk cache Signed-off-by: Pau Ruiz Safont --- ocaml/xapi-guard/lib/server_interface.ml | 62 +++++++++++++----------- 1 file changed, 33 insertions(+), 29 deletions(-) diff --git a/ocaml/xapi-guard/lib/server_interface.ml b/ocaml/xapi-guard/lib/server_interface.ml index aee1d242776..7bd6e79a8b1 100644 --- a/ocaml/xapi-guard/lib/server_interface.ml +++ b/ocaml/xapi-guard/lib/server_interface.ml @@ -100,28 +100,37 @@ let serve_forever_lwt_callback rpc_fn path _ req body = in 
Cohttp_lwt_unix.Server.respond_string ~status:`Method_not_allowed ~body () -let serve_forever_lwt_callback_vtpm ~cache mutex vm_uuid _ req body = - let get_vtpm_ref () = - let vm_uuid_str = Uuidm.to_string vm_uuid in - let* vm = - with_xapi ~cache @@ Xen_api_lwt_unix.VM.get_by_uuid ~uuid:vm_uuid_str - in - let* vTPMs = with_xapi ~cache @@ Xen_api_lwt_unix.VM.get_VTPMs ~self:vm in - match vTPMs with - | [] -> - D.warn - "%s: received a request from a VM that has no VTPM associated, \ - ignoring request" - __FUNCTION__ ; - let msg = - Printf.sprintf "No VTPM associated with VM %s, nothing to do" - vm_uuid_str - in - raise (Failure msg) - | self :: _ -> - let* uuid = with_xapi ~cache @@ Xen_api_lwt_unix.VTPM.get_uuid ~self in - with_xapi ~cache @@ VTPM.get_by_uuid ~uuid +let with_xapi_vtpm ~cache vm_uuid = + let vm_uuid_str = Uuidm.to_string vm_uuid in + let* vm = + with_xapi ~cache @@ Xen_api_lwt_unix.VM.get_by_uuid ~uuid:vm_uuid_str in + let* vTPMs = with_xapi ~cache @@ Xen_api_lwt_unix.VM.get_VTPMs ~self:vm in + match vTPMs with + | [] -> + D.warn + "%s: received a request from a VM that has no VTPM associated, \ + ignoring request" + __FUNCTION__ ; + let msg = + Printf.sprintf "No VTPM associated with VM %s, nothing to do" + vm_uuid_str + in + raise (Failure msg) + | self :: _ -> + let* uuid = with_xapi ~cache @@ Xen_api_lwt_unix.VTPM.get_uuid ~self in + with_xapi ~cache @@ VTPM.get_by_uuid ~uuid + +let push_vtpm ~cache vm_uuid path contents = + let* self = with_xapi_vtpm ~cache vm_uuid in + let* old_contents = with_xapi ~cache @@ VTPM.get_contents ~self in + let contents = + Tpm.(old_contents |> deserialize |> update key contents |> serialize) + in + let* () = with_xapi ~cache @@ VTPM.set_contents ~self ~contents in + Lwt.return_unit + +let serve_forever_lwt_callback_vtpm ~cache mutex vm_uuid _ req body = let uri = Cohttp.Request.uri req in (* in case the connection is interrupted/etc. 
we may still have pending operations, so use a per vTPM mutex to ensure we really only have 1 pending operation at a time for a vTPM @@ -130,7 +139,7 @@ let serve_forever_lwt_callback_vtpm ~cache mutex vm_uuid _ req body = (* TODO: some logging *) match (Cohttp.Request.meth req, Uri.path uri) with | `GET, path when path <> "/" -> - let* self = get_vtpm_ref () in + let* self = with_xapi_vtpm ~cache vm_uuid in let* contents = with_xapi ~cache @@ VTPM.get_contents ~self in let key = Tpm.key_of_swtpm path in let body = Tpm.(contents |> deserialize |> lookup ~key) in @@ -140,17 +149,12 @@ let serve_forever_lwt_callback_vtpm ~cache mutex vm_uuid _ req body = Cohttp_lwt_unix.Server.respond_string ~headers ~status:`OK ~body () | `PUT, path when path <> "/" -> let* body = Cohttp_lwt.Body.to_string body in - let* self = get_vtpm_ref () in - let* contents = with_xapi ~cache @@ VTPM.get_contents ~self in let key = Tpm.key_of_swtpm path in - let contents = - Tpm.(contents |> deserialize |> update key body |> serialize) - in - let* () = with_xapi ~cache @@ VTPM.set_contents ~self ~contents in + let* () = push_vtpm ~cache vm_uuid key body in Cohttp_lwt_unix.Server.respond ~status:`No_content ~body:Cohttp_lwt.Body.empty () | `DELETE, path when path <> "/" -> - let* self = get_vtpm_ref () in + let* self = with_xapi_vtpm ~cache vm_uuid in let* contents = with_xapi ~cache @@ VTPM.get_contents ~self in let key = Tpm.key_of_swtpm path in let contents = From b16a018a29c6ae0ef7ecc71ff69250fdae17eac2 Mon Sep 17 00:00:00 2001 From: Pau Ruiz Safont Date: Fri, 9 Feb 2024 11:51:18 +0000 Subject: [PATCH 05/12] CA-383867: Integrate the cache into xapi-guard's main loop Now the process creates a thread to read from disk and push vtpm events to xapi at its own pace, and integrates the disk-writing part into the callback of the deprivileged sockets. 
Special consideration was taken for the resume, when the deprivileged sockets and the write-to-cache function need to be integrated in a different way from the codepath that creates the sockets from the message-switch server. Signed-off-by: Pau Ruiz Safont --- ocaml/xapi-guard/lib/server_interface.ml | 19 ++-- ocaml/xapi-guard/src/main.ml | 115 ++++++++++++++++------- ocaml/xapi-guard/test/xapi_guard_test.ml | 3 +- 3 files changed, 92 insertions(+), 45 deletions(-) diff --git a/ocaml/xapi-guard/lib/server_interface.ml b/ocaml/xapi-guard/lib/server_interface.ml index 7bd6e79a8b1..1ca19a68820 100644 --- a/ocaml/xapi-guard/lib/server_interface.ml +++ b/ocaml/xapi-guard/lib/server_interface.ml @@ -51,8 +51,8 @@ let () = * this is only needed for syscalls that would otherwise block *) Lwt_unix.set_pool_size 16 -let with_xapi ~cache f = - Lwt_unix.with_timeout 120. (fun () -> SessionCache.with_session cache f) +let with_xapi ~cache ?(timeout = 120.) f = + Lwt_unix.with_timeout timeout (fun () -> SessionCache.with_session cache f) let serve_forever_lwt path callback = let conn_closed _ = () in @@ -121,17 +121,18 @@ let with_xapi_vtpm ~cache vm_uuid = let* uuid = with_xapi ~cache @@ Xen_api_lwt_unix.VTPM.get_uuid ~self in with_xapi ~cache @@ VTPM.get_by_uuid ~uuid -let push_vtpm ~cache vm_uuid path contents = +let push_vtpm ~cache (vm_uuid, _timestamp, key) contents = let* self = with_xapi_vtpm ~cache vm_uuid in let* old_contents = with_xapi ~cache @@ VTPM.get_contents ~self in let contents = Tpm.(old_contents |> deserialize |> update key contents |> serialize) in let* () = with_xapi ~cache @@ VTPM.set_contents ~self ~contents in - Lwt.return_unit + Lwt_result.return () -let serve_forever_lwt_callback_vtpm ~cache mutex vm_uuid _ req body = +let serve_forever_lwt_callback_vtpm ~cache mutex persist vm_uuid _ req body = let uri = Cohttp.Request.uri req in + let timestamp = Mtime_clock.now () in (* in case the connection is interrupted/etc. 
we may still have pending operations, so use a per vTPM mutex to ensure we really only have 1 pending operation at a time for a vTPM *) @@ -150,7 +151,7 @@ let serve_forever_lwt_callback_vtpm ~cache mutex vm_uuid _ req body = | `PUT, path when path <> "/" -> let* body = Cohttp_lwt.Body.to_string body in let key = Tpm.key_of_swtpm path in - let* () = push_vtpm ~cache vm_uuid key body in + let* () = persist (vm_uuid, timestamp, key) body in Cohttp_lwt_unix.Server.respond ~status:`No_content ~body:Cohttp_lwt.Body.empty () | `DELETE, path when path <> "/" -> @@ -168,7 +169,7 @@ let serve_forever_lwt_callback_vtpm ~cache mutex vm_uuid _ req body = Cohttp_lwt_unix.Server.respond_string ~status:`Method_not_allowed ~body () (* Create a restricted RPC function and socket for a specific VM *) -let make_server_varstored ~cache path vm_uuid = +let make_server_varstored _persist ~cache path vm_uuid = let vm_uuid_str = Uuidm.to_string vm_uuid in let module Server = Xapi_idl_guard_varstored.Interface.RPC_API (Rpc_lwt.GenServer ()) in @@ -211,7 +212,7 @@ let make_server_varstored ~cache path vm_uuid = serve_forever_lwt_callback (Rpc_lwt.server Server.implementation) path |> serve_forever_lwt path -let make_server_vtpm_rest ~cache path vm_uuid = +let make_server_vtpm_rest persist ~cache path vm_uuid = let mutex = Lwt_mutex.create () in - let callback = serve_forever_lwt_callback_vtpm ~cache mutex vm_uuid in + let callback = serve_forever_lwt_callback_vtpm ~cache mutex persist vm_uuid in serve_forever_lwt path callback diff --git a/ocaml/xapi-guard/src/main.ml b/ocaml/xapi-guard/src/main.ml index 6979665a8a8..cfb3d3f258a 100644 --- a/ocaml/xapi-guard/src/main.ml +++ b/ocaml/xapi-guard/src/main.ml @@ -18,6 +18,8 @@ open Xapi_guard_server module Types = Xapi_guard.Types module SessionCache = Xen_api_lwt_unix.SessionCache +let ( let@ ) f x = f x + let daemon_name = "xapi-guard" module D = Debug.Make (struct let name = daemon_name end) @@ -97,7 +99,7 @@ let () = 
Xen_api_lwt_unix.SessionCache.destroy cache ) -let listen_for_vm {Persistent.vm_uuid; path; gid; typ} = +let listen_for_vm write_push {Persistent.vm_uuid; path; gid; typ} = let make_server = match typ with | Varstored -> @@ -110,19 +112,25 @@ let listen_for_vm {Persistent.vm_uuid; path; gid; typ} = (Types.Service.to_string typ) path vm_uuid_str ; let* () = safe_unlink path in - let* stop_server = make_server ~cache path vm_uuid in + let* stop_server = make_server write_push ~cache path vm_uuid in let* () = log_fds () in Hashtbl.add sockets path (stop_server, (vm_uuid, gid, typ)) ; let* () = Lwt_unix.chmod path 0o660 in Lwt_unix.chown path 0 gid -let resume () = +let resume ~vtpm_write_push ~uefi_write_push () = let* vms = Persistent.loadfrom recover_path in - let+ () = Lwt_list.iter_p listen_for_vm vms in + let listen_to_vm = function + | Persistent.{typ= Varstored; _} as vm -> + listen_for_vm uefi_write_push vm + | Persistent.{typ= Swtpm; _} as vm -> + listen_for_vm vtpm_write_push vm + in + let+ () = Lwt_list.iter_p listen_to_vm vms in D.debug "%s: completed" __FUNCTION__ (* caller here is trusted (xenopsd through message-switch) *) -let depriv_varstored_create dbg vm_uuid gid path = +let depriv_varstored_create write_push dbg vm_uuid gid path = if Hashtbl.mem sockets path then Lwt.return_error (Xapi_idl_guard_privileged.Interface.InternalError @@ -134,7 +142,9 @@ let depriv_varstored_create dbg vm_uuid gid path = @@ ( D.debug "[%s] creating deprivileged socket at %s, owned by group %d" dbg path gid ; - let* () = listen_for_vm {Persistent.path; vm_uuid; gid; typ= Varstored} in + let* () = + listen_for_vm write_push {Persistent.path; vm_uuid; gid; typ= Varstored} + in store_args sockets ) @@ -156,7 +166,7 @@ let depriv_varstored_destroy dbg gid path = D.debug "[%s] stopped server for gid %d and removed socket" dbg gid ; Lwt.return_unit -let depriv_swtpm_create dbg vm_uuid gid path = +let depriv_swtpm_create write_push dbg vm_uuid gid path = if Hashtbl.mem sockets 
path then Lwt.return_error (Xapi_idl_guard_privileged.Interface.InternalError @@ -168,7 +178,9 @@ let depriv_swtpm_create dbg vm_uuid gid path = @@ ( D.debug "[%s] creating deprivileged socket at %s, owned by group %d" dbg path gid ; - let* () = listen_for_vm {Persistent.path; vm_uuid; gid; typ= Swtpm} in + let* () = + listen_for_vm write_push {Persistent.path; vm_uuid; gid; typ= Swtpm} + in store_args sockets ) @@ -197,6 +209,9 @@ let depriv_swtpm_destroy dbg gid path = Lwt.return_unit (* TODO: these 2 APIs need to be updated to go through the generic interface *) +(* These 2 functions are only reachable from message-switch. They are part of + the control plane and be called when xapi controls the lifecycle of a VM, so + it's OK to assume it's available. *) let vtpm_set_contents dbg vtpm_uuid contents = let open Xen_api_lwt_unix in @@ -215,55 +230,85 @@ let vtpm_get_contents _dbg vtpm_uuid = @@ let* self = Server_interface.with_xapi ~cache @@ VTPM.get_by_uuid ~uuid in Server_interface.with_xapi ~cache @@ VTPM.get_contents ~self -let rpc_fn = +let rpc_fn ~vtpm_write_push ~uefi_write_push = let module Server = Xapi_idl_guard_privileged.Interface.RPC_API (Rpc_lwt.GenServer ()) in (* bind APIs *) - Server.varstore_create depriv_varstored_create ; + Server.varstore_create (depriv_varstored_create uefi_write_push) ; Server.varstore_destroy depriv_varstored_destroy ; - Server.vtpm_create depriv_swtpm_create ; + Server.vtpm_create (depriv_swtpm_create vtpm_write_push) ; Server.vtpm_destroy depriv_swtpm_destroy ; Server.vtpm_set_contents vtpm_set_contents ; Server.vtpm_get_contents vtpm_get_contents ; Rpc_lwt.server Server.implementation -let process body = +let process ~vtpm_write_push ~uefi_write_push body = let+ response = Xapi_guard.Dorpc.wrap_rpc Xapi_idl_guard_privileged.Interface.E.error (fun () -> let call = Jsonrpc.call_of_string body in D.debug "Received request from message-switch, method %s" call.Rpc.name ; - rpc_fn call + rpc_fn ~vtpm_write_push 
~uefi_write_push call ) in Jsonrpc.string_of_response response +let retry_forever fname f = + let rec loop () = + let* () = + Lwt.catch f (function exn -> + D.info "%s failed with %s, retrying..." fname (Printexc.to_string exn) ; + Lwt_unix.sleep 0.5 + ) + in + (loop [@tailcall]) () + in + loop () + +let cache_reader with_watcher = retry_forever "cache watcher" with_watcher + let make_message_switch_server () = + let with_swtpm_push, with_watch = + Xapi_guard.Disk_cache.(setup Swtpm (Server_interface.push_vtpm ~cache)) + in let open Message_switch_lwt.Protocol_lwt in let wait_server, server_stopped = Lwt.task () in - let* result = - Server.listen ~process ~switch:!Xcp_client.switch_path - ~queue:Xapi_idl_guard_privileged.Interface.queue_name () + let@ vtpm_write_push = with_swtpm_push in + let uefi_write_push _ _ = + (* This is unused for the time being, added to be consistent with both + interfaces *) + Lwt.return_unit in - match Server.error_to_msg result with - | Ok t -> - Lwt_switch.add_hook (Some Server_interface.shutdown) (fun () -> - D.debug "Stopping message-switch queue server" ; - let+ () = Server.shutdown ~t () in - Lwt.wakeup server_stopped () - ) ; - (* best effort resume *) - let* () = - Lwt.catch resume (fun e -> - D.log_backtrace () ; - D.warn "Resume failed: %s" (Printexc.to_string e) ; - Lwt.return_unit - ) - in - wait_server - | Error (`Msg m) -> - Lwt.fail_with - (Printf.sprintf "Failed to listen on message-switch queue: %s" m) + let server = + let* result = + Server.listen + ~process:(process ~vtpm_write_push ~uefi_write_push) + ~switch:!Xcp_client.switch_path + ~queue:Xapi_idl_guard_privileged.Interface.queue_name () + in + match Server.error_to_msg result with + | Ok t -> + Lwt_switch.add_hook (Some Server_interface.shutdown) (fun () -> + D.debug "Stopping message-switch queue server" ; + let+ () = Server.shutdown ~t () in + Lwt.wakeup server_stopped () + ) ; + (* best effort resume *) + let* () = + Lwt.catch (resume ~vtpm_write_push 
~uefi_write_push) (fun e -> + D.log_backtrace () ; + D.warn "Resume failed: %s" (Printexc.to_string e) ; + Lwt.return_unit + ) + in + wait_server + | Error (`Msg m) -> + Lwt.fail_with + (Printf.sprintf "Failed to listen on message-switch queue: %s" m) + in + let reader = cache_reader with_watch in + let* _ = Lwt.all [server; reader] in + Lwt.return_unit let main log_level = Debug.set_level log_level ; diff --git a/ocaml/xapi-guard/test/xapi_guard_test.ml b/ocaml/xapi-guard/test/xapi_guard_test.ml index c4996bef0c7..86efb713d29 100644 --- a/ocaml/xapi-guard/test/xapi_guard_test.ml +++ b/ocaml/xapi-guard/test/xapi_guard_test.ml @@ -80,9 +80,10 @@ let with_rpc f switch () = in (Lwt_switch.add_hook (Some switch) @@ fun () -> SessionCache.destroy cache) ; let path = Filename.concat tmp "socket" in + let push_nothing _ = Lwt_result.return () in (* Create an internal server on 'path', the socket that varstored would connect to *) let* stop_server = - Server_interface.make_server_varstored ~cache path vm_uuid + Server_interface.make_server_varstored push_nothing ~cache path vm_uuid in (* rpc simulates what varstored would do *) let uri = Uri.make ~scheme:"file" ~path () |> Uri.to_string in From 77d2b1dea6e5f5932b29e3d3e9ae4de294a873bc Mon Sep 17 00:00:00 2001 From: Pau Ruiz Safont Date: Tue, 20 Feb 2024 15:43:21 +0000 Subject: [PATCH 06/12] xapi-guard: reduce the number of calls to fetch the vTPM ref Signed-off-by: Pau Ruiz Safont --- ocaml/xapi-guard/lib/server_interface.ml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ocaml/xapi-guard/lib/server_interface.ml b/ocaml/xapi-guard/lib/server_interface.ml index 1ca19a68820..ced743e501f 100644 --- a/ocaml/xapi-guard/lib/server_interface.ml +++ b/ocaml/xapi-guard/lib/server_interface.ml @@ -118,8 +118,7 @@ let with_xapi_vtpm ~cache vm_uuid = in raise (Failure msg) | self :: _ -> - let* uuid = with_xapi ~cache @@ Xen_api_lwt_unix.VTPM.get_uuid ~self in - with_xapi ~cache @@ VTPM.get_by_uuid ~uuid + 
Lwt.return self let push_vtpm ~cache (vm_uuid, _timestamp, key) contents = let* self = with_xapi_vtpm ~cache vm_uuid in From bf0e9c0bf38f4b3aab780c222d83a8865ca5d59b Mon Sep 17 00:00:00 2001 From: Pau Ruiz Safont Date: Fri, 23 Feb 2024 15:23:48 +0000 Subject: [PATCH 07/12] CA-383867: Add startup procedure to xapi-guard Because timestamps depend on a monotonic timestamp that depends on boot, files need to be renamed to ensure future writes have higher timestamps to be considered newer and be uploaded to xapi. On top of this, allows to report about remnant temporary files, delete invalid files and remove empty directories. Signed-off-by: Pau Ruiz Safont --- doc/content/xapi-guard/_index.md | 14 +++++ ocaml/xapi-guard/lib/disk_cache.ml | 85 +++++++++++++++++++++++++++-- ocaml/xapi-guard/lib/disk_cache.mli | 4 +- ocaml/xapi-guard/src/main.ml | 2 +- ocaml/xapi-guard/test/cache_test.ml | 5 +- 5 files changed, 101 insertions(+), 9 deletions(-) diff --git a/doc/content/xapi-guard/_index.md b/doc/content/xapi-guard/_index.md index 39d64a558e5..433f92f9d5e 100644 --- a/doc/content/xapi-guard/_index.md +++ b/doc/content/xapi-guard/_index.md @@ -35,6 +35,9 @@ This situation usually happens when xapi is being restarted as part of an update SWTPM, the vTPM daemon, reads the contents of the TPM from xapi-guard on startup, suspend, and resume. During normal operation SWTPM does not send read requests from xapi-guard. +Structure +--------- + The cache module consists of two Lwt threads, one that writes to disk, and another one that reads from disk. The writer is triggered when a VM writes to the vTPM. It never blocks if xapi is unreachable, but responds as soon as the data has been stored either by xapi or on the local disk, such that the VM receives a timely response to the write request. 
@@ -82,3 +85,14 @@ stateDiagram-v2 Engaged --> Engaged : Writer receives TPM, queue is not full Engaged --> Disengaged : Writer receives TPM, queue is full ``` + +Startup +------ + +At startup, there's a dedicated routine to transform the existing contents of the cache. +This is currently done because the timestamp reference change on each boot. +This means that the existing contents might have timestamps considered more recent than timestamps of writes coming from running events, leading to missing content updates. +This must be avoided and instead the updates with offending timestamps are renamed to a timestamp taken from the current timestamp, ensuring a consistent +ordering. +The routine is also used to keep a minimal file tree: unrecognised files are deleted, temporary files created to ensure atomic writes are left untouched, and empty directories are deleted. +This mechanism can be changed in the future to migrate to other formats. diff --git a/ocaml/xapi-guard/lib/disk_cache.ml b/ocaml/xapi-guard/lib/disk_cache.ml index b972654f085..5ededfbc81b 100644 --- a/ocaml/xapi-guard/lib/disk_cache.ml +++ b/ocaml/xapi-guard/lib/disk_cache.ml @@ -107,7 +107,7 @@ let get_all_contents root = let file = if found then Outdated valid_file else Latest valid_file in - loop (file :: acc, false) others + loop (file :: acc, true) others ) in let ordered = List.fast_sort (fun x y -> String.compare y x) contents in @@ -180,7 +180,6 @@ type channel = { IDEA: carryover: read contents of cache and "convert it" to the current run TODO: - - Add startup step to convert existing content to new time - Exponential backoff on xapi push error - Limit error logging on xapi push error: once per downtime is enough *) @@ -442,8 +441,86 @@ end = struct loop end +(** Module use to change the cache contents before the reader and writer start + running *) +module Setup : sig + val retime_cache_contents : Types.Service.t -> unit Lwt.t +end = struct + type file_action = + | Keep of file + | Delete of 
string + | Move of {from: string; into: string} + + let get_fs_action root now = function + | Latest ((uuid, timestamp, key), from) as latest -> + if Mtime.is_later ~than:now timestamp then + let timestamp = now in + let into = path_of_key root (uuid, timestamp, key) in + Move {from; into} + else + Keep latest + | Temporary _ as temp -> + Keep temp + | Invalid p | Outdated (_, p) -> + Delete p + + let commit __FUN = function + | Keep (Temporary p) -> + D.warn "%s: Found temporary file, ignoring '%s'" __FUN p ; + Lwt.return_unit + | Keep _ -> + Lwt.return_unit + | Delete p -> + D.info "%s: Deleting '%s'" __FUN p ; + Lwt_unix.unlink p + | Move {from; into} -> + D.info "%s: Moving '%s' to '%s'" __FUN from into ; + Lwt_unix.rename from into + + let rec delete_empty_dirs ~delete_root root = + (* Delete subdirectories, then *) + let* files = files_in root ~otherwise:(fun _ -> Lwt.return []) in + let* () = + Lwt_list.iter_p + (fun path -> + let* {st_kind; _} = Lwt_unix.stat path in + match st_kind with + | S_DIR -> + delete_empty_dirs ~delete_root:true path + | _ -> + Lwt.return_unit + ) + files + in + if not delete_root then + Lwt.return_unit + else + let* files = files_in root ~otherwise:(fun _ -> Lwt.return []) in + Lwt.catch + (fun () -> + if files = [] then + Lwt_unix.rmdir root + else + Lwt.return_unit + ) + (fun _ -> Lwt.return_unit) + + (* The code assumes it's the only with access to the disk cache while running *) + let retime_cache_contents typ = + let now = Mtime_clock.now () in + let root = cache_of typ in + let* contents = get_all_contents root in + let* () = + contents + |> List.map (get_fs_action root now) + |> Lwt_list.iter_p (commit __FUNCTION__) + in + delete_empty_dirs ~delete_root:false root +end + let setup typ direct = - let queue, push = Lwt_bounded_stream.create 4098 in + let* () = Setup.retime_cache_contents typ in + let queue, push = Lwt_bounded_stream.create 2 in let lock = Lwt_mutex.create () in let q = {queue; push; lock; state= Disengaged} 
in - (Writer.with_cache ~direct typ q, Watcher.watch ~direct typ q) + Lwt.return (Writer.with_cache ~direct typ q, Watcher.watch ~direct typ q) diff --git a/ocaml/xapi-guard/lib/disk_cache.mli b/ocaml/xapi-guard/lib/disk_cache.mli index 0f387d4ebd1..c8614bff31b 100644 --- a/ocaml/xapi-guard/lib/disk_cache.mli +++ b/ocaml/xapi-guard/lib/disk_cache.mli @@ -18,8 +18,10 @@ type t = Uuidm.t * Mtime.t * Types.Tpm.key val setup : Types.Service.t -> (t -> string -> (unit, exn) Lwt_result.t) - -> (((t -> string -> unit Lwt.t) -> 'a Lwt.t) -> 'a Lwt.t) + -> ( (((t -> string -> unit Lwt.t) -> 'a Lwt.t) -> 'a Lwt.t) * (unit -> unit Lwt.t) + ) + Lwt.t (** [setup service push_callback] Returns a local disk buffer for [service] which will use [push_callback] to push the elements to their final destination *) diff --git a/ocaml/xapi-guard/src/main.ml b/ocaml/xapi-guard/src/main.ml index cfb3d3f258a..f0e57121f07 100644 --- a/ocaml/xapi-guard/src/main.ml +++ b/ocaml/xapi-guard/src/main.ml @@ -268,7 +268,7 @@ let retry_forever fname f = let cache_reader with_watcher = retry_forever "cache watcher" with_watcher let make_message_switch_server () = - let with_swtpm_push, with_watch = + let* with_swtpm_push, with_watch = Xapi_guard.Disk_cache.(setup Swtpm (Server_interface.push_vtpm ~cache)) in let open Message_switch_lwt.Protocol_lwt in diff --git a/ocaml/xapi-guard/test/cache_test.ml b/ocaml/xapi-guard/test/cache_test.ml index ce63c8cec8d..e171de6b635 100644 --- a/ocaml/xapi-guard/test/cache_test.ml +++ b/ocaml/xapi-guard/test/cache_test.ml @@ -75,8 +75,7 @@ let retry_forever fname f = Lwt_unix.sleep 0.5 ) in - loop () - [@@tailcall] + (loop [@tailcall]) () in loop () @@ -145,7 +144,7 @@ let to_cache with_writer = let from_cache with_watcher = retry_forever "watcher" with_watcher let main () = - let with_writer, with_watcher = Xapi_guard.Disk_cache.(setup Swtpm log) in + let* with_writer, with_watcher = Xapi_guard.Disk_cache.(setup Swtpm log) in let reader = from_cache with_watcher in 
let writers = to_cache with_writer in let* _ = Lwt.all (reader :: writers) in From 4a67a207ad4b1601f52f09f909067972741183d2 Mon Sep 17 00:00:00 2001 From: Pau Ruiz Safont Date: Tue, 27 Feb 2024 13:19:45 +0000 Subject: [PATCH 08/12] CA-383867: Prepare xapi-guard cache to change fallback on write error This is needed to be able to disable the disk cache completely, maintaining previous behaviour if needed. Signed-off-by: Pau Ruiz Safont --- ocaml/xapi-guard/lib/disk_cache.ml | 55 +++++++++++++++++++----------- 1 file changed, 35 insertions(+), 20 deletions(-) diff --git a/ocaml/xapi-guard/lib/disk_cache.ml b/ocaml/xapi-guard/lib/disk_cache.ml index 5ededfbc81b..ddc78a334be 100644 --- a/ocaml/xapi-guard/lib/disk_cache.ml +++ b/ocaml/xapi-guard/lib/disk_cache.ml @@ -241,8 +241,11 @@ end = struct let* outdated_contents = files_in_existing key_dir in let filename = key_dir // (Mtime.to_uint64_ns now |> Int64.to_string) in - (* 2. Write new timestamped content to cache, atomically, if needed; and - notify the other side, if needed *) + + (* 2. Try to push the changes, if possible. If it's not possible because of + the mode or a failure, write new timestamped content to cache, + atomically; and finally notify the other side if needed *) + (* Note that all queue operations must use while holding its mutex *) let persist () = persist_to ~filename ~contents in let persist_and_push () = let push () = @@ -260,27 +263,39 @@ end = struct let engage_and_persist exn = queue.state <- Engaged ; D.info "%s: Error on push. 
Reason: %s" __FUN (Printexc.to_string exn) ; - persist_and_push () + let* () = persist_and_push () in + Lwt_result.return () + in + let _fail exn = + Debug.log_backtrace exn (Backtrace.get exn) ; + Lwt_result.fail exn + in + let read_state_and_push on_exception () = + match queue.state with + | Direct -> + let* result = + Lwt.try_bind + (fun () -> direct (uuid, now, key) contents) + (function + | Ok () -> Lwt_result.return () | Error exn -> on_exception exn + ) + on_exception + in + Lwt.return result + | Engaged -> + let* () = persist_and_push () in + Lwt_result.return () + | Disengaged -> + let* () = persist () in + Lwt_result.return () in + let on_exception = engage_and_persist in + + let* result = with_lock queue.lock (read_state_and_push on_exception) in let* () = - with_lock queue.lock (fun () -> - match queue.state with - | Direct -> - Lwt.try_bind - (fun () -> direct (uuid, now, key) contents) - (function - | Ok () -> - Lwt.return_unit - | Error exn -> - engage_and_persist exn - ) - (function exn -> engage_and_persist exn) - | Engaged -> - persist_and_push () - | Disengaged -> - persist () - ) + match result with Ok () -> Lwt.return_unit | Error exn -> raise exn in + (* 4. Delete previous requests from filesystem *) let* _ = Lwt_list.map_p unlink_safe outdated_contents in Lwt.return_unit From b80357e5c4e4a7db23ff3576549ba9737c92007d Mon Sep 17 00:00:00 2001 From: Pau Ruiz Safont Date: Tue, 27 Feb 2024 15:14:32 +0000 Subject: [PATCH 09/12] CA-383867: Allow xapi-guard to disable the disk cache This is done through the fist point. Xapi_fist is not used directly because it needs a new opam package, creating a lot of churn which is currently unwanted. 
Signed-off-by: Pau Ruiz Safont --- ocaml/xapi-guard/lib/disk_cache.ml | 14 ++++++++++++-- ocaml/xapi/xapi_fist.ml | 2 ++ 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/ocaml/xapi-guard/lib/disk_cache.ml b/ocaml/xapi-guard/lib/disk_cache.ml index ddc78a334be..60911410b8d 100644 --- a/ocaml/xapi-guard/lib/disk_cache.ml +++ b/ocaml/xapi-guard/lib/disk_cache.ml @@ -27,6 +27,15 @@ type t = Uuidm.t * Mtime.t * Types.Tpm.key let cache_of service = runtime_data // Types.Service.to_string service +let fistpoint () = + let name = "/tmp/fist_disable_xapi_guard_cache" in + Lwt.catch + (fun () -> + let* () = Lwt_unix.access name [Unix.F_OK] in + Lwt.return true + ) + (fun _ -> Lwt.return false) + let files_in dir ~otherwise = Lwt.catch (fun () -> @@ -266,7 +275,7 @@ end = struct let* () = persist_and_push () in Lwt_result.return () in - let _fail exn = + let fail exn = Debug.log_backtrace exn (Backtrace.get exn) ; Lwt_result.fail exn in @@ -289,7 +298,8 @@ end = struct let* () = persist () in Lwt_result.return () in - let on_exception = engage_and_persist in + let* cache_disabled = fistpoint () in + let on_exception = if cache_disabled then fail else engage_and_persist in let* result = with_lock queue.lock (read_state_and_push on_exception) in let* () = diff --git a/ocaml/xapi/xapi_fist.ml b/ocaml/xapi/xapi_fist.ml index 7798713e494..4f211185a92 100644 --- a/ocaml/xapi/xapi_fist.ml +++ b/ocaml/xapi/xapi_fist.ml @@ -160,3 +160,5 @@ let int_seed name : int option = let exchange_certificates_in_pool () : int option = let name = "exchange_certificates_in_pool" in int_seed name + +let disable_xapi_guard_cache () = fistpoint "disable_xapi_guard_cache" From 196c88c7701c6a34519be0c9e748d0cff3c9f7ba Mon Sep 17 00:00:00 2001 From: Pau Ruiz Safont Date: Wed, 28 Feb 2024 16:41:48 +0000 Subject: [PATCH 10/12] CA-383867: Make xapi-guard's cache aware of read requests Now all domains' vtpm read requests go through the cache. The read function is the same as before. 
There is no change in behaviour Signed-off-by: Pau Ruiz Safont --- ocaml/xapi-guard/lib/disk_cache.ml | 50 +++++++++---- ocaml/xapi-guard/lib/disk_cache.mli | 12 ++-- ocaml/xapi-guard/lib/server_interface.ml | 19 +++-- ocaml/xapi-guard/src/main.ml | 42 ++++++----- ocaml/xapi-guard/test/cache_test.ml | 91 ++++++++++++++++++------ 5 files changed, 150 insertions(+), 64 deletions(-) diff --git a/ocaml/xapi-guard/lib/disk_cache.ml b/ocaml/xapi-guard/lib/disk_cache.ml index 60911410b8d..60ac2f1d0fa 100644 --- a/ocaml/xapi-guard/lib/disk_cache.ml +++ b/ocaml/xapi-guard/lib/disk_cache.ml @@ -195,19 +195,21 @@ type channel = { module Writer : sig val with_cache : - direct:(t -> string -> (unit, exn) Lwt_result.t) + direct: + (t -> (string, exn) Lwt_result.t) + * (t -> string -> (unit, exn) Lwt_result.t) -> Types.Service.t -> channel - -> ((t -> string -> unit Lwt.t) -> 'a Lwt.t) + -> ((t -> string Lwt.t) * (t -> string -> unit Lwt.t) -> 'a Lwt.t) -> 'a Lwt.t (** [with_cache ~direct typ queue context] creates a cache for content of - type [typ]. The cache is writable through the function [context], which - is provided a writing function to persist to the cache. It uses [channel] - to push events to + type [typ]. The cache is readable and writable through the function + [context], which is provided a reading and writing functions [direct]. 
+ It uses [channel] to push events to Example: - Xapi_guard.Disk_cache.(Writer.with_cache ~direct:(upload session_cache) Tpm channel) - @@ fun write_tpm -> write_tpm (uuid, time, key) contents + Xapi_guard.Disk_cache.(Writer.with_cache ~direct:(read, upload) Tpm channel) + @@ fun read_tpm, write_tpm -> write_tpm (uuid, time, key) contents *) end = struct let mkdir_p ?(perm = 0o755) path = @@ -241,9 +243,30 @@ end = struct in files_in dir ~otherwise:create_dir + let fail exn = + Debug.log_backtrace exn (Backtrace.get exn) ; + Lwt_result.fail exn + + let read_contents ~direct _root (uuid, now, key) = + let read, _ = direct in + let* result = + Lwt.try_bind + (fun () -> read (uuid, now, key)) + (function + | Ok contents -> Lwt_result.return contents | Error exn -> fail exn + ) + fail + in + match result with + | Ok contents -> + Lwt.return contents + | Error exn -> + raise exn + let write_contents ~direct root queue (uuid, now, key) contents = let __FUN = __FUNCTION__ in + let _, direct = direct in let key_str = Types.Tpm.(serialize_key key |> string_of_int) in let key_dir = root // Uuidm.(to_string uuid) // key_str in (* 1. 
Record existing requests in cache *) @@ -275,10 +298,6 @@ end = struct let* () = persist_and_push () in Lwt_result.return () in - let fail exn = - Debug.log_backtrace exn (Backtrace.get exn) ; - Lwt_result.fail exn - in let read_state_and_push on_exception () = match queue.state with | Direct -> @@ -313,7 +332,7 @@ end = struct let with_cache ~direct typ queue f = let root = cache_of typ in let* () = mkdir_p root ~perm:0o700 in - f (write_contents ~direct root queue) + f (read_contents ~direct root, write_contents ~direct root queue) end module Watcher : sig @@ -543,9 +562,12 @@ end = struct delete_empty_dirs ~delete_root:false root end -let setup typ direct = +let setup typ read write = let* () = Setup.retime_cache_contents typ in let queue, push = Lwt_bounded_stream.create 2 in let lock = Lwt_mutex.create () in let q = {queue; push; lock; state= Disengaged} in - Lwt.return (Writer.with_cache ~direct typ q, Watcher.watch ~direct typ q) + Lwt.return + ( Writer.with_cache ~direct:(read, write) typ q + , Watcher.watch ~direct:write typ q + ) diff --git a/ocaml/xapi-guard/lib/disk_cache.mli b/ocaml/xapi-guard/lib/disk_cache.mli index c8614bff31b..08c345615f5 100644 --- a/ocaml/xapi-guard/lib/disk_cache.mli +++ b/ocaml/xapi-guard/lib/disk_cache.mli @@ -17,11 +17,15 @@ type t = Uuidm.t * Mtime.t * Types.Tpm.key val setup : Types.Service.t + -> (t -> (string, exn) Lwt_result.t) -> (t -> string -> (unit, exn) Lwt_result.t) - -> ( (((t -> string -> unit Lwt.t) -> 'a Lwt.t) -> 'a Lwt.t) + -> ( ( ((t -> string Lwt.t) * (t -> string -> unit Lwt.t) -> 'a Lwt.t) + -> 'a Lwt.t + ) * (unit -> unit Lwt.t) ) Lwt.t -(** [setup service push_callback] Returns a local disk buffer for [service] - which will use [push_callback] to push the elements to their final - destination *) +(** [setup service read_callback push_callback] Returns a local disk buffer for + [service] which will use [push_callback] to push the elements to their + final destination and [read_callback] to read elements 
if they are not in + the buffer. *) diff --git a/ocaml/xapi-guard/lib/server_interface.ml b/ocaml/xapi-guard/lib/server_interface.ml index ced743e501f..0884c2bf1b2 100644 --- a/ocaml/xapi-guard/lib/server_interface.ml +++ b/ocaml/xapi-guard/lib/server_interface.ml @@ -129,7 +129,14 @@ let push_vtpm ~cache (vm_uuid, _timestamp, key) contents = let* () = with_xapi ~cache @@ VTPM.set_contents ~self ~contents in Lwt_result.return () -let serve_forever_lwt_callback_vtpm ~cache mutex persist vm_uuid _ req body = +let read_vtpm ~cache (vm_uuid, _timestamp, key) = + let* self = with_xapi_vtpm ~cache vm_uuid in + let* contents = with_xapi ~cache @@ VTPM.get_contents ~self in + let body = Tpm.(contents |> deserialize |> lookup ~key) in + Lwt_result.return body + +let serve_forever_lwt_callback_vtpm ~cache mutex (read, persist) vm_uuid _ req + body = let uri = Cohttp.Request.uri req in let timestamp = Mtime_clock.now () in (* in case the connection is interrupted/etc. we may still have pending operations, @@ -139,10 +146,8 @@ let serve_forever_lwt_callback_vtpm ~cache mutex persist vm_uuid _ req body = (* TODO: some logging *) match (Cohttp.Request.meth req, Uri.path uri) with | `GET, path when path <> "/" -> - let* self = with_xapi_vtpm ~cache vm_uuid in - let* contents = with_xapi ~cache @@ VTPM.get_contents ~self in let key = Tpm.key_of_swtpm path in - let body = Tpm.(contents |> deserialize |> lookup ~key) in + let* body = read (vm_uuid, timestamp, key) in let headers = Cohttp.Header.of_list [("Content-Type", "application/octet-stream")] in @@ -211,7 +216,9 @@ let make_server_varstored _persist ~cache path vm_uuid = serve_forever_lwt_callback (Rpc_lwt.server Server.implementation) path |> serve_forever_lwt path -let make_server_vtpm_rest persist ~cache path vm_uuid = +let make_server_vtpm_rest read_write ~cache path vm_uuid = let mutex = Lwt_mutex.create () in - let callback = serve_forever_lwt_callback_vtpm ~cache mutex persist vm_uuid in + let callback = + 
serve_forever_lwt_callback_vtpm ~cache mutex read_write vm_uuid + in serve_forever_lwt path callback diff --git a/ocaml/xapi-guard/src/main.ml b/ocaml/xapi-guard/src/main.ml index f0e57121f07..9fb40aa038b 100644 --- a/ocaml/xapi-guard/src/main.ml +++ b/ocaml/xapi-guard/src/main.ml @@ -99,7 +99,7 @@ let () = Xen_api_lwt_unix.SessionCache.destroy cache ) -let listen_for_vm write_push {Persistent.vm_uuid; path; gid; typ} = +let listen_for_vm read_write {Persistent.vm_uuid; path; gid; typ} = let make_server = match typ with | Varstored -> @@ -112,19 +112,19 @@ let listen_for_vm write_push {Persistent.vm_uuid; path; gid; typ} = (Types.Service.to_string typ) path vm_uuid_str ; let* () = safe_unlink path in - let* stop_server = make_server write_push ~cache path vm_uuid in + let* stop_server = make_server read_write ~cache path vm_uuid in let* () = log_fds () in Hashtbl.add sockets path (stop_server, (vm_uuid, gid, typ)) ; let* () = Lwt_unix.chmod path 0o660 in Lwt_unix.chown path 0 gid -let resume ~vtpm_write_push ~uefi_write_push () = +let resume ~vtpm_read_write ~uefi_read_write () = let* vms = Persistent.loadfrom recover_path in let listen_to_vm = function | Persistent.{typ= Varstored; _} as vm -> - listen_for_vm uefi_write_push vm + listen_for_vm uefi_read_write vm | Persistent.{typ= Swtpm; _} as vm -> - listen_for_vm vtpm_write_push vm + listen_for_vm vtpm_read_write vm in let+ () = Lwt_list.iter_p listen_to_vm vms in D.debug "%s: completed" __FUNCTION__ @@ -166,7 +166,7 @@ let depriv_varstored_destroy dbg gid path = D.debug "[%s] stopped server for gid %d and removed socket" dbg gid ; Lwt.return_unit -let depriv_swtpm_create write_push dbg vm_uuid gid path = +let depriv_swtpm_create read_write dbg vm_uuid gid path = if Hashtbl.mem sockets path then Lwt.return_error (Xapi_idl_guard_privileged.Interface.InternalError @@ -179,7 +179,7 @@ let depriv_swtpm_create write_push dbg vm_uuid gid path = ( D.debug "[%s] creating deprivileged socket at %s, owned by group %d" dbg 
path gid ; let* () = - listen_for_vm write_push {Persistent.path; vm_uuid; gid; typ= Swtpm} + listen_for_vm read_write {Persistent.path; vm_uuid; gid; typ= Swtpm} in store_args sockets ) @@ -230,25 +230,25 @@ let vtpm_get_contents _dbg vtpm_uuid = @@ let* self = Server_interface.with_xapi ~cache @@ VTPM.get_by_uuid ~uuid in Server_interface.with_xapi ~cache @@ VTPM.get_contents ~self -let rpc_fn ~vtpm_write_push ~uefi_write_push = +let rpc_fn ~vtpm_read_write ~uefi_read_write = let module Server = Xapi_idl_guard_privileged.Interface.RPC_API (Rpc_lwt.GenServer ()) in (* bind APIs *) - Server.varstore_create (depriv_varstored_create uefi_write_push) ; + Server.varstore_create (depriv_varstored_create uefi_read_write) ; Server.varstore_destroy depriv_varstored_destroy ; - Server.vtpm_create (depriv_swtpm_create vtpm_write_push) ; + Server.vtpm_create (depriv_swtpm_create vtpm_read_write) ; Server.vtpm_destroy depriv_swtpm_destroy ; Server.vtpm_set_contents vtpm_set_contents ; Server.vtpm_get_contents vtpm_get_contents ; Rpc_lwt.server Server.implementation -let process ~vtpm_write_push ~uefi_write_push body = +let process ~vtpm_read_write ~uefi_read_write body = let+ response = Xapi_guard.Dorpc.wrap_rpc Xapi_idl_guard_privileged.Interface.E.error (fun () -> let call = Jsonrpc.call_of_string body in D.debug "Received request from message-switch, method %s" call.Rpc.name ; - rpc_fn ~vtpm_write_push ~uefi_write_push call + rpc_fn ~vtpm_read_write ~uefi_read_write call ) in Jsonrpc.string_of_response response @@ -268,21 +268,25 @@ let retry_forever fname f = let cache_reader with_watcher = retry_forever "cache watcher" with_watcher let make_message_switch_server () = - let* with_swtpm_push, with_watch = - Xapi_guard.Disk_cache.(setup Swtpm (Server_interface.push_vtpm ~cache)) + let* with_swtpm_cache, with_watch = + Xapi_guard.Disk_cache.( + setup Swtpm + Server_interface.(read_vtpm ~cache) + Server_interface.(push_vtpm ~cache) + ) in let open 
Message_switch_lwt.Protocol_lwt in let wait_server, server_stopped = Lwt.task () in - let@ vtpm_write_push = with_swtpm_push in - let uefi_write_push _ _ = + let@ vtpm_read_write = with_swtpm_cache in + let uefi_read_write = (* This is unused for the time being, added to be consistent with both interfaces *) - Lwt.return_unit + ((fun _ -> Lwt.return ""), fun _ _ -> Lwt.return_unit) in let server = let* result = Server.listen - ~process:(process ~vtpm_write_push ~uefi_write_push) + ~process:(process ~vtpm_read_write ~uefi_read_write) ~switch:!Xcp_client.switch_path ~queue:Xapi_idl_guard_privileged.Interface.queue_name () in @@ -295,7 +299,7 @@ let make_message_switch_server () = ) ; (* best effort resume *) let* () = - Lwt.catch (resume ~vtpm_write_push ~uefi_write_push) (fun e -> + Lwt.catch (resume ~vtpm_read_write ~uefi_read_write) (fun e -> D.log_backtrace () ; D.warn "Resume failed: %s" (Printexc.to_string e) ; Lwt.return_unit diff --git a/ocaml/xapi-guard/test/cache_test.ml b/ocaml/xapi-guard/test/cache_test.ml index e171de6b635..97b144839a6 100644 --- a/ocaml/xapi-guard/test/cache_test.ml +++ b/ocaml/xapi-guard/test/cache_test.ml @@ -5,7 +5,9 @@ let ( let* ) = Lwt.bind module Tpm = Xapi_guard.Types.Tpm module TPMs = struct - let tpms_created = Atomic.make 1 + let writes_created = Atomic.make 1 + + let reads_created = Atomic.make 1 let request_persist uuid write = let __FUN = __FUNCTION__ in @@ -13,18 +15,35 @@ module TPMs = struct let key = Tpm.deserialize_key (Random.int 3) in let time = Mtime_clock.now () in - let serial_n = Atomic.fetch_and_add tpms_created 1 in + let serial_n = Atomic.fetch_and_add writes_created 1 in let contents = Printf.sprintf "contents %s" (Mtime.to_uint64_ns time |> Int64.to_string) in let* () = Logs_lwt.app (fun m -> - m "%s: Content № %i created: %a/%i/%a" __FUN serial_n Uuidm.pp uuid + m "%s: Write № %i requested: %a/%i/%a" __FUN serial_n Uuidm.pp uuid Tpm.(serialize_key key) Mtime.pp time ) in write (uuid, time, key) contents + 
+ let request_read uuid read = + let __FUN = __FUNCTION__ in + + let key = Tpm.deserialize_key (Random.int 3) in + + let time = Mtime_clock.now () in + let serial_n = Atomic.fetch_and_add reads_created 1 in + let* () = + Logs_lwt.app (fun m -> + m "%s: Read № %i requested: %a/%i/%a" __FUN serial_n Uuidm.pp uuid + Tpm.(serialize_key key) + Mtime.pp time + ) + in + let* () = Lwt_unix.sleep 0.05 in + read (uuid, time, key) end let lwt_reporter () = @@ -79,9 +98,13 @@ let retry_forever fname f = in loop () -let max_sent = 128 +let max_writes = 128 + +let max_reads = 500_000 + +let received_writes = ref 0 -let received = ref 0 +let received_reads = ref 0 let throttled_reads = Mtime.Span.(200 * ms) @@ -99,7 +122,7 @@ let should_fail () : bool = let elapsed = Mtime.span epoch (Mtime_clock.now ()) in polarity elapsed -let log (uuid, timestamp, key) content : (unit, exn) Result.t Lwt.t = +let log_write (uuid, timestamp, key) content = let __FUN = __FUNCTION__ in let ( let* ) = Lwt_result.bind in let maybe_fail () = @@ -110,43 +133,69 @@ let log (uuid, timestamp, key) content : (unit, exn) Result.t Lwt.t = Lwt_result.return () in let* () = maybe_fail () in - received := !received + 1 ; + received_writes := !received_writes + 1 ; Logs_lwt.app (fun m -> - m "%s Content № %i detected: %a/%i/%a" __FUN !received Uuidm.pp uuid + m "%s Write № %i detected: %a/%i/%a" __FUN !received_writes Uuidm.pp uuid Tpm.(serialize_key key) Mtime.pp timestamp ) |> ok -let to_cache with_writer = +let log_read (uuid, timestamp, key) = + let __FUN = __FUNCTION__ in + let ( let* ) = Lwt_result.bind in + received_reads := !received_reads + 1 ; + let* () = + Logs_lwt.app (fun m -> + m "%s Read to source № %i detected: %a/%i/%a" __FUN !received_reads + Uuidm.pp uuid + Tpm.(serialize_key key) + Mtime.pp timestamp + ) + |> ok + in + Lwt_result.return "yes" + +let to_cache with_read_writes = let __FUN = __FUNCTION__ in let elapsed = Mtime_clock.counter () in - let rec loop_and_stop uuid sent () = + let 
persist uuid (_, write_tpm) = TPMs.request_persist uuid write_tpm in + let read uuid (read_tpm, _) = + let* contents = TPMs.request_read uuid read_tpm in + Logs_lwt.app (fun m -> m "%s Read received: '%s'" __FUN contents) + in + let rec loop_and_stop f name uuid max sent = let sent = sent + 1 in - - let@ write_tpm = with_writer in - let* () = TPMs.request_persist uuid write_tpm in - if sent >= max_sent then + let@ read_write = with_read_writes in + let* () = f uuid read_write in + if sent >= max then Logs_lwt.app (fun m -> - m "%s: Stopping requests after %i writes" __FUN sent + m "%s: Stopping requests after %i %ss" __FUN sent name ) else if Mtime.Span.compare (Mtime_clock.count elapsed) throttled_reads > 0 then let* () = Lwt_unix.sleep 0.1 in - loop_and_stop uuid sent () + loop_and_stop f name uuid max sent else let* () = Lwt.pause () in - loop_and_stop uuid sent () + loop_and_stop f name uuid max sent in - List.init 4 (fun _ -> Uuidm.(v `V4)) - |> List.map (fun uuid -> loop_and_stop uuid 0 ()) + let vms = List.init 4 (fun _ -> Uuidm.(v `V4)) in + + List.concat + [ + List.map (fun uuid -> loop_and_stop persist "write" uuid max_writes 0) vms + ; List.map (fun uuid -> loop_and_stop read "read" uuid max_reads 0) vms + ] let from_cache with_watcher = retry_forever "watcher" with_watcher let main () = - let* with_writer, with_watcher = Xapi_guard.Disk_cache.(setup Swtpm log) in + let* with_read_writes, with_watcher = + Xapi_guard.Disk_cache.(setup Swtpm log_read log_write) + in let reader = from_cache with_watcher in - let writers = to_cache with_writer in + let writers = to_cache with_read_writes in let* _ = Lwt.all (reader :: writers) in Lwt.return_unit From 7e4c476103a9f0da721c9025f0e4721fbc486a79 Mon Sep 17 00:00:00 2001 From: Pau Ruiz Safont Date: Wed, 28 Feb 2024 17:34:22 +0000 Subject: [PATCH 11/12] CA-383867: xapi-guard's cache contents are used in read requests For domains requesting the TPM's contents, the xapi-guards returns the contents in the cache, if 
they are available from in-flight requests. It falls back to xapi if that isn't possible. The cache doesn't try to provide any availability for reads, like it does for writes. This means that if swtpm issues a read request while xapi is offline, the request will fail, as it happened before this change. Signed-off-by: Pau Ruiz Safont --- ocaml/xapi-guard/lib/disk_cache.ml | 113 +++++++++++++++++------------ 1 file changed, 68 insertions(+), 45 deletions(-) diff --git a/ocaml/xapi-guard/lib/disk_cache.ml b/ocaml/xapi-guard/lib/disk_cache.ml index 60ac2f1d0fa..f3b83739f21 100644 --- a/ocaml/xapi-guard/lib/disk_cache.ml +++ b/ocaml/xapi-guard/lib/disk_cache.ml @@ -97,35 +97,33 @@ let path_is_temp path = let temp_of_path path = path ^ ".pre" -let get_all_contents root = - let classify contents = - let rec loop (acc, found) = function - | [] -> - List.rev acc - | latest :: others -> ( - match key_of_path latest with - | None -> - let file = - if path_is_temp latest then - Temporary latest - else - Invalid latest - in - loop (file :: acc, found) others - | Some valid_file -> - let file = - if found then Outdated valid_file else Latest valid_file - in - loop (file :: acc, true) others - ) - in - let ordered = List.fast_sort (fun x y -> String.compare y x) contents in - loop ([], false) ordered +let sort_updates contents = + let rec loop (acc, found) = function + | [] -> + List.rev acc + | latest :: others -> ( + match key_of_path latest with + | None -> + let file = + if path_is_temp latest then + Temporary latest + else + Invalid latest + in + loop (file :: acc, found) others + | Some valid_file -> + let file = if found then Outdated valid_file else Latest valid_file in + loop (file :: acc, true) others + ) in + let ordered = List.fast_sort (fun x y -> String.compare y x) contents in + loop ([], false) ordered + +let get_all_contents root = let empty = Fun.const (Lwt.return []) in let contents_of_key key = let* contents = files_in key ~otherwise:empty in - 
Lwt.return (classify contents) + Lwt.return (sort_updates contents) in let* tpms = files_in root ~otherwise:empty in let* files = @@ -138,6 +136,12 @@ let get_all_contents root = in Lwt.return List.(concat (concat files)) +(** Warning, may raise Unix.Unix_error *) +let read_from ~filename = + let flags = Unix.[O_RDONLY] in + let perm = 0o000 in + Lwt_io.with_file ~flags ~perm ~mode:Input filename Lwt_io.read + let persist_to ~filename:f_path ~contents = let atomic_write_to_file ~perm f = let tmp_path = temp_of_path f_path in @@ -247,21 +251,46 @@ end = struct Debug.log_backtrace exn (Backtrace.get exn) ; Lwt_result.fail exn - let read_contents ~direct _root (uuid, now, key) = - let read, _ = direct in - let* result = - Lwt.try_bind - (fun () -> read (uuid, now, key)) - (function - | Ok contents -> Lwt_result.return contents | Error exn -> fail exn - ) - fail + let read_contents ~direct root (uuid, now, key) = + let read_remote () = + let read, _ = direct in + let* result = + Lwt.try_bind + (fun () -> read (uuid, now, key)) + (function + | Ok contents -> Lwt_result.return contents | Error exn -> fail exn + ) + fail + in + match result with + | Ok contents -> + Lwt.return contents + | Error exn -> + raise exn + in + + let key_str = Types.Tpm.(serialize_key key |> string_of_int) in + let key_dir = root // Uuidm.(to_string uuid) // key_str in + + (* 1. Get updates *) + let* contents = files_in key_dir ~otherwise:(fun _ -> Lwt.return []) in + let updates = sort_updates contents in + + (* 2. Pick latest *) + let only_latest = function + | Latest (_, p) -> + Either.Left p + | Temporary p | Outdated (_, p) | Invalid p -> + Right p + in + let latest, _ = List.partition_map only_latest updates in + + (* 3. 
fall back to remote read if needed *) + let get_contents path = + Lwt.catch (fun () -> read_from ~filename:path) (fun _ -> read_remote ()) in - match result with - | Ok contents -> - Lwt.return contents - | Error exn -> - raise exn + + match latest with path :: _ -> get_contents path | [] -> read_remote () let write_contents ~direct root queue (uuid, now, key) contents = let __FUN = __FUNCTION__ in @@ -367,12 +396,6 @@ end = struct in Lwt.return latest - (** Warning, may raise Unix.Unix_error *) - let read_from ~filename = - let flags = Unix.[O_RDONLY] in - let perm = 0o000 in - Lwt_io.with_file ~flags ~perm ~mode:Input filename Lwt_io.read - let retry_push push (uuid, timestamp, key) contents = let __FUN = __FUNCTION__ in let push' () = push (uuid, timestamp, key) contents in From dcb8042cfd4b432e84643442fc53c04c5aaae926 Mon Sep 17 00:00:00 2001 From: Pau Ruiz Safont Date: Thu, 29 Feb 2024 10:11:23 +0000 Subject: [PATCH 12/12] CA-383867: xapi-guard cache sort files by timestamp Previously, they were sorted by string order, which in rare cases might lead to erroneous ordering Signed-off-by: Pau Ruiz Safont --- ocaml/xapi-guard/lib/disk_cache.ml | 44 ++++++++++++++++++------------ 1 file changed, 26 insertions(+), 18 deletions(-) diff --git a/ocaml/xapi-guard/lib/disk_cache.ml b/ocaml/xapi-guard/lib/disk_cache.ml index f3b83739f21..0f0a6e2c248 100644 --- a/ocaml/xapi-guard/lib/disk_cache.ml +++ b/ocaml/xapi-guard/lib/disk_cache.ml @@ -98,26 +98,34 @@ let path_is_temp path = let temp_of_path path = path ^ ".pre" let sort_updates contents = - let rec loop (acc, found) = function + let classify elem = + match key_of_path elem with + | None -> + let file = + if path_is_temp elem then + Temporary elem + else + Invalid elem + in + Either.Right file + | Some valid_file -> + Either.Left valid_file + in + let valid_files, invalid = List.partition_map classify contents in + + let valid = + let ordered = + List.fast_sort + (fun ((_, x, _), _) ((_, y, _), _) -> Mtime.compare y 
x) + valid_files + in + match ordered with | [] -> - List.rev acc - | latest :: others -> ( - match key_of_path latest with - | None -> - let file = - if path_is_temp latest then - Temporary latest - else - Invalid latest - in - loop (file :: acc, found) others - | Some valid_file -> - let file = if found then Outdated valid_file else Latest valid_file in - loop (file :: acc, true) others - ) + [] + | latest :: outdated -> + Latest latest :: List.map (fun outdated -> Outdated outdated) outdated in - let ordered = List.fast_sort (fun x y -> String.compare y x) contents in - loop ([], false) ordered + List.concat [valid; invalid] let get_all_contents root = let empty = Fun.const (Lwt.return []) in