Skip to content

Commit

Permalink
Merge pull request #5250 from Vincent-lau/private/shul2/tracing-clusterd
Browse files Browse the repository at this point in the history
CP-45469: Distributed tracing for xapi-clusterd
  • Loading branch information
robhoes authored Nov 30, 2023
2 parents 84fe2e5 + e678326 commit b238f13
Show file tree
Hide file tree
Showing 13 changed files with 416 additions and 174 deletions.
25 changes: 22 additions & 3 deletions ocaml/libs/tracing/tracing.ml
Original file line number Diff line number Diff line change
Expand Up @@ -237,17 +237,23 @@ module Spans = struct
| None ->
if Hashtbl.length spans < !max_traces then
Hashtbl.add spans key [span]
else
debug "%s exceeded max traces when adding to span table"
__FUNCTION__
| Some span_list ->
if List.length span_list < !max_spans then
Hashtbl.replace spans key (span :: span_list)
else
debug "%s exceeded max traces when adding to span table"
__FUNCTION__
)

let remove_from_spans span =
let key = span.Span.context.trace_id in
Xapi_stdext_threads.Threadext.Mutex.execute lock (fun () ->
match Hashtbl.find_opt spans key with
| None ->
debug "Span does not exist or already finished" ;
debug "%s span does not exist or already finished" __FUNCTION__ ;
None
| Some span_list ->
( match
Expand All @@ -268,9 +274,15 @@ module Spans = struct
| None ->
if Hashtbl.length finished_spans < !max_traces then
Hashtbl.add finished_spans key [span]
else
debug "%s exceeded max traces when adding to finished span table"
__FUNCTION__
| Some span_list ->
if List.length span_list < !max_spans then
Hashtbl.replace finished_spans key (span :: span_list)
else
debug "%s exceeded max traces when adding to finished span table"
__FUNCTION__
)

(** [mark_finished span] takes [span] out of the table of in-flight spans via
    [remove_from_spans]; when that call hands a span back, the span is passed
    on to [add_to_finished]. When it returns [None] nothing else happens. *)
let mark_finished span =
  match remove_from_spans span with
  | Some removed ->
      add_to_finished removed
  | None ->
      ()
Expand Down Expand Up @@ -490,7 +502,11 @@ let create ~enabled ~attributes ~endpoints ~name_label ~uuid =
| None ->
Hashtbl.add tracer_providers uuid provider
| Some _ ->
failwith "Tracing: TracerProvider already exists"
(* CP-45469: It is ok not to have an exception here since it is unlikely that the
user has caused the issue, so no need to propagate back. It is also
handy to not change the control flow since calls like cluster_pool_resync
might not be aware that a TracerProvider has already been created.*)
error "Tracing : TracerProvider %s already exists" name_label
)

let destroy ~uuid =
Expand All @@ -510,7 +526,8 @@ let get_tracer ~name =
| Some provider ->
Tracer.create ~name ~provider
| None ->
warn "No provider found" ; Tracer.no_op
warn "No provider found for tracing %s" name ;
Tracer.no_op

(** [enable_span_garbage_collector ?timeout ()] calls
    [Spans.GC.initialise_thread] with the given [timeout].

    @param timeout defaults to [86400.] when omitted (presumably seconds,
    i.e. one day - TODO confirm against [Spans.GC]). *)
let enable_span_garbage_collector ?timeout () =
  let timeout = Option.value timeout ~default:86400. in
  Spans.GC.initialise_thread ~timeout
Expand Down Expand Up @@ -819,3 +836,5 @@ module Export = struct
end

(** [main] is [Export.Destination.main], re-exported at the top level of this
    module. The interface file gives it the type [unit -> Thread.t]. *)
let main = Export.Destination.main

(** [flush_spans] is [Export.Destination.flush_spans], re-exported at the top
    level of this module. The interface file gives it the type
    [unit -> unit]. *)
let flush_spans = Export.Destination.flush_spans
2 changes: 2 additions & 0 deletions ocaml/libs/tracing/tracing.mli
Original file line number Diff line number Diff line change
Expand Up @@ -152,4 +152,6 @@ val set_observe : bool -> unit

val validate_attribute : string * string -> bool

(** Top-level alias of [Export.Destination.flush_spans].
    NOTE(review): presumably forces the spans collected so far to be exported
    straight away - confirm against [Export.Destination]. *)
val flush_spans : unit -> unit

(** Top-level alias of [Export.Destination.main]; yields a [Thread.t].
    NOTE(review): presumably the handle of the export thread it starts -
    confirm against [Export.Destination]. *)
val main : unit -> Thread.t
15 changes: 15 additions & 0 deletions ocaml/tests/test_cluster.ml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,21 @@ let test_clusterd_rpc ~__context call =
{success= true; contents= Rpc.String test_token; is_notification= false}
| ("enable" | "disable" | "destroy" | "leave" | "set-tls-verification"), _ ->
Rpc.{success= true; contents= Rpc.Null; is_notification= false}
| ( ( "Observer.create"
| "Observer.destroy"
| "Observer.set_enabled"
| "Observer.set_attributes"
| "Observer.set_endpoints"
| "Observer.init"
| "Observer.set_trace_log_dir"
| "Observer.set_export_interval"
| "Observer.set_host_id"
| "Observer.set_max_traces"
| "Observer.set_max_spans"
| "Observer.set_max_file_size"
| "Observer.set_compress_tracing_files" )
, _ ) ->
Rpc.{success= true; contents= Rpc.Null; is_notification= false}
| "diagnostics", _ ->
let open Cluster_interface in
let id = 1l in
Expand Down
4 changes: 4 additions & 0 deletions ocaml/xapi-idl/cluster/cluster_client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,7 @@ let rpc url call = rpc_internal url call |> Idl.IdM.return
(* There is also a Remote API between clustering daemons on different hosts.
Call this a Local API because it is an API inside a host *)
module LocalClient = Cluster_interface.LocalAPI (IDL.GenClient ())

(* Second instantiation of the local API, built with [Idl.Exn.GenClient]
   instead of [IDL.GenClient]. The transport handed to the functor is
   [rpc_internal json_url].
   NOTE(review): going by the [Exn] name, errors presumably surface as
   exceptions rather than inside the result monad - confirm against the
   Idl documentation. *)
module LocalClientExn = Cluster_interface.LocalAPI (Idl.Exn.GenClient (struct
let rpc = rpc_internal json_url
end))
78 changes: 78 additions & 0 deletions ocaml/xapi-idl/cluster/cluster_interface.ml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
(* Cluster interface *)

open Rpc
open Idl

let service_name = "cluster"
Expand Down Expand Up @@ -271,4 +272,81 @@ module LocalAPI (R : RPC) = struct
@-> enabled_p
@-> returning unit_p err
)

(* Declarations of the "Observer.*" calls of the local cluster API.
   Every call below is declared with an empty description list, takes
   [debug_info_p] as its first parameter and returns [unit_p] with [err].
   NOTE(review): the calls appear to mirror the tracing/observer
   configuration knobs (create, destroy, limits, export settings) - confirm
   against the daemon-side implementation. *)
module Observer = struct
open TypeCombinators

(* Parameter descriptors shared by the declarations below. The [~name]
   strings are part of the generated interface; do not change them
   casually. *)
let endpoints_p = Param.mk ~name:"endpoints" (list Types.string)

let bool_p = Param.mk ~name:"bool" Types.bool

let uuid_p = Param.mk ~name:"uuid" Types.string

let name_label_p = Param.mk ~name:"name_label" Types.string

(* [dict] comes from [TypeCombinators]: a (string * string) list. *)
let dict_p = Param.mk ~name:"dict" dict

let string_p = Param.mk ~name:"string" Types.string

let int_p = Param.mk ~name:"int" Types.int

let float_p = Param.mk ~name:"float" Types.float

(* Argument order after the debug info: uuid, name_label, dict, endpoints,
   bool. NOTE(review): the dict and the bool are presumably the attributes
   and the enabled flag - confirm against the implementation. *)
let create =
declare "Observer.create" []
(debug_info_p
@-> uuid_p
@-> name_label_p
@-> dict_p
@-> endpoints_p
@-> bool_p
@-> returning unit_p err
)

(* The next four calls all identify their target by uuid. *)
let destroy =
declare "Observer.destroy" []
(debug_info_p @-> uuid_p @-> returning unit_p err)

let set_enabled =
declare "Observer.set_enabled" []
(debug_info_p @-> uuid_p @-> bool_p @-> returning unit_p err)

let set_attributes =
declare "Observer.set_attributes" []
(debug_info_p @-> uuid_p @-> dict_p @-> returning unit_p err)

let set_endpoints =
declare "Observer.set_endpoints" []
(debug_info_p @-> uuid_p @-> endpoints_p @-> returning unit_p err)

(* The remaining calls take no uuid: only the debug info plus at most one
   value. *)
let init = declare "Observer.init" [] (debug_info_p @-> returning unit_p err)

let set_trace_log_dir =
declare "Observer.set_trace_log_dir" []
(debug_info_p @-> string_p @-> returning unit_p err)

let set_export_interval =
declare "Observer.set_export_interval" []
(debug_info_p @-> float_p @-> returning unit_p err)

let set_host_id =
declare "Observer.set_host_id" []
(debug_info_p @-> string_p @-> returning unit_p err)

let set_max_traces =
declare "Observer.set_max_traces" []
(debug_info_p @-> int_p @-> returning unit_p err)

let set_max_spans =
declare "Observer.set_max_spans" []
(debug_info_p @-> int_p @-> returning unit_p err)

let set_max_file_size =
declare "Observer.set_max_file_size" []
(debug_info_p @-> int_p @-> returning unit_p err)

let set_compress_tracing_files =
declare "Observer.set_compress_tracing_files" []
(debug_info_p @-> bool_p @-> returning unit_p err)
end
end
47 changes: 47 additions & 0 deletions ocaml/xapi-idl/lib/typeCombinators.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
(*
* Copyright (C) Cloud Software Group
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation; version 2.1 only. with the special
* exception on linking described in file LICENSE.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*)

(** Association list with string keys and string values. The [rpcty] deriver
    supplies the accompanying [typ_of_dict] and [dict] values that the
    interface file exposes. *)
type dict = (string * string) list [@@deriving rpcty]

(** [option ?name ?description d] builds the [Rpc.Types] def for an optional
    value whose payload is described by [d].

    @param name label of the resulting def; when omitted it is [d]'s name
    followed by [" option"].
    @param description defaults to the empty list. *)
let option ?name ?(description = []) d =
  let open Rpc.Types in
  let name =
    match name with
    | Some label ->
        label
    | None ->
        Printf.sprintf "%s option" d.name
  in
  let ty = Option d.ty in
  {name; description; ty}

(** [list ?name ?description d] builds the [Rpc.Types] def for a list whose
    elements are described by [d].

    @param name label of the resulting def; when omitted it is
    ["list of <d.name>s"].
    @param description defaults to the empty list. *)
let list ?name ?(description = []) d =
  let open Rpc.Types in
  let name =
    match name with
    | Some label ->
        label
    | None ->
        Printf.sprintf "list of %ss" d.name
  in
  let ty = List d.ty in
  {name; description; ty}

(** [pair ?name ?description (a, b)] builds the [Rpc.Types] def for a 2-tuple
    whose components are described by [a] and [b], in that order.

    @param name label of the resulting def; when omitted it is
    ["pair of <a.name> and <b.name>"].
    @param description defaults to the empty list. *)
let pair ?name ?(description = []) (fst_def, snd_def) =
  let open Rpc.Types in
  let name =
    match name with
    | Some label ->
        label
    | None ->
        Printf.sprintf "pair of %s and %s" fst_def.name snd_def.name
  in
  let ty = Tuple (fst_def.ty, snd_def.ty) in
  {name; description; ty}

(** [triple ?name ?description (a, b, c)] builds the [Rpc.Types] def for a
    3-tuple whose components are described by [a], [b] and [c], in that
    order.

    @param name label of the resulting def; when omitted it is
    ["triple of <a.name>, <b.name> and <c.name>"].
    @param description defaults to the empty list. *)
let triple ?name ?(description = []) (fst_def, snd_def, thd_def) =
  let open Rpc.Types in
  let name =
    match name with
    | Some label ->
        label
    | None ->
        Printf.sprintf "triple of %s, %s and %s" fst_def.name snd_def.name
          thd_def.name
  in
  let ty = Tuple3 (fst_def.ty, snd_def.ty, thd_def.ty) in
  {name; description; ty}
43 changes: 43 additions & 0 deletions ocaml/xapi-idl/lib/typeCombinators.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
(*
* Copyright (C) Cloud Software Group
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation; version 2.1 only. with the special
* exception on linking described in file LICENSE.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*)

(** Association list with string keys and string values. *)
type dict = (string * string) list

(** Type description of {!dict}, produced by the [rpcty] deriver in the
    implementation. *)
val typ_of_dict : (string * string) list Rpc.Types.typ

(** Def of {!dict}, produced by the [rpcty] deriver in the implementation. *)
val dict : (string * string) list Rpc.Types.def

(** [option ?name ?description d] is the def of an optional value whose
    payload is described by [d]. [name] defaults to [d]'s name followed by
    [" option"]; [description] defaults to the empty list. *)
val option :
?name:string
-> ?description:string list
-> 'a Rpc.Types.def
-> 'a option Rpc.Types.def

(** [list ?name ?description d] is the def of a list whose elements are
    described by [d]. [name] defaults to ["list of <d.name>s"];
    [description] defaults to the empty list. *)
val list :
?name:string
-> ?description:string list
-> 'a Rpc.Types.def
-> 'a list Rpc.Types.def

(** [pair ?name ?description (a, b)] is the def of a 2-tuple with components
    described by [a] and [b]. [name] defaults to
    ["pair of <a.name> and <b.name>"]; [description] defaults to the empty
    list. *)
val pair :
?name:string
-> ?description:string list
-> 'a Rpc.Types.def * 'b Rpc.Types.def
-> ('a * 'b) Rpc.Types.def

(** [triple ?name ?description (a, b, c)] is the def of a 3-tuple with
    components described by [a], [b] and [c]. [name] defaults to
    ["triple of <a.name>, <b.name> and <c.name>"]; [description] defaults to
    the empty list. *)
val triple :
?name:string
-> ?description:string list
-> 'a Rpc.Types.def * 'b Rpc.Types.def * 'c Rpc.Types.def
-> ('a * 'b * 'c) Rpc.Types.def
42 changes: 0 additions & 42 deletions ocaml/xapi-idl/storage/storage_interface.ml
Original file line number Diff line number Diff line change
Expand Up @@ -33,48 +33,6 @@ let typ_of_rpc_t =
; of_rpc= (fun x -> Ok x)
}

(** Helpers that assemble [Rpc.Types] defs for compound types (option, list,
    pair, triple) out of the defs of their components. In every helper
    [?name] overrides the generated label and [?description] defaults to the
    empty list. *)
module TypeCombinators = struct
  (** Def of an optional value; default label ["<d.name> option"]. *)
  let option ?name ?(description = []) d =
    let open Rpc.Types in
    let name =
      Option.value name ~default:(Printf.sprintf "%s option" d.name)
    in
    {name; description; ty= Option d.ty}

  (** Def of a list; default label ["list of <d.name>s"]. *)
  let list ?name ?(description = []) d =
    let open Rpc.Types in
    let name =
      Option.value name ~default:(Printf.sprintf "list of %ss" d.name)
    in
    {name; description; ty= List d.ty}

  (** Def of a 2-tuple; default label ["pair of <a.name> and <b.name>"]. *)
  let pair ?name ?(description = []) (a, b) =
    let open Rpc.Types in
    let name =
      Option.value name
        ~default:(Printf.sprintf "pair of %s and %s" a.name b.name)
    in
    {name; description; ty= Tuple (a.ty, b.ty)}

  (** Def of a 3-tuple; default label
      ["triple of <a.name>, <b.name> and <c.name>"]. *)
  let triple ?name ?(description = []) (a, b, c) =
    let open Rpc.Types in
    let name =
      Option.value name
        ~default:
          (Printf.sprintf "triple of %s, %s and %s" a.name b.name c.name)
    in
    {name; description; ty= Tuple3 (a.ty, b.ty, c.ty)}
end

let service_name = "storage"

let queue_name = ref (Xcp_service.common_prefix ^ service_name)
Expand Down
44 changes: 0 additions & 44 deletions ocaml/xapi-idl/xen/xenops_interface.ml
Original file line number Diff line number Diff line change
Expand Up @@ -34,50 +34,6 @@ let typ_of_rpc_t =
; of_rpc= (fun x -> Ok x)
}

(** Association list with string keys and string values.
    NOTE(review): the [rpcty] deriver presumably generates the matching
    [typ_of_dict] and [dict] values - confirm against the ppx output. *)
type dict = (string * string) list [@@deriving rpcty]

(** Helpers that assemble [Rpc.Types] defs for compound types (option, list,
    pair, triple) out of the defs of their components. In every helper
    [?name] overrides the generated label and [?description] defaults to the
    empty list. *)
module TypeCombinators = struct
  (** Def of an optional value; default label ["<d.name> option"]. *)
  let option ?name ?(description = []) d =
    let open Rpc.Types in
    let name =
      Option.value name ~default:(Printf.sprintf "%s option" d.name)
    in
    {name; description; ty= Option d.ty}

  (** Def of a list; default label ["list of <d.name>s"]. *)
  let list ?name ?(description = []) d =
    let open Rpc.Types in
    let name =
      Option.value name ~default:(Printf.sprintf "list of %ss" d.name)
    in
    {name; description; ty= List d.ty}

  (** Def of a 2-tuple; default label ["pair of <a.name> and <b.name>"]. *)
  let pair ?name ?(description = []) (a, b) =
    let open Rpc.Types in
    let name =
      Option.value name
        ~default:(Printf.sprintf "pair of %s and %s" a.name b.name)
    in
    {name; description; ty= Tuple (a.ty, b.ty)}

  (** Def of a 3-tuple; default label
      ["triple of <a.name>, <b.name> and <c.name>"]. *)
  let triple ?name ?(description = []) (a, b, c) =
    let open Rpc.Types in
    let name =
      Option.value name
        ~default:
          (Printf.sprintf "triple of %s, %s and %s" a.name b.name c.name)
    in
    {name; description; ty= Tuple3 (a.ty, b.ty, c.ty)}
end

include Xenops_types.TopLevel

let service_name = "xenops"
Expand Down
2 changes: 1 addition & 1 deletion ocaml/xapi/xapi_cluster.ml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ let create ~__context ~pIF ~cluster_stack ~pool_auto_join ~token_timeout
* should be replaced by a general function that checks the given cluster_stack *)
Pool_features.assert_enabled ~__context ~f:Features.Corosync ;
with_clustering_lock __LOC__ (fun () ->
let dbg = Context.string_of_task __context in
let dbg = Context.string_of_task_and_tracing __context in
validate_params ~token_timeout ~token_timeout_coefficient ;
let cluster_ref = Ref.make () in
let cluster_host_ref = Ref.make () in
Expand Down
Loading

0 comments on commit b238f13

Please sign in to comment.