From b71bfbf0c32b95fb935f0c1269cfcadcdb02e40a Mon Sep 17 00:00:00 2001 From: Henry Fontanier Date: Fri, 29 Nov 2024 18:41:38 +0100 Subject: [PATCH] enh: refactor store.rs (more consistent, fewer clones) --- core/bin/core_api.rs | 83 ++++---- core/src/blocks/database_schema.rs | 2 +- core/src/data_sources/data_source.rs | 34 ++- core/src/data_sources/folder.rs | 57 +---- core/src/data_sources/node.rs | 14 ++ core/src/databases/table.rs | 58 ++--- core/src/stores/postgres.rs | 305 ++++++++++++++++----------- core/src/stores/store.rs | 92 +++++--- 8 files changed, 375 insertions(+), 270 deletions(-) diff --git a/core/bin/core_api.rs b/core/bin/core_api.rs index 905ad6ab7bce..4fb2ddff8e9b 100644 --- a/core/bin/core_api.rs +++ b/core/bin/core_api.rs @@ -35,7 +35,6 @@ use dust::{ blocks::block::BlockType, data_sources::{ data_source::{self, Section}, - folder::Folder, qdrant::QdrantClients, }, databases::{ @@ -50,7 +49,10 @@ use dust::{ run, search_filter::{Filterable, SearchFilter}, sqlite_workers::client::{self, HEARTBEAT_INTERVAL_MS}, - stores::{postgres, store}, + stores::{ + postgres, + store::{self, UpsertFolder, UpsertTable}, + }, utils::{self, error_response, APIError, APIResponse, CoreRequestMakeSpan}, }; @@ -2064,22 +2066,21 @@ async fn tables_upsert( match state .store - .upsert_table( - &project, - &data_source_id, - &payload.table_id, - &payload.name, - &payload.description, - match payload.timestamp { - Some(timestamp) => timestamp, - None => utils::now(), + .upsert_data_source_table( + project, + data_source_id, + UpsertTable { + table_id: payload.table_id, + name: payload.name, + description: payload.description, + timestamp: payload.timestamp.unwrap_or(utils::now()), + tags: payload.tags, + parents: payload.parents, + remote_database_table_id: payload.remote_database_table_id, + remote_database_secret_id: payload.remote_database_secret_id, + title: payload.title, + mime_type: payload.mime_type, }, - &payload.tags, - &payload.parents, - payload.remote_database_table_id, - payload.remote_database_secret_id, - payload.title, - payload.mime_type, ) .await { @@ -2133,7 +2134,7 @@ async fn tables_retrieve( match state .store - .load_table(&project, &data_source_id, &table_id) + .load_data_source_table(&project, &data_source_id, &table_id) .await { Err(e) => error_response( @@ -2215,7 +2216,7 @@ async fn tables_list( match state .store - .list_tables( + .list_data_source_tables( &project, &data_source_id, &view_filter, @@ -2253,7 +2254,7 @@ async fn tables_delete( match state .store - .load_table(&project, &data_source_id, &table_id) + .load_data_source_table(&project, &data_source_id, &table_id) .await { Err(e) => error_response( @@ -2302,7 +2303,7 @@ async fn tables_update_parents( match state .store - .load_table(&project, &data_source_id, &table_id) + .load_data_source_table(&project, &data_source_id, &table_id) .await { Err(e) => error_response( @@ -2355,7 +2356,7 @@ async fn tables_rows_upsert( match state .store - .load_table(&project, &data_source_id, &table_id) + .load_data_source_table(&project, &data_source_id, &table_id) .await { Err(e) => { @@ -2444,7 +2445,7 @@ async fn tables_rows_retrieve( match state .store - .load_table(&project, &data_source_id, &table_id) + .load_data_source_table(&project, &data_source_id, &table_id) .await { Err(e) => { @@ -2520,7 +2521,7 @@ async fn tables_rows_delete( match state .store - .load_table(&project, &data_source_id, &table_id) + .load_data_source_table(&project, &data_source_id, &table_id) .await { Err(e) => { @@ -2612,7 +2613,7 @@ 
async fn tables_rows_list( match state .store - .load_table(&project, &data_source_id, &table_id) + .load_data_source_table(&project, &data_source_id, &table_id) .await { Err(e) => { @@ -2685,16 +2686,20 @@ async fn folders_upsert( ) -> (StatusCode, Json) { let project = project::Project::new_from_id(project_id); - let folder = Folder::new( - &project, - &data_source_id, - &payload.folder_id.clone(), - payload.timestamp.unwrap_or(utils::now()), - &payload.title, - payload.parents, - ); - - match state.store.upsert_data_source_folder(&folder).await { + match state + .store + .upsert_data_source_folder( + project, + data_source_id, + UpsertFolder { + folder_id: payload.folder_id, + timestamp: payload.timestamp.unwrap_or(utils::now()), + parents: payload.parents, + title: payload.title, + }, + ) + .await + { Err(e) => { return error_response( StatusCode::INTERNAL_SERVER_ERROR, @@ -2703,7 +2708,7 @@ async fn folders_upsert( Some(e), ) } - Ok(()) => ( + Ok(folder) => ( StatusCode::OK, Json(APIResponse { error: None, @@ -2898,7 +2903,11 @@ async fn databases_query_run( .map(|(project_id, data_source_id, table_id)| { let project = project::Project::new_from_id(project_id); let store = state.store.clone(); - async move { store.load_table(&project, &data_source_id, &table_id).await } + async move { + store + .load_data_source_table(&project, &data_source_id, &table_id) + .await + } }), ) .await diff --git a/core/src/blocks/database_schema.rs b/core/src/blocks/database_schema.rs index 22a78db9f061..1649c76d8507 100644 --- a/core/src/blocks/database_schema.rs +++ b/core/src/blocks/database_schema.rs @@ -163,7 +163,7 @@ pub async fn load_tables_from_identifiers( let (project, data_source_name) = project_and_data_source_by_data_source_view .get(&(*workspace_id, *data_source_or_view_id)) .expect("Unreachable: missing project."); - store.load_table(&project, &data_source_name, &table_id) + store.load_data_source_table(&project, &data_source_name, &table_id) }, )) .await?) diff --git a/core/src/data_sources/data_source.rs b/core/src/data_sources/data_source.rs index 5c30b8bb0551..9390c61bae57 100644 --- a/core/src/data_sources/data_source.rs +++ b/core/src/data_sources/data_source.rs @@ -9,7 +9,7 @@ use crate::providers::embedder::{EmbedderRequest, EmbedderVector}; use crate::providers::provider::ProviderID; use crate::run::Credentials; use crate::search_filter::{Filterable, SearchFilter}; -use crate::stores::store::Store; +use crate::stores::store::{Store, UpsertDocument}; use crate::utils; use anyhow::{anyhow, Result}; use futures::future::try_join_all; @@ -725,20 +725,42 @@ impl DataSource { .await?; } + // Store upsert does not save the text and token count. + // These fields don't actually exist in the SQL table. + // Because of this, we have to manually construct the UpsertDocument, and save + // owned values for text and token count so we can return them. 
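+        // Note: `UpsertDocument` also implements `From<Document>` (see store.rs), so
+        // callers that do not need to keep `text` or `token_count` can build the
+        // params with `document.into()` instead of listing every field.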
+ // TODO(@fontanierh): use a different type for "DocumentWithTextAndTokenCount" + let doc_text = main_collection_document.text; + let doc_token_count = main_collection_document.token_count; + let sql_upsert_params = UpsertDocument { + document_id: main_collection_document.document_id, + timestamp: main_collection_document.timestamp, + tags: main_collection_document.tags, + parents: main_collection_document.parents, + source_url: main_collection_document.source_url, + hash: main_collection_document.hash, + text_size: main_collection_document.text_size, + chunk_count: main_collection_document.chunk_count, + chunks: main_collection_document.chunks, + }; + // Upsert document (SQL). - store + let mut doc = store .upsert_data_source_document( &self.project, - &self.data_source_id, - &main_collection_document, + self.data_source_id.clone(), + sql_upsert_params, ) .await?; + doc.text = doc_text; + doc.token_count = doc_token_count; + // Clean-up old superseded versions. self.scrub_document_superseded_versions(store, &document_id) .await?; - Ok(main_collection_document) + Ok(doc) } async fn upsert_for_embedder( @@ -1954,7 +1976,7 @@ impl DataSource { // Delete tables (concurrently). let (tables, total) = store - .list_tables(&self.project, &self.data_source_id, &None, &None, None) + .list_data_source_tables(&self.project, &self.data_source_id, &None, &None, None) .await?; try_join_all( tables diff --git a/core/src/data_sources/folder.rs b/core/src/data_sources/folder.rs index 9c79cbe40b22..66898cb6d6da 100644 --- a/core/src/data_sources/folder.rs +++ b/core/src/data_sources/folder.rs @@ -2,8 +2,6 @@ use serde::{Deserialize, Serialize}; use crate::project::Project; -use super::node::{Node, NodeType}; - #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Folder { project: Project, @@ -19,34 +17,23 @@ pub const FOLDER_MIMETYPE: &str = "application/vnd.dust.folder"; impl Folder { pub fn new( - project: &Project, - data_source_id: &str, - folder_id: &str, + project: Project, + data_source_id: String, + folder_id: String, timestamp: u64, - title: &str, + title: String, parents: Vec, ) -> Self { Folder { - project: project.clone(), - data_source_id: data_source_id.to_string(), - folder_id: folder_id.to_string(), + project: project, + data_source_id: data_source_id, + folder_id: folder_id, timestamp, - title: title.to_string(), + title: title, parents, } } - pub fn from_node(node: &Node) -> Self { - Folder::new( - node.project(), - node.data_source_id(), - node.node_id(), - node.timestamp(), - node.title(), - node.parents().clone(), - ) - } - pub fn project(&self) -> &Project { &self.project } @@ -66,31 +53,3 @@ impl Folder { &self.parents } } - -impl From for Folder { - fn from(node: Node) -> Self { - Folder::new( - node.project(), - node.data_source_id(), - node.node_id(), - node.timestamp(), - node.title(), - node.parents().clone(), - ) - } -} - -impl From for Node { - fn from(folder: Folder) -> Self { - Node::new( - &folder.project, - &folder.data_source_id, - &folder.folder_id, - NodeType::Folder, - folder.timestamp, - &folder.title, - FOLDER_MIMETYPE, - folder.parents.clone(), - ) - } -} diff --git a/core/src/data_sources/node.rs b/core/src/data_sources/node.rs index bf81f4a8d8f9..50ff393211ea 100644 --- a/core/src/data_sources/node.rs +++ b/core/src/data_sources/node.rs @@ -2,6 +2,8 @@ use serde::{Deserialize, Serialize}; use crate::project::Project; +use super::folder::Folder; + #[derive(Debug, Clone, Serialize, PartialEq, Deserialize, Copy)] pub enum NodeType { Document, @@ -68,4 +70,16 @@ 
impl Node { pub fn parents(&self) -> &Vec { &self.parents } + + // Consumes self into a Folder. + pub fn into_folder(self) -> Folder { + Folder::new( + self.project, + self.data_source_id, + self.node_id, + self.timestamp, + self.title, + self.parents, + ) + } } diff --git a/core/src/databases/table.rs b/core/src/databases/table.rs index f6ad468fde7e..4d30e168f810 100644 --- a/core/src/databases/table.rs +++ b/core/src/databases/table.rs @@ -71,35 +71,35 @@ pub struct Table { impl Table { pub fn new( - project: &Project, - data_source_id: &str, + project: Project, + data_source_id: String, created: u64, - table_id: &str, - name: &str, - description: &str, + table_id: String, + name: String, + description: String, timestamp: u64, - title: &str, - mime_type: &str, + title: String, + mime_type: String, tags: Vec, parents: Vec, - schema: &Option, + schema: Option, schema_stale_at: Option, remote_database_table_id: Option, remote_database_secret_id: Option, ) -> Self { Table { - project: project.clone(), - data_source_id: data_source_id.to_string(), + project: project, + data_source_id: data_source_id, created, - table_id: table_id.to_string(), - name: name.to_string(), - description: description.to_string(), + table_id: table_id, + name: name, + description: description, timestamp, tags, - title: title.to_string(), - mime_type: mime_type.to_string(), + title: title, + mime_type: mime_type, parents, - schema: schema.clone(), + schema: schema, schema_stale_at, remote_database_table_id, remote_database_secret_id, @@ -196,7 +196,7 @@ impl Table { } store - .delete_table(&self.project, &self.data_source_id, &self.table_id) + .delete_data_source_table(&self.project, &self.data_source_id, &self.table_id) .await?; Ok(()) @@ -208,7 +208,7 @@ impl Table { parents: Vec, ) -> Result<()> { store - .update_table_parents( + .update_data_source_table_parents( &self.project, &self.data_source_id, &&self.table_id, @@ -325,7 +325,7 @@ impl LocalTable { now = utils::now(); store - .update_table_schema( + .update_data_source_table_schema( &self.table.project, &self.table.data_source_id, &self.table.table_id, @@ -348,7 +348,7 @@ impl LocalTable { // This is why we invalidate the schema when doing incremental updates, and next time // the schema is requested, it will be recomputed from all the rows. 
store - .invalidate_table_schema( + .invalidate_data_source_table_schema( &self.table.project, &self.table.data_source_id, &self.table.table_id, @@ -447,7 +447,7 @@ impl LocalTable { let schema = self.compute_schema(databases_store).await?; store - .update_table_schema( + .update_data_source_table_schema( &self.table.project, &self.table.data_source_id, &self.table.table_id, @@ -565,18 +565,18 @@ mod tests { let schema = TableSchema::from_rows_async(rows).await?; let table = Table::new( - &Project::new_from_id(42), - "data_source_id", + Project::new_from_id(42), + "data_source_id".to_string(), utils::now(), - "table_id", - "test_dbml", - "Test records for DBML rendering", + "table_id".to_string(), + "test_dbml".to_string(), + "Test records for DBML rendering".to_string(), utils::now(), - "test_dbml", - "text/plain", + "test_dbml".to_string(), + "text/plain".to_string(), vec![], vec![], - &Some(schema), + Some(schema), None, None, None, diff --git a/core/src/stores/postgres.rs b/core/src/stores/postgres.rs index 85c8aa2652d3..684a89f29ac6 100644 --- a/core/src/stores/postgres.rs +++ b/core/src/stores/postgres.rs @@ -39,11 +39,22 @@ use crate::{ utils, }; +use super::store::{UpsertDocument, UpsertFolder, UpsertTable}; + #[derive(Clone)] pub struct PostgresStore { pool: Pool>, } +pub struct UpsertNode<'a> { + pub node_id: &'a str, + pub node_type: &'a NodeType, + pub timestamp: u64, + pub title: &'a str, + pub mime_type: &'a str, + pub parents: &'a Vec, +} + impl PostgresStore { pub async fn new(db_uri: &str) -> Result { let manager = PostgresConnectionManager::new_from_stringlike(db_uri, NoTls)?; @@ -133,14 +144,14 @@ impl PostgresStore { async fn upsert_data_source_node( &self, - node: &Node, + params: UpsertNode<'_>, data_source_row_id: i64, row_id: i64, tx: &Transaction<'_>, ) -> Result<()> { let created = utils::now(); - let (document_row_id, table_row_id, folder_row_id) = match node.node_type() { + let (document_row_id, table_row_id, folder_row_id) = match params.node_type { NodeType::Document => (Some(row_id), None, None), NodeType::Table => (None, Some(row_id), None), NodeType::Folder => (None, None, Some(row_id)), @@ -164,11 +175,11 @@ impl PostgresStore { &[ &data_source_row_id, &(created as i64), - &node.node_id(), - &(node.timestamp() as i64), - &node.title(), - &node.mime_type(), - &node.parents(), + ¶ms.node_id, + &(params.timestamp as i64), + ¶ms.title, + ¶ms.mime_type, + ¶ms.parents, &document_row_id, &table_row_id, &folder_row_id, @@ -1730,20 +1741,12 @@ impl Store for PostgresStore { async fn upsert_data_source_document( &self, project: &Project, - data_source_id: &str, - document: &Document, - ) -> Result<()> { + data_source_id: String, + params: UpsertDocument, + ) -> Result { + let document_created = utils::now(); + let project_id = project.project_id(); - let data_source_id = data_source_id.to_string(); - let document_id = document.document_id.clone(); - let document_created = document.created; - let document_timestamp = document.timestamp; - let document_tags = document.tags.clone(); - let document_parents = document.parents.clone(); - let document_source_url = document.source_url.clone(); - let document_hash = document.hash.clone(); - let document_text_size = document.text_size; - let document_chunk_count = document.chunks.len() as u64; let pool = self.pool.clone(); let mut c = pool.get().await?; @@ -1770,7 +1773,7 @@ impl Store for PostgresStore { ) .await?; let _ = tx - .query(&stmt, &[&data_source_row_id, &document_id]) + .query(&stmt, &[&data_source_row_id, 
¶ms.document_id]) .await?; let stmt = tx @@ -1778,31 +1781,50 @@ impl Store for PostgresStore { "INSERT INTO data_sources_documents \ (id, data_source, created, document_id, timestamp, tags_array, parents, \ source_url, hash, text_size, chunk_count, status) \ - VALUES (DEFAULT, $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) RETURNING id", + VALUES (DEFAULT, $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) RETURNING id, created, token_count", ) .await?; - tx.query_one( - &stmt, - &[ - &data_source_row_id, - &(document_created as i64), - &document_id, - &(document_timestamp as i64), - &document_tags, - &document_parents, - &document_source_url, - &document_hash, - &(document_text_size as i64), - &(document_chunk_count as i64), - &"latest", - ], - ) - .await?; + let r = tx + .query_one( + &stmt, + &[ + &data_source_row_id, + &(document_created as i64), + ¶ms.document_id, + &(params.timestamp as i64), + ¶ms.tags, + ¶ms.parents, + ¶ms.source_url, + ¶ms.hash, + &(params.text_size as i64), + &(params.chunk_count as i64), + &"latest", + ], + ) + .await?; + + let _id: i64 = r.get(0); + let created: i64 = r.get(1); + let token_count: Option = r.get(2); tx.commit().await?; - Ok(()) + Ok(Document { + data_source_id, + created: created as u64, + document_id: params.document_id, + timestamp: params.timestamp, + tags: params.tags, + parents: params.parents, + source_url: params.source_url, + hash: params.hash, + text_size: params.text_size, + chunk_count: params.chunk_count as usize, + chunks: vec![], + text: None, + token_count: token_count.map(|t| t as usize), + }) } async fn list_data_source_documents( @@ -2381,35 +2403,15 @@ impl Store for PostgresStore { .collect::>()) } - async fn upsert_table( + async fn upsert_data_source_table( &self, - project: &Project, - data_source_id: &str, - table_id: &str, - name: &str, - description: &str, - timestamp: u64, - tags: &Vec, - parents: &Vec, - remote_database_table_id: Option, - remote_database_secret_id: Option, - title: Option, - mime_type: Option, + project: Project, + data_source_id: String, + params: UpsertTable, ) -> Result { let project_id = project.project_id(); - let data_source_id = data_source_id.to_string(); let table_created = utils::now(); - let table_id = table_id.to_string(); - let table_name = name.to_string(); - let table_description = description.to_string(); - let table_timestamp = timestamp; - let table_tags = tags.clone(); - let table_parents = parents.clone(); - let table_remote_database_table_id = remote_database_table_id.clone(); - let table_remote_database_secret_id = remote_database_secret_id.clone(); - let table_title = title.clone(); - let table_mime_type = mime_type.clone(); let pool = self.pool.clone(); let mut c = pool.get().await?; @@ -2437,52 +2439,77 @@ impl Store for PostgresStore { SET name = EXCLUDED.name, description = EXCLUDED.description, \ timestamp = EXCLUDED.timestamp, tags_array = EXCLUDED.tags_array, parents = EXCLUDED.parents, \ remote_database_table_id = EXCLUDED.remote_database_table_id, remote_database_secret_id = EXCLUDED.remote_database_secret_id \ - RETURNING id", + RETURNING id, created, schema, schema_stale_at", ) .await?; - let table_row_id = tx + let table_row = tx .query_one( &stmt, &[ &data_source_row_id, &(table_created as i64), - &table_id, - &table_name, - &table_description, - &(table_timestamp as i64), - &table_tags, - &table_parents, - &table_remote_database_table_id, - &table_remote_database_secret_id, + ¶ms.table_id, + ¶ms.name, + ¶ms.description, + &(params.timestamp as i64), + ¶ms.tags, + 
¶ms.parents, + ¶ms.remote_database_table_id, + ¶ms.remote_database_secret_id, ], ) - .await? - .get(0); + .await?; + + let table_row_id = table_row.get::(0); + let table_created = table_row.get::(1) as u64; + let raw_schema = table_row.get::>(2); + let table_schema_stale_at = table_row.get::>(3); + + let parsed_schema: Option = match raw_schema { + None => None, + Some(schema) => { + if schema.is_empty() { + None + } else { + Some(serde_json::from_str(&schema)?) + } + } + }; + + let should_upsert_node = params.title.is_some() && params.mime_type.is_some(); + let title = params.title.unwrap_or(params.name.clone()); let table = Table::new( project, - &data_source_id, + data_source_id, table_created, - &table_id, - &table_name, - &table_description, - table_timestamp, - &table_title.unwrap_or(table_name.clone()), - &table_mime_type.unwrap_or("text/csv".to_string()), - table_tags, - table_parents, - &None, - None, - table_remote_database_table_id, - table_remote_database_secret_id, + params.table_id, + params.name, + params.description, + params.timestamp, + title, + params.mime_type.unwrap_or("text/csv".to_string()), + params.tags, + params.parents, + parsed_schema, + table_schema_stale_at.map(|t| t as u64), + params.remote_database_table_id, + params.remote_database_secret_id, ); // TODO(KW_SEARCH_INFRA): make title/mime_type not optional. // Upsert the data source node if title and mime_type are present. Otherwise, we skip the upsert. - if let (Some(_), Some(_)) = (title, mime_type) { + if should_upsert_node { self.upsert_data_source_node( - &table.clone().into(), + UpsertNode { + node_id: table.table_id(), + node_type: &NodeType::Table, + timestamp: table.timestamp(), + title: table.title(), + mime_type: table.mime_type(), + parents: table.parents(), + }, data_source_row_id, table_row_id, &tx, @@ -2494,7 +2521,7 @@ impl Store for PostgresStore { Ok(table) } - async fn update_table_schema( + async fn update_data_source_table_schema( &self, project: &Project, data_source_id: &str, @@ -2541,7 +2568,7 @@ impl Store for PostgresStore { Ok(()) } - async fn update_table_parents( + async fn update_data_source_table_parents( &self, project: &Project, data_source_id: &str, @@ -2589,7 +2616,7 @@ impl Store for PostgresStore { Ok(()) } - async fn invalidate_table_schema( + async fn invalidate_data_source_table_schema( &self, project: &Project, data_source_id: &str, @@ -2629,7 +2656,7 @@ impl Store for PostgresStore { Ok(()) } - async fn load_table( + async fn load_data_source_table( &self, project: &Project, data_source_id: &str, @@ -2721,19 +2748,23 @@ impl Store for PostgresStore { } } }; + + // TODO(KW_SEARCH_INFRA) use title + let title = name.clone(); + Ok(Some(Table::new( - project, - &data_source_id, + project.clone(), + data_source_id.clone(), created as u64, - &table_id, - &name, - &description, + table_id, + name, + description, timestamp as u64, - &name, // TODO(KW_SEARCH_INFRA) use title - "text/csv", // TODO(KW_SEARCH_INFRA) use mimetype + title, + "text/csv".to_string(), // TODO(KW_SEARCH_INFRA) use mimetype tags, parents, - &parsed_schema, + parsed_schema, schema_stale_at.map(|t| t as u64), remote_database_table_id, remote_database_secret_id, @@ -2742,7 +2773,7 @@ impl Store for PostgresStore { } } - async fn list_tables( + async fn list_data_source_tables( &self, project: &Project, data_source_id: &str, @@ -2852,19 +2883,22 @@ impl Store for PostgresStore { } }; + // TODO(KW_SEARCH_INFRA) use title + let title = name.clone(); + Ok(Table::new( - project, - &data_source_id, + 
project.clone(), + data_source_id.clone(), created as u64, - &table_id, - &name, - &description, + table_id, + name, + description, timestamp as u64, - &name, // TODO(KW_SEARCH_INFRA) use title - "text/csv", // TODO(KW_SEARCH_INFRA)use mimetype + title, + "text/csv".to_string(), // TODO(KW_SEARCH_INFRA)use mimetype tags, parents, - &parsed_schema, + parsed_schema, schema_stale_at.map(|t| t as u64), remote_database_table_id, remote_database_secret_id, @@ -2893,7 +2927,7 @@ impl Store for PostgresStore { Ok((tables, total)) } - async fn delete_table( + async fn delete_data_source_table( &self, project: &Project, data_source_id: &str, @@ -2934,8 +2968,13 @@ impl Store for PostgresStore { Ok(()) } - async fn upsert_data_source_folder(&self, folder: &Folder) -> Result<()> { - let data_source_id = folder.data_source_id(); + async fn upsert_data_source_folder( + &self, + project: Project, + data_source_id: String, + params: UpsertFolder, + ) -> Result { + let project_id = project.project_id(); let pool = self.pool.clone(); let mut c = pool.get().await?; @@ -2947,7 +2986,7 @@ impl Store for PostgresStore { let r = tx .query( "SELECT id FROM data_sources WHERE project = $1 AND data_source_id = $2 LIMIT 1", - &[&folder.project().project_id(), &data_source_id], + &[&project_id, &data_source_id], ) .await?; let data_source_row_id: i64 = match r.len() { @@ -2963,20 +3002,38 @@ impl Store for PostgresStore { VALUES (DEFAULT, $1, $2, $3) \ ON CONFLICT (folder_id, data_source) DO UPDATE \ SET folder_id = data_sources_folders.folder_id \ - RETURNING id", + RETURNING id, created", ) .await?; - let folder_row_id = tx + let r = tx .query_one( &stmt, - &[&data_source_row_id, &(created as i64), &folder.folder_id()], + &[&data_source_row_id, &(created as i64), ¶ms.folder_id], ) - .await? 
- .get(0); + .await?; + + let folder_row_id: i64 = r.get(0); + let created: i64 = r.get(1); + + let folder = Folder::new( + project, + data_source_id, + params.folder_id, + created as u64, + params.title, + params.parents, + ); self.upsert_data_source_node( - &folder.clone().into(), + UpsertNode { + node_id: folder.folder_id(), + node_type: &NodeType::Folder, + timestamp: folder.timestamp(), + title: folder.title(), + mime_type: "text/csv", + parents: folder.parents(), + }, data_source_row_id, folder_row_id, &tx, @@ -2984,7 +3041,7 @@ impl Store for PostgresStore { .await?; tx.commit().await?; - Ok(()) + Ok(folder) } async fn load_data_source_folder( @@ -3016,7 +3073,7 @@ impl Store for PostgresStore { match row.len() { 0 => Ok(None), - 1 => Ok(Some(node.into())), + 1 => Ok(Some(node.into_folder())), _ => unreachable!(), } } diff --git a/core/src/stores/store.rs b/core/src/stores/store.rs index dcf5a2560218..a789dbe51e31 100644 --- a/core/src/stores/store.rs +++ b/core/src/stores/store.rs @@ -9,7 +9,7 @@ use crate::{ blocks::block::BlockType, cached_request::CachedRequest, data_sources::{ - data_source::{DataSource, DataSourceConfig, Document, DocumentVersion}, + data_source::{Chunk, DataSource, DataSourceConfig, Document, DocumentVersion}, folder::Folder, node::Node, }, @@ -26,6 +26,54 @@ use crate::{ sqlite_workers::client::SqliteWorker, }; +pub struct UpsertDocument { + pub document_id: String, + pub timestamp: u64, + pub tags: Vec, + pub parents: Vec, + pub source_url: Option, + pub hash: String, + pub text_size: u64, + pub chunk_count: usize, + pub chunks: Vec, +} + +impl From for UpsertDocument { + fn from(document: Document) -> Self { + UpsertDocument { + document_id: document.document_id, + timestamp: document.timestamp, + tags: document.tags, + parents: document.parents, + source_url: document.source_url, + hash: document.hash, + text_size: document.text_size, + chunk_count: document.chunk_count, + chunks: document.chunks, + } + } +} + +pub struct UpsertTable { + pub table_id: String, + pub name: String, + pub description: String, + pub timestamp: u64, + pub tags: Vec, + pub parents: Vec, + pub remote_database_table_id: Option, + pub remote_database_secret_id: Option, + pub title: Option, + pub mime_type: Option, +} + +pub struct UpsertFolder { + pub folder_id: String, + pub timestamp: u64, + pub title: String, + pub parents: Vec, +} + #[async_trait] pub trait Store { fn raw_pool(&self) -> &Pool>; @@ -134,9 +182,9 @@ pub trait Store { async fn upsert_data_source_document( &self, project: &Project, - data_source_id: &str, - document: &Document, - ) -> Result<()>; + data_source_id: String, + params: UpsertDocument, + ) -> Result; async fn update_data_source_document_tags( &self, project: &Project, @@ -211,48 +259,39 @@ pub trait Store { ) -> Result>; async fn delete_database(&self, table_ids_hash: &str) -> Result<()>; // Tables - async fn upsert_table( + async fn upsert_data_source_table( &self, - project: &Project, - data_source_id: &str, - table_id: &str, - name: &str, - description: &str, - timestamp: u64, - tags: &Vec, - parents: &Vec, - remote_database_table_id: Option, - remote_database_secret_id: Option, - title: Option, - mime_type: Option, + project: Project, + data_source_id: String, + params: UpsertTable, ) -> Result
<Table>;
-    async fn update_table_schema(
+    async fn update_data_source_table_schema(
         &self,
         project: &Project,
         data_source_id: &str,
         table_id: &str,
         schema: &TableSchema,
     ) -> Result<()>;
-    async fn update_table_parents(
+    async fn update_data_source_table_parents(
         &self,
         project: &Project,
         data_source_id: &str,
         table_id: &str,
         parents: &Vec<String>,
     ) -> Result<()>;
-    async fn invalidate_table_schema(
+    async fn invalidate_data_source_table_schema(
         &self,
         project: &Project,
         data_source_id: &str,
         table_id: &str,
     ) -> Result<()>;
-    async fn load_table(
+    async fn load_data_source_table(
         &self,
         project: &Project,
         data_source_id: &str,
         table_id: &str,
     ) -> Result<Option<Table>>;
-    async fn list_tables(
+    async fn list_data_source_tables(
         &self,
         project: &Project,
         data_source_id: &str,
@@ -260,14 +299,19 @@ pub trait Store {
         table_ids: &Option<Vec<String>>,
         limit_offset: Option<(usize, usize)>,
     ) -> Result<(Vec<Table>
, usize)>;
-    async fn delete_table(
+    async fn delete_data_source_table(
         &self,
         project: &Project,
         data_source_id: &str,
         table_id: &str,
     ) -> Result<()>;
     // Folders
-    async fn upsert_data_source_folder(&self, folder: &Folder) -> Result<()>;
+    async fn upsert_data_source_folder(
+        &self,
+        project: Project,
+        data_source_id: String,
+        params: UpsertFolder,
+    ) -> Result<Folder>;
     async fn load_data_source_folder(
         &self,
         project: &Project,
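For reference, a minimal sketch of what a caller of the refactored table API looks like after this patch, assuming a `store` implementing the `Store` trait above; the data source id, field values, and the `example_upsert` helper are illustrative only, not part of the patch:

    // Illustrative sketch of the new params-struct call shape.
    use dust::{
        project::Project,
        stores::store::{Store, UpsertTable},
        utils,
    };

    async fn example_upsert(store: &(dyn Store + Sync + Send)) -> anyhow::Result<()> {
        let project = Project::new_from_id(42);
        // One owned `UpsertTable` params struct replaces the former twelve positional arguments.
        let table = store
            .upsert_data_source_table(
                project,
                "example_data_source".to_string(),
                UpsertTable {
                    table_id: "example_table".to_string(),
                    name: "example_table".to_string(),
                    description: "Example table".to_string(),
                    timestamp: utils::now(),
                    tags: vec![],
                    parents: vec![],
                    remote_database_table_id: None,
                    remote_database_secret_id: None,
                    title: Some("Example Table".to_string()),
                    mime_type: Some("text/csv".to_string()),
                },
            )
            .await?;
        // The upsert now returns the full `Table`, including any schema already stored.
        println!("upserted table: {}", table.table_id());
        Ok(())
    }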