Skip to content

Commit

Permalink
chore: add device id
Browse files Browse the repository at this point in the history
  • Loading branch information
appflowy committed Aug 12, 2023
1 parent 3d1c41f commit c7efd24
Show file tree
Hide file tree
Showing 19 changed files with 121 additions and 53 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion appflowy-integrate/src/collab_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ impl AppFlowyCollabBuilder {
collab_db.clone(),
config.clone(),
))
.with_device_id(self.device_id.lock().clone())
.build()?,
);

Expand Down Expand Up @@ -197,7 +198,8 @@ impl AppFlowyCollabBuilder {

if let Some(snapshot_persistence) = &self.snapshot_persistence {
if config.enable_snapshot {
let collab_object = CollabObject::new(uid, object_id.to_string(), object_type);
let collab_object = CollabObject::new(uid, object_id.to_string(), object_type)
.with_device_id(self.device_id.lock().clone());
let snapshot_plugin = CollabSnapshotPlugin::new(
uid,
collab_object,
Expand Down
8 changes: 6 additions & 2 deletions collab-database/tests/database_test/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use collab_database::user::DatabaseCollabService;
use collab_database::views::{CreateDatabaseParams, DatabaseLayout, LayoutSetting, LayoutSettings};
use collab_persistence::kv::rocks_kv::RocksCollabDB;
use collab_plugins::cloud_storage::CollabType;

use collab_plugins::local_storage::CollabPersistenceConfig;

use tempfile::TempDir;

pub use crate::helper::*;
Expand Down Expand Up @@ -48,7 +48,10 @@ pub fn create_database(uid: i64, database_id: &str) -> DatabaseTest {
let tempdir = TempDir::new().unwrap();
let path = tempdir.into_path();
let collab_db = Arc::new(RocksCollabDB::open(path).unwrap());
let collab = CollabBuilder::new(uid, database_id).build().unwrap();
let collab = CollabBuilder::new(uid, database_id)
.with_device_id("1")
.build()
.unwrap();
collab.lock().initialize();
let collab_builder = Arc::new(TestUserDatabaseCollabBuilderImpl());
let block = Block::new(uid, Arc::downgrade(&collab_db), collab_builder.clone());
Expand Down Expand Up @@ -178,6 +181,7 @@ impl DatabaseTestBuilder {
let path = tempdir.into_path();
let collab_db = Arc::new(RocksCollabDB::open(path).unwrap());
let collab = CollabBuilder::new(self.uid, &self.database_id)
.with_device_id("1")
.build()
.unwrap();
collab.lock().initialize();
Expand Down
5 changes: 3 additions & 2 deletions collab-database/tests/user_test/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ use collab_database::user::{
use collab_database::views::{CreateDatabaseParams, DatabaseLayout};
use collab_persistence::kv::rocks_kv::RocksCollabDB;
use collab_plugins::cloud_storage::CollabType;
use collab_plugins::local_storage::rocksdb::RocksdbDiskPlugin;
use collab_plugins::local_storage::CollabPersistenceConfig;
use parking_lot::Mutex;
use tokio::sync::mpsc::{channel, Receiver};

use collab_plugins::local_storage::rocksdb::RocksdbDiskPlugin;
use collab_plugins::local_storage::CollabPersistenceConfig;
use rand::Rng;
use tempfile::TempDir;

Expand Down Expand Up @@ -76,6 +76,7 @@ impl DatabaseCollabService for TestUserDatabaseCollabBuilderImpl {
config: &CollabPersistenceConfig,
) -> Arc<MutexCollab> {
let collab = CollabBuilder::new(uid, object_id)
.with_device_id("1")
.with_raw_data(collab_raw_data)
.with_plugin(RocksdbDiskPlugin::new_with_config(
uid,
Expand Down
4 changes: 3 additions & 1 deletion collab-document/tests/blocks/block_test_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ use collab_document::blocks::{
};
use collab_document::document::Document;
use collab_persistence::kv::rocks_kv::RocksCollabDB;
use collab_plugins::local_storage::rocksdb::RocksdbDiskPlugin;
use nanoid::nanoid;
use serde_json::json;

use collab_plugins::local_storage::rocksdb::RocksdbDiskPlugin;

use crate::util::document_storage;

pub const TEXT_BLOCK_TYPE: &str = "paragraph";
Expand All @@ -29,6 +30,7 @@ impl BlockTestCore {
let disk_plugin = RocksdbDiskPlugin::new(1, Arc::downgrade(&db));
let collab = CollabBuilder::new(1, doc_id)
.with_plugin(disk_plugin)
.with_device_id("1")
.build()
.unwrap();
collab.lock().initialize();
Expand Down
5 changes: 4 additions & 1 deletion collab-document/tests/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ use collab_document::blocks::{Block, BlockAction, DocumentData, DocumentMeta};
use collab_document::document::Document;
use collab_document::error::DocumentError;
use collab_persistence::kv::rocks_kv::RocksCollabDB;
use collab_plugins::local_storage::rocksdb::RocksdbDiskPlugin;
use nanoid::nanoid;
use serde_json::{json, Value};

use collab_plugins::local_storage::rocksdb::RocksdbDiskPlugin;
use tempfile::TempDir;
use tracing_subscriber::{fmt::Subscriber, util::SubscriberInitExt, EnvFilter};
use zip::ZipArchive;
Expand All @@ -34,6 +35,7 @@ impl DocumentTest {
let disk_plugin = RocksdbDiskPlugin::new(uid, Arc::downgrade(&db));
let collab = CollabBuilder::new(1, doc_id)
.with_plugin(disk_plugin)
.with_device_id("1")
.build()
.unwrap();
collab.lock().initialize();
Expand Down Expand Up @@ -98,6 +100,7 @@ pub fn open_document_with_db(uid: i64, doc_id: &str, db: Arc<RocksCollabDB>) ->
let disk_plugin = RocksdbDiskPlugin::new(uid, Arc::downgrade(&db));
let collab = CollabBuilder::new(uid, doc_id)
.with_plugin(disk_plugin)
.with_device_id("1")
.build()
.unwrap();
collab.lock().initialize();
Expand Down
3 changes: 3 additions & 0 deletions collab-folder/tests/folder_test/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::sync::{Arc, Once};
use collab::preclude::CollabBuilder;
use collab_folder::core::*;
use collab_persistence::kv::rocks_kv::RocksCollabDB;

use collab_plugins::local_storage::rocksdb::RocksdbDiskPlugin;
use nanoid::nanoid;
use tempfile::TempDir;
Expand Down Expand Up @@ -51,6 +52,7 @@ pub fn create_folder_with_data(id: &str, folder_data: Option<FolderData>) -> Fol

let collab = CollabBuilder::new(1, id)
.with_plugin(disk_plugin)
.with_device_id("1")
.build()
.unwrap();
collab.lock().initialize();
Expand All @@ -77,6 +79,7 @@ pub fn open_folder_with_db(uid: i64, object_id: &str, db_path: PathBuf) -> Folde
let cleaner: Cleaner = Cleaner::new(db_path);
let collab = CollabBuilder::new(1, object_id)
.with_plugin(disk_plugin)
.with_device_id("1")
.build()
.unwrap();
collab.lock().initialize();
Expand Down
1 change: 1 addition & 0 deletions collab-plugins/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ serde_json = "1.0"
rand = { version = "0.8" }
similar = { version = "2.2.1" }
tokio-stream = { version = "0.1.14", features = ["sync"] }
uuid = { version = "1.3.3", features = ["v4"] }

[dev-dependencies]
collab-plugins = { path = ".", features = ["rocksdb_plugin", "disk_sled", "aws_storage_plugin"] }
Expand Down
14 changes: 12 additions & 2 deletions collab-plugins/src/cloud_storage/postgres/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,12 @@ fn create_snapshot_if_need(
tracing::trace!("Create remote snapshot for {}", object.object_id);
let cloned_object = object.clone();
if let Ok(Ok(doc_state)) = tokio::task::spawn_blocking(move || {
let local = Collab::new(uid, object.object_id.clone(), vec![]);
let local = Collab::new(
uid,
object.object_id.clone(),
object.get_device_id(),
vec![],
);
let mut txn = local.origin_transact_mut();
let _ = local_collab_storage
.read_txn()
Expand All @@ -171,7 +176,12 @@ fn create_snapshot_if_need(

// Only sync with the remote if the remote update is not empty
if !remote_update.is_empty() {
let remote = Collab::new(uid, object.object_id.clone(), vec![]);
let remote = Collab::new(
uid,
object.object_id.clone(),
object.get_device_id(),
vec![],
);
let mut txn = local.origin_transact_mut();
txn.try_apply_update(Update::decode_v1(&remote_update)?)?;
drop(txn);
Expand Down
11 changes: 8 additions & 3 deletions collab-plugins/src/cloud_storage/remote_collab.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ impl RemoteCollab {
if let Ok(mut txn) = collab.try_transaction_mut() {
match Update::decode_v1(&update) {
Ok(update) => {
tracing::trace!("apply remote update: {:?}", update);
if let Err(e) = txn.try_apply_update(update) {
tracing::error!("apply remote update failed: {:?}", e);
}
Expand Down Expand Up @@ -656,8 +655,14 @@ impl CollabObject {
self.meta.get("workspace_id").cloned()
}

pub fn get_device_id(&self) -> Option<String> {
self.meta.get("device_id").cloned()
pub fn get_device_id(&self) -> String {
match self.meta.get("device_id").cloned() {
None => {
tracing::error!("Unexpected empty device id");
uuid::Uuid::new_v4().to_string()
},
Some(device_id) => device_id,
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions collab-plugins/src/snapshot/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ impl CollabPlugin for CollabSnapshotPlugin {
weak_collab_db.upgrade(),
weak_snapshot_persistence.upgrade(),
) {
let snapshot_collab = Collab::new(uid, object.object_id.clone(), vec![]);
let snapshot_collab = Collab::new(uid, object.object_id.clone(), "1", vec![]);
let mut txn = snapshot_collab.origin_transact_mut();
if let Err(e) = collab_db
.read_txn()
Expand Down Expand Up @@ -208,7 +208,7 @@ pub fn try_decode_snapshot(
match {
let mut wrapper = AssertUnwindSafe(&mut decoded_str);
panic::catch_unwind(move || {
let collab = Collab::new(uid, object_id, vec![]);
let collab = Collab::new(uid, object_id, "1", vec![]);
if let Ok(update) = Update::decode_v1(data) {
let mut txn = collab.origin_transact_mut();
txn.apply_update(update);
Expand Down
28 changes: 24 additions & 4 deletions collab-plugins/tests/disk/script.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,12 @@ impl CollabPersistenceTest {
}

pub async fn create_collab(&mut self, doc_id: String) {
let collab = Arc::new(CollabBuilder::new(1, &doc_id).build().unwrap());
let collab = Arc::new(
CollabBuilder::new(1, &doc_id)
.with_device_id("1")
.build()
.unwrap(),
);
collab.lock().add_plugin(self.disk_plugin.clone());
let object_id = collab.lock().object_id.clone();
collab.lock().add_plugin(self.make_snapshot_plugin(
Expand Down Expand Up @@ -160,7 +165,12 @@ impl CollabPersistenceTest {
}

pub async fn assert_collab(&mut self, id: &str, expected: JsonValue) {
let collab = Arc::new(CollabBuilder::new(1, id).build().unwrap());
let collab = Arc::new(
CollabBuilder::new(1, id)
.with_device_id("1")
.build()
.unwrap(),
);
collab.lock().add_plugin(self.disk_plugin.clone());
collab.lock().add_plugin(self.make_snapshot_plugin(
self.uid,
Expand Down Expand Up @@ -201,6 +211,7 @@ impl CollabPersistenceTest {
Script::CreateDocumentWithDiskPlugin { id, plugin } => {
let collab = Arc::new(
CollabBuilder::new(1, &id)
.with_device_id("1")
.with_plugin(plugin.clone())
.build()
.unwrap(),
Expand All @@ -226,6 +237,7 @@ impl CollabPersistenceTest {
},
Script::OpenDocumentWithDiskPlugin { id } => {
let collab = CollabBuilder::new(1, &id)
.with_device_id("1")
.with_plugin(self.disk_plugin.clone())
.build()
.unwrap();
Expand Down Expand Up @@ -286,7 +298,10 @@ impl CollabPersistenceTest {
self.collab_by_id.get(&id).unwrap().clone(),
);
let snapshots = snapshot_plugin.get_snapshots(&id);
let collab = CollabBuilder::new(1, &id).build().unwrap();
let collab = CollabBuilder::new(1, &id)
.with_device_id("1")
.build()
.unwrap();
collab.lock().with_origin_transact_mut(|txn| {
txn.apply_update(Update::decode_v1(&snapshots[index as usize].data).unwrap());
});
Expand All @@ -295,7 +310,12 @@ impl CollabPersistenceTest {
assert_json_diff::assert_json_eq!(json, expected);
},
Script::AssertDocument { id, expected } => {
let collab = Arc::new(CollabBuilder::new(1, &id).build().unwrap());
let collab = Arc::new(
CollabBuilder::new(1, &id)
.with_device_id("1")
.build()
.unwrap(),
);
collab.lock().add_plugin(self.disk_plugin.clone());
collab.lock().add_plugin(self.make_snapshot_plugin(
self.uid,
Expand Down
2 changes: 1 addition & 1 deletion collab-plugins/tests/util/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ impl Rng {

fn origin_from_tcp_stream(stream: &TcpStream) -> CollabOrigin {
let address = stream.local_addr().unwrap();
let origin = CollabClient::new(address.port() as i64, &address.to_string());
let origin = CollabClient::new(address.port() as i64, address.to_string());
CollabOrigin::Client(origin)
}

Expand Down
9 changes: 1 addition & 8 deletions collab-sync/src/server/broadcast.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;

use collab::core::collab::MutexCollab;
use collab::core::origin::{CollabClient, CollabOrigin};
use collab::core::origin::CollabOrigin;
use futures_util::{SinkExt, StreamExt};
use lib0::encoding::Write;
use tokio::select;
Expand Down Expand Up @@ -276,10 +276,3 @@ fn gen_awareness_update_message(
let update = awareness.update_with_clients(changed)?;
Ok(update)
}

pub fn server_origin() -> CollabClient {
CollabClient {
uid: 0,
device_id: "".to_string(),
}
}
14 changes: 8 additions & 6 deletions collab/src/core/collab.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,13 @@ pub struct Collab {
}

impl Collab {
pub fn new<T: AsRef<str>>(uid: i64, object_id: T, plugins: Vec<Arc<dyn CollabPlugin>>) -> Collab {
let origin = CollabClient::new(uid, "");
pub fn new<T: AsRef<str>>(
uid: i64,
object_id: T,
device_id: impl ToString,
plugins: Vec<Arc<dyn CollabPlugin>>,
) -> Collab {
let origin = CollabClient::new(uid, device_id);
Self::new_with_client(CollabOrigin::Client(origin), object_id, plugins)
}

Expand Down Expand Up @@ -639,10 +644,7 @@ impl CollabBuilder {
}

pub fn build(self) -> Result<MutexCollab, CollabError> {
let origin = CollabOrigin::Client(CollabClient {
uid: self.uid,
device_id: self.device_id,
});
let origin = CollabOrigin::Client(CollabClient::new(self.uid, self.device_id));
MutexCollab::new_with_raw_data(origin, &self.object_id, self.updates, self.plugins)
}
}
Expand Down
17 changes: 10 additions & 7 deletions collab/src/core/origin.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use serde::{Deserialize, Serialize};
use std::fmt::{Display, Formatter};

use serde::{Deserialize, Serialize};
use yrs::{Origin, TransactionMut};

#[derive(Clone, Eq, PartialEq, Hash, Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -52,7 +53,7 @@ impl From<&Origin> for CollabOrigin {
#[derive(Serialize, Deserialize, Eq, PartialEq, Hash, Debug, Clone)]
pub struct CollabClient {
pub uid: i64,
pub device_id: String,
device_id: String,
}

impl Display for CollabClient {
Expand All @@ -65,11 +66,13 @@ impl Display for CollabClient {
}

impl CollabClient {
pub fn new(uid: i64, device_id: &str) -> Self {
Self {
uid,
device_id: device_id.to_string(),
}
pub fn new(uid: i64, device_id: impl ToString) -> Self {
let device_id = device_id.to_string();
debug_assert!(
!device_id.is_empty(),
"device_id should not be empty string"
);
Self { uid, device_id }
}
}

Expand Down
Loading

0 comments on commit c7efd24

Please sign in to comment.