Skip to content

Commit

Permalink
chore: send realtime update
Browse files Browse the repository at this point in the history
  • Loading branch information
appflowy committed Aug 11, 2023
1 parent ba963fa commit 64cd0a5
Show file tree
Hide file tree
Showing 22 changed files with 234 additions and 138 deletions.
35 changes: 28 additions & 7 deletions appflowy-integrate/src/collab_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use collab_plugins::cloud_storage::{CollabObject, CollabType, RemoteCollabStorag
use collab_plugins::local_storage::rocksdb::RocksdbDiskPlugin;
use collab_plugins::local_storage::CollabPersistenceConfig;
use collab_plugins::snapshot::{CollabSnapshotPlugin, SnapshotPersistence};
use parking_lot::RwLock;
use parking_lot::{Mutex, RwLock};

use crate::config::{CollabPluginConfig, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY};

Expand All @@ -25,7 +25,11 @@ pub enum CollabStorageType {

pub trait CollabStorageProvider: Send + Sync + 'static {
fn storage_type(&self) -> CollabStorageType;
fn get_storage(&self, storage_type: &CollabStorageType) -> Option<Arc<dyn RemoteCollabStorage>>;
fn get_storage(
&self,
collab_object: &CollabObject,
storage_type: &CollabStorageType,
) -> Option<Arc<dyn RemoteCollabStorage>>;
fn is_sync_enabled(&self) -> bool;
}

Expand All @@ -37,8 +41,12 @@ where
(**self).storage_type()
}

fn get_storage(&self, storage_type: &CollabStorageType) -> Option<Arc<dyn RemoteCollabStorage>> {
(**self).get_storage(storage_type)
fn get_storage(
&self,
collab_object: &CollabObject,
storage_type: &CollabStorageType,
) -> Option<Arc<dyn RemoteCollabStorage>> {
(**self).get_storage(collab_object, storage_type)
}

fn is_sync_enabled(&self) -> bool {
Expand All @@ -51,6 +59,7 @@ pub struct AppFlowyCollabBuilder {
workspace_id: RwLock<Option<String>>,
cloud_storage: RwLock<Arc<dyn CollabStorageProvider>>,
snapshot_persistence: Option<Arc<dyn SnapshotPersistence>>,
device_id: Mutex<String>,
}

impl AppFlowyCollabBuilder {
Expand All @@ -63,13 +72,18 @@ impl AppFlowyCollabBuilder {
workspace_id: Default::default(),
cloud_storage: RwLock::new(Arc::new(cloud_storage)),
snapshot_persistence,
device_id: Default::default(),
}
}

pub fn initialize(&self, workspace_id: String) {
*self.workspace_id.write() = Some(workspace_id);
}

pub fn set_sync_device(&self, device_id: String) {
*self.device_id.lock() = device_id;
}

pub fn update_network(&self, reachable: bool) {
if reachable {
self
Expand Down Expand Up @@ -164,9 +178,12 @@ impl AppFlowyCollabBuilder {
anyhow::anyhow!("When using supabase plugin, the workspace_id should not be empty")
})?;
let collab_object = CollabObject::new(uid, object_id.to_string(), object_type.clone())
.with_workspace_id(workspace_id);
.with_workspace_id(workspace_id)
.with_device_id(self.device_id.lock().clone());
let local_collab_storage = collab_db.clone();
if let Some(remote_collab_storage) = cloud_storage.get_storage(&cloud_storage_type) {
if let Some(remote_collab_storage) =
cloud_storage.get_storage(&collab_object, &cloud_storage_type)
{
let plugin = SupabaseDBPlugin::new(
uid,
collab_object,
Expand Down Expand Up @@ -208,7 +225,11 @@ impl CollabStorageProvider for DefaultCollabStorageProvider {
CollabStorageType::Local
}

fn get_storage(&self, _storage_type: &CollabStorageType) -> Option<Arc<dyn RemoteCollabStorage>> {
fn get_storage(
&self,
collab_object: &CollabObject,
storage_type: &CollabStorageType,
) -> Option<Arc<dyn RemoteCollabStorage>> {
None
}

Expand Down
4 changes: 2 additions & 2 deletions collab-database/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl Database {
None => Self::create(database_id, context),
Some(database) => {
let collab_guard = context.collab.lock();
let (fields, views, metas) = collab_guard.with_transact_mut(|txn| {
let (fields, views, metas) = collab_guard.with_origin_transact_mut(|txn| {
// { DATABASE: { FIELDS: {:} } }
let fields = collab_guard
.get_map_with_txn(txn, vec![DATABASE, FIELDS])
Expand Down Expand Up @@ -143,7 +143,7 @@ impl Database {
return Err(DatabaseError::InvalidDatabaseID("database_id is empty"));
}
let collab_guard = context.collab.lock();
let (database, fields, views, metas) = collab_guard.with_transact_mut(|txn| {
let (database, fields, views, metas) = collab_guard.with_origin_transact_mut(|txn| {
// { DATABASE: {:} }
let database = collab_guard
.get_map_with_txn(txn, vec![DATABASE])
Expand Down
8 changes: 4 additions & 4 deletions collab-database/src/rows/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl DatabaseRow {
let database_row = Self::new(uid, row_id, collab_db, collab);
let data = database_row.data.clone();
let meta = database_row.meta.clone();
database_row.collab.lock().with_transact_mut(|txn| {
database_row.collab.lock().with_origin_transact_mut(|txn| {
RowBuilder::new(row.id, txn, data.into_inner(), meta.into_inner())
.update(|update| {
update
Expand Down Expand Up @@ -84,7 +84,7 @@ impl DatabaseRow {

// If any of the data is missing, we need to create it.
let mut txn = if data.is_none() || meta.is_none() || comments.is_none() {
Some(collab_guard.transact_mut())
Some(collab_guard.origin_transact_mut())
} else {
None
};
Expand Down Expand Up @@ -144,7 +144,7 @@ impl DatabaseRow {
where
F: FnOnce(RowUpdate),
{
self.collab.lock().with_transact_mut(|txn| {
self.collab.lock().with_origin_transact_mut(|txn| {
let mut update = RowUpdate::new(txn, &self.data, &self.meta);

// Update the last modified timestamp before we call the update function.
Expand All @@ -160,7 +160,7 @@ impl DatabaseRow {
self
.collab
.lock()
.with_transact_mut(|txn| match Uuid::parse_str(&self.row_id) {
.with_origin_transact_mut(|txn| match Uuid::parse_str(&self.row_id) {
Ok(row_id) => {
let update = RowMetaUpdate::new(txn, &self.meta, row_id);
f(update)
Expand Down
2 changes: 1 addition & 1 deletion collab-database/src/user/relation/db_relation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ impl DatabaseRelation {

let relation_map = match row_relation_map {
None => collab_guard
.with_transact_mut(|txn| collab_guard.insert_map_with_txn(txn, ROW_RELATION_MAP)),
.with_origin_transact_mut(|txn| collab_guard.insert_map_with_txn(txn, ROW_RELATION_MAP)),
Some(row_relation_map) => row_relation_map,
};

Expand Down
4 changes: 2 additions & 2 deletions collab-database/src/user/user_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ impl WorkspaceDatabase {
) -> Result<Database, DatabaseError> {
let collab = self.collab_for_database(database_id, CollabRawData::default());
let update = Update::decode_v1(&snapshot.data)?;
collab.lock().with_transact_mut(|txn| {
collab.lock().with_origin_transact_mut(|txn| {
txn.apply_update(update);
});

Expand Down Expand Up @@ -369,7 +369,7 @@ fn get_database_array_ref(collab: &Collab) -> DatabaseArray {
};

let databases = match array {
None => collab.with_transact_mut(|txn| {
None => collab.with_origin_transact_mut(|txn| {
collab.create_array_with_txn::<MapPrelim<lib0Any>>(txn, DATABASES, vec![])
}),
Some(array) => array,
Expand Down
59 changes: 30 additions & 29 deletions collab-document/src/document.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl Document {

/// Apply actions to the document.
pub fn apply_action(&self, actions: Vec<BlockAction>) {
self.inner.lock().with_transact_mut(|txn| {
self.inner.lock().with_origin_transact_mut(|txn| {
for action in actions {
let payload = action.payload;
let mut block = payload.block;
Expand Down Expand Up @@ -372,37 +372,38 @@ impl Document {
data: Option<DocumentData>,
) -> Result<Self, DocumentError> {
let mut collab_guard = collab.lock();
let (root, block_operation, children_operation) = collab_guard.with_transact_mut(|txn| {
// { document: {:} }
let root = collab_guard.insert_map_with_txn(txn, ROOT);
// { document: { blocks: {:} } }
let blocks = root.insert_map_with_txn(txn, BLOCKS);
// { document: { blocks: {:}, meta: {:} } }
let meta = root.insert_map_with_txn(txn, META);
// {document: { blocks: {:}, meta: { children_map: {:} } }
let children_map = meta.insert_map_with_txn(txn, CHILDREN_MAP);

let children_operation = ChildrenOperation::new(children_map);
let block_operation = BlockOperation::new(blocks, children_operation.clone());

// If the data is not None, insert the data to the document.
if let Some(data) = data {
root.insert_with_txn(txn, PAGE_ID, data.page_id);

for (_, block) in data.blocks {
block_operation.create_block_with_txn(txn, block)?;
}
let (root, block_operation, children_operation) =
collab_guard.with_origin_transact_mut(|txn| {
// { document: {:} }
let root = collab_guard.insert_map_with_txn(txn, ROOT);
// { document: { blocks: {:} } }
let blocks = root.insert_map_with_txn(txn, BLOCKS);
// { document: { blocks: {:}, meta: {:} } }
let meta = root.insert_map_with_txn(txn, META);
// {document: { blocks: {:}, meta: { children_map: {:} } }
let children_map = meta.insert_map_with_txn(txn, CHILDREN_MAP);

let children_operation = ChildrenOperation::new(children_map);
let block_operation = BlockOperation::new(blocks, children_operation.clone());

// If the data is not None, insert the data to the document.
if let Some(data) = data {
root.insert_with_txn(txn, PAGE_ID, data.page_id);

for (_, block) in data.blocks {
block_operation.create_block_with_txn(txn, block)?;
}

for (id, child_ids) in data.meta.children_map {
let map = children_operation.get_children_with_txn(txn, &id);
child_ids.iter().for_each(|child_id| {
map.push_back(txn, child_id.to_string());
});
for (id, child_ids) in data.meta.children_map {
let map = children_operation.get_children_with_txn(txn, &id);
child_ids.iter().for_each(|child_id| {
map.push_back(txn, child_id.to_string());
});
}
}
}

Ok::<_, DocumentError>((root, block_operation, children_operation))
})?;
Ok::<_, DocumentError>((root, block_operation, children_operation))
})?;

collab_guard.enable_undo_redo();
let subscription = RootDeepSubscription::default();
Expand Down
2 changes: 1 addition & 1 deletion collab-folder/src/core/folder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ fn create_folder(
let collab_guard = collab.lock();

let (folder, workspaces, views, trash, favorites, meta, subscription) = collab_guard
.with_transact_mut(|txn| {
.with_origin_transact_mut(|txn| {
// create the folder
let mut folder = collab_guard.insert_map_with_txn_if_not_exist(txn, FOLDER);
let subscription = subscribe_folder_change(&mut folder);
Expand Down
2 changes: 1 addition & 1 deletion collab-plugins/src/cloud_storage/aws/dynamodb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ impl RemoteCollabStorage for AWSCollabCloudStorageImpl {
Ok(())
}

async fn subscribe_remote_updates(&self, _object: &CollabObject) -> Option<RemoteUpdateReceiver> {
fn subscribe_remote_updates(&self, _object: &CollabObject) -> Option<RemoteUpdateReceiver> {
None
}
}
Expand Down
2 changes: 1 addition & 1 deletion collab-plugins/src/cloud_storage/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
pub use remote_collab::{
CollabObject, CollabType, MsgId, RemoteCollabSnapshot, RemoteCollabState, RemoteCollabStorage,
RemoteUpdateReceiver,
RemoteUpdateReceiver, RemoteUpdateSender,
};
pub use yrs::merge_updates_v1;
pub use yrs::updates::decoder::Decode;
Expand Down
21 changes: 12 additions & 9 deletions collab-plugins/src/cloud_storage/postgres/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,10 @@ fn create_snapshot_if_need(
weak_local_collab_storage.upgrade(),
weak_remote_collab_storage.upgrade(),
) {
match remote_collab_storage.get_collab_state(&object.id).await {
match remote_collab_storage
.get_collab_state(&object.object_id)
.await
{
Ok(Some(collab_state)) => {
if !should_create_snapshot(&collab_state) {
return;
Expand All @@ -155,27 +158,27 @@ fn create_snapshot_if_need(
},
}

tracing::trace!("Create remote snapshot for {}", object.id);
tracing::trace!("Create remote snapshot for {}", object.object_id);
let cloned_object = object.clone();
if let Ok(Ok(doc_state)) = tokio::task::spawn_blocking(move || {
let local = Collab::new(uid, object.id.clone(), vec![]);
let mut txn = local.transact_mut();
let local = Collab::new(uid, object.object_id.clone(), vec![]);
let mut txn = local.origin_transact_mut();
let _ = local_collab_storage
.read_txn()
.load_doc(uid, &object.id, &mut txn)?;
.load_doc(uid, &object.object_id, &mut txn)?;
drop(txn);

// Only sync with the remote if the remote update is not empty
if !remote_update.is_empty() {
let remote = Collab::new(uid, object.id.clone(), vec![]);
let mut txn = local.transact_mut();
let remote = Collab::new(uid, object.object_id.clone(), vec![]);
let mut txn = local.origin_transact_mut();
txn.try_apply_update(Update::decode_v1(&remote_update)?)?;
drop(txn);

let local_sv = local.transact().state_vector();
let encode_update = remote.transact().encode_state_as_update_v1(&local_sv);
if let Ok(update) = Update::decode_v1(&encode_update) {
let mut txn = local.transact_mut();
let mut txn = local.origin_transact_mut();
txn.try_apply_update(update)?;
drop(txn);
}
Expand All @@ -192,7 +195,7 @@ fn create_snapshot_if_need(
.await
{
Ok(snapshot_id) => {
tracing::debug!("{} remote snapshot created", cloned_object.id);
tracing::debug!("{} remote snapshot created", cloned_object.object_id);
if let Some(local_collab) = weak_local_collab.upgrade() {
local_collab
.lock()
Expand Down
Loading

0 comments on commit 64cd0a5

Please sign in to comment.