diff --git a/crates/libs/core/src/client/connection.rs b/crates/libs/core/src/client/connection.rs new file mode 100644 index 00000000..e107e3f6 --- /dev/null +++ b/crates/libs/core/src/client/connection.rs @@ -0,0 +1,138 @@ +// ------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License (MIT). See License.txt in the repo root for license information. +// -------------------------------------------------- + +use mssf_com::FabricClient::{ + IFabricClientConnectionEventHandler, IFabricClientConnectionEventHandler_Impl, + IFabricGatewayInformationResult, +}; + +use crate::{strings::HSTRINGWrap, types::NodeId}; + +/// Internal trait that rust code implements that can be bridged into IFabricClientConnectionEventHandler. +/// Not exposed to user. +pub trait ClientConnectionEventHandler: 'static { + fn on_connected(&self, info: &GatewayInformationResult) -> crate::Result<()>; + fn on_disconnected(&self, info: &GatewayInformationResult) -> crate::Result<()>; +} + +/// FabricClient connection information. +/// Traslated from IFabricGatewayInformationResult +#[derive(Debug, Clone)] +pub struct GatewayInformationResult { + pub node_address: crate::HSTRING, + pub node_id: NodeId, + pub node_instance_id: u64, + pub node_name: crate::HSTRING, +} + +impl GatewayInformationResult { + fn from_com(com: &IFabricGatewayInformationResult) -> Self { + let info = unsafe { com.get_GatewayInformation().as_ref().unwrap() }; + Self { + node_address: HSTRINGWrap::from(info.NodeAddress).into(), + node_id: info.NodeId.into(), + node_instance_id: info.NodeInstanceId, + node_name: HSTRINGWrap::from(info.NodeName).into(), + } + } +} + +/// Bridge for IFabricClientConnectionEventHandler. +/// Turn rust trait into SF com object. +#[windows_core::implement(IFabricClientConnectionEventHandler)] +pub struct ClientConnectionEventHandlerBridge +where + T: ClientConnectionEventHandler, +{ + inner: T, +} + +impl ClientConnectionEventHandlerBridge +where + T: ClientConnectionEventHandler, +{ + pub fn new(inner: T) -> Self { + Self { inner } + } + pub fn new_com(inner: T) -> IFabricClientConnectionEventHandler { + Self::new(inner).into() + } +} + +impl IFabricClientConnectionEventHandler_Impl for ClientConnectionEventHandlerBridge +where + T: ClientConnectionEventHandler, +{ + fn OnConnected( + &self, + gw_info: Option<&IFabricGatewayInformationResult>, + ) -> windows_core::Result<()> { + let info = GatewayInformationResult::from_com(gw_info.unwrap()); + self.inner.on_connected(&info) + } + + fn OnDisconnected( + &self, + gw_info: Option<&IFabricGatewayInformationResult>, + ) -> windows_core::Result<()> { + let info = GatewayInformationResult::from_com(gw_info.unwrap()); + self.inner.on_disconnected(&info) + } +} + +/// Connection notification function signature to avoid code repeatition. +/// Trait alias feature in rust (not yet stable) would eliminate this trait definition. +pub trait ConnectionNotificationFn: + Fn(&GatewayInformationResult) -> crate::Result<()> + 'static +{ +} +impl crate::Result<()> + 'static> ConnectionNotificationFn + for T +{ +} + +/// Lambda implementation of the ClientConnectionEventHandler trait. +/// This is used in FabricClientBuilder to build handler from functions. +pub struct LambdaClientConnectionNotificationHandler { + f_conn: Option>, + f_disconn: Option>, +} + +impl LambdaClientConnectionNotificationHandler { + pub fn new() -> Self { + Self { + f_conn: None, + f_disconn: None, + } + } + + /// Set the on_connected callback. + pub fn set_f_conn(&mut self, f: impl ConnectionNotificationFn) { + self.f_conn = Some(Box::new(f)); + } + + /// Set the on_disconnected callback. + pub fn set_f_disconn(&mut self, f: impl ConnectionNotificationFn) { + self.f_disconn = Some(Box::new(f)); + } +} + +impl ClientConnectionEventHandler for LambdaClientConnectionNotificationHandler { + fn on_connected(&self, info: &GatewayInformationResult) -> crate::Result<()> { + if let Some(f) = &self.f_conn { + f(info) + } else { + Ok(()) + } + } + + fn on_disconnected(&self, info: &GatewayInformationResult) -> crate::Result<()> { + if let Some(f) = &self.f_disconn { + f(info) + } else { + Ok(()) + } + } +} diff --git a/crates/libs/core/src/client/mod.rs b/crates/libs/core/src/client/mod.rs index 471e6234..3e77401a 100644 --- a/crates/libs/core/src/client/mod.rs +++ b/crates/libs/core/src/client/mod.rs @@ -3,19 +3,160 @@ // Licensed under the MIT License (MIT). See License.txt in the repo root for license information. // ------------------------------------------------------------ +use connection::{ClientConnectionEventHandlerBridge, LambdaClientConnectionNotificationHandler}; use mssf_com::FabricClient::{ + FabricCreateLocalClient4, IFabricClientConnectionEventHandler, IFabricPropertyManagementClient2, IFabricQueryClient10, IFabricServiceManagementClient6, + IFabricServiceNotificationEventHandler, +}; +use notification::{ + LambdaServiceNotificationHandler, ServiceNotificationEventHandler, + ServiceNotificationEventHandlerBridge, }; use windows_core::Interface; +use crate::types::ClientRole; + use self::{query_client::QueryClient, svc_mgmt_client::ServiceManagementClient}; +mod connection; +mod notification; pub mod query_client; pub mod svc_mgmt_client; +// reexport +pub use connection::GatewayInformationResult; +pub use notification::ServiceNotification; + #[cfg(test)] mod tests; +/// Creates FabricClient com object using SF com API. +fn create_local_client_internal( + service_notification_handler: Option<&IFabricServiceNotificationEventHandler>, + client_connection_handler: Option<&IFabricClientConnectionEventHandler>, + client_role: Option, +) -> T { + let role = client_role.unwrap_or(ClientRole::User); + assert_ne!( + role, + ClientRole::Unknown, + "Unknown role should not be used." + ); + let raw = unsafe { + FabricCreateLocalClient4( + service_notification_handler, + client_connection_handler, + role.into(), + &T::IID, + ) + } + .expect("failed to create fabric client"); + // if params are right, client should be created. There is no network call involved during obj creation. + unsafe { T::from_raw(raw) } +} + +// Builder for FabricClient +pub struct FabricClientBuilder { + sn_handler: Option, + cc_handler: Option, + client_role: ClientRole, +} + +impl Default for FabricClientBuilder { + fn default() -> Self { + Self::new() + } +} + +impl FabricClientBuilder { + /// Creates the builder. + pub fn new() -> Self { + Self { + sn_handler: None, + cc_handler: None, + client_role: ClientRole::User, + } + } + + /// Configures the service notification handler internally. + fn with_service_notification_handler( + mut self, + handler: impl ServiceNotificationEventHandler, + ) -> Self { + self.sn_handler = Some(ServiceNotificationEventHandlerBridge::new_com(handler)); + self + } + + /// Configures the service notification handler. + /// See details in `register_service_notification_filter` API. + /// If the service endpoint change matches the registered filter, + /// this notification is invoked + pub fn with_on_service_notification(self, f: T) -> Self + where + T: Fn(&ServiceNotification) -> crate::Result<()> + 'static, + { + let handler = LambdaServiceNotificationHandler::new(f); + self.with_service_notification_handler(handler) + } + + /// When FabricClient connects to the SF cluster, this callback is invoked. + pub fn with_on_client_connect(mut self, f: T) -> Self + where + T: Fn(&GatewayInformationResult) -> crate::Result<()> + 'static, + { + if self.cc_handler.is_none() { + self.cc_handler = Some(LambdaClientConnectionNotificationHandler::new()); + } + if let Some(cc) = self.cc_handler.as_mut() { + cc.set_f_conn(f) + } + self + } + + /// When FabricClient disconnets to the SF cluster, this callback is called. + /// This callback is not called on Drop of FabricClient. + pub fn with_on_client_disconnect(mut self, f: T) -> Self + where + T: Fn(&GatewayInformationResult) -> crate::Result<()> + 'static, + { + if self.cc_handler.is_none() { + self.cc_handler = Some(LambdaClientConnectionNotificationHandler::new()); + } + if let Some(cc) = self.cc_handler.as_mut() { + cc.set_f_disconn(f) + } + self + } + + /// Sets the role of the client connection. Default is User if not set. + pub fn with_client_role(mut self, role: ClientRole) -> Self { + self.client_role = role; + self + } + + /// Build the fabricclient + /// Remarks: FabricClient connect to SF cluster when + /// the first API call is triggered. Build/create of the object does not + /// establish connection. + pub fn build(self) -> FabricClient { + let c = Self::build_interface(self); + FabricClient::from_com(c) + } + + /// Build the specific com interface of the fabric client. + pub fn build_interface(self) -> T { + let cc_handler = self + .cc_handler + .map(ClientConnectionEventHandlerBridge::new_com); + create_local_client_internal::( + self.sn_handler.as_ref(), + cc_handler.as_ref(), + Some(self.client_role), + ) + } +} + // FabricClient safe wrapper // The design of FabricClient follows from the csharp client: // https://github.com/microsoft/service-fabric/blob/master/src/prod/src/managed/Api/src/System/Fabric/FabricClient.cs @@ -26,18 +167,7 @@ pub struct FabricClient { com_query_client: IFabricQueryClient10, } -impl Default for FabricClient { - fn default() -> Self { - Self::new() - } -} - impl FabricClient { - pub fn new() -> Self { - let com = crate::sync::CreateLocalClient::(); - Self::from_com(com) - } - // Get a copy of COM object pub fn get_com(&self) -> IFabricPropertyManagementClient2 { self.com_property_client.clone() diff --git a/crates/libs/core/src/client/notification.rs b/crates/libs/core/src/client/notification.rs new file mode 100644 index 00000000..c48a064e --- /dev/null +++ b/crates/libs/core/src/client/notification.rs @@ -0,0 +1,159 @@ +// ------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License (MIT). See License.txt in the repo root for license information. +// ------------------------------------------------------------ + +use mssf_com::{ + FabricClient::{ + IFabricServiceEndpointsVersion, IFabricServiceNotification, + IFabricServiceNotificationEventHandler, IFabricServiceNotificationEventHandler_Impl, + }, + FabricTypes::FABRIC_RESOLVED_SERVICE_ENDPOINT, +}; + +use crate::{ + iter::{FabricIter, FabricListAccessor}, + types::ServicePartitionInformation, +}; + +use super::svc_mgmt_client::ResolvedServiceEndpoint; + +/// Rust trait to turn rust code into IFabricServiceNotificationEventHandler. +/// Not exposed to user +pub trait ServiceNotificationEventHandler: 'static { + fn on_notification(&self, notification: &ServiceNotification) -> crate::Result<()>; +} + +/// Content of the service notification callback. +#[derive(Debug, Clone)] +pub struct ServiceNotification { + pub partition_info: ServicePartitionInformation, + pub partition_id: crate::GUID, + pub endpoints: ServiceEndpointList, + com: IFabricServiceNotification, +} + +impl ServiceNotification { + fn from_com(com: IFabricServiceNotification) -> Self { + let raw = unsafe { com.get_Notification().as_ref().unwrap() }; + Self { + partition_info: unsafe { raw.PartitionInfo.as_ref().unwrap().into() }, + partition_id: raw.PartitionId, + endpoints: ServiceEndpointList { com: com.clone() }, + com, + } + } + + pub fn get_version(&self) -> crate::Result { + let version = unsafe { self.com.GetVersion() }?; + Ok(ServiceEndpointsVersion::from_com(version)) + } +} + +#[derive(Debug, Clone)] +pub struct ServiceEndpointList { + com: IFabricServiceNotification, +} + +impl ServiceEndpointList { + // Get iterator for the list + pub fn iter(&self) -> ServiceEndpointListIter { + ServiceEndpointListIter::new(self, self) + } +} + +/// mssf_core iterator infrastructure implementation +impl FabricListAccessor for ServiceEndpointList { + fn get_count(&self) -> u32 { + let raw = unsafe { self.com.get_Notification().as_ref().unwrap() }; + raw.EndpointCount + } + + fn get_first_item(&self) -> *const FABRIC_RESOLVED_SERVICE_ENDPOINT { + let raw = unsafe { self.com.get_Notification().as_ref().unwrap() }; + raw.Endpoints + } +} + +type ServiceEndpointListIter<'a> = + FabricIter<'a, FABRIC_RESOLVED_SERVICE_ENDPOINT, ResolvedServiceEndpoint, ServiceEndpointList>; + +/// IFabricServiceEndpointsVersion wrapper. +pub struct ServiceEndpointsVersion { + com: IFabricServiceEndpointsVersion, +} + +impl ServiceEndpointsVersion { + fn from_com(com: IFabricServiceEndpointsVersion) -> Self { + Self { com } + } + + /// TODO: documentation. + pub fn compare(&self, other: &ServiceEndpointsVersion) -> crate::Result { + unsafe { self.com.Compare(&other.com) } + } +} + +// Bridge implementation for the notification handler to turn rust code into SF com object. +#[windows_core::implement(IFabricServiceNotificationEventHandler)] +pub struct ServiceNotificationEventHandlerBridge +where + T: ServiceNotificationEventHandler, +{ + inner: T, +} + +impl ServiceNotificationEventHandlerBridge +where + T: ServiceNotificationEventHandler, +{ + pub fn new(inner: T) -> Self { + Self { inner } + } + + pub fn new_com(inner: T) -> IFabricServiceNotificationEventHandler { + Self::new(inner).into() + } +} + +impl IFabricServiceNotificationEventHandler_Impl for ServiceNotificationEventHandlerBridge +where + T: ServiceNotificationEventHandler, +{ + fn OnNotification( + &self, + notification: Option<&IFabricServiceNotification>, + ) -> crate::Result<()> { + let com = notification.unwrap(); + let msg = ServiceNotification::from_com(com.to_owned()); + self.inner.on_notification(&msg) + } +} + +/// Lambda implemnentation of ServiceNotificationEventHandler trait. +/// This is used in FabricClientBuilder to build function into handler. +/// Not exposed to user. +pub struct LambdaServiceNotificationHandler +where + T: Fn(&ServiceNotification) -> crate::Result<()> + 'static, +{ + f: T, +} + +impl LambdaServiceNotificationHandler +where + T: Fn(&ServiceNotification) -> crate::Result<()> + 'static, +{ + pub fn new(f: T) -> Self { + Self { f } + } +} + +impl ServiceNotificationEventHandler for LambdaServiceNotificationHandler +where + T: Fn(&ServiceNotification) -> crate::Result<()> + 'static, +{ + fn on_notification(&self, notification: &ServiceNotification) -> crate::Result<()> { + (self.f)(notification) + } +} diff --git a/crates/libs/core/src/client/tests.rs b/crates/libs/core/src/client/tests.rs index 99f59729..23831e7b 100644 --- a/crates/libs/core/src/client/tests.rs +++ b/crates/libs/core/src/client/tests.rs @@ -12,14 +12,14 @@ use tokio_util::sync::CancellationToken; use windows_core::HSTRING; use crate::{ - client::{svc_mgmt_client::PartitionKeyType, FabricClient}, + client::{svc_mgmt_client::PartitionKeyType, FabricClientBuilder}, error::FabricErrorCode, types::{NodeQueryDescription, NodeStatusFilter, PagedQueryDescription}, }; #[tokio::test] async fn test_fabric_client() { - let c = FabricClient::new(); + let c = FabricClientBuilder::new().build(); let qc = c.get_query_manager(); let timeout = Duration::from_secs(1); let paging_status; diff --git a/crates/libs/core/src/runtime/node_context.rs b/crates/libs/core/src/runtime/node_context.rs index 5538aa5e..1eb87052 100644 --- a/crates/libs/core/src/runtime/node_context.rs +++ b/crates/libs/core/src/runtime/node_context.rs @@ -9,6 +9,7 @@ use windows_core::{Interface, HSTRING}; use crate::{ strings::HSTRINGWrap, sync::{fabric_begin_end_proxy2, CancellationToken}, + types::NodeId, }; pub fn get_com_node_context( @@ -27,12 +28,6 @@ pub fn get_com_node_context( ) } -#[derive(Debug)] -pub struct NodeId { - pub low: u64, - pub high: u64, -} - #[derive(Debug)] pub struct NodeContext { com: IFabricNodeContextResult, @@ -81,10 +76,7 @@ impl From<&IFabricNodeContextResult> for NodeContext { node_type: HSTRINGWrap::from(raw_ref.NodeType).into(), ip_address_or_fqdn: HSTRINGWrap::from(raw_ref.IPAddressOrFQDN).into(), node_instance_id: raw_ref.NodeInstanceId, - node_id: NodeId { - low: raw_ref.NodeId.Low, - high: raw_ref.NodeId.High, - }, + node_id: raw_ref.NodeId.into(), } } } diff --git a/crates/libs/core/src/sync/mod.rs b/crates/libs/core/src/sync/mod.rs index c6e8ee73..1dd7a3d1 100644 --- a/crates/libs/core/src/sync/mod.rs +++ b/crates/libs/core/src/sync/mod.rs @@ -13,16 +13,11 @@ use std::{ task::{Context, Poll}, }; -use mssf_com::{ - FabricClient::FabricCreateLocalClient, - FabricCommon::{ - IFabricAsyncOperationCallback, IFabricAsyncOperationCallback_Impl, - IFabricAsyncOperationContext, - }, +use mssf_com::FabricCommon::{ + IFabricAsyncOperationCallback, IFabricAsyncOperationCallback_Impl, IFabricAsyncOperationContext, }; use tokio::sync::oneshot::Receiver; use windows::core::implement; -use windows_core::Interface; mod proxy; pub mod wait; @@ -36,11 +31,6 @@ pub use cancel::*; // fabric code begins here -// Creates the local client -pub fn CreateLocalClient() -> T { - unsafe { T::from_raw(FabricCreateLocalClient(&T::IID).expect("cannot get localclient")) } -} - pub trait Callback: FnOnce(::core::option::Option<&IFabricAsyncOperationContext>) + 'static { @@ -183,7 +173,7 @@ mod tests { use windows::core::implement; use windows_core::{Interface, HSTRING}; - use super::{oneshot_channel, CreateLocalClient, FabricReceiver, SBox}; + use super::{oneshot_channel, FabricReceiver, SBox}; use super::AwaitableCallback2; @@ -242,7 +232,7 @@ mod tests { pub fn new() -> $name { return $name { com: paste::item! { - crate::sync::CreateLocalClient::]>() + crate::client::FabricClientBuilder::new().build_interface::]>() }, }; } @@ -330,7 +320,8 @@ mod tests { impl FabricQueryClient { pub fn new() -> FabricQueryClient { FabricQueryClient { - com: CreateLocalClient::(), + com: crate::client::FabricClientBuilder::new() + .build_interface::(), } } @@ -520,7 +511,8 @@ mod tests { #[test] fn local_client_create() { - let _mgmt = CreateLocalClient::(); + let _mgmt = crate::client::FabricClientBuilder::new() + .build_interface::(); } #[tokio::test] diff --git a/crates/libs/core/src/types/client/mod.rs b/crates/libs/core/src/types/client/mod.rs index 8e5796bf..407703a4 100644 --- a/crates/libs/core/src/types/client/mod.rs +++ b/crates/libs/core/src/types/client/mod.rs @@ -6,8 +6,9 @@ // This mod contains fabric client related types mod partition; use mssf_com::FabricTypes::{ - FABRIC_SERVICE_NOTIFICATION_FILTER_DESCRIPTION, FABRIC_SERVICE_NOTIFICATION_FILTER_FLAGS, - FABRIC_SERVICE_NOTIFICATION_FILTER_FLAGS_NAME_PREFIX, + FABRIC_CLIENT_ROLE, FABRIC_CLIENT_ROLE_ADMIN, FABRIC_CLIENT_ROLE_UNKNOWN, + FABRIC_CLIENT_ROLE_USER, FABRIC_SERVICE_NOTIFICATION_FILTER_DESCRIPTION, + FABRIC_SERVICE_NOTIFICATION_FILTER_FLAGS, FABRIC_SERVICE_NOTIFICATION_FILTER_FLAGS_NAME_PREFIX, FABRIC_SERVICE_NOTIFICATION_FILTER_FLAGS_NONE, FABRIC_SERVICE_NOTIFICATION_FILTER_FLAGS_PRIMARY_ONLY, FABRIC_URI, }; @@ -54,3 +55,34 @@ impl From<&ServiceNotificationFilterDescription> } } } + +// FABRIC_CLIENT_ROLE +#[derive(Debug, PartialEq, Clone)] +pub enum ClientRole { + Unknown, // Do not pass this in SF api, use User instead. + User, + Admin, + // ElevatedAdmin not supported by SF 6.x sdk yet. +} + +impl From for ClientRole { + fn from(value: FABRIC_CLIENT_ROLE) -> Self { + match value { + FABRIC_CLIENT_ROLE_UNKNOWN => Self::Unknown, + FABRIC_CLIENT_ROLE_USER => Self::User, + FABRIC_CLIENT_ROLE_ADMIN => Self::Admin, + // FABRIC_CLIENT_ROLE_ELEVATED_ADMIN => Self::ElevatedAdmin, + _ => Self::Unknown, + } + } +} + +impl From for FABRIC_CLIENT_ROLE { + fn from(value: ClientRole) -> Self { + match value { + ClientRole::Unknown => FABRIC_CLIENT_ROLE_UNKNOWN, + ClientRole::User => FABRIC_CLIENT_ROLE_USER, + ClientRole::Admin => FABRIC_CLIENT_ROLE_ADMIN, + } + } +} diff --git a/crates/libs/core/src/types/client/node.rs b/crates/libs/core/src/types/client/node.rs index ef3c1c41..7907df08 100644 --- a/crates/libs/core/src/types/client/node.rs +++ b/crates/libs/core/src/types/client/node.rs @@ -11,7 +11,7 @@ use bitflags::bitflags; use mssf_com::{ FabricClient::IFabricGetNodeListResult2, FabricTypes::{ - FABRIC_NODE_QUERY_RESULT_ITEM, FABRIC_NODE_QUERY_RESULT_ITEM_EX1, + FABRIC_NODE_ID, FABRIC_NODE_QUERY_RESULT_ITEM, FABRIC_NODE_QUERY_RESULT_ITEM_EX1, FABRIC_NODE_QUERY_RESULT_ITEM_EX2, FABRIC_PAGING_STATUS, FABRIC_QUERY_NODE_STATUS_FILTER_ALL, FABRIC_QUERY_NODE_STATUS_FILTER_DEFAULT, FABRIC_QUERY_NODE_STATUS_FILTER_DISABLED, FABRIC_QUERY_NODE_STATUS_FILTER_DISABLING, @@ -145,3 +145,19 @@ impl From<&FABRIC_NODE_QUERY_RESULT_ITEM> for Node { } } } + +// FABRIC_NODE_ID +#[derive(Debug, Clone)] +pub struct NodeId { + pub low: u64, + pub high: u64, +} + +impl From for NodeId { + fn from(value: FABRIC_NODE_ID) -> Self { + Self { + low: value.Low, + high: value.High, + } + } +} diff --git a/crates/samples/echomain-stateful2/src/test.rs b/crates/samples/echomain-stateful2/src/test.rs index 679aa1f6..3d15b5a7 100644 --- a/crates/samples/echomain-stateful2/src/test.rs +++ b/crates/samples/echomain-stateful2/src/test.rs @@ -11,7 +11,7 @@ use mssf_core::{ PartitionKeyType, ResolvedServiceEndpoint, ResolvedServicePartition, ServiceEndpointRole, ServicePartitionKind, }, - FabricClient, + FabricClient, FabricClientBuilder, }, error::FabricErrorCode, types::{ @@ -233,7 +233,7 @@ impl TestClient { // Uses fabric client to perform various actions for this service. #[tokio::test] async fn test_partition_info() { - let fc = FabricClient::new(); + let fc = FabricClientBuilder::new().build(); let tc = TestClient::new(fc.clone()); let timeout = Duration::from_secs(1); diff --git a/crates/samples/echomain/src/test.rs b/crates/samples/echomain/src/test.rs index 6c6f32b4..c36638bd 100644 --- a/crates/samples/echomain/src/test.rs +++ b/crates/samples/echomain/src/test.rs @@ -11,7 +11,7 @@ use mssf_core::{ PartitionKeyType, ResolvedServiceEndpoint, ResolvedServicePartitionInfo, ServiceEndpointRole, ServicePartitionKind, }, - FabricClient, + FabricClient, FabricClientBuilder, GatewayInformationResult, ServiceNotification, }, error::FabricErrorCode, types::{ @@ -25,6 +25,8 @@ use mssf_core::{ }; static ECHO_SVC_URI: &str = "fabric:/EchoApp/EchoAppService"; +static MAX_RETRY_COUNT: i32 = 5; +static RETRY_DURATION_SHORT: Duration = Duration::from_secs(1); // Test client for echo server. pub struct EchoTestClient { @@ -114,7 +116,28 @@ impl EchoTestClient { // Uses fabric client to perform various actions to the app. #[tokio::test] async fn test_fabric_client() { - let fc = FabricClient::new(); + // channel for service notification + let (sn_tx, mut sn_rx) = tokio::sync::mpsc::channel::(1); + // channel for client connection notification + let (cc_tx, mut cc_rx) = tokio::sync::mpsc::channel::(1); + let fc = FabricClientBuilder::new() + .with_on_service_notification(move |notification| { + sn_tx + .blocking_send(notification.clone()) + .expect("cannot send notification"); + Ok(()) + }) + .with_on_client_connect(move |gw| { + cc_tx.blocking_send(gw.clone()).expect("cannot send"); + Ok(()) + }) + .with_on_client_disconnect(move |_| { + // This is not invoked in this test. FabricClient does not invoke this on drop. + panic!("client disconnected"); + }) + .with_client_role(mssf_core::types::ClientRole::User) + .build(); + let ec = EchoTestClient::new(fc.clone()); let timeout = Duration::from_secs(1); @@ -128,6 +151,10 @@ async fn test_fabric_client() { // assert_eq!(stateless.health_state, HealthState::Ok); assert_ne!(single.id, GUID::zeroed()); + // Connection event notification should be received since we already sent a request. + let gw = cc_rx.try_recv().expect("notification not present"); + assert!(!gw.node_name.is_empty()); + // Get replica info let stateless_replica = ec.get_replica(single.id).await.unwrap(); @@ -182,18 +209,38 @@ async fn test_fabric_client() { if replica2.instance_id != stateless_replica.instance_id { break; } else { - if count > 5 { + if count > MAX_RETRY_COUNT { panic!( "replica id not changed after retry. original {}, new {}", stateless_replica.instance_id, replica2.instance_id ); } // replica has not changed yet. - tokio::time::sleep(Duration::from_secs(1)).await; + tokio::time::sleep(RETRY_DURATION_SHORT).await; } count += 1; } + // check service notification is invoked because service addr is changed for + // replica removal and recreation. + for i in 0..MAX_RETRY_COUNT { + match sn_rx.try_recv() { + Ok(sn) => { + assert_eq!(sn.partition_id, single.id); + break; + } + Err(e) => { + if e == tokio::sync::mpsc::error::TryRecvError::Disconnected { + panic!("channnel should not be closed"); + } + if i == MAX_RETRY_COUNT { + panic!("notification not received"); + } + tokio::time::sleep(RETRY_DURATION_SHORT).await; + } + }; + } + // unregisters the notification mgmt.unregister_service_notification_filter(filter_handle, timeout, None) .await