diff --git a/examples/listen-changes.rs b/examples/listen-changes.rs index a1adc1b..3ce4aa5 100644 --- a/examples/listen-changes.rs +++ b/examples/listen-changes.rs @@ -74,7 +74,7 @@ async fn main() -> Result<(), Box> { .add_target(TEST_TARGET_ID_BY_DOC_IDS, &mut listener)?; listener - .start(|event| async move { + .start(|event, _edb| async move { match event { FirestoreListenEvent::DocumentChange(ref doc_change) => { println!("Doc changed: {doc_change:?}"); @@ -87,6 +87,7 @@ async fn main() -> Result<(), Box> { } } _ => { + //storage.remove_target(&TEST_TARGET_ID_BY_DOC_IDS).ok(); println!("Received a listen response event to handle: {event:?}"); } } diff --git a/src/db/listen_changes.rs b/src/db/listen_changes.rs index 365c36d..41a8abd 100644 --- a/src/db/listen_changes.rs +++ b/src/db/listen_changes.rs @@ -232,6 +232,11 @@ impl FirestoreDb { pub type FirestoreListenEvent = listen_response::ResponseType; +pub trait FirestoreTargetManager { + fn add_target(&mut self, target: FirestoreListenerTargetParams) -> FirestoreResult<()>; + fn remove_target(&mut self, target: &FirestoreListenerTarget) -> FirestoreResult<()>; +} + #[derive(Debug, Clone, Builder)] pub struct FirestoreListenerParams { pub retry_delay: Option, @@ -243,9 +248,9 @@ where S: FirestoreResumeStateStorage, { db: D, - storage: S, + initial_targets_storage: FirestoreTargetManagerStorage, + resume_state_storage: S, listener_params: FirestoreListenerParams, - targets: Vec, shutdown_flag: Arc, shutdown_handle: Option>, shutdown_writer: Option>>, @@ -258,44 +263,35 @@ where { pub async fn new( db: D, - storage: S, + resume_state_storage: S, listener_params: FirestoreListenerParams, ) -> FirestoreResult> { Ok(FirestoreListener { db, - storage, + initial_targets_storage: FirestoreTargetManagerStorage::new(false), + resume_state_storage, listener_params, - targets: vec![], shutdown_flag: Arc::new(AtomicBool::new(false)), shutdown_handle: None, shutdown_writer: None, }) } - pub fn add_target( - &mut self, - target_params: FirestoreListenerTargetParams, - ) -> FirestoreResult<()> { - target_params.validate()?; - self.targets.push(target_params); - Ok(()) - } - pub async fn start(&mut self, cb: FN) -> FirestoreResult<()> where - FN: Fn(FirestoreListenEvent) -> F + Send + Sync + 'static, + FN: Fn(FirestoreListenEvent, D) -> F + Send + Sync + 'static, F: Future> + Send + 'static, { info!( - "Starting a Firestore listener for targets: {:?}...", - &self.targets.len() + "Starting a Firestore listener for initial targets: {:?}...", + &self.initial_targets_storage.targets.len() ); - let mut initial_states: HashMap = - HashMap::new(); - for target_params in &self.targets { - let initial_state = self - .storage + let mut targets_storage = self.initial_targets_storage.clone(); + + for target_params in &mut targets_storage.targets.values_mut() { + let maybe_initial_token = self + .resume_state_storage .read_resume_state(&target_params.target) .map_err(|err| { FirestoreError::SystemError(FirestoreSystemError::new( @@ -305,10 +301,7 @@ where }) .await?; - initial_states.insert( - target_params.target.clone(), - target_params.clone().opt_resume_type(initial_state), - ); + target_params.mopt_resume_type(maybe_initial_token); } let (tx, rx): (UnboundedSender, UnboundedReceiver) = @@ -317,9 +310,9 @@ where self.shutdown_writer = Some(Arc::new(tx)); self.shutdown_handle = Some(tokio::spawn(Self::listener_loop( self.db.clone(), - self.storage.clone(), + self.resume_state_storage.clone(), self.shutdown_flag.clone(), - initial_states, + targets_storage, self.listener_params.clone(), rx, cb, @@ -346,13 +339,13 @@ where db: D, storage: S, shutdown_flag: Arc, - mut targets_state: HashMap, + mut targets_state: FirestoreTargetManagerStorage, listener_params: FirestoreListenerParams, mut shutdown_receiver: UnboundedReceiver, cb: FN, ) where D: FirestoreListenSupport + Clone + Send + Sync, - FN: Fn(FirestoreListenEvent) -> F + Send + Sync, + FN: Fn(FirestoreListenEvent, D) -> F + Send + Sync, F: Future> + Send, { let effective_delay = listener_params @@ -360,10 +353,13 @@ where .unwrap_or_else(|| std::time::Duration::from_secs(5)); while !shutdown_flag.load(Ordering::Relaxed) { - debug!("Start listening on {} targets ... ", targets_state.len()); + debug!( + "Start listening on {} targets ... ", + targets_state.targets.len() + ); match db - .listen_doc_changes(targets_state.values().cloned().collect()) + .listen_doc_changes(targets_state.targets.values().cloned().collect()) .await { Err(err) => { @@ -374,7 +370,7 @@ where Ok(mut listen_stream) => loop { tokio::select! { _ = shutdown_receiver.recv() => { - debug!("Exiting from listener on {} targets...", targets_state.len()); + debug!("Exiting from listener on {} targets...", targets_state.targets.len()); shutdown_receiver.close(); break; } @@ -386,40 +382,36 @@ where match tried { Ok(Some(event)) => { trace!("Received a listen response event to handle: {:?}", event); - match event.response_type { - Some(listen_response::ResponseType::TargetChange(ref target_change)) - if !target_change.resume_token.is_empty() => - { - for target_id_num in &target_change.target_ids { - match FirestoreListenerTarget::try_from(*target_id_num) { - Ok(target_id) => { - if let Some(target) = targets_state.get_mut(&target_id) { - let new_token: FirestoreListenerToken = target_change.resume_token.clone().into(); - - if let Err(err) = storage.update_resume_token(&target.target, new_token.clone()).await { - error!("Listener token storage error occurred {:?}.", err); - break; - } - else { - target.resume_type = Some(FirestoreListenerTargetResumeType::Token(new_token)) + if let Some(response_type) = event.response_type { + if let listen_response::ResponseType::TargetChange(ref target_change) = &response_type { + if !target_change.resume_token.is_empty() { + for target_id_num in &target_change.target_ids { + match FirestoreListenerTarget::try_from(*target_id_num) { + Ok(target_id) => { + if let Some(target) = targets_state.targets.get_mut(&target_id) { + let new_token: FirestoreListenerToken = target_change.resume_token.clone().into(); + + if let Err(err) = storage.update_resume_token(&target.target, new_token.clone()).await { + error!("Listener token storage error occurred {:?}.", err); + break; + } + else { + target.resume_type = Some(FirestoreListenerTargetResumeType::Token(new_token)) + } } + }, + Err(err) => { + error!("Listener system error - unexpected target ID: {} {:?}.", target_id_num, err); + break; } - }, - Err(err) => { - error!("Listener system error - unexpected target ID: {} {:?}.", target_id_num, err); - break; } } } - } - Some(response_type) => { - if let Err(err) = cb(response_type).await { - error!("Listener callback function error occurred {:?}.", err); - break; - } + if let Err(err) = cb(response_type, db.clone()).await { + error!("Listener callback function error occurred {:?}.", err); + break; } - None => {} } } Ok(None) => break, @@ -469,3 +461,50 @@ where } } } + +impl FirestoreTargetManager for FirestoreListener +where + D: FirestoreListenSupport + Clone + Send + Sync + 'static, + S: FirestoreResumeStateStorage + Clone + Send + Sync + 'static, +{ + fn add_target(&mut self, target_params: FirestoreListenerTargetParams) -> FirestoreResult<()> { + self.initial_targets_storage.add_target(target_params) + } + + fn remove_target(&mut self, target: &FirestoreListenerTarget) -> FirestoreResult<()> { + self.initial_targets_storage.remove_target(target) + } +} + +#[derive(Clone)] +pub struct FirestoreTargetManagerStorage { + targets: HashMap, + change_log_mode: bool, + to_remove: Vec, +} + +impl FirestoreTargetManagerStorage { + fn new(change_log_mode: bool) -> Self { + Self { + targets: HashMap::new(), + change_log_mode, + to_remove: Vec::new(), + } + } +} + +impl FirestoreTargetManager for FirestoreTargetManagerStorage { + fn add_target(&mut self, target_params: FirestoreListenerTargetParams) -> FirestoreResult<()> { + target_params.validate()?; + self.targets + .insert(target_params.target.clone(), target_params); + Ok(()) + } + + fn remove_target(&mut self, target: &FirestoreListenerTarget) -> FirestoreResult<()> { + if self.targets.remove(target).is_none() && self.change_log_mode { + self.to_remove.push(target.clone()); + } + Ok(()) + } +} diff --git a/src/fluent_api/select_builder.rs b/src/fluent_api/select_builder.rs index e39a8f6..ad790f0 100644 --- a/src/fluent_api/select_builder.rs +++ b/src/fluent_api/select_builder.rs @@ -4,11 +4,10 @@ use crate::select_filter_builder::FirestoreQueryFilterBuilder; use crate::{ FirestoreAggregatedQueryParams, FirestoreAggregatedQuerySupport, FirestoreAggregation, FirestoreCollectionDocuments, FirestoreGetByIdSupport, FirestoreListenSupport, - FirestoreListener, FirestoreListenerParams, FirestoreListenerTarget, - FirestoreListenerTargetParams, FirestorePartition, FirestorePartitionQueryParams, - FirestoreQueryCollection, FirestoreQueryCursor, FirestoreQueryFilter, FirestoreQueryOrder, - FirestoreQueryParams, FirestoreQuerySupport, FirestoreResult, FirestoreResumeStateStorage, - FirestoreTargetType, + FirestoreListenerParams, FirestoreListenerTarget, FirestoreListenerTargetParams, + FirestorePartition, FirestorePartitionQueryParams, FirestoreQueryCollection, + FirestoreQueryCursor, FirestoreQueryFilter, FirestoreQueryOrder, FirestoreQueryParams, + FirestoreQuerySupport, FirestoreResult, FirestoreTargetManager, FirestoreTargetType, }; use futures::stream::BoxStream; use gcloud_sdk::google::firestore::v1::Document; @@ -780,13 +779,13 @@ where } #[inline] - pub fn add_target( + pub fn add_target( self, target: FirestoreListenerTarget, - listener: &mut FirestoreListener, + listener: &mut TM, ) -> FirestoreResult<()> where - S: FirestoreResumeStateStorage + Send + Sync + Clone + 'static, + TM: FirestoreTargetManager, { listener.add_target(FirestoreListenerTargetParams::new( target,