From f688f856cbb5d7106133320baf9bd6b9035c7a5a Mon Sep 17 00:00:00 2001 From: Abdulla Abdurakhmanov Date: Mon, 6 Feb 2023 17:20:27 +0100 Subject: [PATCH 1/6] Passing database instance to listen callback and remove Sync --- examples/listen-changes.rs | 2 +- src/db/listen_changes.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/listen-changes.rs b/examples/listen-changes.rs index d27c7cf..695e611 100644 --- a/examples/listen-changes.rs +++ b/examples/listen-changes.rs @@ -111,7 +111,7 @@ async fn main() -> Result<(), Box> { .add_target(TEST_TARGET_ID_BY_DOC_IDS, &mut listener)?; listener - .start(|event| async move { + .start(|event, _edb| async move { match event { FirestoreListenEvent::DocumentChange(ref doc_change) => { println!("Doc changed: {doc_change:?}"); diff --git a/src/db/listen_changes.rs b/src/db/listen_changes.rs index 55cec98..60bdd01 100644 --- a/src/db/listen_changes.rs +++ b/src/db/listen_changes.rs @@ -227,7 +227,7 @@ where pub async fn start(&mut self, cb: FN) -> FirestoreResult<()> where - FN: Fn(FirestoreListenEvent) -> F + Send + Sync + 'static, + FN: Fn(FirestoreListenEvent, D) -> F + Send + Sync + 'static, F: Future> + Send + 'static, { info!( @@ -296,7 +296,7 @@ where cb: FN, ) where D: FirestoreListenSupport + Clone + Send + Sync, - FN: Fn(FirestoreListenEvent) -> F + Send + Sync, + FN: Fn(FirestoreListenEvent, D) -> F + Send + Sync, F: Future> + Send, { while !shutdown_flag.load(Ordering::Relaxed) { From 23accc9db3ce03fb34d38b8ae70903c3531b28d4 Mon Sep 17 00:00:00 2001 From: Abdulla Abdurakhmanov Date: Mon, 6 Feb 2023 18:57:08 +0100 Subject: [PATCH 2/6] Change clone to ref for listen_changes --- src/db/listen_changes.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/db/listen_changes.rs b/src/db/listen_changes.rs index 60bdd01..90552f7 100644 --- a/src/db/listen_changes.rs +++ b/src/db/listen_changes.rs @@ -227,7 +227,7 @@ where pub async fn start(&mut self, cb: FN) -> FirestoreResult<()> where - FN: Fn(FirestoreListenEvent, D) -> F + Send + Sync + 'static, + FN: Fn(FirestoreListenEvent, &D) -> F + Send + Sync + 'static, F: Future> + Send + 'static, { info!( @@ -296,7 +296,7 @@ where cb: FN, ) where D: FirestoreListenSupport + Clone + Send + Sync, - FN: Fn(FirestoreListenEvent, D) -> F + Send + Sync, + FN: Fn(FirestoreListenEvent, &D) -> F + Send + Sync, F: Future> + Send, { while !shutdown_flag.load(Ordering::Relaxed) { @@ -342,7 +342,7 @@ where } Some(response_type) => { - if let Err(err) = cb(response_type).await { + if let Err(err) = cb(response_type, &db).await { error!("Listener callback function error occurred {:?}.", err); break; } From ce2aac51bb884b2e470a2258f002fb20f5429035 Mon Sep 17 00:00:00 2001 From: Abdulla Abdurakhmanov Date: Mon, 6 Feb 2023 19:20:51 +0100 Subject: [PATCH 3/6] Revert back to clone for now --- src/db/listen_changes.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/db/listen_changes.rs b/src/db/listen_changes.rs index 90552f7..de0a9ff 100644 --- a/src/db/listen_changes.rs +++ b/src/db/listen_changes.rs @@ -227,7 +227,7 @@ where pub async fn start(&mut self, cb: FN) -> FirestoreResult<()> where - FN: Fn(FirestoreListenEvent, &D) -> F + Send + Sync + 'static, + FN: Fn(FirestoreListenEvent, D) -> F + Send + Sync + 'static, F: Future> + Send + 'static, { info!( @@ -296,7 +296,7 @@ where cb: FN, ) where D: FirestoreListenSupport + Clone + Send + Sync, - FN: Fn(FirestoreListenEvent, &D) -> F + Send + Sync, + FN: Fn(FirestoreListenEvent, D) -> F + Send + Sync, F: Future> + Send, { while !shutdown_flag.load(Ordering::Relaxed) { @@ -342,7 +342,7 @@ where } Some(response_type) => { - if let Err(err) = cb(response_type, &db).await { + if let Err(err) = cb(response_type, db.clone()).await { error!("Listener callback function error occurred {:?}.", err); break; } From 2ddc9d17b9dfde8e4efdc97fdac1ae6ccfb8f47f Mon Sep 17 00:00:00 2001 From: Abdulla Abdurakhmanov Date: Fri, 11 Aug 2023 11:01:50 +0200 Subject: [PATCH 4/6] FirestoreTargetManager support --- src/db/listen_changes.rs | 31 ++++++++++++++++++++++--------- src/fluent_api/select_builder.rs | 15 +++++++-------- 2 files changed, 29 insertions(+), 17 deletions(-) diff --git a/src/db/listen_changes.rs b/src/db/listen_changes.rs index 898792d..603f3be 100644 --- a/src/db/listen_changes.rs +++ b/src/db/listen_changes.rs @@ -232,6 +232,11 @@ impl FirestoreDb { pub type FirestoreListenEvent = listen_response::ResponseType; +pub trait FirestoreTargetManager { + fn add_target(&mut self, target: FirestoreListenerTargetParams) -> FirestoreResult<()>; + fn remove_target(&mut self, target: FirestoreListenerTarget) -> FirestoreResult<()>; +} + #[derive(Debug, Clone, Builder)] pub struct FirestoreListenerParams { pub retry_delay: Option, @@ -272,15 +277,6 @@ where }) } - pub fn add_target( - &mut self, - target_params: FirestoreListenerTargetParams, - ) -> FirestoreResult<()> { - target_params.validate()?; - self.targets.push(target_params); - Ok(()) - } - pub async fn start(&mut self, cb: FN) -> FirestoreResult<()> where FN: Fn(FirestoreListenEvent, D) -> F + Send + Sync + 'static, @@ -469,3 +465,20 @@ where } } } + +impl FirestoreTargetManager for FirestoreListener +where + D: FirestoreListenSupport + Clone + Send + Sync + 'static, + S: FirestoreResumeStateStorage + Clone + Send + Sync + 'static, +{ + fn add_target(&mut self, target_params: FirestoreListenerTargetParams) -> FirestoreResult<()> { + target_params.validate()?; + self.targets.push(target_params); + Ok(()) + } + + fn remove_target(&mut self, target: FirestoreListenerTarget) -> FirestoreResult<()> { + self.targets.retain(|t| t.target != target); + Ok(()) + } +} diff --git a/src/fluent_api/select_builder.rs b/src/fluent_api/select_builder.rs index e39a8f6..ad790f0 100644 --- a/src/fluent_api/select_builder.rs +++ b/src/fluent_api/select_builder.rs @@ -4,11 +4,10 @@ use crate::select_filter_builder::FirestoreQueryFilterBuilder; use crate::{ FirestoreAggregatedQueryParams, FirestoreAggregatedQuerySupport, FirestoreAggregation, FirestoreCollectionDocuments, FirestoreGetByIdSupport, FirestoreListenSupport, - FirestoreListener, FirestoreListenerParams, FirestoreListenerTarget, - FirestoreListenerTargetParams, FirestorePartition, FirestorePartitionQueryParams, - FirestoreQueryCollection, FirestoreQueryCursor, FirestoreQueryFilter, FirestoreQueryOrder, - FirestoreQueryParams, FirestoreQuerySupport, FirestoreResult, FirestoreResumeStateStorage, - FirestoreTargetType, + FirestoreListenerParams, FirestoreListenerTarget, FirestoreListenerTargetParams, + FirestorePartition, FirestorePartitionQueryParams, FirestoreQueryCollection, + FirestoreQueryCursor, FirestoreQueryFilter, FirestoreQueryOrder, FirestoreQueryParams, + FirestoreQuerySupport, FirestoreResult, FirestoreTargetManager, FirestoreTargetType, }; use futures::stream::BoxStream; use gcloud_sdk::google::firestore::v1::Document; @@ -780,13 +779,13 @@ where } #[inline] - pub fn add_target( + pub fn add_target( self, target: FirestoreListenerTarget, - listener: &mut FirestoreListener, + listener: &mut TM, ) -> FirestoreResult<()> where - S: FirestoreResumeStateStorage + Send + Sync + Clone + 'static, + TM: FirestoreTargetManager, { listener.add_target(FirestoreListenerTargetParams::new( target, From b14b8bacb7bd8d01c874bba1057823b6b4bb9e9b Mon Sep 17 00:00:00 2001 From: Abdulla Abdurakhmanov Date: Fri, 11 Aug 2023 11:55:00 +0200 Subject: [PATCH 5/6] FirestoreTargetManager store updates --- examples/listen-changes.rs | 1 + src/db/listen_changes.rs | 125 +++++++++++++++++++++++-------------- 2 files changed, 79 insertions(+), 47 deletions(-) diff --git a/examples/listen-changes.rs b/examples/listen-changes.rs index 891a46e..3ce4aa5 100644 --- a/examples/listen-changes.rs +++ b/examples/listen-changes.rs @@ -87,6 +87,7 @@ async fn main() -> Result<(), Box> { } } _ => { + //storage.remove_target(&TEST_TARGET_ID_BY_DOC_IDS).ok(); println!("Received a listen response event to handle: {event:?}"); } } diff --git a/src/db/listen_changes.rs b/src/db/listen_changes.rs index 603f3be..a05c518 100644 --- a/src/db/listen_changes.rs +++ b/src/db/listen_changes.rs @@ -234,7 +234,7 @@ pub type FirestoreListenEvent = listen_response::ResponseType; pub trait FirestoreTargetManager { fn add_target(&mut self, target: FirestoreListenerTargetParams) -> FirestoreResult<()>; - fn remove_target(&mut self, target: FirestoreListenerTarget) -> FirestoreResult<()>; + fn remove_target(&mut self, target: &FirestoreListenerTarget) -> FirestoreResult<()>; } #[derive(Debug, Clone, Builder)] @@ -248,9 +248,9 @@ where S: FirestoreResumeStateStorage, { db: D, - storage: S, + initial_targets_storage: FirestoreTargetManagerStorage, + resume_state_storage: S, listener_params: FirestoreListenerParams, - targets: Vec, shutdown_flag: Arc, shutdown_handle: Option>, shutdown_writer: Option>>, @@ -263,14 +263,14 @@ where { pub async fn new( db: D, - storage: S, + resume_state_storage: S, listener_params: FirestoreListenerParams, ) -> FirestoreResult> { Ok(FirestoreListener { db, - storage, + initial_targets_storage: FirestoreTargetManagerStorage::new(false), + resume_state_storage, listener_params, - targets: vec![], shutdown_flag: Arc::new(AtomicBool::new(false)), shutdown_handle: None, shutdown_writer: None, @@ -283,15 +283,15 @@ where F: Future> + Send + 'static, { info!( - "Starting a Firestore listener for targets: {:?}...", - &self.targets.len() + "Starting a Firestore listener for initial targets: {:?}...", + &self.initial_targets_storage.targets.len() ); - let mut initial_states: HashMap = - HashMap::new(); - for target_params in &self.targets { - let initial_state = self - .storage + let mut targets_storage = self.initial_targets_storage.clone(); + + for (_, target_params) in &mut targets_storage.targets { + let maybe_initial_token = self + .resume_state_storage .read_resume_state(&target_params.target) .map_err(|err| { FirestoreError::SystemError(FirestoreSystemError::new( @@ -301,10 +301,7 @@ where }) .await?; - initial_states.insert( - target_params.target.clone(), - target_params.clone().opt_resume_type(initial_state), - ); + target_params.mopt_resume_type(maybe_initial_token); } let (tx, rx): (UnboundedSender, UnboundedReceiver) = @@ -313,9 +310,9 @@ where self.shutdown_writer = Some(Arc::new(tx)); self.shutdown_handle = Some(tokio::spawn(Self::listener_loop( self.db.clone(), - self.storage.clone(), + self.resume_state_storage.clone(), self.shutdown_flag.clone(), - initial_states, + targets_storage, self.listener_params.clone(), rx, cb, @@ -342,7 +339,7 @@ where db: D, storage: S, shutdown_flag: Arc, - mut targets_state: HashMap, + mut targets_state: FirestoreTargetManagerStorage, listener_params: FirestoreListenerParams, mut shutdown_receiver: UnboundedReceiver, cb: FN, @@ -356,10 +353,13 @@ where .unwrap_or_else(|| std::time::Duration::from_secs(5)); while !shutdown_flag.load(Ordering::Relaxed) { - debug!("Start listening on {} targets ... ", targets_state.len()); + debug!( + "Start listening on {} targets ... ", + targets_state.targets.len() + ); match db - .listen_doc_changes(targets_state.values().cloned().collect()) + .listen_doc_changes(targets_state.targets.values().cloned().collect()) .await { Err(err) => { @@ -370,7 +370,7 @@ where Ok(mut listen_stream) => loop { tokio::select! { _ = shutdown_receiver.recv() => { - debug!("Exiting from listener on {} targets...", targets_state.len()); + debug!("Exiting from listener on {} targets...", targets_state.targets.len()); shutdown_receiver.close(); break; } @@ -383,33 +383,32 @@ where Ok(Some(event)) => { trace!("Received a listen response event to handle: {:?}", event); match event.response_type { - Some(listen_response::ResponseType::TargetChange(ref target_change)) - if !target_change.resume_token.is_empty() => - { - for target_id_num in &target_change.target_ids { - match FirestoreListenerTarget::try_from(*target_id_num) { - Ok(target_id) => { - if let Some(target) = targets_state.get_mut(&target_id) { - let new_token: FirestoreListenerToken = target_change.resume_token.clone().into(); - - if let Err(err) = storage.update_resume_token(&target.target, new_token.clone()).await { - error!("Listener token storage error occurred {:?}.", err); + Some(response_type) => { + if let listen_response::ResponseType::TargetChange(ref target_change) = &response_type { + if !target_change.resume_token.is_empty() { + for target_id_num in &target_change.target_ids { + match FirestoreListenerTarget::try_from(*target_id_num) { + Ok(target_id) => { + if let Some(target) = targets_state.targets.get_mut(&target_id) { + let new_token: FirestoreListenerToken = target_change.resume_token.clone().into(); + + if let Err(err) = storage.update_resume_token(&target.target, new_token.clone()).await { + error!("Listener token storage error occurred {:?}.", err); + break; + } + else { + target.resume_type = Some(FirestoreListenerTargetResumeType::Token(new_token)) + } + } + }, + Err(err) => { + error!("Listener system error - unexpected target ID: {} {:?}.", target_id_num, err); break; } - else { - target.resume_type = Some(FirestoreListenerTargetResumeType::Token(new_token)) - } } - }, - Err(err) => { - error!("Listener system error - unexpected target ID: {} {:?}.", target_id_num, err); - break; } } } - - } - Some(response_type) => { if let Err(err) = cb(response_type, db.clone()).await { error!("Listener callback function error occurred {:?}.", err); break; @@ -471,14 +470,46 @@ where D: FirestoreListenSupport + Clone + Send + Sync + 'static, S: FirestoreResumeStateStorage + Clone + Send + Sync + 'static, { + fn add_target(&mut self, target_params: FirestoreListenerTargetParams) -> FirestoreResult<()> { + self.initial_targets_storage.add_target(target_params) + } + + fn remove_target(&mut self, target: &FirestoreListenerTarget) -> FirestoreResult<()> { + self.initial_targets_storage.remove_target(target) + } +} + +#[derive(Clone)] +pub struct FirestoreTargetManagerStorage { + targets: HashMap, + change_log_mode: bool, + to_remove: Vec, +} + +impl FirestoreTargetManagerStorage { + fn new(change_log_mode: bool) -> Self { + Self { + targets: HashMap::new(), + change_log_mode, + to_remove: Vec::new(), + } + } +} + +impl FirestoreTargetManager for FirestoreTargetManagerStorage { fn add_target(&mut self, target_params: FirestoreListenerTargetParams) -> FirestoreResult<()> { target_params.validate()?; - self.targets.push(target_params); + self.targets + .insert(target_params.target.clone(), target_params); Ok(()) } - fn remove_target(&mut self, target: FirestoreListenerTarget) -> FirestoreResult<()> { - self.targets.retain(|t| t.target != target); + fn remove_target(&mut self, target: &FirestoreListenerTarget) -> FirestoreResult<()> { + if self.targets.remove(target).is_none() { + if self.change_log_mode { + self.to_remove.push(target.clone()); + } + } Ok(()) } } From 91168f6b69edc0ec1e5f1491caee409232f91600 Mon Sep 17 00:00:00 2001 From: Abdulla Abdurakhmanov Date: Fri, 11 Aug 2023 11:57:54 +0200 Subject: [PATCH 6/6] Clippy fixes --- src/db/listen_changes.rs | 57 ++++++++++++++++++---------------------- 1 file changed, 26 insertions(+), 31 deletions(-) diff --git a/src/db/listen_changes.rs b/src/db/listen_changes.rs index a05c518..41a8abd 100644 --- a/src/db/listen_changes.rs +++ b/src/db/listen_changes.rs @@ -289,7 +289,7 @@ where let mut targets_storage = self.initial_targets_storage.clone(); - for (_, target_params) in &mut targets_storage.targets { + for target_params in &mut targets_storage.targets.values_mut() { let maybe_initial_token = self .resume_state_storage .read_resume_state(&target_params.target) @@ -382,39 +382,36 @@ where match tried { Ok(Some(event)) => { trace!("Received a listen response event to handle: {:?}", event); - match event.response_type { - Some(response_type) => { - if let listen_response::ResponseType::TargetChange(ref target_change) = &response_type { - if !target_change.resume_token.is_empty() { - for target_id_num in &target_change.target_ids { - match FirestoreListenerTarget::try_from(*target_id_num) { - Ok(target_id) => { - if let Some(target) = targets_state.targets.get_mut(&target_id) { - let new_token: FirestoreListenerToken = target_change.resume_token.clone().into(); - - if let Err(err) = storage.update_resume_token(&target.target, new_token.clone()).await { - error!("Listener token storage error occurred {:?}.", err); - break; - } - else { - target.resume_type = Some(FirestoreListenerTargetResumeType::Token(new_token)) - } + if let Some(response_type) = event.response_type { + if let listen_response::ResponseType::TargetChange(ref target_change) = &response_type { + if !target_change.resume_token.is_empty() { + for target_id_num in &target_change.target_ids { + match FirestoreListenerTarget::try_from(*target_id_num) { + Ok(target_id) => { + if let Some(target) = targets_state.targets.get_mut(&target_id) { + let new_token: FirestoreListenerToken = target_change.resume_token.clone().into(); + + if let Err(err) = storage.update_resume_token(&target.target, new_token.clone()).await { + error!("Listener token storage error occurred {:?}.", err); + break; + } + else { + target.resume_type = Some(FirestoreListenerTargetResumeType::Token(new_token)) } - }, - Err(err) => { - error!("Listener system error - unexpected target ID: {} {:?}.", target_id_num, err); - break; } + }, + Err(err) => { + error!("Listener system error - unexpected target ID: {} {:?}.", target_id_num, err); + break; } } } } - if let Err(err) = cb(response_type, db.clone()).await { - error!("Listener callback function error occurred {:?}.", err); - break; - } } - None => {} + if let Err(err) = cb(response_type, db.clone()).await { + error!("Listener callback function error occurred {:?}.", err); + break; + } } } Ok(None) => break, @@ -505,10 +502,8 @@ impl FirestoreTargetManager for FirestoreTargetManagerStorage { } fn remove_target(&mut self, target: &FirestoreListenerTarget) -> FirestoreResult<()> { - if self.targets.remove(target).is_none() { - if self.change_log_mode { - self.to_remove.push(target.clone()); - } + if self.targets.remove(target).is_none() && self.change_log_mode { + self.to_remove.push(target.clone()); } Ok(()) }