Skip to content

Commit

Permalink
fix(disappearing-messages): insert group metadata-update into db from…
Browse files Browse the repository at this point in the history
… commits (#1614)
  • Loading branch information
mchenani authored Feb 7, 2025
1 parent 41cba8c commit 458bdaf
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 9 deletions.
34 changes: 32 additions & 2 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5014,11 +5014,16 @@ mod tests {
async fn test_disappearing_messages_deletion() {
let alix = new_test_client().await;
let alix_provider = alix.inner_client.mls_provider().unwrap();
let bola = new_test_client().await;
let bola_provider = bola.inner_client.mls_provider().unwrap();

// Step 1: Create a group
let alix_group = alix
.conversations()
.create_group(vec![], FfiCreateGroupOptions::default())
.create_group(
vec![bola.account_address.clone()],
FfiCreateGroupOptions::default(),
)
.await
.unwrap();

Expand All @@ -5034,7 +5039,7 @@ mod tests {
.find_messages(FfiListMessagesOptions::default())
.await
.unwrap();
assert_eq!(alix_messages.len(), 1);
assert_eq!(alix_messages.len(), 2);

// Step 4: Set disappearing settings to 5ns after the latest message
let latest_message_sent_at_ns = alix_messages.last().unwrap().sent_at_ns;
Expand Down Expand Up @@ -5067,6 +5072,31 @@ mod tests {
.is_conversation_message_disappearing_enabled()
.unwrap());

bola.conversations()
.sync_all_conversations(None)
.await
.unwrap();

let bola_group_from_db = bola_provider
.conn_ref()
.find_group(&alix_group.id())
.unwrap();
assert_eq!(
bola_group_from_db
.clone()
.unwrap()
.message_disappear_from_ns
.unwrap(),
disappearing_settings.from_ns
);
assert_eq!(
bola_group_from_db.unwrap().message_disappear_in_ns.unwrap(),
disappearing_settings.in_ns
);
assert!(alix_group
.is_conversation_message_disappearing_enabled()
.unwrap());

// Step 5: Send additional messages
for msg in &["Msg 2 from group", "Msg 3 from group", "Msg 4 from group"] {
alix_group.send(msg.as_bytes().to_vec()).await.unwrap();
Expand Down
13 changes: 9 additions & 4 deletions xmtp_mls/src/groups/disappearing_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,15 @@ where
{
/// Iterate on the list of groups and delete expired messages
async fn delete_expired_messages(&mut self) -> Result<(), DisappearingMessagesCleanerError> {
self.client
.mls_provider()?
.conn_ref()
.delete_expired_messages()?;
let provider = self.client.mls_provider()?;
match provider.conn_ref().delete_expired_messages() {
Ok(deleted_count) => {
tracing::info!("Successfully deleted {} expired messages", deleted_count);
}
Err(e) => {
tracing::error!("Failed to delete expired messages, error: {:?}", e);
}
}
Ok(())
}
async fn run(&mut self) -> Result<(), DisappearingMessagesCleanerError> {
Expand Down
36 changes: 33 additions & 3 deletions xmtp_mls/src/groups/mls_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ use tracing::debug;
use xmtp_common::{retry_async, Retry, RetryableError};
use xmtp_content_types::{group_updated::GroupUpdatedCodec, CodecError, ContentCodec};
use xmtp_id::{InboxId, InboxIdRef};
use xmtp_proto::xmtp::mls::message_contents::group_updated;
use xmtp_proto::xmtp::mls::{
api::v1::{
group_message::{Version as GroupMessageVersion, V1 as GroupMessageV1},
Expand Down Expand Up @@ -908,7 +909,7 @@ where
Ok::<_, GroupMessageProcessingError>(provider.conn_ref().set_group_intent_to_publish(intent_id)?)
}
IntentState::Committed => {
self.handle_metadata_update(provider, &intent)?;
self.handle_metadata_update_from_intent(provider, &intent)?;
Ok(provider.conn_ref().set_group_intent_committed(intent_id)?)
}
IntentState::Published => {
Expand Down Expand Up @@ -955,7 +956,7 @@ where
}

/// In case of metadataUpdate will extract the updated fields and store them to the db
fn handle_metadata_update(
fn handle_metadata_update_from_intent(
&self,
provider: &XmtpOpenMlsProvider,
intent: &StoredGroupIntent,
Expand Down Expand Up @@ -983,6 +984,34 @@ where
Ok(())
}

fn handle_metadata_update_from_commit(
&self,
conn: &DbConnection,
metadata_field_changes: Vec<group_updated::MetadataFieldChange>,
) -> Result<(), StorageError> {
for change in metadata_field_changes {
match change.field_name.as_str() {
field_name if field_name == MetadataField::MessageDisappearFromNS.as_str() => {
let parsed_value = change
.new_value
.as_deref()
.and_then(|v| v.parse::<i64>().ok());
conn.update_message_disappearing_from_ns(self.group_id.clone(), parsed_value)?
}
field_name if field_name == MetadataField::MessageDisappearInNS.as_str() => {
let parsed_value = change
.new_value
.as_deref()
.and_then(|v| v.parse::<i64>().ok());
conn.update_message_disappearing_in_ns(self.group_id.clone(), parsed_value)?
}
_ => {} // Handle other metadata updates if needed
}
}

Ok(())
}

#[tracing::instrument(level = "trace", skip_all)]
async fn consume_message(
&self,
Expand Down Expand Up @@ -1113,7 +1142,7 @@ where
let sender_inbox_id = validated_commit.actor_inbox_id();

let payload: GroupUpdated = validated_commit.into();
let encoded_payload = GroupUpdatedCodec::encode(payload)?;
let encoded_payload = GroupUpdatedCodec::encode(payload.clone())?;
let mut encoded_payload_bytes = Vec::new();
encoded_payload.encode(&mut encoded_payload_bytes)?;

Expand All @@ -1136,6 +1165,7 @@ where
}
}
};
self.handle_metadata_update_from_commit(conn, payload.metadata_field_changes)?;
let msg = StoredGroupMessage {
id: message_id,
group_id: group_id.to_vec(),
Expand Down

0 comments on commit 458bdaf

Please sign in to comment.