Skip to content

Commit

Permalink
chore: fix notification (#333)
Browse files Browse the repository at this point in the history
* chore: fix notification

* chore: add test
  • Loading branch information
appflowy authored Nov 1, 2024
1 parent fa89b05 commit 4175465
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 11 deletions.
19 changes: 14 additions & 5 deletions collab-database/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1636,7 +1636,7 @@ pub struct DatabaseBody {
impl DatabaseBody {
fn open(collab: Collab, context: DatabaseContext) -> Result<(Self, Collab), DatabaseError> {
CollabType::Database.validate_require_data(&collab)?;
let body = Self::from_collab(&collab, context.collab_service)
let body = Self::from_collab(&collab, context.collab_service, Some(context.notifier))
.ok_or_else(|| DatabaseError::NoRequiredData("Can not open database".to_string()))?;
Ok((body, collab))
}
Expand Down Expand Up @@ -1712,6 +1712,7 @@ impl DatabaseBody {
pub fn from_collab(
collab: &Collab,
collab_service: Arc<dyn DatabaseCollabService>,
notifier: Option<DatabaseNotify>,
) -> Option<Self> {
let txn = collab.context.transact();
let root: MapRef = collab.data.get_with_txn(&txn, DATABASE)?;
Expand All @@ -1720,17 +1721,25 @@ impl DatabaseBody {
let views: MapRef = root.get_with_txn(&txn, VIEWS)?; // { DATABASE: { FIELDS: {:}, VIEWS: {:} } }
let metas: MapRef = root.get_with_txn(&txn, DATABASE_METAS)?; // { DATABASE: { FIELDS: {:}, VIEWS: {:}, METAS: {:} } }

let fields = FieldMap::new(fields, None);
let views = DatabaseViews::new(CollabOrigin::Empty, views, None);
let fields = FieldMap::new(fields, notifier.as_ref().map(|n| n.field_change_tx.clone()));
let views = DatabaseViews::new(
CollabOrigin::Empty,
views,
notifier.as_ref().map(|n| n.view_change_tx.clone()),
);
let metas = MetaMap::new(metas);
let block = Block::new(database_id, collab_service, None);
let block = Block::new(
database_id,
collab_service,
notifier.as_ref().map(|n| n.row_change_tx.clone()),
);
Some(Self {
root,
views: views.into(),
fields: fields.into(),
metas: metas.into(),
block,
notifier: None,
notifier,
})
}

Expand Down
2 changes: 1 addition & 1 deletion collab-database/src/views/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ pub fn view_from_map_ref<T: ReadTxn>(map_ref: &MapRef, txn: &T) -> Option<Databa
.map(|map_ref| FieldSettingsByFieldIdMap::from((txn, &map_ref)))
.unwrap_or_default();

let is_inline: bool = map_ref.get_with_txn(txn, IS_INLINE).unwrap_or_default();
let is_inline: bool = map_ref.get_with_txn(txn, IS_INLINE).unwrap_or(false);

Some(DatabaseView {
id,
Expand Down
10 changes: 10 additions & 0 deletions collab-database/src/views/view_observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,17 @@ fn handle_array_event(
insert_row_orders,
delete_row_indexes,
});
} else {
#[cfg(feature = "verbose_log")]
trace!("database view observe array event: no row order change");
}
} else {
#[cfg(feature = "verbose_log")]
trace!(
"Can not find database view id when receive key:{:?} event:{:?}",
key,
array_event.path()
);
}
}

Expand Down
12 changes: 8 additions & 4 deletions collab-database/tests/database_test/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use tokio::time::timeout;
use uuid::Uuid;

pub struct DatabaseTest {
#[allow(dead_code)]
collab_db: Arc<CollabKVDB>,
pub workspace_id: String,
pub collab_db: Arc<CollabKVDB>,
pub database: Database,
pub pre_define_row_ids: Vec<RowId>,
}
Expand Down Expand Up @@ -62,7 +62,7 @@ pub fn create_database(uid: i64, database_id: &str) -> DatabaseTest {
let collab_db = make_rocks_db();
let collab_service = Arc::new(TestUserDatabaseServiceImpl {
uid,
workspace_id,
workspace_id: workspace_id.clone(),
db: collab_db.clone(),
});

Expand All @@ -83,6 +83,7 @@ pub fn create_database(uid: i64, database_id: &str) -> DatabaseTest {
});

DatabaseTest {
workspace_id,
database,
collab_db,
pre_define_row_ids: vec![],
Expand Down Expand Up @@ -138,6 +139,7 @@ pub async fn create_database_with_db(
(
collab_db.clone(),
DatabaseTest {
workspace_id: workspace_id.to_string(),
database,
collab_db,
pre_define_row_ids: vec![],
Expand All @@ -160,6 +162,7 @@ pub async fn restore_database_from_db(
let context = DatabaseContext::new(collab_service);
let database = Database::open(database_id, context).await.unwrap();
DatabaseTest {
workspace_id: workspace_id.to_string(),
database,
collab_db,
pre_define_row_ids: vec![],
Expand Down Expand Up @@ -218,7 +221,7 @@ impl DatabaseTestBuilder {
let collab_db = Arc::new(CollabKVDB::open(path).unwrap());
let collab_service = Arc::new(TestUserDatabaseServiceImpl {
uid: self.uid,
workspace_id,
workspace_id: workspace_id.clone(),
db: collab_db.clone(),
});
let context = DatabaseContext::new(collab_service);
Expand All @@ -238,6 +241,7 @@ impl DatabaseTestBuilder {
};
let database = Database::create_with_view(params, context).await.unwrap();
DatabaseTest {
workspace_id,
database,
collab_db,
pre_define_row_ids: vec![],
Expand Down
48 changes: 47 additions & 1 deletion collab-database/tests/database_test/view_observe_test.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::database_test::helper::{create_database, wait_for_specific_event};
use crate::database_test::helper::{
create_database, restore_database_from_db, wait_for_specific_event,
};
use crate::helper::setup_log;
use collab_database::database::gen_row_id;

Expand Down Expand Up @@ -311,6 +313,50 @@ async fn observer_create_delete_row_test() {
.unwrap();
}

#[tokio::test]
async fn observer_create_row_when_reopen_a_database_test() {
let database_id = uuid::Uuid::new_v4().to_string();
let database_test = create_database(1, &database_id);
let object_id = database_test.database.object_id().to_string();
let db = database_test.collab_db.clone();
let workspace_id = database_test.workspace_id.clone();

let database_test = restore_database_from_db(1, &workspace_id, &object_id, db).await;
let database_test = Arc::new(Mutex::from(database_test));
let row_id = gen_row_id();
let cloned_database_test = database_test.clone();
let cloned_row_id = row_id.clone();
tokio::spawn(async move {
sleep(Duration::from_millis(300)).await;
let mut db = cloned_database_test.lock().await;
db.create_row(CreateRowParams::new(cloned_row_id, database_id.clone()))
.await
.unwrap();
});

let view_change_rx = database_test.lock().await.subscribe_view_change().unwrap();
wait_for_specific_event(view_change_rx, |event| match event {
DatabaseViewChange::DidUpdateRowOrders {
database_view_id,
is_local_change: _,
insert_row_orders,
delete_row_indexes,
} => {
if database_view_id == "v1" {
assert_eq!(insert_row_orders.len(), 1);
assert_eq!(insert_row_orders[0].0.id, row_id);
assert!(delete_row_indexes.is_empty());
true
} else {
false
}
},
_ => false,
})
.await
.unwrap();
}

#[tokio::test]
async fn observe_update_view_test() {
setup_log();
Expand Down

0 comments on commit 4175465

Please sign in to comment.