Skip to content

Commit

Permalink
chore: update encode database row
Browse files Browse the repository at this point in the history
  • Loading branch information
appflowy committed Oct 8, 2024
1 parent 7fa917b commit 907ca13
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 23 deletions.
47 changes: 24 additions & 23 deletions collab-database/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,16 @@ use collab::preclude::{
use collab::util::{AnyExt, ArrayExt};
use collab_entity::define::{DATABASE, DATABASE_ID, DATABASE_METAS};
use collab_entity::CollabType;
use futures::StreamExt;
use futures::stream::iter;
use futures::stream::StreamExt;
use futures::{stream, Stream};
use nanoid::nanoid;
use rayon::iter::IntoParallelRefIterator;
use rayon::iter::ParallelIterator;
use rayon::prelude::IntoParallelIterator;

use serde::{Deserialize, Serialize};
use std::sync::Arc;

pub use tokio_stream::wrappers::WatchStream;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, instrument, trace};
Expand Down Expand Up @@ -185,29 +187,28 @@ impl Database {
encoded_collab: encoded_collab(&self.collab, &CollabType::Database)?,
};

// Get all row orders
let row_orders = self.get_all_row_orders().await;
let database_rows: Vec<Option<Arc<RwLock<DatabaseRow>>>> = futures::future::join_all(
row_orders
.iter()
.map(|chunk_row| self.get_or_init_database_row(&chunk_row.id)),
)
.await;

info!("[Database]: encode {} database rows", database_rows.len());
let encoded_row_collabs = database_rows
.into_par_iter() // Parallel iterator over the cloned rows
.filter_map(|database_row| {
let database_row = database_row?;
let row_collab = &database_row.blocking_read().collab;
let encoded_collab = encoded_collab(row_collab, &CollabType::DatabaseRow).ok()?;
Some(EncodedCollabInfo {
object_id: row_collab.object_id().to_string(),
collab_type: CollabType::DatabaseRow,
encoded_collab,
})
let row_stream = iter(row_orders)
.then(|chunk_row| async move { self.get_or_init_database_row(&chunk_row.id).await });

let mut encoded_row_collabs_stream =
Box::pin(row_stream.filter_map(|database_row| async move {
let database_row = database_row?;
let read_guard = database_row.read().await;
let row_collab = &read_guard.collab;
let encoded_collab = encoded_collab(row_collab, &CollabType::DatabaseRow).ok()?;
Some(EncodedCollabInfo {
object_id: row_collab.object_id().to_string(),
collab_type: CollabType::DatabaseRow,
encoded_collab,
})
.collect::<Vec<_>>();
}));

// Collect the results from the stream
let mut encoded_row_collabs = Vec::new();
while let Some(encoded_row_collab) = encoded_row_collabs_stream.next().await {
encoded_row_collabs.push(encoded_row_collab);
}

Ok(EncodedDatabase {
encoded_database_collab,
Expand Down
16 changes: 16 additions & 0 deletions collab-importer/tests/notion_test/import_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,22 @@ use std::env::temp_dir;
use std::path::PathBuf;
use std::sync::Arc;

// #[tokio::test]
// async fn import_two_spaces_test2() {
// let (_cleaner, file_path) = unzip_test_asset("appflowy_io").await.unwrap();
// let importer = NotionImporter::new(
// 1,
// &file_path,
// uuid::Uuid::new_v4(),
// "http://test.appflowy.cloud".to_string(),
// )
// .unwrap();
// let info = importer.import().await.unwrap();
//
// let views = info.build_nested_views().await.flatten_views();
// assert!(!views.is_empty())
// }

#[tokio::test]
async fn import_two_spaces_test() {
let (_cleaner, file_path) = unzip_test_asset("two_spaces").await.unwrap();
Expand Down

0 comments on commit 907ca13

Please sign in to comment.