Skip to content

Commit

Permalink
Refactor watcher creation code
Browse files Browse the repository at this point in the history
Signed-off-by: Vincent Liu <[email protected]>
  • Loading branch information
Vincent-lau committed Jun 19, 2024
1 parent 7348b74 commit bb2fe47
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 129 deletions.
2 changes: 1 addition & 1 deletion ocaml/xapi/xapi_cluster.ml
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ let create ~__context ~pIF ~cluster_stack ~pool_auto_join ~token_timeout
~verify ;
(* Create the watcher here in addition to resync_host since pool_create
in resync_host only calls cluster_host.create for pool member nodes *)
create_cluster_watcher_on_master ~__context ~host ;
Watcher.create_as_necessary ~__context ~host ;
Xapi_cluster_host_helpers.update_allowed_operations ~__context
~self:cluster_host_ref ;
D.debug "Created Cluster: %s and Cluster_host: %s"
Expand Down
7 changes: 4 additions & 3 deletions ocaml/xapi/xapi_cluster_host.ml
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,9 @@ let resync_host ~__context ~host =
(* Note that join_internal and enable both use the clustering lock *)
Client.Client.Cluster_host.enable ~rpc ~session_id ~self
) ;
(* create the watcher here so that the watcher exists after toolstack restart *)
create_cluster_watcher_on_master ~__context ~host ;
(* create the watcher here so that the watcher exists after toolstack
restart *)
Watcher.create_as_necessary ~__context ~host ;
Xapi_observer.initialise_observer ~__context
Xapi_observer_components.Xapi_clusterd ;
let verify = Stunnel_client.get_verify_by_default () in
Expand Down Expand Up @@ -378,7 +379,7 @@ let enable ~__context ~self =
"Cluster_host.enable: xapi-clusterd not running - attempting to start" ;
Xapi_clustering.Daemon.enable ~__context
) ;
create_cluster_watcher_on_master ~__context ~host ;
Watcher.create_as_necessary ~__context ~host ;
Xapi_observer.initialise_observer ~__context
Xapi_observer_components.Xapi_clusterd ;
let verify = Stunnel_client.get_verify_by_default () in
Expand Down
255 changes: 130 additions & 125 deletions ocaml/xapi/xapi_clustering.ml
Original file line number Diff line number Diff line change
Expand Up @@ -426,133 +426,138 @@ let compute_corosync_max_host_failures ~__context =
in
corosync_ha_max_hosts

let on_corosync_update ~__context ~cluster updates =
debug
"%s: Received %d updates from corosync_notifyd , run diagnostics to get \
new state"
__FUNCTION__ (List.length updates) ;
let m =
Cluster_client.LocalClient.diagnostics (rpc ~__context)
"update quorum api fields with diagnostics"
in
match Idl.IdM.run @@ Cluster_client.IDL.T.get m with
| Ok diag ->
Db.Cluster.set_is_quorate ~__context ~self:cluster ~value:diag.is_quorate ;
let all_cluster_hosts = Db.Cluster_host.get_all ~__context in
let ip_ch =
List.map
(fun ch ->
let pIF = Db.Cluster_host.get_PIF ~__context ~self:ch in
let ipstr =
ip_of_pif (pIF, Db.PIF.get_record ~__context ~self:pIF)
|> ipstr_of_address
in
(ipstr, ch)
)
all_cluster_hosts
in
let current_time = API.Date.now () in
( match diag.quorum_members with
| None ->
List.iter
(fun self ->
Db.Cluster_host.set_live ~__context ~self ~value:false ;
Db.Cluster_host.set_last_update_live ~__context ~self
~value:current_time
module Watcher = struct
let on_corosync_update ~__context ~cluster updates =
debug
"%s: Received %d updates from corosync_notifyd , run diagnostics to get \
new state"
__FUNCTION__ (List.length updates) ;
let m =
Cluster_client.LocalClient.diagnostics (rpc ~__context)
"update quorum api fields with diagnostics"
in
match Idl.IdM.run @@ Cluster_client.IDL.T.get m with
| Ok diag ->
Db.Cluster.set_is_quorate ~__context ~self:cluster
~value:diag.is_quorate ;
let all_cluster_hosts = Db.Cluster_host.get_all ~__context in
let ip_ch =
List.map
(fun ch ->
let pIF = Db.Cluster_host.get_PIF ~__context ~self:ch in
let ipstr =
ip_of_pif (pIF, Db.PIF.get_record ~__context ~self:pIF)
|> ipstr_of_address
in
(ipstr, ch)
)
all_cluster_hosts
| Some nodel ->
let quorum_hosts =
List.filter_map
(fun {addr; _} ->
let ipstr = ipstr_of_address addr in
match List.assoc_opt ipstr ip_ch with
| None ->
error
"%s: cannot find cluster host with network address %s, \
ignoring this host"
__FUNCTION__ ipstr ;
None
| Some ch ->
Some ch
in
let current_time = API.Date.now () in
( match diag.quorum_members with
| None ->
List.iter
(fun self ->
Db.Cluster_host.set_live ~__context ~self ~value:false ;
Db.Cluster_host.set_last_update_live ~__context ~self
~value:current_time
)
nodel
in
let missing_hosts =
List.filter
(fun h -> not (List.mem h quorum_hosts))
all_cluster_hosts
in
let new_hosts =
List.filter
(fun h -> not (Db.Cluster_host.get_live ~__context ~self:h))
quorum_hosts
in
List.iter
(fun self ->
Db.Cluster_host.set_live ~__context ~self ~value:true ;
Db.Cluster_host.set_last_update_live ~__context ~self
~value:current_time
)
new_hosts ;
List.iter
(fun self ->
Db.Cluster_host.set_live ~__context ~self ~value:false ;
Db.Cluster_host.set_last_update_live ~__context ~self
~value:current_time
)
missing_hosts ;
maybe_generate_alert ~__context ~missing_hosts ~new_hosts
~num_hosts:(List.length quorum_hosts) ~quorum:diag.quorum
) ;
Db.Cluster.set_quorum ~__context ~self:cluster
~value:(Int64.of_int diag.quorum) ;
Db.Cluster.set_live_hosts ~__context ~self:cluster
~value:(Int64.of_int diag.total_votes)
| Error (InternalError message) | Error (Unix_error message) ->
warn "%s Cannot query diagnostics due to %s, not performing update"
__FUNCTION__ message
| exception exn ->
warn
"%s: Got exception %s while retrieving diagnostics info, not \
performing update"
__FUNCTION__ (Printexc.to_string exn)

let create_cluster_watcher_on_master ~__context ~host =
if Helpers.is_pool_master ~__context ~host then
let watch () =
while !Daemon.enabled do
let m =
Cluster_client.LocalClient.UPDATES.get (rpc ~__context)
"call cluster watcher" 3.
in
match Idl.IdM.run @@ Cluster_client.IDL.T.get m with
| Ok updates -> (
match find_cluster_host ~__context ~host with
| Some ch ->
let cluster = Db.Cluster_host.get_cluster ~__context ~self:ch in
on_corosync_update ~__context ~cluster updates
| None ->
()
)
| Error (InternalError "UPDATES.Timeout") ->
(* UPDATES.get timed out, this is normal, now retry *)
| Some nodel ->
let quorum_hosts =
List.filter_map
(fun {addr; _} ->
let ipstr = ipstr_of_address addr in
match List.assoc_opt ipstr ip_ch with
| None ->
error
"%s: cannot find cluster host with network address %s, \
ignoring this host"
__FUNCTION__ ipstr ;
None
| Some ch ->
Some ch
)
nodel
in
let missing_hosts =
List.filter
(fun h -> not (List.mem h quorum_hosts))
all_cluster_hosts
in
let new_hosts =
List.filter
(fun h -> not (Db.Cluster_host.get_live ~__context ~self:h))
quorum_hosts
in
List.iter
(fun self ->
Db.Cluster_host.set_live ~__context ~self ~value:true ;
Db.Cluster_host.set_last_update_live ~__context ~self
~value:current_time
)
new_hosts ;
List.iter
(fun self ->
Db.Cluster_host.set_live ~__context ~self ~value:false ;
Db.Cluster_host.set_last_update_live ~__context ~self
~value:current_time
)
missing_hosts ;
maybe_generate_alert ~__context ~missing_hosts ~new_hosts
~num_hosts:(List.length quorum_hosts) ~quorum:diag.quorum
) ;
Db.Cluster.set_quorum ~__context ~self:cluster
~value:(Int64.of_int diag.quorum) ;
Db.Cluster.set_live_hosts ~__context ~self:cluster
~value:(Int64.of_int diag.total_votes)
| Error (InternalError message) | Error (Unix_error message) ->
warn "%s Cannot query diagnostics due to %s, not performing update"
__FUNCTION__ message
| exception exn ->
warn
"%s: Got exception %s while retrieving diagnostics info, not \
performing update"
__FUNCTION__ (Printexc.to_string exn)

let watch_cluster_change ~__context ~host =
while !Daemon.enabled do
let m =
Cluster_client.LocalClient.UPDATES.get (rpc ~__context)
"call cluster watcher" 3.
in
match Idl.IdM.run @@ Cluster_client.IDL.T.get m with
| Ok updates -> (
match find_cluster_host ~__context ~host with
| Some ch ->
let cluster = Db.Cluster_host.get_cluster ~__context ~self:ch in
on_corosync_update ~__context ~cluster updates
| None ->
()
| Error (InternalError message) | Error (Unix_error message) ->
warn "%s: Cannot query cluster host updates with error %s"
__FUNCTION__ message
| exception exn ->
warn
"%s: Got exception %s while query cluster host updates, retrying"
__FUNCTION__ (Printexc.to_string exn) ;
Thread.delay 3.
done
in
if Xapi_cluster_helpers.cluster_health_enabled ~__context then (
debug "%s: create watcher for corosync-notifyd on master" __FUNCTION__ ;
ignore @@ Thread.create watch ()
) else
debug
"%s: not creating watcher for corosync-notifyd: feature cluster_health \
not enabled"
__FUNCTION__
)
| Error (InternalError "UPDATES.Timeout") ->
(* UPDATES.get timed out, this is normal, now retry *)
()
| Error (InternalError message) | Error (Unix_error message) ->
warn "%s: Cannot query cluster host updates with error %s"
__FUNCTION__ message
| exception exn ->
warn "%s: Got exception %s while query cluster host updates, retrying"
__FUNCTION__ (Printexc.to_string exn) ;
Thread.delay 3.
done

(* [watcher_running] is [true] while a watcher thread spawned by
   [create_as_necessary] is alive. It is what makes repeated calls to
   [create_as_necessary] idempotent. *)
let watcher_running = Atomic.make false

(** [create_as_necessary ~__context ~host] starts the cluster watcher thread
    (running [watch_cluster_change]) when [host] is the pool coordinator and
    the cluster_health feature is enabled, unless a watcher started by a
    previous call is still running. In every other case it does nothing.

    This function is called from several places (cluster creation, host
    resync, cluster host enable), so it must not start a new thread on each
    call: duplicate watchers would all poll for updates and write the same
    database fields.

    There is no need to destroy the watcher: [watch_cluster_change] loops only
    while the clustering daemon is enabled, so the thread exits by itself once
    the daemon is disabled. When it exits the "running" flag is cleared and a
    later call can start a fresh watcher.

    NOTE(review): if the daemon is disabled and re-enabled in the short window
    between the watcher leaving its loop and the flag being cleared, a
    concurrent call will not start a new watcher; the next call will. *)
let create_as_necessary ~__context ~host =
  if
    Helpers.is_pool_master ~__context ~host
    && Xapi_cluster_helpers.cluster_health_enabled ~__context
  then
    (* compare_and_set guarantees that exactly one caller wins the right to
       create the thread, even when several calls race. *)
    if Atomic.compare_and_set watcher_running false true then (
      debug "%s: create watcher for corosync-notifyd on coordinator"
        __FUNCTION__ ;
      let run () =
        (* Clear the flag however the watcher terminates (normal exit or
           exception) so that it can be re-created later. *)
        Fun.protect
          ~finally:(fun () -> Atomic.set watcher_running false)
          (fun () -> watch_cluster_change ~__context ~host)
      in
      try ignore (Thread.create run () : Thread.t)
      with exn ->
        (* The thread never started, so [run] will not clear the flag. *)
        Atomic.set watcher_running false ;
        raise exn
    ) else
      debug
        "%s: watcher for corosync-notifyd already exists, not creating \
         another one"
        __FUNCTION__
end

0 comments on commit bb2fe47

Please sign in to comment.